Source code for opr.datasets.augmentations
"""Data augmentation pipelines.
Point cloud augmentations adopted from the repository: https://github.com/jac99/MinkLocMultimodal, MIT License
"""
import math
import random
from typing import Optional, Tuple
import albumentations as A # noqa: N812
import numpy as np
import torch
import torch.nn.functional as F
from albumentations.pytorch import ToTensorV2
from scipy.linalg import expm, norm
from torch import Tensor
from torchvision import transforms
[docs]
class OheHotTransform:
    """Convert an integer label mask into a channel-first one-hot float tensor.

    The callable takes the mask through the ``image`` keyword and returns a
    dictionary with the single key ``"image"``.
    """

    def __init__(self, num_classes: int = 65) -> None:
        """Store the size of the one-hot dimension.

        Args:
            num_classes (int): Number of channels in the one-hot encoding. Every label id in
                the mask must be in ``[0, num_classes)``. Defaults to 65, the value that
                used to be hard-coded here.
        """
        self.num_classes = num_classes

    def __call__(self, image: np.ndarray) -> dict:
        """One-hot encode the given label mask.

        Args:
            image (np.ndarray): Integer label mask, e.g. of shape (H, W) or (H, W, 1).

        Returns:
            dict: ``{"image": tensor}`` where ``tensor`` is a float tensor of shape
                (num_classes, H, W).
        """
        labels = torch.from_numpy(image).long()
        # squeeze() drops every size-1 axis, which is what removes the trailing
        # channel axis of an (H, W, 1) mask after the class axis has been appended.
        onehot = torch.squeeze(F.one_hot(labels, self.num_classes))
        # (H, W, C) -> (C, H, W)
        onehot = onehot.permute(2, 0, 1).float()
        return {"image": onehot}
[docs]
class DefaultImageTransform:
    """Albumentations-based preprocessing and augmentation for camera images."""

    def __init__(self, train: bool = False, resize: Optional[Tuple[int, int]] = None) -> None:
        """Assemble the albumentations pipeline.

        Args:
            train (bool): When True, random augmentations are inserted before normalization.
                When False, the image is only normalized and converted to a tensor
                (after the optional resize). Defaults to False.
            resize (Tuple[int, int], optional): Target size in (W, H) format. When given,
                resizing is the first step of the pipeline. Defaults to None.
        """
        steps = []

        # Resizing, when requested, always runs before everything else.
        if resize is not None:
            target_width, target_height = resize[0], resize[1]
            steps.append(A.Resize(height=target_height, width=target_width))

        if train:
            blur = A.OneOf(
                [
                    A.MotionBlur(p=0.2),
                    A.MedianBlur(blur_limit=3, p=0.1),
                    A.Blur(blur_limit=3, p=0.1),
                ],
                p=0.2,
            )
            distortion = A.OneOf(
                [
                    A.OpticalDistortion(p=0.3),
                    A.GridDistortion(p=0.1),
                    A.PiecewiseAffine(p=0.3),
                ],
                p=0.2,
            )
            contrast = A.OneOf(
                [
                    A.CLAHE(clip_limit=2),
                    A.Sharpen(),
                    A.Emboss(),
                ],
                p=0.2,
            )
            steps.extend(
                [
                    A.GaussNoise(p=0.2),
                    blur,
                    distortion,
                    contrast,
                    A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1, always_apply=True),
                    A.CoarseDropout(max_width=96, max_height=66, min_width=32, min_height=22, max_holes=1, p=0.5),
                ]
            )

        # Normalization and tensor conversion close the pipeline in both modes.
        steps.extend(
            [
                A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
                ToTensorV2(),
            ]
        )
        self.transform = A.Compose(steps)

    def __call__(self, img: np.ndarray) -> Tensor:
        """Run the pipeline on a single image.

        Args:
            img (np.ndarray): The image in the cv2 format.

        Returns:
            Tensor: Augmented PyTorch tensor in the channel-first format.
        """
        result = self.transform(image=img)
        return result["image"]
