Artifacts Developer Guide
This guide covers internal implementation details and extension points for the nirs4all artifacts system. For user-facing documentation, see Artifacts & Storage.
Architecture Overview
nirs4all/pipeline/storage/artifacts/
├── __init__.py # Public API exports
├── types.py # ArtifactType, ArtifactRecord, MetaModelConfig
├── registry.py # ArtifactRegistry (registration, deduplication)
├── loader.py # ArtifactLoader (loading, caching)
├── graph.py # DependencyGraph (topological sorting)
└── utils.py # ID generation, hashing, filename utilities
Adding New Artifact Types
Step 1: Add to ArtifactType Enum
Edit nirs4all/pipeline/storage/artifacts/types.py:
class ArtifactType(Enum):
"""Classification of artifact types."""
MODEL = "model"
TRANSFORMER = "transformer"
SPLITTER = "splitter"
ENCODER = "encoder"
META_MODEL = "meta_model"
# Add new type here:
FEATURE_SELECTOR = "feature_selector" # New!
Step 2: Update Serialization Detection
Edit nirs4all/utils/serializer.py:
def _detect_artifact_type(obj: Any) -> str:
"""Detect artifact type from object."""
if hasattr(obj, 'get_support') and hasattr(obj, 'fit_transform'):
return 'feature_selector'
# ... existing detection ...
Step 3: Update Pipeline Controllers
Ensure the relevant controller uses the new type:
record = registry.register(
obj=fitted_selector,
artifact_id=artifact_id,
artifact_type=ArtifactType.FEATURE_SELECTOR,
)
Step 4: Add Unit Tests
def test_feature_selector_type():
"""Test FEATURE_SELECTOR type serialization."""
assert ArtifactType.FEATURE_SELECTOR.value == "feature_selector"
record = ArtifactRecord(
artifact_id="0001:0:all",
content_hash="sha256:abc123",
path="feature_selector_SelectKBest_abc123.pkl",
pipeline_id="0001",
artifact_type=ArtifactType.FEATURE_SELECTOR,
class_name="SelectKBest"
)
d = record.to_dict()
assert d["artifact_type"] == "feature_selector"
Custom Serialization Formats
Adding a New Format
Add format handler in
nirs4all/utils/serializer.py:
def _save_onnx(obj, path: Path) -> None:
"""Save ONNX model."""
import onnx
onnx.save(obj, str(path))
def _load_onnx(path: Path) -> Any:
"""Load ONNX model."""
import onnx
return onnx.load(str(path))
FORMAT_HANDLERS = {
# ... existing handlers ...
"onnx": (_save_onnx, _load_onnx, ".onnx"),
}
Update format detection:
def _detect_format(obj: Any) -> str:
"""Detect best serialization format."""
if hasattr(obj, 'SerializeToString'): # ONNX models
return "onnx"
# ... existing detection ...
Dependency Graph Implementation
Topological Sorting
The DependencyGraph ensures artifacts are loaded in the correct order:
class DependencyGraph:
"""DAG for artifact dependencies."""
def __init__(self):
self._nodes: Set[str] = set()
self._edges: Dict[str, Set[str]] = defaultdict(set)
def add_edge(self, from_node: str, to_node: str) -> None:
"""Add dependency: from_node depends on to_node."""
self._nodes.add(from_node)
self._nodes.add(to_node)
self._edges[from_node].add(to_node)
def topological_sort(self) -> List[str]:
"""Return nodes in dependency order (Kahn's algorithm)."""
in_degree = {node: 0 for node in self._nodes}
for node, deps in self._edges.items():
for dep in deps:
in_degree[dep] += 1
queue = [n for n, d in in_degree.items() if d == 0]
result = []
while queue:
node = queue.pop(0)
result.append(node)
for dependent in self._get_dependents(node):
in_degree[dependent] -= 1
if in_degree[dependent] == 0:
queue.append(dependent)
if len(result) != len(self._nodes):
raise ValueError("Cycle detected in dependency graph")
return result
Cycle Detection
The registry prevents cycles when registering dependencies:
def register(self, ..., depends_on: List[str] = None):
"""Register artifact with dependency validation."""
if depends_on:
# Check all dependencies exist
for dep_id in depends_on:
if not self.resolve(dep_id):
raise ValueError(f"Unknown dependency: {dep_id}")
# Check for cycles
temp_graph = self._graph.copy()
for dep_id in depends_on:
temp_graph.add_edge(artifact_id, dep_id)
try:
temp_graph.topological_sort()
except ValueError:
raise ValueError(f"Cycle detected when adding {artifact_id}")
LRU Cache Implementation
class LRUCache:
"""Least Recently Used cache with statistics."""
def __init__(self, max_size: int = 100):
self._cache: OrderedDict[str, Any] = OrderedDict()
self._max_size = max_size
self._hits = 0
self._misses = 0
def get(self, key: str) -> Optional[Any]:
"""Get item, moving to end (most recent)."""
if key in self._cache:
self._hits += 1
self._cache.move_to_end(key)
return self._cache[key]
self._misses += 1
return None
def put(self, key: str, value: Any) -> None:
"""Add item, evicting oldest if at capacity."""
if key in self._cache:
self._cache.move_to_end(key)
else:
if len(self._cache) >= self._max_size:
self._cache.popitem(last=False)
self._cache[key] = value
Testing Patterns
Unit Test Structure
# tests/unit/pipeline/storage/artifacts/test_registry.py
import pytest
from pathlib import Path
from nirs4all.pipeline.storage.artifacts import (
ArtifactRegistry,
ArtifactType,
)
@pytest.fixture
def temp_workspace(tmp_path):
"""Create temporary workspace structure."""
binaries = tmp_path / "workspace" / "binaries" / "test_dataset"
binaries.mkdir(parents=True)
return tmp_path / "workspace"
@pytest.fixture
def registry(temp_workspace):
"""Create registry for testing."""
return ArtifactRegistry(temp_workspace, "test_dataset")
class TestArtifactRegistry:
def test_register_model(self, registry):
"""Test basic model registration."""
from sklearn.linear_model import LinearRegression
model = LinearRegression().fit([[1], [2]], [1, 2])
record = registry.register(
obj=model,
artifact_id="0001:0:all",
artifact_type=ArtifactType.MODEL
)
assert record.artifact_id == "0001:0:all"
assert record.artifact_type == ArtifactType.MODEL
assert record.class_name == "LinearRegression"
assert record.content_hash.startswith("sha256:")
def test_deduplication(self, registry):
"""Test that identical objects share same file."""
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler().fit([[1, 2], [3, 4]])
record1 = registry.register(scaler, "0001:0:all", ArtifactType.TRANSFORMER)
record2 = registry.register(scaler, "0002:0:all", ArtifactType.TRANSFORMER)
assert record1.content_hash == record2.content_hash
assert record1.path == record2.path
Integration Test Structure
# tests/integration/artifacts/test_artifact_flow.py
class TestArtifactFlow:
"""Integration tests for training → prediction artifact flow."""
@pytest.fixture
def sample_pipeline(self):
return [
{"ShuffleSplit": {"n_splits": 2, "test_size": 0.2}},
{"MinMaxScaler": {}},
{"PLSRegression": {"n_components": 3}},
]
def test_train_and_predict(self, temp_workspace, sample_pipeline, sample_data):
"""Test artifacts from training can be loaded for prediction."""
runner = PipelineRunner(
workspace=temp_workspace,
save_artifacts=True
)
predictions, outputs = runner.run(sample_pipeline, sample_data)
manifest = outputs.manifest
loader = ArtifactLoader(temp_workspace, sample_data["dataset"])
loader.import_from_manifest(manifest)
for artifact_id in loader.list_artifact_ids():
obj = loader.load_by_id(artifact_id)
assert obj is not None
Debugging Tips
Inspecting Artifacts
from nirs4all.pipeline.storage.artifacts import ArtifactRegistry
registry = ArtifactRegistry(workspace_path, dataset)
for artifact_id, record in registry._records.items():
print(f"{artifact_id}: {record.class_name} ({record.artifact_type.value})")
print(f" Hash: {record.short_hash}")
print(f" Dependencies: {record.depends_on}")
Cache Statistics
loader = ArtifactLoader(workspace, dataset)
loader.import_from_manifest(manifest)
# Load some artifacts...
loader.load_by_id("0001:0:all")
loader.load_by_id("0001:0:all") # Cache hit
info = loader.get_cache_info()
print(f"Cache size: {info['size']}/{info['max_size']}")
print(f"Hit rate: {info['hit_rate']:.1%}")
Orphan Detection
registry = ArtifactRegistry(workspace, dataset)
orphans = registry.find_orphaned_artifacts()
print(f"Orphaned files: {len(orphans)}")
stats = registry.get_stats()
print(f"Files on disk: {stats['files_on_disk']}")
print(f"Deduplication: {stats['deduplication_ratio']:.1%}")
CLI Implementation
The artifacts CLI commands are in nirs4all/cli/commands/artifacts.py:
import click
from pathlib import Path
from nirs4all.pipeline.storage.artifacts import ArtifactRegistry
@click.group(name="artifacts")
def artifacts_cli():
"""Manage pipeline artifacts."""
pass
@artifacts_cli.command()
@click.option("--dataset", required=True, help="Dataset name")
@click.option("--workspace", default="workspace", help="Workspace path")
def stats(dataset: str, workspace: str):
"""Show artifact storage statistics."""
registry = ArtifactRegistry(Path(workspace), dataset)
stats = registry.get_stats()
click.echo(f"Files on disk: {stats['files_on_disk']}")
click.echo(f"Registered refs: {stats['registered_refs']}")
click.echo(f"Deduplication: {stats['deduplication_ratio']:.1%}")
@artifacts_cli.command(name="cleanup")
@click.option("--dataset", required=True)
@click.option("--workspace", default="workspace")
@click.option("--force", is_flag=True, help="Actually delete files")
def cleanup(dataset: str, workspace: str, force: bool):
"""Delete orphaned artifacts."""
registry = ArtifactRegistry(Path(workspace), dataset)
deleted, bytes_freed = registry.delete_orphaned_artifacts(dry_run=not force)
if force:
click.echo(f"Deleted {len(deleted)} files, freed {bytes_freed / 1024:.1f} KB")
else:
click.echo(f"Would delete {len(deleted)} files")
click.echo("Run with --force to delete")
ArtifactRecord Reference
@dataclass
class ArtifactRecord:
"""Complete artifact metadata."""
# Identification
artifact_id: str
content_hash: str
# Location
path: str
# Context
pipeline_id: str
branch_path: List[int] = field(default_factory=list)
step_index: int = 0
fold_id: Optional[int] = None
# Classification
artifact_type: ArtifactType = ArtifactType.MODEL
class_name: str = ""
# Dependencies
depends_on: List[str] = field(default_factory=list)
# Meta-model config (for stacking)
meta_config: Optional[MetaModelConfig] = None
# Serialization
format: str = "joblib"
format_version: str = ""
nirs4all_version: str = ""
# Metadata
size_bytes: int = 0
created_at: str = ""
params: Dict[str, Any] = field(default_factory=dict)
# Computed properties
@property
def is_branch_specific(self) -> bool: ...
@property
def is_fold_specific(self) -> bool: ...
@property
def is_meta_model(self) -> bool: ...
@property
def short_hash(self) -> str: ...
See Also
Artifacts & Storage - User guide for artifacts system
Storage API Reference - Storage API reference
Architecture Overview - Pipeline architecture overview