# Source code for nirs4all.controllers.models.stacking.classification

"""
Classification Support for Meta-Model Stacking.

Phase 5 Implementation - Provides utilities for:
1. Detecting classification vs regression task types from predictions
2. Extracting probability features for classification stacking
3. Handling binary and multiclass classification scenarios
4. Generating meaningful feature names with class information

Key components:
- ClassificationFeatureExtractor: Extracts probability features from predictions
- TaskTypeDetector: Detects task type from prediction metadata
- FeatureNameGenerator: Creates descriptive feature names for meta-features
"""

from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple, TYPE_CHECKING, Union
import numpy as np
import warnings

if TYPE_CHECKING:
    from nirs4all.data.predictions import Predictions
    from nirs4all.pipeline.config.context import ExecutionContext


class StackingTaskType(Enum):
    """Kind of learning task the stacked source models belong to.

    Attributes:
        REGRESSION: Regression task using y_pred as features.
        BINARY_CLASSIFICATION: Binary classification (2 classes).
        MULTICLASS_CLASSIFICATION: Multi-class classification (>2 classes).
        UNKNOWN: Could not determine task type.
    """

    REGRESSION = "regression"
    BINARY_CLASSIFICATION = "binary_classification"
    MULTICLASS_CLASSIFICATION = "multiclass_classification"
    UNKNOWN = "unknown"

    @property
    def is_classification(self) -> bool:
        """Whether this member is one of the two classification task types."""
        return (
            self is StackingTaskType.BINARY_CLASSIFICATION
            or self is StackingTaskType.MULTICLASS_CLASSIFICATION
        )

    @property
    def n_classes(self) -> Optional[int]:
        """Class count implied by the task type alone.

        Returns:
            2 for binary classification. None for every other member: the
            multiclass count is variable and cannot be derived from the enum,
            and regression/unknown have no class count.
        """
        return 2 if self is StackingTaskType.BINARY_CLASSIFICATION else None
@dataclass
class ClassificationInfo:
    """Summary of the task type detected from source-model predictions.

    Attributes:
        task_type: Detected task type (regression/binary/multiclass).
        n_classes: Number of classes if classification, else None.
        class_labels: Optional class labels if available.
        has_probabilities: Whether y_proba is available in predictions.
        proba_shape: Shape of probability arrays if available.
    """

    task_type: StackingTaskType
    n_classes: Optional[int] = None
    class_labels: Optional[List[Any]] = None
    has_probabilities: bool = False
    proba_shape: Optional[Tuple[int, ...]] = None

    @property
    def is_classification(self) -> bool:
        """Delegate to the task type's own ``is_classification`` flag."""
        return self.task_type.is_classification

    @property
    def is_binary(self) -> bool:
        """True when the task type is BINARY_CLASSIFICATION."""
        return self.task_type == StackingTaskType.BINARY_CLASSIFICATION

    @property
    def is_multiclass(self) -> bool:
        """True when the task type is MULTICLASS_CLASSIFICATION."""
        return self.task_type == StackingTaskType.MULTICLASS_CLASSIFICATION

    def get_n_features_per_model(self, use_proba: bool = False) -> int:
        """Return how many meta-feature columns one source model contributes.

        Args:
            use_proba: Whether probability features are requested.

        Returns:
            ``n_classes`` when probabilities are requested for a multiclass
            task whose class count is known (non-zero). 1 in every other
            case: regression, classification without ``use_proba``, binary
            classification (positive-class probability only), and multiclass
            with an unknown class count.
        """
        # Only the multiclass + probabilities combination widens the output;
        # every other path yields a single column.
        if use_proba and self.is_multiclass and self.n_classes:
            return self.n_classes
        return 1
