Source code for ethology.io.annotations.load_bboxes

"""Load bounding boxes annotations into ``ethology``."""

import json
from pathlib import Path
from typing import Any, Literal

import numpy as np
import pandas as pd
import pandera.pandas as pa
import xarray as xr
from pandera.typing.pandas import DataFrame

from ethology.io.annotations.validate import (
    ValidBboxesDataFrame,
    ValidBboxesDataset,
    ValidCOCO,
    ValidVIA,
    _check_output,
)


[docs] @_check_output(ValidBboxesDataset) def from_files( file_paths: Path | str | list[Path | str], format: Literal["VIA", "COCO"], images_dirs: Path | str | list[Path | str] | None = None, ) -> xr.Dataset: """Load an ``ethology`` bounding box annotations dataset from a file. Parameters ---------- file_paths : pathlib.Path | str | list[pathlib.Path | str] Path or list of paths to the input annotation files. format : {"VIA", "COCO"} Format of the input annotation files. images_dirs : pathlib.Path | str | list[pathlib.Path | str], optional Path or list of paths to the directories containing the images the annotations refer to. The paths are added to the dataset attributes. Returns ------- xarray.Dataset A valid bounding box annotations dataset with dimensions `image_id`, `space`, `id`, and the following arrays: - `position`, with dimensions (image_id, space, id), - `shape`, with dimensions (image_id, space, id), - `category`, with dimensions (image_id, id) - optional, - `image_shape`, with dimensions (image_id, space) - optional. The `category` array, if present, holds category IDs as 1-based integers, matching the category IDs in the input file. The dataset attributes include: - `annotation_files`: a list of paths to the input annotation files - `annotation_format`: the format of the input annotation files - `map_category_to_str`: a map from category ID to category name - `map_image_id_to_filename`: a map from image ID to image filename - `images_directories`: directory paths for the images (optional) Notes ----- The `image_id` is assigned based on the alphabetically sorted list of unique image filenames across all input files. So if two images have the same filename but are in different input annotation files, they will be assigned the same image ID and their annotations will be merged. The `id` dimension corresponds to the annotation ID per image. It ranges from 0 to the maximum number of annotations per image in the dataset. Note that the annotation IDs are not necessarily consistent across images. This means that the annotations with ID=3 in image `t` and image `t+1` will likely not correspond to the same individual. The `space` dimension holds the "x" and "y" coordinates. Note that supercategories are not currently added to the xarray dataset, even if specified in the input file. Examples -------- Load annotations from a single COCO file: >>> from ethology.io.annotations import load_bboxes >>> ds = load_bboxes.from_files( ... file_paths="path/to/annotation_file.json", format="COCO" ... ) Load annotations from a single COCO file and specify the images directory: >>> from ethology.io.annotations import load_bboxes >>> ds = load_bboxes.from_files( ... file_paths="path/to/annotation_file.json", ... format="COCO", ... images_dirs="path/to/images_dir", ... ) Load annotations from two VIA files and specify multiple image directories: >>> from ethology.io.annotations import load_bboxes >>> ds = load_bboxes.from_files( ... file_paths=[ ... "path/to/annotation_file_1.json", ... "path/to/annotation_file_2.json", ... ], ... format="VIA", ... images_dirs=["path/to/images_dir_1", "path/to/images_dir_2"], ... ) """ # Compute intermediate dataframe df if isinstance(file_paths, list): df_all = _df_from_multiple_files(file_paths, format=format) else: df_all = _df_from_single_file(file_paths, format=format) # Get maps to set as dataset attributes map_image_id_to_filename, map_category_to_str = ( _get_map_attributes_from_df(df_all) ) # Convert dataframe to xarray dataset ds = _df_to_xarray_ds(df_all) # Add attributes to the xarray dataset ds.attrs = { "annotation_files": file_paths, "annotation_format": format, "images_directories": images_dirs, "map_category_to_str": map_category_to_str, "map_image_id_to_filename": map_image_id_to_filename, } return ds
def _get_map_attributes_from_df( df: DataFrame[ValidBboxesDataFrame], ) -> tuple[dict, dict]: """Get the map attributes from the dataframe. Parameters ---------- df : DataFrame[ValidBboxesDataFrame] Bounding box annotations dataframe. Returns ------- tuple[dict, dict] Map from "image_id" to image_filename and map from "category_id" to category name if present. """ # Compute dataset attributes from a valid intermediate dataframe # map from "image_id" to image_filename mapping_df = df[["image_filename", "image_id"]].drop_duplicates() map_image_id_to_filename = mapping_df.set_index("image_id").to_dict()[ "image_filename" ] # map from "category_id" to category name map_category_to_str = {} if all(col in df.columns for col in ["category_id", "category"]): map_category_to_str = ( df[["category_id", "category"]] .drop_duplicates() .set_index("category_id") .to_dict()["category"] ) # sort by category_id map_category_to_str = dict(sorted(map_category_to_str.items())) return (map_image_id_to_filename, map_category_to_str) @pa.check_types def _df_from_multiple_files( list_filepaths: list[Path | str], format: Literal["VIA", "COCO"] ) -> DataFrame[ValidBboxesDataFrame]: """Read annotations from multiple files as a valid intermediate dataframe. Parameters ---------- list_filepaths : list[Path | str] List of paths to the input annotation bounding boxes files format : Literal["VIA", "COCO"] Format of the input annotation bounding boxes files. Currently supported formats are "VIA" and "COCO". Returns ------- DataFrame[ValidBboxesDataFrame] Intermediate dataframe for bounding boxes annotations. The dataframe is indexed by "annotation_id" and has the following columns: "image_filename", "image_id", "image_width", "image_height", "x_min", "y_min", "width", "height", "supercategory", "category", "category_id". """ # Get list of dataframes df_list = [ _df_from_single_file(file_path=file, format=format) for file in list_filepaths ] # Concatenate and reindex # the resulting axis is labeled 0,1,…,n - 1. # NOTE: after ignore_index=True the index name is no longer "annotation_id" df_all = pd.concat(df_list, ignore_index=True) # Update "image_id" based on the alphabetically sorted list of unique image # filenames across all input files list_image_filenames = sorted(list(df_all["image_filename"].unique())) df_all["image_id"] = df_all["image_filename"].apply( lambda x: list_image_filenames.index(x) ) # Sort by image_filename df_all = df_all.sort_values(by=["image_filename"]) # Remove duplicates that may exist across files and reindex # NOTE: we exclude image_width and image_height from the set of columns # to identify duplicates, as these may differ across files. df_all = df_all.drop_duplicates( subset=[ col for col in df_all.columns if col not in ["image_width", "image_height"] ], ignore_index=True, inplace=False, ) # Set the index name back to "annotation_id" df_all.index.name = "annotation_id" return df_all @pa.check_types def _df_from_single_file( file_path: Path | str, format: Literal["VIA", "COCO"] ) -> DataFrame[ValidBboxesDataFrame]: """Read annotations from a single file as a valid intermediate dataframe. Parameters ---------- file_path : Path | str Path to the input annotation bounding boxes file. format : Literal["VIA", "COCO"] Format of the input bounding boxes annotation file. Currently supported formats are "VIA" and "COCO". Returns ------- DataFrame[ValidBboxesDataFrame] Intermediate dataframe for bounding boxes annotations. The dataframe is indexed by "annotation_id" and has the following columns: 'image_filename', 'image_id', 'image_width', 'image_height', 'x_min', 'y_min', 'width', 'height', 'supercategory', 'category', 'category_id'. """ # Choose the appropriate validator and row-extraction function validator: type[ValidVIA | ValidCOCO] if format == "VIA": validator = ValidVIA get_rows_from_file = _df_rows_from_valid_VIA_file elif format == "COCO": validator = ValidCOCO get_rows_from_file = _df_rows_from_valid_COCO_file else: raise ValueError(f"Unsupported format: {format}") # Build dataframe from extracted rows valid_file = validator(file_path) list_rows = get_rows_from_file(valid_file.path) df = pd.DataFrame(list_rows) # Sort annotations by image_filename df = df.sort_values(by=["image_filename"]) # Drop duplicates and reindex # The resulting axis is labeled 0,1,…,n-1. df = df.drop_duplicates( subset=[col for col in df.columns if col != "annotation_id"], ignore_index=True, inplace=False, ) # Cast bbox coordinates and shape as floats for col in ["x_min", "y_min", "width", "height"]: df[col] = df[col].astype(np.float64) # Set the index name to "annotation_id" df = df.set_index("annotation_id") return df def _df_rows_from_valid_VIA_file(file_path: Path) -> list[dict]: """Extract list of dataframe rows from a validated VIA JSON file. Parameters ---------- file_path : Path Path to the validated VIA JSON file. Returns ------- list[dict] List of dataframe rows extracted from the validated VIA JSON file. """ # Read validated json as dict with open(file_path) as file: data_dict = json.load(file) # Get list of sorted image filenames image_metadata_dict = data_dict["_via_img_metadata"] list_sorted_filenames = sorted( [img_dict["filename"] for img_dict in image_metadata_dict.values()] ) # Get supercategories and categories via_attributes = data_dict["_via_attributes"] supercategories_dict = {} if "region" in via_attributes: supercategories_dict = via_attributes["region"] # Compute list of rows in dataframe list_rows = [] annotation_id = 0 # loop through images for _, img_dict in image_metadata_dict.items(): # Extract img width and height, # set to default if invalid or not present image_width, image_height = ( _get_image_shape_attr_as_integer( img_dict["file_attributes"], file_attr, # type: ignore ) for file_attr in ["width", "height"] ) # loop thru annotations in the image for region in img_dict["regions"]: # Extract region data region_shape = region["shape_attributes"] region_attributes = region["region_attributes"] # Extract category data if present if region_attributes and supercategories_dict: # supercategory # A region (bbox) can have multiple supercategories. # We only consider the first supercategory in alphabetical # order. supercategory = sorted(list(region_attributes.keys()))[0] # category name # in VIA files, the category_id is a string category_id_str = region_attributes[supercategory] categories_dict = supercategories_dict[supercategory][ "options" ] category = categories_dict[category_id_str] # category_id as int category_id = _category_id_as_int( category_id_str, categories_dict ) else: supercategory, category, category_id = ( ValidBboxesDataFrame.get_empty_values()[key] for key in ["supercategory", "category", "category_id"] ) # Add to row row = { "annotation_id": annotation_id, "image_filename": img_dict["filename"], "image_id": list_sorted_filenames.index(img_dict["filename"]), "image_width": image_width, "image_height": image_height, "x_min": region_shape["x"], "y_min": region_shape["y"], "width": region_shape["width"], "height": region_shape["height"], "supercategory": supercategory, "category": category, "category_id": category_id, } list_rows.append(row) # update "annotation_id" annotation_id += 1 return list_rows def _get_image_shape_attr_as_integer( file_attrs: dict, attr_name: Literal["width", "height"] ) -> int: """Safely extract the image shape attribute as an integer. If the attribute is not present or invalid, return the default value for the image shape attribute defined in ValidBboxesDataFrame.get_empty_values(). The file_attrs dictionary should come from a VIA input file. Parameters ---------- file_attrs : dict File attributes dictionary extracted from a VIA input file. attr_name : Literal["width", "height"] Name of the image shape attribute. Returns ------- int Attribute value as int. If the attribute is not present or invalid, return the default value for the image shape attribute defined in ValidBboxesDataFrame.get_empty_values(). """ default_value = ValidBboxesDataFrame.get_empty_values()[ f"image_{attr_name}" ] try: return int(file_attrs.get(attr_name, default_value)) except (TypeError, ValueError): return default_value def _category_id_as_int( category_id_str: str, list_categories: list[str] ) -> int: """Convert category_id to int if possible, otherwise factorize it. The category_id is a string in VIA files. If it cannot be converted to an integer, it is factorized to a 1-based integer (0 is reserved for the background class) based on the alphabetically sorted list of categories. Parameters ---------- category_id_str : str Category ID as string. list_categories : list[str] List of categories. Returns ------- int Category ID as int. """ # get category_id as int try: category_id = int(category_id_str) except ValueError: # factorize to 0-based integers list_sorted_options = sorted(list_categories) category_id = list_sorted_options.index(category_id_str) # Add 1 to the factorised values to make them 1-based category_id = category_id + 1 return category_id def _df_rows_from_valid_COCO_file(file_path: Path) -> list[dict]: """Extract list of dataframe rows from a validated COCO JSON file. Parameters ---------- file_path : Path Path to the validated COCO JSON file. Returns ------- list[dict] List of dataframe rows extracted from the validated COCO JSON file. """ # Read validated json as dict with open(file_path) as file: data_dict = json.load(file) # Prepare data # We define "image_id_ethology" as the 0-based index of the image # following the alphabetically sorted list of unique image filenames. # In the following we assume the list of images under "images" in the # COCO JSON file is unique (i.e. it has no duplicate elements). map_img_id_coco_to_ethology = { img_dict["id"]: idx for idx, img_dict in enumerate( sorted(data_dict["images"], key=lambda x: x["file_name"]) ) } map_img_id_coco_to_filename = { img_dict["id"]: img_dict["file_name"] for img_dict in data_dict["images"] } map_img_id_coco_to_width_height = { img_dict["id"]: (img_dict["width"], img_dict["height"]) for img_dict in data_dict["images"] } # COCO files from VGG annotator always have # image width and height (can be 0) map_category_id_to_category_data = { cat_dict["id"]: (cat_dict["name"], cat_dict.get("supercategory", "")) for cat_dict in data_dict["categories"] } # category data: category name, supercategory name # Build standard dataframe list_rows = [] for annot_id, annot_dict in enumerate(data_dict["annotations"]): # image data img_id_coco = annot_dict["image_id"] image_filename = map_img_id_coco_to_filename[img_id_coco] image_width, image_height = map_img_id_coco_to_width_height[ img_id_coco ] # compute image ID following ethology convention img_id_ethology = map_img_id_coco_to_ethology[img_id_coco] # bbox data x_min, y_min, width, height = annot_dict["bbox"] # category data category_id = annot_dict["category_id"] category, supercategory = map_category_id_to_category_data[category_id] row = { "annotation_id": annot_id, "image_filename": image_filename, "image_id": img_id_ethology, "image_width": image_width, "image_height": image_height, "x_min": x_min, "y_min": y_min, "width": width, "height": height, "supercategory": supercategory, # if not defined, set to "" "category": category, "category_id": category_id, # in COCO files, the category_id is always a 1-based integer } list_rows.append(row) return list_rows @pa.check_types def _df_to_xarray_ds(df: DataFrame[ValidBboxesDataFrame]) -> xr.Dataset: """Convert a bounding box annotations dataframe to an xarray dataset. Parameters ---------- df : DataFrame[ValidBboxesDataFrame] A valid intermediate dataframe for bounding boxes annotations. Returns ------- xr.Dataset an xarray dataset with the following dimensions: - `image_id`: holds the 0-based index of the image in the "images" list of the COCO JSON file; - `space`: `x` or `y`; - `id`: annotation ID per image, assigned from 0 to the max number of annotations per image in the full dataset. Note that the annotation IDs are not necessarily consistent across images. This means that the annotations with ID `m` in image `t` and image `t+1` will likely not correspond to the same individual. The dataset is made up of the following arrays: - `position`: (`image_id`, `space`, `id`) - `shape`: (`image_id`, `space`, `id`) - `category`: (`image_id`, `id`) """ # Drop columns if all values in that column are empty default_values = ValidBboxesDataFrame.get_empty_values() list_empty_cols = [ col for col in default_values if all(df[col] == default_values[col]) ] df = df.drop(columns=list_empty_cols) # Compute max number of annotations per image max_annotations_per_image = df["image_id"].value_counts().max() # Sort the dataframe by image_id df = df.sort_values(by=["image_id"]) # Compute indices of the rows where the image ID switches bool_id_diff_from_prev = df["image_id"].ne(df["image_id"].shift()) indices_id_switch = np.argwhere(bool_id_diff_from_prev)[1:, 0] # Extract arrays from the dataframe arrays_metadata = _prepare_array_dicts(df) array_dict = _extract_arrays_from_df( df, arrays_metadata, indices_id_switch, max_annotations_per_image ) # Build data vars dictionary data_vars = { array_key.split("_array")[0]: ( arrays_metadata[array_key]["dims"], array_dict[array_key], ) for array_key in array_dict } return xr.Dataset( data_vars=data_vars, coords=dict( image_id=df["image_id"].unique(), space=["x", "y"], id=range(max_annotations_per_image), ), ) def _prepare_array_dicts( df: pd.DataFrame, ) -> dict[str, dict[str, Any]]: """Prepare the metadata for the arrays in the xarray dataset. Parameters ---------- df : pd.DataFrame A dataframe for bounding boxes annotations. Returns ------- dict[str, dict[str, Any]] A dictionary with the metadata for the arrays in the xarray dataset. """ arrays_metadata: dict[str, dict[str, Any]] = { "position_array": { "columns": ["x_min", "y_min"], "type": np.float64, "pad_value": np.nan, "dims": ("image_id", "space", "id"), }, "shape_array": { "columns": ["width", "height"], "type": np.float64, "pad_value": np.nan, "dims": ("image_id", "space", "id"), }, } # Add image shape data if present if all(col in df.columns for col in ["image_width", "image_height"]): arrays_metadata["image_shape_array"] = { "columns": ["image_width", "image_height"], "type": int, "pad_value": -1, "dims": ("image_id", "space"), } # Add category data if present if all(col in df.columns for col in ["category_id", "category"]): arrays_metadata["category_array"] = { "columns": ["category_id"], "type": int, "pad_value": -1, "dims": ("image_id", "id"), } return arrays_metadata def _extract_arrays_from_df( df: pd.DataFrame, arrays_metadata: dict[str, dict[str, Any]], indices_id_switch: np.ndarray, max_annotations_per_image: int, ) -> dict[str, np.ndarray]: """Extract arrays in metadata dict from a df of bounding boxes annotations. Parameters ---------- df : pd.DataFrame A dataframe for bounding boxes annotations. arrays_metadata : dict[str, dict[str, Any]] A dictionary with the metadata for the arrays to extract. indices_id_switch : np.ndarray Indices of the rows where the image ID switches. max_annotations_per_image : int The maximum number of annotations per image. Returns ------- dict[str, np.ndarray] A dictionary with the arrays extracted from the dataframe. """ array_dict = {} for key in arrays_metadata: # Extract annotations per image list_arrays = np.split( df[arrays_metadata[key]["columns"]].to_numpy( dtype=arrays_metadata[key]["type"] ), indices_id_switch, ) # each array: (n_annotations, N_DIM) if key == "image_shape_array": array_dict[key] = np.stack( [np.unique(arr, axis=0) for arr in list_arrays], axis=0 ).squeeze(axis=1) # (n_images, N_DIM) else: # Pad arrays with NaN values along the annotation ID axis # and stack to (n_images, n_max_annotations, N_DIM) list_arrays_padded = [ np.pad( arr, ((0, max_annotations_per_image - arr.shape[0]), (0, 0)), constant_values=arrays_metadata[key]["pad_value"], ) for arr in list_arrays ] array_dict[key] = np.stack(list_arrays_padded, axis=0) # Reorder dimensions to (n_images, N_DIM, n_max_annotations) # (squeeze the N_DIM axis (N_DIM=1) for "category") array_dict[key] = np.moveaxis(array_dict[key], -1, 1) if key == "category_array": array_dict[key] = array_dict[key].squeeze(axis=1) # Modify x_min and y_min to represent the bbox centre array_dict["position_array"] += array_dict["shape_array"] / 2 return array_dict