TractorVision: initial port

This commit is contained in:
CHAMPION923
2025-05-30 16:30:37 +08:00
commit 2acf920a87
36 changed files with 3898 additions and 0 deletions

0
lib/__init__.py Normal file

0
lib/alg/__init__.py Normal file

55
lib/alg/distance_detector.py Normal file

@@ -0,0 +1,55 @@
# distance_detector.py
import numpy as np

# Per-camera-SN ROI configuration: center coordinates and reference size,
# used for box-based distance measurement.
ROI_SETTINGS = {
    '233800055': {'center': (220, 340), 'base_size': (200, 200)},
    '234000035': {'center': (220, 340), 'base_size': (200, 200)},
    # Add more cameras here as needed
}

# Default reference depth (millimetres)
DEFAULT_BASE_DEPTH_MM = 2000


class DistanceDetector:
    """
    Dynamic-ROI distance detector based on a TOF depth map; the ROI
    parameters are built into the algorithm.

    Algorithm:
    1. Look up the preset ROI center and size for the camera SN.
    2. Read the depth value d at the ROI center pixel.
    3. Scale the ROI size by the ratio of the reference depth to d.
    4. Average the non-zero depth values inside the ROI and return the
       result as the target distance in millimetres.
    """

    def __init__(self, sn, base_depth_mm=DEFAULT_BASE_DEPTH_MM):
        if sn not in ROI_SETTINGS:
            raise ValueError(f"No ROI settings for camera SN: {sn}")
        cfg = ROI_SETTINGS[sn]
        self.cx, self.cy = cfg['center']
        self.base_w, self.base_h = cfg['base_size']
        self.base_depth = base_depth_mm

    def _compute_dynamic_roi(self, depth_image):
        h, w = depth_image.shape
        d = float(depth_image[int(self.cy), int(self.cx)])
        if d <= 0:
            return None
        scale = self.base_depth / d
        rw = int(self.base_w * scale)
        rh = int(self.base_h * scale)
        x1 = max(0, int(self.cx - rw // 2))
        y1 = max(0, int(self.cy - rh // 2))
        x2 = min(w, x1 + rw)
        y2 = min(h, y1 + rh)
        return x1, y1, x2, y2

    def compute_distance(self, depth_image):
        coords = self._compute_dynamic_roi(depth_image)
        if coords is None:
            return None
        x1, y1, x2, y2 = coords
        roi = depth_image[y1:y2, x1:x2]
        valid = roi[roi > 0]
        if valid.size == 0:
            return None
        return float(np.mean(valid))
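
A minimal usage sketch for DistanceDetector (assumptions: a 480x640 uint16 depth frame in millimetres, and an SN that is a key of ROI_SETTINGS):

# Hypothetical driver, not part of the module.
import numpy as np

detector = DistanceDetector('233800055')
depth = np.full((480, 640), 2000, dtype=np.uint16)  # synthetic flat scene at 2 m
dist_mm = detector.compute_distance(depth)
print(dist_mm)  # ~2000.0 for this synthetic frame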

183
lib/alg/image_processing_3d.py Normal file

@@ -0,0 +1,183 @@
import os
import cv2
import numpy as np
import ctypes
import math
import open3d as o3d
import time


# --------------------------- Save depth image ---------------------------
def save_depth_image(image_data, width, height, sn, queue, image_count):
    # Convert the raw image buffer to an OpenCV-compatible NumPy array
    depth_image_array = ctypes.cast(image_data, ctypes.POINTER(ctypes.c_uint16 * (width.value * height.value))).contents
    depth_image_np = np.array(depth_image_array).reshape(height.value, width.value)
    # (A per-pixel 0.25 depth scale was considered here but is not applied.)
    # Save the image with OpenCV
    image_name = f"{image_count}DepthImage_{sn}.png"
    # output_directory = "OutputDirectory/Linux/x64Release/Images"
    output_directory = "src/Python/src/ToolsFuncsDemo/ImagesTest/Trac_up_down02"
    os.makedirs(output_directory, exist_ok=True)
    output_path = os.path.join(output_directory, image_name)
    cv2.imwrite(output_path, depth_image_np)
    print(f"Depth image saved as {output_path} for camera {sn}")
    queue.put(f"Depth image saved as {output_path} for camera {sn}")
    return f"Depth image saved as {output_path} for camera {sn}"


# --------------------------- Obstacle detection ---------------------------
# ----------- Remove isolated points with a statistical filter -----------
# def statistical_outlier_removal(point_cloud, nb_neighbors, std_ratio):
#     """
#     Remove isolated points with a statistical filter.
#     :param point_cloud: Open3D point cloud
#     :param nb_neighbors: number of neighbours considered per point
#     :param std_ratio: standard-deviation multiplier for outlier rejection
#     :return: point cloud with isolated points removed
#     """
#     cl, ind = point_cloud.remove_radius_outlier(20, 10)
#     point_cloud_filtered = point_cloud.select_by_index(ind)
#     return point_cloud_filtered
def statistical_outlier_removal(point_cloud, nb_neighbors=20, std_ratio=2.0):
    cl, ind = point_cloud.remove_statistical_outlier(nb_neighbors=nb_neighbors, std_ratio=std_ratio)
    return cl


# ----------------- Depth image -> point cloud -----------------
def depth_to_point_cloud(depth_image_data, x_image_np, y_image_np, width, height, min_depth, max_depth):
    """
    Convert a depth image to a point cloud, using pixel coordinates for x
    and y, and keep only points within the valid depth range.
    :param depth_image_data: depth image (2D array, values in millimetres)
    :param x_image_np: per-pixel X image (currently unused)
    :param y_image_np: per-pixel Y image (currently unused)
    :param width: image width (int)
    :param height: image height (int)
    :param min_depth: minimum valid depth in mm (e.g. 500 mm = 0.5 m)
    :param max_depth: maximum valid depth in mm (e.g. 6000 mm = 6 m)
    :return: Open3D point cloud
    """
    # Width and height must be integers
    if not isinstance(width, int) or not isinstance(height, int):
        raise ValueError("Width and height must be integers")
    point_cloud = o3d.geometry.PointCloud()
    points = []
    colors = []
    # Make sure the inputs are NumPy arrays
    depth_image_data = np.array(depth_image_data)
    x_image_data = np.array(x_image_np)
    y_image_data = np.array(y_image_np)
    # Optical center (image center)
    center_x = width / 2
    center_y = height / 2
    for v in range(height):
        for u in range(width):
            z = depth_image_data[v, u]
            # Skip depths outside the valid range
            if z < min_depth or z > max_depth:
                continue
            # Pixel size (mm/pixel) derived from the 69° x 51° field of view
            pixel_size_x_mm = 2 * z * np.tan(np.radians(69 / 2)) / width
            pixel_size_y_mm = 2 * z * np.tan(np.radians(51 / 2)) / height
            # Coordinates relative to the image center (the camera's y+ points down)
            x = (u - center_x)
            y = (center_y - v)
            x_mm = x * pixel_size_x_mm
            y_mm = y * pixel_size_y_mm
            points.append([x_mm, y_mm, z])
            colors.append([1.0, 0.0, 0.0])  # red
    points = np.array(points)
    colors = np.array(colors)
    point_cloud.points = o3d.utility.Vector3dVector(points)
    point_cloud.colors = o3d.utility.Vector3dVector(colors)
    # # Visualize the point cloud
    # o3d.visualization.draw_geometries([point_cloud])
    # Remove noise (isolated points); tune the parameters on site
    start_time = time.perf_counter()
    point_cloud = statistical_outlier_removal(point_cloud, 20, 1.0)
    end_time = time.perf_counter()
    runtime = end_time - start_time
    # print(f"statistical_outlier_removal runtime: {runtime:.6f} s")
    return point_cloud


# --------- Draw a 3D box and detect obstacles inside it ---------
def detect_obstacles_in_box(point_cloud, min_bound, max_bound, width, height, save_name="detected_obstacle.ply", visualize=False):
    # 1. Filter the cloud
    filtered_point_cloud = statistical_outlier_removal(point_cloud, 20, 1.0)
    # 2. Build the axis-aligned bounding box
    box = o3d.geometry.AxisAlignedBoundingBox(min_bound=min_bound, max_bound=max_bound)
    box.color = (0.0, 1.0, 0.0)  # green frame
    # 3. Collect the points inside the box
    indices = box.get_point_indices_within_bounding_box(filtered_point_cloud.points)
    if len(indices) == 0:
        # print("No points detected inside the box")
        return None, filtered_point_cloud
    points = np.asarray(filtered_point_cloud.points)
    colors = np.asarray(filtered_point_cloud.colors)
    if len(colors) == 0:
        colors = np.ones((len(points), 3))  # white
        filtered_point_cloud.colors = o3d.utility.Vector3dVector(colors)
        colors = np.asarray(filtered_point_cloud.colors)
    # Red inside the box, green outside
    colors[indices] = [1.0, 0.0, 0.0]
    mask = np.ones(len(points), dtype=bool)
    mask[indices] = False
    colors[mask] = [0.0, 1.0, 0.0]
    # Closest point -> black
    reference_point = np.array([0.0, 0.0, 0.0])
    obstacle_points = points[indices]
    distances = np.linalg.norm(obstacle_points - reference_point, axis=1)
    closest_index = np.argmin(distances)
    closest_point = obstacle_points[closest_index]
    min_distance = distances[closest_index]
    global_closest_index = indices[closest_index]
    colors[global_closest_index] = [0.0, 0.0, 0.0]
    # Write the updated colors back
    filtered_point_cloud.colors = o3d.utility.Vector3dVector(colors)
    if visualize:
        o3d.visualization.draw_geometries([filtered_point_cloud, box])
    # TODO: also return a pixel coordinate for the closest point
    return {
        'position': closest_point.tolist(),
        'distance': float(min_distance)
    }, filtered_point_cloud
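
A minimal end-to-end sketch of the obstacle pipeline (assumptions: a synthetic 480x640 uint16 depth frame, and box bounds in the same millimetre units as the cloud produced by depth_to_point_cloud):

# Hypothetical driver, not part of the module: builds a cloud from a synthetic
# depth frame and checks a box in front of the camera.
import numpy as np

depth = np.zeros((480, 640), dtype=np.uint16)
depth[200:280, 300:340] = 1500          # a small "obstacle" at 1.5 m
xs = ys = np.zeros_like(depth)          # the x/y images are currently unused
pcd = depth_to_point_cloud(depth, xs, ys, 640, 480, 500, 6000)
nearest, cloud = detect_obstacles_in_box(
    pcd, np.array([-800, -600, 800]), np.array([800, 1100, 6000]), 640, 480)
if nearest:
    print(f"closest obstacle at {nearest['distance']:.0f} mm")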

115
lib/alg/line_detection.py Normal file

@@ -0,0 +1,115 @@
import cv2
import numpy as np


def detect_lines_in_depth_image(depth_image, roi, min_depth):
    """
    Detect lines in a depth image within a specified Region of Interest (ROI).
    :param depth_image: the depth image in 16-bit grayscale format
    :param roi: the (x, y, w, h) region that limits detection within the depth image
    :param min_depth: lower bound (mm) used when normalizing the ROI for edge detection
    :return: the detected lines, with coordinates relative to the original depth image
    """
    lines_list = []
    # Extract the ROI from the depth image
    x, y, w, h = roi
    roi_image = depth_image[y:y+h, x:x+w]
    min_dist = np.min(roi_image)
    max_dist = np.max(roi_image)
    print(f"ROI shape: {roi_image.shape}")
    print(f"ROI depth range: {min_dist} - {max_dist}")
    # Scale the uint16 ROI down to uint8 so OpenCV's edge detector can use it
    # norm_img = cv2.normalize(roi_image, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8U)
    min_dist = min_depth
    # max_dist = 1800
    norm_img = (roi_image - min_dist) / (max_dist - min_dist)
    norm_img = norm_img * 255
    norm_img = np.clip(norm_img, 0, 255)
    norm_img = norm_img.astype(np.uint8)
    # cv2.imwrite('dbg_norm.png', norm_img)
    edges = cv2.Canny(norm_img, 15, 30, apertureSize=3)
    # cv2.imwrite('dbg_edges.png', edges)
    # Use a probabilistic Hough transform to detect line segments within the ROI
    lines = cv2.HoughLinesP(edges, 1, np.pi/180, 20, minLineLength=10, maxLineGap=100)
    norm_depth_img = cv2.normalize(depth_image, None, 0, 255, cv2.NORM_MINMAX, cv2.CV_8U)
    colored_img = cv2.cvtColor(norm_depth_img, cv2.COLOR_GRAY2BGR)
    # Shift the line coordinates back into the full depth image's frame
    if lines is not None:
        for points in lines:
            # Each entry nests the endpoints in a one-element list
            x1, y1, x2, y2 = points[0]
            x1 = x1 + x
            y1 = y1 + y
            x2 = x2 + x
            y2 = y2 + y
            # Draw the line joining the endpoints on the original image
            cv2.line(colored_img, (x1, y1), (x2, y2), (0, 255, 0), 2)
            # Maintain a simple lookup list of endpoint pairs
            lines_list.append([(x1, y1), (x2, y2)])
    # Save the result image
    # cv2.imwrite('dbg_detectedLines.png', colored_img)
    # Return the detected lines with adjusted coordinates
    return lines_list


def calculate_line_angle(line):
    """
    Compute the angle between a line and the horizontal (x) axis.
    :param line: the line's endpoints, as [(x1, y1), (x2, y2)]
    :return: the angle in degrees, in the range 0-180
    """
    (x1, y1), (x2, y2) = line
    # Handle vertical lines before computing the slope
    if x2 - x1 == 0:
        return 90.0  # vertical line
    slope = (y2 - y1) / (x2 - x1)
    # Angle in radians, converted to degrees
    angle = np.arctan(slope) * 180 / np.pi
    # Keep the angle positive (within 0-180 degrees)
    if angle < 0:
        angle += 180
    # TODO: convert to the angle relative to the camera's center line
    return angle


def calculate_distance_point_to_line(point, line):
    """
    Compute the distance from a point to a line.
    :param point: the point, as (x, y)
    :param line: the line's endpoints, as [(x1, y1), (x2, y2)]
    :return: the point-to-line distance
    """
    (x0, y0) = point
    (x1, y1), (x2, y2) = line
    numerator = abs((y2 - y1) * x0 - (x2 - x1) * y0 + x2 * y1 - y2 * x1)
    denominator = np.sqrt((y2 - y1) ** 2 + (x2 - x1) ** 2)
    # The *2.5 + 5 terms are an empirical pixel-to-distance calibration
    return (numerator / denominator) * 2.5 + 5


if __name__ == '__main__':
    img = cv2.imread('dbg_233900303_z.png', cv2.IMREAD_UNCHANGED)
    line_list = detect_lines_in_depth_image(img, (320, 240, 640, 480), 1100)
    print(line_list)
    for line in line_list:
        angle = calculate_line_angle(line)
        dist = calculate_distance_point_to_line((320, 240), line)
        print(f'angle={angle} dist={dist}')


@@ -0,0 +1,84 @@
import cv2
from ultralytics import YOLO


# YOLOv8Detector loads a YOLO model, runs object detection, and draws the
# results on the image.
class YOLOv8Detector:
    def __init__(self):
        """
        Initialize the YOLOv8 detector.
        """
        self.model = YOLO(r"model/yolov8n.pt")  # official YOLOv8 pretrained model
        # self.model = YOLO(r"model/best.pt")   # custom-trained model
        self.class_names = self.model.names  # class-id -> name mapping
        self._frame_counter = 0  # frame counter (for debug prints)

    # ------------------------------------------------------------------
    # Run YOLO detection and draw the bounding box, center coordinates and
    # size for every detection (no class filtering).
    # ------------------------------------------------------------------
    def detect_and_draw(self, image):
        height, width = image.shape[:2]
        results = self.model(image, imgsz=640)
        frame_id = self._frame_counter
        self._frame_counter += 1
        result_info = []  # per-detection info dicts
        # print(f"\n[FRAME {frame_id}] Detection Result:")
        for result in results:
            boxes = result.boxes
            # Iterate over every detection (no class filtering)
            for i in range(len(boxes)):
                cls_id = int(boxes.cls[i].item())  # class index
                cls_name = self.class_names[cls_id] if cls_id < len(self.class_names) else f"class_{cls_id}"
                score = float(boxes.conf[i].item())  # confidence
                # boxes.xyxy[i] is the i-th box as a tensor [x1, y1, x2, y2];
                # tolist() converts it to a Python list, and map(int, ...) casts
                # the coordinates to integers, as required for drawing.
                x1_box, y1_box, x2_box, y2_box = map(int, boxes.xyxy[i].tolist())
                # Center point and box size
                x_center = (x1_box + x2_box) / 2
                y_center = (y1_box + y2_box) / 2
                bbox_width = x2_box - x1_box
                bbox_height = y2_box - y1_box
                # Debug prints
                # print(f"[{cls_name}] Score: {score:.2f}")
                # print(f"  Center: ({x_center:.1f}, {y_center:.1f})")
                # print(f"  BBox: Width={bbox_width}px, Height={bbox_height}px")
                # print(f"  Top-left corner: ({x1_box}px, {y1_box}px)")
                # Draw the bounding box (green, line width 2)
                cv2.rectangle(image, (x1_box, y1_box), (x2_box, y2_box), (0, 255, 0), 2)
                # Label above the box: class name + score
                label = f"{cls_name} {score:.2f}"
                cv2.putText(image, label, (x2_box, y1_box - 60),
                            cv2.FONT_HERSHEY_SIMPLEX, 2.5, (0, 255, 0), 3)
                # Top-left corner coordinates above the box
                cv2.putText(image, f"L-corner-coor:{x1_box, y1_box}", (x1_box, y1_box - 20),
                            cv2.FONT_HERSHEY_SIMPLEX, 2.5, (0, 255, 0), 3)
                # Center coordinates and size below the box
                cv2.putText(image, f"Center: ({x_center:.1f}, {y_center:.1f})",
                            (x1_box, y2_box + 70),
                            cv2.FONT_HERSHEY_SIMPLEX, 2.5, (0, 255, 255), 3)
                cv2.putText(image, f"Width: {bbox_width}px, Height: {bbox_height}px",
                            (x1_box, y2_box + 150),
                            cv2.FONT_HERSHEY_SIMPLEX, 2.5, (0, 255, 255), 3)
                # Detection info dict; "L-corner-coor" is the top-left corner
                result_dict = {
                    "class": cls_name,
                    "score": score,
                    "center": (x_center, y_center),
                    "bbox": (bbox_width, bbox_height),
                    "L-corner-coor": (x1_box, y1_box)
                }
                result_info.append(result_dict)
        return image, result_info
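
A usage sketch (assumptions: model/yolov8n.pt is present, as the constructor expects, and frame.jpg is a hypothetical BGR test image):

# Hypothetical driver for YOLOv8Detector, not part of the module.
import cv2

detector = YOLOv8Detector()
frame = cv2.imread("frame.jpg")
annotated, detections = detector.detect_and_draw(frame)
for det in detections:
    print(det["class"], det["score"], det["center"])
cv2.imwrite("frame_annotated.jpg", annotated)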

173
lib/alg/track_detection.py Normal file

@@ -0,0 +1,173 @@
# File: tractor_vision/alg/track_detection.py
import cv2
import numpy as np
import math
from collections import deque

# --- Camera calibration parameters ---
FX = 474.0  # focal length, x axis
FY = 505.0  # focal length, y axis
CX = 320.0  # principal point, x coordinate
CY = 240.0  # principal point, y coordinate
CAMERA_TILT_DEG = 10.0  # positive = tilted downwards

# --- ROI & thresholds ---
ROI = (190, 40, 250, 420)
MIN_DEPTH = 1100
ANGLE_TOL_DEG = 45


def preprocess_depth(depth, blur_ksize=5, kernel_size=7):
    """
    - Median blur to remove noise
    - Morphological closing to fill holes
    """
    d = cv2.medianBlur(depth, blur_ksize)
    kernel = np.ones((kernel_size, kernel_size), np.uint8)
    closed = cv2.morphologyEx(d, cv2.MORPH_CLOSE, kernel)
    d[d == 0] = closed[d == 0]
    return d


def offset_after_rotation(line):
    (x1, y1), (x2, y2) = line
    phi = math.degrees(math.atan2(y2 - y1, x2 - x1))
    rot = math.radians(90.0 - phi)
    mx, my = (x1 + x2) / 2, (y1 + y2) / 2
    dx, dy = mx - CX, my - CY
    x_rot = dx * math.cos(rot) - dy * math.sin(rot)
    return -x_rot, (int(mx), int(my))


def compute_metrics(depth):
    # NOTE: force the data to a uint16 copy first
    depth_mm = (
        depth.copy().astype(np.uint16) if depth.dtype != np.uint16 else depth.copy()
    )
    # NOTE: denoise with a median filter
    den = preprocess_depth(depth_mm)
    # NOTE: crop the ROI for analysis
    x, y, w, h = ROI
    crop = den[y : y + h, x : x + w]
    if not (crop >= MIN_DEPTH).any():
        return None, None, None, None, depth_mm, ROI, None
    # NOTE: normalize the cropped depth to 0-255 and convert to uint8, since
    # OpenCV's edge detector expects an 8-bit image.
    norm = cv2.normalize(crop, None, 0, 255, cv2.NORM_MINMAX).astype("uint8")
    # NOTE: run Canny edge detection on the normalized image;
    # 50 and 150 are the low and high thresholds.
    edges = cv2.Canny(norm, 50, 150)
    lines = cv2.HoughLinesP(edges, 1, np.pi / 180, 30, minLineLength=50, maxLineGap=10)
    # NOTE: if no line segments were detected, return empty results
    if lines is None:
        return None, None, None, None, depth_mm, ROI, None
    # NOTE: convert the camera tilt (degrees) to radians for the angle math below
    tilt = math.radians(CAMERA_TILT_DEG)
    # NOTE: candidate track lines that pass the angle test
    cands = []
    # NOTE: iterate over all Hough lines; each line is two endpoints (x1_, y1_), (x2_, y2_)
    for x1_, y1_, x2_, y2_ in lines.reshape(-1, 4):
        # NOTE: add the crop offset back so the endpoints are expressed in
        # full-image coordinates, since Hough ran on the cropped ROI.
        p1 = (x1_ + x, y1_ + y)
        p2 = (x2_ + x, y2_ + y)
        # NOTE: horizontal and vertical components of the segment
        dx, dy = p2[0] - p1[0], p2[1] - p1[1]
        # NOTE: angle of the segment relative to vertical (radians)
        raw = -math.atan2(dx, -dy)
        # NOTE: correct for the camera tilt by projecting the segment angle
        # onto the ground plane to get the true track angle.
        grd = math.asin(math.sin(raw) * math.cos(tilt))
        # NOTE: convert to degrees and take the absolute value
        ang = abs(math.degrees(grd))
        # NOTE: lines within ANGLE_TOL_DEG of vertical are track candidates
        if ang <= ANGLE_TOL_DEG:
            cands.append({"line": (p1, p2), "mid_x": (p1[0] + p2[0]) / 2})
    # NOTE: no candidate passed the angle test
    if not cands:
        return None, None, None, None, depth_mm, ROI, None
    # NOTE: pick the leftmost candidate (smallest midpoint x) as the best track line
    best = min(cands, key=lambda c: c["mid_x"])
    # NOTE: unpack the endpoints of the best track line
    line = best["line"]
    (x1, y1), (x2, y2) = line
    # Intersection
    dxl, dyl = x2 - x1, y2 - y1
    # NOTE: intersection of the track line with the camera center line (x = CX).
    # For non-vertical lines, interpolate the y coordinate linearly;
    # for vertical lines (dxl == 0) use the midpoint y.
    if dxl != 0:
        t = (CX - x1) / dxl
        y_int = y1 + t * dyl
        inter = (int(CX), int(y_int))
    else:
        inter = (int(CX), int((y1 + y2) / 2))
    # NOTE: compute the signed angle and the lateral offset
    p_up = (x1, y1) if y1 < y2 else (x2, y2)
    vx, vy = p_up[0] - inter[0], p_up[1] - inter[1]
    raw2 = math.atan2(vx, -vy)
    grd2 = math.asin(math.sin(raw2) * math.cos(tilt))
    signed = math.degrees(grd2)
    off_px, midpt = offset_after_rotation(line)
    pts = [
        np.clip(pt, [0, 0], [depth_mm.shape[1] - 1, depth_mm.shape[0] - 1])
        for pt in line
    ]
    Zs = [float(depth_mm[pt[1], pt[0]]) for pt in pts]
    Zavg = np.mean([z for z in Zs if z > 0]) if any(z > 0 for z in Zs) else MIN_DEPTH
    off_mm = off_px * Zavg / FX * math.cos(tilt)
    return signed, off_mm, line, midpt, depth_mm, ROI, inter


class TrackDetector:
    def __init__(self, camera_list, alpha=0.6, win_raw=5, win_smooth=5, interval=0.2):
        self.alpha, self.interval = alpha, interval
        self.ang_raw = {sn: deque(maxlen=win_raw) for sn in camera_list}
        self.off_raw = {sn: deque(maxlen=win_raw) for sn in camera_list}
        self.ang_smooth = {sn: deque(maxlen=win_smooth) for sn in camera_list}
        self.off_smooth = {sn: deque(maxlen=win_smooth) for sn in camera_list}
        self.last_ang = {sn: None for sn in camera_list}
        self.last_off = {sn: None for sn in camera_list}
        self.last_t = {sn: 0.0 for sn in camera_list}

    def process(self, depth, sn):
        # Rate limiting: skip the frame if less than `interval` has elapsed
        now = cv2.getTickCount() / cv2.getTickFrequency()
        if now - self.last_t[sn] < self.interval:
            return None
        self.last_t[sn] = now
        res = compute_metrics(depth)
        if res[0] is None:
            return None
        ang, off, line, midpt, _, _, inter = res
        # Smoothing: median over a window, then exponential blending
        self.ang_raw[sn].append(ang)
        self.off_raw[sn].append(off)
        med_ang = float(np.median(self.ang_raw[sn]))
        med_off = float(np.median(self.off_raw[sn]))
        pa, po = self.last_ang[sn], self.last_off[sn]
        ang_exp = (
            med_ang if pa is None else self.alpha * med_ang + (1 - self.alpha) * pa
        )
        off_exp = (
            med_off if po is None else self.alpha * med_off + (1 - self.alpha) * po
        )
        self.last_ang[sn], self.last_off[sn] = ang_exp, off_exp
        self.ang_smooth[sn].append(ang_exp)
        self.off_smooth[sn].append(off_exp)
        disp_ang = float(np.median(self.ang_smooth[sn]))
        disp_off = float(np.median(self.off_smooth[sn]))
        return {
            "angle": -disp_ang,  # positive = left, negative = right
            "offset": disp_off,
            "line": line,
            "midpoint": midpt,
            "intersection": inter,
        }
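
A usage sketch for TrackDetector (assumptions: `cam` is an ArenaCamera-like object whose capture() returns x/y/depth frames, and the SN is in the camera list):

# Hypothetical driver, not part of the module.
detector = TrackDetector(camera_list=["233800055"])
for _ in range(100):
    _, _, depth = cam.capture()          # uint16 depth frame in millimetres
    res = detector.process(depth, "233800055")
    if res:                              # None while rate-limited or no track found
        print(f"angle={res['angle']:.2f} deg, offset={res['offset']:.1f} mm")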

294
lib/camera/ArenaCamera.py Normal file

@@ -0,0 +1,294 @@
import ctypes
import time
import numpy as np
import cv2

# Error codes
ARENA_ACQ_SUCCESS = 0


# Print an error message
def print_error(operation, error):
    print(f"{operation} failed with error code: {error}")


# Retry helper
def retry_operation(operation_func, handle, *args, max_retries=1, delay=1):
    """
    Retry an SDK call until it succeeds or the retries are exhausted.
    :param operation_func: the SDK function to call
    :param handle: camera handle
    :param args: arguments for the SDK function
    :param max_retries: maximum number of retries
    :param delay: delay between retries (seconds)
    :return: True on success, False otherwise
    """
    for attempt in range(max_retries + 1):
        err = operation_func(handle, *args)
        if err == ARENA_ACQ_SUCCESS:
            return True
        else:
            print(f"Attempt {attempt + 1}/{max_retries + 1} failed. Retrying in {delay} seconds...")
            time.sleep(delay)
    print_error(operation_func.__name__, err)
    return False


class ArenaCamera:
    # Class-level shared library and system handle
    libArenaAcq = ctypes.CDLL('libArenaAcq.so')
    sys_handle = ctypes.c_void_p()
    functions_initialized = False  # Class-level flag

    def __init__(self, sn):
        self.sn = sn
        self.handle = ctypes.c_void_p()
        self.count = 0
        # Initialize function argument types and return types if not already done
        if not ArenaCamera.functions_initialized:
            self._initialize_functions()
            ArenaCamera.functions_initialized = True

    @classmethod
    def _initialize_functions(cls):
        # ArenaAcq_Initialize
        cls.libArenaAcq.ArenaAcq_Initialize.argtypes = [ctypes.POINTER(ctypes.c_void_p)]
        cls.libArenaAcq.ArenaAcq_Initialize.restype = ctypes.c_int
        # ArenaAcq_Connect
        cls.libArenaAcq.ArenaAcq_Connect.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.c_void_p), ctypes.c_char_p]
        cls.libArenaAcq.ArenaAcq_Connect.restype = ctypes.c_int
        # ArenaAcq_SetProperty
        cls.libArenaAcq.ArenaAcq_SetProperty.argtypes = [
            ctypes.c_void_p, ctypes.c_char_p, ctypes.c_char_p
        ]
        cls.libArenaAcq.ArenaAcq_SetProperty.restype = ctypes.c_int
        # ArenaAcq_GetProperty
        cls.libArenaAcq.ArenaAcq_GetProperty.argtypes = [
            ctypes.c_void_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(ctypes.c_size_t)
        ]
        cls.libArenaAcq.ArenaAcq_GetProperty.restype = ctypes.c_int
        # ArenaAcq_StartAcquisitionStream
        cls.libArenaAcq.ArenaAcq_StartAcquisitionStream.argtypes = [ctypes.c_void_p]
        cls.libArenaAcq.ArenaAcq_StartAcquisitionStream.restype = ctypes.c_int
        # ArenaAcq_GetDepthImage
        cls.libArenaAcq.ArenaAcq_GetDepthImage.argtypes = [
            ctypes.c_void_p,
            ctypes.POINTER(ctypes.POINTER(ctypes.c_uint16)),
            ctypes.POINTER(ctypes.POINTER(ctypes.c_uint16)),
            ctypes.POINTER(ctypes.POINTER(ctypes.c_uint16)),
            ctypes.POINTER(ctypes.c_size_t),
            ctypes.POINTER(ctypes.c_size_t),
            ctypes.POINTER(ctypes.c_size_t),
            ctypes.POINTER(ctypes.c_size_t)
        ]
        cls.libArenaAcq.ArenaAcq_GetDepthImage.restype = ctypes.c_int
        # ArenaAcq_GetImage
        cls.libArenaAcq.ArenaAcq_GetImage.argtypes = [
            ctypes.c_void_p,
            ctypes.POINTER(ctypes.POINTER(ctypes.c_uint16)),
            ctypes.POINTER(ctypes.c_size_t),
            ctypes.POINTER(ctypes.c_size_t),
            ctypes.POINTER(ctypes.c_size_t),
            ctypes.POINTER(ctypes.c_size_t)
        ]
        cls.libArenaAcq.ArenaAcq_GetImage.restype = ctypes.c_int
        # ArenaAcq_ReleaseImage
        cls.libArenaAcq.ArenaAcq_ReleaseImage.argtypes = [ctypes.POINTER(ctypes.c_uint8)]
        cls.libArenaAcq.ArenaAcq_ReleaseImage.restype = ctypes.c_int
        cls.libArenaAcq.ArenaAcq_ReleaseHandle.argtypes = [ctypes.POINTER(ctypes.c_void_p)]
        cls.libArenaAcq.ArenaAcq_ReleaseHandle.restype = ctypes.c_int
        cls.libArenaAcq.ArenaAcq_ReleaseDevice.argtypes = [ctypes.POINTER(ctypes.c_void_p)]
        cls.libArenaAcq.ArenaAcq_ReleaseDevice.restype = ctypes.c_int
        # ArenaAcq_StopAcquisitionStream
        cls.libArenaAcq.ArenaAcq_StopAcquisitionStream.argtypes = [ctypes.c_void_p]
        cls.libArenaAcq.ArenaAcq_StopAcquisitionStream.restype = ctypes.c_int
        # ArenaAcq_Disconnect
        cls.libArenaAcq.ArenaAcq_Disconnect.argtypes = [ctypes.c_void_p, ctypes.c_void_p]
        cls.libArenaAcq.ArenaAcq_Disconnect.restype = ctypes.c_int
        # ArenaAcq_Cleanup
        cls.libArenaAcq.ArenaAcq_Cleanup.argtypes = [ctypes.c_void_p]
        cls.libArenaAcq.ArenaAcq_Cleanup.restype = ctypes.c_int
        cls.libArenaAcq.ArenaAcq_SaveDepthImageAsPGM.argtypes = [ctypes.POINTER(ctypes.c_uint16), ctypes.c_size_t, ctypes.c_size_t, ctypes.c_char_p]
        cls.libArenaAcq.ArenaAcq_SaveDepthImageAsPGM.restype = ctypes.c_int
        # Initialize the library
        if not retry_operation(cls.libArenaAcq.ArenaAcq_Initialize, ctypes.byref(cls.sys_handle)):
            print("Initialization failed")
            cls.libArenaAcq.ArenaAcq_Cleanup(cls.sys_handle)
            cls.libArenaAcq.ArenaAcq_ReleaseHandle(cls.sys_handle)
            return
        print("Library initialized successfully")

    @classmethod
    def shutdown(cls):
        cls.libArenaAcq.ArenaAcq_Cleanup(cls.sys_handle)
        print("Library shutdown successfully")

    def create(self):
        # Connect to the camera
        if not retry_operation(self.libArenaAcq.ArenaAcq_Connect, self.sys_handle, self.handle, self.sn.encode('utf-8')):
            print(f"Connection failed for camera {self.sn}")
            self.libArenaAcq.ArenaAcq_Cleanup(self.handle)
            return
        print(f"Connected to camera {self.sn} successfully")
        # Start the acquisition stream
        if not retry_operation(self.libArenaAcq.ArenaAcq_StartAcquisitionStream, self.handle):
            print(f"Start acquisition failed for camera {self.sn}")
            self.destroy()
            return
        print(f"Acquisition started successfully for camera {self.sn}")

    def destroy(self):
        # Stop the acquisition stream
        if not retry_operation(self.libArenaAcq.ArenaAcq_StopAcquisitionStream, self.handle):
            print(f"Stop acquisition failed for camera {self.sn}")
            self.libArenaAcq.ArenaAcq_Disconnect(self.sys_handle, self.handle)
            self.libArenaAcq.ArenaAcq_Cleanup(self.handle)
            return
        print(f"Acquisition stopped successfully for camera {self.sn}")
        # Disconnect from the camera
        if not retry_operation(self.libArenaAcq.ArenaAcq_Disconnect, self.sys_handle, self.handle):
            print(f"Disconnection failed for camera {self.sn}")
            self.libArenaAcq.ArenaAcq_Cleanup(self.handle)
            return
        print(f"Disconnected from camera {self.sn} successfully")
        # Clean up resources
        if not retry_operation(self.libArenaAcq.ArenaAcq_Cleanup, self.handle):
            print(f"Cleanup failed for camera {self.sn}")
            return

    def read(self):
        image_data = ctypes.POINTER(ctypes.c_uint16)()
        image_size = ctypes.c_size_t()
        width = ctypes.c_size_t()
        height = ctypes.c_size_t()
        bits_per_pixel = ctypes.c_size_t()
        if not retry_operation(self.libArenaAcq.ArenaAcq_GetImage, self.handle, ctypes.byref(image_data),
                               ctypes.byref(image_size), ctypes.byref(width), ctypes.byref(height),
                               ctypes.byref(bits_per_pixel)):
            print(f"Image acquisition failed for camera {self.sn}")
            return None
        if image_size.value <= 0:
            print(f"Image size is 0 for camera {self.sn}")
            return None
        # Convert the raw buffer to an OpenCV-compatible array and copy it,
        # since the SDK buffer is released below.
        image_array = ctypes.cast(image_data, ctypes.POINTER(ctypes.c_uint16 * (width.value * height.value))).contents
        image_np = np.array(image_array).reshape(height.value, width.value)
        image_np = np.copy(image_np)
        self.count = self.count + 1
        # Release the SDK image buffer
        self.libArenaAcq.ArenaAcq_ReleaseImage(ctypes.cast(image_data, ctypes.POINTER(ctypes.c_uint8)))
        return image_np

    def capture(self):
        image_data = ctypes.POINTER(ctypes.c_uint16)()
        x_image_data = ctypes.POINTER(ctypes.c_uint16)()
        y_image_data = ctypes.POINTER(ctypes.c_uint16)()
        image_size = ctypes.c_size_t()
        width = ctypes.c_size_t()
        height = ctypes.c_size_t()
        bits_per_pixel = ctypes.c_size_t()
        if not retry_operation(self.libArenaAcq.ArenaAcq_GetDepthImage, self.handle, ctypes.byref(image_data),
                               ctypes.byref(x_image_data), ctypes.byref(y_image_data),
                               ctypes.byref(image_size), ctypes.byref(width), ctypes.byref(height),
                               ctypes.byref(bits_per_pixel)):
            print(f"Image acquisition failed for camera {self.sn}")
            return None, None, None
        if image_size.value <= 0:
            return None, None, None
        # Convert the raw X/Y/Z buffers to OpenCV-compatible arrays and copy
        # them before the SDK buffers are released.
        depth_image_array = ctypes.cast(image_data, ctypes.POINTER(ctypes.c_uint16 * (width.value * height.value))).contents
        x_image_array = ctypes.cast(x_image_data, ctypes.POINTER(ctypes.c_uint16 * (width.value * height.value))).contents
        y_image_array = ctypes.cast(y_image_data, ctypes.POINTER(ctypes.c_uint16 * (width.value * height.value))).contents
        depth_image_np = np.array(depth_image_array).reshape(height.value, width.value)
        x_image_np = np.array(x_image_array).reshape(height.value, width.value)
        y_image_np = np.array(y_image_array).reshape(height.value, width.value)
        depth_image_np = np.copy(depth_image_np)
        x_image_np = np.copy(x_image_np)
        y_image_np = np.copy(y_image_np)
        # print("copied.")
        # self.libArenaAcq.ArenaAcq_SaveDepthImageAsPGM(image_data, width, height, f'dbg_{self.sn}_{self.count}'.encode('utf-8'))
        self.count = self.count + 1
        # Release the SDK image buffers
        self.libArenaAcq.ArenaAcq_ReleaseImage(ctypes.cast(image_data, ctypes.POINTER(ctypes.c_uint8)))
        self.libArenaAcq.ArenaAcq_ReleaseImage(ctypes.cast(x_image_data, ctypes.POINTER(ctypes.c_uint8)))
        self.libArenaAcq.ArenaAcq_ReleaseImage(ctypes.cast(y_image_data, ctypes.POINTER(ctypes.c_uint8)))
        return x_image_np, y_image_np, depth_image_np


if __name__ == '__main__':
    cam2 = ArenaCamera("233801341")  # hypothetical second camera serial number
    cam2.create()
    cam1 = ArenaCamera("233800056")
    cam1.create()
    # cv2.namedWindow("Z1", cv2.WINDOW_AUTOSIZE)
    # cv2.namedWindow("Z2", cv2.WINDOW_AUTOSIZE)
    i = 0
    while i < 20:
        raw1 = cam1.read()
        X2, Y2, Z2 = cam2.capture()
        # X1, Y1, Z1 = cam1.capture()
        # Display images for the first camera
        # cv2.imshow("X1", X1)
        # cv2.imshow("Y1", Y1)
        # cv2.imshow("Z1", Z1)
        # Display images for the second camera
        # cv2.imshow("X2", X2)
        # cv2.imshow("Y2", Y2)
        # cv2.imshow("Z2", Z2)
        # if cv2.waitKey(0) & 0xFF == ord('q'):
        #     break
        # cv2.imwrite("z1.png", Z1)
        cv2.imwrite("z2.png", Z2)
        # time.sleep(3)
        i = i + 1
        cv2.imwrite("raw1.png", raw1)
        print(f'read {i}')
    cv2.destroyAllWindows()
    cam1.destroy()
    cam2.destroy()
    ArenaCamera.shutdown()

153
lib/camera/MindVisionCamera.py Normal file

@@ -0,0 +1,153 @@
import sys
import platform
from PySide6.QtWidgets import QApplication, QMainWindow, QLabel
from PySide6.QtCore import Qt
from PySide6.QtGui import QImage, QPixmap
from PySide6.QtCore import QTimer
import numpy as np
import ctypes
from lib.camera.mvsdk import *
from lib.camera import mvsdk


class MindVisionCamera:
    def __init__(self, sn):
        self.sn = sn                 # camera serial number
        self.hCamera = None          # camera handle
        self.FrameBufferSize = None  # frame buffer size
        self.pFrameBuffer = None     # frame buffer pointer
        self.monoCamera = None       # whether the camera is monochrome
        self.cap = None              # camera capability description
        # Initialize camera
        self._initialize_camera()

    def _initialize_camera(self):
        # Enumerate cameras
        DevList = CameraEnumerateDevice()
        nDev = len(DevList)
        if nDev < 1:
            print("No camera was found!")
            return
        for i, DevInfo in enumerate(DevList):
            # Look for the camera with the requested SN
            if DevInfo.GetSn() == self.sn:
                print(f"Found camera with SN: {self.sn}")
                break
        else:
            print(f"Camera with SN: {self.sn} not found!")
            return
        # Open camera
        try:
            self.hCamera = CameraInit(DevInfo, -1, -1)
        except CameraException as e:
            print("CameraInit Failed({}): {}".format(e.error_code, e.message))
            return
        # Query camera capabilities
        self.cap = CameraGetCapability(self.hCamera)
        # Check whether this is a monochrome camera
        self.monoCamera = self.cap.sIspCapacity.bMonoSensor != 0
        if self.monoCamera:
            CameraSetIspOutFormat(self.hCamera, CAMERA_MEDIA_TYPE_MONO8)
        CameraSetTriggerMode(self.hCamera, 0)  # continuous acquisition mode
        CameraSetAeState(self.hCamera, 1)      # enable auto exposure
        # CameraSetExposureTime(self.hCamera, 30 * 1000)  # 30 ms exposure
        CameraPlay(self.hCamera)  # start acquisition
        # Allocate the frame buffer
        self.FrameBufferSize = (
            self.cap.sResolutionRange.iWidthMax
            * self.cap.sResolutionRange.iHeightMax
            * (1 if self.monoCamera else 3)
        )
        self.pFrameBuffer = CameraAlignMalloc(self.FrameBufferSize, 16)

    def capture(self):
        try:
            # Fetch a frame from the camera (2000 ms timeout); returns the raw
            # data pointer and the frame header
            pRawData, FrameHead = CameraGetImageBuffer(self.hCamera, 2000)
            CameraImageProcess(self.hCamera, pRawData, self.pFrameBuffer, FrameHead)
            # Flip the frame buffer vertically on Windows
            if platform.system() == "Windows":
                mvsdk.CameraFlipFrameBuffer(self.pFrameBuffer, FrameHead, 1)
            # The processed frame now lives in pFrameBuffer: RGB data for color
            # cameras, 8-bit grayscale for monochrome cameras.
            # Convert the buffer to an OpenCV-compatible array
            image_array = (mvsdk.c_ubyte * FrameHead.uBytes).from_address(
                self.pFrameBuffer
            )
            if self.monoCamera:
                image_np = np.array(image_array).reshape(
                    FrameHead.iHeight, FrameHead.iWidth
                )
            else:
                image_np = np.array(image_array).reshape(
                    FrameHead.iHeight, FrameHead.iWidth, 3
                )
            # Convert to OpenCV format
            if FrameHead.uiMediaType == CAMERA_MEDIA_TYPE_MONO8:
                image = np.frombuffer(image_np, dtype=np.uint8).reshape(
                    FrameHead.iHeight, FrameHead.iWidth
                )
            else:
                image = np.frombuffer(image_np, dtype=np.uint8).reshape(
                    FrameHead.iHeight, FrameHead.iWidth, 3
                )
            CameraReleaseImageBuffer(self.hCamera, pRawData)
            return image
        except CameraException as e:
            print("CameraGetImageBuffer failed({}): {}".format(e.error_code, e.message))
            return None

    def __del__(self):
        if self.hCamera:
            CameraUnInit(self.hCamera)
        if self.pFrameBuffer:
            CameraAlignFree(self.pFrameBuffer)


# class MainWindow(QMainWindow):
#     def __init__(self, camera):
#         super().__init__()
#         self.setWindowTitle("MindVision Camera")
#         self.setGeometry(100, 100, 800, 600)
#         self.label = QLabel(self)
#         self.label.setAlignment(Qt.AlignCenter)
#         self.label.setScaledContents(True)  # scale the image to fit the label
#         self.camera = camera
#         self.timer = QTimer(self)
#         self.timer.timeout.connect(self.update_frame)
#         self.timer.start(30)  # update frame every 30 ms
#     def closeEvent(self, event):
#         self.timer.stop()
#         self.camera.__del__()
#         event.accept()
#     def resizeEvent(self, event):
#         super().resizeEvent(event)
#         self.update_frame()  # refresh the image when the window is resized
#     def update_frame(self):
#         image = self.camera.capture()
#         if image is not None:
#             if self.camera.monoCamera:
#                 qimage = QImage(image.data, image.shape[1], image.shape[0], QImage.Format_Grayscale8)
#             else:
#                 qimage = QImage(image.data, image.shape[1], image.shape[0], QImage.Format_BGR888)
#             pixmap = QPixmap.fromImage(qimage)
#             self.label.setPixmap(pixmap)
#             self.label.resize(self.size())  # resize the label to fit the window
# if __name__ == '__main__':
#     app = QApplication(sys.argv)
#     cam = MindVisionCamera("054042423002")
#     window = MainWindow(cam)
#     window.show()
#     sys.exit(app.exec())
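
A minimal headless usage sketch (assumptions: the MindVision SDK is installed and a camera with the SN from the commented demo above is attached):

# Hypothetical headless check, not part of the module.
import cv2

cam = MindVisionCamera("054042423002")
frame = cam.capture()
if frame is not None:
    cv2.imwrite("mv_frame.png", frame)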

0
lib/camera/__init__.py Normal file

2394
lib/camera/mvsdk.py Normal file

File diff suppressed because it is too large

0
lib/cfg/__init__.py Normal file

42
lib/cfg/cfg.py Normal file

@@ -0,0 +1,42 @@
import sys
from pathlib import Path
import enum

project_root = Path(__file__).resolve().parents[3]
sys.path.insert(0, str(project_root))

# Key definitions and title mappings
OBSTACLE_KEYS = ["f_l_obstacle", "f_r_obstacle", "b_l_obstacle", "b_r_obstacle"]
RAIL_KEYS = ["f_r_rail", "b_l_rail"]
TITLE2D_KEY = {"前2D": "front_2d", "后2D": "rear_2d"}
MAPPING = {
    "前左避障": "f_l_obstacle",  # front-left obstacle
    "前右避障": "f_r_obstacle",  # front-right obstacle
    "后左避障": "b_l_obstacle",  # rear-left obstacle
    "后右避障": "b_r_obstacle",  # rear-right obstacle
    "前右上轨": "f_r_rail",      # front-right rail
    "后左上轨": "b_l_rail",      # rear-left rail
}
CAMERA_3D_CFGS = project_root / "config/camera_sn.json"
CAMERA_2D_CFFS = project_root / "config/2d_camera.json"

# TCP server configuration
HOST = "0.0.0.0"
PORT = 65444

# Frame-rate control: seconds per loop iteration
PERIOD = 0.1


class VisionMode(enum.Enum):
    OBSTACLE_DETECTION = 1   # all four 3D cameras run obstacle detection / ranging
    FRONT_2D_DETECTION = 2   # front 2D detection camera on; the rear keeps ranging
    REAR_2D_DETECTION = 3    # the reverse of the above
    TRACK_INSPECTION = 4     # track inspection


class CameraControl(enum.Enum):
    DESTORY = 0
    CAPTURE = 1
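
The config files themselves are not part of this commit view. A hypothetical shape for config/camera_sn.json, inferred from how the presenter reads it (json.load(f)["camera"], then cfg["title"] and cfg["sn"]):

# Hypothetical config shape, shown as a Python literal; the actual file
# contents and serial numbers are assumptions.
import json

example = {
    "camera": [
        {"title": "前左避障", "sn": "233800055"},
        {"title": "前右上轨", "sn": "234000035"},
    ]
}
for cfg in example["camera"]:
    print(cfg["title"], cfg["sn"])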

0
lib/gui/__init__.py Normal file

0
lib/io/__init__.py Normal file

30
lib/io/process2d.py Normal file

@@ -0,0 +1,30 @@
from multiprocessing import Process
from lib.camera.MindVisionCamera import MindVisionCamera
from lib.cfg.cfg import CameraControl


class Process2D(Process):
    def __init__(self, cfg, in_q, out_q):
        # Set the process name
        super().__init__(name=f"2D-{cfg['title']}")
        self.cfg = cfg
        self.in_q = in_q
        self.out_q = out_q
        self.camera = MindVisionCamera(cfg["sn"])
        self.status = 0

    def run(self):
        while True:
            sig = self.in_q.get()
            if sig == CameraControl.CAPTURE:
                img = self.camera.capture()
                self.out_q.put(img)
            elif sig == CameraControl.DESTORY:
                # Release the camera via whichever teardown method it exposes
                for fn in ("destroy", "close", "release"):
                    if hasattr(self.camera, fn):
                        try:
                            getattr(self.camera, fn)()
                        except Exception:
                            pass
                break

26
lib/io/process3d.py Normal file

@@ -0,0 +1,26 @@
from multiprocessing import Process
from lib.camera.ArenaCamera import ArenaCamera
from lib.cfg.cfg import CameraControl


class Process3D(Process):
    def __init__(self, cfg, in_q, out_q):
        # Set the process name
        super().__init__(name=f"3D-{cfg['title']}")
        self.cfg = cfg
        self.in_q = in_q
        self.out_q = out_q
        self.camera = ArenaCamera(cfg["sn"])
        self.status = 0

    # TODO: enable TrackDetector only for the rail cameras
    def run(self):
        while True:
            sig = self.in_q.get()
            if sig == CameraControl.CAPTURE:
                _, _, dep_img = self.camera.capture()
                self.out_q.put(dep_img)
            elif sig == CameraControl.DESTORY:
                self.camera.destroy()
                break
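
A driving sketch for the capture processes (assumptions: Linux fork start method, since the camera handle is created in the parent, plus a Manager queue pair per camera, matching how the presenter uses them):

# Hypothetical driver, not part of the module.
from multiprocessing import Manager
from lib.cfg.cfg import CameraControl

mgr = Manager()
in_q, out_q = mgr.Queue(), mgr.Queue()
proc = Process3D({"title": "前左避障", "sn": "233800055"}, in_q, out_q)
proc.start()
in_q.put(CameraControl.CAPTURE)   # request one frame
depth = out_q.get()               # blocks until the child replies
in_q.put(CameraControl.DESTORY)   # ask the child to release the camera and exit
proc.join()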

0
lib/mod/__init__.py Normal file

211
lib/presenter/presenter.py Normal file

@@ -0,0 +1,211 @@
import datetime
import time
from multiprocessing import Manager
import json
from collections import deque, OrderedDict
import socket

import numpy as np
import open3d as o3d

from lib.alg.image_processing_3d import detect_obstacles_in_box
from lib.io.process3d import Process3D
from lib.io.process2d import Process2D
from lib.camera.ArenaCamera import ArenaCamera
from lib.tcp.tcp_server import TcpServer
from lib.cfg.cfg import (
    CAMERA_2D_CFFS,
    CAMERA_3D_CFGS,
    CameraControl,
    VisionMode,
    HOST,
    PORT,
    OBSTACLE_KEYS,
    MAPPING,
    RAIL_KEYS,
    TITLE2D_KEY,
    PERIOD,
)


# NOTE: The Presenter class
# 1. Manages the camera processes: it drives acquisition by putting commands
#    on each camera process's input queue; every CAPTURE command makes that
#    camera return one image on its output queue.
# 2. Receives data from the TCP client to switch modes; each mode captures
#    different images and parses them into result data.
# 3. Sends the parsed data back out over the TCP server.
class Presenter:
    def __init__(self) -> None:
        # Initialize the per-camera processes and queues
        self.mode = VisionMode.OBSTACLE_DETECTION
        self.process3d_info = {}
        self.process2d_info = {}
        mgr = Manager()
        with open(CAMERA_3D_CFGS, encoding="utf-8") as f:
            cfg3d = json.load(f)["camera"]
        for cfg in cfg3d:
            in_q = mgr.Queue()
            out_q = mgr.Queue()
            pro = Process3D(cfg, in_q, out_q)
            self.process3d_info[cfg["title"]] = pro
            pro.start()
        with open(CAMERA_2D_CFFS, encoding="utf-8") as f:
            cfg2d = json.load(f)["camera"]
        for cfg in cfg2d:
            in_q = mgr.Queue()
            out_q = mgr.Queue()
            pro = Process2D(cfg, in_q, out_q)
            self.process2d_info[cfg["title"]] = pro
            pro.start()
        # NOTE: history queues:
        # - obstacle OK/NG status (last 10 frames) and last obstacle distances
        # - rail-detection history and last rail offset/angle values
        # - 2D detection history
        self.hist_ok = {k: deque(maxlen=10) for k in OBSTACLE_KEYS}
        self.last_d = {k: None for k in OBSTACLE_KEYS}
        self.hist_rail = {k: deque(maxlen=5) for k in RAIL_KEYS}
        self.last_rail = {k: {"offset": None, "angle": None} for k in RAIL_KEYS}
        self.two_d_hist = {k: deque(maxlen=10) for k in TITLE2D_KEY.values()}

    def get_camera_data(self):
        pass

    def handle_camera_3d_data(self):
        pass

    def handle_camera_2d_data(self):
        pass

    def front_mode_data_handle(self, pkt: OrderedDict):
        pass

    def rear_mode_data_handle(self, pkt: OrderedDict):
        pass

    def obstacle_mode_data_handle(self, pkt: OrderedDict):
        """1. Collect frames from every 3D obstacle camera."""
        obstacle_depth_img = {}
        for key in self.process3d_info.keys():
            # Skip the rail (上轨) cameras
            if key.endswith("上轨"):
                continue
            self.process3d_info[key].in_q.put(CameraControl.CAPTURE)
            obstacle_depth_img[key] = self.process3d_info[key].out_q.get()
        # 2. Run the detection algorithm on each frame.
        #    TODO: this should run inside the camera processes.
        for key in obstacle_depth_img.keys():
            intrinsic = o3d.camera.PinholeCameraIntrinsic(640, 480, 474, 505, 320, 240)
            img = o3d.geometry.Image(obstacle_depth_img[key].astype(np.float32))
            pcd = o3d.geometry.PointCloud.create_from_depth_image(
                img, intrinsic, depth_scale=1000.0, depth_trunc=8.0
            )
            # Detection box per camera group; the front cameras get the wider
            # box. NOTE: the bounds look like millimetres, while
            # create_from_depth_image with depth_scale=1000.0 yields metres;
            # the units need to be reconciled on site.
            if key.startswith("前"):
                box = (np.array([-1050, -600, 500]), np.array([1050, 1000, 6000]))
            else:
                box = (np.array([-800, -600, 800]), np.array([800, 1100, 6000]))
            nearest, _ = detect_obstacles_in_box(pcd, box[0], box[1], 640, 480)
            if nearest:
                d = float(np.linalg.norm(nearest["position"]))
                res = {"distance": round(d, 2), "status": "NG"}
            else:
                res = {"distance": None, "status": "OK"}
            # Append to the status history (last 10 frames) and remember the
            # last measured distance.
            ok = res["status"] == "OK"
            self.hist_ok[MAPPING[key]].append(ok)
            if not ok:
                self.last_d[MAPPING[key]] = res["distance"]

    def wait_rec_tcp_data(self):
        pass

    def send_tcp_data(self):
        pass

    # Parse the received TCP data line by line; the payload is only used to
    # switch modes, so nothing is returned.
    def rec_tcp_data_handle(self, data):
        if data:
            data = data.decode("utf-8", errors="ignore")
            while "\n" in data:
                line, data = data.split("\n", 1)
                if not line.strip():
                    continue
                try:
                    cmd = json.loads(line)
                    print(f"[SERVER] Cmd: {cmd}")
                    front = cmd.get("FrontCouplerSignal", False)
                    rear = cmd.get("RearCouplerSignal", False)
                    obs = cmd.get("ObstacleDetection", False)
                    if obs:
                        self.mode = VisionMode.OBSTACLE_DETECTION
                    elif front:
                        self.mode = VisionMode.FRONT_2D_DETECTION
                    elif rear:
                        self.mode = VisionMode.REAR_2D_DETECTION
                except json.JSONDecodeError:
                    pass

    def run(self):
        # Initialize the TCP server and the send/receive buffers
        server = TcpServer(host=HOST, port=PORT)
        tcp_rec_buf = ""
        tcp_send_buf = ""
        pkt = OrderedDict()  # the TCP packet to send
        try:
            server.accept_client()
            while True:
                # next_time tracks the clock for frame-rate control
                next_time = time.perf_counter() + PERIOD
                try:
                    # Receive TCP data and switch modes accordingly
                    tcp_rec_buf = server.recv_data()
                    if tcp_rec_buf:
                        self.rec_tcp_data_handle(tcp_rec_buf)
                    else:
                        # recv returning empty bytes means the client closed
                        print("Client disconnected gracefully")
                        break
                except ConnectionResetError:
                    print("Warning: client forcibly disconnected!")
                    break
                except socket.error as e:
                    print(f"Net Error: {e}")
                    break
                # Reset the outgoing packet
                pkt.clear()
                pkt["time_str"] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                # Fill the packet according to the current mode
                if self.mode == VisionMode.OBSTACLE_DETECTION:
                    self.obstacle_mode_data_handle(pkt)
                elif self.mode == VisionMode.FRONT_2D_DETECTION:
                    self.front_mode_data_handle(pkt)
                elif self.mode == VisionMode.REAR_2D_DETECTION:
                    self.rear_mode_data_handle(pkt)
                # Send the packet
                try:
                    tcp_send_buf = (json.dumps(pkt, ensure_ascii=False) + "\n").encode()
                except TypeError as e:
                    print(f"JSON encode failed: {e}")
                    tcp_send_buf = b"{}\n"
                server.send_data(tcp_send_buf)
                # Frame-rate control
                now = time.perf_counter()
                wait = next_time - now
                if wait > 0:
                    time.sleep(wait)
                next_time += PERIOD
        except KeyboardInterrupt:
            print("KeyboardInterrupt (Ctrl+C): shutting down")
        finally:
            for key in self.process3d_info.keys():
                self.process3d_info[key].in_q.put(CameraControl.DESTORY)
            for key in self.process2d_info.keys():
                self.process2d_info[key].in_q.put(CameraControl.DESTORY)
            ArenaCamera.shutdown()
            print("Closing connection")
            server.close()
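
A hypothetical entry point (assumption: run from the project root with the config files referenced in lib/cfg/cfg.py in place):

# Hypothetical entry point, not part of this commit.
if __name__ == "__main__":
    presenter = Presenter()   # spawns one process per configured camera
    presenter.run()           # serves TCP clients until Ctrl+C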

0
lib/tcp/__init__.py Normal file

26
lib/tcp/tcp_server.py Normal file

@@ -0,0 +1,26 @@
import socket


class TcpServer:
    def __init__(self, host="0.0.0.0", port=65444) -> None:
        self.host = host
        self.port = port
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.bind((self.host, self.port))
        self.sock.listen(1)  # allow at most one pending connection (can be raised)
        print(f"Listening on {self.host}:{self.port}")

    def accept_client(self):
        self.conn, self.addr = self.sock.accept()
        print(f"Connected by {self.addr}")

    def recv_data(self, bufsize=4096):
        # Returns b"" when the client has closed the connection
        data = self.conn.recv(bufsize)
        return data

    def send_data(self, data: bytes):
        self.conn.sendall(data)

    def close(self):
        self.conn.close()
        self.sock.close()
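
A matching test client sketch (assumptions: the server runs on localhost and speaks the newline-delimited JSON commands the presenter parses):

# Hypothetical test client, not part of this commit.
import json
import socket

with socket.create_connection(("127.0.0.1", 65444)) as s:
    # Switch the presenter into obstacle-detection mode
    s.sendall((json.dumps({"ObstacleDetection": True}) + "\n").encode())
    # Read one newline-delimited result packet
    buf = b""
    while not buf.endswith(b"\n"):
        chunk = s.recv(4096)
        if not chunk:
            break
        buf += chunk
    print(buf.decode("utf-8", errors="ignore"))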