nirs4all.pipeline.storage.artifacts.artifact_registry module

Artifact Registry V3 - Central registry for artifact management.

This module provides the ArtifactRegistry class which serves as the central hub for artifact operations in the V3 artifacts system:

  • Chain-based artifact identification for complete execution path tracking

  • Content-addressed storage with global deduplication

  • Dependency graph tracking for stacking/transfer

  • Cleanup utilities for orphan detection and deletion

V3 Key Changes:

  • Uses OperatorChain for artifact identification instead of V2 ID format

  • Chain hash-based artifact IDs for deterministic identification

  • Unified handling of branching, multi-source, stacking, and bundles

The registry works with centralized storage at workspace/binaries/<dataset>/ and coordinates with ManifestManager for manifest updates.
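To illustrate content-addressed storage with deduplication, the sketch below names each stored blob after a hash of its bytes, so identical content is written only once. The `store_blob` helper, the SHA-256 digest, and the `.bin` suffix are assumptions made for this example; they are not taken from the registry's actual on-disk layout.

```python
import hashlib
import tempfile
from pathlib import Path


def store_blob(binaries_dir: Path, payload: bytes) -> Path:
    """Write payload under a name derived from its content hash (hypothetical helper)."""
    digest = hashlib.sha256(payload).hexdigest()
    target = binaries_dir / f"{digest}.bin"
    if not target.exists():
        # Only the first occurrence of a given content is written to disk.
        binaries_dir.mkdir(parents=True, exist_ok=True)
        target.write_bytes(payload)
    return target


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "binaries" / "demo_dataset"
    first = store_blob(root, b"fitted-scaler-bytes")
    second = store_blob(root, b"fitted-scaler-bytes")  # same bytes, same file
    stored_once = first == second and len(list(root.iterdir())) == 1
```

Because the filename depends only on the content, two pipelines that fit an identical transformer would share one file under this scheme.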

class nirs4all.pipeline.storage.artifacts.artifact_registry.ArtifactRegistry(workspace: Path, dataset: str, manifest_manager: Any | None = None, pipeline_id: str = '')[source]

Bases: object

Central registry for artifact management (V3).

Provides:

  • Chain-based ID generation for complete execution path tracking

  • Content-addressed storage with deduplication

  • Dependency graph for stacking/transfer

  • Cleanup utilities

V3 Key Changes:

  • Uses OperatorChain for artifact identification

  • Chain hash-based artifact IDs for deterministic identification

  • Chain path stored in ArtifactRecord for complete traceability

  • Lookup by chain path for prediction replay

The registry coordinates between:

  • Centralized binaries at workspace/binaries/<dataset>/

  • Per-run manifests with artifact references

  • Dependency tracking for complex pipelines

workspace

Workspace root path

dataset

Current dataset name

binaries_dir

Path to centralized binaries

dependency_graph

Dependency tracking graph

pipeline_id

Current pipeline identifier for chain generation

cleanup_failed_run() int[source]

Clean up artifacts from a failed run.

Deletes artifacts registered during the current run. Called automatically on exception.

Returns:

Number of artifacts cleaned up
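
The run-tracking methods (start_run(), end_run(), cleanup_failed_run()) bracket a pipeline execution. The sketch below shows one way such bracketing might look. `tracked_run` is a hypothetical helper, and `FakeRegistry` is a stand-in that exposes only the three documented method names so the example runs without a workspace. The source states that cleanup is triggered automatically on exception, so real code may not need a wrapper like this.

```python
from contextlib import contextmanager


class FakeRegistry:
    """Stand-in with the three documented run-tracking methods (not the real class)."""

    def __init__(self):
        self.calls = []

    def start_run(self):
        self.calls.append("start_run")

    def end_run(self):
        self.calls.append("end_run")

    def cleanup_failed_run(self):
        self.calls.append("cleanup_failed_run")
        return 0  # number of artifacts cleaned up


@contextmanager
def tracked_run(registry):
    """Hypothetical wrapper: clean up on failure, end the run on success."""
    registry.start_run()
    try:
        yield registry
    except Exception:
        registry.cleanup_failed_run()
        raise  # the original error still reaches the caller
    else:
        registry.end_run()
```

Used as `with tracked_run(registry): ...`, the wrapper calls end_run() only when the body completes without raising.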

delete_orphaned_artifacts(dry_run: bool = True, scan_all_manifests: bool = True) Tuple[List[str], int][source]

Delete artifacts not referenced by any manifest.

Parameters:
  • dry_run – If True, only report what would be deleted

  • scan_all_manifests – If True, scan all manifests before deletion

Returns:

Tuple of (deleted_files, bytes_freed)
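
The dry-run behaviour described above can be sketched independently of the registry. The `remove_files` helper below is hypothetical: it takes a list of filenames, reports them together with their total size, and deletes them only when `dry_run` is False. How the real method decides which files are orphaned is not shown here.

```python
import tempfile
from pathlib import Path
from typing import List, Tuple


def remove_files(directory: Path, names: List[str], dry_run: bool = True) -> Tuple[List[str], int]:
    """Report (and optionally delete) the named files; returns (names, bytes)."""
    handled: List[str] = []
    total_bytes = 0
    for name in names:
        path = directory / name
        if not path.is_file():
            continue  # silently skip names that are not present on disk
        total_bytes += path.stat().st_size
        handled.append(name)
        if not dry_run:
            path.unlink()
    return handled, total_bytes
```

Defaulting `dry_run` to True mirrors the documented signature: a caller has to opt in before anything is removed.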

delete_pipeline_artifacts(pipeline_id: str, delete_files: bool = False) int[source]

Delete all artifacts for a specific pipeline.

Parameters:
  • pipeline_id – Pipeline to delete artifacts for

  • delete_files – If True, also delete the binary files from disk

Returns:

Number of artifacts deleted

end_run() None[source]

End run tracking (successful completion).

export_to_manifest() Dict[str, Any][source]

Export registry to manifest V3 format.

Returns:

Dictionary suitable for manifest artifacts section

find_orphaned_artifacts(scan_all_manifests: bool = True) List[str][source]

Find artifact files not referenced by any manifest.

Scans binaries directory and compares with all referenced artifacts from manifests in the workspace.

Parameters:

scan_all_manifests – If True, scan all manifests in workspace/runs/. If False, only check against in-memory registry.

Returns:

List of orphaned filenames
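
Conceptually, orphan detection is a set difference between the files on disk and the filenames referenced by manifests. The sketch below assumes the referenced names have already been collected into a set. `find_orphans` and the `.pkl` filenames are illustrative only.

```python
import tempfile
from pathlib import Path
from typing import List, Set


def find_orphans(binaries_dir: Path, referenced: Set[str]) -> List[str]:
    """Return filenames present in binaries_dir but absent from `referenced`."""
    on_disk = {p.name for p in binaries_dir.iterdir() if p.is_file()}
    return sorted(on_disk - referenced)


with tempfile.TemporaryDirectory() as tmp:
    binaries = Path(tmp)
    for name in ("a1.pkl", "b2.pkl", "c3.pkl"):
        (binaries / name).write_bytes(b"x")
    # Only a1 and c3 are referenced, so b2 is reported as orphaned.
    orphans = find_orphans(binaries, referenced={"a1.pkl", "c3.pkl"})
```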

generate_id(chain: OperatorChain | str, fold_id: int | None = None, pipeline_id: str | None = None) str[source]

Generate deterministic V3 artifact ID from operator chain.

V3 Format: {pipeline_id}${chain_hash}:{fold_id}

Parameters:
  • chain – OperatorChain or chain path string

  • fold_id – CV fold index (None for artifacts shared across folds; rendered as all in the ID)

  • pipeline_id – Pipeline identifier (uses self.pipeline_id if None)

Returns:

V3 Artifact ID string
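
The V3 format can be sketched as a small pure function. In this sketch a truncated SHA-256 of the chain path string stands in for the chain hash. That choice, the 12-character length, and the name `sketch_artifact_id` are assumptions for illustration; the hash actually used by OperatorChain is not specified here.

```python
import hashlib
from typing import Optional


def sketch_artifact_id(pipeline_id: str, chain_path: str, fold_id: Optional[int] = None) -> str:
    """Compose '{pipeline_id}${chain_hash}:{fold_id}' from a chain path string."""
    # Assumption: truncated SHA-256 of the chain path plays the role of the chain hash.
    chain_hash = hashlib.sha256(chain_path.encode("utf-8")).hexdigest()[:12]
    # Fold-independent artifacts are labelled 'all', as in the documented examples.
    fold_part = "all" if fold_id is None else str(fold_id)
    return f"{pipeline_id}${chain_hash}:{fold_part}"
```

Because the ID depends only on its inputs, the same pipeline, chain, and fold always map to the same ID, which is what makes lookup by chain path possible during prediction replay.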

Examples

>>> registry.generate_id(chain, fold_id=0)
'0001_pls$a1b2c3d4e5f6:0'
>>> registry.generate_id("s1.MinMaxScaler>s3.PLS", fold_id=None)
'0001_pls$7f8e9d0c1b2a:all'

get_all_records() List[ArtifactRecord][source]

Get all registered artifacts.

Returns:

List of all ArtifactRecords

get_artifacts_for_step(pipeline_id: str, step_index: int, branch_path: List[int] | None = None, fold_id: int | None = None) List[ArtifactRecord][source]

Get all artifacts for a specific step context.

Parameters:
  • pipeline_id – Pipeline to query

  • step_index – Step number

  • branch_path – Optional branch filter

  • fold_id – Optional fold filter

Returns:

List of matching ArtifactRecords

get_by_chain(chain: OperatorChain | str, fold_id: int | None = None) ArtifactRecord | None[source]

Get artifact by exact chain path match.

Parameters:
  • chain – OperatorChain or chain path string

  • fold_id – Optional fold ID to filter (None = any fold)

Returns:

ArtifactRecord or None if not found

get_chain_prefix(prefix: str, branch_path: List[int] | None = None, source_index: int | None = None) List[ArtifactRecord][source]

Get all artifacts whose chain path starts with the given prefix.

Useful for finding all artifacts in a chain for prediction replay.

Parameters:
  • prefix – Chain path prefix to match

  • branch_path – Optional branch path filter

  • source_index – Optional source index filter

Returns:

List of matching ArtifactRecords
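
Prefix lookup amounts to a string-prefix filter over stored chain paths, optionally narrowed by other fields. In the sketch below, `Record` is a minimal stand-in for ArtifactRecord: its field names and the `by_chain_prefix` helper are assumptions, and the branch filter is omitted for brevity.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Record:
    """Minimal stand-in for ArtifactRecord; field names are assumptions."""
    artifact_id: str
    chain_path: str
    source_index: Optional[int] = None


def by_chain_prefix(records: List[Record], prefix: str,
                    source_index: Optional[int] = None) -> List[Record]:
    """Keep records whose chain path starts with `prefix` (and match the source, if given)."""
    return [
        r for r in records
        if r.chain_path.startswith(prefix)
        and (source_index is None or r.source_index == source_index)
    ]


records = [
    Record("a", "s1.MinMaxScaler"),
    Record("b", "s1.MinMaxScaler>s3.PLS"),
    Record("c", "s1.StandardScaler>s3.PLS"),
]
replay = by_chain_prefix(records, "s1.MinMaxScaler")
```

Here `replay` holds the scaler and the PLS model that follows it, which is the set of artifacts a prediction replay of that chain would need.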

get_dependencies(artifact_id: str) List[str][source]

Get direct dependencies of an artifact.

Parameters:

artifact_id – Artifact to query

Returns:

List of artifact IDs

get_fold_models(pipeline_id: str, step_index: int, branch_path: List[int] | None = None) List[ArtifactRecord][source]

Get all fold-specific model artifacts for CV averaging.

Parameters:
  • pipeline_id – Pipeline to query

  • step_index – Model step number

  • branch_path – Optional branch filter

Returns:

List of per-fold model ArtifactRecords
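
Once the per-fold models are loaded, CV averaging combines their predictions. The sketch below assumes a plain unweighted mean and represents each model as a callable returning a list of floats. Both are simplifications chosen for the example, and `average_fold_predictions` is a hypothetical name.

```python
from statistics import fmean
from typing import Callable, List, Sequence

Model = Callable[[Sequence[float]], List[float]]


def average_fold_predictions(fold_models: Sequence[Model], X: Sequence[float]) -> List[float]:
    """Average the predictions of several fold models, sample by sample."""
    per_fold = [model(X) for model in fold_models]
    # zip(*per_fold) groups the predictions made for the same sample across folds.
    return [fmean(values) for values in zip(*per_fold)]


fold_models = [
    lambda X: [2.0 * x for x in X],  # toy "model" from fold 0
    lambda X: [4.0 * x for x in X],  # toy "model" from fold 1
]
averaged = average_fold_predictions(fold_models, [1.0, 2.0])
```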

get_stats(scan_all_manifests: bool = True) Dict[str, Any][source]

Get storage statistics.

Parameters:

scan_all_manifests – If True, scan all manifests for accurate stats

Returns:

Dictionary with storage stats, including:

  • total_artifacts: Number of registered artifacts

  • unique_files: Number of unique binary files

  • total_size_bytes: Total size of all artifacts

  • deduplication_ratio: Ratio of saved space from deduplication

  • by_type: Count of artifacts by type

  • orphaned_count: Number of orphaned files

  • disk_usage_bytes: Actual disk usage in binaries directory

import_from_manifest(manifest: Dict[str, Any], results_dir: Path) None[source]

Import artifact records from a manifest.

Loads V3 format manifests into the registry, building all indexes including the chain_path index for V3 lookups.

Parameters:
  • manifest – Manifest dictionary

  • results_dir – Path to results directory

load_artifact(record: ArtifactRecord) Any[source]

Load artifact binary from disk.

Parameters:

record – ArtifactRecord with path and format

Returns:

Deserialized object

Raises:

FileNotFoundError – If artifact file doesn’t exist

purge_dataset_artifacts(confirm: bool = False) Tuple[int, int][source]

Delete ALL artifacts for this dataset.

This is a destructive operation that removes all artifacts in the binaries directory for this dataset, regardless of manifest references.

Parameters:

confirm – Must be True to actually delete files

Returns:

Tuple of (files_deleted, bytes_freed)

Raises:

ValueError – If confirm is False

register(obj: Any, artifact_id: str, artifact_type: ArtifactType, depends_on: List[str] | None = None, params: Dict[str, Any] | None = None, meta_config: MetaModelConfig | None = None, format_hint: str | None = None, custom_name: str | None = None, chain_path: str = '', source_index: int | None = None) ArtifactRecord[source]

Register and persist an artifact.

Serializes the object, stores in centralized binaries (with deduplication), and creates an ArtifactRecord.

Note: This method accepts pre-generated artifact IDs for backward compatibility. For new code, use register_with_chain() which generates IDs from OperatorChain.

Parameters:
  • obj – Object to persist (model, transformer, etc.)

  • artifact_id – Pre-generated artifact ID (V3 format: pipeline$hash:fold)

  • artifact_type – Classification (model, transformer, etc.)

  • depends_on – List of artifact IDs this depends on

  • params – Model parameters for inspection

  • meta_config – Meta-model configuration (for stacking)

  • format_hint – Optional serialization format hint

  • custom_name – User-defined name for the artifact (e.g., “Q5_PLS_10”)

  • chain_path – V3 operator chain path (required for full traceability)

  • source_index – Multi-source index (None for single source)

Returns:

ArtifactRecord with full metadata

Raises:

ValueError – If object cannot be serialized or if meta-model dependencies are missing

register_meta_model(obj: Any, artifact_id: str, source_model_ids: List[str], feature_columns: List[str] | None = None, params: Dict[str, Any] | None = None, format_hint: str | None = None) ArtifactRecord[source]

Register a stacking meta-model with source model references.

Convenience method for registering meta-models that automatically:

  • Creates the MetaModelConfig with ordered source model references

  • Sets up dependency tracking to source models

  • Validates that all source models exist

Parameters:
  • obj – The meta-model object to persist

  • artifact_id – Pre-generated artifact ID for the meta-model

  • source_model_ids – Ordered list of source model artifact IDs

  • feature_columns – Optional feature column names matching source order

  • params – Optional meta-model parameters

  • format_hint – Optional serialization format hint

Returns:

ArtifactRecord for the registered meta-model

Raises:

ValueError – If any source model is not found in the registry

Example

>>> meta_config_record = registry.register_meta_model(
...     obj=ridge_meta_model,
...     artifact_id="0001:5:all",
...     source_model_ids=["0001:3:all", "0001:4:all"],
...     feature_columns=["PLSRegression_pred", "RandomForestRegressor_pred"]
... )

register_with_chain(obj: Any, chain: OperatorChain | str, artifact_type: ArtifactType, step_index: int, branch_path: List[int] | None = None, source_index: int | None = None, fold_id: int | None = None, substep_index: int | None = None, depends_on: List[str] | None = None, params: Dict[str, Any] | None = None, meta_config: MetaModelConfig | None = None, format_hint: str | None = None, custom_name: str | None = None, pipeline_id: str | None = None) ArtifactRecord[source]

Register and persist an artifact using V3 chain-based identification.

This is the primary registration method for V3. It generates a deterministic artifact ID from the operator chain and stores the chain path for later lookup.

Parameters:
  • obj – Object to persist (model, transformer, etc.)

  • chain – OperatorChain or chain path string

  • artifact_type – Classification (model, transformer, etc.)

  • step_index – Pipeline step index (1-based)

  • branch_path – List of branch indices (empty for non-branching)

  • source_index – Multi-source index (None for single source)

  • fold_id – CV fold (None for shared artifacts)

  • substep_index – Substep index when a single step holds several operators, e.g. [model1, model2]

  • depends_on – List of artifact IDs this depends on

  • params – Model parameters for inspection

  • meta_config – Meta-model configuration (for stacking)

  • format_hint – Optional serialization format hint

  • custom_name – User-defined name for the artifact

Returns:

ArtifactRecord with full metadata

Raises:

ValueError – If object cannot be serialized or if meta-model dependencies are missing

resolve(artifact_id: str) ArtifactRecord | None[source]

Resolve artifact ID to record.

Parameters:

artifact_id – Artifact ID to resolve

Returns:

ArtifactRecord or None if not found

resolve_by_hash(content_hash: str) ArtifactRecord | None[source]

Resolve content hash to artifact record.

Parameters:

content_hash – Content hash to look up

Returns:

ArtifactRecord or None if not found

resolve_dependencies(artifact_id: str) List[ArtifactRecord][source]

Get all transitive dependencies as records.

Parameters:

artifact_id – Starting artifact

Returns:

List of ArtifactRecords in topological order

start_run() None[source]

Start tracking a new run for cleanup purposes.

class nirs4all.pipeline.storage.artifacts.artifact_registry.DependencyGraph[source]

Bases: object

Tracks artifact dependencies for stacking and transfer.

Maintains a directed graph where edges represent “depends on” relationships. Supports transitive dependency resolution with cycle detection.
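
A directed "depends on" graph that answers both get_dependencies() and get_dependents() is often kept as two adjacency maps, one per direction. The sketch below is illustrative only: `MiniDependencyGraph` borrows the documented method names but is not the library's implementation, and returning sorted lists is a choice made here for reproducible output.

```python
from collections import defaultdict
from typing import DefaultDict, List, Set


class MiniDependencyGraph:
    """Illustrative two-way adjacency map; not the library's DependencyGraph."""

    def __init__(self) -> None:
        # artifact -> artifacts it depends on
        self._forward: DefaultDict[str, Set[str]] = defaultdict(set)
        # artifact -> artifacts that depend on it
        self._reverse: DefaultDict[str, Set[str]] = defaultdict(set)

    def add_dependency(self, artifact_id: str, depends_on: str) -> None:
        self._forward[artifact_id].add(depends_on)
        self._reverse[depends_on].add(artifact_id)

    def get_dependencies(self, artifact_id: str) -> List[str]:
        return sorted(self._forward.get(artifact_id, ()))

    def get_dependents(self, artifact_id: str) -> List[str]:
        return sorted(self._reverse.get(artifact_id, ()))

    def remove_artifact(self, artifact_id: str) -> None:
        # Drop outgoing edges, then incoming edges, keeping both maps in sync.
        for dep in self._forward.pop(artifact_id, set()):
            self._reverse[dep].discard(artifact_id)
        for dependent in self._reverse.pop(artifact_id, set()):
            self._forward[dependent].discard(artifact_id)
```

Keeping the reverse map makes get_dependents() a direct lookup instead of a scan over every edge, at the cost of updating two structures on each change.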

add_dependencies(artifact_id: str, depends_on: List[str]) None[source]

Add multiple dependencies at once.

Parameters:
  • artifact_id – The dependent artifact

  • depends_on – List of artifacts being depended upon

add_dependency(artifact_id: str, depends_on: str) None[source]

Add a dependency relationship.

Parameters:
  • artifact_id – The dependent artifact

  • depends_on – The artifact being depended upon

clear() None[source]

Clear all dependencies.

get_dependencies(artifact_id: str) List[str][source]

Get direct dependencies of an artifact.

Parameters:

artifact_id – Artifact to query

Returns:

List of artifact IDs this artifact depends on

get_dependents(artifact_id: str) List[str][source]

Get artifacts that directly depend on this artifact.

Parameters:

artifact_id – Artifact to query

Returns:

List of artifact IDs that depend on this artifact

remove_artifact(artifact_id: str) None[source]

Remove an artifact and its edges from the graph.

Parameters:

artifact_id – Artifact to remove

resolve_dependencies(artifact_id: str, max_depth: int = 100) List[str][source]

Get all transitive dependencies (topologically sorted).

Returns dependencies in an order suitable for loading: dependencies come before the artifacts that depend on them.

Parameters:
  • artifact_id – Starting artifact

  • max_depth – Maximum recursion depth (guards against runaway recursion if the graph contains a cycle)

Returns:

List of all dependencies in topological order

Raises:

ValueError – If cycle detected or max depth exceeded
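
Transitive resolution with cycle detection can be sketched as a depth-first traversal that emits each node after its own dependencies. The function below works on a plain dict of edges and is a hypothetical illustration of the behaviour described above, not the library's code. Excluding the starting artifact from the result is an assumption.

```python
from typing import Dict, List, Set


def resolve_order(graph: Dict[str, List[str]], start: str, max_depth: int = 100) -> List[str]:
    """Return all transitive dependencies of `start`, dependencies before dependents."""
    ordered: List[str] = []
    done: Set[str] = set()
    in_progress: Set[str] = set()  # nodes on the current DFS path

    def visit(node: str, depth: int) -> None:
        if depth > max_depth:
            raise ValueError(f"max depth {max_depth} exceeded at {node!r}")
        if node in in_progress:
            raise ValueError(f"dependency cycle detected at {node!r}")
        if node in done:
            return
        in_progress.add(node)
        for dep in graph.get(node, []):
            visit(dep, depth + 1)
        in_progress.discard(node)
        done.add(node)
        if node != start:  # assumption: the starting artifact itself is not listed
            ordered.append(node)

    visit(start, 0)
    return ordered


stacking = {"meta": ["pls", "rf"], "pls": ["scaler"], "rf": ["scaler"]}
load_order = resolve_order(stacking, "meta")
```

For the stacking example, the shared scaler appears once and ahead of both base models, so loading artifacts in `load_order` never touches a model before its preprocessing.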