opr.pipelines.place_recognition package

Place Recognition pipelines.

opr.pipelines.place_recognition.base

Basic Place Recognition pipelines.

class opr.pipelines.place_recognition.base.PlaceRecognitionPipeline(database_dir: str | PathLike, model: Module, model_weights_path: str | PathLike | None = None, device: str | int | device = 'cpu', pointcloud_quantization_size: float = 0.5)

Bases: object

Basic Place Recognition pipeline.

infer(input_data: Dict[str, Tensor]) → Dict[str, ndarray]

Single sample inference.

Parameters:
  • input_data (Dict[str, Tensor]) – Input data. Dictionary with keys in the following format:

      • “image_{camera_name}” for images from cameras,

      • “mask_{camera_name}” for semantic segmentation masks,

      • “pointcloud_lidar_coords” for pointcloud coordinates from lidar,

      • “pointcloud_lidar_feats” for pointcloud features from lidar.

Returns:

Inference results. Dictionary with keys:

  • “idx” for predicted index in the database,

  • “pose” for predicted pose in the format [tx, ty, tz, qx, qy, qz, qw],

  • “descriptor” for predicted descriptor.

Return type:

Dict[str, np.ndarray]
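The “pose” entry packs a translation and a quaternion into seven numbers. As a minimal sketch of how such a vector could be consumed downstream, the helper below converts it into a 4x4 homogeneous transform. The function name pose_to_matrix is hypothetical and not part of opr; the sketch assumes a unit-length or near-unit-length quaternion in [qx, qy, qz, qw] order, as the format above states.

```python
from typing import List, Sequence


def pose_to_matrix(pose: Sequence[float]) -> List[List[float]]:
    """Convert [tx, ty, tz, qx, qy, qz, qw] to a 4x4 row-major transform.

    Hypothetical helper for illustration; not part of the opr package.
    """
    tx, ty, tz, qx, qy, qz, qw = (float(v) for v in pose)
    # Normalize the quaternion so small numeric drift does not skew the rotation.
    norm = (qx * qx + qy * qy + qz * qz + qw * qw) ** 0.5
    if norm == 0.0:
        raise ValueError("quaternion has zero length")
    qx, qy, qz, qw = qx / norm, qy / norm, qz / norm, qw / norm
    # Standard unit-quaternion to rotation-matrix formula, with translation
    # placed in the last column.
    return [
        [1 - 2 * (qy * qy + qz * qz), 2 * (qx * qy - qz * qw), 2 * (qx * qz + qy * qw), tx],
        [2 * (qx * qy + qz * qw), 1 - 2 * (qx * qx + qz * qz), 2 * (qy * qz - qx * qw), ty],
        [2 * (qx * qz - qy * qw), 2 * (qy * qz + qx * qw), 1 - 2 * (qx * qx + qy * qy), tz],
        [0.0, 0.0, 0.0, 1.0],
    ]
```

Because the helper only iterates over its argument, it should accept a plain list as well as a one-dimensional array such as the “pose” value returned by infer.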

opr.pipelines.place_recognition.text_labels

class opr.pipelines.place_recognition.text_labels.TextLabelsPlaceRecognitionOCRPipeline(db_labels_path, *args, **kwargs)

Bases: PlaceRecognitionPipeline

find_most_similar_id(query: List[str], ignore_stopwords: bool = False, normalize_text: bool = False, print_info: bool = False) → Tuple[str, List[str], int]

Finds the most similar ID in the database based on the given query.

Parameters:
  • query (List[str]) – The query to compare against the database labels.

  • ignore_stopwords (bool, optional) – Whether to ignore stopwords during comparison. Defaults to False.

  • normalize_text (bool, optional) – Whether to normalize the text before comparison. Defaults to False.

  • print_info (bool, optional) – Whether to print additional information during the process. Defaults to False.

Returns:

A tuple containing the best match ID, the corresponding labels, and the highest similarity score.

Return type:

Tuple[Optional[str], Optional[List[str]], int]
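The source does not say which similarity metric or database layout the method uses. The sketch below only illustrates the general shape of a best-match search that returns an (ID, labels, score) tuple. It assumes a hypothetical dict mapping each ID to its labels and uses difflib.SequenceMatcher from the standard library as a stand-in metric scaled to 0-100; the function names are illustrative and not part of opr.

```python
from difflib import SequenceMatcher
from typing import Dict, List, Optional, Tuple


def similarity(a: str, b: str) -> int:
    """Score two strings on a 0-100 scale (stand-in metric, an assumption)."""
    return round(100 * SequenceMatcher(None, a, b).ratio())


def find_most_similar_id_sketch(
    query: List[str], db_labels: Dict[str, List[str]]
) -> Tuple[Optional[str], Optional[List[str]], int]:
    """Return (best_id, labels_of_best_id, best_score).

    Returns (None, None, 0) when no pair scores above zero.
    """
    best_id, best_labels, best_score = None, None, 0
    for entry_id, labels in db_labels.items():
        # Score an entry by its best query-label / database-label pair.
        score = max(
            (similarity(q, label) for q in query for label in labels),
            default=0,
        )
        if score > best_score:
            best_id, best_labels, best_score = entry_id, labels, score
    return best_id, best_labels, best_score
```

Returning None for the ID and labels when nothing matches mirrors the Optional entries in the documented return type.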

static get_labels_by_id(labels: List[str], id: str) → List[str]

Retrieve all labels associated with a given ID from the labels dictionary.

Parameters:
  • labels (List[str]) – The list of labels.

  • id (str) – The ID to retrieve labels for.

Returns:

The list of labels associated with the given ID.

Return type:

List[str]

infer(input_data: Dict[str, Tensor], text_similarity_thresh: int = 50, print_info: bool = False) → Dict[str, ndarray]

Single sample inference.

Parameters:
  • input_data (Dict[str, Tensor]) – Input data. Dictionary with keys in the following format:

      • “image_{camera_name}” for images from cameras,

      • “mask_{camera_name}” for semantic segmentation masks,

      • “pointcloud_lidar_coords” for pointcloud coordinates from lidar,

      • “pointcloud_lidar_feats” for pointcloud features from lidar.

  • text_similarity_thresh (int, optional) – Text similarity threshold. Defaults to 50.

  • print_info (bool, optional) – Whether to print additional information during the process. Defaults to False.

Returns:

Inference results. Dictionary with keys:

  • “idx” for predicted index in the database,

  • “pose” for predicted pose in the format [tx, ty, tz, qx, qy, qz, qw],

  • “descriptor” for predicted descriptor.

Return type:

Dict[str, np.ndarray]

init_ocr_model(ocr_model)

static normalize_labels(labels: List[str]) → List[str]

Normalize a list of labels by converting them to lowercase and removing leading/trailing whitespace.

Parameters:

labels (List[str]) – The list of labels to be normalized.

Returns:

The normalized list of labels.

Return type:

List[str]
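The normalization described above (lowercasing and trimming surrounding whitespace) can be sketched in one line. This is an illustrative stand-alone version under the hypothetical name normalize_labels_sketch, not the library's own code.

```python
from typing import List


def normalize_labels_sketch(labels: List[str]) -> List[str]:
    """Lowercase each label and strip leading/trailing whitespace."""
    return [label.strip().lower() for label in labels]
```

Python's str.lower is Unicode-aware, so Cyrillic labels are lowercased the same way as Latin ones.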

static remove_stopwords(labels: List[str], stopwords: List[str] = ['выход', 'мфти']) → List[str]

Removes stopwords from a list of labels.

Parameters:
  • labels (List[str]) – The list of labels to remove stopwords from.

  • stopwords (List[str], optional) – The list of stopwords to be removed. Defaults to [“выход”, “мфти”] (Russian for “exit” and “MIPT”).

Returns:

The list of labels with stopwords removed.

Return type:

List[str]
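The source does not state whether a stopword removes a whole label or only a word inside it. The sketch below assumes the simpler reading, in which a label is dropped when it exactly equals a stopword. The name remove_stopwords_sketch and the DEFAULT_STOPWORDS constant are hypothetical; only the two default values come from the signature above.

```python
from typing import List, Sequence

# Default values taken from the documented signature; the constant name is
# an assumption made for this sketch.
DEFAULT_STOPWORDS = ("выход", "мфти")


def remove_stopwords_sketch(
    labels: List[str], stopwords: Sequence[str] = DEFAULT_STOPWORDS
) -> List[str]:
    """Drop every label that exactly matches a stopword (assumed behavior)."""
    blocked = set(stopwords)
    return [label for label in labels if label not in blocked]
```

The sketch uses an immutable tuple as the default so that the default cannot be mutated between calls. Matching is case-sensitive here, so labels would typically be normalized first.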

class opr.pipelines.place_recognition.text_labels.TextLabelsPlaceRecognitionPipeline(db_labels_path, *args, **kwargs)

Bases: PlaceRecognitionPipeline

find_most_similar_id(query: List[str], ignore_stopwords: bool = False, normalize_text: bool = False, print_info: bool = False) → Tuple[str, List[str], int]

Finds the most similar ID in the database based on the given query.

Parameters:
  • query (List[str]) – The query to compare against the database labels.

  • ignore_stopwords (bool, optional) – Whether to ignore stopwords during comparison. Defaults to False.

  • normalize_text (bool, optional) – Whether to normalize the text before comparison. Defaults to False.

  • print_info (bool, optional) – Whether to print additional information during the process. Defaults to False.

Returns:

A tuple containing the best match ID, the corresponding labels, and the highest similarity score.

Return type:

Tuple[Optional[str], Optional[List[str]], int]

static get_labels_by_id(labels: List[str], id: str) → List[str]

Retrieve all labels associated with a given ID from the labels dictionary.

Parameters:
  • labels (List[str]) – The list of labels.

  • id (str) – The ID to retrieve labels for.

Returns:

The list of labels associated with the given ID.

Return type:

List[str]

infer(input_data: Dict[str, Tensor], query_labels: List[str], text_similarity_thresh: int = 50, print_info: bool = False) → Dict[str, ndarray]

Single sample inference.

Parameters:
  • input_data (Dict[str, Tensor]) – Input data. Dictionary with keys in the following format:

      • “image_{camera_name}” for images from cameras,

      • “mask_{camera_name}” for semantic segmentation masks,

      • “pointcloud_lidar_coords” for pointcloud coordinates from lidar,

      • “pointcloud_lidar_feats” for pointcloud features from lidar.

  • query_labels (List[str]) – List of query labels.

  • text_similarity_thresh (int, optional) – Text similarity threshold. Defaults to 50.

  • print_info (bool, optional) – Whether to print additional information during the process. Defaults to False.

Returns:

Inference results. Dictionary with keys:

  • “idx” for predicted index in the database,

  • “pose” for predicted pose in the format [tx, ty, tz, qx, qy, qz, qw],

  • “descriptor” for predicted descriptor.

Return type:

Dict[str, np.ndarray]

static normalize_labels(labels: List[str]) → List[str]

Normalize a list of labels by converting them to lowercase and removing leading/trailing whitespace.

Parameters:

labels (List[str]) – The list of labels to be normalized.

Returns:

The normalized list of labels.

Return type:

List[str]

static remove_stopwords(labels: List[str], stopwords: List[str] = ['выход', 'мфти']) → List[str]

Removes stopwords from a list of labels.

Parameters:
  • labels (List[str]) – The list of labels to remove stopwords from.

  • stopwords (List[str], optional) – The list of stopwords to be removed. Defaults to [“выход”, “мфти”] (Russian for “exit” and “MIPT”).

Returns:

The list of labels with stopwords removed.

Return type:

List[str]