You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
hik3d-python/SimpleView_SaveImage.py

346 lines
13 KiB
Python

8 months ago
# -*- coding: utf-8 -*-
import threading
import msvcrt
import ctypes
import time
import os
import struct
7 months ago
from thread_pool import submit_task
8 months ago
from ctypes import *
from datetime import datetime
import Point as point
8 months ago
from Mv3dRgbdImport.Mv3dRgbdDefine import *
from Mv3dRgbdImport.Mv3dRgbdApi import *
from Mv3dRgbdImport.Mv3dRgbdDefine import DeviceType_Ethernet, DeviceType_USB, DeviceType_Ethernet_Vir, DeviceType_USB_Vir, MV3D_RGBD_FLOAT_EXPOSURETIME, \
ParamType_Float, ParamType_Int, ParamType_Enum, CoordinateType_Depth, MV3D_RGBD_FLOAT_Z_UNIT, MV3D_RGBD_OK, \
FileType_BMP,FileType_TIFF,ImageType_Depth, ImageType_RGB8_Planar, ImageType_YUV420SP_NV12 ,ImageType_YUV420SP_NV21 , ImageType_YUV422, ImageType_Mono8
import config as configMap
# Global state: opened camera handles, keyed by device serial number
SN_MAP = {} # {sn: camera_instance}
7 months ago
import time
import random
from ctypes import *
8 months ago
7 months ago
from logConfig import get_logger
8 months ago
7 months ago
# Module-level logger shared by every function in this file
logger = get_logger()
def reconnect_device(sn, max_retries=5, base_delay=1.0, max_delay=10.0):
    """Re-open the camera with serial number ``sn`` and refresh ``SN_MAP``.

    Any handle already stored for ``sn`` is closed first (best effort), the
    attached devices are enumerated again, and the matching device is opened
    with exponential back-off between attempts.

    Args:
        sn (str): Serial number of the device to reconnect.
        max_retries (int): Maximum number of open attempts.
        base_delay (float): Delay before the first retry, in seconds. It is
            doubled after every failed attempt and up to one second of random
            jitter is added.
        max_delay (float): Upper bound for a single delay, in seconds.

    Returns:
        bool: True when the device was opened and stored in ``SN_MAP``,
        False otherwise.
    """
    device_types = (
        DeviceType_Ethernet | DeviceType_USB
        | DeviceType_Ethernet_Vir | DeviceType_USB_Vir
    )
    # Capacity passed to MV3D_RGBD_GetDeviceList for the DeviceInfo array.
    max_devices = 20

    # Close the stale handle, if there is one. A failure here must not stop
    # the reconnection, but it is logged instead of being silently dropped.
    stale_camera = SN_MAP.get(sn)
    if stale_camera is not None:
        try:
            stale_camera.MV3D_RGBD_CloseDevice()
        except Exception as e:
            logger.info(f"Ignoring error while closing device {sn}: {e}")

    # The device list has to be enumerated again to get fresh device info.
    nDeviceNum = c_uint(0)
    ret = Mv3dRgbd.MV3D_RGBD_GetDeviceNumber(device_types, byref(nDeviceNum))
    if ret != MV3D_RGBD_OK or nDeviceNum.value == 0:
        logger.error("Failed to get device number or no devices found.")
        return False

    stDeviceList = MV3D_RGBD_DEVICE_INFO_LIST()
    ret = Mv3dRgbd.MV3D_RGBD_GetDeviceList(
        device_types,
        pointer(stDeviceList.DeviceInfo[0]), max_devices, byref(nDeviceNum)
    )
    if ret != MV3D_RGBD_OK:
        logger.error(f"Failed to get device list. Error code: {ret:#x}")
        return False

    # Look for the entry whose serial number matches. The count is clamped so
    # that a larger reported number can never index past the array capacity.
    target_device_info = None
    for i in range(min(nDeviceNum.value, max_devices)):
        device_info = stDeviceList.DeviceInfo[i]
        device_sn = ''.join(chr(c) for c in device_info.chSerialNumber).rstrip('\x00')
        if device_sn == sn:
            target_device_info = device_info
            break

    if target_device_info is None:
        logger.error(f"Device with SN: {sn} not found during reconnection attempt")
        return False

    for attempt in range(max_retries):
        try:
            # A fresh camera instance is used for every attempt.
            new_camera = Mv3dRgbd()
            ret = new_camera.MV3D_RGBD_OpenDevice(pointer(target_device_info))
            if ret == MV3D_RGBD_OK:
                SN_MAP[sn] = new_camera
                logger.info(f"Successfully reconnected to device {sn}")
                return True
            logger.info(f"Failed to reconnect to device {sn}. Error code: {ret:#x}")
        except Exception as e:
            logger.info(f"Exception during reconnection attempt {attempt + 1}: {e}")

        # Exponential back-off with jitter; no wait after the final attempt.
        if attempt < max_retries - 1:
            delay = min(base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay)
            logger.info(f"Retrying reconnection in {delay:.2f} seconds...")
            time.sleep(delay)

    logger.info(f"Failed to reconnect to device {sn} after {max_retries} attempts")
    return False
