You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

308 lines
11 KiB
Python

7 months ago
from datetime import time
8 months ago
import numpy as np
from PIL import Image
import math
import config
7 months ago
import asyncio
8 months ago
7 months ago
import numpy as np
from scipy.spatial.transform import Rotation as R
8 months ago
7 months ago
from logConfig import get_logger
8 months ago
7 months ago
# 使用示例
logger = get_logger()
8 months ago
7 months ago
def apply_rotation(matrix, angle_degrees, axis='x'):
"""
对输入的 3x3 旋转矩阵 应用额外的旋转
8 months ago
7 months ago
参数:
matrix (np.ndarray): 原始的 3x3 旋转矩阵
angle_degrees (float): 旋转角度 (单位: )
axis (str): 旋转轴支持 'x', 'y', 'z'
8 months ago
7 months ago
返回:
np.ndarray: 新的 3x3 旋转矩阵
"""
# 创建绕指定轴旋转的旋转对象
rot = R.from_euler(axis, angle_degrees, degrees=True)
# 将当前旋转转换为矩阵
rotation_matrix = rot.as_matrix()
# 合并旋转:新旋转 × 原始矩阵
combined_rotation = np.dot(rotation_matrix, matrix)
return combined_rotation
#
# def tiff_depth_to_point_cloud(tiff_path,sn, dedup=True):
# """
# 将 TIFF 深度图转换为点云,基于给定的视场角计算相机内参
#
# :param tiff_path: str, TIFF 深度图路径
# :param hfov_degrees: float or None, 水平视场角(单位:度)
# :param vfov_degrees: float or None, 垂直视场角(单位:度)
# :param output_ply_path: str or None, 输出 Pcd 文件路径(可选)
# :param dedup: bool, 是否根据 (x, y) 去重,默认开启
# :return: list of [x, y, z] 点云数据
# """
# hfov_degrees = config.CAMERA_CONFIG_MAP[sn].get("x_angle")
# max_z = config.CAMERA_CONFIG_MAP[sn].get("max_z")
# vfov_degrees = config.CAMERA_CONFIG_MAP[sn].get("y_angle")
# # plane_scaling_ratio = config.CAMERA_CONFIG_MAP[sn].get("plane_scaling_ratio")
# # 加载 TIFF 图像
# depth_image = Image.open(tiff_path)
# depth_array = np.array(depth_image, dtype=np.float32)
#
# height, width = depth_array.shape
#
# # 根据提供的 FOV 计算 fx/fy
# if hfov_degrees is not None:
# hfov_rad = math.radians(hfov_degrees)
# fx = width / (2 * math.tan(hfov_rad / 2))
# fy = fx * height / width # 保持像素方形比例
# else:
# vfov_rad = math.radians(vfov_degrees)
# fy = height / (2 * math.tan(vfov_rad / 2))
# fx = fy * width / height # 保持像素方形比例
#
# cx = width / 2
# cy = height / 2
#
# print(f"Calculated intrinsics: fx={fx:.2f}, fy={fy:.2f}, cx={cx:.2f}, cy={cy:.2f}")
#
# points = []
# seen_xy = set() # 用于记录已添加的 (x, y)
#
# # 获取裁剪配置
# clip_config = config.CUT_CONFIG_MAP.get(sn)
# if clip_config:
# min_pt = np.array(clip_config.get("min_pt", [-np.inf] * 3))
# max_pt = np.array(clip_config.get("max_pt", [np.inf] * 3))
# rotation = np.array(clip_config.get("rotation", [1,0,0,0,1,0,0,0,1])).reshape(3, 3)
# else:
# min_pt = max_pt = rotation = None
#
# for v in range(height):
# for u in range(width):
# z = depth_array[v, u] # mm -> m
# if z <= 0 or z > max_z:
# continue
#
# x = (u - cx) * z / fx
# y = (v - cy) * z / fy
#
# # 构造点并旋转(保留浮点精度)
# point = np.array([x, y, z])
# if clip_config:
# rotated_point = (rotation @ point.T).T
# if not np.all(rotated_point >= min_pt) or not np.all(rotated_point <= max_pt):
# continue
# x_final, y_final, z_final = rotated_point
# else:
# x_final, y_final, z_final = point
#
# # 到这里才进行去重判断
# if dedup:
# x_int = int(round(x_final))
# y_int = int(round(y_final))
# z_int = int(round(z_final))
# if (x_int, y_int) in seen_xy:
# continue
# seen_xy.add((x_int, y_int))
# # 保留浮点精度写入结果
# points.append([x_int, y_int, z_int])
#
#
# # 可选输出PLY文件用于可视化
# if config.CAMERA_CONFIG_MAP[sn].get("save_pcd"):
# output_ply_path = config.save_path("pcd",sn+".pcd")
# write_pcd(output_ply_path, points)
#
# return points
8 months ago
# 新增点云数据转换函数
def convert_sdk_points(sValue, width, height):
"""
SDK 原始点云数据转换为 NumPy 点云数组
:param sValue: tuple(float), SDK 输出的扁平化点云数据
:param width: int, 点云宽度
:param height: int, 点云高度
:return: np.ndarray(shape=(N,3)), 三维点云数组
"""
points = np.array(sValue).reshape(-1, 3) # 形状转换 (N,3)
# 剔除无效点X/Y/Z 任一为 NaN 或 Inf
mask = np.all(np.isfinite(points), axis=1)
return points[mask]
7 months ago
# def process_point_cloud_fast(points, sn, type):
# points = np.array(points, dtype=np.float32)
# # 获取目标高度和容差(来自模板配置)
# template_config = config.TEMPLATE_CONFIG_MAP.get(type, {})
# target_z = template_config.get("height", 0) # 假设 height 是期望的中心高度
# tolerance = template_config.get("tolerance", 50) # 默认容差为 50
#
# # 计算上下限
# z_min = target_z - tolerance
# z_max = target_z + tolerance # 不超过 200
# # Z 值过滤
# valid_mask = ((points[:, 2] > 0)
# & (points[:, 2] <= config.CAMERA_CONFIG_MAP[sn].get("max_z", np.inf))
# )
#
# points = points[valid_mask]
#
# if clip_config := config.CUT_CONFIG_MAP.get(sn):
# rotation = np.array(clip_config["rotation"], dtype=np.float32).reshape(3, 3)
# min_pt = np.array(clip_config["min_pt"], dtype=np.float32)
# max_pt = np.array(clip_config["max_pt"], dtype=np.float32)
# floorHeight = clip_config.get("floorHeight", 0)
#
# # 批量旋转
# rotated = (rotation @ points.T).T
#
# # 裁剪范围过滤
# bounds_mask = np.all((rotated >= min_pt) & (rotated <= max_pt), axis=1)
# rotated = rotated[bounds_mask]
# x_trimmed = np.round(rotated[:, 0], 3)
# y_trimmed = np.round(rotated[:, 1], 3)
# z_trimmed = np.round(floorHeight - rotated[:, 2], 3)
# else:
# x_trimmed = np.round(points[:, 0], 3)
# y_trimmed = np.round(points[:, 1], 3)
# z_trimmed = np.round(points[:, 2], 3)
#
# # 合并为一个数组
# trimmed_points = np.column_stack((x_trimmed, y_trimmed, z_trimmed))
# # 应用 z 值过滤
# final_mask = (
# (trimmed_points[:, 2] >= z_min) &
# (trimmed_points[:, 2] <= z_max)
# )
# filtered_points = trimmed_points[final_mask]
#
# # 去重
# keys = np.column_stack((filtered_points[:, 0], filtered_points[:, 1]))
# _, unique_indices = np.unique(keys, axis=0, return_index=True)
#
# # 返回 list of [x, y, z]
# return filtered_points[unique_indices].tolist()
def process_point_cloud_fast(points, sn, type):
points = np.array(points, dtype=np.float32)
if not len(points):
return []
# 获取配置
template_config = config.TEMPLATE_CONFIG_MAP.get(type, {})
target_z = template_config.get("height", 0)
tolerance = template_config.get("tolerance", 25)
z_min = target_z - tolerance
z_max = target_z + tolerance
# 初始 Z 过滤
valid_mask = (points[:, 2] > 0) & (points[:, 2] <= config.CAMERA_CONFIG_MAP[sn].get("max_z", np.inf))
points = points[valid_mask]
7 months ago
# 获取裁剪配置
clip_config = config.CUT_CONFIG_MAP.get(sn + "_" + type, None) or config.CUT_CONFIG_MAP.get(sn)
if clip_config:
7 months ago
rotation = np.array(clip_config["rotation"], dtype=np.float32).reshape(3, 3)
min_pt = np.array(clip_config["min_pt"], dtype=np.float32)
max_pt = np.array(clip_config["max_pt"], dtype=np.float32)
floorHeight = clip_config.get("floorHeight", 0)
# 批量旋转
rotated = (rotation @ points.T).T
# 裁剪范围过滤
bounds_mask = np.all((rotated >= min_pt) & (rotated <= max_pt), axis=1)
rotated = rotated[bounds_mask]
# 计算坐标并应用 floorHeight
x_trimmed = rotated[:, 0]
y_trimmed = rotated[:, 1]
z_trimmed = floorHeight - rotated[:, 2]
else:
7 months ago
x_trimmed = points[:, 0]
y_trimmed = points[:, 1]
z_trimmed = points[:, 2]
7 months ago
# 合并为一个数组
trimmed_points = np.column_stack((
np.round(x_trimmed, 3),
np.round(y_trimmed, 3),
np.round(z_trimmed, 3)
))
7 months ago
# 应用 z 值过滤
final_mask = (trimmed_points[:, 2] >= z_min) & (trimmed_points[:, 2] <= z_max)
filtered_points = trimmed_points[final_mask]
7 months ago
# 去重
keys = filtered_points[:, :2] # 只取 x,y
_, unique_indices = np.unique(keys, axis=0, return_index=True)
7 months ago
# 返回 list of [x, y, z]
return filtered_points[unique_indices].tolist()
7 months ago
def sValue_to_pcd(sValue,stPointCloudImage,sn,type):
# 数据格式转换
points = convert_sdk_points(sValue, stPointCloudImage.nWidth, stPointCloudImage.nHeight)
# 应用旋转、裁剪、去重(假设设备序列号已知)
7 months ago
processed_points = process_point_cloud_fast(points, sn,type)
output_ply_path = config.save_path("pcd", sn + ".pcd")
# 保存结果
7 months ago
# Point.py 修改部分
from thread_pool import submit_task
if config.CAMERA_CONFIG_MAP[sn].get("save_pcd"):
output_ply_path = config.save_path("pcd", sn + ".pcd")
# 使用线程池异步执行点云文件写入
submit_task(write_pcd, output_ply_path, processed_points)
if config.CAMERA_CONFIG_MAP[sn].get("save_not_cut", True):
output_ply_path = config.save_path("pcd", sn + "_original.pcd")
submit_task(write_pcd, output_ply_path, points)
return output_ply_path,processed_points
8 months ago
def write_pcd(filename, points):
"""
将点云写入 PCD 文件
:param filename: str, 输出 PCD 文件路径
:param points: list of [x, y, z], 点云数据
"""
with open(filename, 'w') as f:
# 写入 PCD 文件头
f.write("# .PCD v0.7 - Point Cloud Data file format\n")
7 months ago
f.write("VERSION 0.7\n")
8 months ago
f.write("FIELDS x y z\n")
7 months ago
f.write("SIZE 4 4 4\n")
f.write("TYPE F F F\n")
f.write("COUNT 1 1 1\n")
f.write(f"WIDTH {len(points)}\n")
f.write(f"HEIGHT 1\n")
f.write("VIEWPOINT 0 0 0 1 0 0 0\n")
f.write(f"POINTS {len(points)}\n")
f.write("DATA ascii\n")
8 months ago
# 写入点数据
for point in points:
f.write(f"{point[0]} {point[1]} {point[2]}\n")
7 months ago
logger.info(f"Saved point cloud to {filename}")
8 months ago