[docs]
class DefaultHM3DImageTransform:
    """Default image augmentation pipeline for HM3D images.

    Compared to ``DefaultImageTransform``, the images are resized to (288, 160) by default
    and ``ColorJitter`` is applied with probability 0.2 instead of always.
    """

    # ``Optional[Tuple[int, int]]`` is used instead of ``tuple[int, int] | None`` so that the
    # signature matches the other transforms in this module and can be evaluated on
    # Python versions older than 3.10.
    def __init__(self, train: bool = False, resize: Optional[Tuple[int, int]] = (288, 160)) -> None:
        """Default image augmentation pipeline for HM3D images.

        Args:
            train (bool): If not train, only resizing (if enabled), normalization and
                conversion to a tensor will be applied. Defaults to False.
            resize (Tuple[int, int], optional): Target size in (W, H) format. Pass None to
                disable resizing. Defaults to (288, 160).
        """
        if train:
            transform_list = [
                A.GaussNoise(p=0.2),
                A.OneOf(
                    [
                        A.MotionBlur(p=0.2),
                        A.MedianBlur(blur_limit=3, p=0.1),
                        A.Blur(blur_limit=3, p=0.1),
                    ],
                    p=0.2,
                ),
                A.OneOf(
                    [
                        A.OpticalDistortion(p=0.3),
                        A.GridDistortion(p=0.1),
                        A.PiecewiseAffine(p=0.3),
                    ],
                    p=0.2,
                ),
                A.OneOf(
                    [
                        A.CLAHE(clip_limit=2),
                        A.Sharpen(),
                        A.Emboss(),
                    ],
                    p=0.2,
                ),
                A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1, p=0.2),
                A.CoarseDropout(max_width=96, max_height=66, min_width=32, min_height=22, max_holes=1, p=0.5),
                A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
                ToTensorV2(),
            ]
        else:
            transform_list = [
                A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
                ToTensorV2(),
            ]
        if resize is not None:
            # ``resize`` is (W, H); the resize step is placed before every other transform.
            transform_list = [A.Resize(height=resize[1], width=resize[0])] + transform_list
        self.transform = A.Compose(transform_list)

    def __call__(self, img: np.ndarray) -> Tensor:
        """Applies transformations to the given image.

        Args:
            img (np.ndarray): The image in the cv2 format.

        Returns:
            Tensor: Augmented PyTorch tensor in the channel-first format.
        """
        return self.transform(image=img)["image"]
[docs]
class DefaultSemanticTransform:
    """Albumentations-based preprocessing and augmentation for semantic masks."""

    def __init__(self, train: bool = False, resize: Optional[Tuple[int, int]] = None) -> None:
        """Assemble the albumentations pipeline.

        Args:
            train (bool): When True, random geometric distortions and dropout are inserted
                before normalization. When False, the mask is only normalized and converted
                to a tensor (after the optional resize). Defaults to False.
            resize (Tuple[int, int], optional): Target size in (W, H) format. When given,
                resizing is the first step of the pipeline. Defaults to None.
        """
        steps = []

        # Resizing, when requested, always runs before everything else.
        if resize is not None:
            target_width, target_height = resize[0], resize[1]
            steps.append(A.Resize(height=target_height, width=target_width))

        if train:
            distortion = A.OneOf(
                [
                    A.OpticalDistortion(p=0.3),
                    A.GridDistortion(p=0.1),
                    A.PiecewiseAffine(p=0.3),
                ],
                p=0.2,
            )
            dropout = A.OneOf(
                [
                    A.CoarseDropout(max_width=96, max_height=66, min_width=32, min_height=22, max_holes=1, p=0.5),
                    A.CoarseDropout(max_width=30, max_height=30, min_width=10, min_height=10, max_holes=10, p=0.5),
                    A.GridDropout(ratio=0.05, unit_size_min=4, unit_size_max=30, p=0.5),
                ],
                p=0.2,
            )
            steps.extend([distortion, dropout])

        # Normalization (mean 0, std 1) and tensor conversion close the pipeline in both modes.
        steps.extend([A.Normalize(mean=(0.0,), std=(1.0,)), ToTensorV2()])
        self.transform = A.Compose(steps)

    def __call__(self, img: np.ndarray) -> Tensor:
        """Run the pipeline on a single semantic mask.

        Args:
            img (np.ndarray): The semantic mask (single channel image) in the cv2 format.

        Returns:
            Tensor: Augmented PyTorch tensor in the channel-first format.
        """
        result = self.transform(image=img)
        return result["image"]