def _open_enumerated_devices():
    """Enumerate the attached devices once and open each of them.

    Every device that opens successfully is stored in ``SN_MAP`` under its
    serial number.

    Returns:
        int: Number of devices opened during this pass (0 when enumeration
        failed or nothing could be opened).
    """
    device_types = (
        DeviceType_Ethernet | DeviceType_USB
        | DeviceType_Ethernet_Vir | DeviceType_USB_Vir
    )
    # Capacity passed to MV3D_RGBD_GetDeviceList for the DeviceInfo array.
    max_devices = 20

    nDeviceNum = c_uint(0)
    ret = Mv3dRgbd.MV3D_RGBD_GetDeviceNumber(device_types, byref(nDeviceNum))
    if ret != MV3D_RGBD_OK or nDeviceNum.value == 0:
        logger.info("Failed to get device number or no devices found.")
        return 0

    stDeviceList = MV3D_RGBD_DEVICE_INFO_LIST()
    ret = Mv3dRgbd.MV3D_RGBD_GetDeviceList(
        device_types,
        pointer(stDeviceList.DeviceInfo[0]), max_devices, byref(nDeviceNum)
    )
    if ret != MV3D_RGBD_OK:
        logger.info(f"Failed to get device list. Error code: {ret:#x}")
        return 0

    success_count = 0
    # Clamp so a larger reported count cannot index past the array capacity.
    for i in range(min(nDeviceNum.value, max_devices)):
        device_info = stDeviceList.DeviceInfo[i]
        serial_number = ''.join(chr(c) for c in device_info.chSerialNumber).rstrip('\x00')
        logger.info(f"Found device [{i}]: Serial Number: {serial_number}")

        camera = Mv3dRgbd()
        ret = camera.MV3D_RGBD_OpenDevice(pointer(device_info))
        if ret != MV3D_RGBD_OK:
            logger.info(f"Failed to open device with SN: {serial_number}. Error code: {ret:#x}")
            continue

        SN_MAP[serial_number] = camera
        logger.info(f"Successfully added device {serial_number} to SN_MAP")
        success_count += 1

    return success_count


def retry_initialize_devices(max_retries=3, base_delay=2.0):
    """Rebuild ``SN_MAP`` from scratch, retrying with exponential back-off.

    Each attempt closes and forgets every handle currently in ``SN_MAP``,
    enumerates the attached devices and opens all of them. The call succeeds
    as soon as one attempt opens at least one device.

    Args:
        max_retries (int): Maximum number of initialization attempts.
        base_delay (float): Delay before the first retry, in seconds; it is
            doubled after every failed attempt.

    Returns:
        bool: True when at least one device was opened, False otherwise.
    """
    for attempt in range(max_retries):
        try:
            # Close the handles we are about to drop. Merely clearing the map
            # would leave them open.
            # NOTE(review): presumably a still-open handle also blocks the
            # re-open below - confirm against the SDK documentation.
            for stale_sn, stale_camera in list(SN_MAP.items()):
                try:
                    stale_camera.MV3D_RGBD_CloseDevice()
                except Exception as e:
                    logger.info(f"Ignoring error while closing device {stale_sn}: {e}")
            SN_MAP.clear()

            success_count = _open_enumerated_devices()
            if success_count > 0:
                logger.info(f"Successfully initialized {success_count} devices")
                return True
            logger.info("No devices were successfully initialized")
        except Exception as e:
            logger.info(f"Exception during device initialization attempt {attempt + 1}: {e}")

        # Back off before the next attempt; no wait after the final one.
        if attempt < max_retries - 1:
            delay = base_delay * (2 ** attempt)
            logger.info(f"Retrying device initialization in {delay} seconds...")
            time.sleep(delay)

    logger.info(f"Failed to initialize devices after {max_retries} attempts")
    return False
