nirs4all.data package

Subpackages

Submodules

Module contents

SpectroDataset - A specialized dataset API for spectroscopy data.

This module provides zero-copy, multi-source aware dataset management with transparent versioning and fine-grained indexing capabilities.

Submodules:

synthetic: Synthetic NIRS spectra generation tools.

class nirs4all.data.ColumnConfig(*, features: List[int] | List[str] | str | Dict[str, Any] | None = None, targets: List[int] | List[str] | str | Dict[str, Any] | None = None, metadata: List[int] | List[str] | str | Dict[str, Any] | None = None)[source]

Bases: BaseModel

Configuration for column selection and role assignment.

This is a stub for future implementation of the files syntax. Currently, column selection is handled by the loader directly.

features: List[int] | List[str] | str | Dict[str, Any] | None
metadata: List[int] | List[str] | str | Dict[str, Any] | None
model_config = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

targets: List[int] | List[str] | str | Dict[str, Any] | None
exception nirs4all.data.ColumnSelectionError[source]

Bases: Exception

Raised when column selection fails.

class nirs4all.data.ColumnSelector(case_sensitive: bool = True)[source]

Bases: object

Flexible column selector for DataFrames.

Supports multiple selection methods: - By name: [“col1”, “col2”] or “col_name” - By index: [0, 1, 2] or 0 - By range: “2:-1” (slice syntax as string) - By regex pattern: {“regex”: “^feature_.*”} - By exclusion: {“exclude”: [“id”, “date”]} - Combined: {“include”: [0, 1], “exclude”: [“id”]}

Example

>>> selector = ColumnSelector()
>>> result = selector.select(df, "2:-1")
>>> print(result.names)  # Column names in range
>>> print(result.data)   # Selected columns as DataFrame
parse_selection(selection: Any, available_columns: List[str]) List[int][source]

Parse a selection specification and return column indices.

This is a convenience method for when you don’t have a DataFrame but want to validate and resolve a selection.

Parameters:
  • selection – Column selection specification.

  • available_columns – List of available column names.

Returns:

List of column indices.

Raises:

ColumnSelectionError – If selection is invalid.

select(df: DataFrame, selection: int | str | List[int] | List[str] | Dict[str, Any] | slice | None) SelectionResult[source]

Select columns from a DataFrame.

Parameters:
  • df – The DataFrame to select columns from.

  • selection – Column selection specification. Can be: - None: Select all columns - int: Single column index - str: Single column name or range string (“2:-1”) - List[int]: List of column indices - List[str]: List of column names - Dict: Complex selection (see class docstring)

Returns:

SelectionResult with indices, names, and selected data.

Raises:

ColumnSelectionError – If selection is invalid or columns not found.

class nirs4all.data.ConfigNormalizer(parsers: List[BaseParser] | None = None)[source]

Bases: object

Normalizes dataset configurations from various input formats.

This class combines multiple parsers to handle: - Folder paths (auto-scanning) - JSON/YAML config files - Dictionary configurations (legacy format) - Sources configurations (multi-source format) - Variations configurations (preprocessed data / feature variations) - In-memory numpy arrays

All inputs are normalized to a canonical dictionary format that can be validated and processed by the loader.

Example

```python normalizer = ConfigNormalizer()

# From folder path config, name = normalizer.normalize(“/path/to/data/”)

# From config file config, name = normalizer.normalize(“config.yaml”)

# From dictionary config, name = normalizer.normalize({“train_x”: “data/X.csv”})

# From sources format config, name = normalizer.normalize({

“sources”: [

{“name”: “NIR”, “train_x”: “NIR_train.csv”}, {“name”: “MIR”, “train_x”: “MIR_train.csv”}

]

})

# From variations format config, name = normalizer.normalize({

“variations”: [

{“name”: “raw”, “train_x”: “X_raw.csv”}, {“name”: “snv”, “train_x”: “X_snv.csv”}

], “variation_mode”: “separate”

})

normalize(input_data: Any) Tuple[Dict[str, Any] | None, str][source]

Normalize a configuration to canonical format.

Parameters:

input_data – Configuration in any supported format.

Returns:

Tuple of (normalized_config, dataset_name). Returns (None, ‘Unknown_dataset’) if parsing fails.

class nirs4all.data.ConfigValidator(check_file_existence: bool = False, custom_validators: List[Callable] | None = None)[source]

Bases: object

Validator for dataset configurations.

Provides validation rules and methods for checking dataset configurations. Supports both legacy and new format configurations.

Example

```python validator = ConfigValidator() result = validator.validate(config_dict) if not result.is_valid:

for error in result.errors:

print(f”Error: {error}”)

