Source code for nirs4all.controllers.models.components.prediction_assembler

"""
Prediction Data Assembler - Assemble prediction data for storage

This component creates structured prediction records from model outputs.
Extracted from launch_training() lines 462-494 and _create_fold_averages()
to eliminate duplicate assembly logic.
"""

from dataclasses import dataclass
from typing import List, Tuple, Optional, Any
import numpy as np


[docs] @dataclass class PartitionPrediction: """Single partition prediction data.""" partition: str # 'train', 'val', or 'test' indices: List[int] # Sample indices y_true: np.ndarray # True values (unscaled) y_pred: np.ndarray # Predicted values (unscaled) score: float # Evaluation score for this partition
[docs] @dataclass class PredictionRecord: """Complete prediction record for storage.""" metadata: dict # Model and pipeline metadata partitions: List[Tuple[str, List[int], np.ndarray, np.ndarray]] # [(partition, indices, y_true, y_pred)]
[docs] class PredictionDataAssembler: """Assembles prediction data for storage. Creates structured prediction records with all metadata required for storage in the prediction database. Example: >>> assembler = PredictionDataAssembler() >>> record = assembler.assemble( ... dataset=dataset, ... identifiers=identifiers, ... scores={'train': 0.95, 'val': 0.90, 'test': 0.88}, ... predictions={'train': y_train_pred, 'val': y_val_pred, 'test': y_test_pred}, ... true_values={'train': y_train, 'val': y_val, 'test': y_test}, ... indices={'train': train_idx, 'val': val_idx, 'test': test_idx}, ... runner=runner, ... X_shape=X_train.shape, ... best_params=params ... ) """
[docs] def assemble( self, dataset: Any, identifiers: Any, # ModelIdentifiers scores: dict, # {'train': float, 'val': float, 'test': float} predictions: dict, # {'train': ndarray, 'val': ndarray, 'test': ndarray} true_values: dict, # {'train': ndarray, 'val': ndarray, 'test': ndarray} indices: dict, # {'train': list, 'val': list, 'test': list} runner: Any, X_shape: Tuple[int, ...], best_params: Optional[dict] = None, context: Any = None # ExecutionContext for branch info ) -> dict: """Assemble complete prediction record. Args: dataset: SpectroDataset instance identifiers: ModelIdentifiers with name, id, etc. scores: Dictionary of scores per partition predictions: Dictionary of prediction arrays per partition (unscaled) true_values: Dictionary of true value arrays per partition (unscaled) indices: Dictionary of sample indices per partition runner: PipelineRunner instance X_shape: Shape of input data (for n_features) best_params: Optional hyperparameters from optimization context: Optional ExecutionContext for branch information Returns: Dictionary ready for storage in prediction database """ pipeline_uid = getattr(runner, 'pipeline_uid', None) pipeline_name = runner.saver.pipeline_id dataset_name = dataset.name # Get trace_id from runtime context (Phase 2) trace_id = runner.get_trace_id() if hasattr(runner, 'get_trace_id') else None # Ensure task_type is a string (convert from enum if needed) task_type_str = str(dataset.task_type.value) if hasattr(dataset.task_type, 'value') else str(dataset.task_type) # Extract branch info from context branch_id = None branch_name = None if context is not None: selector = getattr(context, 'selector', None) if selector is not None: branch_id = getattr(selector, 'branch_id', None) branch_name = getattr(selector, 'branch_name', None) prediction_data = { 'dataset_name': dataset_name, 'dataset_path': dataset_name, 'config_name': pipeline_name, 'config_path': f"{dataset_name}/{pipeline_name}", 'pipeline_uid': pipeline_uid if pipeline_uid else "", 'trace_id': trace_id, # Phase 2: Link to execution trace 'step_idx': int(identifiers.step_id) if identifiers.step_id else 0, 'op_counter': int(identifiers.operation_counter), 'model_name': identifiers.name, 'model_classname': identifiers.classname, 'model_path': f"{dataset_name}/{pipeline_name}/{identifiers.step_id}_{identifiers.name}_{identifiers.operation_counter}.pkl", 'fold_id': identifiers.fold_idx, 'val_score': scores.get('val', 0.0), 'test_score': scores.get('test', 0.0), 'train_score': scores.get('train', 0.0), 'metric': scores.get('metric', 'unknown'), 'task_type': task_type_str, 'n_features': X_shape[1] if len(X_shape) > 1 else 1, 'preprocessings': dataset.short_preprocessings_str(), 'best_params': best_params if best_params is not None else {}, 'partitions': [], 'partition_metadata': {}, # Store metadata per partition for aggregation 'branch_id': branch_id, 'branch_name': branch_name or "", } # Cache train metadata since val samples come from train partition train_meta_df = None try: train_meta_df = dataset.metadata({"partition": "train"}) except (KeyError, AttributeError, ValueError, TypeError): pass # Add partition data with metadata for partition in ['train', 'val', 'test']: if partition in predictions: prediction_data['partitions'].append(( partition, indices[partition], true_values[partition], predictions[partition] )) # Extract metadata for this partition if available # Note: For 'val' partition in CV, samples come from train partition # so we use train metadata with val indices to select the right rows try: if partition == 'val': # Validation samples are from train partition partition_metadata = train_meta_df elif partition == 'train': partition_metadata = train_meta_df else: # test partition_metadata = dataset.metadata({"partition": partition}) if partition_metadata is not None and len(partition_metadata) > 0: # Get the sample indices for this partition's predictions partition_indices = indices.get(partition, []) # Convert to dictionary with column names as keys and arrays as values metadata_dict = {} for col in partition_metadata.columns: col_data = partition_metadata[col].to_numpy() # If we have indices and they're within bounds, use them to select metadata if len(partition_indices) > 0 and max(partition_indices) < len(col_data): # Select metadata rows corresponding to prediction indices selected_data = col_data[partition_indices] metadata_dict[col] = selected_data.tolist() elif len(col_data) == len(predictions[partition]): # Direct match - use as-is metadata_dict[col] = col_data.tolist() if metadata_dict: prediction_data['partition_metadata'][partition] = metadata_dict except (KeyError, AttributeError, ValueError, TypeError, IndexError): # If metadata extraction fails, continue without it pass return prediction_data
[docs] def assemble_fold_average( self, base_prediction: dict, averaged_predictions: dict, # {'train': ndarray, 'val': ndarray, 'test': ndarray} averaged_scores: dict, # {'train': float, 'val': float, 'test': float} is_weighted: bool = False ) -> dict: """Assemble prediction record for fold-averaged model. Args: base_prediction: Base prediction record from a single fold (for metadata) averaged_predictions: Dictionary of averaged prediction arrays averaged_scores: Dictionary of averaged scores is_weighted: Whether averaging was weighted by scores Returns: Dictionary ready for storage as fold-averaged prediction """ avg_prediction = base_prediction.copy() # Update name to indicate averaging suffix = "_weighted_avg" if is_weighted else "_avg" avg_prediction['model_name'] = base_prediction['model_name'] + suffix avg_prediction['fold_id'] = None # No specific fold for average # Ensure step_idx is an integer (copy from base may have string) if 'step_idx' in avg_prediction: step_idx = avg_prediction['step_idx'] avg_prediction['step_idx'] = int(step_idx) if step_idx and str(step_idx).strip() else 0 # Update scores avg_prediction['train_score'] = averaged_scores.get('train', 0.0) avg_prediction['val_score'] = averaged_scores.get('val', 0.0) avg_prediction['test_score'] = averaged_scores.get('test', 0.0) # Update partition predictions with averaged values updated_partitions = [] for partition, indices, y_true, _ in base_prediction['partitions']: if partition in averaged_predictions: updated_partitions.append(( partition, indices, y_true, averaged_predictions[partition] )) avg_prediction['partitions'] = updated_partitions return avg_prediction