nirs4all.controllers.data package

Submodules

Module contents

Data manipulation controllers.

Controllers for data manipulation operators (branch, merge, source_branch, resampler, augmentation, feature selection, sample filtering, outlier excluder, sample partitioner, metadata partitioner, repetition transformation).

class nirs4all.controllers.data.AutoTransferPreprocessingController[source]

Bases: OperatorController

Controller for automatic transfer-optimized preprocessing selection.

This controller analyzes the distributional distance between source and target datasets and automatically selects preprocessing that best aligns them while preserving predictive information.

Configuration options:
preset: Preset configuration for the selector.
  • “fast” (default): Quick evaluation of single preprocessings only

  • “balanced”: Includes stacking evaluation

  • “thorough”: Includes stacking and augmentation

  • “full”: All stages including supervised validation

  • “exhaustive”: Deep analysis for research/benchmarking

source_partition: Partition to use as source data (“train” or “test”).

Default is “train”.

target_partition: Partition to use as target data (“train” or “test”).

Default is “test”.

apply_recommendation: Whether to apply the best preprocessing to the

dataset. If False, only stores the recommendation in context. Default is True.

top_k: Number of top recommendations to apply if using augmentation.

Default is 1 (best single preprocessing).

use_augmentation: If top_k > 1, whether to use feature augmentation

to concatenate outputs. Default is False.

n_components: Number of PCA components for metric computation.

Default is 10.

verbose: Verbosity level (0=silent, 1=progress, 2=detailed).

Default is 1.

# Stage-specific options (override preset) run_stage2: Enable stacking evaluation. stage2_top_k: Number of top candidates for stacking. run_stage3: Enable augmentation evaluation. run_stage4: Enable supervised validation.

Example pipeline configurations:

# Simple - use defaults {“auto_transfer_preproc”: {}}

# With preset {“auto_transfer_preproc”: {“preset”: “balanced”}}

# Full configuration {

“auto_transfer_preproc”: {

“preset”: “thorough”, “source_partition”: “train”, “target_partition”: “test”, “apply_recommendation”: True, “top_k”: 1, “verbose”: 2,

}

}

# Multi-source with augmentation {

“auto_transfer_preproc”: {

“preset”: “balanced”, “top_k”: 3, “use_augmentation”: True,

}

}

execute(step_info: ParsedStep, dataset: SpectroDataset, context: ExecutionContext, runtime_context: RuntimeContext, source: int = -1, mode: str = 'train', loaded_binaries: List[Tuple[str, Any]] | None = None, prediction_store: Any | None = None) Tuple[ExecutionContext, List[Tuple[str, Any]]][source]

Execute auto transfer preprocessing selection.

In train mode:
  1. Extract source and target data from the dataset

  2. Run TransferPreprocessingSelector to find best preprocessing

  3. Apply the recommended preprocessing if configured

  4. Store the recommendation as an artifact

In predict mode:
  1. Load the saved preprocessing recommendation

  2. Apply it to the incoming data

Parameters:
  • step_info – Parsed step containing the auto_transfer_preproc config

  • dataset – SpectroDataset to operate on

  • context – Execution context with selector and metadata

  • runtime_context – Runtime infrastructure (saver, step_number, etc.)

  • source – Source index (-1 for all sources)

  • mode – Execution mode (“train”, “predict”, “explain”)

  • loaded_binaries – Pre-loaded artifacts for predict/explain mode

  • prediction_store – Not used by this controller

Returns:

Tuple of (updated_context, list_of_artifacts)

classmethod matches(step: Any, operator: Any, keyword: str) bool[source]

Check if step is an auto_transfer_preproc operation.

priority: int = 9
classmethod supports_prediction_mode() bool[source]

Supports prediction mode for applying saved recommendations.

In prediction mode, the controller loads the previously computed preprocessing recommendation and applies it to the new data.

classmethod use_multi_source() bool[source]

Supports multi-source datasets.

class nirs4all.controllers.data.ConcatAugmentationController[source]

Bases: OperatorController

Controller that concatenates multiple transformer outputs.

Semantics: - Top-level (add_feature=False): REPLACES each processing with concatenated version - Inside feature_augmentation (add_feature=True): ADDS one new processing

Supports: - Single transformers: PCA(50) - Chained transformers: [Wavelet(), PCA(50)] → sequential application - Mixed: [PCA(50), [Wavelet(), SVD(30)], LocalStats()]

Examples

Top-level replacement: >>> pipeline = [{“concat_transform”: [PCA(50), SVD(50)]}] # Before: (500, 3, 500) with [“raw”, “snv”, “savgol”] # After: (500, 3, 100) with [“raw_concat_PCA_SVD”, “snv_concat_PCA_SVD”, …]

Nested inside feature_augmentation: >>> pipeline = [{ … “feature_augmentation”: [ … SNV(), … {“concat_transform”: [PCA(50), SVD(50)]} … ] … }] # Before: (500, 1, 500) with [“raw”] # After: (500, 3, 500) with [“raw”, “snv”, “concat_PCA_SVD”] (padded)

