nirs4all.data.predictions module

Predictions management using Polars.

This module contains the main Predictions facade class that delegates to specialized components for storage, serialization, ranking, and querying.

Refactored architecture (v0.4.1):
  • Storage: PredictionStorage (DataFrame backend)

  • Serializer: PredictionSerializer (JSON/Parquet hybrid)

  • Indexer: PredictionIndexer (filtering operations)

  • Ranker: PredictionRanker (ranking and top-k)

  • Aggregator: PartitionAggregator (partition combining)

  • Query: CatalogQueryEngine (catalog operations)

Public API is preserved for backward compatibility.

class nirs4all.data.predictions.PredictionResult[source]

Bases: dict

Enhanced dictionary for a single prediction with convenience methods.

Extends standard dict with property accessors and methods for saving, evaluating, and summarizing predictions.

Features:
  • Property accessors (id, model_name, dataset_name, etc.)

  • save_to_csv() - save individual result

  • eval_score() - compute metrics on-the-fly

  • summary() - generate tab report
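The dict-plus-property pattern listed above can be sketched as follows. This is a minimal illustration rather than the nirs4all implementation: the class name `ResultSketch` is hypothetical, and falling back to an empty string for a missing key is an assumption.

```python
class ResultSketch(dict):
    """Hypothetical stand-in for a dict subclass with property accessors."""

    @property
    def id(self) -> str:
        # Assumption: a missing key falls back to an empty string.
        return self.get("id", "")

    @property
    def model_name(self) -> str:
        return self.get("model_name", "")


result = ResultSketch({"id": "abc123", "model_name": "PLS", "y_pred": [1.1, 2.2]})
print(result.model_name)  # attribute-style access through the property
print(result["y_pred"])   # the object still behaves like an ordinary dict
```

Because the object remains a dict, code that expects plain dictionaries (JSON serialization, key iteration) should keep working unchanged.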

Examples

>>> result = PredictionResult({
...     "id": "abc123",
...     "dataset_name": "wheat",
...     "model_name": "PLS",
...     "y_true": [1, 2, 3],
...     "y_pred": [1.1, 2.2, 3.3]
... })
>>> result.model_name
'PLS'
>>> scores = result.eval_score(["rmse", "r2"])
>>> result.save_to_csv("results")
__repr__() str[source]

String representation showing key info.

__str__() str[source]

String representation showing key info.

property config_name: str

Get config name.

property dataset_name: str

Get dataset name.

eval_score(metrics: List[str] | None = None) Dict[str, Any][source]

Evaluate scores for this prediction using specified metrics.

Parameters:

metrics – List of metrics to compute (if None, returns all available metrics)

Returns:

Dictionary of metric names to scores:
  • For aggregated results: {"train": {…}, "val": {…}, "test": {…}}

  • For a single partition: {"rmse": …, "r2": …, …}
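To make the single-partition return shape concrete, here is a small sketch that computes two metrics from y_true and y_pred in plain Python. The function names and the `METRICS` table are hypothetical; how nirs4all actually computes or registers its metrics is not shown in this reference.

```python
import math
from typing import Callable, Dict, List, Optional, Sequence


def rmse(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Root mean squared error."""
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))


def r2(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_true = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_true) ** 2 for t in y_true)
    return 1.0 - ss_res / ss_tot


# Hypothetical metric table; the real library may support many more names.
METRICS: Dict[str, Callable[[Sequence[float], Sequence[float]], float]] = {
    "rmse": rmse,
    "r2": r2,
}


def eval_score_sketch(y_true, y_pred, metrics: Optional[List[str]] = None) -> Dict[str, float]:
    # None means "all available metrics", mirroring the documented default.
    names = list(METRICS) if metrics is None else metrics
    return {name: METRICS[name](y_true, y_pred) for name in names}


scores = eval_score_sketch([1, 2, 3], [1.1, 2.2, 3.3], ["rmse", "r2"])
print(scores)
```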

Examples

>>> scores = result.eval_score(["rmse", "r2", "mae"])
>>> # For aggregated: scores = {"train": {"rmse": 0.5}, "val": {...}, "test": {...}}
>>> # For single: scores = {"rmse": 0.5, "r2": 0.9}
property fold_id: str

Get fold ID.

property id: str

Get prediction ID.

property model_name: str

Get model name.

property op_counter: int

Get operation counter.

save_to_csv(path_or_file: str = 'results', filename: str | None = None) None[source]

Save prediction result to CSV file.

Parameters:
  • path_or_file – Base path (folder) or complete file path (if ends with .csv)

  • filename – Optional filename (if path_or_file is a folder)
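The path rules implied by these two parameters can be sketched as a small helper. This is an assumption-laden illustration: `resolve_csv_path` is a hypothetical name, and the default `{dataset}/{id}.csv` layout is taken from the comments in this method's own examples, not from the library source.

```python
import posixpath
from typing import Optional


def resolve_csv_path(
    path_or_file: str,
    filename: Optional[str],
    dataset_name: str,
    prediction_id: str,
) -> str:
    """Return the output path a call with these arguments would plausibly target."""
    if path_or_file.endswith(".csv"):
        # A complete file path was given; use it as-is.
        return path_or_file
    if filename is not None:
        # A folder plus an explicit filename.
        return posixpath.join(path_or_file, filename)
    # Folder only: fall back to <folder>/<dataset>/<id>.csv.
    return posixpath.join(path_or_file, dataset_name, f"{prediction_id}.csv")


print(resolve_csv_path("output", None, "wheat", "abc123"))
print(resolve_csv_path("output/my_result.csv", None, "wheat", "abc123"))
print(resolve_csv_path("output", "my_result.csv", "wheat", "abc123"))
```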

Examples

>>> result.save_to_csv("output")  # Saves to output/{dataset}/{id}.csv
>>> result.save_to_csv("output/my_result.csv")  # Saves to output/my_result.csv
>>> result.save_to_csv("output", "my_result.csv")  # Saves to output/my_result.csv
property step_idx: int

Get pipeline step index.

summary() str[source]

Generate a summary tab report for this prediction.

Works with both aggregated and non-aggregated prediction results.

Returns:

Formatted string with tab report

Examples

>>> report = result.summary()
>>> print(report)
class nirs4all.data.predictions.PredictionResultsList(predictions: List[Dict[str, Any] | PredictionResult] | None = None)[source]

Bases: list

List container for PredictionResult objects with batch operations.

Extends standard list with prediction-specific batch functionality.

Features:
  • save() - batch CSV export

  • get() - retrieve by ID

  • filter() - chain filtering

  • Iterator support

Examples

>>> results = PredictionResultsList([result1, result2, result3])
>>> results.save("output", "predictions.csv")
>>> best = results.get("abc123")
>>> len(results)
3
__repr__() str[source]

String representation showing count and brief info.

get(prediction_id: str) PredictionResult | None[source]

Get a prediction by its ID.

Parameters:

prediction_id – The ID of the prediction to retrieve

Returns:

PredictionResult if found, None otherwise

Examples

>>> result = results.get("abc123")
save(path: str = 'results', filename: str | None = None) None[source]

Save all predictions to a single CSV file with structured headers.

CSV Structure:
  • Line 1: dataset_name

  • Line 2: model_classname + model_id

  • Line 3: fold_id

  • Line 4: partition

  • Lines 5+: prediction data (y_true, y_pred columns)

Parameters:
  • path – Base directory path (default: “results”)

  • filename – Optional filename (if None, auto-generated from first prediction)

Examples

>>> results.save("output")
>>> results.save("output", "my_predictions.csv")
class nirs4all.data.predictions.Predictions(filepath: str | List[str] | None = None)[source]

Bases: object

Main facade for prediction management.

Delegates to specialized components while maintaining backward-compatible public API.

Architecture:
  • Storage: PredictionStorage (DataFrame backend)

  • Serializer: PredictionSerializer (JSON/Parquet hybrid)

  • Indexer: PredictionIndexer (filtering operations)

  • Ranker: PredictionRanker (ranking and top-k)

  • Aggregator: PartitionAggregator (partition combining)

  • Query: CatalogQueryEngine (catalog operations)

Examples

>>> # Create and add predictions
>>> pred = Predictions()
>>> pred.add_prediction(
...     dataset_name="wheat",
...     model_name="PLS",
...     partition="test",
...     y_true=y_true,
...     y_pred=y_pred,
...     test_score=0.85
... )
>>>
>>> # Query top models
>>> top_5 = pred.top(n=5, rank_metric="rmse", rank_partition="val")
>>>
>>> # Save and load
>>> pred.save_to_file("predictions.meta.parquet")
>>> loaded = Predictions("predictions.meta.parquet")
__len__() int[source]

Return number of stored predictions.

__repr__() str[source]

String representation.

__str__() str[source]

User-friendly string representation.

add_prediction(dataset_name: str, dataset_path: str = '', config_name: str = '', config_path: str = '', pipeline_uid: str | None = None, step_idx: int = 0, op_counter: int = 0, model_name: str = '', model_classname: str = '', model_path: str = '', fold_id: str | int | None = None, sample_indices: List[int] | None = None, weights: List[float] | None = None, metadata: Dict[str, Any] | None = None, partition: str = '', y_true: ndarray | None = None, y_pred: ndarray | None = None, y_proba: ndarray | None = None, val_score: float | None = None, test_score: float | None = None, train_score: float | None = None, metric: str = 'mse', task_type: str = 'regression', n_samples: int = 0, n_features: int = 0, preprocessings: str = '', best_params: Dict[str, Any] | None = None, scores: Dict[str, Dict[str, float]] | None = None, branch_id: int | None = None, branch_name: str | None = None, exclusion_count: int | None = None, exclusion_rate: float | None = None, model_artifact_id: str | None = None, trace_id: str | None = None) str[source]

Add a single prediction to storage.

Delegates to PredictionStorage component.

Parameters:
  • dataset_name – Dataset name

  • dataset_path – Path to dataset file

  • config_name – Configuration name

  • config_path – Path to config file

  • pipeline_uid – Unique pipeline identifier

  • step_idx – Pipeline step index

  • op_counter – Operation counter

  • model_name – Model name

  • model_classname – Model class name

  • model_path – Path to saved model

  • fold_id – Cross-validation fold ID

  • sample_indices – Indices of samples used

  • weights – Sample weights

  • metadata – Additional metadata

  • partition – Data partition (train/val/test)

  • y_true – True labels

  • y_pred – Predicted labels

  • y_proba – Class probabilities for classification (shape: n_samples x n_classes)

  • val_score – Validation score

  • test_score – Test score

  • train_score – Training score

  • metric – Metric name

  • task_type – Task type (classification/regression)

  • n_samples – Number of samples

  • n_features – Number of features

  • preprocessings – Preprocessing steps applied

  • best_params – Best hyperparameters

  • scores – Dictionary of pre-computed scores per partition

  • branch_id – Branch identifier for pipeline branching (0-indexed)

  • branch_name – Human-readable branch name

  • exclusion_count – Number of samples excluded during training (outlier_excluder)

  • exclusion_rate – Rate of samples excluded (0.0-1.0, outlier_excluder)

  • model_artifact_id – Deterministic artifact ID for model loading (v2 system)

  • trace_id – Execution trace ID for deterministic prediction replay (v2 system)

Returns:

Prediction ID

add_predictions(dataset_name: str | List[str], dataset_path: str | List[str] = '', config_name: str | List[str] = '', config_path: str | List[str] = '', pipeline_uid: str | None | List[str | None] = None, step_idx: int | List[int] = 0, op_counter: int | List[int] = 0, model_name: str | List[str] = '', model_classname: str | List[str] = '', model_path: str | List[str] = '', fold_id: str | None | List[str | None] = None, sample_indices: List[int] | None | List[List[int] | None] = None, weights: List[float] | None | List[List[float] | None] = None, metadata: Dict[str, Any] | None | List[Dict[str, Any] | None] = None, partition: str | List[str] = '', y_true: ndarray | None | List[ndarray | None] = None, y_pred: ndarray | None | List[ndarray | None] = None, val_score: float | None | List[float | None] = None, test_score: float | None | List[float | None] = None, train_score: float | None | List[float | None] = None, metric: str | List[str] = 'mse', task_type: str | List[str] = 'regression', n_samples: int | List[int] = 0, n_features: int | List[int] = 0, preprocessings: str | List[str] = '', best_params: Dict[str, Any] | None | List[Dict[str, Any] | None] = None, scores: Dict[str, Dict[str, float]] | None | List[Dict[str, Dict[str, float]] | None] = None, branch_id: int | None | List[int | None] = None, branch_name: str | None | List[str | None] = None, trace_id: str | None | List[str | None] = None) None[source]

Add multiple predictions to storage (batch operation).

For each parameter, if it’s a single value it will be broadcast to all predictions. If it’s a list, each index corresponds to one prediction.

Parameters:

Same as add_prediction, but each parameter can be a single value or a list.
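The broadcasting rule described above (scalars repeated, lists indexed per prediction) can be sketched as follows. `broadcast_rows` is a hypothetical helper, and this sketch treats every list as a per-prediction list; parameters that are lists by nature, such as sample_indices, would need special handling that is not shown here.

```python
from typing import Any, Dict, List


def broadcast_rows(**params: Any) -> List[Dict[str, Any]]:
    """Expand keyword arguments into one dict per prediction."""
    lengths = {len(v) for v in params.values() if isinstance(v, list)}
    if len(lengths) > 1:
        raise ValueError("list parameters must all have the same length")
    n = lengths.pop() if lengths else 1
    return [
        # Lists contribute their i-th element; single values are repeated.
        {key: (value[i] if isinstance(value, list) else value) for key, value in params.items()}
        for i in range(n)
    ]


rows = broadcast_rows(dataset_name="wheat", model_name=["PLS", "RF"], partition="test")
for row in rows:
    print(row)
```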

static aggregate(y_pred: ndarray, group_ids: ndarray, y_proba: ndarray | None = None, y_true: ndarray | None = None, method: str = 'mean', exclude_outliers: bool = False, outlier_threshold: float = 0.95) Dict[str, Any][source]

Aggregate predictions by group (e.g., same sample ID with multiple measurements).

For datasets with multiple samples per target (e.g., 4 measurements for each sample ID), this function averages predictions within each group to produce one prediction per group.

  • For regression: averages y_pred values within each group.

  • For classification: averages y_proba (if available) and then takes the argmax, or uses majority voting on y_pred if no probabilities are available.

Parameters:
  • y_pred – Predicted values array (n_samples,) or (n_samples, 1)

  • group_ids – Group identifiers array (n_samples,) - samples with same ID are grouped

  • y_proba – Optional class probabilities array (n_samples, n_classes) for classification

  • y_true – Optional true values array (n_samples,) for computing aggregated ground truth

  • method – Aggregation method - ‘mean’ (default), ‘median’, ‘vote’ (for classification)

  • exclude_outliers – If True, exclude outliers within each group before aggregation using Hotelling’s T² statistic. Useful when some measurements are anomalous.

  • outlier_threshold – Confidence level for T² outlier detection (default 0.95). Measurements with T² > chi2.ppf(threshold, 1) are excluded.
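For one-dimensional predictions, the threshold chi2.ppf(threshold, 1) can be reproduced with the standard library, because a chi-squared variable with one degree of freedom is the square of a standard normal. The sketch below is an illustration only: `chi2_ppf_1dof` and `flag_outliers` are hypothetical names, and the use of the sample mean and sample variance within a group is an assumption about how the T² statistic is estimated.

```python
from statistics import NormalDist, mean, variance
from typing import List, Sequence


def chi2_ppf_1dof(confidence: float) -> float:
    """Quantile of the chi-squared distribution with 1 degree of freedom."""
    # P(Z^2 <= q) = confidence  <=>  sqrt(q) is the (1 + confidence) / 2 normal quantile.
    return NormalDist().inv_cdf((1.0 + confidence) / 2.0) ** 2


def flag_outliers(values: Sequence[float], confidence: float = 0.95) -> List[bool]:
    """Flag values whose 1-D T^2 statistic exceeds the chi-squared limit."""
    mu = mean(values)
    var = variance(values)  # sample variance; needs at least two values
    if var == 0:
        return [False] * len(values)
    limit = chi2_ppf_1dof(confidence)
    return [((v - mu) ** 2) / var > limit for v in values]


threshold = chi2_ppf_1dof(0.95)
flags = flag_outliers([1.0] * 9 + [5.0])
print(round(threshold, 3), flags)
```

With very small groups the statistic is bounded, so a measurement may never cross the limit when it is computed this way; the example therefore uses ten measurements.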

Returns:

Dictionary containing:
  • 'y_pred': Aggregated predictions (n_groups,)

  • 'y_proba': Aggregated probabilities (n_groups, n_classes) if input had y_proba

  • 'y_true': Aggregated true values (n_groups,) if input had y_true

  • 'group_ids': Unique group identifiers (n_groups,)

  • 'group_sizes': Number of samples per group (n_groups,)

  • 'outliers_excluded': Number of outliers excluded per group (if exclude_outliers=True)
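The grouping step itself can be sketched in plain Python. `aggregate_sketch` is a hypothetical, simplified stand-in that handles only y_pred; returning groups in order of first appearance is an assumption, and the real method may order them differently.

```python
from collections import Counter, defaultdict
from statistics import mean, median
from typing import Any, Dict, Hashable, List, Sequence


def aggregate_sketch(
    y_pred: Sequence[Any],
    group_ids: Sequence[Hashable],
    method: str = "mean",
) -> Dict[str, List[Any]]:
    """Reduce predictions to one value per group."""
    groups: Dict[Hashable, List[Any]] = defaultdict(list)
    for gid, value in zip(group_ids, y_pred):
        groups[gid].append(value)

    reducers = {
        "mean": mean,
        "median": median,
        # Majority vote: the most frequent label in the group.
        "vote": lambda vals: Counter(vals).most_common(1)[0][0],
    }
    reduce_group = reducers[method]
    ids = list(groups)  # order of first appearance (assumption)
    return {
        "y_pred": [reduce_group(groups[g]) for g in ids],
        "group_ids": ids,
        "group_sizes": [len(groups[g]) for g in ids],
    }


reg = aggregate_sketch([1.0, 3.0, 10.0, 14.0], ["a", "a", "b", "b"])
cls = aggregate_sketch([0, 1, 1, 2, 2, 2], ["s1", "s1", "s1", "s2", "s2", "s2"], method="vote")
print(reg)
print(cls)
```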

Examples

>>> # Aggregate 4 samples per ID for regression
>>> result = Predictions.aggregate(y_pred, sample_ids)
>>> aggregated_pred = result['y_pred']  # One prediction per unique ID
>>> # Aggregate for classification with probabilities
>>> result = Predictions.aggregate(y_pred, sample_ids, y_proba=proba)
>>> aggregated_proba = result['y_proba']  # Averaged probabilities
>>> # Aggregate with outlier exclusion
>>> result = Predictions.aggregate(y_pred, sample_ids, exclude_outliers=True)
>>> print(f"Outliers excluded: {result['outliers_excluded'].sum()}")
archive_to_catalog(catalog_dir: Path, pipeline_dir: Path, metrics: Dict[str, Any] = None) str[source]

Archive pipeline predictions to catalog.

Loads predictions CSV from pipeline directory, adds metadata, and saves to catalog.

Delegates to PredictionStorage for CSV loading.

Parameters:
  • catalog_dir – Catalog directory for storage

  • pipeline_dir – Pipeline directory containing predictions.csv

  • metrics – Optional metadata dict to add to predictions

Returns:

Generated prediction ID

clear() None[source]

Clear all predictions.

Delegates to PredictionStorage component.

clear_caches() None[source]

Clear all internal caches.

Call this when the underlying data has been modified to ensure fresh results are computed. This clears:
  • Ranker’s aggregation cache (cached aggregated y_true/y_pred)

  • Ranker’s score cache (cached metric scores)

Examples

>>> predictions.add_prediction(...)  # Add new data
>>> predictions.clear_caches()  # Clear to ensure fresh results
compare_across_datasets(pipeline_hash: str, metric: str = 'test_score') DataFrame[source]

Compare a pipeline’s performance across multiple datasets.

Delegates to CatalogQueryEngine component.

Parameters:
  • pipeline_hash – Pipeline UID to compare

  • metric – Metric column to compare

Returns:

DataFrame with one row per dataset

filter_by_branch(branch_id: int | None = None, branch_name: str | None = None, include_no_branch: bool = False, load_arrays: bool = True) List[Dict[str, Any]][source]

Filter predictions by branch context.

Convenience method for meta-model stacking to retrieve predictions from a specific branch in branched pipelines.

Parameters:
  • branch_id – Branch ID to filter by.

  • branch_name – Branch name to filter by.

  • include_no_branch – If True, include predictions with no branch info.

  • load_arrays – If True, load actual arrays from registry.

Returns:

List of predictions from the specified branch.

Examples

>>> # Get predictions from branch 0
>>> branch_preds = predictions.filter_by_branch(branch_id=0)
>>> # Get predictions from named branch
>>> branch_preds = predictions.filter_by_branch(branch_name='preprocessing_a')
filter_by_criteria(dataset_name: str | None = None, date_range: Tuple[str, str] | None = None, metric_thresholds: Dict[str, float] | None = None) DataFrame[source]

Filter predictions by multiple criteria (catalog query).

Delegates to CatalogQueryEngine component.

Parameters:
  • dataset_name – Filter by dataset name

  • date_range – Tuple of (start_date, end_date)

  • metric_thresholds – Dict of metric names to threshold values

Returns:

Filtered DataFrame

filter_predictions(dataset_name: str | None = None, partition: str | None = None, config_name: str | None = None, model_name: str | None = None, fold_id: str | None = None, step_idx: int | None = None, branch_id: int | None = None, branch_name: str | None = None, load_arrays: bool = True, **kwargs) List[Dict[str, Any]][source]

Filter predictions and return as list of dictionaries.

Delegates to PredictionIndexer for filtering, then deserializes results. Supports lazy loading of arrays for performance optimization.

Parameters:
  • dataset_name – Filter by dataset name

  • partition – Filter by partition

  • config_name – Filter by config name

  • model_name – Filter by model name

  • fold_id – Filter by fold ID

  • step_idx – Filter by step index

  • branch_id – Filter by branch ID (for pipeline branching)

  • branch_name – Filter by branch name (for pipeline branching)

  • load_arrays – If True, loads actual arrays from registry (slower). If False, returns metadata only with array references (fast).

  • **kwargs – Additional filter criteria

Returns:

List of prediction dictionaries with deserialized numpy arrays (if load_arrays=True) or metadata with array_id references (if load_arrays=False)

Examples

>>> # Fast metadata-only query
>>> preds = predictions.filter_predictions(dataset_name="wheat", load_arrays=False)
>>> # Full query with arrays
>>> preds = predictions.filter_predictions(dataset_name="wheat", load_arrays=True)
>>> # Filter by branch
>>> branch_preds = predictions.filter_predictions(branch_id=0)
get_best(metric: str = '', ascending: bool | None = None, aggregate_partitions: bool = False, **filters) PredictionResult | None[source]

Get the best prediction for a specific metric.

Delegates to PredictionRanker component.

Parameters:
  • metric – Metric to optimize

  • ascending – Sort order. If True, sorts ascending (lower is better). If False, sorts descending (higher is better). If None, infers from metric.

  • aggregate_partitions – If True, add train/val/test partition data to the result

  • **filters – Additional filter criteria
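The "infer from metric" behaviour of ascending=None can be sketched as a lookup on the metric name. The set of lower-is-better metrics below is a hypothetical example, not the list the library uses, and both function names are illustrative.

```python
from typing import Optional, Sequence

# Hypothetical set of error metrics where a lower value is better.
LOWER_IS_BETTER = {"mse", "rmse", "mae"}


def resolve_ascending(metric: str, ascending: Optional[bool] = None) -> bool:
    """Return the sort order, inferring it from the metric when not given."""
    if ascending is not None:
        return ascending  # an explicit choice always wins
    return metric.lower() in LOWER_IS_BETTER


def best_of(scores: Sequence[float], metric: str, ascending: Optional[bool] = None) -> float:
    """Pick the best score under the resolved sort order."""
    order = resolve_ascending(metric, ascending)
    return sorted(scores, reverse=not order)[0]


print(best_of([0.5, 0.2, 0.9], "rmse"))  # lowest error
print(best_of([0.5, 0.2, 0.9], "r2"))    # highest score
```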

Returns:

Best prediction or None

get_cache_stats() Dict[str, Any][source]

Get cache statistics for debugging performance.

Returns a dictionary with hit rates and sizes for:
  • aggregation_cache: Cached aggregated arrays

  • score_cache: Cached metric scores

Returns:

Dictionary with cache statistics
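A cache that tracks its own hit rate can be sketched as below. `CountingCache` is a hypothetical class; the `hit_rate` key follows the example for this method, while the `size` key and the overall structure are assumptions.

```python
from typing import Any, Callable, Dict, Hashable


class CountingCache:
    """Tiny memoization cache that records hits and misses."""

    def __init__(self) -> None:
        self._store: Dict[Hashable, Any] = {}
        self.hits = 0
        self.misses = 0

    def get_or_compute(self, key: Hashable, compute: Callable[[], Any]) -> Any:
        if key in self._store:
            self.hits += 1
        else:
            self.misses += 1
            self._store[key] = compute()
        return self._store[key]

    def clear(self) -> None:
        # Drop cached values so the next lookup recomputes from fresh data.
        self._store.clear()

    def stats(self) -> Dict[str, float]:
        total = self.hits + self.misses
        return {"size": len(self._store), "hit_rate": self.hits / total if total else 0.0}


cache = CountingCache()
cache.get_or_compute(("rmse", "val"), lambda: 0.5)  # miss: value is computed
cache.get_or_compute(("rmse", "val"), lambda: 0.5)  # hit: value is reused
stats = {"score_cache": cache.stats()}
print(f"Score cache hit rate: {stats['score_cache']['hit_rate']:.1%}")
```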

Examples

>>> stats = predictions.get_cache_stats()
>>> print(f"Aggregation cache hit rate: {stats['aggregation_cache']['hit_rate']:.1%}")
get_configs() List[str][source]

Get list of unique config names.

Delegates to PredictionIndexer component.

Returns:

List of config names

get_datasets() List[str][source]

Get list of unique dataset names.

Delegates to PredictionIndexer component.

Returns:

List of dataset names

get_entry_partitions(entry: Dict) Dict[str, Dict | None][source]

Get all partition data for an entry.

Parameters:

entry – Prediction entry dictionary

Returns:

Dictionary with ‘train’, ‘val’, ‘test’ keys containing partition data

get_folds() List[str][source]

Get list of unique fold IDs.

Delegates to PredictionIndexer component.

Returns:

List of fold IDs

get_models() List[str][source]

Get list of unique model names.

Delegates to PredictionIndexer component.

Returns:

List of model names

get_models_before_step(step_idx: int, branch_id: int | None = None, unique_names: bool = True) List[str][source]

Get model names from steps before a given step index.

Convenience method for meta-model stacking to identify source models that can be used for stacking.

Parameters:
  • step_idx – Current step index (models before this are returned).

  • branch_id – Optional filter by branch ID.

  • unique_names – If True, return unique model names only.

Returns:

List of model names from previous steps.
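The selection logic can be sketched over a list of plain prediction dictionaries. `models_before_step` is a hypothetical helper; keeping names in order of first appearance when unique_names=True is an assumption.

```python
from typing import Any, Dict, List, Optional


def models_before_step(
    entries: List[Dict[str, Any]],
    step_idx: int,
    branch_id: Optional[int] = None,
    unique_names: bool = True,
) -> List[str]:
    """Collect model names from entries whose step index is below step_idx."""
    names = [
        entry["model_name"]
        for entry in entries
        if entry["step_idx"] < step_idx
        and (branch_id is None or entry.get("branch_id") == branch_id)
    ]
    if unique_names:
        # dict.fromkeys removes duplicates while preserving order.
        names = list(dict.fromkeys(names))
    return names


entries = [
    {"model_name": "PLS", "step_idx": 2, "branch_id": 0},
    {"model_name": "PLS", "step_idx": 2, "branch_id": 1},
    {"model_name": "RF", "step_idx": 3, "branch_id": 0},
    {"model_name": "Meta", "step_idx": 5, "branch_id": 0},
]
print(models_before_step(entries, step_idx=5))
```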

Examples

>>> # Get models available for stacking at step 5
>>> source_models = predictions.get_models_before_step(step_idx=5)
get_oof_predictions(model_name: str | None = None, step_idx: int | None = None, branch_id: int | None = None, exclude_averaged: bool = True, load_arrays: bool = True) List[Dict[str, Any]][source]

Get out-of-fold (validation partition) predictions.

Convenience method for meta-model stacking to retrieve OOF predictions that can be used to construct training features without data leakage.

Parameters:
  • model_name – Optional filter by model name.

  • step_idx – Optional filter by step index.

  • branch_id – Optional filter by branch ID.

  • exclude_averaged – If True, exclude ‘avg’ and ‘w_avg’ fold entries. Default True for OOF reconstruction.

  • load_arrays – If True, load actual arrays from registry.

Returns:

List of validation partition predictions.
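To show why averaged folds are excluded, the sketch below rebuilds a single out-of-fold vector from per-fold validation predictions. `build_oof_vector` is a hypothetical helper; it assumes each entry carries the fold_id, sample_indices, and y_pred fields documented for add_prediction.

```python
from typing import Any, Dict, List, Optional


def build_oof_vector(fold_predictions: List[Dict[str, Any]], n_samples: int) -> List[Optional[float]]:
    """Place each fold's validation predictions at their original sample positions."""
    oof: List[Optional[float]] = [None] * n_samples
    for pred in fold_predictions:
        # Averaged entries cover every sample and would overwrite true OOF values.
        if str(pred["fold_id"]) in {"avg", "w_avg"}:
            continue
        for idx, value in zip(pred["sample_indices"], pred["y_pred"]):
            oof[idx] = value
    return oof


folds = [
    {"fold_id": "0", "sample_indices": [0, 2], "y_pred": [1.0, 3.0]},
    {"fold_id": "1", "sample_indices": [1, 3], "y_pred": [2.0, 4.0]},
    {"fold_id": "avg", "sample_indices": [0, 1, 2, 3], "y_pred": [9.0, 9.0, 9.0, 9.0]},
]
oof = build_oof_vector(folds, n_samples=4)
print(oof)
```

Each sample receives a prediction only from the fold in which it was held out, which is what keeps the stacked features free of leakage.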

Examples

>>> # Get all OOF predictions
>>> oof = predictions.get_oof_predictions()
>>> # Get OOF predictions for a specific model
>>> oof = predictions.get_oof_predictions(model_name='PLS')
get_partitions() List[str][source]

Get list of unique partitions.

Delegates to PredictionIndexer component.

Returns:

List of partitions

get_prediction_by_id(prediction_id: str, load_arrays: bool = True) Dict[str, Any] | None[source]

Get a single prediction by its ID using direct lookup.

This O(1) lookup avoids iterating over all predictions, so it is much faster than filter_predictions for ID lookups.

Parameters:
  • prediction_id – Unique prediction identifier (hash ID)

  • load_arrays – If True, loads actual arrays from registry (slower). If False, returns metadata only with array references (fast).

Returns:

Prediction dictionary or None if not found

Examples

>>> pred = predictions.get_prediction_by_id("abc123def456")
>>> if pred:
...     print(f"Found model: {pred['model_name']}")
get_predictions_by_step(step_idx: int, partition: str | None = None, branch_id: int | None = None, load_arrays: bool = True, **kwargs) List[Dict[str, Any]][source]

Get predictions from a specific pipeline step.

Convenience method for meta-model stacking to retrieve predictions from source models at a specific step index.

Parameters:
  • step_idx – Pipeline step index to filter by.

  • partition – Optional partition filter (‘train’, ‘val’, ‘test’).

  • branch_id – Optional branch ID filter.

  • load_arrays – If True, load actual arrays from registry.

  • **kwargs – Additional filter criteria.

Returns:

List of prediction dictionaries from the specified step.

Examples

>>> # Get all predictions from step 2
>>> preds = predictions.get_predictions_by_step(step_idx=2)
>>> # Get validation predictions from step 2
>>> val_preds = predictions.get_predictions_by_step(
...     step_idx=2, partition='val'
... )
get_similar(**filter_kwargs) Dict[str, Any] | None[source]

Get the first prediction matching filter criteria.

Parameters:

**filter_kwargs – Filter criteria (same as filter_predictions)

Returns:

First matching prediction or None

get_summary_stats(metric: str = 'test_score') Dict[str, float][source]

Get summary statistics for a metric.

Delegates to CatalogQueryEngine component.

Parameters:

metric – Metric column name

Returns:

Dictionary with min, max, mean, median, std
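The five statistics can be reproduced with the standard library, as sketched below. `summary_stats` is a hypothetical name, and the use of the sample standard deviation (n - 1 denominator) is an assumption; the library may use the population form.

```python
from statistics import mean, median, stdev
from typing import Dict, Sequence


def summary_stats(values: Sequence[float]) -> Dict[str, float]:
    """Summarize one metric column."""
    return {
        "min": min(values),
        "max": max(values),
        "mean": mean(values),
        "median": median(values),
        # stdev needs at least two values; fall back to 0.0 otherwise.
        "std": stdev(values) if len(values) > 1 else 0.0,
    }


metric_stats = summary_stats([0.70, 0.80, 0.90])
print(metric_stats)
```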

get_unique_values(column: str) List[str][source]

Get unique values for a specific column.

Delegates to PredictionIndexer component.

Parameters:

column – Column name

Returns:

List of unique values

list_runs(dataset_name: str | None = None) DataFrame[source]

List all prediction runs with summary information.

Delegates to CatalogQueryEngine component.

Parameters:

dataset_name – Filter by dataset name (None for all)

Returns:

DataFrame with run summary

classmethod load(dataset_name: str | None = None, path: str = 'results', aggregate_partitions: bool = False, **filters) Predictions[source]

Load predictions from results directory structure.

Parameters:
  • dataset_name – Name of dataset to load (None for all)

  • path – Base path to search for predictions

  • aggregate_partitions – If True, aggregate partition data

  • **filters – Additional filter criteria

Returns:

Predictions instance with loaded data

load_from_file(filepath: str, merge: bool = True) None[source]

Load predictions from split Parquet format.

Supported format: split Parquet with array registry (.meta.parquet + .arrays.parquet).

When called multiple times (e.g., from __init__ with multiple files), predictions are merged by default.

Parameters:
  • filepath – Path to .meta.parquet file

  • merge – If True and storage already has data, merge loaded data. If False, replace existing data. (default: True)

Examples

>>> predictions.load_from_file("predictions.meta.parquet")
>>> # Load additional predictions (merged)
>>> predictions.load_from_file("more_predictions.meta.parquet")
classmethod load_from_file_cls(filepath: str) Predictions[source]

Load predictions from JSON file as class method.

Parameters:

filepath – Input file path

Returns:

Predictions instance with loaded data (empty if file doesn’t exist)

classmethod load_from_parquet(catalog_dir: Path, prediction_ids: list = None) Predictions[source]

Load predictions from split Parquet storage.

Parameters:
  • catalog_dir – Path to catalog directory

  • prediction_ids – Optional list of prediction IDs to load

Returns:

Predictions instance with loaded data

classmethod merge_parquet_files(input_files: List[str], output_file: str, deduplicate: bool = True) Predictions[source]

Merge multiple prediction parquet files into a single output file.

This is a utility method to consolidate predictions from multiple experiment runs into a single file for easier analysis.

Parameters:
  • input_files – List of paths to .meta.parquet files to merge.

  • output_file – Output path for the merged .meta.parquet file.

  • deduplicate – If True, remove duplicate prediction IDs (keep first). Default is True.
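The "keep first" deduplication rule can be sketched on in-memory records. `merge_records` is a hypothetical helper operating on lists of dictionaries; it does not read or write Parquet files.

```python
from typing import Any, Dict, List


def merge_records(batches: List[List[Dict[str, Any]]], deduplicate: bool = True) -> List[Dict[str, Any]]:
    """Concatenate batches, optionally keeping only the first record per ID."""
    merged: List[Dict[str, Any]] = []
    seen = set()
    for batch in batches:
        for record in batch:
            if deduplicate and record["id"] in seen:
                continue  # a record with this ID was already kept
            seen.add(record["id"])
            merged.append(record)
    return merged


run1 = [{"id": "a1", "model_name": "PLS"}, {"id": "b2", "model_name": "RF"}]
run2 = [{"id": "a1", "model_name": "PLS-rerun"}, {"id": "c3", "model_name": "SVR"}]
merged = merge_records([run1, run2])
print(f"Merged {len(merged)} predictions")
```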

Returns:

Predictions instance containing the merged data.

Raises:

Examples

>>> # Merge multiple experiment runs
>>> merged = Predictions.merge_parquet_files(
...     input_files=[
...         "run1/predictions.meta.parquet",
...         "run2/predictions.meta.parquet",
...         "run3/predictions.meta.parquet"
...     ],
...     output_file="combined/all_predictions.meta.parquet"
... )
>>> print(f"Merged {len(merged)} predictions")
>>> # Merge without deduplication
>>> merged = Predictions.merge_parquet_files(
...     input_files=["exp1.meta.parquet", "exp2.meta.parquet"],
...     output_file="merged.meta.parquet",
...     deduplicate=False
... )
merge_predictions(other: Predictions) None[source]

Merge predictions from another Predictions instance.

Delegates to PredictionStorage component.

Parameters:

other – Another Predictions instance to merge

property num_predictions: int

Get the number of stored predictions.

classmethod pred_long_string(entry: Dict, metrics: List[str] | None = None) str[source]

Generate long string representation of a prediction.

Parameters:
  • entry – Prediction dictionary

  • metrics – Optional list of metrics to display

Returns:

Long description string with config

classmethod pred_short_string(entry: Dict, metrics: List[str] | None = None, partition: str | List[str] = 'test') str[source]

Generate short string representation of a prediction.

Parameters:
  • entry – Prediction dictionary

  • metrics – Optional list of metrics to display

  • partition – Partition or list of partitions to display (default: 'test')

Returns:

Short description string

query_best(dataset_name: str | None = None, metric: str = 'test_score', n: int = 10, ascending: bool = False) DataFrame[source]

Query for best performing pipelines by metric (catalog query).

Delegates to CatalogQueryEngine component.

Parameters:
  • dataset_name – Filter by dataset name

  • metric – Metric column to rank by

  • n – Number of top results

  • ascending – If True, lower scores rank higher

Returns:

DataFrame with top n predictions

static save_all_to_csv(predictions: Predictions, path: str = 'results', aggregate_partitions: bool = False, **filters) None[source]

Save all predictions to CSV files.

Parameters:
  • predictions – Predictions instance

  • path – Base path for saving

  • aggregate_partitions – If True, save one file per model with all partitions

  • **filters – Additional filter criteria

static save_predictions_to_csv(y_true: ndarray | List[float] | None = None, y_pred: ndarray | List[float] | None = None, filepath: str = '', prefix: str = '', suffix: str = '') None[source]

Save y_true and y_pred arrays to a CSV file.

Parameters:
  • y_true – True values array

  • y_pred – Predicted values array

  • filepath – Output CSV file path

  • prefix – Optional prefix for column names

  • suffix – Optional suffix for column names

save_to_file(filepath: str, format: str = 'parquet') None[source]

Save predictions to split Parquet format with array registry.

Parameters:
  • filepath – Output file path (should end with .meta.parquet)

  • format – Format to use (only “parquet” is supported)

Examples

>>> predictions.save_to_file("predictions.meta.parquet")
save_to_parquet(catalog_dir: Path, prediction_id: str = None) tuple[source]

Save predictions as split Parquet (metadata + arrays separate).

Appends to existing files if they exist.

Delegates to PredictionStorage component.

Parameters:
  • catalog_dir – Directory for catalog storage

  • prediction_id – Optional prediction ID (generates UUID if None)

Returns:

Tuple of (meta_path, data_path)

to_dataframe() DataFrame[source]

Get predictions as Polars DataFrame.

to_dicts(load_arrays: bool = True) List[Dict[str, Any]][source]

Get predictions as list of dictionaries.

Parameters:

load_arrays – If True, hydrate array references with actual arrays. If False, returns metadata with array IDs only (faster).

Returns:

List of prediction dictionaries

to_pandas()[source]

Get predictions as pandas DataFrame.

top(n: int, rank_metric: str = '', rank_partition: str = 'val', display_metrics: List[str] | None = None, display_partition: str = 'test', aggregate_partitions: bool = False, ascending: bool | None = None, group_by_fold: bool = False, aggregate: str | None = None, group_by: str | List[str] | None = None, best_per_model: bool = False, return_grouped: bool = False, **filters) PredictionResultsList | Dict[Tuple, PredictionResultsList][source]

Get top n models ranked by a metric on a specific partition.

Delegates to PredictionRanker component.

Parameters:
  • n – Number of top models to return. When group_by is used, this means top N per group (e.g., top 3 per dataset).

  • rank_metric – Metric to rank by (if empty, uses record’s metric or val_score)

  • rank_partition – Partition to rank on (default: “val”)

  • display_metrics – Metrics to compute for display (default: task_type defaults)

  • display_partition – Partition to display results from (default: “test”)

  • aggregate_partitions – If True, add train/val/test nested dicts in results

  • ascending – Sort order. If True, sorts ascending (lower is better). If False, sorts descending (higher is better). If None, infers from metric.

  • group_by_fold – If True, include fold_id in model identity (rank per fold)

  • aggregate – If provided, aggregate predictions by this metadata column or ‘y’. When ‘y’, groups by y_true values. When a column name (e.g., ‘ID’), groups by that metadata column. Aggregated predictions have recalculated metrics.

  • group_by – Group predictions by column(s). Can be a single column name (str) or a list of columns, for example 'dataset_name' or ['model_name', 'dataset_name']. When provided, the top N results are returned per group (not N total), and each result includes a 'group_key' field for easy filtering.

  • best_per_model – DEPRECATED - Use group_by=[‘model_name’] instead. If True, keep only the best prediction per model_name.

  • return_grouped – If True and group_by is set, return a dict mapping group keys to PredictionResultsList instead of a flat list. Default: False (returns flat list sorted by global rank).

  • **filters – Additional filter criteria (dataset_name, config_name, etc.)

Returns:
  • If return_grouped=False (default): PredictionResultsList containing top n models per group, sorted by rank_metric. Each result includes 'group_key'.

  • If return_grouped=True: Dict mapping group keys (tuples) to PredictionResultsList, one list per group with top n results each.
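The two return shapes can be illustrated with a small top-n-per-group sketch over plain dictionaries. `top_per_group` and the `score` field are hypothetical; the real method ranks on computed metrics and returns PredictionResultsList objects.

```python
from collections import defaultdict
from typing import Any, Dict, List, Sequence, Tuple, Union


def top_per_group(
    records: List[Dict[str, Any]],
    n: int,
    group_by: Union[str, Sequence[str]],
    score_key: str = "score",
    ascending: bool = True,
    return_grouped: bool = False,
):
    """Keep the n best records per group, tagging each with its group_key."""
    columns = [group_by] if isinstance(group_by, str) else list(group_by)
    ranked = sorted(records, key=lambda r: r[score_key], reverse=not ascending)

    grouped: Dict[Tuple, List[Dict[str, Any]]] = defaultdict(list)
    flat: List[Dict[str, Any]] = []
    for record in ranked:
        key = tuple(record[col] for col in columns)
        if len(grouped[key]) < n:
            tagged = {**record, "group_key": key}
            grouped[key].append(tagged)
            flat.append(tagged)  # flat list stays in global rank order
    return dict(grouped) if return_grouped else flat


records = [
    {"dataset_name": "d1", "model_name": "PLS", "score": 0.3},
    {"dataset_name": "d1", "model_name": "RF", "score": 0.5},
    {"dataset_name": "d2", "model_name": "PLS", "score": 0.2},
    {"dataset_name": "d2", "model_name": "RF", "score": 0.4},
]
flat = top_per_group(records, n=1, group_by="dataset_name")
grouped = top_per_group(records, n=1, group_by="dataset_name", return_grouped=True)
print([r["group_key"] for r in flat])
print(sorted(grouped))
```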

Examples

>>> # Top 3 per dataset (flat list)
>>> top_per_ds = predictions.top(n=3, group_by='dataset_name')
>>> # Filter by group_key
>>> ds1_results = [r for r in top_per_ds if r['group_key'] == ('dataset1',)]
>>>
>>> # Top 3 per dataset (grouped dict)
>>> grouped = predictions.top(n=3, group_by='dataset_name', return_grouped=True)
>>> for key, results in grouped.items():
...     print(f"{key}: {len(results)} results")