# Source code for nirs4all.data._predictions.result

"""
User-facing result container classes for predictions.

This module provides PredictionResult and PredictionResultsList classes
that extend standard Python dict/list with prediction-specific functionality.
"""

import csv
import io
import json
from typing import Dict, Any, List, Optional, Union
from pathlib import Path
import numpy as np
import polars as pl

from nirs4all.core.logging import get_logger
from nirs4all.core import metrics as evaluator

logger = get_logger(__name__)


class PredictionResult(dict):
    """
    Enhanced dictionary for a single prediction with convenience methods.

    Extends standard dict with property accessors and methods for saving,
    evaluating, and summarizing predictions.

    A result is treated as *aggregated* when it contains all three of the
    keys ``"train"``, ``"val"`` and ``"test"``; each of those maps to a
    dict holding its own ``y_true`` / ``y_pred`` (or to ``None``).
    Otherwise ``y_true`` / ``y_pred`` are read from the root of the dict.

    Features:
        - Property accessors (id, model_name, dataset_name, etc.)
        - save_to_csv() - save individual result
        - eval_score() - compute metrics on-the-fly
        - summary() - generate tab report

    Examples:
        >>> result = PredictionResult({
        ...     "id": "abc123",
        ...     "dataset_name": "wheat",
        ...     "model_name": "PLS",
        ...     "y_true": [1, 2, 3],
        ...     "y_pred": [1.1, 2.2, 3.3]
        ... })
        >>> result.model_name
        'PLS'
        >>> scores = result.eval_score(["rmse", "r2"])
        >>> result.save_to_csv("results")
    """

    # Keys whose joint presence marks an aggregated (multi-partition) result.
    _PARTITIONS = ("train", "val", "test")

    @property
    def id(self) -> str:
        """Get prediction ID."""
        return self.get("id", "unknown")

    @property
    def fold_id(self) -> str:
        """Get fold ID."""
        return self.get("fold_id", "unknown")

    @property
    def dataset_name(self) -> str:
        """Get dataset name."""
        return self.get("dataset_name", "unknown")

    @property
    def model_name(self) -> str:
        """Get model name."""
        return self.get("model_name", "unknown")

    @property
    def step_idx(self) -> int:
        """Get pipeline step index."""
        return self.get("step_idx", 0)

    @property
    def op_counter(self) -> int:
        """Get operation counter."""
        return self.get("op_counter", 0)

    @property
    def config_name(self) -> str:
        """Get config name."""
        return self.get("config_name", "unknown")

    def _is_aggregated(self) -> bool:
        """Return True when train, val and test keys are all present."""
        return all(key in self for key in self._PARTITIONS)

    @staticmethod
    def _or_empty(values: Any) -> Any:
        """Return an empty list in place of None so len()/indexing is safe."""
        return [] if values is None else values

    @staticmethod
    def _flatten_cell(value: Any) -> Any:
        """Reduce a nested list/array cell to its first element as a float."""
        if isinstance(value, (list, np.ndarray)):
            return float(value[0]) if len(value) > 0 else 0.0
        return value

    def _csv_destinations(self, path_or_file: str, filename: Optional[str]) -> List[Path]:
        """Resolve the list of file paths save_to_csv() should write to."""
        path_obj = Path(path_or_file)

        if path_obj.suffix.lower() == ".csv":
            # path_or_file is itself a file; a .csv filename is an extra target.
            destinations = [path_obj]
            if filename and Path(filename).suffix.lower() == ".csv":
                destinations.append(Path(filename))
            return destinations

        # path_or_file is a directory.
        if filename:
            return [path_obj / filename]
        dataset_name = str(self.get("dataset_name", "unknown"))
        model_id = self.get("id", "unknown")
        return [path_obj / dataset_name / f"{model_id}.csv"]

    def _csv_rows(self) -> List[Dict[str, Any]]:
        """Build one dict per CSV row from the stored y_true/y_pred values."""
        rows: List[Dict[str, Any]] = []

        if not self._is_aggregated():
            y_true = self._or_empty(self.get("y_true", []))
            y_pred = self._or_empty(self.get("y_pred", []))
            for i in range(max(len(y_true), len(y_pred))):
                row = {}
                if i < len(y_true):
                    row["y_true"] = y_true[i]
                if i < len(y_pred):
                    row["y_pred"] = y_pred[i]
                rows.append(row)
            return rows

        # Aggregated data: each partition contributes its own pair of columns.
        for partition in self._PARTITIONS:
            partition_data = self[partition]
            if partition_data is None:
                continue
            y_true = self._or_empty(partition_data.get("y_true", []))
            y_pred = self._or_empty(partition_data.get("y_pred", []))

            # Prefer the partition's own fold_id, falling back to the root one.
            fold_id = partition_data.get("fold_id", self.get("fold_id", ""))
            if isinstance(fold_id, list) and fold_id:
                fold_id = fold_id[0]
            use_suffix = fold_id and partition in ("train", "val")
            fold_suffix = f"_fold{fold_id}" if use_suffix else ""

            # len() instead of truthiness: numpy arrays have no unambiguous bool.
            for i in range(max(len(y_true), len(y_pred))):
                if i >= len(rows):
                    rows.append({})
                if i < len(y_true):
                    rows[i][f"y_true_{partition}{fold_suffix}"] = y_true[i]
                if i < len(y_pred):
                    rows[i][f"y_pred_{partition}{fold_suffix}"] = y_pred[i]
        return rows

    def save_to_csv(self, path_or_file: str = "results", filename: Optional[str] = None) -> None:
        """
        Save prediction result to CSV file.

        For aggregated results the columns are named
        ``y_true_<partition>`` / ``y_pred_<partition>``, with a
        ``_fold<fold_id>`` suffix on the train and val partitions when a
        fold id is available. Otherwise the columns are ``y_true`` and
        ``y_pred``. Nothing is written when no prediction data is found.

        Args:
            path_or_file: Base path (folder) or complete file path (if ends with .csv)
            filename: Optional filename (if path_or_file is a folder). When
                path_or_file is already a .csv file and filename also ends
                with .csv, the result is written to both locations.

        Examples:
            >>> result.save_to_csv("output")  # Saves to output/{dataset}/{id}.csv
            >>> result.save_to_csv("output/my_result.csv")  # Saves to output/my_result.csv
            >>> result.save_to_csv("output", "my_result.csv")  # Saves to output/my_result.csv
        """
        destinations = self._csv_destinations(path_or_file, filename)
        rows = self._csv_rows()

        if not rows:
            logger.warning("No prediction data found to save")
            return

        # Collapse nested cells (e.g. one-element arrays) to plain scalars.
        clean_rows = [
            {key: self._flatten_cell(value) for key, value in row.items()}
            for row in rows
        ]
        df_csv = pl.DataFrame(clean_rows)

        for filepath in destinations:
            filepath.parent.mkdir(parents=True, exist_ok=True)
            df_csv.write_csv(str(filepath))
            logger.info(f"Saved prediction result to {filepath}")

    @staticmethod
    def _compute_scores(
        y_true: Any,
        y_pred: Any,
        metrics: Optional[List[str]],
        task_type: str,
    ) -> Optional[Dict[str, Any]]:
        """Score one y_true/y_pred pair; return None when either is missing or empty."""
        if y_true is None or y_pred is None:
            return None
        if len(y_true) == 0 or len(y_pred) == 0:
            return None

        y_true_arr = np.array(y_true)
        y_pred_arr = np.array(y_pred)

        if metrics is None:
            # No explicit selection: compute every metric for this task type.
            return evaluator.eval_multi(y_true_arr, y_pred_arr, task_type)

        scores: Dict[str, Any] = {}
        for metric in metrics:
            try:
                scores[metric] = evaluator.eval(y_true_arr, y_pred_arr, metric)
            except Exception as exc:
                # Best effort: a failing metric is reported as None, not raised.
                logger.warning(f"Could not compute metric '{metric}': {exc}")
                scores[metric] = None
        return scores

    def eval_score(self, metrics: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Evaluate scores for this prediction using specified metrics.

        Args:
            metrics: List of metrics to compute (if None, returns all available
                metrics for the result's ``task_type``, default "regression")

        Returns:
            Dictionary of metric names to scores. A metric that fails to
            compute is mapped to None. Partitions (or a single result)
            whose y_true or y_pred is missing or empty yield no scores.
            For aggregated results: {"train": {...}, "val": {...}, "test": {...}}
            For single partition: {"rmse": ..., "r2": ..., ...}

        Examples:
            >>> scores = result.eval_score(["rmse", "r2", "mae"])
            >>> # For aggregated: scores = {"train": {"rmse": 0.5}, "val": {...}, "test": {...}}
            >>> # For single: scores = {"rmse": 0.5, "r2": 0.9}
        """
        task_type = self.get("task_type", "regression")

        if not self._is_aggregated():
            scores = self._compute_scores(
                self.get("y_true", []), self.get("y_pred", []), metrics, task_type
            )
            return {} if scores is None else scores

        scores_by_partition: Dict[str, Any] = {}
        for partition in self._PARTITIONS:
            partition_data = self[partition]
            if partition_data is None:
                continue
            partition_scores = self._compute_scores(
                partition_data.get("y_true", []),
                partition_data.get("y_pred", []),
                metrics,
                task_type,
            )
            if partition_scores is not None:
                scores_by_partition[partition] = partition_scores
        return scores_by_partition

    def summary(self) -> str:
        """
        Generate a summary tab report for this prediction.

        Works with both aggregated and non-aggregated prediction results.
        A non-aggregated result is reported under the partition named by
        its ``partition`` key (default "test").

        Returns:
            Formatted string with tab report, or a short warning message
            when the report manager cannot be imported.

        Examples:
            >>> report = result.summary()
            >>> print(report)
        """
        # Imported lazily so the container works without the reporting module.
        try:
            from nirs4all.visualization.reports import TabReportManager
        except ImportError:
            return "Warning: TabReportManager not available"

        n_features = self.get("n_features", 0)
        task_type = self.get("task_type", "regression")

        if self._is_aggregated():
            best_by_partition = {}
            for partition in self._PARTITIONS:
                raw = self[partition]
                if raw is None:
                    continue
                # Copy so the root-level metadata does not leak into stored data.
                partition_data = raw.copy() if isinstance(raw, dict) else {}
                partition_data["n_features"] = n_features
                partition_data["task_type"] = task_type
                best_by_partition[partition] = partition_data
        else:
            partition = self.get("partition", "test")
            best_by_partition = {
                partition: {
                    "y_true": self.get("y_true", []),
                    "y_pred": self.get("y_pred", []),
                    "n_features": n_features,
                    "task_type": task_type,
                }
            }

        formatted_string, _ = TabReportManager.generate_best_score_tab_report(best_by_partition)
        return formatted_string

    def __repr__(self) -> str:
        """String representation showing key info."""
        return (
            f"PredictionResult(id={self.id}, model={self.model_name}, "
            f"dataset={self.dataset_name}, fold={self.fold_id}, "
            f"step={self.step_idx}, op={self.op_counter})"
        )

    def __str__(self) -> str:
        """String representation showing key info."""
        return self.__repr__()
class PredictionResultsList(list):
    """
    List container for PredictionResult objects with batch operations.

    Extends standard list with prediction-specific batch functionality.

    Features:
        - save() - batch CSV export
        - get() - retrieve by ID
        - Iterator support

    Examples:
        >>> results = PredictionResultsList([result1, result2, result3])
        >>> results.save("output", "predictions.csv")
        >>> best = results.get("abc123")
        >>> len(results)
        3
    """

    # Keys whose joint presence marks an aggregated (multi-partition) prediction.
    _PARTITIONS = ("train", "val", "test")

    def __init__(self, predictions: Optional[List[Union[Dict[str, Any], "PredictionResult"]]] = None):
        """
        Initialize with optional list of PredictionResult objects.

        Args:
            predictions: List of predictions (dicts or PredictionResult objects)
        """
        super().__init__(predictions or [])

    @staticmethod
    def _prediction_columns(pred: Dict[str, Any]) -> Dict[str, Any]:
        """Map each CSV column name of one prediction to its sequence of values."""
        partitions = PredictionResultsList._PARTITIONS
        columns: Dict[str, Any] = {}

        if not all(key in pred for key in partitions):
            # Single partition data: values live at the root of the prediction.
            for field in ("y_true", "y_pred"):
                values = pred.get(field)
                columns[field] = [] if values is None else values
            return columns

        for partition in partitions:
            partition_data = pred[partition]
            if partition_data is None:
                continue
            # Prefer the partition's own fold_id, falling back to the root one.
            fold_id = partition_data.get("fold_id", pred.get("fold_id", ""))
            if isinstance(fold_id, list) and fold_id:
                fold_id = fold_id[0]
            use_suffix = fold_id and partition in ("train", "val")
            fold_suffix = f"_fold{fold_id}" if use_suffix else ""
            for field in ("y_true", "y_pred"):
                values = partition_data.get(field)
                columns[f"{field}_{partition}{fold_suffix}"] = [] if values is None else values
        return columns

    def save(self, path: str = "results", filename: Optional[str] = None) -> None:
        """
        Save all predictions to a single CSV file with structured headers.

        CSV Structure:
            - Line 1: "dataset_name", value from the first prediction
            - Line 2: "model_name", value from the first prediction
            - Line 3: "fold_id", value from the first prediction
            - Line 4: "partition", value from the first prediction
            - Line 5: column headers
            - Lines 6+: prediction data, one block of rows per prediction

        Aggregated predictions (having train, val and test keys) contribute
        ``y_true_<partition>`` / ``y_pred_<partition>`` columns, with a
        ``_fold<fold_id>`` suffix on train and val when a fold id is
        available; other predictions contribute ``y_true`` / ``y_pred``.
        Each prediction fills only its own columns; every other cell in
        its rows is left empty.

        Args:
            path: Base directory path (default: "results")
            filename: Optional filename (if None, auto-generated from first prediction)

        Examples:
            >>> results.save("output")
            >>> results.save("output", "my_predictions.csv")
        """
        if not self:
            logger.warning("No predictions to save")
            return

        first_pred = self[0]
        if filename is None:
            dataset_name = first_pred.get("dataset_name", "unknown")
            model_name = first_pred.get("model_name", "unknown")
            filename = f"{dataset_name}_{model_name}_predictions.csv"

        path_obj = Path(path)
        path_obj.mkdir(parents=True, exist_ok=True)
        filepath = path_obj / filename

        # Resolve columns once per prediction instead of re-parsing header names.
        per_pred_columns = [self._prediction_columns(pred) for pred in self]
        # dict.fromkeys removes duplicates while preserving first-seen order.
        all_columns = list(
            dict.fromkeys(name for columns in per_pred_columns for name in columns)
        )

        with open(filepath, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)

            # Metadata rows are written once, from the first prediction only.
            for key in ("dataset_name", "model_name", "fold_id", "partition"):
                writer.writerow([key, first_pred.get(key, "")])
            writer.writerow(all_columns)

            for columns in per_pred_columns:
                n_rows = max((len(values) for values in columns.values()), default=0)
                for i in range(n_rows):
                    row = []
                    for name in all_columns:
                        values = columns.get(name)
                        has_value = values is not None and i < len(values)
                        row.append(values[i] if has_value else "")
                    writer.writerow(row)

        logger.info(f"Saved {len(self)} predictions to {filepath}")

    def get(self, prediction_id: str) -> Optional["PredictionResult"]:
        """
        Get a prediction by its ID.

        Args:
            prediction_id: The ID of the prediction to retrieve

        Returns:
            The first prediction whose "id" equals prediction_id, None otherwise

        Examples:
            >>> result = results.get("abc123")
        """
        for pred in self:
            if pred.get("id") == prediction_id:
                return pred
        return None

    def __repr__(self) -> str:
        """String representation showing the number of predictions."""
        return f"PredictionResultsList({len(self)} predictions)"