Artifacts Developer Guide

This guide covers internal implementation details and extension points for the nirs4all artifacts system. For user-facing documentation, see Artifacts & Storage.

Architecture Overview

nirs4all/pipeline/storage/artifacts/
├── __init__.py           # Public API exports
├── types.py              # ArtifactType, ArtifactRecord, MetaModelConfig
├── registry.py           # ArtifactRegistry (registration, deduplication)
├── loader.py             # ArtifactLoader (loading, caching)
├── graph.py              # DependencyGraph (topological sorting)
└── utils.py              # ID generation, hashing, filename utilities

Adding New Artifact Types

Step 1: Add to ArtifactType Enum

Edit nirs4all/pipeline/storage/artifacts/types.py:

class ArtifactType(Enum):
    """Classification of artifact types.

    Each member's value is the lowercase string form of its name.
    """

    MODEL = "model"
    TRANSFORMER = "transformer"
    SPLITTER = "splitter"
    ENCODER = "encoder"
    META_MODEL = "meta_model"
    # Add new types below the existing members:
    FEATURE_SELECTOR = "feature_selector"  # New in this walkthrough

Step 2: Update Serialization Detection

Edit nirs4all/utils/serializer.py:

def _detect_artifact_type(obj: Any) -> str:
    """Detect artifact type from object.

    Detection is duck-typed: an object exposing both ``get_support`` and
    ``fit_transform`` is reported as ``'feature_selector'``.
    """
    # New check, placed ahead of the existing detection logic.
    if hasattr(obj, 'get_support') and hasattr(obj, 'fit_transform'):
        return 'feature_selector'
    # ... existing detection ...

Step 3: Update Pipeline Controllers

Ensure the relevant controller uses the new type:

record = registry.register(
    obj=fitted_selector,
    artifact_id=artifact_id,
    artifact_type=ArtifactType.FEATURE_SELECTOR,
)

Step 4: Add Unit Tests

def test_feature_selector_type():
    """Verify the enum value and that ArtifactRecord.to_dict() emits it."""
    selector_type = ArtifactType.FEATURE_SELECTOR
    assert selector_type.value == "feature_selector"

    # Build a record of the new type and serialize it in one step.
    serialized = ArtifactRecord(
        artifact_id="0001:0:all",
        content_hash="sha256:abc123",
        path="feature_selector_SelectKBest_abc123.pkl",
        pipeline_id="0001",
        artifact_type=selector_type,
        class_name="SelectKBest",
    ).to_dict()

    assert serialized["artifact_type"] == "feature_selector"

Custom Serialization Formats

Adding a New Format

  1. Add a format handler in nirs4all/utils/serializer.py:

def _save_onnx(obj, path: Path) -> None:
    """Write an ONNX model to ``path``."""
    # Imported inside the function so onnx is only needed when this
    # handler actually runs.
    import onnx

    destination = str(path)
    onnx.save(obj, destination)

def _load_onnx(path: Path) -> Any:
    """Read an ONNX model from ``path`` and return it."""
    # Imported inside the function so onnx is only needed when this
    # handler actually runs.
    import onnx

    source = str(path)
    model = onnx.load(source)
    return model

# Maps a format name to a (save function, load function, file extension) tuple.
FORMAT_HANDLERS = {
    # ... existing handlers ...
    "onnx": (_save_onnx, _load_onnx, ".onnx"),
}
  2. Update format detection:

def _detect_format(obj: Any) -> str:
    """Detect best serialization format.

    Returns the format name to use for ``obj``; ``"onnx"`` is chosen for
    any object that exposes a ``SerializeToString`` attribute.
    """
    # NOTE(review): SerializeToString is presumably present on every protobuf
    # message, not only ONNX models — confirm this duck-typed check is
    # specific enough for the objects that reach this function.
    if hasattr(obj, 'SerializeToString'):  # ONNX models
        return "onnx"
    # ... existing detection ...

