"""Pipeline orchestrator for coordinating multiple pipeline executions."""
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy as np
from nirs4all.data.config import DatasetConfigs
from nirs4all.data.dataset import SpectroDataset
from nirs4all.data.predictions import Predictions
from nirs4all.pipeline.storage.artifacts.artifact_registry import ArtifactRegistry
from nirs4all.pipeline.config.pipeline_config import PipelineConfigs
from nirs4all.pipeline.execution.builder import ExecutorBuilder
from nirs4all.pipeline.execution.executor import PipelineExecutor
from nirs4all.pipeline.storage.io import SimulationSaver
from nirs4all.pipeline.storage.manifest_manager import ManifestManager
from nirs4all.core.logging import get_logger
from nirs4all.visualization.reports import TabReportManager
logger = get_logger(__name__)
def _get_default_workspace_path() -> Path:
    """Return the workspace directory to use when the caller supplies none.

    The lookup is delegated to ``nirs4all.workspace.get_active_workspace()``,
    which checks, in order:

    1. A workspace explicitly set via ``set_active_workspace()``
    2. The ``NIRS4ALL_WORKSPACE`` environment variable
    3. ``./workspace`` in the current working directory

    Returns:
        Default workspace path.
    """
    # The workspace module is imported at call time, not at module import.
    from nirs4all.workspace import get_active_workspace

    active_workspace = get_active_workspace()
    return active_workspace
[docs]
class PipelineOrchestrator:
    """Orchestrates execution of multiple pipelines across multiple datasets.

    High-level coordinator that manages:
    - Workspace initialization
    - Global predictions aggregation
    - Best results reporting
    - Dataset/pipeline normalization

    Attributes:
        workspace_path: Root workspace directory
        runs_dir: Directory for storing runs (``<workspace_path>/runs``)
        verbose: Verbosity level
        mode: Execution mode (train/predict/explain)
        save_artifacts: Whether to save binary artifacts
        save_charts: Whether to save charts and visual outputs
        enable_tab_reports: Whether to generate tab reports
        continue_on_error: Whether to continue on errors (forwarded to the executor)
        show_spinner: Whether to show spinners (forwarded to the executor)
        keep_datasets: Whether to keep dataset snapshots
        plots_visible: Whether to display plots
        raw_data: Per-dataset 2D feature snapshot taken before a pipeline runs
            (filled only when ``keep_datasets`` is True)
        pp_data: Per-dataset 2D feature snapshots taken after each pipeline,
            keyed by the dataset's short preprocessing string
            (filled only when ``keep_datasets`` is True)
        last_saver: Saver of the most recently built executor
        last_executor: Most recently built executor
        last_pipeline_uid: UID of the last executed pipeline, when one was reported
        last_manifest_manager: Manifest manager associated with ``last_pipeline_uid``
        last_aggregate_column: ``aggregate`` setting of the last processed dataset
        last_aggregate_method: ``aggregate_method`` setting of the last processed dataset
        last_aggregate_exclude_outliers: ``aggregate_exclude_outliers`` setting of
            the last processed dataset
        last_execution_trace: Execution trace captured from the last pipeline run
    """
