You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

308 lines
11 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from datetime import time
import numpy as np
from PIL import Image
import math
import config
import asyncio
import numpy as np
from scipy.spatial.transform import Rotation as R
from logConfig import get_logger
# 使用示例
logger = get_logger()
def apply_rotation(matrix, angle_degrees, axis='x'):
"""
对输入的 3x3 旋转矩阵 应用额外的旋转
参数:
matrix (np.ndarray): 原始的 3x3 旋转矩阵
angle_degrees (float): 旋转角度 (单位: 度)
axis (str): 旋转轴,支持 'x', 'y', 'z'
返回:
np.ndarray: 新的 3x3 旋转矩阵
"""
# 创建绕指定轴旋转的旋转对象
rot = R.from_euler(axis, angle_degrees, degrees=True)
# 将当前旋转转换为矩阵
rotation_matrix = rot.as_matrix()
# 合并旋转:新旋转 × 原始矩阵
combined_rotation = np.dot(rotation_matrix, matrix)
return combined_rotation
#
# def tiff_depth_to_point_cloud(tiff_path,sn, dedup=True):
# """
# 将 TIFF 深度图转换为点云,基于给定的视场角计算相机内参
#
# :param tiff_path: str, TIFF 深度图路径
# :param hfov_degrees: float or None, 水平视场角(单位:度)
# :param vfov_degrees: float or None, 垂直视场角(单位:度)
# :param output_ply_path: str or None, 输出 Pcd 文件路径(可选)
# :param dedup: bool, 是否根据 (x, y) 去重,默认开启
# :return: list of [x, y, z] 点云数据
# """
# hfov_degrees = config.CAMERA_CONFIG_MAP[sn].get("x_angle")
# max_z = config.CAMERA_CONFIG_MAP[sn].get("max_z")
# vfov_degrees = config.CAMERA_CONFIG_MAP[sn].get("y_angle")
# # plane_scaling_ratio = config.CAMERA_CONFIG_MAP[sn].get("plane_scaling_ratio")
# # 加载 TIFF 图像
# depth_image = Image.open(tiff_path)
# depth_array = np.array(depth_image, dtype=np.float32)
#
# height, width = depth_array.shape
#
# # 根据提供的 FOV 计算 fx/fy
# if hfov_degrees is not None:
# hfov_rad = math.radians(hfov_degrees)
# fx = width / (2 * math.tan(hfov_rad / 2))
# fy = fx * height / width # 保持像素方形比例
# else:
# vfov_rad = math.radians(vfov_degrees)
# fy = height / (2 * math.tan(vfov_rad / 2))
# fx = fy * width / height # 保持像素方形比例
#
# cx = width / 2
# cy = height / 2
#
# print(f"Calculated intrinsics: fx={fx:.2f}, fy={fy:.2f}, cx={cx:.2f}, cy={cy:.2f}")
#
# points = []
# seen_xy = set() # 用于记录已添加的 (x, y)
#
# # 获取裁剪配置
# clip_config = config.CUT_CONFIG_MAP.get(sn)
# if clip_config:
# min_pt = np.array(clip_config.get("min_pt", [-np.inf] * 3))
# max_pt = np.array(clip_config.get("max_pt", [np.inf] * 3))
# rotation = np.array(clip_config.get("rotation", [1,0,0,0,1,0,0,0,1])).reshape(3, 3)
# else:
# min_pt = max_pt = rotation = None
#
# for v in range(height):
# for u in range(width):
# z = depth_array[v, u] # mm -> m
# if z <= 0 or z > max_z:
# continue
#
# x = (u - cx) * z / fx
# y = (v - cy) * z / fy
#
# # 构造点并旋转(保留浮点精度)
# point = np.array([x, y, z])
# if clip_config:
# rotated_point = (rotation @ point.T).T
# if not np.all(rotated_point >= min_pt) or not np.all(rotated_point <= max_pt):
# continue
# x_final, y_final, z_final = rotated_point
# else:
# x_final, y_final, z_final = point
#
# # 到这里才进行去重判断
# if dedup:
# x_int = int(round(x_final))
# y_int = int(round(y_final))
# z_int = int(round(z_final))
# if (x_int, y_int) in seen_xy:
# continue
# seen_xy.add((x_int, y_int))
# # 保留浮点精度写入结果
# points.append([x_int, y_int, z_int])
#
#
# # 可选输出PLY文件用于可视化
# if config.CAMERA_CONFIG_MAP[sn].get("save_pcd"):
# output_ply_path = config.save_path("pcd",sn+".pcd")
# write_pcd(output_ply_path, points)
#
# return points
# 新增点云数据转换函数
def convert_sdk_points(sValue, width, height):
"""
将 SDK 原始点云数据转换为 NumPy 点云数组
:param sValue: tuple(float), SDK 输出的扁平化点云数据
:param width: int, 点云宽度
:param height: int, 点云高度
:return: np.ndarray(shape=(N,3)), 三维点云数组
"""
points = np.array(sValue).reshape(-1, 3) # 形状转换 (N,3)
# 剔除无效点X/Y/Z 任一为 NaN 或 Inf
mask = np.all(np.isfinite(points), axis=1)
return points[mask]
# def process_point_cloud_fast(points, sn, type):
# points = np.array(points, dtype=np.float32)
# # 获取目标高度和容差(来自模板配置)
# template_config = config.TEMPLATE_CONFIG_MAP.get(type, {})
# target_z = template_config.get("height", 0) # 假设 height 是期望的中心高度
# tolerance = template_config.get("tolerance", 50) # 默认容差为 50
#
# # 计算上下限
# z_min = target_z - tolerance
# z_max = target_z + tolerance # 不超过 200
# # Z 值过滤
# valid_mask = ((points[:, 2] > 0)
# & (points[:, 2] <= config.CAMERA_CONFIG_MAP[sn].get("max_z", np.inf))
# )
#
# points = points[valid_mask]
#
# if clip_config := config.CUT_CONFIG_MAP.get(sn):
# rotation = np.array(clip_config["rotation"], dtype=np.float32).reshape(3, 3)
# min_pt = np.array(clip_config["min_pt"], dtype=np.float32)
# max_pt = np.array(clip_config["max_pt"], dtype=np.float32)
# floorHeight = clip_config.get("floorHeight", 0)
#
# # 批量旋转
# rotated = (rotation @ points.T).T
#
# # 裁剪范围过滤
# bounds_mask = np.all((rotated >= min_pt) & (rotated <= max_pt), axis=1)
# rotated = rotated[bounds_mask]
# x_trimmed = np.round(rotated[:, 0], 3)
# y_trimmed = np.round(rotated[:, 1], 3)
# z_trimmed = np.round(floorHeight - rotated[:, 2], 3)
# else:
# x_trimmed = np.round(points[:, 0], 3)
# y_trimmed = np.round(points[:, 1], 3)
# z_trimmed = np.round(points[:, 2], 3)
#
# # 合并为一个数组
# trimmed_points = np.column_stack((x_trimmed, y_trimmed, z_trimmed))
# # 应用 z 值过滤
# final_mask = (
# (trimmed_points[:, 2] >= z_min) &
# (trimmed_points[:, 2] <= z_max)
# )
# filtered_points = trimmed_points[final_mask]
#
# # 去重
# keys = np.column_stack((filtered_points[:, 0], filtered_points[:, 1]))
# _, unique_indices = np.unique(keys, axis=0, return_index=True)
#
# # 返回 list of [x, y, z]
# return filtered_points[unique_indices].tolist()
def process_point_cloud_fast(points, sn, type):
points = np.array(points, dtype=np.float32)
if not len(points):
return []
# 获取配置
template_config = config.TEMPLATE_CONFIG_MAP.get(type, {})
target_z = template_config.get("height", 0)
tolerance = template_config.get("tolerance", 25)
z_min = target_z - tolerance
z_max = target_z + tolerance
# 初始 Z 过滤
valid_mask = (points[:, 2] > 0) & (points[:, 2] <= config.CAMERA_CONFIG_MAP[sn].get("max_z", np.inf))
points = points[valid_mask]
# 获取裁剪配置
clip_config = config.CUT_CONFIG_MAP.get(sn + "_" + type, None) or config.CUT_CONFIG_MAP.get(sn)
if clip_config:
rotation = np.array(clip_config["rotation"], dtype=np.float32).reshape(3, 3)
min_pt = np.array(clip_config["min_pt"], dtype=np.float32)
max_pt = np.array(clip_config["max_pt"], dtype=np.float32)
floorHeight = clip_config.get("floorHeight", 0)
# 批量旋转
rotated = (rotation @ points.T).T
# 裁剪范围过滤
bounds_mask = np.all((rotated >= min_pt) & (rotated <= max_pt), axis=1)
rotated = rotated[bounds_mask]
# 计算坐标并应用 floorHeight
x_trimmed = rotated[:, 0]
y_trimmed = rotated[:, 1]
z_trimmed = floorHeight - rotated[:, 2]
else:
x_trimmed = points[:, 0]
y_trimmed = points[:, 1]
z_trimmed = points[:, 2]
# 合并为一个数组
trimmed_points = np.column_stack((
np.round(x_trimmed, 3),
np.round(y_trimmed, 3),
np.round(z_trimmed, 3)
))
# 应用 z 值过滤
final_mask = (trimmed_points[:, 2] >= z_min) & (trimmed_points[:, 2] <= z_max)
filtered_points = trimmed_points[final_mask]
# 去重
keys = filtered_points[:, :2] # 只取 x,y
_, unique_indices = np.unique(keys, axis=0, return_index=True)
# 返回 list of [x, y, z]
return filtered_points[unique_indices].tolist()
def sValue_to_pcd(sValue,stPointCloudImage,sn,type):
# 数据格式转换
points = convert_sdk_points(sValue, stPointCloudImage.nWidth, stPointCloudImage.nHeight)
# 应用旋转、裁剪、去重(假设设备序列号已知)
processed_points = process_point_cloud_fast(points, sn,type)
output_ply_path = config.save_path("pcd", sn + ".pcd")
# 保存结果
# Point.py 修改部分
from thread_pool import submit_task
if config.CAMERA_CONFIG_MAP[sn].get("save_pcd"):
output_ply_path = config.save_path("pcd", sn + ".pcd")
# 使用线程池异步执行点云文件写入
submit_task(write_pcd, output_ply_path, processed_points)
if config.CAMERA_CONFIG_MAP[sn].get("save_not_cut", True):
output_ply_path = config.save_path("pcd", sn + "_original.pcd")
submit_task(write_pcd, output_ply_path, points)
return output_ply_path,processed_points
def write_pcd(filename, points):
"""
将点云写入 PCD 文件
:param filename: str, 输出 PCD 文件路径
:param points: list of [x, y, z], 点云数据
"""
with open(filename, 'w') as f:
# 写入 PCD 文件头
f.write("# .PCD v0.7 - Point Cloud Data file format\n")
f.write("VERSION 0.7\n")
f.write("FIELDS x y z\n")
f.write("SIZE 4 4 4\n")
f.write("TYPE F F F\n")
f.write("COUNT 1 1 1\n")
f.write(f"WIDTH {len(points)}\n")
f.write(f"HEIGHT 1\n")
f.write("VIEWPOINT 0 0 0 1 0 0 0\n")
f.write(f"POINTS {len(points)}\n")
f.write("DATA ascii\n")
# 写入点数据
for point in points:
f.write(f"{point[0]} {point[1]} {point[2]}\n")
logger.info(f"Saved point cloud to {filename}")