Source code for opr.pipelines.localization.base

"""Hierarchical Localization Pipeline."""
from os import PathLike
from pathlib import Path
from typing import Dict

import numpy as np
import torch
from geotransformer.utils.pointcloud import (
    get_rotation_translation_from_transform,
    get_transform_from_rotation_translation,
)
from loguru import logger
from scipy.spatial.transform import Rotation
from torch import Tensor
from tqdm import tqdm

from opr.pipelines.place_recognition import PlaceRecognitionPipeline
from opr.pipelines.registration import (
    PointcloudRegistrationPipeline,
    SequencePointcloudRegistrationPipeline,
)


[docs] class LocalizationPipeline: """Hierarchical Localization Pipeline. The task of localiation is solved in two steps: 1. Find the best match for the query in the database (Place Recognition). 2. Refine the pose estimate using the query and the database match (Registration). """ def __init__( self, place_recognition_pipeline: PlaceRecognitionPipeline, registration_pipeline: PointcloudRegistrationPipeline | SequencePointcloudRegistrationPipeline, precomputed_reg_feats: bool = False, pointclouds_subdir: str | PathLike | None = None, ) -> None: """Hierarchical Localization Pipeline. The task of localiation is solved in two steps: 1. Find the best match for the query in the database (Place Recognition). 2. Refine the pose estimate using the query and the database match (Registration). Args: place_recognition_pipeline (PlaceRecognitionPipeline): Place Recognition pipeline. registration_pipeline (PointcloudRegistrationPipeline): Registration pipeline. precomputed_reg_feats (bool): Whether to use precomputed registration features. Defaults to False. pointclouds_subdir (str | PathLike, optional): Sub-directory with pointclouds. Will be used for computing registration feats, if they are not exist; or for loading pointclouds if `precomputed_reg_feats=False`. Defaults to None. Raises: ValueError: Pointclouds sub-directory must be provided if precomputed registration features are not used. ValueError: Precomputed registration features are only supported for HRegNet. """ self.pr_pipe = place_recognition_pipeline self.reg_pipe = registration_pipeline self.database_df = self.pr_pipe.database_df self.database_dir = self.pr_pipe.database_dir if "pointcloud" in self.database_df.columns: self.pc_col = "pointcloud" self.num_points_properties = 3 elif "lidar_ts" in self.database_df.columns: self.pc_col = "lidar_ts" self.num_points_properties = 4 if isinstance(self.reg_pipe, SequencePointcloudRegistrationPipeline): self.sequences = True else: self.sequences = False if not precomputed_reg_feats and pointclouds_subdir is None: raise ValueError( "Pointclouds sub-directory must be provided if precomputed registration features are not used." ) self.pointclouds_dir = (self.database_dir / pointclouds_subdir) if pointclouds_subdir else None if self.pointclouds_dir is not None and not self.pointclouds_dir.exists(): raise ValueError(f"Pointclouds directory not found: {self.pointclouds_dir}") self.precomputed_reg_feats = precomputed_reg_feats self._setup_precomputed_reg_feats() def _setup_precomputed_reg_feats(self) -> None: self.precomputed_reg_feats_dir = None reg_model_name = self.reg_pipe.model.__class__.__name__ if self.precomputed_reg_feats: if reg_model_name != "HRegNet": raise ValueError("Precomputed registration features are only supported for HRegNet.") self.precomputed_reg_feats_dir = self.database_dir / f"{reg_model_name}_features" if not self.precomputed_reg_feats_dir.exists(): logger.warning( f"Precomputed registration features directory not found: {self.precomputed_reg_feats_dir}. " "It will be created and features will be computed." ) self.precomputed_reg_feats_dir.mkdir() if self.pointclouds_dir is None or not Path(self.pointclouds_dir).exists(): raise ValueError( "Pointclouds directory must be provided to compute registration features." ) if not any(self.precomputed_reg_feats_dir.iterdir()): logger.warning("Precomputed registration features directory is empty. Computing features.") self.compute_reg_features( save_dir=self.precomputed_reg_feats_dir, pointclouds_dir=self.pointclouds_dir )
[docs] def infer(self, input_data: Dict[str, Tensor] | list[Dict[str, Tensor]]) -> Dict[str, np.ndarray]: """Single sample inference. Args: input_data (Dict[str, Tensor]): Input data. Dictionary with keys in the following format: "image_{camera_name}" for images from cameras, "mask_{camera_name}" for semantic segmentation masks, "pointcloud_lidar_coords" for pointcloud coordinates from lidar, "pointcloud_lidar_feats" for pointcloud features from lidar. Returns: Dict[str, np.ndarray]: Inference results. Dictionary with keys: "db_match_pose" for database match pose in the format [tx, ty, tz, qx, qy, qz, qw], "estimated_pose" for estimated pose in the format [tx, ty, tz, qx, qy, qz, qw]. Raises: ValueError: Provided input data is a list, but the pipeline is not for sequences. ValueError: Provided input data is not a list, but the pipeline is for sequences. """ if isinstance(input_data, list) and not self.sequences: raise ValueError("Provided input data is a list, but the pipeline is not for sequences.") if not isinstance(input_data, list) and self.sequences: raise ValueError("Provided input data is not a list, but the pipeline is for sequences.") out_dict = {} if isinstance(input_data, list): pr_output = self.pr_pipe.infer(input_data[-1]) query_pc = [x["pointcloud_lidar_coords"] for x in input_data] else: pr_output = self.pr_pipe.infer(input_data) query_pc = input_data["pointcloud_lidar_coords"] db_pose = pr_output["pose"] out_dict["db_match_pose"] = db_pose db_pose = get_transform_from_rotation_translation( Rotation.from_quat(db_pose[3:]).as_matrix(), db_pose[:3] ) db_idx = pr_output["idx"] out_dict["db_match_idx"] = db_idx if not self.precomputed_reg_feats: db_pc_filename = f"{int(self.database_df[self.pc_col].iloc[db_idx])}.bin" db_pc = self._load_pc( self.pointclouds_dir / db_pc_filename, num_point_properties=self.num_points_properties ) db_pc = db_pc[:, :3] if isinstance(query_pc, list): estimated_transform = self.reg_pipe.infer(query_pc_list=query_pc, db_pc=db_pc) else: estimated_transform = self.reg_pipe.infer(query_pc=query_pc, db_pc=db_pc) else: db_pc_feats = self._load_feats( self.precomputed_reg_feats_dir / f"{int(self.database_df[self.pc_col].iloc[db_idx])}.pt" ) if isinstance(query_pc, list): estimated_transform = self.reg_pipe.infer(query_pc_list=query_pc, db_pc_feats=db_pc_feats) else: estimated_transform = self.reg_pipe.infer(query_pc=query_pc, db_pc_feats=db_pc_feats) estimated_pose = db_pose @ self._invert_rigid_transformation_matrix(estimated_transform) rot, trans = get_rotation_translation_from_transform(estimated_pose) rot = Rotation.from_matrix(rot).as_quat() pose = np.concatenate([trans, rot]) out_dict["estimated_pose"] = pose return out_dict
[docs] def compute_reg_features(self, save_dir: str | PathLike, pointclouds_dir: str | PathLike) -> None: """Compute registration features for the database. Args: save_dir (str | PathLike): Directory to save the features. pointclouds_dir (str | PathLike): Directory where pointclouds will be saved. """ for idx in tqdm(range(len(self.database_df)), total=len(self.database_df), leave=False): pc_ts = int(self.database_df[self.pc_col].iloc[idx]) pointcloud_path = Path(pointclouds_dir) / f"{pc_ts}.bin" pointcloud = self._load_pc(pointcloud_path, num_point_properties=self.num_points_properties) pointcloud = pointcloud.to(self.reg_pipe.device) pointcloud = pointcloud[:, :3] pointcloud = self.reg_pipe._downsample_pointcloud(pointcloud) features = self.reg_pipe.model.extract_features(pointcloud) feature_save_path = Path(save_dir) / f"{pc_ts}.pt" torch.save(features, feature_save_path)
def _invert_rigid_transformation_matrix(self, T: np.ndarray) -> np.ndarray: """Inverts a 4x4 rigid body transformation matrix. Args: T (np.ndarray): A 4x4 rigid body transformation matrix. Returns: np.ndarray: The inverted 4x4 rigid body transformation matrix. Raises: ValueError: Input matrix must be 4x4. """ if T.shape != (4, 4): raise ValueError("Input matrix must be 4x4.") R = T[:3, :3] t = T[:3, 3] R_inv = R.T t_inv = -R.T @ t T_inv = np.eye(4) T_inv[:3, :3] = R_inv T_inv[:3, 3] = t_inv return T_inv def _load_pc( self, filepath: str | PathLike, num_point_properties: int = 3, dtype: np.dtype = np.float32 ) -> Tensor: """Load pointcloud from file. Args: filepath (str | PathLike): Filepath to the pointcloud bin file. num_point_properties (int): Number of properties for each point in the point cloud. Defaults to 3. dtype (np.dtype): Data type of the point cloud. Defaults to np.float32. Returns: Tensor: Pointcloud. """ pc = np.fromfile(str(filepath), dtype=dtype).reshape(-1, num_point_properties) pc = torch.from_numpy(pc) pc = pc[~torch.isnan(pc).any(dim=1)] return pc def _load_feats(self, filepath: str | PathLike) -> Dict[str, Tensor]: """Load features from file. Args: filepath (str | PathLike): Filepath to the features file. Returns: Dict[str, Tensor]: Features. """ feats = torch.load(filepath, map_location=self.reg_pipe.device, weights_only=True) return feats