From 7ed2131c3a53f0cfcfc9e03e4773e92e3e830167 Mon Sep 17 00:00:00 2001 From: CHAMPION923 <1056156912@qq.com> Date: Mon, 2 Jun 2025 19:52:03 +0800 Subject: [PATCH] obstacle detection function complete --- lib/io/process2d.py | 4 +- lib/io/process3d.py | 6 +- lib/presenter/presenter.py | 142 +++++++++++++++++++++++-------------- 3 files changed, 95 insertions(+), 57 deletions(-) diff --git a/lib/io/process2d.py b/lib/io/process2d.py index 7e0e0df..a5bcf17 100644 --- a/lib/io/process2d.py +++ b/lib/io/process2d.py @@ -1,5 +1,6 @@ from multiprocessing import Process from lib.camera.MindVisionCamera import MindVisionCamera +from lib.cfg import cfg from lib.cfg.cfg import CameraControl @@ -12,14 +13,13 @@ class Process2D(Process): self.out_q = out_q self.camera = MindVisionCamera(cfg["sn"]) self.status = 0 - # 仅对轨道摄像头启用 TrackDetector def run(self): while True: sig = self.in_q.get() if sig == CameraControl.CAPTURE: img = self.camera.capture() - self.out_q.put(img) + self.out_q.put({"title": cfg["title"], "img": img}) elif sig == CameraControl.DESTORY: for fn in ("destroy", "close", "release"): if hasattr(self.camera, fn): diff --git a/lib/io/process3d.py b/lib/io/process3d.py index 9b57edb..76c46fc 100644 --- a/lib/io/process3d.py +++ b/lib/io/process3d.py @@ -1,5 +1,6 @@ from multiprocessing import Process from collections import deque +from lib import cfg from lib.camera.ArenaCamera import ArenaCamera from lib.cfg.cfg import CameraControl @@ -12,15 +13,14 @@ class Process3D(Process): self.in_q = in_q self.out_q = out_q self.camera = ArenaCamera(cfg["sn"]) - self.status = 0 - # 仅对轨道摄像头启用 TrackDetector def run(self): while True: sig = self.in_q.get() if sig == CameraControl.CAPTURE: _, _, dep_img = self.camera.capture() - self.out_q.put(dep_img) + # self.out_q.put({self.cfg["title"]: dep_img}) + self.out_q.put({"title": cfg["title"], "dep_img": dep_img}) elif sig == CameraControl.DESTORY: self.camera.destroy() break diff --git a/lib/presenter/presenter.py b/lib/presenter/presenter.py index 466f3c7..9f81f89 100644 --- a/lib/presenter/presenter.py +++ b/lib/presenter/presenter.py @@ -1,11 +1,12 @@ -from ctypes import set_errno +from ctypes import set_er +from concurrent.futures import ThreadPoolExecutor, as_completed +from threading import Semaphore import datetime +from typing import Deque from lib.alg.image_processing_3d import detect_obstacles_in_box import time from multiprocessing import Manager import json -from lib.io import process2d -from lib.io import process3d from lib.io.process3d import Process3D from lib.io.process2d import Process2D from collections import deque, OrderedDict @@ -35,17 +36,21 @@ from lib.cfg.cfg import ( # 3. 将解析好的数据通过TCP服务发送出去 class Presenter: def __init__(self) -> None: + mgr = Manager() + # 存放2D 3D相机采集图像的数据队列 + self.fifo_2d = mgr.Queue() + self.fifo_3d = mgr.Queue() # TODO: 初始化进程队列 self.mode = VisionMode.OBSTACLE_RECO + self.pkt = OrderedDict() # tcp发送数据包 self.process3d_info = {} self.process2d_info = {} - mgr = Manager() + self.depth_img = {} with open(CAMERA_3D_CFGS, encoding="utf-8") as f: cfg3d = json.load(f)["camera"] for cfg in cfg3d: in_q = mgr.Queue() - out_q = mgr.Queue() - pro = Process3D(cfg["sn"], in_q, out_q) + pro = Process3D(cfg, in_q, self.fifo_3d) self.process3d_info[cfg["title"]] = pro pro.start() @@ -53,8 +58,7 @@ class Presenter: cfg2d = json.load(f)["camera"] for cfg in cfg2d: in_q = mgr.Queue() - out_q = mgr.Queue() - pro = Process2D(cfg["sn"], in_q, out_q) + pro = Process2D(cfg, in_q, self.fifo_2d) self.process2d_info[cfg["title"]] = pro pro.start() @@ -69,54 +73,87 @@ class Presenter: self.last_rail = {k: {"offset": None, "angle": None} for k in RAIL_KEYS} self.two_d_hist = {k: deque(maxlen=10) for k in TITLE2D_KEY.values()} - def get_camera_data(self): + def front_mode_data_handle(self): pass - def handle_camera_3d_data(self): + def rear_mode_data_handle(self): pass - def handle_camera_2d_data(self): - pass + def handle_obstacle_data(self, img_data): + """通过算法处理图像数据,并返回分析结果""" + intrinsic = o3d.camera.PinholeCameraIntrinsic(640, 480, 474, 505, 320, 240) + img = o3d.geometry.Image(img_data["dep_img"].astype(np.float32)) + pcd = o3d.geometry.PointCloud.create_from_depth_image( + img, intrinsic, depth_scale=1000.0, depth_trunc=8.0 + ) - def front_mode_data_handle(self, pkt: OrderedDict): - pass + # 设置检测区域 + if img_data["title"].startswith("前"): + box = (np.array([-1050, -600, 500]), np.array([1050, 1000, 6000])) + else: + box = (np.array([-800, -600, 800]), np.array([800, 1100, 6000])) - def rear_mode_data_handle(self, pkt: OrderedDict): - pass + nearest, _ = detect_obstacles_in_box(pcd, box[0], box[1], 640, 480) - def obstacle_mode_data_handle(self, pkt: OrderedDict): - """1.获取所有3d避障相机的数据""" - obstacle_depth_img = {} + if nearest: + d = float(np.linalg.norm(nearest["position"])) + res = {"distance": round(d, 2), "status": "NG"} + else: + res = {"distance": None, "status": "OK"} + + return { + "ok": res["status"] == "OK", + "distance": res["distance"], + "title": img_data["title"], + } + + # TODO: 避障模式相机采集和数据处理 + def obstacle_mode_data_handle(self): + """获取所有3D避障相机的数据并处理,直到所有处理完成再继续""" + futures = [] + # 1. 发出CAPTURE命令 for key in self.process3d_info.keys(): - """ 过滤掉所有上轨的相机 """ if key.endswith("上轨"): continue self.process3d_info[key].in_q.put(CameraControl.CAPTURE) - obstacle_depth_img[key] = self.process3d_info[key].out_q().get() + with ThreadPoolExecutor(max_workers=4) as executor: + # 2. 等待图像并提交处理任务 + while not self.fifo_3d.empty(): + img_data = self.fifo_3d.get() + future = executor.submit(self.handle_obstacle_data, img_data) + futures.append(future) - """2. 通过算法处理图像数据 这段算法应该跑在进程里 """ - for key in obstacle_depth_img.keys(): - intrinsic = o3d.camera.PinholeCameraIntrinsic(640, 480, 474, 505, 320, 240) - img = o3d.geometry.Image(obstacle_depth_img[key].astype(np.float32)) - pcd = o3d.geometry.PointCloud.create_from_depth_image( - img, intrinsic, depth_scale=1000.0, depth_trunc=8.0 + # 3. 等待所有算法任务完成(阻塞) + for future in as_completed(futures): + result = future.result() + # 更新历史状态(主线程写入更安全) + self.hist_ok[MAPPING[result["title"]]].append(result["ok"]) + if not result["ok"]: + self.last_d[MAPPING[result["title"]]] = result["distance"] + + for key in OBSTACLE_KEYS: + hist = self.hist_ok[key] + last_dist = self.last_d[key] + if len(hist) == 10 and all(hist): + dist_str = "000" + else: + dist_str = "000" if last_dist is None else f"{last_dist:.2f}" + + self.pkt[key] = dist_str + + # 6. 总状态字段 + self.pkt["f_obstacle_status"] = ( + "OK" + if self.pkt["f_l_obstacle_distance"] == "000" + and self.pkt["f_r_obstacle_distance"] == "000" + else "NG" + ) + self.pkt["b_obstacle_status"] = ( + "OK" + if self.pkt["b_l_obstacle_distance"] == "000" + and self.pkt["b_r_obstacle_distance"] == "000" + else "NG" ) - # TODO: 绘制矩形区域,检测区域内障碍物 - if key.startswith("前"): - box = (np.array([-1050, -600, 500]), np.array([1050, 1000, 6000])) - else: - box = (np.array([-800, -600, 800]), np.array([800, 1100, 6000])) - nearest, _ = detect_obstacles_in_box(pcd, box[0], box[1], 640, 480) - if nearest: - d = float(np.linalg.norm(nearest["position"])) - res = {"distance": round(d, 2), "status": "NG"} - else: - res = {"distance": None, "status": "OK"} - """ 往障碍识别状态里添加历史记录, 只保存前10帧, 以及保存上一次的距离值 """ - ok = res["status"] == "OK" - self.hist_ok[MAPPING[key]].append(ok) - if not ok: - self.last_d[MAPPING[key]] = res["distance"] def wait_rec_tcp_data(self): pass @@ -152,7 +189,6 @@ class Presenter: server = TcpServer(host=HOST, port=PORT) tcp_rec_buf = "" tcp_send_buf = "" - pkt = OrderedDict() # tcp发送数据包 try: server.accept_client() while True: @@ -173,20 +209,22 @@ class Presenter: print(f"Net Error: {e}") break - # 清空发送包 - pkt.clear() - pkt["time_str"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + # TODO: 清空发送包 + self.pkt.clear() + self.pkt["time_str"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # TODO: 根据模式发送不同的数据 if self.mode == VisionMode.OBSTACLE_DETECTION: - self.obstacle_mode_data_handle(pkt) + self.obstacle_mode_data_handle() elif self.mode == VisionMode.FRONT_2D_DETECTION: - self.front_mode_data_handle(pkt) + self.front_mode_data_handle() elif self.mode == VisionMode.REAR_2D_DETECTION: - self.rear_mode_data_handle(pkt) + self.rear_mode_data_handle() # TODO: tcp发送数据 try: - tcp_send_buf = (json.dumps(pkt, ensure_ascii=False) + "\n").encode() + tcp_send_buf = ( + json.dumps(self.pkt, ensure_ascii=False) + "\n" + ).encode() except TypeError as e: print(f"JSON encode failed: {e}") tcp_send_buf = b"{}" @@ -199,7 +237,7 @@ class Presenter: time.sleep(wait) next_time += PERIOD except KeyboardInterrupt: - print("KeyboardInterrupt(Ctrl+C) shutting down") + print("KeyboardInterrupt (Ctrl+C) shutting down") finally: for key in self.process3d_info.keys():