[docs]
class OneHotSemanticTransform:
    """Augmentation pipeline that ends with one-hot encoding of the semantic mask."""

    def __init__(self, train: bool = False, resize: Optional[Tuple[int, int]] = None) -> None:
        """Assemble the albumentations pipeline.

        Args:
            train (bool): When True, random geometric distortions and dropout are inserted
                before the one-hot encoding. When False, the mask is only one-hot encoded
                (after the optional resize). Defaults to False.
            resize (Tuple[int, int], optional): Target size in (W, H) format. When given,
                resizing is the first step of the pipeline. Defaults to None.
        """
        steps = []

        # NOTE(review): the mask goes through the pipeline as ``image`` and the resize and
        # distortion steps are built with their default interpolation. If that default is
        # not nearest-neighbour, label ids may get blended at class borders - TODO confirm.
        if resize is not None:
            target_width, target_height = resize[0], resize[1]
            steps.append(A.Resize(height=target_height, width=target_width))

        if train:
            distortion = A.OneOf(
                [
                    A.OpticalDistortion(p=0.3),
                    A.GridDistortion(p=0.1),
                    A.PiecewiseAffine(p=0.3),
                ],
                p=0.2,
            )
            dropout = A.OneOf(
                [
                    A.CoarseDropout(max_width=96, max_height=66, min_width=32, min_height=22, max_holes=1, p=0.5),
                    A.CoarseDropout(max_width=30, max_height=30, min_width=10, min_height=10, max_holes=10, p=0.5),
                    A.GridDropout(ratio=0.05, unit_size_min=4, unit_size_max=30, p=0.5),
                ],
                p=0.2,
            )
            steps.extend([distortion, dropout])

        # OheHotTransform already returns a channel-first torch tensor,
        # so no ToTensorV2 step is appended after it.
        steps.append(OheHotTransform())
        self.transform = A.Compose(steps)

    def __call__(self, img: np.ndarray) -> Tensor:
        """Run the pipeline on a single semantic mask.

        Args:
            img (np.ndarray): The semantic mask (single channel image) in the cv2 format.

        Returns:
            Tensor: Augmented PyTorch tensor in the channel-first format.
        """
        result = self.transform(image=img)
        return result["image"]
[docs]
class DefaultCloudTransform:
    """Augmentation pipeline applied to a point cloud."""

    def __init__(self, train: bool = False) -> None:
        """Assemble the point cloud pipeline.

        Args:
            train (bool): When True, the cloud is jittered, randomly thinned, translated and
                has a random block removed. When False, the pipeline is empty and the input
                is returned as is. Defaults to False.
        """
        steps = []
        if train:
            steps = [
                JitterPoints(sigma=0.001, clip=0.002),
                RemoveRandomPoints(r=(0.0, 0.1)),
                RandomTranslation(max_delta=0.01),
                RemoveRandomBlock(p=0.4),
            ]
        self.transform = transforms.Compose(steps)

    def __call__(self, pointcloud: Tensor) -> Tensor:
        """Run the pipeline on the given point cloud.

        Args:
            pointcloud (Tensor): The coordinates tensor.

        Returns:
            Tensor: Augmented coordinates tensor.
        """
        augmented = self.transform(pointcloud)
        return augmented
