Source code for opr.modules.feature_extractors.mink_resnet

"""ResNetFPN feature extraction module implemented with MinkowskiEngine.

Komorowski, Jacek. "Minkloc3d: Point cloud based large-scale place recognition."
Proceedings of the IEEE/CVF Winter Conference on Applications of Computer Vision. 2021.

Paper: https://arxiv.org/abs/2011.04530
Code is adapted from the original repository: https://github.com/jac99/MinkLoc3Dv2, MIT License
"""
from __future__ import annotations
from typing import Tuple, Type, Union

from loguru import logger
from torch import Tensor, nn

from opr.modules.eca import MinkECABasicBlock as ECABasicBlock

try:
    import MinkowskiEngine as ME  # type: ignore
    from MinkowskiEngine.modules.resnet_block import BasicBlock, Bottleneck

    minkowski_available = True
except ImportError:
    logger.warning("MinkowskiEngine is not installed. Some features may not be available.")
    BasicBlock = Bottleneck = nn.Module
    minkowski_available = False


class MinkResNetBase(nn.Module):
    """Base ResNet class for sparse tensors with MinkowskiEngine.

    The concrete residual block type is taken from the ``block`` attribute, which
    is only declared (not assigned) here: a subclass or instance has to provide it
    before ``__init__`` runs. ``layers`` gives the number of residual blocks per
    stage and ``planes`` the base channel width of each stage.
    """

    block: Union[Type[BasicBlock], Type[Bottleneck], Type[ECABasicBlock]]
    layers: Tuple[int, ...] = (1, 1, 1, 1)
    init_dim: int = 64
    planes: Tuple[int, ...] = (64, 128, 256, 512)
    sparse: bool = True

    def __init__(self, in_channels: int, out_channels: int, dimension: int = 3) -> None:
        """Base ResNet class for sparse tensors with MinkowskiEngine.

        Args:
            in_channels (int): Number of input channels.
            out_channels (int): Number of output channels.
            dimension (int): Number of dimensions. Defaults to 3.

        Raises:
            RuntimeError: If MinkowskiEngine is not installed.
            RuntimeError: If block type is not specified at the moment of initialisation.
        """
        if not minkowski_available:
            raise RuntimeError("MinkowskiEngine is not installed. MinkResNetBase requires MinkowskiEngine.")
        super().__init__()
        self.dimension = dimension

        # ``block`` is only annotated on the class, so when nobody assigned it a
        # plain ``self.block`` lookup raises AttributeError instead of reaching
        # the check below. ``getattr`` with a default makes the documented
        # RuntimeError the actual failure mode.
        if getattr(self, "block", None) is None:
            raise RuntimeError("Block type for MinkResNetBase not specified.")

        self._network_initialization(in_channels, out_channels, dimension)
        self._weight_initialization()

    def _network_initialization(self, in_channels: int, out_channels: int, dimension: int) -> None:
        """Create the stem, the four residual stages and the output head.

        Args:
            in_channels (int): Number of input channels of the first convolution.
            out_channels (int): Number of output features of the final linear layer.
            dimension (int): Spatial dimension passed to the MinkowskiEngine layers.
        """
        # ``self.inplanes`` tracks the current channel width; ``_make_layer`` updates it.
        self.inplanes = self.init_dim
        self.conv1 = ME.MinkowskiConvolution(
            in_channels, self.inplanes, kernel_size=5, stride=2, dimension=dimension
        )
        self.bn1 = ME.MinkowskiBatchNorm(self.inplanes)
        self.relu = ME.MinkowskiReLU(inplace=True)
        self.pool = ME.MinkowskiAvgPooling(kernel_size=2, stride=2, dimension=dimension)

        self.layer1 = self._make_layer(self.block, self.planes[0], self.layers[0], stride=2)
        self.layer2 = self._make_layer(self.block, self.planes[1], self.layers[1], stride=2)
        self.layer3 = self._make_layer(self.block, self.planes[2], self.layers[2], stride=2)
        self.layer4 = self._make_layer(self.block, self.planes[3], self.layers[3], stride=2)

        self.conv5 = ME.MinkowskiConvolution(
            self.inplanes, self.inplanes, kernel_size=3, stride=3, dimension=dimension
        )
        self.bn5 = ME.MinkowskiBatchNorm(self.inplanes)

        # NOTE: despite the attribute name, this is a global *max* pooling layer.
        self.glob_avg = ME.MinkowskiGlobalMaxPooling()
        self.final = ME.MinkowskiLinear(self.inplanes, out_channels, bias=True)

    def _weight_initialization(self) -> None:
        """Initialise convolution kernels (Kaiming normal) and batch-norm affine parameters (1 / 0)."""
        for m in self.modules():
            if isinstance(m, ME.MinkowskiConvolution):
                ME.utils.kaiming_normal_(m.kernel, mode="fan_out", nonlinearity="relu")

            if isinstance(m, ME.MinkowskiBatchNorm):
                nn.init.constant_(m.bn.weight, 1)
                nn.init.constant_(m.bn.bias, 0)

    def _make_layer(
        self,
        block: Union[Type[BasicBlock], Type[Bottleneck], Type[ECABasicBlock]],
        planes: int,
        blocks: int,
        stride: int = 1,
        dilation: int = 1,
        bn_momentum: float = 0.1,
    ) -> nn.Sequential:
        """Build one residual stage made of ``blocks`` consecutive ``block`` modules.

        Updates ``self.inplanes`` to ``planes * block.expansion`` as a side effect.

        Args:
            block: Residual block class; must expose an ``expansion`` attribute.
            planes (int): Base channel width of the stage.
            blocks (int): Number of residual blocks in the stage.
            stride (int): Stride of the first block. Defaults to 1.
            dilation (int): Dilation passed to every block. Defaults to 1.
            bn_momentum (float): Currently unused by this method. Defaults to 0.1.

        Returns:
            nn.Sequential: The assembled stage.
        """
        out_planes = planes * block.expansion

        # A projection shortcut is needed whenever the first block changes the
        # stride or the channel count.
        downsample = None
        if stride != 1 or self.inplanes != out_planes:
            downsample = nn.Sequential(
                ME.MinkowskiConvolution(
                    self.inplanes,
                    out_planes,
                    kernel_size=1,
                    stride=stride,
                    dimension=self.dimension,
                ),
                ME.MinkowskiBatchNorm(out_planes),
            )

        layers = [
            block(
                self.inplanes,
                planes,
                stride=stride,
                dilation=dilation,
                downsample=downsample,
                dimension=self.dimension,
            )
        ]
        self.inplanes = out_planes
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes, stride=1, dilation=dilation, dimension=self.dimension))

        return nn.Sequential(*layers)

    def forward(self, x: ME.SparseTensor) -> Tensor:
        """Run the stem, the four residual stages, global pooling and the final linear layer.

        Args:
            x (ME.SparseTensor): Input sparse tensor.

        Returns:
            Tensor: Output of the final linear layer.
        """
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.pool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.conv5(x)
        x = self.bn5(x)
        x = self.relu(x)

        x = self.glob_avg(x)
        return self.final(x)
