"""NCLT dataset implementation."""
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
import cv2
import numpy as np
import open3d as o3d
import torch
from omegaconf import OmegaConf
from torch import Tensor
from opr.datasets.base import BasePlaceRecognitionDataset
from opr.datasets.projection import NCLTProjector
from opr.datasets.soc_utils import (
get_points_labels_by_mask,
instance_masks_to_objects,
pack_objects,
semantic_mask_to_instances,
)
from opr.optional_deps import lazy
from opr.utils import cartesian_to_spherical
# Lazy-load MinkowskiEngine - will return real module or helpful stub
ME = lazy("MinkowskiEngine", feature="sparse convolutions")
[docs]
class NCLTDataset(BasePlaceRecognitionDataset):
"""NCLT dataset implementation."""
_images_dirname: str
_masks_dirname: str
_pointclouds_dirname: str
_pointcloud_quantization_size: Optional[Union[float, Tuple[float, float, float]]]
_max_point_distance: Optional[float]
_spherical_coords: bool
_use_intensity_values: bool
_valid_data: Tuple[str, ...] = (
"image_Cam0",
"image_Cam1",
"image_Cam2",
"image_Cam3",
"image_Cam4",
"image_Cam5",
"pointcloud_lidar",
"mask_Cam0",
"mask_Cam1",
"mask_Cam2",
"mask_Cam3",
"mask_Cam4",
"mask_Cam5",
# TODO: add text embeddings data
)
def __init__(
self,
dataset_root: Union[str, Path],
subset: Literal["train", "val", "test"],
data_to_load: Union[str, Tuple[str, ...]],
positive_threshold: float = 10.0,
negative_threshold: float = 50.0,
images_dirname: str = "images_small",
masks_dirname: str = "segmentation_masks_small",
pointclouds_dirname: str = "velodyne_data",
use_minkowski: bool = True,
pointcloud_quantization_size: Optional[Union[float, Tuple[float, float, float]]] = 0.5,
max_point_distance: Optional[float] = None,
normalize_point_cloud: bool = False,
num_points_sample: int | None = None,
spherical_coords: bool = False,
use_intensity_values: bool = False,
image_transform: Optional[Any] = None,
semantic_transform: Optional[Any] = None,
pointcloud_transform: Optional[Any] = None,
pointcloud_set_transform: Optional[Any] = None,
load_soc: bool = False,
top_k_soc: int = 10,
soc_coords_type: Literal["cylindrical_3d", "cylindrical_2d", "euclidean", "spherical"] = "euclidean",
max_distance_soc: float = 50.0,
anno: OmegaConf = None,
exclude_dynamic: bool = False,
dynamic_labels: Optional[list] = None,
) -> None:
"""NCLT dataset implementation.
Args:
dataset_root (Union[str, Path]): Path to the dataset root directory.
subset (Literal["train", "val", "test"]): Current subset to load. Defaults to "train".
data_to_load (Union[str, Tuple[str, ...]]): The list of data to load.
Check the documentation for the list of available data.
positive_threshold (float): The UTM distance threshold value for positive samples.
Defaults to 10.0.
negative_threshold (float): The UTM distance threshold value for negative samples.
Defaults to 50.0.
images_dirname (str): Images directory name. It should be specified explicitly
if custom preprocessing was done. Defaults to "images".
masks_dirname (str): Masks directory name. It should be specified explicitly
if custom preprocessing was done. Defaults to "segmentation_masks".
pointclouds_dirname (str): Point clouds directory name. It should be specified
explicitly if custom preprocessing was done. Defaults to "velodyne_data".
use_minkowski (bool): Whether to use MinkowskiEngine to collate point clouds in batches.
Defaults to True.
pointcloud_quantization_size (float, optional): The quantization size for point clouds.
Defaults to 0.01.
max_point_distance (float, optional): The maximum distance of points from the origin.
Defaults to None.
normalize_point_cloud (bool): Whether to normalize point clouds by max_point_distance.
Defaults to False.
num_points_sample (int, optional): The number of points to sample from the point cloud.
Defaults to None, which means no sampling.
spherical_coords (bool): Whether to use spherical coordinates for point clouds.
Defaults to False.
use_intensity_values (bool): Whether to use intensity values for point clouds. Defaults to False.
image_transform (Any, optional): Images transform. If None, DefaultImageTransform will be used.
Defaults to None.
semantic_transform (Any, optional): Semantic masks transform. If None, DefaultSemanticTransform
will be used. Defaults to None.
pointcloud_transform (Any, optional): Point clouds transform. If None, DefaultCloudTransform
will be used. Defaults to None.
pointcloud_set_transform (Any, optional): Point clouds set transform. If None,
DefaultCloudSetTransform will be used. Defaults to None.
load_soc (bool): Whether to load SOC (Semantic Objects in Context) data. Defaults to False.
top_k_soc (int): The number of objects to keep in SOC data. Defaults to 10.
soc_coords_type (Literal["cylindrical_3d", "cylindrical_2d", "euclidean", "spherical"]): The type
of coordinates to use in SOC data. Defaults to "euclidean".
max_distance_soc (float): The maximum distance of objects in SOC data. Defaults to 50.0.
anno (OmegaConf): The annotation configuration. Defaults to None.
exclude_dynamic (bool): Whether to exclude dynamic objects from the point cloud. Defaults to False.
dynamic_labels (Optional[list]): The list of dynamic labels. Defaults to None.
Raises:
ValueError: If data_to_load contains invalid data source names.
FileNotFoundError: If images, masks or pointclouds directory does not exist.
ValueError: If num_points_sample is not specified and MinkowskiEngine is not used.
ValueError: If max_point_distance is not specified and normalize_point_cloud is set to True.
"""
# TODO: ^ docstring is also not DRY -> it is almost the same as in Oxford dataset
super().__init__(
dataset_root,
subset,
data_to_load,
positive_threshold,
negative_threshold,
image_transform,
semantic_transform,
pointcloud_transform,
pointcloud_set_transform,
)
if subset == "test":
self.dataset_df["in_query"] = True # for compatibility with Oxford Dataset
if any(elem not in self._valid_data for elem in self.data_to_load):
raise ValueError(f"Invalid data_to_load argument. Valid data list: {self._valid_data!r}")
_track_name = self.dataset_df.iloc[0]["track"]
if any(elem.startswith("image") for elem in self.data_to_load):
self._images_dirname = images_dirname
if not (self.dataset_root / _track_name / self._images_dirname).exists():
raise FileNotFoundError(f"Images directory {self._images_dirname!r} does not exist.")
if any(elem.startswith("mask") for elem in self.data_to_load):
self._masks_dirname = masks_dirname
if not (self.dataset_root / _track_name / self._masks_dirname).exists():
raise FileNotFoundError(f"Masks directory {self._masks_dirname!r} does not exist.")
if "pointcloud_lidar" in self.data_to_load:
self._pointclouds_dirname = pointclouds_dirname
if not (self.dataset_root / _track_name / self._pointclouds_dirname).exists():
raise FileNotFoundError(
f"Pointclouds directory {self._pointclouds_dirname!r} does not exist."
)
self._use_minkowski = use_minkowski
self._num_points_sample = num_points_sample
if self._num_points_sample is None and not self._use_minkowski:
raise ValueError(
"num_points_sample must be specified if MinkowskiEngine is not used to collate data in batch."
)
self._pointcloud_quantization_size = pointcloud_quantization_size
self._max_point_distance = max_point_distance
self._normalize_point_cloud = normalize_point_cloud
if not self._max_point_distance and self._normalize_point_cloud:
raise ValueError("max_point_distance must be specified if normalize_point_cloud is set to True.")
self._spherical_coords = spherical_coords
self._use_intensity_values = use_intensity_values
self.load_soc = load_soc
self.front_cam_proj = NCLTProjector(front=True)
self.back_cam_proj = NCLTProjector(front=False)
self.top_k_soc = top_k_soc
self.max_distance_soc = max_distance_soc
self.soc_coords_type = soc_coords_type
if self.soc_coords_type not in ("cylindrical_3d", "cylindrical_2d", "euclidean", "spherical"):
raise ValueError(f"Unknown soc_coords_type: {soc_coords_type!r}")
self.anno = anno
if anno:
self.special_classes = [
self.anno.staff_classes.index(special) for special in self.anno.special_classes
]
self.exclude_dynamic = exclude_dynamic
self.dynamic_labels = dynamic_labels
# TODO: apply DRY principle -> this is almost the same as in Oxford dataset
def __getitem__(self, idx: int) -> Dict[str, Tensor]: # noqa: D105
row = self.dataset_df.iloc[idx]
data = {"idx": torch.tensor(idx, dtype=int)}
data["utm"] = torch.tensor(row[["northing", "easting"]].to_numpy(dtype=np.float64))
track_dir = self.dataset_root / str(row["track"])
for data_source in self.data_to_load:
if data_source.startswith("image_"):
cam_name = data_source[6:] # remove "image_" prefix
image_ts = int(row["image"])
im_filepath = track_dir / self._images_dirname / f"{cam_name}" / f"{image_ts}.png"
im = cv2.imread(str(im_filepath))
im = cv2.cvtColor(im, cv2.COLOR_BGR2RGB)
if self.exclude_dynamic:
im = self._mask_dynamic_pixels(im, cam_name, self.dynamic_labels, idx)
im = self.image_transform(im)
data[data_source] = im
elif data_source.startswith("mask_"):
cam_name = data_source[5:] # remove "mask_" prefix
image_ts = int(row["image"])
mask_filepath = track_dir / self._masks_dirname / f"{cam_name}" / f"{image_ts}.png"
mask = cv2.imread(str(mask_filepath), cv2.IMREAD_UNCHANGED)
mask = self.semantic_transform(mask)
data[data_source] = mask
elif data_source == "pointcloud_lidar":
pc_filepath = track_dir / self._pointclouds_dirname / f"{row['pointcloud']}.bin"
pointcloud = self._load_pc(pc_filepath)
if self.exclude_dynamic:
pointcloud = self._mask_dynamic_points(pointcloud, self.dynamic_labels, idx)
data[f"{data_source}_coords"] = self.pointcloud_transform(pointcloud[:, :3])
if self._use_intensity_values:
data[f"{data_source}_feats"] = pointcloud[:, 3].unsqueeze(1)
else:
data[f"{data_source}_feats"] = torch.ones_like(pointcloud[:, :1])
if self.load_soc:
soc = self._get_soc(idx)
data["soc"] = soc
return data
def _load_pc(self, filepath: Union[str, Path], torch_tensor: bool = True) -> Tensor:
if self._use_intensity_values:
raise NotImplementedError("Intensity values are not supported yet.")
pc = np.fromfile(filepath, dtype=np.float32).reshape(-1, 3) # TODO: preprocess pointclouds properly
if self._max_point_distance is not None:
pc = pc[np.linalg.norm(pc, axis=1) < self._max_point_distance]
if self._normalize_point_cloud:
pc = pc / self._max_point_distance
if self._spherical_coords:
pc = cartesian_to_spherical(pc, dataset_name="nclt")
if torch_tensor:
pc_tensor = torch.tensor(pc, dtype=torch.float)
return pc_tensor
else:
return pc
def _mask_dynamic_points(self, pointcloud: Tensor, dynamic_labels: list, idx: int) -> Tensor:
row = self.dataset_df.iloc[idx]
image_ts = int(row["image"])
track_dir = self.dataset_root / str(row["track"])
mask_front_filepath = track_dir / self._masks_dirname / "Cam5" / f"{image_ts}.png"
mask_front = cv2.imread(str(mask_front_filepath), cv2.IMREAD_UNCHANGED).transpose(1, 0)
mask_back_filepath = track_dir / self._masks_dirname / "Cam2" / f"{image_ts}.png"
mask_back = cv2.imread(str(mask_back_filepath), cv2.IMREAD_UNCHANGED).transpose(1, 0)
coords_front, _, in_image_front = self.front_cam_proj(pointcloud)
coords_back, _, in_image_back = self.back_cam_proj(pointcloud)
point_labels = np.zeros(len(pointcloud), dtype=np.uint8)
point_labels[in_image_front] = get_points_labels_by_mask(coords_front, mask_front)
point_labels[in_image_back] = get_points_labels_by_mask(coords_back, mask_back)
return pointcloud[np.isin(point_labels, dynamic_labels, invert=True)]
def _mask_dynamic_pixels(self, im: np.array, cam_name: str, dynamic_labels: list, idx: int) -> np.array:
row = self.dataset_df.iloc[idx]
image_ts = int(row["image"])
track_dir = self.dataset_root / str(row["track"])
mask_filepath = track_dir / self._masks_dirname / cam_name / f"{image_ts}.png"
mask = cv2.imread(str(mask_filepath), cv2.IMREAD_UNCHANGED)
im[np.isin(mask, dynamic_labels)] = 0
return im
def _collate_pc_minkowski(self, data_list: List[Dict[str, Tensor]]) -> tuple[Tensor, Tensor]:
"""Collate pointcloud data using MinkowskiEngine for sparse quantization.
Args:
data_list: List of data dictionaries containing pointcloud coordinates and features.
Returns:
Tuple of (batched_coordinates, concatenated_features) from MinkowskiEngine.
"""
coords_list = [e["pointcloud_lidar_coords"] for e in data_list]
feats_list = [e["pointcloud_lidar_feats"] for e in data_list]
n_points = [int(e.shape[0]) for e in coords_list]
coords_tensor = torch.cat(coords_list, dim=0).unsqueeze(0) # (1,batch_size*n_points,3)
if self.pointcloud_set_transform is not None:
# Apply the same transformation on all dataset elements
coords_tensor = self.pointcloud_set_transform(coords_tensor)
coords_list = torch.split(coords_tensor.squeeze(0), split_size_or_sections=n_points, dim=0)
quantized_coords_list = []
quantized_feats_list = []
for coords, feats in zip(coords_list, feats_list):
quantized_coords, quantized_feats = ME.utils.sparse_quantize(
coordinates=coords,
features=feats,
quantization_size=self._pointcloud_quantization_size,
)
quantized_coords_list.append(quantized_coords)
quantized_feats_list.append(quantized_feats)
return ME.utils.batched_coordinates(quantized_coords_list), torch.cat(quantized_feats_list)
def _collate_pc(self, data_list: List[Dict[str, Tensor]]) -> Tensor:
coords_list = [e["pointcloud_lidar_coords"] for e in data_list]
coords_list = [self._random_point_sample(coords, self._num_points_sample) for coords in coords_list]
# TODO: add support for features tensor
return torch.stack(coords_list) # B x NUM_POINTS_FPS x 3
# TODO: this is the same collate_fn as in Oxford -> refactor to DRY principle
def _collate_data_dict(self, data_list: List[Dict[str, Tensor]]) -> Dict[str, Tensor]:
result: Dict[str, Tensor] = {}
result["idxs"] = torch.stack([e["idx"] for e in data_list], dim=0)
for data_key in data_list[0].keys():
if data_key == "idx":
continue
elif data_key == "utm":
result["utms"] = torch.stack([e["utm"] for e in data_list], dim=0)
elif data_key.startswith("image_"):
result[f"images_{data_key[6:]}"] = torch.stack([e[data_key] for e in data_list])
elif data_key.startswith("mask_"):
result[f"masks_{data_key[5:]}"] = torch.stack([e[data_key] for e in data_list])
elif data_key == "soc":
result["soc"] = torch.stack([e["soc"] for e in data_list], dim=0)
elif data_key == "pointcloud_lidar_coords":
if self._use_minkowski:
(
result["pointclouds_lidar_coords"],
result["pointclouds_lidar_feats"],
) = self._collate_pc_minkowski(data_list)
else:
result["pointclouds_lidar_coords"] = self._collate_pc(data_list)
elif data_key == "pointcloud_lidar_feats":
continue
else:
raise ValueError(f"Unknown data key: {data_key!r}")
return result
[docs]
def collate_fn(self, data_list: List[Dict[str, Tensor]]) -> Dict[str, Tensor]:
"""Pack input data list into batch.
Args:
data_list (List[Dict[str, Tensor]]): batch data list generated by DataLoader.
Returns:
Dict[str, Tensor]: dictionary of batched data.
"""
return self._collate_data_dict(data_list)
def _custom_fps(self, point: Tensor, num_points: int) -> Tensor:
N, _ = point.shape
xyz = point[:, :3]
centroids = torch.zeros((num_points,))
distance = torch.ones((N,)) * 1e10
farthest = torch.randint(0, N, (1,))
for i in range(num_points):
centroids[i] = farthest
centroid = xyz[farthest, :]
dist = torch.sum((xyz - centroid) ** 2, -1)
mask = dist < distance
distance[mask] = dist[mask]
farthest = torch.argmax(distance, -1)
point = point[centroids.int()]
return point
def _o3d_fps(self, input: Tensor, num_points: int) -> Tensor:
pc_o3d = o3d.core.Tensor.from_dlpack(torch.utils.dlpack.to_dlpack(input))
pcd = o3d.t.geometry.PointCloud(pc_o3d)
pcd = pcd.farthest_point_down_sample(num_points)
pc = torch.utils.dlpack.from_dlpack(pcd.point.positions.to_dlpack())
return pc
def _random_point_sample(self, point: Tensor, num_points: int) -> Tensor:
N = point.shape[0]
if N >= num_points:
sample_idx = torch.randperm(N)[:num_points]
else:
sample_idx = torch.cat((torch.arange(N), torch.randint(0, N, (num_points - N,))), dim=0)
point = point[sample_idx]
return point
def _get_soc(self, idx: int) -> Tensor:
row = self.dataset_df.iloc[idx]
image_ts = int(row["image"])
track_dir = self.dataset_root / str(row["track"])
mask_front_filepath = track_dir / self._masks_dirname / "Cam5" / f"{image_ts}.png"
mask_front = cv2.imread(str(mask_front_filepath), cv2.IMREAD_UNCHANGED).transpose(1, 0)
mask_back_filepath = track_dir / self._masks_dirname / "Cam2" / f"{image_ts}.png"
mask_back = cv2.imread(str(mask_back_filepath), cv2.IMREAD_UNCHANGED).transpose(1, 0)
pc_filepath = track_dir / self._pointclouds_dirname / f"{row['pointcloud']}.bin"
lidar_scan = self._load_pc(pc_filepath, torch_tensor=False)
coords_front, _, in_image_front = self.front_cam_proj(lidar_scan)
coords_back, _, in_image_back = self.back_cam_proj(lidar_scan)
point_labels = np.zeros(len(lidar_scan), dtype=np.uint8)
point_labels[in_image_front] = get_points_labels_by_mask(coords_front, mask_front)
point_labels[in_image_back] = get_points_labels_by_mask(coords_back, mask_back)
instances_front = semantic_mask_to_instances(
mask_front,
area_threshold=10,
labels_whitelist=self.special_classes,
)
instances_back = semantic_mask_to_instances(
mask_back,
area_threshold=10,
labels_whitelist=self.special_classes,
)
objects_front = instance_masks_to_objects(
instances_front,
coords_front,
point_labels[in_image_front],
lidar_scan[in_image_front],
)
objects_back = instance_masks_to_objects(
instances_back,
coords_back,
point_labels[in_image_back],
lidar_scan[in_image_back],
)
objects = {**objects_front, **objects_back}
packed_objects = pack_objects(objects, self.top_k_soc, self.max_distance_soc, self.special_classes)
if self.soc_coords_type == "cylindrical_3d":
packed_objects = np.concatenate(
(
np.linalg.norm(packed_objects, axis=-1, keepdims=True),
np.arctan2(packed_objects[..., 1], packed_objects[..., 0])[..., None],
packed_objects[..., 2:],
),
axis=-1,
)
if self.subset == "train":
packed_objects = self.augment_coords_with_rotation(
packed_objects, angle_range=(-np.pi, np.pi)
)
packed_objects = self.augment_coords_with_normal(packed_objects, std=(0.2, 0.2, 0.2))
elif self.soc_coords_type == "cylindrical_2d":
packed_objects = np.concatenate(
(
np.linalg.norm(packed_objects[..., :2], axis=-1, keepdims=True),
np.arctan2(packed_objects[..., 1], packed_objects[..., 0])[..., None],
packed_objects[..., 2:],
),
axis=-1,
)
elif self.soc_coords_type == "euclidean":
pass
elif self.soc_coords_type == "spherical":
packed_objects = np.concatenate(
(
np.linalg.norm(packed_objects, axis=-1, keepdims=True),
np.arccos(
packed_objects[..., 2] / np.linalg.norm(packed_objects, axis=-1, keepdims=True)
),
np.arctan2(packed_objects[..., 1], packed_objects[..., 0])[..., None],
),
axis=-1,
)
else:
raise ValueError(f"Unknown soc_coords_type: {self.soc_coords_type!r}")
objects_tensor = torch.from_numpy(packed_objects).float()
return objects_tensor
[docs]
def augment_coords_with_rotation(
self, coords: np.ndarray, angle_range: Tuple = (-np.pi, np.pi)
) -> np.ndarray:
"""Augment the coordinates with a random rotation.
All objects are rotated by the same, random uniformly distributed angle.
Args:
coords (np.ndarray): The coordinates to be augmented.
angle_range (Tuple): The range of the random rotation angle. Defaults to (-np.pi, np.pi).
Returns:
np.ndarray: The augmented coordinates.
"""
# Generate a random angle for rotation within the specified range
random_angle = np.random.uniform(low=angle_range[0], high=angle_range[1])
# Add the random angle to the θ coordinate of each triplet
coords[:, :, 1] = (coords[:, :, 1] + random_angle) % (2 * np.pi)
# Adjust angles to be in the range (-pi, pi)
coords[:, :, 1] = (coords[:, :, 1] + np.pi) % (2 * np.pi) - np.pi
return coords
[docs]
def augment_coords_with_normal(
self,
coords: np.ndarray,
mean: Tuple[float, float, float] = (0.0, 0.0, 0.0),
std: Tuple[float, float, float] = (1.0, 1.0, 1.0),
) -> np.ndarray:
"""Augment the coordinates with a random normal distribution.
Args:
coords (np.ndarray): The coordinates to be augmented.
mean (Tuple[float, float, float]): The mean of the normal distribution.
Defaults to (0.0, 0.0, 0.0).
std (Tuple[float, float, float]): The standard deviation of the normal distribution.
Defaults to (1.0, 1.0, 1.0).
Returns:
np.ndarray: The augmented coordinates.
"""
# Generate random values from a normal distribution
N, K = coords.shape[:2]
for i, (m, s) in enumerate(zip(mean, std)):
random_deltas = np.random.normal(m, s, size=(N, K, 1))
coords[:, :, i] += random_deltas[:, :, 0]
coords[:, :, 0] = np.maximum(coords[:, :, 0], 0)
coords[:, :, 1] = (coords[:, :, 1] + np.pi) % (2 * np.pi) - np.pi
return coords