def __init__(
    self,
    workspace_path: Optional[Union[str, Path]] = None,
    verbose: int = 0,
    mode: str = "train",
    save_artifacts: bool = True,
    save_charts: bool = True,
    enable_tab_reports: bool = True,
    continue_on_error: bool = False,
    show_spinner: bool = True,
    keep_datasets: bool = True,
    plots_visible: bool = False
):
    """Set up the workspace layout and store the run configuration.

    Creates ``runs``, ``exports`` and ``library`` directories under the
    workspace root if they do not exist yet.

    Args:
        workspace_path: Workspace root directory; when None the default
            workspace path is resolved via ``_get_default_workspace_path()``
        verbose: Verbosity level
        mode: Execution mode (train/predict/explain)
        save_artifacts: Whether to save binary artifacts
        save_charts: Whether to save charts and visual outputs
        enable_tab_reports: Whether to generate tab reports
        continue_on_error: Whether to continue on errors
        show_spinner: Whether to show spinners
        keep_datasets: Whether to keep dataset snapshots
        plots_visible: Whether to display plots
    """
    # --- Workspace layout -------------------------------------------------
    workspace_root = Path(
        _get_default_workspace_path() if workspace_path is None else workspace_path
    )
    self.workspace_path = workspace_root
    self.runs_dir = workspace_root / "runs"
    # parents=True also creates the workspace root itself when missing,
    # which the sibling directories below rely on.
    self.runs_dir.mkdir(parents=True, exist_ok=True)
    for sibling_dir in ("exports", "library"):
        (workspace_root / sibling_dir).mkdir(exist_ok=True)

    # --- Run configuration ------------------------------------------------
    self.verbose = verbose
    self.mode = mode
    self.save_artifacts = save_artifacts
    self.save_charts = save_charts
    self.enable_tab_reports = enable_tab_reports
    self.continue_on_error = continue_on_error
    self.show_spinner = show_spinner
    self.keep_datasets = keep_datasets
    self.plots_visible = plots_visible

    # --- Dataset snapshots (filled only when keep_datasets is True) --------
    self.raw_data: Dict[str, np.ndarray] = {}
    self.pp_data: Dict[str, Dict[str, np.ndarray]] = {}

    # Figures are referenced here so they are not garbage collected.
    self._figure_refs: List[Any] = []

    # --- State of the most recent execution, for post-run operations -------
    self.last_saver: Any = None
    self.last_pipeline_uid: Optional[str] = None
    self.last_manifest_manager: Any = None
    # Executor kept for syncing step_number, substep_number, operation_count
    self.last_executor: Any = None
    # Aggregation settings of the last processed dataset
    self.last_aggregate_column: Optional[str] = None
    self.last_aggregate_method: Optional[str] = None
    self.last_aggregate_exclude_outliers: bool = False
    # ExecutionTrace from the last run, for post-run visualization
    self.last_execution_trace: Any = None
