263 lines
9.9 KiB
Python
263 lines
9.9 KiB
Python
import json
import queue
import socket
import time
from collections import deque, OrderedDict
from concurrent.futures import ThreadPoolExecutor, as_completed
from ctypes import set_errno
from datetime import datetime
from multiprocessing import Manager, set_start_method
from threading import Semaphore
from typing import Deque

import numpy as np
import open3d as o3d

from lib.alg.image_processing_3d import detect_obstacles_in_box
from lib.alg.track_detection import TrackDetector
from lib.camera.ArenaCamera import ArenaCamera
from lib.cfg.cfg import (
    CAMERA_2D_CFFS,
    CAMERA_3D_CFGS,
    CameraControl,
    VisionMode,
    HOST,
    PORT,
    OBSTACLE_KEYS,
    MAPPING,
    RAIL_KEYS,
    TITLE2D_KEY,
    PERIOD,
)
from lib.io.process2d import Process2D
from lib.io.process3d import Process3D
from lib.tcp.tcp_server import TcpServer
|
||
|
||
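# NOTE: Presenter class
# 1. Manages the camera processes: image capture is controlled by sending data to each camera process's input queue; every time a 1 is sent, the camera returns one image through the output queue.
# 2. Receives data sent by the TCP client in order to switch modes; depending on the mode, different images are captured and parsed into data.
# 3. Sends the parsed data out through the TCP service.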
|
||
class Presenter:
    """Coordinate camera worker processes, image analysis and the TCP link.

    The presenter starts one worker process per configured 2D/3D camera,
    switches ``self.mode`` according to commands received from the TCP
    client, and sends one newline-terminated JSON packet per frame period.
    """

    # Upper bound for an unterminated command fragment kept between two
    # ``rec_tcp_data_handle`` calls; larger fragments are discarded so a
    # client that never sends a newline cannot grow the buffer forever.
    _MAX_RX_TAIL = 65536

    def __init__(self) -> None:
        """Create the shared queues, start the camera workers, reset history."""
        # force=True: a start method that was already selected (for example by
        # an earlier Presenter instance) would otherwise raise RuntimeError.
        set_start_method("spawn", force=True)
        mgr = Manager()
        # Queues holding the images captured by the 2D / 3D cameras.
        self.fifo_2d = mgr.Queue()
        self.fifo_3d = mgr.Queue()
        self.mode = VisionMode.OBSTACLE_DETECTION
        self.pkt = OrderedDict()  # outgoing TCP packet
        self.process3d_info = {}
        self.process2d_info = {}
        self.depth_img = {}
        # Bytes of an incomplete command line carried over to the next recv.
        self._rx_tail = b""

        self._start_workers(
            CAMERA_3D_CFGS, Process3D, self.fifo_3d, self.process3d_info, mgr
        )
        self._start_workers(
            CAMERA_2D_CFFS, Process2D, self.fifo_2d, self.process2d_info, mgr
        )

        print("DEBUG:init completed")

        # NOTE: history buffers
        #   hist_ok    - recent obstacle OK/NG flags per obstacle key
        #   last_d     - last obstacle distance per obstacle key
        #   hist_rail  - recent rail detection results per rail key
        #   last_rail  - last rail offset/angle values per rail key
        #   two_d_hist - recent 2D detection values per 2D camera key
        self.hist_ok = {k: deque(maxlen=10) for k in OBSTACLE_KEYS}
        self.last_d = {k: None for k in OBSTACLE_KEYS}
        self.hist_rail = {k: deque(maxlen=5) for k in RAIL_KEYS}
        self.last_rail = {k: {"offset": None, "angle": None} for k in RAIL_KEYS}
        self.two_d_hist = {k: deque(maxlen=10) for k in TITLE2D_KEY.values()}

    @staticmethod
    def _start_workers(cfg_path, worker_cls, out_q, registry, mgr):
        """Start one ``worker_cls`` process per camera entry in ``cfg_path``.

        Each worker gets its own input queue and the shared ``out_q``; it is
        stored in ``registry`` under the camera's ``"title"`` before being
        started.
        """
        with open(cfg_path, encoding="utf-8") as f:
            camera_cfgs = json.load(f)["camera"]
        for cfg in camera_cfgs:
            # NOTE(review): the rest of the class reads ``worker.in_q``, so the
            # worker presumably keeps this queue under that attribute - confirm.
            worker = worker_cls(cfg, mgr.Queue(), out_q)
            registry[cfg["title"]] = worker
            worker.start()

    def front_mode_data_handle(self):
        """Front 2D detection mode handler (not implemented yet)."""
        pass

    def rear_mode_data_handle(self):
        """Rear 2D detection mode handler (not implemented yet)."""
        pass

    def handle_obstacle_data(self, img_data):
        """Analyse one depth frame and report the nearest obstacle.

        Args:
            img_data: mapping with ``"dep_img"`` (array-like supporting
                ``astype``) and ``"title"`` (camera title string).

        Returns:
            dict with ``"ok"`` (True when nothing was detected),
            ``"distance"`` (norm of the nearest obstacle position rounded to
            2 decimals, or None) and ``"title"``.
        """
        title = img_data["title"]
        intrinsic = o3d.camera.PinholeCameraIntrinsic(640, 480, 474, 505, 320, 240)
        depth = o3d.geometry.Image(img_data["dep_img"].astype(np.float32))
        pcd = o3d.geometry.PointCloud.create_from_depth_image(
            depth, intrinsic, depth_scale=1000.0, depth_trunc=8.0
        )

        # Detection region: cameras whose title starts with "前" use a wider box.
        # NOTE(review): these extents look like millimetres while depth_scale
        # divides the depth by 1000 - confirm the units expected by
        # detect_obstacles_in_box.
        if title.startswith("前"):
            box_min, box_max = np.array([-1050, -600, 500]), np.array([1050, 1000, 6000])
        else:
            box_min, box_max = np.array([-800, -600, 800]), np.array([800, 1100, 6000])

        nearest, _ = detect_obstacles_in_box(pcd, box_min, box_max, 640, 480)

        if nearest:
            distance = round(float(np.linalg.norm(nearest["position"])), 2)
            return {"ok": False, "distance": distance, "title": title}
        return {"ok": True, "distance": None, "title": title}

    def track_tecetion_mode_data_handle(self):
        """Track inspection mode handler (placeholder, does no work yet).

        The misspelled name is kept so existing callers keep working.
        """
        for key in self.process3d_info:
            if key.endswith("避障"):
                continue
            # TODO: capture from the remaining cameras and run track detection.

    def obstacle_mode_data_handle(self, capture_timeout=None):
        """Capture from the 3D obstacle cameras and fill the obstacle fields.

        Sends one CAPTURE command to every 3D camera whose title does not end
        with "上轨", waits for the commanded frames, analyses them in a thread
        pool and then writes the ``*_distance`` / ``*_obstacle_status`` fields
        of ``self.pkt``.

        Args:
            capture_timeout: total seconds to wait for the commanded frames.
                Defaults to ``PERIOD``. Frames that arrive later stay in the
                queue and are picked up by the next call.
        """
        if capture_timeout is None:
            capture_timeout = PERIOD

        # 1. Issue the CAPTURE commands and remember how many frames to expect.
        pending = 0
        for title, proc in self.process3d_info.items():
            if title.endswith("上轨"):
                continue
            proc.in_q.put(CameraControl.CAPTURE)
            pending += 1

        futures = []
        received = 0
        deadline = time.perf_counter() + capture_timeout
        with ThreadPoolExecutor(max_workers=4) as executor:
            # 2. Wait for the frames and submit them for processing. Polling
            #    ``empty()`` right after the commands would usually find
            #    nothing, so block while commanded frames are outstanding and
            #    afterwards only drain what is already queued.
            while True:
                if received < pending:
                    wait = max(deadline - time.perf_counter(), 0.0)
                else:
                    wait = 0.0
                try:
                    img_data = self.fifo_3d.get(timeout=wait)
                except queue.Empty:
                    break
                received += 1
                futures.append(executor.submit(self.handle_obstacle_data, img_data))

            if received < pending:
                print(f"Warning: received {received}/{pending} 3D frames before timeout")

            # 3. Wait for every analysis task; history is updated here, in the
            #    calling thread, rather than inside the workers.
            for future in as_completed(futures):
                result = future.result()
                key = MAPPING[result["title"]]
                self.hist_ok[key].append(result["ok"])
                if not result["ok"]:
                    self.last_d[key] = result["distance"]

        # 4. Per-key distance: "000" once the history is full of OK results,
        #    otherwise the last recorded distance (or "000" if none yet).
        for key in OBSTACLE_KEYS:
            hist = self.hist_ok[key]
            last_dist = self.last_d[key]
            if (len(hist) == hist.maxlen and all(hist)) or last_dist is None:
                dist_str = "000"
            else:
                dist_str = f"{last_dist:.2f}"
            self.pkt[key + "_distance"] = dist_str

        # 5. Overall status per end: OK only when both sides report "000".
        for end in ("f", "b"):
            clear = (
                self.pkt[f"{end}_l_obstacle_distance"] == "000"
                and self.pkt[f"{end}_r_obstacle_distance"] == "000"
            )
            self.pkt[f"{end}_obstacle_status"] = "OK" if clear else "NG"

    def wait_rec_tcp_data(self):
        """Placeholder (not implemented yet)."""
        pass

    def send_tcp_data(self):
        """Placeholder (not implemented yet)."""
        pass

    def rec_tcp_data_handle(self, data):
        """Parse newline-delimited JSON commands and switch ``self.mode``.

        Only complete lines are processed; an unterminated trailing fragment
        is kept and prepended to the next call so a command split across two
        receives is not lost. Malformed lines and JSON values that are not
        objects are skipped. Nothing is returned: received data is only used
        to switch the mode.

        Args:
            data: bytes received from the client (may be empty or None).
        """
        if not data:
            return

        buffered = getattr(self, "_rx_tail", b"") + data
        *lines, tail = buffered.split(b"\n")
        self._rx_tail = tail if len(tail) <= self._MAX_RX_TAIL else b""

        for raw in lines:
            line = raw.decode("utf-8", errors="ignore")
            if not line.strip():
                continue
            try:
                cmd = json.loads(line)
            except json.JSONDecodeError as e:
                print(f"[SERVER] Ignoring malformed command: {e}")
                continue
            print(f"[SERVER] Cmd: {cmd}")
            if not isinstance(cmd, dict):
                # Valid JSON but not an object (e.g. a list): nothing to read.
                continue
            # Priority when several flags are set: obstacle > front > rear.
            if cmd.get("ObstacleDetection", False):
                self.mode = VisionMode.OBSTACLE_DETECTION
            elif cmd.get("FrontCouplerSignal", False):
                self.mode = VisionMode.FRONT_2D_DETECTION
            elif cmd.get("RearCouplerSignal", False):
                self.mode = VisionMode.REAR_2D_DETECTION

    def _encode_packet(self):
        """Return ``self.pkt`` as one newline-terminated JSON line (bytes)."""
        try:
            return (json.dumps(self.pkt, ensure_ascii=False) + "\n").encode()
        except TypeError as e:
            print(f"JSON encode failed: {e}")
            # Keep the newline so the fallback is still a complete line.
            return b"{}\n"

    @staticmethod
    def _sleep_until(deadline):
        """Sleep until ``deadline`` (a ``time.perf_counter`` value) if it is ahead."""
        remaining = deadline - time.perf_counter()
        if remaining > 0:
            time.sleep(remaining)

    def run(self):
        """Serve one TCP client: receive commands, process a frame, send results.

        The loop runs once per ``PERIOD`` until the connection fails or the
        process is interrupted; workers and the server are shut down on exit.
        """
        server = TcpServer(host=HOST, port=PORT)
        print("DEBUG:TCPServer init completed")
        try:
            server.accept_client()
            while True:
                # Deadline of this frame, used to hold the frame rate.
                next_time = time.perf_counter() + PERIOD

                # Receive client data and switch the mode accordingly.
                try:
                    received = server.recv_data()
                except ConnectionResetError:
                    print("Warring: clietn force disconnect!!! ")
                    break
                except socket.error as e:
                    print(f"Net Error: {e}")
                    break

                if not received:
                    print("Client disconnected gracefully")
                    # Still honour the frame period so this path cannot spin.
                    # NOTE(review): if recv_data returns empty data only for a
                    # closed peer, this loop never recovers - consider
                    # re-accepting a client or leaving the loop here.
                    self._sleep_until(next_time)
                    continue
                self.rec_tcp_data_handle(received)

                # Start a fresh outgoing packet.
                self.pkt.clear()
                self.pkt["time_str"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

                # Collect the data for the active mode.
                if self.mode == VisionMode.OBSTACLE_DETECTION:
                    self.obstacle_mode_data_handle()
                elif self.mode == VisionMode.FRONT_2D_DETECTION:
                    self.front_mode_data_handle()
                elif self.mode == VisionMode.REAR_2D_DETECTION:
                    self.rear_mode_data_handle()
                elif self.mode == VisionMode.TRACK_INSPECTION:
                    pass  # TODO: track inspection is not wired up yet

                server.send_data(self._encode_packet())

                # Hold the frame rate.
                self._sleep_until(next_time)
        except KeyboardInterrupt:
            print("KeyboardInterrupt (Ctrl+C) shutting down")
        finally:
            try:
                # 0 is presumably the workers' stop command - TODO confirm.
                for proc in self.process3d_info.values():
                    proc.in_q.put(0)
                for proc in self.process2d_info.values():
                    proc.in_q.put(0)
                ArenaCamera.shutdown()
            finally:
                # Close the server even if stopping the workers failed.
                print("关闭连接")
                server.close()