# -*- coding: utf-8 -*-
"""Device management and single-shot capture for MV3D RGB-D cameras.

Opened camera handles are kept in the module-level ``SN_MAP`` keyed by serial
number.  Importing this module enumerates and opens every reachable device
(see the ``initialize_devices()`` call at the bottom of the file).
"""
import threading
import msvcrt
import ctypes
import time
import os
import struct
from thread_pool import submit_task
from ctypes import *
from datetime import datetime
import Point as point
from Mv3dRgbdImport.Mv3dRgbdDefine import *
from Mv3dRgbdImport.Mv3dRgbdApi import *
from Mv3dRgbdImport.Mv3dRgbdDefine import (
    DeviceType_Ethernet,
    DeviceType_USB,
    DeviceType_Ethernet_Vir,
    DeviceType_USB_Vir,
    MV3D_RGBD_FLOAT_EXPOSURETIME,
    ParamType_Float,
    ParamType_Int,
    ParamType_Enum,
    CoordinateType_Depth,
    MV3D_RGBD_FLOAT_Z_UNIT,
    MV3D_RGBD_OK,
    FileType_BMP,
    FileType_TIFF,
    ImageType_Depth,
    ImageType_RGB8_Planar,
    ImageType_YUV420SP_NV12,
    ImageType_YUV420SP_NV21,
    ImageType_YUV422,
    ImageType_Mono8,
)
import config as configMap
# The two imports below repeat earlier ones; they are kept, in their original
# position after the SDK star-imports, so name resolution stays unchanged.
import time
import random
from ctypes import *
from logConfig import get_logger

# Global registry of opened cameras: {serial_number: camera_instance}
SN_MAP = {}

logger = get_logger()

# Device classes that are searched whenever devices are enumerated.
_ALL_DEVICE_TYPES = (
    DeviceType_Ethernet | DeviceType_USB | DeviceType_Ethernet_Vir | DeviceType_USB_Vir
)
# Maximum number of entries the SDK is asked to write into the device list.
_MAX_ENUMERATED_DEVICES = 20
# Image types that are treated as the color image of a frame.
_COLOR_IMAGE_TYPES = (
    ImageType_RGB8_Planar,
    ImageType_YUV420SP_NV12,
    ImageType_YUV420SP_NV21,
    ImageType_YUV422,
)


def _serial_number_of(device_info):
    """Decode the NUL-padded ``chSerialNumber`` field of a device-info entry."""
    return ''.join(chr(c) for c in device_info.chSerialNumber).rstrip('\x00')


def _enumerate_devices():
    """Query the SDK for all devices of the supported types.

    Returns:
        tuple: ``(device_list, count)`` where ``device_list`` is the filled
        ``MV3D_RGBD_DEVICE_INFO_LIST`` and ``count`` the number of valid
        entries, or ``(None, 0)`` when no device was found or a query failed.
    """
    nDeviceNum = c_uint(0)
    ret = Mv3dRgbd.MV3D_RGBD_GetDeviceNumber(_ALL_DEVICE_TYPES, byref(nDeviceNum))
    if ret != MV3D_RGBD_OK or nDeviceNum.value == 0:
        return None, 0

    stDeviceList = MV3D_RGBD_DEVICE_INFO_LIST()
    ret = Mv3dRgbd.MV3D_RGBD_GetDeviceList(
        _ALL_DEVICE_TYPES,
        pointer(stDeviceList.DeviceInfo[0]),
        _MAX_ENUMERATED_DEVICES,
        byref(nDeviceNum),
    )
    if ret != MV3D_RGBD_OK:
        # Without this check the callers would iterate over zeroed entries.
        logger.error(f"Failed to get device list. Error code: {ret:#x}")
        return None, 0

    # Never index past the capacity that was handed to the SDK.
    return stDeviceList, min(nDeviceNum.value, _MAX_ENUMERATED_DEVICES)


def _close_camera_quietly(sn, camera):
    """Best-effort close of a camera handle; errors are logged, never raised."""
    try:
        camera.MV3D_RGBD_CloseDevice()
    except Exception as e:
        logger.info(f"Ignoring error while closing device {sn}: {e}")


def _open_devices(device_list, device_count):
    """Open the first ``device_count`` devices and register them in ``SN_MAP``.

    Devices that fail to open are logged and skipped.

    Returns:
        int: Number of devices that were opened successfully.
    """
    success_count = 0
    for i in range(device_count):
        device_info = device_list.DeviceInfo[i]
        serial_number = _serial_number_of(device_info)
        logger.info(f"Found device [{i}]: Serial Number: {serial_number}")

        camera = Mv3dRgbd()
        ret = camera.MV3D_RGBD_OpenDevice(pointer(device_info))
        if ret != MV3D_RGBD_OK:
            logger.info(f"Failed to open device with SN: {serial_number}. Error code: {ret:#x}")
            continue

        SN_MAP[serial_number] = camera
        logger.info(f"Successfully added device {serial_number} to SN_MAP")
        success_count += 1
    return success_count