[docs]
def execute(
    self,
    pipeline: Union[PipelineConfigs, List[Any], Dict, str],
    dataset: Union[DatasetConfigs, SpectroDataset, List[SpectroDataset], np.ndarray, Tuple[np.ndarray, ...], Dict, List[Dict], str, List[str]],
    pipeline_name: str = "",
    dataset_name: str = "dataset",
    max_generation_count: int = 10000,
    artifact_loader: Any = None,
    target_model: Optional[Dict[str, Any]] = None,
    explainer: Any = None
) -> Tuple[Predictions, Dict[str, Any]]:
    """Execute pipeline configurations on dataset configurations.

    Every pipeline configuration is run on every dataset configuration.
    For each dataset, a run directory ``<runs_dir>/<dataset name>`` is
    created, one executor is built and shared by all pipelines, and the
    predictions previously stored in ``<workspace>/<name>.meta.parquet``
    are loaded and extended with the new results.

    Args:
        pipeline: Pipeline definition (PipelineConfigs, List[steps], Dict, or file path)
        dataset: Dataset definition (DatasetConfigs, SpectroDataset, numpy arrays, Dict, or file path)
        pipeline_name: Optional name for the pipeline
        dataset_name: Optional name for array-based datasets
        max_generation_count: Maximum number of pipeline combinations to generate
        artifact_loader: ArtifactLoader for predict/explain modes
        target_model: Target model for predict/explain modes
        explainer: Explainer instance for explain mode

    Returns:
        Tuple of (run_predictions, datasets_predictions). The second item
        maps each dataset name to a dict with the keys
        ``"global_predictions"``, ``"run_predictions"``, ``"dataset"`` and
        ``"dataset_name"``.

    Raises:
        Exception: Whatever ``executor.execute`` raises is re-raised after
            the artifact registry (if any) has cleaned up the failed run.
    """
    from nirs4all.pipeline.config.context import RuntimeContext

    # Normalize inputs
    pipeline_configs = self._normalize_pipeline(
        pipeline,
        name=pipeline_name,
        max_generation_count=max_generation_count
    )
    dataset_configs = self._normalize_dataset(dataset, dataset_name=dataset_name)

    # Clear previous figure references
    self._figure_refs.clear()

    n_pipelines = len(pipeline_configs.steps)
    n_datasets = len(dataset_configs.configs)
    total_runs = n_pipelines * n_datasets

    logger.info("=" * 120)
    logger.starting(
        f"Starting Nirs4all run(s) with {n_pipelines} "
        f"pipeline(s) on {n_datasets} dataset(s) ({total_runs} total runs)."
    )
    logger.info("=" * 120)

    datasets_predictions = {}
    run_predictions = Predictions()
    current_run = 0

    # Execute for each dataset
    for config, name in dataset_configs.configs:
        # Create run directory: workspace/runs/<dataset>/
        # All pipelines for a dataset go in the same folder regardless of date
        current_run_dir = self.runs_dir / name
        current_run_dir.mkdir(parents=True, exist_ok=True)

        # Create artifact registry for this dataset (v2 artifact system)
        artifact_registry = None
        if self.mode == "train":
            artifact_registry = ArtifactRegistry(
                workspace=self.workspace_path,
                dataset=name,
                manifest_manager=None  # Set once the executor is built below
            )
            artifact_registry.start_run()

        # Build executor using ExecutorBuilder
        executor = (ExecutorBuilder()
            .with_run_directory(current_run_dir)
            .with_workspace(self.workspace_path)
            .with_verbose(self.verbose)
            .with_mode(self.mode)
            .with_save_artifacts(self.save_artifacts)
            .with_save_charts(self.save_charts)
            .with_continue_on_error(self.continue_on_error)
            .with_show_spinner(self.show_spinner)
            .with_plots_visible(self.plots_visible)
            .with_artifact_loader(artifact_loader)
            .with_artifact_registry(artifact_registry)
            .build())

        # Get components from executor for compatibility
        saver = executor.saver
        manifest_manager = executor.manifest_manager

        # Update artifact registry with manifest manager
        if artifact_registry is not None:
            artifact_registry.manifest_manager = manifest_manager

        # Store saver for post-run operations (e.g., export_best_for_dataset)
        self.last_saver = saver
        self.last_executor = executor

        # Load global predictions from workspace root (dataset_name.meta.parquet)
        dataset_prediction_path = self.workspace_path / f"{name}.meta.parquet"
        global_dataset_predictions = Predictions.load_from_file_cls(dataset_prediction_path)
        run_dataset_predictions = Predictions()

        # Dataset object of the most recent pipeline run for this config.
        # Kept under its own name so the `dataset` argument is never
        # overwritten and later mistaken for a loaded SpectroDataset.
        current_dataset = None

        # Execute each pipeline configuration on this dataset
        for steps, config_name, gen_choices in zip(
            pipeline_configs.steps,
            pipeline_configs.names,
            pipeline_configs.generator_choices
        ):
            current_run += 1
            logger.info(f"Run {current_run}/{total_runs}: pipeline '{config_name}' on dataset '{name}'")

            current_dataset = dataset_configs.get_dataset(config, name)

            # Capture raw data BEFORE any preprocessing happens
            if self.keep_datasets and name not in self.raw_data:
                self.raw_data[name] = current_dataset.x({}, layout="2d")

            if self.verbose > 0:
                print(current_dataset)

            # Initialize execution context via executor
            context = executor.initialize_context(current_dataset)

            # Create RuntimeContext with artifact_registry
            runtime_context = RuntimeContext(
                saver=saver,
                manifest_manager=manifest_manager,
                artifact_loader=artifact_loader,
                artifact_registry=artifact_registry,
                step_runner=executor.step_runner,
                target_model=target_model,
                explainer=explainer
            )

            # Execute pipeline with cleanup on failure
            config_predictions = Predictions()
            try:
                executor.execute(
                    steps=steps,
                    config_name=config_name,
                    dataset=current_dataset,
                    context=context,
                    runtime_context=runtime_context,
                    prediction_store=config_predictions,
                    generator_choices=gen_choices
                )
            except Exception:
                # Cleanup artifacts from failed run, then propagate unchanged
                if artifact_registry is not None:
                    artifact_registry.cleanup_failed_run()
                raise

            # Capture last pipeline_uid and manifest_manager for syncing back to runner
            if runtime_context.pipeline_uid:
                self.last_pipeline_uid = runtime_context.pipeline_uid
                self.last_manifest_manager = manifest_manager

            # Capture execution trace for post-run visualization
            self.last_execution_trace = runtime_context.get_execution_trace()

            # Capture preprocessed data AFTER preprocessing
            if self.keep_datasets:
                if name not in self.pp_data:
                    self.pp_data[name] = {}
                self.pp_data[name][current_dataset.short_preprocessings_str()] = current_dataset.x({}, layout="2d")

            # Merge new predictions into stores
            if config_predictions.num_predictions > 0:
                global_dataset_predictions.merge_predictions(config_predictions)
                run_dataset_predictions.merge_predictions(config_predictions)
                run_predictions.merge_predictions(config_predictions)

        # Mark run as completed successfully
        if artifact_registry is not None:
            artifact_registry.end_run()

        # With no pipeline configuration the loop body never ran; load the
        # dataset here so the reporting below still gets a dataset object
        # instead of the raw `dataset` argument.
        if current_dataset is None:
            current_dataset = dataset_configs.get_dataset(config, name)

        # Store last aggregate column for visualization integration
        self.last_aggregate_column = current_dataset.aggregate
        self.last_aggregate_method = current_dataset.aggregate_method
        self.last_aggregate_exclude_outliers = current_dataset.aggregate_exclude_outliers

        # Print best results for this dataset
        self._print_best_predictions(
            run_dataset_predictions,
            global_dataset_predictions,
            current_dataset,
            name,
            dataset_prediction_path,
            saver
        )

        # Store dataset prediction info
        datasets_predictions[name] = {
            "global_predictions": global_dataset_predictions,
            "run_predictions": run_dataset_predictions,
            "dataset": current_dataset,
            "dataset_name": name
        }

    return run_predictions, datasets_predictions
