nirs4all.pipeline.execution package


Module contents

Pipeline execution module for nirs4all.

class nirs4all.pipeline.execution.ArtifactMeta(name: str, content_hash: str, path: str, format: str, format_version: str = '', nirs4all_version: str = '', metadata: Dict[str, Any] = <factory>)

Bases: object

Metadata about a persisted artifact.

name

Artifact identifier (e.g., ‘model’, ‘scaler’)

Type:

str

content_hash

Content-based hash for deduplication

Type:

str

path

Relative path from artifacts directory

Type:

str

format

Artifact format (e.g., ‘joblib’, ‘h5’, ‘safetensors’)

Type:

str

format_version

Version of the library used (e.g., ‘sklearn==1.3.0’)

Type:

str

nirs4all_version

Version of nirs4all used (e.g., ‘0.4.1’)

Type:

str

metadata

Additional artifact metadata

Type:

Dict[str, Any]

content_hash: str
format: str
format_version: str = ''
metadata: Dict[str, Any]
name: str
nirs4all_version: str = ''
path: str
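The content_hash field is described as a content-based hash used for deduplication. The sketch below illustrates that idea with a hypothetical stand-in dataclass that mirrors the documented ArtifactMeta fields. It assumes SHA-256 over the serialized bytes and a made-up path naming scheme; neither the hashing algorithm nor the storage layout used by nirs4all is documented here, and register() is a hypothetical helper, not part of the library.

```python
import hashlib
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class ArtifactMetaSketch:
    """Hypothetical stand-in mirroring the ArtifactMeta fields documented above."""

    name: str
    content_hash: str
    path: str
    format: str
    format_version: str = ""
    nirs4all_version: str = ""
    metadata: Dict[str, Any] = field(default_factory=dict)


def content_hash(payload: bytes) -> str:
    # Assumption: SHA-256 over the serialized bytes; the real scheme may differ.
    return hashlib.sha256(payload).hexdigest()


def register(store: Dict[str, ArtifactMetaSketch], name: str, payload: bytes, fmt: str) -> ArtifactMetaSketch:
    """Return the existing entry when identical bytes were already stored (deduplication)."""
    digest = content_hash(payload)
    if digest in store:
        return store[digest]
    meta = ArtifactMetaSketch(
        name=name,
        content_hash=digest,
        path=f"{digest[:12]}.{fmt}",  # hypothetical naming scheme
        format=fmt,
    )
    store[digest] = meta
    return meta


store: Dict[str, ArtifactMetaSketch] = {}
first = register(store, "scaler", b"serialized-scaler", "joblib")
second = register(store, "scaler_copy", b"serialized-scaler", "joblib")
print(first is second, len(store))  # True 1
```

Keying the store by hash means two artifacts with identical bytes share one file, regardless of the name they were registered under.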
class nirs4all.pipeline.execution.ExecutorBuilder

Bases: object

Builder for creating PipelineExecutor instances.

Provides a fluent interface for configuring and building executors with all necessary dependencies. Encapsulates the complex setup logic previously duplicated between Orchestrator and other components.

Example

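>>> from pathlib import Path
>>> from nirs4all.pipeline.execution import ExecutorBuilder
>>> run_dir = Path("workspace/runs/example")  # hypothetical run directory
>>> builder = ExecutorBuilder()
>>> executor = (builder
...     .with_run_directory(run_dir)
...     .with_mode("train")
...     .with_verbose(1)
...     .build())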
property artifact_registry: Any

Get the artifact registry.

build() -> PipelineExecutor

Build the configured PipelineExecutor instance.

Creates all necessary components if not already provided, then constructs and returns a fully configured PipelineExecutor.

Returns:

Configured PipelineExecutor instance

Raises:

ValueError – If run_directory is not set

with_artifact_loader(artifact_loader: Any) -> ExecutorBuilder

Set artifact loader for predict/explain modes.

Parameters:

artifact_loader – ArtifactLoader instance

Returns:

Self for method chaining

with_artifact_registry(artifact_registry: Any) -> ExecutorBuilder

Set artifact registry for train mode.

Parameters:

artifact_registry – ArtifactRegistry instance

Returns:

Self for method chaining

with_continue_on_error(continue_on_error: bool) -> ExecutorBuilder

Set whether to continue execution on errors.

Parameters:

continue_on_error – Whether to continue on errors

Returns:

Self for method chaining

with_dataset(dataset: SpectroDataset) -> ExecutorBuilder

Set the dataset for this execution.

Parameters:

dataset – Dataset to process

Returns:

Self for method chaining

with_manifest_manager(manifest_manager: ManifestManager) -> ExecutorBuilder

Set custom manifest manager.

Parameters:

manifest_manager – ManifestManager instance

Returns:

Self for method chaining

with_mode(mode: str) -> ExecutorBuilder

Set execution mode.

Parameters:

mode – Execution mode (‘train’, ‘predict’, ‘explain’)

Returns:

Self for method chaining

with_plots_visible(plots_visible: bool) -> ExecutorBuilder

Set whether to display plots.

Parameters:

plots_visible – Whether to display plots

Returns:

Self for method chaining

with_run_directory(run_directory: Path) -> ExecutorBuilder

Set the run directory for this execution.

Parameters:

run_directory – Path to the run directory where outputs will be saved

Returns:

Self for method chaining

with_save_artifacts(save_artifacts: bool) -> ExecutorBuilder

Set whether to save binary artifacts (models, transformers).

Parameters:

save_artifacts – Whether to save artifacts

Returns:

Self for method chaining

with_save_charts(save_charts: bool) -> ExecutorBuilder

Set whether to save charts and visual outputs.

Parameters:

save_charts – Whether to save charts

Returns:

Self for method chaining

with_saver(saver: SimulationSaver) -> ExecutorBuilder

Set custom simulation saver.

Parameters:

saver – SimulationSaver instance

Returns:

Self for method chaining

with_show_spinner(show_spinner: bool) -> ExecutorBuilder

Set whether to show progress spinners.

Parameters:

show_spinner – Whether to show spinners

Returns:

Self for method chaining

with_step_runner(step_runner: StepRunner) -> ExecutorBuilder

Set custom step runner.

Parameters:

step_runner – StepRunner instance

Returns:

Self for method chaining

with_verbose(verbose: int) -> ExecutorBuilder

Set verbosity level.

Parameters:

verbose – Verbosity level (0=quiet, 1=info, 2=debug)

Returns:

Self for method chaining

with_workspace(workspace: Path) -> ExecutorBuilder

Set workspace root path for artifact storage.

Parameters:

workspace – Workspace root path

Returns:

Self for method chaining

property workspace: Path | None

Get the workspace path.
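Every with_* method returns the builder itself, and build() raises ValueError when no run directory has been set. The sketch below shows that fluent-builder contract in isolation. ExecutorBuilderSketch is a hypothetical, heavily simplified stand-in: it returns a plain dict instead of a PipelineExecutor, and the mode validation in with_mode() is an assumption added for illustration, not documented behaviour of the real builder.

```python
from pathlib import Path
from typing import Optional


class ExecutorBuilderSketch:
    """Hypothetical stand-in illustrating the fluent-builder pattern."""

    def __init__(self) -> None:
        self._run_directory: Optional[Path] = None
        self._mode = "train"
        self._verbose = 0

    def with_run_directory(self, run_directory: Path) -> "ExecutorBuilderSketch":
        self._run_directory = Path(run_directory)
        return self  # returning self is what enables method chaining

    def with_mode(self, mode: str) -> "ExecutorBuilderSketch":
        # Assumption: reject anything outside the three documented modes.
        if mode not in ("train", "predict", "explain"):
            raise ValueError(f"Unknown mode: {mode!r}")
        self._mode = mode
        return self

    def with_verbose(self, verbose: int) -> "ExecutorBuilderSketch":
        self._verbose = verbose
        return self

    def build(self) -> dict:
        # Mirrors the documented contract: no run directory means ValueError.
        if self._run_directory is None:
            raise ValueError("run_directory must be set before build()")
        # A real builder would construct a PipelineExecutor; a dict stands in here.
        return {"run_directory": self._run_directory, "mode": self._mode, "verbose": self._verbose}


config = (ExecutorBuilderSketch()
          .with_run_directory(Path("runs/demo"))
          .with_mode("predict")
          .with_verbose(1)
          .build())
print(config["mode"])  # predict
```

Deferring all construction to build() lets required settings be checked once, at the point where every option is known.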

class nirs4all.pipeline.execution.PipelineExecutor(step_runner: StepRunner, manifest_manager: ManifestManager | None = None, verbose: int = 0, mode: str = 'train', continue_on_error: bool = False, saver: Any = None, artifact_loader: Any = None, artifact_registry: Any = None)

Bases: object

Executes a single pipeline configuration on a single dataset.

Handles:

  • Step-by-step execution

  • Context propagation

  • Artifact management for one pipeline run

  • Predictions accumulation for this pipeline

step_runner

Executes individual steps

manifest_manager

Manages pipeline manifests

verbose

Verbosity level

mode

Execution mode (train/predict/explain)

continue_on_error

Whether to continue on step failures

artifact_registry

Registry for v2 artifact management

execute(steps: List[Any], config_name: str, dataset: SpectroDataset, context: ExecutionContext, runtime_context: Any, prediction_store: Predictions | None = None, generator_choices: List[Dict[str, Any]] | None = None) -> None

Execute pipeline steps sequentially on dataset.

Parameters:
  • steps – List of pipeline steps to execute

  • config_name – Pipeline configuration name

  • dataset – Dataset to process

  • context – Initial execution context

  • runtime_context – Runtime infrastructure context

  • prediction_store – Prediction store for accumulating results

  • generator_choices – List of generator choices that produced this pipeline

Raises:

RuntimeError – If pipeline execution fails
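Sequential execution with context propagation means each step receives the context produced by the step before it, and continue_on_error decides whether a failing step aborts the run. A minimal sketch of that loop follows, assuming plain dicts for the context and callables for steps. StepOutcome and run_steps() are hypothetical names; this is not the executor's actual implementation, which delegates each step to a StepRunner.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List


@dataclass
class StepOutcome:
    """Hypothetical stand-in for StepResult: carries the context for the next step."""

    updated_context: Dict[str, Any]
    artifacts: List[Any] = field(default_factory=list)


def run_steps(
    steps: List[Callable[[Dict[str, Any]], StepOutcome]],
    context: Dict[str, Any],
    continue_on_error: bool = False,
) -> Dict[str, Any]:
    """Run steps in order, feeding each step the context returned by the previous one."""
    for index, step in enumerate(steps):
        try:
            outcome = step(context)
        except Exception as exc:
            if continue_on_error:
                # Skip the failing step and keep the last successful context.
                continue
            raise RuntimeError(f"Pipeline execution failed at step {index}") from exc
        context = outcome.updated_context
    return context


def scale(ctx: Dict[str, Any]) -> StepOutcome:
    return StepOutcome({**ctx, "scaled": True})


def broken(ctx: Dict[str, Any]) -> StepOutcome:
    raise ZeroDivisionError("simulated step failure")


def fit(ctx: Dict[str, Any]) -> StepOutcome:
    return StepOutcome({**ctx, "fitted": ctx.get("scaled", False)})


final = run_steps([scale, broken, fit], {"mode": "train"}, continue_on_error=True)
print(final)  # {'mode': 'train', 'scaled': True, 'fitted': True}
```

With continue_on_error=False the same step list raises RuntimeError at step 1, chaining the original exception.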

execute_minimal(steps: List[Any], minimal_pipeline: Any, dataset: SpectroDataset, context: ExecutionContext, runtime_context: Any, prediction_store: Predictions | None = None) -> None

Execute minimal pipeline for prediction.

This method executes only the steps from a MinimalPipeline, which represents the subset of the full pipeline needed to replay a prediction. It is the key optimization introduced in Phase 5: instead of replaying the entire original pipeline, only the required steps are run.

The method:

  1. Uses the minimal pipeline’s step list (not the full pipeline)

  2. Injects artifacts via the artifact_provider in runtime_context

  3. Runs controllers in predict mode

  4. Skips steps not in the minimal pipeline

Parameters:
  • steps – List of step configs (from minimal_pipeline.steps[i].step_config)

  • minimal_pipeline – MinimalPipeline with artifact mappings

  • dataset – Dataset to process

  • context – Execution context

  • runtime_context – Runtime context with artifact_provider

  • prediction_store – Optional prediction store

Note

The artifact_provider in runtime_context should be a MinimalArtifactProvider that provides artifacts by step index from the MinimalPipeline.
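The replay described above can be pictured as iterating over the retained steps only and fetching each step's fitted artifact by its original step index. The sketch below assumes that model; MinimalStep, MinimalPipelineSketch, ArtifactProviderSketch and its get() method are hypothetical names, not the actual MinimalPipeline or MinimalArtifactProvider API. Lambdas stand in for a fitted scaler and model.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class MinimalStep:
    """Hypothetical: one retained step, keeping its index in the full pipeline."""

    step_index: int
    step_config: Any


@dataclass
class MinimalPipelineSketch:
    steps: List[MinimalStep]
    # Maps original step index -> fitted artifact (e.g. a trained scaler or model).
    artifacts: Dict[int, Any] = field(default_factory=dict)


class ArtifactProviderSketch:
    """Stand-in for the provider described in the Note: looks artifacts up by step index."""

    def __init__(self, pipeline: MinimalPipelineSketch) -> None:
        self._artifacts = pipeline.artifacts

    def get(self, step_index: int) -> Any:
        return self._artifacts.get(step_index)


def replay(pipeline: MinimalPipelineSketch, value: float) -> float:
    provider = ArtifactProviderSketch(pipeline)
    for step in pipeline.steps:  # only retained steps run; all others are skipped
        fitted = provider.get(step.step_index)
        value = fitted(value)  # predict mode: apply the stored artifact, never refit
    return value


# Steps 0, 2 and 3 of a hypothetical 5-step pipeline are not needed for prediction.
minimal = MinimalPipelineSketch(
    steps=[MinimalStep(1, "scaler"), MinimalStep(4, "model")],
    artifacts={1: lambda x: x / 10.0, 4: lambda x: 2.0 * x + 1.0},
)
print(replay(minimal, 30.0))  # 7.0
```

Keeping the original step index on each retained step is what lets the provider hand back the right artifact even though the step list has gaps.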

initialize_context(dataset: SpectroDataset) -> ExecutionContext

Initialize ExecutionContext for pipeline execution.

Parameters:

dataset – Dataset to create context for

Returns:

Initialized ExecutionContext

next_op() -> int

Get the next operation ID (for compatibility).

class nirs4all.pipeline.execution.PipelineOrchestrator(workspace_path: str | Path | None = None, verbose: int = 0, mode: str = 'train', save_artifacts: bool = True, save_charts: bool = True, enable_tab_reports: bool = True, continue_on_error: bool = False, show_spinner: bool = True, keep_datasets: bool = True, plots_visible: bool = False)

Bases: object

Orchestrates execution of multiple pipelines across multiple datasets.

High-level coordinator that manages:

  • Workspace initialization

  • Global predictions aggregation

  • Best results reporting

  • Dataset/pipeline normalization

workspace_path

Root workspace directory

runs_dir

Directory for storing runs

verbose

Verbosity level

mode

Execution mode (train/predict/explain)

save_artifacts

Whether to save binary artifacts

save_charts

Whether to save charts and visual outputs

enable_tab_reports

Whether to generate tab reports

keep_datasets

Whether to keep dataset snapshots

plots_visible

Whether to display plots
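The orchestrator's role (every pipeline on every dataset, with global aggregation and best-result reporting) can be illustrated by a small cross-product loop. In this hypothetical sketch each "pipeline" is just a callable returning an error score (lower is better) and each dataset is a list of toy residuals; orchestrate() and the returned pair loosely echo, but are not, the (run_predictions, dataset_predictions) tuple returned by execute().

```python
from itertools import product
from typing import Callable, Dict, List, Tuple

Scorer = Callable[[List[float]], float]


def orchestrate(
    pipelines: Dict[str, Scorer],
    datasets: Dict[str, List[float]],
) -> Tuple[List[dict], Dict[str, dict]]:
    """Hypothetical: run every pipeline on every dataset, keep the best per dataset."""
    all_results: List[dict] = []  # global aggregation across all runs
    best_per_dataset: Dict[str, dict] = {}
    for (ds_name, data), (pl_name, pipeline) in product(datasets.items(), pipelines.items()):
        record = {"dataset": ds_name, "pipeline": pl_name, "score": pipeline(data)}
        all_results.append(record)
        best = best_per_dataset.get(ds_name)
        if best is None or record["score"] < best["score"]:
            best_per_dataset[ds_name] = record
    return all_results, best_per_dataset


pipelines = {
    "bias": lambda xs: abs(sum(xs) / len(xs)),
    "spread": lambda xs: max(xs) - min(xs),
}
datasets = {"batch_a": [0.5, -0.25, 0.25], "batch_b": [2.0, 2.2]}
results, best = orchestrate(pipelines, datasets)
print(len(results), best["batch_a"]["pipeline"], best["batch_b"]["pipeline"])  # 4 bias spread
```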

execute(pipeline: PipelineConfigs | List[Any] | Dict | str, dataset: DatasetConfigs | SpectroDataset | List[SpectroDataset] | ndarray | Tuple[ndarray, ...] | Dict | List[Dict] | str | List[str], pipeline_name: str = '', dataset_name: str = 'dataset', max_generation_count: int = 10000, artifact_loader: Any = None, target_model: Dict[str, Any] | None = None, explainer: Any = None) -> Tuple[Predictions, Dict[str, Any]]

Execute pipeline configurations on dataset configurations.

Parameters:
  • pipeline – Pipeline definition (PipelineConfigs, List[steps], Dict, or file path)

  • dataset – Dataset definition (DatasetConfigs, SpectroDataset, numpy arrays, Dict, or file path)

  • pipeline_name – Optional name for the pipeline

  • dataset_name – Optional name for array-based datasets

  • max_generation_count – Maximum number of pipeline combinations to generate

  • artifact_loader – ArtifactLoader for predict/explain modes

  • target_model – Target model for predict/explain modes

  • explainer – Explainer instance for explain mode

Returns:

Tuple of (run_predictions, dataset_predictions)
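The max_generation_count parameter caps how many pipeline combinations are generated. As an illustration only, the sketch below assumes a simple grid of step options expanded with itertools.product and truncated lazily with itertools.islice; the real nirs4all generator syntax and enumeration order are not described on this page, and generate_variants() is a hypothetical helper.

```python
from itertools import islice, product
from typing import Any, Dict, List


def generate_variants(search_space: Dict[str, List[Any]], max_generation_count: int) -> List[Dict[str, Any]]:
    """Hypothetical: expand a grid of step options, stopping at max_generation_count."""
    keys = list(search_space)
    combos = product(*(search_space[k] for k in keys))
    # islice stops lazily, so a huge grid is never fully materialized.
    return [dict(zip(keys, values)) for values in islice(combos, max_generation_count)]


space = {"preprocessing": ["snv", "msc", "savgol"], "n_components": [5, 10, 15, 20]}
variants = generate_variants(space, max_generation_count=10)
print(len(variants), variants[0])  # 10 {'preprocessing': 'snv', 'n_components': 5}
```

The grid has 3 × 4 = 12 combinations, so a cap of 10 drops the last two.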

class nirs4all.pipeline.execution.StepResult(updated_context: nirs4all.pipeline.config.context.ExecutionContext, artifacts: List[Any] = <factory>, outputs: List[Any] = <factory>, predictions: nirs4all.data.predictions.Predictions | None = None)

Bases: object

Result of executing a single pipeline step.

updated_context

Updated execution context after step

Type:

nirs4all.pipeline.config.context.ExecutionContext

artifacts

List of artifacts persisted during step execution

Type:

List[Any]

outputs

List of output files saved during step execution

Type:

List[Any]

predictions

Optional predictions generated by this step

Type:

nirs4all.data.predictions.Predictions | None

artifacts: List[Any]
outputs: List[Any]
predictions: Predictions | None = None
updated_context: ExecutionContext
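In the signature above, <factory> is how Sphinx renders a dataclass field declared with field(default_factory=...). The sketch below uses a hypothetical stand-in with the same field layout as StepResult (the concrete types are replaced by Any) to show the practical consequence: each instance gets its own fresh list, so appending to one result's artifacts never leaks into another's.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class StepResultSketch:
    """Hypothetical stand-in with the same field layout as StepResult."""

    updated_context: Any
    artifacts: List[Any] = field(default_factory=list)  # rendered as <factory>
    outputs: List[Any] = field(default_factory=list)    # rendered as <factory>
    predictions: Optional[Any] = None


first = StepResultSketch(updated_context={"step": 1})
second = StepResultSketch(updated_context={"step": 2})
first.artifacts.append("scaler.joblib")
print(first.artifacts, second.artifacts)  # ['scaler.joblib'] []
```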