Artifacts Developer Guide
This guide covers internal implementation details and extension points for the nirs4all artifacts system. For user-facing documentation, see Artifacts & Storage.
Architecture Overview
The storage system has two layers:
- WorkspaceStore (nirs4all.pipeline.storage.workspace_store) – DuckDB-backed central store for all structured data and artifact management.
- Artifacts subpackage (nirs4all.pipeline.storage.artifacts/) – Low-level artifact utilities (types, serialization, operator chains) used by WorkspaceStore and the execution engine.
nirs4all/pipeline/storage/
├── __init__.py # Public API exports (WorkspaceStore, ChainBuilder, etc.)
├── workspace_store.py # DuckDB-backed central store
├── store_schema.py # DuckDB table schema creation
├── store_queries.py # SQL query constants
├── store_protocol.py # StoreProtocol abstract interface
├── chain_builder.py # ChainBuilder (ExecutionTrace -> chain dict)
├── chain_replay.py # Standalone replay_chain() function
├── library.py # PipelineLibrary (template management)
└── artifacts/
├── __init__.py # Subpackage exports
├── types.py # ArtifactType enum
├── artifact_registry.py # ArtifactRegistry (used by execution engine)
├── artifact_loader.py # ArtifactLoader (loading, caching)
├── artifact_persistence.py # Low-level persist/load functions
├── operator_chain.py # OperatorChain (execution path tracking)
└── utils.py # Hashing, filename, ID utilities
Adding New Artifact Types
Step 1: Add to ArtifactType Enum
Edit nirs4all/pipeline/storage/artifacts/types.py:
class ArtifactType(Enum):
    """Classification of artifact types."""
    # Each member's value is the lowercase string form of its name.
    MODEL = "model"
    TRANSFORMER = "transformer"
    SPLITTER = "splitter"
    ENCODER = "encoder"
    META_MODEL = "meta_model"
    # Add new type here:
    # Keep the value identical to the string used for this type elsewhere
    # (e.g. 'feature_selector' returned by _detect_artifact_type and passed
    # as artifact_type to store.save_artifact).
    FEATURE_SELECTOR = "feature_selector"  # New!
Step 2: Update Serialization Detection
Edit nirs4all/utils/serializer.py:
def _detect_artifact_type(obj: Any) -> str:
"""Detect artifact type from object."""
if hasattr(obj, 'get_support') and hasattr(obj, 'fit_transform'):
return 'feature_selector'
# ... existing detection ...
Step 3: Update Pipeline Controllers
Ensure the relevant controller returns the artifact in StepOutput:
# Return the fitted object through StepOutput.artifacts; the execution engine
# then persists it via store.save_artifact().
return context, StepOutput(
    artifacts={"feature_selector": fitted_selector}
)
The execution engine will call store.save_artifact() with the appropriate type.
Step 4: Add Unit Tests
def test_feature_selector_artifact_roundtrip(tmp_path):
    """Test FEATURE_SELECTOR artifact save/load via WorkspaceStore.

    Saves a fitted SelectKBest, loads it back by ID, and checks that both the
    class and the fitted selection mask survive the roundtrip.
    """
    from sklearn.feature_selection import SelectKBest
    from nirs4all.pipeline.storage import WorkspaceStore

    # Two samples per class: with a single sample per class, SelectKBest's
    # default f_classif scorer has zero within-class degrees of freedom and
    # divides by zero.
    X = [
        [1, 2, 3, 4, 5],
        [2, 3, 4, 5, 7],
        [6, 7, 8, 9, 10],
        [7, 9, 8, 10, 11],
    ]
    y = [0, 0, 1, 1]
    selector = SelectKBest(k=5)
    selector.fit(X, y)

    store = WorkspaceStore(tmp_path / "workspace")
    try:
        artifact_id = store.save_artifact(
            obj=selector,
            operator_class="sklearn.feature_selection.SelectKBest",
            artifact_type="feature_selector",
            format="joblib",
        )
        loaded = store.load_artifact(artifact_id)
        assert isinstance(loaded, SelectKBest)
        # The fitted state must round-trip, not just the class.
        assert list(loaded.get_support()) == list(selector.get_support())
    finally:
        # Close even when an assertion fails so the store is not leaked.
        store.close()
