"""Vision presenter: drives the camera processes and serves results over TCP."""

from collections import deque, OrderedDict
from concurrent.futures import ThreadPoolExecutor, as_completed
from ctypes import set_errno
from datetime import datetime
import json
from multiprocessing import Manager, get_start_method, set_start_method
import queue
import socket
from threading import Semaphore
import time
from typing import Deque

import numpy as np
import open3d as o3d

from lib.alg.track_detection import TrackDetector
from lib.alg.image_processing_3d import detect_obstacles_in_box
from lib.io.process3d import Process3D
from lib.io.process2d import Process2D
from lib.camera.ArenaCamera import ArenaCamera
from lib.tcp.tcp_server import TcpServer
from lib.cfg.cfg import (
    CAMERA_2D_CFFS,
    CAMERA_3D_CFGS,
    CameraControl,
    VisionMode,
    HOST,
    PORT,
    OBSTACLE_KEYS,
    MAPPING,
    RAIL_KEYS,
    TITLE2D_KEY,
    PERIOD,
)


class Presenter:
    """Coordinate camera processes, image analysis and the TCP link.

    1. Manages the camera processes. A capture is requested by putting a
       command on a camera process's input queue; per the original author's
       note, the camera answers each request with one image on the shared
       output queue.
    2. Receives data from the TCP client to switch the vision mode, then
       captures and analyses the images that belong to that mode.
    3. Sends the analysed data back out through the TCP server.
    """

    # Upper bound (seconds) on how long one frame waits for the requested
    # 3D images, so a dead camera cannot stall the main loop forever.
    CAPTURE_TIMEOUT_S = 2.0
    # Upper bound (bytes) on an unterminated command line kept between reads.
    MAX_RX_BUFFER = 64 * 1024
    # Distance string reported when no obstacle is known.
    _CLEAR_DISTANCE = "000"

    def __init__(self) -> None:
        """Start one process per configured camera and set up history state."""
        # The start method may only be set once per interpreter; tolerate a
        # second Presenter (or an application that already configured it).
        try:
            set_start_method("spawn")
        except RuntimeError:
            current = get_start_method(allow_none=True)
            if current != "spawn":
                print(f"Warning: multiprocessing start method is {current!r}, not 'spawn'")

        mgr = Manager()
        # Keep an explicit reference to the manager that owns the queues.
        self._mgr = mgr

        # Queues on which the 2D / 3D camera processes deliver captured images.
        self.fifo_2d = mgr.Queue()
        self.fifo_3d = mgr.Queue()

        self.mode = VisionMode.OBSTACLE_DETECTION
        self.pkt = OrderedDict()  # packet sent over TCP
        self.depth_img = {}
        # Bytes of a not-yet-terminated command line received over TCP.
        self._rx_buf = b""

        # Camera processes keyed by the "title" field of their configuration.
        self.process3d_info = self._start_camera_processes(
            CAMERA_3D_CFGS, Process3D, self.fifo_3d, mgr
        )
        self.process2d_info = self._start_camera_processes(
            CAMERA_2D_CFFS, Process2D, self.fifo_2d, mgr
        )
        print("DEBUG:init completed")

        # NOTE: history state
        #   hist_ok    - recent OK/NG obstacle states per obstacle key
        #   last_d     - last obstacle distance seen per obstacle key
        #   hist_rail  - recent rail detection results per rail key
        #   last_rail  - last rail offset/angle values per rail key
        #   two_d_hist - recent 2D detection values per 2D title key
        self.hist_ok = {k: deque(maxlen=10) for k in OBSTACLE_KEYS}
        self.last_d = {k: None for k in OBSTACLE_KEYS}
        self.hist_rail = {k: deque(maxlen=5) for k in RAIL_KEYS}
        self.last_rail = {k: {"offset": None, "angle": None} for k in RAIL_KEYS}
        self.two_d_hist = {k: deque(maxlen=10) for k in TITLE2D_KEY.values()}

    @staticmethod
    def _start_camera_processes(cfg_path, process_cls, out_q, mgr):
        """Start one ``process_cls`` per camera entry in the JSON file ``cfg_path``.

        Returns a dict mapping each camera's ``"title"`` to its started process.
        """
        with open(cfg_path, encoding="utf-8") as f:
            camera_cfgs = json.load(f)["camera"]

        processes = {}
        for cfg in camera_cfgs:
            proc = process_cls(cfg, mgr.Queue(), out_q)
            processes[cfg["title"]] = proc
            proc.start()
        return processes

    def front_mode_data_handle(self):
        """Placeholder for the front 2D detection mode (not implemented)."""
        pass

    def rear_mode_data_handle(self):
        """Placeholder for the rear 2D detection mode (not implemented)."""
        pass

    def handle_obstacle_data(self, img_data):
        """Run obstacle detection on one depth image and return the result.

        Args:
            img_data: mapping with a ``"dep_img"`` array (must support
                ``astype``) and a ``"title"`` string naming the camera.

        Returns:
            dict with ``"ok"`` (True when no obstacle was found),
            ``"distance"`` (norm of the nearest obstacle position rounded to
            two decimals, or None) and ``"title"``.
        """
        intrinsic = o3d.camera.PinholeCameraIntrinsic(640, 480, 474, 505, 320, 240)
        depth = o3d.geometry.Image(img_data["dep_img"].astype(np.float32))
        pcd = o3d.geometry.PointCloud.create_from_depth_image(
            depth, intrinsic, depth_scale=1000.0, depth_trunc=8.0
        )

        # Detection region; cameras whose title starts with "前" get a wider box.
        # NOTE(review): depth_scale=1000.0 divides the depth values while these
        # bounds look like millimetres - confirm the units expected by
        # detect_obstacles_in_box.
        if img_data["title"].startswith("前"):
            box_min, box_max = np.array([-1050, -600, 500]), np.array([1050, 1000, 6000])
        else:
            box_min, box_max = np.array([-800, -600, 800]), np.array([800, 1100, 6000])

        nearest, _ = detect_obstacles_in_box(pcd, box_min, box_max, 640, 480)

        distance = None
        if nearest:
            distance = round(float(np.linalg.norm(nearest["position"])), 2)

        return {
            "ok": distance is None,
            "distance": distance,
            "title": img_data["title"],
        }

    def track_tecetion_mode_data_handle(self):
        """Placeholder for track detection; currently has no effect."""
        for key in self.process3d_info:
            # Cameras whose title ends with "避障" are skipped in this mode.
            if key.endswith("避障"):
                continue

    # TODO: obstacle-mode camera capture and data processing
    def obstacle_mode_data_handle(self):
        """Capture from every 3D obstacle camera, analyse, and fill the packet.

        Blocks until every requested image has been analysed or
        ``CAPTURE_TIMEOUT_S`` elapses, then writes the per-key
        ``*_distance`` fields and the front/back status fields to ``self.pkt``.
        """
        # 1. Send the CAPTURE command, remembering how many answers to expect.
        requested = 0
        for title, proc in self.process3d_info.items():
            # Cameras whose title ends with "上轨" are skipped in this mode.
            if title.endswith("上轨"):
                continue
            proc.in_q.put(CameraControl.CAPTURE)
            requested += 1

        futures = []
        with ThreadPoolExecutor(max_workers=4) as executor:
            # 2. Wait for each image and submit it for processing. Polling
            #    empty() right after the request would race the cameras and
            #    usually see nothing, so block with a shared deadline instead.
            deadline = time.monotonic() + self.CAPTURE_TIMEOUT_S
            for _ in range(requested):
                remaining = max(deadline - time.monotonic(), 0.0)
                try:
                    img_data = self.fifo_3d.get(timeout=remaining)
                except queue.Empty:
                    print(
                        f"Warning: received {len(futures)}/{requested} "
                        "3D images before timeout"
                    )
                    break
                futures.append(executor.submit(self.handle_obstacle_data, img_data))

            # 3. Wait for all analysis tasks to finish (blocking). History is
            #    updated here, in the calling thread, not in the workers.
            for future in as_completed(futures):
                result = future.result()
                key = MAPPING[result["title"]]
                self.hist_ok[key].append(result["ok"])
                if not result["ok"]:
                    self.last_d[key] = result["distance"]

        self._fill_obstacle_packet()

    def _fill_obstacle_packet(self):
        """Write the distance and status fields for obstacle mode to ``self.pkt``."""
        clear = self._CLEAR_DISTANCE
        for key in OBSTACLE_KEYS:
            hist = self.hist_ok[key]
            last_dist = self.last_d[key]
            # A full history window of OK results clears the reported distance.
            if (len(hist) == hist.maxlen and all(hist)) or last_dist is None:
                dist_str = clear
            else:
                dist_str = f"{last_dist:.2f}"
            self.pkt[key + "_distance"] = dist_str

        # Overall status fields: OK only when both sides report no obstacle.
        self.pkt["f_obstacle_status"] = (
            "OK"
            if self.pkt["f_l_obstacle_distance"] == clear
            and self.pkt["f_r_obstacle_distance"] == clear
            else "NG"
        )
        self.pkt["b_obstacle_status"] = (
            "OK"
            if self.pkt["b_l_obstacle_distance"] == clear
            and self.pkt["b_r_obstacle_distance"] == clear
            else "NG"
        )

    def wait_rec_tcp_data(self):
        """Placeholder (not implemented)."""
        pass

    def send_tcp_data(self):
        """Placeholder (not implemented)."""
        pass

    def rec_tcp_data_handle(self, data):
        """Parse newline-delimited JSON commands and switch ``self.mode``.

        Nothing is returned; received data is only used to change the mode.
        Bytes after the last newline are kept and prepended to the next call,
        so a command split across two reads is not lost.

        Args:
            data: raw bytes received from the TCP client (may be empty).
        """
        if not data:
            return

        # getattr keeps this usable on an instance created without __init__.
        buffered = getattr(self, "_rx_buf", b"") + data
        *lines, remainder = buffered.split(b"\n")
        if len(remainder) > self.MAX_RX_BUFFER:
            # Never let an unterminated line grow without bound.
            print("Warning: discarding oversized unterminated TCP command")
            remainder = b""
        self._rx_buf = remainder

        for raw in lines:
            # Decode whole lines only, so a multi-byte character split
            # across two reads is not corrupted.
            line = raw.decode("utf-8", errors="ignore").strip()
            if not line:
                continue
            try:
                cmd = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"[SERVER] Ignoring malformed command: {e}")
                continue
            print(f"[SERVER] Cmd: {cmd}")
            if not isinstance(cmd, dict):
                # Valid JSON but not an object: there are no flags to read.
                continue

            # Obstacle detection takes priority, then front, then rear.
            if cmd.get("ObstacleDetection", False):
                self.mode = VisionMode.OBSTACLE_DETECTION
            elif cmd.get("FrontCouplerSignal", False):
                self.mode = VisionMode.FRONT_2D_DETECTION
            elif cmd.get("RearCouplerSignal", False):
                self.mode = VisionMode.REAR_2D_DETECTION

    def run(self):
        """Serve one TCP client until it disconnects or Ctrl+C is pressed.

        Each frame: read pending commands, build the packet for the current
        mode, send it as one JSON line, then sleep out the rest of ``PERIOD``.
        On exit the camera processes are told to stop and the server is closed.
        """
        server = None
        try:
            # Created inside the try so the camera processes are still shut
            # down if the server cannot be set up.
            server = TcpServer(host=HOST, port=PORT)
            print("DEBUG:TCPServer init completed")
            server.accept_client()

            while True:
                frame_start = time.perf_counter()
                # Deadline used to hold the frame rate.
                next_time = frame_start + PERIOD

                try:
                    # Read client data and switch mode accordingly.
                    tcp_rec_buf = server.recv_data()
                    if tcp_rec_buf:
                        self.rec_tcp_data_handle(tcp_rec_buf)
                except ConnectionResetError:
                    print("Warring: clietn force disconnect!!! ")
                    break
                except socket.error as e:
                    print(f"Net Error: {e}")
                    break

                # Start a fresh packet for this frame.
                self.pkt.clear()
                self.pkt["time_str"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

                # Produce data according to the current mode.
                if self.mode == VisionMode.OBSTACLE_DETECTION:
                    self.obstacle_mode_data_handle()
                elif self.mode == VisionMode.FRONT_2D_DETECTION:
                    self.front_mode_data_handle()
                elif self.mode == VisionMode.REAR_2D_DETECTION:
                    self.rear_mode_data_handle()
                elif self.mode == VisionMode.TRACK_INSPECTION:
                    pass

                # Serialise the packet as one newline-terminated JSON line.
                try:
                    tcp_send_buf = (
                        json.dumps(self.pkt, ensure_ascii=False) + "\n"
                    ).encode()
                except TypeError as e:
                    print(f"JSON encode failed: {e}")
                    # Keep the line framing intact even for the fallback.
                    tcp_send_buf = b"{}\n"
                print(f"DEBUG:{self.pkt}")

                try:
                    server.send_data(tcp_send_buf)
                except socket.error as e:
                    # Same policy as the receive path: log and stop serving.
                    print(f"Net Error: {e}")
                    break

                duration_ms = (time.perf_counter() - frame_start) * 1000
                print(f"[INFO] 每帧耗时: {duration_ms:.2f} ms")

                # Hold the frame rate.
                wait = next_time - time.perf_counter()
                if wait > 0:
                    time.sleep(wait)
        except KeyboardInterrupt:
            print("KeyboardInterrupt (Ctrl+C) shutting down")
        finally:
            # Put 0 on every camera input queue, as the original shutdown did.
            for proc in self.process3d_info.values():
                proc.in_q.put(0)
            for proc in self.process2d_info.values():
                proc.in_q.put(0)
            ArenaCamera.shutdown()
            print("关闭连接")
            if server is not None:
                server.close()