nirs4all.pipeline.storage.artifacts.artifact_registry module
Artifact Registry V3 - Central registry for artifact management.
This module provides the ArtifactRegistry class which serves as the central hub for artifact operations in the V3 artifacts system:
Chain-based artifact identification for complete execution path tracking
Content-addressed storage with global deduplication
Dependency graph tracking for stacking/transfer
Cleanup utilities for orphan detection and deletion
V3 Key Changes:
Uses OperatorChain for artifact identification instead of the V2 ID format
Chain hash-based artifact IDs for deterministic identification
Unified handling of branching, multi-source, stacking, and bundles
The registry works with centralized storage at workspace/binaries/<dataset>/ and coordinates with ManifestManager for manifest updates.
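To make "content-addressed storage with global deduplication" concrete, here is a minimal sketch that does not use nirs4all itself. It assumes SHA-256 as the content hash and a flat directory layout; the function name `store_blob`, the `.bin` suffix, and the `demo_dataset` folder are hypothetical and not taken from the library.

```python
import hashlib
import tempfile
from pathlib import Path


def store_blob(binaries_dir: Path, payload: bytes, suffix: str = ".bin") -> Path:
    """Write payload under a file name derived from its content hash.

    Identical payloads map to the same path, so storing them twice
    leaves a single file on disk (hypothetical helper, SHA-256 assumed).
    """
    digest = hashlib.sha256(payload).hexdigest()
    target = binaries_dir / f"{digest}{suffix}"
    if not target.exists():
        binaries_dir.mkdir(parents=True, exist_ok=True)
        target.write_bytes(payload)
    return target


with tempfile.TemporaryDirectory() as tmp:
    binaries = Path(tmp) / "binaries" / "demo_dataset"
    first = store_blob(binaries, b"fitted scaler bytes")
    second = store_blob(binaries, b"fitted scaler bytes")  # same content
    third = store_blob(binaries, b"fitted model bytes")    # different content
    same_path = first == second
    file_count = len(list(binaries.iterdir()))
```

Because the file name depends only on the bytes, two pipelines that produce the same fitted object would share one binary, which is the property the registry description relies on.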
- class nirs4all.pipeline.storage.artifacts.artifact_registry.ArtifactRegistry(workspace: Path, dataset: str, manifest_manager: Any | None = None, pipeline_id: str = '')
Bases: object
Central registry for artifact management (V3).
Provides:
Chain-based ID generation for complete execution path tracking
Content-addressed storage with deduplication
Dependency graph for stacking/transfer
Cleanup utilities
V3 Key Changes:
Uses OperatorChain for artifact identification
Chain hash-based artifact IDs for deterministic identification
Chain path stored in ArtifactRecord for complete traceability
Lookup by chain path for prediction replay
The registry coordinates between:
Centralized binaries at workspace/binaries/<dataset>/
Per-run manifests with artifact references
Dependency tracking for complex pipelines
- workspace
Workspace root path
- dataset
Current dataset name
- binaries_dir
Path to centralized binaries
- dependency_graph
Dependency tracking graph
- pipeline_id
Current pipeline identifier for chain generation
- cleanup_failed_run() -> int
Clean up artifacts from a failed run.
Deletes artifacts registered during the current run. Called automatically on exception.
- Returns:
Number of artifacts cleaned up
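The "called automatically on exception" behaviour can be pictured as a try/except wrapper around the run. The sketch below is an assumption about the general pattern, not the library's actual control flow; `RunTracker`, `run_steps`, and the two step functions are hypothetical stand-ins.

```python
from typing import Callable, List


class RunTracker:
    """Hypothetical stand-in that remembers what the current run registered."""

    def __init__(self) -> None:
        self.current_run: List[str] = []
        self.last_cleanup_count = 0

    def register(self, artifact_id: str) -> None:
        self.current_run.append(artifact_id)

    def cleanup_failed_run(self) -> int:
        # Forget everything registered during the failed run.
        self.last_cleanup_count = len(self.current_run)
        self.current_run.clear()
        return self.last_cleanup_count


def run_steps(tracker: RunTracker, steps: List[Callable[[RunTracker], None]]) -> None:
    """Run each step; on any exception, clean up and re-raise."""
    try:
        for step in steps:
            step(tracker)
    except Exception:
        tracker.cleanup_failed_run()
        raise


def ok_step(tracker: RunTracker) -> None:
    tracker.register("0001_pls$a1b2c3d4e5f6:0")


def failing_step(tracker: RunTracker) -> None:
    raise RuntimeError("fit failed")


tracker = RunTracker()
try:
    run_steps(tracker, [ok_step, failing_step])
except RuntimeError:
    pass  # the caller still sees the original error
```

Re-raising after cleanup keeps the original failure visible while ensuring no half-finished run leaves artifacts behind.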
- delete_orphaned_artifacts(dry_run: bool = True, scan_all_manifests: bool = True) -> Tuple[List[str], int]
Delete artifacts not referenced by any manifest.
- Parameters:
dry_run – If True, only report what would be deleted
scan_all_manifests – If True, scan all manifests before deletion
- Returns:
Tuple of (deleted_files, bytes_freed)
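The dry-run contract (report first, delete only when asked) can be sketched independently of the registry. The helper `delete_files` below is hypothetical; it only mirrors the documented return shape of (deleted_files, bytes_freed).

```python
import tempfile
from pathlib import Path
from typing import Iterable, List, Tuple


def delete_files(paths: Iterable[Path], dry_run: bool = True) -> Tuple[List[str], int]:
    """Return (file names, bytes) that are, or would be, removed."""
    deleted: List[str] = []
    bytes_freed = 0
    for path in paths:
        if not path.is_file():
            continue
        bytes_freed += path.stat().st_size
        deleted.append(path.name)
        if not dry_run:
            path.unlink()  # only touch the disk when dry_run is False
    return deleted, bytes_freed


with tempfile.TemporaryDirectory() as tmp:
    orphan = Path(tmp) / "orphan.bin"
    orphan.write_bytes(b"12345")
    preview = delete_files([orphan], dry_run=True)
    still_there = orphan.exists()
    actual = delete_files([orphan], dry_run=False)
    gone = not orphan.exists()
```

Running the preview first and comparing it with expectations before passing `dry_run=False` is the usage the default value suggests.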
- delete_pipeline_artifacts(pipeline_id: str, delete_files: bool = False) -> int
Delete all artifacts for a specific pipeline.
- Parameters:
pipeline_id – Pipeline to delete artifacts for
delete_files – If True, also delete the binary files from disk
- Returns:
Number of artifacts deleted
- export_to_manifest() -> Dict[str, Any]
Export registry to manifest V3 format.
- Returns:
Dictionary suitable for manifest artifacts section
- find_orphaned_artifacts(scan_all_manifests: bool = True) -> List[str]
Find artifact files not referenced by any manifest.
Scans binaries directory and compares with all referenced artifacts from manifests in the workspace.
- Parameters:
scan_all_manifests – If True, scan all manifests in workspace/runs/. If False, only check against in-memory registry.
- Returns:
List of orphaned filenames
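Orphan detection as described reduces to a set difference between the files on disk and the file names referenced by manifests. The sketch below assumes the references are already collected into a list; `find_orphans` and the `.joblib` file names are hypothetical.

```python
import tempfile
from pathlib import Path
from typing import Iterable, List


def find_orphans(binaries_dir: Path, referenced: Iterable[str]) -> List[str]:
    """List files in binaries_dir whose names no manifest references."""
    referenced_names = set(referenced)
    on_disk = {p.name for p in binaries_dir.iterdir() if p.is_file()}
    return sorted(on_disk - referenced_names)


with tempfile.TemporaryDirectory() as tmp:
    binaries = Path(tmp)
    for name in ("aaa.joblib", "bbb.joblib", "ccc.joblib"):
        (binaries / name).write_bytes(b"x")
    manifest_refs = ["aaa.joblib", "ccc.joblib"]  # hypothetical manifest contents
    orphans = find_orphans(binaries, manifest_refs)
```

With `scan_all_manifests=False` the referenced set would come only from the in-memory registry, so files belonging to other runs could be reported as orphans; that is presumably why scanning all manifests is the default.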
- generate_id(chain: OperatorChain | str, fold_id: int | None = None, pipeline_id: str | None = None) -> str
Generate deterministic V3 artifact ID from operator chain.
V3 Format: {pipeline_id}${chain_hash}:{fold_id}
- Parameters:
chain – OperatorChain or chain path string
fold_id – CV fold index (None for artifacts shared across folds; rendered as 'all' in the ID)
pipeline_id – Pipeline identifier (uses self.pipeline_id if None)
- Returns:
V3 Artifact ID string
Examples
>>> registry.generate_id(chain, fold_id=0)
'0001_pls$a1b2c3d4e5f6:0'
>>> registry.generate_id("s1.MinMaxScaler>s3.PLS", fold_id=None)
'0001_pls$7f8e9d0c1b2a:all'
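The ID format can be reproduced with a few lines of standard-library code. This is only a sketch: the hash function (SHA-256) and the 12-character truncation are assumptions inferred from the length of the hashes in the examples, and `make_artifact_id` is a hypothetical name, so the hashes it produces will not match those of the real registry.

```python
import hashlib
from typing import Optional


def make_artifact_id(pipeline_id: str, chain_path: str, fold_id: Optional[int] = None) -> str:
    """Build '{pipeline_id}${chain_hash}:{fold_id}' from a chain path string."""
    # Assumption: SHA-256 truncated to 12 hex characters.
    chain_hash = hashlib.sha256(chain_path.encode("utf-8")).hexdigest()[:12]
    fold_part = "all" if fold_id is None else str(fold_id)
    return f"{pipeline_id}${chain_hash}:{fold_part}"


shared_id = make_artifact_id("0001_pls", "s1.MinMaxScaler>s3.PLS")
fold0_id = make_artifact_id("0001_pls", "s1.MinMaxScaler>s3.PLS", fold_id=0)
repeat_id = make_artifact_id("0001_pls", "s1.MinMaxScaler>s3.PLS")
```

Since the ID is a pure function of the pipeline identifier, chain path, and fold, the same execution path always yields the same ID, which is what "deterministic" means here.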
- get_all_records() -> List[ArtifactRecord]
Get all registered artifacts.
- Returns:
List of all ArtifactRecords
- get_artifacts_for_step(pipeline_id: str, step_index: int, branch_path: List[int] | None = None, fold_id: int | None = None) -> List[ArtifactRecord]
Get all artifacts for a specific step context.
- Parameters:
pipeline_id – Pipeline to query
step_index – Step number
branch_path – Optional branch filter
fold_id – Optional fold filter
- Returns:
List of matching ArtifactRecords
- get_by_chain(chain: OperatorChain | str, fold_id: int | None = None) -> ArtifactRecord | None
Get artifact by exact chain path match.
- Parameters:
chain – OperatorChain or chain path string
fold_id – Optional fold ID to filter (None = any fold)
- Returns:
ArtifactRecord or None if not found
- get_chain_prefix(prefix: str, branch_path: List[int] | None = None, source_index: int | None = None) -> List[ArtifactRecord]
Get all artifacts whose chain path starts with the given prefix.
Useful for finding all artifacts in a chain for prediction replay.
- Parameters:
prefix – Chain path prefix to match
branch_path – Optional branch path filter
source_index – Optional source index filter
- Returns:
List of matching ArtifactRecords
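Exact-match and prefix lookups over chain paths can be illustrated with plain string comparison. The `Record` dataclass is a hypothetical, trimmed-down stand-in for ArtifactRecord, and the chain path syntax follows the "s1.MinMaxScaler>s3.PLS" form shown in the generate_id example; the real registry may index these differently.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Record:
    """Hypothetical, trimmed-down stand-in for ArtifactRecord."""
    artifact_id: str
    chain_path: str
    fold_id: Optional[int] = None


def by_chain(records: List[Record], chain_path: str, fold_id: Optional[int] = None) -> Optional[Record]:
    """Return the first record with this exact chain path (any fold if fold_id is None)."""
    for record in records:
        if record.chain_path == chain_path and (fold_id is None or record.fold_id == fold_id):
            return record
    return None


def by_prefix(records: List[Record], prefix: str) -> List[Record]:
    """Return every record whose chain path starts with prefix."""
    return [r for r in records if r.chain_path.startswith(prefix)]


records = [
    Record("a", "s1.MinMaxScaler"),
    Record("b", "s1.MinMaxScaler>s3.PLS", fold_id=0),
    Record("c", "s1.MinMaxScaler>s3.PLS", fold_id=1),
    Record("d", "s1.StandardScaler>s3.PLS", fold_id=0),
]
replay_ids = [r.artifact_id for r in by_prefix(records, "s1.MinMaxScaler")]
fold1 = by_chain(records, "s1.MinMaxScaler>s3.PLS", fold_id=1)
absent = by_chain(records, "s3.PLS")
```

A prefix query collects the scaler and every model trained on top of it, which is the set of artifacts needed to replay a prediction through that branch of the chain.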
- get_dependencies(artifact_id: str) -> List[str]
Get direct dependencies of an artifact.
- Parameters:
artifact_id – Artifact to query
- Returns:
List of artifact IDs
- get_fold_models(pipeline_id: str, step_index: int, branch_path: List[int] | None = None) -> List[ArtifactRecord]
Get all fold-specific model artifacts for CV averaging.
- Parameters:
pipeline_id – Pipeline to query
step_index – Model step number
branch_path – Optional branch filter
- Returns:
List of per-fold model ArtifactRecords
- get_stats(scan_all_manifests: bool = True) -> Dict[str, Any]
Get storage statistics.
- Parameters:
scan_all_manifests – If True, scan all manifests for accurate stats
- Returns:
Dictionary with storage stats including:
total_artifacts: Number of registered artifacts
unique_files: Number of unique binary files
total_size_bytes: Total size of all artifacts
deduplication_ratio: Ratio of saved space from deduplication
by_type: Count of artifacts by type
orphaned_count: Number of orphaned files
disk_usage_bytes: Actual disk usage in binaries directory
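The exact formula behind deduplication_ratio is not given here. One plausible reading, used in the sketch below purely as an assumption, is the fraction of bytes saved: 1 minus (bytes of unique files / bytes of all registered artifacts). The function `dedup_stats` and its (content_hash, size) input are hypothetical.

```python
from typing import Any, Dict, List, Tuple


def dedup_stats(records: List[Tuple[str, int]]) -> Dict[str, Any]:
    """Summarise (content_hash, size_bytes) pairs.

    Assumption: deduplication_ratio = 1 - unique_bytes / total_bytes.
    """
    total_size = sum(size for _, size in records)
    unique_sizes: Dict[str, int] = {}
    for content_hash, size in records:
        unique_sizes[content_hash] = size  # one entry per distinct content
    unique_size = sum(unique_sizes.values())
    ratio = 1 - unique_size / total_size if total_size else 0.0
    return {
        "total_artifacts": len(records),
        "unique_files": len(unique_sizes),
        "total_size_bytes": total_size,
        "deduplication_ratio": ratio,
    }


# Two artifacts share content "h1"; a third is distinct.
stats = dedup_stats([("h1", 100), ("h1", 100), ("h2", 200)])
```

Under this reading, three registered artifacts totalling 400 bytes occupy 300 bytes on disk, so a quarter of the space is saved.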
- import_from_manifest(manifest: Dict[str, Any], results_dir: Path) -> None
Import artifact records from a manifest.
Loads V3 format manifests into the registry, building all indexes including the chain_path index for V3 lookups.
- Parameters:
manifest – Manifest dictionary
results_dir – Path to results directory
- load_artifact(record: ArtifactRecord) -> Any
Load artifact binary from disk.
- Parameters:
record – ArtifactRecord with path and format
- Returns:
Deserialized object
- Raises:
FileNotFoundError – If artifact file doesn’t exist
- purge_dataset_artifacts(confirm: bool = False) -> Tuple[int, int]
Delete ALL artifacts for this dataset.
This is a destructive operation that removes all artifacts in the binaries directory for this dataset, regardless of manifest references.
- Parameters:
confirm – Must be True to actually delete files
- Returns:
Tuple of (files_deleted, bytes_freed)
- Raises:
ValueError – If confirm is False
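The confirm guard is a common way to make a destructive call hard to trigger by accident. A minimal sketch of the pattern follows, with a hypothetical `purge_directory` helper operating on a temporary directory rather than a real workspace.

```python
import tempfile
from pathlib import Path
from typing import Tuple


def purge_directory(directory: Path, confirm: bool = False) -> Tuple[int, int]:
    """Delete every file in directory; refuse unless confirm is True."""
    if not confirm:
        raise ValueError("Refusing to purge: pass confirm=True to delete files")
    files_deleted = 0
    bytes_freed = 0
    for path in directory.iterdir():
        if path.is_file():
            bytes_freed += path.stat().st_size
            path.unlink()
            files_deleted += 1
    return files_deleted, bytes_freed


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp)
    (target / "one.bin").write_bytes(b"abc")
    (target / "two.bin").write_bytes(b"defg")
    try:
        purge_directory(target)  # confirm defaults to False
        refused = False
    except ValueError:
        refused = True
    result = purge_directory(target, confirm=True)
```

Raising rather than silently returning (0, 0) makes a forgotten `confirm=True` visible to the caller.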
- register(obj: Any, artifact_id: str, artifact_type: ArtifactType, depends_on: List[str] | None = None, params: Dict[str, Any] | None = None, meta_config: MetaModelConfig | None = None, format_hint: str | None = None, custom_name: str | None = None, chain_path: str = '', source_index: int | None = None) -> ArtifactRecord
Register and persist an artifact.
Serializes the object, stores in centralized binaries (with deduplication), and creates an ArtifactRecord.
Note: This method accepts pre-generated artifact IDs for backward compatibility. For new code, use register_with_chain() which generates IDs from OperatorChain.
- Parameters:
obj – Object to persist (model, transformer, etc.)
artifact_id – Pre-generated artifact ID (V3 format: pipeline$hash:fold)
artifact_type – Classification (model, transformer, etc.)
depends_on – List of artifact IDs this depends on
params – Model parameters for inspection
meta_config – Meta-model configuration (for stacking)
format_hint – Optional serialization format hint
custom_name – User-defined name for the artifact (e.g., “Q5_PLS_10”)
chain_path – V3 operator chain path (required for full traceability)
source_index – Multi-source index (None for single source)
- Returns:
ArtifactRecord with full metadata
- Raises:
ValueError – If object cannot be serialized or if meta-model dependencies are missing
- register_meta_model(obj: Any, artifact_id: str, source_model_ids: List[str], feature_columns: List[str] | None = None, params: Dict[str, Any] | None = None, format_hint: str | None = None) -> ArtifactRecord
Register a stacking meta-model with source model references.
Convenience method for registering meta-models that automatically:
Creates the MetaModelConfig with ordered source model references
Sets up dependency tracking to source models
Validates that all source models exist
- Parameters:
obj – The meta-model object to persist
artifact_id – Pre-generated artifact ID for the meta-model
source_model_ids – Ordered list of source model artifact IDs
feature_columns – Optional feature column names matching source order
params – Optional meta-model parameters
format_hint – Optional serialization format hint
- Returns:
ArtifactRecord for the registered meta-model
- Raises:
ValueError – If any source model is not found in the registry
Example
>>> meta_config_record = registry.register_meta_model(
...     obj=ridge_meta_model,
...     artifact_id="0001:5:all",
...     source_model_ids=["0001:3:all", "0001:4:all"],
...     feature_columns=["PLSRegression_pred", "RandomForestRegressor_pred"]
... )
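The validation and dependency steps listed for register_meta_model can be sketched without the registry. The helper `validate_sources` is hypothetical; it only shows the idea of rejecting unknown source models while preserving the caller's ordering. The artifact IDs are copied from the example in this entry.

```python
from typing import Dict, Iterable, List


def validate_sources(known_ids: Iterable[str], source_model_ids: List[str]) -> List[str]:
    """Return the source IDs in their original order, or raise if any is unknown."""
    known = set(known_ids)
    missing = [sid for sid in source_model_ids if sid not in known]
    if missing:
        raise ValueError(f"Source models not found in registry: {missing}")
    return list(source_model_ids)


registered = {"0001:3:all", "0001:4:all"}
dependencies: Dict[str, List[str]] = {}

# The meta-model depends on its sources, in the order given.
dependencies["0001:5:all"] = validate_sources(registered, ["0001:3:all", "0001:4:all"])

try:
    validate_sources(registered, ["0001:3:all", "0001:9:all"])
    missing_detected = False
except ValueError:
    missing_detected = True
```

Order matters because feature_columns is documented as matching the source order, so the meta-model's input columns line up with the source models' predictions.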
- register_with_chain(obj: Any, chain: OperatorChain | str, artifact_type: ArtifactType, step_index: int, branch_path: List[int] | None = None, source_index: int | None = None, fold_id: int | None = None, substep_index: int | None = None, depends_on: List[str] | None = None, params: Dict[str, Any] | None = None, meta_config: MetaModelConfig | None = None, format_hint: str | None = None, custom_name: str | None = None, pipeline_id: str | None = None) -> ArtifactRecord
Register and persist an artifact using V3 chain-based identification.
This is the primary registration method for V3. It generates a deterministic artifact ID from the operator chain and stores the chain path for later lookup.
- Parameters:
obj – Object to persist (model, transformer, etc.)
chain – OperatorChain or chain path string
artifact_type – Classification (model, transformer, etc.)
step_index – Pipeline step index (1-based)
branch_path – List of branch indices (empty for non-branching)
source_index – Multi-source index (None for single source)
fold_id – CV fold (None for shared artifacts)
substep_index – Index of the operator within a step that lists several operators, e.g. [model1, model2]
depends_on – List of artifact IDs this depends on
params – Model parameters for inspection
meta_config – Meta-model configuration (for stacking)
format_hint – Optional serialization format hint
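custom_name – User-defined name for the artifact
pipeline_id – Pipeline identifier (presumably falls back to self.pipeline_id if None, as documented for generate_id)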
- Returns:
ArtifactRecord with full metadata
- Raises:
ValueError – If object cannot be serialized or if meta-model dependencies are missing
- resolve(artifact_id: str) -> ArtifactRecord | None
Resolve artifact ID to record.
- Parameters:
artifact_id – Artifact ID to resolve
- Returns:
ArtifactRecord or None if not found
- resolve_by_hash(content_hash: str) -> ArtifactRecord | None
Resolve content hash to artifact record.
- Parameters:
content_hash – Content hash to look up
- Returns:
ArtifactRecord or None if not found
- resolve_dependencies(artifact_id: str) -> List[ArtifactRecord]
Get all transitive dependencies as records.
- Parameters:
artifact_id – Starting artifact
- Returns:
List of ArtifactRecords in topological order
- class nirs4all.pipeline.storage.artifacts.artifact_registry.DependencyGraph
Bases: object
Tracks artifact dependencies for stacking and transfer.
Maintains a directed graph where edges represent “depends on” relationships. Supports transitive dependency resolution with cycle detection.
- add_dependencies(artifact_id: str, depends_on: List[str]) -> None
Add multiple dependencies at once.
- Parameters:
artifact_id – The dependent artifact
depends_on – List of artifacts being depended upon
- add_dependency(artifact_id: str, depends_on: str) -> None
Add a dependency relationship.
- Parameters:
artifact_id – The dependent artifact
depends_on – The artifact being depended upon
- get_dependencies(artifact_id: str) -> List[str]
Get direct dependencies of an artifact.
- Parameters:
artifact_id – Artifact to query
- Returns:
List of artifact IDs this artifact depends on
- get_dependents(artifact_id: str) -> List[str]
Get artifacts that directly depend on this artifact.
- Parameters:
artifact_id – Artifact to query
- Returns:
List of artifact IDs that depend on this artifact
- remove_artifact(artifact_id: str) -> None
Remove an artifact and its edges from the graph.
- Parameters:
artifact_id – Artifact to remove
- resolve_dependencies(artifact_id: str, max_depth: int = 100) -> List[str]
Get all transitive dependencies (topologically sorted).
Returns dependencies in order suitable for loading - dependencies before dependents.
- Parameters:
artifact_id – Starting artifact
max_depth – Maximum recursion depth (guards against runaway recursion on cyclic graphs)
- Returns:
List of all dependencies in topological order
- Raises:
ValueError – If cycle detected or max depth exceeded
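The behaviour documented for DependencyGraph (direct lookups in both directions, plus transitive resolution with dependencies listed before dependents) can be approximated with a depth-first traversal. The class below is a hypothetical re-implementation for illustration, not the library's code; in particular, the way cycles and depth limits are detected is an assumption.

```python
from collections import defaultdict
from typing import Dict, List, Set


class MiniDependencyGraph:
    """Hypothetical sketch of a 'depends on' graph with topological resolution."""

    def __init__(self) -> None:
        self._deps: Dict[str, List[str]] = defaultdict(list)
        self._dependents: Dict[str, List[str]] = defaultdict(list)

    def add_dependency(self, artifact_id: str, depends_on: str) -> None:
        if depends_on not in self._deps[artifact_id]:
            self._deps[artifact_id].append(depends_on)
            self._dependents[depends_on].append(artifact_id)

    def get_dependencies(self, artifact_id: str) -> List[str]:
        return list(self._deps.get(artifact_id, []))

    def get_dependents(self, artifact_id: str) -> List[str]:
        return list(self._dependents.get(artifact_id, []))

    def resolve_dependencies(self, artifact_id: str, max_depth: int = 100) -> List[str]:
        ordered: List[str] = []
        done: Set[str] = set()
        in_progress: Set[str] = set()  # nodes on the current DFS path

        def visit(node: str, depth: int) -> None:
            if depth > max_depth:
                raise ValueError(f"Max depth {max_depth} exceeded at {node!r}")
            if node in in_progress:
                raise ValueError(f"Dependency cycle detected at {node!r}")
            if node in done:
                return
            in_progress.add(node)
            for dep in self._deps.get(node, []):
                visit(dep, depth + 1)
            in_progress.discard(node)
            done.add(node)
            if node != artifact_id:
                ordered.append(node)  # appended only after its own dependencies

        visit(artifact_id, 0)
        return ordered


graph = MiniDependencyGraph()
graph.add_dependency("meta", "pls")
graph.add_dependency("meta", "rf")
graph.add_dependency("pls", "scaler")
graph.add_dependency("rf", "scaler")
load_order = graph.resolve_dependencies("meta")

cyclic = MiniDependencyGraph()
cyclic.add_dependency("a", "b")
cyclic.add_dependency("b", "a")
try:
    cyclic.resolve_dependencies("a")
    cycle_detected = False
except ValueError:
    cycle_detected = True
```

Loading artifacts in `load_order` guarantees that the shared scaler is available before either base model, and both base models before the meta-model that stacks them.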