nirs4all.pipeline.execution package
Submodules
- nirs4all.pipeline.execution.builder module
  - ExecutorBuilder
    - ExecutorBuilder.artifact_registry
    - ExecutorBuilder.build()
    - ExecutorBuilder.with_artifact_loader()
    - ExecutorBuilder.with_artifact_registry()
    - ExecutorBuilder.with_continue_on_error()
    - ExecutorBuilder.with_dataset()
    - ExecutorBuilder.with_manifest_manager()
    - ExecutorBuilder.with_mode()
    - ExecutorBuilder.with_plots_visible()
    - ExecutorBuilder.with_run_directory()
    - ExecutorBuilder.with_save_artifacts()
    - ExecutorBuilder.with_save_charts()
    - ExecutorBuilder.with_saver()
    - ExecutorBuilder.with_show_spinner()
    - ExecutorBuilder.with_step_runner()
    - ExecutorBuilder.with_verbose()
    - ExecutorBuilder.with_workspace()
    - ExecutorBuilder.workspace
- nirs4all.pipeline.execution.executor module
  - PipelineExecutor
    - PipelineExecutor.step_runner
    - PipelineExecutor.manifest_manager
    - PipelineExecutor.verbose
    - PipelineExecutor.mode
    - PipelineExecutor.continue_on_error
    - PipelineExecutor.artifact_registry
    - PipelineExecutor.execute()
    - PipelineExecutor.execute_minimal()
    - PipelineExecutor.initialize_context()
    - PipelineExecutor.next_op()
- nirs4all.pipeline.execution.orchestrator module
  - PipelineOrchestrator
    - PipelineOrchestrator.workspace_path
    - PipelineOrchestrator.runs_dir
    - PipelineOrchestrator.verbose
    - PipelineOrchestrator.mode
    - PipelineOrchestrator.save_artifacts
    - PipelineOrchestrator.save_charts
    - PipelineOrchestrator.enable_tab_reports
    - PipelineOrchestrator.keep_datasets
    - PipelineOrchestrator.plots_visible
    - PipelineOrchestrator.execute()
- nirs4all.pipeline.execution.result module
  - ArtifactMeta
    - ArtifactMeta.name
    - ArtifactMeta.content_hash
    - ArtifactMeta.path
    - ArtifactMeta.format
    - ArtifactMeta.format_version
    - ArtifactMeta.nirs4all_version
    - ArtifactMeta.metadata
  - StepOutput
  - StepResult
Module contents
Pipeline execution module for nirs4all.
- class nirs4all.pipeline.execution.ArtifactMeta(name: str, content_hash: str, path: str, format: str, format_version: str = '', nirs4all_version: str = '', metadata: Dict[str, Any] = <factory>)
Bases: object
Metadata about a persisted artifact.
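The `<factory>` default in the signature suggests that `metadata` is created fresh for each instance instead of being shared. The following is a minimal stand-in sketch, assuming `ArtifactMeta` behaves like a plain dataclass. `ArtifactMetaSketch` is a hypothetical name, the paths are made up, and the use of SHA-256 for `content_hash` is an assumption made only for illustration.

```python
import hashlib
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class ArtifactMetaSketch:
    """Hypothetical stand-in mirroring the documented ArtifactMeta fields."""

    name: str
    content_hash: str
    path: str
    format: str
    format_version: str = ""
    nirs4all_version: str = ""
    # default_factory gives every instance its own dict (shown as <factory>)
    metadata: Dict[str, Any] = field(default_factory=dict)


payload = b"serialized model bytes"
# Assumption: SHA-256 is used here only to produce a plausible content hash.
digest = hashlib.sha256(payload).hexdigest()

scaler_meta = ArtifactMetaSketch(
    name="scaler", content_hash=digest, path="artifacts/scaler.pkl", format="pickle"
)
model_meta = ArtifactMetaSketch(
    name="model", content_hash=digest, path="artifacts/model.pkl", format="pickle"
)

# Mutating one instance's metadata leaves the other untouched.
scaler_meta.metadata["step"] = 1
```

Because the default is a factory and not a shared literal, `model_meta.metadata` stays empty after the last line.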
- class nirs4all.pipeline.execution.ExecutorBuilder
Bases: object
Builder for creating PipelineExecutor instances.
Provides a fluent interface for configuring and building executors with all necessary dependencies. Encapsulates the complex setup logic previously duplicated between Orchestrator and other components.
Example
>>> builder = ExecutorBuilder()
>>> executor = (builder
...     .with_run_directory(run_dir)
...     .with_mode("train")
...     .with_verbose(1)
...     .build())
- build() -> PipelineExecutor
Build the configured PipelineExecutor instance.
Creates all necessary components if not already provided, then constructs and returns a fully configured PipelineExecutor.
- Returns:
Configured PipelineExecutor instance
- Raises:
ValueError – If run_directory is not set
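The fluent pattern and the `ValueError` contract described above can be sketched with a small stand-in. This is not the nirs4all implementation: `BuilderSketch` and `ExecutorSketch` are hypothetical names, and only three of the documented setters are mirrored.

```python
from pathlib import Path
from typing import Optional


class ExecutorSketch:
    """Hypothetical product object holding the resolved configuration."""

    def __init__(self, run_directory: Path, mode: str, verbose: int) -> None:
        self.run_directory = run_directory
        self.mode = mode
        self.verbose = verbose


class BuilderSketch:
    """Hypothetical fluent builder: each setter returns self for chaining."""

    def __init__(self) -> None:
        self._run_directory: Optional[Path] = None
        self._mode = "train"
        self._verbose = 0

    def with_run_directory(self, run_directory: Path) -> "BuilderSketch":
        self._run_directory = run_directory
        return self

    def with_mode(self, mode: str) -> "BuilderSketch":
        self._mode = mode
        return self

    def with_verbose(self, verbose: int) -> "BuilderSketch":
        self._verbose = verbose
        return self

    def build(self) -> ExecutorSketch:
        # Mirrors the documented contract: a run directory is mandatory.
        if self._run_directory is None:
            raise ValueError("run_directory must be set before build()")
        return ExecutorSketch(self._run_directory, self._mode, self._verbose)


executor = (
    BuilderSketch()
    .with_run_directory(Path("runs/demo"))
    .with_mode("predict")
    .with_verbose(1)
    .build()
)
```

Calling `BuilderSketch().build()` without a run directory raises `ValueError`, which matches the behaviour documented for `build()`.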
- with_artifact_loader(artifact_loader: Any) -> ExecutorBuilder
Set artifact loader for predict/explain modes.
- Parameters:
artifact_loader – ArtifactLoader instance
- Returns:
Self for method chaining
- with_artifact_registry(artifact_registry: Any) -> ExecutorBuilder
Set artifact registry for train mode.
- Parameters:
artifact_registry – ArtifactRegistry instance
- Returns:
Self for method chaining
- with_continue_on_error(continue_on_error: bool) -> ExecutorBuilder
Set whether to continue execution on errors.
- Parameters:
continue_on_error – Whether to continue on errors
- Returns:
Self for method chaining
- with_dataset(dataset: SpectroDataset) -> ExecutorBuilder
Set the dataset for this execution.
- Parameters:
dataset – Dataset to process
- Returns:
Self for method chaining
- with_manifest_manager(manifest_manager: ManifestManager) -> ExecutorBuilder
Set custom manifest manager.
- Parameters:
manifest_manager – ManifestManager instance
- Returns:
Self for method chaining
- with_mode(mode: str) -> ExecutorBuilder
Set execution mode.
- Parameters:
mode – Execution mode (‘train’, ‘predict’, ‘explain’)
- Returns:
Self for method chaining
- with_plots_visible(plots_visible: bool) -> ExecutorBuilder
Set whether to display plots.
- Parameters:
plots_visible – Whether to display plots
- Returns:
Self for method chaining
- with_run_directory(run_directory: Path) -> ExecutorBuilder
Set the run directory for this execution.
- Parameters:
run_directory – Path to the run directory where outputs will be saved
- Returns:
Self for method chaining
- with_save_artifacts(save_artifacts: bool) -> ExecutorBuilder
Set whether to save binary artifacts (models, transformers).
- Parameters:
save_artifacts – Whether to save artifacts
- Returns:
Self for method chaining
- with_save_charts(save_charts: bool) -> ExecutorBuilder
Set whether to save charts and visual outputs.
- Parameters:
save_charts – Whether to save charts
- Returns:
Self for method chaining
- with_saver(saver: SimulationSaver) -> ExecutorBuilder
Set custom simulation saver.
- Parameters:
saver – SimulationSaver instance
- Returns:
Self for method chaining
- with_show_spinner(show_spinner: bool) -> ExecutorBuilder
Set whether to show progress spinners.
- Parameters:
show_spinner – Whether to show spinners
- Returns:
Self for method chaining
- with_step_runner(step_runner: StepRunner) -> ExecutorBuilder
Set custom step runner.
- Parameters:
step_runner – StepRunner instance
- Returns:
Self for method chaining
- with_verbose(verbose: int) -> ExecutorBuilder
Set verbosity level.
- Parameters:
verbose – Verbosity level (0=quiet, 1=info, 2=debug)
- Returns:
Self for method chaining
- with_workspace(workspace: Path) -> ExecutorBuilder
Set workspace root path for artifact storage.
- Parameters:
workspace – Workspace root path
- Returns:
Self for method chaining
- class nirs4all.pipeline.execution.PipelineExecutor(step_runner: StepRunner, manifest_manager: ManifestManager | None = None, verbose: int = 0, mode: str = 'train', continue_on_error: bool = False, saver: Any = None, artifact_loader: Any = None, artifact_registry: Any = None)
Bases: object
Executes a single pipeline configuration on a single dataset.
Handles:
- Step-by-step execution
- Context propagation
- Artifact management for one pipeline run
- Predictions accumulation for this pipeline
- step_runner
Executes individual steps
- manifest_manager
Manages pipeline manifests
- verbose
Verbosity level
- mode
Execution mode (train/predict/explain)
- continue_on_error
Whether to continue on step failures
- artifact_registry
Registry for v2 artifact management
- execute(steps: List[Any], config_name: str, dataset: SpectroDataset, context: ExecutionContext, runtime_context: Any, prediction_store: Predictions | None = None, generator_choices: List[Dict[str, Any]] | None = None) -> None
Execute pipeline steps sequentially on dataset.
- Parameters:
steps – List of pipeline steps to execute
config_name – Pipeline configuration name
dataset – Dataset to process
context – Initial execution context
runtime_context – Runtime infrastructure context
prediction_store – Prediction store for accumulating results
generator_choices – List of generator choices that produced this pipeline
- Raises:
RuntimeError – If pipeline execution fails
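Sequential execution with context propagation and the `continue_on_error` switch can be illustrated with a simplified loop. This is a sketch under stated assumptions, not the library's code: steps are modelled as plain callables, the context as a dict, and `StepResultSketch` and `run_steps` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Tuple

Context = Dict[str, Any]


@dataclass
class StepResultSketch:
    """Hypothetical stand-in for a step result: new context plus artifacts."""

    updated_context: Context
    artifacts: List[Any] = field(default_factory=list)


def run_steps(
    steps: List[Callable[[Context], StepResultSketch]],
    context: Context,
    continue_on_error: bool = False,
) -> Tuple[Context, List[Any]]:
    """Run steps in order, feeding each step the context left by the previous one."""
    artifacts: List[Any] = []
    for index, step in enumerate(steps, start=1):
        try:
            result = step(context)
        except Exception as exc:
            if continue_on_error:
                # Skip the failed step and keep the last good context.
                continue
            raise RuntimeError(f"Pipeline failed at step {index}") from exc
        context = result.updated_context
        artifacts.extend(result.artifacts)
    return context, artifacts


def scale(ctx: Context) -> StepResultSketch:
    return StepResultSketch({**ctx, "scaled": True}, artifacts=["scaler"])


def broken(ctx: Context) -> StepResultSketch:
    raise ValueError("bad step")


def fit(ctx: Context) -> StepResultSketch:
    return StepResultSketch({**ctx, "fitted": True}, artifacts=["model"])


final_context, saved = run_steps(
    [scale, broken, fit], {"dataset": "demo"}, continue_on_error=True
)
```

With `continue_on_error=False`, the same step list stops at the second step and surfaces a `RuntimeError`, in line with the documented `Raises` entry.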
- execute_minimal(steps: List[Any], minimal_pipeline: Any, dataset: SpectroDataset, context: ExecutionContext, runtime_context: Any, prediction_store: Predictions | None = None) -> None
Execute minimal pipeline for prediction.
This method executes only the steps from a MinimalPipeline, which represents the subset of the full pipeline needed to replay a prediction. It’s the key optimization of Phase 5: instead of replaying the entire original pipeline, we only run the required steps.
The method:
1. Uses the minimal pipeline’s step list (not the full pipeline)
2. Injects artifacts via the artifact_provider in runtime_context
3. Runs controllers in predict mode
4. Skips steps not in the minimal pipeline
- Parameters:
steps – List of step configs (from minimal_pipeline.steps[i].step_config)
minimal_pipeline – MinimalPipeline with artifact mappings
dataset – Dataset to process
context – Execution context
runtime_context – Runtime context with artifact_provider
prediction_store – Optional prediction store
Note
The artifact_provider in runtime_context should be a MinimalArtifactProvider that provides artifacts by step index from the MinimalPipeline.
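The idea of replaying only the required steps, with artifacts looked up by step index, can be sketched as follows. All names here (`MinimalStepSketch`, `ArtifactProviderSketch`, `replay`) are hypothetical, and the real MinimalPipeline and MinimalArtifactProvider interfaces may differ; the sketch only assumes that each minimal step carries its original index and a step config.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class MinimalStepSketch:
    """Hypothetical minimal-pipeline entry: original index plus its config."""

    step_index: int
    step_config: str


class ArtifactProviderSketch:
    """Hypothetical provider that returns fitted artifacts by step index."""

    def __init__(self, artifacts_by_step: Dict[int, Any]) -> None:
        self._artifacts = artifacts_by_step

    def get(self, step_index: int) -> Any:
        return self._artifacts.get(step_index)


def replay(steps: List[MinimalStepSketch], provider: ArtifactProviderSketch) -> List[str]:
    """Visit only the minimal steps, pairing each with its stored artifact."""
    log: List[str] = []
    for step in steps:
        artifact = provider.get(step.step_index)
        log.append(f"predict:{step.step_config}:{artifact}")
    return log


# Suppose the original pipeline had five steps but only steps 2 and 5
# are needed to reproduce the prediction.
minimal = [
    MinimalStepSketch(2, "StandardScaler"),
    MinimalStepSketch(5, "PLSRegression"),
]
provider = ArtifactProviderSketch({2: "scaler.pkl", 5: "pls.pkl"})
replayed = replay(minimal, provider)
```

Steps 1, 3, and 4 never appear in the loop, which is the saving the minimal pipeline is meant to provide.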
- initialize_context(dataset: SpectroDataset) -> ExecutionContext
Initialize ExecutionContext for pipeline execution.
- Parameters:
dataset – Dataset to create context for
- Returns:
Initialized ExecutionContext
- class nirs4all.pipeline.execution.PipelineOrchestrator(workspace_path: str | Path | None = None, verbose: int = 0, mode: str = 'train', save_artifacts: bool = True, save_charts: bool = True, enable_tab_reports: bool = True, continue_on_error: bool = False, show_spinner: bool = True, keep_datasets: bool = True, plots_visible: bool = False)
Bases: object
Orchestrates execution of multiple pipelines across multiple datasets.
High-level coordinator that manages:
- Workspace initialization
- Global predictions aggregation
- Best results reporting
- Dataset/pipeline normalization
- workspace_path
Root workspace directory
- runs_dir
Directory for storing runs
- verbose
Verbosity level
- mode
Execution mode (train/predict/explain)
- save_artifacts
Whether to save binary artifacts
- save_charts
Whether to save charts and visual outputs
- enable_tab_reports
Whether to generate tab reports
- keep_datasets
Whether to keep dataset snapshots
- plots_visible
Whether to display plots
- execute(pipeline: PipelineConfigs | List[Any] | Dict | str, dataset: DatasetConfigs | SpectroDataset | List[SpectroDataset] | ndarray | Tuple[ndarray, ...] | Dict | List[Dict] | str | List[str], pipeline_name: str = '', dataset_name: str = 'dataset', max_generation_count: int = 10000, artifact_loader: Any = None, target_model: Dict[str, Any] | None = None, explainer: Any = None) -> Tuple[Predictions, Dict[str, Any]]
Execute pipeline configurations on dataset configurations.
- Parameters:
pipeline – Pipeline definition (PipelineConfigs, List[steps], Dict, or file path)
dataset – Dataset definition (DatasetConfigs, SpectroDataset, numpy arrays, Dict, or file path)
pipeline_name – Optional name for the pipeline
dataset_name – Optional name for array-based datasets
max_generation_count – Maximum number of pipeline combinations to generate
artifact_loader – ArtifactLoader for predict/explain modes
target_model – Target model for predict/explain modes
explainer – Explainer instance for explain mode
- Returns:
Tuple of (run_predictions, dataset_predictions)
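The two-part return value can be pictured as a run-wide collection plus a per-dataset mapping. The sketch below is an assumption-laden stand-in: pipelines are reduced to plain callables, predictions to dict records, and `orchestrate` is a hypothetical function, so the real `Predictions` object and dictionary layout may look different.

```python
from typing import Any, Callable, Dict, List, Tuple

Record = Dict[str, Any]


def orchestrate(
    pipelines: Dict[str, Callable[[List[float]], float]],
    datasets: Dict[str, List[float]],
) -> Tuple[List[Record], Dict[str, List[Record]]]:
    """Run every pipeline on every dataset and aggregate the results twice:
    once across the whole run and once grouped by dataset name."""
    run_predictions: List[Record] = []
    dataset_predictions: Dict[str, List[Record]] = {}
    for dataset_name, data in datasets.items():
        per_dataset: List[Record] = []
        for pipeline_name, pipeline in pipelines.items():
            record = {
                "dataset": dataset_name,
                "pipeline": pipeline_name,
                "prediction": pipeline(data),
            }
            per_dataset.append(record)
            run_predictions.append(record)
        dataset_predictions[dataset_name] = per_dataset
    return run_predictions, dataset_predictions


# Toy "pipelines" standing in for real pipeline configurations.
pipelines = {"mean": lambda xs: sum(xs) / len(xs), "max": max}
datasets = {"a": [1.0, 2.0, 3.0], "b": [4.0, 6.0]}
run_preds, by_dataset = orchestrate(pipelines, datasets)
```

Here `run_preds` holds one record per (dataset, pipeline) pair, while `by_dataset` groups the same records under each dataset name.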
- class nirs4all.pipeline.execution.StepResult(updated_context: ExecutionContext, artifacts: List[Any] = <factory>, outputs: List[Any] = <factory>, predictions: Predictions | None = None)
Bases: object
Result of executing a single pipeline step.
- updated_context
Updated execution context after step
- artifacts
List of artifacts persisted during step execution
- Type:
List[Any]
- outputs
List of output files saved during step execution
- Type:
List[Any]
- predictions
Optional predictions generated by this step
- Type:
Predictions | None