def _normalize_pipeline(
    self,
    pipeline: Union[PipelineConfigs, List[Any], Dict, str],
    name: str = "",
    max_generation_count: int = 10000
) -> PipelineConfigs:
    """Coerce any accepted pipeline definition into a PipelineConfigs.

    An existing PipelineConfigs is returned untouched. A list of steps is
    wrapped under the ``"pipeline"`` key first; every other input is
    handed to ``PipelineConfigs`` as given.
    """
    if isinstance(pipeline, PipelineConfigs):
        return pipeline

    definition = {"pipeline": pipeline} if isinstance(pipeline, list) else pipeline
    return PipelineConfigs(definition, name=name, max_generation_count=max_generation_count)
def _normalize_dataset(
    self,
    dataset: Union[DatasetConfigs, SpectroDataset, List[SpectroDataset], np.ndarray, Tuple[np.ndarray, ...], Dict, List[Dict], str, List[str]],
    dataset_name: str = "array_dataset"
) -> DatasetConfigs:
    """Coerce any accepted dataset definition into a DatasetConfigs.

    Dispatch order:
    - a DatasetConfigs is returned as is;
    - a non-empty list whose first element is a SpectroDataset is wrapped
      with ``_wrap_dataset_list``;
    - a SpectroDataset, numpy array or tuple is wrapped with
      ``_wrap_dataset`` (``dataset_name`` is forwarded);
    - anything else is passed to the ``DatasetConfigs`` constructor.
    """
    if isinstance(dataset, DatasetConfigs):
        return dataset

    # Only the first element is inspected to recognise a list of datasets.
    is_dataset_list = (
        isinstance(dataset, list)
        and bool(dataset)
        and isinstance(dataset[0], SpectroDataset)
    )
    if is_dataset_list:
        return self._wrap_dataset_list(dataset)

    if isinstance(dataset, (SpectroDataset, np.ndarray, tuple)):
        return self._wrap_dataset(dataset, dataset_name)

    return DatasetConfigs(dataset)
