# File: tractor_vision/alg/track_detection.py
import cv2
import numpy as np
import math
from collections import deque

# ——— Camera calibration parameters ———
FX = 474.0  # focal length, x axis (pixels)
FY = 505.0  # focal length, y axis (pixels)
CX = 320.0  # image centre, x coordinate
CY = 240.0  # image centre, y coordinate
CAMERA_TILT_DEG = 10.0  # camera tilt, positive = pointing downward

# ——— ROI & thresholds ———
ROI = (190, 40, 250, 420)  # (x, y, w, h)
MIN_DEPTH = 1100  # minimum valid depth inside the ROI (mm)
ANGLE_TOL_DEG = 45  # max deviation from vertical for a candidate line (deg)


def preprocess_depth(depth, blur_ksize=5, kernel_size=7):
    """
    - Median filter to suppress noise
    - Morphological closing to fill holes (zero-depth pixels)
    """
    d = cv2.medianBlur(depth, blur_ksize)
    kernel = np.ones((kernel_size, kernel_size), np.uint8)
    closed = cv2.morphologyEx(d, cv2.MORPH_CLOSE, kernel)
    d[d == 0] = closed[d == 0]
    return d


def offset_after_rotation(line):
    # Rotate the midpoint's offset from the image centre so that the line becomes
    # vertical; the (negated) horizontal component is the lateral offset in pixels.
    (x1, y1), (x2, y2) = line
    phi = math.degrees(math.atan2(y2 - y1, x2 - x1))
    rot = math.radians(90.0 - phi)
    mx, my = (x1 + x2) / 2, (y1 + y2) / 2
    dx, dy = mx - CX, my - CY
    x_rot = dx * math.cos(rot) - dy * math.sin(rot)
    return -x_rot, (int(mx), int(my))
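
# Illustrative example (not from the original code): for a perfectly vertical line
# whose midpoint sits 10 px to the right of the image centre, e.g.
# offset_after_rotation(((330, 100), (330, 300))) returns (-10.0, (330, 200));
# offsets to the right of centre are negative, offsets to the left are positive.
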
def compute_metrics(depth):
    # NOTE: force the data to uint16 and always work on a copy
    depth_mm = (
        depth.copy().astype(np.uint16) if depth.dtype != np.uint16 else depth.copy()
    )
    # NOTE: denoise the depth map (median filter + hole filling)
    den = preprocess_depth(depth_mm)
    # NOTE: crop the region of interest (ROI) for analysis
    x, y, w, h = ROI
    crop = den[y : y + h, x : x + w]
    if not (crop >= MIN_DEPTH).any():
        return None, None, None, None, depth_mm, ROI, None

    # NOTE: normalize the cropped depth to the 0~255 range and convert it to uint8;
    # OpenCV's edge detector expects an 8-bit image, hence the normalization.
    norm = cv2.normalize(crop, None, 0, 255, cv2.NORM_MINMAX).astype("uint8")
    # NOTE: run Canny edge detection on the normalized image to extract edges.
    # 50 and 150 are the low/high hysteresis thresholds.
    edges = cv2.Canny(norm, 50, 150)
    lines = cv2.HoughLinesP(edges, 1, np.pi / 180, 30, minLineLength=50, maxLineGap=10)
    # NOTE: if no straight edges were detected, return empty results
    if lines is None:
        return None, None, None, None, depth_mm, ROI, None
    # NOTE: convert the camera tilt (degrees) to radians for the angle maths below
    tilt = math.radians(CAMERA_TILT_DEG)
    # NOTE: empty list that will hold the candidate track lines passing the angle test
    cands = []
    # NOTE: iterate over all Hough lines; each is given by its endpoints (x1_, y1_) and (x2_, y2_)
    for x1_, y1_, x2_, y2_ in lines.reshape(-1, 4):
        # NOTE: add the ROI offset back to obtain coordinates in the full depth image;
        # the Hough transform ran inside the crop, so its offset must be compensated.
        p1 = (x1_ + x, y1_ + y)
        p2 = (x2_ + x, y2_ + y)
        # NOTE: horizontal and vertical components of the segment vector
        dx, dy = p2[0] - p1[0], p2[1] - p1[1]
        # NOTE: angle of the segment relative to the vertical direction (radians)
        raw = -math.atan2(dx, -dy)
        # NOTE: correct the segment angle for the camera tilt: project the image-plane
        # angle onto the ground plane to obtain the true track angle.
        grd = math.asin(math.sin(raw) * math.cos(tilt))
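        # Worked numbers for the tilt correction above (illustrative, not from the
        # original code): with tilt = 10 deg and raw = 20 deg in the image plane,
        # grd = asin(sin(20°) * cos(10°)) ≈ 19.7 deg, so at this tilt the correction
        # only slightly shrinks the apparent angle.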
        # NOTE: convert the angle to degrees and take its absolute value
        ang = abs(math.degrees(grd))
        # NOTE: lines within ANGLE_TOL_DEG of vertical (i.e. roughly track-aligned)
        # are kept as candidate track lines.
        if ang <= ANGLE_TOL_DEG:
            cands.append({"line": (p1, p2), "mid_x": (p1[0] + p2[0]) / 2})
    # NOTE: if no line passed the angle test, return None
    if not cands:
        return None, None, None, None, depth_mm, ROI, None
    # NOTE: pick the leftmost line (smallest midpoint x) as the "best" track line
    best = min(cands, key=lambda c: c["mid_x"])
    # NOTE: unpack the two endpoints of the best track line
    line = best["line"]
    (x1, y1), (x2, y2) = line

    # Intersection
    dxl, dyl = x2 - x1, y2 - y1
    # NOTE: intersection of the track line with the camera centre line (the vertical x = CX).
    # If the track line is not vertical, the intersection y is found by linear interpolation;
    # if it is vertical (dxl == 0), the midpoint y is used instead.
    if dxl != 0:
        t = (CX - x1) / dxl
        y_int = y1 + t * dyl
        inter = (int(CX), int(y_int))
    else:
        inter = (int(CX), int((y1 + y2) / 2))

    # NOTE: compute the signed angle and the lateral offset
    p_up = (x1, y1) if y1 < y2 else (x2, y2)
    vx, vy = p_up[0] - inter[0], p_up[1] - inter[1]
    raw2 = math.atan2(vx, -vy)
    grd2 = math.asin(math.sin(raw2) * math.cos(tilt))
    signed = math.degrees(grd2)

    off_px, midpt = offset_after_rotation(line)
    pts = [
        np.clip(pt, [0, 0], [depth_mm.shape[1] - 1, depth_mm.shape[0] - 1])
        for pt in line
    ]
    Zs = [float(depth_mm[pt[1], pt[0]]) for pt in pts]
    Zavg = np.mean([z for z in Zs if z > 0]) if any(z > 0 for z in Zs) else MIN_DEPTH
    off_mm = off_px * Zavg / FX * math.cos(tilt)
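    # Back-of-the-envelope check of the pixel-to-millimetre conversion above
    # (illustrative numbers, not from the original code): off_px = 20 px at an
    # average depth Zavg = 2000 mm with FX = 474 and tilt = 10 deg gives
    # off_mm = 20 * 2000 / 474 * cos(10°) ≈ 83 mm.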

    return signed, off_mm, line, midpt, depth_mm, ROI, inter


class TrackDetector:
    def __init__(self, camera_list, alpha=0.6, win_raw=5, win_smooth=5, interval=0.2):
        self.alpha, self.interval = alpha, interval
        # Per-camera raw and smoothed histories, keyed by camera serial number
        self.ang_raw = {sn: deque(maxlen=win_raw) for sn in camera_list}
        self.off_raw = {sn: deque(maxlen=win_raw) for sn in camera_list}
        self.ang_smooth = {sn: deque(maxlen=win_smooth) for sn in camera_list}
        self.off_smooth = {sn: deque(maxlen=win_smooth) for sn in camera_list}
        self.last_ang = {sn: None for sn in camera_list}
        self.last_off = {sn: None for sn in camera_list}
        self.last_t = {sn: 0.0 for sn in camera_list}

    def process(self, depth, sn):
        # NOTE: throttle: if less than `interval` seconds have passed since the last
        # processed frame for this camera, skip the frame and return None.
        now = cv2.getTickCount() / cv2.getTickFrequency()
        if now - self.last_t[sn] < self.interval:
            return None

        self.last_t[sn] = now
        res = compute_metrics(depth)
        if res[0] is None:
            return None
        ang, off, line, midpt, _, _, inter = res

        # NOTE: smoothing: median over a short window, then an exponential filter
        self.ang_raw[sn].append(ang)
        self.off_raw[sn].append(off)
        med_ang = float(np.median(self.ang_raw[sn]))
        med_off = float(np.median(self.off_raw[sn]))
        pa, po = self.last_ang[sn], self.last_off[sn]
        ang_exp = (
            med_ang if pa is None else self.alpha * med_ang + (1 - self.alpha) * pa
        )
        off_exp = (
            med_off if po is None else self.alpha * med_off + (1 - self.alpha) * po
        )
        self.last_ang[sn], self.last_off[sn] = ang_exp, off_exp
        self.ang_smooth[sn].append(ang_exp)
        self.off_smooth[sn].append(off_exp)
        disp_ang = float(np.median(self.ang_smooth[sn]))
        disp_off = float(np.median(self.off_smooth[sn]))

        return {
            "angle": -disp_ang,  # positive = left, negative = right
            "offset": disp_off,
            "line": line,
            "midpoint": midpt,
            "intersection": inter,
        }
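

# Minimal usage sketch (illustrative, not part of the original module): feed one
# synthetic uint16 depth frame containing a vertical step edge inside the ROI to a
# TrackDetector and print the resulting angle/offset dictionary. The camera serial
# "cam0" and the synthetic frame contents are assumptions made only for this example.
if __name__ == "__main__":
    demo_depth = np.full((480, 640), 1500, dtype=np.uint16)  # flat background at 1.5 m
    demo_depth[:, 320:] = 2500  # depth step near the image centre -> vertical edge
    detector = TrackDetector(camera_list=["cam0"])
    result = detector.process(demo_depth, "cam0")
    print(result)  # e.g. {'angle': ..., 'offset': ..., 'line': ..., ...}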