nirs4all.data.dataset module
Main SpectroDataset orchestrator class.
This module contains the main facade that coordinates all dataset blocks and provides the primary public API for users.
- class nirs4all.data.dataset.SpectroDataset(name: str = 'Unknown_dataset')[source]
Bases: object
Main dataset facade for spectroscopy and ML/DL pipelines.
Coordinates feature, target, and metadata management through specialized accessor interfaces. The primary API uses direct methods like dataset.x() and dataset.y() for convenience.
- features
Feature data accessor (internal use)
- Type:
FeatureAccessor
- targets
Target data accessor (internal use)
- Type:
TargetAccessor
- metadata_accessor
Metadata accessor (internal use)
- Type:
MetadataAccessor
- folds
Cross-validation fold splits
- Type:
List[Tuple]
Examples
>>> # Create dataset
>>> dataset = SpectroDataset("my_dataset")
>>> # Add samples
>>> dataset.add_samples(X_train, {"partition": "train"})
>>> dataset.add_targets(y_train)
>>> # Get data
>>> X = dataset.x({"partition": "train"})
>>> y = dataset.y({"partition": "train"})
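Selector dictionaries such as {"partition": "train"} appear throughout this API. How the library resolves them internally is not documented here, so the following is only a minimal sketch of the idea, assuming an index of per-sample records and treating a list value as "any of". The helpers `_matches` and `select_rows` are hypothetical and are not part of nirs4all.

```python
from typing import Any, Dict, List, Optional


def _matches(value: Any, wanted: Any) -> bool:
    """A list, tuple or set in the selector means 'any of' (an assumption)."""
    if isinstance(wanted, (list, tuple, set)):
        return value in wanted
    return value == wanted


def select_rows(index: List[Dict[str, Any]],
                selector: Optional[Dict[str, Any]]) -> List[int]:
    """Return positions of index rows that satisfy every selector entry."""
    selector = selector or {}  # None selects everything
    return [
        pos
        for pos, row in enumerate(index)
        if all(_matches(row.get(key), wanted) for key, wanted in selector.items())
    ]


# Toy index: one record per sample
index = [
    {"partition": "train", "group": 1},
    {"partition": "test", "group": 1},
    {"partition": "train", "group": 2},
]
train_rows = select_rows(index, {"partition": "train"})
all_rows = select_rows(index, None)
picked = select_rows(index, {"partition": "train", "group": [2, 3]})
print(train_rows, all_rows, picked)
```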
- add_features(features: list[ndarray] | list[list[ndarray]], processings: list[str], source: int = -1) None[source]
Add processed feature versions to existing data.
- add_merged_features(features: ndarray, processing_name: str = 'merged', source: int = 0, processing_names: List[str] | None = None) None[source]
Add merged features from branch merge operations.
This method is used by MergeController to store the output of branch merging operations. The merged features REPLACE all existing processings to become the new feature set for subsequent steps.
- Parameters:
features –
Feature array to store:
  - 2D array of shape (n_samples, n_features): flattened features
  - 3D array of shape (n_samples, n_processings, n_features): features with preserved preprocessing dimension
processing_name – Name for the merged processing (default: “merged”). Used when features is 2D (single processing).
source – Target source index (default: 0, first source).
processing_names – Optional list of processing names for 3D features. If not provided, generates names like “merged_0”, “merged_1”, etc.
- Raises:
ValueError – If features is not 2D or 3D, or sample count doesn’t match.
Example
>>> # 2D merged features (flattened)
>>> merged = np.concatenate([branch0_features, branch1_features], axis=1)
>>> dataset.add_merged_features(merged, "merged_snv_msc")
>>>
>>> # 3D merged features (preserved preprocessing dimension)
>>> merged_3d = np.stack([snv_features, msc_features], axis=1)
>>> dataset.add_merged_features(merged_3d, processing_names=["snv", "msc"])
- add_metadata(data: ndarray | Any, headers: List[str] | None = None) None[source]
Add metadata rows (aligns with add_samples call order).
- Parameters:
data – Metadata as 2D array (n_samples, n_cols) or DataFrame
headers – Column names (required if data is ndarray)
- add_metadata_column(column: str, values: List | ndarray) None[source]
Add new metadata column.
- Parameters:
column – Column name
values – Column values (must match number of samples)
- add_processed_targets(processing_name: str, targets: ndarray, ancestor_processing: str = 'numeric', transformer: TransformerMixin | None = None) None[source]
Add processed target version (e.g., scaled, encoded).
- add_samples(data: ndarray | list[ndarray], indexes: Dict[str, Any] | None = None, headers: List[str] | List[List[str]] | None = None, header_unit: str | List[str] | None = None) None[source]
Add feature samples to the dataset.
- Parameters:
data – Feature data (single or multi-source)
indexes – Optional index dictionary (partition, group, branch, fold)
headers – Feature headers (wavelengths, feature names)
header_unit – Unit type for headers (“cm-1”, “nm”, “none”, “text”, “index”)
- add_samples_batch(data: ndarray | List[ndarray], indexes_list: List[Dict[str, Any]]) None[source]
Add multiple samples in a single batch operation - O(N) instead of O(N²).
This method is optimized for bulk insertion of augmented samples. It performs only one array concatenation and one indexer append, making it dramatically faster than calling add_samples() in a loop.
- Parameters:
data – 3D array of shape (n_samples, n_processings, n_features) for single source, or list of 3D arrays for multi-source datasets.
indexes_list – List of index dictionaries, one per sample.
Example
>>> # Batch add 100 augmented samples
>>> data = np.random.rand(100, 2, 500)
>>> indexes = [{"partition": "train", "origin": i, "augmentation": "noise"} for i in range(100)]
>>> dataset.add_samples_batch(data, indexes)
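The O(N) versus O(N²) claim can be illustrated with a small counting model. The sketch below assumes that each add_samples() call rebuilds the stored array by concatenation, so every call copies all rows held so far. Both helper functions are hypothetical and only count row copies; they do not call nirs4all.

```python
def copies_one_by_one(n_existing: int, n_new: int) -> int:
    """Row copies when every single append rebuilds the whole array."""
    copied = 0
    size = n_existing
    for _ in range(n_new):
        size += 1        # the rebuilt array holds one more row
        copied += size   # every row is written into the new buffer
    return copied


def copies_in_batch(n_existing: int, n_new: int) -> int:
    """Row copies when all new rows are concatenated in one operation."""
    return n_existing + n_new


loop_cost = copies_one_by_one(1000, 100)
batch_cost = copies_in_batch(1000, 100)
print(loop_cost, batch_cost)
```

Under this model the looped variant grows quadratically with the number of added samples, while the batched variant stays linear.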
- property aggregate: str | None
Get the aggregation setting for sample-level prediction aggregation.
- Returns:
Aggregation setting:
  - None: No aggregation
  - ‘y’: Aggregate by target values (y_true)
  - str: Aggregate by specified metadata column name
- Return type:
str | None
Example
>>> dataset.aggregate
'sample_id'  # Predictions will be aggregated by sample_id column
- property aggregate_exclude_outliers: bool
Get whether T² outlier exclusion is enabled for aggregation.
- Returns:
True if outliers should be excluded before aggregation
- Return type:
bool
- property aggregate_method: str
Get the aggregation method for sample-level prediction aggregation.
- Returns:
Aggregation method (‘mean’, ‘median’, or ‘vote’)
- Return type:
str
Example
>>> dataset.aggregate_method
'mean'  # Predictions will be averaged within groups
- property aggregate_outlier_threshold: float
Get the outlier detection threshold for T² exclusion.
- Returns:
Confidence level (0-1) for chi-square critical value
- Return type:
float
- augment_samples(data: ndarray | list[ndarray], processings: list[str], augmentation_id: str, selector: Dict[str, Any] | DataSelector | ExecutionContext | None = None, count: int | List[int] = 1) List[int][source]
Create augmented versions of existing samples.
- detect_signal_type(src: int = 0, force_redetect: bool = False) Tuple[SignalType, float, str][source]
Detect signal type using heuristics.
Uses value range analysis and optionally wavelength band direction to determine the most likely signal type.
- Parameters:
src – Source index (default: 0)
force_redetect – If True, ignores cached/forced values and re-runs detection
- Returns:
Tuple of (SignalType, confidence, reason_string)
Example
>>> signal_type, confidence, reason = dataset.detect_signal_type()
>>> print(f"Detected {signal_type.value} ({confidence:.0%}): {reason}")
- float_headers(src: int = 0) ndarray[source]
Get headers as float array (legacy method).
WARNING: This method assumes headers are numeric and doesn’t handle unit conversion. Use wavelengths_cm1() or wavelengths_nm() for wavelength data.
- Parameters:
src – Source index
- Returns:
Headers converted to float array
- Raises:
ValueError – If headers cannot be converted to float
- get_dataset_metadata(include_y_stats: bool = True) Dict[str, Any][source]
Get comprehensive dataset metadata for run manifests.
Returns metadata suitable for efficient path resolution and dataset version tracking in run manifests.
- Parameters:
include_y_stats – If True, include target variable statistics
- Returns:
Dict with the following keys:
  - name: Dataset name
  - path: Original file path (if set)
  - hash: Content hash (if computed)
  - file_size: File size in bytes (if available)
  - n_samples: Number of samples
  - n_features: Number of features
  - n_sources: Number of feature sources
  - task_type: Classification or regression
  - num_classes: Number of classes (classification only)
  - y_columns: Target column names
  - y_stats: Target statistics (min, max, mean, std)
  - wavelength_range: [min, max] wavelength
  - wavelength_unit: Unit (nm, cm-1)
  - signal_types: List of signal types per source
  - metadata_columns: Available metadata columns
- Return type:
Dict[str, Any]
Example
>>> dataset = SpectroDataset.load("wheat.n4a")
>>> meta = dataset.get_dataset_metadata()
>>> print(meta["n_samples"], meta["y_stats"])
- get_merged_features(processing_name: str = 'merged', source: int = 0, selector: Dict[str, Any] | DataSelector | ExecutionContext | None = None) ndarray[source]
Get merged features by processing name.
Retrieves features that were added via add_merged_features(). Since merged features replace all existing processings, this returns the features for the single merged processing.
- Parameters:
processing_name – Name of the merged processing (default: “merged”).
source – Source index to get features from (default: 0).
selector – Optional sample filter.
- Returns:
2D array of merged features (n_samples, n_merged_features).
- Raises:
ValueError – If the processing name doesn’t exist.
Example
>>> X_merged = dataset.get_merged_features("merged_snv_msc")
>>> print(X_merged.shape)  # (n_samples, n_merged_features)
- header_unit(src: int = 0) str[source]
Get the unit type of headers for a data source.
- Parameters:
src – Source index
- Returns:
Unit string: “cm-1”, “nm”, “none”, “text”, or “index”
- Return type:
str
- index_column(col: str, filter: Dict[str, Any] = {}) List[int][source]
Get values from index column.
- keep_sources(source_indices: int | List[int]) None[source]
Keep only specified sources, removing all others.
Used after merge operations with output_as="features" to consolidate to a single source. MergeController calls this automatically when output_as="features" is used.
- Parameters:
source_indices – Single source index or list of source indices to keep.
- Raises:
ValueError – If source indices are invalid.
Example
>>> # After merge with output_as="features", keep only source 0
>>> dataset.keep_sources(0)
- metadata(selector: Dict[str, Any] | DataSelector | ExecutionContext | None = None, columns: List[str] | None = None, include_augmented: bool = True)[source]
Get metadata as DataFrame.
- Parameters:
selector – Filter selector (e.g., {“partition”: “train”})
columns – Specific columns to return (None = all)
include_augmented – If True, include augmented versions of selected samples. Default True for backward compatibility.
- Returns:
Polars DataFrame with metadata
- metadata_column(column: str, selector: Dict[str, Any] | DataSelector | ExecutionContext | None = None, include_augmented: bool = True) ndarray[source]
Get single metadata column as array.
- Parameters:
column – Column name
selector – Filter selector (e.g., {“partition”: “train”})
include_augmented – If True, include augmented versions of selected samples. Default True for backward compatibility.
- Returns:
Numpy array of column values
- metadata_numeric(column: str, selector: Dict[str, Any] | DataSelector | ExecutionContext | None = None, method: Literal['label', 'onehot'] = 'label', include_augmented: bool = True) Tuple[ndarray, Dict][source]
Get numeric encoding of metadata column.
- Parameters:
column – Column name
selector – Filter selector (e.g., {“partition”: “train”})
method – “label” for label encoding or “onehot” for one-hot encoding
include_augmented – If True, include augmented versions of selected samples. Default True for backward compatibility.
- Returns:
Tuple of (numeric_array, encoding_info)
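The difference between the two method values can be shown on a toy column. This is a minimal sketch of label and one-hot encoding in general, not the library's implementation: the sorted class order and the shape of the returned mapping are assumptions, and `label_encode` and `onehot_encode` are hypothetical helpers.

```python
from typing import Dict, List, Tuple


def label_encode(values: List[str]) -> Tuple[List[int], Dict[str, int]]:
    """Map each distinct value to an integer code (sorted order is assumed)."""
    mapping = {value: code for code, value in enumerate(sorted(set(values)))}
    return [mapping[value] for value in values], mapping


def onehot_encode(values: List[str]) -> Tuple[List[List[int]], Dict[str, int]]:
    """Expand each value into a 0/1 row with a single 1 at its class column."""
    codes, mapping = label_encode(values)
    width = len(mapping)
    rows = [[1 if col == code else 0 for col in range(width)] for code in codes]
    return rows, mapping


varieties = ["durum", "spelt", "durum", "emmer"]
labels, info = label_encode(varieties)
onehot, _ = onehot_encode(varieties)
print(labels, info)
print(onehot)
```

Label encoding yields one integer per sample, while one-hot encoding yields one column per distinct value.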
- print_summary() None[source]
Print a comprehensive summary of the dataset.
Shows counts, dimensions, number of sources, target versions, etc.
- replace_features(source_processings: list[str], features: list[ndarray] | list[list[ndarray]], processings: list[str], source: int = -1) None[source]
Replace existing processed features with new versions.
- reshape_reps_to_preprocessings(config: RepetitionConfig) None[source]
Transform repetitions into additional preprocessing slots.
Each repetition becomes a new preprocessing dimension, reducing the number of samples but increasing the preprocessing count. This enables multi-preprocessing modeling strategies.
Input: n_sources × (n_samples, n_pp, n_features)
Output: n_sources × (n_unique_samples, n_pp × n_reps, n_features)
- Parameters:
config – RepetitionConfig with column and options.
- Raises:
ValueError – If grouping column not found, groups have unequal sizes and on_unequal="error", or no valid groups found.
Example
>>> # With 120 samples (30 unique × 4 reps), 1 source, 1 pp, 500 features
>>> config = RepetitionConfig(column="Sample_ID")
>>> dataset.reshape_reps_to_preprocessings(config)
>>> # Result: 1 source × (30 samples, 4 pp, 500 features)
- reshape_reps_to_sources(config: RepetitionConfig) None[source]
Transform repetitions into separate data sources.
Each repetition index becomes a new source, reducing the number of samples but increasing the number of sources. This enables per-source branching and multi-source modeling strategies.
Input: n_sources × (n_samples, n_pp, n_features)
Output: (n_sources × n_reps) × (n_unique_samples, n_pp, n_features)
- Parameters:
config – RepetitionConfig with column and options.
- Raises:
ValueError – If grouping column not found, groups have unequal sizes and on_unequal="error", or no valid groups found.
Example
>>> # With 120 samples (30 unique × 4 reps), 1 source, 500 features
>>> config = RepetitionConfig(column="Sample_ID")
>>> dataset.reshape_reps_to_sources(config)
>>> # Result: 4 sources × (30 samples, 1 pp, 500 features)
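The two repetition reshapes documented above differ only in which axis receives the repetitions. The following pure-Python sketch contrasts them on a toy dataset with a single source and a single preprocessing. It assumes repetitions are ordered by their appearance within each sample ID, which the documentation does not state, and all function names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List

Row = List[float]  # one spectrum with n_features values


def group_by_id(rows: List[Row], sample_ids: List[str]) -> Dict[str, List[Row]]:
    """Collect the repetitions of each sample ID, in order of appearance."""
    groups: Dict[str, List[Row]] = defaultdict(list)
    for sample_id, row in zip(sample_ids, rows):
        groups[sample_id].append(row)
    if not groups:
        raise ValueError("no valid groups found")
    if len({len(reps) for reps in groups.values()}) != 1:
        raise ValueError("groups have unequal sizes")
    return dict(groups)


def reps_to_preprocessings(rows: List[Row], sample_ids: List[str]) -> List[List[Row]]:
    """(n_samples, n_features) -> (n_unique, n_reps, n_features)."""
    return list(group_by_id(rows, sample_ids).values())


def reps_to_sources(rows: List[Row], sample_ids: List[str]) -> List[List[Row]]:
    """(n_samples, n_features) -> n_reps sources of (n_unique, n_features)."""
    blocks = reps_to_preprocessings(rows, sample_ids)
    # Swap the first two axes: the repetition index becomes the source index.
    return [list(source) for source in zip(*blocks)]


# 6 rows = 3 unique samples x 2 repetitions, 4 features each
ids = ["A", "A", "B", "B", "C", "C"]
rows = [[float(i)] * 4 for i in range(6)]
as_pp = reps_to_preprocessings(rows, ids)
as_sources = reps_to_sources(rows, ids)
print(len(as_pp), len(as_pp[0]), len(as_pp[0][0]))
print(len(as_sources), len(as_sources[0]), len(as_sources[0][0]))
```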
- set_aggregate(value: str | bool | None) None[source]
Set the aggregation behavior for sample-level prediction aggregation.
When set, predictions from multiple spectra of the same biological sample (as identified by the aggregation key) will be aggregated automatically during scoring and reporting.
- Parameters:
value – Aggregation setting:
  - None: No aggregation (default behavior)
  - True: Aggregate by y_true values (target grouping)
  - str: Aggregate by specified metadata column (e.g., ‘sample_id’, ‘ID’)
Example
>>> dataset.set_aggregate('sample_id')  # Aggregate by sample_id metadata column
>>> dataset.set_aggregate(True)  # Aggregate by y values
>>> dataset.set_aggregate(None)  # Disable aggregation
- set_aggregate_exclude_outliers(value: bool, threshold: float = 0.95) None[source]
Enable/disable T² based outlier exclusion before aggregation.
When enabled, uses Hotelling’s T² statistic to identify and exclude outlier measurements within each sample group before averaging.
- Parameters:
value – True to enable outlier exclusion, False to disable
threshold – Confidence level for outlier detection (0-1, default 0.95)
Example
>>> dataset.set_aggregate_exclude_outliers(True, threshold=0.95)
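To make the threshold parameter concrete, here is a sketch of the one-dimensional special case, where T² reduces to a squared z-score and the chi-square critical value has one degree of freedom. Whether the library computes T² on predictions or on spectra, and with how many dimensions, is not stated in this reference, so this is an illustration under those assumptions. Both functions are hypothetical.

```python
from statistics import NormalDist, mean, stdev
from typing import List


def chi2_critical_1dof(threshold: float) -> float:
    """Chi-square quantile for 1 degree of freedom.

    If Z is standard normal, Z**2 is chi-square with 1 dof, so the quantile
    is the squared two-sided normal quantile.
    """
    return NormalDist().inv_cdf((1 + threshold) / 2) ** 2


def t2_inliers_1d(values: List[float], threshold: float = 0.95) -> List[float]:
    """Keep values whose univariate T² does not exceed the critical value."""
    if len(values) < 3:
        return list(values)  # too few points to estimate spread (assumption)
    center, spread = mean(values), stdev(values)
    if spread == 0:
        return list(values)
    critical = chi2_critical_1dof(threshold)
    return [v for v in values if ((v - center) / spread) ** 2 <= critical]


# Ten repeated measurements of one sample; the last one is far off
values = [10.0, 10.2, 9.9, 10.1, 10.0, 10.1, 9.8, 10.0, 10.2, 15.0]
kept = t2_inliers_1d(values, threshold=0.95)
print(len(kept), round(chi2_critical_1dof(0.95), 3))
```

A higher threshold raises the critical value, so fewer measurements are excluded.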
- set_aggregate_method(value: str | None) None[source]
Set the aggregation method for sample-level prediction aggregation.
- Parameters:
value – Aggregation method:
  - None: Use default method (mean for regression, vote for classification)
  - ‘mean’: Average predictions within each group
  - ‘median’: Median prediction within each group
  - ‘vote’: Majority voting for classification
Example
>>> dataset.set_aggregate_method('median')
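The three aggregation methods can be sketched as a group-by over an aggregation key. This is an illustration of the idea only, assuming predictions and keys arrive as parallel lists. `aggregate_predictions` and `majority_vote` are hypothetical helpers, and tie handling in the vote is an arbitrary choice here.

```python
from collections import Counter, defaultdict
from statistics import mean, median
from typing import Any, Dict, Hashable, List


def majority_vote(values: List[Any]) -> Any:
    """Most frequent value; ties resolve to the first one encountered."""
    return Counter(values).most_common(1)[0][0]


REDUCERS = {"mean": mean, "median": median, "vote": majority_vote}


def aggregate_predictions(keys: List[Hashable], preds: List[Any],
                          method: str = "mean") -> Dict[Hashable, Any]:
    """Reduce the predictions that share a key to one value per key."""
    if method not in REDUCERS:
        raise ValueError(f"unknown aggregation method: {method!r}")
    groups: Dict[Hashable, List[Any]] = defaultdict(list)
    for key, pred in zip(keys, preds):
        groups[key].append(pred)
    return {key: REDUCERS[method](values) for key, values in groups.items()}


sample_ids = ["s1", "s1", "s1", "s2", "s2"]
by_mean = aggregate_predictions(sample_ids, [1.0, 2.0, 6.0, 4.0, 8.0], "mean")
by_median = aggregate_predictions(sample_ids, [1.0, 2.0, 6.0, 4.0, 8.0], "median")
by_vote = aggregate_predictions(sample_ids, ["a", "b", "a", "c", "c"], "vote")
print(by_mean, by_median, by_vote)
```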
- set_content_hash(hash_value: str) None[source]
Set the content hash for version tracking.
- Parameters:
hash_value – Content hash string
- set_folds(folds_iterable) None[source]
Set cross-validation folds from an iterable of (train_idx, val_idx) tuples.
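As an illustration of the expected input shape, the sketch below builds an iterable of (train_idx, val_idx) tuples with a plain contiguous k-fold split. The generator `k_fold_indices` is hypothetical, and whether the indices refer to the whole dataset or only to the train partition is not specified in this reference.

```python
from typing import Iterator, List, Tuple


def k_fold_indices(n_samples: int, n_splits: int) -> Iterator[Tuple[List[int], List[int]]]:
    """Yield (train_idx, val_idx) pairs over contiguous, near-equal blocks."""
    if not 2 <= n_splits <= n_samples:
        raise ValueError("n_splits must be between 2 and n_samples")
    base, extra = divmod(n_samples, n_splits)
    start = 0
    for fold in range(n_splits):
        # The first `extra` folds receive one additional validation sample.
        stop = start + base + (1 if fold < extra else 0)
        val_idx = list(range(start, stop))
        train_idx = list(range(0, start)) + list(range(stop, n_samples))
        yield train_idx, val_idx
        start = stop


folds = list(k_fold_indices(n_samples=7, n_splits=3))
for train_idx, val_idx in folds:
    print(train_idx, val_idx)

# Assumed usage, based on the signature above:
# dataset.set_folds(folds)
```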
- set_signal_type(signal_type: str | SignalType, src: int = 0, forced: bool = True) None[source]
Set the signal type for a data source.
- Parameters:
signal_type – Signal type (string or SignalType enum)
src – Source index (default: 0)
forced – If True, prevents auto-detection from overriding (default: True)
Example
>>> dataset.set_signal_type("absorbance", src=0)
>>> dataset.set_signal_type(SignalType.REFLECTANCE_PERCENT, src=1)
- set_source_path(path: str) None[source]
Set the source file path for metadata tracking.
- Parameters:
path – Path to the original dataset file
- set_task_type(task_type: str | TaskType, forced: bool = True) None[source]
Set the task type explicitly.
- Parameters:
task_type – Task type as string (‘regression’, ‘binary_classification’, ‘multiclass_classification’) or TaskType enum
forced – If True, prevents auto-detection from overriding this value in subsequent y_processing steps (e.g., after MinMaxScaler). Default True.
- signal_type(src: int = 0) SignalType[source]
Get the signal type for a data source.
If not set, attempts auto-detection based on value ranges and optionally wavelength band analysis.
- Parameters:
src – Source index (default: 0)
- Returns:
SignalType enum value
Example
>>> signal = dataset.signal_type(0)
>>> if signal == SignalType.REFLECTANCE:
...     dataset.convert_to_absorbance(0)
- property signal_types: List[SignalType]
Get signal types for all sources.
- Returns:
List of SignalType values, one per source
- update_features(source_processings: list[str], features: list[ndarray] | list[list[ndarray]], processings: list[str], source: int = -1) None[source]
Update existing processed features.
- update_metadata(column: str, values: List | ndarray, selector: Dict[str, Any] | DataSelector | ExecutionContext | None = None, include_augmented: bool = True) None[source]
Update metadata values for selected samples.
- Parameters:
column – Column name
values – New values
selector – Filter selector (None = all samples)
include_augmented – If True, include augmented versions of selected samples. Default True for backward compatibility.
- wavelengths_cm1(src: int = 0) ndarray[source]
Get wavelengths in cm⁻¹ (wavenumber), converting from nm if needed.
- Parameters:
src – Source index
- Returns:
Wavelengths in cm⁻¹ as float array
- Raises:
ValueError – If headers cannot be converted to wavelengths
- wavelengths_nm(src: int = 0) ndarray[source]
Get wavelengths in nm, converting from cm⁻¹ if needed.
- Parameters:
src – Source index
- Returns:
Wavelengths in nm as float array
- Raises:
ValueError – If headers cannot be converted to wavelengths
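Both wavelength accessors above rely on the standard relation between wavelength in nm and wavenumber in cm⁻¹: the product of the two is 10⁷. A minimal standalone sketch of that conversion follows. The function names are hypothetical and the error handling is an assumption, not the library's behavior.

```python
from typing import List


def nm_to_cm1(wavelengths_nm: List[float]) -> List[float]:
    """Convert wavelengths in nm to wavenumbers in cm^-1 (1e7 / nm)."""
    if any(w <= 0 for w in wavelengths_nm):
        raise ValueError("wavelengths must be positive")
    return [1e7 / w for w in wavelengths_nm]


def cm1_to_nm(wavenumbers_cm1: List[float]) -> List[float]:
    """Convert wavenumbers in cm^-1 to wavelengths in nm (1e7 / cm^-1)."""
    if any(w <= 0 for w in wavenumbers_cm1):
        raise ValueError("wavenumbers must be positive")
    return [1e7 / w for w in wavenumbers_cm1]


nir_nm = [1000.0, 2500.0]
nir_cm1 = nm_to_cm1(nir_nm)
print(nir_cm1)
print(cm1_to_nm(nir_cm1))
```

Because the relation is reciprocal, an ascending nm axis becomes a descending cm⁻¹ axis.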
- x(selector: Dict[str, Any] | DataSelector | ExecutionContext | None, layout: Literal['2d', '3d', '2d_t', '3d_i'] = '2d', concat_source: bool = True, include_augmented: bool = True, include_excluded: bool = False) ndarray | list[ndarray][source]
Get feature data with automatic augmented sample aggregation.
- Parameters:
selector – Filter criteria (partition, group, branch, etc.)
layout – Output layout (“2d” or “3d”; the signature also accepts “2d_t” and “3d_i”)
concat_source – If True, concatenate multiple sources along feature axis
include_augmented – If True, include augmented versions of selected samples. If False, return only base samples (origin=null). Default True for backward compatibility.
include_excluded – If True, include samples marked as excluded. If False (default), exclude samples marked as excluded=True. Use True when transforming ALL features (e.g., preprocessing).
- Returns:
Feature data array(s)
Example
>>> # Get all train samples (base + augmented)
>>> X_train = dataset.x({"partition": "train"})
>>> # Get only base train samples (for splitting)
>>> X_base = dataset.x({"partition": "train"}, include_augmented=False)
>>> # Get all features including excluded (for transformations)
>>> X_all = dataset.x({"partition": "train"}, include_excluded=True)
- y(selector: Dict[str, Any] | DataSelector | ExecutionContext | None, include_augmented: bool = True, include_excluded: bool = False) ndarray[source]
Get target data - automatically maps augmented samples to their origin for y values.
- Parameters:
selector – Filter criteria (partition, group, branch, etc.)
include_augmented – If True, include augmented versions of selected samples. Augmented samples are automatically mapped to their origin’s y value. If False, return only base samples. Default True for backward compatibility.
include_excluded – If True, include samples marked as excluded. If False (default), exclude samples marked as excluded=True. Use True when transforming ALL targets (e.g., y_processing).
- Returns:
Target values array
Example
>>> # Get all train targets (base + augmented, with mapping)
>>> y_train = dataset.y({"partition": "train"})
>>> # Get only base train targets (for splitting)
>>> y_base = dataset.y({"partition": "train"}, include_augmented=False)
>>> # Get all targets including excluded (for y_processing)
>>> y_all = dataset.y({"partition": "train"}, include_excluded=True)
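The origin mapping described for y() can be sketched with two lookup tables. This is a simplified illustration, assuming each sample carries an origin field that is None for base samples and holds the base sample's ID for augmented ones. `targets_for` and the table names are hypothetical.

```python
from typing import Dict, List, Optional


def targets_for(sample_ids: List[int],
                origin_of: Dict[int, Optional[int]],
                y_of_base: Dict[int, float]) -> List[float]:
    """Return one y value per sample, following `origin` for augmented samples."""
    targets = []
    for sample_id in sample_ids:
        origin = origin_of.get(sample_id)
        base_id = sample_id if origin is None else origin
        targets.append(y_of_base[base_id])
    return targets


# Samples 0 and 1 are base samples; 2, 3 and 4 are augmented copies.
origin_of = {0: None, 1: None, 2: 0, 3: 1, 4: 0}
y_of_base = {0: 12.5, 1: 14.0}

y_with_augmented = targets_for([0, 1, 2, 3, 4], origin_of, y_of_base)
y_base_only = targets_for([0, 1], origin_of, y_of_base)
print(y_with_augmented, y_base_only)
```

Targets are stored once per base sample, so augmented copies never need their own y values.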