Storage API Reference
This document provides the API reference for the nirs4all storage system, centered on the DuckDB-backed WorkspaceStore.
Module: nirs4all.pipeline.storage
WorkspaceStore
class WorkspaceStore:
"""Database-backed workspace storage.
Central storage facade for all workspace data: runs, pipelines, chains,
predictions, prediction arrays, artifacts, and structured execution logs.
The store manages two on-disk resources:
- store.duckdb: A single DuckDB database with 7 tables
- artifacts/: A flat, content-addressed directory for binary artifacts
"""
Constructor
def __init__(self, workspace_path: Path) -> None:
"""
Initialize the workspace store.
Creates store.duckdb and the artifacts/ directory if they do not exist.
The schema is created automatically on first use.
Args:
workspace_path: Root directory of the workspace.
"""
Run Lifecycle
def begin_run(self, name: str, config: Any, datasets: list[dict]) -> str:
"""Create a new run and return its unique identifier.
Args:
name: Human-readable name for the run.
config: Serializable run-level configuration (stored as JSON).
datasets: List of dataset metadata dicts (name, path, hash, etc.).
Returns:
A unique run identifier (UUID string).
"""
def complete_run(self, run_id: str, summary: dict) -> None:
"""Mark a run as successfully completed."""
def fail_run(self, run_id: str, error: str) -> None:
"""Mark a run as failed with an error message."""
Pipeline Lifecycle
def begin_pipeline(
self, run_id: str, name: str, expanded_config: Any,
generator_choices: list, dataset_name: str, dataset_hash: str,
) -> str:
"""Register a new pipeline execution under a run.
Args:
run_id: Parent run identifier.
name: Pipeline name (e.g., "0001_pls_abc123").
expanded_config: Fully resolved pipeline configuration (JSON).
generator_choices: List of generator choices for this expansion.
dataset_name: Name of the dataset being processed.
dataset_hash: Content hash of the dataset.
Returns:
A unique pipeline identifier (UUID string).
"""
def complete_pipeline(
self, pipeline_id: str, best_val: float, best_test: float,
metric: str, duration_ms: int,
) -> None:
"""Mark a pipeline execution as completed with scores."""
def fail_pipeline(self, pipeline_id: str, error: str) -> None:
"""Mark a pipeline execution as failed, rolling back associated data."""
Chain Management
A chain captures the complete, ordered sequence of preprocessing steps and the model step that were executed during training, together with references to the fitted artifacts for each fold. Chains are the unit of export and replay.
def save_chain(
self, pipeline_id: str, steps: list[dict], model_step_idx: int,
model_class: str, preprocessings: str, fold_strategy: str,
fold_artifacts: dict, shared_artifacts: dict,
branch_path: list[int] | None = None,
source_index: int | None = None,
) -> str:
"""Store a preprocessing-to-model chain.
Args:
pipeline_id: Parent pipeline identifier.
steps: Ordered list of step descriptors, each containing:
step_idx, operator_class, params, artifact_id, stateless.
model_step_idx: Index of the model step within steps.
model_class: Fully qualified model class name.
preprocessings: Display string (e.g., "SNV>Detr>MinMax").
fold_strategy: CV fold strategy ("per_fold" or "shared").
fold_artifacts: Mapping from fold ID to model artifact ID.
shared_artifacts: Mapping from step index to shared artifact ID.
branch_path: Branch indices for branching pipelines.
source_index: Source index for multi-source pipelines.
Returns:
A unique chain identifier (UUID string).
"""
def get_chain(self, chain_id: str) -> dict | None:
"""Retrieve a chain by its identifier."""
def get_chains_for_pipeline(self, pipeline_id: str) -> pl.DataFrame:
"""List all chains belonging to a pipeline."""
Prediction Storage
def save_prediction(
self, pipeline_id: str, chain_id: str, dataset_name: str,
model_name: str, model_class: str, fold_id: str, partition: str,
val_score: float, test_score: float, train_score: float,
metric: str, task_type: str, n_samples: int, n_features: int,
scores: dict, best_params: dict,
branch_id: int | None, branch_name: str | None,
exclusion_count: int, exclusion_rate: float,
preprocessings: str = "",
) -> str:
"""Store a single prediction record.
Returns:
A unique prediction identifier (UUID string).
"""
def save_prediction_arrays(
self, prediction_id: str, y_true: np.ndarray | None,
y_pred: np.ndarray | None, y_proba: np.ndarray | None = None,
sample_indices: np.ndarray | None = None,
weights: np.ndarray | None = None,
) -> None:
"""Store dense arrays as native DOUBLE[] columns in DuckDB."""
Artifact Storage
Binary artifacts (fitted models, transformers) are stored in the artifacts/ directory using content-addressed storage with automatic deduplication.
def save_artifact(
self, obj: Any, operator_class: str,
artifact_type: str, format: str,
) -> str:
"""Persist a binary artifact with content-addressed deduplication.
Args:
obj: Python object to persist (e.g., fitted StandardScaler).
operator_class: Fully qualified class name.
artifact_type: Category ("model", "transformer", "scaler").
format: Serialization format ("joblib", "cloudpickle").
Returns:
Artifact identifier (same ID returned for duplicate content).
"""
def load_artifact(self, artifact_id: str) -> Any:
"""Load a binary artifact from disk."""
def get_artifact_path(self, artifact_id: str) -> Path:
"""Return the filesystem path of a stored artifact."""
Structured Logging
def log_step(
self, pipeline_id: str, step_idx: int, operator_class: str,
event: str, duration_ms: int | None = None,
message: str | None = None, details: dict | None = None,
level: str = "info",
) -> None:
"""Record a structured log entry for a pipeline step."""
Queries
def get_run(self, run_id: str) -> dict | None:
"""Retrieve a single run record."""
def list_runs(
self, status: str | None = None, dataset: str | None = None,
limit: int = 100, offset: int = 0,
) -> pl.DataFrame:
"""List runs with optional filtering and pagination."""
def get_pipeline(self, pipeline_id: str) -> dict | None:
"""Retrieve a single pipeline record."""
def list_pipelines(
self, run_id: str | None = None,
dataset_name: str | None = None,
) -> pl.DataFrame:
"""List pipelines with optional filtering."""
def get_prediction(
self, prediction_id: str, load_arrays: bool = False,
) -> dict | None:
"""Retrieve a single prediction record; pass load_arrays=True to also include its y_true/y_pred arrays."""
def query_predictions(
self, dataset_name: str | None = None,
model_class: str | None = None, partition: str | None = None,
fold_id: str | None = None, branch_id: int | None = None,
pipeline_id: str | None = None, run_id: str | None = None,
limit: int | None = None, offset: int = 0,
) -> pl.DataFrame:
"""Query predictions with flexible filtering (AND semantics)."""
def top_predictions(
self, n: int, metric: str = "val_score", ascending: bool = True,
partition: str = "val", dataset_name: str | None = None,
group_by: str | None = None,
) -> pl.DataFrame:
"""Return the top-N predictions ranked by a score column (ascending by default, so the lowest scores rank first)."""
def get_pipeline_log(self, pipeline_id: str) -> pl.DataFrame:
"""Retrieve all log entries for a pipeline."""
def get_run_log_summary(self, run_id: str) -> pl.DataFrame:
"""Aggregate log entries across all pipelines of a run."""
Export Operations
Exports generate files on demand from the store. During training, the only files written are store.duckdb and the artifact binaries.
def export_chain(
self, chain_id: str, output_path: Path, format: str = "n4a",
) -> Path:
"""Export a chain as a standalone .n4a bundle or .n4a.py script."""
def export_pipeline_config(
self, pipeline_id: str, output_path: Path,
) -> Path:
"""Export a pipeline's expanded configuration as JSON."""
def export_run(self, run_id: str, output_path: Path) -> Path:
"""Export full run metadata (run + pipelines + chains) as YAML."""
def export_predictions_parquet(
self, output_path: Path, **filters,
) -> Path:
"""Export prediction records to a Parquet file."""
Chain Replay
def replay_chain(
self, chain_id: str, X: np.ndarray,
wavelengths: np.ndarray | None = None,
) -> np.ndarray:
"""Replay a stored chain on new data to produce predictions.
Loads each step's artifact, applies the transformations in order,
and, for the model step, loads all fold models and averages their predictions.
Args:
chain_id: Chain to replay.
X: Input feature matrix (n_samples x n_features).
wavelengths: Optional wavelength array for wavelength-aware ops.
Returns:
Predicted values as a 1-D NumPy array.
"""
Deletion and Cleanup
def delete_run(self, run_id: str, delete_artifacts: bool = True) -> int:
"""Delete a run and cascade to all descendant data.
Returns:
Total number of database rows deleted.
"""
def delete_prediction(self, prediction_id: str) -> bool:
"""Delete a single prediction and its arrays."""
def gc_artifacts(self) -> int:
"""Garbage-collect unreferenced artifacts (ref_count == 0)."""
def vacuum(self) -> None:
"""Reclaim unused space in the DuckDB database file."""
def close(self) -> None:
"""Close the database connection."""
ChainBuilder
from nirs4all.pipeline.storage import ChainBuilder
Converts an ExecutionTrace into a chain dict suitable for store.save_chain(). Used internally by the pipeline execution engine.
replay_chain
from nirs4all.pipeline.storage import replay_chain
Standalone function for replaying chains from store data. Used by the prediction API.
PipelineLibrary
from nirs4all.pipeline.storage import PipelineLibrary
Manages reusable pipeline templates with category and tag support. Templates are stored as JSON files in the workspace library directory.
See Also
Workspace Architecture - Workspace directory structure
Pipeline Syntax - Pipeline configuration reference