Source code for nirs4all.controllers.models.base_model

"""
Simplified Base Model Controller - Clean, readable implementation

This is a complete rewrite following the user's pseudo-code specification.
The controller is designed to be simple, clean, and readable with the
logic properly separated into 3 files maximum.

Key features:
- Simple execute() method with clear train/prediction mode logic
- Externalized prediction storage, model utils, and naming logic
- Clean separation between training, finetuning, and prediction
- Framework-specific models (sklearn, tensorflow) handle their own details
"""

from abc import ABC, abstractmethod
from tabnanny import verbose
from typing import Any, Dict, List, Tuple, Optional, Union, TYPE_CHECKING
import numpy as np
import copy
from joblib import Parallel, delayed
import multiprocessing

from nirs4all.controllers.controller import OperatorController
from ...optimization.optuna import OptunaManager
from nirs4all.data.predictions import Predictions
from nirs4all.data.ensemble_utils import EnsembleUtils
from nirs4all.core.task_type import TaskType
from .utilities import ModelControllerUtils as ModelUtils
from nirs4all.core import metrics as evaluator
from nirs4all.core.logging import get_logger
from nirs4all.pipeline.storage.artifacts.artifact_persistence import ArtifactMeta

logger = get_logger(__name__)
from nirs4all.pipeline.storage.artifacts.types import ArtifactType
from .components import (
    ModelIdentifierGenerator,
    PredictionTransformer,
    PredictionDataAssembler,
    ScoreCalculator,
    IndexNormalizer,
    PartitionScores
)

if TYPE_CHECKING:
    from nirs4all.pipeline.runner import PipelineRunner
    from nirs4all.data.dataset import SpectroDataset
    from nirs4all.pipeline.steps.parser import ParsedStep
    from nirs4all.pipeline.config.context import ExecutionContext


