Files
hirres_tractor_vision/lib/presenter/presenter.py
2025-06-03 14:48:23 +08:00

249 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Semaphore
import datetime
from typing import Deque
from lib.alg.image_processing_3d import detect_obstacles_in_box
import time
from multiprocessing import Manager
import json
from lib.io.process3d import Process3D
from lib.io.process2d import Process2D
from collections import deque, OrderedDict
import numpy as np
import socket
from lib.camera.ArenaCamera import ArenaCamera
from lib.tcp.tcp_server import TcpServer
import open3d as o3d
from lib.cfg.cfg import (
CAMERA_2D_CFFS,
CAMERA_3D_CFGS,
CameraControl,
VisionMode,
HOST,
PORT,
OBSTACLE_KEYS,
MAPPING,
RAIL_KEYS,
TITLE2D_KEY,
PERIOD,
)
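# NOTE: Presenter class
# 1. Manages the camera processes: a command is pushed onto a camera process's
#    input queue to trigger acquisition; for every command sent, the camera
#    returns one image through the output queue.
# 2. Receives data from the TCP client to switch modes; depending on the mode,
#    different images are captured and parsed into data.
# 3. Sends the parsed data out through the TCP service.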
class Presenter:
    """Coordinate camera worker processes, image analysis and the TCP link.

    The presenter starts one worker process per configured 2D/3D camera,
    switches its vision mode according to newline-delimited JSON commands
    received over TCP, and once per ``PERIOD`` sends one JSON packet
    (terminated by a newline) back to the client.
    """

    def __init__(self) -> None:
        """Start the camera worker processes and reset all history buffers."""
        mgr = Manager()
        # Output queues shared by all 2D / 3D camera processes.
        self.fifo_2d = mgr.Queue()
        self.fifo_3d = mgr.Queue()
        # TODO: initialise the process queues
        self.mode = VisionMode.OBSTACLE_DETECTION
        self.pkt = OrderedDict()  # packet that is serialised and sent over TCP
        # Bytes received from the client that do not yet form a complete line.
        self._rx_buf = b""
        self.process3d_info = {}
        self.process2d_info = {}
        self.depth_img = {}

        with open(CAMERA_3D_CFGS, encoding="utf-8") as f:
            cfg3d = json.load(f)["camera"]
        for cfg in cfg3d:
            # Each camera process gets its own command queue.
            in_q = mgr.Queue()
            pro = Process3D(cfg, in_q, self.fifo_3d)
            self.process3d_info[cfg["title"]] = pro
            pro.start()

        with open(CAMERA_2D_CFFS, encoding="utf-8") as f:
            cfg2d = json.load(f)["camera"]
        for cfg in cfg2d:
            in_q = mgr.Queue()
            pro = Process2D(cfg, in_q, self.fifo_2d)
            self.process2d_info[cfg["title"]] = pro
            pro.start()

        # NOTE: history buffers
        # hist_ok   - recent OK/NG obstacle flags per obstacle key
        # last_d    - last measured obstacle distance per obstacle key
        # hist_rail - recent rail detection results per rail key
        # last_rail - last rail offset/angle per rail key
        # two_d_hist - recent 2D detection values per 2D title key
        self.hist_ok = {k: deque(maxlen=10) for k in OBSTACLE_KEYS}
        self.last_d = {k: None for k in OBSTACLE_KEYS}
        self.hist_rail = {k: deque(maxlen=5) for k in RAIL_KEYS}
        self.last_rail = {k: {"offset": None, "angle": None} for k in RAIL_KEYS}
        self.two_d_hist = {k: deque(maxlen=10) for k in TITLE2D_KEY.values()}

    def front_mode_data_handle(self):
        """Handle one frame in front 2D detection mode (not implemented yet)."""
        pass

    def rear_mode_data_handle(self):
        """Handle one frame in rear 2D detection mode (not implemented yet)."""
        pass

    def handle_obstacle_data(self, img_data):
        """Run obstacle detection on one depth image.

        Args:
            img_data: Mapping with at least ``"dep_img"`` (array-like depth
                image, converted to float32 here) and ``"title"`` (camera
                title string).

        Returns:
            dict with ``"ok"`` (True when no obstacle was reported),
            ``"distance"`` (Euclidean norm of the nearest obstacle position
            rounded to 2 decimals, or None) and ``"title"``.
        """
        intrinsic = o3d.camera.PinholeCameraIntrinsic(640, 480, 474, 505, 320, 240)
        img = o3d.geometry.Image(img_data["dep_img"].astype(np.float32))
        pcd = o3d.geometry.PointCloud.create_from_depth_image(
            img, intrinsic, depth_scale=1000.0, depth_trunc=8.0
        )
        # Select the detection region.
        # NOTE(review): startswith("") is True for every string, so the else
        # branch below can never run. The intended camera-title prefix is not
        # recoverable from this file - confirm and restore it.
        # NOTE(review): the box values look like millimetres while
        # depth_scale=1000.0 suggests the cloud may be in metres - verify
        # against detect_obstacles_in_box.
        if img_data["title"].startswith(""):
            box = (np.array([-1050, -600, 500]), np.array([1050, 1000, 6000]))
        else:
            box = (np.array([-800, -600, 800]), np.array([800, 1100, 6000]))
        nearest, _ = detect_obstacles_in_box(pcd, box[0], box[1], 640, 480)

        if nearest:
            distance = round(float(np.linalg.norm(nearest["position"])), 2)
            ok = False
        else:
            distance = None
            ok = True
        return {
            "ok": ok,
            "distance": distance,
            "title": img_data["title"],
        }

    # TODO: camera acquisition and data processing for obstacle mode
    def obstacle_mode_data_handle(self):
        """Trigger the 3D obstacle cameras, analyse queued images, fill ``pkt``.

        Every image currently waiting in ``fifo_3d`` is analysed in a thread
        pool; this method returns only after all submitted analyses finished
        and the obstacle fields of ``self.pkt`` were written.
        """
        # 1. Send a CAPTURE command to every 3D camera except those whose
        #    title ends with "上轨".
        # assumes each Process3D exposes its command queue as ``in_q`` -
        # TODO confirm against lib.io.process3d
        for title, proc in self.process3d_info.items():
            if title.endswith("上轨"):
                continue
            proc.in_q.put(CameraControl.CAPTURE)

        futures = []
        with ThreadPoolExecutor(max_workers=4) as executor:
            # 2. Submit one analysis task per image already in the queue.
            # NOTE(review): this drain does not block, so images answering
            # the commands above may only be picked up on a later call -
            # confirm that this one-frame latency is intended.
            while not self.fifo_3d.empty():
                img_data = self.fifo_3d.get()
                futures.append(executor.submit(self.handle_obstacle_data, img_data))

            # 3. Wait for all analyses; history is updated from this thread
            #    only, so worker threads never write shared state.
            for future in as_completed(futures):
                result = future.result()
                key = MAPPING[result["title"]]
                self.hist_ok[key].append(result["ok"])
                if not result["ok"]:
                    self.last_d[key] = result["distance"]

        # 4. Distance field per obstacle key: "000" when the history window
        #    is full of OK results or no distance was ever recorded,
        #    otherwise the last recorded distance.
        for key in OBSTACLE_KEYS:
            hist = self.hist_ok[key]
            last_dist = self.last_d[key]
            all_clear = len(hist) == hist.maxlen and all(hist)
            if all_clear or last_dist is None:
                self.pkt[key] = "000"
            else:
                self.pkt[key] = f"{last_dist:.2f}"

        # 5. Overall front / back status fields.
        front_clear = (
            self.pkt["f_l_obstacle_distance"] == "000"
            and self.pkt["f_r_obstacle_distance"] == "000"
        )
        back_clear = (
            self.pkt["b_l_obstacle_distance"] == "000"
            and self.pkt["b_r_obstacle_distance"] == "000"
        )
        self.pkt["f_obstacle_status"] = "OK" if front_clear else "NG"
        self.pkt["b_obstacle_status"] = "OK" if back_clear else "NG"

    def wait_rec_tcp_data(self):
        """Placeholder for waiting on TCP input (not implemented yet)."""
        pass

    def send_tcp_data(self):
        """Placeholder for sending TCP output (not implemented yet)."""
        pass

    # TODO: process the data received over TCP line by line (nothing is
    # returned; the data is only used to switch modes)
    def rec_tcp_data_handle(self, data):
        """Parse newline-delimited JSON commands and switch ``self.mode``.

        Args:
            data: Chunk received from the client, as ``bytes`` or ``str``.
                A trailing partial line is kept and completed by the next
                call.

        Each complete line is expected to be a JSON object. The truthy flags
        ``ObstacleDetection``, ``FrontCouplerSignal`` and
        ``RearCouplerSignal`` select the mode, in that priority order. Blank
        lines, malformed JSON and non-object values are ignored.
        """
        if not data:
            return
        chunk = data.encode("utf-8") if isinstance(data, str) else bytes(data)

        # Buffer raw bytes so a multi-byte character or a command split
        # across two reads is reassembled before decoding. getattr keeps the
        # method usable on instances created without __init__.
        pending = getattr(self, "_rx_buf", b"") + chunk
        *lines, self._rx_buf = pending.split(b"\n")

        for raw in lines:
            line = raw.decode("utf-8", errors="ignore").strip()
            if not line:
                continue
            try:
                cmd = json.loads(line)
            except json.JSONDecodeError:
                print(f"[SERVER] Ignoring malformed command: {line!r}")
                continue
            if not isinstance(cmd, dict):
                print(f"[SERVER] Ignoring non-object command: {line!r}")
                continue
            print(f"[SERVER] Cmd: {cmd}")
            if cmd.get("ObstacleDetection", False):
                self.mode = VisionMode.OBSTACLE_DETECTION
            elif cmd.get("FrontCouplerSignal", False):
                self.mode = VisionMode.FRONT_2D_DETECTION
            elif cmd.get("RearCouplerSignal", False):
                self.mode = VisionMode.REAR_2D_DETECTION

    def run(self):
        """Serve one TCP client until it disconnects or Ctrl+C is pressed.

        Each loop iteration receives commands, builds the packet for the
        current mode, sends it, and sleeps for the remainder of ``PERIOD``.
        On exit the value ``0`` is put on every camera process input queue,
        ``ArenaCamera.shutdown()`` is called and the server is closed.
        """
        server = TcpServer(host=HOST, port=PORT)
        try:
            server.accept_client()
            while True:
                # Deadline of this frame, used for frame-rate control.
                next_time = time.perf_counter() + PERIOD

                # Receive commands and switch mode accordingly.
                try:
                    tcp_rec_buf = server.recv_data()
                except ConnectionResetError:
                    print("Warring: clietn force disconnect!!! ")
                    break
                except socket.error as e:
                    print(f"Net Error: {e}")
                    break
                if not tcp_rec_buf:
                    # assumes recv_data() blocks and returns an empty value
                    # only when the peer closed the connection - TODO confirm
                    # against lib.tcp.tcp_server. Leaving the loop avoids
                    # spinning on a dead socket.
                    print("Client disconnected gracefully")
                    break
                self.rec_tcp_data_handle(tcp_rec_buf)

                # Start a fresh packet for this frame.
                self.pkt.clear()
                self.pkt["time_str"] = datetime.datetime.now().strftime(
                    "%Y-%m-%d %H:%M:%S"
                )

                # Fill the packet according to the current mode.
                if self.mode == VisionMode.OBSTACLE_DETECTION:
                    self.obstacle_mode_data_handle()
                elif self.mode == VisionMode.FRONT_2D_DETECTION:
                    self.front_mode_data_handle()
                elif self.mode == VisionMode.REAR_2D_DETECTION:
                    self.rear_mode_data_handle()

                # Serialise and send; the fallback keeps the newline framing.
                try:
                    tcp_send_buf = (
                        json.dumps(self.pkt, ensure_ascii=False) + "\n"
                    ).encode()
                except TypeError as e:
                    print(f"JSON encode failed: {e}")
                    tcp_send_buf = b"{}\n"
                try:
                    server.send_data(tcp_send_buf)
                except socket.error as e:
                    print(f"Net Error: {e}")
                    break

                # Frame-rate control: sleep until the frame deadline.
                wait = next_time - time.perf_counter()
                if wait > 0:
                    time.sleep(wait)
        except KeyboardInterrupt:
            print("KeyboardInterrupt (Ctrl+C) shutting down")
        finally:
            try:
                # presumably 0 is the stop command understood by the camera
                # processes; verify against Process3D / Process2D.
                for proc in self.process3d_info.values():
                    proc.in_q.put(0)
                for proc in self.process2d_info.values():
                    proc.in_q.put(0)
                ArenaCamera.shutdown()
            finally:
                # Always release the socket, even if camera shutdown failed.
                print("关闭连接")
                server.close()