class TaskTypeDetector:
    """Detects the stacking task type from stored source-model predictions.

    Reads the ``task_type`` field and the ``y_proba`` array of prediction
    entries returned by the prediction store to decide whether the stack is
    a regression or a classification problem.
    """

    # fold_id values that mark aggregated (non per-fold) prediction entries.
    _AVERAGED_FOLD_IDS = frozenset({'avg', 'w_avg'})

    def __init__(self, prediction_store: 'Predictions'):
        """Initialize detector.

        Args:
            prediction_store: Predictions storage queried through
                ``filter_predictions``.
        """
        self.prediction_store = prediction_store

    def detect(
        self,
        source_model_names: List[str],
        context: 'ExecutionContext'
    ) -> ClassificationInfo:
        """Detect the task type shared by the given source models.

        Args:
            source_model_names: List of source model names to examine.
            context: Execution context; ``context.selector.branch_id`` (if
                present) and ``context.state.step_number`` restrict which
                predictions are considered.

        Returns:
            ClassificationInfo with the resolved task type, the largest class
            count reported by any model (None if none reported one), whether
            any model exposes probabilities, and the probability shape of the
            last model that reported a non-empty one. ``class_labels`` is
            always None.
        """
        branch_id = getattr(context.selector, 'branch_id', None)
        current_step = context.state.step_number

        task_types_found = []
        n_classes_found = []
        has_proba = False
        proba_shape = None

        for model_name in source_model_names:
            info = self._get_model_task_info(model_name, branch_id, current_step)
            if info is None:
                continue  # no usable prediction stored for this model

            task_types_found.append(info['task_type'])
            if info.get('n_classes'):
                n_classes_found.append(info['n_classes'])
            if info.get('has_proba'):
                has_proba = True
            if info.get('proba_shape'):
                proba_shape = info['proba_shape']

        task_type = self._resolve_task_type(task_types_found)

        # Use the max so an inconsistent model cannot shrink the class count.
        n_classes = max(n_classes_found) if n_classes_found else None

        return ClassificationInfo(
            task_type=task_type,
            n_classes=n_classes,
            class_labels=None,
            has_probabilities=has_proba,
            proba_shape=proba_shape
        )

    def _get_model_task_info(
        self,
        model_name: str,
        branch_id: Optional[int],
        max_step: int
    ) -> Optional[Dict[str, Any]]:
        """Get task type info for a single model.

        Only the first validation-partition prediction that lies before
        ``max_step`` and is not an averaged fold is inspected.

        Args:
            model_name: Name of the source model.
            branch_id: Branch ID filter; omitted from the query when None.
            max_step: Maximum step index (exclusive).

        Returns:
            Dictionary with task_type, n_classes, has_proba, proba_shape,
            or None if no matching prediction is found.
        """
        filter_kwargs = {
            'model_name': model_name,
            'partition': 'val',
            'load_arrays': True,
        }
        if branch_id is not None:
            filter_kwargs['branch_id'] = branch_id

        candidates = self.prediction_store.filter_predictions(**filter_kwargs)

        # Only the first surviving entry is needed, so stop at the first match
        # instead of materialising two filtered lists.
        pred = next(
            (
                p for p in candidates
                if p.get('step_idx', 0) < max_step
                and str(p.get('fold_id', '')) not in self._AVERAGED_FOLD_IDS
            ),
            None,
        )
        if pred is None:
            return None

        task_type = self._string_to_task_type(pred.get('task_type', 'regression'))

        # Normalise to an ndarray first so probabilities stored as plain
        # lists are recognised the same way as arrays.
        raw_proba = pred.get('y_proba')
        y_proba = None if raw_proba is None else np.asarray(raw_proba)
        has_proba = y_proba is not None and y_proba.size > 0

        proba_shape = None
        n_classes = None

        if has_proba:
            proba_shape = y_proba.shape
            if y_proba.ndim == 2:
                # A single column carries the positive-class probability of a
                # binary problem, exactly like the 1-D layout below.
                n_classes = 2 if y_proba.shape[1] == 1 else y_proba.shape[1]
            elif y_proba.ndim == 1:
                n_classes = 2
        elif task_type.is_classification:
            # No probabilities: fall back to counting distinct y_true values.
            y_true = pred.get('y_true')
            if y_true is not None:
                n_classes = len(np.unique(np.asarray(y_true)))

        return {
            'task_type': task_type,
            'n_classes': n_classes,
            'has_proba': has_proba,
            'proba_shape': proba_shape
        }

    def _string_to_task_type(self, task_type_str: str) -> StackingTaskType:
        """Convert task type string to StackingTaskType enum.

        Matching is case-insensitive and substring based. 'binary' wins over
        the generic 'classification'/'multiclass' markers.

        Args:
            task_type_str: Task type string from predictions.

        Returns:
            StackingTaskType enum value; UNKNOWN when the value is not a
            string or contains none of the recognised markers.
        """
        if not isinstance(task_type_str, str):
            # e.g. an explicit None stored in the prediction entry
            return StackingTaskType.UNKNOWN

        normalized = task_type_str.lower()
        if 'binary' in normalized:
            return StackingTaskType.BINARY_CLASSIFICATION
        if 'multiclass' in normalized or 'classification' in normalized:
            return StackingTaskType.MULTICLASS_CLASSIFICATION
        if 'regression' in normalized:
            return StackingTaskType.REGRESSION
        return StackingTaskType.UNKNOWN

    def _resolve_task_type(
        self,
        task_types: List[StackingTaskType]
    ) -> StackingTaskType:
        """Resolve conflicting task types from multiple models.

        Args:
            task_types: List of task types from different models.

        Returns:
            UNKNOWN when the list is empty or holds only UNKNOWN entries;
            the single known type when all known entries agree; otherwise
            the most frequent known type (a warning is emitted).
        """
        known_types = [t for t in task_types if t != StackingTaskType.UNKNOWN]
        if not known_types:
            return StackingTaskType.UNKNOWN

        unique_types = set(known_types)
        if len(unique_types) == 1:
            return known_types[0]

        # Mixed types - warn and use most common
        from collections import Counter
        most_common = Counter(known_types).most_common(1)[0][0]

        warnings.warn(
            f"Mixed task types detected in source models: {unique_types}. "
            f"Using most common: {most_common.value}. "
            f"All source models should have the same task type for proper stacking."
        )

        return most_common
