alp_data.chain Module

What is Dataset chaining?

The chain module provides a lightweight way to iterate over multiple ESP datasets as if they were a single dataset. Unlike ConcatenatedDataset, chaining does not merge DataFrames or support transformations on the combined data—it simply yields items from each source dataset in sequence.
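The core idea can be illustrated with plain Python iterables. The sketch below is not the library's implementation; it uses `itertools.chain.from_iterable` on two hypothetical lists of dict items (`insects` and `birds`, invented for illustration) to show what "yielding items from each source dataset in sequence" means.

```python
from itertools import chain

# Hypothetical stand-ins for two datasets; each item is a plain dict.
insects = [{"audio": "a.wav", "species": "cicada"}]
birds = [{"audio": "b.wav", "label": 3}, {"audio": "c.wav", "label": 7}]

# Items come out source by source; nothing is merged or copied.
chained_items = list(chain.from_iterable([insects, birds]))

print(len(chained_items))  # 3
# Each item keeps only the keys its own source provides.
print([sorted(item) for item in chained_items])
```

Note that the items keep different keys (`species` versus `label`), which matches the "no joint transformations, no merged DataFrame" behaviour described above.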

Use ChainedDataset when you:

  • Need to iterate over multiple datasets without applying joint transformations
  • Want to support streaming mode across multiple datasets
  • Prefer a lightweight approach that doesn't create a merged DataFrame

For combining datasets with transformation support, see concatenate.md.

How can I chain datasets?

Basic Usage

from alp_data.datasets import InsectSet459, BirdSet
from alp_data.chain import ChainedDataset

# Load individual datasets
dataset1 = InsectSet459(split="validation")
dataset2 = BirdSet(split="HSN-test")

print(f"Dataset 1 length: {len(dataset1)}")
print(f"Dataset 2 length: {len(dataset2)}")

# Chain datasets for iteration
chained = ChainedDataset([dataset1, dataset2])

# Length is the sum of all source datasets
print(f"Chained dataset length: {len(chained)}")

# Iterate over all items
for item in chained:
    print(item.keys())
    break  # Just show first item

Indexing

ChainedDataset supports indexing by mapping the global index to the appropriate source dataset:

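# Access items by global index
first_item = chained[0]  # From dataset1

# Negative indices are not supported and raise IndexError
try:
    chained[-1]
except IndexError:
    # Use an explicit non-negative index instead
    last_item = chained[len(chained) - 1]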

# Index maps across datasets
# If dataset1 has 100 items and dataset2 has 200 items:
# - chained[0] returns dataset1[0]
# - chained[99] returns dataset1[99]
# - chained[100] returns dataset2[0]
# - chained[299] returns dataset2[199]
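The mapping from a global index to a source dataset can be sketched independently of the library. The class source listed on this page walks the datasets with a running cumulative length; the sketch below swaps in a binary search over cumulative offsets, which gives the same result. The function name `locate` and its return shape are assumptions made for illustration.

```python
from bisect import bisect_right
from itertools import accumulate


def locate(idx: int, lengths: list[int]) -> tuple[int, int]:
    """Map a global index to (source position, local index)."""
    if idx < 0:
        raise IndexError("Negative indexing is not supported")
    offsets = list(accumulate(lengths))  # e.g. [100, 300] for lengths [100, 200]
    if not offsets or idx >= offsets[-1]:
        raise IndexError(f"Index {idx} out of bounds")
    # First source whose cumulative end lies strictly beyond idx.
    source = bisect_right(offsets, idx)
    start = offsets[source - 1] if source else 0
    return source, idx - start


print(locate(99, [100, 200]))   # (0, 99)
print(locate(100, [100, 200]))  # (1, 0)
print(locate(299, [100, 200]))  # (1, 199)
```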

Warning

Negative indexing is not supported in ChainedDataset. Attempting to use negative indices will raise an IndexError.
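If calling code relies on Python-style negative indices, one option is to convert them before indexing. The helper below is a hypothetical convenience, not part of `alp_data`; it only works in non-streaming mode, where the length is known.

```python
def normalize_index(idx: int, length: int) -> int:
    """Convert a Python-style negative index to its non-negative equivalent."""
    if not -length <= idx < length:
        raise IndexError(f"Index {idx} out of bounds for length {length}")
    return idx + length if idx < 0 else idx


print(normalize_index(-1, 300))  # 299
print(normalize_index(5, 300))   # 5
```

With a non-streaming chained dataset this could be used as `chained[normalize_index(-1, len(chained))]`.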

Streaming Mode

Unlike ConcatenatedDataset, ChainedDataset supports streaming mode. All source datasets must share the same streaming mode; mixing streaming and non-streaming datasets raises a ChainException:

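# SomeDataset, AnotherDataset, and process() are placeholders for your own
# dataset classes and per-item handler.
# Streaming mode - all datasets must be streaming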
streaming_ds1 = SomeDataset(split="train", streaming=True)
streaming_ds2 = AnotherDataset(split="train", streaming=True)

chained_streaming = ChainedDataset([streaming_ds1, streaming_ds2])

# In streaming mode, len() and indexing raise RuntimeError
# Iterate instead:
for item in chained_streaming:
    process(item)
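Because a streaming chain has no length and cannot be indexed, previewing it means taking a bounded number of items from the iterator. The sketch below uses `itertools.islice` for that; the generator `stream` is a hypothetical stand-in for a streaming dataset, and `itertools.chain` stands in for iterating a streaming ChainedDataset.

```python
from itertools import chain, islice
from typing import Any, Iterator


def stream(prefix: str, n: int) -> Iterator[dict[str, Any]]:
    """Hypothetical streaming source: yields items lazily and has no len()."""
    for i in range(n):
        yield {"id": f"{prefix}-{i}"}


# Stand-in for iterating a streaming ChainedDataset.
chained_stream = chain(stream("a", 2), stream("b", 1000))

# Take a bounded preview without consuming the whole stream.
preview = list(islice(chained_stream, 3))
print([item["id"] for item in preview])  # ['a-0', 'a-1', 'b-0']
```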

Creating from Configuration

You can create a ChainedDataset from a YAML configuration file using the chain keyword:

chain:
  datasets:
    - dataset_name: insectset459
      split: validation
    - dataset_name: birdset
      split: HSN-test

Load the configuration in Python:

from alp_data import dataset_from_config

chained_dataset, metadata = dataset_from_config("path/to/chain_config.yaml")
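According to the `from_config` source listed on this page, the returned metadata dict holds one entry per source dataset, keyed by `f"{dataset_name}_metadata"`. The sketch below reproduces only that keying scheme. The plain-dict form of the configuration and the `fake_loader` function are assumptions made for illustration; they are not part of `alp_data`.

```python
from typing import Any

# The YAML above written as plain dicts (the parsed form is an assumption).
config = {
    "chain": {
        "datasets": [
            {"dataset_name": "insectset459", "split": "validation"},
            {"dataset_name": "birdset", "split": "HSN-test"},
        ]
    }
}


def fake_loader(cfg: dict[str, str]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Hypothetical loader returning (items, metadata) for one config entry."""
    return [], {"split": cfg["split"]}


metadata: dict[str, Any] = {}
for cfg in config["chain"]["datasets"]:
    _, meta = fake_loader(cfg)
    metadata[f"{cfg['dataset_name']}_metadata"] = meta

print(sorted(metadata))  # ['birdset_metadata', 'insectset459_metadata']
```

One consequence of this keying is that two entries sharing the same `dataset_name` (for example, two splits of one dataset) would write to the same key, so the later entry's metadata replaces the earlier one.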

Key Differences from ConcatenatedDataset

| Aspect | ChainedDataset | ConcatenatedDataset |
| --- | --- | --- |
| DataFrame handling | Delegates to source datasets | Merges into single DataFrame |
| Transformations | Not supported | Supported via apply_transformations |
| Column handling | Union of all columns reported | Merge strategies (hard/overlap/soft) |
| Streaming | Supported | Not supported |
| Memory footprint | Lightweight | Holds merged DataFrame |
| Metadata merging | Basic | Full merge (names, owners, versions, etc.) |

Understanding the ChainedDataset Class

Key Properties

# Available columns (union of all source dataset columns)
print(chained.columns)

# Available splits (always ["chained"])
print(chained.available_splits)

# Length (sum of source dataset lengths)
print(len(chained))  # Raises RuntimeError in streaming mode

Iteration Behavior

When iterating, items are yielded from each source dataset in order:

# Items come from datasets in order
chained = ChainedDataset([dataset1, dataset2, dataset3])

# Iteration yields:
# - All items from dataset1
# - Then all items from dataset2
# - Then all items from dataset3
for item in chained:
    # item comes from whichever dataset it belongs to
    pass
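Items yielded during iteration do not say which source they came from. If that matters, one approach is to iterate the sources yourself and tag each item with its source position. The generator `iter_with_source` below is a hypothetical helper that takes the list of sources explicitly, so it does not rely on any private attribute of ChainedDataset.

```python
from typing import Any, Iterable, Iterator


def iter_with_source(
    sources: Iterable[Iterable[dict[str, Any]]],
) -> Iterator[tuple[int, dict[str, Any]]]:
    """Yield (source position, item) pairs, source by source."""
    for position, source in enumerate(sources):
        for item in source:
            yield position, item


# An empty source in the middle is simply skipped.
pairs = list(iter_with_source([[{"x": 1}], [], [{"x": 2}, {"x": 3}]]))
print([position for position, _ in pairs])  # [0, 2, 2]
```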

Limitations

  1. No transformations: Cannot apply transforms to the chained dataset as a whole
  2. No negative indexing: Only non-negative integer indices are supported
  3. Streaming mode consistency: All source datasets must have the same streaming mode
  4. No column merging: Columns are not aligned or merged; each item has whatever columns its source dataset provides
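Since columns are not aligned across sources, downstream code that expects a fixed set of keys has to align items itself. A minimal sketch, assuming items are plain dicts and that a missing column may be filled with a default value; `align_columns` is a hypothetical helper, not part of the library.

```python
from typing import Any, Iterable, Iterator


def align_columns(
    items: Iterable[dict[str, Any]], columns: list[str], fill: Any = None
) -> Iterator[dict[str, Any]]:
    """Yield items with exactly `columns` as keys, filling gaps with `fill`."""
    for item in items:
        yield {name: item.get(name, fill) for name in columns}


# Hypothetical items from two sources with different columns.
items = [{"audio": "a.wav", "species": "cicada"}, {"audio": "b.wav", "label": 3}]
columns = sorted({key for item in items for key in item})
aligned = list(align_columns(items, columns))
print(aligned[1])  # {'audio': 'b.wav', 'label': 3, 'species': None}
```

With a chained dataset, the same idea would read `align_columns(chained, chained.columns)`. Because the helper is a generator, it also works in streaming mode.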

Function Reference

ChainedDataset

Helper class to chain multiple datasets for iteration and indexing.

This class allows iterating over multiple datasets as if they were a single dataset.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| datasets | list[Dataset] | List of datasets to concatenate for iteration | required |

Examples:

>>> from alp_data.datasets import InsectSet459, BirdSet
>>> from alp_data.chain import ChainedDataset
>>> dataset1 = InsectSet459(split="validation")
>>> dataset2 = BirdSet(split="HSN-test")
>>> concat_iter = ChainedDataset([dataset1, dataset2])
>>> total_length = len(dataset1) + len(dataset2)
>>> item = next(iter(concat_iter))
>>> assert len(concat_iter) == total_length, \
...     "Concatenated iterator length should match sum of source datasets lengths"

Source code in alp_data/chain.py
@register_dataset
class ChainedDataset(Dataset):
    """Helper class to chain multiple datasets for iteration and indexing.

    This class allows iterating over multiple datasets as if they were a single dataset.

    Parameters
    ----------
    datasets : list[Dataset]
        List of datasets to concatenate for iteration

    Examples
    --------
    >>> from alp_data.datasets import InsectSet459, BirdSet
    >>> from alp_data.chain import ChainedDataset
    >>> dataset1 = InsectSet459(split="validation")
    >>> dataset2 = BirdSet(split="HSN-test")
    >>> concat_iter = ChainedDataset([dataset1, dataset2])
    >>> total_length = len(dataset1) + len(dataset2)
    >>> item = next(iter(concat_iter))
    >>> assert len(concat_iter) == total_length, \
        "Concatenated iterator length should match sum of source datasets lengths"
    """

    info = DatasetInfo(
        name="chained_dataset",
        owner="ESP Data Team",
        split_paths={"chained": "virtual://chained_dataset"},
        version="0.1.0",
        description="A dataset created by chaining multiple datasets for iteration.",
        sources=["Multiple datasets"],
        license="CC0-1.0",
    )

    def __init__(self, datasets: list[Dataset]) -> None:
        if not datasets:
            raise ChainException("At least one dataset must be provided")

        if not all(isinstance(ds, Dataset) for ds in datasets):
            raise ChainException("All objects must be Dataset instances")

        # determine streaming mode based on source datasets
        # all datasets must have the same streaming mode
        streaming_modes = {ds.streaming for ds in datasets}
        if len(streaming_modes) > 1:
            raise ChainException(
                "All datasets must have the same streaming mode "
                "to be combined into a ChainedDataset."
            )
        _streaming = streaming_modes.pop()

        # _backend_class doesn't matter here since we override all data access methods
        super().__init__(streaming=_streaming)

        self._source_datasets = datasets
        try:
            self._lengths = [len(ds) for ds in datasets]
            self._total_length = sum(self._lengths)
        except RuntimeError:
            self._lengths = []
            self._total_length = -1

        self._all_columns = set()
        for ds in datasets:
            self._all_columns.update(ds.columns)
        self._all_columns = sorted(list(self._all_columns))

    @property
    def columns(self) -> list[str]:
        return self._all_columns

    @property
    def available_splits(self) -> list[str]:
        return ["chained"]

    def _load(self) -> None:
        pass  # Data is already loaded

    def __len__(self) -> int:
        if self._streaming:
            raise RuntimeError("Length is not supported in streaming mode")
        return self._total_length

    def __iter__(self) -> Iterator[dict[str, Any]]:
        for dataset in self._source_datasets:
            for item in dataset:
                yield item

    def __getitem__(self, idx: int) -> dict[str, Any]:
        """Get item by global index across chained datasets.

        Parameters
        ----------
        idx : int
            Global index across all chained datasets.

        Returns
        -------
        dict[str, Any]
            The item at the specified global index.

        Raises
        ------
        IndexError
            If the index is out of bounds.
        RuntimeError
            If indexing is attempted in streaming mode.
        """
        if self._streaming:
            raise RuntimeError("Indexing is not supported in streaming mode")

        if idx < 0:
            raise IndexError("Negative indexing is not supported")

        if idx >= self._total_length:
            raise IndexError(
                f"Index {idx} out of bounds for concatenated dataset of length {self._total_length}"
            )

        # Determine which dataset the index falls into
        cumulative_length = 0
        for dataset, length in zip(self._source_datasets, self._lengths, strict=True):
            if idx < cumulative_length + length:
                return dataset[idx - cumulative_length]
            cumulative_length += length

    @classmethod
    def from_config(
        cls, chain_config: ChainedDatasetConfig
    ) -> tuple["ChainedDataset", dict[str, Any]]:
        """Create a `ChainedDataset` from a `ChainedDatasetConfig` object.

        Parameters
        ----------
        chain_config : ChainedDatasetConfig
            Configuration object specifying the datasets to chain together.

        Returns
        -------
        tuple[ChainedDataset, dict]
            A tuple containing the `ChainedDataset` instance and a metadata
            dict aggregating transform metadata from each source dataset,
            keyed by `f"{dataset_name}_metadata"`.
        """
        datasets = []
        metadata = {}
        for cfg in chain_config.datasets:
            ds, meta = dataset_from_config(cfg)
            datasets.append(ds)
            metadata.update({f"{cfg.dataset_name}_metadata": meta})
        ds = cls(datasets)

        return ds, metadata

    def __str__(self) -> str:
        return (
            f"{self.info.name} (v{self.info.version})\n"
            f"Description: {self.info.description}\n"
            f"Length: {len(self)}\n"
            f"Columns: {', '.join(self.columns)}\n"
            # __init__ stores the sources in _source_datasets
            f"Source datasets: {len(self._source_datasets)}"
        )

__getitem__(idx)

Get item by global index across chained datasets.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| idx | int | Global index across all chained datasets. | required |

Returns:

| Type | Description |
| --- | --- |
| dict[str, Any] | The item at the specified global index. |

Raises:

| Type | Description |
| --- | --- |
| IndexError | If the index is negative or out of bounds. |
| RuntimeError | If indexing is attempted in streaming mode. |

from_config(chain_config) classmethod

Create a ChainedDataset from a ChainedDatasetConfig object.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| chain_config | ChainedDatasetConfig | Configuration object specifying the datasets to chain together. | required |

Returns:

| Type | Description |
| --- | --- |
| tuple[ChainedDataset, dict] | A tuple containing the ChainedDataset instance and a metadata dict aggregating transform metadata from each source dataset, keyed by f"{dataset_name}_metadata". |

ChainException

Exception raised when dataset chaining fails.

Source code in alp_data/chain.py
class ChainException(Exception):
    """Exception raised when dataset chaining fails."""

    pass