execute(step_info: ParsedStep, dataset: SpectroDataset, context: ExecutionContext, runtime_context: RuntimeContext, source: int = -1, mode: str = 'train', loaded_binaries: List[Tuple[str, Any]] | None = None, prediction_store: Any | None = None) Tuple[ExecutionContext, List[Tuple[str, bytes]]][source]

Execute concat augmentation.

Parameters:
  • step_info – Parsed step containing the concat_transform config

  • dataset – SpectroDataset to operate on

  • context – Execution context with selector and metadata

  • runtime_context – Runtime infrastructure (saver, step_number, etc.)

  • source – Source index (-1 for all sources)

  • mode – Execution mode (“train”, “predict”, “explain”)

  • loaded_binaries – Pre-fitted transformers for predict/explain mode

  • prediction_store – Not used by this controller

Returns:

Tuple of (updated_context, list_of_artifacts)

classmethod matches(step: Any, operator: Any, keyword: str) bool[source]

Check if step is a concat_transform operation.

static normalize_generator_spec(spec: Any) Any[source]

Normalize generator spec for concat_transform context.

In concat_transform context, multi-selection should use combinations by default since the order of concatenated features doesn’t matter. Translates legacy ‘size’ to ‘pick’ for explicit semantics.

Parameters:

spec – Generator specification (may contain _or_, size, pick, arrange).

Returns:

Normalized spec with ‘size’ converted to ‘pick’ if needed.

priority: int = 10
classmethod supports_prediction_mode() bool[source]

Supports prediction mode for applying saved transformers.

classmethod use_multi_source() bool[source]

Supports multi-source datasets.

class nirs4all.controllers.data.FeatureAugmentationController[source]

Bases: OperatorController

Controller for feature augmentation with multiple action modes.

The feature_augmentation controller supports three action modes that control how preprocessing operations interact with existing processings:

  • extend (default): Add new processings to the set. Each operation runs independently on the base processing. If a processing already exists, it is not duplicated. Growth pattern is linear.

  • add: Chain each operation on top of ALL existing processings. Keep original processings alongside new chained versions. Growth pattern is multiplicative with originals (n + n×m).

  • replace: Chain each operation on top of ALL existing processings. Discard original processings, keeping only the chained versions. Growth pattern is multiplicative without originals (n×m).

Example

>>> # Extend mode (default) - linear growth
>>> {"feature_augmentation": [SNV, Gaussian], "action": "extend"}
>>> # With raw_A already present: raw_A, raw_SNV, raw_Gaussian
>>> # Add mode - multiplicative with originals
>>> {"feature_augmentation": [SNV, Gaussian], "action": "add"}
>>> # With raw_A present: raw_A, raw_A_SNV, raw_A_Gaussian
>>> # Replace mode - multiplicative, discards originals
>>> {"feature_augmentation": [SNV, Gaussian], "action": "replace"}
>>> # With raw_A present: raw_A_SNV, raw_A_Gaussian (raw_A discarded)
execute(step_info: ParsedStep, dataset: SpectroDataset, context: ExecutionContext, runtime_context: RuntimeContext, source: int = -1, mode: str = 'train', loaded_binaries: List[Tuple[str, Any]] | None = None, prediction_store: Any | None = None) Tuple[ExecutionContext, List[Tuple[str, bytes]]][source]

Execute feature augmentation with specified action mode.

Parameters:
  • step_info – Parsed step information containing the operation list and action mode.

  • dataset – The spectroscopic dataset to process.

  • context – Current execution context with processing state.

  • runtime_context – Runtime infrastructure for step execution.

  • source – Source index (-1 for all sources).

  • mode – Execution mode (“train”, “predict”, etc.).

  • loaded_binaries – Pre-loaded binary artifacts for prediction mode.

  • prediction_store – Store for prediction-time state.

Returns:

Tuple of (updated_context, artifacts_list).

Raises:

ValueError – If action mode is invalid.

classmethod matches(step: Any, operator: Any, keyword: str) bool[source]

Check if the operator matches the step and keyword.

static normalize_generator_spec(spec: Any) Any[source]

Normalize generator spec for feature_augmentation context.

In feature_augmentation context, multi-selection should use combinations by default since the order of parallel feature channels doesn’t matter. Translates legacy ‘size’ to ‘pick’ for explicit semantics.

Parameters:

spec – Generator specification (may contain _or_, size, pick, arrange).

Returns:

Normalized spec with ‘size’ converted to ‘pick’ if needed.

priority: int = 10
classmethod supports_prediction_mode() bool[source]

Feature augmentation should NOT execute during prediction mode - transformations are already applied and saved.

classmethod use_multi_source() bool[source]

Check if the operator supports multi-source datasets.

class nirs4all.controllers.data.FeatureSelectionController[source]

Bases: OperatorController

Controller for feature selection operators (CARS, MC-UVE).

