nirs4all.pipeline.runner module

Pipeline runner - Main entry point for pipeline execution.

This module provides the PipelineRunner class, which serves as the main interface for executing ML pipelines on spectroscopic datasets. It delegates execution to PipelineOrchestrator and provides prediction/explanation capabilities via Predictor and Explainer classes.

class nirs4all.pipeline.runner.PipelineRunner(workspace_path: str | Path | None = None, verbose: int = 0, mode: str = 'train', save_artifacts: bool = True, save_charts: bool = True, enable_tab_reports: bool = True, continue_on_error: bool = False, show_spinner: bool = True, keep_datasets: bool = True, plots_visible: bool = False, random_state: int | None = None, log_file: bool = True, log_format: str = 'pretty', use_unicode: bool | None = None, use_colors: bool | None = None, show_progress_bar: bool = True, json_output: bool = False)

Bases: object

Main pipeline execution interface.

Orchestrates pipeline execution on datasets, providing a simplified interface for training, prediction, and explanation workflows. Delegates actual execution to PipelineOrchestrator, Predictor, and Explainer.

workspace_path

Root workspace directory

Type:

Path

verbose

Verbosity level (0=quiet, 1=info, 2=debug, 3=trace)

Type:

int

mode

Execution mode (‘train’, ‘predict’, ‘explain’)

Type:

str

save_artifacts

Whether to save binary artifacts (models, transformers)

Type:

bool

save_charts

Whether to save charts and visual outputs

Type:

bool

enable_tab_reports

Whether to generate tabular reports

Type:

bool

continue_on_error

Whether to continue on step failures

Type:

bool

show_spinner

Whether to show progress spinners

Type:

bool

keep_datasets

Whether to keep raw/preprocessed data snapshots

Type:

bool

plots_visible

Whether to display plots interactively

Type:

bool

orchestrator

Underlying orchestrator for execution

Type:

PipelineOrchestrator

predictor

Handler for prediction mode

Type:

Predictor

explainer

Handler for explanation mode

Type:

Explainer

raw_data

Raw dataset snapshots (if keep_datasets=True)

Type:

Dict[str, np.ndarray]

pp_data

Preprocessed data snapshots

Type:

Dict[str, Dict[str, np.ndarray]]

Example

>>> # Training workflow
>>> runner = PipelineRunner(workspace_path="./workspace", verbose=1)
>>> pipeline = [{"preprocessing": StandardScaler()}, {"model": SVC()}]
>>> X, y = load_data()
>>> predictions, dataset_preds = runner.run(pipeline, (X, y))
>>> # Prediction workflow
>>> runner = PipelineRunner(mode="predict")
>>> y_pred, preds = runner.predict(best_model, X_new)
>>> # Explanation workflow
>>> runner = PipelineRunner(mode="explain")
>>> shap_results, out_dir = runner.explain(best_model, X_test)

property current_run_dir: Path | None

Get current run directory.

Returns:

Path to current run directory, or None if not set

explain(prediction_obj: Dict[str, Any] | str, dataset: DatasetConfigs | SpectroDataset | ndarray | Tuple[ndarray, ...] | Dict | List[Dict] | str | List[str], dataset_name: str = 'explain_dataset', shap_params: Dict[str, Any] | None = None, verbose: int = 0, plots_visible: bool = True) → Tuple[Dict[str, Any], str]

Generate SHAP explanations for a saved model.

Delegates to Explainer class for actual execution.

Parameters:
  • prediction_obj – Model identifier (dict with config_path or prediction ID)

  • dataset – Dataset to explain on

  • dataset_name – Name for the dataset

  • shap_params – SHAP configuration parameters

  • verbose – Verbosity level

  • plots_visible – Whether to display plots interactively

Returns:

Tuple of (shap_results_dict, output_directory_path)

export(source: Dict[str, Any] | str | Path, output_path: str | Path, format: str = 'n4a', include_metadata: bool = True, compress: bool = True) → Path

Export a trained pipeline to a standalone bundle.

Creates a self-contained prediction bundle that can be used for deployment, sharing, or archival without requiring the original workspace or full nirs4all installation.

Supported formats:
  • ‘n4a’: Full bundle (ZIP archive with artifacts and metadata)

  • ‘n4a.py’: Portable Python script with embedded artifacts

Phase 6 Feature:

This method enables exporting trained pipelines as standalone bundles that can be loaded and used for prediction without the original workspace structure.

Parameters:
  • source – Prediction source to export. Can be:
      - prediction dict: From a previous run’s Predictions object
      - folder path: Path to a pipeline directory
      - Run object: Best prediction from a Run

  • output_path – Path for the output bundle file

  • format – Bundle format (‘n4a’ or ‘n4a.py’)

  • include_metadata – Whether to include full metadata in bundle

  • compress – Whether to compress artifacts (for .n4a format)

Returns:

Path to the created bundle file
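
The ‘n4a’ format is described above as a ZIP archive holding artifacts and metadata. As a rough illustration of that idea only, the following sketch packs a JSON manifest and some artifact bytes into an in-memory ZIP using Python's standard zipfile module. The member names (manifest.json, artifacts/...) and the helpers write_bundle and list_bundle are assumptions made for this example; they are not the actual .n4a layout and are not part of nirs4all.

```python
import io
import json
import zipfile


def write_bundle(artifacts: dict[str, bytes], metadata: dict, compress: bool = True) -> bytes:
    """Pack artifacts and a JSON manifest into an in-memory ZIP archive.

    Hypothetical layout for illustration; not the real .n4a structure.
    """
    compression = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, mode="w", compression=compression) as archive:
        # Metadata goes first so it can be read without scanning the artifacts.
        archive.writestr("manifest.json", json.dumps(metadata, indent=2))
        for name, payload in artifacts.items():
            archive.writestr(f"artifacts/{name}", payload)
    return buffer.getvalue()


def list_bundle(data: bytes) -> list[str]:
    """Return the member names stored in a ZIP archive held in memory."""
    with zipfile.ZipFile(io.BytesIO(data)) as archive:
        return archive.namelist()


bundle = write_bundle({"model.bin": b"\x00" * 1024}, {"format": "n4a"})
members = list_bundle(bundle)
```

Because a ZIP archive is a standard container, the same namelist() call can be used to look inside any ZIP file before handing it to another tool.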

Example

>>> runner = PipelineRunner()
>>> predictions, _ = runner.run(pipeline, dataset)
>>> best_pred = predictions.top(n=1)[0]
>>>
>>> # Export to .n4a bundle
>>> runner.export(best_pred, "exports/wheat_model.n4a")
>>>
>>> # Export to portable Python script
>>> runner.export(best_pred, "exports/wheat_model.n4a.py", format='n4a.py')
>>>
>>> # Later, predict from bundle
>>> y_pred, _ = runner.predict("exports/wheat_model.n4a", X_new)

export_best_for_dataset(dataset_name: str, mode: str = 'predictions') → Path | None

Export best results for a dataset to exports/ folder.

Parameters:
  • dataset_name – Name of the dataset to export

  • mode – Export mode (‘predictions’ or other)

Returns:

Path to exported file, or None if export failed

export_model(source: Dict[str, Any] | str | Path, output_path: str | Path, format: str | None = None, fold: int | None = None) → Path

Export only the model artifact from a trained pipeline.

Unlike export() which creates a full bundle with all preprocessing artifacts and metadata, this method exports just the model binary. This is useful when you want a lightweight model file that can be loaded directly into other pipelines or used with external tools.

The output format is determined by the file extension or can be specified explicitly. The model can then be reloaded using:
  • Direct path in pipeline config: {"model": "path/to/model.joblib"}

  • As prediction source: runner.predict("path/to/model.joblib", data)

Parameters:
  • source – Prediction source to export from. Can be:
      - prediction dict: From a previous run’s Predictions object
      - folder path: Path to a pipeline directory
      - bundle path: Path to a .n4a bundle

  • output_path – Path for the output model file. Extension determines format: .joblib, .pkl, .h5, .keras, .pt

  • format – Optional explicit format (‘joblib’, ‘pickle’, ‘keras_h5’). If None, determined from output_path extension.

  • fold – Optional fold index to export. If None, exports fold 0 or the primary model artifact.

Returns:

Path to the created model file
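
The rule that an explicit format wins and the file extension is used otherwise can be sketched as a small lookup. This is an illustration under assumptions, not nirs4all's resolver: infer_format and EXTENSION_TO_FORMAT are hypothetical names, and the three pairings are inferred from the similarity between the listed extensions and format names. The format strings for '.keras' and '.pt' are not given above, so the sketch refuses to guess them.

```python
from pathlib import Path

# Assumed pairings, inferred from the documented extensions and format names.
# '.keras' and '.pt' are deliberately absent: their format strings are not
# documented in this section.
EXTENSION_TO_FORMAT = {
    ".joblib": "joblib",
    ".pkl": "pickle",
    ".h5": "keras_h5",
}


def infer_format(output_path, format=None):
    """Return the explicit format if given, else look it up from the extension."""
    if format is not None:
        return format
    suffix = Path(output_path).suffix.lower()
    try:
        return EXTENSION_TO_FORMAT[suffix]
    except KeyError:
        raise ValueError(f"Cannot infer a format from extension {suffix!r}") from None


chosen = infer_format("exports/pls_model.joblib")
```

Raising on an unknown extension, rather than falling back silently, keeps a mistyped file name from producing a file in an unexpected format.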

Example

>>> runner = PipelineRunner()
>>> predictions, _ = runner.run(pipeline, dataset)
>>> best_pred = predictions.top(n=1)[0]
>>>
>>> # Export just the model
>>> runner.export_model(best_pred, "exports/pls_model.joblib")
>>>
>>> # Later, use in new pipeline
>>> new_pipeline = [
...     MinMaxScaler(),
...     {"model": "exports/pls_model.joblib", "name": "pretrained"}
... ]

extract(source: Dict[str, Any] | str | Path) → ExtractedPipeline

Extract a trained pipeline for inspection or modification.

Loads a trained pipeline from a prediction source and returns an ExtractedPipeline object that can be inspected, modified, and then executed with runner.run().

Phase 7 Feature:

This method enables extracting and modifying trained pipelines without retraining from scratch.

Parameters:

source – Prediction source to extract. Can be:
  - prediction dict: From a previous run’s Predictions object
  - folder path: Path to a pipeline directory
  - Run object: Best prediction from a Run
  - artifact_id: Direct artifact reference
  - bundle: Exported prediction bundle (.n4a)

Returns:

ExtractedPipeline object with:

  • steps: List of pipeline steps (can be modified)

  • trace: Original execution trace (read-only)

  • artifact_provider: Provider for original artifacts

  • model_step_index: Index of the model step

  • preprocessing_chain: Summary of preprocessing

Example

>>> runner = PipelineRunner()
>>> predictions, _ = runner.run(pipeline, dataset)
>>> best_pred = predictions.top(n=1)[0]
>>>
>>> # Extract for inspection
>>> extracted = runner.extract(best_pred)
>>> print(f"Steps: {len(extracted.steps)}")
>>> print(f"Preprocessing: {extracted.preprocessing_chain}")
>>>
>>> # Modify and run
>>> from sklearn.ensemble import RandomForestRegressor
>>> extracted.set_model(RandomForestRegressor())
>>> new_preds, _ = runner.run(extracted.steps, new_data)

property last_aggregate: str | None

Get aggregate column from the last executed dataset.

Returns the aggregation setting from the last dataset processed by run(). This can be used to create a PredictionAnalyzer with matching defaults.

Returns:

Aggregate column name (‘y’ for y-based aggregation, column name for metadata-based aggregation, or None if no aggregation was set).

Example

>>> runner = PipelineRunner()
>>> predictions, _ = runner.run(pipeline, DatasetConfigs(path, aggregate='sample_id'))
>>> # Create analyzer with same aggregate setting
>>> analyzer = PredictionAnalyzer(predictions, default_aggregate=runner.last_aggregate)

property last_aggregate_exclude_outliers: bool

Get aggregate exclude_outliers setting from the last executed dataset.

Returns:

True if T² outlier exclusion was enabled, False otherwise.

property last_aggregate_method: str | None

Get aggregate method from the last executed dataset.

Returns:

Aggregate method (‘mean’, ‘median’, ‘vote’) or None for default.
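
To make the three method names concrete, the sketch below groups repeated predictions by an identifier column (such as the 'sample_id' used in the last_aggregate example) and reduces each group with the mean, the median, or a majority vote. It shows what these names conventionally mean, using only the standard library; aggregate_predictions is a hypothetical helper, and nirs4all's own aggregation may differ in details such as tie-breaking.

```python
from collections import Counter, defaultdict
from statistics import mean, median


def _vote(items):
    """Majority vote: most frequent value; ties go to the value seen first."""
    return Counter(items).most_common(1)[0][0]


REDUCERS = {"mean": mean, "median": median, "vote": _vote}


def aggregate_predictions(group_ids, values, method="mean"):
    """Collapse repeated predictions into one value per group identifier."""
    if method not in REDUCERS:
        raise ValueError(f"Unknown aggregate method: {method!r}")
    grouped = defaultdict(list)
    for group_id, value in zip(group_ids, values):
        grouped[group_id].append(value)
    reduce = REDUCERS[method]
    return {group_id: reduce(items) for group_id, items in grouped.items()}


# Two spectra for sample "a", one for sample "b".
per_sample = aggregate_predictions(["a", "a", "b"], [1.0, 3.0, 5.0], method="mean")
```

Mean and median suit regression outputs, while a vote is the natural choice when the predictions are class labels.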

property library: PipelineLibrary

Get pipeline library for template management.

Returns:

PipelineLibrary instance for managing pipeline templates

next_op() → int

Get the next operation ID (for controller compatibility).

Returns:

Next operation counter value

predict(prediction_obj: Dict[str, Any] | str, dataset: DatasetConfigs | SpectroDataset | List[SpectroDataset] | ndarray | Tuple[ndarray, ...] | Dict | List[Dict] | str | List[str], dataset_name: str = 'prediction_dataset', all_predictions: bool = False, verbose: int = 0) → Tuple[ndarray, Predictions] | Tuple[Dict[str, Any], Predictions]

Run prediction using a saved model on new dataset.

Delegates to Predictor class for actual execution.

Parameters:
  • prediction_obj – Model identifier (dict with config_path or prediction ID)

  • dataset – New dataset to predict on

  • dataset_name – Name for the dataset

  • all_predictions – If True, return all predictions; if False, return single best

  • verbose – Verbosity level

Returns:

  • If all_predictions=False: (y_pred, predictions)

  • If all_predictions=True: (predictions_dict, predictions)
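
Because the first element of the returned tuple changes type with all_predictions, calling code may want to check which shape it received. The helper below is a minimal sketch of such a check on plain Python values; split_predict_result is a hypothetical name, and the only thing it relies on is the documented contract that the first element is a dict exactly when all_predictions=True.

```python
def split_predict_result(result, all_predictions):
    """Unpack a predict()-style 2-tuple and verify the first element's kind."""
    first, predictions = result
    is_mapping = isinstance(first, dict)
    if all_predictions and not is_mapping:
        raise TypeError("expected a dict of predictions when all_predictions=True")
    if not all_predictions and is_mapping:
        raise TypeError("expected a single prediction array when all_predictions=False")
    return first, predictions


# Stand-in values: a list plays the role of y_pred, a string the Predictions object.
y_pred, preds = split_predict_result(([0.1, 0.2], "predictions"), all_predictions=False)
```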

retrain(source: Dict[str, Any] | str | Path, dataset: DatasetConfigs | SpectroDataset | ndarray | Tuple[ndarray, ...] | Dict | List[Dict] | str | List[str], mode: str = 'full', dataset_name: str = 'retrain_dataset', new_model: Any | None = None, epochs: int | None = None, step_modes: List[StepMode] | None = None, verbose: int = 0, **kwargs) → Tuple[Predictions, Dict[str, Any]]

Retrain a pipeline on new data.

Supports three retraining modes:
  • full: Train from scratch with the same pipeline structure

  • transfer: Reuse existing preprocessing artifacts and train a new model

  • finetune: Continue training the existing model with new data

Phase 7 Feature:

This method enables retraining pipelines without having to reconstruct the pipeline configuration manually. It uses the resolved prediction source (from Phase 3/4) to extract the pipeline structure and optionally reuse preprocessing artifacts.

Parameters:
  • source – Prediction source to retrain from. Can be:
      - prediction dict: From a previous run’s Predictions object
      - folder path: Path to a pipeline directory
      - Run object: Best prediction from a Run
      - artifact_id: Direct artifact reference
      - bundle: Exported prediction bundle (.n4a)

  • dataset – New dataset to train on. Supports same formats as run()

  • mode – Retrain mode:
      - ‘full’: Train everything from scratch (same pipeline structure)
      - ‘transfer’: Use existing preprocessing, train new model
      - ‘finetune’: Continue training existing model

  • dataset_name – Name for the dataset if array-based

  • new_model – Optional new model for transfer mode (replaces original)

  • epochs – Optional epochs for fine-tuning

  • step_modes – Optional per-step mode overrides for fine-grained control

  • verbose – Verbosity level

  • **kwargs – Additional parameters:
      - learning_rate: Learning rate for fine-tuning
      - freeze_layers: List of layers to freeze during fine-tuning

Returns:

Tuple of (run_predictions, datasets_predictions)
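
One way to picture how mode and step_modes interact is as a per-step plan: the mode sets a default action for every step, and each StepMode entry overrides one step. The sketch below works under stated assumptions and is not the library's resolver. The StepMode dataclass is a stand-in carrying the two fields used in the example (step_index, mode), step indices are assumed to be 1-based, and 'finetune' is assumed to reuse the existing preprocessing, which the description above does not state.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StepMode:
    """Stand-in with the two fields shown in the retrain() example."""
    step_index: int
    mode: str


def resolve_step_modes(n_steps, model_step_index, mode="full", step_modes=None):
    """Return {step_index: action} for 1-based step indices (an assumption)."""
    if mode not in ("full", "transfer", "finetune"):
        raise ValueError(f"Unknown retrain mode: {mode!r}")

    plan = {}
    for index in range(1, n_steps + 1):
        if mode == "full":
            plan[index] = "train"  # everything is refit from scratch
        elif index == model_step_index:
            plan[index] = "train" if mode == "transfer" else "finetune"
        else:
            plan[index] = "predict"  # reuse the stored preprocessing artifact

    # Per-step overrides take precedence over the mode's defaults.
    for override in step_modes or []:
        plan[override.step_index] = override.mode
    return plan


plan = resolve_step_modes(3, model_step_index=3, mode="transfer")
```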

Example

>>> runner = PipelineRunner()
>>> predictions, _ = runner.run(pipeline, dataset)
>>> best_pred = predictions.top(n=1)[0]
>>>
>>> # Full retrain on new data
>>> new_preds, _ = runner.retrain(best_pred, new_data, mode='full')
>>>
>>> # Transfer: use preprocessing from old model, train new one
>>> new_preds, _ = runner.retrain(
...     best_pred, new_data, mode='transfer',
...     new_model=XGBRegressor()
... )
>>>
>>> # Finetune: continue training existing model
>>> new_preds, _ = runner.retrain(
...     best_pred, new_data, mode='finetune', epochs=10
... )
>>>
>>> # Fine-grained control: specify per-step modes
>>> from nirs4all.pipeline import StepMode
>>> step_modes = [
...     StepMode(step_index=1, mode='predict'),  # Use existing
...     StepMode(step_index=2, mode='train'),    # Retrain
... ]
>>> new_preds, _ = runner.retrain(
...     best_pred, new_data, mode='full', step_modes=step_modes
... )

run(pipeline: PipelineConfigs | List[Any] | Dict | str, dataset: DatasetConfigs | SpectroDataset | List[SpectroDataset] | ndarray | Tuple[ndarray, ...] | Dict | List[Dict] | str | List[str], pipeline_name: str = '', dataset_name: str = 'dataset', max_generation_count: int = 10000) → Tuple[Predictions, Dict[str, Any]]

Execute pipeline on dataset(s).

Main entry point for training workflows. Executes one or more pipeline configurations on one or more datasets, tracking predictions and artifacts.

Parameters:
  • pipeline – Pipeline definition (PipelineConfigs, list of steps, dict, or path)

  • dataset – Dataset definition (see DatasetConfigs for supported formats)

  • pipeline_name – Optional pipeline name for identification

  • dataset_name – Name for array-based datasets

  • max_generation_count – Max pipeline combinations to generate

Returns:

Tuple of (run_predictions, datasets_predictions)
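
The max_generation_count parameter caps how many pipeline combinations are generated. As a simplified picture of why such a cap matters, the sketch below expands a list of per-step alternatives into concrete pipelines with itertools.product and stops at the limit. The input shape (one list of alternatives per step) and the name expand_pipelines are assumptions for this example; how nirs4all declares alternatives, and whether it truncates or raises when the cap is exceeded, is not specified here.

```python
from itertools import islice, product


def expand_pipelines(step_options, max_generation_count=10000):
    """Build at most max_generation_count pipelines from per-step alternatives."""
    combinations = product(*step_options)
    # islice stops the lazy Cartesian product early, so a huge search space
    # is never fully materialized.
    return [list(combo) for combo in islice(combinations, max_generation_count)]


# Two preprocessing choices times two model choices gives four pipelines.
variants = expand_pipelines([["snv", "msc"], ["pls", "svr"]])
capped = expand_pipelines([["snv", "msc"], ["pls", "svr"]], max_generation_count=3)
```

The number of combinations is the product of the option counts per step, so it grows quickly: five steps with ten alternatives each already yield 100,000 pipelines, ten times the default cap.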

property runs_dir: Path

Get runs directory.

Returns:

Path to runs directory in workspace

nirs4all.pipeline.runner.init_global_random_state(seed: int | None = None)

Initialize global random state for reproducibility.

Sets random seeds for numpy, Python’s random module, TensorFlow, PyTorch, and sklearn to ensure reproducible results across runs.

Parameters:

seed – Random seed value. If None, uses default seed of 42 for TensorFlow and PyTorch.
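
The general pattern behind a global seeding helper can be sketched with the standard library plus an optional NumPy import. This is not the body of init_global_random_state: seed_everything is a hypothetical name, and applying the fallback seed of 42 to every generator (rather than only to TensorFlow and PyTorch, as described above) is a simplifying assumption.

```python
import random


def seed_everything(seed=None):
    """Seed the available random number generators and return the seed used."""
    if seed is None:
        seed = 42  # assumed fallback, applied uniformly in this sketch
    random.seed(seed)
    try:
        import numpy as np  # optional dependency; skipped when not installed
    except ImportError:
        pass
    else:
        np.random.seed(seed)
    return seed


seed_everything(7)
first_draw = random.random()
seed_everything(7)
second_draw = random.random()  # same value as first_draw
```

Deep learning frameworks expose their own entry points for the same purpose, for example tf.random.set_seed in TensorFlow and torch.manual_seed in PyTorch, and would be seeded the same way when installed.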