class MinkResNetFPNFeatureExtractor(MinkResNetBase):
    """Feature Pyramid Network (FPN) architecture implementation using Minkowski ResNet building blocks."""

    sparse: bool = True

    def __init__(
        self,
        in_channels: int = 1,
        out_channels: int = 256,
        num_top_down: int = 2,
        conv0_kernel_size: int = 5,
        block: str = "ECABasicBlock",
        layers: Tuple[int, ...] = (1, 1, 1, 1),
        planes: Tuple[int, ...] = (64, 128, 64, 32),
    ) -> None:
        """Feature Pyramid Network (FPN) architecture implementation using Minkowski ResNet building blocks.

        From paper "MinkLoc3D: Point Cloud Based Large-Scale Place Recognition."
        https://arxiv.org/abs/2011.04530

        Args:
            in_channels (int): Number of input channels. Defaults to 1.
            out_channels (int): Number of output channels. Defaults to 256.
            num_top_down (int): Number of top-down steps for FPN block. Defaults to 2.
            conv0_kernel_size (int): Kernel size of the first convolution. Defaults to 5.
            block (str): Block type name. Defaults to "ECABasicBlock".
            layers (Tuple[int, ...]): Number of layers for each block. Defaults to (1, 1, 1, 1).
            planes (Tuple[int, ...]): Output channel size for each block. Defaults to (64, 128, 64, 32).

        Raises:
            RuntimeError: If MinkowskiEngine is not installed.
            ValueError: If the length of layers and planes are not the same.
            ValueError: If the length of layers is less than 1.
            ValueError: If num_top_down is not between 0 and the numbers of layers.
            NotImplementedError: If the block type name is not supported.
        """
        if not minkowski_available:
            raise RuntimeError(
                "MinkowskiEngine is not installed. MinkResNetFPNFeatureExtractor requires MinkowskiEngine."
            )
        if len(layers) != len(planes):
            raise ValueError("layers and planes arguments should be the same length")
        if len(layers) < 1:
            raise ValueError("layers argument should have at least one element")
        if not (0 <= num_top_down <= len(layers)):
            raise ValueError("num_top_down should be between 0 and the numbers of layers")

        # These plain attributes are set before the base initialiser runs because
        # it immediately calls ``_network_initialization``, which reads them.
        self.num_bottom_up = len(layers)
        self.num_top_down = num_top_down
        self.conv0_kernel_size = conv0_kernel_size
        self.block = self._create_resnet_block(block_name=block)
        self.layers = layers
        self.planes = planes
        self.lateral_dim = out_channels
        self.init_dim = planes[0]
        MinkResNetBase.__init__(self, in_channels, out_channels, dimension=3)

    def _create_resnet_block(
        self, block_name: str
    ) -> Union[Type[BasicBlock], Type[Bottleneck], Type[ECABasicBlock]]:
        """Map a block type name to the corresponding block class.

        Args:
            block_name (str): One of "BasicBlock", "Bottleneck" or "ECABasicBlock".

        Returns:
            The block class matching ``block_name``.

        Raises:
            NotImplementedError: If ``block_name`` is not one of the supported names.
        """
        if block_name == "BasicBlock":
            return BasicBlock
        if block_name == "Bottleneck":
            return Bottleneck
        if block_name == "ECABasicBlock":
            return ECABasicBlock
        raise NotImplementedError(f"Unsupported network block: {block_name}")

    def _network_initialization(self, in_channels: int, out_channels: int, dimension: int) -> None:
        """Create the bottom-up stages, the top-down transposed convolutions and the lateral 1x1 convolutions.

        Args:
            in_channels (int): Number of input channels of the first convolution.
            out_channels (int): Not read here; the lateral width comes from ``self.lateral_dim``.
            dimension (int): Spatial dimension passed to the MinkowskiEngine layers.

        Raises:
            ValueError: If ``layers`` and ``planes`` differ in length.
            ValueError: If ``planes`` does not have one entry per bottom-up block.
        """
        if len(self.layers) != len(self.planes):
            raise ValueError("layers and planes arguments should be the same length")
        if len(self.planes) != self.num_bottom_up:
            raise ValueError("planes argument should have the same length as the number of bottom-up blocks")

        self.convs = nn.ModuleList()  # Bottom-up convolutional blocks with stride=2
        self.bn = nn.ModuleList()  # Bottom-up BatchNorms
        self.blocks = nn.ModuleList()  # Bottom-up blocks
        self.tconvs = nn.ModuleList()  # Top-down transposed convolutions
        self.conv1x1 = nn.ModuleList()  # 1x1 convolutions in lateral connections

        # The first convolution is a special case, with a configurable (larger) kernel size
        self.inplanes = self.planes[0]
        self.conv0 = ME.MinkowskiConvolution(
            in_channels, self.inplanes, kernel_size=self.conv0_kernel_size, dimension=dimension
        )
        self.bn0 = ME.MinkowskiBatchNorm(self.inplanes)

        for plane, layer in zip(self.planes, self.layers):
            # ``self.inplanes`` is updated by ``_make_layer`` on every iteration.
            self.convs.append(
                ME.MinkowskiConvolution(
                    self.inplanes, self.inplanes, kernel_size=2, stride=2, dimension=dimension
                )
            )
            self.bn.append(ME.MinkowskiBatchNorm(self.inplanes))
            self.blocks.append(self._make_layer(self.block, plane, layer))

        # Each bottom-up stage outputs ``plane * block.expansion`` channels (see
        # ``_make_layer``), so lateral convolutions fed by a stage must use the
        # expanded width. For blocks whose expansion is 1 this is a no-op.
        expansion = self.block.expansion

        # Lateral connections
        for i in range(self.num_top_down):
            self.conv1x1.append(
                ME.MinkowskiConvolution(
                    self.planes[-1 - i] * expansion,
                    self.lateral_dim,
                    kernel_size=1,
                    stride=1,
                    dimension=dimension,
                )
            )
            self.tconvs.append(
                ME.MinkowskiConvolutionTranspose(
                    self.lateral_dim, self.lateral_dim, kernel_size=2, stride=2, dimension=dimension
                )
            )

        # There's one more lateral connection than top-down TConv blocks
        if self.num_top_down < self.num_bottom_up:
            # Lateral connection from Conv block 1 or above
            self.conv1x1.append(
                ME.MinkowskiConvolution(
                    self.planes[-1 - self.num_top_down] * expansion,
                    self.lateral_dim,
                    kernel_size=1,
                    stride=1,
                    dimension=dimension,
                )
            )
        else:
            # Lateral connection from the Conv0 block, whose output width is
            # exactly ``planes[0]`` (no block expansion is involved there).
            self.conv1x1.append(
                ME.MinkowskiConvolution(
                    self.planes[0], self.lateral_dim, kernel_size=1, stride=1, dimension=dimension
                )
            )

        self.relu = ME.MinkowskiReLU(inplace=True)

    def forward(self, x: ME.SparseTensor) -> ME.SparseTensor:
        """Run the bottom-up pass followed by the top-down pass with lateral connections.

        Args:
            x (ME.SparseTensor): Input sparse tensor.

        Returns:
            ME.SparseTensor: Feature map produced by the last top-down step (or by the
                first lateral convolution when there are no top-down steps).

        Raises:
            ValueError: If the number of stored feature maps differs from ``num_top_down``.
        """
        # *** BOTTOM-UP PASS ***
        # First bottom-up convolution is special (with bigger kernel)
        feature_maps = []
        x = self.conv0(x)
        x = self.bn0(x)
        x = self.relu(x)
        if self.num_top_down == self.num_bottom_up:
            feature_maps.append(x)

        # Index of the first stage whose output is needed by a lateral connection.
        first_kept = self.num_bottom_up - 1 - self.num_top_down
        last_stage = len(self.convs) - 1

        # BOTTOM-UP PASS
        for ndx, (conv, bn, block) in enumerate(zip(self.convs, self.bn, self.blocks)):
            x = conv(x)  # Downsample (conv stride=2 with 2x2x2 kernel)
            x = bn(x)
            x = self.relu(x)
            x = block(x)
            # The output of the last stage is not stored: it stays in ``x``.
            if first_kept <= ndx < last_stage:
                feature_maps.append(x)

        if len(feature_maps) != self.num_top_down:
            raise ValueError("Number of feature maps should be equal to the number of top-down blocks")

        x = self.conv1x1[0](x)

        # TOP-DOWN PASS
        for ndx, tconv in enumerate(self.tconvs):
            x = tconv(x)  # Upsample using transposed convolution
            x = x + self.conv1x1[ndx + 1](feature_maps[-ndx - 1])

        return x