This controller: 1. Extracts wavelengths from dataset headers 2. Fits the selector on training data with target values 3. Transforms all data to keep only selected wavelengths 4. Updates dataset with new features and headers 5. Supports multi-source datasets with per-source selection

execute(step_info: ParsedStep, dataset: SpectroDataset, context: ExecutionContext, runtime_context: RuntimeContext, source: int = -1, mode: str = 'train', loaded_binaries: List[Tuple[str, Any]] | None = None, prediction_store: Any | None = None) Tuple[ExecutionContext, List][source]

Execute feature selection operation.

Parameters:
  • step_info – Pipeline step configuration

  • dataset – Dataset to operate on

  • context – Pipeline execution context

  • runtime_context – Runtime context

  • source – Data source index (-1 for all sources)

  • mode – Execution mode (“train” or “predict”)

  • loaded_binaries – Pre-loaded binary objects for prediction mode

  • prediction_store – External prediction store (unused)

Returns:

Tuple of (updated_context, fitted_selectors)

classmethod matches(step: Any, operator: Any, keyword: str) bool[source]

Match CARS and MCUVE objects.

priority: int = 5
classmethod supports_prediction_mode() bool[source]

Feature selection supports prediction mode.

classmethod use_multi_source() bool[source]

Feature selection supports multi-source datasets.

class nirs4all.controllers.data.MergeConfigParser[source]

Bases: object

Parser for merge step configurations.

Handles all syntax variants and normalizes them to MergeConfig.

Supported syntaxes:
  • Simple string: “features”, “predictions”, “all”

  • Dict with keys: {“features”: …, “predictions”: …, …}

  • Legacy format: {“predictions”: [0, 1]}

  • Per-branch format: {“predictions”: [{“branch”: 0, …}]}

classmethod parse(raw_config: Any) MergeConfig[source]

Parse raw merge configuration into MergeConfig.

Parameters:

raw_config – The value from {“merge”: raw_config}

Returns:

Normalized MergeConfig instance.

Raises:

ValueError – If configuration format is invalid.

class nirs4all.controllers.data.MergeController[source]

Bases: OperatorController

Controller for merging branch outputs and exiting branch mode.

This controller is the CORE PRIMITIVE for branch combination. It: 1. Collects features and/or predictions from specified branches 2. Performs horizontal concatenation of features 3. Performs OOF reconstruction for predictions (mandatory unless unsafe=True) 4. Creates a unified “merged” processing in the dataset 5. ALWAYS clears branch contexts and exits branch mode

Supported Keywords:
  • “merge”: Branch merging (features/predictions/both)

  • “merge_sources”: Source merging (multi-source datasets) [Phase 9]

  • “merge_predictions”: Prediction-only late fusion [Phase 9]

OOF Safety:

When predictions are merged, OOF reconstruction is MANDATORY by default. This prevents data leakage when the merged output is used for training. Set unsafe=True to disable OOF (generates prominent warnings).

Relationship to MetaModel:

MetaModel internally uses MergeController for data preparation, then trains the meta-learner. Users can achieve the same result with:

{“merge”: “predictions”}, {“model”: Ridge()}

which is equivalent to:

{“model”: MetaModel(Ridge())}

priority

Controller priority (5 = same as BranchController).

Type:

int

SUPPORTED_KEYWORDS

Set of keywords this controller handles.

SUPPORTED_KEYWORDS = {'merge', 'merge_predictions', 'merge_sources'}
classmethod build_config_from_meta_model(meta_operator: Any, context: ExecutionContext, branch_contexts: List[Dict[str, Any]] | None = None) MergeConfig[source]

Build MergeConfig from MetaModel operator parameters.

Translates MetaModel configuration to an equivalent MergeConfig for use with merge_branches(). This enables MetaModel to delegate to the centralized merge logic.

This is a helper for Phase 7: MetaModel Refactoring.

Parameters:
  • meta_operator – MetaModel operator instance with configuration.

  • context – Execution context with branch info.

  • branch_contexts – Optional branch contexts for branch resolution.

Returns:

MergeConfig equivalent to the MetaModel’s configuration.

Example

>>> config = MergeController.build_config_from_meta_model(
...     meta_operator=meta_model,
...     context=context,
... )
>>> merged_X, info = MergeController.merge_branches(
...     dataset=dataset,
...     context=context,
...     config=config,
...     prediction_store=prediction_store,
... )
execute(step_info: ParsedStep, dataset: SpectroDataset, context: ExecutionContext, runtime_context: RuntimeContext, source: int = -1, mode: str = 'train', loaded_binaries: List[Tuple[str, Any]] | None = None, prediction_store: Any | None = None) Tuple[ExecutionContext, StepOutput][source]

Execute the merge step with keyword dispatch.

Dispatches to appropriate handler based on the step keyword: - “merge”: Branch merging (features/predictions/both) - “merge_sources”: Source merging (Phase 9, not yet implemented) - “merge_predictions”: Prediction-only late fusion (Phase 9, not yet implemented)

Phase 2 implementation provides: - Configuration parsing - Branch validation - Branch mode exit - Keyword dispatch framework