def reconnect_device(sn, max_retries=5, base_delay=1.0, max_delay=10.0):
    """Re-open the device with the given serial number.

    Any handle currently registered for ``sn`` is closed first.  The devices
    are then re-enumerated once and the matching device is opened, retrying
    with exponential back-off plus jitter.  On success the new handle replaces
    the entry in ``SN_MAP``.

    Args:
        sn (str): Serial number of the device.
        max_retries (int): Maximum number of open attempts.
        base_delay (float): Initial delay between attempts, in seconds.
        max_delay (float): Upper bound for the delay, in seconds.

    Returns:
        bool: True if the device was re-opened, False otherwise.
    """
    camera = SN_MAP.get(sn)
    # Close the stale handle if there is one; a failure here must not prevent
    # the reconnection attempt.
    if camera is not None:
        _close_camera_quietly(sn, camera)

    # The device info has to come from a fresh enumeration.
    device_list, device_count = _enumerate_devices()
    if device_count == 0:
        logger.error("Failed to get device number or no devices found.")
        return False

    # Look for the entry that carries the requested serial number.
    target_device_info = None
    for i in range(device_count):
        candidate = device_list.DeviceInfo[i]
        if _serial_number_of(candidate) == sn:
            target_device_info = candidate
            break

    if target_device_info is None:
        logger.error(f"Device with SN: {sn} not found during reconnection attempt")
        return False

    for attempt in range(max_retries):
        try:
            new_camera = Mv3dRgbd()
            ret = new_camera.MV3D_RGBD_OpenDevice(pointer(target_device_info))
            if ret == MV3D_RGBD_OK:
                SN_MAP[sn] = new_camera
                logger.info(f"Successfully reconnected to device {sn}")
                return True
            logger.info(f"Failed to reconnect to device {sn}. Error code: {ret:#x}")
        except Exception as e:
            logger.info(f"Exception during reconnection attempt {attempt + 1}: {e}")

        # Wait before the next attempt, but not after the last one.
        if attempt < max_retries - 1:
            # Exponential back-off with up to one second of jitter, capped.
            delay = min(base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay)
            logger.info(f"Retrying reconnection in {delay:.2f} seconds...")
            time.sleep(delay)

    logger.info(f"Failed to reconnect to device {sn} after {max_retries} attempts")
    return False


def retry_initialize_devices(max_retries=3, base_delay=2.0):
    """Re-enumerate and open all devices, retrying with exponential back-off.

    Every attempt closes the handles currently held in ``SN_MAP`` and empties
    the map before devices are enumerated again.

    Args:
        max_retries (int): Maximum number of attempts.
        base_delay (float): Initial delay between attempts, in seconds.

    Returns:
        bool: True as soon as at least one device has been opened.
    """
    for attempt in range(max_retries):
        try:
            # Release the old handles before dropping them; merely clearing
            # the map would leave the devices opened by this process.
            for serial_number, camera in list(SN_MAP.items()):
                _close_camera_quietly(serial_number, camera)
            SN_MAP.clear()

            device_list, device_count = _enumerate_devices()
            if device_count == 0:
                logger.info("Failed to get device number or no devices found.")
            else:
                success_count = _open_devices(device_list, device_count)
                if success_count > 0:
                    logger.info(f"Successfully initialized {success_count} devices")
                    return True
                logger.info("No devices were successfully initialized")
        except Exception as e:
            logger.info(f"Exception during device initialization attempt {attempt + 1}: {e}")

        # NOTE(review): the original indentation was lost; the back-off is
        # applied after every failed attempt (exception or not) - confirm.
        if attempt < max_retries - 1:
            delay = base_delay * (2 ** attempt)
            logger.info(f"Retrying device initialization in {delay} seconds...")
            time.sleep(delay)

    logger.info(f"Failed to initialize devices after {max_retries} attempts")
    return False


def _depth_to_point_cloud(camera, depth_image, sn, type, attempt_label):
    """Map one depth image to a point cloud and hand it to ``point.sValue_to_pcd``.

    Returns:
        tuple | None: ``(pcd_file, processed_points)`` as returned by
        ``point.sValue_to_pcd``, or None when the SDK mapping call failed.
    """
    stPointCloudImage = MV3D_RGBD_IMAGE_DATA()
    ret = camera.MV3D_RGBD_MapDepthToPointCloud(pointer(depth_image), pointer(stPointCloudImage))
    if MV3D_RGBD_OK != ret:
        logger.info(f"_MapDepthToPointCloud() Run failed... Retry: {attempt_label}")
        return None

    logger.info(
        "_MapDepthToPointCloud() Run Succeed: framenum (%d) height(%d) width(%d) len (%d)!"
        % (
            stPointCloudImage.nFrameNum,
            stPointCloudImage.nHeight,
            stPointCloudImage.nWidth,
            stPointCloudImage.nDataLen,
        )
    )
    raw_bytes = string_at(stPointCloudImage.pData, stPointCloudImage.nDataLen)
    # Three float32 values per pixel; a repeat count avoids building a format
    # string with one character per value.
    float_count = int(stPointCloudImage.nHeight * stPointCloudImage.nWidth * 3)
    sValue = struct.unpack(f"{float_count}f", raw_bytes)
    return point.sValue_to_pcd(sValue, stPointCloudImage, sn, type)