Custom Serialization Formats
Adding a New Format
Add a format handler in nirs4all/utils/serializer.py:
def _save_onnx(obj, path: Path) -> None:
"""Save ONNX model."""
import onnx
onnx.save(obj, str(path))
def _load_onnx(path: Path) -> Any:
"""Load ONNX model."""
import onnx
return onnx.load(str(path))
# Maps format name -> (save function, load function, file extension).
FORMAT_HANDLERS = {
    # ... existing handlers ...
    "onnx": (_save_onnx, _load_onnx, ".onnx"),
}
Update format detection:
def _detect_format(obj: Any) -> str:
"""Detect best serialization format."""
if hasattr(obj, 'SerializeToString'): # ONNX models
return "onnx"
# ... existing detection ...
Dependency Graph Implementation
Topological Sorting
The DependencyGraph ensures artifacts are loaded in the correct order, i.e. every artifact is loaded after the artifacts it depends on:
class DependencyGraph:
    """DAG for artifact dependencies.

    An edge ``from_node -> to_node`` means ``from_node`` depends on
    ``to_node``. ``topological_sort`` returns a load order in which every
    node comes after all of its dependencies.
    """

    def __init__(self):
        self._nodes: Set[str] = set()
        # node -> nodes it depends on
        self._edges: Dict[str, Set[str]] = defaultdict(set)
        # node -> nodes that depend on it (reverse of _edges), so Kahn's
        # algorithm can release dependents without scanning every edge
        self._dependents: Dict[str, Set[str]] = defaultdict(set)

    def add_edge(self, from_node: str, to_node: str) -> None:
        """Add dependency: from_node depends on to_node."""
        self._nodes.add(from_node)
        self._nodes.add(to_node)
        self._edges[from_node].add(to_node)
        self._dependents[to_node].add(from_node)

    def _get_dependents(self, node: str) -> Set[str]:
        """Return the nodes that directly depend on ``node``."""
        return self._dependents.get(node, set())

    def topological_sort(self) -> List[str]:
        """Return nodes in dependency order (Kahn's algorithm).

        Dependencies come before their dependents. Ties are broken
        alphabetically so the result does not depend on set iteration order,
        which varies between runs under string hash randomization.

        Raises:
            ValueError: If the graph contains a cycle.
        """
        from collections import deque

        # In-degree = number of not-yet-emitted dependencies of each node.
        in_degree = {node: len(self._edges.get(node, ())) for node in self._nodes}
        queue = deque(sorted(node for node, degree in in_degree.items() if degree == 0))
        result: List[str] = []
        while queue:
            node = queue.popleft()
            result.append(node)
            # Emitting `node` satisfies one dependency of each of its dependents.
            for dependent in sorted(self._get_dependents(node)):
                in_degree[dependent] -= 1
                if in_degree[dependent] == 0:
                    queue.append(dependent)
        # Nodes on a cycle never reach in-degree 0 and are never emitted.
        if len(result) != len(self._nodes):
            raise ValueError("Cycle detected in dependency graph")
        return result
LRU Cache Implementation
class LRUCache:
    """Least Recently Used cache with hit/miss statistics.

    ``max_size`` bounds the number of stored entries. A non-positive
    ``max_size`` stores nothing (every ``get`` misses), mirroring
    ``functools.lru_cache(maxsize=0)``.
    """

    def __init__(self, max_size: int = 100):
        self._cache: OrderedDict[str, Any] = OrderedDict()
        self._max_size = max_size
        self._hits = 0
        self._misses = 0

    def get(self, key: str) -> Optional[Any]:
        """Get item, moving it to the end (most recently used).

        Returns ``None`` on a miss. A stored ``None`` value is returned the
        same way but is counted as a hit.
        """
        try:
            value = self._cache[key]
        except KeyError:
            self._misses += 1
            return None
        self._hits += 1
        self._cache.move_to_end(key)
        return value

    def put(self, key: str, value: Any) -> None:
        """Add or update an item, evicting the least recently used one if at capacity."""
        if self._max_size <= 0:
            # Zero capacity: nothing can be stored. Without this guard the
            # eviction below would call popitem() on an empty dict (KeyError).
            return
        if key in self._cache:
            self._cache.move_to_end(key)
        elif len(self._cache) >= self._max_size:
            # The front of the OrderedDict is the least recently used entry.
            self._cache.popitem(last=False)
        self._cache[key] = value
