Source code for opr.modules.feature_extractors.mink_resnet
"""ResNetFPN feature extraction module implemented with MinkowskiEngine.
Komorowski, Jacek. "Minkloc3d: Point cloud based large-scale place recognition."
Proceedings of the IEEE/CVF Winter Conference on Applications of Computer Vision. 2021.
Paper: https://arxiv.org/abs/2011.04530
Code is adapted from the original repository: https://github.com/jac99/MinkLoc3Dv2, MIT License
"""
from __future__ import annotations
from typing import Tuple, Type, Union
from loguru import logger
from torch import Tensor, nn
from opr.modules.eca import MinkECABasicBlock as ECABasicBlock
try:
import MinkowskiEngine as ME # type: ignore
from MinkowskiEngine.modules.resnet_block import BasicBlock, Bottleneck
minkowski_available = True
except ImportError:
logger.warning("MinkowskiEngine is not installed. Some features may not be available.")
BasicBlock = Bottleneck = nn.Module
minkowski_available = False
[docs]
class MinkResNetBase(nn.Module):
    """Base ResNet class for sparse tensors with MinkowskiEngine.

    The architecture is configured through the class attributes below. ``block``
    is only annotated here (it has no default value), so it has to be provided by
    a subclass or set on the instance before ``__init__`` of this class runs.
    """

    # Residual block class used to build every stage; no default is provided.
    block: Union[Type[BasicBlock], Type[Bottleneck], Type[ECABasicBlock]]
    # Number of residual blocks in each of the four stages.
    layers: Tuple[int, ...] = (1, 1, 1, 1)
    # Number of output channels of the first convolution.
    init_dim: int = 64
    # Base channel count of each stage (scaled by ``block.expansion`` in ``_make_layer``).
    planes: Tuple[int, ...] = (64, 128, 256, 512)
    # NOTE(review): not read anywhere in this class; presumably consumed by callers
    # to detect modules that work on sparse tensors - confirm.
    sparse: bool = True

    def __init__(self, in_channels: int, out_channels: int, dimension: int = 3) -> None:
        """Base ResNet class for sparse tensors with MinkowskiEngine.

        Args:
            in_channels (int): Number of input channels.
            out_channels (int): Number of output channels.
            dimension (int): Number of dimensions. Defaults to 3.

        Raises:
            RuntimeError: If MinkowskiEngine is not installed.
            RuntimeError: If block type is not specified at the moment of initialisation.
        """
        if not minkowski_available:
            raise RuntimeError("MinkowskiEngine is not installed. MinkResNetBase requires MinkowskiEngine.")
        super().__init__()
        self.dimension = dimension

        # ``block`` is a bare class-level annotation, so the attribute may not exist
        # at all. Look it up with a default so that a missing block is reported with
        # the documented RuntimeError instead of an AttributeError.
        if getattr(self, "block", None) is None:
            raise RuntimeError("Block type for MinkResNetBase not specified.")

        self._network_initialization(in_channels, out_channels, dimension)
        self._weight_initialization()

    def _network_initialization(self, in_channels: int, out_channels: int, dimension: int) -> None:
        """Create the stem, the four residual stages and the output head.

        Args:
            in_channels (int): Number of input channels of the first convolution.
            out_channels (int): Number of output features of the final linear layer.
            dimension (int): Spatial dimension passed to the MinkowskiEngine layers.
        """
        # ``self.inplanes`` tracks the running channel count; ``_make_layer`` updates it.
        self.inplanes = self.init_dim

        # Stem: strided convolution followed by average pooling.
        self.conv1 = ME.MinkowskiConvolution(
            in_channels, self.inplanes, kernel_size=5, stride=2, dimension=dimension
        )
        self.bn1 = ME.MinkowskiBatchNorm(self.inplanes)
        self.relu = ME.MinkowskiReLU(inplace=True)
        self.pool = ME.MinkowskiAvgPooling(kernel_size=2, stride=2, dimension=dimension)

        # Four residual stages, each one created with stride 2.
        self.layer1 = self._make_layer(self.block, self.planes[0], self.layers[0], stride=2)
        self.layer2 = self._make_layer(self.block, self.planes[1], self.layers[1], stride=2)
        self.layer3 = self._make_layer(self.block, self.planes[2], self.layers[2], stride=2)
        self.layer4 = self._make_layer(self.block, self.planes[3], self.layers[3], stride=2)

        # Head: one more strided convolution, global pooling and a linear projection.
        self.conv5 = ME.MinkowskiConvolution(
            self.inplanes, self.inplanes, kernel_size=3, stride=3, dimension=dimension
        )
        self.bn5 = ME.MinkowskiBatchNorm(self.inplanes)
        # NOTE: despite the attribute name this is global *max* pooling. The name is
        # kept unchanged so that existing attribute accesses keep working.
        self.glob_avg = ME.MinkowskiGlobalMaxPooling()
        self.final = ME.MinkowskiLinear(self.inplanes, out_channels, bias=True)

    def _weight_initialization(self) -> None:
        """Initialise convolution kernels (Kaiming normal) and batch-norm affine parameters."""
        for module in self.modules():
            if isinstance(module, ME.MinkowskiConvolution):
                ME.utils.kaiming_normal_(module.kernel, mode="fan_out", nonlinearity="relu")
            if isinstance(module, ME.MinkowskiBatchNorm):
                nn.init.constant_(module.bn.weight, 1)
                nn.init.constant_(module.bn.bias, 0)

    def _make_layer(
        self,
        block: Union[Type[BasicBlock], Type[Bottleneck], Type[ECABasicBlock]],
        planes: int,
        blocks: int,
        stride: int = 1,
        dilation: int = 1,
        bn_momentum: float = 0.1,
    ) -> nn.Sequential:
        """Build one stage made of ``blocks`` residual blocks.

        As a side effect ``self.inplanes`` is updated to ``planes * block.expansion``.

        Args:
            block: Residual block class to instantiate.
            planes (int): Base number of channels of the stage.
            blocks (int): Number of residual blocks in the stage.
            stride (int): Stride of the first block. Defaults to 1.
            dilation (int): Dilation passed to every block. Defaults to 1.
            bn_momentum (float): Currently unused; kept for interface compatibility.

        Returns:
            nn.Sequential: The residual blocks of the stage.
        """
        out_planes = planes * block.expansion

        # The shortcut needs its own projection whenever the first block changes
        # the resolution or the number of channels.
        downsample = None
        if stride != 1 or self.inplanes != out_planes:
            downsample = nn.Sequential(
                ME.MinkowskiConvolution(
                    self.inplanes,
                    out_planes,
                    kernel_size=1,
                    stride=stride,
                    dimension=self.dimension,
                ),
                ME.MinkowskiBatchNorm(out_planes),
            )

        # Only the first block receives the stride and the shortcut projection.
        stage = [
            block(
                self.inplanes,
                planes,
                stride=stride,
                dilation=dilation,
                downsample=downsample,
                dimension=self.dimension,
            )
        ]
        self.inplanes = out_planes
        for _ in range(1, blocks):
            stage.append(block(self.inplanes, planes, stride=1, dilation=dilation, dimension=self.dimension))
        return nn.Sequential(*stage)

    def forward(self, x: ME.SparseTensor) -> Tensor:
        """Run the stem, the four stages and the head on ``x``.

        Args:
            x (ME.SparseTensor): Input sparse tensor.

        Returns:
            Tensor: Result of applying the final linear layer to the globally pooled features.
        """
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.pool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.conv5(x)
        x = self.bn5(x)
        x = self.relu(x)

        x = self.glob_avg(x)
        return self.final(x)