class ClassificationFeatureExtractor:
    """Extracts meta-features from a single prediction entry.

    Returns ``y_pred`` for regression or when probabilities are not
    requested, the positive-class probability for binary classification,
    and the full probability matrix for multiclass classification.
    """

    def __init__(
        self,
        classification_info: 'ClassificationInfo',
        use_proba: bool = False
    ):
        """Initialize extractor.

        Args:
            classification_info: Classification metadata.
            use_proba: Whether to extract probability features.
        """
        self.classification_info = classification_info
        self.use_proba = use_proba

    def extract_features(
        self,
        pred: Dict[str, Any],
        n_samples: int
    ) -> np.ndarray:
        """Extract features from a single prediction entry.

        Args:
            pred: Prediction dictionary with y_pred and optionally y_proba.
            n_samples: Expected number of samples.

        Returns:
            Feature array of shape (n_samples,) or (n_samples, n_features).
        """
        if self.use_proba and self.classification_info.is_classification:
            return self._extract_proba_features(pred, n_samples)
        return self._extract_pred_features(pred, n_samples)

    @staticmethod
    def _fit_rows(values: np.ndarray, n_samples: int) -> np.ndarray:
        """Pad with NaN or truncate along axis 0 to exactly n_samples rows.

        The input is returned unchanged (same object, same dtype) when its
        first dimension already equals ``n_samples``.
        """
        if values.shape[0] == n_samples:
            return values
        fitted = np.full((n_samples,) + values.shape[1:], np.nan)
        n_copy = min(values.shape[0], n_samples)
        fitted[:n_copy] = values[:n_copy]
        return fitted

    def _extract_pred_features(
        self,
        pred: Dict[str, Any],
        n_samples: int
    ) -> np.ndarray:
        """Extract y_pred as features.

        Args:
            pred: Prediction dictionary.
            n_samples: Expected number of samples.

        Returns:
            1D array of predictions, NaN-padded or truncated to n_samples.
            A missing or None ``y_pred`` is treated as empty.
        """
        raw_pred = pred.get('y_pred')
        # An explicit None would otherwise become a 1-element object array
        # and fail when copied into the float padding buffer.
        y_pred = np.asarray([] if raw_pred is None else raw_pred).flatten()
        return self._fit_rows(y_pred, n_samples)

    def _extract_proba_features(
        self,
        pred: Dict[str, Any],
        n_samples: int
    ) -> np.ndarray:
        """Extract probability features.

        For binary classification: returns positive class probability (1 column).
        For multiclass: returns all class probabilities (n_classes columns).
        Falls back to ``y_pred`` (with a warning) when ``y_proba`` is missing
        or empty.

        Args:
            pred: Prediction dictionary.
            n_samples: Expected number of samples.

        Returns:
            Array of shape (n_samples,) for binary or
            (n_samples, n_classes) for multiclass.
        """
        raw_proba = pred.get('y_proba')
        # Convert before the emptiness check so an empty list triggers the
        # fallback exactly like an empty array does.
        y_proba = None if raw_proba is None else np.asarray(raw_proba)

        if y_proba is None or y_proba.size == 0:
            warnings.warn(
                f"use_proba=True but no y_proba available for model "
                f"{pred.get('model_name', 'unknown')}. Falling back to y_pred."
            )
            return self._extract_pred_features(pred, n_samples)

        if self.classification_info.is_binary:
            return self._extract_binary_proba(y_proba, n_samples)
        return self._extract_multiclass_proba(y_proba, n_samples)

    def _extract_binary_proba(
        self,
        y_proba: np.ndarray,
        n_samples: int
    ) -> np.ndarray:
        """Extract binary classification probability.

        Args:
            y_proba: Probability array, 1D or 2D.
            n_samples: Expected number of samples.

        Returns:
            1D array of positive class probabilities, NaN-padded or
            truncated to n_samples.

        Raises:
            ValueError: If ``y_proba`` is neither 1D nor 2D.
        """
        if y_proba.ndim == 1:
            # Already 1D - assume it's positive class probability
            proba_1d = y_proba
        elif y_proba.ndim == 2:
            n_cols = y_proba.shape[1]
            if n_cols == 2:
                # Standard (n_samples, 2) shape - take positive class
                proba_1d = y_proba[:, 1]
            elif n_cols == 1:
                # Single column - treat as positive class
                proba_1d = y_proba[:, 0]
            else:
                # Not a binary layout; still pick column 1 when it exists.
                warnings.warn(
                    f"Expected binary probabilities but got shape {y_proba.shape}. "
                    f"Using column 1 as positive class."
                )
                proba_1d = y_proba[:, 1] if n_cols > 1 else y_proba[:, 0]
        else:
            raise ValueError(f"Unexpected y_proba shape: {y_proba.shape}")

        return self._fit_rows(proba_1d, n_samples)

    def _extract_multiclass_proba(
        self,
        y_proba: np.ndarray,
        n_samples: int
    ) -> np.ndarray:
        """Extract multiclass classification probabilities.

        Args:
            y_proba: Probability array, 1D or 2D.
            n_samples: Expected number of samples.

        Returns:
            2D array with n_samples rows; a 1D input ``p`` is expanded to the
            two columns ``[1 - p, p]``.

        Raises:
            ValueError: If ``y_proba`` is neither 1D nor 2D.
        """
        if y_proba.ndim == 1:
            # Assume binary if 1D and rebuild the complementary column.
            proba_2d = np.column_stack([1 - y_proba, y_proba])
        elif y_proba.ndim == 2:
            proba_2d = y_proba
        else:
            raise ValueError(f"Unexpected y_proba shape: {y_proba.shape}")

        return self._fit_rows(proba_2d, n_samples)

    def get_n_features(self) -> int:
        """Get number of features that will be extracted per model.

        Returns:
            The value of
            ``classification_info.get_n_features_per_model(use_proba)``.
        """
        return self.classification_info.get_n_features_per_model(self.use_proba)