Subsequent phases will add: - Feature collection (Phase 3) - Prediction OOF reconstruction (Phase 4) - Per-branch selection/aggregation (Phase 5) - Source merge implementation (Phase 9)

Parameters:
  • step_info – Parsed step containing merge configuration

  • dataset – Dataset to operate on

  • context – Pipeline execution context

  • runtime_context – Runtime infrastructure context

  • source – Data source index

  • mode – Execution mode (“train” or “predict”)

  • loaded_binaries – Pre-loaded binary objects for prediction mode

  • prediction_store – External prediction store for model predictions

Returns:

Tuple of (updated_context, StepOutput)

Raises:
  • ValueError – If not in branch mode or configuration is invalid.

  • NotImplementedError – If merge_sources or merge_predictions called (Phase 9).

classmethod matches(step: Any, operator: Any, keyword: str) bool[source]

Check if the step matches the merge controller.

Parameters:
  • step – Original step configuration

  • operator – Deserialized operator

  • keyword – Step keyword

Returns:

True if keyword is one of the supported merge keywords.

classmethod merge_branches(dataset: SpectroDataset, context: ExecutionContext, config: MergeConfig, prediction_store: Any | None = None, mode: str = 'train') Tuple[ndarray, Dict[str, Any]][source]

Static method for programmatic merge (used by MetaModel).

This class method allows MetaModelController to delegate to merge logic without going through the full step execution machinery. It provides the core branch merging functionality without modifying the context or requiring a step_info object.

This is the key integration point for Phase 7: MetaModel Refactoring.

Parameters:
  • dataset – SpectroDataset with sample data.

  • context – Execution context with branch_contexts and state.

  • config – MergeConfig specifying what to merge.

  • prediction_store – Prediction storage for model predictions. Required if config.collect_predictions is True.

  • mode – Execution mode (“train” or “predict”).

Returns:

  • merged_features: 2D numpy array (n_samples, n_features)

  • info_dict: Dictionary with merge metadata including:
    • ”merged_shape”: Shape of merged features

    • ”feature_branches_used”: List of branch indices for features

    • ”prediction_branches_used”: List of branch indices for predictions

    • ”models_used”: List of model names (if predictions)

    • ”oof_reconstruction”: Whether OOF was used (if predictions)

    • ”unsafe_merge”: True if unsafe mode was used

Return type:

Tuple of (merged_features, info_dict) where

Raises:
  • ValueError – If not in branch mode or config is invalid.

  • ValueError – If prediction_store is None but predictions requested.

Example

>>> from nirs4all.controllers.data.merge import MergeController
>>> from nirs4all.operators.data.merge import MergeConfig
>>>
>>> # Called from MetaModelController
>>> config = MergeConfig(
...     collect_predictions=True,
...     prediction_branches="all",
... )
>>> merged_X, info = MergeController.merge_branches(
...     dataset=dataset,
...     context=context,
...     config=config,
...     prediction_store=prediction_store,
... )
>>> meta_model.fit(merged_X, y)

Note

Unlike execute(), this method does NOT: - Exit branch mode (caller must handle this if needed) - Modify the context - Add merged features to the dataset - Return a StepOutput

It simply performs the merge computation and returns the result.

priority: int = 5
classmethod supports_prediction_mode() bool[source]

Merge controller should execute in prediction mode.

classmethod use_multi_source() bool[source]

Merge controller supports multi-source datasets.

class nirs4all.controllers.data.MetadataPartitionerController[source]

Bases: OperatorController

Controller for metadata-based branching via partitioning.

This controller creates branches by partitioning samples based on a metadata column. Each branch contains a disjoint subset of samples where the metadata column equals specific value(s).

Key behaviors:
  • Each branch contains a disjoint subset of samples

  • Per-branch cross-validation is supported

  • Branches with too few samples can be skipped (min_samples)

  • Values can be grouped into combined branches (group_values)

  • Models train and predict only on their partition

priority

Controller priority (set to 3 to run before other controllers).

Type:

int

execute(step_info: ParsedStep, dataset: SpectroDataset, context: ExecutionContext, runtime_context: RuntimeContext, source: int = -1, mode: str = 'train', loaded_binaries: List[Tuple[str, Any]] | None = None, prediction_store: Any | None = None) Tuple[ExecutionContext, StepOutput][source]

Execute the metadata partitioner branch step.

Creates branches based on metadata column values, with each branch containing only samples matching specific value(s).

In prediction mode, samples are routed to the correct branch based on their metadata value. Each sample is processed by the branch that matches its metadata value.

Parameters:
  • step_info – Parsed step containing branch definitions

  • dataset – Dataset to operate on

  • context – Pipeline execution context

  • runtime_context – Runtime infrastructure context

  • source – Data source index

  • mode – Execution mode (“train” or “predict”)

  • loaded_binaries – Pre-loaded binary objects for prediction mode

  • prediction_store – External prediction store for model predictions

Returns:

Tuple of (updated_context, StepOutput with collected artifacts)

