"""
Module-level run() function for nirs4all.
This module provides the primary entry point for training ML pipelines on NIRS data.
It wraps PipelineRunner.run() with a simpler, more ergonomic interface.
Example:
>>> import nirs4all
>>> result = nirs4all.run(
... pipeline=[MinMaxScaler(), PLSRegression(10)],
... dataset="sample_data/regression",
... verbose=1
... )
>>> print(f"Best RMSE: {result.best_rmse:.4f}")
"""
from typing import Any, Dict, List, Optional, Tuple, Union
from pathlib import Path
import numpy as np
from nirs4all.pipeline import PipelineRunner, PipelineConfigs
from nirs4all.data import DatasetConfigs
from nirs4all.data.dataset import SpectroDataset
from nirs4all.data.predictions import Predictions
from .result import RunResult
from .session import Session
# Type aliases for a single pipeline or dataset (not lists).
# These enumerate every form that one pipeline / one dataset argument may take.

# One pipeline definition.
SinglePipelineSpec = Union[
    List[Any],  # List of steps (most common)
    Dict[str, Any],  # Dict configuration
    str,  # Path to YAML/JSON config
    Path,  # Path to config file
    PipelineConfigs  # Backward compat: existing PipelineConfigs
]

# One dataset definition.
SingleDatasetSpec = Union[
    str,  # Path to data folder
    Path,  # Path to data folder
    np.ndarray,  # X array (y inferred or None)
    Tuple[np.ndarray, ...],  # (X,) or (X, y) or (X, y, metadata)
    Dict[str, Any],  # Dict with X, y, metadata keys
    SpectroDataset,  # Direct SpectroDataset instance
    DatasetConfigs,  # Backward compat: existing DatasetConfigs
]

# Type aliases that also support lists for batch execution.
# Either a single spec, or a list in which every element is itself a single spec.
PipelineSpec = Union[
    SinglePipelineSpec,
    List[SinglePipelineSpec],  # List of pipelines for batch execution
]
DatasetSpec = Union[
    SingleDatasetSpec,
    List[SingleDatasetSpec],  # List of datasets for batch execution
]
def _is_single_pipeline(pipeline: Any) -> bool:
    """Tell whether ``pipeline`` describes one pipeline rather than a batch of pipelines.

    Exactly one shape is reported as a batch (``False``): a non-empty list
    whose first element is itself a non-empty list, and whose first inner
    item is a dict, a str, or is accepted by ``_looks_like_step``.

    Everything else is reported as a single pipeline (``True``), including:
        - a PipelineConfigs object, a dict, a str or a Path
        - an empty list (treated as a single empty pipeline)
        - a list of steps whose first step is not a list
        - a list whose first element is an empty list

    This is a heuristic: only the first element of the list (and the first
    item of that element) is inspected, the remaining elements are ignored.

    Args:
        pipeline: The value passed as the ``pipeline`` argument.

    Returns:
        True for a single pipeline definition, False for a list of pipelines.
    """
    # PipelineConfigs / dict / str / Path, and any other non-list value,
    # cannot be a batch.
    if not isinstance(pipeline, list):
        return True

    # An empty list is treated as a single empty pipeline.
    if not pipeline:
        return True

    head = pipeline[0]

    # A first element that is not a list (or is an empty list) gives no
    # evidence of nesting: the outer list is taken as a list of steps.
    if not isinstance(head, list) or not head:
        return True

    # The first element is a non-empty list. This is either a list of
    # pipelines or, rarely, a single pipeline opening with a sub-pipeline.
    # If the inner list starts with something step-like, assume the former.
    probe = head[0]
    inner_is_pipeline = isinstance(probe, (dict, str)) or _looks_like_step(probe)
    return not inner_is_pipeline
