# -*- coding: utf-8 -*-
"""Single-shot capture helpers for MV3D RGB-D cameras.

Importing this module enumerates and opens every reachable camera (see the
``initialize_devices()`` call near the bottom of the file) and stores the open
handles in ``SN_MAP`` keyed by serial number.  ``pic(sn)`` then grabs one
frame from the selected camera and saves its depth image, color image and
point cloud.
"""
import threading
import msvcrt
import ctypes
import time
import os
import struct
from ctypes import *
from datetime import datetime

import Point as point
from Mv3dRgbdImport.Mv3dRgbdDefine import *
from Mv3dRgbdImport.Mv3dRgbdApi import *
from Mv3dRgbdImport.Mv3dRgbdDefine import (
    DeviceType_Ethernet, DeviceType_USB, DeviceType_Ethernet_Vir, DeviceType_USB_Vir,
    MV3D_RGBD_FLOAT_EXPOSURETIME,
    ParamType_Float, ParamType_Int, ParamType_Enum, CoordinateType_Depth,
    MV3D_RGBD_FLOAT_Z_UNIT, MV3D_RGBD_OK,
    FileType_BMP, FileType_TIFF, ImageType_Depth, ImageType_RGB8_Planar,
    ImageType_YUV420SP_NV12, ImageType_YUV420SP_NV21, ImageType_YUV422, ImageType_Mono8,
)
import config as configMap

# Global state: open camera handles, keyed by serial number.
SN_MAP = {}  # {sn: camera_instance}

# Device categories passed to the SDK enumeration calls.
_ALL_DEVICE_TYPES = (
    DeviceType_Ethernet | DeviceType_USB | DeviceType_Ethernet_Vir | DeviceType_USB_Vir
)

# Capacity handed to MV3D_RGBD_GetDeviceList (the value the original code
# hard-coded); also used to bound the loop over the returned entries.
_MAX_DEVICES = 20

# Image types that are saved as a color BMP.
_COLOR_IMAGE_TYPES = (
    ImageType_RGB8_Planar,
    ImageType_YUV420SP_NV12,
    ImageType_YUV420SP_NV21,
    ImageType_YUV422,
)

# How long pic() waits for a frame, in milliseconds.
_FETCH_TIMEOUT_MS = 5000


def _decode_serial_number(raw):
    """Return the text stored in a NUL-terminated serial-number buffer.

    ``raw`` is iterated as integer character codes.  Decoding stops at the
    first NUL so that stale bytes after the terminator never leak into the
    key used for ``SN_MAP``.
    """
    return ''.join(chr(c) for c in raw).split('\x00', 1)[0]


def initialize_devices():
    """Enumerate all cameras, open each one and register it in ``SN_MAP``.

    Failures are reported on stdout; a device that cannot be opened is
    skipped.  Returns ``None``.
    """
    nDeviceNum = ctypes.c_uint(0)
    ret = Mv3dRgbd.MV3D_RGBD_GetDeviceNumber(_ALL_DEVICE_TYPES, byref(nDeviceNum))
    if ret != MV3D_RGBD_OK or nDeviceNum.value == 0:
        print("Failed to get device number or no devices found.")
        return

    stDeviceList = MV3D_RGBD_DEVICE_INFO_LIST()
    ret = Mv3dRgbd.MV3D_RGBD_GetDeviceList(
        _ALL_DEVICE_TYPES,
        pointer(stDeviceList.DeviceInfo[0]),
        _MAX_DEVICES,
        byref(nDeviceNum),
    )
    if ret != MV3D_RGBD_OK:
        # Without this check the loop below would read an unfilled list.
        print(f"Failed to get device list. Error code: {ret:#x}")
        return

    # Never index past the capacity that was offered to the SDK.
    device_count = min(nDeviceNum.value, _MAX_DEVICES)
    for i in range(device_count):
        device_info = stDeviceList.DeviceInfo[i]
        serial_number = _decode_serial_number(device_info.chSerialNumber)
        print(f"Found device [{i}]: Serial Number: {serial_number}")

        camera = Mv3dRgbd()
        # Open the device.
        ret = camera.MV3D_RGBD_OpenDevice(pointer(stDeviceList.DeviceInfo[i]))
        if ret != MV3D_RGBD_OK:
            print(f"Failed to open device with SN: {serial_number}. Error code: {ret:#x}")
            continue

        # Store the handle in the global map.
        SN_MAP[serial_number] = camera
        print(f"Successfully added device {serial_number} to SN_MAP")


