Artifacts & Storage

This guide covers the artifact storage system and workspace structure in nirs4all.

Overview

The storage system is centered on a DuckDB-backed WorkspaceStore that provides:

  • Single database – all structured data in store.duckdb (runs, pipelines, chains, predictions, logs)

  • Content-addressed artifacts – binaries are deduplicated by SHA-256 hash and stored in a flat artifacts/ directory

  • Chain-based replay – in-workspace prediction by replaying stored chains

  • Export on demand – no files written during training except store.duckdb and artifact binaries

Workspace Structure

workspace/
├── store.duckdb                   # All structured data (7 tables)
├── artifacts/                     # Flat content-addressed binary storage
│   ├── ab/abc123def456.joblib     # Sharded by first 2 chars of hash
│   └── cd/cde789012345.joblib
├── exports/                       # User-triggered exports (on demand)
│   ├── wheat_model.n4a            # Bundle exports
│   └── results.parquet            # Prediction exports
└── library/                       # Reusable pipeline templates
    └── templates/
        └── baseline_pls.json

DuckDB Tables

Table               Purpose
------------------  ------------------------------------------------------------
runs                Experiment sessions (name, config, datasets, status)
pipelines           Individual pipeline executions within a run
chains              Preprocessing-to-model step sequences with artifact references
predictions         Per-fold, per-partition prediction scores and metadata
prediction_arrays   Dense arrays (y_true, y_pred, y_proba)
artifacts           Content-addressed artifact registry with ref_count
logs                Structured execution logs per pipeline step

Using the WorkspaceStore

The WorkspaceStore is the central class for all workspace persistence:

from pathlib import Path
from nirs4all.pipeline.storage import WorkspaceStore

# Initialize (creates store.duckdb and artifacts/ if they don't exist)
store = WorkspaceStore(Path("./workspace"))

# Run lifecycle
run_id = store.begin_run("experiment_1", config={...}, datasets=[...])
pipeline_id = store.begin_pipeline(run_id, name="0001_pls", ...)
chain_id = store.save_chain(pipeline_id, steps=[...], ...)
pred_id = store.save_prediction(pipeline_id, chain_id, ...)
store.complete_pipeline(pipeline_id, best_val=0.95, ...)
store.complete_run(run_id, summary={"total_pipelines": 5})

# Queries (return polars.DataFrame)
top = store.top_predictions(n=5, metric="val_score")
preds = store.query_predictions(dataset_name="wheat", partition="val")
runs = store.list_runs(status="completed")

# Chain replay (in-workspace prediction)
y_pred = store.replay_chain(chain_id, X_new)

# Export (on demand)
store.export_chain(chain_id, Path("model.n4a"))

# Cleanup
store.delete_run(run_id)
store.gc_artifacts()
store.close()

Artifact Storage

Saving Artifacts

Artifacts are persisted through the WorkspaceStore.save_artifact() method:

# Save a fitted model
artifact_id = store.save_artifact(
    obj=trained_model,
    operator_class="sklearn.cross_decomposition.PLSRegression",
    artifact_type="model",
    format="joblib"
)

# Save a fitted transformer
artifact_id = store.save_artifact(
    obj=fitted_scaler,
    operator_class="sklearn.preprocessing.StandardScaler",
    artifact_type="transformer",
    format="joblib"
)

Content-Addressed Deduplication

When an artifact is saved, its binary content is SHA-256 hashed. If an identical artifact already exists (same content hash), the existing entry is reused and its ref_count incremented. This provides automatic deduplication across pipelines and runs.

# Same fitted scaler saved twice -> same artifact_id returned
id1 = store.save_artifact(scaler, "StandardScaler", "transformer", "joblib")
id2 = store.save_artifact(scaler, "StandardScaler", "transformer", "joblib")
assert id1 == id2  # Content-addressed deduplication
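
The deduplication rule described above can be illustrated without nirs4all at all. The following is a minimal sketch, not the library's implementation: TinyArtifactStore, its ref_counts dict (a stand-in for the artifacts table), the use of pickle instead of joblib, and the .bin suffix are all assumptions made for illustration.

```python
import hashlib
import pickle
import tempfile
from pathlib import Path


class TinyArtifactStore:
    """Illustrative content-addressed store (hypothetical, not nirs4all code)."""

    def __init__(self, root):
        self.root = Path(root)
        self.root.mkdir(parents=True, exist_ok=True)
        self.ref_counts = {}  # digest -> ref_count, stand-in for the artifacts table

    def _path_for(self, digest):
        # Shard by the first two hex characters of the hash.
        return self.root / digest[:2] / f"{digest}.bin"

    def save(self, obj):
        payload = pickle.dumps(obj)
        digest = hashlib.sha256(payload).hexdigest()
        if digest in self.ref_counts:
            # Identical content already stored: reuse it, bump the counter.
            self.ref_counts[digest] += 1
        else:
            path = self._path_for(digest)
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_bytes(payload)
            self.ref_counts[digest] = 1
        return digest

    def load(self, digest):
        return pickle.loads(self._path_for(digest).read_bytes())


def demo_dedup():
    """Save the same object twice and report ids, ref_count and file count."""
    with tempfile.TemporaryDirectory() as tmp:
        store = TinyArtifactStore(Path(tmp) / "artifacts")
        params = {"mean": [0.1, 0.2], "scale": [1.0, 2.0]}
        id1 = store.save(params)
        id2 = store.save(params)
        n_files = sum(1 for _ in store.root.rglob("*.bin"))
        return id1, id2, store.ref_counts[id1], n_files
```

Calling demo_dedup() saves one object twice; both calls return the same identifier, the counter reaches 2, and only one file exists on disk. Using the digest as both identifier and file name is what makes the lookup-before-write check cheap.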

Loading Artifacts

# Load by artifact ID
model = store.load_artifact(artifact_id)

# Get filesystem path (for external tools or bundle building)
path = store.get_artifact_path(artifact_id)

Chain Management

A chain captures the complete, ordered sequence of steps (transformers and model) executed during training, with references to fitted artifacts for each fold. Chains are the unit of export and replay.

Building Chains from Execution Traces

The ChainBuilder converts an ExecutionTrace into the chain dict format:

from nirs4all.pipeline.storage import ChainBuilder

builder = ChainBuilder(trace, artifact_registry)
chain_data = builder.build()
chain_id = store.save_chain(pipeline_id=pipeline_id, **chain_data)

Chain Replay

Replay a stored chain on new data to produce predictions:

# In-workspace prediction
y_pred = store.replay_chain(chain_id, X_new)

# With wavelength-aware operators
y_pred = store.replay_chain(chain_id, X_new, wavelengths=wavelengths)

The replay loads each step’s artifact, applies transformations in order, and averages predictions across fold models.
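
As a rough illustration of that replay logic, the sketch below applies a list of fitted transformers in order and then averages the predictions of several fold models. It is a simplification under stated assumptions: Shift, LinearModel and replay are hypothetical names, the transformers are shared across folds, and plain Python lists stand in for arrays. The actual replay_chain() may differ on all of these points.

```python
from statistics import fmean


class Shift:
    """Toy fitted transformer: subtracts a stored offset from every value."""

    def __init__(self, offset):
        self.offset = offset

    def transform(self, X):
        return [[value - self.offset for value in row] for row in X]


class LinearModel:
    """Toy fitted model: weighted sum of the features plus a bias."""

    def __init__(self, weights, bias):
        self.weights = weights
        self.bias = bias

    def predict(self, X):
        return [
            sum(w * v for w, v in zip(self.weights, row)) + self.bias
            for row in X
        ]


def replay(transformers, fold_models, X):
    """Apply transformers in order, then average predictions across folds."""
    for transformer in transformers:
        X = transformer.transform(X)
    per_fold = [model.predict(X) for model in fold_models]
    # zip(*per_fold) groups the predictions of all folds for each sample.
    return [fmean(sample_preds) for sample_preds in zip(*per_fold)]


X_new = [[1.0, 2.0], [3.0, 4.0]]
steps = [Shift(1.0)]
folds = [LinearModel([1.0, 1.0], 0.0), LinearModel([1.0, 1.0], 2.0)]
y_pred = replay(steps, folds, X_new)  # [2.0, 6.0]
```

Here the two fold models predict [1.0, 5.0] and [3.0, 7.0] on the shifted data, so the averaged result is [2.0, 6.0].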

Library Management

Save Pipeline Templates

from nirs4all.pipeline.storage import PipelineLibrary

library = PipelineLibrary(workspace_path)

# Save config-only template with category and tags
library.save_template(
    pipeline_config=pipeline_dict,
    name="baseline_pls",
    category="regression",
    description="PLS baseline with SNV preprocessing",
    tags=["nirs", "pls"],
)

Load and Reuse

# List templates
templates = library.list_templates(category="regression")
for t in templates:
    print(f"{t['name']}: {t['description']}")

# Load template
config = library.load_template("baseline_pls")

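# Use in pipeline (import path assumed; adjust to your nirs4all version)
from nirs4all.pipeline import PipelineRunner

runner = PipelineRunner(workspace="./workspace")
predictions = runner.run(config, new_dataset)  # new_dataset: your dataset configuration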

Cleanup Utilities

# Garbage-collect unreferenced artifacts (ref_count = 0)
removed = store.gc_artifacts()
print(f"Removed {removed} orphaned artifact files")

# Delete a run and all descendant data
rows_deleted = store.delete_run(run_id)
print(f"Deleted {rows_deleted} rows")

# Reclaim disk space after large deletions
store.vacuum()
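
To make the ref_count = 0 rule concrete, here is a minimal sketch of reference-count garbage collection over a sharded directory. It does not reproduce what gc_artifacts() does internally: gc_unreferenced, the in-memory registry dict, and the short fake hashes are hypothetical and exist only for this illustration.

```python
import tempfile
from pathlib import Path


def gc_unreferenced(registry, root):
    """Delete files whose ref_count is 0 and return how many were removed."""
    removed = 0
    # Collect the keys first so the dict is not mutated while iterating.
    orphans = [digest for digest, count in registry.items() if count == 0]
    for digest in orphans:
        path = Path(root) / digest[:2] / f"{digest}.joblib"
        if path.exists():
            path.unlink()
            removed += 1
        del registry[digest]
    return removed


def demo_gc():
    """Create one referenced and one orphaned file, then collect."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        registry = {"ab12": 1, "cd34": 0}  # fake hashes -> ref_count
        for digest in registry:
            path = root / digest[:2] / f"{digest}.joblib"
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_bytes(b"payload")
        removed = gc_unreferenced(registry, root)
        remaining = sorted(p.name for p in root.rglob("*.joblib"))
        return removed, remaining, registry
```

In this toy setup only the entry with a zero count is removed, which mirrors why garbage collection is worth running after deleting runs: deletions lower counters, and collection then removes the files nobody references.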

Export Operations

All exports are user-triggered (on demand):

# Export chain as .n4a bundle (self-contained ZIP)
store.export_chain(chain_id, Path("exports/model.n4a"))

# Export pipeline configuration as JSON
store.export_pipeline_config(pipeline_id, Path("exports/config.json"))

# Export run metadata as YAML
store.export_run(run_id, Path("exports/run.yaml"))

# Export predictions as Parquet
store.export_predictions_parquet(
    Path("exports/results.parquet"),
    dataset_name="wheat",
    partition="val"
)
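
Since an .n4a bundle is described as a self-contained ZIP, Python's standard zipfile module should be able to list its members. The sketch below builds a stand-in archive to demonstrate this; the member names (manifest.json, artifacts/step_0.bin) are hypothetical, and the real bundle layout is not documented in this guide.

```python
import json
import tempfile
import zipfile
from pathlib import Path


def list_bundle_members(bundle_path):
    """Return the sorted member names of a ZIP-based bundle."""
    with zipfile.ZipFile(bundle_path) as bundle:
        return sorted(bundle.namelist())


def demo_bundle():
    """Write a stand-in .n4a archive and inspect it."""
    with tempfile.TemporaryDirectory() as tmp:
        bundle_path = Path(tmp) / "model.n4a"
        # Hypothetical contents, chosen only for this illustration.
        with zipfile.ZipFile(bundle_path, "w") as bundle:
            bundle.writestr("manifest.json", json.dumps({"steps": 2}))
            bundle.writestr("artifacts/step_0.bin", b"\x00")
        return zipfile.is_zipfile(bundle_path), list_bundle_members(bundle_path)
```

Running list_bundle_members() on a bundle produced by export_chain() is a quick way to check what was actually packaged before sharing it.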

Best Practices

  1. Let the store handle deduplication – identical artifacts (same content hash) automatically share the same file via ref_count tracking.

  2. Use chain replay for in-workspace prediction – store.replay_chain() loads artifacts and applies transformations without exporting to .n4a bundles.

  3. Export on demand – the workspace only produces store.duckdb and artifacts/ during training. Use the export methods to create shareable files.

  4. Clean up periodically – use gc_artifacts() after deleting runs or pipelines to reclaim disk space from unreferenced artifact files.

  5. Close the store when done – call store.close() to release the DuckDB connection.

See Also