|
|
|
|
|
from datetime import time
|
|
|
|
|
|
|
|
|
|
|
|
import numpy as np
|
|
|
|
|
|
from PIL import Image
|
|
|
|
|
|
import math
|
|
|
|
|
|
|
|
|
|
|
|
import config
|
|
|
|
|
|
import asyncio
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import numpy as np
|
|
|
|
|
|
from scipy.spatial.transform import Rotation as R
|
|
|
|
|
|
|
|
|
|
|
|
from logConfig import get_logger
|
|
|
|
|
|
|
|
|
|
|
|
# 使用示例
|
|
|
|
|
|
logger = get_logger()
|
|
|
|
|
|
|
|
|
|
|
|
def apply_rotation(matrix, angle_degrees, axis='x'):
|
|
|
|
|
|
"""
|
|
|
|
|
|
对输入的 3x3 旋转矩阵 应用额外的旋转
|
|
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
|
matrix (np.ndarray): 原始的 3x3 旋转矩阵
|
|
|
|
|
|
angle_degrees (float): 旋转角度 (单位: 度)
|
|
|
|
|
|
axis (str): 旋转轴,支持 'x', 'y', 'z'
|
|
|
|
|
|
|
|
|
|
|
|
返回:
|
|
|
|
|
|
np.ndarray: 新的 3x3 旋转矩阵
|
|
|
|
|
|
"""
|
|
|
|
|
|
# 创建绕指定轴旋转的旋转对象
|
|
|
|
|
|
rot = R.from_euler(axis, angle_degrees, degrees=True)
|
|
|
|
|
|
|
|
|
|
|
|
# 将当前旋转转换为矩阵
|
|
|
|
|
|
rotation_matrix = rot.as_matrix()
|
|
|
|
|
|
|
|
|
|
|
|
# 合并旋转:新旋转 × 原始矩阵
|
|
|
|
|
|
combined_rotation = np.dot(rotation_matrix, matrix)
|
|
|
|
|
|
|
|
|
|
|
|
return combined_rotation
|
|
|
|
|
|
|
|
|
|
|
|
#
|
|
|
|
|
|
# def tiff_depth_to_point_cloud(tiff_path,sn, dedup=True):
|
|
|
|
|
|
# """
|
|
|
|
|
|
# 将 TIFF 深度图转换为点云,基于给定的视场角计算相机内参
|
|
|
|
|
|
#
|
|
|
|
|
|
# :param tiff_path: str, TIFF 深度图路径
|
|
|
|
|
|
# :param hfov_degrees: float or None, 水平视场角(单位:度)
|
|
|
|
|
|
# :param vfov_degrees: float or None, 垂直视场角(单位:度)
|
|
|
|
|
|
# :param output_ply_path: str or None, 输出 Pcd 文件路径(可选)
|
|
|
|
|
|
# :param dedup: bool, 是否根据 (x, y) 去重,默认开启
|
|
|
|
|
|
# :return: list of [x, y, z] 点云数据
|
|
|
|
|
|
# """
|
|
|
|
|
|
# hfov_degrees = config.CAMERA_CONFIG_MAP[sn].get("x_angle")
|
|
|
|
|
|
# max_z = config.CAMERA_CONFIG_MAP[sn].get("max_z")
|
|
|
|
|
|
# vfov_degrees = config.CAMERA_CONFIG_MAP[sn].get("y_angle")
|
|
|
|
|
|
# # plane_scaling_ratio = config.CAMERA_CONFIG_MAP[sn].get("plane_scaling_ratio")
|
|
|
|
|
|
# # 加载 TIFF 图像
|
|
|
|
|
|
# depth_image = Image.open(tiff_path)
|
|
|
|
|
|
# depth_array = np.array(depth_image, dtype=np.float32)
|
|
|
|
|
|
#
|
|
|
|
|
|
# height, width = depth_array.shape
|
|
|
|
|
|
#
|
|
|
|
|
|
# # 根据提供的 FOV 计算 fx/fy
|
|
|
|
|
|
# if hfov_degrees is not None:
|
|
|
|
|
|
# hfov_rad = math.radians(hfov_degrees)
|
|
|
|
|
|
# fx = width / (2 * math.tan(hfov_rad / 2))
|
|
|
|
|
|
# fy = fx * height / width # 保持像素方形比例
|
|
|
|
|
|
# else:
|
|
|
|
|
|
# vfov_rad = math.radians(vfov_degrees)
|
|
|
|
|
|
# fy = height / (2 * math.tan(vfov_rad / 2))
|
|
|
|
|
|
# fx = fy * width / height # 保持像素方形比例
|
|
|
|
|
|
#
|
|
|
|
|
|
# cx = width / 2
|
|
|
|
|
|
# cy = height / 2
|
|
|
|
|
|
#
|
|
|
|
|
|
# print(f"Calculated intrinsics: fx={fx:.2f}, fy={fy:.2f}, cx={cx:.2f}, cy={cy:.2f}")
|
|
|
|
|
|
#
|
|
|
|
|
|
# points = []
|
|
|
|
|
|
# seen_xy = set() # 用于记录已添加的 (x, y)
|
|
|
|
|
|
#
|
|
|
|
|
|
# # 获取裁剪配置
|
|
|
|
|
|
# clip_config = config.CUT_CONFIG_MAP.get(sn)
|
|
|
|
|
|
# if clip_config:
|
|
|
|
|
|
# min_pt = np.array(clip_config.get("min_pt", [-np.inf] * 3))
|
|
|
|
|
|
# max_pt = np.array(clip_config.get("max_pt", [np.inf] * 3))
|
|
|
|
|
|
# rotation = np.array(clip_config.get("rotation", [1,0,0,0,1,0,0,0,1])).reshape(3, 3)
|
|
|
|
|
|
# else:
|
|
|
|
|
|
# min_pt = max_pt = rotation = None
|
|
|
|
|
|
#
|
|
|
|
|
|
# for v in range(height):
|
|
|
|
|
|
# for u in range(width):
|
|
|
|
|
|
# z = depth_array[v, u] # mm -> m
|
|
|
|
|
|
# if z <= 0 or z > max_z:
|
|
|
|
|
|
# continue
|
|
|
|
|
|
#
|
|
|
|
|
|
# x = (u - cx) * z / fx
|
|
|
|
|
|
# y = (v - cy) * z / fy
|
|
|
|
|
|
#
|
|
|
|
|
|
# # 构造点并旋转(保留浮点精度)
|
|
|
|
|
|
# point = np.array([x, y, z])
|
|
|
|
|
|
# if clip_config:
|
|
|
|
|
|
# rotated_point = (rotation @ point.T).T
|
|
|
|
|
|
# if not np.all(rotated_point >= min_pt) or not np.all(rotated_point <= max_pt):
|
|
|
|
|
|
# continue
|
|
|
|
|
|
# x_final, y_final, z_final = rotated_point
|
|
|
|
|
|
# else:
|
|
|
|
|
|
# x_final, y_final, z_final = point
|
|
|
|
|
|
#
|
|
|
|
|
|
# # 到这里才进行去重判断
|
|
|
|
|
|
# if dedup:
|
|
|
|
|
|
# x_int = int(round(x_final))
|
|
|
|
|
|
# y_int = int(round(y_final))
|
|
|
|
|
|
# z_int = int(round(z_final))
|
|
|
|
|
|
# if (x_int, y_int) in seen_xy:
|
|
|
|
|
|
# continue
|
|
|
|
|
|
# seen_xy.add((x_int, y_int))
|
|
|
|
|
|
# # 保留浮点精度写入结果
|
|
|
|
|
|
# points.append([x_int, y_int, z_int])
|
|
|
|
|
|
#
|
|
|
|
|
|
#
|
|
|
|
|
|
# # 可选输出PLY文件用于可视化
|
|
|
|
|
|
# if config.CAMERA_CONFIG_MAP[sn].get("save_pcd"):
|
|
|
|
|
|
# output_ply_path = config.save_path("pcd",sn+".pcd")
|
|
|
|
|
|
# write_pcd(output_ply_path, points)
|
|
|
|
|
|
#
|
|
|
|
|
|
# return points
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 新增点云数据转换函数
|
|
|
|
|
|
def convert_sdk_points(sValue, width, height):
|
|
|
|
|
|
"""
|
|
|
|
|
|
将 SDK 原始点云数据转换为 NumPy 点云数组
|
|
|
|
|
|
|
|
|
|
|
|
:param sValue: tuple(float), SDK 输出的扁平化点云数据
|
|
|
|
|
|
:param width: int, 点云宽度
|
|
|
|
|
|
:param height: int, 点云高度
|
|
|
|
|
|
:return: np.ndarray(shape=(N,3)), 三维点云数组
|
|
|
|
|
|
"""
|
|
|
|
|
|
points = np.array(sValue).reshape(-1, 3) # 形状转换 (N,3)
|
|
|
|
|
|
# 剔除无效点(X/Y/Z 任一为 NaN 或 Inf)
|
|
|
|
|
|
mask = np.all(np.isfinite(points), axis=1)
|
|
|
|
|
|
return points[mask]
|
|
|
|
|
|
|
|
|
|
|
|
# def process_point_cloud_fast(points, sn, type):
|
|
|
|
|
|
# points = np.array(points, dtype=np.float32)
|
|
|
|
|
|
# # 获取目标高度和容差(来自模板配置)
|
|
|
|
|
|
# template_config = config.TEMPLATE_CONFIG_MAP.get(type, {})
|
|
|
|
|
|
# target_z = template_config.get("height", 0) # 假设 height 是期望的中心高度
|
|
|
|
|
|
# tolerance = template_config.get("tolerance", 50) # 默认容差为 50
|
|
|
|
|
|
#
|
|
|
|
|
|
# # 计算上下限
|
|
|
|
|
|
# z_min = target_z - tolerance
|
|
|
|
|
|
# z_max = target_z + tolerance # 不超过 200
|
|
|
|
|
|
# # Z 值过滤
|
|
|
|
|
|
# valid_mask = ((points[:, 2] > 0)
|
|
|
|
|
|
# & (points[:, 2] <= config.CAMERA_CONFIG_MAP[sn].get("max_z", np.inf))
|
|
|
|
|
|
# )
|
|
|
|
|
|
#
|
|
|
|
|
|
# points = points[valid_mask]
|
|
|
|
|
|
#
|
|
|
|
|
|
# if clip_config := config.CUT_CONFIG_MAP.get(sn):
|
|
|
|
|
|
# rotation = np.array(clip_config["rotation"], dtype=np.float32).reshape(3, 3)
|
|
|
|
|
|
# min_pt = np.array(clip_config["min_pt"], dtype=np.float32)
|
|
|
|
|
|
# max_pt = np.array(clip_config["max_pt"], dtype=np.float32)
|
|
|
|
|
|
# floorHeight = clip_config.get("floorHeight", 0)
|
|
|
|
|
|
#
|
|
|
|
|
|
# # 批量旋转
|
|
|
|
|
|
# rotated = (rotation @ points.T).T
|
|
|
|
|
|
#
|
|
|
|
|
|
# # 裁剪范围过滤
|
|
|
|
|
|
# bounds_mask = np.all((rotated >= min_pt) & (rotated <= max_pt), axis=1)
|
|
|
|
|
|
# rotated = rotated[bounds_mask]
|
|
|
|
|
|
# x_trimmed = np.round(rotated[:, 0], 3)
|
|
|
|
|
|
# y_trimmed = np.round(rotated[:, 1], 3)
|
|
|
|
|
|
# z_trimmed = np.round(floorHeight - rotated[:, 2], 3)
|
|
|
|
|
|
# else:
|
|
|
|
|
|
# x_trimmed = np.round(points[:, 0], 3)
|
|
|
|
|
|
# y_trimmed = np.round(points[:, 1], 3)
|
|
|
|
|
|
# z_trimmed = np.round(points[:, 2], 3)
|
|
|
|
|
|
#
|
|
|
|
|
|
# # 合并为一个数组
|
|
|
|
|
|
# trimmed_points = np.column_stack((x_trimmed, y_trimmed, z_trimmed))
|
|
|
|
|
|
# # 应用 z 值过滤
|
|
|
|
|
|
# final_mask = (
|
|
|
|
|
|
# (trimmed_points[:, 2] >= z_min) &
|
|
|
|
|
|
# (trimmed_points[:, 2] <= z_max)
|
|
|
|
|
|
# )
|
|
|
|
|
|
# filtered_points = trimmed_points[final_mask]
|
|
|
|
|
|
#
|
|
|
|
|
|
# # 去重
|
|
|
|
|
|
# keys = np.column_stack((filtered_points[:, 0], filtered_points[:, 1]))
|
|
|
|
|
|
# _, unique_indices = np.unique(keys, axis=0, return_index=True)
|
|
|
|
|
|
#
|
|
|
|
|
|
# # 返回 list of [x, y, z]
|
|
|
|
|
|
# return filtered_points[unique_indices].tolist()
|
|
|
|
|
|
def process_point_cloud_fast(points, sn, type):
|
|
|
|
|
|
points = np.array(points, dtype=np.float32)
|
|
|
|
|
|
if not len(points):
|
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
|
|
# 获取配置
|
|
|
|
|
|
template_config = config.TEMPLATE_CONFIG_MAP.get(type, {})
|
|
|
|
|
|
target_z = template_config.get("height", 0)
|
|
|
|
|
|
tolerance = template_config.get("tolerance", 25)
|
|
|
|
|
|
z_min = target_z - tolerance
|
|
|
|
|
|
z_max = target_z + tolerance
|
|
|
|
|
|
|
|
|
|
|
|
# 初始 Z 过滤
|
|
|
|
|
|
valid_mask = (points[:, 2] > 0) & (points[:, 2] <= config.CAMERA_CONFIG_MAP[sn].get("max_z", np.inf))
|
|
|
|
|
|
points = points[valid_mask]
|
|
|
|
|
|
|
|
|
|
|
|
# 获取裁剪配置
|
|
|
|
|
|
clip_config = config.CUT_CONFIG_MAP.get(sn + "_" + type, None) or config.CUT_CONFIG_MAP.get(sn)
|
|
|
|
|
|
|
|
|
|
|
|
if clip_config:
|
|
|
|
|
|
rotation = np.array(clip_config["rotation"], dtype=np.float32).reshape(3, 3)
|
|
|
|
|
|
min_pt = np.array(clip_config["min_pt"], dtype=np.float32)
|
|
|
|
|
|
max_pt = np.array(clip_config["max_pt"], dtype=np.float32)
|
|
|
|
|
|
floorHeight = clip_config.get("floorHeight", 0)
|
|
|
|
|
|
|
|
|
|
|
|
# 批量旋转
|
|
|
|
|
|
rotated = (rotation @ points.T).T
|
|
|
|
|
|
|
|
|
|
|
|
# 裁剪范围过滤
|
|
|
|
|
|
bounds_mask = np.all((rotated >= min_pt) & (rotated <= max_pt), axis=1)
|
|
|
|
|
|
rotated = rotated[bounds_mask]
|
|
|
|
|
|
|
|
|
|
|
|
# 计算坐标并应用 floorHeight
|
|
|
|
|
|
x_trimmed = rotated[:, 0]
|
|
|
|
|
|
y_trimmed = rotated[:, 1]
|
|
|
|
|
|
z_trimmed = floorHeight - rotated[:, 2]
|
|
|
|
|
|
else:
|
|
|
|
|
|
x_trimmed = points[:, 0]
|
|
|
|
|
|
y_trimmed = points[:, 1]
|
|
|
|
|
|
z_trimmed = points[:, 2]
|
|
|
|
|
|
|
|
|
|
|
|
# 合并为一个数组
|
|
|
|
|
|
trimmed_points = np.column_stack((
|
|
|
|
|
|
np.round(x_trimmed, 3),
|
|
|
|
|
|
np.round(y_trimmed, 3),
|
|
|
|
|
|
np.round(z_trimmed, 3)
|
|
|
|
|
|
))
|
|
|
|
|
|
|
|
|
|
|
|
# 应用 z 值过滤
|
|
|
|
|
|
final_mask = (trimmed_points[:, 2] >= z_min) & (trimmed_points[:, 2] <= z_max)
|
|
|
|
|
|
filtered_points = trimmed_points[final_mask]
|
|
|
|
|
|
|
|
|
|
|
|
# 去重
|
|
|
|
|
|
keys = filtered_points[:, :2] # 只取 x,y
|
|
|
|
|
|
_, unique_indices = np.unique(keys, axis=0, return_index=True)
|
|
|
|
|
|
|
|
|
|
|
|
# 返回 list of [x, y, z]
|
|
|
|
|
|
return filtered_points[unique_indices].tolist()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def sValue_to_pcd(sValue,stPointCloudImage,sn,type):
|
|
|
|
|
|
# 数据格式转换
|
|
|
|
|
|
points = convert_sdk_points(sValue, stPointCloudImage.nWidth, stPointCloudImage.nHeight)
|
|
|
|
|
|
|
|
|
|
|
|
# 应用旋转、裁剪、去重(假设设备序列号已知)
|
|
|
|
|
|
processed_points = process_point_cloud_fast(points, sn,type)
|
|
|
|
|
|
|
|
|
|
|
|
output_ply_path = config.save_path("pcd", sn + ".pcd")
|
|
|
|
|
|
# 保存结果
|
|
|
|
|
|
# Point.py 修改部分
|
|
|
|
|
|
from thread_pool import submit_task
|
|
|
|
|
|
|
|
|
|
|
|
if config.CAMERA_CONFIG_MAP[sn].get("save_pcd"):
|
|
|
|
|
|
output_ply_path = config.save_path("pcd", sn + ".pcd")
|
|
|
|
|
|
# 使用线程池异步执行点云文件写入
|
|
|
|
|
|
submit_task(write_pcd, output_ply_path, processed_points)
|
|
|
|
|
|
if config.CAMERA_CONFIG_MAP[sn].get("save_not_cut", True):
|
|
|
|
|
|
output_ply_path = config.save_path("pcd", sn + "_original.pcd")
|
|
|
|
|
|
submit_task(write_pcd, output_ply_path, points)
|
|
|
|
|
|
|
|
|
|
|
|
return output_ply_path,processed_points
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def write_pcd(filename, points):
|
|
|
|
|
|
"""
|
|
|
|
|
|
将点云写入 PCD 文件
|
|
|
|
|
|
|
|
|
|
|
|
:param filename: str, 输出 PCD 文件路径
|
|
|
|
|
|
:param points: list of [x, y, z], 点云数据
|
|
|
|
|
|
"""
|
|
|
|
|
|
with open(filename, 'w') as f:
|
|
|
|
|
|
# 写入 PCD 文件头
|
|
|
|
|
|
f.write("# .PCD v0.7 - Point Cloud Data file format\n")
|
|
|
|
|
|
f.write("VERSION 0.7\n")
|
|
|
|
|
|
f.write("FIELDS x y z\n")
|
|
|
|
|
|
f.write("SIZE 4 4 4\n")
|
|
|
|
|
|
f.write("TYPE F F F\n")
|
|
|
|
|
|
f.write("COUNT 1 1 1\n")
|
|
|
|
|
|
f.write(f"WIDTH {len(points)}\n")
|
|
|
|
|
|
f.write(f"HEIGHT 1\n")
|
|
|
|
|
|
f.write("VIEWPOINT 0 0 0 1 0 0 0\n")
|
|
|
|
|
|
f.write(f"POINTS {len(points)}\n")
|
|
|
|
|
|
f.write("DATA ascii\n")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 写入点数据
|
|
|
|
|
|
for point in points:
|
|
|
|
|
|
f.write(f"{point[0]} {point[1]} {point[2]}\n")
|
|
|
|
|
|
|
|
|
|
|
|
logger.info(f"Saved point cloud to {filename}")
|
|
|
|
|
|
|