def _wrap_dataset(self, dataset: Union[SpectroDataset, np.ndarray, Tuple], dataset_name: str) -> DatasetConfigs:
    """Wrap a SpectroDataset or raw arrays in a single-entry DatasetConfigs.

    A SpectroDataset is wrapped directly under its own name. A numpy
    array is loaded into a new SpectroDataset as "test" samples. A tuple
    is read as ``(X, y, partition_info)``, where ``y`` and
    ``partition_info`` are optional: without partition info everything
    goes to "train", otherwise the data is split with
    ``_split_and_add_data``.

    The DatasetConfigs is created with ``__new__`` (its ``__init__`` is not
    run) and its fields are filled in by hand.
    """
    if isinstance(dataset, SpectroDataset):
        preloaded = dataset
        entry_name = dataset.name
    else:
        preloaded = SpectroDataset(name=dataset_name)
        entry_name = dataset_name

        if isinstance(dataset, np.ndarray):
            # Features only: stored in the test partition (prediction use);
            # training data is expected to arrive as a tuple with y.
            preloaded.add_samples(dataset, indexes={"partition": "test"})
        elif isinstance(dataset, tuple):
            features = dataset[0]
            targets = dataset[1] if len(dataset) > 1 else None
            partition_info = dataset[2] if len(dataset) > 2 else None

            if partition_info is not None:
                self._split_and_add_data(preloaded, features, targets, partition_info)
            else:
                # No partition info: everything goes to train
                preloaded.add_samples(features, indexes={"partition": "train"})
                if targets is not None:
                    preloaded.add_targets(targets)

    wrapped = DatasetConfigs.__new__(DatasetConfigs)
    wrapped.configs = [({"_preloaded_dataset": preloaded}, entry_name)]
    wrapped.cache = {entry_name: self._extract_dataset_cache(preloaded)}

    # One-element settings lists for the single wrapped dataset.
    per_dataset_defaults = {
        "_task_types": "auto",                       # default task type
        "_signal_type_overrides": None,              # no override
        "_aggregates": None,                         # no aggregation
        "_aggregate_methods": None,                  # no aggregate method
        "_aggregate_exclude_outliers": False,        # no outlier exclusion
        "_config_task_types": None,                  # no config-level task type
        "_config_aggregates": None,                  # no config-level aggregate
        "_config_aggregate_methods": None,           # no config-level aggregate method
        "_config_aggregate_exclude_outliers": None,  # no config-level exclude outliers
    }
    for attribute, default in per_dataset_defaults.items():
        setattr(wrapped, attribute, [default])

    return wrapped
def _wrap_dataset_list(self, datasets: List[SpectroDataset]) -> DatasetConfigs:
    """Wrap several SpectroDataset instances in one DatasetConfigs.

    Each dataset becomes one preloaded entry registered under its own
    ``name``, with a cache tuple built by ``_extract_dataset_cache``. All
    per-dataset settings lists receive one default value per dataset.
    The DatasetConfigs is created with ``__new__`` (its ``__init__`` is not
    run) and its fields are filled in by hand.
    """
    members = list(datasets)
    count = len(members)

    bundle = DatasetConfigs.__new__(DatasetConfigs)
    bundle.configs = [({"_preloaded_dataset": member}, member.name) for member in members]
    bundle.cache = {member.name: self._extract_dataset_cache(member) for member in members}

    # Each attribute gets its own list object of per-dataset defaults.
    bundle._task_types = ["auto"] * count
    bundle._signal_type_overrides = [None] * count
    bundle._aggregates = [None] * count
    bundle._aggregate_methods = [None] * count
    bundle._aggregate_exclude_outliers = [False] * count
    bundle._config_task_types = [None] * count
    bundle._config_aggregates = [None] * count
    bundle._config_aggregate_methods = [None] * count
    bundle._config_aggregate_exclude_outliers = [None] * count
    return bundle