[docs]
class MinkResNetFPNFeatureExtractor(MinkResNetBase):
    """Feature Pyramid Network (FPN) architecture implementation using Minkowski ResNet building blocks."""

    sparse: bool = True

    def __init__(
        self,
        in_channels: int = 1,
        out_channels: int = 256,
        num_top_down: int = 2,
        conv0_kernel_size: int = 5,
        block: str = "ECABasicBlock",
        layers: Tuple[int, ...] = (1, 1, 1, 1),
        planes: Tuple[int, ...] = (64, 128, 64, 32),
    ) -> None:
        """Feature Pyramid Network (FPN) architecture implementation using Minkowski ResNet building blocks.

        From paper "MinkLoc3D: Point Cloud Based Large-Scale Place Recognition."
        https://arxiv.org/abs/2011.04530

        Args:
            in_channels (int): Number of input channels. Defaults to 1.
            out_channels (int): Number of output channels. Defaults to 256.
            num_top_down (int): Number of top-down steps for FPN block. Defaults to 2.
            conv0_kernel_size (int): Kernel size of the first convolution. Defaults to 5.
            block (str): Block type name. Defaults to "ECABasicBlock".
            layers (Tuple[int, ...]): Number of layers for each block. Defaults to (1, 1, 1, 1).
            planes (Tuple[int, ...]): Output channel size for each block. Defaults to (64, 128, 64, 32).

        Raises:
            RuntimeError: If MinkowskiEngine is not installed.
            ValueError: If the length of layers and planes are not the same.
            ValueError: If the length of layers is less than 1.
            ValueError: If num_top_down is not between 0 and the numbers of layers.
        """
        if not minkowski_available:
            raise RuntimeError(
                "MinkowskiEngine is not installed. MinkResNetFPNFeatureExtractor requires MinkowskiEngine."
            )
        if len(layers) != len(planes):
            raise ValueError("layers and planes arguments should be the same length")
        if len(layers) < 1:
            raise ValueError("layers argument should have at least one element")
        if not (0 <= num_top_down <= len(layers)):
            raise ValueError("num_top_down should be between 0 and the numbers of layers")

        # The configuration is stored before the base initialiser is invoked because
        # that initialiser calls ``_network_initialization``, which reads these values.
        self.num_bottom_up = len(layers)
        self.num_top_down = num_top_down
        self.conv0_kernel_size = conv0_kernel_size
        self.block = self._create_resnet_block(block_name=block)
        self.layers = layers
        self.planes = planes
        self.lateral_dim = out_channels
        self.init_dim = planes[0]
        super().__init__(in_channels, out_channels, dimension=3)

    def _create_resnet_block(
        self, block_name: str
    ) -> Union[Type[BasicBlock], Type[Bottleneck], Type[ECABasicBlock]]:
        """Translate a block name into the corresponding block class.

        Args:
            block_name (str): One of "BasicBlock", "Bottleneck" or "ECABasicBlock".

        Returns:
            The block class matching ``block_name``.

        Raises:
            NotImplementedError: If ``block_name`` is not one of the supported names.
        """
        if block_name == "BasicBlock":
            return BasicBlock
        if block_name == "Bottleneck":
            return Bottleneck
        if block_name == "ECABasicBlock":
            return ECABasicBlock
        raise NotImplementedError(f"Unsupported network block: {block_name}")

    def _network_initialization(self, in_channels: int, out_channels: int, dimension: int) -> None:
        """Create the bottom-up, top-down and lateral layers of the FPN.

        Args:
            in_channels (int): Number of input channels of the first convolution.
            out_channels (int): Not used here; the lateral width comes from ``self.lateral_dim``.
            dimension (int): Spatial dimension passed to the MinkowskiEngine layers.

        Raises:
            ValueError: If ``self.layers`` and ``self.planes`` differ in length.
            ValueError: If ``self.planes`` does not have ``self.num_bottom_up`` elements.
        """
        if len(self.layers) != len(self.planes):
            raise ValueError("layers and planes arguments should be the same length")
        if len(self.planes) != self.num_bottom_up:
            raise ValueError("planes argument should have the same length as the number of bottom-up blocks")

        self.convs = nn.ModuleList()  # Bottom-up convolutional blocks with stride=2
        self.bn = nn.ModuleList()  # Bottom-up BatchNorms
        self.blocks = nn.ModuleList()  # Bottom-up blocks
        self.tconvs = nn.ModuleList()  # Top-down transposed convolutions
        self.conv1x1 = nn.ModuleList()  # 1x1 convolutions in lateral connections

        # The first convolution is a special case with a configurable (larger) kernel
        self.inplanes = self.planes[0]
        self.conv0 = ME.MinkowskiConvolution(
            in_channels, self.inplanes, kernel_size=self.conv0_kernel_size, dimension=dimension
        )
        self.bn0 = ME.MinkowskiBatchNorm(self.inplanes)

        for stage_planes, num_blocks in zip(self.planes, self.layers):
            # Strided convolution keeps the channel count; ``_make_layer`` then
            # changes ``self.inplanes`` to ``stage_planes * block.expansion``.
            self.convs.append(
                ME.MinkowskiConvolution(
                    self.inplanes, self.inplanes, kernel_size=2, stride=2, dimension=dimension
                )
            )
            self.bn.append(ME.MinkowskiBatchNorm(self.inplanes))
            self.blocks.append(self._make_layer(self.block, stage_planes, num_blocks))

        # Every bottom-up stage outputs ``planes * expansion`` channels, so the lateral
        # 1x1 convolutions must expect that many input channels. For blocks with an
        # expansion of 1 this is identical to using ``planes`` directly.
        expansion = self.block.expansion

        # Lateral connections
        for i in range(self.num_top_down):
            self.conv1x1.append(
                ME.MinkowskiConvolution(
                    self.planes[-1 - i] * expansion,
                    self.lateral_dim,
                    kernel_size=1,
                    stride=1,
                    dimension=dimension,
                )
            )
            self.tconvs.append(
                ME.MinkowskiConvolutionTranspose(
                    self.lateral_dim, self.lateral_dim, kernel_size=2, stride=2, dimension=dimension
                )
            )

        # There's one more lateral connection than top-down TConv blocks
        if self.num_top_down < self.num_bottom_up:
            # Lateral connection from Conv block 1 or above
            self.conv1x1.append(
                ME.MinkowskiConvolution(
                    self.planes[-1 - self.num_top_down] * expansion,
                    self.lateral_dim,
                    kernel_size=1,
                    stride=1,
                    dimension=dimension,
                )
            )
        else:
            # Lateral connection from Conv0 block; its output has ``planes[0]``
            # channels because no residual block (and thus no expansion) is applied.
            self.conv1x1.append(
                ME.MinkowskiConvolution(
                    self.planes[0], self.lateral_dim, kernel_size=1, stride=1, dimension=dimension
                )
            )

        self.relu = ME.MinkowskiReLU(inplace=True)

    def forward(self, x: ME.SparseTensor) -> ME.SparseTensor:
        """Run the bottom-up pass followed by the top-down pass with lateral connections.

        Args:
            x (ME.SparseTensor): Input sparse tensor.

        Returns:
            ME.SparseTensor: Output of the last top-down step (or of the first lateral
                1x1 convolution when there are no top-down steps).

        Raises:
            ValueError: If the number of stored feature maps differs from ``num_top_down``.
        """
        # *** BOTTOM-UP PASS ***
        # First bottom-up convolution is special (with bigger kernel)
        feature_maps = []
        x = self.conv0(x)
        x = self.bn0(x)
        x = self.relu(x)
        if self.num_top_down == self.num_bottom_up:
            # Every stage feeds a lateral connection, including the Conv0 output.
            feature_maps.append(x)

        # Index of the first bottom-up block whose output is needed by the top-down pass.
        first_kept = self.num_bottom_up - 1 - self.num_top_down
        last_ndx = len(self.convs) - 1

        # BOTTOM-UP PASS
        for ndx, (conv, bn, block) in enumerate(zip(self.convs, self.bn, self.blocks)):
            x = conv(x)  # Downsample (conv stride=2 with 2x2x2 kernel)
            x = bn(x)
            x = self.relu(x)
            x = block(x)
            # The output of the very last block is not stored: it continues as ``x``.
            if first_kept <= ndx < last_ndx:
                feature_maps.append(x)

        if len(feature_maps) != self.num_top_down:
            raise ValueError("Number of feature maps should be equal to the number of top-down blocks")

        x = self.conv1x1[0](x)

        # TOP-DOWN PASS
        for ndx, tconv in enumerate(self.tconvs):
            x = tconv(x)  # Upsample using transposed convolution
            # Feature maps are consumed from the deepest stored one to the shallowest.
            x = x + self.conv1x1[ndx + 1](feature_maps[-ndx - 1])

        return x