[docs]
class DefaultCloudSetTransform:
    """Augmentation pipeline applied to a whole set (batch) of point clouds."""

    def __init__(self, train: bool = False) -> None:
        """Assemble the point cloud set pipeline.

        Note:
            This is how augmentation for the whole batch was implemented in MinkLoc method.

        Args:
            train (bool): When True, a random rotation of up to 5 degrees around the
                (0, 0, 1) axis and a random axis flip are applied. When False, the pipeline
                is empty and the input is returned as is. Defaults to False.
        """
        steps = []
        if train:
            steps = [
                RandomRotation(max_theta=5, max_theta2=0, axis=np.array([0, 0, 1])),
                RandomFlip([0.25, 0.25, 0.0]),
            ]
        self.transform = transforms.Compose(steps)

    def __call__(self, pointcloud: Tensor) -> Tensor:
        """Run the pipeline on the given point clouds.

        Args:
            pointcloud (Tensor): The coordinates tensor.

        Returns:
            Tensor: Augmented coordinates tensor.
        """
        augmented = self.transform(pointcloud)
        return augmented
# NOTE: The code below is taken as-is from https://github.com/jac99/MinkLocMultimodal, MIT License
# TODO: Format code properly, add typing and remove temporary flake8 and mypy disablers
# flake8: noqa
# mypy: ignore-errors
[docs]
class RandomFlip:
    """Negate at most one of the first three coordinate components, chosen at random."""

    def __init__(self, p):
        """Store the per-axis flip probabilities.

        Args:
            p: Three probabilities ``[p_x, p_y, p_z]``; their sum must be in (0, 1].
                The remaining probability mass means "no flip".
        """
        assert len(p) == 3
        assert 0 < sum(p) <= 1, "sum(p) must be in (0, 1] range, is: {}".format(sum(p))
        self.p = p
        # Cumulative thresholds against which a single uniform sample is compared.
        self.p_cum_sum = np.cumsum(p)

    def __call__(self, coords):
        """Flip one axis of ``coords`` in place (or none) and return ``coords``."""
        draw = random.random()
        # The first threshold that is not below the sample selects the axis to negate.
        flip_axis = next(
            (axis for axis, threshold in enumerate(self.p_cum_sum[:3]) if draw <= threshold),
            None,
        )
        if flip_axis is not None:
            coords[..., flip_axis] = -coords[..., flip_axis]
        return coords
[docs]
class RandomRotation:
    """Rotate coordinates by a random angle, optionally followed by a second random rotation."""

    def __init__(self, axis=None, max_theta=180, max_theta2=15):
        """Store the rotation settings.

        Args:
            axis: Rotation axis as a NumPy vector, or None to draw a random axis on every call.
            max_theta: Maximum absolute angle (degrees) of the rotation around ``axis``.
            max_theta2: Maximum absolute angle (degrees) of the additional rotation around a
                random axis, or None to skip that second rotation.
        """
        self.axis = axis
        self.max_theta = max_theta
        self.max_theta2 = max_theta2

    def _M(self, axis, theta):
        """Return the float32 rotation matrix for ``theta`` radians around ``axis``."""
        scaled_axis = axis / norm(axis) * theta
        # Matrix exponential of the cross-product matrix of the scaled axis.
        return expm(np.cross(np.eye(3), scaled_axis)).astype(np.float32)

    def _random_angle(self, max_degrees):
        """Draw an angle in radians, uniform in [-max_degrees, max_degrees) degrees."""
        return (np.pi * max_degrees / 180) * 2 * (np.random.rand(1) - 0.5)

    def __call__(self, coords):
        """Rotate the xyz part of ``coords``; a fourth (intensity) component is kept as is."""
        has_intensity = coords.shape[-1] == 4
        xyz = coords[:, :, :3] if has_intensity else coords

        main_axis = self.axis if self.axis is not None else np.random.rand(3) - 0.5
        rotated = xyz @ self._M(main_axis, self._random_angle(self.max_theta))

        if self.max_theta2 is not None:
            # Random draws happen in the same order as before: axis first, then angle.
            extra_axis = np.random.rand(3) - 0.5
            rotated = rotated @ self._M(extra_axis, self._random_angle(self.max_theta2))

        if not has_intensity:
            return rotated
        # Re-attach the untouched intensity channel.
        return torch.cat((rotated, coords[:, :, 3].unsqueeze(dim=2)), axis=2)
