nirs4all.data.dataset module

Main SpectroDataset orchestrator class.

This module contains the main facade that coordinates all dataset blocks and provides the primary public API for users.

class nirs4all.data.dataset.SpectroDataset(name: str = 'Unknown_dataset')[source]

Bases: object

Main dataset facade for spectroscopy and ML/DL pipelines.

Coordinates feature, target, and metadata management through specialized accessor interfaces. The primary API uses direct methods like dataset.x() and dataset.y() for convenience.

name

Dataset identifier

Type:

str

features

Feature data accessor (internal use)

Type:

FeatureAccessor

targets

Target data accessor (internal use)

Type:

TargetAccessor

metadata_accessor

Metadata accessor (internal use)

Type:

MetadataAccessor

folds

Cross-validation fold splits

Type:

List[Tuple]

Examples

>>> # Create dataset
>>> dataset = SpectroDataset("my_dataset")
>>> # Add samples
>>> dataset.add_samples(X_train, {"partition": "train"})
>>> dataset.add_targets(y_train)
>>> # Get data
>>> X = dataset.x({"partition": "train"})
>>> y = dataset.y({"partition": "train"})
__str__()[source]

Return readable dataset summary.

add_features(features: list[ndarray] | list[list[ndarray]], processings: list[str], source: int = -1) None[source]

Add processed feature versions to existing data.

add_merged_features(features: ndarray, processing_name: str = 'merged', source: int = 0, processing_names: List[str] | None = None) None[source]

Add merged features from branch merge operations.

This method is used by MergeController to store the output of branch merging operations. The merged features REPLACE all existing processings to become the new feature set for subsequent steps.

Parameters:
  • features

    Feature array to store:

    - 2D array of shape (n_samples, n_features): flattened features
    - 3D array of shape (n_samples, n_processings, n_features): features with preserved preprocessing dimension

  • processing_name – Name for the merged processing (default: “merged”). Used when features is 2D (single processing).

  • source – Target source index (default: 0, first source).

  • processing_names – Optional list of processing names for 3D features. If not provided, generates names like “merged_0”, “merged_1”, etc.

Raises:

ValueError – If features is not 2D or 3D, or sample count doesn’t match.

Example

>>> # 2D merged features (flattened)
>>> merged = np.concatenate([branch0_features, branch1_features], axis=1)
>>> dataset.add_merged_features(merged, "merged_snv_msc")
>>>
>>> # 3D merged features (preserved preprocessing dimension)
>>> merged_3d = np.stack([snv_features, msc_features], axis=1)
>>> dataset.add_merged_features(merged_3d, processing_names=["snv", "msc"])
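
The two accepted input shapes can be illustrated with NumPy alone. The sketch below only builds the arrays and does not call nirs4all; the array names and sizes are made up for illustration, and the default names simply follow the “merged_0”, “merged_1” pattern described above.

```python
import numpy as np

# Hypothetical branch outputs: 6 samples, 4 features each.
snv_features = np.zeros((6, 4))
msc_features = np.ones((6, 4))

# 2D case: concatenate along the feature axis -> (n_samples, total_features)
merged_2d = np.concatenate([snv_features, msc_features], axis=1)

# 3D case: stack along a new axis 1 -> (n_samples, n_processings, n_features)
merged_3d = np.stack([snv_features, msc_features], axis=1)

# Default names for the 3D case, following the documented "merged_<i>" pattern.
default_names = [f"merged_{i}" for i in range(merged_3d.shape[1])]

print(merged_2d.shape)  # (6, 8)
print(merged_3d.shape)  # (6, 2, 4)
print(default_names)    # ['merged_0', 'merged_1']
```

Concatenation loses the boundary between processings, while stacking keeps one slot per processing, which is why only the 3D form takes a list of processing names.
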
add_metadata(data: ndarray | Any, headers: List[str] | None = None) None[source]

Add metadata rows (aligns with add_samples call order).

Parameters:
  • data – Metadata as 2D array (n_samples, n_cols) or DataFrame

  • headers – Column names (required if data is ndarray)

add_metadata_column(column: str, values: List | ndarray) None[source]

Add new metadata column.

Parameters:
  • column – Column name

  • values – Column values (must match number of samples)

add_processed_targets(processing_name: str, targets: ndarray, ancestor_processing: str = 'numeric', transformer: TransformerMixin | None = None) None[source]

Add processed target version (e.g., scaled, encoded).

add_samples(data: ndarray | list[ndarray], indexes: Dict[str, Any] | None = None, headers: List[str] | List[List[str]] | None = None, header_unit: str | List[str] | None = None) None[source]

Add feature samples to the dataset.

Parameters:
  • data – Feature data (single or multi-source)

  • indexes – Optional index dictionary (partition, group, branch, fold)

  • headers – Feature headers (wavelengths, feature names)

  • header_unit – Unit type for headers (“cm-1”, “nm”, “none”, “text”, “index”)

add_samples_batch(data: ndarray | List[ndarray], indexes_list: List[Dict[str, Any]]) None[source]

Add multiple samples in a single batch operation, in O(N) time instead of the O(N²) of repeated single insertions.

This method is optimized for bulk insertion of augmented samples. It performs only one array concatenation and one indexer append, making it dramatically faster than calling add_samples() in a loop.

Parameters:
  • data – 3D array of shape (n_samples, n_processings, n_features) for single source, or list of 3D arrays for multi-source datasets.

  • indexes_list – List of index dictionaries, one per sample.

Example

>>> # Batch add 100 augmented samples
>>> data = np.random.rand(100, 2, 500)
>>> indexes = [{"partition": "train", "origin": i, "augmentation": "noise"} for i in range(100)]
>>> dataset.add_samples_batch(data, indexes)
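
The O(N) versus O(N²) claim can be checked with a simple cost model. This is a sketch under the assumption that every concatenation copies the whole resulting array (which is how array concatenation generally behaves); the function names are hypothetical and nothing here calls nirs4all.

```python
def copies_one_by_one(n_existing: int, n_new: int) -> int:
    """Elements copied when each new sample triggers its own concatenation."""
    copied = 0
    size = n_existing
    for _ in range(n_new):
        size += 1
        copied += size  # the full, grown array is copied every time
    return copied


def copies_in_batch(n_existing: int, n_new: int) -> int:
    """Elements copied when all new samples are concatenated at once."""
    return n_existing + n_new


loop_cost = copies_one_by_one(1000, 100)
batch_cost = copies_in_batch(1000, 100)
print(loop_cost, batch_cost)  # 105050 1100
```

Under this model, adding 100 samples to 1000 existing ones copies about 95 times more elements in a loop than in one batch, and the gap widens as the number of added samples grows.
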
add_targets(y: ndarray) None[source]

Add target samples to the dataset.

property aggregate: str | None

Get the aggregation setting for sample-level prediction aggregation.

Returns:

  • None: No aggregation

  • ‘y’: Aggregate by target values (y_true)

  • str: Aggregate by specified metadata column name

Return type:

str | None

Example

>>> dataset.aggregate
'sample_id'  # Predictions will be aggregated by sample_id column
property aggregate_exclude_outliers: bool

Get whether T² outlier exclusion is enabled for aggregation.

Returns:

True if outliers should be excluded before aggregation

Return type:

bool

property aggregate_method: str

Get the aggregation method for sample-level prediction aggregation.

Returns:

Aggregation method (‘mean’, ‘median’, or ‘vote’)

Return type:

str

Example

>>> dataset.aggregate_method
'mean'  # Predictions will be averaged within groups
property aggregate_outlier_threshold: float

Get the outlier detection threshold for T² exclusion.

Returns:

Confidence level (0-1) for chi-square critical value

Return type:

float

augment_samples(data: ndarray | list[ndarray], processings: list[str], augmentation_id: str, selector: Dict[str, Any] | DataSelector | ExecutionContext | None = None, count: int | List[int] = 1) List[int][source]

Create augmented versions of existing samples.

detect_signal_type(src: int = 0, force_redetect: bool = False) Tuple[SignalType, float, str][source]

Detect signal type using heuristics.

Uses value range analysis and optionally wavelength band direction to determine the most likely signal type.

Parameters:
  • src – Source index (default: 0)

  • force_redetect – If True, ignores cached/forced values and re-runs detection

Returns:

Tuple of (SignalType, confidence, reason_string)

Example

>>> signal_type, confidence, reason = dataset.detect_signal_type()
>>> print(f"Detected {signal_type.value} ({confidence:.0%}): {reason}")
features_processings(src: int) List[str][source]

Get processing names for a source.

features_sources() int[source]

Get number of feature sources.

float_headers(src: int = 0) ndarray[source]

Get headers as float array (legacy method).

WARNING: This method assumes headers are numeric and doesn’t handle unit conversion. Use wavelengths_cm1() or wavelengths_nm() for wavelength data.

Parameters:

src – Source index

Returns:

Headers converted to float array

Raises:

ValueError – If headers cannot be converted to float

property folds: List[Tuple[List[int], List[int]]]

Get cross-validation folds.

get_dataset_metadata(include_y_stats: bool = True) Dict[str, Any][source]

Get comprehensive dataset metadata for run manifests.

Returns metadata suitable for efficient path resolution and dataset version tracking in run manifests.

Parameters:

include_y_stats – If True, include target variable statistics

Returns:

Dict with the following keys:

  • name: Dataset name

  • path: Original file path (if set)

  • hash: Content hash (if computed)

  • file_size: File size in bytes (if available)

  • n_samples: Number of samples

  • n_features: Number of features

  • n_sources: Number of feature sources

  • task_type: Classification or regression

  • num_classes: Number of classes (classification only)

  • y_columns: Target column names

  • y_stats: Target statistics (min, max, mean, std)

  • wavelength_range: [min, max] wavelength

  • wavelength_unit: Unit (nm, cm-1)

  • signal_types: List of signal types per source

  • metadata_columns: Available metadata columns

Return type:

Dict[str, Any]

Example

>>> dataset = SpectroDataset.load("wheat.n4a")
>>> meta = dataset.get_dataset_metadata()
>>> print(meta["n_samples"], meta["y_stats"])
get_merged_features(processing_name: str = 'merged', source: int = 0, selector: Dict[str, Any] | DataSelector | ExecutionContext | None = None) ndarray[source]

Get merged features by processing name.

Retrieves features that were added via add_merged_features(). Since merged features replace all existing processings, this returns the features for the single merged processing.

Parameters:
  • processing_name – Name of the merged processing (default: “merged”).

  • source – Source index to get features from (default: 0).

  • selector – Optional sample filter.

Returns:

2D array of merged features (n_samples, n_merged_features).

Raises:

ValueError – If the processing name doesn’t exist.

Example

>>> X_merged = dataset.get_merged_features("merged_snv_msc")
>>> print(X_merged.shape)  # (n_samples, n_merged_features)
header_unit(src: int = 0) str[source]

Get the unit type of headers for a data source.

Parameters:

src – Source index

Returns:

Unit string, one of “cm-1”, “nm”, “none”, “text”, or “index”

Return type:

str

headers(src: int) List[str][source]

Get feature headers for a source.

index_column(col: str, filter: Dict[str, Any] = {}) List[int][source]

Get values from index column.

property is_classification: bool

Check if dataset is for classification task.

is_multi_source() bool[source]

Check if dataset has multiple feature sources.

property is_regression: bool

Check if dataset is for regression task.

keep_sources(source_indices: int | List[int]) None[source]

Keep only specified sources, removing all others.

Used after merge operations with output_as="features" to consolidate to a single source. This is called automatically by MergeController when output_as="features" is used.

Parameters:

source_indices – Single source index or list of source indices to keep.

Raises:

ValueError – If source indices are invalid.

Example

>>> # After merge with output_as="features", keep only source 0
>>> dataset.keep_sources(0)
metadata(selector: Dict[str, Any] | DataSelector | ExecutionContext | None = None, columns: List[str] | None = None, include_augmented: bool = True)[source]

Get metadata as DataFrame.

Parameters:
  • selector – Filter selector (e.g., {“partition”: “train”})

  • columns – Specific columns to return (None = all)

  • include_augmented – If True, include augmented versions of selected samples. Default True for backward compatibility.

Returns:

Polars DataFrame with metadata

metadata_column(column: str, selector: Dict[str, Any] | DataSelector | ExecutionContext | None = None, include_augmented: bool = True) ndarray[source]

Get single metadata column as array.

Parameters:
  • column – Column name

  • selector – Filter selector (e.g., {“partition”: “train”})

  • include_augmented – If True, include augmented versions of selected samples. Default True for backward compatibility.

Returns:

Numpy array of column values

property metadata_columns: List[str]

Get list of metadata column names.

metadata_numeric(column: str, selector: Dict[str, Any] | DataSelector | ExecutionContext | None = None, method: Literal['label', 'onehot'] = 'label', include_augmented: bool = True) Tuple[ndarray, Dict][source]

Get numeric encoding of metadata column.

Parameters:
  • column – Column name

  • selector – Filter selector (e.g., {“partition”: “train”})

  • method – “label” for label encoding or “onehot” for one-hot encoding

  • include_augmented – If True, include augmented versions of selected samples. Default True for backward compatibility.

Returns:

Tuple of (numeric_array, encoding_info)
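
The difference between the two methods can be sketched in plain Python. This is an illustration only: the helper names are hypothetical, codes are assigned in sorted order as an assumption, and the structure of the real encoding_info dictionary is not documented here.

```python
def label_encode(values):
    """Map each distinct value to an integer code (sorted order assumed)."""
    classes = sorted(set(values))
    mapping = {value: code for code, value in enumerate(classes)}
    return [mapping[v] for v in values], mapping


def onehot_encode(values):
    """Turn each value into a 0/1 row with a single 1 at its label code."""
    codes, mapping = label_encode(values)
    width = len(mapping)
    rows = [[1 if j == code else 0 for j in range(width)] for code in codes]
    return rows, mapping


varieties = ["durum", "soft", "durum", "spelt"]
labels, info = label_encode(varieties)
onehot, _ = onehot_encode(varieties)

print(labels)  # [0, 1, 0, 2]
print(info)    # {'durum': 0, 'soft': 1, 'spelt': 2}
print(onehot)  # [[1, 0, 0], [0, 1, 0], [1, 0, 0], [0, 0, 1]]
```

Label encoding yields one column and implies an ordering between categories; one-hot encoding yields one column per category and implies none.
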

property n_sources: int

Get number of feature sources.

property num_classes: int

Get the number of unique classes for classification tasks.

property num_features: List[int] | int

Get number of features per source.

property num_folds: int

Return the number of folds.

property num_samples: int

Get total number of samples.

print_summary() None[source]

Print a comprehensive summary of the dataset.

Shows counts, dimensions, number of sources, target versions, etc.

replace_features(source_processings: list[str], features: list[ndarray] | list[list[ndarray]], processings: list[str], source: int = -1) None[source]

Replace existing processed features with new versions.

reshape_reps_to_preprocessings(config: RepetitionConfig) None[source]

Transform repetitions into additional preprocessing slots.

Each repetition becomes a new preprocessing dimension, reducing the number of samples but increasing the preprocessing count. This enables multi-preprocessing modeling strategies.

Input: n_sources × (n_samples, n_pp, n_features)

Output: n_sources × (n_unique_samples, n_pp × n_reps, n_features)

Parameters:

config – RepetitionConfig with column and options.

Raises:

ValueError – If the grouping column is not found, if groups have unequal sizes and on_unequal="error", or if no valid groups are found.

Example

>>> # With 120 samples (30 unique × 4 reps), 1 source, 1 pp, 500 features
>>> config = RepetitionConfig(column="Sample_ID")
>>> dataset.reshape_reps_to_preprocessings(config)
>>> # Result: 1 source × (30 samples, 4 pp, 500 features)
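
The shape change can be reproduced with nested lists. This is a minimal sketch, assuming one source, one preprocessing per sample, and equal group sizes; reps_to_preprocessings is a hypothetical helper, not the library's implementation.

```python
from collections import defaultdict


def reps_to_preprocessings(sample_ids, spectra):
    """Group spectra by sample id; each repetition becomes one slot.

    Returns (unique_ids, grouped), where grouped is laid out as
    (n_unique_samples, n_reps, n_features).
    """
    groups = defaultdict(list)
    for sid, spectrum in zip(sample_ids, spectra):
        groups[sid].append(spectrum)
    if not groups:
        raise ValueError("no valid groups found")
    if len({len(reps) for reps in groups.values()}) != 1:
        raise ValueError("groups have unequal sizes")
    unique_ids = list(groups)  # first-seen order
    return unique_ids, [groups[sid] for sid in unique_ids]


# 6 rows = 3 unique samples x 2 repetitions, 3 features each.
sample_ids = ["A", "A", "B", "B", "C", "C"]
spectra = [[float(i), i + 0.5, i + 1.0] for i in range(6)]

ids, grouped = reps_to_preprocessings(sample_ids, spectra)
shape = (len(grouped), len(grouped[0]), len(grouped[0][0]))
print(ids, shape)  # ['A', 'B', 'C'] (3, 2, 3)
```
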
reshape_reps_to_sources(config: RepetitionConfig) None[source]

Transform repetitions into separate data sources.

Each repetition index becomes a new source, reducing the number of samples but increasing the number of sources. This enables per-source branching and multi-source modeling strategies.

Input: n_sources × (n_samples, n_pp, n_features)

Output: (n_sources × n_reps) × (n_unique_samples, n_pp, n_features)

Parameters:

config – RepetitionConfig with column and options.

Raises:

ValueError – If the grouping column is not found, if groups have unequal sizes and on_unequal="error", or if no valid groups are found.

Example

>>> # With 120 samples (30 unique × 4 reps), 1 source, 500 features
>>> config = RepetitionConfig(column="Sample_ID")
>>> dataset.reshape_reps_to_sources(config)
>>> # Result: 4 sources × (30 samples, 1 pp, 500 features)
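
The same idea, with the repetition index becoming the source index, can be sketched in plain Python. The helper below is hypothetical and assumes one input source and equal group sizes; it is not the library's implementation.

```python
from collections import defaultdict


def reps_to_sources(sample_ids, spectra):
    """Split repetitions into sources: source r holds the r-th repetition
    of every unique sample."""
    groups = defaultdict(list)
    for sid, spectrum in zip(sample_ids, spectra):
        groups[sid].append(spectrum)
    sizes = {len(reps) for reps in groups.values()}
    if len(sizes) != 1:
        raise ValueError("no valid groups, or groups have unequal sizes")
    n_reps = sizes.pop()
    unique_ids = list(groups)  # first-seen order
    sources = [[groups[sid][r] for sid in unique_ids] for r in range(n_reps)]
    return unique_ids, sources


# 6 rows = 2 unique samples x 3 repetitions, 2 features each.
sample_ids = ["A", "A", "A", "B", "B", "B"]
spectra = [[i, i + 10] for i in range(6)]

ids, sources = reps_to_sources(sample_ids, spectra)
print(len(sources))  # 3 sources
print(sources[0])    # [[0, 10], [3, 13]]
print(sources[2])    # [[2, 12], [5, 15]]
```

Each resulting source has one row per unique sample, so rows stay aligned across sources.
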
set_aggregate(value: str | bool | None) None[source]

Set the aggregation behavior for sample-level prediction aggregation.

When set, predictions from multiple spectra of the same biological sample (as identified by the aggregation key) will be aggregated automatically during scoring and reporting.

Parameters:

value – Aggregation setting:

  • None: No aggregation (default behavior)

  • True: Aggregate by y_true values (target grouping)

  • str: Aggregate by specified metadata column (e.g., ‘sample_id’, ‘ID’)

Example

>>> dataset.set_aggregate('sample_id')  # Aggregate by sample_id metadata column
>>> dataset.set_aggregate(True)  # Aggregate by y values
>>> dataset.set_aggregate(None)  # Disable aggregation
set_aggregate_exclude_outliers(value: bool, threshold: float = 0.95) None[source]

Enable or disable T²-based outlier exclusion before aggregation.

When enabled, uses Hotelling’s T² statistic to identify and exclude outlier measurements within each sample group before averaging.

Parameters:
  • value – True to enable outlier exclusion, False to disable

  • threshold – Confidence level for outlier detection (0-1, default 0.95)

Example

>>> dataset.set_aggregate_exclude_outliers(True, threshold=0.95)
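
To give a feel for how a confidence level maps to a chi-square critical value, the sketch below uses the closed-form quantile for two degrees of freedom. The choice of two degrees of freedom is an assumption made only because it has a closed form; the degrees of freedom the library actually uses are not stated here, and chi2_critical_2dof is a hypothetical helper.

```python
import math


def chi2_critical_2dof(confidence: float) -> float:
    """Chi-square critical value for 2 degrees of freedom.

    With 2 dof the CDF is 1 - exp(-x / 2), so the quantile is
    -2 * ln(1 - confidence).
    """
    if not 0.0 < confidence < 1.0:
        raise ValueError("confidence must be strictly between 0 and 1")
    return -2.0 * math.log(1.0 - confidence)


critical = chi2_critical_2dof(0.95)
print(round(critical, 3))  # 5.991
```

Under this assumption, a measurement whose T² statistic exceeds the critical value would be treated as an outlier; raising the threshold toward 1 raises the critical value and excludes fewer measurements.
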
set_aggregate_method(value: str | None) None[source]

Set the aggregation method for sample-level prediction aggregation.

Parameters:

value – Aggregation method:

  • None: Use default method (mean for regression, vote for classification)

  • ‘mean’: Average predictions within each group

  • ‘median’: Median prediction within each group

  • ‘vote’: Majority voting for classification

Example

>>> dataset.set_aggregate_method('median')
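
The three methods can be sketched with the standard library. This is an illustration of grouping predictions by an aggregation key, not the library's code; aggregate_predictions and the sample data are hypothetical, and ties in ‘vote’ are broken here by first occurrence.

```python
from collections import Counter, defaultdict
from statistics import mean, median


def majority_vote(values):
    """Most frequent value; ties go to the value seen first."""
    return Counter(values).most_common(1)[0][0]


def aggregate_predictions(keys, predictions, method="mean"):
    """Reduce predictions that share the same key to a single value."""
    reducers = {"mean": mean, "median": median, "vote": majority_vote}
    if method not in reducers:
        raise ValueError(f"unknown method: {method!r}")
    groups = defaultdict(list)
    for key, pred in zip(keys, predictions):
        groups[key].append(pred)
    return {key: reducers[method](values) for key, values in groups.items()}


sample_ids = ["s1", "s1", "s1", "s2", "s2"]
y_pred = [10.0, 12.0, 17.0, 5.0, 7.0]
class_pred = ["a", "b", "a", "b", "b"]

print(aggregate_predictions(sample_ids, y_pred, "mean"))      # {'s1': 13.0, 's2': 6.0}
print(aggregate_predictions(sample_ids, y_pred, "median"))    # {'s1': 12.0, 's2': 6.0}
print(aggregate_predictions(sample_ids, class_pred, "vote"))  # {'s1': 'a', 's2': 'b'}
```

The median is less sensitive than the mean to a single deviating spectrum, as the 17.0 reading for s1 shows.
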
set_content_hash(hash_value: str) None[source]

Set the content hash for version tracking.

Parameters:

hash_value – Content hash string

set_folds(folds_iterable) None[source]

Set cross-validation folds from an iterable of (train_idx, val_idx) tuples.
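
An iterable of (train_idx, val_idx) tuples can come from any splitter. The generator below is a minimal, non-shuffled K-fold sketch; make_kfold is a hypothetical helper and not part of nirs4all.

```python
def make_kfold(n_samples: int, n_splits: int):
    """Yield (train_idx, val_idx) tuples for contiguous, non-shuffled folds."""
    if not 2 <= n_splits <= n_samples:
        raise ValueError("n_splits must be between 2 and n_samples")
    base, extra = divmod(n_samples, n_splits)
    start = 0
    for fold in range(n_splits):
        # The first `extra` folds get one additional validation sample.
        stop = start + base + (1 if fold < extra else 0)
        val_idx = list(range(start, stop))
        train_idx = list(range(0, start)) + list(range(stop, n_samples))
        yield train_idx, val_idx
        start = stop


folds = list(make_kfold(7, 3))
for train_idx, val_idx in folds:
    print(train_idx, val_idx)
# The resulting list has the shape expected by set_folds(folds).
```
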

set_signal_type(signal_type: str | SignalType, src: int = 0, forced: bool = True) None[source]

Set the signal type for a data source.

Parameters:
  • signal_type – Signal type (string or SignalType enum)

  • src – Source index (default: 0)

  • forced – If True, prevents auto-detection from overriding (default: True)

Example

>>> dataset.set_signal_type("absorbance", src=0)
>>> dataset.set_signal_type(SignalType.REFLECTANCE_PERCENT, src=1)
set_source_path(path: str) None[source]

Set the source file path for metadata tracking.

Parameters:

path – Path to the original dataset file

set_task_type(task_type: str | TaskType, forced: bool = True) None[source]

Set the task type explicitly.

Parameters:
  • task_type – Task type as string (‘regression’, ‘binary_classification’, ‘multiclass_classification’) or TaskType enum

  • forced – If True, prevents auto-detection from overriding this value in subsequent y_processing steps (e.g., after MinMaxScaler). Default True.

short_preprocessings_str() str[source]

Get shortened processing string for display.

signal_type(src: int = 0) SignalType[source]

Get the signal type for a data source.

If not set, attempts auto-detection based on value ranges and optionally wavelength band analysis.

Parameters:

src – Source index (default: 0)

Returns:

SignalType enum value

Example

>>> signal = dataset.signal_type(0)
>>> if signal == SignalType.REFLECTANCE:
...     dataset.convert_to_absorbance(0)
property signal_types: List[SignalType]

Get signal types for all sources.

Returns:

List of SignalType values, one per source

property task_type: TaskType | None

Get the detected task type.

update_features(source_processings: list[str], features: list[ndarray] | list[list[ndarray]], processings: list[str], source: int = -1) None[source]

Update existing processed features.

update_metadata(column: str, values: List | ndarray, selector: Dict[str, Any] | DataSelector | ExecutionContext | None = None, include_augmented: bool = True) None[source]

Update metadata values for selected samples.

Parameters:
  • column – Column name

  • values – New values

  • selector – Filter selector (None = all samples)

  • include_augmented – If True, include augmented versions of selected samples. Default True for backward compatibility.

wavelengths_cm1(src: int = 0) ndarray[source]

Get wavelengths in cm⁻¹ (wavenumber), converting from nm if needed.

Parameters:

src – Source index

Returns:

Wavelengths in cm⁻¹ as float array

Raises:

ValueError – If headers cannot be converted to wavelengths

wavelengths_nm(src: int = 0) ndarray[source]

Get wavelengths in nm, converting from cm⁻¹ if needed.

Parameters:

src – Source index

Returns:

Wavelengths in nm as float array

Raises:

ValueError – If headers cannot be converted to wavelengths
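
Wavelength in nm and wavenumber in cm⁻¹ are reciprocal: wavenumber = 10⁷ / wavelength. The sketch below shows the conversion in both directions; the function names are hypothetical, and the library's own handling of invalid headers may differ.

```python
def nm_to_cm1(wavelength_nm: float) -> float:
    """Convert a wavelength in nm to a wavenumber in cm^-1."""
    if wavelength_nm <= 0:
        raise ValueError("wavelength must be positive")
    return 1e7 / wavelength_nm


def cm1_to_nm(wavenumber_cm1: float) -> float:
    """Convert a wavenumber in cm^-1 to a wavelength in nm."""
    if wavenumber_cm1 <= 0:
        raise ValueError("wavenumber must be positive")
    return 1e7 / wavenumber_cm1


print(nm_to_cm1(1000.0))  # 10000.0
print(nm_to_cm1(2500.0))  # 4000.0
print(cm1_to_nm(4000.0))  # 2500.0
```

Because the relation is reciprocal, headers sorted in ascending nm come out in descending cm⁻¹ order.
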

x(selector: Dict[str, Any] | DataSelector | ExecutionContext | None, layout: Literal['2d', '3d', '2d_t', '3d_i'] = '2d', concat_source: bool = True, include_augmented: bool = True, include_excluded: bool = False) ndarray | list[ndarray][source]

Get feature data with automatic augmented sample aggregation.

Parameters:
  • selector – Filter criteria (partition, group, branch, etc.)

  • layout – Output layout: one of “2d”, “3d”, “2d_t”, or “3d_i” (default: “2d”)

  • concat_source – If True, concatenate multiple sources along feature axis

  • include_augmented – If True, include augmented versions of selected samples. If False, return only base samples (origin=null). Default True for backward compatibility.

  • include_excluded – If True, include samples marked as excluded. If False (default), exclude samples marked as excluded=True. Use True when transforming ALL features (e.g., preprocessing).

Returns:

Feature data array(s)

Example

>>> # Get all train samples (base + augmented)
>>> X_train = dataset.x({"partition": "train"})
>>> # Get only base train samples (for splitting)
>>> X_base = dataset.x({"partition": "train"}, include_augmented=False)
>>> # Get all features including excluded (for transformations)
>>> X_all = dataset.x({"partition": "train"}, include_excluded=True)
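
How the selector and the two flags interact can be sketched as a row filter. This is an illustration under assumptions: the index is modeled as a list of dictionaries, base samples carry origin=None as described above, and select_rows is a hypothetical helper rather than the library's indexer.

```python
def select_rows(index_rows, selector=None,
                include_augmented=True, include_excluded=False):
    """Return positions of index rows matching the selector and flags."""
    selector = selector or {}
    selected = []
    for position, row in enumerate(index_rows):
        if any(row.get(key) != value for key, value in selector.items()):
            continue  # does not match the filter criteria
        if not include_augmented and row.get("origin") is not None:
            continue  # augmented copy of another sample
        if not include_excluded and row.get("excluded", False):
            continue  # marked as excluded
        selected.append(position)
    return selected


index_rows = [
    {"partition": "train", "origin": None, "excluded": False},
    {"partition": "train", "origin": 0, "excluded": False},  # augmented from row 0
    {"partition": "train", "origin": None, "excluded": True},
    {"partition": "test", "origin": None, "excluded": False},
]

train = {"partition": "train"}
print(select_rows(index_rows, train))                           # [0, 1]
print(select_rows(index_rows, train, include_augmented=False))  # [0]
print(select_rows(index_rows, train, include_excluded=True))    # [0, 1, 2]
```
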
y(selector: Dict[str, Any] | DataSelector | ExecutionContext | None, include_augmented: bool = True, include_excluded: bool = False) ndarray[source]

Get target data, automatically mapping augmented samples to their origin’s y value.

Parameters:
  • selector – Filter criteria (partition, group, branch, etc.)

  • include_augmented – If True, include augmented versions of selected samples. Augmented samples are automatically mapped to their origin’s y value. If False, return only base samples. Default True for backward compatibility.

  • include_excluded – If True, include samples marked as excluded. If False (default), exclude samples marked as excluded=True. Use True when transforming ALL targets (e.g., y_processing).

Returns:

Target values array

Example

>>> # Get all train targets (base + augmented, with mapping)
>>> y_train = dataset.y({"partition": "train"})
>>> # Get only base train targets (for splitting)
>>> y_base = dataset.y({"partition": "train"}, include_augmented=False)
>>> # Get all targets including excluded (for y_processing)
>>> y_all = dataset.y({"partition": "train"}, include_excluded=True)
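
The origin mapping can be sketched as a lookup: an augmented sample has no target of its own and borrows the one stored for its origin. The data layout and the targets_for helper below are assumptions for illustration only.

```python
def targets_for(rows, y_by_sample):
    """Return one target per row.

    rows: (sample_id, origin) pairs, where origin is None for base samples
    and the id of the source sample for augmented ones.
    """
    return [
        y_by_sample[sample_id if origin is None else origin]
        for sample_id, origin in rows
    ]


# Targets are stored for base samples only.
y_by_sample = {0: 12.5, 1: 14.0}
rows = [(0, None), (1, None), (2, 0), (3, 0), (4, 1)]

print(targets_for(rows, y_by_sample))  # [12.5, 14.0, 12.5, 12.5, 14.0]
```

This keeps X and y the same length when augmented samples are included, without duplicating target storage.
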