classmethod matches(step: Any, operator: Any, keyword: str) bool[source]

Check if the step matches the metadata_partitioner branch pattern.

Matches:

{“branch”: […], “by”: “metadata_partitioner”, “column”: “…”}

Parameters:
  • step – Original step configuration

  • operator – Deserialized operator

  • keyword – Step keyword

Returns:

True if this is a metadata_partitioner branch definition.

priority: int = 3
classmethod supports_prediction_mode() bool[source]

Metadata partitioner should execute in prediction mode.

In prediction mode, we need to route samples to the correct branch based on their metadata value.

classmethod use_multi_source() bool[source]

Metadata partitioner operates on dataset level.

class nirs4all.controllers.data.OutlierExcluderController[source]

Bases: OperatorController

Controller for sample-based branching with outlier exclusion strategies.

This controller creates multiple branches, each with a different outlier exclusion strategy. Samples identified as outliers are excluded from training in that branch, but predictions still cover all samples.

Key behaviors:
  • Each branch applies a different outlier detection method

  • Outlier detection runs on training data only

  • Exclusion is per-branch (tracked in context, not in indexer)

  • Predictions include exclusion metadata for analysis

  • Branch 0 with None strategy serves as baseline

priority

Controller priority (set to 4 to run before regular branch controller).

Type:

int

execute(step_info: ParsedStep, dataset: SpectroDataset, context: ExecutionContext, runtime_context: RuntimeContext, source: int = -1, mode: str = 'train', loaded_binaries: List[Tuple[str, Any]] | None = None, prediction_store: Any | None = None) Tuple[ExecutionContext, StepOutput][source]

Execute the outlier excluder branch step.

Creates branches for each outlier exclusion strategy. In train mode, applies outlier detection and marks exclusions. In predict mode, reconstructs branch contexts without applying exclusions.

Parameters:
  • step_info – Parsed step containing branch definitions

  • dataset – Dataset to operate on

  • context – Pipeline execution context

  • runtime_context – Runtime infrastructure context

  • source – Data source index

  • mode – Execution mode (“train” or “predict”)

  • loaded_binaries – Pre-loaded binary objects for prediction mode

  • prediction_store – External prediction store for model predictions

Returns:

Tuple of (updated_context, StepOutput with collected artifacts)

classmethod matches(step: Any, operator: Any, keyword: str) bool[source]

Check if the step matches the outlier excluder branch pattern.

Matches:

{“branch”: {“by”: “outlier_excluder”, “strategies”: […]}}

Parameters:
  • step – Original step configuration

  • operator – Deserialized operator

  • keyword – Step keyword

Returns:

True if this is an outlier_excluder branch definition.

priority: int = 4
classmethod supports_prediction_mode() bool[source]

Outlier excluder should execute in prediction mode.

In prediction mode, we need to reconstruct the branch contexts but NOT apply sample exclusion (we predict on all samples).

classmethod use_multi_source() bool[source]

Outlier excluder operates on dataset level.

class nirs4all.controllers.data.RepToPPController[source]

Bases: OperatorController

Controller for transforming repetitions into additional preprocessings.

This controller handles the rep_to_pp pipeline keyword, which groups samples by a metadata column and reshapes each repetition into a preprocessing dimension.

Before: n_sources × (n_samples, n_pp, n_features) After: n_sources × (n_unique_samples, n_pp × n_reps, n_features)

This enables:
  • Multi-preprocessing input for models like NiConNet

  • Repetition-as-preprocessing fusion strategies

  • Consistent sample count for cross-validation

priority

Controller priority (3 = early, before CV).

Type:

int

execute(step_info: ParsedStep, dataset: SpectroDataset, context: ExecutionContext, runtime_context: RuntimeContext, source: int = -1, mode: str = 'train', loaded_binaries: List[Tuple[str, Any]] | None = None, prediction_store: Any | None = None) Tuple[ExecutionContext, StepOutput][source]

Execute rep_to_pp transformation.

Reshapes the dataset by grouping samples by the specified column and stacking repetitions into the preprocessing dimension.

Parameters:
  • step_info – Parsed step containing rep_to_pp configuration

  • dataset – Dataset to transform

  • context – Pipeline execution context

  • runtime_context – Runtime infrastructure context

  • source – Data source index (not used, operates on all sources)

  • mode – Execution mode (“train” or “predict”)

  • loaded_binaries – Pre-loaded binary objects (not used)

  • prediction_store – External prediction store (not used)

Returns:

Tuple of (context, StepOutput with transformation info)

Raises:

ValueError – If column not found or groups have unequal sizes and on_unequal=”error”.

classmethod matches(step: Any, operator: Any, keyword: str) bool[source]

Check if the step matches the rep_to_pp controller.

Parameters:
  • step – Original step configuration

  • operator – Deserialized operator

  • keyword – Step keyword

Returns:

True if keyword is “rep_to_pp”

priority: int = 3
classmethod supports_prediction_mode() bool[source]

Repetition transformation should NOT run in prediction mode.

