alp_data.transforms module

What are Transforms?

Transforms are operations applied to an ESP dataset to modify, filter, or enhance its data. Each transform is a callable object that takes the data as input (a pandas DataFrame or, in the signatures documented below, a DataBackend wrapping one) and returns a tuple containing:

  1. The transformed data
  2. A dictionary of metadata about the transformation, which may be empty if there is nothing to report

Each transform is defined by two main components:

  • A configuration class (inheriting from pydantic.BaseModel)
  • A transform class that implements the actual transformation logic
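
Because every transform returns a `(data, metadata)` tuple, transforms can be chained by feeding each output into the next call. The sketch below illustrates that contract only: it uses a list of row dictionaries as a stand-in for a DataFrame, and `apply_transforms` and both toy transforms are hypothetical helpers, not part of `alp_data`.

```python
from typing import Any, Callable

# Stand-in for a DataFrame: a list of row dictionaries.
Rows = list[dict[str, Any]]
Transform = Callable[[Rows], tuple[Rows, dict]]


def drop_missing_species(rows: Rows) -> tuple[Rows, dict]:
    """Toy transform: drop rows without a species and report how many."""
    kept = [row for row in rows if row.get("species") is not None]
    return kept, {"dropped": len(rows) - len(kept)}


def uppercase_species(rows: Rows) -> tuple[Rows, dict]:
    """Toy transform with no metadata to report."""
    return [{**row, "species": row["species"].upper()} for row in rows], {}


def apply_transforms(rows: Rows, transforms: list[Transform]) -> tuple[Rows, list[dict]]:
    """Apply transforms in order, keeping each metadata dictionary."""
    collected = []
    for transform in transforms:
        rows, metadata = transform(rows)
        collected.append(metadata)
    return rows, collected


rows = [{"species": "bee"}, {"species": None}, {"species": "ant"}]
result, metadata = apply_transforms(rows, [drop_missing_species, uppercase_species])
print(result)    # [{'species': 'BEE'}, {'species': 'ANT'}]
print(metadata)  # [{'dropped': 1}, {}]
```

Keeping one metadata dictionary per transform, rather than merging them, avoids key collisions when two transforms report under the same name.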

How to Use Transforms

Basic Usage

Transforms can be used in two ways:

  1. Direct instantiation:

    from alp_data.transforms import Filter
    
    # Create a filter transform
    filter_transform = Filter(
        property="category",
        values=["A", "B"],
        mode="include"
    )
    
    # Apply the transform
    transformed_data, metadata = filter_transform(data)
    

  2. Using configuration:

    from alp_data.transforms import FilterConfig, transform_from_config
    
    # Create a configuration
    config = FilterConfig(
        type="filter",
        property="category",
        values=["A", "B"],
        mode="include"
    )
    
    # Assume a dataframe called 'data' is already defined
    transform = transform_from_config(config)
    transformed_data, metadata = transform(data)
    

Transform Configuration

Each transform has its own configuration class that defines its parameters. For example, FilterConfig has:

  • type: The type of transform ("filter")
  • mode: Either "include" or "exclude"
  • property: The property to filter on
  • values: List of values to filter by

Creating Custom Transforms

To create a custom transform:

  1. Create a configuration class:

    from pydantic import BaseModel
    from typing import Literal
    
    class MyTransformConfig(BaseModel):
        type: Literal["my_transform"]
        # Add your configuration parameters here
    

  2. Create the transform class:

    import pandas as pd

    class MyTransform:
        def __init__(self, **kwargs):
            # Initialize your transform
            pass

        @classmethod
        def from_config(cls, cfg: MyTransformConfig) -> "MyTransform":
            # Pass every config field except the `type` discriminator
            return cls(**cfg.model_dump(exclude={"type"}))

        def __call__(self, data: pd.DataFrame) -> tuple[pd.DataFrame, dict]:
            # Implement your transformation logic
            transformed_data = data  # Your transformation here
            return transformed_data, {}
    

  3. Register your transform:

    from alp_data.transforms import register_transform
    
    register_transform(MyTransformConfig, MyTransform)
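
How registration and config-based construction fit together can be sketched without the library. The code below is an assumption about the general registry pattern, not the actual `alp_data` implementation: `register_transform_sketch`, `transform_from_config_sketch`, `KeepColumnsConfig`, and `KeepColumns` are hypothetical names, and a dataclass stands in for the pydantic configuration class.

```python
from dataclasses import asdict, dataclass
from typing import Any

# Hypothetical registry: maps a config `type` value to (config class, transform class).
_REGISTRY: dict[str, tuple[type, type]] = {}


def register_transform_sketch(config_cls: type, transform_cls: type) -> None:
    """Register a transform under the `type` default declared on its config class."""
    type_name = config_cls.type  # class-level default of the dataclass field
    if type_name in _REGISTRY:
        raise ValueError(f"Transform type '{type_name}' is already registered")
    _REGISTRY[type_name] = (config_cls, transform_cls)


def transform_from_config_sketch(config: Any) -> Any:
    """Look up the transform class for a config and build it via from_config."""
    if config.type not in _REGISTRY:
        raise KeyError(f"No transform registered for type '{config.type}'")
    _, transform_cls = _REGISTRY[config.type]
    return transform_cls.from_config(config)


@dataclass
class KeepColumnsConfig:
    columns: list[str]
    type: str = "keep_columns"


class KeepColumns:
    def __init__(self, *, columns: list[str]) -> None:
        self.columns = columns

    @classmethod
    def from_config(cls, cfg: KeepColumnsConfig) -> "KeepColumns":
        params = asdict(cfg)
        params.pop("type")  # the discriminator is not a constructor argument
        return cls(**params)

    def __call__(self, rows: list[dict]) -> tuple[list[dict], dict]:
        kept = [{key: row[key] for key in self.columns} for row in rows]
        return kept, {"kept_columns": self.columns}


register_transform_sketch(KeepColumnsConfig, KeepColumns)
transform = transform_from_config_sketch(KeepColumnsConfig(columns=["species"]))
print(transform([{"species": "bee", "count": 10}]))
# ([{'species': 'bee'}], {'kept_columns': ['species']})
```

Rejecting a second registration under the same `type` is one way to enforce that each transform type is unique; how the real registry reports such conflicts is not shown in this page.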
    

Available Transforms

The transforms system uses a registry pattern to manage available transforms. The registry ensures that each transform type is unique and properly configured before use.

The module provides several built-in transforms to handle common data transformation tasks. Here's an overview of each transform and its functionality:

Filter Transform

The Filter transform allows you to selectively include or exclude rows from your dataset based on specific property values.

alp_data.transforms.Filter

Filter data based on property values.

This transform filters a DataFrame based on the values of a specified property. It can either include or exclude rows based on the specified values. The property is a column in the DataFrame, and the values are the values to filter by.

Works with any backend (pandas, polars) through the DataBackend protocol.

Parameters:

  • property (str, required): The name of the property (column) to filter by.
  • values (list[str], required): The values to include or exclude from the DataFrame.
  • mode (Literal['include', 'exclude'], default 'include'): The mode of filtering. If "include", only rows with the specified values in the property will be kept. If "exclude", rows with the specified values will be removed from the DataFrame.
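
The include/exclude behaviour reduces to a membership test that is optionally negated, which in pandas corresponds to `Series.isin`. A minimal sketch on plain row dictionaries follows; the `filter_isin` and `filter_rows` functions here are hypothetical stand-ins for the backend method and the transform's `__call__`, not the library code.

```python
from typing import Any

Rows = list[dict[str, Any]]


def filter_isin(rows: Rows, column: str, values: list[str], negate: bool = False) -> Rows:
    """Keep rows whose column value is in `values`, or not in it when negated."""
    wanted = set(values)
    return [row for row in rows if (row[column] in wanted) != negate]


def filter_rows(rows: Rows, *, property: str, values: list[str],
                mode: str = "include") -> tuple[Rows, dict]:
    """Mirror of the transform contract: filtered rows plus empty metadata."""
    if mode not in ("include", "exclude"):
        raise ValueError(f"Unknown mode: {mode!r}")
    return filter_isin(rows, property, values, negate=(mode == "exclude")), {}


rows = [
    {"species": "bee", "count": 10},
    {"species": "ant", "count": 5},
    {"species": "butterfly", "count": 8},
    {"species": "spider", "count": 2},
]
included, _ = filter_rows(rows, property="species", values=["bee", "butterfly"])
excluded, _ = filter_rows(rows, property="species", values=["bee", "butterfly"],
                          mode="exclude")
print([row["species"] for row in included])  # ['bee', 'butterfly']
print([row["species"] for row in excluded])  # ['ant', 'spider']
```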

Examples:

>>> from alp_data.transforms import Filter
>>> from alp_data.backends import PandasBackend
>>> import pandas as pd
>>> filter_transform = Filter(property="species", values=["bee", "butterfly"],
...     mode="include")
>>> df = pd.DataFrame({"species": ["bee", "ant", "butterfly", "spider"],
...     "count": [10, 5, 8, 2]})
>>> backend = PandasBackend(df)
>>> filtered_backend, _ = filter_transform(backend)
Source code in alp_data/transforms/filter.py
class Filter:
    """Filter data based on property values.

    This transform filters a DataFrame based on the values of a specified property.
    It can either include or exclude rows based on the specified values. The property
    is a column in the DataFrame, and the values are the values to filter by.

    Works with any backend (pandas, polars) through the DataBackend protocol.

    Parameters
    ----------
    property: str
        The name of the property (column) to filter by.
    values: list[str]
        The values to include or exclude from the DataFrame.
    mode: Literal["include", "exclude"]
        The mode of filtering. If "include", only rows with the specified values
        in the property will be kept. If "exclude", rows with the specified values
        will be removed from the DataFrame.

    Examples
    -------
    >>> from alp_data.transforms import Filter
    >>> from alp_data.backends import PandasBackend
    >>> import pandas as pd
    >>> filter_transform = Filter(property="species", values=["bee", "butterfly"],
    ...     mode="include")
    >>> df = pd.DataFrame({"species": ["bee", "ant", "butterfly", "spider"],
    ...     "count": [10, 5, 8, 2]})
    >>> backend = PandasBackend(df)
    >>> filtered_backend, _ = filter_transform(backend)
    """

    def __init__(
        self,
        *,
        property: str,
        values: list[str],
        mode: Literal["include", "exclude"] = "include",
    ) -> None:
        """
        Initialize the filter.
        """

        self.mode = mode
        self.property = property
        self.values = values

    @classmethod
    def from_config(cls, cfg: FilterConfig) -> "Filter":
        return cls(**cfg.model_dump(exclude=("type")))

    def __call__(self, backend: DataBackend) -> tuple[DataBackend, dict]:
        """Filter the data based on property values.

        Args:
            backend: The backend wrapping the dataframe to filter

        Returns:
            The filtered backend (same type as input) and empty metadata dict.
        """
        # Use backend's filter_isin method
        negate = self.mode == "exclude"
        filtered_backend = backend.filter_isin(self.property, self.values, negate=negate)

        return filtered_backend, {}

__call__(backend)

Filter the data based on property values.

Parameters:

  • backend (DataBackend, required): The backend wrapping the dataframe to filter.

Returns:

  • tuple[DataBackend, dict]: The filtered backend (same type as input) and an empty metadata dict.

Source code in alp_data/transforms/filter.py
def __call__(self, backend: DataBackend) -> tuple[DataBackend, dict]:
    """Filter the data based on property values.

    Args:
        backend: The backend wrapping the dataframe to filter

    Returns:
        The filtered backend (same type as input) and empty metadata dict.
    """
    # Use backend's filter_isin method
    negate = self.mode == "exclude"
    filtered_backend = backend.filter_isin(self.property, self.values, negate=negate)

    return filtered_backend, {}

__init__(*, property, values, mode='include')

Initialize the filter.

Source code in alp_data/transforms/filter.py
def __init__(
    self,
    *,
    property: str,
    values: list[str],
    mode: Literal["include", "exclude"] = "include",
) -> None:
    """
    Initialize the filter.
    """

    self.mode = mode
    self.property = property
    self.values = values

LabelFromFeature Transform

The LabelFromFeature transform converts categorical features into numerical labels. Example use case: Converting a 'species' column with values like 'dog', 'cat', 'bird' into numerical labels 0, 1, 2.

alp_data.transforms.LabelFromFeature

Transform to create a label feature from an existing feature in a DataFrame.

This transform maps the values of a specified feature to integer labels.

Works with any backend (pandas, polars) through the DataBackend protocol.

Parameters:

  • feature (str, required): The name of the feature in the DataFrame from which to create labels.
  • label_map (dict[Any, int] | None, default None): A mapping of feature values to integer labels. If None, the labels will be created from the unique values in the feature.
  • output_feature (str, default 'label'): The name of the new feature to store the labels.
  • override (bool, default False): If True, will override the output feature if it already exists in the DataFrame. If False, will raise an AssertionError if the output feature already exists.

Examples:

>>> from alp_data.transforms import LabelFromFeature
>>> from alp_data.backends import PandasBackend
>>> import pandas as pd
>>> df = pd.DataFrame({"species": ["cat", "dog", "bird", "cat"]})
>>> backend = PandasBackend(df)
>>> transform = LabelFromFeature(feature="species", output_feature="label")
>>> transformed_backend, metadata = transform(backend)
Source code in alp_data/transforms/label_from_feature.py
class LabelFromFeature:
    """Transform to create a label feature from an existing feature in a DataFrame.

    This transform maps the values of a specified feature to integer labels.

    Works with any backend (pandas, polars) through the DataBackend protocol.

    Parameters
    ----------
    feature: str
        The name of the feature in the DataFrame from which to create labels.
    label_map: dict[Any, int] | None
        A mapping of feature values to integer labels. If None, the labels will be
        created from the unique values in the feature.
    output_feature: str
        The name of the new feature to store the labels. Defaults to "label".
    override: bool
        If True, will override the output feature if it already exists in the DataFrame.
        If False, will raise an AssertionError if the output feature already exists.

    Examples
    -------
    >>> from alp_data.backends import PandasBackend
    >>> import pandas as pd
    >>> df = pd.DataFrame({"species": ["cat", "dog", "bird", "cat"]})
    >>> backend = PandasBackend(df)
    >>> transform = LabelFromFeature(feature="species", output_feature="label")
    >>> transformed_backend, metadata = transform(backend)
    """

    def __init__(
        self,
        *,
        feature: str,
        label_map: dict[Any, int] | None = None,
        output_feature: str = "label",
        override: bool = False,
    ) -> None:
        self.feature = feature
        self.label_map = label_map
        self.override = override
        self.output_feature = output_feature

    @classmethod
    def from_config(cls, cfg: LabelFromFeatureConfig) -> "LabelFromFeature":
        return cls(**cfg.model_dump(exclude=("type")))

    def __call__(self, backend: DataBackend) -> tuple[DataBackend, dict]:
        """Apply the transformation to the backend.

        Parameters
        ----------
        backend : DataBackend
            The backend wrapping the DataFrame to transform.

        Returns
        -------
        tuple[DataBackend, dict]
            A tuple containing the transformed backend and metadata about the labels.

        Raises
        -------
        AssertionError
            If the output feature already exists and override is False.
        """
        if self.output_feature in backend.columns and not self.override:
            raise AssertionError(
                "Feature already exists in DataFrame. Set `override=True` to replace it."
            )

        # Drop rows with null values in the feature column
        backend_clean = backend.dropna(subset=[self.feature])

        # Count dropped rows for logging
        # Note: In streaming mode (LazyFrame), this will trigger evaluation
        try:
            original_len = len(backend)
            clean_len = len(backend_clean)
            if clean_len != original_len:
                logger.warning(f"Dropped {original_len - clean_len} rows with {self.feature}=NaN")
        except Exception as e:
            logger.warning(f"Could not compute dropped rows: {e}")
            pass

        # Get unique values and create label map if not provided
        if self.label_map is None:
            uniques = backend_clean.get_unique(self.feature)
            label_map = {lbl: idx for idx, lbl in enumerate(uniques)}
        else:
            label_map = self.label_map

        # Map the feature to labels
        backend_with_labels = backend_clean.map_column(
            column=self.feature,
            mapping=label_map,
            output_column=self.output_feature,
        )

        metadata = {
            "label_feature": self.feature,
            "label_map": label_map,
            "num_classes": len(label_map),
        }

        return backend_with_labels, metadata

__call__(backend)

Apply the transformation to the backend.

Parameters:

  • backend (DataBackend, required): The backend wrapping the DataFrame to transform.

Returns:

  • tuple[DataBackend, dict]: A tuple containing the transformed backend and metadata about the labels.

Raises:

  • AssertionError: If the output feature already exists and override is False.

Source code in alp_data/transforms/label_from_feature.py
def __call__(self, backend: DataBackend) -> tuple[DataBackend, dict]:
    """Apply the transformation to the backend.

    Parameters
    ----------
    backend : DataBackend
        The backend wrapping the DataFrame to transform.

    Returns
    -------
    tuple[DataBackend, dict]
        A tuple containing the transformed backend and metadata about the labels.

    Raises
    -------
    AssertionError
        If the output feature already exists and override is False.
    """
    if self.output_feature in backend.columns and not self.override:
        raise AssertionError(
            "Feature already exists in DataFrame. Set `override=True` to replace it."
        )

    # Drop rows with null values in the feature column
    backend_clean = backend.dropna(subset=[self.feature])

    # Count dropped rows for logging
    # Note: In streaming mode (LazyFrame), this will trigger evaluation
    try:
        original_len = len(backend)
        clean_len = len(backend_clean)
        if clean_len != original_len:
            logger.warning(f"Dropped {original_len - clean_len} rows with {self.feature}=NaN")
    except Exception as e:
        logger.warning(f"Could not compute dropped rows: {e}")
        pass

    # Get unique values and create label map if not provided
    if self.label_map is None:
        uniques = backend_clean.get_unique(self.feature)
        label_map = {lbl: idx for idx, lbl in enumerate(uniques)}
    else:
        label_map = self.label_map

    # Map the feature to labels
    backend_with_labels = backend_clean.map_column(
        column=self.feature,
        mapping=label_map,
        output_column=self.output_feature,
    )

    metadata = {
        "label_feature": self.feature,
        "label_map": label_map,
        "num_classes": len(label_map),
    }

    return backend_with_labels, metadata

MultiLabelFromFeatures Transform

The MultiLabelFromFeatures transform extends the functionality of LabelFromFeature to handle multiple features simultaneously. Example use case: Creating labels from multiple categorical columns like 'species', 'breed', and 'color' in a single operation.

alp_data.transforms.MultiLabelFromFeatures

A transform that generates multi-label targets from one or more feature columns.

This class goes through one or more specified columns and generates a mapping of unique values to integer IDs. It then uses this mapping to generate a new column where each row contains a list of integer label IDs corresponding to the unique values found in the specified feature columns. It is useful for preparing data for multi-label classification tasks, where each sample may be associated with multiple labels.

Notes

If element values are themselves lists, the transform will explode them first before constructing the mapping dictionary and converting the values.

Parameters:

  • features (list[str], required): The names of the columns in the DataFrame to use as sources for the labels. Each column can contain a single value or a list of values per row.
  • label_map (dict[Any, int] | None, default None): A mapping of unique values to integer IDs. If not provided, the transform will generate a mapping based on the unique values in the specified feature columns.
  • output_feature (str, default "label"): The name of the output column to store the generated label lists.
  • override (bool, default False): If False and the output_feature already exists in the dataset, an error is raised. If True, the output_feature will be overwritten.
  • allow_missing_labels (bool, default True): If True, rows with no labels will be included in the output. If False, rows with no labels will be dropped.

Methods:

  • from_config: Instantiates the transform from a configuration object.
  • __call__: Applies the transform to the DataFrame, returning the modified DataFrame and metadata about the label mapping.

Examples:

>>> import pandas as pd
>>> from alp_data.transforms import MultiLabelFromFeatures, MultiLabelFromFeaturesConfig
>>> config = MultiLabelFromFeaturesConfig(
...     type="labels_from_features",
...     features=["tags", "categories"],
...     label_map=None,
...     output_feature="labels",
...     override=False
... )
>>> df = pd.DataFrame({
...     "tags": [["cat", "dog"], ["bird"], ["cat"]],
...     "categories": [["mammal"], ["avian"], []]
... })
>>> from alp_data.backends import PandasBackend
>>> backend = PandasBackend(df)
>>> transform = MultiLabelFromFeatures.from_config(config)
>>> transformed_df, metadata = transform(backend)
>>> metadata["label_map"]
{'avian': 0, 'bird': 1, 'cat': 2, 'dog': 3, 'mammal': 4}
Source code in alp_data/transforms/multilabel_from_features.py
class MultiLabelFromFeatures:
    """
    A transform that generates multi-label targets from one or more feature columns.

    This class goes through one or more specified columns and generates a mapping of
    unique values to integer IDs. It then uses this mapping to generate a new column
    where each row contains a list of integer label IDs corresponding to the unique
    values found in the specified feature columns. It is useful for preparing data for
    multi-label classification tasks, where each sample may be associated with multiple
    labels.

    Notes
    -----
    If element values are themselves lists, the transform will explode them first before
    constructing the mapping dictionary and converting the values.

    Parameters
    ----------
    features : list[str]
        The names of the columns in the DataFrame to use as sources for the labels. Each
        column can contain a single value or a list of values per row.
    label_map : dict[Any, int] | None, default=None
        A mapping of unique values to integer IDs. If not provided, the transform will
        generate a mapping based on the unique values in the specified feature columns.
    output_feature : str, default="label"
        The name of the output column to store the generated label lists.
    override : bool, default=False
        If False and the output_feature already exists in the dataset, an error is
        raised. If True, the output_feature will be overwritten.
    allow_missing_labels : bool, default=True
        If True, rows with no labels will be included in the output. If False, rows with
        no labels will be dropped.

    Methods
    -------
    from_config(cfg: MultiLabelFromFeaturesConfig) -> MultiLabelFromFeatures
        Instantiates the transform from a configuration object.
    __call__(df: pd.DataFrame) -> tuple[pd.DataFrame, dict]
        Applies the transform to the DataFrame, returning the modified DataFrame and
        metadata about the label mapping.

    Examples
    -------
    >>> import pandas as pd
    >>> from alp_data.transforms import MultiLabelFromFeatures
    >>> config = MultiLabelFromFeaturesConfig(
    ...     type="labels_from_features",
    ...     features=["tags", "categories"],
    ...     label_map=None,
    ...     output_feature="labels",
    ...     override=False
    ... )
    >>> df = pd.DataFrame({
    ...     "tags": [["cat", "dog"], ["bird"], ["cat"]],
    ...     "categories": [["mammal"], ["avian"], []]
    ... })
    >>> from alp_data.backends import PandasBackend
    >>> backend = PandasBackend(df)
    >>> transform = MultiLabelFromFeatures.from_config(config)
    >>> transformed_df, metadata = transform(backend)
    >>> metadata["label_map"]
    {'avian': 0, 'bird': 1, 'cat': 2, 'dog': 3, 'mammal': 4}
    """

    def __init__(
        self,
        *,
        features: list[str],
        label_map: dict[Any, int] | None = None,
        output_feature: str = "label",
        override: bool = False,
        allow_missing_labels: bool = True,
    ) -> None:
        self.features = features
        self.label_map = label_map
        self.override = override
        self.output_feature = output_feature
        self.allow_missing_labels = allow_missing_labels

    @classmethod
    def from_config(cls, cfg: MultiLabelFromFeaturesConfig) -> "MultiLabelFromFeatures":
        return cls(**cfg.model_dump(exclude=("type")))

    def __call__(self, backend: DataBackend) -> tuple[DataBackend, dict]:
        if self.output_feature in backend.columns and not self.override:
            raise AssertionError(
                "Feature already exists in DataFrame. Set `override=True` to replace it."
            )

        backend, label_map = backend.multilabel_from_features(
            input_features=self.features,
            label_map=self.label_map,
            output_feature=self.output_feature,
            allow_missing_labels=self.allow_missing_labels,
        )

        metadata = {
            "label_feature": self.features,
            "label_map": label_map,
            "num_classes": len(label_map),
        }

        return backend, metadata

Subsample Transform

The Subsample transform reduces the size of your dataset by keeping only a fraction of the rows for each value of a chosen property. Example use case: Keeping 10% of the rows of an over-represented category in a large dataset for initial testing.

alp_data.transforms.Subsample

Subsample data based on property ratios.

This transform subsamples a DataFrame based on the specified ratios for each value of a given property. It allows for controlling the representation of different categories in the dataset by specifying how much of each category to keep. The property is a column in the DataFrame, and the ratios are specified as a dictionary where keys are property values and values are the ratios of samples to keep for each property value. The "other" category can be used to specify a ratio for all other values not explicitly listed in the ratios dictionary.

Works with any backend (pandas, polars) through the DataBackend protocol.

Parameters:

  • property (str, required): The name of the property (column) to subsample by.
  • ratios (dict[str, float], required): A dictionary where keys are the values of the property and values are the ratios of samples to keep for each value. The ratios should be in the range [0, 1]. If "other" is included as a key, it will subsample all other values not explicitly listed in the ratios dictionary.
  • seed (int, default 42): Random seed passed to the backend's subsample_by_column call.
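
Ratio-based subsampling can be sketched as: group row indices by property value, look up each group's ratio (falling back to "other"), and draw that fraction with a seeded random generator. The version below is a hypothetical stand-in for the backend's `subsample_by_column`. Two details are assumptions that this page does not state: values with no ratio and no "other" entry are kept in full, and the per-group sample size is `round(len(group) * ratio)`.

```python
import random
from collections import defaultdict
from typing import Any

Rows = list[dict[str, Any]]


def subsample_by_column(rows: Rows, column: str, ratios: dict[str, float],
                        seed: int = 42) -> Rows:
    rng = random.Random(seed)  # seeded so repeated runs pick the same rows

    # Group row positions by the value of the chosen column.
    groups: dict[Any, list[int]] = defaultdict(list)
    for index, row in enumerate(rows):
        groups[row[column]].append(index)

    keep: list[int] = []
    for value, indices in groups.items():
        # Assumption: unlisted values without an "other" entry are kept in full.
        ratio = ratios.get(value, ratios.get("other", 1.0))
        n_keep = round(len(indices) * ratio)  # assumption: nearest-integer size
        keep.extend(rng.sample(indices, n_keep))

    # Preserve the original row order in the result.
    return [rows[i] for i in sorted(keep)]


rows = [{"species": "bee"}] * 4 + [{"species": "ant"}] * 10
sampled = subsample_by_column(rows, "species", {"bee": 0.5, "other": 0.1})
print(len(sampled))  # 3
```

With these ratios the sketch keeps two of the four "bee" rows and one of the ten "ant" rows, the latter through the "other" entry.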

Examples:

>>> from alp_data.transforms import Subsample, SubsampleConfig
>>> from alp_data.backends import PandasBackend
>>> import pandas as pd
>>> config = SubsampleConfig(
...     type="subsample",
...     property="species",
...     ratios={
...         "bee": 0.5,
...         "butterfly": 0.3,
...         "other": 0.1
...     })
>>> subsample_transform = Subsample.from_config(config)
>>> df = pd.DataFrame({
...     "species": ["bee", "bee", "butterfly", "ant", "butterfly", "spider"],
...     "count": [10, 5, 8, 2, 3, 1]
... })
>>> backend = PandasBackend(df)
>>> subsampled_backend, _ = subsample_transform(backend)
Source code in alp_data/transforms/subsample.py
class Subsample:
    """Subsample data based on property ratios.

    This transform subsamples a DataFrame based on the specified ratios for each value
    of a given property. It allows for controlling the representation of different
    categories in the dataset by specifying how much of each category to keep.
    The property is a column in the DataFrame, and the ratios are specified as a
    dictionary where keys are property values and values are the ratios of samples to
    keep for each property value. The "other" category can be used to specify a ratio
    for all other values not explicitly listed in the ratios dictionary.

    Works with any backend (pandas, polars) through the DataBackend protocol.

    Parameters
    ----------
    property: str
        The name of the property (column) to subsample by.

    ratios: dict[str, float]
        A dictionary where keys are the values of the property and values are the
        ratios of samples to keep for each value. The ratios should be in the range
        [0, 1]. If "other" is included as a key, it will subsample all other values
        not explicitly listed in the ratios dictionary.

    Examples
    -------
    >>> from alp_data.transforms import Subsample, SubsampleConfig
    >>> from alp_data.backends import PandasBackend
    >>> import pandas as pd
    >>> config = SubsampleConfig(
    ...     type="subsample",
    ...     property="species",
    ...     ratios={
    ...         "bee": 0.5,
    ...         "butterfly": 0.3,
    ...         "other": 0.1
    ...     })
    >>> subsample_transform = Subsample.from_config(config)
    >>> df = pd.DataFrame({
    ...     "species": ["bee", "bee", "butterfly", "ant", "butterfly", "spider"],
    ...     "count": [10, 5, 8, 2, 3, 1]
    ... })
    >>> backend = PandasBackend(df)
    >>> subsampled_backend, _ = subsample_transform(backend)
    """

    def __init__(self, property: str, ratios: dict[str, float], seed: int = 42) -> None:
        self.property = property
        self.ratios = ratios
        self.seed = seed

    @classmethod
    def from_config(cls, cfg: SubsampleConfig) -> "Subsample":
        return cls(**cfg.model_dump(exclude=("type")))

    def __call__(self, backend: DataBackend) -> tuple[DataBackend, dict]:
        """
        Apply the subsample transformation.

        Parameters
        ----------
        backend: DataBackend
            The backend wrapping the dataframe to subsample

        Returns
        -------
        tuple[DataBackend, dict]: A tuple containing:
            The subsampled backend (same type as input).
            The metadata dictionary (empty placeholder for future use).

        Raises
        ------
        KeyError
            If the specified property is not found in the DataFrame columns.
        """
        if self.property not in backend.columns:
            raise KeyError(f"Property '{self.property}' not found in the DataFrame columns.")

        # Use backend's subsample_by_column method
        subsampled_backend = backend.subsample_by_column(
            column=self.property,
            ratios=self.ratios,
            seed=self.seed,
        )

        return subsampled_backend, {}

__call__(backend)

Apply the subsample transformation.

Parameters:

  • backend (DataBackend, required): The backend wrapping the dataframe to subsample.

Returns:

  • tuple[DataBackend, dict]: A tuple containing the subsampled backend (same type as input) and the metadata dictionary (empty placeholder for future use).

Raises:

  • KeyError: If the specified property is not found in the DataFrame columns.

Source code in alp_data/transforms/subsample.py
def __call__(self, backend: DataBackend) -> tuple[DataBackend, dict]:
    """
    Apply the subsample transformation.

    Parameters
    ----------
    backend: DataBackend
        The backend wrapping the dataframe to subsample

    Returns
    -------
    tuple[DataBackend, dict]: A tuple containing:
        The subsampled backend (same type as input).
        The metadata dictionary (empty placeholder for future use).

    Raises
    ------
    KeyError
        If the specified property is not found in the DataFrame columns.
    """
    if self.property not in backend.columns:
        raise KeyError(f"Property '{self.property}' not found in the DataFrame columns.")

    # Use backend's subsample_by_column method
    subsampled_backend = backend.subsample_by_column(
        column=self.property,
        ratios=self.ratios,
        seed=self.seed,
    )

    return subsampled_backend, {}

BalancedSample Transform

The BalancedSample transform resamples the data so that the categories of a chosen property are represented more evenly, downsampling or upsampling each category toward a common target count.

alp_data.transforms.BalancedSample

Balance data by sampling to equalize category counts.

This transform balances a DataFrame based on a specified property to ensure that the resulting DataFrame has a balanced distribution of the specified property across the samples.

Parameters:

  • property (str, required): The name of the property (column) to sample by.
  • strategy (Literal['min', 'max', 'median', 'mean', 'median_with_range'], default 'median'): The balancing strategy to use. Options are:
      • "min": Sample all categories to the minimum count (downsamples larger)
      • "max": Sample all categories to the maximum count (upsamples smaller)
      • "median": Sample all categories to the median count (default)
      • "mean": Sample all categories to the mean count
      • "median_with_range": Clamp each category to a range around the median
  • range_fraction (float, default 0.2): Only used with "median_with_range" strategy. The fraction of the median to use as the range. E.g., 0.2 means targets are clamped to [median * 0.8, median * 1.2].
  • seed (int, default 42): Random seed for reproducibility.
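
How each strategy translates into a per-category target count can be worked through with a small helper. The function below is a hypothetical sketch that only computes targets; it does not sample rows, and rounding fractional targets to the nearest integer is an assumption rather than documented behaviour.

```python
import statistics
from collections import Counter


def target_counts(values: list[str], strategy: str = "median",
                  range_fraction: float = 0.2) -> dict[str, int]:
    """Return the number of rows each category should end up with."""
    counts = Counter(values)
    sizes = list(counts.values())

    if strategy == "median_with_range":
        median = statistics.median(sizes)
        low = median * (1 - range_fraction)
        high = median * (1 + range_fraction)
        # Clamp each category's own count into [low, high].
        return {v: round(min(max(n, low), high)) for v, n in counts.items()}

    reducers = {
        "min": min,
        "max": max,
        "median": statistics.median,
        "mean": statistics.mean,
    }
    if strategy not in reducers:
        raise ValueError(f"Unknown strategy: {strategy!r}")
    target = round(reducers[strategy](sizes))  # one shared target for all categories
    return {v: target for v in counts}


values = ["a"] * 10 + ["b"] * 4 + ["c"] * 1
print(target_counts(values, "min"))     # {'a': 1, 'b': 1, 'c': 1}
print(target_counts(values, "max"))     # {'a': 10, 'b': 10, 'c': 10}
print(target_counts(values, "median"))  # {'a': 4, 'b': 4, 'c': 4}
print(target_counts(values, "mean"))    # {'a': 5, 'b': 5, 'c': 5}
print(target_counts(values, "median_with_range", range_fraction=0.5))
# {'a': 6, 'b': 4, 'c': 2}
```

A category whose target exceeds its current count (for example "c" under "max") can only reach the target by sampling with replacement, which is why that strategy is described as upsampling.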

Examples:

>>> from alp_data.transforms import BalancedSample, BalancedSampleConfig
>>> from alp_data.backends import PandasBackend
>>> import pandas as pd
>>> config = BalancedSampleConfig(
...     type="balanced_sample",
...     property="species",
...     strategy="median",
...     seed=42
... )
>>> balanced_sample_transform = BalancedSample.from_config(config)
>>> df = pd.DataFrame({
...     "species": ["bee", "bee", "butterfly", "ant", "butterfly", "spider"],
...     "count": [10, 5, 8, 2, 3, 1]
... })
>>> backend = PandasBackend(df)
>>> sampled_backend, _ = balanced_sample_transform(backend)
Source code in alp_data/transforms/balanced_sample.py
class BalancedSample:
    """Balance data by sampling to equalize category counts.

    This transform balances a DataFrame based on a specified property to
    ensure that the resulting DataFrame has a balanced distribution of the specified
    property across the samples.

    Parameters
    ----------
    property: str
        The name of the property (column) to sample by.
    strategy: str
        The balancing strategy to use. Options are:
        - "min": Sample all categories to the minimum count (downsamples larger)
        - "max": Sample all categories to the maximum count (upsamples smaller)
        - "median": Sample all categories to the median count (default)
        - "mean": Sample all categories to the mean count
        - "median_with_range": Clamp each category to a range around the median
    range_fraction: float
        Only used with "median_with_range" strategy. The fraction of the median
        to use as the range. E.g., 0.2 means targets are clamped to
        [median * 0.8, median * 1.2]. Defaults to 0.2.
    seed: int
        Random seed for reproducibility. Defaults to 42.

    Examples
    --------
    >>> from alp_data.transforms import BalancedSample, BalancedSampleConfig
    >>> from alp_data.backends import PandasBackend
    >>> import pandas as pd
    >>> config = BalancedSampleConfig(
    ...     type="balanced_sample",
    ...     property="species",
    ...     strategy="median",
    ...     seed=42
    ... )
    >>> balanced_sample_transform = BalancedSample.from_config(config)
    >>> df = pd.DataFrame({
    ...     "species": ["bee", "bee", "butterfly", "ant", "butterfly", "spider"],
    ...     "count": [10, 5, 8, 2, 3, 1]
    ... })
    >>> backend = PandasBackend(df)
    >>> sampled_backend, _ = balanced_sample_transform(backend)
    """

    def __init__(
        self,
        property: str,
        strategy: Literal["min", "max", "median", "mean", "median_with_range"] = "median",
        range_fraction: float = 0.2,
        seed: int = 42,
    ) -> None:
        self.property = property
        self.strategy = strategy
        self.range_fraction = range_fraction
        self.seed = seed

    @classmethod
    def from_config(cls, cfg: BalancedSampleConfig) -> "BalancedSample":
        return cls(**cfg.model_dump(exclude={"type"}))

    def __call__(self, backend: DataBackend) -> tuple[DataBackend, dict]:
        """Apply the balanced sample transformation.

        This transform creates a balanced distribution by sampling categories
        based on the selected strategy.

        Parameters
        ----------
        backend: DataBackend
            The backend wrapping the dataframe to sample.

        Returns
        -------
        tuple[DataBackend, dict]
            A tuple containing the sampled backend (same type as input) and
            the metadata dictionary (an empty placeholder for future use).

        Raises
        ------
        KeyError
            If the specified property is not found in the DataFrame columns.
        """
        if self.property not in backend.columns:
            raise KeyError(f"Property '{self.property}' not found in the DataFrame columns.")

        # Get all unique values for the property
        unique_values = backend.get_unique(self.property)

        # Get counts for each category to compute target counts
        category_counts = backend.histogram(self.property)

        if not category_counts:
            # Empty dataset
            return backend, {}

        counts = list(category_counts.values())

        # Compute target count based on strategy
        if self.strategy == "min":
            target_count = min(counts)
            target_counts = {v: target_count for v in unique_values if category_counts[v] > 0}
        elif self.strategy == "max":
            target_count = max(counts)
            target_counts = {v: target_count for v in unique_values if category_counts[v] > 0}
        elif self.strategy == "median":
            target_count = int(statistics.median(counts))
            target_counts = {v: target_count for v in unique_values if category_counts[v] > 0}
        elif self.strategy == "mean":
            target_count = round(statistics.mean(counts))
            target_counts = {v: target_count for v in unique_values if category_counts[v] > 0}
        elif self.strategy == "median_with_range":
            median_count = statistics.median(counts)
            lower_bound = int(median_count * (1 - self.range_fraction))
            upper_bound = int(median_count * (1 + self.range_fraction))
            # Clamp each category to [lower_bound, upper_bound]
            target_counts = {}
            for value in unique_values:
                count = category_counts[value]
                if count > 0:
                    target_counts[value] = max(lower_bound, min(upper_bound, count))

        # Handle categories with zero count
        for value in unique_values:
            if category_counts[value] == 0:
                target_counts[value] = 0

        # Use backend's upsample_by_column method
        # This handles both upsampling (with replacement) and downsampling (without replacement)
        sampled_backend = backend.upsample_by_column(
            column=self.property,
            target_counts=target_counts,
            seed=self.seed,
        )

        return sampled_backend, {}

__call__(backend)

Apply the balanced sample transformation.

This transform creates a balanced distribution by sampling categories based on the selected strategy.

Parameters:

Name Type Description Default
backend DataBackend

The backend wrapping the dataframe to sample.

required

Returns:

Type Description
tuple[DataBackend, dict]

A tuple containing the sampled backend (same type as input) and the metadata dictionary (an empty placeholder for future use).

Raises:

Type Description
KeyError

If the specified property is not found in the DataFrame columns.
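
Once target counts are known, each category is resampled to its target. The sketch below shows one way this could work on a list of row dicts, assuming downsampling draws without replacement and upsampling draws with replacement, as the comment in the source listing states. The helper `resample_to_targets` is hypothetical, and the real `upsample_by_column` backend method may differ in detail.

```python
import random


def resample_to_targets(rows, column, target_counts, seed=42):
    """Resample rows so each category in `column` hits its target (hypothetical helper)."""
    rng = random.Random(seed)

    # Group rows by category value
    groups = {}
    for row in rows:
        groups.setdefault(row[column], []).append(row)

    out = []
    for value, group in groups.items():
        target = target_counts.get(value, len(group))
        if target <= len(group):
            # Downsample: draw without replacement
            out.extend(rng.sample(group, target))
        else:
            # Upsample: draw with replacement
            out.extend(rng.choices(group, k=target))
    return out


rows = [{"species": "bee"}] * 4 + [{"species": "ant"}]
balanced = resample_to_targets(rows, "species", {"bee": 2, "ant": 2})
print(len(balanced))
```

Seeding a local `random.Random` instance keeps the draw reproducible without touching the global random state.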

Source code in alp_data/transforms/balanced_sample.py
def __call__(self, backend: DataBackend) -> tuple[DataBackend, dict]:
    """Apply the balanced sample transformation.

    This transform creates a balanced distribution by sampling categories
    based on the selected strategy.

    Parameters
    ----------
    backend: DataBackend
        The backend wrapping the dataframe to sample.

    Returns
    -------
    tuple[DataBackend, dict]
        A tuple containing the sampled backend (same type as input) and
        the metadata dictionary (an empty placeholder for future use).

    Raises
    ------
    KeyError
        If the specified property is not found in the DataFrame columns.
    """
    if self.property not in backend.columns:
        raise KeyError(f"Property '{self.property}' not found in the DataFrame columns.")

    # Get all unique values for the property
    unique_values = backend.get_unique(self.property)

    # Get counts for each category to compute target counts
    category_counts = backend.histogram(self.property)

    if not category_counts:
        # Empty dataset
        return backend, {}

    counts = list(category_counts.values())

    # Compute target count based on strategy
    if self.strategy == "min":
        target_count = min(counts)
        target_counts = {v: target_count for v in unique_values if category_counts[v] > 0}
    elif self.strategy == "max":
        target_count = max(counts)
        target_counts = {v: target_count for v in unique_values if category_counts[v] > 0}
    elif self.strategy == "median":
        target_count = int(statistics.median(counts))
        target_counts = {v: target_count for v in unique_values if category_counts[v] > 0}
    elif self.strategy == "mean":
        target_count = round(statistics.mean(counts))
        target_counts = {v: target_count for v in unique_values if category_counts[v] > 0}
    elif self.strategy == "median_with_range":
        median_count = statistics.median(counts)
        lower_bound = int(median_count * (1 - self.range_fraction))
        upper_bound = int(median_count * (1 + self.range_fraction))
        # Clamp each category to [lower_bound, upper_bound]
        target_counts = {}
        for value in unique_values:
            count = category_counts[value]
            if count > 0:
                target_counts[value] = max(lower_bound, min(upper_bound, count))

    # Handle categories with zero count
    for value in unique_values:
        if category_counts[value] == 0:
            target_counts[value] = 0

    # Use backend's upsample_by_column method
    # This handles both upsampling (with replacement) and downsampling (without replacement)
    sampled_backend = backend.upsample_by_column(
        column=self.property,
        target_counts=target_counts,
        seed=self.seed,
    )

    return sampled_backend, {}

Deduplicate Transform

The Deduplicate transform removes duplicate rows from your dataset based on specified columns. Example use case: Ensuring that each entry in a dataset is unique based on a combination of 'species' and 'location'.

alp_data.transforms.Deduplicate

A transform to remove duplicate rows from a DataFrame.

This transform removes duplicate rows based on specified columns or all columns if none are specified. It can keep either the first or last occurrence of duplicates.

Works with any backend (pandas, polars) through the DataBackend protocol.

Parameters:

Name Type Description Default
subset list[str] | None

List of column names to consider for deduplication. If None (the default) or empty, all columns are considered.

None
keep_first bool

If True, keeps the first occurrence of duplicates. If False, keeps the last occurrence.

True

Examples:

>>> from alp_data.transforms import Deduplicate
>>> from alp_data.backends import PandasBackend
>>> import pandas as pd
>>> df = pd.DataFrame({
...     "species": ["bee", "bee", "butterfly", "bee"],
...     "count": [10, 10, 5, 10]
... })
>>> backend = PandasBackend(df)
>>> transform = Deduplicate(subset=["species"], keep_first=True)
>>> deduplicated_backend, _ = transform(backend)
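
To make the effect of `subset` and `keep_first` concrete, here is a minimal pure-Python sketch of the same semantics on a list of row dicts. The helper `drop_duplicate_rows` is hypothetical and only illustrates the behaviour described above; the transform itself delegates to the backend's `drop_duplicates` method.

```python
def drop_duplicate_rows(rows, subset=None, keep_first=True):
    """Drop rows whose key columns repeat (hypothetical helper)."""
    # Scan from the end when keeping the last occurrence
    ordered = rows if keep_first else list(reversed(rows))
    seen = set()
    kept = []
    for row in ordered:
        # With no subset, every column is part of the key
        key = tuple(row[c] for c in (subset or sorted(row)))
        if key not in seen:
            seen.add(key)
            kept.append(row)
    # Restore the original row order
    return kept if keep_first else list(reversed(kept))


rows = [
    {"species": "bee", "count": 10},
    {"species": "bee", "count": 7},
    {"species": "butterfly", "count": 5},
]
first = drop_duplicate_rows(rows, subset=["species"], keep_first=True)
last = drop_duplicate_rows(rows, subset=["species"], keep_first=False)
print(first)
print(last)
```

With `subset=["species"]` the two "bee" rows count as duplicates even though their `count` values differ, so `keep_first` decides which of them survives.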
Source code in alp_data/transforms/deduplicate.py
class Deduplicate:
    """A transform to remove duplicate rows from a DataFrame.

    This transform removes duplicate rows based on specified columns or all columns if
    none are specified. It can keep either the first or last occurrence of duplicates.

    Works with any backend (pandas, polars) through the DataBackend protocol.

    Parameters
    ----------
    subset: list[str] | None
        List of column names to consider for deduplication. If None (the default)
        or empty, all columns are considered.
    keep_first: bool
        If True, keeps the first occurrence of duplicates. If False, keeps the last
        occurrence.

    Examples
    --------
    >>> from alp_data.transforms import Deduplicate
    >>> from alp_data.backends import PandasBackend
    >>> import pandas as pd
    >>> df = pd.DataFrame({
    ...     "species": ["bee", "bee", "butterfly", "bee"],
    ...     "count": [10, 10, 5, 10]
    ... })
    >>> backend = PandasBackend(df)
    >>> transform = Deduplicate(subset=["species"], keep_first=True)
    >>> deduplicated_backend, _ = transform(backend)
    """

    def __init__(self, *, subset: list[str] | None = None, keep_first: bool = True) -> None:
        self.subset = subset
        self.keep_first = keep_first

    @classmethod
    def from_config(cls, cfg: DeduplicateConfig) -> "Deduplicate":
        return cls(subset=cfg.subset, keep_first=cfg.keep_first)

    def __call__(self, backend: DataBackend) -> tuple[DataBackend, dict]:
        """Remove duplicate rows from the backend.

        Parameters
        ----------
        backend : DataBackend
            The backend wrapping the dataframe to deduplicate.

        Returns
        -------
        tuple[DataBackend, dict]
            The deduplicated backend (same type as input) and an empty
            metadata dictionary.
        """
        deduplicated_backend = backend.drop_duplicates(
            subset=self.subset,
            keep="first" if self.keep_first else "last",
        )
        return deduplicated_backend, {}

__call__(backend)

Remove duplicate rows from the backend.

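Parameters:

Name Type Description Default
backend DataBackend

The backend wrapping the dataframe to deduplicate.

required

Returns:

Type Description
tuple[DataBackend, dict]

The deduplicated backend (same type as input) and an empty metadata dictionary.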

Source code in alp_data/transforms/deduplicate.py
def __call__(self, backend: DataBackend) -> tuple[DataBackend, dict]:
    """Remove duplicate rows from the backend.

    Parameters
    ----------
    backend : DataBackend
        The backend wrapping the dataframe to deduplicate.

    Returns
    -------
    tuple[DataBackend, dict]
        The deduplicated backend (same type as input) and an empty
        metadata dictionary.
    """
    deduplicated_backend = backend.drop_duplicates(
        subset=self.subset,
        keep="first" if self.keep_first else "last",
    )
    return deduplicated_backend, {}

SelectColumns Transform

The SelectColumns transform allows you to select a subset of columns from your dataset. Example use case: Keeping only the 'audio' and 'label' columns for a machine learning task.

alp_data.transforms.SelectColumns

Select a subset of columns from the dataset.

This transform keeps only the specified columns and drops all others.

Works with any backend (pandas, polars) through the DataBackend protocol.

Parameters:

Name Type Description Default
columns list[str]

List of column names to keep.

required

Examples:

>>> from alp_data.transforms import SelectColumns, SelectColumnsConfig
>>> from alp_data.backends import PandasBackend
>>> import pandas as pd
>>> config = SelectColumnsConfig(
...     type="select_columns",
...     columns=["species", "audio"],
... )
>>> transform = SelectColumns.from_config(config)
>>> df = pd.DataFrame({
...     "species": ["bee", "ant"],
...     "audio": ["/a.wav", "/b.wav"],
...     "extra": [1, 2],
... })
>>> backend = PandasBackend(df)
>>> result, _ = transform(backend)
Source code in alp_data/transforms/select_columns.py
class SelectColumns:
    """Select a subset of columns from the dataset.

    This transform keeps only the specified columns and drops all others.

    Works with any backend (pandas, polars) through the DataBackend protocol.

    Parameters
    ----------
    columns : list[str]
        List of column names to keep.

    Examples
    --------
    >>> from alp_data.transforms import SelectColumns, SelectColumnsConfig
    >>> from alp_data.backends import PandasBackend
    >>> import pandas as pd
    >>> config = SelectColumnsConfig(
    ...     type="select_columns",
    ...     columns=["species", "audio"],
    ... )
    >>> transform = SelectColumns.from_config(config)
    >>> df = pd.DataFrame({
    ...     "species": ["bee", "ant"],
    ...     "audio": ["/a.wav", "/b.wav"],
    ...     "extra": [1, 2],
    ... })
    >>> backend = PandasBackend(df)
    >>> result, _ = transform(backend)
    """

    def __init__(self, columns: list[str]) -> None:
        self.columns = columns

    @classmethod
    def from_config(cls, cfg: SelectColumnsConfig) -> "SelectColumns":
        return cls(columns=cfg.columns)

    def __call__(self, backend: DataBackend) -> tuple[DataBackend, dict]:
        """Select the specified columns from the backend.

        Parameters
        ----------
        backend : DataBackend
            The backend wrapping the dataframe to transform.

        Returns
        -------
        tuple[DataBackend, dict]
            A tuple containing the transformed backend with only the selected
            columns and an empty metadata dictionary.

        Raises
        ------
        KeyError
            If any of the specified columns are not found in the backend.
        """
        missing = [c for c in self.columns if c not in backend.columns]
        if missing:
            raise KeyError(
                f"Columns {missing} not found in the DataFrame. "
                f"Available columns: {backend.columns}"
            )

        return backend.select_columns(self.columns), {}

__call__(backend)

Select the specified columns from the backend.

Parameters:

Name Type Description Default
backend DataBackend

The backend wrapping the dataframe to transform.

required

Returns:

Type Description
tuple[DataBackend, dict]

A tuple containing the transformed backend with only the selected columns and an empty metadata dictionary.

Raises:

Type Description
KeyError

If any of the specified columns are not found in the backend.
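
The validation step can be illustrated without any backend. The sketch below models a table as a dict mapping column names to value lists; the helper `select_columns` is hypothetical and only mirrors the check-then-select logic shown in the source listing.

```python
def select_columns(table, columns):
    """Keep only `columns`, failing loudly if any are absent (hypothetical helper)."""
    missing = [c for c in columns if c not in table]
    if missing:
        raise KeyError(
            f"Columns {missing} not found. Available columns: {list(table)}"
        )
    # Preserve the requested column order
    return {c: table[c] for c in columns}


table = {
    "species": ["bee", "ant"],
    "audio": ["/a.wav", "/b.wav"],
    "extra": [1, 2],
}
selected = select_columns(table, ["species", "audio"])
print(list(selected))

try:
    select_columns(table, ["species", "label"])
except KeyError as err:
    error_message = str(err)
    print(error_message)
```

Collecting every missing name before raising means a single error reports all the bad columns at once, rather than one per run.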

Source code in alp_data/transforms/select_columns.py
def __call__(self, backend: DataBackend) -> tuple[DataBackend, dict]:
    """Select the specified columns from the backend.

    Parameters
    ----------
    backend : DataBackend
        The backend wrapping the dataframe to transform.

    Returns
    -------
    tuple[DataBackend, dict]
        A tuple containing the transformed backend with only the selected
        columns and an empty metadata dictionary.

    Raises
    ------
    KeyError
        If any of the specified columns are not found in the backend.
    """
    missing = [c for c in self.columns if c not in backend.columns]
    if missing:
        raise KeyError(
            f"Columns {missing} not found in the DataFrame. "
            f"Available columns: {backend.columns}"
        )

    return backend.select_columns(self.columns), {}

LongTailUpsample Transform

The LongTailUpsample transform upsamples under-represented classes in a long-tailed distribution. Example use case: Increasing the number of samples for rare species in a biodiversity dataset.

alp_data.transforms.LongTailUpsample

Upsample under-represented categories without excessive repetition.

Designed for long-tail distributions (e.g. bioacoustic species counts) where a few categories dominate and many categories have very few examples. This transform lifts the tail towards a sufficient_threshold while capping how many times any single example can be repeated via max_repeats.

For each category with count c:

  • If c >= sufficient_threshold: the category is left untouched.
  • If c < sufficient_threshold: the target becomes min(sufficient_threshold, c * max_repeats).

This produces a gradual compression of the distribution: well-represented categories keep all their data, moderately-represented categories are boosted to the threshold, and very rare categories are boosted as much as possible without repeating any single example more than max_repeats times.
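
The rule above reduces to a few lines of arithmetic. The following sketch computes the targets from a plain dict of counts; the function name `long_tail_targets` and the sample numbers are hypothetical and chosen only to show all three regimes.

```python
def long_tail_targets(counts, sufficient_threshold, max_repeats):
    """Compute per-category target counts for long-tail upsampling (hypothetical helper)."""
    targets = {}
    for value, count in counts.items():
        if count >= sufficient_threshold:
            # Well represented: leave untouched
            targets[value] = count
        else:
            # Lift towards the threshold, capped by the repetition limit
            targets[value] = min(sufficient_threshold, count * max_repeats)
    return targets


counts = {"common": 500, "moderate": 40, "rare": 3}
targets = long_tail_targets(counts, sufficient_threshold=100, max_repeats=5)
print(targets)
```

Here "moderate" reaches the threshold because 40 * 5 exceeds 100, while "rare" stops at 3 * 5 = 15 because the repetition cap binds first.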

Parameters:

Name Type Description Default
property str

The name of the property (column) to balance on.

required
sufficient_threshold int

Categories with at least this many examples are left as-is. Categories below this count are upsampled towards it, subject to max_repeats.

required
max_repeats int

Maximum number of times any individual example may appear in the output. Prevents over-fitting on very rare categories.

required
seed int

Random seed for reproducibility. Defaults to 42.

42

Examples:

>>> from alp_data.transforms import LongTailUpsample, LongTailUpsampleConfig
>>> from alp_data.backends import PandasBackend
>>> import pandas as pd
>>> config = LongTailUpsampleConfig(
...     type="long_tail_upsample",
...     property="species",
...     sufficient_threshold=6,
...     max_repeats=3,
...     seed=42,
... )
>>> transform = LongTailUpsample.from_config(config)
>>> df = pd.DataFrame({
...     "species": (
...         ["common"] * 10
...         + ["moderate"] * 4
...         + ["rare"] * 1
...     ),
... })
>>> backend = PandasBackend(df)
>>> result, _ = transform(backend)
>>> # common (10): untouched — already >= 6
>>> # moderate (4): upsampled to min(6, 4*3)=6
>>> # rare (1): upsampled to min(6, 1*3)=3
Source code in alp_data/transforms/long_tail_upsample.py
class LongTailUpsample:
    """Upsample under-represented categories without excessive repetition.

    Designed for long-tail distributions (e.g. bioacoustic species counts) where
    a few categories dominate and many categories have very few examples. This
    transform lifts the tail towards a `sufficient_threshold` while capping how
    many times any single example can be repeated via `max_repeats`.

    For each category with count *c*:

    - If ``c >= sufficient_threshold``: the category is left untouched.
    - If ``c < sufficient_threshold``: the target becomes
      ``min(sufficient_threshold, c * max_repeats)``.

    This produces a gradual compression of the distribution: well-represented
    categories keep all their data, moderately-represented categories are boosted
    to the threshold, and very rare categories are boosted as much as possible
    without repeating any single example more than `max_repeats` times.

    Parameters
    ----------
    property : str
        The name of the property (column) to balance on.
    sufficient_threshold : int
        Categories with at least this many examples are left as-is. Categories
        below this count are upsampled towards it, subject to `max_repeats`.
    max_repeats : int
        Maximum number of times any individual example may appear in the output.
        Prevents over-fitting on very rare categories.
    seed : int
        Random seed for reproducibility. Defaults to 42.

    Examples
    --------
    >>> from alp_data.transforms import LongTailUpsample, LongTailUpsampleConfig
    >>> from alp_data.backends import PandasBackend
    >>> import pandas as pd
    >>> config = LongTailUpsampleConfig(
    ...     type="long_tail_upsample",
    ...     property="species",
    ...     sufficient_threshold=6,
    ...     max_repeats=3,
    ...     seed=42,
    ... )
    >>> transform = LongTailUpsample.from_config(config)
    >>> df = pd.DataFrame({
    ...     "species": (
    ...         ["common"] * 10
    ...         + ["moderate"] * 4
    ...         + ["rare"] * 1
    ...     ),
    ... })
    >>> backend = PandasBackend(df)
    >>> result, _ = transform(backend)
    >>> # common (10): untouched — already >= 6
    >>> # moderate (4): upsampled to min(6, 4*3)=6
    >>> # rare (1): upsampled to min(6, 1*3)=3
    """

    def __init__(
        self,
        property: str,
        sufficient_threshold: int,
        max_repeats: int,
        seed: int = 42,
    ) -> None:
        self.property = property
        self.sufficient_threshold = sufficient_threshold
        self.max_repeats = max_repeats
        self.seed = seed

    @classmethod
    def from_config(cls, cfg: LongTailUpsampleConfig) -> "LongTailUpsample":
        return cls(**cfg.model_dump(exclude={"type"}))

    def __call__(self, backend: DataBackend) -> tuple[DataBackend, dict]:
        """Apply the long-tail upsample transformation.

        Categories below `sufficient_threshold` are upsampled towards it,
        bounded by `max_repeats`. Categories at or above the threshold are
        left unchanged.

        Parameters
        ----------
        backend : DataBackend
            The backend wrapping the dataframe to transform.

        Returns
        -------
        tuple[DataBackend, dict]
            A tuple containing the transformed backend (same type as input)
            and a metadata dictionary with keys ``histogram_before`` and
            ``histogram_after``, each mapping category values to their counts.

        Raises
        ------
        KeyError
            If the specified property is not found in the DataFrame columns.
        """
        if self.property not in backend.columns:
            raise KeyError(f"Property '{self.property}' not found in the DataFrame columns.")

        category_counts = backend.histogram(self.property)

        if not category_counts:
            return backend, {"histogram_before": {}, "histogram_after": {}}

        target_counts: dict[str, int] = {}
        for value, count in category_counts.items():
            if count == 0:
                target_counts[value] = 0
            elif count >= self.sufficient_threshold:
                target_counts[value] = count
            else:
                target_counts[value] = min(
                    self.sufficient_threshold,
                    count * self.max_repeats,
                )

        sampled_backend = backend.upsample_by_column(
            column=self.property,
            target_counts=target_counts,
            seed=self.seed,
        )

        histogram_after = sampled_backend.histogram(self.property)

        return sampled_backend, {
            "histogram_before": category_counts,
            "histogram_after": histogram_after,
        }

__call__(backend)

Apply the long-tail upsample transformation.

Categories below sufficient_threshold are upsampled towards it, bounded by max_repeats. Categories at or above the threshold are left unchanged.

Parameters:

Name Type Description Default
backend DataBackend

The backend wrapping the dataframe to transform.

required

Returns:

Type Description
tuple[DataBackend, dict]

A tuple containing the transformed backend (same type as input) and a metadata dictionary with keys histogram_before and histogram_after, each mapping category values to their counts.

Raises:

Type Description
KeyError

If the specified property is not found in the DataFrame columns.
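
The returned metadata makes it easy to check what the transform changed. The sketch below summarises the two histograms; the helper `upsample_report` is hypothetical, and the metadata dict is written by hand to match the documented rule with `sufficient_threshold=6` and `max_repeats=3` rather than taken from a real run.

```python
def upsample_report(metadata):
    """Summarise which categories grew and by how much (hypothetical helper)."""
    before = metadata["histogram_before"]
    after = metadata["histogram_after"]
    report = {}
    for value, n_before in before.items():
        n_after = after.get(value, 0)
        if n_after != n_before:
            # (count before, count after, growth factor)
            report[value] = (n_before, n_after, n_after / n_before)
    return report


# Hand-written metadata with the documented keys
metadata = {
    "histogram_before": {"common": 10, "moderate": 4, "rare": 1},
    "histogram_after": {"common": 10, "moderate": 6, "rare": 3},
}
report = upsample_report(metadata)
print(report)
```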

Source code in alp_data/transforms/long_tail_upsample.py
def __call__(self, backend: DataBackend) -> tuple[DataBackend, dict]:
    """Apply the long-tail upsample transformation.

    Categories below `sufficient_threshold` are upsampled towards it,
    bounded by `max_repeats`. Categories at or above the threshold are
    left unchanged.

    Parameters
    ----------
    backend : DataBackend
        The backend wrapping the dataframe to transform.

    Returns
    -------
    tuple[DataBackend, dict]
        A tuple containing the transformed backend (same type as input)
        and a metadata dictionary with keys ``histogram_before`` and
        ``histogram_after``, each mapping category values to their counts.

    Raises
    ------
    KeyError
        If the specified property is not found in the DataFrame columns.
    """
    if self.property not in backend.columns:
        raise KeyError(f"Property '{self.property}' not found in the DataFrame columns.")

    category_counts = backend.histogram(self.property)

    if not category_counts:
        return backend, {"histogram_before": {}, "histogram_after": {}}

    target_counts: dict[str, int] = {}
    for value, count in category_counts.items():
        if count == 0:
            target_counts[value] = 0
        elif count >= self.sufficient_threshold:
            target_counts[value] = count
        else:
            target_counts[value] = min(
                self.sufficient_threshold,
                count * self.max_repeats,
            )

    sampled_backend = backend.upsample_by_column(
        column=self.property,
        target_counts=target_counts,
        seed=self.seed,
    )

    histogram_after = sampled_backend.histogram(self.property)

    return sampled_backend, {
        "histogram_before": category_counts,
        "histogram_after": histogram_after,
    }

AddTaxonomy Transform

The AddTaxonomy transform adds precomputed GBIF taxonomic information to your dataset based on existing features. Example use case: Adding family and order information to a dataset with a 'species' column using GBIF taxonomy.

alp_data.discover.AddTaxonomy

Transform that adds resolved GBIF taxonomy info to each row.

Uses GBIFConverter to resolve scientific names in a specified column to their accepted species-level taxonomic records. New columns are added for each taxonomy rank: 'kingdom', 'phylum', 'class', 'order', 'family', 'genus'. If add_taxonomic_name is True, an extra column 'taxonomic_name' is also added, which concatenates the higher ranks with the canonical name, e.g. "Animalia Chordata Aves Passeriformes Corvidae Corvus corax".
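
The construction of 'taxonomic_name' can be sketched as a simple join. This is an illustration rather than the library code: `HIGHER_RANKS` and `make_taxonomic_name` are hypothetical names, and the rank list is assumed from the ranks named above with 'genus' left out, since the canonical name already begins with the genus.

```python
# Assumed rank order, taken from the ranks listed above, without 'genus'
HIGHER_RANKS = ["kingdom", "phylum", "class", "order", "family"]


def make_taxonomic_name(info):
    """Join the higher ranks and the canonical name (hypothetical helper)."""
    # Skip ranks that are missing or empty in the record
    parts = [info[rank] for rank in HIGHER_RANKS if info.get(rank)]
    if info.get("canonicalName"):
        parts.append(info["canonicalName"])
    return " ".join(parts) or None


raven = {
    "kingdom": "Animalia",
    "phylum": "Chordata",
    "class": "Aves",
    "order": "Passeriformes",
    "family": "Corvidae",
    "genus": "Corvus",
    "canonicalName": "Corvus corax",
}
name = make_taxonomic_name(raven)
print(name)
```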

Parameters:

Name Type Description Default
feature str

Column name containing scientific names to look up.

'scientific_name'
gbif_precomputed_taxonomy_path str | AnyPathT

Path to the GBIF taxonomy JSON file, preprocessed via scripts/cache_gbif_taxonomy_conversion.py.

DEFAULT_PRECOMPUTED_LOCATION
add_taxonomic_name bool

Whether to add a 'taxonomic_name' column with the full taxonomic name.

False
Source code in alp_data/discover/gbif_taxonomy.py
class AddTaxonomy:
    """
    Transform that adds resolved GBIF taxonomy info to each row.

    Uses GBIFConverter to resolve scientific names in a specified column
    to their accepted species-level taxonomic records. New columns are added
    for each taxonomy rank: 'kingdom', 'phylum', 'class', 'order', 'family', 'genus'.
    If `add_taxonomic_name` is True, an extra column 'taxonomic_name' is also
    added, which concatenates the higher ranks with the canonical name, e.g.
    "Animalia Chordata Aves Passeriformes Corvidae Corvus corax".

    Parameters
    ----------
    feature : str
        Column name containing scientific names to look up.
    gbif_precomputed_taxonomy_path : str | AnyPathT
        Path to the GBIF taxonomy JSON file, preprocessed via
        scripts/cache_gbif_taxonomy_conversion.py.
    add_taxonomic_name : bool
        Whether to add a 'taxonomic_name' column with the full taxonomic name.
    """

    def __init__(
        self,
        feature: str = "scientific_name",
        gbif_precomputed_taxonomy_path: str | AnyPathT = DEFAULT_PRECOMPUTED_LOCATION,
        add_taxonomic_name: bool = False,
    ) -> None:
        self.feature = feature
        self.converter = GBIFConverter(
            precomputed_cache_path=gbif_precomputed_taxonomy_path,
        )
        self.add_taxonomic_name = add_taxonomic_name

    @classmethod
    def from_config(cls, cfg: AddTaxonomyConfig) -> "AddTaxonomy":
        return cls(**cfg.model_dump(exclude={"type"}))

    def _make_taxonomic_name(self, info: dict[str, str]) -> str | None:
        """Construct the full taxonomic name from GBIF info.

        Parameters
        ----------
        info : dict[str, str]
            GBIF taxonomic record fields.

        Returns
        -------
        str | None
            Full taxonomic name (including higher ranks) or None if unavailable.
        """
        if not info:
            return None

        taxonomic_name = ""
        for rank in TAXONOMY_RANKS[:-1]:  # Exclude genus
            rank_value = info.get(rank)
            if rank_value:
                if taxonomic_name:
                    taxonomic_name += " "
                taxonomic_name += rank_value

        # Add canonicalName
        canonical_name = info.get("canonicalName")
        if canonical_name:
            if taxonomic_name:
                taxonomic_name += " "
            taxonomic_name += canonical_name

        return taxonomic_name if len(taxonomic_name) > 0 else None

    def __call__(self, backend: DataBackend) -> tuple[DataBackend, dict]:
        """
        Apply the transform to add taxonomy columns.

        Parameters
        ----------
        backend : DataBackend
            The backend wrapping the DataFrame to transform.

        Returns
        -------
        tuple[DataBackend, dict]
            A tuple containing the transformed backend with taxonomy columns added,
            and metadata about the resolution (success/failure counts).

        Raises
        ------
        ValueError
            If the specified feature column is not found in the DataFrame.
        """
        if self.feature not in backend.columns:
            raise ValueError(f"Feature column '{self.feature}' not found in data.")

        # Get unique scientific names to look up (avoids redundant lookups)
        unique_names = backend.get_unique(self.feature)

        # Build a lookup cache: rank -> [(scientific_name, value), ...]
        # e.g. {'kingdom': [('Corvus corax', 'Animalia'), ...], ...}
        EXTENDED_RANKS = TAXONOMY_RANKS
        if self.add_taxonomic_name:
            EXTENDED_RANKS = TAXONOMY_RANKS + ["taxonomic_name"]
        taxonomy_cache: dict[str, list[tuple[str, str]]] = {r: [] for r in EXTENDED_RANKS}
        success_count = 0
        failure_count = 0

        for name in unique_names:
            info, ok = self.converter(name)
            if ok:
                # Fill by rank
                for rank in TAXONOMY_RANKS:
                    taxonomy_cache[rank].append((name, info.get(rank)))
                if self.add_taxonomic_name:
                    taxonomy_cache["taxonomic_name"].append((name, self._make_taxonomic_name(info)))
                success_count += 1
            else:
                failure_count += 1
                logger.debug(f"Failed to resolve taxonomy for: {name}")

        if failure_count > 0:
            logger.warning(f"Failed to resolve {failure_count}/{len(unique_names)} unique names")

        # Map resolved taxonomy back to backend, adding new columns
        for rank in EXTENDED_RANKS:
            rank_mapping = {src: target for src, target in taxonomy_cache[rank]}
            backend = backend.map_column(self.feature, mapping=rank_mapping, output_column=rank)

        metadata = {
            "feature": self.feature,
            "resolved": success_count,
            "failed": failure_count,
            "taxonomy_columns_added": EXTENDED_RANKS,
        }

        return backend, metadata

__call__(backend)

Apply the transform to add taxonomy columns.

Parameters:

Name | Type | Description | Default
backend | DataBackend | The backend wrapping the DataFrame to transform. | required

Returns:

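Type | Description
tuple[DataBackend, dict] | A tuple containing the transformed backend, with one new column per taxonomy rank, and a metadata dictionary with the keys feature, resolved, failed, and taxonomy_columns_added. The resolved and failed counts refer to unique names, not rows.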

Raises:

Type | Description
ValueError | If the specified feature column is not found in the DataFrame.
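As a rough illustration of how a caller might read the returned metadata, the sketch below formats the counts as a one-line summary. The helper summarize_resolution and the example values are hypothetical and not part of alp_data; only the dictionary keys are taken from the source listing for this method.

```python
def summarize_resolution(metadata: dict) -> str:
    """Format the success/failure counts from the metadata dict as one line."""
    resolved = metadata["resolved"]
    failed = metadata["failed"]
    total = resolved + failed  # counts are per unique name, not per row
    rate = resolved / total if total else 0.0
    return (
        f"{resolved}/{total} unique names resolved ({rate:.0%}) "
        f"for column '{metadata['feature']}'"
    )


# Illustrative metadata with the same keys the transform returns; values are made up.
example_metadata = {
    "feature": "scientific_name",
    "resolved": 3,
    "failed": 1,
    "taxonomy_columns_added": ["kingdom", "genus"],
}
summary = summarize_resolution(example_metadata)
print(summary)
```

Because the counts are taken over unique names, a low failure count can still correspond to many unresolved rows if a failed name is frequent in the data.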

Source code in alp_data/discover/gbif_taxonomy.py
def __call__(self, backend: DataBackend) -> tuple[DataBackend, dict]:
    """
    Apply the transform to add taxonomy columns.

    Parameters
    ----------
    backend : DataBackend
        The backend wrapping the DataFrame to transform.

    Returns
    -------
    tuple[DataBackend, dict]
        A tuple containing the transformed backend with taxonomy columns added,
        and metadata about the resolution (success/failure counts).

    Raises
    ------
    ValueError
        If the specified feature column is not found in the DataFrame.
    """
    if self.feature not in backend.columns:
        raise ValueError(f"Feature column '{self.feature}' not found in data.")

    # Get unique scientific names to look up (avoids redundant lookups)
    unique_names = backend.get_unique(self.feature)

    # Build a lookup cache: rank -> [(scientific_name, value), ...]
    # e.g. {'kingdom': [('Corvus corax', 'Animalia'), ...], ...}
    EXTENDED_RANKS = TAXONOMY_RANKS
    if self.add_taxonomic_name:
        EXTENDED_RANKS = TAXONOMY_RANKS + ["taxonomic_name"]
    taxonomy_cache: dict[str, list[tuple[str, str]]] = {r: [] for r in EXTENDED_RANKS}
    success_count = 0
    failure_count = 0

    for name in unique_names:
        info, ok = self.converter(name)
        if ok:
            # Fill by rank
            for rank in TAXONOMY_RANKS:
                taxonomy_cache[rank].append((name, info.get(rank)))
            if self.add_taxonomic_name:
                taxonomy_cache["taxonomic_name"].append((name, self._make_taxonomic_name(info)))
            success_count += 1
        else:
            failure_count += 1
            logger.debug(f"Failed to resolve taxonomy for: {name}")

    if failure_count > 0:
        logger.warning(f"Failed to resolve {failure_count}/{len(unique_names)} unique names")

    # Map resolved taxonomy back to backend, adding new columns
    for rank in EXTENDED_RANKS:
        rank_mapping = {src: target for src, target in taxonomy_cache[rank]}
        backend = backend.map_column(self.feature, mapping=rank_mapping, output_column=rank)

    metadata = {
        "feature": self.feature,
        "resolved": success_count,
        "failed": failure_count,
        "taxonomy_columns_added": list(EXTENDED_RANKS),  # copy, so callers cannot mutate TAXONOMY_RANKS
    }

    return backend, metadata
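The listing above follows a deduplicate, look up, then map pattern. A minimal standalone sketch of that pattern, using plain Python lists instead of a DataBackend, might look like the following. The names resolve_column, fake_converter, and LOOKUP_TABLE are hypothetical, the two ranks are chosen only for illustration, and filling unresolved names with None is an assumption of this sketch (the listing does not show how map_column treats unmapped values).

```python
from typing import Callable, Optional

# Hypothetical lookup table standing in for a remote taxonomy service.
LOOKUP_TABLE = {
    "Corvus corax": {"kingdom": "Animalia", "genus": "Corvus"},
    "Pica pica": {"kingdom": "Animalia", "genus": "Pica"},
}
lookup_calls = []  # records every name passed to the converter


def fake_converter(name: str) -> tuple[dict, bool]:
    """Return (info, ok), mirroring the converter contract used in the listing."""
    lookup_calls.append(name)
    info = LOOKUP_TABLE.get(name)
    return (info, True) if info is not None else ({}, False)


def resolve_column(
    values: list[str],
    converter: Callable[[str], tuple[dict, bool]],
    ranks: list[str],
) -> tuple[dict[str, list[Optional[str]]], dict[str, int]]:
    """Look up each unique name once, then map the results back onto every row."""
    unique_names = list(dict.fromkeys(values))  # order-preserving deduplication
    cache: dict[str, dict[str, Optional[str]]] = {rank: {} for rank in ranks}
    resolved = failed = 0
    for name in unique_names:
        info, ok = converter(name)
        if ok:
            for rank in ranks:
                cache[rank][name] = info.get(rank)
            resolved += 1
        else:
            failed += 1
    # Assumption: rows whose name could not be resolved receive None.
    new_columns = {rank: [cache[rank].get(v) for v in values] for rank in ranks}
    return new_columns, {"resolved": resolved, "failed": failed}


names = ["Corvus corax", "Corvus corax", "Unknown sp.", "Pica pica"]
columns, counts = resolve_column(names, fake_converter, ["kingdom", "genus"])
```

Here the converter is called three times for four rows, which is the saving that looking up unique names first is meant to provide when lookups are slow.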