def _split_and_add_data(self, dataset: SpectroDataset, X: np.ndarray, y: Optional[np.ndarray], partition_info: Dict) -> None:
    """Split data according to partition_info and add it to dataset.

    partition_info maps a partition name to one of:
    - an integer N: the first N samples, e.g. ``{"train": 80}``
    - a slice, e.g. ``{"train": slice(0, 70), "test": slice(70, 100)}``
    - a list or array of indices, e.g. ``{"train": [0, 1, 2], "test": [80, 81]}``

    When "train" is given without "test", the test partition is made of
    every sample that the train spec does not select.

    Args:
        dataset: Target dataset; receives ``add_samples`` / ``add_targets`` calls.
        X: Feature array; its first axis is the sample axis.
        y: Optional targets, indexed with the same specs as ``X``.
        partition_info: Mapping of partition name to partition spec.

    Raises:
        ValueError: If a partition spec is not an int, slice, list or array.
    """
    n_samples = X.shape[0]

    # Normalise every spec to something usable as an index on X / y.
    partition_indices = {}
    for partition_name, partition_spec in partition_info.items():
        if isinstance(partition_spec, (int, np.integer)):
            # Integer means "first N samples"
            partition_indices[partition_name] = slice(0, int(partition_spec))
        elif isinstance(partition_spec, (slice, list, np.ndarray)):
            partition_indices[partition_name] = partition_spec
        else:
            raise ValueError(f"Invalid partition spec for '{partition_name}': {partition_spec}")

    # If only train is specified, create test from the remaining samples.
    if "train" in partition_indices and "test" not in partition_indices:
        # A membership mask gives the true complement for any spec kind:
        # open-ended or stepped slices, and non-contiguous index lists.
        in_train = np.zeros(n_samples, dtype=bool)
        in_train[partition_indices["train"]] = True
        remaining = np.flatnonzero(~in_train)
        if remaining.size > 0:
            first, last = int(remaining[0]), int(remaining[-1])
            if last - first + 1 == remaining.size:
                # Contiguous block: keep a plain slice, as for the usual
                # "first N samples are train" case.
                partition_indices["test"] = slice(first, last + 1)
            else:
                partition_indices["test"] = remaining

    # Add samples (and targets, when given) for each partition.
    for partition_name, indices_spec in partition_indices.items():
        X_partition = X[indices_spec]
        y_partition = y[indices_spec] if y is not None else None

        # Empty partitions are skipped entirely.
        if len(X_partition) > 0:
            dataset.add_samples(X_partition, indexes={"partition": partition_name})
            if y_partition is not None and len(y_partition) > 0:
                dataset.add_targets(y_partition)
def _extract_dataset_cache(self, dataset: SpectroDataset) -> Tuple:
    """Extract cache tuple from a SpectroDataset.

    Returns a 14-tuple matching the format expected by DatasetConfigs:
    (x_train, y_train, m_train, train_headers, m_train_headers, train_unit, train_signal_type,
     x_test, y_test, m_test, test_headers, m_test_headers, test_unit, test_signal_type)

    Only x, y and the signal type are read from the dataset; the metadata,
    header and unit slots are always None. If reading a partition fails
    with an ordinary exception, all of that partition's slots are None.

    Args:
        dataset: Dataset to read the "train" and "test" partitions from.

    Returns:
        The 14-tuple described above.
    """
    def _read_partition(partition):
        # Best effort: a partition that cannot be read yields empty slots.
        # Only `Exception` is caught so KeyboardInterrupt and SystemExit
        # still propagate to the caller.
        try:
            features = dataset.x({"partition": partition}, layout="2d")
            targets = dataset.y({"partition": partition})
            signal_type = dataset.signal_type(0) if dataset.n_sources > 0 else None
        except Exception:
            return None, None, None
        return features, targets, signal_type

    x_train, y_train, train_signal_type = _read_partition("train")
    x_test, y_test, test_signal_type = _read_partition("test")

    # Return 14-tuple with signal_type included
    return (x_train, y_train, None, None, None, None, train_signal_type,
            x_test, y_test, None, None, None, None, test_signal_type)