def enhanced_pic(sn, type):
    """Grab a frame from camera ``sn`` and turn its depth image into a point cloud.

    The camera is looked up in ``SN_MAP`` (reconnecting when it is missing or
    when grabbing cannot be started). Frames are then fetched until one
    yields a non-empty point cloud, for at most 10 attempts. Colour images
    found in a frame whose point cloud succeeded are handed to
    ``submit_task`` for saving when the camera config enables ``save_image``.
    Grabbing is always stopped before returning.

    Args:
        sn (str): Serial number of the camera; key into ``SN_MAP`` and
            ``configMap.CAMERA_CONFIG_MAP``.
        type: Passed through unchanged to ``point.sValue_to_pcd``. The name
            shadows the builtin but is kept for caller compatibility.

    Returns:
        dict | None: None when the camera cannot be reconnected or has no
        config entry. Otherwise a dict with the keys ``"sn"``, ``"depth"``,
        ``"color"``, ``"pcd"`` and ``"point"``; entries that were not produced
        keep their empty defaults (``"depth"`` is never filled in here).
    """
    camera = SN_MAP.get(sn)
    if not camera:
        logger.info(f"No camera found for SN: {sn}, attempting reconnection...")
        if not reconnect_device(sn):
            logger.info(f"Failed to reconnect to device {sn}")
            return None
        camera = SN_MAP.get(sn)

    config = configMap.CAMERA_CONFIG_MAP.get(sn)
    if not config:
        logger.info(f"No config found for SN: {sn}")
        return None

    # Per-camera start delay, configured in milliseconds.
    time_on = config.get("time_on", 0)
    logger.info(f"Delaying start by {time_on}ms...")
    time.sleep(time_on / 1000.0)

    saved_files = {
        "sn": sn,
        "depth": "",
        "color": "",
        "pcd": "",
        "point": []
    }

    # Start grabbing; on failure reconnect once and try again.
    ret = camera.MV3D_RGBD_Start()
    if ret != MV3D_RGBD_OK:
        logger.info(f"Failed to start grabbing. Error code: {ret:#x}")
        logger.info("Attempting reconnection...")
        if not reconnect_device(sn):
            logger.info("Reconnection failed")
            return saved_files
        camera = SN_MAP.get(sn)
        ret = camera.MV3D_RGBD_Start()
        if ret != MV3D_RGBD_OK:
            logger.info(f"Failed to start grabbing after reconnection. Error code: {ret:#x}")
            return saved_files

    color_image_types = (
        ImageType_RGB8_Planar, ImageType_YUV420SP_NV12,
        ImageType_YUV420SP_NV21, ImageType_YUV422
    )
    # Upper bound on the number of frames fetched while looking for a valid
    # point cloud.
    max_retries = 10

    try:
        for attempt in range(max_retries):
            stFrameData = MV3D_RGBD_FRAME_DATA()
            ret = camera.MV3D_RGBD_FetchFrame(pointer(stFrameData), 5000)

            if ret != MV3D_RGBD_OK:
                logger.info(f"Failed to fetch frame. Error code: {ret:#x}. Retry: {attempt + 1}/{max_retries}")
            else:
                has_depth_data = False
                point_cloud_success = False

                for i in range(stFrameData.nImageCount):
                    image_info = stFrameData.stImageData[i]

                    if image_info.enImageType == ImageType_Depth:
                        has_depth_data = True
                        stPointCloudImage = MV3D_RGBD_IMAGE_DATA()
                        ret = camera.MV3D_RGBD_MapDepthToPointCloud(
                            pointer(stFrameData.stImageData[i]),
                            pointer(stPointCloudImage))
                        if MV3D_RGBD_OK != ret:
                            logger.info(f"_MapDepthToPointCloud() Run failed... Retry: {attempt + 1}/{max_retries}")
                        else:
                            logger.info(
                                "_MapDepthToPointCloud() Run Succeed: framenum (%d) height(%d) width(%d) len (%d)!" % (
                                    stPointCloudImage.nFrameNum,
                                    stPointCloudImage.nHeight, stPointCloudImage.nWidth,
                                    stPointCloudImage.nDataLen))
                            raw_points = string_at(stPointCloudImage.pData, stPointCloudImage.nDataLen)
                            # Three floats are unpacked per pixel. A repeat
                            # count keeps the format string tiny instead of
                            # building one character per float.
                            float_count = int(stPointCloudImage.nHeight * stPointCloudImage.nWidth * 3)
                            sValue = struct.unpack(f'{float_count}f', raw_points)
                            pcd_file, processed_points = point.sValue_to_pcd(sValue, stPointCloudImage, sn, type)
                            if pcd_file and len(processed_points) > 0:
                                saved_files["pcd"] = pcd_file
                                saved_files["point"] = processed_points
                                point_cloud_success = True

                    elif image_info.enImageType in color_image_types:
                        file_name = configMap.save_path("color", sn+"_Color")
                        if config.get("save_image"):
                            # NOTE(review): the save runs via submit_task while
                            # image_info points into this frame's data - confirm
                            # the buffer is still valid when the task executes.
                            submit_task(camera.MV3D_RGBD_SaveImage, pointer(image_info), FileType_BMP, file_name)
                            saved_files["color"] = file_name + ".bmp"
                            logger.info(f"Color image saved successfully: {saved_files['color']}")

                    # Stop scanning this frame as long as no point cloud has
                    # been produced from it.
                    # NOTE(review): the original nesting of this check was lost
                    # with the file's indentation; loop level is assumed here -
                    # confirm against the repository version.
                    if not point_cloud_success:
                        break

                if point_cloud_success:
                    break
                if not has_depth_data:
                    logger.info(f"No depth data found in frame. Retry: {attempt + 1}/{max_retries}")

            # Short pause before fetching the next frame.
            time.sleep(0.1)
    except Exception as e:
        logger.error(f"Error during frame capture for device {sn}: {e}")
    finally:
        camera.MV3D_RGBD_Stop()

    return saved_files