Dependency Graph Implementation

Topological Sorting

The DependencyGraph orders artifacts so that each one is loaded only after the artifacts it depends on:

class DependencyGraph:
    """DAG for artifact dependencies.

    An edge ``from_node -> to_node`` means *from_node depends on to_node*,
    so ``to_node`` has to come first in the sorted order.
    """

    def __init__(self):
        self._nodes: Set[str] = set()
        # Maps each node to the set of nodes it depends on.
        self._edges: Dict[str, Set[str]] = defaultdict(set)

    def add_edge(self, from_node: str, to_node: str) -> None:
        """Add dependency: from_node depends on to_node."""
        self._nodes.add(from_node)
        self._nodes.add(to_node)
        self._edges[from_node].add(to_node)

    def topological_sort(self) -> List[str]:
        """Return nodes in dependency order (Kahn's algorithm).

        Every node appears after all of the nodes it depends on, so the
        result can be processed front to back. Nodes that become ready at
        the same time are visited in sorted order, which keeps the result
        reproducible between runs.

        Returns:
            All nodes of the graph, dependencies first.

        Raises:
            ValueError: If the graph contains a cycle.
        """
        from collections import deque

        # Number of dependencies of each node that have not been emitted yet.
        pending = {node: 0 for node in self._nodes}
        # Reverse index: dependency -> nodes that are waiting on it.
        dependents: Dict[str, List[str]] = {}
        for node, deps in self._edges.items():
            pending[node] = len(deps)
            for dep in deps:
                dependents.setdefault(dep, []).append(node)

        # Start from the nodes that depend on nothing.
        ready = deque(sorted(n for n, count in pending.items() if count == 0))
        result: List[str] = []

        while ready:
            node = ready.popleft()
            result.append(node)

            for dependent in sorted(dependents.get(node, ())):
                pending[dependent] -= 1
                if pending[dependent] == 0:
                    ready.append(dependent)

        # Nodes on a cycle never reach zero pending dependencies.
        if len(result) != len(self._nodes):
            raise ValueError("Cycle detected in dependency graph")

        return result

Cycle Detection

The registry prevents cycles when registering dependencies:

def register(self, ..., depends_on: List[str] = None):
    """Register artifact with dependency validation."""
    if depends_on:
        # Check all dependencies exist
        for dep_id in depends_on:
            if not self.resolve(dep_id):
                raise ValueError(f"Unknown dependency: {dep_id}")

        # Check for cycles
        temp_graph = self._graph.copy()
        for dep_id in depends_on:
            temp_graph.add_edge(artifact_id, dep_id)

        try:
            temp_graph.topological_sort()
        except ValueError:
            raise ValueError(f"Cycle detected when adding {artifact_id}")

LRU Cache Implementation

class LRUCache:
    """Least Recently Used cache with statistics.

    Entries live in an ``OrderedDict`` ordered from least to most recently
    used. Hit and miss counters are updated by :meth:`get`.
    """

    def __init__(self, max_size: int = 100):
        self._cache: OrderedDict[str, Any] = OrderedDict()
        self._max_size = max_size
        self._hits = 0
        self._misses = 0

    def get(self, key: str) -> Optional[Any]:
        """Get item, moving to end (most recent).

        Returns:
            The cached value, or ``None`` on a miss. A cached ``None``
            therefore cannot be told apart from an absent key.
        """
        if key in self._cache:
            self._hits += 1
            self._cache.move_to_end(key)
            return self._cache[key]
        self._misses += 1
        return None

    def put(self, key: str, value: Any) -> None:
        """Add or replace an item, evicting the oldest if at capacity.

        Putting an existing key stores the new value and marks the entry
        as most recently used. A cache whose ``max_size`` is zero or
        negative stores nothing.
        """
        if self._max_size <= 0:
            # Nothing fits; also avoids popitem() on an empty cache.
            return

        if key in self._cache:
            # Replace the stale value, then refresh recency.
            self._cache[key] = value
            self._cache.move_to_end(key)
            return

        if len(self._cache) >= self._max_size:
            self._cache.popitem(last=False)  # drop least recently used
        self._cache[key] = value

