Source code for nirs4all.pipeline.execution.builder

"""Executor builder for creating configured PipelineExecutor instances.

This module provides a builder pattern for constructing PipelineExecutor instances
with all necessary dependencies and configuration.
"""
from pathlib import Path
from typing import Any, Optional

from nirs4all.data.dataset import SpectroDataset
from nirs4all.pipeline.execution.executor import PipelineExecutor
from nirs4all.pipeline.storage.io import SimulationSaver
from nirs4all.pipeline.storage.manifest_manager import ManifestManager
from nirs4all.pipeline.steps.parser import StepParser
from nirs4all.pipeline.steps.router import ControllerRouter
from nirs4all.pipeline.steps.step_runner import StepRunner


[docs] class ExecutorBuilder: """Builder for creating PipelineExecutor instances. Provides a fluent interface for configuring and building executors with all necessary dependencies. Encapsulates the complex setup logic previously duplicated between Orchestrator and other components. Example: >>> builder = ExecutorBuilder() >>> executor = (builder ... .with_run_directory(run_dir) ... .with_mode("train") ... .with_verbose(1) ... .build()) """ def __init__(self): """Initialize builder with default values.""" # Required parameters self._run_directory: Optional[Path] = None self._dataset: Optional[SpectroDataset] = None self._workspace: Optional[Path] = None # Optional parameters with defaults self._verbose: int = 0 self._mode: str = "train" self._save_artifacts: bool = True self._save_charts: bool = True self._continue_on_error: bool = False self._show_spinner: bool = True self._plots_visible: bool = False self._artifact_loader: Any = None self._artifact_registry: Any = None # Components (will be created if not provided) self._saver: Optional[SimulationSaver] = None self._manifest_manager: Optional[ManifestManager] = None self._step_runner: Optional[StepRunner] = None
[docs] def with_run_directory(self, run_directory: Path) -> 'ExecutorBuilder': """Set the run directory for this execution. Args: run_directory: Path to the run directory where outputs will be saved Returns: Self for method chaining """ self._run_directory = run_directory return self
[docs] def with_dataset(self, dataset: SpectroDataset) -> 'ExecutorBuilder': """Set the dataset for this execution. Args: dataset: Dataset to process Returns: Self for method chaining """ self._dataset = dataset return self
[docs] def with_verbose(self, verbose: int) -> 'ExecutorBuilder': """Set verbosity level. Args: verbose: Verbosity level (0=quiet, 1=info, 2=debug) Returns: Self for method chaining """ self._verbose = verbose return self
[docs] def with_mode(self, mode: str) -> 'ExecutorBuilder': """Set execution mode. Args: mode: Execution mode ('train', 'predict', 'explain') Returns: Self for method chaining """ self._mode = mode return self
[docs] def with_save_artifacts(self, save_artifacts: bool) -> 'ExecutorBuilder': """Set whether to save binary artifacts (models, transformers). Args: save_artifacts: Whether to save artifacts Returns: Self for method chaining """ self._save_artifacts = save_artifacts return self
[docs] def with_save_charts(self, save_charts: bool) -> 'ExecutorBuilder': """Set whether to save charts and visual outputs. Args: save_charts: Whether to save charts Returns: Self for method chaining """ self._save_charts = save_charts return self
[docs] def with_continue_on_error(self, continue_on_error: bool) -> 'ExecutorBuilder': """Set whether to continue execution on errors. Args: continue_on_error: Whether to continue on errors Returns: Self for method chaining """ self._continue_on_error = continue_on_error return self
[docs] def with_show_spinner(self, show_spinner: bool) -> 'ExecutorBuilder': """Set whether to show progress spinners. Args: show_spinner: Whether to show spinners Returns: Self for method chaining """ self._show_spinner = show_spinner return self
[docs] def with_plots_visible(self, plots_visible: bool) -> 'ExecutorBuilder': """Set whether to display plots. Args: plots_visible: Whether to display plots Returns: Self for method chaining """ self._plots_visible = plots_visible return self
[docs] def with_artifact_loader(self, artifact_loader: Any) -> 'ExecutorBuilder': """Set artifact loader for predict/explain modes. Args: artifact_loader: ArtifactLoader instance Returns: Self for method chaining """ self._artifact_loader = artifact_loader return self
[docs] def with_artifact_registry(self, artifact_registry: Any) -> 'ExecutorBuilder': """Set artifact registry for train mode. Args: artifact_registry: ArtifactRegistry instance Returns: Self for method chaining """ self._artifact_registry = artifact_registry return self
[docs] def with_workspace(self, workspace: Path) -> 'ExecutorBuilder': """Set workspace root path for artifact storage. Args: workspace: Workspace root path Returns: Self for method chaining """ self._workspace = workspace return self
[docs] def with_saver(self, saver: SimulationSaver) -> 'ExecutorBuilder': """Set custom simulation saver. Args: saver: SimulationSaver instance Returns: Self for method chaining """ self._saver = saver return self
[docs] def with_manifest_manager(self, manifest_manager: ManifestManager) -> 'ExecutorBuilder': """Set custom manifest manager. Args: manifest_manager: ManifestManager instance Returns: Self for method chaining """ self._manifest_manager = manifest_manager return self
[docs] def with_step_runner(self, step_runner: StepRunner) -> 'ExecutorBuilder': """Set custom step runner. Args: step_runner: StepRunner instance Returns: Self for method chaining """ self._step_runner = step_runner return self
[docs] def build(self) -> PipelineExecutor: """Build the configured PipelineExecutor instance. Creates all necessary components if not already provided, then constructs and returns a fully configured PipelineExecutor. Returns: Configured PipelineExecutor instance Raises: ValueError: If run_directory is not set """ if self._run_directory is None: raise ValueError("run_directory must be set before building executor") # Create saver if not provided if self._saver is None: self._saver = SimulationSaver( self._run_directory, save_artifacts=self._save_artifacts, save_charts=self._save_charts ) # Create manifest manager if not provided if self._manifest_manager is None: self._manifest_manager = ManifestManager(self._run_directory) # Create step runner if not provided if self._step_runner is None: self._step_runner = StepRunner( parser=StepParser(), router=ControllerRouter(), verbose=self._verbose, mode=self._mode, show_spinner=self._show_spinner, plots_visible=self._plots_visible ) # Build and return executor return PipelineExecutor( step_runner=self._step_runner, manifest_manager=self._manifest_manager, verbose=self._verbose, mode=self._mode, continue_on_error=self._continue_on_error, saver=self._saver, artifact_loader=self._artifact_loader, artifact_registry=self._artifact_registry )
@property def workspace(self) -> Optional[Path]: """Get the workspace path.""" return self._workspace @property def artifact_registry(self) -> Any: """Get the artifact registry.""" return self._artifact_registry