def enhanced_pic(sn, type):
    """Capture one frame from device ``sn`` with reconnection and retry logic.

    The camera is reconnected when it is not registered or when starting the
    stream fails.  Frames are then fetched up to 10 times until a depth image
    has been converted into a non-empty point cloud.

    Args:
        sn (str): Serial number of the device.
        type: Passed through unchanged to ``point.sValue_to_pcd``.  (The name
            shadows the builtin but is kept for caller compatibility.)

    Returns:
        dict | None: None when the device cannot be reconnected or has no
        entry in ``configMap.CAMERA_CONFIG_MAP``.  Otherwise a dict with the
        keys ``"sn"``, ``"depth"``, ``"color"``, ``"pcd"`` and ``"point"``;
        fields that were not produced keep their empty defaults.
    """
    camera = SN_MAP.get(sn)
    if camera is None:
        logger.info(f"No camera found for SN: {sn}, attempting reconnection...")
        if not reconnect_device(sn):
            logger.info(f"Failed to reconnect to device {sn}")
            return None
        camera = SN_MAP.get(sn)

    config = configMap.CAMERA_CONFIG_MAP.get(sn)
    if not config:
        logger.info(f"No config found for SN: {sn}")
        return None

    # Optional per-camera start delay, configured in milliseconds.
    time_on = config.get("time_on", 0)
    logger.info(f"Delaying start by {time_on}ms...")
    time.sleep(time_on / 1000.0)

    saved_files = {
        "sn": sn,
        "depth": "",
        "color": "",
        "pcd": "",
        "point": []
    }

    # Start streaming; on failure reconnect once and try again.
    ret = camera.MV3D_RGBD_Start()
    if ret != MV3D_RGBD_OK:
        logger.info(f"Failed to start grabbing. Error code: {ret:#x}")
        logger.info("Attempting reconnection...")
        if not reconnect_device(sn):
            logger.info("Reconnection failed")
            return saved_files
        camera = SN_MAP.get(sn)
        ret = camera.MV3D_RGBD_Start()
        if ret != MV3D_RGBD_OK:
            logger.info(f"Failed to start grabbing after reconnection. Error code: {ret:#x}")
            return saved_files

    try:
        # At most 10 frames are fetched while waiting for a usable point cloud.
        max_retries = 10
        for attempt in range(max_retries):
            attempt_label = f"{attempt + 1}/{max_retries}"
            stFrameData = MV3D_RGBD_FRAME_DATA()
            ret = camera.MV3D_RGBD_FetchFrame(pointer(stFrameData), 5000)
            if ret != MV3D_RGBD_OK:
                logger.info(f"Failed to fetch frame. Error code: {ret:#x}. Retry: {attempt_label}")
                time.sleep(0.1)  # short pause before the next attempt
                continue

            has_depth_data = False
            point_cloud_success = False
            for i in range(stFrameData.nImageCount):
                image_info = stFrameData.stImageData[i]

                if image_info.enImageType == ImageType_Depth:
                    has_depth_data = True
                    result = _depth_to_point_cloud(camera, image_info, sn, type, attempt_label)
                    if result is not None:
                        pcd_file, processed_points = result
                        if pcd_file and len(processed_points) > 0:
                            saved_files["pcd"] = pcd_file
                            saved_files["point"] = processed_points
                            point_cloud_success = True

                elif image_info.enImageType in _COLOR_IMAGE_TYPES:
                    file_name = configMap.save_path("color", sn + "_Color")
                    if config.get("save_image"):
                        # NOTE(review): the save runs asynchronously on a
                        # pointer into this frame's buffer; verify the buffer
                        # is still valid when the task executes (the stream
                        # is stopped in the finally block below).
                        submit_task(camera.MV3D_RGBD_SaveImage, pointer(image_info),
                                    FileType_BMP, file_name)
                        saved_files["color"] = file_name + ".bmp"
                        logger.info(f"Color image saved successfully: {saved_files['color']}")

                # NOTE(review): the original indentation was lost; this check
                # is assumed to sit at the end of the loop body, so the rest
                # of the frame is skipped until a point cloud exists - confirm.
                if not point_cloud_success:
                    break

            if point_cloud_success:
                break

            if not has_depth_data:
                logger.info(f"No depth data found in frame. Retry: {attempt_label}")
            time.sleep(0.1)  # short pause before the next attempt
    except Exception as e:
        logger.error(f"Error during frame capture for device {sn}: {e}")
    finally:
        camera.MV3D_RGBD_Stop()

    return saved_files


def initialize_devices():
    """Enumerate all devices once and register every one that opens in ``SN_MAP``."""
    device_list, device_count = _enumerate_devices()
    if device_count == 0:
        logger.info("Failed to get device number or no devices found.")
        return
    _open_devices(device_list, device_count)


# Devices are opened as a side effect of importing this module.
initialize_devices()