Testing Patterns

Unit Test Structure

# tests/unit/pipeline/storage/artifacts/test_registry.py

import pytest
from pathlib import Path
from nirs4all.pipeline.storage.artifacts import (
    ArtifactRegistry,
    ArtifactType,
)

@pytest.fixture
def temp_workspace(tmp_path):
    """Build a throwaway workspace containing binaries/test_dataset."""
    workspace = tmp_path / "workspace"
    dataset_binaries = workspace / "binaries" / "test_dataset"
    dataset_binaries.mkdir(parents=True)
    return workspace

@pytest.fixture
def registry(temp_workspace):
    """Provide an ArtifactRegistry bound to the temporary workspace."""
    dataset_name = "test_dataset"
    return ArtifactRegistry(temp_workspace, dataset_name)

class TestArtifactRegistry:
    """Unit tests for ArtifactRegistry.register()."""

    def test_register_model(self, registry):
        """Registering a fitted model yields a record with its metadata."""
        from sklearn.linear_model import LinearRegression

        features, targets = [[1], [2]], [1, 2]
        fitted = LinearRegression().fit(features, targets)
        artifact_id = "0001:0:all"

        record = registry.register(
            obj=fitted,
            artifact_id=artifact_id,
            artifact_type=ArtifactType.MODEL,
        )

        assert record.artifact_id == artifact_id
        assert record.artifact_type == ArtifactType.MODEL
        assert record.class_name == "LinearRegression"
        assert record.content_hash.startswith("sha256:")

    def test_deduplication(self, registry):
        """Registering the same object twice reuses one stored file."""
        from sklearn.preprocessing import StandardScaler

        scaler = StandardScaler().fit([[1, 2], [3, 4]])

        # Same object, two different artifact ids.
        first, second = [
            registry.register(scaler, artifact_id, ArtifactType.TRANSFORMER)
            for artifact_id in ("0001:0:all", "0002:0:all")
        ]

        assert first.content_hash == second.content_hash
        assert first.path == second.path

Integration Test Structure

# tests/integration/artifacts/test_artifact_flow.py

class TestArtifactFlow:
    """Integration tests for training → prediction artifact flow."""

    @pytest.fixture
    def sample_pipeline(self):
        """Minimal three-step pipeline definition used by the tests."""
        split_step = {"ShuffleSplit": {"n_splits": 2, "test_size": 0.2}}
        scale_step = {"MinMaxScaler": {}}
        model_step = {"PLSRegression": {"n_components": 3}}
        return [split_step, scale_step, model_step]

    def test_train_and_predict(self, temp_workspace, sample_pipeline, sample_data):
        """Every artifact saved during training can be loaded back."""
        runner = PipelineRunner(workspace=temp_workspace, save_artifacts=True)
        _predictions, outputs = runner.run(sample_pipeline, sample_data)

        # Point a fresh loader at the manifest produced by the run.
        loader = ArtifactLoader(temp_workspace, sample_data["dataset"])
        loader.import_from_manifest(outputs.manifest)

        for artifact_id in loader.list_artifact_ids():
            loaded = loader.load_by_id(artifact_id)
            assert loaded is not None

Debugging Tips

Inspecting Artifacts

from nirs4all.pipeline.storage.artifacts import ArtifactRegistry

registry = ArtifactRegistry(workspace_path, dataset)

for artifact_id, record in registry._records.items():
    print(f"{artifact_id}: {record.class_name} ({record.artifact_type.value})")
    print(f"  Hash: {record.short_hash}")
    print(f"  Dependencies: {record.depends_on}")

Cache Statistics

loader = ArtifactLoader(workspace, dataset)
loader.import_from_manifest(manifest)