The transformation happens once during training. During prediction, the model expects the same structure that was used during training.

classmethod use_multi_source() bool[source]

This controller operates on the whole dataset, not per-source.

class nirs4all.controllers.data.RepToSourcesController[source]

Bases: OperatorController

Controller for transforming repetitions into separate data sources.

This controller handles the rep_to_sources pipeline keyword, which groups samples by a metadata column (typically sample ID) and reshapes each repetition index into a separate data source.

Before: 1 source × (n_samples, n_pp, n_features) After: n_reps sources × (n_unique_samples, n_pp, n_features)

This enables:
  • Per-repetition preprocessing via source_branch

  • Multi-source modeling strategies

  • Repetition-aware feature fusion

priority

Controller priority (3 = early, before CV).

Type:

int

execute(step_info: ParsedStep, dataset: SpectroDataset, context: ExecutionContext, runtime_context: RuntimeContext, source: int = -1, mode: str = 'train', loaded_binaries: List[Tuple[str, Any]] | None = None, prediction_store: Any | None = None) Tuple[ExecutionContext, StepOutput][source]

Execute rep_to_sources transformation.

Reshapes the dataset by grouping samples by the specified column and creating one source per repetition index.

Parameters:
  • step_info – Parsed step containing rep_to_sources configuration

  • dataset – Dataset to transform

  • context – Pipeline execution context

  • runtime_context – Runtime infrastructure context

  • source – Data source index (not used, operates on all sources)

  • mode – Execution mode (“train” or “predict”)

  • loaded_binaries – Pre-loaded binary objects (not used)

  • prediction_store – External prediction store (not used)

Returns:

Tuple of (context, StepOutput with transformation info)

Raises:

ValueError – If column not found or groups have unequal sizes and on_unequal=”error”.

classmethod matches(step: Any, operator: Any, keyword: str) bool[source]

Check if the step matches the rep_to_sources controller.

Parameters:
  • step – Original step configuration

  • operator – Deserialized operator

  • keyword – Step keyword

Returns:

True if keyword is “rep_to_sources”

priority: int = 3
classmethod supports_prediction_mode() bool[source]

Repetition transformation should NOT run in prediction mode.

The transformation happens once during training. During prediction, the model expects the same structure that was used during training. The controller should be skipped in prediction mode - the user must ensure prediction data has the same structure as training data after transformation.

classmethod use_multi_source() bool[source]

This controller operates on the whole dataset, not per-source.

class nirs4all.controllers.data.ResamplerController[source]

Bases: OperatorController

Controller for Resampler operators.

This controller: 1. Extracts wavelengths from dataset headers 2. Validates that headers are convertible to float (wavelengths in cm-1) 3. Fits the resampler with original wavelengths 4. Transforms all data to the target wavelength grid 5. Updates dataset with new features and headers 6. Supports multi-source datasets with per-source or shared parameters

execute(step_info: ParsedStep, dataset: SpectroDataset, context: ExecutionContext, runtime_context: RuntimeContext, source: int = -1, mode: str = 'train', loaded_binaries: List[Tuple[str, Any]] | None = None, prediction_store: Any | None = None) Tuple[ExecutionContext, List][source]

Execute resampling operation.

Parameters:
  • step_info – Pipeline step configuration

  • dataset – Dataset to operate on

  • context – Pipeline execution context

  • runtime_context – Runtime context

  • source – Data source index (-1 for all sources)

  • mode – Execution mode (“train” or “predict”)

  • loaded_binaries – Pre-loaded binary objects for prediction mode

  • prediction_store – External prediction store (unused)

Returns:

Tuple of (updated_context, fitted_resamplers)

classmethod matches(step: Any, operator: Any, keyword: str) bool[source]

Match Resampler objects.

priority: int = 5
classmethod supports_prediction_mode() bool[source]

Resampler supports prediction mode.

classmethod use_multi_source() bool[source]

Resampler supports multi-source datasets.

class nirs4all.controllers.data.SampleAugmentationController[source]

Bases: OperatorController

Sample Augmentation Controller with delegation pattern.

This controller orchestrates sample augmentation by: 1. Calculating augmentation distribution (standard or balanced mode) 2. Creating transformer→samples mapping 3. Emitting ONE run_step per transformer with target samples

The actual augmentation work is delegated to TransformerMixinController.

execute(step_info: ParsedStep, dataset: SpectroDataset, context: ExecutionContext, runtime_context: RuntimeContext, source: int = -1, mode: str = 'train', loaded_binaries: Any | None = None, prediction_store: Any | None = None) Tuple[ExecutionContext, List][source]

Execute sample augmentation with standard or balanced mode.

Step format for standard mode:
{
“sample_augmentation”: {

“transformers”: [transformer1, transformer2, …], “count”: int, “selection”: “random” or “all”, # Default “random” “random_state”: int # Optional

}

}

Step format for balanced mode (choose one balancing strategy):