def _save_depth_outputs(camera, image_info, sn, saved_files):
    """Save one depth image as TIFF, then convert it to a point cloud file.

    Paths of successfully written files are appended to
    ``saved_files["depth"]`` and ``saved_files["pcd"]``.
    """
    # Save the depth image.
    file_name = configMap.save_path("depth", "-Depth")
    ret_save = camera.MV3D_RGBD_SaveImage(pointer(image_info), FileType_TIFF, file_name)
    print("Saved depth image." if ret_save == MV3D_RGBD_OK else "Failed to save depth image.")
    if ret_save == MV3D_RGBD_OK:
        saved_files["depth"].append(file_name)

    # Convert the depth image to a point cloud and save it.
    stPointCloudImage = MV3D_RGBD_IMAGE_DATA()
    ret = camera.MV3D_RGBD_MapDepthToPointCloud(pointer(image_info), pointer(stPointCloudImage))
    if MV3D_RGBD_OK != ret:
        print("_MapDepthToPointCloud() Run failed...")
        return

    print(
        "_MapDepthToPointCloud() Run Succeed: framenum (%d) height(%d) width(%d) len (%d)!"
        % (
            stPointCloudImage.nFrameNum,
            stPointCloudImage.nHeight,
            stPointCloudImage.nWidth,
            stPointCloudImage.nDataLen,
        )
    )

    # The buffer is read as three native floats per pixel.
    float_count = int(stPointCloudImage.nHeight * stPointCloudImage.nWidth * 3)
    expected_bytes = float_count * struct.calcsize('f')
    if stPointCloudImage.nDataLen != expected_bytes or (
        expected_bytes and not stPointCloudImage.pData
    ):
        # A size mismatch would make struct.unpack raise and a NULL pointer
        # would make string_at crash; skip the point cloud but keep the
        # files that were already saved.
        print(
            "Unexpected point cloud buffer: expected %d bytes, got %d."
            % (expected_bytes, stPointCloudImage.nDataLen)
        )
        return

    strMode = string_at(stPointCloudImage.pData, stPointCloudImage.nDataLen)
    # A counted format ("<N>f") avoids building an N-character format string.
    sValue = struct.unpack(f"{float_count}f", strMode)

    # A truthy result is recorded as the saved PCD path (presumably the file
    # path written by the helper -- confirm against Point.sValue_to_pcd).
    pcd_file = point.sValue_to_pcd(sValue, stPointCloudImage, sn)
    if pcd_file:
        saved_files["pcd"].append(pcd_file)


def _save_color_image(camera, image_info, saved_files):
    """Save one color image as BMP and record its path in ``saved_files["color"]``."""
    file_name = configMap.save_path("color", "-_Color")
    ret_save = camera.MV3D_RGBD_SaveImage(pointer(image_info), FileType_BMP, file_name)
    print("Saved color image." if ret_save == MV3D_RGBD_OK else "Failed to save color image.")
    if ret_save == MV3D_RGBD_OK:
        saved_files["color"].append(file_name)


def pic(sn):
    """Capture a single frame from camera ``sn`` and save its images.

    The camera must already be registered in ``SN_MAP`` and have an entry in
    ``configMap.CAMERA_CONFIG_MAP``.  The optional ``"time_on"`` config value
    (milliseconds) delays the start of the capture.

    Returns a dict ``{"depth": [...], "color": [...], "pcd": [...]}`` listing
    the files that were saved, or ``None`` when the camera or its config is
    missing or grabbing could not be started.
    """
    camera = SN_MAP.get(sn)
    if camera is None:
        print(f"No camera found for SN: {sn}")
        return

    config = configMap.CAMERA_CONFIG_MAP.get(sn)
    if not config:
        print(f"No config found for SN: {sn}")
        return

    time_on = config.get("time_on", 0)  # start delay in milliseconds
    print(f"Delaying start by {time_on}ms...")
    time.sleep(time_on / 1000.0)  # convert to seconds

    saved_files = {
        "depth": [],
        "color": [],
        "pcd": []
    }

    # Start grabbing.
    ret = camera.MV3D_RGBD_Start()
    if ret != MV3D_RGBD_OK:
        print(f"Failed to start grabbing. Error code: {ret:#x}")
        return

    try:
        stFrameData = MV3D_RGBD_FRAME_DATA()
        # Fetch a single frame.
        ret = camera.MV3D_RGBD_FetchFrame(pointer(stFrameData), _FETCH_TIMEOUT_MS)
        if ret == MV3D_RGBD_OK:
            for i in range(stFrameData.nImageCount):
                image_info = stFrameData.stImageData[i]
                if image_info.enImageType == ImageType_Depth:
                    _save_depth_outputs(camera, image_info, sn, saved_files)
                elif image_info.enImageType in _COLOR_IMAGE_TYPES:
                    _save_color_image(camera, image_info, saved_files)
        else:
            print("Failed to fetch frame.")
    finally:
        # Always stop grabbing, even if saving or conversion raised.
        camera.MV3D_RGBD_Stop()
        print("Single capture completed.")

    # Print the paths of everything that was saved.
    print("Saved files:")
    for key, paths in saved_files.items():
        if paths:
            print(f" {key.upper()}:")
            for path in paths:
                print(f" - {path}")

    return saved_files


# Cameras are opened as a side effect of importing this module, so that
# pic() can be called right away by importers.
initialize_devices()

if __name__ == '__main__':
    pic("00DA6823936")