from datetime import time import numpy as np from PIL import Image import math import config import asyncio import numpy as np from scipy.spatial.transform import Rotation as R from logConfig import get_logger # 使用示例 logger = get_logger() def apply_rotation(matrix, angle_degrees, axis='x'): """ 对输入的 3x3 旋转矩阵 应用额外的旋转 参数: matrix (np.ndarray): 原始的 3x3 旋转矩阵 angle_degrees (float): 旋转角度 (单位: 度) axis (str): 旋转轴,支持 'x', 'y', 'z' 返回: np.ndarray: 新的 3x3 旋转矩阵 """ # 创建绕指定轴旋转的旋转对象 rot = R.from_euler(axis, angle_degrees, degrees=True) # 将当前旋转转换为矩阵 rotation_matrix = rot.as_matrix() # 合并旋转:新旋转 × 原始矩阵 combined_rotation = np.dot(rotation_matrix, matrix) return combined_rotation # # def tiff_depth_to_point_cloud(tiff_path,sn, dedup=True): # """ # 将 TIFF 深度图转换为点云,基于给定的视场角计算相机内参 # # :param tiff_path: str, TIFF 深度图路径 # :param hfov_degrees: float or None, 水平视场角(单位:度) # :param vfov_degrees: float or None, 垂直视场角(单位:度) # :param output_ply_path: str or None, 输出 Pcd 文件路径(可选) # :param dedup: bool, 是否根据 (x, y) 去重,默认开启 # :return: list of [x, y, z] 点云数据 # """ # hfov_degrees = config.CAMERA_CONFIG_MAP[sn].get("x_angle") # max_z = config.CAMERA_CONFIG_MAP[sn].get("max_z") # vfov_degrees = config.CAMERA_CONFIG_MAP[sn].get("y_angle") # # plane_scaling_ratio = config.CAMERA_CONFIG_MAP[sn].get("plane_scaling_ratio") # # 加载 TIFF 图像 # depth_image = Image.open(tiff_path) # depth_array = np.array(depth_image, dtype=np.float32) # # height, width = depth_array.shape # # # 根据提供的 FOV 计算 fx/fy # if hfov_degrees is not None: # hfov_rad = math.radians(hfov_degrees) # fx = width / (2 * math.tan(hfov_rad / 2)) # fy = fx * height / width # 保持像素方形比例 # else: # vfov_rad = math.radians(vfov_degrees) # fy = height / (2 * math.tan(vfov_rad / 2)) # fx = fy * width / height # 保持像素方形比例 # # cx = width / 2 # cy = height / 2 # # print(f"Calculated intrinsics: fx={fx:.2f}, fy={fy:.2f}, cx={cx:.2f}, cy={cy:.2f}") # # points = [] # seen_xy = set() # 用于记录已添加的 (x, y) # # # 获取裁剪配置 # clip_config = config.CUT_CONFIG_MAP.get(sn) # if clip_config: # min_pt = np.array(clip_config.get("min_pt", [-np.inf] * 3)) # max_pt = np.array(clip_config.get("max_pt", [np.inf] * 3)) # rotation = np.array(clip_config.get("rotation", [1,0,0,0,1,0,0,0,1])).reshape(3, 3) # else: # min_pt = max_pt = rotation = None # # for v in range(height): # for u in range(width): # z = depth_array[v, u] # mm -> m # if z <= 0 or z > max_z: # continue # # x = (u - cx) * z / fx # y = (v - cy) * z / fy # # # 构造点并旋转(保留浮点精度) # point = np.array([x, y, z]) # if clip_config: # rotated_point = (rotation @ point.T).T # if not np.all(rotated_point >= min_pt) or not np.all(rotated_point <= max_pt): # continue # x_final, y_final, z_final = rotated_point # else: # x_final, y_final, z_final = point # # # 到这里才进行去重判断 # if dedup: # x_int = int(round(x_final)) # y_int = int(round(y_final)) # z_int = int(round(z_final)) # if (x_int, y_int) in seen_xy: # continue # seen_xy.add((x_int, y_int)) # # 保留浮点精度写入结果 # points.append([x_int, y_int, z_int]) # # # # 可选输出PLY文件用于可视化 # if config.CAMERA_CONFIG_MAP[sn].get("save_pcd"): # output_ply_path = config.save_path("pcd",sn+".pcd") # write_pcd(output_ply_path, points) # # return points # 新增点云数据转换函数 def convert_sdk_points(sValue, width, height): """ 将 SDK 原始点云数据转换为 NumPy 点云数组 :param sValue: tuple(float), SDK 输出的扁平化点云数据 :param width: int, 点云宽度 :param height: int, 点云高度 :return: np.ndarray(shape=(N,3)), 三维点云数组 """ points = np.array(sValue).reshape(-1, 3) # 形状转换 (N,3) # 剔除无效点(X/Y/Z 任一为 NaN 或 Inf) mask = np.all(np.isfinite(points), axis=1) return points[mask] # def process_point_cloud_fast(points, sn, type): # points = np.array(points, dtype=np.float32) # # 获取目标高度和容差(来自模板配置) # template_config = config.TEMPLATE_CONFIG_MAP.get(type, {}) # target_z = template_config.get("height", 0) # 假设 height 是期望的中心高度 # tolerance = template_config.get("tolerance", 50) # 默认容差为 50 # # # 计算上下限 # z_min = target_z - tolerance # z_max = target_z + tolerance # 不超过 200 # # Z 值过滤 # valid_mask = ((points[:, 2] > 0) # & (points[:, 2] <= config.CAMERA_CONFIG_MAP[sn].get("max_z", np.inf)) # ) # # points = points[valid_mask] # # if clip_config := config.CUT_CONFIG_MAP.get(sn): # rotation = np.array(clip_config["rotation"], dtype=np.float32).reshape(3, 3) # min_pt = np.array(clip_config["min_pt"], dtype=np.float32) # max_pt = np.array(clip_config["max_pt"], dtype=np.float32) # floorHeight = clip_config.get("floorHeight", 0) # # # 批量旋转 # rotated = (rotation @ points.T).T # # # 裁剪范围过滤 # bounds_mask = np.all((rotated >= min_pt) & (rotated <= max_pt), axis=1) # rotated = rotated[bounds_mask] # x_trimmed = np.round(rotated[:, 0], 3) # y_trimmed = np.round(rotated[:, 1], 3) # z_trimmed = np.round(floorHeight - rotated[:, 2], 3) # else: # x_trimmed = np.round(points[:, 0], 3) # y_trimmed = np.round(points[:, 1], 3) # z_trimmed = np.round(points[:, 2], 3) # # # 合并为一个数组 # trimmed_points = np.column_stack((x_trimmed, y_trimmed, z_trimmed)) # # 应用 z 值过滤 # final_mask = ( # (trimmed_points[:, 2] >= z_min) & # (trimmed_points[:, 2] <= z_max) # ) # filtered_points = trimmed_points[final_mask] # # # 去重 # keys = np.column_stack((filtered_points[:, 0], filtered_points[:, 1])) # _, unique_indices = np.unique(keys, axis=0, return_index=True) # # # 返回 list of [x, y, z] # return filtered_points[unique_indices].tolist() def process_point_cloud_fast(points, sn, type): points = np.array(points, dtype=np.float32) if not len(points): return [] # 获取配置 template_config = config.TEMPLATE_CONFIG_MAP.get(type, {}) target_z = template_config.get("height", 0) tolerance = template_config.get("tolerance", 25) z_min = target_z - tolerance z_max = target_z + tolerance # 初始 Z 过滤 valid_mask = (points[:, 2] > 0) & (points[:, 2] <= config.CAMERA_CONFIG_MAP[sn].get("max_z", np.inf)) points = points[valid_mask] # 获取裁剪配置 clip_config = config.CUT_CONFIG_MAP.get(sn + "_" + type, None) or config.CUT_CONFIG_MAP.get(sn) if clip_config: rotation = np.array(clip_config["rotation"], dtype=np.float32).reshape(3, 3) min_pt = np.array(clip_config["min_pt"], dtype=np.float32) max_pt = np.array(clip_config["max_pt"], dtype=np.float32) floorHeight = clip_config.get("floorHeight", 0) # 批量旋转 rotated = (rotation @ points.T).T # 裁剪范围过滤 bounds_mask = np.all((rotated >= min_pt) & (rotated <= max_pt), axis=1) rotated = rotated[bounds_mask] # 计算坐标并应用 floorHeight x_trimmed = rotated[:, 0] y_trimmed = rotated[:, 1] z_trimmed = floorHeight - rotated[:, 2] else: x_trimmed = points[:, 0] y_trimmed = points[:, 1] z_trimmed = points[:, 2] # 合并为一个数组 trimmed_points = np.column_stack(( np.round(x_trimmed, 3), np.round(y_trimmed, 3), np.round(z_trimmed, 3) )) # 应用 z 值过滤 final_mask = (trimmed_points[:, 2] >= z_min) & (trimmed_points[:, 2] <= z_max) filtered_points = trimmed_points[final_mask] # 去重 keys = filtered_points[:, :2] # 只取 x,y _, unique_indices = np.unique(keys, axis=0, return_index=True) # 返回 list of [x, y, z] return filtered_points[unique_indices].tolist() def sValue_to_pcd(sValue,stPointCloudImage,sn,type): # 数据格式转换 points = convert_sdk_points(sValue, stPointCloudImage.nWidth, stPointCloudImage.nHeight) # 应用旋转、裁剪、去重(假设设备序列号已知) processed_points = process_point_cloud_fast(points, sn,type) output_ply_path = config.save_path("pcd", sn + ".pcd") # 保存结果 # Point.py 修改部分 from thread_pool import submit_task if config.CAMERA_CONFIG_MAP[sn].get("save_pcd"): output_ply_path = config.save_path("pcd", sn + ".pcd") # 使用线程池异步执行点云文件写入 submit_task(write_pcd, output_ply_path, processed_points) if config.CAMERA_CONFIG_MAP[sn].get("save_not_cut", True): output_ply_path = config.save_path("pcd", sn + "_original.pcd") submit_task(write_pcd, output_ply_path, points) return output_ply_path,processed_points def write_pcd(filename, points): """ 将点云写入 PCD 文件 :param filename: str, 输出 PCD 文件路径 :param points: list of [x, y, z], 点云数据 """ with open(filename, 'w') as f: # 写入 PCD 文件头 f.write("# .PCD v0.7 - Point Cloud Data file format\n") f.write("VERSION 0.7\n") f.write("FIELDS x y z\n") f.write("SIZE 4 4 4\n") f.write("TYPE F F F\n") f.write("COUNT 1 1 1\n") f.write(f"WIDTH {len(points)}\n") f.write(f"HEIGHT 1\n") f.write("VIEWPOINT 0 0 0 1 0 0 0\n") f.write(f"POINTS {len(points)}\n") f.write("DATA ascii\n") # 写入点数据 for point in points: f.write(f"{point[0]} {point[1]} {point[2]}\n") logger.info(f"Saved point cloud to {filename}")