"""
Sklearn Model Controller - Controller for scikit-learn models
This controller handles sklearn models with support for:
- Training on 2D data (samples x features)
- Cross-validation and hyperparameter tuning with Optuna
- Model persistence and prediction storage
- Integration with the nirs4all pipeline
Matches any sklearn model object (estimators with fit/predict methods).
"""
from typing import Any, Dict, List, Tuple, Optional, TYPE_CHECKING
import numpy as np
import copy
from sklearn.base import BaseEstimator, RegressorMixin, ClassifierMixin
from sklearn.model_selection import cross_val_score
from sklearn.metrics import mean_squared_error, accuracy_score
from sklearn.base import is_classifier, is_regressor
from ..models.base_model import BaseModelController
from nirs4all.controllers.registry import register_controller
from nirs4all.core.logging import get_logger
from .utilities import ModelControllerUtils as ModelUtils
from .factory import ModelFactory
logger = get_logger(__name__)
def _reset_gpu_memory() -> bool:
"""Reset GPU memory using PyTorch CUDA.
Clears cached memory and synchronizes GPU operations.
Useful for preventing memory leaks with GPU-based models like CatBoost.
Returns:
bool: True if GPU was successfully reset, False if torch/CUDA unavailable.
"""
try:
import torch
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.synchronize()
return True
except ImportError:
pass
return False
if TYPE_CHECKING:
from nirs4all.pipeline.runner import PipelineRunner
from nirs4all.data.dataset import SpectroDataset
from nirs4all.pipeline.config.context import ExecutionContext
[docs]
@register_controller
class SklearnModelController(BaseModelController):
"""Controller for scikit-learn models.
This controller handles sklearn models with support for training on 2D data,
cross-validation, hyperparameter tuning with Optuna, model persistence,
and integration with the nirs4all pipeline.
Attributes:
priority (int): Controller priority (6) - higher than TransformerMixin to
prioritize supervised models over transformers.
"""
priority = 6 # Higher priority than TransformerMixin (10) to win matching
[docs]
@classmethod
def matches(cls, step: Any, operator: Any, keyword: str) -> bool:
"""Match sklearn estimators and model dictionaries with sklearn models.
Prioritizes supervised models (regressors and classifiers) over transformers
by checking for predict methods and using sklearn's is_regressor/is_classifier.
Args:
step (Any): Pipeline step to check, can be a dict with 'model' key or
BaseEstimator instance.
operator (Any): Optional operator object to check if it's a BaseEstimator.
keyword (str): Pipeline keyword (unused in this implementation).
Returns:
bool: True if the step matches a sklearn estimator (regressor, classifier,
or has predict method), False otherwise.
"""
# Check if it is an explicit model configuration
is_explicit_model = isinstance(step, dict) and 'model' in step
# Extract the model object/config
model = step.get('model') if is_explicit_model else step
# Use Factory to detect framework
framework = ModelFactory.detect_framework(model)
# If framework is unknown but we have an operator, try to detect from operator
if framework == 'unknown' and operator is not None:
framework = ModelFactory.detect_framework(operator)
# 1. Safety Net: Explicitly reject other specific frameworks
# This prevents the controller from "stealing" models if priorities are messed up later
if framework in ['tensorflow', 'pytorch', 'jax']:
return False
# 2. Explicitly accept known libraries that follow sklearn API
# But for sklearn, we must be strict to avoid transformers
if framework == 'sklearn':
# If it's a raw object, it MUST have predict
if not is_explicit_model:
if hasattr(model, 'predict'):
return True
# Also check the operator if available (handles dict steps where operator is instantiated)
if operator is not None and hasattr(operator, 'predict'):
return True
return False
# If it's explicit {"model": ...}, we accept it
return True
# 3. For other frameworks (xgboost, etc), accept them
if framework in ['xgboost', 'lightgbm', 'catboost']:
return True
# 4. Accept generic objects with fit/predict methods (Duck Typing)
if hasattr(model, 'fit') and hasattr(model, 'predict'):
return True
return False
def _get_model_instance(self, dataset: 'SpectroDataset', model_config: Dict[str, Any], force_params: Optional[Dict[str, Any]] = None) -> BaseEstimator:
"""Create sklearn model instance from configuration.
Handles multiple configuration formats:
- Direct model_instance (class or instance)
- New serialization format with 'function', 'class', or 'import' keys
- Legacy format with nested 'model' dict containing 'class' key
Args:
dataset (SpectroDataset): Dataset for context-aware parameter building.
model_config (Dict[str, Any]): Model configuration containing model class,
instance, or serialization info with optional params.
force_params (Optional[Dict[str, Any]]): Parameters to override or merge
with existing model parameters. Defaults to None.
Returns:
BaseEstimator: Instantiated sklearn model with configured parameters.
Raises:
ValueError: If model instance cannot be created from the configuration.
"""
# If we have a model_instance (class or instance) and force_params, we need to rebuild with new params
if 'model_instance' in model_config:
model = model_config['model_instance']
# If no force_params and it's already an instance, just return it
if force_params is None:
return model
# If we have force_params, we need to get the class and rebuild
if force_params:
# Get the model class (either from instance or if it's already a class)
if isinstance(model, type):
model_class = model
else:
model_class = type(model)
# Rebuild with force_params
return ModelFactory.build_single_model(model_class, dataset, force_params)
# Handle new serialization formats: {'function': ..., 'params': ...} or {'class': ..., 'params': ...}
if any(key in model_config for key in ('function', 'class', 'import')):
params = model_config.get('params', {})
if force_params:
params.update(force_params)
return ModelFactory.build_single_model(model_config, dataset, params)
# Handle old format: model_config['model']['class']
if 'model' in model_config and 'class' in model_config['model']:
model_class = model_config['model']['class']
model_params = model_config.get('model_params', {})
if force_params:
model_params.update(force_params)
model = ModelFactory.build_single_model(model_class, dataset, model_params)
return model
# Fallback: if model_config itself is a model instance (e.g. XGBRegressor)
# This happens when we pass {"model": XGBRegressor()} and _extract_model_config returns it wrapped or unwrapped
# Actually, _extract_model_config returns {'model_instance': ...} usually.
# But if something slipped through, we can try to use it directly.
# Check if model_config has 'model' key which is an instance
if 'model' in model_config:
model = model_config['model']
if force_params:
return ModelFactory.build_single_model(model, dataset, force_params)
return model
raise ValueError(f"Could not create model instance from configuration: {model_config.keys()}")
def _train_model(
self,
model: BaseEstimator,
X_train: np.ndarray,
y_train: np.ndarray,
X_val: Optional[np.ndarray] = None,
y_val: Optional[np.ndarray] = None,
**kwargs
) -> BaseEstimator:
"""Train sklearn model with score tracking.
Trains the model on training data, validates parameters against model's
available parameters, and optionally calculates training and validation
scores based on verbosity level.
Args:
model (BaseEstimator): Sklearn model instance to train (already cloned).
X_train (np.ndarray): Training features, shape (n_samples, n_features).
y_train (np.ndarray): Training targets, shape (n_samples, n_targets).
X_val (Optional[np.ndarray]): Validation features for score calculation.
Defaults to None.
y_val (Optional[np.ndarray]): Validation targets for score calculation.
Defaults to None.
**kwargs: Training parameters including 'verbose' level for output control
and 'task_type' for metric calculation.
Returns:
BaseEstimator: Trained sklearn model instance.
Note:
- y_train is automatically raveled to 1D for sklearn compatibility
- Only valid model parameters are applied from kwargs
- Training and validation scores are displayed when verbose > 1
"""
train_params = kwargs.copy()
# Extract controller-specific parameters that shouldn't be passed to the model
# task_type is injected by launch_training and conflicts with CatBoost's task_type (CPU/GPU)
task_type = train_params.pop('task_type', None)
# verbose controls controller output, we don't want to force it on the model
verbose = train_params.pop('verbose', 0)
# reset_gpu: if True, reset GPU memory before/after training (helps with CatBoost GPU memory leaks)
reset_gpu = train_params.pop('reset_gpu', False)
# Reset GPU memory BEFORE training to ensure clean state
if reset_gpu:
if _reset_gpu_memory():
if verbose > 1:
logger.debug(f"GPU memory cleared before training {model.__class__.__name__}")
elif verbose > 0:
logger.warning("reset_gpu=True but PyTorch/CUDA not available")
# if verbose > 1 and train_params:
# print(f"🔧 Training {model.__class__.__name__} with params: {train_params}")
# elif verbose > 1:
# print(f"🔧 Training {model.__class__.__name__}") # Model is already cloned in base class, just use it directly
trained_model = model
# Set additional parameters if provided
if train_params:
# Filter out parameters that don't exist in the model
valid_params = {}
model_params = trained_model.get_params()
for key, value in train_params.items():
if key in model_params:
valid_params[key] = value
# else:
# print(f"{WARNING}Parameter {key} not found in model {model.__class__.__name__}")
if valid_params:
trained_model.set_params(**valid_params)
# Fit the model
trained_model.fit(X_train, y_train.ravel()) # Ensure y is 1D for sklearn
# Reset GPU memory AFTER training as well to free model's training buffers
if reset_gpu:
_reset_gpu_memory()
# Always calculate and display final test scores, regardless of verbose level
# But control the detail level based on verbose
if verbose > 1:
# Get task_type from train_params (passed by base controller)
task_type = kwargs.get('task_type')
if task_type is None:
raise ValueError("task_type must be provided in train_params")
# Show detailed training scores at verbose > 1
y_train_pred = self._predict_model(trained_model, X_train)
train_scores = self._calculate_and_print_scores(
y_train, y_train_pred, task_type, "train",
trained_model.__class__.__name__, show_detailed_scores=False
)
# Display concise training summary
if train_scores:
best_metric, higher_is_better = ModelUtils.get_best_score_metric(task_type)
best_score = train_scores.get(best_metric)
if best_score is not None:
direction = "↑" if higher_is_better else "↓"
all_scores_str = ModelUtils.format_scores(train_scores)
# Commented out - using logger instead when needed
# Validation scores if available
if X_val is not None and y_val is not None:
y_val_pred = self._predict_model(trained_model, X_val)
val_scores = self._calculate_and_print_scores(
y_val, y_val_pred, task_type, "validation",
trained_model.__class__.__name__, show_detailed_scores=False
)
# Display concise validation summary
if val_scores:
best_metric, higher_is_better = ModelUtils.get_best_score_metric(task_type)
best_score = val_scores.get(best_metric)
if best_score is not None:
direction = "↑" if higher_is_better else "↓"
all_scores_str = ModelUtils.format_scores(val_scores)
# Commented out - using logger instead when needed
return trained_model
def _predict_model(self, model: BaseEstimator, X: np.ndarray) -> np.ndarray:
"""Generate predictions with sklearn model.
Args:
model (BaseEstimator): Trained sklearn model instance.
X (np.ndarray): Input features for prediction, shape (n_samples, n_features).
Returns:
np.ndarray: Model predictions, reshaped to (n_samples, n_outputs) format
for consistency with pipeline expectations.
"""
predictions = model.predict(X)
# Ensure predictions are in the correct shape
if predictions.ndim == 1:
predictions = predictions.reshape(-1, 1)
return predictions
def _predict_proba_model(self, model: BaseEstimator, X: np.ndarray) -> Optional[np.ndarray]:
"""Get class probabilities for sklearn classification models.
Supports all sklearn classifiers with predict_proba, plus XGBoost,
LightGBM, and CatBoost classifiers.
Args:
model (BaseEstimator): Trained sklearn classifier.
X (np.ndarray): Input features, shape (n_samples, n_features).
Returns:
np.ndarray: Class probabilities, shape (n_samples, n_classes),
or None if model doesn't support probability predictions.
"""
if not hasattr(model, 'predict_proba'):
return None
try:
proba = model.predict_proba(X)
# Ensure 2D array (some models return 1D for binary classification)
if proba.ndim == 1:
proba = np.column_stack([1 - proba, proba])
return proba
except Exception:
# Some models may have predict_proba but fail (e.g., SVC without probability=True)
return None
def _prepare_data(
self,
X: np.ndarray,
y: np.ndarray,
context: 'ExecutionContext'
) -> Tuple[np.ndarray, np.ndarray]:
"""Prepare data for sklearn (ensure 2D X and 2D y for consistency).
Reshapes input data to ensure proper dimensionality for sklearn models:
- X is reshaped to 2D (n_samples, n_features)
- y is reshaped to 2D (n_samples, n_targets) for consistency
Args:
X (np.ndarray): Input features, can be 1D, 2D, or higher dimensional.
y (np.ndarray): Target values, can be None for prediction-only scenarios.
context (ExecutionContext): Pipeline context (unused in this implementation).
Returns:
Tuple[np.ndarray, np.ndarray]: Prepared (X, y) arrays in proper format,
or (None, None) if X is None.
Note:
- Extra dimensions in X are flattened to (n_samples, n_features)
- y can be None for prediction-only scenarios
"""
if X is None:
return None, None
# Ensure X is 2D
if X.ndim > 2:
# Flatten extra dimensions
X = X.reshape(X.shape[0], -1)
elif X.ndim == 1:
X = X.reshape(-1, 1)
# Handle y (can be None for prediction-only scenarios)
if y is not None:
# Ensure y is 2D for consistency with predictions
if y.ndim == 1:
y = y.reshape(-1, 1)
elif y.ndim > 2:
y = y.reshape(y.shape[0], -1)
return X, y
def _evaluate_model(self, model: BaseEstimator, X_val: np.ndarray, y_val: np.ndarray) -> float:
"""Evaluate sklearn model using cross-validation.
Uses task-appropriate metrics:
- Classifiers: negative accuracy (for minimization)
- Regressors: negative MSE (for minimization)
- Others: model's score method or fallback to MSE
Args:
model (BaseEstimator): Sklearn model to evaluate.
X_val (np.ndarray): Validation features, shape (n_samples, n_features).
y_val (np.ndarray): Validation targets, shape (n_samples, n_targets).
Returns:
float: Evaluation score (negative for maximization metrics to support
minimization-based optimization). Returns inf on error.
Note:
- Uses 3-fold cross-validation
- y_val is automatically raveled to 1D for sklearn compatibility
- Fallback to MSE if cross-validation fails
"""
# Ensure y_val is 1D for sklearn functions
y_val_1d = y_val.ravel() if y_val.ndim > 1 else y_val
try:
# Use cross-validation for evaluation
# Note: is_classifier/is_regressor may fail for custom models that don't
# implement __sklearn_tags__. Fall back to isinstance checks with Mixin classes.
is_clf = is_classifier(model) or isinstance(model, ClassifierMixin)
is_reg = is_regressor(model) or isinstance(model, RegressorMixin)
if is_clf:
# For classifiers, use negative balanced accuracy (to minimize)
scores = cross_val_score(model, X_val, y_val_1d, cv=3, scoring='balanced_accuracy')
return -np.mean(scores) # Negative because we want to minimize
elif is_reg:
# For regressors, use negative MSE (to minimize)
scores = cross_val_score(model, X_val, y_val_1d, cv=3, scoring='neg_mean_squared_error')
return -np.mean(scores) # Already negative, so negate to get positive MSE
else:
# Default: use model's score method if available
if hasattr(model, 'score'):
score = model.score(X_val, y_val_1d)
return -score # Negative to minimize
else:
# Fallback: MSE for any model
y_pred = model.predict(X_val)
return mean_squared_error(y_val_1d, y_pred)
except Exception as e:
logger.warning(f"Error in model evaluation: {e}")
# Fallback evaluation
try:
y_pred = model.predict(X_val)
return mean_squared_error(y_val_1d, y_pred)
except Exception:
return float('inf') # Return worst possible score
[docs]
def get_preferred_layout(self) -> str:
"""Return the preferred data layout for sklearn models.
Returns:
str: Data layout preference, always '2d' for sklearn models which
expect (n_samples, n_features) input format.
"""
return "2d"
def _clone_model(self, model: BaseEstimator) -> BaseEstimator:
"""Clone sklearn model using sklearn's clone function.
Uses sklearn.base.clone() which creates a new instance with the same
parameters but without fitted attributes. This is the recommended way
to clone sklearn estimators.
For meta-estimators (StackingRegressor, VotingClassifier, etc.) and
boosting libraries (XGBoost, LightGBM, CatBoost), deepcopy is used
to preserve all nested structures properly.
Args:
model (BaseEstimator): Sklearn model instance to clone.
Returns:
BaseEstimator: Cloned sklearn model with same parameters but fresh state.
Raises:
RuntimeError: If sklearn is not available.
"""
# For boosting libraries, deepcopy might be safer to preserve all params
# especially if they don't perfectly adhere to sklearn API
framework = ModelFactory.detect_framework(model)
if framework in ['xgboost', 'lightgbm', 'catboost']:
return copy.deepcopy(model)
# Meta-estimators (stacking/voting) need deepcopy to preserve nested estimators
if ModelFactory.is_meta_estimator(model):
return copy.deepcopy(model)
try:
from sklearn.base import clone as sklearn_clone
return sklearn_clone(model)
except ImportError:
raise RuntimeError("sklearn is required to clone sklearn models")
def _sample_hyperparameters(self, trial, finetune_params: Dict[str, Any]) -> Dict[str, Any]:
"""Sample hyperparameters specific to sklearn models.
Extends base hyperparameter sampling with sklearn-specific handling.
Currently delegates to parent implementation but provides extension point
for sklearn-specific cases like random_state preservation.
Args:
trial: Optuna trial object for hyperparameter sampling.
finetune_params (Dict[str, Any]): Hyperparameter search space configuration.
Returns:
Dict[str, Any]: Sampled hyperparameters for model instantiation.
"""
params = super()._sample_hyperparameters(trial, finetune_params)
# Add sklearn-specific parameter handling if needed
# For example, handle special cases like random_state preservation
return params
[docs]
def execute(
self,
step_info: 'ParsedStep',
dataset: 'SpectroDataset',
context: 'ExecutionContext',
runtime_context: 'RuntimeContext',
source: int = -1,
mode: str = "train",
loaded_binaries: Optional[List[Tuple[str, bytes]]] = None,
prediction_store: Optional[Any] = None
) -> Tuple['ExecutionContext', List[Tuple[str, bytes]]]:
"""Execute sklearn model controller with score management.
Main entry point for sklearn model execution in the pipeline. Sets the
preferred data layout to '2d' and delegates to parent execute method.
Args:
step_info: Parsed step containing model configuration and operator.
dataset (SpectroDataset): Dataset containing features and targets.
context (ExecutionContext): Pipeline execution context with state info.
runtime_context (RuntimeContext): Runtime context managing execution state.
source (int): Source index for multi-source pipelines. Defaults to -1.
mode (str): Execution mode ('train' or 'predict'). Defaults to 'train'.
loaded_binaries (Optional[List[Tuple[str, bytes]]]): Pre-loaded model
binaries for prediction mode. Defaults to None.
prediction_store (Optional[Any]): Store for managing predictions.
Defaults to None.
Returns:
Tuple[ExecutionContext, List[Tuple[str, bytes]]]: Updated context and
list of model binaries (name, serialized_model) for persistence.
Note:
- Automatically sets context['layout'] = '2d' for sklearn compatibility
- Inherits full training, evaluation, and prediction logic from BaseModelController
- Respects force_layout if specified in step configuration
"""
# Set layout preference for sklearn models (force_layout overrides preferred)
context = context.with_layout(self.get_effective_layout(step_info))
# Call parent execute method
return super().execute(step_info, dataset, context, runtime_context, source, mode, loaded_binaries, prediction_store)