8 months ago
7 months ago
def initialize_devices():
    """Enumerate the attached devices, open each one and register it in ``SN_MAP``.

    Devices that fail to open are logged and skipped. When enumeration fails
    or reports no devices, the function logs that and leaves ``SN_MAP``
    untouched.

    Returns:
        None
    """
    device_types = (
        DeviceType_Ethernet | DeviceType_USB
        | DeviceType_Ethernet_Vir | DeviceType_USB_Vir
    )
    # Capacity passed to MV3D_RGBD_GetDeviceList for the DeviceInfo array.
    max_devices = 20

    nDeviceNum = ctypes.c_uint(0)
    ret = Mv3dRgbd.MV3D_RGBD_GetDeviceNumber(device_types, byref(nDeviceNum))
    if ret != MV3D_RGBD_OK or nDeviceNum.value == 0:
        logger.info("Failed to get device number or no devices found.")
        return

    stDeviceList = MV3D_RGBD_DEVICE_INFO_LIST()
    ret = Mv3dRgbd.MV3D_RGBD_GetDeviceList(
        device_types,
        pointer(stDeviceList.DeviceInfo[0]), max_devices, byref(nDeviceNum)
    )
    if ret != MV3D_RGBD_OK:
        logger.info(f"Failed to get device list. Error code: {ret:#x}")
        return

    # Clamp so a larger reported count cannot index past the array capacity.
    for i in range(min(nDeviceNum.value, max_devices)):
        device_info = stDeviceList.DeviceInfo[i]
        serial_number = ''.join(chr(c) for c in device_info.chSerialNumber).rstrip('\x00')
        logger.info(f"Found device [{i}]: Serial Number: {serial_number}")

        camera = Mv3dRgbd()
        ret = camera.MV3D_RGBD_OpenDevice(pointer(device_info))
        if ret != MV3D_RGBD_OK:
            logger.info(f"Failed to open device with SN: {serial_number}. Error code: {ret:#x}")
            continue

        SN_MAP[serial_number] = camera
        logger.info(f"Successfully added device {serial_number} to SN_MAP")


# Devices are opened as soon as this module is imported.
initialize_devices()