```

validate(config: Dict[str, Any]) ValidationResult[source]

Validate a configuration dictionary.

Parameters:

config – Configuration dictionary to validate.

Returns:

ValidationResult with errors, warnings, and normalized config.

class nirs4all.data.DatasetConfigSchema(*, name: str | None = None, description: str | None = None, task_type: TaskType | None = None, train_x: Any | None = None, train_y: Any | None = None, train_group: Any | None = None, test_x: Any | None = None, test_y: Any | None = None, test_group: Any | None = None, train_x_filter: List[int] | None = None, train_y_filter: List[int] | None = None, train_group_filter: List[int] | None = None, test_x_filter: List[int] | None = None, test_y_filter: List[int] | None = None, test_group_filter: List[int] | None = None, global_params: LoadingParams | None = None, train_params: LoadingParams | None = None, test_params: LoadingParams | None = None, train_x_params: LoadingParams | None = None, train_y_params: LoadingParams | None = None, train_group_params: LoadingParams | None = None, test_x_params: LoadingParams | None = None, test_y_params: LoadingParams | None = None, test_group_params: LoadingParams | None = None, aggregate: str | bool | None = None, aggregate_method: AggregateMethod | None = None, aggregate_exclude_outliers: bool | None = None, files: List[FileConfig] | None = None, sources: List[SourceConfig] | None = None, shared_targets: SharedTargetsConfig | List[SharedTargetsConfig] | None = None, shared_metadata: SharedMetadataConfig | List[SharedMetadataConfig] | None = None, variations: List[VariationConfig] | None = None, variation_mode: VariationMode | None = None, variation_select: List[str] | None = None, variation_prefix: bool | None = None, folds: FoldConfig | List[Dict[str, Any]] | str | None = None, **extra_data: Any)[source]

Bases: BaseModel

Complete dataset configuration schema.

This model represents the normalized, validated form of a dataset configuration. It supports both the legacy format (train_x, test_x, etc.) and is designed to be extensible for the new files syntax.

All input configurations are normalized to this schema before processing.

aggregate: str | bool | None
aggregate_exclude_outliers: bool | None
aggregate_method: AggregateMethod | None
description: str | None
files: List[FileConfig] | None
folds: FoldConfig | List[Dict[str, Any]] | str | None
classmethod from_dict(data: Dict[str, Any]) DatasetConfigSchema[source]

Create from dictionary.

get_effective_params(partition: str, data_type: str) LoadingParams[source]

Get effective loading parameters for a specific data file.

Parameters are merged with precedence: specific > partition > global.

Parameters:
  • partition – ‘train’ or ‘test’

  • data_type – ‘x’, ‘y’, or ‘group’

Returns:

Merged LoadingParams.

get_selected_variations() List[VariationConfig][source]

Get the variations to use based on variation_mode and variation_select.

For mode=’select’, returns only the selected variations. For other modes, returns all variations.

Returns:

List of VariationConfig objects to use.

get_source_count() int[source]

Get the number of feature sources.

Returns:

Number of sources (1 for single-source, >1 for multi-source).

get_source_names() List[str][source]

Get names of all sources in this config.

Returns:

List of source names, or empty list if not multi-source.

get_variation_count() int[source]

Get the number of feature variations.

Returns:

Number of variations.

get_variation_names() List[str][source]

Get names of all variations in this config.

Returns:

List of variation names, or empty list if no variations.

global_params: LoadingParams | None
is_files_format() bool[source]

Check if this config uses new files format.

is_legacy_format() bool[source]

Check if this config uses legacy format (train_x/test_x).

is_multi_source() bool[source]

Check if this config has multiple feature sources.

is_sources_format() bool[source]

Check if this config uses the new sources format.

is_variations_format() bool[source]

Check if this config uses the variations format.

model_config = {'arbitrary_types_allowed': True, 'extra': 'allow', 'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

name: str | None
classmethod normalize_aggregate_method(v: Any) AggregateMethod | None[source]

Normalize aggregate_method to enum.

classmethod normalize_task_type(v: Any) TaskType | None[source]

Normalize task_type to enum.

classmethod normalize_variation_mode(v: Any) VariationMode | None[source]

Normalize variation_mode to enum.

classmethod parse_loading_params(v: Any) LoadingParams | None[source]

Parse dict to LoadingParams if needed.

classmethod parse_shared_metadata(v: Any) SharedMetadataConfig | List[SharedMetadataConfig] | None[source]

Parse shared metadata configuration.

classmethod parse_shared_targets(v: Any) SharedTargetsConfig | List[SharedTargetsConfig] | None[source]

Parse shared targets configuration.

classmethod parse_sources(v: Any) List[SourceConfig] | None[source]

Parse sources list to SourceConfig objects.

classmethod parse_variations(v: Any) List[VariationConfig] | None[source]

Parse variations list to VariationConfig objects.

shared_metadata: SharedMetadataConfig | List[SharedMetadataConfig] | None
shared_targets: SharedTargetsConfig | List[SharedTargetsConfig] | None
sources: List[SourceConfig] | None
task_type: TaskType | None
test_group: Any | None
test_group_filter: List[int] | None
test_group_params: LoadingParams | None
test_params: LoadingParams | None
test_x: Any | None
test_x_filter: List[int] | None
test_x_params: LoadingParams | None
test_y: Any | None
test_y_filter: List[int] | None
test_y_params: LoadingParams | None
to_dict() Dict[str, Any][source]

Convert to dictionary, excluding None values.

to_legacy_format() Dict[str, Any][source]

Convert sources or variations format to legacy format for backward compatibility.

This converts the sources/variations syntax to the train_x/test_x array syntax that existing loaders understand.

Returns:

Dictionary with legacy format configuration.

train_group: Any | None
train_group_filter: List[int] | None
train_group_params: LoadingParams | None
train_params: LoadingParams | None
train_x: Any | None
train_x_filter: List[int] | None
train_x_params: LoadingParams | None
train_y: Any | None
train_y_filter: List[int] | None
train_y_params: LoadingParams | None
validate_data_sources() DatasetConfigSchema[source]

Validate that at least one data source is specified.

variation_mode: VariationMode | None
variation_prefix: bool | None
variation_select: List[str] | None
variations: List[VariationConfig] | None
variations_to_legacy_format() Dict[str, Any][source]

Convert variations format to legacy format for backward compatibility.

This converts the variations syntax to the train_x/test_x format that existing loaders understand. The conversion depends on variation_mode:

  • separate: Returns config for first variation (caller handles multiple runs)

  • concat: Returns list of paths to be concatenated

  • select: Returns config for selected variations only

  • compare: Same as separate (caller handles comparison)

Returns:

Dictionary with legacy format configuration.

class nirs4all.data.DatasetConfigs(configurations: Dict[str, Any] | List[Dict[str, Any]] | str | List[str], task_type: str | List[str] = 'auto', signal_type: str | SignalType | List[str | SignalType] | None = None, aggregate: str | bool | List[str | bool | None] | None = None, aggregate_method: str | List[str] | None = None, aggregate_exclude_outliers: bool | List[bool] | None = None)[source]

Bases: object

get_dataset(config, name) SpectroDataset[source]

Get dataset by config and name (backward compatible).

Note: When called directly, uses the first task_type (or ‘auto’ if single dataset). For proper per-dataset task_type handling, use iter_datasets() or get_dataset_at().

get_dataset_at(index) SpectroDataset[source]
get_datasets() List[SpectroDataset][source]
iter_datasets()[source]
class nirs4all.data.FeatureLayout(value)[source]

Bases: str, Enum

Feature data layout formats.

String values ensure backward compatibility with existing pipelines that use layout=”3d_transpose” as strings.

FLAT_2D = '2d'
FLAT_2D_INTERLEAVED = '2d_interleaved'
VOLUME_3D = '3d'
VOLUME_3D_TRANSPOSE = '3d_transpose'
class nirs4all.data.FeatureSource(padding: bool = True, pad_value: float = 0.0)[source]

Bases: object

Manages a 3D numpy array of features using modular components.

This class provides efficient storage and manipulation of feature data with multiple processing stages. Each sample can have multiple processing versions (e.g., raw, normalized, filtered), all stored in a single aligned 3D array.

The implementation uses a component-based architecture for better modularity: - ArrayStorage: Manages the 3D numpy array - ProcessingManager: Tracks processing IDs and their indices - HeaderManager: Manages feature headers and units - LayoutTransformer: Transforms arrays to different layouts - UpdateStrategy: Handles update operation logic - AugmentationHandler: Manages sample augmentation

padding

Whether to allow padding when adding features with fewer dimensions.

pad_value

Value to use for padding (default: 0.0).

add_samples(new_samples: ndarray, headers: List[str] | None = None) None[source]

Add new samples to the feature source.

Only allowed when there’s only one processing (raw). Samples are added as a new row in the array with a single processing dimension.

Parameters:
  • new_samples – 2D array of shape (n_samples, n_features).

  • headers – Optional list of feature header names.

Raises:

ValueError – If the dataset already has multiple processings, or if new_samples is not 2D.

add_samples_batch_3d(data: ndarray) None[source]

Add multiple samples with 3D data in a single operation - O(N) instead of O(N²).

This method is optimized for bulk insertion of augmented samples where each sample may have multiple processings.

Parameters:

data – 3D array of shape (n_samples, n_processings, n_features).

Raises:

ValueError – If data dimensions don’t match existing processings/features.

augment_samples(sample_indices: List[int], data: ndarray, processings: List[str], count_list: List[int]) None[source]

Create augmented samples by duplicating existing samples.

Parameters:
  • sample_indices – List of sample indices to augment.

  • data – Augmented feature data of shape (total_augmented_samples, n_features).

  • processings – Processing names for the augmented data.

  • count_list – Number of augmentations per sample.

property header_unit: str

Get the unit type of the headers.

Returns:

Unit type string (“cm-1”, “nm”, “none”, “text”, “index”).

property headers: List[str] | None

Get the feature headers.

Returns:

List of header strings, or None if not set.

property num_2d_features: int

Get total features when flattened to 2D.

Returns:

Product of processings and features dimensions.

property num_features: int

Get the number of features per processing.

Returns:

Number of features (third dimension of array).

property num_processings: int

Get the number of processing stages.

Returns:

Number of unique processings (second dimension of array).

property num_samples: int

Get the number of samples.

Returns:

Number of samples (first dimension of array).

property processing_ids: List[str]

Get a copy of the processing ID list.

Returns:

List of processing identifiers.

reset_features(features: ndarray, processings: List[str]) None[source]

Reset features and processings.

Replaces all features and processings with new data.

Parameters:
  • features – New feature data (2D or 3D).

  • processings – List of new processing names.

set_headers(headers: List[str] | None, unit: str = 'cm-1') None[source]

Set feature headers with unit metadata.

Parameters:
  • headers – List of header strings (wavelengths, feature names, etc.).

  • unit – Unit type - “cm-1” (wavenumber), “nm” (wavelength), “none”, “text”, “index”.

update_features(source_processings: list[str], features: list[ndarray] | list[list[ndarray]], processings: list[str]) None[source]

Add new features or replace existing ones.

Parameters:
  • source_processings – List of existing processing names to replace. Empty string “” means add new.

  • features – List of feature arrays, each of shape (n_samples, n_features), or single array.

  • processings – List of target processing names for the data.

Example

# Add new ‘savgol’ and ‘detrend’, replace ‘raw’ with ‘msc’ update_features([“”, “raw”, “”],

[savgol_data, msc_data, detrend_data], [“savgol”, “msc”, “detrend”])

x(indices: list[int] | ndarray, layout: str) ndarray[source]

Retrieve feature data in specified layout.

Parameters:
  • indices – Sample indices to retrieve.

  • layout – Output format: - “2d”: Flatten to (samples, processings * features) - “2d_interleaved”: Transpose then flatten to (samples, features * processings) - “3d”: Keep as (samples, processings, features) - “3d_transpose”: Transpose to (samples, features, processings)

Returns:

Feature array in requested layout.

Raises:

ValueError – If layout is unknown.

class nirs4all.data.FileConfig(*, path: str, partition: PartitionType | None = None, columns: ColumnConfig | None = None, params: LoadingParams | None = None, link_by: str | None = None)[source]

Bases: BaseModel

Configuration for a single data file.

This is a stub for future implementation of the files syntax. It describes how to load and interpret a single data file.

columns: ColumnConfig | None
model_config = {'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

params: LoadingParams | None
partition: PartitionType | None
path: str
class nirs4all.data.HeaderUnit(value)[source]

Bases: str, Enum

Feature header unit types.

Defines the type of measurement units used in feature headers.

INDEX = 'index'
NONE = 'none'
TEXT = 'text'
WAVELENGTH = 'nm'
WAVENUMBER = 'cm-1'
exception nirs4all.data.LinkingError[source]

Bases: Exception

Raised when sample linking fails.

class nirs4all.data.LoadingParams(*, delimiter: str | None = None, decimal_separator: str | None = None, has_header: bool | None = None, header_unit: HeaderUnit | str | None = None, signal_type: SignalTypeEnum | str | None = None, encoding: str | None = None, na_policy: NAPolicy | str | None = None, categorical_mode: CategoricalMode | str | None = None, **extra_data: Any)[source]

Bases: BaseModel

Parameters for loading data files.

These parameters control how CSV and other files are parsed. Parameters can be specified at global, partition, or file level, with more specific levels overriding general ones.

categorical_mode: CategoricalMode | str | None
decimal_separator: str | None
delimiter: str | None
encoding: str | None
has_header: bool | None
header_unit: HeaderUnit | str | None
merge_with(other: LoadingParams | None) LoadingParams[source]

Merge with another LoadingParams, self taking precedence.

Parameters:

other – Another LoadingParams to merge with (lower priority).

Returns:

New LoadingParams with merged values.

model_config = {'extra': 'allow'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

na_policy: NAPolicy | str | None
classmethod normalize_header_unit(v: Any) HeaderUnit | str | None[source]

Normalize header_unit to enum if possible.

classmethod normalize_signal_type(v: Any) SignalTypeEnum | str | None[source]

Normalize signal_type to enum if possible.

signal_type: SignalTypeEnum | str | None
class nirs4all.data.PartitionAssigner(default_random_state: int | None = None, base_path: Path | None = None)[source]

Bases: object

Flexible partition assigner for DataFrames.

Supports multiple partition methods: - Static: “train”, “test”, “predict” (assign entire DataFrame) - Column-based: {“column”: “split”, “train_values”: […], “test_values”: […]} - Percentage-based: {“train”: “80%”, “test”: “20%”, “shuffle”: True} - Index-based: {“train”: [0,1,2], “test”: [3,4,5]} - Index file: {“train_file”: “train_idx.txt”, “test_file”: “test_idx.txt”}

Example

>>> assigner = PartitionAssigner()
>>> result = assigner.assign(df, {"train": "80%", "test": "20%"})
>>> print(len(result.train_data), len(result.test_data))
DEFAULT_PREDICT_VALUES = ('predict', 'prediction', 'unknown')
DEFAULT_TEST_VALUES = ('test', 'testing', 'val', 'validation', 'valid')
DEFAULT_TRAIN_VALUES = ('train', 'training', 'cal', 'calibration')
PARTITION_NAMES = ('train', 'test', 'predict')
assign(df: DataFrame, partition: str | Dict[str, Any] | None) PartitionResult[source]

Assign rows to partitions.

Parameters:
  • df – The DataFrame to partition.

  • partition – Partition specification. Can be: - str: Static partition (“train”, “test”, “predict”) - dict: Complex partition (column-based, percentage, or index) - None: No partitioning (returns empty result)

Returns:

PartitionResult with indices and data for each partition.

Raises:

PartitionError – If partition specification is invalid.

concatenate_partitions(results: Sequence[PartitionResult]) PartitionResult[source]

Concatenate multiple partition results.

Useful when combining multiple files with the same partition. Indices are adjusted to account for concatenation order.

Parameters:

results – Sequence of PartitionResult objects.

Returns:

Combined PartitionResult.

class nirs4all.data.PartitionConfig(*, type: PartitionType | None = None, column: str | None = None, train_values: List[str] | None = None, test_values: List[str] | None = None, predict_values: List[str] | None = None, unknown_policy: Literal['error', 'ignore', 'train'] | None = None, train: str | List[int] | None = None, test: str | List[int] | None = None, predict: str | List[int] | None = None, shuffle: bool | None = None, random_state: int | None = None, stratify: str | None = None, train_file: str | None = None, test_file: str | None = None, predict_file: str | None = None, **extra_data: Any)[source]

Bases: BaseModel

Configuration for partition assignment.

Supports multiple partition methods: - Static: Assign entire file to a partition (use type) - Column-based: Partition based on column values (use column) - Percentage-based: Split by percentage (use train, test with percentages) - Index-based: Explicit index lists (use train, test with lists) - Index file: Load indices from external files (use train_file, test_file)

Examples

# Static partition (entire file) partition:

type: train

# Column-based partition partition:

column: “split” train_values: [“train”, “training”] test_values: [“test”, “validation”]

# Percentage-based partition partition:

train: “80%” test: “80%:100%” shuffle: true random_state: 42

# Index-based partition partition:

train: [0, 1, 2, 3, 4] test: [5, 6, 7, 8, 9]

# Index file partition partition:

train_file: “train_indices.txt” test_file: “test_indices.txt”

column: str | None
model_config = {'extra': 'allow'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

predict: str | List[int] | None
predict_file: str | None
predict_values: List[str] | None
random_state: int | None
shuffle: bool | None
stratify: str | None
test: str | List[int] | None
test_file: str | None
test_values: List[str] | None
to_assigner_spec() str | Dict[str, Any] | None[source]

Convert this config to a spec for PartitionAssigner.

Returns:

Partition specification for PartitionAssigner.assign().

train: str | List[int] | None
train_file: str | None
train_values: List[str] | None
type: PartitionType | None
unknown_policy: Literal['error', 'ignore', 'train'] | None
validate_partition_method() PartitionConfig[source]

Validate that partition specification is consistent.

exception nirs4all.data.PartitionError[source]

Bases: Exception

Raised when partition assignment fails.

class nirs4all.data.PartitionResult(train_indices: List[int] = <factory>, test_indices: List[int] = <factory>, predict_indices: List[int] = <factory>, train_data: DataFrame | None = None, test_data: DataFrame | None = None, predict_data: DataFrame | None = None, partition_column: str | None = None)[source]

Bases: object

Result of a partition assignment operation.

train_indices

List of indices assigned to training partition.

Type:

List[int]

test_indices

List of indices assigned to test partition.

Type:

List[int]

predict_indices

List of indices assigned to predict partition (no targets).

Type:

List[int]

train_data

DataFrame subset for training.

Type:

pandas.core.frame.DataFrame | None

test_data

DataFrame subset for testing.

Type:

pandas.core.frame.DataFrame | None

predict_data

DataFrame subset for prediction.

Type:

pandas.core.frame.DataFrame | None

partition_column

Name of column used for partitioning (if column-based).

Type:

str | None

get_data(partition: Literal['train', 'test', 'predict']) DataFrame | None[source]

Get data for a specific partition.

get_indices(partition: Literal['train', 'test', 'predict']) List[int][source]

Get indices for a specific partition.

property has_predict: bool

Check if predict data exists.

property has_test: bool

Check if test data exists.

property has_train: bool

Check if training data exists.

partition_column: str | None = None
predict_data: DataFrame | None = None
predict_indices: List[int]
test_data: DataFrame | None = None
test_indices: List[int]
train_data: DataFrame | None = None
train_indices: List[int]
class nirs4all.data.PredictionAnalyzer(predictions_obj: Predictions, dataset_name_override: str | None = None, config: ChartConfig | None = None, output_dir: str | None = None, cache_size: int = 50, default_aggregate: str | None = None, default_aggregate_method: str | None = None, default_aggregate_exclude_outliers: bool = False)[source]

Bases: object

Orchestrator for prediction analysis and visualization.

Provides a unified interface for creating various prediction visualizations. Delegates to specialized chart classes for rendering.

Includes a caching layer (PredictionCache) to avoid recomputing expensive aggregations when multiple charts use the same parameters. The cache is keyed by (aggregate, rank_metric, rank_partition, display_partition, group_by, filters) and stores the results of predictions.top() calls.

Leverages the refactored Predictions API (predictions.top(), PredictionResult, etc.) for efficient data access and avoids redundant calculations.

predictions

Predictions object containing prediction data.

dataset_name_override

Optional dataset name override for display.

config

ChartConfig for customization across all charts.

output_dir

Directory to save generated charts.

cache

PredictionCache for caching aggregated results.

default_aggregate

Default aggregation column for all visualization methods.

Example

>>> from nirs4all.data.predictions import Predictions
>>> predictions = Predictions.load('predictions.json')
>>> analyzer = PredictionAnalyzer(predictions)
>>>
>>> # Plot top 5 models - first call computes aggregation
>>> fig = analyzer.plot_top_k(k=5, aggregate='ID')
>>>
>>> # Plot heatmap - uses cached aggregation (fast!)
>>> fig = analyzer.plot_heatmap('model_name', 'preprocessings', aggregate='ID')
>>>
>>> # Check cache stats
>>> print(analyzer.get_cache_stats())
>>>
>>> # With default aggregation from dataset config
>>> runner = PipelineRunner()
>>> predictions, _ = runner.run(pipeline, DatasetConfigs(path, aggregate='sample_id'))
>>> analyzer = PredictionAnalyzer(predictions, default_aggregate=runner.last_aggregate)
>>> # All plots now use sample_id aggregation by default
>>> fig = analyzer.plot_top_k(k=5)  # Aggregated automatically
branch_summary(metrics: List[str] | None = None, display_partition: str = 'test', aggregate: str | None = None, as_dataframe: bool = True, **filters) DataFrame | Dict[str, Dict[str, Any]][source]

Generate summary statistics comparing branch performance.

Computes mean, std, min, max for each metric across branches.

Parameters:
  • metrics – List of metrics to compute (default: [‘rmse’, ‘r2’] or [‘balanced_accuracy’, ‘f1’] for classification).

  • display_partition – Partition to compute metrics from (default: ‘test’).

  • aggregate – If provided, aggregate predictions by this metadata column (e.g., ‘ID’) before computing statistics.

  • as_dataframe – If True, return pandas DataFrame. If False, return dict.

  • **filters – Additional filter criteria.

Returns:

  • branch_name: Branch identifier

  • branch_id: Numeric branch ID

  • count: Number of predictions

  • {metric}_mean: Mean value

  • {metric}_std: Standard deviation

  • {metric}_min: Minimum value

  • {metric}_max: Maximum value

Return type:

DataFrame or dict with branch summary statistics

Examples

>>> summary = analyzer.branch_summary(metrics=['rmse', 'r2'])
>>> print(summary.to_markdown())
>>> summary = analyzer.branch_summary(
...     metrics=['balanced_accuracy', 'f1'],
...     aggregate='ID'
... )
clear_cache() None[source]

Clear all caches.

Call this if the underlying predictions data has been modified to ensure fresh results are computed. Clears both: - Analyzer’s query result cache - Ranker’s aggregation and score caches

generate_report(output_path: str, branch_comparison: bool = True, include_diagrams: bool = True, include_tables: bool = True, metrics: List[str] | None = None, partition: str = 'test', title: str | None = None) str[source]

Generate HTML report with branch analysis.

Creates a comprehensive HTML report with branch comparisons, visualizations, and statistical tables.

Parameters:
  • output_path – Path for the output HTML file.

  • branch_comparison – If True, include branch comparison section.

  • include_diagrams – If True, include branch diagram visualization.

  • include_tables – If True, include summary statistics tables.

  • metrics – List of metrics to include (default: [‘rmse’, ‘r2’]).

  • partition – Partition for metrics (default: ‘test’).

  • title – Report title (default: ‘Branch Comparison Report’).

Returns:

Path to the generated HTML file.

Examples

>>> path = analyzer.generate_report(
...     'reports/branch_comparison.html',
...     branch_comparison=True,
...     metrics=['rmse', 'r2', 'mae']
... )
get_branch_ids() List[int][source]

Get list of unique branch IDs in predictions.

Returns:

List of branch IDs (empty list if no branches)

Examples

>>> branch_ids = analyzer.get_branch_ids()
>>> print(branch_ids)  # [0, 1, 2]
get_branches() List[str][source]

Get list of unique branch names in predictions.

Returns:

List of branch names (empty list if no branches)

Examples

>>> branches = analyzer.get_branches()
>>> print(branches)  # ['snv_pca', 'msc_detrend', 'derivative']
get_cache_stats() Dict[str, Any][source]

Get cache performance statistics.

Returns:

  • analyzer_cache: Query result cache stats

  • ranker_cache: Aggregation and score cache stats

Return type:

Dictionary with stats for both analyzer and ranker caches

get_cached_predictions(n: int, rank_metric: str, rank_partition: str = 'val', display_partition: str = 'test', display_metrics: List[str] | None = None, aggregate: str | None = None, aggregate_method: str | None = None, aggregate_exclude_outliers: bool | None = None, group_by: str | List[str] | None = None, aggregate_partitions: bool = True, **filters)[source]

Get predictions with caching support.

This method wraps predictions.top() with a caching layer. Charts should call this method instead of directly calling predictions.top() to benefit from caching.

The cache key includes: aggregate, rank_metric, rank_partition, display_partition, group_by, and all filters.

Parameters:
  • n – Number of top predictions to return.

  • rank_metric – Metric for ranking.

  • rank_partition – Partition for ranking (default: ‘val’).

  • display_partition – Partition for display (default: ‘test’).

  • display_metrics – List of metrics to compute for display.

  • aggregate – Aggregation column (e.g., ‘ID’) or None.

  • aggregate_method – Aggregation method (‘mean’, ‘median’, ‘vote’). If None, uses default_aggregate_method from constructor.

  • aggregate_exclude_outliers – If True, exclude outliers using T² before aggregation. If None, uses default_aggregate_exclude_outliers from constructor.

  • group_by – Grouping column(s) for deduplication.

  • aggregate_partitions – If True, include all partition data.

  • **filters – Additional filter criteria.

Returns:

PredictionResultsList from cache or fresh computation.

Example

>>> # First call computes and caches
>>> preds = analyzer.get_cached_predictions(
...     n=5, rank_metric='rmse', aggregate='ID'
... )
>>> # Second call with same params is instant
>>> preds = analyzer.get_cached_predictions(
...     n=5, rank_metric='rmse', aggregate='ID'
... )
plot_branch_boxplot(rank_metric: str | None = None, display_metric: str | None = None, display_partition: str = 'test', aggregate: str | None = None, figsize: tuple | None = None, config: ChartConfig | None = None, **filters) Figure[source]

Plot boxplot comparing score distributions across branches.

Creates a boxplot showing the distribution of metric values for each branch.

Parameters:
  • rank_metric – Metric for ranking models (default: auto-detect).

  • display_metric – Metric to display (default: same as rank_metric).

  • display_partition – Partition to display results from (default: ‘test’).

  • aggregate – If provided, aggregate predictions by this metadata column.

  • figsize – Figure size tuple (default: auto-computed).

  • config – Optional ChartConfig to override defaults.

  • **filters – Additional filter criteria.

Returns:

matplotlib Figure with branch comparison boxplot.

Examples

>>> fig = analyzer.plot_branch_boxplot(display_metric='rmse')
>>> fig = analyzer.plot_branch_boxplot(
...     display_metric='r2',
...     aggregate='ID'
... )
plot_branch_comparison(rank_metric: str | None = None, display_metric: str | None = None, display_partition: str = 'test', aggregate: str | None = None, show_ci: bool = True, ci_level: float = 0.95, figsize: tuple | None = None, config: ChartConfig | None = None, **filters) Figure[source]

Plot bar chart comparing branch performance with confidence intervals.

Creates a grouped bar chart showing mean metric values for each branch with optional confidence intervals.

Parameters:
  • rank_metric – Metric for ranking models (default: auto-detect).

  • display_metric – Metric to display (default: same as rank_metric).

  • display_partition – Partition to display results from (default: ‘test’).

  • aggregate – If provided, aggregate predictions by this metadata column.

  • show_ci – If True, show confidence intervals (default: True).

  • ci_level – Confidence level for intervals (default: 0.95).

  • figsize – Figure size tuple (default: auto-computed).

  • config – Optional ChartConfig to override defaults.

  • **filters – Additional filter criteria.

Returns:

matplotlib Figure with branch comparison bar chart.

Examples

>>> fig = analyzer.plot_branch_comparison(display_metric='rmse')
>>> fig = analyzer.plot_branch_comparison(
...     display_metric='r2',
...     aggregate='ID',
...     show_ci=True
... )
plot_branch_diagram(show_metrics: bool = True, metric: str | None = None, partition: str = 'test', figsize: tuple | None = None, title: str | None = None, config: Dict[str, Any] | None = None) Figure[source]

Plot DAG diagram showing the branching structure of the pipeline.

Creates a visual diagram showing shared steps, branch nodes, and post-branch models in a hierarchical layout.

Parameters:
  • show_metrics – If True, show metrics in branch nodes (default: True).

  • metric – Metric to display (default: auto-detect).

  • partition – Partition for metrics (default: ‘test’).

  • figsize – Figure size tuple (default: auto-computed).

  • title – Optional title for the diagram.

  • config – Additional configuration dict for BranchDiagram.

Returns:

matplotlib Figure with branch DAG diagram.

Examples

>>> fig = analyzer.plot_branch_diagram(metric='rmse')
>>> fig = analyzer.plot_branch_diagram(
...     show_metrics=True,
...     metric='r2',
...     partition='val'
... )
plot_branch_heatmap(y_var: str = 'fold_id', rank_metric: str | None = None, display_metric: str | None = None, display_partition: str = 'test', aggregate: str | None = None, config: ChartConfig | None = None, **kwargs) Figure[source]

Plot heatmap of branch performance across folds or other variable.

Creates a heatmap with branches on x-axis and another variable (e.g., fold_id) on y-axis.

Parameters:
  • y_var – Variable for y-axis (default: ‘fold_id’).

  • rank_metric – Metric for ranking (default: auto-detect).

  • display_metric – Metric to display (default: same as rank_metric).

  • display_partition – Partition to display (default: ‘test’).

  • aggregate – If provided, aggregate predictions by this metadata column.

  • config – Optional ChartConfig to override defaults.

  • **kwargs – Additional parameters passed to plot_heatmap.

Returns:

matplotlib Figure with branch heatmap.

Examples

>>> fig = analyzer.plot_branch_heatmap(display_metric='rmse')
>>> fig = analyzer.plot_branch_heatmap(
...     y_var='model_name',
...     display_metric='r2'
... )
plot_candlestick(variable: str, display_metric: str | None = None, display_partition: str = 'test', aggregate: str | None = None, config: ChartConfig | None = None, **kwargs) Figure[source]

Plot candlestick chart for score distribution by variable.

Parameters:
  • variable – Variable to group by (e.g., ‘model_name’, ‘preprocessings’).

  • display_metric – Metric to analyze (default: auto-detect from task type).

  • display_partition – Partition to display scores from (default: ‘test’).

  • aggregate – If provided, aggregate predictions by this metadata column or ‘y’. When ‘y’, groups by y_true values. When a column name (e.g., ‘ID’), groups by that metadata column. Aggregated predictions have recalculated metrics.

  • config – Optional ChartConfig to override analyzer’s default config for this chart.

  • **kwargs – Additional parameters (dataset_name, figsize, filters).

Returns:

matplotlib Figure object.

Example

>>> fig = analyzer.plot_candlestick('model_name', display_metric='rmse')
>>> fig = analyzer.plot_candlestick('model_name', display_metric='rmse', aggregate='ID')
plot_confusion_matrix(k: int = 5, rank_metric: str | None = None, rank_partition: str = 'val', display_metric: str | List[str] = '', display_partition: str = 'test', show_scores: bool = True, aggregate: str | None = None, config: ChartConfig | None = None, **kwargs) Figure | List[Figure][source]

Plot confusion matrices for top K classification models.

When multiple datasets are present and no dataset_name is specified, creates one figure per dataset.

Parameters:
  • k – Number of top models to show (default: 5).

  • rank_metric – Metric for ranking (default: auto-detect from task type).

  • rank_partition – Partition used for ranking models (default: ‘val’).

  • display_metric – Metric(s) to display in titles. Can be a single string (e.g., ‘accuracy’) or a list of strings for multiple metrics (e.g., [‘balanced_accuracy’, ‘accuracy’]). Metric names are shown in abbreviated form (default: same as rank_metric).

  • display_partition – Partition to display confusion matrix from (default: ‘test’).

  • show_scores – If True, show scores in chart titles (default: True).

  • aggregate – If provided, aggregate predictions by this metadata column or ‘y’.

  • config – Optional ChartConfig to override analyzer’s default config for this chart.

  • **kwargs – Additional parameters (dataset_name, figsize, filters).

Returns:

matplotlib Figure object or list of Figure objects (one per dataset).

Example

>>> fig = analyzer.plot_confusion_matrix(k=3, rank_metric='f1')
>>> fig = analyzer.plot_confusion_matrix(k=3, aggregate='ID')
>>> # Multiple metrics displayed with abbreviated names
>>> fig = analyzer.plot_confusion_matrix(
...     k=3,
...     display_metric=['balanced_accuracy', 'accuracy']
... )
plot_heatmap(x_var: str, y_var: str, rank_metric: str | None = None, rank_partition: str = 'val', display_metric: str = '', display_partition: str = 'test', normalize: bool = False, rank_agg: str = 'best', display_agg: str = 'best', show_counts: bool = True, local_scale: bool = False, column_scale: bool = False, aggregate: str | None = None, top_k: int | None = None, sort_by_value: bool = False, sort_by: str | None = None, config: ChartConfig | None = None, **kwargs) Figure[source]

Plot performance heatmap across two variables.

For each (x_var, y_var) cell: 1. Rank predictions by rank_metric on rank_partition using rank_agg 2. Display display_metric from display_partition using display_agg 3. Normalize per dataset if requested 4. Show counts if requested

Parameters:
  • x_var – Variable for x-axis (e.g., ‘model_name’, ‘preprocessings’).

  • y_var – Variable for y-axis (e.g., ‘dataset_name’, ‘partition’).

  • rank_metric – Metric used to rank/select models (default: auto-detect from task type).

  • rank_partition – Partition used for ranking models (default: ‘val’).

  • display_metric – Metric to display in heatmap (default: same as rank_metric).

  • display_partition – Partition to display scores from (default: ‘test’).

  • normalize – If True, show normalized scores in cells. Colors always use normalized (default: False).

  • rank_agg – Aggregation for ranking (‘best’, ‘worst’, ‘mean’, ‘median’) (default: ‘best’).

  • display_agg – Aggregation for display scores (‘best’, ‘worst’, ‘mean’, ‘median’) (default: ‘mean’).

  • show_counts – Show prediction counts in cells (default: True).

  • local_scale – If True, colorbar shows actual metric values; if False, shows 0-1 normalized (default: False).

  • column_scale – If True, normalize colors per column (best in column = 1.0). Automatically sets local_scale=False when enabled (default: False).

  • aggregate – If provided, aggregate predictions by this metadata column (e.g., ‘ID’).

  • top_k – If provided, show only top K models. Selection uses Borda count: first keeps top-1 per column, then ranks by Borda count.

  • sort_by_value – If True, sort Y-axis by ranking score (best first) instead of alphabetically. Uses rank_metric on rank_partition. Deprecated: use sort_by=’value’ instead.

  • sort_by – Sorting method for Y-axis (rows). Options: - None: Alphabetical sorting (default). - ‘value’: Sort by ranking score on rank_partition column. - ‘mean’: Sort by mean score across all columns. - ‘median’: Sort by median score across all columns. - ‘borda’: Sort by Borda count (sum of ranks across columns). - ‘condorcet’: Sort by pairwise wins (Copeland method). - ‘consensus’: Sort by consensus (geometric mean of normalized ranks).

  • config – Optional ChartConfig to override analyzer’s default config for this chart.

  • **kwargs – Additional filters (dataset_name, model_name, etc.).

Returns:

matplotlib Figure object.

Example

>>> # Rank on best val RMSE, display mean test RMSE
>>> fig = analyzer.plot_heatmap('model_name', 'dataset_name')
>>>
>>> # Rank on mean val R2, display best test F1
>>> fig = analyzer.plot_heatmap(
...     'model_name', 'dataset_name',
...     rank_metric='r2',
...     rank_agg='mean',
...     display_metric='f1',
...     display_agg='best'
... )
>>>
>>> # Use column normalization for comparing across partitions
>>> fig = analyzer.plot_heatmap(
...     'partition', 'model_name',
...     column_scale=True
... )
plot_histogram(display_metric: str | None = None, display_partition: str = 'test', aggregate: str | None = None, config: ChartConfig | None = None, **kwargs) Figure | List[Figure][source]

Plot score distribution histogram.

When multiple datasets are present and no dataset_name is specified, creates one figure per dataset.

Parameters:
  • display_metric – Metric to plot (default: auto-detect from task type).

  • display_partition – Partition to display scores from (default: ‘test’).

  • aggregate – If provided, aggregate predictions by this metadata column or ‘y’. When ‘y’, groups by y_true values. When a column name (e.g., ‘ID’), groups by that metadata column. Aggregated predictions have recalculated metrics.

  • config – Optional ChartConfig to override analyzer’s default config for this chart.

  • **kwargs – Additional parameters (dataset_name, bins, figsize, filters).

Returns:

matplotlib Figure object or list of Figure objects (one per dataset).

Example

>>> fig = analyzer.plot_histogram(display_metric='r2', display_partition='val')
>>> fig = analyzer.plot_histogram(display_metric='rmse', aggregate='ID')
plot_nested_branches(level1_var: str = 'branch_path_level1', level2_var: str = 'branch_path_level2', metric: str | None = None, partition: str = 'test', plot_type: str = 'grouped_bar', figsize: tuple | None = None, config: ChartConfig | None = None, **filters) Figure[source]

Plot nested branch comparison for hierarchical experiments.

Creates grouped bar charts or faceted plots for nested branch structures.

Parameters:
  • level1_var – Variable for first level grouping (outer group).

  • level2_var – Variable for second level grouping (inner group/x-axis).

  • metric – Metric to display (default: auto-detect).

  • partition – Partition for metrics (default: ‘test’).

  • plot_type – Type of plot (‘grouped_bar’, ‘facet’).

  • figsize – Figure size tuple.

  • config – Optional ChartConfig to override defaults.

  • **filters – Additional filter criteria.

Returns:

matplotlib Figure with nested branch visualization.

Examples

>>> # Compare outlier strategies × preprocessing
>>> fig = analyzer.plot_nested_branches(
...     level1_var='outlier_strategy',
...     level2_var='preprocessing',
...     metric='rmse'
... )
plot_top_k(k: int = 5, rank_metric: str | None = None, rank_partition: str = 'val', display_metric: str = '', display_partition: str = 'all', show_scores: bool = True, aggregate: str | None = None, config: ChartConfig | None = None, **kwargs) Figure | List[Figure][source]

Plot top K model comparison (scatter + residuals).

Models are ranked by rank_metric on rank_partition, then predictions from display_partition(s) are shown.

When multiple datasets are present and no dataset_name is specified, creates one figure per dataset.

Parameters:
  • k – Number of top models to show (default: 5).

  • rank_metric – Metric for ranking models (default: auto-detect from task type).

  • rank_partition – Partition used for ranking (default: ‘val’).

  • display_metric – Metric to display in titles (default: same as rank_metric).

  • display_partition – Partition(s) to display (‘all’ or specific partition).

  • show_scores – If True, show scores in chart titles (default: True).

  • aggregate – If provided, aggregate predictions by this metadata column or ‘y’. When ‘y’, groups by y_true values. When a column name (e.g., ‘ID’), groups by that metadata column. Aggregated predictions have recalculated metrics.

  • config – Optional ChartConfig to override analyzer’s default config for this chart.

  • **kwargs – Additional parameters (dataset_name, figsize, filters).

Returns:

matplotlib Figure object or list of Figure objects (one per dataset).

Example

>>> fig = analyzer.plot_top_k(k=3, rank_metric='r2')
>>> fig = analyzer.plot_top_k(k=3, aggregate='ID')  # Aggregated by ID
class nirs4all.data.PredictionResult[source]

Bases: dict

Enhanced dictionary for a single prediction with convenience methods.

Extends standard dict with property accessors and methods for saving, evaluating, and summarizing predictions.

Features:
  • Property accessors (id, model_name, dataset_name, etc.)

  • save_to_csv() - save individual result

  • eval_score() - compute metrics on-the-fly

  • summary() - generate tab report

Examples

>>> result = PredictionResult({
...     "id": "abc123",
...     "dataset_name": "wheat",
...     "model_name": "PLS",
...     "y_true": [1, 2, 3],
...     "y_pred": [1.1, 2.2, 3.3]
... })
>>> result.model_name
'PLS'
>>> scores = result.eval_score(["rmse", "r2"])
>>> result.save_to_csv("results")
__repr__() str[source]

String representation showing key info.

__str__() str[source]

String representation showing key info.

property config_name: str

Get config name.

property dataset_name: str

Get dataset name.

eval_score(metrics: List[str] | None = None) Dict[str, Any][source]

Evaluate scores for this prediction using specified metrics.

Parameters:

metrics – List of metrics to compute (if None, returns all available metrics)

Returns:

Dictionary of metric names to scores. For aggregated results: {“train”: {…}, “val”: {…}, “test”: {…}} For single partition: {“rmse”: …, “r2”: …, …}

Examples

>>> scores = result.eval_score(["rmse", "r2", "mae"])
>>> # For aggregated: scores = {"train": {"rmse": 0.5}, "val": {...}, "test": {...}}
>>> # For single: scores = {"rmse": 0.5, "r2": 0.9}
property fold_id: str

Get fold ID.

property id: str

Get prediction ID.

property model_name: str

Get model name.

property op_counter: int

Get operation counter.

save_to_csv(path_or_file: str = 'results', filename: str | None = None) None[source]

Save prediction result to CSV file.

Parameters:
  • path_or_file – Base path (folder) or complete file path (if ends with .csv)

  • filename – Optional filename (if path_or_file is a folder)

Examples

>>> result.save_to_csv("output")  # Saves to output/{dataset}/{id}.csv
>>> result.save_to_csv("output/my_result.csv")  # Saves to output/my_result.csv
>>> result.save_to_csv("output", "my_result.csv")  # Saves to output/my_result.csv
property step_idx: int

Get pipeline step index.

summary() str[source]

Generate a summary tab report for this prediction.

Works with both aggregated and non-aggregated prediction results.

Returns:

Formatted string with tab report

Examples

>>> report = result.summary()
>>> print(report)
class nirs4all.data.PredictionResultsList(predictions: List[Dict[str, Any] | PredictionResult] | None = None)[source]

Bases: list

List container for PredictionResult objects with batch operations.

Extends standard list with prediction-specific batch functionality.

Features:
  • save() - batch CSV export

  • get() - retrieve by ID

  • filter() - chain filtering

  • Iterator support

Examples

>>> results = PredictionResultsList([result1, result2, result3])
>>> results.save("output/predictions.csv")
>>> best = results.get("abc123")
>>> len(results)
3
__repr__() str[source]

String representation showing count and brief info.

get(prediction_id: str) PredictionResult | None[source]

Get a prediction by its ID.

Parameters:

prediction_id – The ID of the prediction to retrieve

Returns:

PredictionResult if found, None otherwise

Examples

>>> result = results.get("abc123")
save(path: str = 'results', filename: str | None = None) None[source]

Save all predictions to a single CSV file with structured headers.

CSV Structure:
  • Line 1: dataset_name

  • Line 2: model_classname + model_id

  • Line 3: fold_id

  • Line 4: partition

  • Lines 5+: prediction data (y_true, y_pred columns)

Parameters:
  • path – Base directory path (default: “results”)

  • filename – Optional filename (if None, auto-generated from first prediction)

Examples

>>> results.save("output")
>>> results.save("output", "my_predictions.csv")
class nirs4all.data.Predictions(filepath: str | List[str] | None = None)[source]

Bases: object

Main facade for prediction management.

Delegates to specialized components while maintaining backward-compatible public API.

Architecture:
  • Storage: PredictionStorage (DataFrame backend)

  • Serializer: PredictionSerializer (JSON/Parquet hybrid)

  • Indexer: PredictionIndexer (filtering operations)

  • Ranker: PredictionRanker (ranking and top-k)

  • Aggregator: PartitionAggregator (partition combining)

  • Query: CatalogQueryEngine (catalog operations)

Examples

>>> # Create and add predictions
>>> pred = Predictions()
>>> pred.add_prediction(
...     dataset_name="wheat",
...     model_name="PLS",
...     partition="test",
...     y_true=y_true,
...     y_pred=y_pred,
...     test_score=0.85
... )
>>>
>>> # Query top models
>>> top_5 = pred.top(n=5, rank_metric="rmse", rank_partition="val")
>>>
>>> # Save and load
>>> pred.save_to_file("predictions.json")
>>> loaded = Predictions.load("predictions.json")
__len__() int[source]

Return number of stored predictions.

__repr__() str[source]

String representation.

__str__() str[source]

User-friendly string representation.

add_prediction(dataset_name: str, dataset_path: str = '', config_name: str = '', config_path: str = '', pipeline_uid: str | None = None, step_idx: int = 0, op_counter: int = 0, model_name: str = '', model_classname: str = '', model_path: str = '', fold_id: str | int | None = None, sample_indices: List[int] | None = None, weights: List[float] | None = None, metadata: Dict[str, Any] | None = None, partition: str = '', y_true: ndarray | None = None, y_pred: ndarray | None = None, y_proba: ndarray | None = None, val_score: float | None = None, test_score: float | None = None, train_score: float | None = None, metric: str = 'mse', task_type: str = 'regression', n_samples: int = 0, n_features: int = 0, preprocessings: str = '', best_params: Dict[str, Any] | None = None, scores: Dict[str, Dict[str, float]] | None = None, branch_id: int | None = None, branch_name: str | None = None, exclusion_count: int | None = None, exclusion_rate: float | None = None, model_artifact_id: str | None = None, trace_id: str | None = None) str[source]

Add a single prediction to storage.

Delegates to PredictionStorage component.

Parameters:
  • dataset_name – Dataset name

  • dataset_path – Path to dataset file

  • config_name – Configuration name

  • config_path – Path to config file

  • pipeline_uid – Unique pipeline identifier

  • step_idx – Pipeline step index

  • op_counter – Operation counter

  • model_name – Model name

  • model_classname – Model class name

  • model_path – Path to saved model

  • fold_id – Cross-validation fold ID

  • sample_indices – Indices of samples used

  • weights – Sample weights

  • metadata – Additional metadata

  • partition – Data partition (train/val/test)

  • y_true – True labels

  • y_pred – Predicted labels

  • y_proba – Class probabilities for classification (shape: n_samples x n_classes)

  • val_score – Validation score

  • test_score – Test score

  • train_score – Training score

  • metric – Metric name

  • task_type – Task type (classification/regression)

  • n_samples – Number of samples

  • n_features – Number of features

  • preprocessings – Preprocessing steps applied

  • best_params – Best hyperparameters

  • scores – Dictionary of pre-computed scores per partition

  • branch_id – Branch identifier for pipeline branching (0-indexed)

  • branch_name – Human-readable branch name

  • exclusion_count – Number of samples excluded during training (outlier_excluder)

  • exclusion_rate – Rate of samples excluded (0.0-1.0, outlier_excluder)

  • model_artifact_id – Deterministic artifact ID for model loading (v2 system)

  • trace_id – Execution trace ID for deterministic prediction replay (v2 system)

Returns:

Prediction ID

add_predictions(dataset_name: str | List[str], dataset_path: str | List[str] = '', config_name: str | List[str] = '', config_path: str | List[str] = '', pipeline_uid: str | None | List[str | None] = None, step_idx: int | List[int] = 0, op_counter: int | List[int] = 0, model_name: str | List[str] = '', model_classname: str | List[str] = '', model_path: str | List[str] = '', fold_id: str | None | List[str | None] = None, sample_indices: List[int] | None | List[List[int] | None] = None, weights: List[float] | None | List[List[float] | None] = None, metadata: Dict[str, Any] | None | List[Dict[str, Any] | None] = None, partition: str | List[str] = '', y_true: ndarray | None | List[ndarray | None] = None, y_pred: ndarray | None | List[ndarray | None] = None, val_score: float | None | List[float | None] = None, test_score: float | None | List[float | None] = None, train_score: float | None | List[float | None] = None, metric: str | List[str] = 'mse', task_type: str | List[str] = 'regression', n_samples: int | List[int] = 0, n_features: int | List[int] = 0, preprocessings: str | List[str] = '', best_params: Dict[str, Any] | None | List[Dict[str, Any] | None] = None, scores: Dict[str, Dict[str, float]] | None | List[Dict[str, Dict[str, float]] | None] = None, branch_id: int | None | List[int | None] = None, branch_name: str | None | List[str | None] = None, trace_id: str | None | List[str | None] = None) None[source]

Add multiple predictions to storage (batch operation).

For each parameter, if it’s a single value it will be broadcast to all predictions. If it’s a list, each index corresponds to one prediction.

Parameters:
  • add_prediction (Same as)

  • lists (but can be single values or)

static aggregate(y_pred: ndarray, group_ids: ndarray, y_proba: ndarray | None = None, y_true: ndarray | None = None, method: str = 'mean', exclude_outliers: bool = False, outlier_threshold: float = 0.95) Dict[str, Any][source]

Aggregate predictions by group (e.g., same sample ID with multiple measurements).

For datasets with multiple samples per target (e.g., 4 measurements for each sample ID), this function averages predictions within each group to produce one prediction per group.

For regression: averages y_pred values within each group. For classification: averages y_proba (if available) then takes argmax,

or uses majority voting on y_pred if no probabilities.

Parameters:
  • y_pred – Predicted values array (n_samples,) or (n_samples, 1)

  • group_ids – Group identifiers array (n_samples,) - samples with same ID are grouped

  • y_proba – Optional class probabilities array (n_samples, n_classes) for classification

  • y_true – Optional true values array (n_samples,) for computing aggregated ground truth

  • method – Aggregation method - ‘mean’ (default), ‘median’, ‘vote’ (for classification)

  • exclude_outliers – If True, exclude outliers within each group before aggregation using Hotelling’s T² statistic. Useful when some measurements are anomalous.

  • outlier_threshold – Confidence level for T² outlier detection (default 0.95). Measurements with T² > chi2.ppf(threshold, 1) are excluded.

Returns:

  • ‘y_pred’: Aggregated predictions (n_groups,)

  • ’y_proba’: Aggregated probabilities (n_groups, n_classes) if input had y_proba

  • ’y_true’: Aggregated true values (n_groups,) if input had y_true

  • ’group_ids’: Unique group identifiers (n_groups,)

  • ’group_sizes’: Number of samples per group (n_groups,)

  • ’outliers_excluded’: Number of outliers excluded per group (if exclude_outliers=True)

Return type:

Dictionary containing

Examples

>>> # Aggregate 4 samples per ID for regression
>>> result = Predictions.aggregate(y_pred, sample_ids)
>>> aggregated_pred = result['y_pred']  # One prediction per unique ID
>>> # Aggregate for classification with probabilities
>>> result = Predictions.aggregate(y_pred, sample_ids, y_proba=proba)
>>> aggregated_proba = result['y_proba']  # Averaged probabilities
>>> # Aggregate with outlier exclusion
>>> result = Predictions.aggregate(y_pred, sample_ids, exclude_outliers=True)
>>> print(f"Outliers excluded: {result['outliers_excluded'].sum()}")
archive_to_catalog(catalog_dir: Path, pipeline_dir: Path, metrics: Dict[str, Any] = None) str[source]

Archive pipeline predictions to catalog.

Loads predictions CSV from pipeline directory, adds metadata, and saves to catalog.

Delegates to PredictionStorage for CSV loading.

Parameters:
  • catalog_dir – Catalog directory for storage

  • pipeline_dir – Pipeline directory containing predictions.csv

  • metrics – Optional metadata dict to add to predictions

Returns:

Generated prediction ID

clear() None[source]

Clear all predictions.

Delegates to PredictionStorage component.

clear_caches() None[source]

Clear all internal caches.

Call this when the underlying data has been modified to ensure fresh results are computed. This clears: - Ranker’s aggregation cache (cached aggregated y_true/y_pred) - Ranker’s score cache (cached metric scores)

Examples

>>> predictions.add_prediction(...)  # Add new data
>>> predictions.clear_caches()  # Clear to ensure fresh results
compare_across_datasets(pipeline_hash: str, metric: str = 'test_score') DataFrame[source]

Compare a pipeline’s performance across multiple datasets.

Delegates to CatalogQueryEngine component.

Parameters:
  • pipeline_hash – Pipeline UID to compare

  • metric – Metric column to compare

Returns:

DataFrame with one row per dataset

filter_by_branch(branch_id: int | None = None, branch_name: str | None = None, include_no_branch: bool = False, load_arrays: bool = True) List[Dict[str, Any]][source]

Filter predictions by branch context.

Convenience method for meta-model stacking to retrieve predictions from a specific branch in branched pipelines.

Parameters:
  • branch_id – Branch ID to filter by.

  • branch_name – Branch name to filter by.

  • include_no_branch – If True, include predictions with no branch info.

  • load_arrays – If True, load actual arrays from registry.

Returns:

List of predictions from the specified branch.

Examples

>>> # Get predictions from branch 0
>>> branch_preds = predictions.filter_by_branch(branch_id=0)
>>> # Get predictions from named branch
>>> branch_preds = predictions.filter_by_branch(branch_name='preprocessing_a')
filter_by_criteria(dataset_name: str | None = None, date_range: Tuple[str, str] | None = None, metric_thresholds: Dict[str, float] | None = None) DataFrame[source]

Filter predictions by multiple criteria (catalog query).

Delegates to CatalogQueryEngine component.

Parameters:
  • dataset_name – Filter by dataset name

  • date_range – Tuple of (start_date, end_date)

  • metric_thresholds – Dict of metric names to threshold values

Returns:

Filtered DataFrame

filter_predictions(dataset_name: str | None = None, partition: str | None = None, config_name: str | None = None, model_name: str | None = None, fold_id: str | None = None, step_idx: int | None = None, branch_id: int | None = None, branch_name: str | None = None, load_arrays: bool = True, **kwargs) List[Dict[str, Any]][source]

Filter predictions and return as list of dictionaries.

Delegates to PredictionIndexer for filtering, then deserializes results. Supports lazy loading of arrays for performance optimization.

Parameters:
  • dataset_name – Filter by dataset name

  • partition – Filter by partition

  • config_name – Filter by config name

  • model_name – Filter by model name

  • fold_id – Filter by fold ID

  • step_idx – Filter by step index

  • branch_id – Filter by branch ID (for pipeline branching)

  • branch_name – Filter by branch name (for pipeline branching)

  • load_arrays – If True, loads actual arrays from registry (slower). If False, returns metadata only with array references (fast).

  • **kwargs – Additional filter criteria

Returns:

List of prediction dictionaries with deserialized numpy arrays (if load_arrays=True) or metadata with array_id references (if load_arrays=False)

Examples

>>> # Fast metadata-only query
>>> preds = predictions.filter_predictions(dataset_name="wheat", load_arrays=False)
>>> # Full query with arrays
>>> preds = predictions.filter_predictions(dataset_name="wheat", load_arrays=True)
>>> # Filter by branch
>>> branch_preds = predictions.filter_predictions(branch_id=0)
get_best(metric: str = '', ascending: bool | None = None, aggregate_partitions: bool = False, **filters) PredictionResult | None[source]

Get the best prediction for a specific metric.

Delegates to PredictionRanker component.

Parameters:
  • metric – Metric to optimize

  • ascending – Sort order. If True, sorts ascending (lower is better). If False, sorts descending (higher is better). If None, infers from metric.

  • aggregate_partitions – If True, add partition data

  • **filters – Additional filter criteria

Returns:

Best prediction or None

get_cache_stats() Dict[str, Any][source]

Get cache statistics for debugging performance.

Returns a dictionary with hit rates and sizes for: - aggregation_cache: Cached aggregated arrays - score_cache: Cached metric scores

Returns:

Dictionary with cache statistics

Examples

>>> stats = predictions.get_cache_stats()
>>> print(f"Aggregation cache hit rate: {stats['aggregation_cache']['hit_rate']:.1%}")
get_configs() List[str][source]

Get list of unique config names.

Delegates to PredictionIndexer component.

Returns:

List of config names

get_datasets() List[str][source]

Get list of unique dataset names.

Delegates to PredictionIndexer component.

Returns:

List of dataset names

get_entry_partitions(entry: Dict) Dict[str, Dict | None][source]

Get all partition data for an entry.

Parameters:

entry – Prediction entry dictionary

Returns:

Dictionary with ‘train’, ‘val’, ‘test’ keys containing partition data

get_folds() List[str][source]

Get list of unique fold IDs.

Delegates to PredictionIndexer component.

Returns:

List of fold IDs

get_models() List[str][source]

Get list of unique model names.

Delegates to PredictionIndexer component.

Returns:

List of model names

get_models_before_step(step_idx: int, branch_id: int | None = None, unique_names: bool = True) List[str][source]

Get model names from steps before a given step index.

Convenience method for meta-model stacking to identify source models that can be used for stacking.

Parameters:
  • step_idx – Current step index (models before this are returned).

  • branch_id – Optional filter by branch ID.

  • unique_names – If True, return unique model names only.

Returns:

List of model names from previous steps.

Examples

>>> # Get models available for stacking at step 5
>>> source_models = predictions.get_models_before_step(step_idx=5)
get_oof_predictions(model_name: str | None = None, step_idx: int | None = None, branch_id: int | None = None, exclude_averaged: bool = True, load_arrays: bool = True) List[Dict[str, Any]][source]

Get out-of-fold (validation partition) predictions.

Convenience method for meta-model stacking to retrieve OOF predictions that can be used to construct training features without data leakage.

Parameters:
  • model_name – Optional filter by model name.

  • step_idx – Optional filter by step index.

  • branch_id – Optional filter by branch ID.

  • exclude_averaged – If True, exclude ‘avg’ and ‘w_avg’ fold entries. Default True for OOF reconstruction.

  • load_arrays – If True, load actual arrays from registry.

Returns:

List of validation partition predictions.

Examples

>>> # Get all OOF predictions
>>> oof = predictions.get_oof_predictions()
>>> # Get OOF predictions for a specific model
>>> oof = predictions.get_oof_predictions(model_name='PLS')
get_partitions() List[str][source]

Get list of unique partitions.

Delegates to PredictionIndexer component.

Returns:

List of partitions

get_prediction_by_id(prediction_id: str, load_arrays: bool = True) Dict[str, Any] | None[source]

Get a single prediction by its ID using direct lookup.

This is an O(1) lookup that avoids iterating all predictions, which is much faster than using filter_predictions for ID lookups.

Parameters:
  • prediction_id – Unique prediction identifier (hash ID)

  • load_arrays – If True, loads actual arrays from registry (slower). If False, returns metadata only with array references (fast).

Returns:

Prediction dictionary or None if not found

Examples

>>> pred = predictions.get_prediction_by_id("abc123def456")
>>> if pred:
...     print(f"Found model: {pred['model_name']}")
get_predictions_by_step(step_idx: int, partition: str | None = None, branch_id: int | None = None, load_arrays: bool = True, **kwargs) List[Dict[str, Any]][source]

Get predictions from a specific pipeline step.

Convenience method for meta-model stacking to retrieve predictions from source models at a specific step index.

Parameters:
  • step_idx – Pipeline step index to filter by.

  • partition – Optional partition filter (‘train’, ‘val’, ‘test’).

  • branch_id – Optional branch ID filter.

  • load_arrays – If True, load actual arrays from registry.

  • **kwargs – Additional filter criteria.

Returns:

List of prediction dictionaries from the specified step.

Examples

>>> # Get all predictions from step 2
>>> preds = predictions.get_predictions_by_step(step_idx=2)
>>> # Get validation predictions from step 2
>>> val_preds = predictions.get_predictions_by_step(
...     step_idx=2, partition='val'
... )
get_similar(**filter_kwargs) Dict[str, Any] | None[source]

Get the first prediction matching filter criteria.

Parameters:

**filter_kwargs – Filter criteria (same as filter_predictions)

Returns:

First matching prediction or None

get_summary_stats(metric: str = 'test_score') Dict[str, float][source]

Get summary statistics for a metric.

Delegates to CatalogQueryEngine component.

Parameters:

metric – Metric column name

Returns:

Dictionary with min, max, mean, median, std

get_unique_values(column: str) List[str][source]

Get unique values for a specific column.

Delegates to PredictionIndexer component.

Parameters:

column – Column name

Returns:

List of unique values

list_runs(dataset_name: str | None = None) DataFrame[source]

List all prediction runs with summary information.

Delegates to CatalogQueryEngine component.

Parameters:

dataset_name – Filter by dataset name (None for all)

Returns:

DataFrame with run summary

classmethod load(dataset_name: str | None = None, path: str = 'results', aggregate_partitions: bool = False, **filters) Predictions[source]

Load predictions from results directory structure.

Parameters:
  • dataset_name – Name of dataset to load (None for all)

  • path – Base path to search for predictions

  • aggregate_partitions – If True, aggregate partition data

  • **filters – Additional filter criteria

Returns:

Predictions instance with loaded data

load_from_file(filepath: str, merge: bool = True) None[source]

Load predictions from split Parquet format.

Supports: - Split Parquet with array registry (.meta.parquet + .arrays.parquet)

When called multiple times (e.g., from __init__ with multiple files), predictions are merged by default.

Parameters:
  • filepath – Path to .meta.parquet file

  • merge – If True and storage already has data, merge loaded data. If False, replace existing data. (default: True)

Examples

>>> predictions.load_from_file("predictions.meta.parquet")
>>> # Load additional predictions (merged)
>>> predictions.load_from_file("more_predictions.meta.parquet")
classmethod load_from_file_cls(filepath: str) Predictions[source]

Load predictions from JSON file as class method.

Parameters:

filepath – Input file path

Returns:

Predictions instance with loaded data (empty if file doesn’t exist)

classmethod load_from_parquet(catalog_dir: Path, prediction_ids: list = None) Predictions[source]

Load predictions from split Parquet storage.

Parameters:
  • catalog_dir – Path to catalog directory

  • prediction_ids – Optional list of prediction IDs to load

Returns:

Predictions instance with loaded data

classmethod merge_parquet_files(input_files: List[str], output_file: str, deduplicate: bool = True) Predictions[source]

Merge multiple prediction parquet files into a single output file.

This is a utility method to consolidate predictions from multiple experiment runs into a single file for easier analysis.

Parameters:
  • input_files – List of paths to .meta.parquet files to merge.

  • output_file – Output path for the merged .meta.parquet file.

  • deduplicate – If True, remove duplicate prediction IDs (keep first). Default is True.

Returns:

Predictions instance containing the merged data.

Raises:

Examples

>>> # Merge multiple experiment runs
>>> merged = Predictions.merge_parquet_files(
...     input_files=[
...         "run1/predictions.meta.parquet",
...         "run2/predictions.meta.parquet",
...         "run3/predictions.meta.parquet"
...     ],
...     output_file="combined/all_predictions.meta.parquet"
... )
>>> print(f"Merged {len(merged)} predictions")
>>> # Merge without deduplication
>>> merged = Predictions.merge_parquet_files(
...     input_files=["exp1.meta.parquet", "exp2.meta.parquet"],
...     output_file="merged.meta.parquet",
...     deduplicate=False
... )
merge_predictions(other: Predictions) None[source]

Merge predictions from another Predictions instance.

Delegates to PredictionStorage component.

Parameters:

other – Another Predictions instance to merge

property num_predictions: int

Get the number of stored predictions.

classmethod pred_long_string(entry: Dict, metrics: List[str] | None = None) str[source]

Generate long string representation of a prediction.

Parameters:
  • entry – Prediction dictionary

  • metrics – Optional list of metrics to display

Returns:

Long description string with config

classmethod pred_short_string(entry: Dict, metrics: List[str] | None = None, partition: str | List[str] = 'test') str[source]

Generate short string representation of a prediction.

Parameters:
  • entry – Prediction dictionary

  • metrics – Optional list of metrics to display

Returns:

Short description string

query_best(dataset_name: str | None = None, metric: str = 'test_score', n: int = 10, ascending: bool = False) DataFrame[source]

Query for best performing pipelines by metric (catalog query).

Delegates to CatalogQueryEngine component.

Parameters:
  • dataset_name – Filter by dataset name

  • metric – Metric column to rank by

  • n – Number of top results

  • ascending – If True, lower scores rank higher

Returns:

DataFrame with top n predictions

static save_all_to_csv(predictions: Predictions, path: str = 'results', aggregate_partitions: bool = False, **filters) None[source]

Save all predictions to CSV files.

Parameters:
  • predictions – Predictions instance

  • path – Base path for saving

  • aggregate_partitions – If True, save one file per model with all partitions

  • **filters – Additional filter criteria

static save_predictions_to_csv(y_true: ndarray | List[float] | None = None, y_pred: ndarray | List[float] | None = None, filepath: str = '', prefix: str = '', suffix: str = '') None[source]

Save y_true and y_pred arrays to a CSV file.

Parameters:
  • y_true – True values array

  • y_pred – Predicted values array

  • filepath – Output CSV file path

  • prefix – Optional prefix for column names

  • suffix – Optional suffix for column names

save_to_file(filepath: str, format: str = 'parquet') None[source]

Save predictions to split Parquet format with array registry.

Parameters:
  • filepath – Output file path (should end with .meta.parquet)

  • format – Format to use (only “parquet” is supported)

Examples

>>> predictions.save_to_file("predictions.meta.parquet")
save_to_parquet(catalog_dir: Path, prediction_id: str = None) tuple[source]

Save predictions as split Parquet (metadata + arrays separate).

Appends to existing files if they exist.

Delegates to PredictionStorage component.

Parameters:
  • catalog_dir – Directory for catalog storage

  • prediction_id – Optional prediction ID (generates UUID if None)

Returns:

Tuple of (meta_path, data_path)

to_dataframe() DataFrame[source]

Get predictions as Polars DataFrame.

to_dicts(load_arrays: bool = True) List[Dict[str, Any]][source]

Get predictions as list of dictionaries.

Parameters:

load_arrays – If True, hydrate array references with actual arrays. If False, returns metadata with array IDs only (faster).

Returns:

List of prediction dictionaries

to_pandas()[source]

Get predictions as pandas DataFrame.

top(n: int, rank_metric: str = '', rank_partition: str = 'val', display_metrics: List[str] | None = None, display_partition: str = 'test', aggregate_partitions: bool = False, ascending: bool | None = None, group_by_fold: bool = False, aggregate: str | None = None, group_by: str | List[str] | None = None, best_per_model: bool = False, return_grouped: bool = False, **filters) PredictionResultsList | Dict[Tuple, PredictionResultsList][source]

Get top n models ranked by a metric on a specific partition.

Delegates to PredictionRanker component.

Parameters:
  • n – Number of top models to return. When group_by is used, this means top N per group (e.g., top 3 per dataset).

  • rank_metric – Metric to rank by (if empty, uses record’s metric or val_score)

  • rank_partition – Partition to rank on (default: “val”)

  • display_metrics – Metrics to compute for display (default: task_type defaults)

  • display_partition – Partition to display results from (default: “test”)

  • aggregate_partitions – If True, add train/val/test nested dicts in results

  • ascending – Sort order. If True, sorts ascending (lower is better). If False, sorts descending (higher is better). If None, infers from metric.

  • group_by_fold – If True, include fold_id in model identity (rank per fold)

  • aggregate – If provided, aggregate predictions by this metadata column or ‘y’. When ‘y’, groups by y_true values. When a column name (e.g., ‘ID’), groups by that metadata column. Aggregated predictions have recalculated metrics.

  • group_by – Group predictions by column(s). When provided: - Returns top N results per group (not N total) - Each result includes a ‘group_key’ field for easy filtering - Can be a single column name (str) or list of columns - Examples: ‘dataset_name’, [‘model_name’, ‘dataset_name’]

  • best_per_model – DEPRECATED - Use group_by=[‘model_name’] instead. If True, keep only the best prediction per model_name.

  • return_grouped – If True and group_by is set, return a dict mapping group keys to PredictionResultsList instead of a flat list. Default: False (returns flat list sorted by global rank).

  • **filters – Additional filter criteria (dataset_name, config_name, etc.)

Returns:

PredictionResultsList containing top n

models per group, sorted by rank_metric. Each result includes ‘group_key’.

  • If return_grouped=True: Dict mapping group keys (tuples) to PredictionResultsList, one list per group with top n results each.

Return type:

  • If return_grouped=False (default)

Examples

>>> # Top 3 per dataset (flat list)
>>> top_per_ds = predictions.top(n=3, group_by='dataset_name')
>>> # Filter by group_key
>>> ds1_results = [r for r in top_per_ds if r['group_key'] == ('dataset1',)]
>>>
>>> # Top 3 per dataset (grouped dict)
>>> grouped = predictions.top(n=3, group_by='dataset_name', return_grouped=True)
>>> for key, results in grouped.items():
...     print(f"{key}: {len(results)} results")
class nirs4all.data.RoleAssigner(case_sensitive: bool = True, allow_overlap: bool = False)[source]

Bases: object

Assign columns to data roles (features, targets, metadata).

Validates that: - No column is assigned to multiple roles - At least features are assigned - Indices are valid

Supports the same column selection syntax as ColumnSelector.

Example

>>> assigner = RoleAssigner()
>>> result = assigner.assign(df, {
...     "features": "2:-1",       # All columns except first 2 and last
...     "targets": -1,            # Last column
...     "metadata": [0, 1]        # First 2 columns
... })
assign(df: DataFrame, roles: Dict[str, int | str | List[int] | List[str] | Dict[str, Any] | slice | None]) RoleAssignmentResult[source]

Assign columns to roles.

Parameters:
  • df – The DataFrame to assign roles from.

  • roles – Dictionary mapping role names to column selections. Supported roles: “features”, “targets”, “metadata” Also accepts: “x” (alias for features), “y” (alias for targets)

Returns:

RoleAssignmentResult with separated DataFrames.

Raises:

RoleAssignmentError – If assignment is invalid (overlap, missing features).

assign_auto(df: DataFrame, target_columns: int | str | List[int] | List[str] | Dict[str, Any] | slice | None = None, metadata_columns: int | str | List[int] | List[str] | Dict[str, Any] | slice | None = None) RoleAssignmentResult[source]

Auto-assign roles with specified targets and metadata.

Features are automatically set to all remaining columns.

Parameters:
  • df – The DataFrame to assign roles from.

  • target_columns – Column selection for targets (Y).

  • metadata_columns – Column selection for metadata.

Returns:

RoleAssignmentResult with separated DataFrames.

extract_y_from_x(df: DataFrame, y_columns: int | str | List[int] | List[str] | Dict[str, Any] | slice | None) RoleAssignmentResult[source]

Extract target columns from a features DataFrame.

This is useful when Y columns are embedded in the X data.

Parameters:
  • df – DataFrame containing both features and targets.

  • y_columns – Column selection for targets to extract.

Returns:

RoleAssignmentResult with features (remaining) and targets (extracted).

validate_roles(df: DataFrame, roles: Dict[str, int | str | List[int] | List[str] | Dict[str, Any] | slice | None]) List[str][source]

Validate a role specification without performing assignment.

Parameters:
  • df – The DataFrame to validate against.

  • roles – Role specification to validate.

Returns:

List of warning messages (empty if no warnings).

Raises:

RoleAssignmentError – If role specification is invalid.

exception nirs4all.data.RoleAssignmentError[source]

Bases: Exception

Raised when role assignment fails.

exception nirs4all.data.RowSelectionError[source]

Bases: Exception

Raised when row selection fails.

class nirs4all.data.RowSelector(default_random_state: int | None = None)[source]

Bases: object

Flexible row selector for DataFrames.

Supports multiple selection methods: - All rows: None - By index: [0, 1, 2] or 0 - By range: “0:100” (slice syntax as string) - By percentage: “0:80%” or “80%:100%” - By condition: {“where”: {“column”: “quality”, “op”: “>”, “value”: 0.5}} - Random sample: {“sample”: 100, “random_state”: 42} - Stratified sample: {“sample”: 100, “stratify”: “class”, “random_state”: 42} - Head/Tail: {“head”: 100} or {“tail”: 50}

Example

>>> selector = RowSelector()
>>> result = selector.select(df, "0:80%")
>>> print(len(result.data))  # 80% of rows
OPERATORS: Dict[str, Callable[[Any, Any], bool]] = {'!=': <function RowSelector.<lambda>>, '<': <function RowSelector.<lambda>>, '<=': <function RowSelector.<lambda>>, '==': <function RowSelector.<lambda>>, '>': <function RowSelector.<lambda>>, '>=': <function RowSelector.<lambda>>, 'contains': <function RowSelector.<lambda>>, 'endswith': <function RowSelector.<lambda>>, 'in': <function RowSelector.<lambda>>, 'isna': <function RowSelector.<lambda>>, 'not in': <function RowSelector.<lambda>>, 'notna': <function RowSelector.<lambda>>, 'regex': <function RowSelector.<lambda>>, 'startswith': <function RowSelector.<lambda>>}
select(df: DataFrame, selection: int | str | List[int] | Dict[str, Any] | slice | None) RowSelectionResult[source]

Select rows from a DataFrame.

Parameters:
  • df – The DataFrame to select rows from.

  • selection – Row selection specification. Can be: - None: Select all rows - int: Single row index - str: Range string (“0:100”) or percentage (“0:80%”) - List[int]: List of row indices - Dict: Complex selection (see class docstring)

Returns:

RowSelectionResult with indices, mask, and selected data.

Raises:

RowSelectionError – If selection is invalid or rows not found.

class nirs4all.data.SampleLinker(mode: str = 'inner', on_missing: str = 'warn')[source]

Bases: object

Link samples across multiple data files by key column.

Supports multiple linking modes: - “inner”: Keep only samples present in all sources (default) - “left”: Keep all samples from the first source - “outer”: Keep all samples from any source

Example

>>> linker = SampleLinker()
>>> result = linker.link(
...     {
...         "X": features_df,    # Has columns: sample_id, feature1, feature2
...         "Y": targets_df,     # Has columns: sample_id, target
...         "M": metadata_df,    # Has columns: sample_id, group, date
...     },
...     link_by="sample_id"
... )
>>> # Linked DataFrames have aligned rows
>>> X_linked = result.linked_data["X"]  # Without sample_id column
create_sample_index(sources: Dict[str, DataFrame], link_by: str) DataFrame[source]

Create a sample index showing key presence across sources.

Parameters:
  • sources – Dictionary of source DataFrames.

  • link_by – Key column name.

Returns:

DataFrame with keys as index and boolean columns per source.

Link multiple data sources by key column.

Parameters:
  • sources – Dictionary mapping source names to DataFrames. Each DataFrame must have the key column.

  • link_by – Name of the column to use for linking.

  • keep_key_column – Whether to keep the key column in output DataFrames.

Returns:

LinkingResult with linked DataFrames.

Raises:

LinkingError – If linking fails (missing key columns, no matches, etc.).

Link sources that are already aligned by row index.

This is a simpler linking method for sources that are guaranteed to have matching rows (same samples in same order).

Parameters:
  • sources – Dictionary of aligned DataFrames.

  • validate – Whether to validate that all sources have same row count.

Returns:

Dictionary of DataFrames (unchanged, just validated).

Raises:

LinkingError – If validation fails.

class nirs4all.data.SignalType(value)[source]

Bases: str, Enum

Spectral signal types for NIRS/spectroscopy data.

Defines the measurement type of spectral data. String values ensure backward compatibility with config files.

ABSORBANCE = 'absorbance'
AUTO = 'auto'
KUBELKA_MUNK = 'kubelka_munk'
LOG_1_R = 'log_1_r'
LOG_1_T = 'log_1_t'
PREPROCESSED = 'preprocessed'
REFLECTANCE = 'reflectance'
REFLECTANCE_PERCENT = 'reflectance%'
TRANSMITTANCE = 'transmittance'
TRANSMITTANCE_PERCENT = 'transmittance%'
UNKNOWN = 'unknown'
classmethod from_string(value: str) SignalType[source]

Parse signal type from various string representations.

Parameters:

value – String representation (e.g., “A”, “R”, “%R”, “absorbance”, etc.)

Returns:

SignalType enum value

property is_absorbance_like: bool

Check if this is absorbance or pseudo-absorbance.

property is_determinable: bool

Check if this is a known, determinable signal type.

property is_fraction: bool

Check if this is a fractional [0, 1] signal type.

property is_percent: bool

Check if this is a percentage-based signal type.

property is_reflectance_based: bool

Check if this is any reflectance-based signal.

property is_transmittance_based: bool

Check if this is any transmittance-based signal.

class nirs4all.data.SignalTypeDetector(wavelengths: ndarray | None = None, wavelength_unit: str = 'nm')[source]

Bases: object

Heuristic detector for spectral signal types.

Uses value ranges and optionally wavelength information to determine whether data is absorbance, reflectance, or transmittance.

WATER_BANDS_CM1 = [6897, 5155, 4000]
WATER_BANDS_NM = [1450, 1940, 2500]
detect(spectra: ndarray, confidence_threshold: float = 0.7) Tuple[SignalType, float, str][source]

Detect the signal type of spectral data.

Parameters:
  • spectra – Spectral data array of shape (n_samples, n_features)

  • confidence_threshold – Minimum confidence to return a definite type

Returns:

Tuple of (SignalType, confidence, reason_string)

class nirs4all.data.SpectroDataset(name: str = 'Unknown_dataset')[source]

Bases: object

Main dataset facade for spectroscopy and ML/DL pipelines.

Coordinates feature, target, and metadata management through specialized accessor interfaces. The primary API uses direct methods like dataset.x() and dataset.y() for convenience.

name

Dataset identifier

Type:

str

features

Feature data accessor (internal use)

Type:

FeatureAccessor

targets

Target data accessor (internal use)

Type:

TargetAccessor

metadata_accessor

Metadata accessor (internal use)

Type:

MetadataAccessor

folds

Cross-validation fold splits

Type:

List[Tuple]

Examples

>>> # Create dataset
>>> dataset = SpectroDataset("my_dataset")
>>> # Add samples
>>> dataset.add_samples(X_train, {"partition": "train"})
>>> dataset.add_targets(y_train)
>>> # Get data
>>> X = dataset.x({"partition": "train"})
>>> y = dataset.y({"partition": "train"})
__str__()[source]

Return readable dataset summary.

add_features(features: list[ndarray] | list[list[ndarray]], processings: list[str], source: int = -1) None[source]

Add processed feature versions to existing data.

add_merged_features(features: ndarray, processing_name: str = 'merged', source: int = 0, processing_names: List[str] | None = None) None[source]

Add merged features from branch merge operations.

This method is used by MergeController to store the output of branch merging operations. The merged features REPLACE all existing processings to become the new feature set for subsequent steps.

Parameters:
  • features

    Feature array to store: - 2D array of shape (n_samples, n_features): flattened features - 3D array of shape (n_samples, n_processings, n_features):

    features with preserved preprocessing dimension

  • processing_name – Name for the merged processing (default: “merged”). Used when features is 2D (single processing).

  • source – Target source index (default: 0, first source).

  • processing_names – Optional list of processing names for 3D features. If not provided, generates names like “merged_0”, “merged_1”, etc.

Raises:

ValueError – If features is not 2D or 3D, or sample count doesn’t match.

Example

>>> # 2D merged features (flattened)
>>> merged = np.concatenate([branch0_features, branch1_features], axis=1)
>>> dataset.add_merged_features(merged, "merged_snv_msc")
>>>
>>> # 3D merged features (preserved preprocessing dimension)
>>> merged_3d = np.stack([snv_features, msc_features], axis=1)
>>> dataset.add_merged_features(merged_3d, processing_names=["snv", "msc"])
add_metadata(data: ndarray | Any, headers: List[str] | None = None) None[source]

Add metadata rows (aligns with add_samples call order).

Parameters:
  • data – Metadata as 2D array (n_samples, n_cols) or DataFrame

  • headers – Column names (required if data is ndarray)

add_metadata_column(column: str, values: List | ndarray) None[source]

Add new metadata column.

Parameters:
  • column – Column name

  • values – Column values (must match number of samples)

add_processed_targets(processing_name: str, targets: ndarray, ancestor_processing: str = 'numeric', transformer: TransformerMixin | None = None) None[source]

Add processed target version (e.g., scaled, encoded).

add_samples(data: ndarray | list[ndarray], indexes: Dict[str, Any] | None = None, headers: List[str] | List[List[str]] | None = None, header_unit: str | List[str] | None = None) None[source]

Add feature samples to the dataset.

Parameters:
  • data – Feature data (single or multi-source)

  • indexes – Optional index dictionary (partition, group, branch, fold)

  • headers – Feature headers (wavelengths, feature names)

  • header_unit – Unit type for headers (“cm-1”, “nm”, “none”, “text”, “index”)

add_samples_batch(data: ndarray | List[ndarray], indexes_list: List[Dict[str, Any]]) None[source]

Add multiple samples in a single batch operation - O(N) instead of O(N²).

This method is optimized for bulk insertion of augmented samples. It performs only one array concatenation and one indexer append, making it dramatically faster than calling add_samples() in a loop.

Parameters:
  • data – 3D array of shape (n_samples, n_processings, n_features) for single source, or list of 3D arrays for multi-source datasets.

  • indexes_list – List of index dictionaries, one per sample.

Example

>>> # Batch add 100 augmented samples
>>> data = np.random.rand(100, 2, 500)
>>> indexes = [{"partition": "train", "origin": i, "augmentation": "noise"} for i in range(100)]
>>> dataset.add_samples_batch(data, indexes)
add_targets(y: ndarray) None[source]

Add target samples to the dataset.

property aggregate: str | None

Get the aggregation setting for sample-level prediction aggregation.

Returns:

No aggregation - ‘y’: Aggregate by target values (y_true) - str: Aggregate by specified metadata column name

Return type:

  • None

Example

>>> dataset.aggregate
'sample_id'  # Predictions will be aggregated by sample_id column
property aggregate_exclude_outliers: bool

Get whether T² outlier exclusion is enabled for aggregation.

Returns:

True if outliers should be excluded before aggregation

Return type:

bool

property aggregate_method: str

Get the aggregation method for sample-level prediction aggregation.

Returns:

Aggregation method (‘mean’, ‘median’, or ‘vote’)

Return type:

str

Example

>>> dataset.aggregate_method
'mean'  # Predictions will be averaged within groups
property aggregate_outlier_threshold: float

Get the outlier detection threshold for T² exclusion.

Returns:

Confidence level (0-1) for chi-square critical value

Return type:

float

augment_samples(data: ndarray | list[ndarray], processings: list[str], augmentation_id: str, selector: Dict[str, Any] | DataSelector | ExecutionContext | None = None, count: int | List[int] = 1) List[int][source]

Create augmented versions of existing samples.

detect_signal_type(src: int = 0, force_redetect: bool = False) Tuple[SignalType, float, str][source]

Detect signal type using heuristics.

Uses value range analysis and optionally wavelength band direction to determine the most likely signal type.

Parameters:
  • src – Source index (default: 0)

  • force_redetect – If True, ignores cached/forced values and re-runs detection

Returns:

Tuple of (SignalType, confidence, reason_string)

Example

>>> signal_type, confidence, reason = dataset.detect_signal_type()
>>> print(f"Detected {signal_type.value} ({confidence:.0%}): {reason}")
features_processings(src: int) List[str][source]

Get processing names for a source.

features_sources() int[source]

Get number of feature sources.

float_headers(src: int = 0) ndarray[source]

Get headers as float array (legacy method).

WARNING: This method assumes headers are numeric and doesn’t handle unit conversion. Use wavelengths_cm1() or wavelengths_nm() for wavelength data.

Parameters:

src – Source index

Returns:

Headers converted to float array

Raises:

ValueError – If headers cannot be converted to float

property folds: List[Tuple[List[int], List[int]]]

Get cross-validation folds.

get_dataset_metadata(include_y_stats: bool = True) Dict[str, Any][source]

Get comprehensive dataset metadata for run manifests.

Returns metadata suitable for efficient path resolution and dataset version tracking in run manifests.

Parameters:

include_y_stats – If True, include target variable statistics

Returns:

  • name: Dataset name

  • path: Original file path (if set)

  • hash: Content hash (if computed)

  • file_size: File size in bytes (if available)

  • n_samples: Number of samples

  • n_features: Number of features

  • n_sources: Number of feature sources

  • task_type: Classification or regression

  • num_classes: Number of classes (classification only)

  • y_columns: Target column names

  • y_stats: Target statistics (min, max, mean, std)

  • wavelength_range: [min, max] wavelength

  • wavelength_unit: Unit (nm, cm-1)

  • signal_types: List of signal types per source

  • metadata_columns: Available metadata columns

Return type:

Dict with

Example

>>> dataset = SpectroDataset.load("wheat.n4a")
>>> meta = dataset.get_dataset_metadata()
>>> print(meta["n_samples"], meta["y_stats"])
get_merged_features(processing_name: str = 'merged', source: int = 0, selector: Dict[str, Any] | DataSelector | ExecutionContext | None = None) ndarray[source]

Get merged features by processing name.

Retrieves features that were added via add_merged_features(). Since merged features replace all existing processings, this returns the features for the single merged processing.

Parameters:
  • processing_name – Name of the merged processing (default: “merged”).

  • source – Source index to get features from (default: 0).

  • selector – Optional sample filter.

Returns:

2D array of merged features (n_samples, n_merged_features).

Raises:

ValueError – If the processing name doesn’t exist.

Example

>>> X_merged = dataset.get_merged_features("merged_snv_msc")
>>> print(X_merged.shape)  # (n_samples, n_merged_features)
header_unit(src: int = 0) str[source]

Get the unit type of headers for a data source.

Parameters:

src – Source index

Returns:

“cm-1”, “nm”, “none”, “text”, “index”

Return type:

Unit string

headers(src: int) List[str][source]

Get feature headers for a source.

index_column(col: str, filter: Dict[str, Any] = {}) List[int][source]

Get values from index column.

property is_classification: bool

Check if dataset is for classification task.

is_multi_source() bool[source]

Check if dataset has multiple feature sources.

property is_regression: bool

Check if dataset is for regression task.

keep_sources(source_indices: int | List[int]) None[source]

Keep only specified sources, removing all others.

Used after merge operations with output_as=”features” to consolidate to a single source. This is called automatically by MergeController when output_as=”features” is used.

Parameters:

source_indices – Single source index or list of source indices to keep.

Raises:

ValueError – If source indices are invalid.

Example

>>> # After merge with output_as="features", keep only source 0
>>> dataset.keep_sources(0)
metadata(selector: Dict[str, Any] | DataSelector | ExecutionContext | None = None, columns: List[str] | None = None, include_augmented: bool = True)[source]

Get metadata as DataFrame.

Parameters:
  • selector – Filter selector (e.g., {“partition”: “train”})

  • columns – Specific columns to return (None = all)

  • include_augmented – If True, include augmented versions of selected samples. Default True for backward compatibility.

Returns:

Polars DataFrame with metadata

metadata_column(column: str, selector: Dict[str, Any] | DataSelector | ExecutionContext | None = None, include_augmented: bool = True) ndarray[source]

Get single metadata column as array.

Parameters:
  • column – Column name

  • selector – Filter selector (e.g., {“partition”: “train”})

  • include_augmented – If True, include augmented versions of selected samples. Default True for backward compatibility.

Returns:

Numpy array of column values

property metadata_columns: List[str]

Get list of metadata column names.

metadata_numeric(column: str, selector: Dict[str, Any] | DataSelector | ExecutionContext | None = None, method: Literal['label', 'onehot'] = 'label', include_augmented: bool = True) Tuple[ndarray, Dict][source]

Get numeric encoding of metadata column.

Parameters:
  • column – Column name

  • selector – Filter selector (e.g., {“partition”: “train”})

  • method – “label” for label encoding or “onehot” for one-hot encoding

  • include_augmented – If True, include augmented versions of selected samples. Default True for backward compatibility.

Returns:

Tuple of (numeric_array, encoding_info)

property n_sources: int

Get number of feature sources.

property num_classes: int

Get the number of unique classes for classification tasks.

property num_features: List[int] | int

Get number of features per source.

property num_folds: int

Return the number of folds.

property num_samples: int

Get total number of samples.

print_summary() None[source]

Print a comprehensive summary of the dataset.

Shows counts, dimensions, number of sources, target versions, etc.

replace_features(source_processings: list[str], features: list[ndarray] | list[list[ndarray]], processings: list[str], source: int = -1) None[source]

Replace existing processed features with new versions.

reshape_reps_to_preprocessings(config: RepetitionConfig) None[source]

Transform repetitions into additional preprocessing slots.

Each repetition becomes a new preprocessing dimension, reducing the number of samples but increasing the preprocessing count. This enables multi-preprocessing modeling strategies.

Input: n_sources × (n_samples, n_pp, n_features) Output: n_sources × (n_unique_samples, n_pp × n_reps, n_features)

Parameters:

config – RepetitionConfig with column and options.

Raises:

ValueError – If grouping column not found, groups have unequal sizes and on_unequal=”error”, or no valid groups found.

Example

>>> # With 120 samples (30 unique × 4 reps), 1 source, 1 pp, 500 features
>>> config = RepetitionConfig(column="Sample_ID")
>>> dataset.reshape_reps_to_preprocessings(config)
>>> # Result: 1 source × (30 samples, 4 pp, 500 features)
reshape_reps_to_sources(config: RepetitionConfig) None[source]

Transform repetitions into separate data sources.

Each repetition index becomes a new source, reducing the number of samples but increasing the number of sources. This enables per-source branching and multi-source modeling strategies.

Input: n_sources × (n_samples, n_pp, n_features) Output: (n_sources × n_reps) × (n_unique_samples, n_pp, n_features)

Parameters:

config – RepetitionConfig with column and options.

Raises:

ValueError – If grouping column not found, groups have unequal sizes and on_unequal=”error”, or no valid groups found.

Example

>>> # With 120 samples (30 unique × 4 reps), 1 source, 500 features
>>> config = RepetitionConfig(column="Sample_ID")
>>> dataset.reshape_reps_to_sources(config)
>>> # Result: 4 sources × (30 samples, 1 pp, 500 features)
set_aggregate(value: str | bool | None) None[source]

Set the aggregation behavior for sample-level prediction aggregation.

When set, predictions from multiple spectra of the same biological sample (as identified by the aggregation key) will be aggregated automatically during scoring and reporting.

Parameters:

value – Aggregation setting - None: No aggregation (default behavior) - True: Aggregate by y_true values (target grouping) - str: Aggregate by specified metadata column (e.g., ‘sample_id’, ‘ID’)

Example

>>> dataset.set_aggregate('sample_id')  # Aggregate by sample_id metadata column
>>> dataset.set_aggregate(True)  # Aggregate by y values
>>> dataset.set_aggregate(None)  # Disable aggregation
set_aggregate_exclude_outliers(value: bool, threshold: float = 0.95) None[source]

Enable/disable T² based outlier exclusion before aggregation.

When enabled, uses Hotelling’s T² statistic to identify and exclude outlier measurements within each sample group before averaging.

Parameters:
  • value – True to enable outlier exclusion, False to disable

  • threshold – Confidence level for outlier detection (0-1, default 0.95)

Example

>>> dataset.set_aggregate_exclude_outliers(True, threshold=0.95)
set_aggregate_method(value: str | None) None[source]

Set the aggregation method for sample-level prediction aggregation.

Parameters:

value – Aggregation method - None: Use default method (mean for regression, vote for classification) - ‘mean’: Average predictions within each group - ‘median’: Median prediction within each group - ‘vote’: Majority voting for classification

Example

>>> dataset.set_aggregate_method('median')
set_content_hash(hash_value: str) None[source]

Set the content hash for version tracking.

Parameters:

hash_value – Content hash string

set_folds(folds_iterable) None[source]

Set cross-validation folds from an iterable of (train_idx, val_idx) tuples.

set_signal_type(signal_type: str | SignalType, src: int = 0, forced: bool = True) None[source]

Set the signal type for a data source.

Parameters:
  • signal_type – Signal type (string or SignalType enum)

  • src – Source index (default: 0)

  • forced – If True, prevents auto-detection from overriding (default: True)

Example

>>> dataset.set_signal_type("absorbance", src=0)
>>> dataset.set_signal_type(SignalType.REFLECTANCE_PERCENT, src=1)
set_source_path(path: str) None[source]

Set the source file path for metadata tracking.

Parameters:

path – Path to the original dataset file

set_task_type(task_type: str | TaskType, forced: bool = True) None[source]

Set the task type explicitly.

Parameters:
  • task_type – Task type as string (‘regression’, ‘binary_classification’, ‘multiclass_classification’) or TaskType enum

  • forced – If True, prevents auto-detection from overriding this value in subsequent y_processing steps (e.g., after MinMaxScaler). Default True.

short_preprocessings_str() str[source]

Get shortened processing string for display.

signal_type(src: int = 0) SignalType[source]

Get the signal type for a data source.

If not set, attempts auto-detection based on value ranges and optionally wavelength band analysis.

Parameters:

src – Source index (default: 0)

Returns:

SignalType enum value

Example

>>> signal = dataset.signal_type(0)
>>> if signal == SignalType.REFLECTANCE:
...     dataset.convert_to_absorbance(0)
property signal_types: List[SignalType]

Get signal types for all sources.

Returns:

List of SignalType values, one per source

property task_type: TaskType | None

Get the detected task type.

update_features(source_processings: list[str], features: list[ndarray] | list[list[ndarray]], processings: list[str], source: int = -1) None[source]

Update existing processed features.

update_metadata(column: str, values: List | ndarray, selector: Dict[str, Any] | DataSelector | ExecutionContext | None = None, include_augmented: bool = True) None[source]

Update metadata values for selected samples.

Parameters:
  • column – Column name

  • values – New values

  • selector – Filter selector (None = all samples)

  • include_augmented – If True, include augmented versions of selected samples. Default True for backward compatibility.

wavelengths_cm1(src: int = 0) ndarray[source]

Get wavelengths in cm⁻¹ (wavenumber), converting from nm if needed.

Parameters:

src – Source index

Returns:

Wavelengths in cm⁻¹ as float array

Raises:

ValueError – If headers cannot be converted to wavelengths

wavelengths_nm(src: int = 0) ndarray[source]

Get wavelengths in nm, converting from cm⁻¹ if needed.

Parameters:

src – Source index

Returns:

Wavelengths in nm as float array

Raises:

ValueError – If headers cannot be converted to wavelengths

x(selector: Dict[str, Any] | DataSelector | ExecutionContext | None, layout: Literal['2d', '3d', '2d_t', '3d_i'] = '2d', concat_source: bool = True, include_augmented: bool = True, include_excluded: bool = False) ndarray | list[ndarray][source]

Get feature data with automatic augmented sample aggregation.

Parameters:
  • selector – Filter criteria (partition, group, branch, etc.)

  • layout – Output layout (“2d” or “3d”)

  • concat_source – If True, concatenate multiple sources along feature axis

  • include_augmented – If True, include augmented versions of selected samples. If False, return only base samples (origin=null). Default True for backward compatibility.

  • include_excluded – If True, include samples marked as excluded. If False (default), exclude samples marked as excluded=True. Use True when transforming ALL features (e.g., preprocessing).

Returns:

Feature data array(s)

Example

>>> # Get all train samples (base + augmented)
>>> X_train = dataset.x({"partition": "train"})
>>> # Get only base train samples (for splitting)
>>> X_base = dataset.x({"partition": "train"}, include_augmented=False)
>>> # Get all features including excluded (for transformations)
>>> X_all = dataset.x({"partition": "train"}, include_excluded=True)
y(selector: Dict[str, Any] | DataSelector | ExecutionContext | None, include_augmented: bool = True, include_excluded: bool = False) ndarray[source]

Get target data - automatically maps augmented samples to their origin for y values.

Parameters:
  • selector – Filter criteria (partition, group, branch, etc.)

  • include_augmented – If True, include augmented versions of selected samples. Augmented samples are automatically mapped to their origin’s y value. If False, return only base samples. Default True for backward compatibility.

  • include_excluded – If True, include samples marked as excluded. If False (default), exclude samples marked as excluded=True. Use True when transforming ALL targets (e.g., y_processing).

Returns:

Target values array

Example

>>> # Get all train targets (base + augmented, with mapping)
>>> y_train = dataset.y({"partition": "train"})
>>> # Get only base train targets (for splitting)
>>> y_base = dataset.y({"partition": "train"}, include_augmented=False)
>>> # Get all targets including excluded (for y_processing)
>>> y_all = dataset.y({"partition": "train"}, include_excluded=True)
class nirs4all.data.TaskType(value)[source]

Bases: str, Enum

Task type for the dataset.

AUTO = 'auto'
BINARY_CLASSIFICATION = 'binary_classification'
MULTICLASS_CLASSIFICATION = 'multiclass_classification'
REGRESSION = 'regression'
class nirs4all.data.ValidationError(code: str, message: str, field: str | None = None, value: Any = None, suggestion: str | None = None)[source]

Bases: object

Represents a validation error.

code

Error code for programmatic handling.

Type:

str

message

Human-readable error message.

Type:

str

field

The configuration field that caused the error.

Type:

str | None

value

The value that caused the error.

Type:

Any

suggestion

Optional suggestion for fixing the error.

Type:

str | None

code: str
field: str | None = None
message: str
suggestion: str | None = None
value: Any = None
class nirs4all.data.ValidationResult(is_valid: bool, errors: List[ValidationError] = <factory>, warnings: List[ValidationWarning] = <factory>, normalized_config: Dict[str, ~typing.Any] | None=None)[source]

Bases: object

Result of configuration validation.

is_valid

Whether the configuration is valid (no errors).

Type:

bool

errors

List of validation errors.

Type:

List[nirs4all.data.schema.validation.validators.ValidationError]

warnings

List of validation warnings.

Type:

List[nirs4all.data.schema.validation.validators.ValidationWarning]

normalized_config

The validated and normalized configuration.

Type:

Dict[str, Any] | None

errors: List[ValidationError]
is_valid: bool
normalized_config: Dict[str, Any] | None = None
raise_if_invalid() None[source]

Raise ValueError if configuration is invalid.

warnings: List[ValidationWarning]
class nirs4all.data.ValidationWarning(code: str, message: str, field: str | None = None)[source]

Bases: object

Represents a validation warning (non-fatal issue).

code

Warning code for programmatic handling.

Type:

str

message

Human-readable warning message.

Type:

str

field

The configuration field that caused the warning.

Type:

str | None

code: str
field: str | None = None
message: str
nirs4all.data.detect_signal_type(spectra: ndarray, wavelengths: ndarray | None = None, wavelength_unit: str = 'nm') Tuple[SignalType, float, str][source]

Convenience function to detect signal type.

Parameters:
  • spectra – Spectral data array (n_samples, n_features)

  • wavelengths – Optional wavelength values for band analysis

  • wavelength_unit – Unit of wavelengths (“nm” or “cm-1”)

Returns:

Tuple of (SignalType, confidence, reason)

Example

>>> spectra = np.random.rand(100, 500) * 0.8  # Values in [0, 0.8]
>>> signal_type, confidence, reason = detect_signal_type(spectra)
>>> print(f"Detected: {signal_type.value} ({confidence:.0%})")
nirs4all.data.normalize_config(input_data: Any) Tuple[Dict[str, Any] | None, str][source]

Convenience function to normalize a configuration.

Parameters:

input_data – Configuration in any supported format.

Returns:

Tuple of (normalized_config, dataset_name).

nirs4all.data.normalize_header_unit(unit: str | HeaderUnit) HeaderUnit[source]

Convert string header unit to enum.

Parameters:

unit – Unit as string or enum

Returns:

HeaderUnit enum value

Raises:

ValueError – If unit string is invalid

nirs4all.data.normalize_layout(layout: str | FeatureLayout) FeatureLayout[source]

Convert string layout to enum for backward compatibility.

Parameters:

layout – Layout as string or enum

Returns:

FeatureLayout enum value

Raises:

ValueError – If layout string is invalid

nirs4all.data.normalize_signal_type(signal_type: str | SignalType) SignalType[source]

Normalize a signal type input to SignalType enum.

Parameters:

signal_type – String or SignalType enum

Returns:

SignalType enum value