# Source code for opr.datasets.itlp

"""Custom ITLP-Campus dataset implementations."""
import math
from pathlib import Path
from typing import Dict, List, Literal, Optional, Tuple, Union

import cv2
import gdown
import numpy as np
import pandas as pd
import torch
from loguru import logger
from omegaconf import OmegaConf
from pandas import DataFrame
from torch import Tensor
from torch.utils.data import Dataset

from opr.datasets.augmentations import (
    DefaultCloudSetTransform,
    DefaultCloudTransform,
    DefaultImageTransform,
    DefaultSemanticTransform,
)
from opr.datasets.projection import Projector
from opr.datasets.soc_utils import (
    get_points_labels_by_mask,
    instance_masks_to_objects,
    pack_objects,
    semantic_mask_to_instances,
)
from opr.optional_deps import lazy

# Lazy-load MinkowskiEngine - will return real module or helpful stub.
# In this file `ME` is only dereferenced inside `ITLPCampus._collate_data_dict`
# (`ME.utils.sparse_quantize` / `ME.utils.batched_coordinates`).
# NOTE(review): presumably this lets the module be imported without
# MinkowskiEngine installed - confirm against `opr.optional_deps.lazy`.
ME = lazy("MinkowskiEngine", feature="sparse convolutions")


class ITLPCampus(Dataset):
    """ITLP Campus dataset implementation.

    Each element is described by one row of the track CSV file. Depending on
    the constructor flags, ``__getitem__`` returns the pose, camera images,
    semantic masks, per-image text / ArUco annotation tables, the lidar point
    cloud and the scene object context ("soc") tensor.
    """

    dataset_root: Path
    dataset_df: DataFrame
    front_cam_text_descriptions_df: Optional[DataFrame]
    back_cam_text_descriptions_df: Optional[DataFrame]
    front_cam_text_labels_df: Optional[DataFrame]
    back_cam_text_labels_df: Optional[DataFrame]
    front_cam_aruco_labels_df: Optional[DataFrame]
    back_cam_aruco_labels_df: Optional[DataFrame]
    sensors: Tuple[str, ...]
    images_subdir: str = ""
    clouds_subdir: str = "lidar"
    semantic_subdir: str = "masks"
    text_descriptions_subdir: str = "text_descriptions"
    text_labels_subdir: str = "text_labels"
    aruco_labels_subdir: str = "aruco_labels"
    image_transform: DefaultImageTransform
    semantic_transform: DefaultSemanticTransform
    pointcloud_transform: DefaultCloudTransform
    pointcloud_set_transform: DefaultCloudSetTransform
    _pointcloud_quantization_size: Optional[float]
    load_semantics: bool
    load_text_descriptions: bool
    load_text_labels: bool
    load_aruco_labels: bool
    load_soc: bool
    indoor: bool
    cam_config: dict
    sensors_cfg: OmegaConf
    top_k_soc: int
    soc_coords_type: Literal["cylindrical_3d", "cylindrical_2d", "euclidean", "spherical"] = "cylindrical_3d"
    max_distance_soc: float
    anno: OmegaConf
    subset: Literal["train", "val", "test"]
    vis_dir: str = "./vis/"
    train_split: list = None
    test_split: list = None

    # Cameras for which per-image data can be loaded.
    _camera_names: Tuple[str, ...] = ("front_cam", "back_cam")
    # (width, height) pixel bounds applied to projected lidar points in
    # `_remove_dynamic_points`.
    _projection_image_size: Tuple[int, int] = (1280, 720)

    def __init__(
        self,
        dataset_root: Union[str, Path],
        subset: Optional[Literal["train", "val", "test"]] = None,
        csv_file: str = "track.csv",
        sensors: Union[str, Tuple[str, ...]] = ("front_cam", "lidar"),
        mink_quantization_size: Optional[float] = 0.5,
        max_point_distance: Optional[float] = None,
        load_semantics: bool = False,
        exclude_dynamic_classes: bool = False,
        load_text_descriptions: bool = False,
        load_text_labels: bool = False,
        load_aruco_labels: bool = False,
        indoor: bool = False,
        positive_threshold: float = 10.0,
        negative_threshold: float = 50.0,
        image_transform=DefaultImageTransform(resize=(320, 192), train=False),
        semantic_transform=DefaultSemanticTransform(resize=(320, 192), train=False),
        late_image_transform=None,
        load_soc: bool = False,
        top_k_soc: int = 5,
        soc_coords_type: Literal[
            "cylindrical_3d", "cylindrical_2d", "euclidean", "spherical"
        ] = "cylindrical_3d",
        max_distance_soc: float = 50.0,
        sensors_cfg: OmegaConf = None,
        anno: OmegaConf = None,
        train_split: Optional[list] = None,
        test_split: Optional[list] = None,
    ) -> None:
        """ITLP Campus dataset implementation.

        Args:
            dataset_root (Union[str, Path]): Path to the dataset track root directory.
            subset (Literal["train", "val", "test"] | None): Subset of the dataset to load.
                Defaults to None. If None, the dataset will be loaded without splitting.
            csv_file (str): Name of the csv file with dataset information. Defaults to "track.csv".
            sensors (Union[str, Tuple[str, ...]]): Sensors for which the data should be loaded.
                A single sensor name may be given as a plain string.
                Defaults to ("front_cam", "lidar").
            mink_quantization_size (Optional[float]): The quantization size for point clouds.
                Defaults to 0.5.
            max_point_distance (Optional[float]): The maximum distance of points from the origin
                to be considered. Defaults to None.
            load_semantics (bool): Whether to load semantic masks for camera images.
                Defaults to False.
            exclude_dynamic_classes (bool): Whether to zero out image pixels and drop lidar points
                that fall on dynamic classes of the semantic masks. The masks are only available
                when ``load_semantics=True``. Defaults to False.
            load_text_descriptions (bool): Whether to load text descriptions for camera images.
                Defaults to False.
            load_text_labels (bool): Whether to load detected text for camera images.
                Defaults to False.
            load_aruco_labels (bool): Whether to load detected aruco labels for camera images.
                Defaults to False.
            indoor (bool): Whether to load indoor or outdoor dataset track. Defaults to False.
            positive_threshold (float): The maximum UTM distance between two elements
                for them to be considered positive. Defaults to 10.0.
            negative_threshold (float): The maximum UTM distance between two elements
                for them to be considered non-negative. Defaults to 50.0.
            image_transform: Callable applied to every loaded RGB image.
            semantic_transform: Callable applied to every loaded semantic mask.
            late_image_transform: Optional callable applied to the (H, W, C) numpy view of each
                image right before ``__getitem__`` returns. Defaults to None.
            load_soc (bool): Whether to load scene object context for each element in the dataset.
                Defaults to False.
            top_k_soc (int): The maximum number of objects to consider in scene object context.
                Defaults to 5.
            soc_coords_type (Literal["cylindrical_3d", "cylindrical_2d", "euclidean", "spherical"]):
                The type of coordinates to use for scene object context.
                Defaults to "cylindrical_3d".
            max_distance_soc (float): The maximum distance between origin and object to consider
                in scene object context. Defaults to 50.0.
            sensors_cfg (OmegaConf): OmegaConf configuration for sensors. Required when
                ``load_soc=True``. Defaults to None.
            anno (OmegaConf): OmegaConf configuration for annotations. Required when
                ``load_soc=True``. Defaults to None.
            train_split (list): List of train split floor names. Defaults to None.
            test_split (list): List of test split floor names. Defaults to None.

        Raises:
            FileNotFoundError: If dataset_root doesn't exist.
            FileNotFoundError: If there is no csv file for given subset (track).
            ValueError: If ``indoor`` is set and subset is not one of "train", "val" or "test",
                or the floor split required by the subset is missing.
            ValueError: If soc_coords_type is unknown.
            ValueError: If ``load_soc=True`` but sensors_cfg or anno is missing.
            ValueError: If a distance threshold is negative.
        """
        super().__init__()

        self.dataset_root = Path(dataset_root)
        if not self.dataset_root.exists():
            raise FileNotFoundError(f"Given dataset_root={self.dataset_root} doesn't exist")

        self.subset = subset
        subset_csv = self.dataset_root / csv_file
        self.dataset_df = pd.read_csv(subset_csv)

        if indoor and self.subset is not None:
            if subset == "train":
                split_name, floors = "train_split", train_split
            elif subset in ("test", "val"):
                split_name, floors = "test_split", test_split
            else:
                raise ValueError(f"Unknown subset: {subset!r}")
            if floors is None:
                raise ValueError(f"{split_name} must be specified for indoor subset {subset!r}")
            # Non-inplace reset keeps the old row labels in an "index" column
            # (same result as before) without mutating a filtered slice.
            self.dataset_df = self.dataset_df[self.dataset_df["floor"].isin(floors)].reset_index()

        if self.subset == "test":
            self.dataset_df["in_query"] = True

        if isinstance(sensors, str):
            # A bare string is ONE sensor name; tuple("lidar") would split it
            # into single characters.
            sensors = (sensors,)
        self.sensors = sensors

        self.late_image_transform = late_image_transform
        self._pointcloud_quantization_size = mink_quantization_size
        self._max_point_distance = max_point_distance
        self.load_semantics = load_semantics

        self.load_soc = load_soc
        self.top_k_soc = top_k_soc
        self.soc_coords_type = soc_coords_type
        if self.soc_coords_type not in ("cylindrical_3d", "cylindrical_2d", "euclidean", "spherical"):
            raise ValueError(f"Unknown soc_coords_type: {soc_coords_type!r}")
        self.max_distance_soc = max_distance_soc

        self.anno = anno
        if anno:
            self.special_classes = [
                self.anno.staff_classes.index(special) for special in self.anno.special_classes
            ]

        if self.load_soc:
            if sensors_cfg is None:
                raise ValueError("sensors_cfg must be specified if load_soc=True")
            if not anno:
                # `_get_soc` reads `self.special_classes`, which only exists with `anno`.
                raise ValueError("anno must be specified if load_soc=True")
            self.front_cam_proj = Projector(sensors_cfg.front_cam, sensors_cfg.lidar)
            self.back_cam_proj = Projector(sensors_cfg.back_cam, sensors_cfg.lidar)

        self.load_text_descriptions = load_text_descriptions
        if self.load_text_descriptions:
            self._read_camera_tables("text_descriptions_df", self.text_descriptions_subdir, "text.csv")

        self.load_text_labels = load_text_labels
        if self.load_text_labels:
            self._read_camera_tables("text_labels_df", self.text_labels_subdir, "text_labels.csv")

        self.load_aruco_labels = load_aruco_labels
        if self.load_aruco_labels:
            self._read_camera_tables(
                "aruco_labels_df", self.aruco_labels_subdir, "aruco_labels.csv", sep="\t"
            )

        self.indoor = indoor

        if positive_threshold < 0.0:
            raise ValueError(f"positive_threshold must be non-negative, but {positive_threshold!r} given.")
        if negative_threshold < 0.0:
            raise ValueError(f"negative_threshold must be non-negative, but {negative_threshold!r} given.")

        # The pairwise distance matrix is O(N^2); compute it once and share it.
        distances = self._pairwise_distances()
        self._positives_index, self._nonnegative_index = self._build_indexes(
            positive_threshold, negative_threshold, distances=distances
        )
        self._positives_mask, self._negatives_mask = self._build_masks(
            positive_threshold, negative_threshold, distances=distances
        )

        self.image_transform = image_transform
        self.semantic_transform = semantic_transform
        self.pointcloud_transform = DefaultCloudTransform(train=False)
        self.pointcloud_set_transform = DefaultCloudSetTransform(train=False)

        # Semantic class ids treated as dynamic for indoor / outdoor masks.
        self._ade20k_dynamic_idx = [12]
        self._mapillary_dynamic_idx = [19, 55]  # human, car
        self.exclude_dynamic_classes = exclude_dynamic_classes

        # Hard-coded 4x4 lidar -> camera transforms, applied to homogeneous
        # lidar points in `_remove_dynamic_points`.
        self.lidar2front = np.array(
            [
                [0.01509615, -0.99976457, -0.01558544, 0.04632156],
                [0.00871086, 0.01571812, -0.99983852, -0.13278588],
                [0.9998481, 0.01495794, 0.0089461, -0.06092749],
                [0.0, 0.0, 0.0, 1.0],
            ]
        )
        self.lidar2back = np.array(
            [
                [-1.50409674e-02, 9.99886421e-01, 9.55906151e-04, 1.82703304e-02],
                [-1.30440106e-02, 7.59716299e-04, -9.99914635e-01, -1.41787545e-01],
                [-9.99801792e-01, -1.50521522e-02, 1.30311022e-02, -6.72336358e-02],
                [0.00000000e00, 0.00000000e00, 0.00000000e00, 1.00000000e00],
            ]
        )
        # Hard-coded 3x3 camera matrices and distortion coefficients passed to
        # `cv2.projectPoints`.
        self.front_matrix = np.array(
            [
                [683.6199340820312, 0.0, 615.1160278320312],
                [0.0, 683.6199340820312, 345.32354736328125],
                [0.0, 0.0, 1.0],
            ]
        )
        self.front_dist = np.array([0.0, 0.0, 0.0, 0.0, 0.0])
        self.back_matrix = np.array(
            [
                [910.4178466796875, 0.0, 648.44140625],
                [0.0, 910.4166870117188, 354.0118408203125],
                [0.0, 0.0, 1.0],
            ]
        )
        self.back_dist = np.array([0.0, 0.0, 0.0, 0.0, 0.0])

    def _read_camera_tables(
        self, attr_suffix: str, subdir: str, file_suffix: str, **read_csv_kwargs
    ) -> None:
        """Read ``<cam>_<file_suffix>`` from ``subdir`` into ``self.<cam>_<attr_suffix>`` per enabled camera."""
        for cam in self._camera_names:
            if cam in self.sensors:
                table = pd.read_csv(
                    self.dataset_root / subdir / f"{cam}_{file_suffix}", **read_csv_kwargs
                )
                setattr(self, f"{cam}_{attr_suffix}", table)

    def _load_image(
        self, cam: str, idx: int, track: str, floor: str, transform: bool = True
    ) -> Union[Tensor, np.ndarray]:
        """Load the RGB image of camera ``cam`` for element ``idx``.

        Args:
            cam (str): Camera name; selects the ``<cam>_ts`` column and the image directory.
            idx (int): Positional row index in the dataset table.
            track (str): Track sub-directory (may be empty).
            floor (str): Floor sub-directory (may be empty).
            transform (bool): Whether to apply ``self.image_transform``. Defaults to True.

        Returns:
            Union[Tensor, np.ndarray]: The output of ``self.image_transform`` if ``transform``
                is set, otherwise the raw RGB array.

        Raises:
            FileNotFoundError: If the image file is missing or cannot be decoded.
        """
        image_ts = int(self.dataset_df[f"{cam}_ts"].iloc[idx])
        im_filepath = self.dataset_root / track / floor / self.images_subdir / cam / f"{image_ts}.png"
        im = cv2.imread(str(im_filepath))
        if im is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {im_filepath}")
        im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
        if transform:
            im = self.image_transform(im)
        return im

    def _load_semantic_mask(
        self, cam: str, idx: int, track: str, floor: str, transform: bool = True
    ) -> Union[Tensor, np.ndarray]:
        """Load the semantic mask of camera ``cam`` for element ``idx``.

        Args:
            cam (str): Camera name; selects the ``<cam>_ts`` column and the mask directory.
            idx (int): Positional row index in the dataset table.
            track (str): Track sub-directory (may be empty).
            floor (str): Floor sub-directory (may be empty).
            transform (bool): Whether to apply ``self.semantic_transform``. Defaults to True.

        Returns:
            Union[Tensor, np.ndarray]: The output of ``self.semantic_transform`` if ``transform``
                is set, otherwise the mask exactly as stored on disk.

        Raises:
            FileNotFoundError: If the mask file is missing or cannot be decoded.
        """
        mask_ts = int(self.dataset_df[f"{cam}_ts"].iloc[idx])
        im_filepath = self.dataset_root / track / floor / self.semantic_subdir / cam / f"{mask_ts}.png"
        im = cv2.imread(str(im_filepath), cv2.IMREAD_UNCHANGED)
        if im is None:
            raise FileNotFoundError(f"Could not read semantic mask: {im_filepath}")
        if transform:
            im = self.semantic_transform(im)
        return im

    def _select_image_rows(self, cam: str, idx: int, attr_suffix: str, column: str) -> DataFrame:
        """Return rows of ``self.<cam>_<attr_suffix>`` whose ``column`` equals ``"<timestamp>.png"``."""
        image_ts = int(self.dataset_df[f"{cam}_ts"].iloc[idx])
        if cam not in self._camera_names:
            raise ValueError(f"Unknown camera: {cam!r}")
        table = getattr(self, f"{cam}_{attr_suffix}")
        return table[table[column] == f"{image_ts}.png"]

    def _load_text_labels(self, cam: str, idx: int) -> DataFrame:
        """Return the detected-text rows for the image of ``cam`` at element ``idx``."""
        return self._select_image_rows(cam, idx, "text_labels_df", "path")

    def _load_text_descriptions(self, cam: str, idx: int) -> DataFrame:
        """Return the text-description rows for the image of ``cam`` at element ``idx``."""
        return self._select_image_rows(cam, idx, "text_descriptions_df", "path")

    def _load_aruco_labels(self, cam: str, idx: int) -> DataFrame:
        """Return the ArUco-label rows for the image of ``cam`` at element ``idx``."""
        return self._select_image_rows(cam, idx, "aruco_labels_df", "image_name")

    def _get_soc(self, idx: int, track: str, floor: str) -> Tensor:
        """Build the scene object context tensor for element ``idx``.

        Loads the untransformed front/back semantic masks and the lidar scan, projects
        the scan into both cameras, labels the projected points from the masks, extracts
        objects with the ``soc_utils`` helpers and converts the packed object coordinates
        to ``self.soc_coords_type``.

        Args:
            idx (int): Positional row index in the dataset table.
            track (str): Track sub-directory (may be empty).
            floor (str): Floor sub-directory (may be empty).

        Returns:
            Tensor: Float tensor with the packed object coordinates.

        Raises:
            ValueError: If ``self.soc_coords_type`` is unknown.
        """
        mask_front = self._load_semantic_mask("front_cam", idx, track, floor, transform=False)
        mask_back = self._load_semantic_mask("back_cam", idx, track, floor, transform=False)
        lidar_scan = self._load_pc(idx, track, floor, tensor=False)

        coords_front, _, in_image_front = self.front_cam_proj(lidar_scan)
        coords_back, _, in_image_back = self.back_cam_proj(lidar_scan)

        point_labels = np.zeros(len(lidar_scan), dtype=np.uint8)
        point_labels[in_image_front] = get_points_labels_by_mask(coords_front, mask_front)
        point_labels[in_image_back] = get_points_labels_by_mask(coords_back, mask_back)

        instances_front = semantic_mask_to_instances(
            mask_front,
            area_threshold=10,
            labels_whitelist=self.special_classes,
        )
        instances_back = semantic_mask_to_instances(
            mask_back,
            area_threshold=10,
            labels_whitelist=self.special_classes,
        )

        objects_front = instance_masks_to_objects(
            instances_front,
            coords_front,
            point_labels[in_image_front],
            lidar_scan[in_image_front],
        )
        objects_back = instance_masks_to_objects(
            instances_back,
            coords_back,
            point_labels[in_image_back],
            lidar_scan[in_image_back],
        )

        # On equal keys the back-camera entry replaces the front-camera one.
        objects = {**objects_front, **objects_back}
        packed_objects = pack_objects(objects, self.top_k_soc, self.max_distance_soc, self.special_classes)

        if self.soc_coords_type == "cylindrical_3d":
            packed_objects = np.concatenate(
                (
                    np.linalg.norm(packed_objects, axis=-1, keepdims=True),
                    np.arctan2(packed_objects[..., 1], packed_objects[..., 0])[..., None],
                    packed_objects[..., 2:],
                ),
                axis=-1,
            )
            if self.subset == "train":
                packed_objects = self.augment_coords_with_rotation(
                    packed_objects, angle_range=(-np.pi, np.pi)
                )
                packed_objects = self.augment_coords_with_normal(packed_objects, std=(0.2, 0.2, 0.2))
        elif self.soc_coords_type == "cylindrical_2d":
            packed_objects = np.concatenate(
                (
                    np.linalg.norm(packed_objects[..., :2], axis=-1, keepdims=True),
                    np.arctan2(packed_objects[..., 1], packed_objects[..., 0])[..., None],
                    packed_objects[..., 2:],
                ),
                axis=-1,
            )
        elif self.soc_coords_type == "euclidean":
            pass
        elif self.soc_coords_type == "spherical":
            radius = np.linalg.norm(packed_objects, axis=-1, keepdims=True)
            # Slice with 2:3 so z keeps its trailing axis and divides element-wise
            # by `radius`. Zero-length vectors get cos = 1 (polar angle 0) rather
            # than NaN from 0 / 0.
            cos_polar = np.divide(
                packed_objects[..., 2:3],
                radius,
                out=np.ones_like(radius),
                where=radius > 0,
            )
            packed_objects = np.concatenate(
                (
                    radius,
                    np.arccos(np.clip(cos_polar, -1.0, 1.0)),
                    np.arctan2(packed_objects[..., 1], packed_objects[..., 0])[..., None],
                ),
                axis=-1,
            )
        else:
            raise ValueError(f"Unknown soc_coords_type: {self.soc_coords_type!r}")

        objects_tensor = torch.from_numpy(packed_objects).float()
        return objects_tensor

    def augment_coords_with_rotation(
        self, coords: np.ndarray, angle_range: Tuple = (-np.pi, np.pi)
    ) -> np.ndarray:
        """Augment the coordinates with a random rotation.

        All objects are rotated by the same, random uniformly distributed angle.
        ``coords`` is modified in place and also returned.

        Args:
            coords (np.ndarray): The coordinates to be augmented; index 1 of the last
                axis is treated as the angle.
            angle_range (Tuple, optional): The range of the random rotation angle.
                Defaults to (-np.pi, np.pi).

        Returns:
            np.ndarray: The augmented coordinates (the same array object as ``coords``).
        """
        random_angle = np.random.uniform(low=angle_range[0], high=angle_range[1])
        # Shift every angle by the same offset and wrap back into [-pi, pi).
        coords[:, :, 1] = (coords[:, :, 1] + random_angle + np.pi) % (2 * np.pi) - np.pi
        return coords

    def augment_coords_with_normal(
        self,
        coords: np.ndarray,
        mean: Tuple[float, float, float] = (0.0, 0.0, 0.0),
        std: Tuple[float, float, float] = (1.0, 1.0, 1.0),
    ) -> np.ndarray:
        """Augment the coordinates with additive normally distributed noise.

        ``coords`` is modified in place and also returned. After the noise is added,
        component 0 is clamped to be non-negative and component 1 is wrapped to [-pi, pi).

        Args:
            coords (np.ndarray): The coordinates to be augmented.
            mean (Tuple[float, float, float], optional): The mean of the normal distribution
                per component. Defaults to (0.0, 0.0, 0.0).
            std (Tuple[float, float, float], optional): The standard deviation of the normal
                distribution per component. Defaults to (1.0, 1.0, 1.0).

        Returns:
            np.ndarray: The augmented coordinates (the same array object as ``coords``).
        """
        n_rows, n_objects = coords.shape[:2]
        for component, (m, s) in enumerate(zip(mean, std)):
            coords[:, :, component] += np.random.normal(m, s, size=(n_rows, n_objects))
        coords[:, :, 0] = np.maximum(coords[:, :, 0], 0)
        coords[:, :, 1] = (coords[:, :, 1] + np.pi) % (2 * np.pi) - np.pi
        return coords

    def _dynamic_class_ids(self) -> List[int]:
        """Return the dynamic semantic class ids for the indoor or outdoor masks."""
        return self._ade20k_dynamic_idx if self.indoor else self._mapillary_dynamic_idx

    def _add_camera_data(self, data: dict, cam: str, idx: int, track: str, floor: str) -> None:
        """Load everything requested for camera ``cam`` and store it in ``data``."""
        image_key = f"image_{cam}"
        mask_key = f"mask_{cam}"

        data[image_key] = self._load_image(cam, idx, track, floor)
        if self.load_semantics:
            data[mask_key] = self._load_semantic_mask(cam, idx, track, floor)
            if self.exclude_dynamic_classes:
                # Zero out pixels whose mask value is a dynamic class.
                for class_id in self._dynamic_class_ids():
                    data[image_key] = torch.where(data[mask_key] == class_id, 0, data[image_key])
        if self.load_text_labels:
            data[f"text_labels_{cam}_df"] = self._load_text_labels(cam, idx)
        if self.load_text_descriptions:
            data[f"text_description_{cam}_df"] = self._load_text_descriptions(cam, idx)
        if self.load_aruco_labels:
            data[f"aruco_labels_{cam}_df"] = self._load_aruco_labels(cam, idx)

    def __getitem__(self, idx: int) -> Dict[str, Union[int, Tensor]]:
        """Load the dataset element at positional index ``idx``.

        Args:
            idx (int): Positional row index in the dataset table.

        Returns:
            Dict[str, Union[int, Tensor]]: Dictionary with "idx" and "pose" plus, depending
                on the configuration, "image_<cam>", "mask_<cam>", "text_labels_<cam>_df",
                "text_description_<cam>_df", "aruco_labels_<cam>_df",
                "pointcloud_lidar_coords", "pointcloud_lidar_feats" and "soc".
        """
        data: Dict[str, Union[int, Tensor]] = {"idx": torch.tensor(idx)}
        data["pose"] = torch.tensor(
            self.dataset_df.iloc[idx][["tx", "ty", "tz", "qx", "qy", "qz", "qw"]].to_numpy(dtype=np.float32)
        )

        floor = self._get_floor_subdir(idx)
        track = self._get_track_subdir(idx)

        for cam in self._camera_names:
            if cam in self.sensors:
                self._add_camera_data(data, cam, idx, track, floor)

        if "lidar" in self.sensors:
            if self.exclude_dynamic_classes:
                # Filter in NumPy, then convert once. The masks read here are only
                # present in `data` when load_semantics=True.
                pc = self._load_pc(idx, track, floor, tensor=False)
                dynamic_classes = self._dynamic_class_ids()
                if "back_cam" in self.sensors:
                    pc = self._remove_dynamic_points(
                        pc,
                        data["mask_back_cam"].numpy().transpose(1, 2, 0),
                        dynamic_classes,
                        self.lidar2back,
                        self.back_matrix,
                        self.back_dist,
                    )
                if "front_cam" in self.sensors:
                    pc = self._remove_dynamic_points(
                        pc,
                        data["mask_front_cam"].numpy().transpose(1, 2, 0),
                        dynamic_classes,
                        self.lidar2front,
                        self.front_matrix,
                        self.front_dist,
                    )
                # torch.from_numpy() has no dtype argument; as_tensor does.
                pc = torch.as_tensor(pc, dtype=torch.float32)
            else:
                pc = self._load_pc(idx, track, floor, tensor=True)
            data["pointcloud_lidar_coords"] = pc
            data["pointcloud_lidar_feats"] = torch.ones_like(pc[:, :1])

        if self.load_soc:
            data["soc"] = self._get_soc(idx, track, floor)

        for cam in ["back_cam", "front_cam"]:
            if cam in self.sensors and self.late_image_transform:
                data[f"image_{cam}"] = self.late_image_transform(
                    data[f"image_{cam}"].permute((1, 2, 0)).numpy()
                )

        return data

    def _remove_dynamic_points(
        self,
        pointcloud: np.ndarray,
        semantic_map: np.ndarray,
        dynamic_classes: list,
        lidar2sensor: np.ndarray,
        sensor_intrinsics: np.ndarray,
        sensor_dist: np.ndarray,
    ) -> np.ndarray:
        """Drop lidar points that project onto dynamic-class pixels of a semantic map.

        Args:
            pointcloud (np.ndarray): Point coordinates, one (x, y, z) row per point.
            semantic_map (np.ndarray): Semantic mask indexed as ``[row, column]``.
            dynamic_classes (list): Class ids whose points are removed.
            lidar2sensor (np.ndarray): 4x4 transform applied to homogeneous lidar points.
            sensor_intrinsics (np.ndarray): Camera matrix passed to ``cv2.projectPoints``.
            sensor_dist (np.ndarray): Distortion coefficients passed to ``cv2.projectPoints``.

        Returns:
            np.ndarray: The point cloud without the dynamic points; the input itself if the
                map contains none of the dynamic classes.
        """
        dynamic_ids = np.asarray(dynamic_classes)
        if not np.isin(semantic_map, dynamic_ids).any():
            # Nothing dynamic in this mask - skip the projection entirely.
            return pointcloud

        xyz = np.asarray(pointcloud, dtype=np.float64)
        homogeneous = np.concatenate([xyz, np.ones((xyz.shape[0], 1))], axis=1)
        camera_xyz = np.ascontiguousarray((lidar2sensor @ homogeneous.T).T[:, :3])

        # Points are already in the camera frame, hence zero rotation/translation.
        pixels, _ = cv2.projectPoints(
            camera_xyz,
            np.zeros((3, 1), np.float32),
            np.zeros((3, 1), np.float32),
            sensor_intrinsics,
            sensor_dist,
        )
        pixels = pixels[:, 0, :]
        u, v = pixels[:, 0], pixels[:, 1]

        # NOTE(review): the bounds are fixed pixel sizes; if `semantic_map` was
        # resized by the semantic transform, the projected pixels are not
        # rescaled to it - confirm the expected mask resolution.
        width, height = self._projection_image_size
        visible = (
            ~np.isnan(u)
            & ~np.isnan(v)
            & (u >= 0)
            & (u < width)
            & (v >= 0)
            & (v < height)
            & (camera_xyz[:, 2] > 0)  # keep only points in front of the camera
        )
        visible_idx = np.flatnonzero(visible)

        labels = semantic_map[
            np.floor(v[visible_idx]).astype(int),
            np.floor(u[visible_idx]).astype(int),
        ]
        dynamic_rows = np.nonzero(np.isin(labels, dynamic_ids))[0]

        keep = np.ones(xyz.shape[0], dtype=bool)
        keep[visible_idx[dynamic_rows]] = False
        return pointcloud[keep]

    def __len__(self) -> int:
        """Return the number of rows in the dataset table."""
        return len(self.dataset_df)

    def _get_floor_subdir(self, idx: int) -> str:
        """Return ``"floor_<value>"`` for element ``idx``, or "" if there is no "floor" column."""
        if "floor" in self.dataset_df.columns:
            return f"floor_{self.dataset_df['floor'].iloc[idx]}"
        return ""

    def _get_track_subdir(self, idx: int) -> str:
        """Return the "track" value for element ``idx``, or "" if there is no "track" column."""
        if "track" in self.dataset_df.columns:
            return self.dataset_df["track"].iloc[idx]
        return ""

    def _load_pc(
        self, idx: int, track: str, floor: str, tensor: bool = True
    ) -> Union[Tensor, np.ndarray]:
        """Load the lidar point cloud for element ``idx``.

        The ``.bin`` file is read as float32 values in rows of four, of which the last
        column is dropped. Points with any coordinate outside [-100, 100] are removed,
        as are points at or beyond ``max_point_distance`` when it is set.

        Args:
            idx (int): Positional row index in the dataset table.
            track (str): Track sub-directory (may be empty).
            floor (str): Floor sub-directory (may be empty).
            tensor (bool): Return a float32 ``Tensor`` instead of a numpy array.
                Defaults to True.

        Returns:
            Union[Tensor, np.ndarray]: Point coordinates, one (x, y, z) row per point.
        """
        lidar_ts = int(self.dataset_df["lidar_ts"].iloc[idx])
        filepath = self.dataset_root / track / floor / self.clouds_subdir / f"{lidar_ts}.bin"
        pc = np.fromfile(filepath, dtype=np.float32).reshape((-1, 4))[:, :-1]
        in_range_idx = np.all(
            np.logical_and(-100 <= pc, pc <= 100),  # select points in range [-100, 100] meters
            axis=1,
        )
        pc = pc[in_range_idx]
        if self._max_point_distance is not None:
            pc = pc[np.linalg.norm(pc, axis=1) < self._max_point_distance]
        if tensor:
            pc = torch.tensor(pc, dtype=torch.float32)
        return pc

    def _collate_data_dict(self, data_list: List[Dict[str, Tensor]]) -> Dict[str, Tensor]:
        """Stack a list of ``__getitem__`` outputs into one batch dictionary.

        Args:
            data_list (List[Dict[str, Tensor]]): Elements produced by ``__getitem__``.

        Returns:
            Dict[str, Tensor]: Batch with "idxs", "poses", "images_<cam>", "masks_<cam>",
                "soc", "pointclouds_lidar_coords" and "pointclouds_lidar_feats"
                (only the keys present in the elements).

        Raises:
            ValueError: If an element contains a key that is not handled here.
        """
        # NOTE(review): the "*_df" keys emitted when text / aruco loading is
        # enabled are not handled below and therefore raise ValueError.
        result: Dict[str, Tensor] = {}
        result["idxs"] = torch.stack([e["idx"] for e in data_list], dim=0)

        for data_key in data_list[0].keys():
            if data_key in ("idx", "pointcloud_lidar_feats"):
                # "idx" is stacked above; feats are handled together with coords.
                continue
            elif data_key == "pose":
                result["poses"] = torch.stack([e["pose"] for e in data_list], dim=0)
            elif data_key.startswith("image_"):
                result[f"images_{data_key[6:]}"] = torch.stack([e[data_key] for e in data_list])
            elif data_key.startswith("mask_"):
                result[f"masks_{data_key[5:]}"] = torch.stack([e[data_key] for e in data_list])
            elif data_key == "soc":
                result["soc"] = torch.stack([e["soc"] for e in data_list], dim=0)
            elif data_key == "pointcloud_lidar_coords":
                coords_list = [e["pointcloud_lidar_coords"] for e in data_list]
                feats_list = [e["pointcloud_lidar_feats"] for e in data_list]
                n_points = [int(e.shape[0]) for e in coords_list]
                coords_tensor = torch.cat(coords_list, dim=0).unsqueeze(0)  # (1,batch_size*n_points,3)
                if self.pointcloud_set_transform is not None:
                    # Apply the same transformation on all dataset elements
                    coords_tensor = self.pointcloud_set_transform(coords_tensor)
                coords_list = torch.split(
                    coords_tensor.squeeze(0),
                    split_size_or_sections=n_points,
                    dim=0,
                )
                quantized_coords_list = []
                quantized_feats_list = []
                for coords, feats in zip(coords_list, feats_list):
                    quantized_coords, quantized_feats = ME.utils.sparse_quantize(
                        coordinates=coords,
                        features=feats,
                        quantization_size=self._pointcloud_quantization_size,
                    )
                    quantized_coords_list.append(quantized_coords)
                    quantized_feats_list.append(quantized_feats)

                result["pointclouds_lidar_coords"] = ME.utils.batched_coordinates(quantized_coords_list)
                result["pointclouds_lidar_feats"] = torch.cat(quantized_feats_list)
            else:
                raise ValueError(f"Unknown data key: {data_key!r}")

        return result

    def collate_fn(self, data_list: List[Dict[str, Tensor]]) -> Dict[str, Tensor]:
        """Pack input data list into batch.

        Args:
            data_list (List[Dict[str, Tensor]]): batch data list generated by DataLoader.

        Returns:
            Dict[str, Tensor]: dictionary of batched data.
        """
        return self._collate_data_dict(data_list)

    def _pairwise_distances(self) -> Tensor:
        """Return the matrix of pairwise distances between the (tx, ty, tz) positions of all rows."""
        xyz = torch.tensor(
            self.dataset_df[["tx", "ty", "tz"]].to_numpy(dtype=np.float32),
            dtype=torch.float32,
        )
        return torch.cdist(xyz, xyz)

    def _build_masks(
        self,
        positive_threshold: float,
        negative_threshold: float,
        distances: Optional[Tensor] = None,
    ) -> Tuple[Tensor, Tensor]:
        """Build boolean masks for dataset elements that satisfy a UTM distance threshold condition.

        Args:
            positive_threshold (float): The maximum UTM distance between two elements
                for them to be considered positive.
            negative_threshold (float): The maximum UTM distance between two elements
                for them to be considered non-negative.
            distances (Optional[Tensor]): Precomputed pairwise distance matrix. Computed from
                the dataset table when omitted. Defaults to None.

        Returns:
            Tuple[Tensor, Tensor]: Tuple (positives_mask, negatives_mask). An entry of the
                first mask is True when the distance is in (0, positive_threshold); an entry
                of the second mask is True when the distance exceeds negative_threshold.
        """
        if distances is None:
            distances = self._pairwise_distances()

        positives_mask = (distances > 0) & (distances < positive_threshold)
        negatives_mask = distances > negative_threshold

        return positives_mask, negatives_mask

    def _build_indexes(
        self,
        positive_threshold: float,
        negative_threshold: float,
        distances: Optional[Tensor] = None,
    ) -> Tuple[List[Tensor], List[Tensor]]:
        """Build index of elements that satisfy a UTM distance threshold condition.

        Args:
            positive_threshold (float): The maximum UTM distance between two elements
                for them to be considered positive.
            negative_threshold (float): The maximum UTM distance between two elements
                for them to be considered non-negative.
            distances (Optional[Tensor]): Precomputed pairwise distance matrix. Computed from
                the dataset table when omitted. Defaults to None.

        Returns:
            Tuple[List[Tensor], List[Tensor]]: Tuple (positive_indices, nonnegative_indices)
                of two lists holding, for each element in the dataset, the indexes of the
                elements that satisfy the respective distance condition.
        """
        if distances is None:
            distances = self._pairwise_distances()

        positives_mask = (distances > 0) & (distances < positive_threshold)
        nonnegatives_mask = distances < negative_threshold

        # Convert the boolean masks to index tensors
        positive_indices = [torch.nonzero(row).squeeze(dim=-1) for row in positives_mask]
        nonnegative_indices = [torch.nonzero(row).squeeze(dim=-1) for row in nonnegatives_mask]

        return positive_indices, nonnegative_indices

    @property
    def positives_index(self) -> List[Tensor]:
        """List of indexes of positive samples for each element in the dataset."""
        return self._positives_index

    @property
    def nonnegative_index(self) -> List[Tensor]:
        """List of indexes of non-negatives samples for each element in the dataset."""
        return self._nonnegative_index

    @property
    def positives_mask(self) -> Tensor:
        """Boolean mask of positive samples for each element in the dataset."""
        return self._positives_mask

    @property
    def negatives_mask(self) -> Tensor:
        """Boolean mask of negative samples for each element in the dataset."""
        return self._negatives_mask

    @staticmethod
    def download_data(out_dir: Union[Path, str]) -> None:
        """Download ITLP-Campus dataset tracks.

        Every track is saved as ``<track_name>.zip`` inside ``ITLP_Campus_outdoor`` or
        ``ITLP_Campus_indoor`` under ``out_dir``.

        Args:
            out_dir (Union[Path, str]): Output directory for downloaded tracks.
        """
        # Sub-directory name -> {track name: Google Drive file id}.
        track_groups = {
            "ITLP_Campus_outdoor": {
                "00_2023-02-10": "17HVoPmM7iR1f2Aj8H9GYzOqieCKwjh96",
                "01_2023-02-21": "1mezN1c8-3ylZrub9_lnGlJzipr90K63O",
                "02_2023-03-15": "1lKdW7ZfpaNLiIQtoJozoSqx397H7iwb1",
                "03_2023-04-11": "18t79U4IKxABTMYdSBOafwlUGlYvJcltx",
                "04_2023-04-13": "1KMTMU-oxXbBV8bmtAY1g8GsquGFksDcE",
            },
            "ITLP_Campus_indoor": {
                "00_2023-03-13": "1AFPKdMrXwPlcC50d1Y8DL4g11CbD31Q2",
            },
        }

        out_dir = Path(out_dir)
        if not out_dir.exists():
            print(f"Creating output directory: {out_dir}")
            out_dir.mkdir(parents=True)
        else:
            print(f"Will download in existing directory: {out_dir}")

        for group_dir_name, tracks in track_groups.items():
            group_dir = out_dir / group_dir_name
            group_dir.mkdir(exist_ok=True)
            for track_name, file_id in tracks.items():
                gdown.download(
                    f"https://drive.google.com/uc?export=download&confirm=pbef&id={file_id}",
                    output=str(group_dir / f"{track_name}.zip"),
                    quiet=False,
                    fuzzy=False,
                    use_cookies=False,
                )