def _print_best_predictions(
    self,
    run_dataset_predictions: Predictions,
    global_dataset_predictions: Predictions,
    dataset: SpectroDataset,
    name: str,
    dataset_prediction_path: Path,
    saver: SimulationSaver
):
    """Report the best prediction of this run and persist the results.

    When the run produced predictions:
    - the best entry is logged;
    - if tab reports are enabled, a best-score report is logged and, when a
      CSV payload is returned, stored through ``saver.save_file``;
    - if artifacts are enabled and the entry carries predictions, they are
      written as ``best_<config_name>.csv`` under ``saver.base_path``. Any
      existing ``best_*.csv`` files are replaced only when the new entry
      is at least as good as the best entry of the global predictions.

    In all cases the global predictions are written to
    ``dataset_prediction_path`` when they are not empty.
    """
    if run_dataset_predictions.num_predictions > 0:
        # ascending=None lets the ranker infer the sort direction from the metric
        top_entry = run_dataset_predictions.get_best(ascending=None)
        logger.success(f"Best prediction in run for dataset '{name}': {Predictions.pred_long_string(top_entry)}")

        if self.enable_tab_reports:
            per_partition = run_dataset_predictions.get_entry_partitions(top_entry)

            # Aggregation settings of the dataset, forwarded to the report
            agg_column = dataset.aggregate
            agg_method = dataset.aggregate_method
            agg_drop_outliers = dataset.aggregate_exclude_outliers

            if agg_column:
                if agg_column == 'y':
                    agg_label = "y (target values)"
                else:
                    agg_label = f"'{agg_column}'"
                method_label = f", method='{agg_method}'" if agg_method else ""
                outlier_label = ", exclude_outliers=True" if agg_drop_outliers else ""
                logger.info(f"Including aggregated scores (by {agg_label}{method_label}{outlier_label}) in report")

            report_text, report_csv = TabReportManager.generate_best_score_tab_report(
                per_partition,
                aggregate=agg_column,
                aggregate_method=agg_method,
                aggregate_exclude_outliers=agg_drop_outliers
            )
            logger.info(report_text)
            if report_csv:
                report_name = f"Report_best_{top_entry['config_name']}_{top_entry['model_name']}_{top_entry['id']}.csv"
                saver.save_file(report_name, report_csv)

        # Predictions are exported only when there is actual prediction data
        if self.save_artifacts and top_entry.get("y_pred") is not None and len(top_entry["y_pred"]) > 0:
            # File is named after the entry's config_name
            config_label = top_entry.get('config_name', 'unknown')
            target_path = saver.base_path / f"best_{config_label}.csv"

            stale_files = list(saver.base_path.glob("best_*.csv"))
            write_best = True

            if stale_files:
                # A previous best exists: overwrite it only if the new
                # entry matches or beats the global best test score.
                run_score = top_entry.get('test_score')
                if run_score is not None:
                    overall_best = global_dataset_predictions.get_best(ascending=None)
                    overall_score = overall_best.get('test_score') if overall_best else None
                    if overall_score is not None:
                        if top_entry.get('task', 'regression') == 'regression':
                            # Regression: lower score wins
                            write_best = run_score <= overall_score
                        else:
                            # Otherwise: higher score wins
                            write_best = run_score >= overall_score

            if write_best:
                # Drop superseded files before writing the new best
                for stale_file in stale_files:
                    stale_file.unlink()
                Predictions.save_predictions_to_csv(top_entry["y_true"], top_entry["y_pred"], target_path)

    if global_dataset_predictions.num_predictions > 0:
        global_dataset_predictions.save_to_file(dataset_prediction_path)

    logger.info("=" * 120)