def _looks_like_step(obj: Any) -> bool:
    """Heuristically decide whether ``obj`` could be a pipeline step.

    An object is accepted when it is any of:
        - ``None`` (no-op step)
        - a class object (a ``type``)
        - a dict (step configuration)
        - an object exposing a ``fit``, ``transform`` or ``predict`` attribute
          (sklearn-like interface)
        - an instance whose class lives in a module whose name starts with
          ``"nirs4all"``

    Args:
        obj: Candidate object to inspect.

    Returns:
        True if ``obj`` matches one of the rules above, False otherwise.
        The function never raises for a missing or non-string ``__module__``.
    """
    if obj is None or isinstance(obj, (type, dict)):
        return True

    # sklearn-like interface: one matching attribute is enough.
    if any(hasattr(obj, attr) for attr in ("fit", "transform", "predict")):
        return True

    # nirs4all transforms: identified by the module of the instance's class.
    # ``__module__`` can be None (or absent) on some dynamically created
    # classes, so read it defensively instead of calling ``.startswith`` on it.
    module_name = getattr(obj.__class__, "__module__", None)
    return isinstance(module_name, str) and module_name.startswith("nirs4all")
def _is_single_dataset(dataset: Any) -> bool:
    """Tell whether ``dataset`` describes one dataset rather than a batch of datasets.

    Reported as a single dataset (``True``):
        - a DatasetConfigs object or a SpectroDataset instance
        - a str / Path
        - a numpy array or a tuple
        - any dict
        - any other value that is not a list
        - an empty list (treated as a single empty dataset)
        - a list whose first element matches none of the batch rules below

    Reported as a batch (``False``): a non-empty list whose first element is
        - a SpectroDataset
        - a str or Path
        - a numpy array or a tuple
        - a dict containing a ``'path'``, ``'X'`` or ``'features'`` key

    Only the first element of a list is inspected.

    Args:
        dataset: The value passed as the ``dataset`` argument.

    Returns:
        True for a single dataset definition, False for a list of datasets.
    """
    # Every non-list form is, by construction, one dataset.
    if not isinstance(dataset, list):
        return True

    # An empty list is treated as a single empty dataset.
    if not dataset:
        return True

    head = dataset[0]

    # Elements that are themselves complete dataset specs -> multi-dataset.
    if isinstance(head, (SpectroDataset, str, Path, np.ndarray, tuple)):
        return False

    # A dict element counts as a dataset spec only when it carries one of
    # the recognised keys; otherwise the list is treated as a single dataset.
    if isinstance(head, dict):
        has_dataset_key = any(key in head for key in ('path', 'X', 'features'))
        return not has_dataset_key

    # Unrecognised first element: treat the whole list as a single dataset.
    return True
def _normalize_to_list(spec: Any, is_single_fn) -> List[Any]:
    """Return ``spec`` as a list of specs (pipelines or datasets).

    Args:
        spec: A single spec or a list of specs.
        is_single_fn: Predicate called with ``spec``; a truthy result means
            ``spec`` is one single spec.

    Returns:
        ``[spec]`` when the predicate accepts ``spec``; otherwise ``spec``
        itself, unchanged (the same object, not a copy).
    """
    return [spec] if is_single_fn(spec) else spec
# Public entry point: exposed to users as ``nirs4all.run``.
def _merge_per_dataset(merged: Dict[str, Any], per_dataset: Dict[str, Any]) -> None:
    """Fold one run's ``per_dataset`` mapping into ``merged`` (modified in place).

    A dataset name seen for the first time is stored as-is. For a name that
    is already present, only the ``"run_predictions"`` entry is combined:
    merged into the existing one when both exist, or adopted when the stored
    entry has none (so a later run's predictions are not dropped).
    """
    for ds_name, ds_info in per_dataset.items():
        if ds_name not in merged:
            merged[ds_name] = ds_info
            continue

        new_run_preds = ds_info.get("run_predictions")
        if new_run_preds is None:
            continue  # Nothing to contribute for this dataset

        existing_run_preds = merged[ds_name].get("run_predictions")
        if existing_run_preds is None:
            merged[ds_name]["run_predictions"] = new_run_preds
        else:
            existing_run_preds.merge_predictions(new_run_preds)


