# Import order is preserved from the original file on purpose: several of
# these packages load native libraries, which can be load-order sensitive.
# NOTE(review): the original line read `from ctypes import set_er`, a name
# ctypes does not export (the module would fail on import). `set_errno` is the
# closest existing name; it is unused in the code visible here - confirm.
from ctypes import set_errno
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Semaphore
import datetime
from typing import Deque
from lib.alg.image_processing_3d import detect_obstacles_in_box
import time
from multiprocessing import Manager
import json
from lib.io.process3d import Process3D
from lib.io.process2d import Process2D
from collections import deque, OrderedDict
import numpy as np
import socket
from lib.camera.ArenaCamera import ArenaCamera
from lib.tcp.tcp_server import TcpServer
import open3d as o3d
from lib.cfg.cfg import (
    CAMERA_2D_CFFS,
    CAMERA_3D_CFGS,
    CameraControl,
    VisionMode,
    HOST,
    PORT,
    OBSTACLE_KEYS,
    MAPPING,
    RAIL_KEYS,
    TITLE2D_KEY,
    PERIOD,
)


class Presenter:
    """Coordinate the camera worker processes and the TCP link to the client.

    Responsibilities, as described by the original author's notes:

    1. Manage the camera processes. A command written to a camera process'
       input queue triggers a capture; the image comes back on the shared
       output queue.
    2. Receive data from the TCP client to switch the vision mode, and
       capture/analyse different images depending on that mode.
    3. Send the analysed data back out through the TCP server.
    """

    # Upper bound for an unterminated command line kept between receives, so
    # a client that never sends "\n" cannot grow the buffer without limit.
    _MAX_RX_BUFFER = 65536

    def __init__(self) -> None:
        """Start one worker process per configured camera and reset state."""
        mgr = Manager()
        # Shared output queues holding the images captured by 2D / 3D cameras.
        self.fifo_2d = mgr.Queue()
        self.fifo_3d = mgr.Queue()

        # NOTE(review): every other use in this file refers to
        # VisionMode.OBSTACLE_DETECTION; run() does nothing for a mode it does
        # not recognise. Confirm OBSTACLE_RECO is the intended initial mode.
        self.mode = VisionMode.OBSTACLE_RECO
        self.pkt = OrderedDict()  # packet sent over TCP each frame
        self.process3d_info = {}
        self.process2d_info = {}
        self.depth_img = {}
        # Text of a command line whose terminating "\n" has not arrived yet.
        self._rx_buf = ""

        with open(CAMERA_3D_CFGS, encoding="utf-8") as f:
            cfg3d = json.load(f)["camera"]
        for cfg in cfg3d:
            in_q = mgr.Queue()
            pro = Process3D(cfg, in_q, self.fifo_3d)
            self.process3d_info[cfg["title"]] = pro
            pro.start()

        with open(CAMERA_2D_CFFS, encoding="utf-8") as f:
            cfg2d = json.load(f)["camera"]
        for cfg in cfg2d:
            in_q = mgr.Queue()
            pro = Process2D(cfg, in_q, self.fifo_2d)
            self.process2d_info[cfg["title"]] = pro
            pro.start()

        # History of obstacle OK/NG states per obstacle key.
        self.hist_ok = {k: deque(maxlen=10) for k in OBSTACLE_KEYS}
        # Last measured obstacle distance per obstacle key.
        self.last_d = {k: None for k in OBSTACLE_KEYS}
        # History of rail detection results and last rail values.
        self.hist_rail = {k: deque(maxlen=5) for k in RAIL_KEYS}
        self.last_rail = {k: {"offset": None, "angle": None} for k in RAIL_KEYS}
        # History of 2D detection values.
        self.two_d_hist = {k: deque(maxlen=10) for k in TITLE2D_KEY.values()}

    def front_mode_data_handle(self):
        """Placeholder for the front 2D detection mode (not implemented)."""
        pass

    def rear_mode_data_handle(self):
        """Placeholder for the rear 2D detection mode (not implemented)."""
        pass

    def handle_obstacle_data(self, img_data):
        """Run obstacle detection on one depth image and summarise the result.

        Args:
            img_data: mapping with at least ``"dep_img"`` (array-like depth
                image) and ``"title"`` (camera title string).

        Returns:
            dict with ``"ok"`` (True when no obstacle was reported),
            ``"distance"`` (norm of the nearest obstacle position rounded to
            two decimals, or None) and ``"title"`` (copied from the input).
        """
        intrinsic = o3d.camera.PinholeCameraIntrinsic(640, 480, 474, 505, 320, 240)
        img = o3d.geometry.Image(img_data["dep_img"].astype(np.float32))
        # NOTE(review): depth_scale=1000.0 divides depth values by 1000, while
        # the boxes below use values in the thousands - confirm both are in
        # the unit detect_obstacles_in_box expects.
        pcd = o3d.geometry.PointCloud.create_from_depth_image(
            img, intrinsic, depth_scale=1000.0, depth_trunc=8.0
        )

        # Detection region: cameras whose title starts with "前" use a wider box.
        if img_data["title"].startswith("前"):
            box_min = np.array([-1050, -600, 500])
            box_max = np.array([1050, 1000, 6000])
        else:
            box_min = np.array([-800, -600, 800])
            box_max = np.array([800, 1100, 6000])

        nearest, _ = detect_obstacles_in_box(pcd, box_min, box_max, 640, 480)
        if nearest:
            distance = round(float(np.linalg.norm(nearest["position"])), 2)
            ok = False
        else:
            distance = None
            ok = True

        return {"ok": ok, "distance": distance, "title": img_data["title"]}

    def obstacle_mode_data_handle(self):
        """Trigger the 3D obstacle cameras, analyse queued images, fill pkt.

        Sends a CAPTURE command to every 3D camera whose title does not end
        with "上轨", analyses every image currently waiting in ``fifo_3d`` on a
        thread pool, updates the per-key history, and writes the distance and
        status fields into ``self.pkt``.
        """
        # 1. Send the CAPTURE command.
        for title, proc in self.process3d_info.items():
            if title.endswith("上轨"):
                continue
            proc.in_q.put(CameraControl.CAPTURE)

        futures = []
        with ThreadPoolExecutor(max_workers=4) as executor:
            # 2. Submit an analysis task for each queued image.
            # NOTE(review): this drains without blocking, so images requested
            # just above are only picked up if they have already arrived;
            # otherwise they are handled on a later call. Confirm this
            # latency is intended.
            while not self.fifo_3d.empty():
                img_data = self.fifo_3d.get()
                futures.append(executor.submit(self.handle_obstacle_data, img_data))

            # 3. Block until every analysis task has finished. History is
            # updated here, in the calling thread, rather than in the workers.
            for future in as_completed(futures):
                result = future.result()
                key = MAPPING[result["title"]]
                self.hist_ok[key].append(result["ok"])
                if not result["ok"]:
                    self.last_d[key] = result["distance"]

        for key in OBSTACLE_KEYS:
            hist = self.hist_ok[key]
            last_dist = self.last_d[key]
            # "000" means no obstacle: either the whole history window is
            # full of OK results or no distance was ever recorded.
            if (len(hist) == hist.maxlen and all(hist)) or last_dist is None:
                self.pkt[key] = "000"
            else:
                self.pkt[key] = f"{last_dist:.2f}"

        # Overall status fields for the front and back pairs.
        self.pkt["f_obstacle_status"] = (
            "OK"
            if self.pkt["f_l_obstacle_distance"] == "000"
            and self.pkt["f_r_obstacle_distance"] == "000"
            else "NG"
        )
        self.pkt["b_obstacle_status"] = (
            "OK"
            if self.pkt["b_l_obstacle_distance"] == "000"
            and self.pkt["b_r_obstacle_distance"] == "000"
            else "NG"
        )

    def wait_rec_tcp_data(self):
        """Placeholder (not implemented)."""
        pass

    def send_tcp_data(self):
        """Placeholder (not implemented)."""
        pass

    def rec_tcp_data_handle(self, data):
        """Parse newline-delimited JSON commands and switch the vision mode.

        Nothing is returned; received data is only used to switch ``self.mode``.
        Text after the last newline is kept and completed by the next call.

        Args:
            data: bytes (decoded as UTF-8, undecodable bytes ignored) or str
                received from the TCP client. Falsy input is ignored.
        """
        if not data:
            return

        # The original did `data += data.decode(...)`, which raises for both
        # bytes and str input; decode into a separate text buffer instead.
        if isinstance(data, (bytes, bytearray)):
            text = bytes(data).decode("utf-8", errors="ignore")
        else:
            text = str(data)

        # Everything before the last "\n" is complete; the tail is kept.
        *lines, remainder = (self._rx_buf + text).split("\n")
        if len(remainder) > self._MAX_RX_BUFFER:
            print("[SERVER] Dropping oversized unterminated command")
            remainder = ""
        self._rx_buf = remainder

        for line in lines:
            if not line.strip():
                continue
            try:
                cmd = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"[SERVER] Ignoring malformed command: {e}")
                continue
            if not isinstance(cmd, dict):
                # Valid JSON but not an object: it has no fields to read.
                print(f"[SERVER] Ignoring non-object command: {cmd!r}")
                continue

            print(f"[SERVER] Cmd: {cmd}")
            front = cmd.get("FrontCouplerSignal", False)
            rear = cmd.get("RearCouplerSignal", False)
            obs = cmd.get("ObstacleDetection", False)
            # Obstacle detection takes priority, then front, then rear; when
            # no flag is set the current mode is kept.
            if obs:
                self.mode = VisionMode.OBSTACLE_DETECTION
            elif front:
                self.mode = VisionMode.FRONT_2D_DETECTION
            elif rear:
                self.mode = VisionMode.REAR_2D_DETECTION

    def run(self):
        """Serve one TCP client: receive commands, capture, send one packet per frame.

        Loops until the connection fails or Ctrl+C is pressed, then signals
        every camera process, shuts the camera system down and closes the server.
        """
        server = TcpServer(host=HOST, port=PORT)
        try:
            server.accept_client()
            while True:
                # Deadline used at the end of the iteration to pace the loop.
                next_time = time.perf_counter() + PERIOD

                try:
                    # Receive client data and switch mode accordingly.
                    tcp_rec_buf = server.recv_data()
                    if tcp_rec_buf:
                        self.rec_tcp_data_handle(tcp_rec_buf)
                    else:
                        # NOTE(review): `continue` restarts the loop without
                        # sending or sleeping. If an empty result really means
                        # the client closed the connection, this repeats
                        # forever - confirm recv_data()'s contract.
                        print("Client disconnected gracefully")
                        continue
                except ConnectionResetError:
                    print("Warring: clietn force disconnect!!! ")
                    break
                except socket.error as e:
                    print(f"Net Error: {e}")
                    break

                # Start a fresh packet. The module (not the class) is imported
                # above, so the call must be datetime.datetime.now().
                self.pkt.clear()
                self.pkt["time_str"] = datetime.datetime.now().strftime(
                    "%Y-%m-%d %H:%M:%S"
                )

                # Collect data according to the current mode.
                if self.mode == VisionMode.OBSTACLE_DETECTION:
                    self.obstacle_mode_data_handle()
                elif self.mode == VisionMode.FRONT_2D_DETECTION:
                    self.front_mode_data_handle()
                elif self.mode == VisionMode.REAR_2D_DETECTION:
                    self.rear_mode_data_handle()

                # Send the packet as one newline-terminated JSON line.
                try:
                    tcp_send_buf = (
                        json.dumps(self.pkt, ensure_ascii=False) + "\n"
                    ).encode()
                except TypeError as e:
                    print(f"JSON encode failed: {e}")
                    # Keep the newline terminator so the client's line
                    # framing stays intact.
                    tcp_send_buf = b"{}\n"
                server.send_data(tcp_send_buf)

                # Sleep away whatever is left of this frame's period.
                wait = next_time - time.perf_counter()
                if wait > 0:
                    time.sleep(wait)

        except KeyboardInterrupt:
            print("KeyboardInterrupt (Ctrl+C) shutting down")

        finally:
            # Put 0 on every camera process' input queue.
            # NOTE(review): presumably the stop signal - confirm in the workers.
            for proc in self.process3d_info.values():
                proc.in_q.put(0)
            for proc in self.process2d_info.values():
                proc.in_q.put(0)
            ArenaCamera.shutdown()
            print("关闭连接")
            server.close()