[docs]
class RandomTranslation:
    """Shift all coordinates by one random, normally distributed offset."""

    def __init__(self, max_delta=0.05):
        """Store ``max_delta``, the factor the standard-normal offset is multiplied by."""
        self.max_delta = max_delta

    def __call__(self, coords):
        """Return ``coords`` plus a single random offset broadcast over all points."""
        n_components = coords.shape[-1]
        # One offset value per component of the last axis, shared by every point.
        offset = (self.max_delta * np.random.randn(1, n_components)).astype(np.float32)
        return coords + offset
[docs]
class RandomScale:
    """Multiply all coordinates by one random factor drawn uniformly from [min, max)."""

    def __init__(self, min, max):
        """Store the scale range as an offset (``min``) and a width (``max - min``)."""
        self.bias = min
        self.scale = max - min

    def __call__(self, coords):
        """Return ``coords`` multiplied by a freshly drawn float32 scale factor."""
        factor = self.scale * np.random.rand(1) + self.bias
        factor = factor.astype(np.float32)
        return coords * factor
[docs]
class RandomShear:
    """Apply a random linear map close to identity to the xyz coordinates."""

    def __init__(self, delta=0.1):
        """Store ``delta``, the factor of the standard-normal perturbation added to identity."""
        self.delta = delta

    def __call__(self, coords):
        """Shear the given coordinates.

        Args:
            coords: Coordinates whose last axis has 3 components (xyz), or a 3-dimensional
                tensor whose last axis has 4 components (xyz + intensity).

        Returns:
            Sheared coordinates of the same shape. When intensity is present it is left
            untouched and the result is a torch tensor (it used to be a NumPy array built
            with ``np.append``, unlike the 3-component branch and ``RandomRotation``).
        """
        T = (np.eye(3) + self.delta * np.random.randn(3, 3)).astype(np.float32)
        if torch.is_tensor(coords) and coords.is_floating_point():
            # Keep the product inside torch instead of relying on Tensor/ndarray interop.
            T = torch.from_numpy(T).to(device=coords.device, dtype=coords.dtype)
        if coords.shape[-1] == 4:  # with intensity
            xyz = torch.as_tensor(coords[:, :, :3] @ T)
            intensity = coords[:, :, 3].unsqueeze(dim=2)
            return torch.cat((xyz, intensity), dim=2)
        # no intensity
        return coords @ T
[docs]
class JitterPoints:
    """Add clipped Gaussian noise to randomly selected entries along the first axis."""

    def __init__(self, sigma=0.01, clip=None, p=1.0):
        """Store the jitter settings.

        Args:
            sigma: Standard deviation of the noise; must be positive.
            clip: If not None, the noise is clamped to ``[-clip, clip]``.
            p: Probability of jittering each entry along the first axis; must be in (0, 1].
        """
        assert 0 < p <= 1.0
        assert sigma > 0.0
        self.sigma = sigma
        self.clip = clip
        self.p = p

    def __call__(self, e):
        """Jitter the tensor in place and return it.

        The selection mask has one element per entry of the first axis of ``e``; every
        component of a selected entry receives its own noise value. The same sigma is
        used for all components, so intensity values (if any) are jittered like coordinates.
        """
        first_axis = (e.shape[0],)
        if self.p < 1.0:
            # Bernoulli-like draw: category 1 (probability p) means "jitter this entry".
            chooser = torch.distributions.categorical.Categorical(probs=torch.tensor([1 - self.p, self.p]))
            selected = chooser.sample(sample_shape=first_axis) == 1
        else:
            selected = torch.ones(first_axis, dtype=torch.int64) == 1

        noise = self.sigma * torch.randn_like(e[selected])
        if self.clip is not None:
            noise = torch.clamp(noise, min=-self.clip, max=self.clip)

        e[selected] = e[selected] + noise
        return e
