"""ctypes wrapper around ``libArenaAcq.so`` plus a small two-camera demo."""
import ctypes
import time

import cv2
import numpy as np

# Status code that libArenaAcq functions return on success.
ARENA_ACQ_SUCCESS = 0


def print_error(operation, error):
    """Print a failure message for *operation* together with its error code."""
    print(f"{operation} failed with error code: {error}")


def retry_operation(operation_func, handle, *args, max_retries=1, delay=1):
    """Call ``operation_func(handle, *args)`` until it succeeds or retries run out.

    :param operation_func: callable returning ``ARENA_ACQ_SUCCESS`` on success
    :param handle: first argument passed to ``operation_func`` (a handle)
    :param args: remaining positional arguments for ``operation_func``
    :param max_retries: number of retries after the first attempt; negative
        values are treated as 0 so the operation always runs at least once
    :param delay: seconds to wait between attempts
    :return: ``True`` if any attempt succeeded, ``False`` otherwise
    """
    total_attempts = max(max_retries, 0) + 1
    err = None
    for attempt in range(1, total_attempts + 1):
        err = operation_func(handle, *args)
        if err == ARENA_ACQ_SUCCESS:
            return True
        if attempt < total_attempts:
            # Only announce a retry and wait when another attempt follows.
            print(f"Attempt {attempt}/{total_attempts} failed. "
                  f"Retrying in {delay} seconds...")
            time.sleep(delay)
        else:
            print(f"Attempt {attempt}/{total_attempts} failed.")
    print_error(operation_func.__name__, err)
    return False


