nirs4all.data.predictions module
Predictions management using Polars.
This module contains the main Predictions facade class that delegates to specialized components for storage, serialization, ranking, and querying.
- Refactored architecture (v0.4.1):
Storage: PredictionStorage (DataFrame backend)
Serializer: PredictionSerializer (JSON/Parquet hybrid)
Indexer: PredictionIndexer (filtering operations)
Ranker: PredictionRanker (ranking and top-k)
Aggregator: PartitionAggregator (partition combining)
Query: CatalogQueryEngine (catalog operations)
Public API is preserved for backward compatibility.
- class nirs4all.data.predictions.PredictionResult
Bases: dict
Enhanced dictionary for a single prediction with convenience methods.
Extends standard dict with property accessors and methods for saving, evaluating, and summarizing predictions.
- Features:
Property accessors (id, model_name, dataset_name, etc.)
save_to_csv() - save individual result
eval_score() - compute metrics on-the-fly
summary() - generate tab report
Examples
>>> result = PredictionResult({
...     "id": "abc123",
...     "dataset_name": "wheat",
...     "model_name": "PLS",
...     "y_true": [1, 2, 3],
...     "y_pred": [1.1, 2.2, 3.3]
... })
>>> result.model_name
'PLS'
>>> scores = result.eval_score(["rmse", "r2"])
>>> result.save_to_csv("results")
- eval_score(metrics: List[str] | None = None) -> Dict[str, Any]
Evaluate scores for this prediction using specified metrics.
- Parameters:
metrics – List of metrics to compute (if None, returns all available metrics)
- Returns:
Dictionary of metric names to scores.
For aggregated results: {"train": {…}, "val": {…}, "test": {…}}
For single partition: {"rmse": …, "r2": …, …}
Examples
>>> scores = result.eval_score(["rmse", "r2", "mae"])
>>> # For aggregated: scores = {"train": {"rmse": 0.5}, "val": {...}, "test": {...}}
>>> # For single: scores = {"rmse": 0.5, "r2": 0.9}
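The two return shapes can be illustrated with a small standalone sketch. It does not use nirs4all: the helper names (`score_partition`, `score_result`) and the rule for detecting an aggregated result (nested `train`/`val`/`test` dicts) are assumptions made for illustration only.

```python
import math
from typing import Any, Dict, List, Optional, Sequence


def rmse(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Root mean squared error."""
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))


def r2(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Coefficient of determination (undefined when y_true is constant)."""
    mean_true = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    return 1.0 - ss_res / ss_tot


# Hypothetical metric registry; the real library supports more metrics.
METRICS = {"rmse": rmse, "r2": r2}


def score_partition(y_true, y_pred, metrics: Optional[List[str]] = None) -> Dict[str, float]:
    """Flat shape: {"rmse": ..., "r2": ...}. None means every registered metric."""
    names = list(METRICS) if metrics is None else metrics
    return {name: METRICS[name](y_true, y_pred) for name in names}


def score_result(result: Dict[str, Any], metrics: Optional[List[str]] = None) -> Dict[str, Any]:
    """Nested shape for aggregated results, flat shape for a single partition."""
    # Assumption: an aggregated result carries one sub-dict per partition.
    partitions = [p for p in ("train", "val", "test") if isinstance(result.get(p), dict)]
    if partitions:
        return {
            p: score_partition(result[p]["y_true"], result[p]["y_pred"], metrics)
            for p in partitions
        }
    return score_partition(result["y_true"], result["y_pred"], metrics)
```

Callers that handle both shapes can check whether the first value of the returned dictionary is itself a dictionary before reading a metric.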
- save_to_csv(path_or_file: str = 'results', filename: str | None = None) -> None
Save prediction result to CSV file.
- Parameters:
path_or_file – Base path (folder) or complete file path (if ends with .csv)
filename – Optional filename (if path_or_file is a folder)
Examples
>>> result.save_to_csv("output")  # Saves to output/{dataset}/{id}.csv
>>> result.save_to_csv("output/my_result.csv")  # Saves to output/my_result.csv
>>> result.save_to_csv("output", "my_result.csv")  # Saves to output/my_result.csv
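The path rules shown in these examples can be sketched as a standalone function. `resolve_csv_path` is a hypothetical helper, not part of nirs4all, and the precedence between the three cases is an assumption inferred from the examples.

```python
from pathlib import Path
from typing import Optional


def resolve_csv_path(
    path_or_file: str,
    filename: Optional[str],
    dataset_name: str,
    prediction_id: str,
) -> Path:
    """Return the CSV path implied by the documented argument combinations."""
    base = Path(path_or_file)
    # Case 1: the first argument is already a complete .csv file path.
    if base.suffix.lower() == ".csv":
        return base
    # Case 2: a folder plus an explicit filename.
    if filename is not None:
        return base / filename
    # Case 3: a folder only, so fall back to {dataset}/{id}.csv inside it.
    return base / dataset_name / f"{prediction_id}.csv"
```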
- class nirs4all.data.predictions.PredictionResultsList(predictions: List[Dict[str, Any] | PredictionResult] | None = None)
Bases: list
List container for PredictionResult objects with batch operations.
Extends standard list with prediction-specific batch functionality.
- Features:
save() - batch CSV export
get() - retrieve by ID
filter() - chain filtering
Iterator support
Examples
>>> results = PredictionResultsList([result1, result2, result3])
>>> results.save("output", "predictions.csv")
>>> best = results.get("abc123")
>>> len(results)
3
- get(prediction_id: str) -> PredictionResult | None
Get a prediction by its ID.
- Parameters:
prediction_id – The ID of the prediction to retrieve
- Returns:
PredictionResult if found, None otherwise
Examples
>>> result = results.get("abc123")
- save(path: str = 'results', filename: str | None = None) -> None
Save all predictions to a single CSV file with structured headers.
- CSV Structure:
Line 1: dataset_name
Line 2: model_classname + model_id
Line 3: fold_id
Line 4: partition
Lines 5+: prediction data (y_true, y_pred columns)
- Parameters:
path – Base directory path (default: “results”)
filename – Optional filename (if None, auto-generated from first prediction)
Examples
>>> results.save("output")
>>> results.save("output", "my_predictions.csv")
- class nirs4all.data.predictions.Predictions(filepath: str | List[str] | None = None)
Bases: object
Main facade for prediction management.
Delegates to specialized components while maintaining backward-compatible public API.
- Architecture:
Storage: PredictionStorage (DataFrame backend)
Serializer: PredictionSerializer (JSON/Parquet hybrid)
Indexer: PredictionIndexer (filtering operations)
Ranker: PredictionRanker (ranking and top-k)
Aggregator: PartitionAggregator (partition combining)
Query: CatalogQueryEngine (catalog operations)
Examples
>>> # Create and add predictions
>>> pred = Predictions()
>>> pred.add_prediction(
...     dataset_name="wheat",
...     model_name="PLS",
...     partition="test",
...     y_true=y_true,
...     y_pred=y_pred,
...     test_score=0.85
... )
>>>
>>> # Query top models
>>> top_5 = pred.top(n=5, rank_metric="rmse", rank_partition="val")
>>>
>>> # Save and load
>>> pred.save_to_file("predictions.meta.parquet")
>>> loaded = Predictions("predictions.meta.parquet")
- add_prediction(dataset_name: str, dataset_path: str = '', config_name: str = '', config_path: str = '', pipeline_uid: str | None = None, step_idx: int = 0, op_counter: int = 0, model_name: str = '', model_classname: str = '', model_path: str = '', fold_id: str | int | None = None, sample_indices: List[int] | None = None, weights: List[float] | None = None, metadata: Dict[str, Any] | None = None, partition: str = '', y_true: ndarray | None = None, y_pred: ndarray | None = None, y_proba: ndarray | None = None, val_score: float | None = None, test_score: float | None = None, train_score: float | None = None, metric: str = 'mse', task_type: str = 'regression', n_samples: int = 0, n_features: int = 0, preprocessings: str = '', best_params: Dict[str, Any] | None = None, scores: Dict[str, Dict[str, float]] | None = None, branch_id: int | None = None, branch_name: str | None = None, exclusion_count: int | None = None, exclusion_rate: float | None = None, model_artifact_id: str | None = None, trace_id: str | None = None) -> str
Add a single prediction to storage.
Delegates to PredictionStorage component.
- Parameters:
dataset_name – Dataset name
dataset_path – Path to dataset file
config_name – Configuration name
config_path – Path to config file
pipeline_uid – Unique pipeline identifier
step_idx – Pipeline step index
op_counter – Operation counter
model_name – Model name
model_classname – Model class name
model_path – Path to saved model
fold_id – Cross-validation fold ID
sample_indices – Indices of samples used
weights – Sample weights
metadata – Additional metadata
partition – Data partition (train/val/test)
y_true – True labels
y_pred – Predicted labels
y_proba – Class probabilities for classification (shape: n_samples x n_classes)
val_score – Validation score
test_score – Test score
train_score – Training score
metric – Metric name
task_type – Task type (classification/regression)
n_samples – Number of samples
n_features – Number of features
preprocessings – Preprocessing steps applied
best_params – Best hyperparameters
scores – Dictionary of pre-computed scores per partition
branch_id – Branch identifier for pipeline branching (0-indexed)
branch_name – Human-readable branch name
exclusion_count – Number of samples excluded during training (outlier_excluder)
exclusion_rate – Rate of samples excluded (0.0-1.0, outlier_excluder)
model_artifact_id – Deterministic artifact ID for model loading (v2 system)
trace_id – Execution trace ID for deterministic prediction replay (v2 system)
- Returns:
Prediction ID
- add_predictions(dataset_name: str | List[str], dataset_path: str | List[str] = '', config_name: str | List[str] = '', config_path: str | List[str] = '', pipeline_uid: str | None | List[str | None] = None, step_idx: int | List[int] = 0, op_counter: int | List[int] = 0, model_name: str | List[str] = '', model_classname: str | List[str] = '', model_path: str | List[str] = '', fold_id: str | None | List[str | None] = None, sample_indices: List[int] | None | List[List[int] | None] = None, weights: List[float] | None | List[List[float] | None] = None, metadata: Dict[str, Any] | None | List[Dict[str, Any] | None] = None, partition: str | List[str] = '', y_true: ndarray | None | List[ndarray | None] = None, y_pred: ndarray | None | List[ndarray | None] = None, val_score: float | None | List[float | None] = None, test_score: float | None | List[float | None] = None, train_score: float | None | List[float | None] = None, metric: str | List[str] = 'mse', task_type: str | List[str] = 'regression', n_samples: int | List[int] = 0, n_features: int | List[int] = 0, preprocessings: str | List[str] = '', best_params: Dict[str, Any] | None | List[Dict[str, Any] | None] = None, scores: Dict[str, Dict[str, float]] | None | List[Dict[str, Dict[str, float]] | None] = None, branch_id: int | None | List[int | None] = None, branch_name: str | None | List[str | None] = None, trace_id: str | None | List[str | None] = None) -> None
Add multiple predictions to storage (batch operation).
For each parameter, if it’s a single value it will be broadcast to all predictions. If it’s a list, each index corresponds to one prediction.
- Parameters:
Same as add_prediction, but each parameter can be a single value or a list.
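The broadcasting rule can be sketched in isolation. `broadcast_params` is a hypothetical helper, not the library's implementation. As a simplifying assumption it treats any Python list as "one value per prediction", which sidesteps parameters such as sample_indices whose single value is itself a list.

```python
from typing import Any, Dict, List


def broadcast_params(**params: Any) -> List[Dict[str, Any]]:
    """Expand scalar-or-list keyword arguments into one dict per prediction."""
    # Every list-valued parameter must agree on the number of predictions.
    lengths = {len(value) for value in params.values() if isinstance(value, list)}
    if len(lengths) > 1:
        raise ValueError(f"List parameters have inconsistent lengths: {sorted(lengths)}")
    n_predictions = lengths.pop() if lengths else 1

    rows = []
    for i in range(n_predictions):
        # Lists are indexed per prediction; scalars are repeated unchanged.
        rows.append({
            name: (value[i] if isinstance(value, list) else value)
            for name, value in params.items()
        })
    return rows


rows = broadcast_params(dataset_name="wheat", model_name=["PLS", "RF"], partition="test")
```

Here `rows` holds two dictionaries that share the same dataset_name and partition but differ in model_name.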
- static aggregate(y_pred: ndarray, group_ids: ndarray, y_proba: ndarray | None = None, y_true: ndarray | None = None, method: str = 'mean', exclude_outliers: bool = False, outlier_threshold: float = 0.95) -> Dict[str, Any]
Aggregate predictions by group (e.g., same sample ID with multiple measurements).
For datasets with multiple samples per target (e.g., 4 measurements for each sample ID), this function averages predictions within each group to produce one prediction per group.
For regression: averages y_pred values within each group.
For classification: averages y_proba (if available) then takes argmax, or uses majority voting on y_pred if no probabilities.
- Parameters:
y_pred – Predicted values array (n_samples,) or (n_samples, 1)
group_ids – Group identifiers array (n_samples,) - samples with same ID are grouped
y_proba – Optional class probabilities array (n_samples, n_classes) for classification
y_true – Optional true values array (n_samples,) for computing aggregated ground truth
method – Aggregation method - ‘mean’ (default), ‘median’, ‘vote’ (for classification)
exclude_outliers – If True, exclude outliers within each group before aggregation using Hotelling’s T² statistic. Useful when some measurements are anomalous.
outlier_threshold – Confidence level for T² outlier detection (default 0.95). Measurements with T² > chi2.ppf(threshold, 1) are excluded.
- Returns:
Dictionary containing:
'y_pred': Aggregated predictions (n_groups,)
'y_proba': Aggregated probabilities (n_groups, n_classes) if input had y_proba
'y_true': Aggregated true values (n_groups,) if input had y_true
'group_ids': Unique group identifiers (n_groups,)
'group_sizes': Number of samples per group (n_groups,)
'outliers_excluded': Number of outliers excluded per group (if exclude_outliers=True)
- Return type:
Dict[str, Any]
Examples
>>> # Aggregate 4 samples per ID for regression
>>> result = Predictions.aggregate(y_pred, sample_ids)
>>> aggregated_pred = result['y_pred']  # One prediction per unique ID

>>> # Aggregate for classification with probabilities
>>> result = Predictions.aggregate(y_pred, sample_ids, y_proba=proba)
>>> aggregated_proba = result['y_proba']  # Averaged probabilities

>>> # Aggregate with outlier exclusion
>>> result = Predictions.aggregate(y_pred, sample_ids, exclude_outliers=True)
>>> print(f"Outliers excluded: {result['outliers_excluded'].sum()}")
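The group-wise reduction behind 'mean', 'median', and 'vote' can be sketched in plain Python. This is an illustration and not the library's code: `aggregate_by_group` is a hypothetical name, it works on lists instead of ndarrays, it keeps groups in first-seen order (an assumption), and it omits y_proba handling and outlier exclusion.

```python
from collections import Counter, defaultdict
from statistics import mean, median
from typing import Any, Dict, Hashable, List, Sequence


def aggregate_by_group(
    y_pred: Sequence[Any],
    group_ids: Sequence[Hashable],
    method: str = "mean",
) -> Dict[str, List[Any]]:
    """Reduce repeated measurements to one prediction per group ID."""
    reducers = {
        "mean": mean,
        "median": median,
        # Majority vote: the most frequent predicted label in the group.
        "vote": lambda values: Counter(values).most_common(1)[0][0],
    }
    if method not in reducers:
        raise ValueError(f"Unknown aggregation method: {method!r}")
    reduce_group = reducers[method]

    # Collect the predictions belonging to each group.
    groups = defaultdict(list)
    for group_id, value in zip(group_ids, y_pred):
        groups[group_id].append(value)

    unique_ids = list(groups)  # first-seen order (assumption for this sketch)
    return {
        "y_pred": [reduce_group(groups[g]) for g in unique_ids],
        "group_ids": unique_ids,
        "group_sizes": [len(groups[g]) for g in unique_ids],
    }
```

The returned keys mirror three of the documented output keys, so code written against this sketch reads the same way as code written against Predictions.aggregate.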
- archive_to_catalog(catalog_dir: Path, pipeline_dir: Path, metrics: Dict[str, Any] = None) -> str
Archive pipeline predictions to catalog.
Loads predictions CSV from pipeline directory, adds metadata, and saves to catalog.
Delegates to PredictionStorage for CSV loading.
- Parameters:
catalog_dir – Catalog directory for storage
pipeline_dir – Pipeline directory containing predictions.csv
metrics – Optional metadata dict to add to predictions
- Returns:
Generated prediction ID
- clear_caches() -> None
Clear all internal caches.
Call this when the underlying data has been modified to ensure fresh results are computed. This clears the Ranker’s aggregation cache (cached aggregated y_true/y_pred) and the Ranker’s score cache (cached metric scores).
Examples
>>> predictions.add_prediction(...)  # Add new data
>>> predictions.clear_caches()  # Clear to ensure fresh results
- compare_across_datasets(pipeline_hash: str, metric: str = 'test_score') -> DataFrame
Compare a pipeline’s performance across multiple datasets.
Delegates to CatalogQueryEngine component.
- Parameters:
pipeline_hash – Pipeline UID to compare
metric – Metric column to compare
- Returns:
DataFrame with one row per dataset
- filter_by_branch(branch_id: int | None = None, branch_name: str | None = None, include_no_branch: bool = False, load_arrays: bool = True) -> List[Dict[str, Any]]
Filter predictions by branch context.
Convenience method for meta-model stacking to retrieve predictions from a specific branch in branched pipelines.
- Parameters:
branch_id – Branch ID to filter by.
branch_name – Branch name to filter by.
include_no_branch – If True, include predictions with no branch info.
load_arrays – If True, load actual arrays from registry.
- Returns:
List of predictions from the specified branch.
Examples
>>> # Get predictions from branch 0
>>> branch_preds = predictions.filter_by_branch(branch_id=0)
>>> # Get predictions from named branch
>>> branch_preds = predictions.filter_by_branch(branch_name='preprocessing_a')
- filter_by_criteria(dataset_name: str | None = None, date_range: Tuple[str, str] | None = None, metric_thresholds: Dict[str, float] | None = None) -> DataFrame
Filter predictions by multiple criteria (catalog query).
Delegates to CatalogQueryEngine component.
- Parameters:
dataset_name – Filter by dataset name
date_range – Tuple of (start_date, end_date)
metric_thresholds – Dict of metric names to threshold values
- Returns:
Filtered DataFrame
- filter_predictions(dataset_name: str | None = None, partition: str | None = None, config_name: str | None = None, model_name: str | None = None, fold_id: str | None = None, step_idx: int | None = None, branch_id: int | None = None, branch_name: str | None = None, load_arrays: bool = True, **kwargs) -> List[Dict[str, Any]]
Filter predictions and return as list of dictionaries.
Delegates to PredictionIndexer for filtering, then deserializes results. Supports lazy loading of arrays for performance optimization.
- Parameters:
dataset_name – Filter by dataset name
partition – Filter by partition
config_name – Filter by config name
model_name – Filter by model name
fold_id – Filter by fold ID
step_idx – Filter by step index
branch_id – Filter by branch ID (for pipeline branching)
branch_name – Filter by branch name (for pipeline branching)
load_arrays – If True, loads actual arrays from registry (slower). If False, returns metadata only with array references (fast).
**kwargs – Additional filter criteria
- Returns:
List of prediction dictionaries with deserialized numpy arrays (if load_arrays=True) or metadata with array_id references (if load_arrays=False)
Examples
>>> # Fast metadata-only query
>>> preds = predictions.filter_predictions(dataset_name="wheat", load_arrays=False)
>>> # Full query with arrays
>>> preds = predictions.filter_predictions(dataset_name="wheat", load_arrays=True)
>>> # Filter by branch
>>> branch_preds = predictions.filter_predictions(branch_id=0)
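The difference between the two load_arrays modes can be sketched with a toy array registry. Everything here is hypothetical: the `y_true_id`/`y_pred_id` field names, the registry dict, and `filter_rows` stand in for the library's storage and are not its real schema.

```python
from typing import Any, Dict, List

# Toy registry mapping array IDs to the arrays themselves (hypothetical layout).
ARRAY_REGISTRY: Dict[str, List[float]] = {
    "arr-1": [1.0, 2.0],
    "arr-2": [1.1, 2.1],
    "arr-3": [5.0, 6.0],
    "arr-4": [4.8, 6.3],
}

# Metadata rows keep only references to the arrays.
ROWS: List[Dict[str, Any]] = [
    {"id": "p1", "dataset_name": "wheat", "partition": "test",
     "y_true_id": "arr-1", "y_pred_id": "arr-2"},
    {"id": "p2", "dataset_name": "corn", "partition": "test",
     "y_true_id": "arr-3", "y_pred_id": "arr-4"},
]


def filter_rows(rows, registry, load_arrays: bool = True, **criteria) -> List[Dict[str, Any]]:
    """Filter metadata rows by equality, optionally swapping IDs for arrays."""
    # Criteria set to None are ignored, matching the "None means no filter" defaults.
    active = {key: value for key, value in criteria.items() if value is not None}
    matches = [dict(row) for row in rows
               if all(row.get(key) == value for key, value in active.items())]
    if load_arrays:
        # Hydration step: this registry lookup is the cost that load_arrays=False avoids.
        for row in matches:
            for field in ("y_true", "y_pred"):
                row[field] = registry[row.pop(f"{field}_id")]
    return matches
```

A metadata-only call leaves the `*_id` references in place, so the caller can decide later which arrays are worth loading.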
- get_best(metric: str = '', ascending: bool | None = None, aggregate_partitions: bool = False, **filters) -> PredictionResult | None
Get the best prediction for a specific metric.
Delegates to PredictionRanker component.
- Parameters:
metric – Metric to optimize
ascending – Sort order. If True, sorts ascending (lower is better). If False, sorts descending (higher is better). If None, infers from metric.
aggregate_partitions – If True, add partition data
**filters – Additional filter criteria
- Returns:
Best prediction or None
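How a sort order might be inferred when ascending is None can be sketched as follows. The set of lower-is-better metrics and both helper names are assumptions for illustration. The library's own inference rules may cover more metrics or differ in detail.

```python
from typing import Any, Dict, List, Optional

# Assumed set of error metrics where a smaller value is better.
LOWER_IS_BETTER = {"mse", "rmse", "mae"}


def infer_ascending(metric: str, ascending: Optional[bool] = None) -> bool:
    """Use the explicit flag if given, otherwise infer the order from the metric name."""
    if ascending is not None:
        return ascending
    return metric.lower() in LOWER_IS_BETTER


def best_entry(
    entries: List[Dict[str, Any]],
    metric: str,
    ascending: Optional[bool] = None,
) -> Optional[Dict[str, Any]]:
    """Return the entry with the best value for `metric`, or None if there are no entries."""
    if not entries:
        return None
    pick = min if infer_ascending(metric, ascending) else max
    return pick(entries, key=lambda entry: entry[metric])
```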
- get_cache_stats() -> Dict[str, Any]
Get cache statistics for debugging performance.
Returns a dictionary with hit rates and sizes for aggregation_cache (cached aggregated arrays) and score_cache (cached metric scores).
- Returns:
Dictionary with cache statistics
Examples
>>> stats = predictions.get_cache_stats()
>>> print(f"Aggregation cache hit rate: {stats['aggregation_cache']['hit_rate']:.1%}")
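A hit rate of this kind is the share of lookups served from the cache. The sketch below shows one way such a counter could work. `CountingCache` and its `size` key are hypothetical, and only the `hit_rate` key is taken from the example above.

```python
from typing import Any, Callable, Dict, Hashable


class CountingCache:
    """Minimal cache that tracks hits and misses (illustrative only)."""

    def __init__(self) -> None:
        self._data: Dict[Hashable, Any] = {}
        self.hits = 0
        self.misses = 0

    def get_or_compute(self, key: Hashable, compute: Callable[[], Any]) -> Any:
        """Return the cached value, computing and storing it on a miss."""
        if key in self._data:
            self.hits += 1
        else:
            self.misses += 1
            self._data[key] = compute()
        return self._data[key]

    def stats(self) -> Dict[str, Any]:
        """Report the number of stored entries and the fraction of lookups that hit."""
        total = self.hits + self.misses
        return {"size": len(self._data), "hit_rate": self.hits / total if total else 0.0}

    def clear(self) -> None:
        """Drop cached values and reset counters, e.g. after the data changes."""
        self._data.clear()
        self.hits = 0
        self.misses = 0
```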
- get_configs() -> List[str]
Get list of unique config names.
Delegates to PredictionIndexer component.
- Returns:
List of config names
- get_datasets() -> List[str]
Get list of unique dataset names.
Delegates to PredictionIndexer component.
- Returns:
List of dataset names
- get_entry_partitions(entry: Dict) -> Dict[str, Dict | None]
Get all partition data for an entry.
- Parameters:
entry – Prediction entry dictionary
- Returns:
Dictionary with ‘train’, ‘val’, ‘test’ keys containing partition data
- get_folds() -> List[str]
Get list of unique fold IDs.
Delegates to PredictionIndexer component.
- Returns:
List of fold IDs
- get_models() -> List[str]
Get list of unique model names.
Delegates to PredictionIndexer component.
- Returns:
List of model names
- get_models_before_step(step_idx: int, branch_id: int | None = None, unique_names: bool = True) -> List[str]
Get model names from steps before a given step index.
Convenience method for meta-model stacking to identify source models that can be used for stacking.
- Parameters:
step_idx – Current step index (models before this are returned).
branch_id – Optional filter by branch ID.
unique_names – If True, return unique model names only.
- Returns:
List of model names from previous steps.
Examples
>>> # Get models available for stacking at step 5
>>> source_models = predictions.get_models_before_step(step_idx=5)
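The selection rule (strictly earlier steps, optional branch filter, optional de-duplication) can be sketched over a plain list of entries. `models_before_step` is a hypothetical standalone function. The choice to preserve first-seen order when de-duplicating is an assumption.

```python
from typing import Any, Dict, List, Optional


def models_before_step(
    entries: List[Dict[str, Any]],
    step_idx: int,
    branch_id: Optional[int] = None,
    unique_names: bool = True,
) -> List[str]:
    """List model names recorded at steps strictly before `step_idx`."""
    names = [
        entry["model_name"]
        for entry in entries
        if entry["step_idx"] < step_idx
        and (branch_id is None or entry.get("branch_id") == branch_id)
    ]
    # dict.fromkeys removes duplicates while keeping first-seen order.
    return list(dict.fromkeys(names)) if unique_names else names
```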
- get_oof_predictions(model_name: str | None = None, step_idx: int | None = None, branch_id: int | None = None, exclude_averaged: bool = True, load_arrays: bool = True) -> List[Dict[str, Any]]
Get out-of-fold (validation partition) predictions.
Convenience method for meta-model stacking to retrieve OOF predictions that can be used to construct training features without data leakage.
- Parameters:
model_name – Optional filter by model name.
step_idx – Optional filter by step index.
branch_id – Optional filter by branch ID.
exclude_averaged – If True, exclude ‘avg’ and ‘w_avg’ fold entries. Default True for OOF reconstruction.
load_arrays – If True, load actual arrays from registry.
- Returns:
List of validation partition predictions.
Examples
>>> # Get all OOF predictions
>>> oof = predictions.get_oof_predictions()
>>> # Get OOF predictions for a specific model
>>> oof = predictions.get_oof_predictions(model_name='PLS')
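Once the per-fold validation predictions are retrieved, a meta-model training feature can be rebuilt by writing each fold's values back to its sample positions. The sketch below assumes each entry carries fold_id, sample_indices, and y_pred (names taken from add_prediction), and `rebuild_oof` is a hypothetical helper, not a library function.

```python
from typing import Any, Dict, List, Optional


def rebuild_oof(
    val_predictions: List[Dict[str, Any]],
    n_samples: int,
) -> List[Optional[float]]:
    """Assemble one out-of-fold prediction per training sample."""
    oof: List[Optional[float]] = [None] * n_samples
    for pred in val_predictions:
        # Averaged entries combine several folds, so they are not
        # out-of-fold for any individual sample and are skipped.
        if str(pred["fold_id"]) in ("avg", "w_avg"):
            continue
        for index, value in zip(pred["sample_indices"], pred["y_pred"]):
            oof[index] = value
    return oof
```

Positions left as None mark samples that never appeared in a validation fold, which is worth checking before fitting a meta-model on the vector.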
- get_partitions() -> List[str]
Get list of unique partitions.
Delegates to PredictionIndexer component.
- Returns:
List of partitions
- get_prediction_by_id(prediction_id: str, load_arrays: bool = True) -> Dict[str, Any] | None
Get a single prediction by its ID using direct lookup.
This is an O(1) lookup that avoids iterating all predictions, which is much faster than using filter_predictions for ID lookups.
- Parameters:
prediction_id – Unique prediction identifier (hash ID)
load_arrays – If True, loads actual arrays from registry (slower). If False, returns metadata only with array references (fast).
- Returns:
Prediction dictionary or None if not found
Examples
>>> pred = predictions.get_prediction_by_id("abc123def456")
>>> if pred:
...     print(f"Found model: {pred['model_name']}")
- get_predictions_by_step(step_idx: int, partition: str | None = None, branch_id: int | None = None, load_arrays: bool = True, **kwargs) -> List[Dict[str, Any]]
Get predictions from a specific pipeline step.
Convenience method for meta-model stacking to retrieve predictions from source models at a specific step index.
- Parameters:
step_idx – Pipeline step index to filter by.
partition – Optional partition filter (‘train’, ‘val’, ‘test’).
branch_id – Optional branch ID filter.
load_arrays – If True, load actual arrays from registry.
**kwargs – Additional filter criteria.
- Returns:
List of prediction dictionaries from the specified step.
Examples
>>> # Get all predictions from step 2
>>> preds = predictions.get_predictions_by_step(step_idx=2)
>>> # Get validation predictions from step 2
>>> val_preds = predictions.get_predictions_by_step(
...     step_idx=2, partition='val'
... )
- get_similar(**filter_kwargs) -> Dict[str, Any] | None
Get the first prediction matching filter criteria.
- Parameters:
**filter_kwargs – Filter criteria (same as filter_predictions)
- Returns:
First matching prediction or None
- get_summary_stats(metric: str = 'test_score') -> Dict[str, float]
Get summary statistics for a metric.
Delegates to CatalogQueryEngine component.
- Parameters:
metric – Metric column name
- Returns:
Dictionary with min, max, mean, median, std
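The five statistics can be reproduced on a plain list of scores with the standard library. `summary_stats` is a hypothetical helper. Whether the library reports the sample or the population standard deviation is not stated here, so the sample version used below is an assumption.

```python
import statistics
from typing import Dict, Sequence


def summary_stats(values: Sequence[float]) -> Dict[str, float]:
    """Compute min, max, mean, median, and std for one metric column."""
    return {
        "min": min(values),
        "max": max(values),
        "mean": statistics.mean(values),
        "median": statistics.median(values),
        # Sample standard deviation (assumption); needs at least two values.
        "std": statistics.stdev(values) if len(values) > 1 else 0.0,
    }
```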
- get_unique_values(column: str) -> List[str]
Get unique values for a specific column.
Delegates to PredictionIndexer component.
- Parameters:
column – Column name
- Returns:
List of unique values
- list_runs(dataset_name: str | None = None) -> DataFrame
List all prediction runs with summary information.
Delegates to CatalogQueryEngine component.
- Parameters:
dataset_name – Filter by dataset name (None for all)
- Returns:
DataFrame with run summary
- classmethod load(dataset_name: str | None = None, path: str = 'results', aggregate_partitions: bool = False, **filters) -> Predictions
Load predictions from results directory structure.
- Parameters:
dataset_name – Name of dataset to load (None for all)
path – Base path to search for predictions
aggregate_partitions – If True, aggregate partition data
**filters – Additional filter criteria
- Returns:
Predictions instance with loaded data
- load_from_file(filepath: str, merge: bool = True) -> None
Load predictions from split Parquet format.
Supports split Parquet with array registry (.meta.parquet + .arrays.parquet).
When called multiple times (e.g., from __init__ with multiple files), predictions are merged by default.
- Parameters:
filepath – Path to .meta.parquet file
merge – If True and storage already has data, merge loaded data. If False, replace existing data. (default: True)
Examples
>>> predictions.load_from_file("predictions.meta.parquet")
>>> # Load additional predictions (merged)
>>> predictions.load_from_file("more_predictions.meta.parquet")
- classmethod load_from_file_cls(filepath: str) -> Predictions
Load predictions from JSON file as class method.
- Parameters:
filepath – Input file path
- Returns:
Predictions instance with loaded data (empty if file doesn’t exist)
- classmethod load_from_parquet(catalog_dir: Path, prediction_ids: list = None) -> Predictions
Load predictions from split Parquet storage.
- Parameters:
catalog_dir – Path to catalog directory
prediction_ids – Optional list of prediction IDs to load
- Returns:
Predictions instance with loaded data
- classmethod merge_parquet_files(input_files: List[str], output_file: str, deduplicate: bool = True) -> Predictions
Merge multiple prediction parquet files into a single output file.
This is a utility method to consolidate predictions from multiple experiment runs into a single file for easier analysis.
- Parameters:
input_files – List of paths to .meta.parquet files to merge.
output_file – Output path for the merged .meta.parquet file.
deduplicate – If True, remove duplicate prediction IDs (keep first). Default is True.
- Returns:
Predictions instance containing the merged data.
- Raises:
ValueError – If no input files are provided.
FileNotFoundError – If any input file does not exist.
Examples
>>> # Merge multiple experiment runs
>>> merged = Predictions.merge_parquet_files(
...     input_files=[
...         "run1/predictions.meta.parquet",
...         "run2/predictions.meta.parquet",
...         "run3/predictions.meta.parquet"
...     ],
...     output_file="combined/all_predictions.meta.parquet"
... )
>>> print(f"Merged {len(merged)} predictions")

>>> # Merge without deduplication
>>> merged = Predictions.merge_parquet_files(
...     input_files=["exp1.meta.parquet", "exp2.meta.parquet"],
...     output_file="merged.meta.parquet",
...     deduplicate=False
... )
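The merge semantics (concatenate in input order, keep the first occurrence of each prediction ID, reject an empty input list) can be sketched on in-memory rows. `merge_prediction_tables` is a hypothetical stand-in that skips all Parquet I/O, and it assumes each row has an "id" key.

```python
from typing import Any, Dict, List


def merge_prediction_tables(
    tables: List[List[Dict[str, Any]]],
    deduplicate: bool = True,
) -> List[Dict[str, Any]]:
    """Concatenate tables in order, optionally keeping only the first row per ID."""
    if not tables:
        raise ValueError("No input files provided")
    merged: List[Dict[str, Any]] = []
    seen_ids = set()
    for table in tables:
        for row in table:
            # A repeated ID is dropped, so the earliest input wins.
            if deduplicate and row["id"] in seen_ids:
                continue
            seen_ids.add(row["id"])
            merged.append(row)
    return merged
```

Because the first occurrence is kept, the order of the inputs decides which copy of a duplicated prediction survives.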
- merge_predictions(other: Predictions) -> None
Merge predictions from another Predictions instance.
Delegates to PredictionStorage component.
- Parameters:
other – Another Predictions instance to merge
- classmethod pred_long_string(entry: Dict, metrics: List[str] | None = None) -> str
Generate long string representation of a prediction.
- Parameters:
entry – Prediction dictionary
metrics – Optional list of metrics to display
- Returns:
Long description string with config
- classmethod pred_short_string(entry: Dict, metrics: List[str] | None = None, partition: str | List[str] = 'test') -> str
Generate short string representation of a prediction.
- Parameters:
entry – Prediction dictionary
metrics – Optional list of metrics to display
- Returns:
Short description string
- query_best(dataset_name: str | None = None, metric: str = 'test_score', n: int = 10, ascending: bool = False) -> DataFrame
Query for best performing pipelines by metric (catalog query).
Delegates to CatalogQueryEngine component.
- Parameters:
dataset_name – Filter by dataset name
metric – Metric column to rank by
n – Number of top results
ascending – If True, lower scores rank higher
- Returns:
DataFrame with top n predictions
- static save_all_to_csv(predictions: Predictions, path: str = 'results', aggregate_partitions: bool = False, **filters) -> None
Save all predictions to CSV files.
- Parameters:
predictions – Predictions instance
path – Base path for saving
aggregate_partitions – If True, save one file per model with all partitions
**filters – Additional filter criteria
- static save_predictions_to_csv(y_true: ndarray | List[float] | None = None, y_pred: ndarray | List[float] | None = None, filepath: str = '', prefix: str = '', suffix: str = '') -> None
Save y_true and y_pred arrays to a CSV file.
- Parameters:
y_true – True values array
y_pred – Predicted values array
filepath – Output CSV file path
prefix – Optional prefix for column names
suffix – Optional suffix for column names
- save_to_file(filepath: str, format: str = 'parquet') -> None
Save predictions to split Parquet format with array registry.
- Parameters:
filepath – Output file path (should end with .meta.parquet)
format – Format to use (only “parquet” is supported)
Examples
>>> predictions.save_to_file("predictions.meta.parquet")
- save_to_parquet(catalog_dir: Path, prediction_id: str = None) -> tuple
Save predictions as split Parquet (metadata + arrays separate).
Appends to existing files if they exist.
Delegates to PredictionStorage component.
- Parameters:
catalog_dir – Directory for catalog storage
prediction_id – Optional prediction ID (generates UUID if None)
- Returns:
Tuple of (meta_path, data_path)
- to_dicts(load_arrays: bool = True) -> List[Dict[str, Any]]
Get predictions as list of dictionaries.
- Parameters:
load_arrays – If True, hydrate array references with actual arrays. If False, returns metadata with array IDs only (faster).
- Returns:
List of prediction dictionaries
- top(n: int, rank_metric: str = '', rank_partition: str = 'val', display_metrics: List[str] | None = None, display_partition: str = 'test', aggregate_partitions: bool = False, ascending: bool | None = None, group_by_fold: bool = False, aggregate: str | None = None, group_by: str | List[str] | None = None, best_per_model: bool = False, return_grouped: bool = False, **filters) -> PredictionResultsList | Dict[Tuple, PredictionResultsList]
Get top n models ranked by a metric on a specific partition.
Delegates to PredictionRanker component.
- Parameters:
n – Number of top models to return. When group_by is used, this means top N per group (e.g., top 3 per dataset).
rank_metric – Metric to rank by (if empty, uses record’s metric or val_score)
rank_partition – Partition to rank on (default: “val”)
display_metrics – Metrics to compute for display (default: task_type defaults)
display_partition – Partition to display results from (default: “test”)
aggregate_partitions – If True, add train/val/test nested dicts in results
ascending – Sort order. If True, sorts ascending (lower is better). If False, sorts descending (higher is better). If None, infers from metric.
group_by_fold – If True, include fold_id in model identity (rank per fold)
aggregate – If provided, aggregate predictions by this metadata column or ‘y’. When ‘y’, groups by y_true values. When a column name (e.g., ‘ID’), groups by that metadata column. Aggregated predictions have recalculated metrics.
group_by – Group predictions by column(s). Can be a single column name (str) or a list of columns, for example ‘dataset_name’ or [‘model_name’, ‘dataset_name’]. When provided, the top N results are returned per group (not N total), and each result includes a ‘group_key’ field for easy filtering.
best_per_model – DEPRECATED - Use group_by=[‘model_name’] instead. If True, keep only the best prediction per model_name.
return_grouped – If True and group_by is set, return a dict mapping group keys to PredictionResultsList instead of a flat list. Default: False (returns flat list sorted by global rank).
**filters – Additional filter criteria (dataset_name, config_name, etc.)
- Returns:
If return_grouped=False (default): PredictionResultsList containing top n models per group, sorted by rank_metric. Each result includes ‘group_key’.
If return_grouped=True: Dict mapping group keys (tuples) to PredictionResultsList, one list per group with top n results each.
- Return type:
PredictionResultsList | Dict[Tuple, PredictionResultsList]
Examples
>>> # Top 3 per dataset (flat list)
>>> top_per_ds = predictions.top(n=3, group_by='dataset_name')
>>> # Filter by group_key
>>> ds1_results = [r for r in top_per_ds if r['group_key'] == ('dataset1',)]
>>>
>>> # Top 3 per dataset (grouped dict)
>>> grouped = predictions.top(n=3, group_by='dataset_name', return_grouped=True)
>>> for key, results in grouped.items():
...     print(f"{key}: {len(results)} results")
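The interaction between n, group_by, and return_grouped can be sketched over a list of dictionaries. `top_per_group` is a hypothetical helper that ranks on a precomputed score column (`rank_key`, an assumption), whereas the library computes metrics per partition. Only the tuple-valued 'group_key' and the two return shapes follow the description above.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple, Union

Entry = Dict[str, Any]


def top_per_group(
    entries: List[Entry],
    n: int,
    rank_key: str,
    group_by: Union[str, List[str]],
    ascending: bool = True,
    return_grouped: bool = False,
) -> Union[List[Entry], Dict[Tuple, List[Entry]]]:
    """Keep the n best entries within each group defined by `group_by`."""
    columns = [group_by] if isinstance(group_by, str) else list(group_by)
    # Rank everything once; walking this order fills each group best-first.
    ranked = sorted(entries, key=lambda e: e[rank_key], reverse=not ascending)

    grouped: Dict[Tuple, List[Entry]] = defaultdict(list)
    for entry in ranked:
        key = tuple(entry[column] for column in columns)
        if len(grouped[key]) < n:
            # Copy the entry so the caller's data is not mutated.
            grouped[key].append({**entry, "group_key": key})

    if return_grouped:
        return dict(grouped)
    # Flat list: merge the groups back together, sorted by global rank.
    flat = [entry for results in grouped.values() for entry in results]
    return sorted(flat, key=lambda e: e[rank_key], reverse=not ascending)
```

Since group keys are tuples even for a single column, filtering a flat result compares against a one-element tuple such as ('dataset1',).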