Mode 1 - Fixed target size per class: {

“sample_augmentation”: {

“transformers”: […], “balance”: “y” or “metadata_column”, # Default “y” “target_size”: int, # Fixed target samples per class “selection”: “random” or “all”, “random_state”: int

}

}

Mode 2 - Multiplier for augmentation: {

“sample_augmentation”: {

“transformers”: […], “balance”: “y” or “metadata_column”, “max_factor”: float, # Multiplier (e.g., 3 means class grows 3x) “selection”: “random” or “all”, “random_state”: int

}

}

Mode 3 - Percentage of majority class: {

“sample_augmentation”: {

“transformers”: […], “balance”: “y” or “metadata_column”, “ref_percentage”: float, # Target as % of majority (0.0-1.0) “selection”: “random” or “all”, “random_state”: int

}

}

Binning for regression (automatic when balance=”y” and task is regression):
{
“sample_augmentation”: {

“transformers”: […], “balance”: “y”, “bins”: int, # Number of virtual classes (default: 10) “binning_strategy”: “equal_width” or “quantile”, # Default: “equal_width” “max_factor”: float, # Choose one balancing mode “selection”: “random” or “all”, “random_state”: int

}

}

classmethod matches(step: Any, operator: Any, keyword: str) bool[source]

Check if the operator matches the step and keyword.

static normalize_generator_spec(spec: Any) Any[source]

Normalize generator spec for sample_augmentation context.

In sample_augmentation context, multi-selection should use combinations by default since the order of transformers doesn’t matter. Translates legacy ‘size’ to ‘pick’ for explicit semantics.

Parameters:

spec – Generator specification (may contain _or_, size, pick, arrange).

Returns:

Normalized spec with ‘size’ converted to ‘pick’ if needed.

priority: int = 10
classmethod supports_prediction_mode() bool[source]

Sample augmentation only runs during training.

classmethod use_multi_source() bool[source]

Check if the operator supports multi-source datasets.

class nirs4all.controllers.data.SampleFilterController[source]

Bases: OperatorController

Controller for sample filtering operations.

This controller orchestrates sample filtering by: 1. Retrieving train samples (base only, no augmented) and their X/y values 2. Applying each filter’s get_mask() method to identify outliers 3. Combining masks according to the specified mode (any/all) 4. Marking excluded samples in the dataset’s indexer 5. Generating filtering report (optional)

Sample filters are non-destructive - they mark samples as excluded in the indexer rather than removing data. Excluded samples can be re-included using dataset._indexer.mark_included().

Pipeline syntax:
{
“sample_filter”: {
“filters”: [

YOutlierFilter(method=”iqr”, threshold=1.5), XOutlierFilter(method=”mahalanobis”),

], “mode”: “any”, # “any” = exclude if ANY filter flags “report”: True, # Generate filtering report “cascade_to_augmented”: True, # Also exclude augmented samples

}

}

Note

Filtering only runs during training mode - in prediction mode, this controller does nothing to avoid excluding prediction samples.

execute(step_info: ParsedStep, dataset: SpectroDataset, context: ExecutionContext, runtime_context: RuntimeContext, source: int = -1, mode: str = 'train', loaded_binaries: List[Tuple[str, Any]] | None = None, prediction_store: Any | None = None) Tuple[ExecutionContext, List][source]

Execute sample filtering operation.

This method: 1. Retrieves training data (base samples only) 2. Fits and applies each filter to identify outliers 3. Combines filter masks using the specified mode 4. Marks excluded samples in the dataset’s indexer 5. Optionally prints a filtering report

Parameters:
  • step_info – Parsed step containing operator and configuration

  • dataset – Dataset to operate on

  • context – Pipeline execution context

  • runtime_context – Runtime infrastructure context

  • source – Data source index (unused, filtering is dataset-level)

  • mode – Execution mode (“train” or “predict”)

  • loaded_binaries – Pre-loaded binaries (filters may be persisted)

  • prediction_store – External prediction store (unused)

Returns:

Tuple of (updated_context, persisted_artifacts)

Raises:
classmethod matches(step: Any, operator: Any, keyword: str) bool[source]

Match sample_filter keyword in pipeline.

priority: int = 5
classmethod supports_prediction_mode() bool[source]

Sample filtering only runs during training.

Prediction samples should never be filtered/excluded - we want to predict on all provided samples. Filters were fitted during training and their thresholds don’t apply to new data.

classmethod use_multi_source() bool[source]

Sample filtering operates on the dataset level, not per-source.

class nirs4all.controllers.data.SamplePartitionerController[source]

Bases: OperatorController

Controller for sample-based branching via partitioning.

This controller creates two branches by partitioning samples based on a filter (e.g., outlier detection). Each branch contains a different subset of samples:

  • “outliers” branch: samples where filter returns False (outliers)

  • “inliers” branch: samples where filter returns True (non-outliers)

Unlike OutlierExcluderController which only excludes from training, this controller truly partitions the samples so each branch trains and predicts only on its subset.

Key behaviors:
  • Each branch contains a disjoint subset of samples

  • Samples are partitioned, not excluded

  • Models train and predict only on their partition

  • Supports Y-outlier and X-outlier detection methods

