Artifacts & Storage
This guide covers the artifact storage system and workspace structure in nirs4all.
Overview
The storage system is centered on a DuckDB-backed WorkspaceStore that provides:
Single database – all structured data in
store.duckdb(runs, pipelines, chains, predictions, logs)Content-addressed artifacts – binary deduplication via SHA-256 hashing in flat
artifacts/directoryChain-based replay – in-workspace prediction by replaying stored chains
Export on demand – no files written during training except
store.duckdband artifact binaries
Workspace Structure
workspace/
├── store.duckdb # All structured data (7 tables)
├── artifacts/ # Flat content-addressed binary storage
│ ├── ab/abc123def456.joblib # Sharded by first 2 chars of hash
│ └── cd/cde789012345.joblib
├── exports/ # User-triggered exports (on demand)
│ ├── wheat_model.n4a # Bundle exports
│ └── results.parquet # Prediction exports
└── library/ # Reusable pipeline templates
└── templates/
└── baseline_pls.json
DuckDB Tables
Table |
Purpose |
|---|---|
|
Experiment sessions (name, config, datasets, status) |
|
Individual pipeline executions within a run |
|
Preprocessing-to-model step sequences with artifact references |
|
Per-fold, per-partition prediction scores and metadata |
|
Dense arrays (y_true, y_pred, y_proba) |
|
Content-addressed artifact registry with ref_count |
|
Structured execution logs per pipeline step |
Using the WorkspaceStore
The WorkspaceStore is the central class for all workspace persistence:
from pathlib import Path
from nirs4all.pipeline.storage import WorkspaceStore
# Initialize (creates store.duckdb and artifacts/ if they don't exist)
store = WorkspaceStore(Path("./workspace"))
# Run lifecycle
run_id = store.begin_run("experiment_1", config={...}, datasets=[...])
pipeline_id = store.begin_pipeline(run_id, name="0001_pls", ...)
chain_id = store.save_chain(pipeline_id, steps=[...], ...)
pred_id = store.save_prediction(pipeline_id, chain_id, ...)
store.complete_pipeline(pipeline_id, best_val=0.95, ...)
store.complete_run(run_id, summary={"total_pipelines": 5})
# Queries (return polars.DataFrame)
top = store.top_predictions(n=5, metric="val_score")
preds = store.query_predictions(dataset_name="wheat", partition="val")
runs = store.list_runs(status="completed")
# Chain replay (in-workspace prediction)
y_pred = store.replay_chain(chain_id, X_new)
# Export (on demand)
store.export_chain(chain_id, Path("model.n4a"))
# Cleanup
store.delete_run(run_id)
store.gc_artifacts()
store.close()
Artifact Storage
Saving Artifacts
Artifacts are persisted through the WorkspaceStore.save_artifact() method:
# Save a fitted model
artifact_id = store.save_artifact(
obj=trained_model,
operator_class="sklearn.cross_decomposition.PLSRegression",
artifact_type="model",
format="joblib"
)
# Save a fitted transformer
artifact_id = store.save_artifact(
obj=fitted_scaler,
operator_class="sklearn.preprocessing.StandardScaler",
artifact_type="transformer",
format="joblib"
)
Content-Addressed Deduplication
When an artifact is saved, its binary content is SHA-256 hashed. If an identical artifact already exists (same content hash), the existing entry is reused and its ref_count incremented. This provides automatic deduplication across pipelines and runs.
# Same fitted scaler saved twice -> same artifact_id returned
id1 = store.save_artifact(scaler, "StandardScaler", "transformer", "joblib")
id2 = store.save_artifact(scaler, "StandardScaler", "transformer", "joblib")
assert id1 == id2 # Content-addressed deduplication
Loading Artifacts
# Load by artifact ID
model = store.load_artifact(artifact_id)
# Get filesystem path (for external tools or bundle building)
path = store.get_artifact_path(artifact_id)
Chain Management
A chain captures the complete, ordered sequence of steps (transformers and model) executed during training, with references to fitted artifacts for each fold. Chains are the unit of export and replay.
Building Chains from Execution Traces
The ChainBuilder converts an ExecutionTrace into the chain dict format:
from nirs4all.pipeline.storage import ChainBuilder
builder = ChainBuilder(trace, artifact_registry)
chain_data = builder.build()
chain_id = store.save_chain(pipeline_id=pipeline_id, **chain_data)
Chain Replay
Replay a stored chain on new data to produce predictions:
# In-workspace prediction
y_pred = store.replay_chain(chain_id, X_new)
# With wavelength-aware operators
y_pred = store.replay_chain(chain_id, X_new, wavelengths=wavelengths)
The replay loads each step’s artifact, applies transformations in order, and averages predictions across fold models.
Library Management
Save Pipeline Templates
from nirs4all.pipeline.storage import PipelineLibrary
library = PipelineLibrary(workspace_path)
# Save config-only template with category and tags
library.save_template(
pipeline_config=pipeline_dict,
name="baseline_pls",
category="regression",
description="PLS baseline with SNV preprocessing",
tags=["nirs", "pls"],
)
Load and Reuse
# List templates
templates = library.list_templates(category="regression")
for t in templates:
print(f"{t['name']}: {t['description']}")
# Load template
config = library.load_template("baseline_pls")
# Use in pipeline
runner = PipelineRunner(workspace="./workspace")
predictions = runner.run(config, new_dataset)
Cleanup Utilities
# Garbage-collect unreferenced artifacts (ref_count = 0)
removed = store.gc_artifacts()
print(f"Removed {removed} orphaned artifact files")
# Delete a run and all descendant data
rows_deleted = store.delete_run(run_id)
print(f"Deleted {rows_deleted} rows")
# Reclaim disk space after large deletions
store.vacuum()
Export Operations
All exports are user-triggered (on demand):
# Export chain as .n4a bundle (self-contained ZIP)
store.export_chain(chain_id, Path("exports/model.n4a"))
# Export pipeline configuration as JSON
store.export_pipeline_config(pipeline_id, Path("exports/config.json"))
# Export run metadata as YAML
store.export_run(run_id, Path("exports/run.yaml"))
# Export predictions as Parquet
store.export_predictions_parquet(
Path("exports/results.parquet"),
dataset_name="wheat",
partition="val"
)
Best Practices
Let the store handle deduplication – identical artifacts (same content hash) automatically share the same file via
ref_counttracking.Use chain replay for in-workspace prediction –
store.replay_chain()loads artifacts and applies transformations without exporting to.n4abundles.Export on demand – the workspace only produces
store.duckdbandartifacts/during training. Use the export methods to create shareable files.Clean up periodically – use
gc_artifacts()after deleting runs or pipelines to reclaim disk space from unreferenced artifact files.Close the store when done – call
store.close()to release the DuckDB connection.
See Also
Storage API Reference - Storage API reference
Workspace Architecture - Workspace architecture
Architecture Overview - Pipeline architecture overview
Writing a Pipeline in nirs4all - Pipeline configuration syntax