# Load some artifacts...
loader.load_by_id("0001:0:all")
loader.load_by_id("0001:0:all")  # Cache hit

info = loader.get_cache_info()
print(f"Cache size: {info['size']}/{info['max_size']}")
print(f"Hit rate: {info['hit_rate']:.1%}")

Orphan Detection

registry = ArtifactRegistry(workspace, dataset)

orphans = registry.find_orphaned_artifacts()
print(f"Orphaned files: {len(orphans)}")

stats = registry.get_stats()
print(f"Files on disk: {stats['files_on_disk']}")
print(f"Deduplication: {stats['deduplication_ratio']:.1%}")

CLI Implementation

The artifacts CLI commands are in nirs4all/cli/commands/artifacts.py:

import click
from pathlib import Path
from nirs4all.pipeline.storage.artifacts import ArtifactRegistry

@click.group(name="artifacts")
def artifacts_cli():
    """Manage pipeline artifacts."""
    # Group entry point only; the subcommands attach themselves to it.

@artifacts_cli.command()
@click.option("--dataset", required=True, help="Dataset name")
@click.option("--workspace", default="workspace", help="Workspace path")
def stats(dataset: str, workspace: str):
    """Show artifact storage statistics."""
    # The local is named `summary` so it does not shadow this command's name.
    workspace_path = Path(workspace)
    summary = ArtifactRegistry(workspace_path, dataset).get_stats()

    click.echo(f"Files on disk:     {summary['files_on_disk']}")
    click.echo(f"Registered refs:   {summary['registered_refs']}")
    click.echo(f"Deduplication:     {summary['deduplication_ratio']:.1%}")

@artifacts_cli.command(name="cleanup")
@click.option("--dataset", required=True)
@click.option("--workspace", default="workspace")
@click.option("--force", is_flag=True, help="Actually delete files")
def cleanup(dataset: str, workspace: str, force: bool):
    """Delete orphaned artifacts."""
    registry = ArtifactRegistry(Path(workspace), dataset)
    # Without --force the registry is asked for a dry run.
    removed, freed_bytes = registry.delete_orphaned_artifacts(dry_run=not force)
    file_count = len(removed)

    if not force:
        click.echo(f"Would delete {file_count} files")
        click.echo("Run with --force to delete")
        return

    freed_kb = freed_bytes / 1024
    click.echo(f"Deleted {file_count} files, freed {freed_kb:.1f} KB")

ArtifactRecord Reference

@dataclass
class ArtifactRecord:
    """Complete artifact metadata.

    Only ``artifact_id``, ``content_hash``, ``path`` and ``pipeline_id`` are
    required; every other field has a default. The property bodies are
    elided (``...``) in this reference listing.
    """

    # Identification (required)
    artifact_id: str
    content_hash: str

    # Location (required)
    path: str

    # Context (pipeline_id is required; the rest default)
    pipeline_id: str
    # List and dict defaults use default_factory, so each instance gets its
    # own fresh container.
    branch_path: List[int] = field(default_factory=list)
    step_index: int = 0
    fold_id: Optional[int] = None

    # Classification
    artifact_type: ArtifactType = ArtifactType.MODEL
    class_name: str = ""

    # Dependencies
    depends_on: List[str] = field(default_factory=list)

    # Meta-model config (for stacking)
    meta_config: Optional[MetaModelConfig] = None

    # Serialization
    format: str = "joblib"
    format_version: str = ""
    nirs4all_version: str = ""

    # Metadata
    size_bytes: int = 0
    created_at: str = ""
    params: Dict[str, Any] = field(default_factory=dict)

    # Computed properties (read-only; implementations not shown here)
    @property
    def is_branch_specific(self) -> bool: ...
    @property
    def is_fold_specific(self) -> bool: ...
    @property
    def is_meta_model(self) -> bool: ...
    @property
    def short_hash(self) -> str: ...

See Also