Source code for opr.pipelines.depth_estimation

import numpy as np
import torch
import torch.nn as nn
from os import PathLike
from argparse import Namespace
from opr.utils import init_model, parse_device
from typing import Dict, Optional, Union
from torchvision.transforms import Resize
from skimage.transform import resize
from sklearn.linear_model import LinearRegression
import torch_tensorrt
import time

[docs] class DepthEstimationPipeline: def __init__(self, model: nn.Module, model_type: str = 'AdelaiDepth', align_type: str = 'average', mode: str = 'indoor', model_weights_path: Optional[Union[str, PathLike]] = None, device: Union[str, int, torch.device] = "cuda"): """ This class provides a pipeline for monocular depth estimation with use of sparse lidar point cloud data. Args: model (nn.Module): PyTorch model for depth estimation. model_type (str): Type of the neural depth reconstruction model, either 'AdelaiDepth' or 'DepthAnything'. align_type (str): Predicted depth and point cloud alignment type, either 'average' or 'regression'. mode (str): Mode of operation, either 'indoor' or 'outdoor'. model_weights_path (Optional[Union[str, PathLike]]): Path to the model weights (optional). device (Union[str, int, torch.device]): Device to run the model on (defaults to "cuda"). """ self.device = parse_device(device) self.model = init_model(model, model_weights_path, self.device) self.model_type = model_type assert self.model_type in ['AdelaiDepth', 'DepthAnything'] self.align_type = align_type assert align_type in ['average', 'regression'] self.mode = mode assert self.mode in ['indoor', 'outdoor'] self.model.eval() self.forward_type = 'fp32'
[docs] def set_camera_matrix(self, camera_matrix: Dict[str, float]): """Set the camera intrinsic matrix for calculations.""" self.camera_matrix = Namespace(**camera_matrix)
[docs] def set_lidar_to_camera_transform(self, transform): """Set lidar to camera transform""" self.lidar_to_camera_transform = transform
[docs] def get_depth_with_lidar(self, image: np.ndarray, point_cloud: np.ndarray) -> np.ndarray: """Obtain depth estimation from the provided image and point cloud data. Args: image: np.ndarray - monocular image point_cloud: np.ndarray - sparse lidar point cloud Returns: depth: np.ndarray - reconstructed depth map with the same height and width as the input image zs: np.ndarray - z values of the lidar point cloud projected on the iamge errors: np.ndarray - absolute errors of depth reconstruction for the points of the projected lidar point cloud rel_errors: np.ndarray - relative errors of depth reconstruction for the points of the projected lidar point cloud """ raw_img_h, raw_img_w = image.shape[0], image.shape[1] if self.model_type == 'DepthAnything': image = resize(image, (240, 320)) image = (image * 255).astype(np.uint8) else: image = resize(image, (480, 640)) start_time = time.time() if self.model_type == 'AdelaiDepth': image_tensor = torch.Tensor(np.transpose(image, [2, 0, 1])[np.newaxis, ...]).to(self.device) predicted_depth = self.model.inference(image_tensor).cpu().numpy()[0, 0] else: image_tensor, (h, w) = self.model.image2tensor(image) predicted_depth = self.model.infer_image(image) predicted_depth = resize(predicted_depth, (raw_img_h, raw_img_w)) end_time = time.time() pcd_extended = np.concatenate((point_cloud, np.ones((point_cloud.shape[0], 1))), axis=1) pcd_transformed = pcd_extended @ np.linalg.inv(self.lidar_to_camera_transform).T pcd_transformed = pcd_transformed[:, :3] / pcd_transformed[:, 3:] pcd_forward_segment = pcd_transformed[pcd_transformed[:, 2] > 0] if self.mode == 'indoor': pcd_forward_segment = pcd_forward_segment[(pcd_forward_segment[:, 2] < 15) * \ (pcd_forward_segment[:, 0] > -15) * (pcd_forward_segment[:, 0] < 15) * \ (pcd_forward_segment[:, 1] > -5) * (pcd_forward_segment[:, 1] < 5)] else: pcd_forward_segment = pcd_forward_segment[(pcd_forward_segment[:, 2] < 50) * \ (pcd_forward_segment[:, 0] > -50) * (pcd_forward_segment[:, 0] < 50) * \ (pcd_forward_segment[:, 1] > -10) * (pcd_forward_segment[:, 1] < 10)] pcd_in_fov = pcd_forward_segment[np.abs(pcd_forward_segment[:, 0] / pcd_forward_segment[:, 2]) < self.camera_matrix.cx / self.camera_matrix.f] pcd_in_fov = pcd_in_fov[np.abs(pcd_in_fov[:, 1] / pcd_in_fov[:, 2]) < self.camera_matrix.cy / self.camera_matrix.f] pcd_in_fov_numpy = pcd_in_fov zs = [] preds = [] cnt = 0 mask = np.zeros_like(predicted_depth) for x, y, z in pcd_in_fov_numpy: i = int(self.camera_matrix.cy + y / z * self.camera_matrix.f) j = int(self.camera_matrix.cx + x / z * self.camera_matrix.f) if self.mode == 'indoor': if i < raw_img_h / 3 or i > raw_img_h * 2 / 3: continue if i < 0 or i >= raw_img_h or j < 0 or j >= raw_img_w: continue else: if y > 0: continue mask[i, j] = predicted_depth[i, j] zs.append(z) preds.append(predicted_depth[i, j]) cnt += 1 zs = np.array(zs) preds = np.array(preds) print('cnt:', cnt) if self.align_type == 'average': depth = predicted_depth * np.mean(zs / preds) else: lin = LinearRegression() lin.fit(X=preds[:, np.newaxis], y=zs) depth = predicted_depth * lin.coef_[0] + lin.intercept_ errors = [] rel_errors = [] zs = [] for x, y, z in pcd_in_fov_numpy: i = int(self.camera_matrix.cy + y / z * self.camera_matrix.f) j = int(self.camera_matrix.cx + x / z * self.camera_matrix.f) if i < 0 or i >= raw_img_h or j < 0 or j >= raw_img_w: continue if y > 0: continue zs.append(z) errors.append(np.abs(depth[i, j] - z)) rel_errors.append(np.abs(depth[i, j] - z) / z) cnt += 1 errors = np.array(errors) rel_errors = np.array(rel_errors) rmse = np.sqrt(np.mean(errors ** 2)) rel = np.mean(rel_errors) return depth, zs, errors, rel_errors