def run(
    pipeline: PipelineSpec,
    dataset: DatasetSpec,
    *,
    name: str = "",
    session: Optional[Session] = None,
    # Common runner options (shortcuts for most-used parameters)
    verbose: int = 1,
    save_artifacts: bool = True,
    save_charts: bool = True,
    plots_visible: bool = False,
    random_state: Optional[int] = None,
    # All other PipelineRunner options
    **runner_kwargs: Any
) -> RunResult:
    """Execute a training pipeline on a dataset.

    This is the primary entry point for training ML pipelines on NIRS data.
    It provides a simpler interface than creating PipelineRunner and config
    objects directly.

    Args:
        pipeline: Pipeline definition. Can be:
            - List of steps (most common): ``[MinMaxScaler(), PLSRegression(10)]``
            - Dict with steps: ``{"steps": [...], "name": "my_pipeline"}``
            - Path to YAML/JSON config file: ``"configs/my_pipeline.yaml"``
            - PipelineConfigs object (backward compatibility)
            - **List of pipelines**: ``[pipeline1, pipeline2, ...]`` - each
              pipeline is executed independently (cartesian product with datasets)
        dataset: Dataset definition. Can be:
            - Path to data folder: ``"sample_data/regression"``
            - Numpy arrays: ``(X, y)`` or ``X`` alone
            - Dict with arrays: ``{"X": X, "y": y, "metadata": meta}``
            - SpectroDataset instance
            - List of SpectroDataset instances (multi-dataset)
            - DatasetConfigs object (backward compatibility)
            - **List of datasets**: ``[dataset1, dataset2, ...]`` - each
              dataset is used with each pipeline (cartesian product)
        name: Optional pipeline name for identification and logging.
            If not provided, a name will be generated. When several
            pipelines are given, each run is named ``{name}_p{index}``
            (or ``pipeline_{index}`` when ``name`` is empty).
        session: Optional Session object for resource reuse across multiple
            runs. When provided, shares workspace and configuration: the
            session's runner is reused, only a non-default ``verbose`` is
            applied to it, and ``save_artifacts``, ``save_charts``,
            ``plots_visible``, ``random_state`` and ``runner_kwargs`` are
            not forwarded.
        verbose: Verbosity level (0=quiet, 1=info, 2=debug, 3=trace).
            Default: 1
        save_artifacts: Whether to save binary artifacts (models, transformers).
            Default: True
        save_charts: Whether to save charts and visual outputs.
            Default: True
        plots_visible: Whether to display plots interactively.
            Default: False
        random_state: Random seed for reproducibility.
            Default: None (no seeding)
        **runner_kwargs: Additional PipelineRunner parameters. See
            PipelineRunner.__init__ for full list. Common options:
            - workspace_path: Workspace root directory
            - continue_on_error: Whether to continue on step failures
            - show_spinner: Whether to show progress spinners
            - log_file: Whether to write logs to disk
            - log_format: Output format ("pretty", "minimal", "json")
            - show_progress_bar: Whether to show progress bars
            - max_generation_count: Max pipeline combinations (for generators)

    Returns:
        RunResult containing:
            - predictions: Predictions object with all pipeline results
            - per_dataset: Dictionary with per-dataset execution details
            - best: Best prediction entry (convenience accessor)
            - best_score: Best model's primary test score
            - best_rmse, best_r2, best_accuracy: Score shortcuts

        Use ``result.top(n=5)`` to get top N predictions, or
        ``result.export("path.n4a")`` to export the best model.

    Raises:
        ValueError: If pipeline or dataset format is invalid.
        FileNotFoundError: If pipeline config or dataset path doesn't exist.

    Examples:
        Simple usage with list of steps:

        >>> import nirs4all
        >>> from sklearn.preprocessing import MinMaxScaler
        >>> from sklearn.cross_decomposition import PLSRegression
        >>>
        >>> result = nirs4all.run(
        ...     pipeline=[MinMaxScaler(), PLSRegression(10)],
        ...     dataset="sample_data/regression",
        ...     verbose=1
        ... )
        >>> print(f"Best RMSE: {result.best_rmse:.4f}")

        With cross-validation and multiple models:

        >>> from sklearn.model_selection import ShuffleSplit
        >>>
        >>> result = nirs4all.run(
        ...     pipeline=[
        ...         MinMaxScaler(),
        ...         ShuffleSplit(n_splits=3),
        ...         {"model": PLSRegression(10)}
        ...     ],
        ...     dataset="sample_data/regression",
        ...     name="PLS_experiment",
        ...     verbose=2,
        ...     save_artifacts=True
        ... )

        Multiple pipelines executed independently:

        >>> pipeline_pls = [MinMaxScaler(), PLSRegression(10)]
        >>> pipeline_rf = [StandardScaler(), RandomForestRegressor()]
        >>>
        >>> result = nirs4all.run(
        ...     pipeline=[pipeline_pls, pipeline_rf],  # Two independent pipelines
        ...     dataset="sample_data/regression",
        ...     verbose=1
        ... )
        >>> print(f"Total configs: {result.num_predictions}")

        Cartesian product of pipelines × datasets:

        >>> pipelines = [pipeline1, pipeline2, pipeline3]
        >>> datasets = [dataset_a, dataset_b]
        >>>
        >>> # Runs 6 combinations: p1×da, p1×db, p2×da, p2×db, p3×da, p3×db
        >>> result = nirs4all.run(
        ...     pipeline=pipelines,
        ...     dataset=datasets,
        ...     verbose=1
        ... )

        Using a session for multiple runs:

        >>> with nirs4all.session(verbose=1) as s:
        ...     r1 = nirs4all.run(pipeline1, data, session=s)
        ...     r2 = nirs4all.run(pipeline2, data, session=s)
        ...     print(f"Pipeline 1: {r1.best_score:.4f}")
        ...     print(f"Pipeline 2: {r2.best_score:.4f}")

        Export the best model:

        >>> result = nirs4all.run(pipeline, dataset)
        >>> result.export("exports/best_model.n4a")

    See Also:
        - :func:`nirs4all.predict`: Make predictions with a trained model
        - :func:`nirs4all.explain`: Generate SHAP explanations
        - :func:`nirs4all.session`: Create execution session for resource reuse
        - :class:`nirs4all.PipelineRunner`: Direct runner access for advanced use
    """
    # Normalize pipelines and datasets to lists
    pipelines = _normalize_to_list(pipeline, _is_single_pipeline)
    datasets = _normalize_to_list(dataset, _is_single_dataset)

    if session is not None:
        # Reuse the session's runner
        runner = session.runner
        # Update runner settings if explicitly provided
        if verbose != 1:  # Not the default
            runner.verbose = verbose
    else:
        # Build runner kwargs from explicit params + extras
        all_kwargs = {
            "verbose": verbose,
            "save_artifacts": save_artifacts,
            "save_charts": save_charts,
            "plots_visible": plots_visible,
            **runner_kwargs
        }
        # Only forward the seed when one was requested
        if random_state is not None:
            all_kwargs["random_state"] = random_state
        runner = PipelineRunner(**all_kwargs)

    # Execute the cartesian product: each pipeline × each dataset
    all_predictions = Predictions()
    all_per_dataset: Dict[str, Any] = {}
    several_pipelines = len(pipelines) > 1

    for pipeline_idx, single_pipeline in enumerate(pipelines):
        # Name and argument depend on the pipeline only, so compute them once
        # per pipeline rather than once per (pipeline, dataset) pair.
        if several_pipelines:
            pipeline_name = f"{name}_p{pipeline_idx}" if name else f"pipeline_{pipeline_idx}"
        else:
            pipeline_name = name

        # Convert Path to str for compatibility with type hints
        pipeline_arg = str(single_pipeline) if isinstance(single_pipeline, Path) else single_pipeline

        for single_dataset in datasets:
            dataset_arg = str(single_dataset) if isinstance(single_dataset, Path) else single_dataset

            predictions, per_dataset = runner.run(
                pipeline=pipeline_arg,
                dataset=dataset_arg,
                pipeline_name=pipeline_name
            )

            # Merge predictions from this run
            all_predictions.merge_predictions(predictions)

            # Merge per_dataset info (datasets with same name will be combined)
            _merge_per_dataset(all_per_dataset, per_dataset)

    return RunResult(
        predictions=all_predictions,
        per_dataset=all_per_dataset,
        _runner=runner
    )