priority

Controller priority (set to 3 to run before outlier excluder).

Type:

int

execute(step_info: ParsedStep, dataset: SpectroDataset, context: ExecutionContext, runtime_context: RuntimeContext, source: int = -1, mode: str = 'train', loaded_binaries: List[Tuple[str, Any]] | None = None, prediction_store: Any | None = None) Tuple[ExecutionContext, StepOutput][source]

Execute the sample partitioner branch step.

Creates two branches: one for outliers and one for inliers. Each branch contains only its subset of samples.

Parameters:
  • step_info – Parsed step containing branch definitions

  • dataset – Dataset to operate on

  • context – Pipeline execution context

  • runtime_context – Runtime infrastructure context

  • source – Data source index

  • mode – Execution mode (“train” or “predict”)

  • loaded_binaries – Pre-loaded binary objects for prediction mode

  • prediction_store – External prediction store for model predictions

Returns:

Tuple of (updated_context, StepOutput with collected artifacts)

classmethod matches(step: Any, operator: Any, keyword: str) bool[source]

Check if the step matches the sample_partitioner branch pattern.

Matches:

{“branch”: {“by”: “sample_partitioner”, “filter”: {…}}}

Parameters:
  • step – Original step configuration

  • operator – Deserialized operator

  • keyword – Step keyword

Returns:

True if this is a sample_partitioner branch definition.

priority: int = 3
classmethod supports_prediction_mode() bool[source]

Sample partitioner should execute in prediction mode.

In prediction mode, we need to reconstruct the branch contexts and apply the same sample partitioning.

classmethod use_multi_source() bool[source]

Sample partitioner operates on dataset level.

class nirs4all.controllers.data.SourceBranchConfigParser[source]

Bases: object

Parser for source_branch step configurations.

Handles multiple syntax formats for source branching and normalizes them to SourceBranchConfig.

Supported syntaxes:
  • Simple string: “auto” (isolate each source)

  • Dict with source names: {“NIR”: [steps], “markers”: [steps]}

  • Dict with indices: {0: [steps], 1: [steps]}

  • Dict with special keys: {“_default_”: [steps], “_merge_after_”: False}

classmethod parse(raw_config: Any) SourceBranchConfig[source]

Parse raw source_branch configuration into SourceBranchConfig.

Parameters:

raw_config – The value from {“source_branch”: raw_config}

Returns:

Normalized SourceBranchConfig instance.

Raises:

ValueError – If configuration format is invalid.

class nirs4all.controllers.data.SourceBranchController[source]

Bases: OperatorController

Controller for per-source pipeline execution.

This controller enables per-source pipeline execution for multi-source datasets. Each data source gets its own independent processing pipeline.

Key behaviors:
  • Creates per-source execution contexts

  • Executes source-specific pipelines

  • Stores source contexts for subsequent steps or auto-merge

  • Optionally auto-merges sources after processing

Unlike regular BranchController:
  • Operates on the data provenance dimension (sources), not execution paths

  • Each source’s data is isolated during its pipeline execution

  • Sources can have completely different preprocessing chains

  • Designed for multi-modal data (NIR, markers, Raman, etc.)

priority

Controller priority (5 = same as BranchController).

Type:

int

execute(step_info: ParsedStep, dataset: SpectroDataset, context: ExecutionContext, runtime_context: RuntimeContext, source: int = -1, mode: str = 'train', loaded_binaries: List[Tuple[str, Any]] | None = None, prediction_store: Any | None = None) Tuple[ExecutionContext, StepOutput][source]

Execute source branch step.

For each source, runs a specific sub-pipeline (if defined) and updates the processing context. Uses existing infrastructure:

  1. Get source names and current processing chains

  2. For each source with a defined pipeline: - Create a context with processing limited to that source - Run the sub-pipeline steps - Collect artifacts

  3. Update context with new processing chains

  4. Optionally auto-merge sources

The TransformerController will naturally apply transforms only to the source whose processing is in the context.

Parameters:
  • step_info – Parsed step containing source_branch configuration

  • dataset – Dataset to operate on (must have multiple sources)

  • context – Pipeline execution context

  • runtime_context – Runtime infrastructure context

  • source – Data source index

  • mode – Execution mode (“train” or “predict”)

  • loaded_binaries – Pre-loaded binary objects for prediction mode

  • prediction_store – External prediction store

Returns:

Tuple of (updated_context, StepOutput with artifacts)

Raises:

ValueError – If dataset has only one source.

classmethod matches(step: Any, operator: Any, keyword: str) bool[source]

Check if the step matches the source_branch controller.

Parameters:
  • step – Original step configuration

  • operator – Deserialized operator

  • keyword – Step keyword

Returns:

True if keyword is “source_branch”

priority: int = 5
classmethod supports_prediction_mode() bool[source]

Source branch controller should execute in prediction mode.

classmethod use_multi_source() bool[source]

Source branch controller supports multi-source datasets.