class ArenaCamera:
    """One camera, identified by serial number, accessed through libArenaAcq.

    The shared library and the system handle are class-level and shared by all
    instances; each instance owns its own device handle.
    """

    # Shared library, loaded once when the class body is executed.
    libArenaAcq = ctypes.CDLL('libArenaAcq.so')
    # System handle filled in by ArenaAcq_Initialize.
    sys_handle = ctypes.c_void_p()
    # Class-level flag: prototypes declared and library initialisation attempted.
    functions_initialized = False

    def __init__(self, sn):
        """Store the serial number and set up the library on first use.

        :param sn: camera serial number as ``str``
        """
        self.sn = sn
        self.handle = ctypes.c_void_p()
        self.count = 0  # number of frames successfully fetched so far

        # Declare argument/return types only once per process.
        # NOTE(review): the flag is set even when ArenaAcq_Initialize failed,
        # so initialisation is not retried by later instances - confirm that
        # this is intended.
        if not ArenaCamera.functions_initialized:
            self._initialize_functions()
            ArenaCamera.functions_initialized = True

    @classmethod
    def _initialize_functions(cls):
        """Declare the ctypes prototypes and initialise the library."""
        void_p = ctypes.c_void_p
        void_pp = ctypes.POINTER(ctypes.c_void_p)
        char_p = ctypes.c_char_p
        size_t = ctypes.c_size_t
        size_p = ctypes.POINTER(ctypes.c_size_t)
        u16_p = ctypes.POINTER(ctypes.c_uint16)
        u16_pp = ctypes.POINTER(u16_p)
        u8_p = ctypes.POINTER(ctypes.c_uint8)

        # Function name -> argtypes. Every function returns a C int status.
        prototypes = {
            'ArenaAcq_Initialize': [void_pp],
            'ArenaAcq_Connect': [void_p, void_pp, char_p],
            'ArenaAcq_SetProperty': [void_p, char_p, char_p],
            'ArenaAcq_GetProperty': [void_p, char_p, char_p, size_p],
            'ArenaAcq_StartAcquisitionStream': [void_p],
            'ArenaAcq_GetDepthImage': [
                void_p, u16_pp, u16_pp, u16_pp,
                size_p, size_p, size_p, size_p,
            ],
            'ArenaAcq_GetImage': [
                void_p, u16_pp, size_p, size_p, size_p, size_p,
            ],
            'ArenaAcq_ReleaseImage': [u8_p],
            'ArenaAcq_ReleaseHandle': [void_pp],
            'ArenaAcq_ReleaseDevice': [void_pp],
            'ArenaAcq_StopAcquisitionStream': [void_p],
            'ArenaAcq_Disconnect': [void_p, void_p],
            'ArenaAcq_Cleanup': [void_p],
            'ArenaAcq_SaveDepthImageAsPGM': [u16_p, size_t, size_t, char_p],
        }
        for name, argtypes in prototypes.items():
            func = getattr(cls.libArenaAcq, name)
            func.argtypes = argtypes
            func.restype = ctypes.c_int

        # Initialise the library; the system handle is passed by reference.
        if not retry_operation(cls.libArenaAcq.ArenaAcq_Initialize,
                               ctypes.byref(cls.sys_handle)):
            print(f"Initialization failed")
            cls.libArenaAcq.ArenaAcq_Cleanup(cls.sys_handle)
            # ctypes applies byref() automatically for POINTER(...) argtypes.
            cls.libArenaAcq.ArenaAcq_ReleaseHandle(cls.sys_handle)
            return
        print(f"Library initialized successfully")

    @classmethod
    def shutdown(cls):
        """Call ArenaAcq_Cleanup on the shared system handle."""
        cls.libArenaAcq.ArenaAcq_Cleanup(cls.sys_handle)
        print(f"Library shutdown successfully")

    def create(self):
        """Connect to the camera and start its acquisition stream.

        Failures are reported on stdout; the method returns ``None`` in
        every case.
        """
        # Connect to the camera (self.handle is passed by reference by ctypes).
        if not retry_operation(self.libArenaAcq.ArenaAcq_Connect,
                               self.sys_handle, self.handle,
                               self.sn.encode('utf-8')):
            print(f"Connection failed for camera {self.sn}")
            self.libArenaAcq.ArenaAcq_Cleanup(self.handle)
            return
        print(f"Connected to camera {self.sn} successfully")

        # Start the acquisition stream.
        if not retry_operation(self.libArenaAcq.ArenaAcq_StartAcquisitionStream,
                               self.handle):
            print(f"Start acquisition failed for camera {self.sn}")
            self.destroy()
            return
        print(f"Acquisition started successfully for camera {self.sn}")

    def destroy(self):
        """Stop the stream, disconnect from the camera and clean up.

        If a step fails, the remaining steps are still issued once (without
        retry) before returning.
        """
        lib = self.libArenaAcq

        # Stop the acquisition stream.
        if not retry_operation(lib.ArenaAcq_StopAcquisitionStream, self.handle):
            print(f"Stop acquisition failed for camera {self.sn}")
            lib.ArenaAcq_Disconnect(self.sys_handle, self.handle)
            lib.ArenaAcq_Cleanup(self.handle)
            return
        print(f"Acquisition stopped successfully for camera {self.sn}")

        # Disconnect from the camera.
        if not retry_operation(lib.ArenaAcq_Disconnect,
                               self.sys_handle, self.handle):
            print(f"Disconnection failed for camera {self.sn}")
            lib.ArenaAcq_Cleanup(self.handle)
            return
        print(f"Disconnected from camera {self.sn} successfully")

        # Release resources.
        if not retry_operation(lib.ArenaAcq_Cleanup, self.handle):
            print(f"Cleanup failed for camera {self.sn}")

    @staticmethod
    def _frame_to_array(data_ptr, width, height):
        """Copy ``width * height`` uint16 values at *data_ptr* into an array.

        ``np.array`` copies the data, so the result stays valid after the
        library buffer has been released.
        """
        buffer_type = ctypes.c_uint16 * (width * height)
        frame = ctypes.cast(data_ptr, ctypes.POINTER(buffer_type)).contents
        return np.array(frame).reshape(height, width)

    def _release_image(self, data_ptr):
        """Hand an image buffer back to the library."""
        self.libArenaAcq.ArenaAcq_ReleaseImage(
            ctypes.cast(data_ptr, ctypes.POINTER(ctypes.c_uint8)))

    def read(self):
        """Fetch one image from the camera.

        :return: ``(height, width)`` uint16 array, or ``None`` on failure
        """
        image_data = ctypes.POINTER(ctypes.c_uint16)()
        image_size = ctypes.c_size_t()
        width = ctypes.c_size_t()
        height = ctypes.c_size_t()
        # NOTE(review): bits_per_pixel is fetched but never checked; the
        # buffer is always read as 16-bit pixels - confirm this holds.
        bits_per_pixel = ctypes.c_size_t()

        if not retry_operation(self.libArenaAcq.ArenaAcq_GetImage,
                               self.handle,
                               ctypes.byref(image_data),
                               ctypes.byref(image_size),
                               ctypes.byref(width),
                               ctypes.byref(height),
                               ctypes.byref(bits_per_pixel)):
            print(f"Image acquisition failed for camera {self.sn}")
            return None

        if image_size.value <= 0:
            print(f"Image size is 0 for camera {self.sn}")
            return None
        if not image_data:
            # NULL ctypes pointers are falsy; dereferencing one would raise.
            print(f"Image acquisition failed for camera {self.sn}")
            return None

        # Convert the buffer to a NumPy array usable by OpenCV.
        image_np = self._frame_to_array(image_data, width.value, height.value)
        self.count = self.count + 1

        # Release the library-owned image buffer.
        self._release_image(image_data)
        return image_np

    def capture(self):
        """Fetch one depth frame together with its X and Y images.

        :return: ``(x, y, depth)`` as ``(height, width)`` uint16 arrays, or
            ``(None, None, None)`` on failure
        """
        image_data = ctypes.POINTER(ctypes.c_uint16)()
        x_image_data = ctypes.POINTER(ctypes.c_uint16)()
        y_image_data = ctypes.POINTER(ctypes.c_uint16)()
        image_size = ctypes.c_size_t()
        width = ctypes.c_size_t()
        height = ctypes.c_size_t()
        # NOTE(review): bits_per_pixel is fetched but never checked; the
        # buffers are always read as 16-bit pixels - confirm this holds.
        bits_per_pixel = ctypes.c_size_t()

        if not retry_operation(self.libArenaAcq.ArenaAcq_GetDepthImage,
                               self.handle,
                               ctypes.byref(image_data),
                               ctypes.byref(x_image_data),
                               ctypes.byref(y_image_data),
                               ctypes.byref(image_size),
                               ctypes.byref(width),
                               ctypes.byref(height),
                               ctypes.byref(bits_per_pixel)):
            print(f"Image acquisition failed for camera {self.sn}")
            return None, None, None

        if image_size.value <= 0:
            print(f"Image size is 0 for camera {self.sn}")
            return None, None, None

        buffers = (image_data, x_image_data, y_image_data)
        if not all(buffers):
            # At least one plane is NULL: give back whatever was handed out
            # instead of dereferencing a NULL pointer.
            print(f"Image acquisition failed for camera {self.sn}")
            for buf in buffers:
                if buf:
                    self._release_image(buf)
            return None, None, None

        # Convert the buffers to NumPy arrays usable by OpenCV.
        depth_image_np = self._frame_to_array(
            image_data, width.value, height.value)
        x_image_np = self._frame_to_array(
            x_image_data, width.value, height.value)
        y_image_np = self._frame_to_array(
            y_image_data, width.value, height.value)

        # Debug aid:
        # self.libArenaAcq.ArenaAcq_SaveDepthImageAsPGM(
        #     image_data, width, height,
        #     f'dbg_{self.sn}_{self.count}'.encode('utf-8'))
        self.count = self.count + 1

        # Release the library-owned image buffers.
        for buf in buffers:
            self._release_image(buf)
        return x_image_np, y_image_np, depth_image_np


if __name__ == '__main__':
    cam2 = ArenaCamera("233801341")  # Hypothetical second camera serial number
    cam1 = ArenaCamera("233800056")
    try:
        cam2.create()
        cam1.create()

        # cv2.namedWindow("Z1", cv2.WINDOW_AUTOSIZE)
        # cv2.namedWindow("Z2", cv2.WINDOW_AUTOSIZE)
        for i in range(1, 21):
            raw1 = cam1.read()
            X2, Y2, Z2 = cam2.capture()
            # X1, Y1, Z1 = cam1.capture()

            # Optional live display:
            # cv2.imshow("Z1", Z1)
            # cv2.imshow("Z2", Z2)
            # if cv2.waitKey(0) & 0xFF == ord('q'):
            #     break

            # A failed acquisition yields None, which cv2.imwrite rejects.
            if Z2 is not None:
                cv2.imwrite("z2.png", Z2)
            if raw1 is not None:
                cv2.imwrite("raw1.png", raw1)
            print(f'read {i}')
    finally:
        # Always release the cameras and the library, even after an error.
        cv2.destroyAllWindows()
        cam1.destroy()
        cam2.destroy()
        ArenaCamera.shutdown()