Files
hirres_tractor_vision/lib/camera/ArenaCamera.py
2025-05-30 16:30:37 +08:00

294 lines
12 KiB
Python

import ctypes
import time
import numpy as np
import cv2
# 定义错误代码
ARENA_ACQ_SUCCESS = 0
# 打印错误信息
def print_error(operation, error):
print(f"{operation} failed with error code: {error}")
# 重连机制函数
def retry_operation(operation_func, handle, *args, max_retries=1, delay=1):
"""
重试机制函数
:param operation_func: 要执行的操作函数
:param handle: 相机句柄
:param args: 操作函数的参数
:param max_retries: 最大重试次数
:param delay: 重试间隔时间(秒)
:return: 操作结果
"""
for attempt in range(max_retries + 1):
err = operation_func(handle, *args)
if err == ARENA_ACQ_SUCCESS:
return True
else:
print(f"Attempt {attempt + 1}/{max_retries} failed. Retrying in {delay} seconds...")
time.sleep(delay)
print_error(operation_func.__name__, err)
return False
class ArenaCamera:
# 静态变量存储共享库
libArenaAcq = ctypes.CDLL('libArenaAcq.so')
sys_handle = ctypes.c_void_p()
functions_initialized = False # Class-level flag
def __init__(self, sn):
self.sn = sn
self.handle = ctypes.c_void_p()
self.count = 0
# Initialize function argument types and return types if not already done
if not ArenaCamera.functions_initialized:
self._initialize_functions()
ArenaCamera.functions_initialized = True
@classmethod
def _initialize_functions(cls):
# 定义 ArenaAcq_Initialize
cls.libArenaAcq.ArenaAcq_Initialize.argtypes = [ctypes.POINTER(ctypes.c_void_p)]
cls.libArenaAcq.ArenaAcq_Initialize.restype = ctypes.c_int
# 定义 ArenaAcq_Connect
cls.libArenaAcq.ArenaAcq_Connect.argtypes = [ctypes.c_void_p, ctypes.POINTER(ctypes.c_void_p),ctypes.c_char_p]
cls.libArenaAcq.ArenaAcq_Connect.restype = ctypes.c_int
# 定义 ArenaAcq_SetProperty
cls.libArenaAcq.ArenaAcq_SetProperty.argtypes = [
ctypes.c_void_p, ctypes.c_char_p, ctypes.c_char_p
]
cls.libArenaAcq.ArenaAcq_SetProperty.restype = ctypes.c_int
# 定义 ArenaAcq_GetProperty
cls.libArenaAcq.ArenaAcq_GetProperty.argtypes = [
ctypes.c_void_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(ctypes.c_size_t)
]
cls.libArenaAcq.ArenaAcq_GetProperty.restype = ctypes.c_int
# 定义 ArenaAcq_StartAcquisitionStream
cls.libArenaAcq.ArenaAcq_StartAcquisitionStream.argtypes = [ctypes.c_void_p]
cls.libArenaAcq.ArenaAcq_StartAcquisitionStream.restype = ctypes.c_int
# 定义 ArenaAcq_GetDepthImage
cls.libArenaAcq.ArenaAcq_GetDepthImage.argtypes = [
ctypes.c_void_p,
ctypes.POINTER(ctypes.POINTER(ctypes.c_uint16)),
ctypes.POINTER(ctypes.POINTER(ctypes.c_uint16)),
ctypes.POINTER(ctypes.POINTER(ctypes.c_uint16)),
ctypes.POINTER(ctypes.c_size_t),
ctypes.POINTER(ctypes.c_size_t),
ctypes.POINTER(ctypes.c_size_t),
ctypes.POINTER(ctypes.c_size_t)
]
cls.libArenaAcq.ArenaAcq_GetDepthImage.restype = ctypes.c_int
# 定义 ArenaAcq_GetImage
cls.libArenaAcq.ArenaAcq_GetImage.argtypes = [
ctypes.c_void_p,
ctypes.POINTER(ctypes.POINTER(ctypes.c_uint16)),
ctypes.POINTER(ctypes.c_size_t),
ctypes.POINTER(ctypes.c_size_t),
ctypes.POINTER(ctypes.c_size_t),
ctypes.POINTER(ctypes.c_size_t)
]
cls.libArenaAcq.ArenaAcq_GetImage.restype = ctypes.c_int
# 定义 ArenaAcq_ReleaseImage
cls.libArenaAcq.ArenaAcq_ReleaseImage.argtypes = [ctypes.POINTER(ctypes.c_uint8)]
cls.libArenaAcq.ArenaAcq_ReleaseImage.restype = ctypes.c_int
cls.libArenaAcq.ArenaAcq_ReleaseHandle.argtypes = [ctypes.POINTER(ctypes.c_void_p)]
cls.libArenaAcq.ArenaAcq_ReleaseHandle.restype = ctypes.c_int
cls.libArenaAcq.ArenaAcq_ReleaseDevice.argtypes = [ctypes.POINTER(ctypes.c_void_p)]
cls.libArenaAcq.ArenaAcq_ReleaseDevice.restype = ctypes.c_int
# 定义 ArenaAcq_StopAcquisitionStream
cls.libArenaAcq.ArenaAcq_StopAcquisitionStream.argtypes = [ctypes.c_void_p]
cls.libArenaAcq.ArenaAcq_StopAcquisitionStream.restype = ctypes.c_int
# 定义 ArenaAcq_Disconnect
cls.libArenaAcq.ArenaAcq_Disconnect.argtypes = [ctypes.c_void_p,ctypes.c_void_p]
cls.libArenaAcq.ArenaAcq_Disconnect.restype = ctypes.c_int
# 定义 ArenaAcq_Cleanup
cls.libArenaAcq.ArenaAcq_Cleanup.argtypes = [ctypes.c_void_p]
cls.libArenaAcq.ArenaAcq_Cleanup.restype = ctypes.c_int
cls.libArenaAcq.ArenaAcq_SaveDepthImageAsPGM.argtypes = [ctypes.POINTER(ctypes.c_uint16),ctypes.c_size_t,ctypes.c_size_t,ctypes.c_char_p]
cls.libArenaAcq.ArenaAcq_SaveDepthImageAsPGM.restype = ctypes.c_int
# 初始化库
if not retry_operation(cls.libArenaAcq.ArenaAcq_Initialize, ctypes.byref(cls.sys_handle)):
print(f"Initialization failed")
cls.libArenaAcq.ArenaAcq_Cleanup(cls.sys_handle)
cls.libArenaAcq.ArenaAcq_ReleaseHandle(cls.sys_handle)
return
print(f"Library initialized successfully")
@classmethod
def shutdown(cls):
cls.libArenaAcq.ArenaAcq_Cleanup(cls.sys_handle)
print(f"Library shutdown successfully")
def create(self):
# 连接到相机
if not retry_operation(self.libArenaAcq.ArenaAcq_Connect,self.sys_handle, self.handle, self.sn.encode('utf-8')):
print(f"Connection failed for camera {self.sn}")
self.libArenaAcq.ArenaAcq_Cleanup(self.handle)
return
print(f"Connected to camera {self.sn} successfully")
# 开始采集流
if not retry_operation(self.libArenaAcq.ArenaAcq_StartAcquisitionStream, self.handle):
print(f"Start acquisition failed for camera {self.sn}")
self.destroy()
return
print(f"Acquisition started successfully for camera {self.sn}")
def destroy(self):
# 停止采集流
if not retry_operation(self.libArenaAcq.ArenaAcq_StopAcquisitionStream, self.handle):
print(f"Stop acquisition failed for camera {self.sn}")
self.libArenaAcq.ArenaAcq_Disconnect(self.sys_handle,self.handle)
self.libArenaAcq.ArenaAcq_Cleanup(self.handle)
return
print(f"Acquisition stopped successfully for camera {self.sn}")
# 断开与相机的连接
if not retry_operation(self.libArenaAcq.ArenaAcq_Disconnect,self.sys_handle,self.handle):
print(f"Disconnection failed for camera {self.sn}")
self.libArenaAcq.ArenaAcq_Cleanup(self.handle)
return
print(f"Disconnected from camera {self.sn} successfully")
# 清理资源
if not retry_operation(self.libArenaAcq.ArenaAcq_Cleanup, self.handle):
print(f"Cleanup failed for camera {self.sn}")
return
def read(self):
image_data = ctypes.POINTER(ctypes.c_uint16)()
image_size = ctypes.c_size_t()
width = ctypes.c_size_t()
height = ctypes.c_size_t()
bits_per_pixel = ctypes.c_size_t()
if not retry_operation(self.libArenaAcq.ArenaAcq_GetImage, self.handle, ctypes.byref(image_data),
ctypes.byref(image_size), ctypes.byref(width), ctypes.byref(height),
ctypes.byref(bits_per_pixel)):
print(f"Image acquisition failed for camera {self.sn}")
return None
if image_size.value<=0:
print(f"Image size is 0 for camera {self.sn}")
return None
# 将图像数据转换为OpenCV格式
image_array = ctypes.cast(image_data, ctypes.POINTER(ctypes.c_uint16 * (width.value * height.value))).contents
image_np = np.array(image_array).reshape(height.value, width.value)
image_np = np.copy(image_np)
self.count=self.count+1
# 释放图像资源
self.libArenaAcq.ArenaAcq_ReleaseImage(ctypes.cast(image_data, ctypes.POINTER(ctypes.c_uint8)))
return image_np
def capture(self):
image_data = ctypes.POINTER(ctypes.c_uint16)()
x_image_data = ctypes.POINTER(ctypes.c_uint16)()
y_image_data = ctypes.POINTER(ctypes.c_uint16)()
image_size = ctypes.c_size_t()
width = ctypes.c_size_t()
height = ctypes.c_size_t()
bits_per_pixel = ctypes.c_size_t()
if not retry_operation(self.libArenaAcq.ArenaAcq_GetDepthImage, self.handle, ctypes.byref(image_data),
ctypes.byref(x_image_data), ctypes.byref(y_image_data),
ctypes.byref(image_size), ctypes.byref(width), ctypes.byref(height),
ctypes.byref(bits_per_pixel)):
print(f"Image acquisition failed for camera {self.sn}")
return None, None, None
if image_size.value<=0:
return None,None,None
# 将图像数据转换为OpenCV格式
depth_image_array = ctypes.cast(image_data, ctypes.POINTER(ctypes.c_uint16 * (width.value * height.value))).contents
x_image_array = ctypes.cast(x_image_data, ctypes.POINTER(ctypes.c_uint16 * (width.value * height.value))).contents
y_image_array = ctypes.cast(y_image_data, ctypes.POINTER(ctypes.c_uint16 * (width.value * height.value))).contents
depth_image_np = np.array(depth_image_array).reshape(height.value, width.value)
x_image_np = np.array(x_image_array).reshape(height.value, width.value)
y_image_np = np.array(y_image_array).reshape(height.value, width.value)
depth_image_np = np.copy(depth_image_np)
x_image_np = np.copy(x_image_np)
y_image_np = np.copy(y_image_np)
# print("copied.")
# self.libArenaAcq.ArenaAcq_SaveDepthImageAsPGM(image_data,width,height,f'dbg_{self.sn}_{self.count}'.encode('utf-8'))
self.count=self.count+1
# 释放图像资源
self.libArenaAcq.ArenaAcq_ReleaseImage(ctypes.cast(image_data, ctypes.POINTER(ctypes.c_uint8)))
self.libArenaAcq.ArenaAcq_ReleaseImage(ctypes.cast(x_image_data, ctypes.POINTER(ctypes.c_uint8)))
self.libArenaAcq.ArenaAcq_ReleaseImage(ctypes.cast(y_image_data, ctypes.POINTER(ctypes.c_uint8)))
return x_image_np, y_image_np, depth_image_np
if __name__ == '__main__':
cam2 = ArenaCamera("233801341") # Hypothetical second camera serial number
cam2.create()
cam1 = ArenaCamera("233800056")
cam1.create()
# cv2.namedWindow("Z1",cv2.WINDOW_AUTOSIZE)
# cv2.namedWindow("Z2",cv2.WINDOW_AUTOSIZE)
i=0
while i<20:
raw1 = cam1.read()
X2, Y2, Z2 = cam2.capture()
# X1, Y1, Z1 = cam1.capture()
# Display images for the first camera
# cv2.imshow("X1", X1)
# cv2.imshow("Y1", Y1)
# cv2.imshow("Z1", Z1)
# Display images for the second camera
# cv2.imshow("X2", X2)
# cv2.imshow("Y2", Y2)
# cv2.imshow("Z2", Z2)
# if cv2.waitKey(0) & 0xFF == ord('q'):
# break
# cv2.imwrite("z1.png",Z1)
cv2.imwrite("z2.png",Z2)
# time.sleep(3)
i=i+1
cv2.imwrite("raw1.png",raw1)
print(f'read {i}')
cv2.destroyAllWindows()
cam1.destroy()
cam2.destroy()
ArenaCamera.shutdown()