Source code for nirs4all.data.ensemble_utils

"""
Ensemble Prediction Utilities - Weighted averaging for ensemble predictions

This module provides utilities for combining predictions from multiple models
using weighted averaging based on their scores. Relocated from utils/model_utils.py
to be with data/prediction modules.

Supports both regression (numeric averaging) and classification (soft/hard voting).
"""

from typing import List, Dict, Any, Optional, Tuple
import numpy as np


class EnsembleUtils:
    """Utilities for ensemble prediction with weighted averaging and voting.

    Every method is a ``@staticmethod``; the class only serves as a namespace.
    """

    # =========================================================================
    # Classification Ensemble Methods (Soft/Hard Voting)
    # =========================================================================

    @staticmethod
    def _resolve_model_weights(weights: Optional[Any], n_models: int) -> np.ndarray:
        """Return one float weight per model, defaulting to all ones.

        Args:
            weights: Per-model weights (any array-like) or None.
            n_models: Number of models the weights must cover.

        Returns:
            1-D float array of length ``n_models`` (not normalized).

        Raises:
            ValueError: If the number of weights differs from ``n_models``.
        """
        if weights is None:
            return np.ones(n_models, dtype=float)
        resolved = np.asarray(weights, dtype=float).reshape(-1)
        if resolved.shape[0] != n_models:
            # Without this check a short weight vector would silently drop
            # models when zipped against the predictions.
            raise ValueError(
                f"Number of weights ({resolved.shape[0]}) must match "
                f"number of models ({n_models})"
            )
        return resolved

    @staticmethod
    def compute_soft_voting_average(
        probability_arrays: List[np.ndarray],
        weights: Optional[np.ndarray] = None,
        use_confidence_weighting: bool = False
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Compute soft voting average of class probabilities.

        Averages probability distributions from multiple models (weighted or
        simple), then takes argmax to get final class predictions.

        Args:
            probability_arrays: List of probability arrays, each of shape
                (n_samples, n_classes). 1-D arrays are treated as a single
                column. Arrays may have different numbers of classes; narrower
                ones are zero-padded on the right up to the widest array.
            weights: Optional weight per model (e.g. fold weights based on
                validation scores). They are normalized to sum to 1. If None,
                or if they sum to exactly zero, uniform weights are used.
            use_confidence_weighting: If True, each model's weight is
                additionally multiplied, per sample, by its prediction
                confidence (max probability) and re-normalized per sample.
                Samples where every model has zero confidence fall back to
                the plain model weights.

        Returns:
            Tuple of:
                - class_predictions: Class indices as a float (n_samples, 1) array
                - averaged_probabilities: Averaged probabilities
                  (n_samples, n_classes)

        Raises:
            ValueError: If probability_arrays is empty, sample counts don't
                match, or the number of weights differs from the number of
                arrays.
        """
        if len(probability_arrays) == 0:
            raise ValueError("probability_arrays cannot be empty")

        # Normalize inputs: accept array-likes, promote 0-D/1-D to one column.
        matrices = []
        for arr in probability_arrays:
            arr = np.asarray(arr)
            matrices.append(arr.reshape(-1, 1) if arr.ndim < 2 else arr)

        n_models = len(matrices)
        n_samples = matrices[0].shape[0]
        for i, mat in enumerate(matrices):
            if mat.shape[0] != n_samples:
                raise ValueError(f"Array {i} has {mat.shape[0]} samples, expected {n_samples}")
        max_classes = max(mat.shape[1] for mat in matrices)

        # Stack into (n_models, n_samples, max_classes); columns a model does
        # not provide stay at zero probability.
        stacked = np.zeros((n_models, n_samples, max_classes), dtype=float)
        for i, mat in enumerate(matrices):
            stacked[i, :, :mat.shape[1]] = mat

        fold_weights = EnsembleUtils._resolve_model_weights(weights, n_models)
        total_weight = fold_weights.sum()
        if total_weight == 0:
            # Dividing by zero would turn every probability into NaN.
            fold_weights = np.full(n_models, 1.0 / n_models)
        else:
            fold_weights = fold_weights / total_weight

        if use_confidence_weighting:
            # Per-sample confidence of each model: (n_models, n_samples).
            confidences = stacked.max(axis=2)
            combined = fold_weights[:, np.newaxis] * confidences
            totals = combined.sum(axis=0)
            degenerate = totals == 0
            if np.any(degenerate):
                # No model is confident at all: use the plain model weights
                # rather than dividing by zero.
                combined[:, degenerate] = fold_weights[:, np.newaxis]
                totals = np.where(degenerate, fold_weights.sum(), totals)
            sample_weights = combined / totals
            averaged_probs = np.einsum("ms,msc->sc", sample_weights, stacked)
        else:
            # Standard weighted average of probabilities.
            averaged_probs = np.einsum("m,msc->sc", fold_weights, stacked)

        # Ties are resolved by argmax in favor of the lowest class index.
        class_predictions = np.argmax(averaged_probs, axis=1).reshape(-1, 1).astype(float)
        return class_predictions, averaged_probs

    @staticmethod
    def compute_hard_voting(
        class_predictions: List[np.ndarray],
        weights: Optional[np.ndarray] = None,
        n_classes: Optional[int] = None
    ) -> np.ndarray:
        """Compute hard voting (majority vote) from class predictions.

        Each model votes for a class, and the class with the largest
        (optionally weighted) vote total wins. Ties go to the lowest class
        index because the winner is picked with argmax.

        Args:
            class_predictions: List of class prediction arrays, each of shape
                (n_samples,) or (n_samples, 1). Values are cast to int and
                used as class indices.
            weights: Optional weight for each model's vote. If None, every
                model counts as 1 (standard majority vote).
            n_classes: Number of classes. If None, inferred as the largest
                predicted index plus one.

        Returns:
            Final class predictions as a float (n_samples, 1) array.

        Raises:
            ValueError: If class_predictions is empty, the arrays disagree on
                the number of samples, the number of weights differs from the
                number of models, or a label is negative or >= n_classes.
        """
        if len(class_predictions) == 0:
            raise ValueError("class_predictions cannot be empty")

        # Flatten all predictions to 1-D integer class indices.
        predictions = [np.asarray(p).reshape(-1).astype(int) for p in class_predictions]
        n_models = len(predictions)
        n_samples = predictions[0].shape[0]
        for i, pred in enumerate(predictions):
            if pred.shape[0] != n_samples:
                raise ValueError(
                    f"Prediction array {i} has {pred.shape[0]} samples, expected {n_samples}"
                )

        model_weights = EnsembleUtils._resolve_model_weights(weights, n_models)

        if n_samples == 0:
            # Nothing to vote on; also avoids max() over empty arrays below.
            return np.zeros((0, 1), dtype=float)

        lowest = min(int(pred.min()) for pred in predictions)
        highest = max(int(pred.max()) for pred in predictions)
        if lowest < 0:
            # A negative index would wrap around and vote for the wrong class.
            raise ValueError(f"Class predictions must be non-negative, got {lowest}")
        if n_classes is None:
            n_classes = highest + 1
        elif highest >= n_classes:
            raise ValueError(
                f"Class prediction {highest} is out of range for n_classes={n_classes}"
            )

        # Accumulate weighted votes; each row index appears once per model, so
        # fancy-index += is safe here.
        vote_counts = np.zeros((n_samples, n_classes), dtype=float)
        rows = np.arange(n_samples)
        for pred, weight in zip(predictions, model_weights):
            vote_counts[rows, pred] += weight

        return np.argmax(vote_counts, axis=1).reshape(-1, 1).astype(float)

    # =========================================================================
    # Regression Ensemble Methods (Weighted Averaging)
    # =========================================================================

    @staticmethod
    def compute_weighted_average(
        arrays: List[np.ndarray],
        scores: List[float],
        metric: Optional[str] = None,
        higher_is_better: Optional[bool] = None
    ) -> np.ndarray:
        """Compute weighted average of arrays based on their scores.

        Scores are converted to normalized weights (better score => larger
        weight) and the arrays are combined with those weights.

        Args:
            arrays: List of array-likes to average (must have same shape).
            scores: List of scores corresponding to each array.
            metric: Name of the metric (used to determine if higher is better),
                e.g. 'mse', 'rmse', 'mae', 'r2', 'accuracy', 'f1',
                'precision', 'recall'. Only consulted when
                ``higher_is_better`` is None.
            higher_is_better: Whether higher scores are better. If None, it is
                inferred from ``metric``.

        Returns:
            Weighted average as a float array with the shape of the inputs.

        Raises:
            ValueError: If arrays is empty, lengths differ, shapes differ, or
                neither ``metric`` nor ``higher_is_better`` is given.
        """
        if len(arrays) == 0:
            raise ValueError("arrays list cannot be empty")
        if len(arrays) != len(scores):
            raise ValueError(
                f"Number of arrays ({len(arrays)}) must match number of scores ({len(scores)})"
            )

        # Convert to numpy arrays and validate shapes.
        arrays = [np.asarray(arr) for arr in arrays]
        base_shape = arrays[0].shape
        for i, arr in enumerate(arrays):
            if arr.shape != base_shape:
                raise ValueError(f"Array {i} has shape {arr.shape}, expected {base_shape}")

        if higher_is_better is None:
            if metric is None:
                raise ValueError("Either 'metric' or 'higher_is_better' must be specified")
            higher_is_better = EnsembleUtils._is_higher_better(metric)

        weights = EnsembleUtils._scores_to_weights(
            np.asarray(scores, dtype=float), higher_is_better
        )

        weighted_sum = np.zeros_like(arrays[0], dtype=float)
        for arr, weight in zip(arrays, weights):
            weighted_sum += weight * arr
        return weighted_sum

    @staticmethod
    def _is_higher_better(metric: str) -> bool:
        """Determine if higher values are better for a given metric.

        Resolution order: exact match against the known metric names, then the
        scikit-learn ``neg_`` scorer prefix (negated losses, so higher is
        better), then a substring search for loss-like terms. Anything else
        defaults to higher-is-better.

        Args:
            metric: Metric name (case-insensitive, surrounding whitespace
                ignored).

        Returns:
            True if higher is better, False if lower is better.
        """
        higher_better_metrics = {
            'r2', 'accuracy', 'f1', 'precision', 'recall',
            'auc', 'roc_auc', 'score'
        }
        lower_better_metrics = {
            'mse', 'rmse', 'mae', 'loss', 'error',
            'mean_squared_error', 'mean_absolute_error', 'root_mean_squared_error',
            'mape', 'smape', 'msle', 'rmsle', 'log_loss'
        }
        # Substrings marking a loss-like metric in composite names such as
        # 'val_rmse' or 'test_mape'.
        lower_better_terms = ('error', 'loss', 'mse', 'mae', 'mape', 'msle')

        metric_lower = metric.strip().lower()
        if metric_lower in higher_better_metrics:
            return True
        if metric_lower in lower_better_metrics:
            return False
        if metric_lower.startswith('neg_'):
            # e.g. 'neg_mean_squared_error': the sign flip makes larger better,
            # even though the name contains 'error'.
            return True
        return not any(term in metric_lower for term in lower_better_terms)

    @staticmethod
    def _scores_to_weights(scores: np.ndarray, higher_is_better: bool) -> np.ndarray:
        """Convert scores to normalized weights for weighted averaging.

        Args:
            scores: Array of scores.
            higher_is_better: Whether higher scores are better.

        Returns:
            Array of normalized weights (sum to 1.0); empty if ``scores`` is
            empty. Equal scores yield uniform weights.
        """
        scores = np.asarray(scores, dtype=float)
        if scores.size == 0:
            return scores.copy()

        uniform = np.ones_like(scores) / len(scores)

        # Edge case: all scores are (numerically) the same.
        if np.allclose(scores, scores[0]):
            return uniform

        lowest = np.min(scores)
        if higher_is_better:
            # Use the scores directly, shifted so the worst one is 0 when any
            # score is negative (the worst model then gets zero weight).
            raw = scores - lowest if lowest < 0 else scores.copy()
            if np.allclose(raw, 0):
                return uniform
        else:
            # Invert so that lower (better) scores get higher weights.
            # NOTE: when the best score is <= 0 it is shifted to 1e-8, so its
            # inverse (1e8) dominates and the best model receives almost all
            # of the weight.
            shifted = scores - lowest + 1e-8 if lowest <= 0 else scores.copy()
            raw = 1.0 / shifted

        return raw / np.sum(raw)

    @staticmethod
    def compute_ensemble_prediction(
        predictions_data: List[Dict[str, Any]],
        score_metric: str = "test_score",
        prediction_key: str = "y_pred",
        metric_for_direction: Optional[str] = None,
        higher_is_better: Optional[bool] = None
    ) -> Dict[str, Any]:
        """Compute ensemble prediction from a list of prediction dictionaries.

        Args:
            predictions_data: List of prediction dictionaries.
            score_metric: Key to extract the score from each prediction.
            prediction_key: Key to extract the predictions array from each
                prediction.
            metric_for_direction: Metric name used to infer the score direction
                when ``higher_is_better`` is None. Defaults to
                ``score_metric``; note that a generic key such as
                'test_score' resolves to higher-is-better.
            higher_is_better: Whether higher scores are better (None to infer).

        Returns:
            Dictionary with the ensemble prediction under 'y_pred', the
            'ensemble_method', 'score_metric', 'n_models', a 'metadata' dict
            (model names, individual scores, weights, weight sum, score
            direction), plus any of 'dataset_name', 'partition', 'task_type',
            'y_true', 'n_samples', 'n_features' copied from the first
            prediction.

        Raises:
            ValueError: If predictions_data is empty, a required key is
                missing, or a score is None.
        """
        if not predictions_data:
            raise ValueError("predictions_data cannot be empty")

        arrays = []
        scores = []
        metadata: Dict[str, Any] = {
            'model_names': [],
            'individual_scores': [],
            'weights': [],
            'n_models': len(predictions_data)
        }

        for pred_dict in predictions_data:
            if prediction_key not in pred_dict:
                raise ValueError(f"Prediction key '{prediction_key}' not found in prediction data")
            arrays.append(np.asarray(pred_dict[prediction_key]))

            if score_metric not in pred_dict:
                raise ValueError(f"Score metric '{score_metric}' not found in prediction data")
            score = pred_dict[score_metric]
            if score is None:
                raise ValueError(f"Score metric '{score_metric}' is None for one of the predictions")
            scores.append(float(score))

            metadata['model_names'].append(pred_dict.get('model_name', 'unknown'))
            # The raw score object is kept as provided by the caller.
            metadata['individual_scores'].append(score)

        # Determine scoring direction, falling back to the score key's name.
        if higher_is_better is None:
            if metric_for_direction is None:
                metric_for_direction = score_metric
            higher_is_better = EnsembleUtils._is_higher_better(metric_for_direction)

        ensemble_pred = EnsembleUtils.compute_weighted_average(
            arrays=arrays,
            scores=scores,
            higher_is_better=higher_is_better
        )

        # Recompute the same weights so they can be reported in the metadata.
        weights = EnsembleUtils._scores_to_weights(np.array(scores), higher_is_better)
        metadata['weights'] = weights.tolist()
        metadata['weight_sum'] = float(np.sum(weights))  # Should be 1.0
        metadata['score_direction'] = 'higher_better' if higher_is_better else 'lower_better'

        result = {
            'y_pred': ensemble_pred,
            'ensemble_method': 'weighted_average',
            'score_metric': score_metric,
            'n_models': len(predictions_data),
            'metadata': metadata
        }

        # Copy other common fields from the first prediction.
        first_pred = predictions_data[0]
        for key in ['dataset_name', 'partition', 'task_type', 'y_true', 'n_samples', 'n_features']:
            if key in first_pred:
                result[key] = first_pred[key]

        return result