class FeatureNameGenerator:
    """Generates feature names for the meta-model's input columns.

    Names are built from the source model name and, when class probabilities
    are used, a ``_proba_<class index>`` suffix.
    """

    # Pattern value that selects the plain "<model_name><suffix>" naming.
    _DEFAULT_PATTERN = "{model_name}_pred"

    def __init__(
        self,
        classification_info: 'ClassificationInfo',
        use_proba: bool = False,
        pattern: str = "{model_name}_pred"
    ):
        """Initialize generator.

        Args:
            classification_info: Classification metadata.
            use_proba: Whether probability features are used.
            pattern: Base pattern for feature names; may reference
                ``{model_name}``.
        """
        self.classification_info = classification_info
        self.use_proba = use_proba
        self.pattern = pattern

    def generate_names(
        self,
        source_model_names: List[str]
    ) -> List[str]:
        """Generate feature names for all source models.

        Args:
            source_model_names: List of source model names.

        Returns:
            Flat list of feature column names, in source-model order.
        """
        return [
            feature_name
            for model_name in source_model_names
            for feature_name in self._generate_model_names(model_name)
        ]

    def _emits_per_class_features(self) -> bool:
        """Whether each source model yields one probability column per class."""
        info = self.classification_info
        return bool(
            self.use_proba and info.is_classification and not info.is_binary
        )

    def _generate_model_names(self, model_name: str) -> List[str]:
        """Generate feature names for a single source model.

        Args:
            model_name: Source model name.

        Returns:
            One ``_pred`` name when probabilities are not used or the task is
            not classification; one ``_proba_1`` name for binary
            classification; otherwise one ``_proba_<i>`` name per class.
        """
        info = self.classification_info

        if not self.use_proba or not info.is_classification:
            return [self._format_name(model_name, suffix="_pred")]

        if info.is_binary:
            # Single probability feature (positive class)
            return [self._format_name(model_name, suffix="_proba_1")]

        # Falls back to two columns when the class count is unknown.
        n_classes = info.n_classes or 2
        return [
            self._format_name(model_name, suffix=f"_proba_{class_idx}")
            for class_idx in range(n_classes)
        ]

    def _format_name(self, model_name: str, suffix: str = "") -> str:
        """Format a single feature name.

        Args:
            model_name: Source model name.
            suffix: Suffix describing the column (``_pred`` or
                ``_proba_<i>``).

        Returns:
            ``<model_name><suffix>`` for the default pattern or when the
            custom pattern cannot be formatted. For a custom pattern, the
            formatted pattern, with the ``_proba_<i>`` suffix appended so
            that per-class columns stay distinguishable.
        """
        if self.pattern == self._DEFAULT_PATTERN:
            return f"{model_name}{suffix}"

        try:
            base = self.pattern.format(model_name=model_name)
        except (KeyError, IndexError, ValueError):
            # Unknown field, positional field or malformed braces in the
            # pattern: fall back to the default naming scheme.
            return f"{model_name}{suffix}"

        # "_proba_1" doubles as the binary marker, where the pattern is used
        # as-is. In the per-class case it is a real class index and must keep
        # its suffix, otherwise class 1 would lose its label.
        if suffix.startswith("_proba_") and (
            suffix != "_proba_1" or self._emits_per_class_features()
        ):
            return f"{base}{suffix}"
        return base

    def get_feature_importance_mapping(
        self,
        source_model_names: List[str]
    ) -> Dict[str, List[str]]:
        """Get mapping from source models to their feature names.

        Args:
            source_model_names: List of source model names.

        Returns:
            Dictionary mapping model name to list of feature names.
        """
        return {
            model_name: self._generate_model_names(model_name)
            for model_name in source_model_names
        }