Testing Patterns
Unit Test Structure
# tests/unit/pipeline/storage/test_workspace_store.py
import pytest
from pathlib import Path
from nirs4all.pipeline.storage import WorkspaceStore
@pytest.fixture
def store(tmp_path):
    """Provide a WorkspaceStore rooted in a per-test temporary workspace.

    The store is closed during fixture teardown.
    """
    workspace_store = WorkspaceStore(tmp_path / "workspace")
    try:
        yield workspace_store
    finally:
        workspace_store.close()
class TestWorkspaceStore:
    """Unit tests for WorkspaceStore run lifecycle and artifact storage."""

    def test_run_lifecycle(self, store):
        """A begun-then-completed run is retrievable with status 'completed'."""
        new_run_id = store.begin_run("test", config={}, datasets=[])
        store.complete_run(new_run_id, summary={"total": 1})

        fetched_run = store.get_run(new_run_id)
        assert fetched_run is not None
        assert fetched_run["status"] == "completed"

    def test_artifact_deduplication(self, store):
        """Saving the same object twice yields the same artifact ID."""
        from sklearn.preprocessing import StandardScaler

        scaler = StandardScaler()
        save_args = (scaler, "StandardScaler", "transformer", "joblib")
        first_id = store.save_artifact(*save_args)
        second_id = store.save_artifact(*save_args)
        assert first_id == second_id  # Content-addressed deduplication
Integration Test Structure
# tests/integration/artifacts/test_artifact_flow.py
class TestArtifactFlow:
    """Integration tests for training -> prediction artifact flow."""

    def test_train_creates_store_with_artifacts(self, workspace_path, dataset, pipeline):
        """Test that training creates store.duckdb and artifacts.

        ``workspace_path``, ``dataset`` and ``pipeline`` are pytest fixtures
        (presumably provided by a conftest.py — not shown here).
        ``pipeline`` was previously an undefined name in this test.
        PipelineRunner and PipelineConfigs are assumed to be imported at
        module level.
        """
        runner = PipelineRunner(
            workspace_path=workspace_path,
            save_artifacts=True,
            verbose=0,
        )
        predictions, _ = runner.run(PipelineConfigs(pipeline), dataset)

        assert (workspace_path / "store.duckdb").exists()
        artifacts = list((workspace_path / "artifacts").rglob("*.joblib"))
        assert len(artifacts) >= 1
Debugging Tips
Inspecting the Store
from nirs4all.pipeline.storage import WorkspaceStore
# workspace_path and chain_id below are placeholders: substitute your
# workspace directory and a real chain ID.
store = WorkspaceStore(workspace_path)
try:
    # List all runs
    runs = store.list_runs()
    print(runs)

    # Get top predictions
    top = store.top_predictions(n=5, metric="val_score")
    print(top)

    # Inspect a chain
    chain = store.get_chain(chain_id)
    print(f"Model: {chain['model_class']}")
    print(f"Steps: {len(chain['steps'])}")
    print(f"Fold artifacts: {chain['fold_artifacts']}")
finally:
    # Release the store even if one of the lookups above raises.
    store.close()
DuckDB Direct Queries
For advanced debugging, query the DuckDB store directly:
import duckdb
# Open read-only: inspection should never modify the store, and a wrong path
# then fails loudly instead of creating a new empty database file.
conn = duckdb.connect(str(workspace_path / "store.duckdb"), read_only=True)
try:
    # Count records
    print(conn.execute("SELECT COUNT(*) FROM predictions").fetchone())

    # Inspect artifacts
    print(conn.execute(
        "SELECT artifact_id, operator_class, ref_count, size_bytes FROM artifacts"
    ).fetchall())
finally:
    # Close the connection even if a query fails.
    conn.close()
Cleanup Diagnostics
store = WorkspaceStore(workspace_path)
try:
    # Garbage-collect unreferenced (orphaned) artifacts. This removes them,
    # as the message below reports; it is not a read-only check.
    orphan_count = store.gc_artifacts()
    print(f"Removed {orphan_count} orphaned artifacts")

    # Reclaim disk space
    store.vacuum()
finally:
    # Release the store even if GC or vacuum raises.
    store.close()
See Also
Artifacts & Storage - User guide for artifacts system
Storage API Reference - Storage API reference
Architecture Overview - Pipeline architecture overview