Source code for nirs4all.api.session

"""
Session context manager for nirs4all API.

A Session maintains shared resources across multiple nirs4all operations,
including a reusable PipelineRunner instance, consistent workspace paths,
and shared logging configuration.

Two usage patterns:

1. Context manager for shared runner (resource reuse):
    >>> with nirs4all.session(verbose=2, save_artifacts=True) as s:
    ...     r1 = nirs4all.run(pipeline1, data1, session=s)
    ...     r2 = nirs4all.run(pipeline2, data2, session=s)
    ...     # Both runs share workspace and configuration

2. Stateful session with pipeline (training workflow):
    >>> session = nirs4all.Session(pipeline=pipeline, name="MyModel")
    >>> result = session.run(dataset)
    >>> predictions = session.predict(new_data)
    >>> session.save("model.n4a")
"""

from contextlib import contextmanager
from typing import Any, Dict, Generator, List, Optional, Union, TYPE_CHECKING
from pathlib import Path

if TYPE_CHECKING:
    from nirs4all.pipeline import PipelineRunner
    from nirs4all.api.result import RunResult, PredictResult


[docs] class Session: """Execution session for resource reuse and stateful pipeline management. A session can be used in two modes: 1. **Resource sharing mode** (no pipeline): Share a PipelineRunner across multiple nirs4all.run() calls. 2. **Stateful pipeline mode** (with pipeline): Manage a single pipeline's lifecycle: train, predict, save, load. Attributes: name: Session/pipeline name for identification. pipeline: Pipeline definition (if in stateful mode). status: Current session status ('initialized', 'trained', 'error'). is_trained: Whether the pipeline has been trained. runner: The shared PipelineRunner instance. workspace_path: Path to the workspace directory. Example (resource sharing): >>> with nirs4all.session(verbose=1) as s: ... result1 = nirs4all.run(pipeline1, data1, session=s) ... result2 = nirs4all.run(pipeline2, data2, session=s) Example (stateful pipeline): >>> session = nirs4all.Session(pipeline=pipeline, name="MyModel") >>> result = session.run("sample_data/regression") >>> predictions = session.predict(new_data) >>> session.save("exports/my_model.n4a") """ def __init__( self, pipeline: Optional[List[Any]] = None, name: str = "", **runner_kwargs: Any ) -> None: """Initialize a session. Args: pipeline: Optional pipeline definition for stateful mode. If provided, enables run(), predict(), save() methods. name: Name for the session/pipeline. **runner_kwargs: Arguments to pass to PipelineRunner. Common options: verbose, save_artifacts, workspace_path, random_state, plots_visible, etc. """ self._pipeline = pipeline self._name = name or "Session" self._runner_kwargs = runner_kwargs self._runner: Optional["PipelineRunner"] = None self._status = "initialized" self._last_result: Optional["RunResult"] = None self._run_history: List[Dict[str, Any]] = [] self._bundle_path: Optional[Path] = None # Set when loading from bundle @property def name(self) -> str: """Get session name.""" return self._name @property def pipeline(self) -> Optional[List[Any]]: """Get pipeline definition.""" return self._pipeline @property def status(self) -> str: """Get current session status. Returns: One of: 'initialized', 'trained', 'error' """ return self._status @property def is_trained(self) -> bool: """Check if pipeline has been trained or loaded from a bundle.""" if self._status != "trained": return False # Trained if we have a result from training or a loaded bundle return self._last_result is not None or self._bundle_path is not None @property def runner(self) -> "PipelineRunner": """Get or create the shared PipelineRunner instance. The runner is created lazily on first access. Returns: The shared PipelineRunner instance. """ if self._runner is None: from nirs4all.pipeline import PipelineRunner self._runner = PipelineRunner(**self._runner_kwargs) return self._runner @property def workspace_path(self) -> Optional[Path]: """Get the workspace path from the runner. Returns: Path to the workspace directory, or None if runner not created. """ if self._runner is not None: return getattr(self._runner, 'workspace_path', None) return self._runner_kwargs.get('workspace_path') @property def history(self) -> List[Dict[str, Any]]: """Get run history for this session.""" return self._run_history
[docs] def run( self, dataset: Union[str, Path, Any], *, plots_visible: bool = False, **kwargs: Any ) -> "RunResult": """Train the session's pipeline on a dataset. Args: dataset: Dataset to train on. Can be: - Path to data folder: "sample_data/regression" - Numpy arrays: (X, y) - Dict: {"X": X, "y": y} plots_visible: Whether to show plots during training. **kwargs: Additional arguments passed to runner.run(). Returns: RunResult with predictions and metrics. Raises: ValueError: If no pipeline was provided to the session. """ if self._pipeline is None: raise ValueError( "No pipeline defined for this session. " "Either pass pipeline= to Session() or use nirs4all.run() directly." ) from nirs4all.api.result import RunResult from nirs4all.pipeline import PipelineConfigs from nirs4all.data import DatasetConfigs try: # Build configs pipeline_config = PipelineConfigs(self._pipeline, self._name) # Handle dataset if isinstance(dataset, DatasetConfigs): dataset_config = dataset elif isinstance(dataset, (str, Path)): dataset_config = DatasetConfigs(str(dataset)) else: dataset_config = DatasetConfigs(dataset) # Update runner's plots_visible if specified if plots_visible is not None: self.runner.plots_visible = plots_visible if hasattr(self.runner, 'orchestrator') and self.runner.orchestrator: self.runner.orchestrator.plots_visible = plots_visible # Filter kwargs for runner.run() - only pass valid parameters valid_run_params = {'pipeline_name', 'dataset_name', 'max_generation_count'} run_kwargs = {k: v for k, v in kwargs.items() if k in valid_run_params} # Run pipeline predictions, per_dataset = self.runner.run( pipeline_config, dataset_config, **run_kwargs ) self._last_result = RunResult( predictions=predictions, per_dataset=per_dataset, _runner=self.runner ) self._status = "trained" # Record in history self._run_history.append({ 'dataset': str(dataset), 'best_score': self._last_result.best_score, 'num_predictions': self._last_result.num_predictions }) return self._last_result except Exception as e: self._status = "error" raise
[docs] def predict( self, dataset: Union[str, Path, Any], **kwargs: Any ) -> "PredictResult": """Make predictions using the trained pipeline. Args: dataset: Data to predict on. Can be: - Path to data folder - Numpy array X - Dict with 'X' key **kwargs: Additional arguments for prediction. Returns: PredictResult with predictions. Raises: ValueError: If session has not been trained. """ if not self.is_trained: raise ValueError( "Session must be trained before prediction. " "Call session.run(dataset) first." ) from nirs4all.api.result import PredictResult import numpy as np # Handle dataset if isinstance(dataset, (str, Path)): from nirs4all.data import DatasetConfigs dataset_config = DatasetConfigs(str(dataset)) else: dataset_config = dataset # Determine prediction source: bundle path or trained model if self._bundle_path is not None: # Use bundle file for loaded sessions prediction_obj = str(self._bundle_path) model_name = self._name elif self._last_result is not None: # Use best model from training best = self._last_result.best if not best: raise ValueError("No trained model available for prediction.") prediction_obj = best model_name = best.get('model_name', '') else: raise ValueError("No trained model available for prediction.") y_pred, predictions = self.runner.predict( prediction_obj=prediction_obj, dataset=dataset_config, **kwargs ) return PredictResult( y_pred=np.asarray(y_pred).flatten() if y_pred is not None else np.array([]), metadata=predictions.__dict__ if hasattr(predictions, '__dict__') else {}, model_name=model_name )
[docs] def retrain( self, dataset: Union[str, Path, Any], mode: str = "full", **kwargs: Any ) -> "RunResult": """Retrain the pipeline on new data. Args: dataset: New dataset to train on. mode: Retrain mode ('full', 'transfer', 'finetune'). **kwargs: Additional arguments for retraining. Returns: RunResult from retraining. Raises: ValueError: If session has not been trained. """ if not self.is_trained: raise ValueError( "Session must be trained before retraining. " "Call session.run(dataset) first." ) from nirs4all.api.result import RunResult from nirs4all.data import DatasetConfigs # Determine source: bundle path or trained model if self._bundle_path is not None: # Use bundle file for loaded sessions source = str(self._bundle_path) elif self._last_result is not None: best = self._last_result.best if not best: raise ValueError("No trained model available for retraining.") source = best else: raise ValueError("No trained model available for retraining.") # Handle dataset if isinstance(dataset, (str, Path)): dataset_config = DatasetConfigs(str(dataset)) else: dataset_config = dataset predictions, per_dataset = self.runner.retrain( source=source, dataset=dataset_config, mode=mode, **kwargs ) self._last_result = RunResult( predictions=predictions, per_dataset=per_dataset, _runner=self.runner ) # Record in history self._run_history.append({ 'dataset': str(dataset), 'mode': mode, 'best_score': self._last_result.best_score, 'num_predictions': self._last_result.num_predictions }) return self._last_result
[docs] def save(self, path: Union[str, Path]) -> Path: """Save the trained session to a bundle file. Args: path: Output path for the .n4a bundle file. Returns: Path to the saved bundle file. Raises: ValueError: If session has not been trained. """ if not self.is_trained or self._last_result is None: raise ValueError( "Session must be trained before saving. " "Call session.run(dataset) first." ) return self._last_result.export(path)
[docs] def close(self) -> None: """Clean up session resources. Called automatically when exiting a context manager block. """ self._runner = None
[docs] def __enter__(self) -> "Session": """Enter the session context.""" return self
[docs] def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: """Exit the session context and clean up resources.""" self.close()
[docs] def __repr__(self) -> str: """Return string representation of session.""" if self._pipeline is not None: return f"Session(name='{self._name}', status='{self._status}', steps={len(self._pipeline)})" else: status = "active" if self._runner is not None else "idle" return f"Session({status}, kwargs={list(self._runner_kwargs.keys())})"
[docs] def load_session(path: Union[str, Path]) -> Session: """Load a session from a saved bundle file. Args: path: Path to .n4a bundle file. Returns: Session ready for prediction. Example: >>> session = nirs4all.load_session("exports/model.n4a") >>> predictions = session.predict(new_data) """ from nirs4all.pipeline.bundle import BundleLoader from nirs4all.api.result import RunResult path = Path(path) if not path.exists(): raise FileNotFoundError(f"Bundle not found: {path}") # Load the bundle to get pipeline info loader = BundleLoader(path) pipeline_config = loader.pipeline_config # pipeline_config is a dict with 'steps', 'name', etc. steps = pipeline_config.get('steps', []) if isinstance(pipeline_config, dict) else [] name = pipeline_config.get('name', path.stem) if isinstance(pipeline_config, dict) else path.stem # Create session with loaded pipeline session = Session( pipeline=steps, name=name ) # Mark as trained by creating a mock result # The actual prediction will use the bundle directly session._status = "trained" session._bundle_path = path return session
[docs] @contextmanager def session( pipeline: Optional[List[Any]] = None, name: str = "", **kwargs: Any ) -> Generator[Session, None, None]: """Create an execution session context manager. This is a convenience function that creates a Session and yields it within a context manager block. Args: pipeline: Optional pipeline definition for stateful mode. name: Name for the session/pipeline. **kwargs: Arguments passed to Session (and ultimately PipelineRunner). Common options: - verbose (int): Verbosity level (0-3). Default: 1 - save_artifacts (bool): Save model artifacts. Default: True - workspace_path (str|Path): Workspace directory. - random_state (int): Random seed for reproducibility. Yields: Session: The active session for use within the block. Example (resource sharing): >>> with nirs4all.session(verbose=2, save_artifacts=True) as s: ... r1 = nirs4all.run(pipeline1, data1, session=s) ... r2 = nirs4all.run(pipeline2, data2, session=s) ... print(f"PLS: {r1.best_score:.4f}, RF: {r2.best_score:.4f}") Example (stateful pipeline): >>> with nirs4all.session(pipeline=my_pipeline, name="Demo") as s: ... result = s.run("sample_data/regression") ... print(f"Best score: {result.best_score:.4f}") """ s = Session(pipeline=pipeline, name=name, **kwargs) try: yield s finally: s.close()