@dataclass
class MetaFeatureInfo:
    """Information about generated meta-features.

    Attributes:
        feature_names: List of all feature column names.
        source_models: List of source model names.
        feature_to_model: Mapping from feature name to source model.
        classification_info: Classification metadata.
        n_features_per_model: Number of features from each model.
    """

    feature_names: List[str]
    source_models: List[str]
    feature_to_model: Dict[str, str]
    classification_info: 'ClassificationInfo'
    n_features_per_model: Dict[str, int] = field(default_factory=dict)

    def get_model_for_feature(self, feature_name: str) -> Optional[str]:
        """Get source model name for a feature.

        Args:
            feature_name: Feature column name.

        Returns:
            Source model name or None if not found.
        """
        return self.feature_to_model.get(feature_name)

    def aggregate_importance_by_model(
        self,
        feature_importances: Dict[str, float]
    ) -> Dict[str, float]:
        """Aggregate feature importances by source model.

        Sums importance scores for all features from the same source model.
        Features absent from ``feature_to_model`` are ignored.

        Args:
            feature_importances: Mapping from feature name to importance score.

        Returns:
            Mapping from model name to aggregated importance. Every entry of
            ``source_models`` is present (0.0 when it has no scored feature).
            A model referenced by ``feature_to_model`` but missing from
            ``source_models`` is added rather than raising KeyError.
        """
        model_importance = {model: 0.0 for model in self.source_models}

        for feature_name, importance in feature_importances.items():
            model_name = self.get_model_for_feature(feature_name)
            if model_name is None:
                continue
            # .get() keeps the sum going for models that feature_to_model
            # knows about but source_models does not list.
            model_importance[model_name] = (
                model_importance.get(model_name, 0.0) + importance
            )

        return model_importance
def build_meta_feature_info(
    source_model_names: List[str],
    classification_info: ClassificationInfo,
    use_proba: bool = False,
    name_pattern: str = "{model_name}_pred"
) -> MetaFeatureInfo:
    """Assemble a MetaFeatureInfo for the given source models.

    Args:
        source_model_names: List of source model names.
        classification_info: Classification metadata.
        use_proba: Whether probability features are used.
        name_pattern: Pattern for feature names.

    Returns:
        MetaFeatureInfo holding the flat feature-name list, the
        feature -> model lookup, and the per-model feature counts.
    """
    name_builder = FeatureNameGenerator(
        classification_info=classification_info,
        use_proba=use_proba,
        pattern=name_pattern
    )

    all_feature_names = name_builder.generate_names(source_model_names)
    features_by_model = name_builder.get_feature_importance_mapping(source_model_names)

    # Invert model -> features into feature -> model.
    owner_of_feature = {
        feature: model_name
        for model_name, features in features_by_model.items()
        for feature in features
    }
    feature_counts = {
        model_name: len(features)
        for model_name, features in features_by_model.items()
    }

    return MetaFeatureInfo(
        feature_names=all_feature_names,
        source_models=source_model_names,
        feature_to_model=owner_of_feature,
        classification_info=classification_info,
        n_features_per_model=feature_counts
    )