nirs4all.data.indexer module

class nirs4all.data.indexer.Indexer[source]

Bases: object

Index manager for samples used in ML/DL pipelines. Optimizes contiguous access and manages filtering.

This class is designed to retrieve data during ML pipelines. For example, it can be used to get all test samples from branch 2, including augmented samples, for specific processings such as ["raw", "savgol", "gaussian"].

The Indexer uses a component-based architecture for maintainability:

  • IndexStore: DataFrame storage and queries

  • QueryBuilder: Selector to Polars expression conversion

  • SampleManager: ID generation

  • AugmentationTracker: Origin/augmented relationships

  • ProcessingManager: Processing list operations

  • ParameterNormalizer: Input validation

__repr__() str[source]

String representation showing the DataFrame.

Returns:

String representation of the index DataFrame.

Return type:

str

__str__() str[source]

Human-readable summary of indexed samples.

Returns:

Summary showing sample counts by combination of attributes.

Return type:

str

add_processings(new_processings: List[str]) None[source]

Append processing names to all existing processing lists.

Adds new processings to the end of each sample’s processing list. This is useful when applying additional transformations to all data.

Parameters:

new_processings – List of new processing names to add to existing lists.

Raises:

ValueError – If new_processings is empty.

Examples

>>> indexer = Indexer()
>>> indexer.add_samples(5, processings=["raw", "msc"])
>>>
>>> # Add single processing
>>> indexer.add_processings(["normalize"])
>>> # All samples now have ["raw", "msc", "normalize"]
>>>
>>> # Add multiple processings
>>> indexer.add_processings(["scale", "center"])
>>> # All samples now have ["raw", "msc", "normalize", "scale", "center"]

Note

  • Operates on ALL rows in the index

  • Appends to the end of each processing list

  • Does not check for duplicates (allows intentional reprocessing)

  • Use this method when adding pipeline steps to existing data
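The append semantics described above can be illustrated without the library. The following is a minimal sketch in plain Python, not the Indexer's implementation: `append_processings` is a hypothetical helper that models each row's processing list as a Python list and shows that duplicates are kept.

```python
from typing import List


def append_processings(
    processing_lists: List[List[str]], new_processings: List[str]
) -> List[List[str]]:
    """Hypothetical helper: append names to the end of every processing list."""
    if not new_processings:
        raise ValueError("new_processings must not be empty")
    # No de-duplication: a name already present is appended again.
    return [existing + list(new_processings) for existing in processing_lists]


rows = [["raw", "msc"], ["raw", "msc"]]
rows = append_processings(rows, ["normalize"])
rows = append_processings(rows, ["normalize"])  # intentional reprocessing
print(rows[0])
```

Every row ends with the same suffix because the operation is applied to all rows at once.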

add_rows(n_rows: int, new_indices: Dict[str, Any] | None = None) List[int][source]

Add rows to the indexer with optional column overrides.

add_rows_dict(n_rows: int, indices: Dict[str, Any], **kwargs) List[int][source]

Add rows using dictionary-based parameter specification.

This method provides a cleaner API for specifying row parameters using a dictionary, similar to the filtering API pattern.

Parameters:
  • n_rows – Number of rows to add

  • indices – Dictionary containing column specifications:

    {
        "partition": "train|test|val",
        "sample": [list of sample IDs] or single ID,
        "origin": [list of origin IDs] or single ID,
        "group": [list of groups] or single group,
        "branch": [list of branches] or single branch,
        "processings": processing configuration,
        "augmentation": augmentation type,
        … (any other column)
    }

  • **kwargs – Additional column overrides (take precedence over indices)

Returns:

List of sample indices that were added

Example

>>> # Add rows with dictionary specification
>>> indexer.add_rows_dict(2, {
...     "partition": "val",
...     "sample": [100, 101],
...     "group": 5
... })

add_samples(count: int, partition: Literal['train', 'test', 'val', 'validation'] = 'train', sample_indices: list[int] | ndarray | None = None, origin_indices: list[int] | ndarray | None = None, group: int | List[int] | None = None, branch: int | List[int] | None = None, processings: list[str] | List[list[str]] | None = None, augmentation: str | List[str] | None = None, **kwargs) List[int][source]

Add multiple samples to the indexer efficiently.

This is the primary method for registering samples in the index. Samples can be base samples or augmented samples, with flexible parameter specification.

Parameters:
  • count – Number of samples to add. Must be positive.

  • partition – Data partition ("train", "test", "val", or "validation"). Default "train".

  • sample_indices – Specific sample IDs to use. If None, auto-increment from current max. Can be:

      - int: Single ID repeated for all samples
      - List[int]: One ID per sample (length must match count)
      - np.ndarray: One ID per sample (length must match count)

  • origin_indices – Original sample IDs for augmented samples. If None, samples are treated as base samples (origin = sample). Same format options as sample_indices.

  • group – Group ID(s) for sample categorization. Can be:

      - int: Single group for all samples
      - List[int]: One group per sample (length must match count)
      - None: No group assignment

  • branch – Pipeline branch ID(s). Same format as group.

  • processings – Processing transformations applied. Can be:

      - None: Uses default ["raw"]
      - List[str]: Single list for all samples (e.g., ["raw", "msc"])
      - List[List[str]]: One list per sample (length must match count)

  • augmentation – Augmentation type(s). Same format as group, but allows None values.

  • **kwargs – Additional column values. Must match count if list/array.

Returns:

List of sample IDs that were added. Length equals count.

Return type:

List[int]

Raises:
  • ValueError – If count <= 0, or if list/array parameter lengths don’t match count.

  • TypeError – If parameter types are invalid.

Examples

>>> indexer = Indexer()
>>>
>>> # Add 5 base train samples with default settings
>>> ids = indexer.add_samples(5)
>>> # ids: [0, 1, 2, 3, 4]
>>>
>>> # Add test samples with specific processings
>>> test_ids = indexer.add_samples(
...     3,
...     partition="test",
...     processings=["raw", "msc", "savgol"]
... )
>>>
>>> # Add samples with different groups
>>> grouped_ids = indexer.add_samples(
...     4,
...     partition="train",
...     group=[1, 1, 2, 2],
...     processings=["raw"]
... )
>>>
>>> # Add augmented samples (references existing samples as origins)
>>> aug_ids = indexer.add_samples(
...     2,
...     partition="train",
...     origin_indices=[0, 1],  # Augmentations of samples 0 and 1
...     augmentation="flip"
... )

Note

  • Auto-incrementing sample IDs start from 0 or next available ID

  • Base samples have origin == sample (self-referencing)

  • Augmented samples have origin != sample (references base sample)

  • Single values are broadcast to all samples

  • Lists/arrays must match count exactly
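The broadcasting and auto-increment rules in the note above can be sketched in plain Python. This is an illustration under stated assumptions, not the library's code: `broadcast` and `next_ids` are hypothetical helpers, and the sketch ignores `processings`, where a flat list of strings means one list shared by all samples.

```python
from typing import Any, List, Sequence


def broadcast(value: Any, count: int, name: str) -> List[Any]:
    """Hypothetical helper: expand a scalar to `count` items, or validate a list."""
    if isinstance(value, (list, tuple)):
        if len(value) != count:
            raise ValueError(f"{name} has length {len(value)}, expected {count}")
        return list(value)
    # Scalars (including None) are repeated for every sample.
    return [value] * count


def next_ids(existing_ids: Sequence[int], count: int) -> List[int]:
    """Hypothetical helper: IDs start at 0, or at max + 1 when IDs already exist."""
    start = max(existing_ids) + 1 if existing_ids else 0
    return list(range(start, start + count))


shared_group = broadcast(1, 3, "group")              # one group for all samples
per_sample_group = broadcast([1, 1, 2], 3, "group")  # one group per sample
first_ids = next_ids([], 5)
more_ids = next_ids(first_ids, 3)
print(shared_group, per_sample_group, first_ids, more_ids)
```

Passing a list whose length differs from `count` raises `ValueError` in this sketch, mirroring the documented behavior of `add_samples`.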

add_samples_dict(count: int, indices: Dict[str, Any] | None = None, **kwargs) List[int][source]

Add multiple samples using dictionary-based parameter specification.

This method provides a cleaner API for specifying sample parameters using a dictionary, similar to the filtering API pattern.

Parameters:
  • count – Number of samples to add

  • indices – Dictionary containing column specifications:

    {
        "partition": "train|test|val",
        "sample": [list of sample IDs] or single ID,
        "origin": [list of origin IDs] or single ID,
        "group": [list of groups] or single group,
        "branch": [list of branches] or single branch,
        "processings": processing configuration,
        "augmentation": augmentation type,
        … (any other column)
    }

  • **kwargs – Additional column overrides (take precedence over indices)

Returns:

List of sample indices that were added

Example

>>> # Add samples with dictionary specification
>>> indexer.add_samples_dict(3, {
...     "partition": "train",
...     "group": [1, 2, 1],
...     "processings": ["raw", "msc"]
... })
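The precedence rule for the dictionary-based methods (keyword arguments override entries in `indices`) behaves like an ordinary dictionary merge. A minimal sketch, assuming nothing about the library internals; `resolve_columns` is a hypothetical name used only for illustration:

```python
from typing import Any, Dict, Optional


def resolve_columns(indices: Optional[Dict[str, Any]] = None, **kwargs: Any) -> Dict[str, Any]:
    """Hypothetical helper: merge column specs so that kwargs win over indices."""
    merged = dict(indices or {})  # copy so the caller's dictionary is not mutated
    merged.update(kwargs)         # keyword overrides take precedence
    return merged


spec = {"partition": "train", "group": [1, 2, 1]}
resolved = resolve_columns(spec, partition="test")
print(resolved)
```

Here `partition` resolves to `"test"` while `group` is taken from the dictionary unchanged.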

augment_rows(samples: List[int], count: int | List[int], augmentation_id: str) List[int][source]

Create augmented samples based on existing samples.

This method creates new augmented samples that reference existing base samples as their origins. The augmented samples inherit all attributes (partition, group, branch, processings) from their origin samples.

Parameters:
  • samples – List of sample IDs to augment. Must exist in the index.

  • count – Number of augmentations per sample. Can be:

      - int: Same count for all samples
      - List[int]: One count per sample (length must match samples)

  • augmentation_id – String identifier for the augmentation type (e.g., “flip”, “rotate”, “noise”).

Returns:

List of new sample IDs for the augmented samples.

Return type:

List[int]

Raises:

ValueError – If samples list is empty, if count list length doesn’t match samples length, or if any sample IDs are not found.

Examples

>>> indexer = Indexer()
>>> base_ids = indexer.add_samples(3, partition="train", processings=["raw", "msc"])
>>>
>>> # Create 2 augmentations for each base sample
>>> aug_ids = indexer.augment_rows(base_ids, 2, "flip")
>>> # aug_ids: [3, 4, 5, 6, 7, 8] (2 per sample)
>>>
>>> # Different counts per sample
>>> aug_ids2 = indexer.augment_rows([0, 1], [1, 3], "rotate")
>>> # aug_ids2: [9, 10, 11, 12] (1 for sample 0, 3 for sample 1)
>>>
>>> # Verify augmented samples reference their origins
>>> origin = indexer.get_origin_for_sample(aug_ids[0])
>>> print(origin)  # base_ids[0]

Note

  • Augmented samples inherit partition, group, branch, and processings from origins

  • origin field is set to the base sample ID

  • augmentation field is set to augmentation_id

  • Useful for data augmentation in ML pipelines (flips, rotations, noise, etc.)
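The inheritance behavior listed above can be sketched with rows stored as plain dictionaries. This is an illustrative model, not the Indexer's implementation: `augment` is a hypothetical function, and the row layout (keys `sample`, `origin`, `partition`, `augmentation`) is an assumption based on the column names used in this page.

```python
from typing import Any, Dict, List, Union

Row = Dict[str, Any]


def augment(rows: List[Row], samples: List[int],
            count: Union[int, List[int]], augmentation_id: str) -> List[int]:
    """Hypothetical sketch: copy each origin row `count` times under new sample IDs."""
    if not samples:
        raise ValueError("samples must not be empty")
    counts = [count] * len(samples) if isinstance(count, int) else list(count)
    if len(counts) != len(samples):
        raise ValueError("count list length must match samples length")
    by_id = {row["sample"]: row for row in rows}
    missing = [s for s in samples if s not in by_id]
    if missing:
        raise ValueError(f"sample IDs not found: {missing}")
    next_id = max(by_id) + 1
    new_ids: List[int] = []
    for sample_id, n in zip(samples, counts):
        for _ in range(n):
            new_row = dict(by_id[sample_id])  # inherit every attribute of the origin
            new_row.update(sample=next_id, origin=sample_id, augmentation=augmentation_id)
            rows.append(new_row)
            new_ids.append(next_id)
            next_id += 1
    return new_ids


rows = [{"sample": i, "origin": i, "partition": "train", "augmentation": None}
        for i in range(3)]
new_ids = augment(rows, [0, 1], [1, 3], "rotate")
origins = [rows[i]["origin"] for i in new_ids]
print(new_ids, origins)
```

Each new row keeps the partition of its origin and differs only in `sample`, `origin`, and `augmentation`.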

property default_values: Dict[str, Any]

Get default values for backward compatibility.

Returns:

Default values used when parameters are None.

Return type:

Dict[str, Any]

property df: DataFrame

Get the underlying DataFrame for backward compatibility.

Returns:

The complete index DataFrame.

Return type:

pl.DataFrame

Note

Direct DataFrame access is provided for backward compatibility. Prefer using indexer methods when possible.

get_augmented_for_origins(origin_samples: List[int]) ndarray[source]

Get all augmented samples for given origin sample IDs.

This method is used to retrieve augmented versions of base samples, enabling two-phase selection that prevents data leakage across CV folds.

Parameters:

origin_samples – List of origin sample IDs to find augmented versions for. Can be empty list.

Returns:

Array of augmented sample IDs (dtype: np.int32). Only includes samples where origin is in origin_samples AND sample != origin (actual augmented samples, not base samples).

Return type:

np.ndarray

Examples

>>> import numpy as np
>>> indexer = Indexer()
>>> indexer.add_samples(3, partition="train")
>>> indexer.augment_rows([0, 1], 2, "flip")
>>>
>>> # Get base samples
>>> base_samples = indexer.x_indices({"partition": "train"}, include_augmented=False)
>>> # base_samples: [0, 1, 2]
>>>
>>> # Get their augmented versions
>>> augmented = indexer.get_augmented_for_origins(base_samples.tolist())
>>> # augmented: [3, 4, 5, 6] (2 augmented each for samples 0 and 1)
>>>
>>> # Combine for full dataset
>>> all_samples = np.concatenate([base_samples, augmented])
>>> # all_samples: [0, 1, 2, 3, 4, 5, 6]

Note

This method does not filter by partition, group, or other criteria. It returns ALL augmented samples for the given origins, regardless of their attributes. Use x_indices() for filtered retrieval with automatic augmentation handling.

get_column_values(col: str, filters: Dict[str, Any] | None = None) List[Any][source]

Get column values, optionally filtered.

Parameters:
  • col – Column name to retrieve.

  • filters – Optional selector dictionary for filtering.

Returns:

Column values.

Return type:

List[Any]

Example

>>> partitions = indexer.get_column_values("partition")
>>> train_groups = indexer.get_column_values("group", {"partition": "train"})

get_excluded_samples(selector: Dict[str, Any] | DataSelector | ExecutionContext | None = None) DataFrame[source]

Get DataFrame of excluded samples with their exclusion reasons.

Parameters:

selector – Optional filter criteria to narrow down the query. If None, returns all excluded samples.

Returns:

DataFrame containing excluded samples with columns: sample, origin, partition, group, branch, exclusion_reason.

Return type:

pl.DataFrame

Examples

>>> indexer = Indexer()
>>> indexer.add_samples(5, partition="train")
>>> indexer.mark_excluded([0, 1], reason="outlier")
>>>
>>> # Get all excluded samples
>>> excluded_df = indexer.get_excluded_samples()
>>> print(excluded_df)
>>>
>>> # Get excluded samples from train partition only
>>> train_excluded = indexer.get_excluded_samples({"partition": "train"})

Note

Returns a Polars DataFrame for efficient processing. Use .to_pandas() if pandas DataFrame is needed.

get_exclusion_summary() Dict[str, Any][source]

Get summary statistics of exclusions by reason.

Returns:

Dictionary containing:
  • total_excluded: Total number of excluded samples

  • total_samples: Total number of samples in indexer

  • exclusion_rate: Ratio of excluded to total samples

  • by_reason: Dict mapping reason strings to counts

  • by_partition: Dict mapping partition names to excluded counts

Return type:

Dict[str, Any]

Examples

>>> indexer = Indexer()
>>> indexer.add_samples(10, partition="train")
>>> indexer.mark_excluded([0, 1], reason="outlier")
>>> indexer.mark_excluded([2], reason="low_quality")
>>>
>>> summary = indexer.get_exclusion_summary()
>>> print(summary)
>>> # {
>>> #     'total_excluded': 3,
>>> #     'total_samples': 10,
>>> #     'exclusion_rate': 0.3,
>>> #     'by_reason': {'outlier': 2, 'low_quality': 1},
>>> #     'by_partition': {'train': 3}
>>> # }
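The shape of the summary can be reproduced with a few lines of standard-library Python. The sketch below is not the library's implementation; `exclusion_summary` is a hypothetical function over rows modeled as dictionaries with assumed keys `partition`, `excluded`, and `exclusion_reason`.

```python
from collections import Counter
from typing import Any, Dict, List


def exclusion_summary(rows: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Hypothetical sketch: aggregate excluded rows by reason and by partition."""
    excluded = [row for row in rows if row.get("excluded")]
    total = len(rows)
    return {
        "total_excluded": len(excluded),
        "total_samples": total,
        # Guard against division by zero on an empty index.
        "exclusion_rate": len(excluded) / total if total else 0.0,
        "by_reason": dict(Counter(row["exclusion_reason"] for row in excluded)),
        "by_partition": dict(Counter(row["partition"] for row in excluded)),
    }


rows = [{"sample": i, "partition": "train", "excluded": False, "exclusion_reason": None}
        for i in range(10)]
for sample_id, reason in [(0, "outlier"), (1, "outlier"), (2, "low_quality")]:
    rows[sample_id].update(excluded=True, exclusion_reason=reason)

summary = exclusion_summary(rows)
print(summary)
```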

get_origin_for_sample(sample_id: int) int | None[source]

Get origin sample ID for a given sample.

With the current design, all samples have origin set:

  • Base samples: origin == sample (self-referencing)

  • Augmented samples: origin != sample (references base sample)

Parameters:

sample_id – Sample ID to look up.

Returns:

Origin sample ID, or None if sample not found in index.

Return type:

Optional[int]

Examples

>>> indexer = Indexer()
>>> indexer.add_samples(2, partition="train")
>>> indexer.augment_rows([0], 1, "flip")
>>>
>>> # For augmented sample
>>> origin = indexer.get_origin_for_sample(2)  # Sample 2 is augmentation of 0
>>> print(origin)  # 0
>>>
>>> # For base sample
>>> origin = indexer.get_origin_for_sample(0)  # Sample 0 is base
>>> print(origin)  # 0 (self-referencing)
>>>
>>> # For non-existent sample
>>> origin = indexer.get_origin_for_sample(999)
>>> print(origin)  # None

Note

This is a single-sample lookup. For batch operations, use y_indices(), which retrieves the origins of multiple samples more efficiently.

mark_excluded(sample_indices: list[int] | ndarray, reason: str | None = None, cascade_to_augmented: bool = True) int[source]

Mark samples as excluded from training.

Excluded samples are automatically filtered out from x_indices() and y_indices() calls unless include_excluded=True is explicitly passed. This provides a non-destructive way to remove outliers or corrupted samples from training.

Parameters:
  • sample_indices – Sample IDs to exclude. Can be:

      - int: Single sample ID
      - List[int]: List of sample IDs
      - np.ndarray: Array of sample IDs

  • reason – Optional string describing why samples are excluded (e.g., “outlier”, “corrupted”, “low_quality”).

  • cascade_to_augmented – If True (default), also exclude augmented samples derived from the specified base samples. This prevents data leakage from augmented versions of excluded samples.

Returns:

Number of samples marked as excluded.

Return type:

int

Raises:

ValueError – If sample_indices is empty.

Examples

>>> indexer = Indexer()
>>> indexer.add_samples(5, partition="train")
>>> indexer.augment_rows([0, 1], 2, "flip")
>>>
>>> # Mark sample 0 as excluded (outlier detection)
>>> n_excluded = indexer.mark_excluded([0], reason="iqr_outlier")
>>> # n_excluded: 3 (sample 0 + 2 augmented versions)
>>>
>>> # Verify exclusion
>>> train_samples = indexer.x_indices({"partition": "train"})
>>> # Sample 0 and its augmentations no longer included
>>>
>>> # View excluded samples
>>> excluded_df = indexer.get_excluded_samples()

Note

  • Exclusion is non-destructive: data remains in the indexer

  • Use mark_included() to reverse exclusion

  • Excluded samples can still be accessed via include_excluded=True

  • Cascade prevents data leakage from augmented versions
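The cascade rule can be sketched as follows. This is a plain-Python illustration, not the library's code: `mark_excluded_rows` is a hypothetical function, rows are dictionaries with assumed keys, and counting every flagged row (including rows that were already excluded) is an assumption of the sketch.

```python
from typing import Any, Dict, List, Optional


def mark_excluded_rows(rows: List[Dict[str, Any]], sample_ids: List[int],
                       reason: Optional[str] = None,
                       cascade_to_augmented: bool = True) -> int:
    """Hypothetical sketch: flag rows in place and return how many were flagged."""
    if not sample_ids:
        raise ValueError("sample_indices must not be empty")
    targets = set(sample_ids)
    flagged = 0
    for row in rows:
        is_target = row["sample"] in targets
        # With cascade, rows whose origin is a target are flagged as well.
        is_derived = cascade_to_augmented and row["origin"] in targets
        if is_target or is_derived:
            row["excluded"] = True
            row["exclusion_reason"] = reason
            flagged += 1
    return flagged


def make_rows() -> List[Dict[str, Any]]:
    # 5 base samples (0-4) plus 2 augmentations each for samples 0 and 1 (IDs 5-8).
    base = [{"sample": i, "origin": i, "excluded": False} for i in range(5)]
    aug = [{"sample": 5 + k, "origin": k // 2, "excluded": False} for k in range(4)]
    return base + aug


n_with_cascade = mark_excluded_rows(make_rows(), [0], reason="iqr_outlier")
n_without_cascade = mark_excluded_rows(make_rows(), [0], cascade_to_augmented=False)
print(n_with_cascade, n_without_cascade)
```

The data stays in place in both cases; only the flag and the reason change, which is what makes the exclusion reversible.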

mark_included(sample_indices: list[int] | ndarray | None = None, cascade_to_augmented: bool = True) int[source]

Remove exclusion flag from samples.

This method reverses the effect of mark_excluded(), re-including samples in x_indices() and y_indices() results.

Parameters:
  • sample_indices – Sample IDs to include. Can be:

      - int: Single sample ID
      - List[int]: List of sample IDs
      - np.ndarray: Array of sample IDs
      - None: Include ALL currently excluded samples

  • cascade_to_augmented – If True (default), also include augmented samples derived from the specified base samples.

Returns:

Number of samples marked as included.

Return type:

int

Examples

>>> indexer = Indexer()
>>> indexer.add_samples(5, partition="train")
>>> indexer.mark_excluded([0, 1], reason="outlier")
>>>
>>> # Re-include sample 0
>>> n_included = indexer.mark_included([0])
>>> # n_included: 1
>>>
>>> # Re-include all excluded samples
>>> n_included = indexer.mark_included()  # No argument = all excluded

Note

  • Clears both the excluded flag and exclusion_reason

  • Useful for iterative filtering or correcting previous exclusions

next_row_index() int[source]

Get the next available row index.

Returns:

Next row index (max + 1, or 0 if empty).

Return type:

int

Example

>>> next_idx = indexer.next_row_index()

next_sample_index() int[source]

Get the next available sample index.

Returns:

Next sample index (max + 1, or 0 if empty).

Return type:

int

Example

>>> next_idx = indexer.next_sample_index()

register_samples(count: int, partition: Literal['train', 'test', 'val', 'validation'] = 'train') List[int][source]

Register samples using the unified _append method.

register_samples_dict(count: int, indices: Dict[str, Any], **kwargs) List[int][source]

Register samples using dictionary-based parameter specification.

Parameters:
  • count – Number of samples to register

  • indices – Dictionary containing column specifications

  • **kwargs – Additional column overrides (take precedence over indices)

Returns:

List of sample indices that were registered

Example

>>> indexer.register_samples_dict(5, {"partition": "test", "group": 2})

replace_processings(source_processings: List[str], new_processings: List[str]) None[source]

Replace processing names across all samples.

Creates a mapping from old to new processing names and applies it to all processing lists in the index.

Parameters:
  • source_processings – List of existing processing names to replace.

  • new_processings – List of new processing names to set. Must have same length as source_processings.

Raises:
  • ValueError – If source_processings and new_processings have different lengths.

  • ValueError – If source_processings or new_processings is empty.

Examples

>>> indexer = Indexer()
>>> indexer.add_samples(5, processings=["raw", "old_msc", "savgol"])
>>>
>>> # Replace single processing
>>> indexer.replace_processings(["old_msc"], ["msc"])
>>> # Now all samples have ["raw", "msc", "savgol"]
>>>
>>> # Replace multiple processings
>>> indexer.replace_processings(
...     ["raw", "savgol"],
...     ["raw_v2", "savgol_v2"]
... )
>>> # Now all samples have ["raw_v2", "msc", "savgol_v2"]

Note

  • Operates on ALL rows in the index

  • Non-matched processings are left unchanged

  • Case-sensitive matching

  • Use this method when renaming processings after pipeline changes
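The old-to-new mapping described above can be sketched in plain Python. This is an illustration, not the library's implementation; `rename_processings` is a hypothetical helper operating on lists of names.

```python
from typing import List


def rename_processings(processing_lists: List[List[str]],
                       source_processings: List[str],
                       new_processings: List[str]) -> List[List[str]]:
    """Hypothetical sketch: rename processings through a positional mapping."""
    if not source_processings or not new_processings:
        raise ValueError("source_processings and new_processings must not be empty")
    if len(source_processings) != len(new_processings):
        raise ValueError("source_processings and new_processings must have the same length")
    mapping = dict(zip(source_processings, new_processings))
    # Exact, case-sensitive lookup; names missing from the mapping are kept as-is.
    return [[mapping.get(name, name) for name in names] for names in processing_lists]


lists = [["raw", "old_msc", "savgol"], ["RAW", "old_msc"]]
renamed = rename_processings(lists, ["raw", "old_msc"], ["raw_v2", "msc"])
print(renamed)
```

The second list shows the case-sensitive behavior: `"RAW"` does not match `"raw"` and is left unchanged.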

reset_exclusions(selector: Dict[str, Any] | DataSelector | ExecutionContext | None = None) int[source]

Remove all exclusion flags matching the selector.

This is a convenience method equivalent to calling mark_included() on all excluded samples matching the selector.

Parameters:

selector – Optional filter criteria. If None, resets ALL exclusions.

Returns:

Number of samples reset.

Return type:

int

Examples

>>> # Reset all exclusions
>>> n_reset = indexer.reset_exclusions()
>>>
>>> # Reset only train partition exclusions
>>> n_reset = indexer.reset_exclusions({"partition": "train"})

reset_processings(new_processings: List[str]) None[source]

Reset processing names for all samples to a new list.

This replaces the entire processing list for every sample with the provided list. Used when resetting feature storage (e.g. after merge).

Parameters:

new_processings – List of new processing names.

Raises:

ValueError – If new_processings is empty.

uniques(col: str) List[Any][source]

Get unique values in a column.

Parameters:

col – Column name.

Returns:

Unique values in the column.

Return type:

List[Any]

Example

>>> unique_partitions = indexer.uniques("partition")

update_by_filter(selector: Dict[str, Any] | DataSelector | ExecutionContext | None, updates: Dict[str, Any]) None[source]

Update rows matching a selector filter.

Parameters:
  • selector – Filter criteria dictionary (same format as x_indices).

  • updates – Dictionary of column:value pairs to update.

Example

>>> indexer.update_by_filter({"partition": "train", "group": 1}, {"branch": 2})

update_by_indices(sample_indices: list[int] | ndarray, updates: Dict[str, Any]) None[source]

Update rows by sample indices.

Parameters:
  • sample_indices – Sample IDs to update (int, list, or array).

  • updates – Dictionary of column:value pairs to update.

Example

>>> indexer.update_by_indices([0, 1, 2], {"group": 5})

x_indices(selector: Dict[str, Any] | DataSelector | ExecutionContext | None, include_augmented: bool = True, include_excluded: bool = False) ndarray[source]

Get sample indices with optional augmented sample aggregation.

This method implements two-phase selection to prevent data leakage:

  1. Phase 1: Get base samples (sample == origin)

  2. Phase 2: Get augmented versions of those base samples

Parameters:
  • selector – Filter criteria dictionary. Supported keys:

      - partition: "train"|"test"|"val" or list
      - group: int or list of ints
      - branch: int or list of ints
      - augmentation: str, list, or None for null check
      - Any other indexed columns

  • include_augmented – If True, include augmented versions of selected samples. If False, return only base samples (sample == origin). Default True for backward compatibility.

  • include_excluded – If True, include samples marked as excluded. If False (default), exclude samples marked as excluded=True. Use True for diagnostics, reporting, or viewing excluded samples.

Returns:

Array of sample indices (dtype: np.int32). When include_augmented=True, includes base samples and their augmented versions. When False, only base samples where sample == origin.

Return type:

np.ndarray

Raises:

KeyError – If selector contains invalid column names.

Examples

>>> indexer = Indexer()
>>> indexer.add_samples(5, partition="train")
>>> indexer.augment_rows([0, 1], 2, "flip")
>>>
>>> # Get all train samples (base + augmented)
>>> all_train = indexer.x_indices({"partition": "train"})
>>> # Returns: [0, 1, 2, 3, 4, 5, 6, 7, 8] (5 base + 4 augmented)
>>>
>>> # Get only base train samples
>>> base_train = indexer.x_indices({"partition": "train"}, include_augmented=False)
>>> # Returns: [0, 1, 2, 3, 4] (5 base only)
>>>
>>> # Mark sample as excluded and filter it
>>> indexer.mark_excluded([0], reason="outlier")
>>> filtered = indexer.x_indices({"partition": "train"})
>>> # Returns: [1, 2, 3, 4, ...] (sample 0 and its augmentations excluded)
>>>
>>> # Include excluded samples (for diagnostics)
>>> all_samples = indexer.x_indices({"partition": "train"}, include_excluded=True)

Note

The two-phase selection ensures that augmented samples from other partitions are NOT included, preventing data leakage in cross-validation scenarios.
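The two-phase selection can be sketched in plain Python over rows stored as dictionaries. This is a simplified model under stated assumptions, not the library's Polars-based implementation: `select_x` is a hypothetical function, it supports only a partition filter, and the row keys are assumed from the column names on this page.

```python
from typing import Any, Dict, List


def select_x(rows: List[Dict[str, Any]], partition: str,
             include_augmented: bool = True,
             include_excluded: bool = False) -> List[int]:
    """Hypothetical sketch of two-phase selection."""
    def visible(row: Dict[str, Any]) -> bool:
        return include_excluded or not row.get("excluded", False)

    # Phase 1: base samples (sample == origin) that match the selector.
    base = [row["sample"] for row in rows
            if row["partition"] == partition
            and row["sample"] == row["origin"] and visible(row)]
    if not include_augmented:
        return base

    # Phase 2: augmented rows are picked by origin only, never by their own
    # attributes, so augmentations of unselected base samples cannot appear.
    base_set = set(base)
    augmented = [row["sample"] for row in rows
                 if row["origin"] in base_set
                 and row["sample"] != row["origin"] and visible(row)]
    return base + augmented


rows = [
    {"sample": 0, "origin": 0, "partition": "train"},
    {"sample": 1, "origin": 1, "partition": "test"},
    {"sample": 2, "origin": 0, "partition": "train"},  # augmentation of 0
    {"sample": 3, "origin": 1, "partition": "test"},   # augmentation of 1
]
train_all = select_x(rows, "train")
train_base = select_x(rows, "train", include_augmented=False)
print(train_all, train_base)
```

Sample 3 never reaches the train selection because its origin (sample 1) was not selected in phase 1.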

y_indices(selector: Dict[str, Any] | DataSelector | ExecutionContext | None, include_augmented: bool = True, include_excluded: bool = False) ndarray[source]

Get y indices for samples. Returns origin indices for y-value lookup.

For augmented samples, this method maps them to their base samples (origins) since y-values only exist for base samples. This enables proper target retrieval when working with augmented data.

Parameters:
  • selector – Filter criteria dictionary. Same format as x_indices(). See x_indices() for supported keys.

  • include_augmented – If True (default, kept for backward compatibility), include augmented samples mapped to their origins. If False, return only base sample origins (sample == origin).

  • include_excluded – If True, include samples marked as excluded. If False (default), exclude samples marked as excluded=True. Use True for diagnostics, reporting, or viewing excluded samples.

Returns:

Array of origin sample indices for y-value lookup (dtype: np.int32). When include_augmented=True (default), augmented samples are included and each is mapped to its origin. When False, only base samples are returned (sample == origin).

Return type:

np.ndarray

Examples

>>> import numpy as np
>>> indexer = Indexer()
>>> indexer.add_samples(5, partition="train")
>>> indexer.augment_rows([0, 1], 2, "flip")
>>>
>>> # Get origins for all train samples (base + augmented)
>>> y_idx = indexer.y_indices({"partition": "train"})
>>> # Returns: [0, 1, 2, 3, 4, 0, 0, 1, 1]
>>> # (5 base origins + 4 augmented mapped to origins 0, 0, 1, 1)
>>>
>>> # Use with targets
>>> targets = np.array([10, 20, 30, 40, 50])  # 5 base samples
>>> x_idx = indexer.x_indices({"partition": "train"})
>>> y_idx = indexer.y_indices({"partition": "train"})
>>> X = all_spectra[x_idx]  # all_spectra: spectra array defined elsewhere, indexed by sample ID (includes augmented)
>>> y = targets[y_idx]   # Get targets (augmented samples use origin's target)
>>>
>>> # Get only base sample origins
>>> base_origins = indexer.y_indices({"partition": "train"}, include_augmented=False)
>>> # Returns: [0, 1, 2, 3, 4]
>>>
>>> # Exclude filtered samples
>>> indexer.mark_excluded([0], reason="outlier")
>>> filtered_y = indexer.y_indices({"partition": "train"})
>>> # Sample 0 and its augmentations excluded from result

Note

The length and order of y_indices() output always corresponds to x_indices() output with the same selector and include_augmented parameters. This ensures X and y arrays are properly aligned for training.
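The alignment guarantee can be checked with a small standalone sketch. The values below mirror the example above (5 base samples, 2 augmentations each for samples 0 and 1); the `origin_of` mapping is a hypothetical stand-in for the index, not a library object.

```python
# Sample IDs as x_indices() would return them: 5 base samples, then 4 augmented.
x_idx = [0, 1, 2, 3, 4, 5, 6, 7, 8]

# Hypothetical origin lookup: base samples map to themselves,
# augmented samples 5-6 map to origin 0 and 7-8 map to origin 1.
origin_of = {0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 0, 6: 0, 7: 1, 8: 1}

# One origin per selected sample, in the same order as x_idx.
y_idx = [origin_of[sample] for sample in x_idx]

targets = [10, 20, 30, 40, 50]  # targets exist only for the 5 base samples
y = [targets[i] for i in y_idx]

print(y_idx)
print(y)
```

Because `y_idx` is built position by position from `x_idx`, row i of the features and element i of `y` always refer to the same origin.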