nirs4all.pipeline.storage.store_protocol module

Protocol definition for workspace storage backends.

Provides a structural subtyping contract (typing.Protocol) that any storage backend must satisfy. This enables testing with in-memory stubs and, in the future, swapping DuckDB for another backend without changing consumer code.

The protocol captures the minimal set of methods that all consumers depend on. WorkspaceStore itself implements a much richer interface (export, replay, logging, cleanup); those additional methods are not part of the protocol because backend-swappability is only required for the core storage operations.

class nirs4all.pipeline.storage.store_protocol.WorkspaceStoreProtocol(*args, **kwargs)[source]

Bases: Protocol

Minimal protocol for workspace storage backends.

A backend is any object that satisfies this structural interface. Use isinstance(obj, WorkspaceStoreProtocol) at runtime to verify compliance (enabled by @runtime_checkable).

The protocol covers:

  • Run lifecycle – creating, completing, and failing runs.

  • Pipeline lifecycle – creating, completing, and failing pipeline executions.

  • Chain storage – persisting the preprocessing-to-model chain.

  • Prediction storage – saving scalar prediction records and their associated dense arrays.

  • Artifact storage – content-addressed persistence of fitted Python objects.

  • Core queries – ranking predictions and retrieving single records.

  • Export – producing standalone bundles from stored chains.

begin_pipeline(run_id: str, name: str, expanded_config: Any, generator_choices: list, dataset_name: str, dataset_hash: str) str[source]

Register a new pipeline execution under a run.

begin_run(name: str, config: Any, datasets: list[dict]) str[source]

Create a new run and return its identifier.

complete_pipeline(pipeline_id: str, best_val: float, best_test: float, metric: str, duration_ms: int) None[source]

Mark a pipeline as completed.

complete_run(run_id: str, summary: dict) None[source]

Mark a run as completed.

export_chain(chain_id: str, output_path: Path, format: str = 'n4a') Path[source]

Export a chain as a standalone bundle.

fail_pipeline(pipeline_id: str, error: str) None[source]

Mark a pipeline as failed.

fail_run(run_id: str, error: str) None[source]

Mark a run as failed.

get_chain(chain_id: str) dict | None[source]

Retrieve a chain by identifier.

get_prediction(prediction_id: str, load_arrays: bool = False) dict | None[source]

Retrieve a single prediction record.

load_artifact(artifact_id: str) Any[source]

Load a binary artifact by identifier.

save_artifact(obj: Any, operator_class: str, artifact_type: str, format: str) str[source]

Persist a binary artifact and return its identifier.

save_chain(pipeline_id: str, steps: list[dict], model_step_idx: int, model_class: str, preprocessings: str, fold_strategy: str, fold_artifacts: dict, shared_artifacts: dict, branch_path: list[int] | None = None, source_index: int | None = None) str[source]

Store a chain and return its identifier.

save_prediction(pipeline_id: str, chain_id: str, dataset_name: str, model_name: str, model_class: str, fold_id: str, partition: str, val_score: float, test_score: float, train_score: float, metric: str, task_type: str, n_samples: int, n_features: int, scores: dict, best_params: dict, branch_id: int | None, branch_name: str | None, exclusion_count: int, exclusion_rate: float, preprocessings: str = '', prediction_id: str | None = None) str[source]

Store a prediction record and return its identifier.

save_prediction_arrays(prediction_id: str, y_true: ndarray | None, y_pred: ndarray | None, y_proba: ndarray | None = None, sample_indices: ndarray | None = None, weights: ndarray | None = None) None[source]

Store prediction arrays.

top_predictions(n: int, metric: str = 'val_score', ascending: bool = True, partition: str = 'val', dataset_name: str | None = None, group_by: str | None = None) DataFrame[source]

Return top-N predictions ranked by a score column.