from ctypes import set_errno
import datetime
from lib.alg.image_processing_3d import detect_obstacles_in_box
import time
from multiprocessing import Manager
import json
from lib.io import process2d
from lib.io import process3d
from lib.io.process3d import Process3D
from lib.io.process2d import Process2D
from collections import deque, OrderedDict
import numpy as np
import socket
from lib.camera.ArenaCamera import ArenaCamera
from lib.tcp.tcp_server import TcpServer
import open3d as o3d
from lib.cfg.cfg import (
    CAMERA_2D_CFFS,
    CAMERA_3D_CFGS,
    CameraControl,
    VisionMode,
    HOST,
    PORT,
    OBSTACLE_KEYS,
    MAPPING,
    RAIL_KEYS,
    TITLE2D_KEY,
    PERIOD,
)


# NOTE: Presenter class (translated from the original author's note)
# 1. Manages the camera processes: capture is controlled by putting a command
#    on a camera process's input queue; for every capture command sent, the
#    camera returns one image through its output queue.
# 2. Receives data from the TCP client to switch modes, and captures/parses
#    different images depending on the active mode.
# 3. Sends the parsed data out through the TCP service.
class Presenter:
    """Coordinate camera worker processes and the TCP command/report loop."""

    def __init__(self) -> None:
        """Start one worker process per configured camera and create the history buffers.

        Reads the 3D and 2D camera configuration files (JSON, top-level key
        ``"camera"``), starts a ``Process3D`` / ``Process2D`` per entry and
        stores it under the entry's ``"title"``.
        """
        # NOTE(review): run() only dispatches on OBSTACLE_DETECTION,
        # FRONT_2D_DETECTION and REAR_2D_DETECTION, so with this initial value
        # no handler runs until a command arrives -- confirm this is intended.
        self.mode = VisionMode.OBSTACLE_RECO
        self.process3d_info = {}
        self.process2d_info = {}
        # Text received over TCP that has not yet been terminated by "\n".
        self._tcp_rx_text = ""

        mgr = Manager()

        with open(CAMERA_3D_CFGS, encoding="utf-8") as f:
            cfg3d = json.load(f)["camera"]
        for cfg in cfg3d:
            in_q = mgr.Queue()
            out_q = mgr.Queue()
            pro = Process3D(cfg["sn"], in_q, out_q)
            self.process3d_info[cfg["title"]] = pro
            pro.start()

        with open(CAMERA_2D_CFFS, encoding="utf-8") as f:
            cfg2d = json.load(f)["camera"]
        for cfg in cfg2d:
            in_q = mgr.Queue()
            out_q = mgr.Queue()
            pro = Process2D(cfg["sn"], in_q, out_q)
            self.process2d_info[cfg["title"]] = pro
            pro.start()

        # NOTE: history buffers
        # hist_ok    - obstacle OK/NG status history (last 10 frames per key)
        # last_d     - last measured obstacle distance per key
        # hist_rail  - rail detection history (last 5 frames per key)
        # last_rail  - last rail offset/angle values per key
        # two_d_hist - 2D detection value history (last 10 frames per title)
        self.hist_ok = {k: deque(maxlen=10) for k in OBSTACLE_KEYS}
        self.last_d = {k: None for k in OBSTACLE_KEYS}
        self.hist_rail = {k: deque(maxlen=5) for k in RAIL_KEYS}
        self.last_rail = {k: {"offset": None, "angle": None} for k in RAIL_KEYS}
        self.two_d_hist = {k: deque(maxlen=10) for k in TITLE2D_KEY.values()}

    def get_camera_data(self):
        """Placeholder; not implemented yet."""
        pass

    def handle_camera_3d_data(self):
        """Placeholder; not implemented yet."""
        pass

    def handle_camera_2d_data(self):
        """Placeholder; not implemented yet."""
        pass

    def front_mode_data_handle(self, pkt: OrderedDict):
        """Placeholder for the front 2D detection mode; not implemented yet."""
        pass

    def rear_mode_data_handle(self, pkt: OrderedDict):
        """Placeholder for the rear 2D detection mode; not implemented yet."""
        pass

    def obstacle_mode_data_handle(self, pkt: OrderedDict):
        """Capture one depth frame per obstacle camera and update the obstacle history.

        For every 3D camera whose title does not end with "上轨", a capture
        command is sent and one frame is read back. Each frame is converted to
        a point cloud and searched for obstacles inside a fixed box; the OK/NG
        result is appended to ``self.hist_ok`` and, on NG, the distance is
        stored in ``self.last_d``.

        Args:
            pkt: Outgoing packet for the current frame. It is not modified
                here yet.
        """
        # 1. Collect a depth image from every 3D obstacle camera.
        depth_images = {}
        for title, proc in self.process3d_info.items():
            # Skip the cameras whose title marks them as "上轨" cameras.
            if title.endswith("上轨"):
                continue
            proc.in_q.put(CameraControl.CAPTURE)
            # The queue is an attribute (as with in_q above), not a callable.
            depth_images[title] = proc.out_q.get()

        # 2. Run the detection algorithm on each image.
        #    (Original author's note: this should eventually run in the worker process.)
        # Arguments are (width, height, fx, fy, cx, cy); identical for every
        # camera, so the object is built once.
        intrinsic = o3d.camera.PinholeCameraIntrinsic(640, 480, 474, 505, 320, 240)
        for title, depth in depth_images.items():
            img = o3d.geometry.Image(depth.astype(np.float32))
            pcd = o3d.geometry.PointCloud.create_from_depth_image(
                img, intrinsic, depth_scale=1000.0, depth_trunc=8.0
            )

            # TODO: draw the rectangular region and detect obstacles inside it.
            # NOTE(review): depth_scale=1000.0 divides the raw depth values by
            # 1000 while the box corners below are in the thousands -- confirm
            # that detect_obstacles_in_box expects these units.
            if title.startswith("前"):
                box = (np.array([-1050, -600, 500]), np.array([1050, 1000, 6000]))
            else:
                box = (np.array([-800, -600, 800]), np.array([800, 1100, 6000]))

            nearest, _ = detect_obstacles_in_box(pcd, box[0], box[1], 640, 480)
            if nearest:
                d = float(np.linalg.norm(nearest["position"]))
                res = {"distance": round(d, 2), "status": "NG"}
            else:
                res = {"distance": None, "status": "OK"}

            # Record the status (only the last 10 frames are kept) and
            # remember the most recent obstacle distance.
            ok = res["status"] == "OK"
            self.hist_ok[MAPPING[title]].append(ok)
            if not ok:
                self.last_d[MAPPING[title]] = res["distance"]

    def wait_rec_tcp_data(self):
        """Placeholder; not implemented yet."""
        pass

    def send_tcp_data(self):
        """Placeholder; not implemented yet."""
        pass

    # TODO: process the data received over TCP line by line. Nothing is
    # returned: the data is only used to switch the mode.
    def rec_tcp_data_handle(self, data):
        """Parse newline-delimited JSON commands and switch ``self.mode``.

        Args:
            data: Newly received payload, either ``bytes``/``bytearray``
                (decoded as UTF-8, undecodable bytes ignored) or ``str``.
                An incomplete trailing line is kept and completed by the
                next call.
        """
        if not data:
            return

        if isinstance(data, (bytes, bytearray)):
            text = data.decode("utf-8", errors="ignore")
        else:
            text = str(data)

        # getattr keeps this usable on instances created without __init__.
        pending = getattr(self, "_tcp_rx_text", "") + text
        while "\n" in pending:
            line, pending = pending.split("\n", 1)
            if not line.strip():
                continue
            try:
                cmd = json.loads(line)
            except json.JSONDecodeError:
                print(f"[SERVER] Ignoring malformed command: {line!r}")
                continue
            if not isinstance(cmd, dict):
                # Valid JSON but not an object: there are no flags to read.
                print(f"[SERVER] Ignoring non-object command: {line!r}")
                continue
            print(f"[SERVER] Cmd: {cmd}")
            self._apply_command(cmd)
        self._tcp_rx_text = pending

    def _apply_command(self, cmd: dict) -> None:
        """Select the vision mode from one decoded command; obstacle detection wins ties."""
        front = cmd.get("FrontCouplerSignal", False)
        rear = cmd.get("RearCouplerSignal", False)
        obs = cmd.get("ObstacleDetection", False)
        if obs:
            self.mode = VisionMode.OBSTACLE_DETECTION
        elif front:
            self.mode = VisionMode.FRONT_2D_DETECTION
        elif rear:
            self.mode = VisionMode.REAR_2D_DETECTION

    def run(self):
        """Serve one TCP client: read commands, process the active mode, send one packet per frame.

        The loop ends on a connection reset, a socket error or Ctrl+C. On
        exit, every camera process is sent ``0`` on its input queue, the
        camera SDK is shut down and the server is closed.
        """
        # Initialize the TCP service and the receive buffer.
        server = TcpServer(host=HOST, port=PORT)
        tcp_rec_buf = ""
        pkt = OrderedDict()  # packet sent over TCP each frame
        try:
            server.accept_client()
            while True:
                # Deadline used to pace the loop at one frame per PERIOD.
                next_time = time.perf_counter() + PERIOD
                try:
                    # Receive client data and switch mode accordingly.
                    tcp_rec_buf = server.recv_data()
                    if tcp_rec_buf:
                        self.rec_tcp_data_handle(tcp_rec_buf)
                    else:
                        print("Client disconnected gracefully")
                        # NOTE(review): the loop keeps polling here; if
                        # recv_data() returns empty only on disconnect, this
                        # spins without pacing -- confirm whether a `break`
                        # is intended.
                        continue
                except ConnectionResetError:
                    print("Warring: clietn force disconnect!!! ")
                    break
                except socket.error as e:
                    print(f"Net Error: {e}")
                    break

                # Reset the outgoing packet.
                pkt.clear()
                # `datetime` is imported as a module, so the class must be qualified.
                pkt["time_str"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

                # Fill the packet according to the active mode.
                if self.mode == VisionMode.OBSTACLE_DETECTION:
                    self.obstacle_mode_data_handle(pkt)
                elif self.mode == VisionMode.FRONT_2D_DETECTION:
                    self.front_mode_data_handle(pkt)
                elif self.mode == VisionMode.REAR_2D_DETECTION:
                    self.rear_mode_data_handle(pkt)

                # Send the packet as one JSON line.
                try:
                    tcp_send_buf = (json.dumps(pkt, ensure_ascii=False) + "\n").encode()
                except TypeError as e:
                    print(f"JSON encode failed: {e}")
                    tcp_send_buf = b"{}"
                server.send_data(tcp_send_buf)

                # Frame-rate control: sleep for whatever is left of this period.
                wait = next_time - time.perf_counter()
                if wait > 0:
                    time.sleep(wait)
        except KeyboardInterrupt:
            print("KeyboardInterrupt(Ctrl+C) shutting down")
        finally:
            try:
                # NOTE(review): 0 is presumably the stop command understood by
                # the camera processes -- confirm against CameraControl.
                for proc in self.process3d_info.values():
                    proc.in_q.put(0)
                for proc in self.process2d_info.values():
                    proc.in_q.put(0)
                ArenaCamera.shutdown()
            finally:
                # Always release the socket, even if the cleanup above failed.
                print("关闭连接")
                server.close()