import numpy as np import cv2 import threading from thread_pool import submit_task from scipy.ndimage import label import config from logConfig import get_logger # 使用示例 logger = get_logger() # # def point_cloud_to_2d_image(points, resolution=1.0, x_range=(17, 275), y_range=(-129, -1227)): # """ # 将点云转换为 2D 图像(X-Y 平面),每个像素值表示对应位置的 Z 值平均值 # """ # if not isinstance(points, np.ndarray): # points = np.array(points) # # if len(points) == 0: # raise ValueError("点云为空") # # x_min, x_max = x_range # y_min, y_max = y_range # # width = int((x_max - x_min) / resolution) + 1 # height = int((y_max - y_min) / resolution) + 1 # # image = np.zeros((height, width), dtype=np.float32) # count_map = np.zeros((height, width), dtype=np.int32) # # for x, y, z in points: # xi = int((x - x_min) / resolution) # yi = int((y - y_min) / resolution) # # if 0 <= xi < width and 0 <= yi < height: # image[yi, xi] += z # count_map[yi, xi] += 1 # # # 防止除零错误 # count_map[count_map == 0] = 1 # image /= count_map # # return image, (x_min, y_min) # 优化 point_cloud_to_2d_image 函数 def point_cloud_to_2d_image(points, resolution=1.0, x_range=(17, 275), y_range=(-129, -1227)): """ 将点云转换为 2D 图像(X-Y 平面),每个像素值表示对应位置的 Z 值平均值 """ if not isinstance(points, np.ndarray): points = np.array(points) if len(points) == 0: raise ValueError("点云为空") x_min, x_max = x_range y_min, y_max = y_range width = int((x_max - x_min) / resolution) + 1 height = int((y_max - y_min) / resolution) + 1 # 使用向量化操作替代循环 x_coords = points[:, 0] y_coords = points[:, 1] z_coords = points[:, 2] # 计算像素坐标 xi = ((x_coords - x_min) / resolution).astype(int) yi = ((y_coords - y_min) / resolution).astype(int) # 筛选有效坐标 valid_mask = (xi >= 0) & (xi < width) & (yi >= 0) & (yi < height) xi = xi[valid_mask] yi = yi[valid_mask] z_coords = z_coords[valid_mask] # 使用直方图统计替代循环累加 image = np.zeros((height, width), dtype=np.float32) count_map = np.zeros((height, width), dtype=np.int32) # 使用 bincount 进行快速统计 indices = yi * width + xi z_sums = np.bincount(indices, weights=z_coords, minlength=height * width) counts = np.bincount(indices, minlength=height * width) image.flat[:] = z_sums count_map.flat[:] = counts # 防止除零错误 count_map[count_map == 0] = 1 image /= count_map return image, (x_min, y_min) def stitch(imagePath1, imagePath2): # 读取两张图像 image1 = cv2.imread(imagePath1) image2 = cv2.imread(imagePath2) # 创建 Stitcher 对象 stitcher = cv2.Stitcher_create() # 拼接图像 (status, stitched) = stitcher.stitch((image1, image2)) # 检查拼接结果 if status == cv2.Stitcher_OK: cv2.imwrite('stitched_output.jpg', stitched) print("图像拼接成功!") else: print(f"图像拼接失败,错误代码: {status}") if __name__ == '__main__': stitch('D:/git/test/hik3d-python/image/2025-07-01/color/105601193_-_Color.bmp' , 'D:/git/test/hik3d-python/image/2025-07-01/color/164646720_-_Color.bmp') def detect_black_regions(binary_mask, min_area=10,box_area=10): """ 检测图像中的黑色连通区域(值为 0 的区域) :param binary_mask: 2D numpy array, 二值图(0 表示黑色区域) :param min_area: int, 最小面积阈值,小于该值的区域会被忽略 :return: list of dict, 包含每个区域的信息: [ { 'id': int, 'center': (cx, cy), 'area': int, 'coordinates': [(x1,y1), (x2,y2), ...] }, ... ] """ # 确保输入是二值图像(0 是黑) binary = (binary_mask == 0).astype(np.uint8) # 使用连通域分析 structure = np.array([[1, 1, 1], [1, 1, 1], [1, 1, 1]]) labeled_map, num_features = label(binary, structure=structure) regions = [] count = 0 for label_id in range(1, num_features + 1): coords = np.where(labeled_map == label_id) area = len(coords[0]) if area >= min_area: cx = np.mean(coords[1]) # x 坐标(列) cy = np.mean(coords[0]) # y 坐标(行) logger.info(f"区域: {label_id} 中心: {cx, cy} 面积: {area}") regions.append(((cx, cy),area)) count += ( area/box_area) return regions,count def convert_image_holes_to_real(holes, offset, resolution): real_holes = [] x_min, y_min = offset for ((cx, cy), area) in holes: real_x = x_min + cx * resolution real_y = y_min + cy * resolution real_holes.append(((real_x, real_y), area * resolution ** 2)) return real_holes def visualize_holes_on_image(image, holes, output_path=None): """ 在图像上画出检测到的空洞中心和轮廓 """ try: # 确保图像数据有效 if image is None or len(image) == 0: print("Warning: Empty image data for visualization") return # 彩色化灰度图用于可视化 if len(image.shape) == 2: # 灰度图 color_image = cv2.cvtColor(image, cv2.COLOR_GRAY2BGR) else: color_image = image.copy() # 绘制空洞标记 if holes: for (cx, cy), _ in holes: # 确保坐标有效 if 0 <= int(cx) < color_image.shape[1] and 0 <= int(cy) < color_image.shape[0]: cv2.circle(color_image, (int(cx), int(cy)), radius=5, color=(0, 0, 255), thickness=-1) # 生成输出路径 if output_path is None: output_path = config.save_path("image", "_holes.png") # 保存图像 success = cv2.imwrite(output_path, color_image) if success: print(f"Saved visualization to {output_path}") else: print(f"Failed to save visualization to {output_path}") except Exception as e: print(f"Error in visualize_holes_on_image: {e}") def read_pcd_points(pcd_path): """ 从 .pcd 文件中提取点云坐标 (x, y, z) :param pcd_path: str, PCD 文件路径 :return: list of [x, y, z], 点云坐标列表 """ points = [] data_started = False with open(pcd_path, 'r') as f: for line in f: if line.startswith("DATA"): data_started = True continue if data_started: parts = line.strip().split() if len(parts) >= 3: try: x = float(parts[0]) y = float(parts[1]) z = float(parts[2]) points.append([x, y, z]) except ValueError: continue return points def detect_large_holes(points, sn, type): # 获取裁剪配置 cat_map = config.CUT_CONFIG_MAP.get(sn + "_" + type, None) or config.CUT_CONFIG_MAP.get(sn) template_map = config.TEMPLATE_CONFIG_MAP[type] camera_map = config.CAMERA_CONFIG_MAP[sn] # 2. 生成 2D 图像 x_range = (cat_map["min_pt"][0], cat_map["max_pt"][0]) y_range = (cat_map["min_pt"][1], cat_map["max_pt"][1]) resolution = config.CAMERA_CONFIG_MAP[sn].get("resolution") image, offset = point_cloud_to_2d_image(points, resolution=resolution, x_range=x_range, y_range=y_range) # 3. 图像归一化用于可视化 normalized_image = cv2.normalize(image, None, 0, 255, cv2.NORM_MINMAX).astype(np.uint8) min_area = min(template_map["min_area"], (template_map["width"] * template_map["height"]) /(2*camera_map["resolution"]*camera_map["resolution"])) # 4. 检测空洞 holes, count = detect_black_regions(normalized_image, min_area, (template_map["width"] * template_map["height"]) / 2) if config.CAMERA_CONFIG_MAP[sn].get("save_image"): # 创建数据副本确保异步执行时数据完整性 image_copy = normalized_image.copy() holes_copy = list(holes) # 创建holes的副本 # 预先生成输出路径 output_path = config.save_path("image", f"_{sn}_holes.png") # 使用线程池异步执行可视化操作 submit_task(visualize_holes_on_image, image_copy, holes_copy, output_path) # 6. 输出真实世界坐标 real_holes = convert_image_holes_to_real(holes, offset, resolution) return real_holes, count # if __name__ == '__main__': # points = read_pcd_points("D:/PycharmProjects/Hik3D/image/2025-06-25/pcd/182109899_00DA6823936_merged.pcd") # sn = "00DA6823936" # x_max = 326 # detect_large_holes(points,sn, x_max)