nirs4all.pipeline.storage.workspace_store module
Database-backed workspace storage.
- Replaces: ManifestManager, SimulationSaver, PipelineWriter,
PredictionStorage, ArrayRegistry, WorkspaceExporter, PredictionResolver/TargetResolver.
All metadata, configs, logs, chains, and predictions are stored in a DuckDB database. Binary artifacts (fitted models, transformers) are stored in a flat content-addressed directory alongside the database file. Human-readable files (exported bundles, CSVs, charts) are produced only by explicit export operations.
Workspace layout after migration:
workspace/
    store.duckdb    # All structured data
    artifacts/      # Flat content-addressed binaries
        ab/abc123def456.joblib
    exports/        # User-triggered exports (on demand)
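The sharded, content-addressed file naming shown in the layout can be sketched as follows. This is an illustration only, not the nirs4all implementation: the helper name `artifact_relpath`, the 12-character digest truncation, and the two-character shard prefix are assumptions inferred from the `ab/abc123def456.joblib` example. SHA-256 is used because `register_existing_artifact()` describes `content_hash` as a SHA-256 hex digest.

```python
import hashlib
from pathlib import PurePosixPath


def artifact_relpath(content: bytes, ext: str = "joblib", digest_len: int = 12) -> PurePosixPath:
    """Hypothetical helper: derive a sharded relative path from the content hash."""
    digest = hashlib.sha256(content).hexdigest()[:digest_len]
    # The first two hex characters name the shard directory (assumed convention).
    return PurePosixPath(digest[:2]) / f"{digest}.{ext}"


path_a = artifact_relpath(b"serialised model bytes")
path_b = artifact_relpath(b"serialised model bytes")
path_c = artifact_relpath(b"different bytes")
```

Identical bytes always map to the same path, which is what makes deduplication possible.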
- class nirs4all.pipeline.storage.workspace_store.WorkspaceStore(workspace_path: Path)
Bases: object
Database-backed workspace storage.
Central storage facade for all workspace data: runs, pipelines, chains, predictions, prediction arrays, artifacts, and structured execution logs.
Replaces the file-hierarchy storage composed of ManifestManager, SimulationSaver, PipelineWriter, PredictionStorage, ArrayRegistry, WorkspaceExporter, and PredictionResolver.
The store manages two on-disk resources:
- store.duckdb – a single DuckDB database holding seven tables (runs, pipelines, chains, predictions, prediction_arrays, artifacts, logs).
- artifacts/ – a flat, content-addressed directory for binary artifacts (fitted models, transformers, etc.).
All query methods that return tabular data return polars.DataFrame via DuckDB’s zero-copy Arrow transfer, enabling efficient downstream analysis with Polars or conversion to pandas/numpy.
- Parameters:
workspace_path – Root directory of the workspace. The database file store.duckdb and the artifacts/ directory are created inside this path.
Example
>>> from pathlib import Path
>>> from nirs4all.pipeline.storage.workspace_store import WorkspaceStore
>>> store = WorkspaceStore(Path("./workspace"))
>>> run_id = store.begin_run("experiment_1", config={}, datasets=[])
>>> store.complete_run(run_id, summary={"total_pipelines": 5})
- begin_pipeline(run_id: str, name: str, expanded_config: Any, generator_choices: list, dataset_name: str, dataset_hash: str) → str
Register a new pipeline execution under a run.
A pipeline represents a single expanded configuration (after generator expansion) executed on a single dataset.
- Parameters:
run_id – Parent run identifier.
name – Pipeline name (e.g. "0001_pls_abc123").
expanded_config – The fully expanded pipeline configuration (serialisable to JSON). This is the resolved configuration after all _or_, _range_, _log_range_, and _cartesian_ generators have been applied.
generator_choices – List of generator choices that produced this pipeline. Each entry is a dict like {"_or_": value} or {"_range_": 18}.
dataset_name – Name of the dataset being processed.
dataset_hash – Content hash of the dataset at execution time, enabling later run-compatibility checks.
- Returns:
A unique pipeline identifier (UUID-based string).
- begin_run(name: str, config: Any, datasets: list[dict]) → str
Create a new run and return its unique identifier.
A run groups one or more pipeline executions that share a common configuration and set of target datasets. The run is created in "running" status.
- Parameters:
name – Human-readable name for the run (e.g. "protein_sweep").
config – Serialisable run-level configuration (stored as JSON). Typically contains cross-validation strategy, metric, and global flags.
datasets – List of dataset metadata dictionaries. Each entry should contain at minimum {"name": str, "path": str}. Additional keys (hash, n_samples, n_features, y_stats) are preserved for provenance tracking.
- Returns:
A unique run identifier (UUID-based string).
- close() → None
Close the database connection and release resources.
Safe to call multiple times. After closing, all other methods will raise RuntimeError.
- complete_pipeline(pipeline_id: str, best_val: float, best_test: float, metric: str, duration_ms: int) → None
Mark a pipeline execution as successfully completed.
- Parameters:
pipeline_id – Identifier returned by begin_pipeline().
best_val – Best validation score achieved by this pipeline.
best_test – Corresponding test score for the best validation model.
metric – Name of the metric used for ranking (e.g. "rmse").
duration_ms – Total execution time in milliseconds.
- complete_run(run_id: str, summary: dict) → None
Mark a run as successfully completed.
Updates the run status to "completed" and stores the provided summary (e.g. total pipelines completed, best overall score).
- Parameters:
run_id – Identifier returned by begin_run().
summary – Free-form summary dictionary persisted alongside the run record.
- delete_prediction(prediction_id: str) → bool
Delete a single prediction and its associated arrays.
- Parameters:
prediction_id – Prediction to delete.
- Returns:
True if the prediction existed and was deleted, False otherwise.
- delete_run(run_id: str, delete_artifacts: bool = True) → int
Delete a run and all its descendant data.
Cascades to pipelines, chains, predictions, prediction arrays, and log entries. If delete_artifacts is True, artifact reference counts are decremented and files with zero references are removed from disk.
- Parameters:
run_id – Run to delete.
delete_artifacts – Whether to remove orphaned artifact files.
- Returns:
Total number of database rows deleted across all tables.
- export_chain(chain_id: str, output_path: Path, format: str = 'n4a') → Path
Export a chain as a standalone prediction bundle.
Builds a self-contained archive from the chain’s steps and artifacts, suitable for deployment or sharing without the workspace.
Supported formats:
- "n4a" – ZIP archive containing manifest.json, chain.json, and all referenced artifact files.
- "n4a.py" – Portable Python script with embedded (base64-encoded) artifacts.
- Parameters:
chain_id – Chain to export.
output_path – Destination file path.
format – Export format ("n4a" or "n4a.py").
- Returns:
The resolved output path (may have an extension appended if not already present).
- Raises:
KeyError – If the chain does not exist.
FileNotFoundError – If any referenced artifact file is missing.
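The general shape of an "n4a"-style ZIP bundle can be sketched with the standard library. Only the file names `manifest.json` and `chain.json` come from the description above; the function name `build_bundle`, the manifest fields, and the `artifacts/<id>.joblib` member naming are assumptions made for illustration, and the real bundle layout may differ.

```python
import io
import json
import zipfile


def build_bundle(chain: dict, artifacts: dict) -> bytes:
    """Hypothetical sketch: pack chain metadata and artifact bytes into a ZIP."""
    # Mirror the documented failure mode when a referenced artifact is missing.
    for step in chain["steps"]:
        artifact_id = step.get("artifact_id")
        if artifact_id is not None and artifact_id not in artifacts:
            raise FileNotFoundError(f"missing artifact: {artifact_id}")
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
        # Manifest fields are illustrative only.
        manifest = {"chain_id": chain["chain_id"], "artifacts": sorted(artifacts)}
        archive.writestr("manifest.json", json.dumps(manifest))
        archive.writestr("chain.json", json.dumps(chain))
        for artifact_id, payload in artifacts.items():
            archive.writestr(f"artifacts/{artifact_id}.joblib", payload)
    return buffer.getvalue()


chain = {"chain_id": "chain_1", "steps": [{"step_idx": 0, "artifact_id": "art_abc"}]}
bundle = build_bundle(chain, {"art_abc": b"\x00\x01"})
names = zipfile.ZipFile(io.BytesIO(bundle)).namelist()
```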
- export_pipeline_config(pipeline_id: str, output_path: Path) → Path
Export a pipeline’s expanded configuration as JSON.
The exported file contains the fully resolved pipeline configuration (after generator expansion) and can be re-used with nirs4all.run() or nirs4all.session().
- Parameters:
pipeline_id – Pipeline to export.
output_path – Destination .json file path.
- Returns:
The resolved output path.
- Raises:
KeyError – If the pipeline does not exist.
- export_predictions_parquet(output_path: Path, **filters: Any) → Path
Export prediction records to a Parquet file.
Writes a Polars-compatible Parquet file containing prediction metadata (no arrays). Filters use the same keyword arguments as query_predictions().
- Parameters:
output_path – Destination .parquet file path.
**filters – Optional filters (dataset_name, model_class, partition, fold_id, branch_id, pipeline_id, run_id).
- Returns:
The resolved output path.
- export_run(run_id: str, output_path: Path) → Path
Export full run metadata (run + pipelines + chains) as YAML.
Produces an archival snapshot of the entire run, including all pipeline configs, chain descriptions, and summary metrics. Does not include binary artifacts or arrays.
- Parameters:
run_id – Run to export.
output_path – Destination .yaml file path.
- Returns:
The resolved output path.
- Raises:
KeyError – If the run does not exist.
- fail_pipeline(pipeline_id: str, error: str) → None
Mark a pipeline execution as failed and roll back its data.
Predictions, chains, and logs associated with this pipeline are removed. Artifacts whose reference count drops to zero become candidates for garbage collection.
- Parameters:
pipeline_id – Identifier returned by begin_pipeline().
error – Human-readable error description.
- fail_run(run_id: str, error: str) → None
Mark a run as failed.
Updates the run status to "failed" and records the error message. Any pipelines still in "running" status under this run should be considered implicitly failed.
- Parameters:
run_id – Identifier returned by begin_run().
error – Human-readable error description or traceback excerpt.
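One way to make sure every run started with begin_run() ends in either "completed" or "failed" is to wrap the lifecycle in a context manager. The sketch below is an assumption-laden illustration: `tracked_run` is a hypothetical helper, and `InMemoryStore` is a stand-in that only mimics the three documented run methods so the snippet runs without nirs4all installed.

```python
import uuid
from contextlib import contextmanager


class InMemoryStore:
    """Stand-in exposing only the three documented run-lifecycle methods."""

    def __init__(self):
        self.runs = {}

    def begin_run(self, name, config, datasets):
        run_id = str(uuid.uuid4())
        self.runs[run_id] = {"name": name, "status": "running", "summary": None, "error": None}
        return run_id

    def complete_run(self, run_id, summary):
        self.runs[run_id].update(status="completed", summary=summary)

    def fail_run(self, run_id, error):
        self.runs[run_id].update(status="failed", error=error)


@contextmanager
def tracked_run(store, name, config, datasets):
    """Hypothetical helper: guarantee that a run ends as completed or failed."""
    run_id = store.begin_run(name, config=config, datasets=datasets)
    summary = {}
    try:
        yield run_id, summary
    except Exception as exc:
        store.fail_run(run_id, error=repr(exc))
        raise
    else:
        store.complete_run(run_id, summary=summary)


store = InMemoryStore()
with tracked_run(store, "ok_run", config={}, datasets=[]) as (ok_id, summary):
    summary["total_pipelines"] = 1

try:
    with tracked_run(store, "bad_run", config={}, datasets=[]) as (bad_id, _):
        raise ValueError("boom")
except ValueError:
    pass
```

The same wrapper should work with any object that exposes these three methods with the documented signatures.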
- gc_artifacts() → int
Garbage-collect unreferenced artifacts.
Removes artifact files from disk whose reference count has dropped to zero (i.e. no chain references them). Also removes the corresponding rows from the artifacts table.
- Returns:
Number of artifact files removed.
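The garbage-collection rule described for gc_artifacts() (delete files whose reference count is zero, then drop their metadata rows) can be sketched with a plain dictionary standing in for the artifacts table. The function name `gc_unreferenced` and the row fields `path` and `ref_count` are hypothetical; the actual table schema is not documented here.

```python
import tempfile
from pathlib import Path


def gc_unreferenced(rows: dict, artifacts_dir: Path) -> int:
    """Hypothetical sketch: drop rows with ref_count == 0 and delete their files."""
    removed = 0
    orphans = [aid for aid, row in rows.items() if row["ref_count"] == 0]
    for artifact_id in orphans:
        file_path = artifacts_dir / rows[artifact_id]["path"]
        if file_path.exists():
            file_path.unlink()
            removed += 1
        del rows[artifact_id]  # the metadata row is removed either way
    return removed


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "kept.joblib").write_bytes(b"a")
    (root / "orphan.joblib").write_bytes(b"b")
    rows = {
        "art_kept": {"path": "kept.joblib", "ref_count": 2},
        "art_orphan": {"path": "orphan.joblib", "ref_count": 0},
    }
    n_removed = gc_unreferenced(rows, root)
    remaining_files = sorted(p.name for p in root.iterdir())
```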
- get_artifact_path(artifact_id: str) → Path
Return the filesystem path of a stored artifact.
Useful when external tools need direct file access (e.g. for building a .n4a bundle ZIP).
- Parameters:
artifact_id – Identifier returned by save_artifact().
- Returns:
Absolute path to the artifact file.
- Raises:
KeyError – If the artifact identifier is unknown.
- get_chain(chain_id: str) → dict | None
Retrieve a chain by its identifier.
- Returns:
A dictionary containing all chain fields (steps, model_step_idx, fold_artifacts, shared_artifacts, etc.), or None if the chain does not exist.
- get_chains_for_pipeline(pipeline_id: str) → DataFrame
List all chains belonging to a pipeline.
- Returns:
A polars.DataFrame with one row per chain, including columns for chain_id, model_class, preprocessings, branch_path, and source_index.
- get_pipeline(pipeline_id: str) → dict | None
Retrieve a single pipeline record.
- Returns:
A dictionary with all pipeline fields (pipeline_id, run_id, name, status, expanded_config, generator_choices, dataset_name, dataset_hash, best_val, best_test, metric, duration_ms, created_at, completed_at, error), or None if the pipeline does not exist.
- get_pipeline_log(pipeline_id: str) → DataFrame
Retrieve all log entries for a pipeline.
- Returns:
A polars.DataFrame ordered by (step_idx, timestamp) with columns step_idx, operator_class, event, duration_ms, message, details, level, timestamp.
- get_prediction(prediction_id: str, load_arrays: bool = False) → dict | None
Retrieve a single prediction record.
- Parameters:
prediction_id – Unique prediction identifier.
load_arrays – If True, the returned dictionary includes y_true, y_pred, y_proba, sample_indices, and weights as numpy.ndarray objects. If False (default), arrays are omitted for speed.
- Returns:
Prediction dictionary or None if not found.
- get_run(run_id: str) → dict | None
Retrieve a single run record.
- Returns:
A dictionary with all run fields (run_id, name, status, config, datasets, summary, created_at, completed_at, error), or None if the run does not exist.
- get_run_log_summary(run_id: str) → DataFrame
Aggregate log entries across all pipelines of a run.
Produces a summary suitable for dashboard display: per-pipeline total duration, step counts, warning/error counts.
- Returns:
A polars.DataFrame with one row per pipeline in the run.
- list_pipelines(run_id: str | None = None, dataset_name: str | None = None) → DataFrame
List pipelines with optional filtering.
- Parameters:
run_id – Filter by parent run. None returns pipelines from all runs.
dataset_name – Filter by dataset name. None returns pipelines for all datasets.
- Returns:
A polars.DataFrame with one row per matching pipeline, ordered by created_at descending.
- list_runs(status: str | None = None, dataset: str | None = None, limit: int = 100, offset: int = 0) → DataFrame
List runs with optional filtering and pagination.
- Parameters:
status – Filter by run status ("running", "completed", "failed"). None returns all statuses.
dataset – Filter to runs that reference this dataset name in their datasets list.
limit – Maximum number of rows to return.
offset – Number of rows to skip (for pagination).
- Returns:
A polars.DataFrame with one row per matching run, ordered by created_at descending (newest first).
- load_artifact(artifact_id: str) → Any
Load a binary artifact from disk.
- Parameters:
artifact_id – Identifier returned by save_artifact().
- Returns:
The deserialised Python object.
- Raises:
FileNotFoundError – If the artifact file is missing from disk.
KeyError – If the artifact identifier is unknown.
- log_step(pipeline_id: str, step_idx: int, operator_class: str, event: str, duration_ms: int | None = None, message: str | None = None, details: dict | None = None, level: str = 'info') → None
Record a structured log entry for a pipeline step.
Step logs enable post-hoc analysis of execution timelines, per-step durations, warnings, and errors without parsing text log files.
- Parameters:
pipeline_id – Pipeline the step belongs to.
step_idx – Zero-based step index in the pipeline.
operator_class – Fully qualified class name of the operator executed in this step.
event – Event name (e.g. "start", "end", "skip", "warning", "error").
duration_ms – Step execution time in milliseconds (typically set on "end" events).
message – Optional human-readable message.
details – Optional structured details (stored as JSON).
level – Log level ("debug", "info", "warning", "error").
- query_predictions(dataset_name: str | None = None, model_class: str | None = None, partition: str | None = None, fold_id: str | None = None, branch_id: int | None = None, pipeline_id: str | None = None, run_id: str | None = None, limit: int | None = None, offset: int = 0) → DataFrame
Query predictions with flexible filtering.
All filter arguments are optional. When multiple filters are specified, they are combined with AND semantics.
- Parameters:
dataset_name – Filter by dataset name.
model_class – Filter by model class name (supports SQL LIKE patterns, e.g. "PLS%").
partition – Filter by partition ("train", "val", "test").
fold_id – Filter by fold identifier.
branch_id – Filter by branch index.
pipeline_id – Filter by parent pipeline.
run_id – Filter by parent run (joins through pipelines).
limit – Maximum number of rows. None for unlimited.
offset – Number of rows to skip.
- Returns:
A polars.DataFrame with one row per matching prediction (arrays are not included; use get_prediction() with load_arrays=True for that).
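The filter semantics described for query_predictions() (skip None filters, combine the rest with AND, treat model_class as a LIKE pattern, then apply limit and offset) can be sketched as a small query builder. This is not the store's actual SQL: the standard-library sqlite3 module stands in for DuckDB, the table is reduced to four columns, and `build_query` and `ALLOWED` are hypothetical names.

```python
import sqlite3

ALLOWED = {"dataset_name", "model_class", "partition", "fold_id", "branch_id", "pipeline_id"}


def build_query(limit=None, offset=0, **filters):
    """Hypothetical helper: turn non-None filters into an AND-combined WHERE clause."""
    clauses, params = [], []
    for column, value in filters.items():
        if column not in ALLOWED:
            raise ValueError(f"unknown filter: {column}")
        if value is None:
            continue  # None means "do not filter on this column"
        # model_class accepts SQL LIKE patterns; other columns use equality.
        op = "LIKE" if column == "model_class" else "="
        clauses.append(f'"{column}" {op} ?')
        params.append(value)
    sql = "SELECT prediction_id FROM predictions"
    if clauses:
        sql += " WHERE " + " AND ".join(clauses)
    # SQLite-specific: a negative LIMIT means "no limit".
    sql += " ORDER BY prediction_id LIMIT ? OFFSET ?"
    params += [-1 if limit is None else limit, offset]
    return sql, params


conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE predictions (prediction_id TEXT, dataset_name TEXT, model_class TEXT, "partition" TEXT)'
)
conn.executemany(
    "INSERT INTO predictions VALUES (?, ?, ?, ?)",
    [
        ("p1", "wheat", "PLSRegression", "val"),
        ("p2", "wheat", "PLSRegression", "test"),
        ("p3", "wheat", "Ridge", "val"),
        ("p4", "corn", "PLSCanonical", "val"),
    ],
)
sql, params = build_query(model_class="PLS%", partition="val", dataset_name=None)
matches = [row[0] for row in conn.execute(sql, params)]
page = [row[0] for row in conn.execute(*build_query(limit=1, offset=1))]
```

Values are always bound as parameters, and column names are checked against a fixed set, so user input never reaches the SQL text directly.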
- register_existing_artifact(artifact_id: str, path: str, content_hash: str, operator_class: str, artifact_type: str, format: str, size_bytes: int) → str
Register an artifact that was already saved to disk.
This is used to bridge the ArtifactRegistry (which persists files during pipeline execution) with the DuckDB artifacts table so that load_artifact() and chain replay can find them.
If an artifact with the same artifact_id already exists the call is silently ignored (idempotent).
- Parameters:
artifact_id – Artifact identifier (may be V3 format).
path – Relative path within the artifacts/ directory.
content_hash – SHA-256 hex digest of the serialised content.
operator_class – Class name of the operator that produced it.
artifact_type – Category label ("model", "transformer", …).
format – Serialisation format ("joblib", "cloudpickle", …).
size_bytes – Size of the serialised content in bytes.
- Returns:
The artifact_id that was registered.
- replay_chain(chain_id: str, X: ndarray, wavelengths: ndarray | None = None) → ndarray
Replay a stored chain on new data to produce predictions.
Loads each step’s artifact, applies the transformation in order, and for the model step loads all fold models and returns the averaged prediction.
This is the primary in-workspace prediction path. For out-of-workspace prediction, export to .n4a first.
- Parameters:
chain_id – Chain to replay.
X – Input feature matrix (n_samples x n_features).
wavelengths – Optional wavelength array for wavelength-aware operators. Required if any step in the chain uses SpectraTransformerMixin.
- Returns:
Predicted values as a 1-D numpy.ndarray of shape (n_samples,).
- Raises:
KeyError – If the chain does not exist.
RuntimeError – If the chain has no model step.
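The replay logic described above (apply each preprocessing step in order, then average the predictions of all fold models) can be sketched in pure Python. Plain functions stand in for fitted transformers and models, nested lists stand in for NumPy arrays, and the function name `replay` is hypothetical; the real method loads artifacts from the store instead.

```python
from statistics import fmean


def replay(steps, fold_models, X):
    """Hypothetical sketch: apply transformers in order, then average fold predictions."""
    if not fold_models:
        raise RuntimeError("chain has no model step")
    for transform in steps:
        X = [transform(row) for row in X]
    # One prediction vector per fold model, each of length n_samples.
    per_fold = [[model(row) for row in X] for model in fold_models]
    # Average across folds, sample by sample.
    return [fmean(values) for values in zip(*per_fold)]


def center(row):
    return [v - 1.0 for v in row]


def fold_0(row):
    return sum(row)


def fold_1(row):
    return 2.0 * sum(row)


y_hat = replay([center], [fold_0, fold_1], [[1.0, 2.0], [3.0, 5.0]])
```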
- save_artifact(obj: Any, operator_class: str, artifact_type: str, format: str) → str
Persist a binary artifact (fitted model or transformer).
The object is serialised to disk in the artifacts/ directory using content-addressed storage. If an identical artifact already exists (same content hash), the existing entry is reused and its reference count incremented.
- Parameters:
obj – The Python object to persist (e.g. a fitted sklearn.preprocessing.StandardScaler).
operator_class – Fully qualified class name of the operator that produced this artifact.
artifact_type – Category label ("model", "transformer", "scaler", etc.).
format – Serialisation format hint ("joblib", "cloudpickle", "keras_h5", etc.).
- Returns:
A unique artifact identifier. If the content already existed, the same identifier is returned (content-addressed deduplication).
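Content-addressed deduplication with reference counting can be sketched as below. This is an in-memory illustration under stated assumptions: `DedupStore` is a hypothetical class, the full SHA-256 digest is used as the identifier, and JSON with sorted keys replaces the joblib/cloudpickle serialisation so that equal objects produce identical bytes.

```python
import hashlib
import json


class DedupStore:
    """Hypothetical in-memory stand-in for content-addressed artifact storage."""

    def __init__(self):
        self.blobs = {}       # artifact_id -> serialised bytes
        self.ref_counts = {}  # artifact_id -> number of references

    def save(self, obj) -> str:
        # Deterministic serialisation so identical content hashes identically.
        payload = json.dumps(obj, sort_keys=True).encode("utf-8")
        artifact_id = hashlib.sha256(payload).hexdigest()
        if artifact_id not in self.blobs:
            self.blobs[artifact_id] = payload  # first time: store the bytes
        self.ref_counts[artifact_id] = self.ref_counts.get(artifact_id, 0) + 1
        return artifact_id


dedup = DedupStore()
first = dedup.save({"mean": 0.5, "scale": 2.0})
second = dedup.save({"scale": 2.0, "mean": 0.5})  # same content, different key order
third = dedup.save({"mean": 0.0, "scale": 1.0})
```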
- save_chain(pipeline_id: str, steps: list[dict], model_step_idx: int, model_class: str, preprocessings: str, fold_strategy: str, fold_artifacts: dict, shared_artifacts: dict, branch_path: list[int] | None = None, source_index: int | None = None) → str
Store a preprocessing-to-model chain.
A chain captures the complete, ordered sequence of steps (transformers and model) that were executed during training, together with references to the fitted artifacts for each fold. Chains are the unit of export and replay.
- Parameters:
pipeline_id – Parent pipeline identifier.
steps – Ordered list of step descriptors. Each dict should contain at minimum:
{
    "step_idx": int,
    "operator_class": str,
    "params": dict,
    "artifact_id": str | None,
    "stateless": bool,
}
model_step_idx – Index (within steps) of the model step.
model_class – Fully qualified class name of the model (e.g. "sklearn.cross_decomposition.PLSRegression").
preprocessings – Short display string summarising the preprocessing chain (e.g. "SNV>Detr>MinMax").
fold_strategy – Cross-validation fold strategy identifier (e.g. "per_fold" or "shared").
fold_artifacts – Mapping from fold identifier (string) to the artifact ID of the model trained on that fold. Example: {"fold_0": "art_abc123", "fold_1": "art_def456"}.
shared_artifacts – Mapping from step index (string) to a list of artifact IDs of shared (non-fold-specific) fitted objects. Example: {"0": ["art_scaler_abc"], "3": ["art_snv", "art_sg"]}.
branch_path – For branching pipelines, the path of branch indices leading to this chain. None for non-branching pipelines.
source_index – For multi-source pipelines, the source index this chain belongs to. None for single-source pipelines.
- Returns:
A unique chain identifier (UUID-based string).
- save_prediction(pipeline_id: str, chain_id: str, dataset_name: str, model_name: str, model_class: str, fold_id: str, partition: str, val_score: float, test_score: float, train_score: float, metric: str, task_type: str, n_samples: int, n_features: int, scores: dict, best_params: dict, branch_id: int | None, branch_name: str | None, exclusion_count: int, exclusion_rate: float, preprocessings: str = '', prediction_id: str | None = None) → str
Store a single prediction record.
A prediction captures the result of evaluating one model on one fold/partition combination: the scalar scores, the model identity, the preprocessing summary, and links back to the pipeline and chain that produced it.
Arrays (y_true, y_pred, etc.) are stored separately via save_prediction_arrays().
- Parameters:
pipeline_id – Parent pipeline identifier.
chain_id – Chain that produced this prediction.
dataset_name – Name of the dataset.
model_name – Short model name (e.g. "PLSRegression").
model_class – Fully qualified model class name.
fold_id – Fold identifier (e.g. "fold_0", "avg").
partition – Data partition ("train", "val", "test").
val_score – Validation score (primary ranking metric).
test_score – Test score (for reporting).
train_score – Training score (for overfitting diagnostics).
metric – Name of the metric (e.g. "rmse", "r2").
task_type – "regression" or "classification".
n_samples – Number of samples in this partition.
n_features – Number of features (wavelengths).
scores – Nested dict of all computed scores per partition. Example: {"val": {"rmse": 0.12, "r2": 0.95}, ...}.
best_params – Best hyperparameters found (if tuning was used).
branch_id – Branch index (0-based) or None.
branch_name – Human-readable branch name or None.
exclusion_count – Number of samples excluded by outlier filters.
exclusion_rate – Fraction of samples excluded (0.0 – 1.0).
preprocessings – Short display string for the preprocessing chain applied before this model.
- Returns:
A unique prediction identifier (UUID-based string).
- save_prediction_arrays(prediction_id: str, y_true: ndarray | None, y_pred: ndarray | None, y_proba: ndarray | None = None, sample_indices: ndarray | None = None, weights: ndarray | None = None) → None
Store the dense arrays associated with a prediction.
Arrays are stored as native DOUBLE[] columns in DuckDB, enabling zero-copy Arrow transfer to Polars and efficient columnar compression.
For classification tasks, y_proba holds the class-probability matrix (shape n_samples x n_classes), flattened to 1-D for storage with the original shape recorded for reconstruction.
- Parameters:
prediction_id – Prediction identifier returned by save_prediction().
y_true – Ground-truth values (1-D array).
y_pred – Predicted values (1-D array).
y_proba – Class probabilities (2-D array, flattened for storage). None for regression tasks.
sample_indices – Original dataset indices of the samples in this partition/fold.
weights – Per-sample weights, if applicable.
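The flatten-and-restore round trip described for y_proba can be sketched as follows. Nested lists stand in for NumPy arrays, row-major ordering is an assumption, and the helper names `flatten_proba` and `restore_proba` are hypothetical; how the store actually records the shape is not documented here.

```python
def flatten_proba(matrix):
    """Flatten a 2-D list in row-major order and remember its shape."""
    n_rows = len(matrix)
    n_cols = len(matrix[0]) if n_rows else 0
    flat = [value for row in matrix for value in row]
    return flat, (n_rows, n_cols)


def restore_proba(flat, shape):
    """Rebuild the 2-D list from the flat values and the recorded shape."""
    n_rows, n_cols = shape
    return [flat[i * n_cols:(i + 1) * n_cols] for i in range(n_rows)]


y_proba = [[0.9, 0.1], [0.2, 0.8], [0.5, 0.5]]
flat, shape = flatten_proba(y_proba)
restored = restore_proba(flat, shape)
```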
- top_predictions(n: int, metric: str = 'val_score', ascending: bool = True, partition: str = 'val', dataset_name: str | None = None, group_by: str | None = None) → DataFrame
Return the top-N predictions ranked by a score column.
This is the primary ranking interface for finding the best models across a workspace.
- Parameters:
n – Number of top predictions to return. When group_by is set, returns top n per group.
metric – Column to rank by ("val_score", "test_score", "train_score").
ascending – If True (default), lower values rank higher (appropriate for error metrics like RMSE). Set to False for higher-is-better metrics like R2.
partition – Only consider predictions from this partition.
dataset_name – Optional dataset filter.
group_by – Optional grouping column (e.g. "model_class", "dataset_name"). When set, the result contains top n rows per distinct value of this column.
- Returns:
A polars.DataFrame with the top predictions.
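The ranking semantics of top_predictions() (sort by a score column, respect `ascending`, and optionally keep the top n rows per group) can be sketched on plain dictionaries. The function name `top_n` and the sample rows are hypothetical, and the real method returns a polars.DataFrame rather than a list.

```python
from collections import defaultdict


def top_n(rows, n, metric="val_score", ascending=True, group_by=None):
    """Hypothetical sketch of top-N ranking, optionally per group."""
    ranked = sorted(rows, key=lambda r: r[metric], reverse=not ascending)
    if group_by is None:
        return ranked[:n]
    groups = defaultdict(list)
    for row in ranked:
        # Rows arrive best-first, so each group keeps only its first n rows.
        if len(groups[row[group_by]]) < n:
            groups[row[group_by]].append(row)
    return [row for group in groups.values() for row in group]


rows = [
    {"id": "a", "model_class": "PLS", "val_score": 0.30},
    {"id": "b", "model_class": "PLS", "val_score": 0.10},
    {"id": "c", "model_class": "Ridge", "val_score": 0.20},
    {"id": "d", "model_class": "Ridge", "val_score": 0.40},
]
best_two = [r["id"] for r in top_n(rows, 2)]
best_per_model = [r["id"] for r in top_n(rows, 1, group_by="model_class")]
highest = [r["id"] for r in top_n(rows, 1, ascending=False)]
```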