[docs]
class RemoveRandomPoints:
    """Zero out a random subset of the entries along the first axis of a tensor.

    The entries are not deleted; they are overwritten with zeros so that the number of
    points stays the same.
    """

    def __init__(self, r):
        """Store the removal ratio.

        Args:
            r: Either a single ratio in [0, 1], or a ``(r_min, r_max)`` list/tuple of two
                ratios in [0, 1]; in the latter case the ratio is drawn uniformly from that
                range on every call.
        """
        # isinstance (rather than an exact type check) also accepts list/tuple subclasses.
        if isinstance(r, (list, tuple)):
            assert len(r) == 2
            assert 0 <= r[0] <= 1
            assert 0 <= r[1] <= 1
            self.r_min = float(r[0])
            self.r_max = float(r[1])
        else:
            assert 0 <= r <= 1
            self.r_min = None
            self.r_max = float(r)

    def __call__(self, e):
        """Zero ``int(len(e) * ratio)`` randomly chosen entries of ``e`` in place and return it."""
        n = len(e)
        if self.r_min is None:
            r = self.r_max
        else:
            # Randomly select removal ratio
            r = random.uniform(self.r_min, self.r_max)

        # Passing the int ``n`` draws from arange(n) without materialising the range first
        # and always yields an integer index array, even when ``n`` is 0.
        drop_idx = np.random.choice(n, size=int(n * r), replace=False)
        e[torch.from_numpy(drop_idx).long()] = 0
        return e
[docs]
class RemoveRandomBlock:
    """
    Randomly remove part of the point cloud. Similar to PyTorch RandomErasing but operating on 3D point clouds.
    Erases fronto-parallel cuboid.
    Instead of erasing we set coords of removed points to (0, 0, 0) to retain the same number of points
    """

    def __init__(self, p=0.5, scale=(0.02, 0.33), ratio=(0.3, 3.3)):
        """Store the erasing settings.

        Args:
            p: Probability of erasing a block on a given call.
            scale: ``(min, max)`` fraction of the cloud's x-y bounding-box area to erase.
            ratio: ``(min, max)`` aspect ratio (h / w) of the erased rectangle.
        """
        self.p = p
        self.scale = scale
        self.ratio = ratio

    def get_params(self, coords):
        """Draw the rectangle to erase in the x-y plane.

        Args:
            coords: Coordinates tensor; the last axis holds the point components.

        Returns:
            Tuple ``(x, y, w, h)``: the corner with the smallest x and y of the rectangle
            and its extent along x (``w``) and y (``h``).
        """
        # Find point cloud 3D bounding box.
        # reshape (unlike view) also works for non-contiguous inputs, e.g. a transposed batch.
        flattened_coords = coords.reshape(-1, coords.shape[-1])
        min_coords, _ = torch.min(flattened_coords, dim=0)
        max_coords, _ = torch.max(flattened_coords, dim=0)
        span = max_coords - min_coords
        area = span[0] * span[1]

        erase_area = random.uniform(self.scale[0], self.scale[1]) * area
        aspect_ratio = random.uniform(self.ratio[0], self.ratio[1])
        h = math.sqrt(erase_area * aspect_ratio)
        w = math.sqrt(erase_area / aspect_ratio)

        # Random position of the rectangle relative to the bounding box.
        x = min_coords[0] + random.uniform(0, 1) * (span[0] - w)
        y = min_coords[1] + random.uniform(0, 1) * (span[1] - h)
        return x, y, w, h

    def __call__(self, coords):
        """With probability ``p``, zero all points inside a random x-y rectangle (in place)."""
        if random.random() < self.p:
            x, y, w, h = self.get_params(coords)  # Fronto-parallel cuboid to remove
            mask = (
                (x < coords[..., 0])
                & (coords[..., 0] < x + w)
                & (y < coords[..., 1])
                & (coords[..., 1] < y + h)
            )
            coords[mask] = torch.zeros_like(coords[mask])
        return coords