[docs] class BaseModelController(OperatorController, ABC): """Abstract base controller for machine learning model training and prediction. This controller provides a unified interface for training, finetuning, and predicting with machine learning models across different frameworks (scikit-learn, TensorFlow, PyTorch). It implements cross-validation, fold averaging, hyperparameter optimization, and comprehensive prediction tracking. The controller delegates framework-specific operations to subclasses while handling: - Cross-validation fold management - Model identification and naming - Prediction storage and tracking - Score calculation and aggregation - Fold-averaged predictions (simple and weighted) Attributes: optuna_manager (OptunaManager): Manager for hyperparameter optimization. identifier_generator (ModelIdentifierGenerator): Component for model naming. prediction_transformer (PredictionTransformer): Component for prediction scaling. prediction_assembler (PredictionDataAssembler): Component for assembling prediction records. score_calculator (ScoreCalculator): Component for calculating evaluation scores. index_normalizer (IndexNormalizer): Component for normalizing sample indices. prediction_store (Predictions): External storage for predictions. verbose (int): Verbosity level for logging. """ priority = 15 def __init__(self): super().__init__() self.optuna_manager = OptunaManager() # Initialize components for modular operations self.identifier_generator = ModelIdentifierGenerator() self.prediction_transformer = PredictionTransformer() self.prediction_assembler = PredictionDataAssembler() self.score_calculator = ScoreCalculator() self.index_normalizer = IndexNormalizer()
[docs] @classmethod def use_multi_source(cls) -> bool: return True
[docs] @classmethod def supports_prediction_mode(cls) -> bool: return True
# Abstract methods that subclasses must implement for their frameworks @abstractmethod def _get_model_instance(self, dataset: 'SpectroDataset', model_config: Dict[str, Any], force_params: Optional[Dict[str, Any]] = None) -> Any: """Create model instance from configuration using framework-specific builder. Args: dataset: SpectroDataset containing training data and metadata. model_config: Model configuration dictionary with architecture and parameters. force_params: Optional parameters to override config values (used in finetuning). Returns: Framework-specific model instance ready for training. """ pass @abstractmethod def _train_model(self, model: Any, X_train: Any, y_train: Any, X_val: Any = None, y_val: Any = None, **kwargs) -> Any: """Train the model using framework-specific training logic. Args: model: Model instance to train. X_train: Training features. y_train: Training targets. X_val: Optional validation features. y_val: Optional validation targets. **kwargs: Additional framework-specific training parameters. Returns: Trained model instance. """ pass @abstractmethod def _predict_model(self, model: Any, X: Any) -> np.ndarray: """Generate predictions using framework-specific prediction logic. Args: model: Trained model instance. X: Input features for prediction. Returns: NumPy array of predictions. """ pass @abstractmethod def _prepare_data(self, X: Any, y: Any, context: 'ExecutionContext') -> Tuple[Any, Any]: """Prepare data in framework-specific format (e.g., tensors, DataFrames). Args: X: Input features to prepare. y: Target values to prepare. context: Execution context with preprocessing and partition information. Returns: Tuple of (prepared_X, prepared_y) in framework-specific format. """ pass @abstractmethod def _clone_model(self, model: Any) -> Any: """Clone model using framework-specific cloning method. Each framework has its own best practice for cloning models: - sklearn: use sklearn.base.clone() - tensorflow/keras: use keras.models.clone_model() - pytorch: use copy.deepcopy() or custom cloning Args: model: Model instance to clone. Returns: Cloned model instance with same architecture but fresh weights. """ pass @abstractmethod def _evaluate_model(self, model: Any, X_val: Any, y_val: Any) -> float: """Evaluate model performance for hyperparameter optimization. Args: model: Trained model instance to evaluate. X_val: Validation features. y_val: Validation targets. Returns: Validation score to minimize (e.g., RMSE, negative accuracy). """ pass def _predict_proba_model(self, model: Any, X: Any) -> Optional[np.ndarray]: """Get class probabilities for classification models. Returns probability distributions for each sample. Used for soft voting in fold averaging for classification tasks. Args: model: Trained model instance. X: Input features for prediction. Returns: NumPy array of shape (n_samples, n_classes) with class probabilities, or None if the model doesn't support probability predictions. Note: Default implementation returns None. Subclasses should override this method for classification support. """ return None
[docs] def save_model(self, model: Any, filepath: str) -> None: """Optional: Save model in framework-specific format. Default implementation delegates to artifact_serialization.persist(). Subclasses can override to use framework-specific formats: - TensorFlow: .h5 or .keras format - PyTorch: .ckpt or .pt format - sklearn: .joblib format Args: model: Trained model to save. filepath: Path to save (without extension, will be added by implementation). """ from nirs4all.pipeline.storage.artifacts.artifact_persistence import persist persist(model, filepath)
[docs] def load_model(self, filepath: str) -> Any: """Optional: Load model from framework-specific format. Default implementation delegates to artifact_serialization.load(). Subclasses can override to use framework-specific loading. Args: filepath: Path to load from. Returns: Loaded model instance. """ from nirs4all.pipeline.storage.artifacts.artifact_persistence import load return load(filepath)
[docs] def get_xy(self, dataset: 'SpectroDataset', context: 'ExecutionContext') -> Tuple[Any, Any, Any, Any, Any, Any]: """Extract train/test splits with scaled and unscaled targets. For classification tasks, both scaled and unscaled targets are transformed. For regression tasks, scaled targets are used for training while unscaled (numeric) targets are used for evaluation. In prediction mode, uses all available data (partition=None) instead of splitting. Also handles sample_partitioner branches, which restrict data to a subset of samples. Args: dataset: SpectroDataset with partitioned data. context: Execution context with partition and preprocessing info. Returns: Tuple of (X_train, y_train, X_test, y_test, y_train_unscaled, y_test_unscaled). """ # Use layout from context (set by controller, may include force_layout) layout = context.selector.layout if hasattr(context, 'selector') and hasattr(context.selector, 'layout') else self.get_preferred_layout() # Check if we're in prediction/explain mode mode = context.state.mode # Check for sample partition (from sample_partitioner branches) sample_partition = context.custom.get("sample_partition") partition_sample_indices = None if sample_partition: partition_sample_indices = set(sample_partition.get("sample_indices", [])) if mode in ("predict", "explain"): # In prediction mode, use all available data (no partition split) pred_context = context.with_partition(None) X_all = dataset.x(pred_context.selector, layout=layout) # Build selector for y with processing from state pred_context.selector['y'] = pred_context.state.y_processing y_all = dataset.y(pred_context.selector) # If sample partition is active, filter to partition samples if partition_sample_indices: all_sample_ids = dataset._indexer.x_indices( pred_context.selector, include_augmented=True, include_excluded=False ) mask = np.array([int(sid) in partition_sample_indices for sid in all_sample_ids]) X_all = X_all[mask] y_all = y_all[mask] # Return empty training data and all data as "test" for prediction empty_X = np.array([]).reshape(0, X_all.shape[1] if len(X_all.shape) > 1 else 0) empty_y = np.array([]) # For unscaled targets if dataset.task_type and dataset.task_type.is_classification: y_all_unscaled = y_all else: # For regression, get numeric (unscaled) targets pred_context.selector['y'] = 'numeric' y_all_unscaled = dataset.y(pred_context.selector) if partition_sample_indices: y_all_unscaled = y_all_unscaled[mask] return empty_X, empty_y, X_all, y_all, empty_y, y_all_unscaled # Normal training mode: split into train/test train_context = context.with_partition('train') test_context = context.with_partition('test') X_train = dataset.x(train_context.selector, layout=layout) X_test = dataset.x(test_context.selector, layout=layout) # Build selectors for y with processing from state train_context.selector['y'] = train_context.state.y_processing y_train = dataset.y(train_context.selector) test_context.selector['y'] = test_context.state.y_processing y_test = dataset.y(test_context.selector) # If sample partition is active, filter train and test data if partition_sample_indices: # Get sample IDs for train partition train_sample_ids = dataset._indexer.x_indices( train_context.selector, include_augmented=True, include_excluded=False ) train_mask = np.array([int(sid) in partition_sample_indices for sid in train_sample_ids]) X_train = X_train[train_mask] y_train = y_train[train_mask] # Get sample IDs for test partition test_sample_ids = dataset._indexer.x_indices( test_context.selector, include_augmented=True, include_excluded=False ) test_mask = np.array([int(sid) in partition_sample_indices for sid in test_sample_ids]) X_test = X_test[test_mask] y_test = y_test[test_mask] # For classification tasks, use the transformed targets for evaluation # For regression tasks, use the original "numeric" targets if dataset.task_type and dataset.task_type.is_classification: # Use the same y context as the model training (transformed targets) y_train_unscaled = dataset.y(train_context.selector) y_test_unscaled = dataset.y(test_context.selector) # Apply partition mask if active if partition_sample_indices: y_train_unscaled = y_train_unscaled[train_mask] y_test_unscaled = y_test_unscaled[test_mask] else: # Use numeric targets for regression train_context.selector['y'] = 'numeric' test_context.selector['y'] = 'numeric' y_train_unscaled = dataset.y(train_context.selector) y_test_unscaled = dataset.y(test_context.selector) # Apply partition mask if active if partition_sample_indices: y_train_unscaled = y_train_unscaled[train_mask] y_test_unscaled = y_test_unscaled[test_mask] return X_train, y_train, X_test, y_test, y_train_unscaled, y_test_unscaled
def _remap_folds_to_positions( self, dataset: 'SpectroDataset', context: 'ExecutionContext', mode: str ) -> List[Tuple[List[int], List[int]]]: """Convert fold sample IDs to positional indices for the current active samples. Folds are stored with absolute sample IDs (from the indexer), which remain valid even after sample filtering excludes samples. This method converts those sample IDs to positional indices into the current X_train array. Also handles: - Branch-specific outlier exclusion masks from outlier_excluder branches - Sample partitioning from sample_partitioner branches Args: dataset: SpectroDataset with the fold information. context: Execution context with partition info. mode: Execution mode ('train', 'predict', 'explain'). Returns: List of (train_indices, val_indices) tuples with positional indices. """ raw_folds = dataset.folds if not raw_folds: return [] # In predict/explain mode, folds are not used for training, just return as-is if mode in ("predict", "explain"): return raw_folds # Get current active sample IDs for train partition (respecting indexer exclusions) train_context = context.with_partition("train") active_sample_ids = dataset._indexer.x_indices( # noqa: SLF001 train_context.selector, include_augmented=True, include_excluded=False ) # Check for sample partition (from sample_partitioner branches) # When active, we need to filter active_sample_ids to only those in the partition sample_partition = context.custom.get("sample_partition") partition_sample_ids_set = None if sample_partition: partition_sample_ids_set = set(sample_partition.get("sample_indices", [])) # Filter active_sample_ids to only those in the partition active_sample_ids = np.array([ sid for sid in active_sample_ids if int(sid) in partition_sample_ids_set ]) # Build a mapping from sample ID to positional index id_to_pos = {int(sid): pos for pos, sid in enumerate(active_sample_ids)} # Check for branch-specific outlier exclusion (from outlier_excluder branches) outlier_exclusion = context.custom.get("outlier_exclusion") excluded_sample_ids_set = set() if outlier_exclusion: # The outlier_exclusion stores: # - mask: boolean array where True = keep, False = exclude # - sample_indices: the sample IDs corresponding to the mask mask = outlier_exclusion.get("mask") sample_indices = outlier_exclusion.get("sample_indices", []) if mask is not None and len(sample_indices) > 0: # Get sample IDs that are excluded by this branch's outlier detection for i, sid in enumerate(sample_indices): if i < len(mask) and not mask[i]: excluded_sample_ids_set.add(int(sid)) # Remap each fold's sample IDs to positional indices # Only include samples that are still active (in partition, not excluded by indexer or outlier_excluder) remapped_folds = [] for train_ids, val_ids in raw_folds: # For train indices: filter to partition and exclude outlier_excluder samples train_indices = [ id_to_pos[int(sid)] for sid in train_ids if int(sid) in id_to_pos and int(sid) not in excluded_sample_ids_set ] # For val indices: filter to partition (keep all samples in partition for evaluation) val_indices = [id_to_pos[int(sid)] for sid in val_ids if int(sid) in id_to_pos] remapped_folds.append((train_indices, val_indices)) return remapped_folds def _get_partition_sample_indices( self, dataset: 'SpectroDataset', context: 'ExecutionContext', mode: str ) -> Tuple[List[int], List[int]]: """Get sample IDs for train and test partitions. For OOF reconstruction and stacking, we need the actual sample IDs (not positional indices) to match predictions stored in the prediction store. Args: dataset: SpectroDataset with partition info. context: Execution context with partition info. mode: Execution mode ('train', 'predict', 'explain'). Returns: Tuple of (train_sample_ids, test_sample_ids) as lists. """ if mode in ("predict", "explain"): # In predict mode, there's no train/test split - all data is "test" pred_context = context.with_partition(None) all_ids = dataset._indexer.x_indices( pred_context.selector, include_augmented=False, include_excluded=False ) return [], list(all_ids) # Get sample IDs for train and test partitions train_context = context.with_partition("train") test_context = context.with_partition("test") train_sample_ids = dataset._indexer.x_indices( train_context.selector, include_augmented=False, include_excluded=False ) test_sample_ids = dataset._indexer.x_indices( test_context.selector, include_augmented=False, include_excluded=False ) # Check for sample partition filtering sample_partition = context.custom.get("sample_partition") if sample_partition: partition_sample_ids_set = set(sample_partition.get("sample_indices", [])) train_sample_ids = [sid for sid in train_sample_ids if int(sid) in partition_sample_ids_set] test_sample_ids = [sid for sid in test_sample_ids if int(sid) in partition_sample_ids_set] return list(train_sample_ids), list(test_sample_ids)
[docs] def execute( self, step_info: 'ParsedStep', dataset: 'SpectroDataset', context: 'ExecutionContext', runtime_context: 'RuntimeContext', source: int = -1, mode: str = "train", loaded_binaries: Optional[List[Tuple[str, bytes]]] = None, prediction_store: 'Predictions' = None ) -> Tuple['ExecutionContext', List['ArtifactMeta']]: """Execute model training, finetuning, or prediction. This is the main entry point for model execution. It handles: - Extracting model configuration - Restoring task type in predict/explain modes - Delegating to finetune() or train() based on configuration - Managing prediction storage Args: step_info: Parsed step containing model configuration and operator. dataset: SpectroDataset with features and targets. context: Execution context with step_id, partition info, etc. runtime_context: Runtime context managing execution state. source: Data source index (default: -1). mode: Execution mode ('train', 'finetune', 'predict', 'explain'). loaded_binaries: Optional list of (name, bytes) tuples for prediction mode. prediction_store: External Predictions storage instance. Returns: Tuple of (updated_context, list_of_artifact_metadata). """ # Extract for compatibility with existing code step = step_info.original_step operator = step_info.operator self.prediction_store = prediction_store model_config = self._extract_model_config(step, operator) self.verbose = model_config.get('train_params', {}).get('verbose', 0) # In predict/explain mode, restore task_type from target_model if not set if mode in ("predict", "explain") and dataset.task_type is None: if hasattr(runtime_context, 'target_model') and runtime_context.target_model: task_type_str = runtime_context.target_model.get('task_type', 'regression') dataset.set_task_type(task_type_str) X_train, y_train, X_test, y_test, y_train_unscaled, y_test_unscaled = self.get_xy(dataset, context) # Get actual sample IDs for train and test partitions (for stacking/OOF reconstruction) train_sample_ids, test_sample_ids = self._get_partition_sample_indices(dataset, context, mode) # Convert fold sample IDs to positional indices # Folds now store absolute sample IDs, which remain valid even after sample filtering # We need to convert them to positional indices into the current X_train array folds = self._remap_folds_to_positions(dataset, context, mode) if self.verbose > 0: logger.debug(f"Model config: {model_config}") finetune_params = model_config.get('finetune_params') if mode == "finetune" or (mode == "train" and finetune_params): return self._execute_finetune( dataset, model_config, context, runtime_context, prediction_store, X_train, y_train, X_test, y_test, y_train_unscaled, y_test_unscaled, folds, finetune_params, loaded_binaries, train_sample_ids=train_sample_ids, test_sample_ids=test_sample_ids ) elif mode == "train": return self._execute_train( dataset, model_config, context, runtime_context, prediction_store, X_train, y_train, X_test, y_test, y_train_unscaled, y_test_unscaled, folds, loaded_binaries, train_sample_ids=train_sample_ids, test_sample_ids=test_sample_ids ) elif mode in ("predict", "explain"): return self._execute_predict( dataset, model_config, context, runtime_context, prediction_store, X_train, y_train, X_test, y_test, y_train_unscaled, y_test_unscaled, folds, loaded_binaries, mode, train_sample_ids=train_sample_ids, test_sample_ids=test_sample_ids ) else: raise ValueError(f"Unknown execution mode: {mode}")
def _execute_finetune( self, dataset, model_config, context, runtime_context, prediction_store, X_train, y_train, X_test, y_test, y_train_unscaled, y_test_unscaled, folds, finetune_params, loaded_binaries, train_sample_ids=None, test_sample_ids=None ): self.mode = "finetune" if self.verbose > 0: logger.info("Starting finetuning...") best_model_params = self.finetune( dataset, model_config, X_train, y_train, X_test, y_test, folds, finetune_params, self.prediction_store, context, runtime_context ) logger.info(f"Best parameters: {best_model_params}") binaries = self.train( dataset, model_config, context, runtime_context, prediction_store, X_train, y_train, X_test, y_test, y_train_unscaled, y_test_unscaled, folds, loaded_binaries=loaded_binaries, mode="train", best_params=best_model_params, train_sample_ids=train_sample_ids, test_sample_ids=test_sample_ids ) return context, binaries def _execute_train( self, dataset, model_config, context, runtime_context, prediction_store, X_train, y_train, X_test, y_test, y_train_unscaled, y_test_unscaled, folds, loaded_binaries, train_sample_ids=None, test_sample_ids=None ): if self.verbose > 0: logger.starting("Starting training...") binaries = self.train( dataset, model_config, context, runtime_context, prediction_store, X_train, y_train, X_test, y_test, y_train_unscaled, y_test_unscaled, folds, loaded_binaries=loaded_binaries, mode="train", train_sample_ids=train_sample_ids, test_sample_ids=test_sample_ids ) return context, binaries def _execute_predict( self, dataset, model_config, context, runtime_context, prediction_store, X_train, y_train, X_test, y_test, y_train_unscaled, y_test_unscaled, folds, loaded_binaries, mode, train_sample_ids=None, test_sample_ids=None ): binaries = self.train( dataset, model_config, context, runtime_context, prediction_store, X_train, y_train, X_test, y_test, y_train_unscaled, y_test_unscaled, folds, loaded_binaries=loaded_binaries, mode=mode, train_sample_ids=train_sample_ids, test_sample_ids=test_sample_ids ) return context, binaries
[docs] def finetune( self, dataset: 'SpectroDataset', model_config: Dict[str, Any], X_train: Any, y_train: Any, X_test: Any, y_test: Any, folds: Optional[List], finetune_params: Dict[str, Any], predictions: Dict, context: 'ExecutionContext', runtime_context: 'RuntimeContext', ) -> Union[Dict[str, Any], List[Dict[str, Any]]]: """Optimize hyperparameters using Optuna. Delegates to OptunaManager for Bayesian hyperparameter optimization. Returns optimized parameters that will be used in subsequent training. Args: dataset: SpectroDataset for optimization. model_config: Base model configuration. X_train: Training features. y_train: Training targets. X_test: Test features. y_test: Test targets. folds: List of (train_idx, val_idx) tuples for cross-validation. finetune_params: Optuna configuration with search space and trials. predictions: Prediction storage dictionary. context: Execution context. runtime_context: Runtime context. Returns: Dictionary of optimized parameters (single model) or list of dicts (per-fold). """ # Store dataset reference for model building self.dataset = dataset return self.optuna_manager.finetune( dataset, model_config=model_config, X_train=X_train, y_train=y_train, X_test=X_test, y_test=y_test, folds=folds, finetune_params=finetune_params, context=context, controller=self )
[docs] def train( self, dataset, model_config, context, runtime_context, prediction_store, X_train, y_train, X_test, y_test, y_train_unscaled, y_test_unscaled, folds, best_params=None, loaded_binaries=None, mode="train", train_sample_ids=None, test_sample_ids=None ) -> List['ArtifactMeta']: """Orchestrate model training across folds with prediction tracking. Manages the complete training workflow: - Iterates through cross-validation folds - Delegates to launch_training() for each fold - Creates fold-averaged predictions for regression tasks - Persists trained models as artifacts - Stores all predictions with weights Args: dataset: SpectroDataset with features and targets. model_config: Model configuration dictionary. context: Execution context with step_id and preprocessing info. runtime_context: Runtime context. prediction_store: External Predictions storage. X_train: Training features (all folds). y_train: Training targets (scaled). X_test: Test features. y_test: Test targets (scaled). y_train_unscaled: Training targets (unscaled for evaluation). y_test_unscaled: Test targets (unscaled for evaluation). folds: List of (train_idx, val_idx) tuples or empty list. best_params: Optional hyperparameters from finetuning. loaded_binaries: Optional model binaries for prediction mode. mode: Execution mode ('train', 'finetune', 'predict', 'explain'). train_sample_ids: List of actual sample IDs for train partition. test_sample_ids: List of actual sample IDs for test partition. Returns: List of ArtifactMeta objects for persisted models. """ verbose = model_config.get('train_params', {}).get('verbose', 0) n_jobs = model_config.get('train_params', {}).get('n_jobs', 1) # Auto-detect n_jobs if -1 if n_jobs == -1: n_jobs = multiprocessing.cpu_count() binaries = [] # In predict/explain mode, skip fold iteration entirely # Just load the model and predict on X_test (which contains all prediction samples) if mode in ("predict", "explain"): # For prediction, we run once per fold to match training fold structure # but we predict on X_test (all samples) not X_train n_folds = len(folds) if folds else 1 all_fold_predictions = [] for fold_iter in range(n_folds): # Use X_test as both val and test data (for prediction on all samples) # X_train and y_train are empty in predict mode # When no folds exist (folds is empty), pass fold_idx=None to load shared artifact # When folds exist, pass the actual fold index actual_fold_idx = fold_iter if folds else None model, model_id, score, model_name, prediction_data = self.launch_training( dataset, model_config, context, runtime_context, prediction_store, X_train, y_train, X_test, y_test, X_test, y_train_unscaled, y_test_unscaled, y_test_unscaled, train_indices=None, val_indices=None, fold_idx=actual_fold_idx, best_params=best_params, loaded_binaries=loaded_binaries, mode=mode, test_sample_ids=test_sample_ids ) all_fold_predictions.append(prediction_data) self._add_all_predictions(prediction_store, all_fold_predictions, None, mode=mode) return binaries if len(folds) > 0: folds_models = [] fold_val_indices = [] scores = [] all_fold_predictions = [] base_model_name = "" model_classname = "" # Prepare arguments for parallel execution fold_args = [] for fold_idx, (train_indices, val_indices) in enumerate(folds): fold_val_indices.append(val_indices) X_train_fold = X_train[train_indices] if X_train.shape[0] > 0 else np.array([]) y_train_fold = y_train[train_indices] if y_train.shape[0] > 0 else np.array([]) y_train_fold_unscaled = y_train_unscaled[train_indices] if y_train_unscaled.shape[0] > 0 else np.array([]) X_val_fold = X_train[val_indices] if X_train.shape[0] > 0 else np.array([]) y_val_fold = y_train[val_indices] if y_train.shape[0] > 0 else np.array([]) y_val_fold_unscaled = y_train_unscaled[val_indices] if y_train_unscaled.shape[0] > 0 else np.array([]) if isinstance(best_params, list): best_params_fold = best_params[fold_idx] if fold_idx < len(best_params) else None else: best_params_fold = best_params fold_args.append(( dataset, model_config, context, runtime_context, prediction_store, X_train_fold, y_train_fold, X_val_fold, y_val_fold, X_test, y_train_fold_unscaled, y_val_fold_unscaled, y_test_unscaled, train_indices, val_indices, fold_idx, best_params_fold, loaded_binaries, mode, test_sample_ids # Added for proper test sample indexing )) if verbose > 0: logger.info(f"Training {len(folds)} folds (n_jobs={n_jobs})...") # Execute folds (parallel or sequential) if n_jobs > 1 and mode == "train": # Only parallelize training, not prediction/finetuning for now to avoid complexity results = Parallel(n_jobs=n_jobs)( delayed(self.launch_training)(*args) for args in fold_args ) else: results = [self.launch_training(*args) for args in fold_args] # Process results for i, (model, model_id, score, model_name, prediction_data) in enumerate(results): folds_models.append((model_id, model, score)) all_fold_predictions.append(prediction_data) base_model_name = model_name scores.append(score) model_classname = model.__class__.__name__ # Only persist in train mode, not in predict/explain modes if mode == "train": artifact = self._persist_model( runtime_context, model, model_id, branch_id=context.selector.branch_id, branch_name=context.selector.branch_name, branch_path=context.selector.branch_path, fold_id=i, custom_name=model_name ) binaries.append(artifact) # Attach artifact_id to prediction_data for deterministic loading artifact_id = getattr(artifact, 'artifact_id', None) if artifact_id: prediction_data['model_artifact_id'] = artifact_id # Compute weights based on scores metric, higher_is_better = ModelUtils.get_best_score_metric(dataset.task_type) weights = EnsembleUtils._scores_to_weights(np.array(scores), higher_is_better=higher_is_better) # Create fold averages and get average predictions data if len(folds) > 1: avg_predictions, w_avg_predictions = self._create_fold_averages( base_model_name, dataset, model_config, context, runtime_context, prediction_store, model_classname, folds_models, fold_val_indices, scores, X_train, X_test, y_train_unscaled, y_test_unscaled, mode=mode, best_params=best_params, test_sample_ids=test_sample_ids ) # Collect ALL predictions (folds + averages) and add them in one shot with same weights all_fold_predictions = all_fold_predictions + [avg_predictions, w_avg_predictions] self._add_all_predictions(prediction_store, all_fold_predictions, weights, mode=mode) else: logger.warning("WARNING: Using test set as validation set (no folds provided)") model, model_id, score, model_name, prediction_data = self.launch_training( dataset, model_config, context, runtime_context, prediction_store, X_train, y_train, X_test, y_test, X_test, y_train_unscaled, y_test_unscaled, y_test_unscaled, loaded_binaries=loaded_binaries, mode=mode, test_sample_ids=test_sample_ids ) artifact = self._persist_model( runtime_context, model, model_id, branch_id=context.selector.branch_id, branch_name=context.selector.branch_name, branch_path=context.selector.branch_path, fold_id=None, # Single model, no folds custom_name=model_name ) binaries.append(artifact) # Attach artifact_id to prediction_data for deterministic loading artifact_id = getattr(artifact, 'artifact_id', None) if artifact_id: prediction_data['model_artifact_id'] = artifact_id # Add predictions for single model case (no weights) self._add_all_predictions(prediction_store, [prediction_data], None, mode=mode) return binaries
[docs] def process_hyperparameters(self, params: Dict[str, Any]) -> Dict[str, Any]: """Process hyperparameters before use. Can be overridden by subclasses to structure parameters (e.g. nesting for TensorFlow). Args: params: Flat dictionary of sampled parameters. Returns: Processed dictionary of parameters. """ return params
[docs] def launch_training( self, dataset, model_config, context, runtime_context, prediction_store, X_train, y_train, X_val, y_val, X_test, y_train_unscaled, y_val_unscaled, y_test_unscaled, train_indices=None, val_indices=None, fold_idx=None, best_params=None, loaded_binaries=None, mode="train", test_sample_ids=None): """Execute single model training or prediction. This refactored method uses modular components to handle: - Model identification and naming - Model loading for predict/explain modes - Training execution - Prediction transformation - Score calculation - Prediction data assembly Args: dataset: SpectroDataset instance model_config: Model configuration dictionary context: Execution context with step_id, y processing, etc. runtime_context: Runtime context. prediction_store: Predictions storage instance X_train, y_train: Training data (scaled) X_val, y_val: Validation data (scaled) X_test: Test data (scaled) y_train_unscaled, y_val_unscaled, y_test_unscaled: True values (unscaled) train_indices, val_indices: Sample indices for each partition fold_idx: Optional fold index for CV best_params: Optional hyperparameters from optimization loaded_binaries: Optional binaries for predict/explain mode mode: Execution mode ('train', 'finetune', 'predict', 'explain') test_sample_ids: List of actual sample IDs for test partition (for stacking). Returns: Tuple of (trained_model, model_id, val_score, model_name, prediction_data) """ # === 1. GENERATE IDENTIFIERS === identifiers = self.identifier_generator.generate(model_config, runtime_context, context, fold_idx) # Debug: check identifiers if identifiers.step_id == '' or identifiers.step_id == 0: logger.warning(f"WARNING in launch_training: step_id={identifiers.step_id}") logger.warning(f"context.state.step_number={context.state.step_number}") # === 2. GET OR LOAD MODEL === if mode in ("predict", "explain"): model = None # V3: Use artifact_provider for chain-based loading if runtime_context.artifact_provider is not None: step_index = runtime_context.step_number import logging logging.debug(f"MODEL: loading from artifact_provider step={step_index}, fold={fold_idx}") # Get artifact with branch awareness branch_path = context.selector.branch_path if hasattr(context.selector, 'branch_path') else None if fold_idx is not None: # Get fold-specific artifact using get_fold_artifacts (supports branch_path) fold_artifacts = runtime_context.artifact_provider.get_fold_artifacts( step_index, branch_path=branch_path ) # Find artifact for this specific fold for fid, artifact_obj in fold_artifacts: if fid == fold_idx: model = artifact_obj break else: # Get primary artifact (for non-CV or single model case) # Use get_artifacts_for_step which supports branch_path step_artifacts = runtime_context.artifact_provider.get_artifacts_for_step( step_index, branch_path=branch_path ) if step_artifacts: model = step_artifacts[0][1] if model is not None: logging.debug(f"MODEL: loaded model type={type(model).__name__}") if self.verbose > 0: logger.debug(f"Loaded model via artifact_provider for step {step_index}, fold={fold_idx}") if model is None: raise ValueError( f"Model not found at step {runtime_context.step_number}, fold={fold_idx}. " f"Ensure artifact_provider is properly configured." ) # Capture model for SHAP explanation if mode == "explain" and self._should_capture_for_explanation(runtime_context, identifiers): if hasattr(runtime_context, 'explainer') and hasattr(runtime_context.explainer, 'capture_model'): runtime_context.explainer.capture_model(model, self) trained_model = model else: # Create new model for training if mode == "finetune" and best_params is not None: if self.verbose > 0: print(f"Training model {identifiers.name} with: {best_params}...") model = self._get_model_instance(dataset, model_config, force_params=best_params) else: # Support model_params for customizing NN architecture at training time model_params = model_config.get('model_params', {}) if model_params: base_model = self._get_model_instance(dataset, model_config, force_params=model_params) else: base_model = self._get_model_instance(dataset, model_config) model = self._clone_model(base_model) # === 3. TRAIN MODEL === X_train_prep, y_train_prep = self._prepare_data(X_train, y_train, context or {}) X_val_prep, y_val_prep = self._prepare_data(X_val, y_val, context or {}) X_test_prep, _ = self._prepare_data(X_test, None, context or {}) # Log data shapes before training if self.verbose > 0: logger.debug(f"Training data shapes - X_train: {X_train_prep.shape}, y_train: {y_train_prep.shape if y_train_prep is not None else 'None'}, " f"X_val: {X_val_prep.shape}, y_val: {y_val_prep.shape if y_val_prep is not None else 'None'}, " f"X_test: {X_test_prep.shape}") # Pass task_type to train_model train_params = model_config.get('train_params', {}).copy() train_params['task_type'] = dataset.task_type trained_model = self._train_model( model, X_train_prep, y_train_prep, X_val_prep, y_val_prep, **train_params ) # === 4. GENERATE PREDICTIONS (scaled) === X_train_prep, y_train_prep = self._prepare_data(X_train, y_train, context or {}) X_val_prep, y_val_prep = self._prepare_data(X_val, y_val, context or {}) X_test_prep, _ = self._prepare_data(X_test, None, context or {}) # Generate predictions for all partitions with data (based on X, not y) predictions_scaled = { 'train': self._predict_model(trained_model, X_train_prep) if X_train_prep.shape[0] > 0 else np.array([]), 'val': self._predict_model(trained_model, X_val_prep) if X_val_prep.shape[0] > 0 else np.array([]), 'test': self._predict_model(trained_model, X_test_prep) if X_test_prep.shape[0] > 0 else np.array([]) } # === 4b. GENERATE PROBABILITIES FOR CLASSIFICATION === is_classification = dataset.task_type and dataset.task_type.is_classification probabilities = {'train': None, 'val': None, 'test': None} if is_classification: probabilities = { 'train': self._predict_proba_model(trained_model, X_train_prep) if X_train_prep.shape[0] > 0 else None, 'val': self._predict_proba_model(trained_model, X_val_prep) if X_val_prep.shape[0] > 0 else None, 'test': self._predict_proba_model(trained_model, X_test_prep) if X_test_prep.shape[0] > 0 else None } # === 5. TRANSFORM PREDICTIONS TO UNSCALED === predictions_unscaled = { 'train': self.prediction_transformer.transform_to_unscaled(predictions_scaled['train'], dataset, context), 'val': self.prediction_transformer.transform_to_unscaled(predictions_scaled['val'], dataset, context), 'test': self.prediction_transformer.transform_to_unscaled(predictions_scaled['test'], dataset, context) } # === 6. CALCULATE SCORES === true_values = { 'train': y_train_unscaled, 'val': y_val_unscaled, 'test': y_test_unscaled } partition_scores = self.score_calculator.calculate( true_values, predictions_unscaled, dataset.task_type ) # Calculate full metrics for all partitions full_scores = {} for partition in ['train', 'val', 'test']: if len(true_values[partition]) > 0 and len(predictions_unscaled[partition]) > 0: full_scores[partition] = self._calculate_and_print_scores( true_values[partition], predictions_unscaled[partition], dataset.task_type, partition=partition, model_name=identifiers.name, show_detailed_scores=False # (partition == 'test') # Only show detailed scores for test ) else: full_scores[partition] = {} # === 7. NORMALIZE INDICES === # In predict mode with no y, use X shape to determine sample counts n_samples = { 'train': len(y_train_unscaled) if y_train_unscaled is not None and len(y_train_unscaled) > 0 else (len(y_train_prep) if y_train_prep is not None and len(y_train_prep) > 0 else X_train_prep.shape[0]), 'val': len(y_val_unscaled) if y_val_unscaled is not None and len(y_val_unscaled) > 0 else (len(y_val_prep) if y_val_prep is not None and len(y_val_prep) > 0 else X_val_prep.shape[0]), 'test': len(y_test_unscaled) if y_test_unscaled is not None and len(y_test_unscaled) > 0 else X_test_prep.shape[0] } # For test indices: use actual sample IDs if provided, otherwise use range test_indices_normalized = test_sample_ids if test_sample_ids is not None else self.index_normalizer.normalize(None, n_samples['test']) indices = { 'train': self.index_normalizer.normalize(train_indices, n_samples['train']), 'val': self.index_normalizer.normalize(val_indices, n_samples['val']), 'test': test_indices_normalized } # === 8. ASSEMBLE PREDICTION DATA === scores_dict = { 'train': partition_scores.train, 'val': partition_scores.val, 'test': partition_scores.test, 'metric': partition_scores.metric } prediction_data = self.prediction_assembler.assemble( dataset=dataset, identifiers=identifiers, scores=scores_dict, predictions=predictions_unscaled, true_values=true_values, indices=indices, runner=runtime_context, X_shape=X_train.shape, best_params=best_params, context=context ) # Add full scores to prediction data prediction_data['scores'] = full_scores # Add probabilities for classification tasks prediction_data['probabilities'] = probabilities return trained_model, identifiers.model_id, partition_scores.val, identifiers.name, prediction_data
def _should_capture_for_explanation(self, runtime_context, identifiers) -> bool: """Check if current model should be captured for SHAP explanation. Compares model name and step index with runner's target_model to determine if this is the model requiring explanation. Args: runtime_context: Runtime context with target_model info. identifiers: ModelIdentifiers with name and step_id. Returns: True if model should be captured for explanation, False otherwise. """ target = runtime_context.target_model # Convert both to string for comparison to handle int/string mismatch target_step = str(target["step_idx"]) ident_step = str(identifiers.step_id) return (target["model_name"] == identifiers.name and target_step == ident_step) def _print_prediction_summary(self, prediction_data, pred_id, mode): """Print formatted summary for a single prediction. Displays model name, metric, test/val scores, and fold information with appropriate directional indicators (↑ for metrics to maximize, ↓ to minimize). Args: prediction_data: Prediction dictionary with scores and metadata. pred_id: Unique prediction identifier. mode: Execution mode. """ model_name = prediction_data['model_name'] fold_id = prediction_data['fold_id'] op_counter = prediction_data['op_counter'] val_score = prediction_data['val_score'] test_score = prediction_data['test_score'] metric = prediction_data['metric'] direction = "↑" if metric in ['r2', 'accuracy', 'balanced_accuracy'] else "↓" summary = f"{model_name} {metric} {direction} [test: {test_score:.4f}], [val: {val_score:.4f}]" if fold_id not in [None, 'None', 'avg', 'w_avg']: summary += f", (fold: {fold_id}, id: {op_counter})" elif fold_id in ['avg', 'w_avg']: summary += f", ({fold_id}, id: {op_counter})" summary += f" - [{pred_id}]" logger.success(summary)
[docs] def get_preferred_layout(self) -> str: """Get preferred data layout for the framework. Returns: Data layout string ('2d' for NumPy arrays, '3d' for TensorFlow, etc.). Note: Override in subclasses for framework-specific layouts. """ return "2d"
[docs] def get_effective_layout(self, step_info: Optional['ParsedStep'] = None) -> str: """Get effective data layout, respecting force_layout if specified. This method checks if the step configuration has a force_layout override. If not, it falls back to the controller's preferred layout. Args: step_info: ParsedStep containing potential force_layout override. Returns: Data layout string to use for this step. """ if step_info is not None and hasattr(step_info, 'force_layout') and step_info.force_layout is not None: return step_info.force_layout return self.get_preferred_layout()
def _calculate_and_print_scores( self, y_true: Any, y_pred: Any, task_type: TaskType, partition: str = "test", model_name: str = "model", show_detailed_scores: bool = True ) -> Dict[str, float]: """Calculate evaluation scores and print formatted output. Args: y_true: True target values. y_pred: Predicted values. task_type: TaskType enum indicating regression or classification. partition: Partition name for display ('train', 'val', 'test'). model_name: Model name for display. show_detailed_scores: Whether to print detailed score breakdown. Returns: Dictionary of metric names and scores. """ # Get default metrics for the task type default_metrics = evaluator.get_default_metrics(task_type.value) # Calculate all default metrics scores_list = evaluator.eval_list(y_true, y_pred, default_metrics) scores = dict(zip(default_metrics, scores_list)) if scores and show_detailed_scores: score_str = ModelUtils.format_scores(scores) logger.info(f"{model_name} {partition} scores: {score_str}") return scores def _extract_model_config(self, step: Any, operator: Any = None) -> Dict[str, Any]: """Extract and normalize model configuration from step or operator. Handles various configuration formats: - Dictionary with 'model' key - Dictionary with 'function'/'class'/'import' keys (serialized) - Direct model instance - Nested model dictionaries Args: step: Pipeline step configuration (dict or model instance). operator: Optional model operator instance. Returns: Normalized configuration dictionary with 'model_instance' or builder keys. """ if operator is not None: # print(f"DEBUG operator branch taken") if isinstance(step, dict): config = step.copy() config['model_instance'] = operator # Preserve function/class keys from nested model structure for name extraction if 'model' in step and isinstance(step['model'], dict): if 'function' in step['model']: config['function'] = step['model']['function'] elif 'class' in step['model']: config['class'] = step['model']['class'] # print(f"DEBUG returning config (step is dict): {list(config.keys())}") return config else: # print(f"DEBUG returning model_instance wrapper") return {'model_instance': operator} if isinstance(step, dict): # If step is already a serialized format with 'function', 'class', or 'import', # pass it through as-is for ModelFactory if any(key in step for key in ('function', 'class', 'import')): # print(f"DEBUG returning step as-is: {step}") return step if 'model' in step: config = step.copy() model_obj = step['model'] # Handle nested model format if isinstance(model_obj, dict): if 'model' in model_obj: config['model_instance'] = model_obj['model'] if 'name' in model_obj: config['name'] = model_obj['name'] else: config['model_instance'] = model_obj else: config['model_instance'] = model_obj return config else: return {'model_instance': step} else: return {'model_instance': step} def _create_fold_averages( self, base_model_name, dataset, model_config, context, runtime_context, prediction_store, model_classname, folds_models, fold_val_indices, scores, X_train, X_test, y_train_unscaled, y_test_unscaled, mode="train", best_params=None, test_sample_ids=None ) -> Tuple[Dict, Dict]: """Create simple and weighted fold-averaged predictions. Generates two averaged predictions: 1. Simple average: Equal weight to all folds 2. Weighted average: Weights based on validation scores For regression: Uses arithmetic mean of predictions. For classification: Uses soft voting (average probabilities, then argmax). Falls back to hard voting if probabilities unavailable. Args: base_model_name: Base name for averaged models. dataset: SpectroDataset with task type and preprocessing info. model_config: Model configuration dictionary. context: Execution context. runtime_context: Runtime context. prediction_store: Predictions storage. model_classname: Model class name string. folds_models: List of (model_id, model, score) tuples from folds. fold_val_indices: List of validation indices for each fold. scores: List of validation scores for each fold. X_train: Training features (all folds). X_test: Test features. y_train_unscaled: Training targets (unscaled). y_test_unscaled: Test targets (unscaled). mode: Execution mode. best_params: Optional hyperparameters. test_sample_ids: List of actual sample IDs for test partition. Returns: Tuple of (avg_prediction_dict, weighted_avg_prediction_dict). """ is_classification = dataset.task_type and dataset.task_type.is_classification # Prepare validation data X_val = np.vstack([X_train[val_idx] for val_idx in fold_val_indices]) y_val_unscaled = np.vstack([y_train_unscaled[val_idx] for val_idx in fold_val_indices]) all_val_indices = np.hstack(fold_val_indices) # Calculate weights based on scores metric, higher_is_better = ModelUtils.get_best_score_metric(dataset.task_type) weights = self._get_fold_weights(scores, higher_is_better, mode, runtime_context) # Initialize probabilities (will be set for classification with soft voting) avg_probs = {'train': None, 'val': None, 'test': None} w_avg_probs = {'train': None, 'val': None, 'test': None} if is_classification: # Use classification-specific averaging (soft or hard voting) avg_preds, w_avg_preds, avg_probs, w_avg_probs = self._create_classification_fold_averages( folds_models, X_train, X_val, X_test, weights, mode ) else: # Use regression averaging (arithmetic mean) avg_preds, w_avg_preds = self._create_regression_fold_averages( folds_models, X_train, X_val, X_test, weights, dataset, context, mode ) # Calculate scores for averaged predictions true_values = {'train': y_train_unscaled, 'val': y_val_unscaled, 'test': y_test_unscaled} avg_scores = self.score_calculator.calculate( true_values, avg_preds, dataset.task_type ) if mode not in ("predict", "explain") else None w_avg_scores = self.score_calculator.calculate( true_values, w_avg_preds, dataset.task_type ) if mode not in ("predict", "explain") else None # IMPORTANT: Override val_score for avg/w_avg to avoid data leakage. # The validation predictions above are computed by having each fold model # predict on ALL validation samples, but each fold's model has seen its own # validation samples during training. This causes biased (overly optimistic) val scores. # Instead, use the average of individual fold validation scores (which are unbiased OOF scores). if mode not in ("predict", "explain") and len(scores) > 0: # Simple average of fold val_scores for 'avg' fold avg_val_score = float(np.mean(scores)) if avg_scores is not None: avg_scores = PartitionScores( train=avg_scores.train, val=avg_val_score, # Override with unbiased average test=avg_scores.test, metric=avg_scores.metric, higher_is_better=avg_scores.higher_is_better ) # Weighted average of fold val_scores for 'w_avg' fold w_avg_val_score = float(np.sum([w * s for w, s in zip(weights, scores)])) if w_avg_scores is not None: w_avg_scores = PartitionScores( train=w_avg_scores.train, val=w_avg_val_score, # Override with unbiased weighted average test=w_avg_scores.test, metric=w_avg_scores.metric, higher_is_better=w_avg_scores.higher_is_better ) # Calculate full metrics for average predictions avg_full_scores = {} w_avg_full_scores = {} if mode not in ("predict", "explain"): for partition in ['train', 'val', 'test']: if len(true_values[partition]) > 0 and len(avg_preds[partition]) > 0: avg_full_scores[partition] = self._calculate_and_print_scores( true_values[partition], avg_preds[partition], dataset.task_type, partition=partition, model_name=f"{base_model_name}_avg", show_detailed_scores=False ) else: avg_full_scores[partition] = {} if len(true_values[partition]) > 0 and len(w_avg_preds[partition]) > 0: w_avg_full_scores[partition] = self._calculate_and_print_scores( true_values[partition], w_avg_preds[partition], dataset.task_type, partition=partition, model_name=f"{base_model_name}_w_avg", show_detailed_scores=False ) else: w_avg_full_scores[partition] = {} # IMPORTANT: Override val metrics for avg/w_avg with unbiased scores. # The metrics computed above for 'val' partition are biased because each fold # model predicts on ALL validation samples (including samples it was trained on). # We must use the unbiased scores (mean/weighted mean of individual fold OOF scores) # to ensure correct ranking. This matches the override done for avg_scores.val above. if avg_scores is not None and avg_scores.metric and 'val' in avg_full_scores: avg_full_scores['val'][avg_scores.metric] = avg_scores.val if w_avg_scores is not None and w_avg_scores.metric and 'val' in w_avg_full_scores: w_avg_full_scores['val'][w_avg_scores.metric] = w_avg_scores.val # Use prediction_assembler component to create prediction dicts avg_predictions = self._assemble_avg_prediction( dataset, runtime_context, context, base_model_name, model_classname, avg_preds, avg_scores, true_values, all_val_indices, "avg", best_params, mode, X_train.shape, test_sample_ids=test_sample_ids ) avg_predictions['scores'] = avg_full_scores avg_predictions['probabilities'] = avg_probs w_avg_predictions = self._assemble_avg_prediction( dataset, runtime_context, context, base_model_name, model_classname, w_avg_preds, w_avg_scores, true_values, all_val_indices, "w_avg", best_params, mode, X_train.shape, weights, test_sample_ids=test_sample_ids ) w_avg_predictions['scores'] = w_avg_full_scores w_avg_predictions['probabilities'] = w_avg_probs return avg_predictions, w_avg_predictions def _create_regression_fold_averages( self, folds_models, X_train, X_val, X_test, weights, dataset, context, mode ) -> Tuple[Dict, Dict]: """Create fold-averaged predictions for regression using arithmetic mean. Args: folds_models: List of (model_id, model, score) tuples. X_train, X_val, X_test: Feature arrays for each partition. weights: Fold weights for weighted averaging. dataset: SpectroDataset for prediction transformation. context: Execution context. mode: Execution mode. Returns: Tuple of (simple_avg_preds, weighted_avg_preds) dictionaries. """ all_train_preds = [] all_val_preds = [] all_test_preds = [] for _, fold_model, _ in folds_models: preds_scaled = { 'train': self._predict_model(fold_model, X_train) if X_train.shape[0] > 0 else np.array([]), 'val': self._predict_model(fold_model, X_val) if X_val.shape[0] > 0 else np.array([]), 'test': self._predict_model(fold_model, X_test) if X_test.shape[0] > 0 else np.array([]) } # Use prediction_transformer component for unscaling preds_unscaled = { 'train': self.prediction_transformer.transform_to_unscaled(preds_scaled['train'], dataset, context), 'val': self.prediction_transformer.transform_to_unscaled(preds_scaled['val'], dataset, context), 'test': self.prediction_transformer.transform_to_unscaled(preds_scaled['test'], dataset, context) } all_train_preds.append(preds_unscaled['train']) all_val_preds.append(preds_unscaled['val']) all_test_preds.append(preds_unscaled['test']) # Simple average avg_preds = { 'train': np.mean(all_train_preds, axis=0), 'val': np.mean(all_val_preds, axis=0) if mode not in ("predict", "explain") else np.array([]), 'test': np.mean(all_test_preds, axis=0) } # Weighted average w_avg_preds = { 'train': np.sum([w * p for w, p in zip(weights, all_train_preds)], axis=0), 'val': np.sum([w * p for w, p in zip(weights, all_val_preds)], axis=0) if mode not in ("predict", "explain") else np.array([]), 'test': np.sum([w * p for w, p in zip(weights, all_test_preds)], axis=0) } return avg_preds, w_avg_preds def _create_classification_fold_averages( self, folds_models, X_train, X_val, X_test, weights, mode ) -> Tuple[Dict, Dict]: """Create fold-averaged predictions for classification using soft voting. Uses probability averaging (soft voting) when probabilities are available, otherwise falls back to hard voting (majority vote). Args: folds_models: List of (model_id, model, score) tuples. X_train, X_val, X_test: Feature arrays for each partition. weights: Fold weights for weighted voting. mode: Execution mode. Returns: Tuple of (simple_avg_preds, weighted_avg_preds) dictionaries. """ # Collect probabilities or class predictions from all folds all_train_probs = [] all_val_probs = [] all_test_probs = [] all_train_preds = [] all_val_preds = [] all_test_preds = [] use_soft_voting = True for _, fold_model, _ in folds_models: # Try to get probabilities first train_probs = self._predict_proba_model(fold_model, X_train) if X_train.shape[0] > 0 else None val_probs = self._predict_proba_model(fold_model, X_val) if X_val.shape[0] > 0 else None test_probs = self._predict_proba_model(fold_model, X_test) if X_test.shape[0] > 0 else None if train_probs is None or test_probs is None: # Model doesn't support probabilities, use hard voting use_soft_voting = False all_train_preds.append( self._predict_model(fold_model, X_train) if X_train.shape[0] > 0 else np.array([]) ) all_val_preds.append( self._predict_model(fold_model, X_val) if X_val.shape[0] > 0 else np.array([]) ) all_test_preds.append( self._predict_model(fold_model, X_test) if X_test.shape[0] > 0 else np.array([]) ) else: all_train_probs.append(train_probs) all_val_probs.append(val_probs if val_probs is not None else np.array([])) all_test_probs.append(test_probs) # Also collect class predictions for potential fallback all_train_preds.append( self._predict_model(fold_model, X_train) if X_train.shape[0] > 0 else np.array([]) ) all_val_preds.append( self._predict_model(fold_model, X_val) if X_val.shape[0] > 0 else np.array([]) ) all_test_preds.append( self._predict_model(fold_model, X_test) if X_test.shape[0] > 0 else np.array([]) ) if use_soft_voting and all_train_probs: # Soft voting: average probabilities, then argmax # avg: simple average of probabilities # w_avg: uses fold weights AND confidence weighting for meaningful differences avg_preds, avg_probs = self._soft_vote_partitions( all_train_probs, all_val_probs, all_test_probs, weights=None, mode=mode, use_confidence_weighting=False ) w_avg_preds, w_avg_probs = self._soft_vote_partitions( all_train_probs, all_val_probs, all_test_probs, weights=weights, mode=mode, use_confidence_weighting=True ) else: # Hard voting: majority vote on class predictions avg_preds = self._hard_vote_partitions( all_train_preds, all_val_preds, all_test_preds, weights=None, mode=mode ) w_avg_preds = self._hard_vote_partitions( all_train_preds, all_val_preds, all_test_preds, weights=weights, mode=mode ) # No probabilities for hard voting avg_probs = {'train': None, 'val': None, 'test': None} w_avg_probs = {'train': None, 'val': None, 'test': None} return avg_preds, w_avg_preds, avg_probs, w_avg_probs def _soft_vote_partitions( self, all_train_probs, all_val_probs, all_test_probs, weights, mode, use_confidence_weighting: bool = False ) -> Tuple[Dict[str, np.ndarray], Dict[str, np.ndarray]]: """Apply soft voting to each partition. Args: all_*_probs: Lists of probability arrays from each fold. weights: Optional fold weights. mode: Execution mode. use_confidence_weighting: If True, weight by prediction confidence. Returns: Tuple of: - predictions: Dictionary with class predictions for each partition. - probabilities: Dictionary with averaged probabilities for each partition. """ predictions = {} probabilities = {} # Train partition if all_train_probs and all_train_probs[0].size > 0: predictions['train'], probabilities['train'] = EnsembleUtils.compute_soft_voting_average( all_train_probs, weights, use_confidence_weighting ) else: predictions['train'] = np.array([]) probabilities['train'] = None # Validation partition if mode not in ("predict", "explain") and all_val_probs and all_val_probs[0].size > 0: predictions['val'], probabilities['val'] = EnsembleUtils.compute_soft_voting_average( all_val_probs, weights, use_confidence_weighting ) else: predictions['val'] = np.array([]) probabilities['val'] = None # Test partition if all_test_probs and all_test_probs[0].size > 0: predictions['test'], probabilities['test'] = EnsembleUtils.compute_soft_voting_average( all_test_probs, weights, use_confidence_weighting ) else: predictions['test'] = np.array([]) probabilities['test'] = None return predictions, probabilities def _hard_vote_partitions( self, all_train_preds, all_val_preds, all_test_preds, weights, mode ) -> Dict[str, np.ndarray]: """Apply hard voting (majority vote) to each partition. Args: all_*_preds: Lists of class prediction arrays from each fold. weights: Optional fold weights. mode: Execution mode. Returns: Dictionary with voted predictions for each partition. """ result = {} # Train partition if all_train_preds and all_train_preds[0].size > 0: result['train'] = EnsembleUtils.compute_hard_voting(all_train_preds, weights) else: result['train'] = np.array([]) # Validation partition if mode not in ("predict", "explain") and all_val_preds and all_val_preds[0].size > 0: result['val'] = EnsembleUtils.compute_hard_voting(all_val_preds, weights) else: result['val'] = np.array([]) # Test partition if all_test_preds and all_test_preds[0].size > 0: result['test'] = EnsembleUtils.compute_hard_voting(all_test_preds, weights) else: result['test'] = np.array([]) return result def _get_fold_weights(self, scores, higher_is_better, mode, runtime_context): """Calculate weights for fold averaging based on validation scores. In prediction/explain modes, restores weights from target model if available. Otherwise, computes weights using EnsembleUtils._scores_to_weights(). Args: scores: Array of validation scores for each fold. higher_is_better: Whether higher scores are better (True for R², False for RMSE). mode: Execution mode. runtime_context: Runtime context with target_model info. Returns: NumPy array of normalized weights summing to 1.0. """ scores = np.asarray(scores, dtype=float) if mode in ("predict", "explain") and runtime_context.target_model and "weights" in runtime_context.target_model: weights_from_model = runtime_context.target_model["weights"] # Check if weights exist and are not None/empty if weights_from_model is not None: if isinstance(weights_from_model, str): import json return np.array(json.loads(weights_from_model), dtype=float) elif isinstance(weights_from_model, (list, np.ndarray)): weights_array = np.asarray(weights_from_model, dtype=float) if len(weights_array) > 0: return weights_array if np.all(np.isnan(scores)): return np.ones(len(scores), dtype=float) / len(scores) return EnsembleUtils._scores_to_weights(scores, higher_is_better=higher_is_better) def _assemble_avg_prediction(self, dataset, runner, context, model_name, model_classname, predictions, scores, true_values, val_indices, fold_id, best_params, mode, X_shape, weights=None, test_sample_ids=None): """Assemble prediction dictionary for averaged model. Creates a complete prediction record with all metadata, scores, and partition data for simple or weighted averaged predictions. Args: dataset: SpectroDataset with name and preprocessing info. runner: PipelineRunner with pipeline metadata. context: Execution context with step_id. model_name: Base model name. model_classname: Model class name string. predictions: Dict of {partition: predictions_array}. scores: PartitionScores object with train/val/test scores. true_values: Dict of {partition: true_values_array}. val_indices: Validation sample indices. fold_id: Fold identifier ('avg' or 'w_avg'). best_params: Optional hyperparameters dictionary. mode: Execution mode. X_shape: Shape of input features (for n_features). weights: Optional array of fold weights. test_sample_ids: List of actual sample IDs for test partition. Returns: Dictionary ready for prediction storage with all required fields. """ op_counter = runner.next_op() # Build partitions with actual sample IDs (not range indices) # For test: use test_sample_ids if provided, otherwise fallback to range test_indices = test_sample_ids if test_sample_ids is not None else list(range(len(true_values['test']))) partitions = [ ("train", list(range(len(true_values['train']))), true_values['train'], predictions['train']) ] if mode not in ("predict", "explain"): partitions.append(("val", val_indices.tolist(), true_values['val'], predictions['val'])) partitions.append(("test", test_indices, true_values['test'], predictions['test'])) # Extract metadata for each partition # Note: For 'val' partition in CV, samples come from train partition # so we use train metadata with val_indices to select the right rows partition_metadata = {} # Cache train metadata since val samples come from train train_meta_df = None try: train_meta_df = dataset.metadata({"partition": "train"}) except (KeyError, AttributeError, ValueError, TypeError): pass for partition_name in ['train', 'val', 'test']: if partition_name in predictions: try: pred_len = len(predictions[partition_name]) if partition_name == 'val': # Validation samples are from train partition - use val_indices meta_df = train_meta_df indices_to_use = val_indices.tolist() if hasattr(val_indices, 'tolist') else list(val_indices) elif partition_name == 'train': meta_df = train_meta_df indices_to_use = None # Use all rows else: # test meta_df = dataset.metadata({"partition": partition_name}) indices_to_use = None # Use all rows if meta_df is not None and len(meta_df) > 0: metadata_dict = {} for col in meta_df.columns: col_data = meta_df[col].to_numpy() if indices_to_use is not None: # Select specific rows for validation if len(indices_to_use) > 0 and max(indices_to_use) < len(col_data): selected_data = col_data[indices_to_use] if len(selected_data) == pred_len: metadata_dict[col] = selected_data.tolist() else: # Direct match for train/test if len(col_data) == pred_len: metadata_dict[col] = col_data.tolist() if metadata_dict: partition_metadata[partition_name] = metadata_dict except (KeyError, AttributeError, ValueError, TypeError, IndexError): pass # Get trace_id from runtime context (Phase 2) trace_id = runner.get_trace_id() if hasattr(runner, 'get_trace_id') else None result = { 'dataset_name': dataset.name, 'dataset_path': dataset.name, 'config_name': runner.saver.pipeline_id, 'config_path': f"{dataset.name}/{runner.saver.pipeline_id}", 'pipeline_uid': getattr(runner, 'pipeline_uid', None), 'trace_id': trace_id, # Phase 2: Link to execution trace 'step_idx': context.state.step_number, # Use step_number (int) not step_id (str) 'op_counter': op_counter, 'model_name': model_name, 'model_classname': str(model_classname), 'model_path': "", 'fold_id': fold_id, 'val_score': scores.val if scores else 0.0, 'test_score': scores.test if scores else 0.0, 'train_score': scores.train if scores else 0.0, 'metric': scores.metric if scores else ModelUtils.get_best_score_metric(dataset.task_type)[0], 'task_type': dataset.task_type, 'target_processing': context.state.y_processing, # Track which target processing was used 'n_features': X_shape[1] if len(X_shape) > 1 else 1, 'preprocessings': dataset.short_preprocessings_str(), 'partitions': partitions, 'partition_metadata': partition_metadata, 'best_params': best_params if best_params else {}, 'branch_id': getattr(context.selector, 'branch_id', None), 'branch_name': getattr(context.selector, 'branch_name', None) or "", } # Add outlier exclusion info if available (from outlier_excluder branches) exclusion_info = context.custom.get('exclusion_info', {}) if exclusion_info: result['exclusion_count'] = exclusion_info.get('n_excluded', 0) result['exclusion_rate'] = exclusion_info.get('exclusion_rate', 0.0) if weights is not None: result['weights'] = weights.tolist() return result def _add_all_predictions(self, prediction_store, all_predictions, weights, mode="train"): """Add all predictions to storage and print summaries. Iterates through prediction records, adds each partition to the store, and prints formatted summaries in train mode. Args: prediction_store: Predictions storage instance. all_predictions: List of prediction dictionaries. weights: Optional array of fold weights (applied to all predictions). mode: Execution mode ('train', 'finetune', 'predict', 'explain'). """ for idx, prediction_data in enumerate(all_predictions): if not prediction_data: continue partitions = prediction_data.get('partitions', []) probabilities = prediction_data.get('probabilities', {}) partition_metadata = prediction_data.get('partition_metadata', {}) # Add each partition's predictions pred_id = None for partition_name, indices, y_true_part, y_pred_part in partitions: if len(indices) == 0: continue # Get probabilities for this partition if available y_proba_part = probabilities.get(partition_name) if probabilities else None # Get metadata for this partition (for aggregation support) metadata = partition_metadata.get(partition_name, {}) pred_id = prediction_store.add_prediction( dataset_name=prediction_data['dataset_name'], dataset_path=prediction_data['dataset_path'], config_name=prediction_data['config_name'], config_path=prediction_data['config_path'], pipeline_uid=prediction_data.get('pipeline_uid'), step_idx=prediction_data['step_idx'], op_counter=prediction_data['op_counter'], model_name=prediction_data['model_name'], model_classname=prediction_data['model_classname'], model_path=prediction_data['model_path'], fold_id=prediction_data['fold_id'], sample_indices=indices, weights=weights, metadata=metadata, partition=partition_name, y_true=y_true_part, y_pred=y_pred_part, y_proba=y_proba_part, val_score=prediction_data['val_score'], test_score=prediction_data['test_score'], train_score=prediction_data['train_score'], metric=prediction_data['metric'], task_type=prediction_data['task_type'], n_samples=len(y_true_part), n_features=prediction_data['n_features'], preprocessings=prediction_data['preprocessings'], best_params=prediction_data['best_params'], scores=prediction_data.get('scores', {}), branch_id=prediction_data.get('branch_id'), branch_name=prediction_data.get('branch_name'), exclusion_count=prediction_data.get('exclusion_count'), exclusion_rate=prediction_data.get('exclusion_rate'), model_artifact_id=prediction_data.get('model_artifact_id'), trace_id=prediction_data.get('trace_id') ) # Print summary (only once per model) if pred_id and mode not in ("predict", "explain"): self._print_prediction_summary(prediction_data, pred_id, mode) def _persist_model( self, runtime_context: 'RuntimeContext', model: Any, model_id: str, branch_id: Optional[int] = None, branch_name: Optional[str] = None, branch_path: Optional[List[int]] = None, fold_id: Optional[int] = None, params: Optional[Dict[str, Any]] = None, custom_name: Optional[str] = None ) -> 'ArtifactMeta': """Persist trained model to disk using registry or legacy saver. Uses artifact_registry.register_with_chain() for V3 chain-based identification, enabling complete execution path tracking for deterministic reload. Also records the artifact in the execution trace for deterministic replay. Args: runtime_context: Runtime context with saver/registry instances. model: Trained model to persist. model_id: Unique identifier for the model. branch_id: Optional branch identifier for branched pipelines. branch_name: Optional branch name for branched pipelines. branch_path: Optional list of branch indices for nested branching. fold_id: Optional fold identifier for CV artifacts. params: Optional model parameters for inspection. custom_name: User-defined name for the model (e.g., "Q5_PLS_10"). Returns: ArtifactMeta or ArtifactRecord with persistence metadata. """ # Detect framework hint from model type model_type = type(model).__module__ if 'sklearn' in model_type: format_hint = 'sklearn' elif 'tensorflow' in model_type or 'keras' in model_type: format_hint = 'tensorflow' elif 'torch' in model_type: format_hint = 'pytorch' elif 'xgboost' in model_type: format_hint = 'xgboost' elif 'catboost' in model_type: format_hint = 'catboost' elif 'lightgbm' in model_type: format_hint = 'lightgbm' else: format_hint = None # Let serializer auto-detect # Use artifact registry if available (V3 system) if runtime_context.artifact_registry is not None: registry = runtime_context.artifact_registry pipeline_id = runtime_context.saver.pipeline_id if runtime_context.saver else "unknown" step_index = runtime_context.step_number # Use branch_path or convert branch_id to branch_path bp = branch_path or ([branch_id] if branch_id is not None else []) # Use substep_number as substep_index when in a subpipeline substep_index = runtime_context.substep_number if runtime_context.substep_number >= 0 else None # V3: Build operator chain for this artifact from nirs4all.pipeline.storage.artifacts.operator_chain import OperatorNode, OperatorChain # Get the current chain from trace recorder or build new one if runtime_context.trace_recorder is not None: current_chain = runtime_context.trace_recorder.current_chain() else: current_chain = OperatorChain(pipeline_id=pipeline_id) # Create node for this model model_node = OperatorNode( step_index=step_index, operator_class=model.__class__.__name__, branch_path=bp, fold_id=fold_id, substep_index=substep_index, operator_name=custom_name, ) # Build chain path for this artifact artifact_chain = current_chain.append(model_node) chain_path = artifact_chain.to_path() # Generate V3 artifact ID using chain artifact_id = registry.generate_id(chain_path, fold_id, pipeline_id) # Register artifact with V3 chain tracking record = registry.register( obj=model, artifact_id=artifact_id, artifact_type=ArtifactType.MODEL, params=params or {}, format_hint=format_hint, custom_name=custom_name, chain_path=chain_path, source_index=None, # Models don't have source_index ) # Record artifact in execution trace with chain_path runtime_context.record_step_artifact( artifact_id=artifact_id, is_primary=(fold_id is None), # Primary if not fold-specific fold_id=fold_id, chain_path=chain_path, branch_path=bp, metadata={"class_name": model.__class__.__name__, "custom_name": custom_name} ) return record # No registry available - skip persistence (for unit tests) # In production, artifact_registry should always be set by the runner return None