nirs4all.pipeline package

Module contents

Pipeline module for the nirs4all package.

This module contains pipeline classes for processing workflows.

Phase 5 Additions:
  • TraceBasedExtractor: Extracts minimal pipeline from execution trace

  • MinimalPipeline: Minimal pipeline ready for prediction replay

  • MinimalPipelineStep: A single step in the minimal pipeline

  • MinimalPredictor: Executes minimal pipeline for efficient prediction

  • MinimalArtifactProvider: Provides artifacts from MinimalPipeline

Phase 6 Additions:
  • BundleGenerator: Creates standalone prediction bundles (.n4a, .n4a.py)

  • BundleLoader: Loads and predicts from exported bundles

  • BundleFormat: Enumeration of supported bundle formats

  • BundleMetadata: Bundle metadata structure

Phase 7 Additions:
  • Retrainer: Handles retraining with full/transfer/finetune modes

  • RetrainMode: Enumeration of retrain modes

  • StepMode: Per-step mode override for fine-grained control

  • ExtractedPipeline: Extracted pipeline for inspection/modification

  • RetrainArtifactProvider: Artifact provider respecting retrain modes

class nirs4all.pipeline.ArtifactProvider

Bases: ABC

Abstract interface for providing artifacts during prediction replay.

The ArtifactProvider enables controller-agnostic artifact injection: controllers request artifacts by step index rather than by name matching, which is deterministic and works with any controller type.

This interface is used during prediction mode to provide pre-loaded artifacts (transformers, models, etc.) to controllers without requiring them to know about the artifact storage system.

Implementations:
  • MapArtifactProvider: In-memory dictionary-based provider

  • LoaderArtifactProvider: Wraps ArtifactLoader for lazy loading

Example

>>> provider = MapArtifactProvider(artifact_map)
>>> artifacts = provider.get_artifacts_for_step(step_index=2)
>>> for artifact_id, obj in artifacts:
...     process(obj)
abstractmethod get_artifact(step_index: int, fold_id: int | None = None) → Any | None

Get a single artifact for a step.

Parameters:
  • step_index – 1-based step index

  • fold_id – Optional fold ID for fold-specific artifacts

Returns:

Artifact object or None if not found

get_artifact_by_chain(chain_path: str) → Any | None

Get artifact by V3 chain path (optional V3 method).

Parameters:

chain_path – Full operator chain path (e.g., “s1.MinMaxScaler>s3.PLS”)

Returns:

Artifact object or None if not found

get_artifacts_for_chain_prefix(chain_prefix: str) → List[Tuple[str, Any]]

Get all artifacts matching a chain path prefix (optional V3 method).

Parameters:

chain_prefix – Chain path prefix to match

Returns:

List of (chain_path, artifact_object) tuples
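Prefix lookup over chain paths can be sketched as below, assuming chain paths are plain strings of `sN.Operator` segments joined by `>`, as in the `s1.MinMaxScaler>s3.PLS` example. The helper name and the segment-boundary rule (a prefix must end at a `>`, so `s1.MinMax` does not match `s1.MinMaxScaler`) are assumptions of this sketch, not documented behavior.

```python
from typing import Any, Dict, List, Tuple


def artifacts_for_chain_prefix(chain_index: Dict[str, Any],
                               chain_prefix: str) -> List[Tuple[str, Any]]:
    """Hypothetical helper: return (chain_path, artifact) pairs under a prefix."""

    def under_prefix(path: str) -> bool:
        # Match whole segments only: the path equals the prefix or continues with ">".
        return path == chain_prefix or path.startswith(chain_prefix + ">")

    # Sort by chain path so the result order is deterministic.
    return [(path, obj) for path, obj in sorted(chain_index.items()) if under_prefix(path)]


index = {
    "s1.MinMaxScaler": "scaler",
    "s1.MinMaxScaler>s3.PLS": "pls_on_scaled",
    "s1.SNV>s3.PLS": "pls_on_snv",
}
matches = artifacts_for_chain_prefix(index, "s1.MinMaxScaler")
```

A plain `str.startswith` check would be simpler, but it would also match operators whose names merely begin with the prefix text.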

abstractmethod get_artifacts_for_step(step_index: int, branch_path: List[int] | None = None, branch_id: int | None = None, source_index: int | None = None, substep_index: int | None = None) → List[Tuple[str, Any]]

Get all artifacts for a step.

Parameters:
  • step_index – 1-based step index

  • branch_path – Optional branch path filter

  • branch_id – Optional branch ID filter

  • source_index – Optional source/dataset index filter for multi-source

  • substep_index – Optional substep index filter for branch substeps

Returns:

List of (artifact_id, artifact_object) tuples

abstractmethod get_fold_artifacts(step_index: int, branch_path: List[int] | None = None) → List[Tuple[int, Any]]

Get all fold-specific artifacts for a step.

Parameters:
  • step_index – 1-based step index

  • branch_path – Optional branch path filter

Returns:

List of (fold_id, artifact_object) tuples, sorted by fold_id

get_primary_artifact(step_index: int) → Any | None

Get the primary artifact for a step.

The primary artifact is typically the main model or transformer for the step. Default implementation returns the first artifact.

Parameters:

step_index – 1-based step index

Returns:

Primary artifact object or None if not found

abstractmethod has_artifacts_for_step(step_index: int) → bool

Check if artifacts exist for a step.

Parameters:

step_index – 1-based step index

Returns:

True if artifacts are available for this step
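The contract above can be illustrated with a small, self-contained sketch. `ArtifactProviderSketch` and `DictProviderSketch` are hypothetical stand-ins, not nirs4all classes: they keep only the step-index lookups (dropping the branch, source, and substep filters) and mirror the documented default of `get_primary_artifact` returning the first artifact.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple


class ArtifactProviderSketch(ABC):
    """Hypothetical, simplified mirror of the ArtifactProvider contract."""

    @abstractmethod
    def get_artifact(self, step_index: int, fold_id: Optional[int] = None) -> Optional[Any]:
        ...

    @abstractmethod
    def get_artifacts_for_step(self, step_index: int) -> List[Tuple[str, Any]]:
        ...

    @abstractmethod
    def get_fold_artifacts(self, step_index: int) -> List[Tuple[int, Any]]:
        ...

    @abstractmethod
    def has_artifacts_for_step(self, step_index: int) -> bool:
        ...

    def get_primary_artifact(self, step_index: int) -> Optional[Any]:
        # Documented default: the first artifact of the step, or None.
        artifacts = self.get_artifacts_for_step(step_index)
        return artifacts[0][1] if artifacts else None


class DictProviderSketch(ArtifactProviderSketch):
    """Hypothetical dict-backed provider keyed by 1-based step index."""

    def __init__(self, by_step: Dict[int, List[Tuple[str, Any]]],
                 by_fold: Optional[Dict[int, Dict[int, Any]]] = None) -> None:
        self._by_step = by_step
        self._by_fold = by_fold or {}

    def get_artifact(self, step_index: int, fold_id: Optional[int] = None) -> Optional[Any]:
        if fold_id is not None:
            return self._by_fold.get(step_index, {}).get(fold_id)
        return self.get_primary_artifact(step_index)

    def get_artifacts_for_step(self, step_index: int) -> List[Tuple[str, Any]]:
        return list(self._by_step.get(step_index, []))

    def get_fold_artifacts(self, step_index: int) -> List[Tuple[int, Any]]:
        # Sorted by fold_id, as the interface documents.
        return sorted(self._by_fold.get(step_index, {}).items())

    def has_artifacts_for_step(self, step_index: int) -> bool:
        return bool(self._by_step.get(step_index))


provider = DictProviderSketch(
    {1: [("snv", "snv_transformer")], 2: [("pls", "pls_fold0")]},
    {2: {1: "pls_fold1", 0: "pls_fold0"}},
)
```

Because the abstract methods are declared with `abstractmethod`, the base class cannot be instantiated directly; only subclasses that implement all four lookups can.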

class nirs4all.pipeline.BundleFormat(value)

Bases: str, Enum

Supported bundle export formats.

N4A

Full ZIP bundle with all artifacts and metadata

N4A_PY

Portable Python script with embedded artifacts

N4A = 'n4a'
N4A_PY = 'n4a.py'
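Because `BundleFormat` mixes in `str`, its members compare equal to their string values, which is consistent with `export` accepting `str | BundleFormat` for `format`. The sketch below re-declares the enum with the documented values and adds a hypothetical `format_from_path` helper (not part of nirs4all) that infers the format from a file name.

```python
from enum import Enum
from pathlib import Path


class BundleFormat(str, Enum):
    # Member values copied from the documented enum.
    N4A = "n4a"
    N4A_PY = "n4a.py"


def format_from_path(path: str) -> BundleFormat:
    """Hypothetical helper: infer the bundle format from a file name."""
    name = Path(path).name
    # ".n4a.py" is a double suffix, so Path.suffix alone (".py") cannot identify it.
    if name.endswith(".n4a.py"):
        return BundleFormat.N4A_PY
    if name.endswith(".n4a"):
        return BundleFormat.N4A
    raise ValueError(f"Unrecognized bundle extension: {path}")
```

`BundleFormat("n4a.py")` looks a member up by value, so a plain string from user input can be converted to the enum in one call.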
class nirs4all.pipeline.BundleGenerator(workspace_path: str | Path, verbose: int = 0)

Bases: object

Generate standalone prediction bundles from trained pipelines.

This class exports trained pipelines to bundle formats that can be used for deployment, sharing, or archival without requiring the original workspace or a full nirs4all installation.

workspace_path

Path to the workspace root

resolver

PredictionResolver for resolving prediction sources

verbose

Verbosity level for logging

Example

>>> generator = BundleGenerator(workspace_path)
>>> generator.export(best_prediction, "model.n4a")
>>>
>>> # Export to portable script
>>> generator.export(best_prediction, "model.n4a.py", format="n4a.py")
export(source: Dict[str, Any] | str | Path, output_path: str | Path, format: str | BundleFormat = BundleFormat.N4A, include_metadata: bool = True, compress: bool = True) → Path

Export a prediction source to a bundle.

Parameters:
  • source – Prediction source (prediction dict, folder path, etc.)

  • output_path – Path for the output bundle

  • format – Bundle format (‘n4a’ or ‘n4a.py’)

  • include_metadata – Whether to include full metadata in bundle

  • compress – Whether to compress artifacts (for .n4a format)

Returns:

Path to the created bundle

Raises:

class nirs4all.pipeline.BundleLoader(bundle_path: str | Path)

Bases: object

Load and use prediction bundles.

Provides functionality for loading .n4a bundles, extracting metadata, and running predictions.

bundle_path

Path to the bundle file

metadata

Bundle metadata

trace

Execution trace (if available)

pipeline_config

Pipeline configuration

fold_weights

Fold weights for CV ensemble

artifact_provider

Provider for artifacts

Example

>>> loader = BundleLoader("model.n4a")
>>> print(f"Pipeline: {loader.metadata.pipeline_uid}")
>>> print(f"Preprocessing: {loader.metadata.preprocessing_chain}")
>>> y_pred = loader.predict(X_new)
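Since an .n4a bundle is documented as a ZIP archive and BundleMetadata.from_dict reads a dictionary from the bundle’s manifest.json, a bundle’s manifest can be inspected with the standard library alone. This is a sketch: it assumes `manifest.json` sits at the archive root, and the toy in-memory bundle (including the `demo_uid` value) is made up for illustration; the full archive layout is not documented here.

```python
import io
import json
import zipfile
from typing import IO, Any, Dict, Union


def read_manifest(bundle: Union[str, IO[bytes]]) -> Dict[str, Any]:
    """Read manifest.json from a ZIP-based bundle (path or binary file object)."""
    # Assumption: manifest.json is stored at the archive root.
    with zipfile.ZipFile(bundle) as zf:
        with zf.open("manifest.json") as fh:
            return json.load(fh)


# Build a toy in-memory bundle for demonstration only.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as zf:
    zf.writestr("manifest.json", json.dumps({"pipeline_uid": "demo_uid",
                                             "bundle_format_version": "1.0"}))
buffer.seek(0)

manifest = read_manifest(buffer)
```

For real use, BundleLoader and its `metadata` attribute remain the supported route; this only shows why the archive is inspectable without nirs4all.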
get_chain_for_artifact(artifact_key: str) → OperatorChain | None

Get the operator chain for an artifact from the bundle.

Parameters:

artifact_key – Artifact key (e.g., “step_1”, “step_4_fold0”)

Returns:

OperatorChain for the artifact or None if not found

get_merged_chains(import_context_chain: OperatorChain, step_offset: int = 0) → Dict[str, OperatorChain]

Get all artifact chains merged with an import context chain.

Used when importing a bundle into another pipeline. Each artifact’s chain is prefixed with the import context chain.

Parameters:
  • import_context_chain – Chain from the importing pipeline context

  • step_offset – Step offset to apply to bundle steps

Returns:

Dict mapping artifact keys to merged chains
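The merge described above can be pictured on string chain paths. This sketch assumes the `sN.Operator` segment format from the `s1.MinMaxScaler>s3.PLS` example and that `step_offset` is added to each bundle step number; the real method works on OperatorChain objects, and `offset_chain` and `merge_chain` are hypothetical helpers.

```python
import re

# One chain segment: "s<step>.<operator>", e.g. "s3.PLS".
_SEGMENT = re.compile(r"^s(\d+)\.(.+)$")


def offset_chain(chain_path: str, step_offset: int) -> str:
    """Hypothetical: shift every segment's step number by step_offset."""
    shifted = []
    for segment in chain_path.split(">"):
        match = _SEGMENT.match(segment)
        if match is None:
            raise ValueError(f"Unexpected chain segment: {segment!r}")
        step, operator = int(match.group(1)), match.group(2)
        shifted.append(f"s{step + step_offset}.{operator}")
    return ">".join(shifted)


def merge_chain(import_context_chain: str, bundle_chain: str, step_offset: int = 0) -> str:
    """Hypothetical: prefix the shifted bundle chain with the importing context."""
    shifted = offset_chain(bundle_chain, step_offset)
    return f"{import_context_chain}>{shifted}" if import_context_chain else shifted
```

For example, importing a bundle chain `s1.MinMaxScaler>s3.PLS` behind an `s1.SNV` context with an offset of 1 gives `s1.SNV>s2.MinMaxScaler>s4.PLS` under these assumptions.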

get_partitioner_routing(step_index: int | None = None) → Dict[str, Any] | None

Get partitioner routing info for a specific step or all steps.

Parameters:

step_index – Specific step index, or None for all

Returns:

Routing info dict or None

get_required_metadata_columns() → List[str]

Get the metadata columns required for prediction routing.

Returns:

List of column names needed for routing; empty if no routing is needed.

get_step_info() → List[Dict[str, Any]]

Get information about steps in the bundle.

Returns:

List of step info dictionaries

has_partitioner_routing() → bool

Check if the bundle has metadata partitioner routing info.

Returns:

True if the bundle contains partitioner routing configuration.

import_artifacts_to_registry(registry: ArtifactRegistry, import_context_chain: OperatorChain | None = None, step_offset: int = 0, new_pipeline_id: str | None = None) → Dict[str, str]

Import bundle artifacts into an artifact registry.

Registers all artifacts from this bundle into the target registry, optionally merging with an import context chain for proper V3 tracking.

Parameters:
  • registry – Target ArtifactRegistry to import into

  • import_context_chain – Optional chain from import context to prefix

  • step_offset – Step offset for imported artifacts

  • new_pipeline_id – New pipeline ID for imported artifacts

Returns:

Dict mapping original artifact keys to new artifact IDs

predict(X: ndarray, branch_path: List[int] | None = None) → ndarray

Run prediction on input data.

Applies all preprocessing steps and model(s) from the bundle. Supports branching pipelines, meta-models (stacking), and CV ensembles.

Parameters:
  • X – Input features as numpy array

  • branch_path – Optional branch path filter for multi-branch pipelines

Returns:

Predictions as numpy array

predict_with_metadata(X: ndarray, metadata: Dict[str, ndarray], fallback_branch: int | None = None) → ndarray

Run prediction with metadata-based sample routing.

For bundles with metadata partitioner branches, this method routes each sample to the appropriate branch based on its metadata value. Each sample is processed by the transformers and models from its matching branch.

Parameters:
  • X – Input features as numpy array (n_samples, n_features)

  • metadata – Dict mapping column names to value arrays. Must include the column used for partitioning during training.

  • fallback_branch – Optional branch ID to use for samples with unknown metadata values. If None, unknown values raise a ValueError.

Returns:

Predictions as numpy array

Raises:

ValueError – If required metadata column is missing or samples have unknown values without fallback.

Example

>>> import numpy as np
>>> from nirs4all.pipeline import BundleLoader
>>> loader = BundleLoader("model.n4a")
>>> X_new = np.random.randn(100, 500)
>>> metadata = {"site": np.array(["A"]*50 + ["B"]*50)}
>>> y_pred = loader.predict_with_metadata(X_new, metadata)
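The routing rule can be sketched with NumPy boolean masks. Everything here is an illustrative assumption rather than the nirs4all implementation: `route_predict`, the `branch_for_value` mapping, and the per-branch `predictors` callables are hypothetical, and -1 is used internally as an "unknown value" marker, so real branch IDs are assumed to be non-negative.

```python
from typing import Callable, Dict, Optional

import numpy as np


def route_predict(
    X: np.ndarray,
    values: np.ndarray,
    branch_for_value: Dict[str, int],
    predictors: Dict[int, Callable[[np.ndarray], np.ndarray]],
    fallback_branch: Optional[int] = None,
) -> np.ndarray:
    """Hypothetical: send each sample to the branch matching its metadata value."""
    # Map each sample's metadata value to a branch id; -1 marks unknown values.
    branch_ids = np.array([branch_for_value.get(v, -1) for v in values])
    unknown = branch_ids == -1
    if unknown.any():
        if fallback_branch is None:
            raise ValueError(f"Unknown metadata values without fallback: {sorted(set(values[unknown]))}")
        branch_ids[unknown] = fallback_branch
    y = np.empty(len(X), dtype=float)
    for branch_id in np.unique(branch_ids):
        mask = branch_ids == branch_id
        # Each branch only sees its own rows; results are written back in place.
        y[mask] = predictors[int(branch_id)](X[mask])
    return y


X = np.arange(8, dtype=float).reshape(4, 2)
site = np.array(["A", "B", "A", "C"])
predictors = {0: lambda x: x[:, 0], 1: lambda x: -x[:, 0]}
y = route_predict(X, site, {"A": 0, "B": 1}, predictors, fallback_branch=0)
```

Writing results through the mask keeps predictions in the original sample order, whichever branch produced them.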
to_resolved_prediction() → Any

Convert bundle to ResolvedPrediction for use with Predictor.

Returns:

ResolvedPrediction instance

class nirs4all.pipeline.BundleMetadata(bundle_format_version: str = '1.0', nirs4all_version: str = '', created_at: str = '', pipeline_uid: str = '', source_type: str = '', model_step_index: int | None = None, fold_strategy: str = 'weighted_average', preprocessing_chain: str = '', trace_id: str | None = None, original_manifest: Dict[str, Any] = <factory>, partitioner_routing: Dict[str, Any] = <factory>)

Bases: object

Metadata for a prediction bundle.

Contains information about the bundle format, source, and contents.

bundle_format_version

Version of the bundle format

Type:

str

nirs4all_version

Version of nirs4all that created the bundle

Type:

str

created_at

ISO timestamp of bundle creation

Type:

str

pipeline_uid

UID of the source pipeline

Type:

str

source_type

Type of source that was exported

Type:

str

model_step_index

Index of the model step

Type:

int | None

fold_strategy

Strategy for combining fold predictions

Type:

str

preprocessing_chain

Summary of preprocessing steps

Type:

str

trace_id

ID of the execution trace (if available)

Type:

str | None

original_manifest

Subset of original manifest metadata

Type:

Dict[str, Any]

partitioner_routing

Routing info for metadata partitioner branches

Type:

Dict[str, Any]

bundle_format_version: str = '1.0'
created_at: str = ''
fold_strategy: str = 'weighted_average'
classmethod from_dict(data: Dict[str, Any]) → BundleMetadata

Create BundleMetadata from dictionary.

Parameters:

data – Dictionary from bundle manifest.json

Returns:

BundleMetadata instance

model_step_index: int | None = None
nirs4all_version: str = ''
original_manifest: Dict[str, Any]
partitioner_routing: Dict[str, Any]
pipeline_uid: str = ''
preprocessing_chain: str = ''
source_type: str = ''
trace_id: str | None = None
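A dataclass with the documented fields and defaults reproduces the shape of BundleMetadata, including the `<factory>` defaults for the two dictionaries. The tolerant `from_dict` below, which ignores manifest keys it does not recognize and falls back to defaults for missing ones, is an assumed behavior for illustration; the documentation above does not say how unknown keys are handled.

```python
from dataclasses import asdict, dataclass, field, fields
from typing import Any, Dict, Optional


@dataclass
class BundleMetadataSketch:
    """Hypothetical mirror of BundleMetadata's documented fields and defaults."""
    bundle_format_version: str = "1.0"
    nirs4all_version: str = ""
    created_at: str = ""
    pipeline_uid: str = ""
    source_type: str = ""
    model_step_index: Optional[int] = None
    fold_strategy: str = "weighted_average"
    preprocessing_chain: str = ""
    trace_id: Optional[str] = None
    original_manifest: Dict[str, Any] = field(default_factory=dict)
    partitioner_routing: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "BundleMetadataSketch":
        # Assumed behavior: keep known fields, drop unknown keys, default the rest.
        known = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in data.items() if k in known})


meta = BundleMetadataSketch.from_dict(
    {"pipeline_uid": "demo_uid", "model_step_index": 4, "unknown_key": True}
)
```

`field(default_factory=dict)` gives each instance its own dictionary, which is what `<factory>` denotes in the signature.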
class nirs4all.pipeline.ExecutionStep(step_index: int, operator_type: str = '', operator_class: str = '', operator_config: Dict[str, Any] = <factory>, execution_mode: StepExecutionMode = StepExecutionMode.TRAIN, artifacts: StepArtifacts = <factory>, branch_path: List[int] = <factory>, branch_name: str = '', duration_ms: float = 0.0, metadata: Dict[str, Any] = <factory>, input_chain_path: str = '', output_chain_paths: List[str] = <factory>, source_count: int = 1, produces_branches: bool = False, substep_index: int | None = None, input_shape: Tuple[int, int] | None = None, output_shape: Tuple[int, int] | None = None, input_features_shape: List[Tuple[int, int, int]] | None = None, output_features_shape: List[Tuple[int, int, int]] | None = None)

Bases: object

Record of a single step’s execution in the trace (V3).

Captures all information needed to replay this step during prediction, including operator configuration, execution mode, and produced artifacts.

V3 additions:
  • input_chain_path: Operator chain up to this step’s input

  • output_chain_paths: Chains produced by this step (for branching)

  • source_count: Number of X sources at this step

  • produces_branches: Whether this is a branch operator

step_index

1-based step number in the pipeline

Type:

int

operator_type

Type of operation (e.g., “transform”, “model”, “splitter”)

Type:

str

operator_class

Class name of the operator (e.g., “PLSRegression”, “SNV”)

Type:

str

operator_config

Serialized operator configuration

Type:

Dict[str, Any]

execution_mode

How the step was executed (train/predict/skip)

Type:

nirs4all.pipeline.trace.execution_trace.StepExecutionMode

artifacts

Artifacts produced by this step

Type:

nirs4all.pipeline.trace.execution_trace.StepArtifacts

branch_path

Branch indices if in a branch context

Type:

List[int]

branch_name

Human-readable branch name

Type:

str

duration_ms

Execution duration in milliseconds

Type:

float

metadata

Additional step-specific metadata

Type:

Dict[str, Any]

V3 chain tracking attributes:
input_chain_path

Serialized operator chain up to this step’s input

Type:

str

output_chain_paths

List of chains produced by this step

Type:

List[str]

source_count

Number of X sources processed

Type:

int

produces_branches

True if this is a branch operator

Type:

bool

substep_index

Position of the operator within a substep list such as [model1, model2]

Type:

int | None

add_output_chain(chain_path: str) → None

Add an output chain path to this step.

Parameters:

chain_path – Operator chain path to add

artifacts: StepArtifacts
branch_name: str = ''
branch_path: List[int]
duration_ms: float = 0.0
execution_mode: StepExecutionMode = 'train'
classmethod from_dict(data: Dict[str, Any]) → ExecutionStep

Create ExecutionStep from dictionary.

Parameters:

data – Dictionary from manifest

Returns:

ExecutionStep instance

has_artifacts() → bool

Check if this step produced any artifacts.

Returns:

True if the step has at least one artifact

input_chain_path: str = ''
input_features_shape: List[Tuple[int, int, int]] | None = None
input_shape: Tuple[int, int] | None = None
metadata: Dict[str, Any]
operator_class: str = ''
operator_config: Dict[str, Any]
operator_type: str = ''
output_chain_paths: List[str]
output_features_shape: List[Tuple[int, int, int]] | None = None
output_shape: Tuple[int, int] | None = None
produces_branches: bool = False
source_count: int = 1
step_index: int
substep_index: int | None = None
to_dict() → Dict[str, Any]

Convert to dictionary for YAML serialization.

Returns:

Dictionary suitable for manifest storage

class nirs4all.pipeline.ExecutionTrace(trace_id: str = <factory>, pipeline_uid: str = '', created_at: str = <factory>, steps: List[ExecutionStep] = <factory>, model_step_index: int | None = None, fold_weights: Dict[int, float] | None = None, preprocessing_chain: str = '', metadata: Dict[str, Any] = <factory>)

Bases: object

Complete trace of a pipeline execution path.

Records the exact sequence of steps and artifacts that produced a prediction, enabling deterministic replay for prediction, transfer, and export.

The trace is controller-agnostic: it records what happened without encoding specific controller logic, so any controller (existing or custom) can be replayed using the same infrastructure.

trace_id

Unique identifier for this trace

Type:

str

pipeline_uid

Parent pipeline UID

Type:

str

created_at

ISO timestamp of trace creation

Type:

str

steps

Ordered list of execution steps

Type:

List[nirs4all.pipeline.trace.execution_trace.ExecutionStep]

model_step_index

Index of the model step that produced predictions

Type:

int | None

fold_weights

Per-fold weights for CV ensemble (None for single model)

Type:

Dict[int, float] | None

preprocessing_chain

Summary of preprocessing steps for quick reference

Type:

str

metadata

Additional trace metadata (e.g., dataset info, run parameters)

Type:

Dict[str, Any]

add_step(step: ExecutionStep) → None

Add a step to the trace.

Parameters:

step – ExecutionStep to add

created_at: str
finalize(preprocessing_chain: str | None = None, metadata: Dict[str, Any] | None = None) → None

Finalize the trace with summary information.

Call this after all steps have been recorded to add summary info.

Parameters:
  • preprocessing_chain – Summary string of preprocessing (e.g., “SNV>SG>MinMax”)

  • metadata – Additional metadata to merge

fold_weights: Dict[int, float] | None = None
classmethod from_dict(data: Dict[str, Any]) → ExecutionTrace

Create ExecutionTrace from dictionary.

Parameters:

data – Dictionary from manifest

Returns:

ExecutionTrace instance

get_artifact_ids() → List[str]

Get all artifact IDs in this trace.

Returns:

List of all artifact IDs across all steps

get_artifacts_by_step(step_index: int) → StepArtifacts | None

Get artifacts for a specific step.

Parameters:

step_index – 1-based step index

Returns:

StepArtifacts or None if step not found

get_fold_artifact_ids() → Dict[int, str]

Get per-fold model artifact IDs.

Returns:

Dictionary of fold_id -> artifact_id

get_model_artifact_id() → str | None

Get the primary model artifact ID.

Returns:

Model artifact ID or None if no model step

get_step(step_index: int) → ExecutionStep | None

Get a step by its index.

Parameters:

step_index – 1-based step index to find

Returns:

ExecutionStep or None if not found

get_steps_before(step_index: int) → List[ExecutionStep]

Get all steps before a given step index.

Parameters:

step_index – 1-based step index (exclusive)

Returns:

List of steps with step_index < given index

get_steps_up_to_model() → List[ExecutionStep]

Get all steps up to and including the model step.

Returns:

List of steps needed to reproduce the prediction
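The two step-selection helpers follow from the 1-based `step_index`: `get_steps_before` uses an exclusive bound, and `get_steps_up_to_model` includes the model step. The dataclasses below are hypothetical, trimmed-down stand-ins for ExecutionStep and ExecutionTrace; returning an empty list when no model step is set is an assumption of this sketch.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class StepSketch:
    step_index: int          # 1-based, as in ExecutionStep
    operator_class: str = ""


@dataclass
class TraceSketch:
    steps: List[StepSketch] = field(default_factory=list)
    model_step_index: Optional[int] = None

    def get_steps_before(self, step_index: int) -> List[StepSketch]:
        # Exclusive bound, matching "step_index < given index".
        return [s for s in self.steps if s.step_index < step_index]

    def get_steps_up_to_model(self) -> List[StepSketch]:
        # Inclusive bound; assumption: no model step means nothing to replay.
        if self.model_step_index is None:
            return []
        return [s for s in self.steps if s.step_index <= self.model_step_index]


trace = TraceSketch(
    steps=[StepSketch(1, "SNV"), StepSketch(2, "KFold"),
           StepSketch(3, "PLSRegression"), StepSketch(4, "Other")],
    model_step_index=3,
)
```

Steps recorded after the model step (step 4 here) are excluded, which is what lets a replay stop as soon as the prediction is produced.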

metadata: Dict[str, Any]
model_step_index: int | None = None
pipeline_uid: str = ''
preprocessing_chain: str = ''
set_model_step(step_index: int, fold_weights: Dict[int, float] | None = None) → None

Set the model step index and optional fold weights.

Parameters:
  • step_index – Index of the model step

  • fold_weights – Optional per-fold weights for CV

steps: List[ExecutionStep]
to_dict() → Dict[str, Any]

Convert to dictionary for YAML serialization.

Returns:

Dictionary suitable for manifest storage

trace_id: str
class nirs4all.pipeline.Explainer(runner: PipelineRunner)

Bases: object

Handles SHAP explanation generation for trained models.

This class manages the explanation workflow: loading saved models, replaying pipelines to capture the trained model, and generating SHAP explanations with visualizations.

runner

Parent PipelineRunner instance

saver

File saver for managing outputs

manifest_manager

Manager for pipeline manifests

pipeline_uid

Unique identifier for the pipeline

artifact_loader

Loader for trained model artifacts

config_path

Path to the pipeline configuration

target_model

Metadata for the target model

captured_model

Tuple of (model, controller) captured during replay

capture_model(model: Any, controller: Any)

Capture a model during pipeline execution for SHAP analysis.

This method is called by the model controller during explain mode to capture the trained model instance.

Parameters:
  • model – Trained model instance

  • controller – Controller that trained the model

explain(prediction_obj: Dict[str, Any] | str, dataset: DatasetConfigs | SpectroDataset | ndarray | Tuple[ndarray, ...] | Dict | List[Dict] | str | List[str], dataset_name: str = 'explain_dataset', shap_params: Dict[str, Any] | None = None, verbose: int = 0, plots_visible: bool = True) → Tuple[Dict[str, Any], str]

Generate SHAP explanations for a saved model.

Parameters:
  • prediction_obj – Model identifier (dict with config_path or prediction ID)

  • dataset – Dataset to explain on

  • dataset_name – Name for the dataset

  • shap_params – SHAP configuration parameters

  • verbose – Verbosity level

  • plots_visible – Whether to display plots interactively

Returns:

Tuple of (shap_results_dict, output_directory_path)

Example

>>> explainer = Explainer(runner)
>>> shap_results, out_dir = explainer.explain(
...     {"config_path": "0001_abc123"},
...     X_test,
...     shap_params={"n_samples": 200, "visualizations": ["spectral", "summary"]}
... )
class nirs4all.pipeline.ExtractedPipeline(steps: List[Any] = <factory>, trace: ExecutionTrace | None = None, artifact_provider: ArtifactProvider | None = None, model_step_index: int | None = None, preprocessing_chain: str = '', source_pipeline_uid: str = '', metadata: Dict[str, Any] = <factory>)

Bases: object

Extracted pipeline for inspection and modification.

Represents a pipeline extracted from a trained prediction, ready for inspection, modification, or re-execution.

steps

List of pipeline steps (can be modified)

Type:

List[Any]

trace

Original execution trace (read-only)

Type:

nirs4all.pipeline.trace.execution_trace.ExecutionTrace | None

artifact_provider

Provider for original artifacts

Type:

nirs4all.pipeline.config.context.ArtifactProvider | None

model_step_index

Index of the model step

Type:

int | None

preprocessing_chain

Summary of preprocessing

Type:

str

source_pipeline_uid

UID of the source pipeline

Type:

str

metadata

Additional metadata

Type:

Dict[str, Any]

artifact_provider: ArtifactProvider | None = None
get_model_step() → Any | None

Get the model step.

Returns:

Model step configuration or None

get_step(index: int) → Any

Get a step by 0-based index.

Parameters:

index – 0-based step index

Returns:

Step configuration

metadata: Dict[str, Any]
model_step_index: int | None = None
preprocessing_chain: str = ''
set_model(model: Any) → None

Replace the model in the model step.

Parameters:

model – New model to use

set_step(index: int, step: Any) → None

Set a step by 0-based index.

Parameters:
  • index – 0-based step index

  • step – New step configuration

source_pipeline_uid: str = ''
steps: List[Any]
trace: ExecutionTrace | None = None
class nirs4all.pipeline.FoldStrategy(value)

Bases: str, Enum

Strategy for combining fold predictions in CV ensembles.

AVERAGE

Simple average of fold predictions

WEIGHTED_AVERAGE

Weighted average using fold weights

SINGLE

Use a single fold’s prediction

AVERAGE = 'average'
SINGLE = 'single'
WEIGHTED_AVERAGE = 'weighted_average'
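The three strategies can be sketched with NumPy as follows. The enum values are copied from above; `combine_folds` is hypothetical, and two choices are assumptions: folds missing from `fold_weights` get weight 1.0 under WEIGHTED_AVERAGE, and SINGLE picks the highest-weighted fold (or the lowest fold ID when no weights are given).

```python
from enum import Enum
from typing import Dict, Optional

import numpy as np


class FoldStrategy(str, Enum):
    # Member values copied from the documented enum.
    AVERAGE = "average"
    SINGLE = "single"
    WEIGHTED_AVERAGE = "weighted_average"


def combine_folds(fold_preds: Dict[int, np.ndarray],
                  strategy: "FoldStrategy | str",
                  fold_weights: Optional[Dict[int, float]] = None) -> np.ndarray:
    """Hypothetical combiner for per-fold predictions of equal shape."""
    strategy = FoldStrategy(strategy)  # accepts members or their string values
    fold_ids = sorted(fold_preds)
    weights_by_fold = fold_weights or {}
    if strategy is FoldStrategy.AVERAGE:
        return np.mean([fold_preds[f] for f in fold_ids], axis=0)
    if strategy is FoldStrategy.WEIGHTED_AVERAGE:
        weights = [weights_by_fold.get(f, 1.0) for f in fold_ids]
        # np.average divides by the sum of the weights, so they need not sum to 1.
        return np.average([fold_preds[f] for f in fold_ids], axis=0, weights=weights)
    # FoldStrategy.SINGLE
    if weights_by_fold:
        best = max(fold_ids, key=lambda f: weights_by_fold.get(f, 0.0))
    else:
        best = fold_ids[0]
    return fold_preds[best]


preds = {0: np.array([1.0, 2.0]), 1: np.array([3.0, 6.0])}
```

With weights {0: 3.0, 1: 1.0}, the weighted average of these two folds is [1.5, 3.0], pulled toward fold 0 compared with the plain average [2.0, 4.0].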
class nirs4all.pipeline.LoaderArtifactProvider(loader: Any, trace: Any | None = None)

Bases: ArtifactProvider

Artifact provider backed by an ArtifactLoader.

Wraps an ArtifactLoader to provide artifacts on-demand with lazy loading and caching. Used when loading from a manifest for prediction.

loader

The underlying ArtifactLoader

trace

Optional ExecutionTrace for step-to-artifact mapping

get_artifact(step_index: int, fold_id: int | None = None) → Any | None

Get a single artifact for a step.

If trace is available, uses trace to find artifact IDs. Otherwise, uses loader’s step-based lookup.

Parameters:
  • step_index – 1-based step index

  • fold_id – Optional fold ID for fold-specific artifacts

Returns:

Artifact object or None if not found

get_artifact_by_chain(chain_path: str) → Any | None

Get artifact by V3 chain path.

Parameters:

chain_path – Full operator chain path (e.g., “s1.MinMaxScaler>s3.PLS”)

Returns:

Artifact object or None if not found

get_artifacts_for_chain_prefix(chain_prefix: str) → List[Tuple[str, Any]]

Get all artifacts matching a chain path prefix.

Parameters:

chain_prefix – Chain path prefix to match

Returns:

List of (chain_path, artifact_object) tuples

get_artifacts_for_step(step_index: int, branch_path: List[int] | None = None, branch_id: int | None = None, source_index: int | None = None, substep_index: int | None = None) → List[Tuple[str, Any]]

Get all artifacts for a step.

Parameters:
  • step_index – 1-based step index

  • branch_path – Optional branch path filter

  • branch_id – Optional branch ID filter

  • source_index – Optional source index filter for multi-source pipelines

  • substep_index – Optional substep index filter (not used in loader provider)

Returns:

List of (artifact_id, artifact_object) tuples

get_fold_artifacts(step_index: int, branch_path: List[int] | None = None) → List[Tuple[int, Any]]

Get all fold-specific artifacts for a step.

Parameters:
  • step_index – 1-based step index

  • branch_path – Optional branch path filter

Returns:

List of (fold_id, artifact_object) tuples, sorted by fold_id

has_artifacts_for_step(step_index: int) → bool

Check if artifacts exist for a step.

Parameters:

step_index – 1-based step index

Returns:

True if artifacts are available for this step

class nirs4all.pipeline.MapArtifactProvider(artifact_map: Dict[int, List[Tuple[str, Any]]], fold_weights: Dict[int, float] | None = None, primary_artifacts: Dict[int, str] | None = None)

Bases: ArtifactProvider

In-memory artifact provider backed by a dictionary.

Provides artifacts from a pre-loaded dictionary mapping step indices to artifacts. Used when artifacts are resolved from an ExecutionTrace or when loading from a bundle.

artifact_map

Dictionary mapping step_index to list of (artifact_id, object) tuples

fold_weights

Optional fold weights for CV ensemble averaging

Example

>>> artifact_map = {
...     1: [("0001:1:all", snv_transformer)],
...     2: [("0001:2:0", model_fold0), ("0001:2:1", model_fold1)]
... }
>>> provider = MapArtifactProvider(artifact_map)
>>> transformer = provider.get_artifact(step_index=1)
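The IDs in the example suggest a `<pipeline>:<step>:<fold>` layout, with `all` marking an artifact shared across folds. Under that assumption (the ID format is not specified here), fold-specific artifacts could be separated and sorted by fold as in this hypothetical sketch; `parse_fold_id` and `fold_artifacts` are not nirs4all functions.

```python
from typing import Any, List, Optional, Tuple


def parse_fold_id(artifact_id: str) -> Optional[int]:
    """Hypothetical parser: last ':'-separated field is a fold number or 'all'."""
    last = artifact_id.rsplit(":", 1)[-1]
    return int(last) if last.isdigit() else None


def fold_artifacts(artifacts: List[Tuple[str, Any]]) -> List[Tuple[int, Any]]:
    """Keep fold-specific artifacts and sort them by fold id."""
    pairs = [(parse_fold_id(artifact_id), obj) for artifact_id, obj in artifacts]
    # Sort on the fold id only, so artifact objects never need to be comparable.
    return sorted(((fid, obj) for fid, obj in pairs if fid is not None), key=lambda p: p[0])


step_2 = [("0001:2:1", "model_fold1"), ("0001:2:0", "model_fold0"), ("0001:2:all", "shared")]
folds = fold_artifacts(step_2)
```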
get_artifact(step_index: int, fold_id: int | None = None) → Any | None

Get a single artifact for a step.

If fold_id is specified, returns the fold-specific artifact. Otherwise, returns the primary or first artifact.

Parameters:
  • step_index – 1-based step index

  • fold_id – Optional fold ID for fold-specific artifacts

Returns:

Artifact object or None if not found

get_artifacts_for_step(step_index: int, branch_path: List[int] | None = None, branch_id: int | None = None, source_index: int | None = None, substep_index: int | None = None) → List[Tuple[str, Any]]

Get all artifacts for a step.

Parameters:
  • step_index – 1-based step index

  • branch_path – Optional branch path filter (not used in map provider)

  • branch_id – Optional branch ID filter (not used in map provider)

  • source_index – Optional source/dataset index filter (not used in map provider)

  • substep_index – Optional substep index filter (not used in map provider)

Returns:

List of (artifact_id, artifact_object) tuples

get_fold_artifacts(step_index: int, branch_path: List[int] | None = None) → List[Tuple[int, Any]]

Get all fold-specific artifacts for a step.

Parameters:
  • step_index – 1-based step index

  • branch_path – Optional branch path filter (not used in map provider)

Returns:

List of (fold_id, artifact_object) tuples, sorted by fold_id

get_fold_weights() → Dict[int, float]

Get fold weights for CV ensemble averaging.

Returns:

Dictionary mapping fold_id to weight

has_artifacts_for_step(step_index: int) → bool

Check if artifacts exist for a step.

Parameters:

step_index – 1-based step index

Returns:

True if artifacts are available for this step

class nirs4all.pipeline.MinimalArtifactProvider(minimal_pipeline: MinimalPipeline, artifact_loader: Any, target_sub_index: int | None = None, target_model_name: str | None = None)

Bases: ArtifactProvider

Artifact provider backed by a MinimalPipeline (V3).

Provides artifacts from the minimal pipeline’s artifact map, which contains StepArtifacts extracted from the execution trace.

This provider uses V3 ArtifactRecord metadata (chain_path, branch_path, substep_index) instead of parsing V2-style artifact IDs.

minimal_pipeline

The source MinimalPipeline

artifact_loader

ArtifactLoader for loading actual artifact objects

target_sub_index

Filter artifacts by substep_index

target_model_name

Filter artifacts by custom_name

get_artifact(step_index: int, fold_id: int | None = None) → Any | None

Get a single artifact for a step.

Parameters:
  • step_index – 1-based step index

  • fold_id – Optional fold ID for fold-specific artifacts

Returns:

Artifact object or None if not found

get_artifacts_for_step(step_index: int, branch_path: List[int] | None = None, branch_id: int | None = None, source_index: int | None = None, substep_index: int | None = None) → List[Tuple[str, Any]]

Get all artifacts for a step (V3).

Filters artifacts by branch using the branch_path from ArtifactRecord. This is critical when reloading pipelines that combine multiple sources with branching: the execution trace stores the artifacts of all branch substeps together, and only their artifact records distinguish them.

Returns tuples of (operator_name, artifact_object) where operator_name is derived from the object class and substep_index (e.g., “MinMaxScaler_1”). This allows transformer controllers to look up artifacts by name.

Parameters:
  • step_index – 1-based step index

  • branch_path – Optional branch path filter (e.g., [0] for branch 0)

  • branch_id – Optional branch ID filter (used when branch_path not available)

  • source_index – Optional source/dataset index filter for multi-source

  • substep_index – Optional substep index filter for branch substeps

Returns:

List of (operator_name, artifact_object) tuples
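The naming and branch filtering described above can be sketched over a list of records. `RecordSketch` is a hypothetical stand-in for the relevant fields of an ArtifactRecord; exact equality on `branch_path` and omitting the suffix when `substep_index` is None are assumptions, while the `<ClassName>_<substep_index>` form follows the “MinMaxScaler_1” example.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional, Tuple


@dataclass
class RecordSketch:
    """Hypothetical subset of an artifact record's metadata."""
    obj: Any
    branch_path: List[int] = field(default_factory=list)
    substep_index: Optional[int] = None


def named_artifacts(records: List[RecordSketch],
                    branch_path: Optional[List[int]] = None) -> List[Tuple[str, Any]]:
    """Filter records by branch and name them '<ClassName>_<substep_index>'."""
    result = []
    for record in records:
        # Assumption: a record matches only if its branch_path equals the filter.
        if branch_path is not None and record.branch_path != branch_path:
            continue
        name = type(record.obj).__name__
        if record.substep_index is not None:
            name = f"{name}_{record.substep_index}"
        result.append((name, record.obj))
    return result
```

Deriving the name from the object’s class lets a transformer controller match artifacts by operator name even when several substeps share one step index.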

get_fold_artifacts(step_index: int, branch_path: List[int] | None = None) → List[Tuple[int, Any]]

Get all fold-specific artifacts for a step.

Filters by target_sub_index when it is set (for subpipelines containing multiple models). In that case the method scans all artifact_ids rather than fold_artifact_ids, because fold_artifact_ids stores only the last model’s artifacts when a subpipeline contains several models.

Parameters:
  • step_index – 1-based step index

  • branch_path – Optional branch path filter

Returns:

List of (fold_id, artifact_object) tuples, sorted by fold_id

get_fold_weights() → Dict[int, float]

Get fold weights for CV ensemble averaging.

Returns:

Dictionary mapping fold_id to weight

has_artifacts_for_step(step_index: int) → bool

Check if artifacts exist for a step.

Parameters:

step_index – 1-based step index

Returns:

True if artifacts are available for this step

class nirs4all.pipeline.MinimalPipeline(trace_id: str = '', pipeline_uid: str = '', steps: List[MinimalPipelineStep] = <factory>, artifact_map: Dict[int, StepArtifacts] = <factory>, model_step_index: int | None = None, fold_weights: Dict[int, float] | None = None, preprocessing_chain: str = '', metadata: Dict[str, Any] = <factory>)

Bases: object

Minimal pipeline extracted from an execution trace.

Contains only the steps needed to replay a prediction, with artifact mappings for each step. Used by MinimalPredictor for efficient prediction.

trace_id

ID of the source execution trace

Type:

str

pipeline_uid

UID of the parent pipeline

Type:

str

steps

Ordered list of minimal steps to execute

Type:

List[nirs4all.pipeline.trace.extractor.MinimalPipelineStep]

artifact_map

Mapping of step_index to the StepArtifacts recorded for that step

Type:

Dict[int, nirs4all.pipeline.trace.execution_trace.StepArtifacts]

model_step_index

Index of the model step

Type:

int | None

fold_weights

Per-fold weights for CV ensemble

Type:

Dict[int, float] | None

preprocessing_chain

Summary of preprocessing steps

Type:

str

metadata

Additional metadata from trace

Type:

Dict[str, Any]

artifact_map: Dict[int, StepArtifacts]
fold_weights: Dict[int, float] | None = None
get_all_chain_paths() Dict[str, str][source]

Get all artifacts indexed by chain path.

Returns:

Dict mapping chain_path to artifact_id

get_artifact_by_chain(chain_path: str) str | None[source]

Get artifact ID by V3 chain path across all steps.

Parameters:

chain_path – Operator chain path

Returns:

Artifact ID or None if not found

get_artifact_ids() List[str][source]

Get all artifact IDs in the minimal pipeline.

Returns:

List of all artifact IDs across all steps

get_artifacts_for_step(step_index: int) StepArtifacts | None[source]

Get artifacts for a specific step.

Parameters:

step_index – 1-based step index

Returns:

StepArtifacts or None if not found

get_step(step_index: int) MinimalPipelineStep | None[source]

Get a step by its index.

Parameters:

step_index – 1-based step index

Returns:

MinimalPipelineStep or None if not found

get_step_count() int[source]

Get the number of steps in the minimal pipeline.

Returns:

Number of steps

get_step_indices() List[int][source]

Get all step indices in execution order.

Returns:

List of step indices

has_step(step_index: int) bool[source]

Check if a step is included in the minimal pipeline.

Parameters:

step_index – 1-based step index

Returns:

True if step is included

metadata: Dict[str, Any]
model_step_index: int | None = None
pipeline_uid: str = ''
preprocessing_chain: str = ''
steps: List[MinimalPipelineStep]
trace_id: str = ''
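To show how the lookup methods above relate to each other, here is a simplified, hypothetical pair of dataclasses (SketchStep, SketchPipeline) that keep only step indices and chain-path mappings. It mirrors the documented method names but is not the nirs4all implementation, and the chain-path strings are invented.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class SketchStep:  # hypothetical, simplified stand-in for MinimalPipelineStep
    step_index: int
    chain_to_artifact: Dict[str, str] = field(default_factory=dict)


@dataclass
class SketchPipeline:  # hypothetical, simplified stand-in for MinimalPipeline
    steps: List[SketchStep] = field(default_factory=list)

    def get_step_indices(self) -> List[int]:
        # Steps are stored in execution order, so this preserves that order.
        return [s.step_index for s in self.steps]

    def get_step(self, step_index: int) -> Optional[SketchStep]:
        return next((s for s in self.steps if s.step_index == step_index), None)

    def has_step(self, step_index: int) -> bool:
        return self.get_step(step_index) is not None

    def get_all_chain_paths(self) -> Dict[str, str]:
        # Merge the per-step chain_path -> artifact_id mappings.
        merged: Dict[str, str] = {}
        for s in self.steps:
            merged.update(s.chain_to_artifact)
        return merged

    def get_artifact_by_chain(self, chain_path: str) -> Optional[str]:
        return self.get_all_chain_paths().get(chain_path)


# Step 2 was pruned during extraction, so only steps 1 and 3 remain.
pipe = SketchPipeline([
    SketchStep(1, {"s1.MinMaxScaler": "art-001"}),
    SketchStep(3, {"s3.PLSRegression": "art-003"}),
])
print(pipe.get_step_indices(), pipe.has_step(2))  # [1, 3] False
print(pipe.get_artifact_by_chain("s3.PLSRegression"))  # art-003
```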
class nirs4all.pipeline.MinimalPipelineStep(step_index: int, step_config: Any = None, execution_mode: StepExecutionMode = StepExecutionMode.PREDICT, artifacts: StepArtifacts = <factory>, operator_type: str = '', operator_class: str = '', branch_path: List[int] = <factory>, branch_name: str = '', substep_index: int | None = None, depends_on: Set[int] = <factory>)[source]

Bases: object

A step in the minimal pipeline for prediction replay.

Contains the step configuration and metadata needed to replay the step during prediction, without encoding controller-specific logic.

step_index

1-based step index from original pipeline

Type:

int

step_config

The pipeline step configuration (dict or object)

Type:

Any

execution_mode

How to execute this step (train/predict/skip)

Type:

nirs4all.pipeline.trace.execution_trace.StepExecutionMode

artifacts

Artifacts for this step (from trace)

Type:

nirs4all.pipeline.trace.execution_trace.StepArtifacts

operator_type

Type of operator (for logging/debugging)

Type:

str

operator_class

Class name of operator

Type:

str

branch_path

Branch path if in branch context

Type:

List[int]

branch_name

Human-readable branch name

Type:

str

depends_on

Indices of steps this step depends on

Type:

Set[int]

artifacts: StepArtifacts
branch_name: str = ''
branch_path: List[int]
depends_on: Set[int]
execution_mode: StepExecutionMode = 'predict'
get_artifact_by_chain(chain_path: str) str | None[source]

Get artifact ID by V3 chain path.

Parameters:

chain_path – Operator chain path

Returns:

Artifact ID or None if not found

get_artifact_ids() List[str][source]

Get all artifact IDs for this step.

Returns:

List of artifact IDs

get_artifacts_by_chain() Dict[str, str][source]

Get all artifacts indexed by chain path.

Returns:

Dict mapping chain_path to artifact_id

has_artifacts() bool[source]

Check if this step has associated artifacts.

Returns:

True if artifacts are available for this step

operator_class: str = ''
operator_type: str = ''
step_config: Any = None
step_index: int
substep_index: int | None = None
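The depends_on field makes it possible to work out which steps a prediction actually needs. The sketch below assumes a plain {step_index: set_of_dependencies} mapping and walks the dependencies of the model step. Whether TraceBasedExtractor uses exactly this traversal is an assumption; required_steps is a hypothetical helper.

```python
from typing import Dict, Set


def required_steps(depends_on: Dict[int, Set[int]], model_step: int) -> Set[int]:
    """Collect the model step plus every step it transitively depends on."""
    needed: Set[int] = set()
    stack = [model_step]
    while stack:
        idx = stack.pop()
        if idx in needed:
            continue  # already visited
        needed.add(idx)
        stack.extend(depends_on.get(idx, set()))
    return needed


# Step 4 (model) depends on 3, which depends on 1; step 2 is an unused branch.
deps = {1: set(), 2: {1}, 3: {1}, 4: {3}}
print(sorted(required_steps(deps, 4)))  # [1, 3, 4]
```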
class nirs4all.pipeline.MinimalPredictor(artifact_loader: Any, run_dir: str | Path, saver: Any = None, manifest_manager: Any = None, verbose: int = 0)[source]

Bases: object

Execute minimal pipeline for prediction.

This class takes a MinimalPipeline (extracted from an ExecutionTrace) and executes only the required steps using existing controllers with artifact injection.

The MinimalPredictor achieves the Phase 5 goal of “execute only needed steps” by:

  1. Using the minimal pipeline’s step list (not the full original pipeline)

  2. Injecting pre-loaded artifacts via ArtifactProvider

  3. Running controllers in predict mode
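The three points above can be illustrated with a toy replay loop. It assumes each step is a (step_index, function) pair and that artifacts are plain values keyed by step index; replay, scale and linear are hypothetical helpers, not nirs4all controllers. Nothing is fitted during replay.

```python
from typing import Any, Callable, Dict, List, Tuple

# A step applies a pre-fitted artifact to the data; it never fits anything.
StepFn = Callable[[Any, List[float]], List[float]]


def replay(steps: List[Tuple[int, StepFn]], artifacts: Dict[int, Any], X: List[float]) -> List[float]:
    """Run only the listed steps, in order, injecting each step's stored artifact."""
    for step_index, apply in steps:
        X = apply(artifacts[step_index], X)
    return X


def scale(bounds: Tuple[float, float], X: List[float]) -> List[float]:
    lo, hi = bounds
    return [(x - lo) / (hi - lo) for x in X]


def linear(coef: Tuple[float, float], X: List[float]) -> List[float]:
    w, b = coef
    return [w * x + b for x in X]


# Hypothetical artifacts keyed by step index; step 2 is absent from the minimal pipeline.
artifacts = {1: (0.0, 10.0), 3: (2.0, 1.0)}
print(replay([(1, scale), (3, linear)], artifacts, [0.0, 5.0, 10.0]))  # [1.0, 2.0, 3.0]
```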

artifact_loader

ArtifactLoader for loading artifacts

run_dir

Path to run directory

saver

Optional SimulationSaver for outputs

manifest_manager

Optional ManifestManager

verbose

Verbosity level

Example

>>> predictor = MinimalPredictor(artifact_loader, run_dir)
>>> y_pred, predictions = predictor.predict(minimal_pipeline, dataset)
predict(minimal_pipeline: MinimalPipeline, dataset: SpectroDataset, target_model: Dict[str, Any] | None = None) Tuple[ndarray, Predictions][source]

Execute minimal pipeline and return predictions.

Runs only the steps in the minimal pipeline, using pre-loaded artifacts from the execution trace.

Parameters:
  • minimal_pipeline – MinimalPipeline to execute

  • dataset – Dataset to predict on

  • target_model – Optional target model metadata for filtering

Returns:

Tuple of (y_pred array, Predictions object)

predict_with_fold_ensemble(minimal_pipeline: MinimalPipeline, dataset: SpectroDataset, fold_strategy: str = 'weighted_average') Tuple[ndarray, Predictions][source]

Execute minimal pipeline with fold ensemble averaging.

For cross-validation models, runs prediction with each fold model and combines results according to fold_strategy.

Parameters:
  • minimal_pipeline – MinimalPipeline to execute

  • dataset – Dataset to predict on

  • fold_strategy – How to combine folds (“average”, “weighted_average”)

Returns:

Tuple of (y_pred array, Predictions object)
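The two documented fold strategies, "average" and "weighted_average", can be sketched as below. Normalizing by the sum of the weights, and falling back to a plain mean when no weights are given, are assumptions of this sketch; combine_folds is a hypothetical helper.

```python
from statistics import fmean
from typing import Dict, List, Optional


def combine_folds(
    preds: Dict[int, List[float]],
    fold_strategy: str = "weighted_average",
    fold_weights: Optional[Dict[int, float]] = None,
) -> List[float]:
    """Combine per-fold prediction vectors into one ensemble prediction."""
    if fold_strategy not in ("average", "weighted_average"):
        raise ValueError(f"unknown fold_strategy: {fold_strategy!r}")
    folds = sorted(preds)
    n_samples = len(preds[folds[0]])
    if fold_strategy == "average" or not fold_weights:
        # Plain mean (also the assumed fallback when no weights are available).
        return [fmean(preds[f][i] for f in folds) for i in range(n_samples)]
    total = sum(fold_weights[f] for f in folds)  # divide by the total so weights act as fractions
    return [sum(fold_weights[f] * preds[f][i] for f in folds) / total for i in range(n_samples)]


preds = {0: [1.0, 2.0], 1: [3.0, 4.0]}
print(combine_folds(preds, "average"))  # [2.0, 3.0]
print(combine_folds(preds, "weighted_average", {0: 0.25, 1: 0.75}))  # [2.5, 3.5]
```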

validate_minimal_pipeline(minimal_pipeline: MinimalPipeline) Tuple[bool, List[str]][source]

Validate that minimal pipeline can be executed.

Checks that:

  • All step configs are present

  • All required artifacts are loadable

  • The model step is included

Parameters:

minimal_pipeline – MinimalPipeline to validate

Returns:

Tuple of (is_valid, list of issues)
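A hypothetical validator that follows the three checks listed above and returns the same (is_valid, issues) shape. Steps are plain dicts here and loadability is simulated with a set of artifact IDs; the real method works on MinimalPipelineStep objects and the ArtifactLoader.

```python
from typing import Any, Dict, List, Optional, Set, Tuple


def validate_steps(
    steps: List[Dict[str, Any]], model_step_index: Optional[int], loadable: Set[str]
) -> Tuple[bool, List[str]]:
    """Collect every problem instead of stopping at the first one."""
    issues: List[str] = []
    for step in steps:
        idx = step["step_index"]
        if step.get("step_config") is None:
            issues.append(f"step {idx}: missing step config")
        for artifact_id in step.get("artifact_ids", []):
            if artifact_id not in loadable:
                issues.append(f"step {idx}: artifact {artifact_id} is not loadable")
    if model_step_index is None or all(s["step_index"] != model_step_index for s in steps):
        issues.append("model step is not included")
    return (not issues, issues)


steps = [
    {"step_index": 1, "step_config": {"class": "MinMaxScaler"}, "artifact_ids": ["a1"]},
    {"step_index": 3, "step_config": None, "artifact_ids": ["a3"]},
]
ok, issues = validate_steps(steps, model_step_index=3, loadable={"a1"})
print(ok)  # False
print(issues)
```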

class nirs4all.pipeline.PipelineConfigs(definition: Dict | List[Any] | str, name: str = '', description: str = 'No description provided', max_generation_count: int = 10000)[source]

Bases: object

Class to hold the configuration for a pipeline.

property expansion_count: int

Return the number of pipeline configurations generated from the template.

Returns:

Number of expanded configurations (1 if no generators were used)

static get_hash(steps) str[source]

Generate a hash for the pipeline configuration.

All step objects are expected to be fully JSON-serializable (no _runtime_instance), so no default=str fallback is needed when serializing them.
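One way to build such a hash, assuming the steps are JSON-serializable: serialize with sorted keys so that dictionary ordering does not change the result, then hash the text. The choice of SHA-256 and the 8-character truncation are assumptions, not necessarily what get_hash uses; config_hash is a hypothetical name.

```python
import hashlib
import json
from typing import Any


def config_hash(steps: Any, length: int = 8) -> str:
    """Hash a JSON-serializable step list; key order does not affect the result."""
    payload = json.dumps(steps, sort_keys=True)  # raises TypeError if something is not serializable
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:length]


a = [{"class": "PLSRegression", "params": {"n_components": 10, "scale": False}}]
b = [{"params": {"scale": False, "n_components": 10}, "class": "PLSRegression"}]
print(config_hash(a) == config_hash(b))  # True
```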

get_template_dict() Dict[source]

Get the original template as a dictionary.

Returns:

Original template dictionary (deep copy to prevent mutation)

get_template_yaml() str[source]

Serialize the original template to YAML format for storage.

Returns:

YAML string of the original template

classmethod value_of(obj, key)[source]

Recursively collect all values of a key in a (possibly nested) serialized object. Returns a single string with values joined by commas.

classmethod value_of_str(obj, key)[source]

Returns a single string of all values for the given key, joined by commas.
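A minimal sketch of the recursive collection described for value_of, assuming the serialized object is built from dicts and lists and that values are joined with a bare comma. value_of_sketch is a hypothetical name.

```python
from typing import Any, List


def value_of_sketch(obj: Any, key: str) -> str:
    """Collect every value stored under `key` anywhere in nested dicts/lists."""
    found: List[str] = []

    def walk(node: Any) -> None:
        if isinstance(node, dict):
            for k, v in node.items():
                if k == key:
                    found.append(str(v))
                walk(v)  # keep descending: the key may appear again deeper down
        elif isinstance(node, list):
            for item in node:
                walk(item)

    walk(obj)
    return ",".join(found)


steps = [
    {"class": "sklearn.preprocessing.MinMaxScaler"},
    {"model": {"class": "sklearn.cross_decomposition.PLSRegression", "params": {"n_components": 10}}},
]
print(value_of_sketch(steps, "class"))
# sklearn.preprocessing.MinMaxScaler,sklearn.cross_decomposition.PLSRegression
```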

class nirs4all.pipeline.PipelineLibrary(workspace_path: Path)[source]

Bases: object

Manages reusable pipeline templates in the workspace library.

Templates are stored in workspace/library/{category}/{template_name}/. Each template contains:

  • pipeline.json: The pipeline configuration

  • metadata.json: Description, tags, performance metrics, etc.

  • README.md: Human-readable documentation
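The directory layout above can be reproduced with pathlib and json. write_template is a hypothetical helper and the README contents are an assumption; only the folder structure and the three file names come from the description.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict


def write_template(workspace: Path, category: str, name: str,
                   pipeline: Any, metadata: Dict[str, Any]) -> Path:
    """Write the three template files under workspace/library/{category}/{name}/."""
    template_dir = workspace / "library" / category / name
    template_dir.mkdir(parents=True, exist_ok=True)
    (template_dir / "pipeline.json").write_text(json.dumps(pipeline, indent=2), encoding="utf-8")
    (template_dir / "metadata.json").write_text(json.dumps(metadata, indent=2), encoding="utf-8")
    readme = f"# {name}\n\n{metadata.get('description', '')}\n"
    (template_dir / "README.md").write_text(readme, encoding="utf-8")
    return template_dir


with tempfile.TemporaryDirectory() as tmp:
    path = write_template(Path(tmp), "preprocessing", "snv_sg",
                          [{"class": "SNV"}, {"class": "SavitzkyGolay"}],
                          {"description": "SNV then Savitzky-Golay", "tags": ["nir"]})
    files = sorted(p.name for p in path.iterdir())
    relative = path.relative_to(tmp).as_posix()
print(relative, files)  # library/preprocessing/snv_sg ['README.md', 'metadata.json', 'pipeline.json']
```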

copy_from_pipeline(pipeline_dir: Path, name: str, category: str = 'general', description: str = '', tags: List[str] | None = None, extract_metrics: bool = True) Path[source]

Copy a successful pipeline to the library as a template.

Parameters:
  • pipeline_dir – Path to pipeline directory (e.g., workspace/runs/…/0001_hash/)

  • name – Template name

  • category – Category for the template

  • description – Description of the template

  • tags – Tags for filtering

  • extract_metrics – Whether to extract metrics from manifest

Returns:

Path to saved template

delete_template(name: str, category: str | None = None) None[source]

Delete a template from the library.

Parameters:
  • name – Template name

  • category – Optional category to search in

Raises:

FileNotFoundError – If template not found

export_template(name: str, export_path: str | Path, category: str | None = None) Path[source]

Export a template to a standalone directory.

Parameters:
  • name – Template name

  • export_path – Destination directory

  • category – Optional category to search in

Returns:

Path to exported template

get_template_metadata(name: str, category: str | None = None) Dict[str, Any][source]

Get metadata for a template.

Parameters:
  • name – Template name

  • category – Optional category to search in

Returns:

Metadata dictionary

import_template(import_path: str | Path, category: str = 'general', overwrite: bool = False) Path[source]

Import a template from an external directory.

Parameters:
  • import_path – Path to template directory

  • category – Category to import into

  • overwrite – Whether to overwrite existing template

Returns:

Path to imported template in library

list_templates(category: str | None = None, tags: List[str] | None = None) List[Dict[str, Any]][source]

List all templates, optionally filtered by category and tags.

Parameters:
  • category – Optional category filter

  • tags – Optional list of tags to filter by (matches any)

Returns:

List of template metadata dictionaries
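The "matches any" tag rule can be sketched as a set intersection. The metadata keys category and tags are assumed field names for illustration, and filter_templates is hypothetical.

```python
from typing import Any, Dict, List, Optional


def filter_templates(templates: List[Dict[str, Any]], category: Optional[str] = None,
                     tags: Optional[List[str]] = None) -> List[Dict[str, Any]]:
    """Keep templates in `category` (if given) sharing at least one tag with `tags` (if given)."""
    selected = []
    for meta in templates:
        if category is not None and meta.get("category") != category:
            continue
        if tags and not set(tags) & set(meta.get("tags", [])):
            continue  # no tag in common
        selected.append(meta)
    return selected


templates = [
    {"name": "pls_base", "category": "modeling", "tags": ["regression", "pls"]},
    {"name": "cnn_1d", "category": "modeling", "tags": ["neural_network"]},
    {"name": "snv_sg", "category": "preprocessing", "tags": ["regression"]},
]
print([t["name"] for t in filter_templates(templates, tags=["pls", "neural_network"])])
# ['pls_base', 'cnn_1d']
print([t["name"] for t in filter_templates(templates, category="modeling", tags=["regression"])])
# ['pls_base']
```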

load_template(name: str, category: str | None = None) Dict[str, Any][source]

Load a pipeline template by name.

Parameters:
  • name – Template name

  • category – Optional category to search in (searches all if None)

Returns:

Pipeline configuration dictionary

Raises:

FileNotFoundError – If template not found

save_template(pipeline_config: Dict[str, Any], name: str, category: str = 'general', description: str = '', tags: List[str] | None = None, metrics: Dict[str, float] | None = None, notes: str = '', overwrite: bool = False) Path[source]

Save a pipeline configuration as a reusable template.

Parameters:
  • pipeline_config – Pipeline configuration dictionary

  • name – Template name (will be sanitized for filesystem)

  • category – Category/folder (e.g., “preprocessing”, “modeling”, “full_pipeline”)

  • description – Short description of the template

  • tags – List of tags for filtering (e.g., [“classification”, “neural_network”])

  • metrics – Performance metrics (e.g., {“accuracy”: 0.95, “f1”: 0.93})

  • notes – Additional notes or usage instructions

  • overwrite – Whether to overwrite existing template

Returns:

Path to saved template directory

Raises:
class nirs4all.pipeline.PipelineRunner(workspace_path: str | Path | None = None, verbose: int = 0, mode: str = 'train', save_artifacts: bool = True, save_charts: bool = True, enable_tab_reports: bool = True, continue_on_error: bool = False, show_spinner: bool = True, keep_datasets: bool = True, plots_visible: bool = False, random_state: int | None = None, log_file: bool = True, log_format: str = 'pretty', use_unicode: bool | None = None, use_colors: bool | None = None, show_progress_bar: bool = True, json_output: bool = False)[source]

Bases: object

Main pipeline execution interface.

Orchestrates pipeline execution on datasets, providing a simplified interface for training, prediction, and explanation workflows. Delegates actual execution to PipelineOrchestrator, Predictor, and Explainer.

workspace_path

Root workspace directory

Type:

Path

verbose

Verbosity level (0=quiet, 1=info, 2=debug, 3=trace)

Type:

int

mode

Execution mode (‘train’, ‘predict’, ‘explain’)

Type:

str

save_artifacts

Whether to save binary artifacts (models, transformers)

Type:

bool

save_charts

Whether to save charts and visual outputs

Type:

bool

enable_tab_reports

Whether to generate tabular reports

Type:

bool

continue_on_error

Whether to continue on step failures

Type:

bool

show_spinner

Whether to show progress spinners

Type:

bool

keep_datasets

Whether to keep raw/preprocessed data snapshots

Type:

bool

plots_visible

Whether to display plots interactively

Type:

bool

orchestrator

Underlying orchestrator for execution

Type:

PipelineOrchestrator

predictor

Handler for prediction mode

Type:

Predictor

explainer

Handler for explanation mode

Type:

Explainer

raw_data

Raw dataset snapshots (if keep_datasets=True)

Type:

Dict[str, np.ndarray]

pp_data

Preprocessed data snapshots

Type:

Dict[str, Dict[str, np.ndarray]]

Example

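>>> # Training workflow
>>> from sklearn.preprocessing import StandardScaler
>>> from sklearn.svm import SVC
>>> from nirs4all.pipeline import PipelineRunner
>>> runner = PipelineRunner(workspace_path="./workspace", verbose=1)
>>> pipeline = [{"preprocessing": StandardScaler()}, {"model": SVC()}]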
>>> X, y = load_data()
>>> predictions, dataset_preds = runner.run(pipeline, (X, y))
>>> # Prediction workflow
>>> runner = PipelineRunner(mode="predict")
>>> y_pred, preds = runner.predict(best_model, X_new)
>>> # Explanation workflow
>>> runner = PipelineRunner(mode="explain")
>>> shap_results, out_dir = runner.explain(best_model, X_test)
property current_run_dir: Path | None

Get current run directory.

Returns:

Path to current run directory, or None if not set

explain(prediction_obj: Dict[str, Any] | str, dataset: DatasetConfigs | SpectroDataset | ndarray | Tuple[ndarray, ...] | Dict | List[Dict] | str | List[str], dataset_name: str = 'explain_dataset', shap_params: Dict[str, Any] | None = None, verbose: int = 0, plots_visible: bool = True) Tuple[Dict[str, Any], str][source]

Generate SHAP explanations for a saved model.

Delegates to Explainer class for actual execution.

Parameters:
  • prediction_obj – Model identifier (dict with config_path or prediction ID)

  • dataset – Dataset to explain on

  • dataset_name – Name for the dataset

  • shap_params – SHAP configuration parameters

  • verbose – Verbosity level

  • plots_visible – Whether to display plots interactively

Returns:

Tuple of (shap_results_dict, output_directory_path)

export(source: Dict[str, Any] | str | Path, output_path: str | Path, format: str = 'n4a', include_metadata: bool = True, compress: bool = True) Path[source]

Export a trained pipeline to a standalone bundle.

Creates a self-contained prediction bundle that can be used for deployment, sharing, or archival without requiring the original workspace or a full nirs4all installation.

Supported formats:
  • ‘n4a’: Full bundle (ZIP archive with artifacts and metadata)

  • ‘n4a.py’: Portable Python script with embedded artifacts

Phase 6 Feature:

This method enables exporting trained pipelines as standalone bundles that can be loaded and used for prediction without the original workspace structure.
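The .n4a format is described here only as a ZIP archive with artifacts and metadata. The sketch below shows that general idea with the standard zipfile module, including how a compress flag could switch between deflated and stored entries. The internal layout (metadata.json, artifacts/*.pkl) is an assumption and is not the actual .n4a structure; write_bundle is hypothetical.

```python
import io
import json
import pickle
import zipfile
from typing import Any, Dict


def write_bundle(artifacts: Dict[str, Any], metadata: Dict[str, Any], compress: bool = True) -> bytes:
    """Pack metadata plus pickled artifacts into an in-memory ZIP archive."""
    compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=compression) as zf:
        zf.writestr("metadata.json", json.dumps(metadata))
        for name, obj in artifacts.items():
            zf.writestr(f"artifacts/{name}.pkl", pickle.dumps(obj))
    return buffer.getvalue()


bundle = write_bundle({"step1_scaler": {"min": 0.0, "max": 10.0}}, {"pipeline_uid": "0001_abc123"})
with zipfile.ZipFile(io.BytesIO(bundle)) as zf:
    names = sorted(zf.namelist())
    scaler = pickle.loads(zf.read("artifacts/step1_scaler.pkl"))
print(names)   # ['artifacts/step1_scaler.pkl', 'metadata.json']
print(scaler)  # {'min': 0.0, 'max': 10.0}
```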

Parameters:
  • source – Prediction source to export. Can be:

      - prediction dict: From a previous run’s Predictions object
      - folder path: Path to a pipeline directory
      - Run object: Best prediction from a Run

  • output_path – Path for the output bundle file

  • format – Bundle format (‘n4a’ or ‘n4a.py’)

  • include_metadata – Whether to include full metadata in bundle

  • compress – Whether to compress artifacts (for .n4a format)

Returns:

Path to the created bundle file

Raises:

Example

>>> runner = PipelineRunner()
>>> predictions, _ = runner.run(pipeline, dataset)
>>> best_pred = predictions.top(n=1)[0]
>>>
>>> # Export to .n4a bundle
>>> runner.export(best_pred, "exports/wheat_model.n4a")
>>>
>>> # Export to portable Python script
>>> runner.export(best_pred, "exports/wheat_model.n4a.py", format='n4a.py')
>>>
>>> # Later, predict from bundle
>>> y_pred, _ = runner.predict("exports/wheat_model.n4a", X_new)
export_best_for_dataset(dataset_name: str, mode: str = 'predictions') Path | None[source]

Export best results for a dataset to exports/ folder.

Parameters:
  • dataset_name – Name of the dataset to export

  • mode – Export mode (‘predictions’ or other)

Returns:

Path to exported file, or None if export failed

export_model(source: Dict[str, Any] | str | Path, output_path: str | Path, format: str | None = None, fold: int | None = None) Path[source]

Export only the model artifact from a trained pipeline.

Unlike export(), which creates a full bundle with all preprocessing artifacts and metadata, this method exports only the model binary. This is useful when you want a lightweight model file that can be loaded directly into other pipelines or used with external tools.

The output format is determined by the file extension or can be specified explicitly. The model can then be reloaded using:

  • A direct path in the pipeline config: {"model": "path/to/model.joblib"}

  • A prediction source: runner.predict("path/to/model.joblib", data)
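Extension-based format selection can be sketched as a lookup table. Pairing .pkl with 'pickle' and .h5 with 'keras_h5' is an inference from the parameter list, and .keras and .pt are left out because their format names are not given; resolve_format is a hypothetical helper.

```python
from pathlib import Path
from typing import Optional

# Only the extension/format pairs inferred from the docs; .keras and .pt are omitted
# because their format strings are not listed.
EXTENSION_TO_FORMAT = {".joblib": "joblib", ".pkl": "pickle", ".h5": "keras_h5"}


def resolve_format(output_path: str, fmt: Optional[str] = None) -> str:
    """Return the explicit format if given, else infer it from the file extension."""
    if fmt is not None:
        return fmt
    suffix = Path(output_path).suffix.lower()
    if suffix not in EXTENSION_TO_FORMAT:
        raise ValueError(f"cannot infer a format from extension {suffix!r}")
    return EXTENSION_TO_FORMAT[suffix]


print(resolve_format("exports/pls_model.joblib"))        # joblib
print(resolve_format("exports/pls_model.h5", "pickle"))  # pickle
```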

Parameters:
  • source – Prediction source to export from. Can be:

      - prediction dict: From a previous run’s Predictions object
      - folder path: Path to a pipeline directory
      - bundle path: Path to a .n4a bundle

  • output_path – Path for the output model file. Extension determines format: .joblib, .pkl, .h5, .keras, .pt

  • format – Optional explicit format (‘joblib’, ‘pickle’, ‘keras_h5’). If None, determined from output_path extension.

  • fold – Optional fold index to export. If None, exports fold 0 or the primary model artifact.

Returns:

Path to the created model file

Raises:

Example

>>> runner = PipelineRunner()
>>> predictions, _ = runner.run(pipeline, dataset)
>>> best_pred = predictions.top(n=1)[0]
>>>
>>> # Export just the model
>>> runner.export_model(best_pred, "exports/pls_model.joblib")
>>>
>>> # Later, use in new pipeline
>>> from sklearn.preprocessing import MinMaxScaler
>>> new_pipeline = [
...     MinMaxScaler(),
...     {"model": "exports/pls_model.joblib", "name": "pretrained"}
... ]
extract(source: Dict[str, Any] | str | Path) ExtractedPipeline[source]

Extract a trained pipeline for inspection or modification.

Loads a trained pipeline from a prediction source and returns an ExtractedPipeline object that can be inspected, modified, and then executed with runner.run().

Phase 7 Feature:

This method enables extracting and modifying trained pipelines without retraining from scratch.

Parameters:

source – Prediction source to extract. Can be:

  - prediction dict: From a previous run’s Predictions object
  - folder path: Path to a pipeline directory
  - Run object: Best prediction from a Run
  - artifact_id: Direct artifact reference
  - bundle: Exported prediction bundle (.n4a)

Returns:

ExtractedPipeline object with:

  • steps: List of pipeline steps (can be modified)

  • trace: Original execution trace (read-only)

  • artifact_provider: Provider for original artifacts

  • model_step_index: Index of the model step

  • preprocessing_chain: Summary of preprocessing

Return type:

ExtractedPipeline

Example

>>> runner = PipelineRunner()
>>> predictions, _ = runner.run(pipeline, dataset)
>>> best_pred = predictions.top(n=1)[0]
>>>
>>> # Extract for inspection
>>> extracted = runner.extract(best_pred)
>>> print(f"Steps: {len(extracted.steps)}")
>>> print(f"Preprocessing: {extracted.preprocessing_chain}")
>>>
>>> # Modify and run
>>> from sklearn.ensemble import RandomForestRegressor
>>> extracted.set_model(RandomForestRegressor())
>>> new_preds, _ = runner.run(extracted.steps, new_data)
property last_aggregate: str | None

Get aggregate column from the last executed dataset.

Returns the aggregation setting from the last dataset processed by run(). This can be used to create a PredictionAnalyzer with matching defaults.

Returns:

Aggregate column name (‘y’ for y-based aggregation, column name for metadata-based aggregation, or None if no aggregation was set).

Example

>>> runner = PipelineRunner()
>>> predictions, _ = runner.run(pipeline, DatasetConfigs(path, aggregate='sample_id'))
>>> # Create analyzer with same aggregate setting
>>> analyzer = PredictionAnalyzer(predictions, default_aggregate=runner.last_aggregate)
property last_aggregate_exclude_outliers: bool

Get aggregate exclude_outliers setting from the last executed dataset.

Returns:

True if T² outlier exclusion was enabled, False otherwise.

property last_aggregate_method: str | None

Get aggregate method from the last executed dataset.

Returns:

Aggregate method (‘mean’, ‘median’, ‘vote’) or None for default.
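The documented aggregation methods ('mean', 'median', 'vote') can be sketched as grouping predictions by a sample identifier, as with aggregate='sample_id'. This is an illustrative assumption of how aggregation behaves, it ignores the T² outlier-exclusion option, and aggregate is a hypothetical helper.

```python
from collections import Counter, defaultdict
from statistics import mean, median
from typing import Any, Dict, Hashable, List


def aggregate(sample_ids: List[Hashable], preds: List[Any], method: str = "mean") -> Dict[Hashable, Any]:
    """Collapse repeated measurements of the same sample into one prediction."""
    groups: Dict[Hashable, List[Any]] = defaultdict(list)
    for sample_id, pred in zip(sample_ids, preds):
        groups[sample_id].append(pred)
    if method == "mean":
        return {sid: mean(values) for sid, values in groups.items()}
    if method == "median":
        return {sid: median(values) for sid, values in groups.items()}
    if method == "vote":  # majority class, for classification
        return {sid: Counter(values).most_common(1)[0][0] for sid, values in groups.items()}
    raise ValueError(f"unknown aggregate method: {method!r}")


ids = ["s1", "s1", "s2", "s2", "s2"]
print(aggregate(ids, [1.0, 3.0, 2.0, 2.0, 5.0], "mean"))    # {'s1': 2.0, 's2': 3.0}
print(aggregate(ids, [1.0, 3.0, 2.0, 2.0, 5.0], "median"))  # {'s1': 2.0, 's2': 2.0}
print(aggregate(ids, ["a", "a", "b", "c", "b"], "vote"))    # {'s1': 'a', 's2': 'b'}
```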

property library: PipelineLibrary

Get pipeline library for template management.

Returns:

PipelineLibrary instance for managing pipeline templates

next_op() int[source]

Get the next operation ID (for controller compatibility).

Returns:

Next operation counter value

predict(prediction_obj: Dict[str, Any] | str, dataset: DatasetConfigs | SpectroDataset | List[SpectroDataset] | ndarray | Tuple[ndarray, ...] | Dict | List[Dict] | str | List[str], dataset_name: str = 'prediction_dataset', all_predictions: bool = False, verbose: int = 0) Tuple[ndarray, Predictions] | Tuple[Dict[str, Any], Predictions][source]

Run prediction using a saved model on new dataset.

Delegates to Predictor class for actual execution.

Parameters:
  • prediction_obj – Model identifier (dict with config_path or prediction ID)

  • dataset – New dataset to predict on

  • dataset_name – Name for the dataset

  • all_predictions – If True, return all predictions; if False, return single best

  • verbose – Verbosity level

Returns:

If all_predictions=False: (y_pred, predictions). If all_predictions=True: (predictions_dict, predictions).

Return type:

Tuple[ndarray, Predictions] | Tuple[Dict[str, Any], Predictions]

retrain(source: Dict[str, Any] | str | Path, dataset: DatasetConfigs | SpectroDataset | ndarray | Tuple[ndarray, ...] | Dict | List[Dict] | str | List[str], mode: str = 'full', dataset_name: str = 'retrain_dataset', new_model: Any | None = None, epochs: int | None = None, step_modes: List[StepMode] | None = None, verbose: int = 0, **kwargs) Tuple[Predictions, Dict[str, Any]][source]

Retrain a pipeline on new data.

Enables retraining trained pipelines in one of several modes:

  • full: Train from scratch with the same pipeline structure

  • transfer: Use existing preprocessing artifacts, train a new model

  • finetune: Continue training the existing model with new data
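How the three modes, plus per-step overrides, might translate into a per-step plan is sketched below. plan_step_modes is hypothetical; in particular, treating preprocessing steps as 'predict' (reuse) in finetune mode and applying overrides last are assumptions based on the mode descriptions and the StepMode example.

```python
from typing import Dict, List, Optional


def plan_step_modes(step_indices: List[int], model_step_index: int, mode: str = "full",
                    overrides: Optional[Dict[int, str]] = None) -> Dict[int, str]:
    """Map each step index to 'train', 'predict' (reuse artifact) or 'finetune'."""
    if mode == "full":
        plan = {i: "train" for i in step_indices}
    elif mode == "transfer":
        plan = {i: ("train" if i == model_step_index else "predict") for i in step_indices}
    elif mode == "finetune":
        plan = {i: ("finetune" if i == model_step_index else "predict") for i in step_indices}
    else:
        raise ValueError(f"unknown retrain mode: {mode!r}")
    plan.update(overrides or {})  # per-step overrides win, like StepMode entries
    return plan


print(plan_step_modes([1, 2, 3], model_step_index=3, mode="transfer"))
# {1: 'predict', 2: 'predict', 3: 'train'}
print(plan_step_modes([1, 2, 3], model_step_index=3, mode="full", overrides={1: "predict"}))
# {1: 'predict', 2: 'train', 3: 'train'}
```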

Phase 7 Feature:

This method enables retraining pipelines without having to reconstruct the pipeline configuration manually. It uses the resolved prediction source (from Phase 3/4) to extract the pipeline structure and optionally reuse preprocessing artifacts.

Parameters:
  • source – Prediction source to retrain from. Can be:

      - prediction dict: From a previous run’s Predictions object
      - folder path: Path to a pipeline directory
      - Run object: Best prediction from a Run
      - artifact_id: Direct artifact reference
      - bundle: Exported prediction bundle (.n4a)

  • dataset – New dataset to train on. Supports same formats as run()

  • mode – Retrain mode:

      - ‘full’: Train everything from scratch (same pipeline structure)
      - ‘transfer’: Use existing preprocessing, train new model
      - ‘finetune’: Continue training existing model

  • dataset_name – Name for the dataset if array-based

  • new_model – Optional new model for transfer mode (replaces original)

  • epochs – Optional epochs for fine-tuning

  • step_modes – Optional per-step mode overrides for fine-grained control

  • verbose – Verbosity level

  • **kwargs – Additional parameters:

      - learning_rate: Learning rate for fine-tuning
      - freeze_layers: List of layers to freeze during fine-tuning

Returns:

Tuple of (run_predictions, datasets_predictions)

Raises:

Example

>>> runner = PipelineRunner()
>>> predictions, _ = runner.run(pipeline, dataset)
>>> best_pred = predictions.top(n=1)[0]
>>>
>>> # Full retrain on new data
>>> new_preds, _ = runner.retrain(best_pred, new_data, mode='full')
>>>
>>> # Transfer: use preprocessing from old model, train new one
>>> from xgboost import XGBRegressor
>>> new_preds, _ = runner.retrain(
...     best_pred, new_data, mode='transfer',
...     new_model=XGBRegressor()
... )
>>>
>>> # Finetune: continue training existing model
>>> new_preds, _ = runner.retrain(
...     best_pred, new_data, mode='finetune', epochs=10
... )
>>>
>>> # Fine-grained control: specify per-step modes
>>> from nirs4all.pipeline import StepMode
>>> step_modes = [
...     StepMode(step_index=1, mode='predict'),  # Use existing
...     StepMode(step_index=2, mode='train'),    # Retrain
... ]
>>> new_preds, _ = runner.retrain(
...     best_pred, new_data, mode='full', step_modes=step_modes
... )
run(pipeline: PipelineConfigs | List[Any] | Dict | str, dataset: DatasetConfigs | SpectroDataset | List[SpectroDataset] | ndarray | Tuple[ndarray, ...] | Dict | List[Dict] | str | List[str], pipeline_name: str = '', dataset_name: str = 'dataset', max_generation_count: int = 10000) Tuple[Predictions, Dict[str, Any]][source]

Execute pipeline on dataset(s).

Main entry point for training workflows. Executes one or more pipeline configurations on one or more datasets, tracking predictions and artifacts.

Parameters:
  • pipeline – Pipeline definition (PipelineConfigs, list of steps, dict, or path)

  • dataset – Dataset definition (see DatasetConfigs for supported formats)

  • pipeline_name – Optional pipeline name for identification

  • dataset_name – Name for array-based datasets

  • max_generation_count – Max pipeline combinations to generate

Returns:

Tuple of (run_predictions, datasets_predictions)

property runs_dir: Path

Get runs directory.

Returns:

Path to runs directory in workspace

class nirs4all.pipeline.PipelineWriter(pipeline_dir: Path, save_charts: bool = True)[source]

Bases: object

Writes files within a pipeline directory.

Focused responsibility: File I/O operations for a single pipeline.

get_path() Path[source]

Get the pipeline directory path.

Returns:

Path to pipeline directory

list_files() list[str][source]

List all files in the pipeline directory.

Returns:

List of filenames

save_file(filename: str, content: str, overwrite: bool = True, encoding: str = 'utf-8', warn_on_overwrite: bool = True) Path[source]

Save a text file to the pipeline directory.

Parameters:
  • filename – Name of file to create

  • content – Text content to write

  • overwrite – Whether to overwrite existing files

  • encoding – Text encoding (default: utf-8)

  • warn_on_overwrite – Whether to warn when overwriting

Returns:

Path to saved file

Raises:

FileExistsError – If file exists and overwrite=False

save_json(filename: str, data: Any, overwrite: bool = True, indent: int | None = 2) Path[source]

Save data as JSON file.

Parameters:
  • filename – Name of file (will add .json if missing)

  • data – Data to serialize as JSON

  • overwrite – Whether to overwrite existing files

  • indent – JSON indentation (None for compact)

Returns:

Path to saved file
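The overwrite rules of save_file and the .json suffix rule of save_json can be sketched with pathlib and the warnings module. These are hypothetical free functions rather than the PipelineWriter methods, and emitting the overwrite warning via warnings.warn is an assumption.

```python
import json
import tempfile
import warnings
from pathlib import Path
from typing import Any, Optional


def save_file(directory: Path, filename: str, content: str, overwrite: bool = True,
              encoding: str = "utf-8", warn_on_overwrite: bool = True) -> Path:
    path = directory / filename
    if path.exists():
        if not overwrite:
            raise FileExistsError(f"{path} already exists")
        if warn_on_overwrite:
            warnings.warn(f"overwriting {path}", stacklevel=2)
    path.write_text(content, encoding=encoding)
    return path


def save_json(directory: Path, filename: str, data: Any, overwrite: bool = True,
              indent: Optional[int] = 2) -> Path:
    if not filename.endswith(".json"):
        filename += ".json"  # add the suffix only when it is missing
    return save_file(directory, filename, json.dumps(data, indent=indent), overwrite=overwrite)


with tempfile.TemporaryDirectory() as tmp:
    out = save_json(Path(tmp), "manifest", {"pipeline_uid": "0001_abc123"})
    name = out.name
    try:
        save_file(Path(tmp), name, "{}", overwrite=False)
        refused = False
    except FileExistsError:
        refused = True
print(name, refused)  # manifest.json True
```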

save_output(name: str, data: bytes | str, extension: str = '.png') Path | None[source]

Save a human-readable output file (chart, report, etc.).

Parameters:
  • name – Output name (e.g., “2D_Chart”)

  • data – Binary or text data to save

  • extension – File extension (e.g., “.png”, “.csv”, “.txt”)

Returns:

Path to saved file, or None if save_charts=False

class nirs4all.pipeline.PredictionResolver(workspace_path: str | Path, runs_dir: str | Path | None = None)[source]

Bases: object

Resolves any prediction source to executable components.

This class provides a unified interface for resolving prediction sources to the components needed for prediction replay, regardless of the source type (dict, folder, run, artifact_id, bundle).

The resolver is designed to be controller-agnostic: it does not know about specific controller types, but provides artifacts and an execution trace that any controller can use.

workspace_path

Root workspace directory

runs_dir

Directory containing run outputs

Example

>>> resolver = PredictionResolver(workspace_path)
>>> resolved = resolver.resolve(best_prediction)
>>> # Execute using resolved components
>>> provider = resolved.artifact_provider
>>> artifacts = provider.get_artifacts_for_step(step_index=1)
resolve(source: Dict[str, Any] | str | Path | Any, verbose: int = 0) ResolvedPrediction[source]

Resolve any prediction source to executable components.

Detects the source type and delegates to the appropriate resolver.
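Source-type detection can be sketched with simple type and path checks. The Kind enum, the check order, and the rule that any other string is an artifact ID are assumptions for illustration; this sketch also does not recognize Run objects, which fall through to UNKNOWN.

```python
from enum import Enum
from pathlib import Path
from typing import Any


class Kind(Enum):  # hypothetical stand-in, not the real SourceType members
    PREDICTION = "prediction"
    BUNDLE = "bundle"
    FOLDER = "folder"
    ARTIFACT_ID = "artifact_id"
    UNKNOWN = "unknown"


def detect_source(source: Any) -> Kind:
    """Classify a prediction source with simple type and path checks (order matters)."""
    if isinstance(source, dict):
        return Kind.PREDICTION
    if isinstance(source, (str, Path)):
        path = Path(source)
        if path.name.endswith((".n4a", ".n4a.py")):
            return Kind.BUNDLE
        if path.is_dir():
            return Kind.FOLDER
        if isinstance(source, str):
            return Kind.ARTIFACT_ID  # any other string is treated as an artifact reference
    return Kind.UNKNOWN


print(detect_source({"config_path": "0001_abc123"}))  # Kind.PREDICTION
print(detect_source("exports/wheat_model.n4a"))       # Kind.BUNDLE
print(detect_source("."))                             # Kind.FOLDER
```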

Parameters:
  • source – Prediction source (dict, folder path, Run, artifact_id, bundle)

  • verbose – Verbosity level for logging

Returns:

ResolvedPrediction with all components for replay

Raises:
class nirs4all.pipeline.Predictor(runner: PipelineRunner, use_minimal_pipeline: bool = True)[source]

Bases: object

Handles prediction using trained pipelines.

This class manages the prediction workflow: loading saved models, replaying pipeline configurations, and generating predictions on new data.

Phase 5 Enhancement:

When use_minimal_pipeline=True (default), the predictor will:

  1. Check if an execution trace is available for the prediction

  2. Extract the minimal pipeline (only required steps) from the trace

  3. Execute only those steps, significantly reducing prediction time

This is especially beneficial for complex pipelines with multiple preprocessing options, branches, or steps that aren’t needed for the specific model being predicted.

runner

Parent PipelineRunner instance

saver

File saver for managing outputs

manifest_manager

Manager for pipeline manifests

pipeline_uid

Unique identifier for the pipeline

artifact_loader

Loader for trained model artifacts

config_path

Path to the pipeline configuration

target_model

Metadata for the target model

use_minimal_pipeline

Whether to use minimal pipeline execution (Phase 5)

predict(prediction_obj: Dict[str, Any] | str, dataset: DatasetConfigs | SpectroDataset | ndarray | Tuple[ndarray, ...] | Dict | List[Dict] | str | List[str], dataset_name: str = 'prediction_dataset', all_predictions: bool = False, verbose: int = 0) Tuple[ndarray, Predictions] | Tuple[Dict[str, Any], Predictions][source]

Run prediction using a saved model on new dataset.

Phase 5 Enhancement:

When use_minimal_pipeline=True and an execution trace is available, this method will use TraceBasedExtractor to extract and execute only the required steps, improving prediction speed.

Parameters:
  • prediction_obj – Model identifier (dict with config_path or prediction ID)

  • dataset – New dataset to predict on

  • dataset_name – Name for the dataset

  • all_predictions – If True, return all predictions; if False, return single best

  • verbose – Verbosity level

Returns:

If all_predictions=False: (y_pred, predictions). If all_predictions=True: (predictions_dict, predictions).

Return type:

Tuple[ndarray, Predictions] | Tuple[Dict[str, Any], Predictions]

Example

>>> predictor = Predictor(runner)
>>> y_pred, preds = predictor.predict(
...     {"config_path": "0001_abc123"},
...     X_new
... )
class nirs4all.pipeline.ResolvedPrediction(source_type: SourceType = SourceType.UNKNOWN, minimal_pipeline: List[Any] = <factory>, artifact_provider: ArtifactProvider | None = None, trace: ExecutionTrace | None = None, fold_strategy: FoldStrategy = FoldStrategy.WEIGHTED_AVERAGE, fold_weights: Dict[int, float] = <factory>, model_step_index: int | None = None, target_model: Dict[str, Any] = <factory>, pipeline_uid: str = '', run_dir: Path | None = None, manifest: Dict[str, Any] = <factory>)[source]

Bases: object

Normalized prediction source ready for execution.

Contains all components needed to replay a prediction:

  • minimal_pipeline: Subset of steps needed for this prediction

  • artifact_provider: Provider for artifacts by step index

  • trace: Execution trace for deterministic replay

  • fold_strategy: How to combine fold predictions (for CV)

  • fold_weights: Per-fold weights (for weighted average)

source_type

Type of the original source

Type:

nirs4all.pipeline.resolver.SourceType

minimal_pipeline

List of pipeline steps needed for replay

Type:

List[Any]

artifact_provider

Provider for step artifacts

Type:

nirs4all.pipeline.config.context.ArtifactProvider | None

trace

ExecutionTrace if available

Type:

nirs4all.pipeline.trace.execution_trace.ExecutionTrace | None

fold_strategy

Strategy for combining folds

Type:

nirs4all.pipeline.resolver.FoldStrategy

fold_weights

Per-fold weights for weighted averaging

Type:

Dict[int, float]

model_step_index

Index of the model step

Type:

int | None

target_model

Target model metadata for filtering

Type:

Dict[str, Any]

pipeline_uid

Pipeline unique identifier

Type:

str

run_dir

Path to run directory

Type:

pathlib.Path | None

manifest

Full manifest dictionary (for metadata access)

Type:

Dict[str, Any]

artifact_provider: ArtifactProvider | None = None
fold_strategy: FoldStrategy = 'weighted_average'
fold_weights: Dict[int, float]
get_preprocessing_chain() str[source]

Get the preprocessing chain summary.

Returns:

Preprocessing chain string (e.g., “SNV>SG>MinMax”), or an empty string if no chain is recorded

has_fold_artifacts() bool[source]

Check if fold-specific artifacts are available.

Returns:

True if this is a CV ensemble with multiple folds

has_trace() bool[source]

Check if execution trace is available.

Returns:

True if trace is available for deterministic replay

manifest: Dict[str, Any]
minimal_pipeline: List[Any]
model_step_index: int | None = None
pipeline_uid: str = ''
run_dir: Path | None = None
source_type: SourceType = 'unknown'
target_model: Dict[str, Any]
trace: ExecutionTrace | None = None
class nirs4all.pipeline.RetrainArtifactProvider(base_provider: ArtifactProvider, retrain_config: RetrainConfig, trace: ExecutionTrace | None = None)[source]

Bases: ArtifactProvider

Artifact provider for retraining that respects step modes.

Provides artifacts only for steps that should reuse existing artifacts (i.e., mode=‘predict’) and returns None for steps that should train.

base_provider

Underlying artifact provider

retrain_config

Configuration determining which steps use artifacts

trace

Execution trace for step type detection
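
The filtering behaviour described above can be sketched as a thin wrapper around another provider. DictArtifactProvider and ModeFilteringProvider are hypothetical stand-ins: the real class takes a RetrainConfig and an optional trace, whereas here a plain should_train callable plays the role of the config.

```python
from typing import Any, Callable, Dict, Optional


class DictArtifactProvider:
    """Hypothetical base provider: one artifact per 1-based step index."""

    def __init__(self, artifacts: Dict[int, Any]) -> None:
        self._artifacts = artifacts

    def get_artifact(self, step_index: int, fold_id: Optional[int] = None) -> Optional[Any]:
        return self._artifacts.get(step_index)


class ModeFilteringProvider:
    """Hypothetical wrapper that hides artifacts for steps that must retrain."""

    def __init__(self, base: DictArtifactProvider, should_train: Callable[[int], bool]) -> None:
        self.base = base
        self.should_train = should_train

    def get_artifact(self, step_index: int, fold_id: Optional[int] = None) -> Optional[Any]:
        if self.should_train(step_index):
            # The step will be refit, so no stored artifact is injected
            return None
        return self.base.get_artifact(step_index, fold_id)


# Transfer-style setup: reuse preprocessing (steps 1-2), retrain the model (step 3)
provider = ModeFilteringProvider(
    DictArtifactProvider({1: "fitted SNV", 2: "fitted SG", 3: "fitted PLS"}),
    should_train=lambda step_index: step_index == 3,
)
```

Wrapping rather than subclassing keeps the base provider unaware of retraining; the wrapper only decides whether to forward a request.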

get_artifact(step_index: int, fold_id: int | None = None) Any | None[source]

Get a single artifact for a step if applicable.

Parameters:
  • step_index – 1-based step index

  • fold_id – Optional fold ID for fold-specific artifacts

Returns:

Artifact object or None if step should train

get_artifacts_for_step(step_index: int, branch_path: List[int] | None = None) List[Tuple[str, Any]][source]

Get all artifacts for a step if applicable.

Parameters:
  • step_index – 1-based step index

  • branch_path – Optional branch path filter

Returns:

List of (artifact_id, artifact_object) tuples, or an empty list if the step should train

get_fold_artifacts(step_index: int, branch_path: List[int] | None = None) List[Tuple[int, Any]][source]

Get all fold-specific artifacts for a step if applicable.

Parameters:
  • step_index – 1-based step index

  • branch_path – Optional branch path filter

Returns:

List of (fold_id, artifact_object) tuples, or an empty list if the step should train

has_artifacts_for_step(step_index: int) bool[source]

Check if artifacts should be used for this step.

Parameters:

step_index – 1-based step index

Returns:

True if artifacts are available and should be used

class nirs4all.pipeline.RetrainConfig(mode: RetrainMode = RetrainMode.FULL, step_modes: List[StepMode] = <factory>, new_model: Any | None = None, epochs: int | None = None, learning_rate: float | None = None, freeze_layers: List[str] | None = None, metadata: Dict[str, ~typing.Any]=<factory>)[source]

Bases: object

Configuration for retraining operation.

mode

Overall retrain mode (full, transfer, finetune)

Type:

nirs4all.pipeline.retrainer.RetrainMode

step_modes

Per-step mode overrides (optional, for fine-grained control)

Type:

List[nirs4all.pipeline.retrainer.StepMode]

new_model

Optional new model to use instead of original (for transfer)

Type:

Any | None

epochs

Optional epochs for fine-tuning

Type:

int | None

learning_rate

Optional learning rate for fine-tuning

Type:

float | None

freeze_layers

Optional list of layers to freeze during fine-tuning

Type:

List[str] | None

metadata

Additional metadata for the retrain operation

Type:

Dict[str, Any]

epochs: int | None = None
freeze_layers: List[str] | None = None
get_step_mode(step_index: int) StepMode | None[source]

Get mode override for a specific step.

Parameters:

step_index – 1-based step index

Returns:

StepMode if override exists, None otherwise

learning_rate: float | None = None
metadata: Dict[str, Any]
mode: RetrainMode = 'full'
new_model: Any | None = None
should_train_step(step_index: int, is_model: bool = False) bool[source]

Determine if a step should train based on mode and overrides.

Parameters:
  • step_index – 1-based step index

  • is_model – Whether this is the model step

Returns:

True if step should train

step_modes: List[StepMode]
class nirs4all.pipeline.RetrainMode(value)[source]

Bases: str, Enum

Mode of retraining operation.

FULL

Train everything from scratch (same pipeline structure)

TRANSFER

Use existing preprocessing artifacts, train new model

FINETUNE

Continue training existing model with new data

FINETUNE = 'finetune'
FULL = 'full'
TRANSFER = 'transfer'
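
One plausible way to decide whether a step trains, combining the overall retrain mode with per-step overrides as RetrainConfig.should_train_step describes, is sketched below. The precedence (an explicit override wins) and the treatment of transfer and finetune (reuse preprocessing, train only the model step) are assumptions drawn from the mode descriptions, not confirmed library behaviour. Mode and the overrides dict are hypothetical simplifications of RetrainMode and StepMode.

```python
from enum import Enum
from typing import Dict, Optional


class Mode(str, Enum):
    """Hypothetical mirror of RetrainMode."""

    FULL = "full"
    TRANSFER = "transfer"
    FINETUNE = "finetune"


def should_train_step(
    mode: Mode,
    step_index: int,
    is_model: bool = False,
    overrides: Optional[Dict[int, str]] = None,
) -> bool:
    """Decide whether a 1-based step trains under `mode`, honoring per-step overrides."""
    override = (overrides or {}).get(step_index)
    if override is not None:
        # Explicit per-step mode ('train', 'predict', 'skip') takes precedence
        return override == "train"
    if mode is Mode.FULL:
        return True  # everything is refit from scratch
    # TRANSFER and FINETUNE: reuse preprocessing artifacts, (re)train the model only
    return is_model
```

Because the enum mixes in str, Mode("finetune") resolves to Mode.FINETUNE, so string arguments such as mode='finetune' can be normalized with a single call.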
class nirs4all.pipeline.Retrainer(runner: PipelineRunner)[source]

Bases: object

Handles retraining pipelines with various modes.

This class manages the retrain workflow: loading saved pipelines, determining which steps to retrain vs. reuse, and executing the modified pipeline on new data.

Phase 7 Implementation:

The Retrainer supports three modes:
  • full: Train from scratch with the same pipeline structure

  • transfer: Reuse existing preprocessing, train a new model

  • finetune: Continue training the existing model

runner

Parent PipelineRunner instance

resolver

Prediction resolver for loading sources

extract(source: Dict[str, Any] | str | Path | Any, verbose: int = 0) ExtractedPipeline[source]

Extract a pipeline for inspection or modification.

Returns an ExtractedPipeline object that can be inspected, modified, and then executed with runner.run().

Parameters:
  • source – Prediction source (dict, folder, Run, artifact_id, bundle)

  • verbose – Verbosity level

Returns:

ExtractedPipeline for inspection/modification

Example

>>> from sklearn.ensemble import RandomForestRegressor
>>> extracted = retrainer.extract(best_pred)
>>> print(extracted.steps)
>>> extracted.steps[-1] = {"model": RandomForestRegressor()}
>>> preds, _ = runner.run(extracted.steps, new_data)
retrain(source: Dict[str, Any] | str | Path | Any, dataset: DatasetConfigs | SpectroDataset | ndarray | Tuple[ndarray, ...] | Dict | List[Dict] | str | List[str], mode: str | RetrainMode = 'full', dataset_name: str = 'retrain_dataset', new_model: Any | None = None, epochs: int | None = None, step_modes: List[StepMode] | None = None, verbose: int = 0, **kwargs) Tuple[Predictions, Dict[str, Any]][source]

Retrain a pipeline on new data.

Parameters:
  • source – Prediction source (dict, folder, Run, artifact_id, bundle)

  • dataset – New dataset to train on

  • mode – Retrain mode (‘full’, ‘transfer’, ‘finetune’)

  • dataset_name – Name for the dataset

  • new_model – Optional new model for transfer mode

  • epochs – Optional epochs for fine-tuning

  • step_modes – Optional per-step mode overrides

  • verbose – Verbosity level

  • **kwargs – Additional parameters (learning_rate, freeze_layers, etc.)

Returns:

Tuple of (predictions, dataset_predictions_dict)

Example

>>> retrainer = Retrainer(runner)
>>>
>>> # Full retrain
>>> preds, _ = retrainer.retrain(best_pred, new_data, mode='full')
>>>
>>> # Transfer: use preprocessing, new model
>>> preds, _ = retrainer.retrain(best_pred, new_data, mode='transfer')
>>>
>>> # Finetune: continue training
>>> preds, _ = retrainer.retrain(best_pred, new_data, mode='finetune', epochs=10)
class nirs4all.pipeline.SourceType(value)[source]

Bases: str, Enum

Type of prediction source.

PREDICTION

Dictionary from Predictions object

FOLDER

Path to pipeline folder

RUN

Run object (best prediction from run)

ARTIFACT_ID

Direct artifact reference string

BUNDLE

Exported .n4a bundle file

TRACE_ID

Execution trace reference

MODEL_FILE

Direct model file (.joblib, .pkl, .h5, .pt, etc.)

UNKNOWN

Unrecognized source type

ARTIFACT_ID = 'artifact_id'
BUNDLE = 'bundle'
FOLDER = 'folder'
MODEL_FILE = 'model_file'
PREDICTION = 'prediction'
RUN = 'run'
TRACE_ID = 'trace_id'
UNKNOWN = 'unknown'
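
To make the categories concrete, here is a heuristic classifier that maps a raw source to one of these values. It is a hypothetical sketch, not the resolver's actual detection logic. In particular, treating strings containing "$" as artifact IDs is only inferred from the ID format in the TraceRecorder example ("0001$abc123:all"), and RUN and TRACE_ID detection is omitted.

```python
from enum import Enum
from pathlib import Path
from typing import Any


class Source(str, Enum):
    """Hypothetical subset of SourceType values."""

    PREDICTION = "prediction"
    FOLDER = "folder"
    ARTIFACT_ID = "artifact_id"
    BUNDLE = "bundle"
    MODEL_FILE = "model_file"
    UNKNOWN = "unknown"


MODEL_SUFFIXES = {".joblib", ".pkl", ".h5", ".pt"}


def guess_source_type(source: Any) -> Source:
    """Classify a prediction source with simple, illustrative heuristics."""
    if isinstance(source, dict):
        return Source.PREDICTION  # e.g. an entry taken from a Predictions object
    if isinstance(source, (str, Path)):
        path = Path(source)
        if path.suffix == ".n4a":
            return Source.BUNDLE
        if path.suffix in MODEL_SUFFIXES:
            return Source.MODEL_FILE
        if path.is_dir():
            return Source.FOLDER
        # Assumption: artifact IDs look like "0001$abc123:all"
        if isinstance(source, str) and "$" in source:
            return Source.ARTIFACT_ID
    return Source.UNKNOWN
```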
class nirs4all.pipeline.StepArtifacts(artifact_ids: ~typing.List[str] = <factory>, primary_artifact_id: str | None = None, fold_artifact_ids: ~typing.Dict[int, str] = <factory>, primary_artifacts: ~typing.Dict[str, str] = <factory>, by_branch: ~typing.Dict[~typing.Tuple[int, ...], ~typing.List[str]] = <factory>, by_source: ~typing.Dict[int, ~typing.List[str]] = <factory>, by_chain: ~typing.Dict[str, str] = <factory>, metadata: ~typing.Dict[str, ~typing.Any] = <factory>)[source]

Bases: object

Artifacts produced by a single step (V3).

Records all artifacts created during step execution, with V3 indexes for efficient lookup by chain path, branch, source, and fold.

artifact_ids

List of artifact IDs produced by this step

Type:

List[str]

primary_artifact_id

Main artifact (e.g., model) if applicable

Type:

str | None

fold_artifact_ids

Per-fold artifacts for CV models

Type:

Dict[int, str]

V3 lookup indexes (primary_artifacts, by_branch, by_source, by_chain):
primary_artifacts

Map of chain_path to artifact_id for shared artifacts

Type:

Dict[str, str]

by_branch

Artifacts indexed by branch path tuple

Type:

Dict[Tuple[int, …], List[str]]

by_source

Artifacts indexed by source index

Type:

Dict[int, List[str]]

by_chain

Artifacts indexed by chain path

Type:

Dict[str, str]

metadata

Additional artifact metadata (types, paths, etc.)

Type:

Dict[str, Any]

add_artifact(artifact_id: str, is_primary: bool = False, chain_path: str | None = None, branch_path: List[int] | None = None, source_index: int | None = None) None[source]

Add an artifact ID to this step’s artifacts (V3).

Parameters:
  • artifact_id – The artifact ID to add

  • is_primary – Whether this is the primary artifact

  • chain_path – V3 operator chain path

  • branch_path – Branch path for indexing

  • source_index – Source index for multi-source indexing

add_fold_artifact(fold_id: int, artifact_id: str, chain_path: str | None = None, branch_path: List[int] | None = None) None[source]

Add a fold-specific artifact.

Parameters:
  • fold_id – CV fold index

  • artifact_id – Artifact ID for this fold

  • chain_path – V3 operator chain path

  • branch_path – Branch path for indexing

artifact_ids: List[str]
by_branch: Dict[Tuple[int, ...], List[str]]
by_chain: Dict[str, str]
by_source: Dict[int, List[str]]
fold_artifact_ids: Dict[int, str]
classmethod from_dict(data: Dict[str, Any]) StepArtifacts[source]

Create StepArtifacts from dictionary.

Parameters:

data – Dictionary from manifest

Returns:

StepArtifacts instance

get_artifact_by_chain(chain_path: str) str | None[source]

Get artifact ID by exact chain path match.

Parameters:

chain_path – Operator chain path

Returns:

Artifact ID or None if not found

get_artifacts_for_branch(branch_path: List[int]) List[str][source]

Get artifact IDs matching a branch path.

Includes artifacts from:
  • The exact branch match

  • The empty branch (shared/pre-branch artifacts)

  • Parent branches (for nested branches)

Parameters:

branch_path – Target branch path

Returns:

List of matching artifact IDs
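
The matching rule above amounts to a prefix test on branch-path tuples: the empty path, every parent path, and the exact path are all prefixes of the target. A minimal sketch over a by_branch-style index follows. The function name is hypothetical, and the result ordering (shared first, then outer to inner branches) is an assumption.

```python
from typing import Dict, List, Tuple


def artifacts_for_branch(
    by_branch: Dict[Tuple[int, ...], List[str]],
    branch_path: List[int],
) -> List[str]:
    """Collect artifact IDs whose branch key is a prefix of `branch_path`."""
    target = tuple(branch_path)
    result: List[str] = []
    # Shorter keys first: shared (empty) path, then parent branches, then the exact match
    for key in sorted(by_branch, key=len):
        if target[: len(key)] == key:
            for artifact_id in by_branch[key]:
                if artifact_id not in result:
                    result.append(artifact_id)
    return result


index = {(): ["scaler"], (0,): ["snv_b0"], (0, 1): ["pls_b01"], (1,): ["sg_b1"]}
```

With this index, the nested branch [0, 1] picks up the shared scaler, its parent's snv_b0, and its own pls_b01, while sibling branch [1] sees only the scaler and sg_b1.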

get_artifacts_for_source(source_index: int) List[str][source]

Get artifact IDs for a specific source.

Parameters:

source_index – Source index to filter

Returns:

List of artifact IDs for that source

merge(other: StepArtifacts) None[source]

Merge another StepArtifacts into this one.

Used when multiple substeps share the same step_index and their artifacts need to be combined in the artifact_map.

Parameters:

other – StepArtifacts to merge into this one

metadata: Dict[str, Any]
primary_artifact_id: str | None = None
primary_artifacts: Dict[str, str]
to_dict() Dict[str, Any][source]

Convert to dictionary for YAML serialization.

Returns:

Dictionary suitable for manifest storage
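
Tuple keys such as those in by_branch are not valid JSON object keys and are awkward in a plain YAML manifest, so a serialized form needs some string encoding for them. The comma-joined encoding below is one possible choice, shown with hypothetical helper names; it is not necessarily the format to_dict() actually writes.

```python
from typing import Dict, List, Tuple


def encode_branch_index(by_branch: Dict[Tuple[int, ...], List[str]]) -> Dict[str, List[str]]:
    """Hypothetical encoding: (0, 1) -> "0,1" and () -> ""."""
    return {",".join(str(i) for i in key): list(ids) for key, ids in by_branch.items()}


def decode_branch_index(data: Dict[str, List[str]]) -> Dict[Tuple[int, ...], List[str]]:
    """Inverse of encode_branch_index; the empty string maps back to ()."""
    return {
        # "".split(",") gives [""], which the `if part` filter turns into ()
        tuple(int(part) for part in key.split(",") if part): list(ids)
        for key, ids in data.items()
    }
```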

class nirs4all.pipeline.StepMode(step_index: int, mode: str = 'train', artifact_id: str | None = None, kwargs: Dict[str, ~typing.Any]=<factory>)[source]

Bases: object

Mode override for a specific step during retraining.

Enables fine-grained control over which steps train vs. use existing artifacts.

step_index

1-based step index to apply this mode to

Type:

int

mode

How to execute this step (‘train’, ‘predict’, ‘skip’)

Type:

str

artifact_id

For ‘predict’ mode, specific artifact to use

Type:

str | None

kwargs

Additional step-specific parameters (e.g., epochs for finetune)

Type:

Dict[str, Any]

artifact_id: str | None = None
is_predict() bool[source]

Check if this step should use existing artifacts.

Returns:

True if step should use existing artifacts (predict mode)

is_train() bool[source]

Check if this step should train.

Returns:

True if step should be trained

kwargs: Dict[str, Any]
mode: str = 'train'
step_index: int
class nirs4all.pipeline.TargetResolver(workspace_path: Path)[source]

Bases: object

Resolves prediction targets for predict mode.

Focused responsibility: Finding and resolving prediction targets by ID.

Note: For the comprehensive Phase 3 resolver that handles multiple source types (prediction dict, folder, Run, artifact_id, bundle), see nirs4all.pipeline.resolver.PredictionResolver.

find_best_for_config(config_path: str) Dict[str, Any] | None[source]

Find the best prediction for a given config path.

Parameters:

config_path – Path to pipeline configuration

Returns:

Best prediction metadata, or None if not found

find_prediction_by_id(prediction_id: str) Dict[str, Any] | None[source]

Search for a prediction by ID in global predictions databases.

Uses direct ID filtering for O(1) lookup per file instead of O(N) iteration.

Parameters:

prediction_id – Unique prediction identifier

Returns:

Prediction metadata dict, or None if not found

resolve_target(prediction_obj: Dict[str, Any] | str) tuple[str, Dict[str, Any] | None][source]

Resolve prediction object to config path and model metadata.

Parameters:

prediction_obj – Either a dict with ‘config_path’ and optional model metadata, or a string holding a config path or prediction ID

Returns:

Tuple of (config_path, target_model_metadata)

Raises:

ValueError – If prediction ID not found or invalid input

class nirs4all.pipeline.TraceBasedExtractor(include_skipped: bool = False, preserve_order: bool = True)[source]

Bases: object

Extract minimal pipeline from execution trace.

The extractor analyzes an ExecutionTrace to determine which steps are needed for prediction replay and builds a MinimalPipeline with the correct artifact mappings.

The extractor is controller-agnostic: it uses trace metadata to identify steps without encoding knowledge of controller types.

include_skipped

Whether to include skipped steps in minimal pipeline

preserve_order

Whether to preserve original step order

Example

>>> extractor = TraceBasedExtractor()
>>> trace = manifest_manager.load_execution_trace(pipeline_uid, trace_id)
>>> minimal = extractor.extract(trace, full_pipeline_steps)
>>> print(f"Minimal pipeline has {minimal.get_step_count()} steps")
extract(trace: ExecutionTrace, full_pipeline: List[Any] | None = None, up_to_model: bool = True) MinimalPipeline[source]

Extract minimal pipeline from execution trace.

Analyzes the trace to determine which steps are needed for prediction and builds a MinimalPipeline with artifact mappings.

Parameters:
  • trace – ExecutionTrace to extract from

  • full_pipeline – Optional full pipeline steps (for step configs)

  • up_to_model – If True, only include steps up to model step

Returns:

MinimalPipeline with steps and artifact mappings
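
The core selection logic of extract(..., up_to_model=True) can be approximated as: order the recorded steps, stop after the model step, and drop skipped steps unless include_skipped is set. The TraceStep record and minimal_steps function below are hypothetical simplifications. The real extractor also builds artifact mappings and reads step configs from full_pipeline.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class TraceStep:
    """Hypothetical, simplified stand-in for one recorded execution step."""

    step_index: int
    operator_class: str
    is_model: bool = False
    skipped: bool = False
    artifact_ids: List[str] = field(default_factory=list)


def minimal_steps(
    steps: List[TraceStep],
    up_to_model: bool = True,
    include_skipped: bool = False,
) -> List[TraceStep]:
    """Select the steps needed to replay a prediction."""
    ordered = sorted(steps, key=lambda s: s.step_index)
    model_index = next((s.step_index for s in ordered if s.is_model), None)
    kept: List[TraceStep] = []
    for step in ordered:
        if up_to_model and model_index is not None and step.step_index > model_index:
            break  # nothing after the model is needed to produce predictions
        if step.skipped and not include_skipped:
            continue
        kept.append(step)
    return kept


trace_steps = [
    TraceStep(1, "SNV", artifact_ids=["a1"]),
    TraceStep(2, "Chart", skipped=True),
    TraceStep(3, "PLSRegression", is_model=True, artifact_ids=["a3"]),
    TraceStep(4, "Report"),
]
```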

extract_for_branch(trace: ExecutionTrace, branch_path: List[int], full_pipeline: List[Any] | None = None) MinimalPipeline[source]

Extract minimal pipeline for a specific branch.

Includes shared steps (before branching) plus branch-specific steps.

Parameters:
  • trace – ExecutionTrace to extract from

  • branch_path – Branch path to extract (e.g., [0] for first branch)

  • full_pipeline – Optional full pipeline steps

Returns:

MinimalPipeline with steps for the specified branch

extract_for_branch_name(trace: ExecutionTrace, branch_name: str, full_pipeline: List[Any] | None = None) MinimalPipeline[source]

Extract minimal pipeline for a specific branch by name.

More reliable than extract_for_branch for nested branches, where branch_id does not map directly to branch_path. Matching uses branch_name because it is unique and is stored in both the predictions and the trace.

Includes shared steps (before branching) plus branch-specific steps.

Parameters:
  • trace – ExecutionTrace to extract from

  • branch_name – Branch name to match (e.g., “branch_0_branch_0”)

  • full_pipeline – Optional full pipeline steps

Returns:

MinimalPipeline with steps for the specified branch

extract_for_step(trace: ExecutionTrace, target_step_index: int, full_pipeline: List[Any] | None = None) MinimalPipeline[source]

Extract minimal pipeline up to a specific step.

Useful for partial prediction or when targeting a specific model in a multi-model pipeline.

Parameters:
  • trace – ExecutionTrace to extract from

  • target_step_index – Target step index (inclusive)

  • full_pipeline – Optional full pipeline steps

Returns:

MinimalPipeline with steps up to target

get_required_artifact_ids(trace: ExecutionTrace, up_to_model: bool = True) List[str][source]

Get list of artifact IDs required for prediction.

Useful for pre-loading artifacts or validating artifact availability.

Parameters:
  • trace – ExecutionTrace to analyze

  • up_to_model – If True, only include artifacts up to model step

Returns:

List of artifact IDs needed for prediction

get_step_dependency_graph(trace: ExecutionTrace) Dict[int, Set[int]][source]

Build dependency graph from execution trace.

The dependency graph maps each step to the set of steps it depends on. This is inferred from the trace execution order and branch structure.

Parameters:

trace – ExecutionTrace to analyze

Returns:

Dictionary mapping step_index to set of dependency step indices
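
A simple version of that inference treats a step as depending on every earlier step whose branch path is a prefix of its own: shared steps feed all branches, and sibling branches stay independent. This is an assumed rule for illustration, using a hypothetical list of (step_index, branch_path) pairs with unique step indices, rather than the extractor's exact algorithm.

```python
from typing import Dict, List, Set, Tuple


def dependency_graph(steps: List[Tuple[int, Tuple[int, ...]]]) -> Dict[int, Set[int]]:
    """Map each step index to the earlier steps it depends on."""
    graph: Dict[int, Set[int]] = {}
    for position, (step_index, branch_path) in enumerate(steps):
        deps: Set[int] = set()
        for prev_index, prev_path in steps[:position]:
            # An earlier step feeds this one if it ran on the same or an enclosing branch
            if branch_path[: len(prev_path)] == prev_path:
                deps.add(prev_index)
        graph[step_index] = deps
    return graph


# Step 1 is shared; steps 2 and 4 run in branch 0, step 3 in branch 1
execution_order = [(1, ()), (2, (0,)), (3, (1,)), (4, (0,))]
```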

validate_trace_for_prediction(trace: ExecutionTrace) Tuple[bool, List[str]][source]

Validate that a trace has all information needed for prediction.

Checks that:
  • The model step is recorded

  • All steps up to the model have recorded artifacts (where applicable)

  • No critical information is missing

Parameters:

trace – ExecutionTrace to validate

Returns:

Tuple of (is_valid, list of issues)
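
The checks listed above can be expressed as a function returning the same (is_valid, issues) pair. Steps are plain dicts here, and the needs_artifact flag is a hypothetical way to encode the "(where applicable)" caveat. The library's own trace objects and rules may differ.

```python
from typing import Any, Dict, List, Tuple


def validate_steps(steps: List[Dict[str, Any]]) -> Tuple[bool, List[str]]:
    """Return (is_valid, issues) for a simplified list of recorded steps."""
    issues: List[str] = []
    model_steps = [s for s in steps if s.get("is_model")]
    if not model_steps:
        issues.append("no model step recorded")
    else:
        model_index = model_steps[0]["step_index"]
        for step in steps:
            # Only steps up to the model matter for prediction replay
            if (
                step["step_index"] <= model_index
                and step.get("needs_artifact", False)
                and not step.get("artifact_ids")
            ):
                issues.append(f"step {step['step_index']} has no recorded artifacts")
    return (not issues, issues)
```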

class nirs4all.pipeline.TraceRecorder(pipeline_uid: str = '', pipeline_id: str = '', metadata: Dict[str, Any] | None = None)[source]

Bases: object

Records execution traces during pipeline execution (V3).

Builds an ExecutionTrace by recording step starts, artifact creations, and step completions. Designed for use within the pipeline executor.

V3 improvements:
  • Maintains a chain stack for tracking the full operator chain

  • Maintains a branch stack for automatic branch path management

  • Tracks the source index for multi-source pipelines

  • Records branch substeps individually

trace

The ExecutionTrace being built

current_step

The step currently being executed

step_start_time

Time when current step started (for duration)

pipeline_id

Pipeline identifier for chain generation

Example

>>> recorder = TraceRecorder(pipeline_uid="0001_pls_abc123")
>>> recorder.start_step(step_index=1, operator_type="transform", operator_class="SNV")
>>> recorder.record_artifact(artifact_id="0001$abc123:all", chain_path="s1.SNV")
>>> recorder.end_step()
>>> recorder.enter_branch(0)
>>> recorder.start_step(step_index=3, operator_type="model", operator_class="PLS")
>>> recorder.record_artifact(artifact_id="0001$def456:0", chain_path="s1.SNV>s3.PLS[br=0]")
>>> recorder.end_step(is_model=True)
>>> recorder.exit_branch()
>>> trace = recorder.finalize(preprocessing_chain="SNV")
add_step_metadata(key: str, value: Any) None[source]

Add metadata to the current step.

Parameters:
  • key – Metadata key

  • value – Metadata value

build_chain_for_artifact(step_index: int, operator_class: str, source_index: int | None = None, fold_id: int | None = None, substep_index: int | None = None) OperatorChain[source]

Build an operator chain for an artifact.

Creates a chain based on current context plus the specified operator.

Parameters:
  • step_index – Step index of the operator

  • operator_class – Class name of the operator

  • source_index – Source index for multi-source

  • fold_id – Fold ID for CV models

  • substep_index – Substep index within step

Returns:

OperatorChain for the artifact

current_branch_path() List[int][source]

Get current branch path.

Returns:

Copy of current branch path

current_chain() OperatorChain[source]

Get current operator chain without modifying stack.

Returns:

Current OperatorChain

end_step(is_model: bool = False, fold_weights: Dict[int, float] | None = None, skip_trace: bool = False) None[source]

End the current step and add it to the trace.

Parameters:
  • is_model – Whether this is the model step

  • fold_weights – Per-fold weights for CV models

  • skip_trace – If True, don’t add this step to the trace

enter_branch(branch_id: int) List[int][source]

Enter a branch context.

Parameters:

branch_id – Branch index to enter

Returns:

New branch path after entering

exit_branch() List[int][source]

Exit current branch context.

Returns:

The exited branch path

Raises:

RuntimeError – If not in a branch context
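
The branch context behaves like a stack of branch indices. A minimal sketch follows, assuming exit_branch returns the path as it was just before leaving; the BranchStack class is hypothetical and not part of nirs4all.

```python
from typing import List


class BranchStack:
    """Hypothetical branch-path stack mirroring enter_branch/exit_branch."""

    def __init__(self) -> None:
        self._path: List[int] = []

    def enter(self, branch_id: int) -> List[int]:
        self._path.append(branch_id)
        return list(self._path)  # return a copy so callers cannot mutate the stack

    def exit(self) -> List[int]:
        if not self._path:
            raise RuntimeError("not in a branch context")
        exited = list(self._path)
        self._path.pop()
        return exited

    def current(self) -> List[int]:
        return list(self._path)
```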

finalize(preprocessing_chain: str | None = None, metadata: Dict[str, Any] | None = None) ExecutionTrace[source]

Finalize and return the completed trace.

Parameters:
  • preprocessing_chain – Summary string of preprocessing

  • metadata – Additional metadata to merge

Returns:

The completed ExecutionTrace

get_current_step_index() int | None[source]

Get the current step index.

Returns:

Current step index or None if no step active

has_model_step() bool[source]

Check if a model step has been recorded.

Returns:

True if model step index is set

in_branch() bool[source]

Check if currently in a branch context.

Returns:

True if in a branch

mark_step_skipped(step_index: int) None[source]

Record that a step was skipped.

Parameters:

step_index – Index of the skipped step

pop_chain() OperatorChain[source]

Pop and return the current chain.

Returns:

The popped OperatorChain

Raises:

RuntimeError – If trying to pop the root chain

push_chain(node: OperatorNode) OperatorChain[source]

Push new node onto the chain stack.

Creates a new chain with the node appended and pushes it.

Parameters:

node – OperatorNode to append

Returns:

The new extended chain
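
Chains are extended immutably: a push builds a new chain with one more node, and the root chain can never be popped. The sketch below reproduces that behaviour with hypothetical Node, Chain, and ChainStack classes. The path format (s<step>.<Class>, nodes joined by ">", a [br=...] suffix for branched nodes) is inferred from the TraceRecorder example, and the comma separator for nested branch paths is a guess.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class Node:
    step_index: int
    operator_class: str
    branch_path: Tuple[int, ...] = ()

    def to_path(self) -> str:
        text = f"s{self.step_index}.{self.operator_class}"
        if self.branch_path:
            text += "[br=" + ",".join(str(b) for b in self.branch_path) + "]"
        return text


@dataclass(frozen=True)
class Chain:
    nodes: Tuple[Node, ...] = ()

    def append(self, node: Node) -> "Chain":
        return Chain(self.nodes + (node,))  # new chain; the original is unchanged

    def to_path(self) -> str:
        return ">".join(node.to_path() for node in self.nodes)


class ChainStack:
    def __init__(self) -> None:
        self._stack: List[Chain] = [Chain()]  # root chain is always present

    def current(self) -> Chain:
        return self._stack[-1]

    def push(self, node: Node) -> Chain:
        chain = self.current().append(node)
        self._stack.append(chain)
        return chain

    def pop(self) -> Chain:
        if len(self._stack) == 1:
            raise RuntimeError("cannot pop the root chain")
        return self._stack.pop()
```

Keeping chains immutable means a popped chain can still be stored as an artifact's chain without being altered by later pushes.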

record_artifact(artifact_id: str, is_primary: bool = False, fold_id: int | None = None, chain_path: str | None = None, branch_path: List[int] | None = None, source_index: int | None = None, metadata: Dict[str, Any] | None = None) None[source]

Record an artifact created during the current step (V3).

Parameters:
  • artifact_id – The artifact ID

  • is_primary – Whether this is the primary artifact

  • fold_id – CV fold ID if fold-specific artifact

  • chain_path – V3 operator chain path

  • branch_path – Branch path for indexing

  • source_index – Source index for multi-source

  • metadata – Additional artifact metadata

record_input_shapes(input_shape: tuple | None = None, features_shape: List[tuple] | None = None) None[source]

Record input shapes for the current step.

Parameters:
  • input_shape – 2D layout shape (samples, features)

  • features_shape – List of 3D shapes per source (samples, processings, features)

record_output_shapes(output_shape: tuple | None = None, features_shape: List[tuple] | None = None) None[source]

Record output shapes for the current step.

Parameters:
  • output_shape – 2D layout shape (samples, features)

  • features_shape – List of 3D shapes per source (samples, processings, features)

reset_chain_to(chain: OperatorChain) None[source]

Reset chain stack to a specific chain.

Useful when entering a new branch context.

Parameters:

chain – Chain to reset to

start_branch_step(step_index: int, branch_count: int, operator_config: Dict[str, Any] | None = None) ExecutionStep[source]

Start recording a branch step.

Parameters:
  • step_index – Step index of the branch

  • branch_count – Number of branches

  • operator_config – Branch configuration

Returns:

The created ExecutionStep for the branch

start_branch_substep(parent_step_index: int, branch_id: int, operator_type: str, operator_class: str, substep_index: int = 0, operator_config: Dict[str, Any] | None = None, branch_name: str | None = None) ExecutionStep[source]

Start recording a substep within a branch.

Note: This method assumes enter_branch() has already been called for this branch, so current_branch_path() already includes the branch_id.

Parameters:
  • parent_step_index – Parent branch step index

  • branch_id – Branch index this substep belongs to (for metadata only)

  • operator_type – Type of operator

  • operator_class – Class name of operator

  • substep_index – Index within the branch’s substeps

  • operator_config – Operator configuration

  • branch_name – Human-readable branch name

Returns:

The created ExecutionStep

start_step(step_index: int, operator_type: str = '', operator_class: str = '', operator_config: Dict[str, Any] | None = None, execution_mode: StepExecutionMode = StepExecutionMode.TRAIN, branch_path: List[int] | None = None, branch_name: str = '', source_count: int = 1, produces_branches: bool = False, substep_index: int | None = None) ExecutionStep[source]

Start recording a new step (V3).

Parameters:
  • step_index – 1-based step index

  • operator_type – Type of operator (e.g., “transform”, “model”)

  • operator_class – Class name of operator

  • operator_config – Serialized operator configuration

  • execution_mode – Train/predict/skip mode

  • branch_path – Branch indices (uses current if None)

  • branch_name – Human-readable branch name

  • source_count – Number of X sources at this step

  • produces_branches – Whether this is a branch operator

  • substep_index – Index of this substep within its parent step, if any

Returns:

The created ExecutionStep

property trace_id: str

Get the trace ID.

Returns:

Trace ID string

class nirs4all.pipeline.WorkspaceExporter(workspace_path: Path)[source]

Bases: object

Exports best results to the workspace exports/ folder.

Focused responsibility: Export functionality for best pipelines.

export_best_for_dataset(dataset_name: str, runs_dir: Path, mode: str = 'predictions') Path | None[source]

Export best results for a dataset to exports/ folder.

Parameters:
  • dataset_name – Dataset name

  • runs_dir – Runs directory path

  • mode – Export mode: “predictions”, “template”, “trained”, or “full”

Returns:

Path to export directory, or None if no predictions found

export_best_prediction(predictions_file: Path, dataset_name: str, run_date: str = None, pipeline_id: str = None, custom_name: str | None = None) Path[source]

Export a predictions CSV to the best_predictions/ folder.

Parameters:
  • predictions_file – Path to predictions.csv

  • dataset_name – Dataset name

  • run_date – Run date (deprecated; kept for compatibility)

  • pipeline_id – Pipeline identifier

  • custom_name – Optional custom name for export

Returns:

Path to exported CSV
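
A minimal sketch of the copy step, assuming the CSV lands in a best_predictions/ subfolder of an exports directory under a name built from custom_name, or else from the dataset name and pipeline ID. The naming scheme and the export_prediction_csv helper are hypothetical; the real method's file naming may differ.

```python
import shutil
from pathlib import Path
from typing import Optional


def export_prediction_csv(
    predictions_file: Path,
    exports_dir: Path,
    dataset_name: str,
    pipeline_id: Optional[str] = None,
    custom_name: Optional[str] = None,
) -> Path:
    """Hypothetical helper: copy a predictions CSV into best_predictions/."""
    target_dir = exports_dir / "best_predictions"
    target_dir.mkdir(parents=True, exist_ok=True)
    # Prefer an explicit name, else combine dataset and pipeline identifiers
    stem = custom_name or "_".join(part for part in (dataset_name, pipeline_id) if part)
    target = target_dir / f"{stem}.csv"
    shutil.copy2(predictions_file, target)  # copies contents and file metadata
    return target
```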

export_pipeline_full(pipeline_dir: Path, dataset_name: str, run_date: str = None, custom_name: str | None = None) Path[source]

Export full pipeline results to a flat directory structure.

Parameters:
  • pipeline_dir – Path to pipeline (NNNN_hash/)

  • dataset_name – Dataset name

  • run_date – Run date in YYYYMMDD format (deprecated; kept for compatibility)

  • custom_name – Optional custom name for export

Returns:

Path to exported directory