nirs4all.pipeline.storage.artifacts package
Submodules
- nirs4all.pipeline.storage.artifacts.artifact_loader module
ArtifactLoader: workspace, dataset, binaries_dir, results_dir, DEFAULT_CACHE_SIZE, clear_cache(), find_artifact_by_custom_name(), from_manifest(), get_all_records(), get_artifacts_by_chain_filter(), get_cache_info(), get_record(), get_record_by_chain(), get_step_binaries(), get_step_binaries_by_artifact_ids(), has_binaries_for_step(), import_from_manifest(), load_by_artifact_id(), load_by_chain(), load_by_chain_prefix(), load_by_id(), load_fold_models(), load_for_step(), load_meta_model_for_prediction(), load_meta_model_with_sources(), load_with_dependencies(), preload_artifacts(), set_cache_size()
LRUCache
- nirs4all.pipeline.storage.artifacts.artifact_persistence module
- nirs4all.pipeline.storage.artifacts.artifact_registry module
ArtifactRegistry: workspace, dataset, binaries_dir, dependency_graph, pipeline_id, cleanup_failed_run(), delete_orphaned_artifacts(), delete_pipeline_artifacts(), end_run(), export_to_manifest(), find_orphaned_artifacts(), generate_id(), get_all_records(), get_artifacts_for_step(), get_by_chain(), get_chain_prefix(), get_dependencies(), get_fold_models(), get_stats(), import_from_manifest(), load_artifact(), purge_dataset_artifacts(), register(), register_meta_model(), register_with_chain(), resolve(), resolve_by_hash(), resolve_dependencies(), start_run()
DependencyGraph
- nirs4all.pipeline.storage.artifacts.operator_chain module
OperatorChain: nodes, pipeline_id, append(), copy(), extend(), filter_branch(), filter_source(), filter_step(), from_dict(), from_path(), get_branch_path(), get_last_node(), get_nodes_at_step(), is_empty(), merge_with_prefix(), remap_steps(), to_dict(), to_hash(), to_path(), with_pipeline_id()
OperatorNode: step_index, operator_class, branch_path, source_index, fold_id, substep_index, operator_name, from_dict(), from_key(), matches_context(), to_dict(), to_key(), with_fold(), with_source()
Functions: compute_chain_hash(), generate_artifact_id_v3(), is_v3_artifact_id(), parse_artifact_id_v3()
- nirs4all.pipeline.storage.artifacts.types module
ArtifactRecord: artifact_id, content_hash, path, chain_path, source_index, pipeline_id, branch_path, step_index, substep_index, fold_id, artifact_type, class_name, custom_name, depends_on, format, format_version, nirs4all_version, size_bytes, created_at, params, meta_config, version, chain_hash, from_dict(), get_branch_path_str(), get_fold_str(), is_branch_specific, is_fold_specific, is_meta_model, is_source_specific, matches_context(), short_hash, to_dict()
ArtifactType
MetaModelConfig
- nirs4all.pipeline.storage.artifacts.utils module
ExecutionPath: pipeline_id, chain_path, branch_path, step_index, source_index, fold_id, substep_index, from_artifact_id_v3(), to_artifact_id()
Functions: artifact_id_matches_context(), compute_content_hash(), extract_fold_id_from_artifact_id(), extract_pipeline_id_from_artifact_id(), generate_filename(), get_binaries_path(), get_short_hash(), parse_artifact_id(), parse_filename(), validate_artifact_id()
Module contents
Artifact management module (V3).
This module provides the V3 artifacts system with:
- ArtifactRecord: Complete artifact metadata dataclass with chain tracking
- ArtifactType: Enum for artifact classification
- ArtifactRegistry: Central registry for artifact management
- ArtifactLoader: Load artifacts by ID or execution context
- OperatorNode/OperatorChain: V3 operator path tracking
- Utility functions for ID generation and path handling
- class nirs4all.pipeline.storage.artifacts.ArtifactLoader(workspace: Path, dataset: str, results_dir: Path | None = None, cache_size: int = 100)
Bases: object
Load artifacts using V3 chain-based identification.
This class provides efficient loading of artifacts from centralized storage, with support for:
- Direct loading by V3 artifact ID (pipeline$hash:fold)
- Chain path-based loading for deterministic replay
- Context-based loading (step/branch/source/fold)
- Dependency resolution for stacking meta-models
- Per-fold model loading for cross-validation ensemble
- LRU caching to avoid redundant I/O
V3 Key Features:
- Chain path indexing for O(1) lookup by chain
- Source index support for multi-source pipelines
- Branch path filtering using chain metadata
The loader uses lazy loading: artifacts are only deserialized when actually accessed via load_by_id() or related methods.
- workspace
Workspace root path
- dataset
Dataset name
- binaries_dir
Path to centralized binaries
- results_dir
Path to results directory (for manifest reference)
Example
>>> loader = ArtifactLoader.from_manifest(manifest, results_dir)
>>> model = loader.load_by_id("0001_pls$abc123def456:0")
>>> artifact = loader.load_by_chain("s1.MinMaxScaler>s3.PLS[br=0]")
- DEFAULT_CACHE_SIZE = 100
- find_artifact_by_custom_name(custom_name: str, step_index: int | None = None, fold_id: int | None = None, branch_path: List[int] | None = None) -> ArtifactRecord | None
Find an artifact by its custom_name.
Used for reverse lookup when only the model name is known but not the artifact_id. Useful for legacy compatibility.
- Parameters:
custom_name – User-defined model name (e.g., “Q5_PLS_10”)
step_index – Optional filter by step
fold_id – Optional filter by fold
branch_path – Optional filter by branch
- Returns:
ArtifactRecord if found, None otherwise
- classmethod from_manifest(manifest: Dict[str, Any], results_dir: Path) -> ArtifactLoader
Create an ArtifactLoader from a pipeline manifest.
Factory method for easy creation from manifest data.
- Parameters:
manifest – Pipeline manifest dictionary
results_dir – Path to results directory (manifest.yaml’s parent)
- Returns:
Initialized ArtifactLoader instance
- get_all_records() -> List[ArtifactRecord]
Get all artifact records.
- Returns:
List of all ArtifactRecords
- get_artifacts_by_chain_filter(step_index: int | None = None, branch_path: List[int] | None = None, source_index: int | None = None, fold_id: int | None = None) -> List[ArtifactRecord]
Get artifact records matching chain-based filters.
Uses the chain_path information stored in V3 records to filter.
- Parameters:
step_index – Filter by step index
branch_path – Filter by branch path
source_index – Filter by source index
fold_id – Filter by fold ID
- Returns:
List of matching ArtifactRecords
- get_cache_info() -> Dict[str, Any]
Get information about the current cache state.
- Returns:
Dictionary with cache statistics
- get_record(artifact_id: str) -> ArtifactRecord | None
Get artifact record by ID.
- Parameters:
artifact_id – Artifact ID
- Returns:
ArtifactRecord or None if not found
- get_record_by_chain(chain_path: str) -> ArtifactRecord | None
Get artifact record by chain path.
- Parameters:
chain_path – Operator chain path
- Returns:
ArtifactRecord or None if not found
- get_step_binaries(step_id: int, branch_id: int | None = None, branch_path: List[int] | None = None) -> List[Tuple[str, Any]]
Legacy-compatible method for loading step binaries.
This method provides backward compatibility with the BinaryLoader API. Prefer using load_for_step() for new code.
Returns names in a format compatible with controller lookup patterns:
- For models with fold_id: “ClassName_<op_num>” where op_num = step*100 + fold
- For shared models: “ClassName_<op_num>” where op_num = step*100
- For y_transformers (ENCODER type): “y_ClassName_<op_num>”
- For x_transformers (TRANSFORMER type): “ClassName_<op_num>”
- Parameters:
step_id – Step identifier (supports int or “step_substep” format)
branch_id – Optional branch ID (converts to branch_path [branch_id])
branch_path – Optional full branch path for nested branches (takes precedence over branch_id)
- Returns:
List of (name, loaded_object) tuples
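The naming rules listed for get_step_binaries() can be sketched as below. This is only an illustration of the stated pattern, not the library's code: the helper `legacy_binary_name` and its arguments are hypothetical, artifact types are passed as the plain strings used by ArtifactType, and applying the fold offset to non-model artifacts is an assumption.

```python
from typing import Optional


def legacy_binary_name(class_name: str, step: int,
                       fold_id: Optional[int] = None,
                       artifact_type: str = "model") -> str:
    """Build a legacy-style name from the documented pattern (hypothetical helper)."""
    # op_num = step*100 + fold for fold-specific artifacts, step*100 for shared ones
    op_num = step * 100 + (fold_id if fold_id is not None else 0)
    # y-transformers (ENCODER type) carry a "y_" prefix; other types do not
    prefix = "y_" if artifact_type == "encoder" else ""
    return f"{prefix}{class_name}_{op_num}"


print(legacy_binary_name("PLSRegression", step=4, fold_id=2))
# PLSRegression_402
print(legacy_binary_name("MinMaxScaler", step=1, artifact_type="transformer"))
# MinMaxScaler_100
print(legacy_binary_name("StandardScaler", step=2, artifact_type="encoder"))
# y_StandardScaler_200
```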
- get_step_binaries_by_artifact_ids(artifact_ids: List[str]) -> List[Tuple[str, Any]]
Load multiple artifacts by their deterministic artifact_ids.
This method is used in prediction mode when model_artifact_id is available in the prediction record. It provides deterministic loading that works correctly with custom model names.
- Parameters:
artifact_ids – List of artifact IDs to load
- Returns:
List of (name, loaded_object) tuples
- Raises:
KeyError – If any artifact_id is not found
Example
>>> artifact_ids = ["abc123:4:0", "abc123:4:1"]
>>> binaries = loader.get_step_binaries_by_artifact_ids(artifact_ids)
- has_binaries_for_step(step_number: int, substep_number: int | None = None, branch_id: int | None = None) -> bool
Check if binaries exist for a specific step.
Legacy-compatible method for checking artifact availability.
- Parameters:
step_number – The main step number
substep_number – Ignored (kept for compatibility)
branch_id – Optional branch ID to check
- Returns:
True if artifacts exist for this step
- import_from_manifest(manifest: Dict[str, Any], results_dir: Path | None = None) -> None
Import artifact records from a V3 manifest.
Builds all indexes including chain_path index for V3 lookups.
- Parameters:
manifest – Manifest dictionary
results_dir – Optional results directory override
- load_by_artifact_id(artifact_id: str) -> Tuple[str, Any]
Load a single artifact by its deterministic artifact_id.
This method provides deterministic artifact loading using the artifact_id stored in predictions. Unlike name-based loading which can be ambiguous with custom model names, artifact_id-based loading is always exact.
- Parameters:
artifact_id – The deterministic artifact ID (e.g., “0001:4:0” for fold 0 or “0001:4:all” for shared artifacts)
- Returns:
Tuple of (name, loaded_object) where name is built from custom_name if available, otherwise from class_name.
- Raises:
KeyError – If artifact_id not found in registry
FileNotFoundError – If artifact file doesn’t exist on disk
Example
>>> loader = ArtifactLoader.from_manifest(manifest, results_dir)
>>> name, model = loader.load_by_artifact_id("abc123:4:0")
>>> predictions = model.predict(X_new)
- load_by_chain(chain: str, fold_id: int | None = None) -> Any | None
Load artifact by exact chain path match.
- Parameters:
chain – Operator chain path string (e.g., “s1.MinMaxScaler>s3.PLS[br=0]”)
fold_id – Optional fold ID filter
- Returns:
Loaded artifact object or None if not found
- load_by_chain_prefix(prefix: str, branch_path: List[int] | None = None, source_index: int | None = None) -> List[Tuple[str, Any]]
Load all artifacts whose chain path starts with the given prefix.
Useful for loading all artifacts in a chain for prediction replay.
- Parameters:
prefix – Chain path prefix to match
branch_path – Optional branch path filter
source_index – Optional source index filter
- Returns:
List of (artifact_id, loaded_object) tuples
- load_by_id(artifact_id: str) -> Any
Load a single artifact by its V3 ID.
Uses LRU cache to avoid redundant disk I/O. Artifacts are loaded lazily on first access.
- Parameters:
artifact_id – V3 artifact identifier (pipeline$hash:fold)
- Returns:
Deserialized artifact object
- Raises:
KeyError – If artifact ID not found
FileNotFoundError – If artifact file doesn’t exist
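The combination of lazy loading and LRU caching described for load_by_id() can be illustrated with a small standalone cache. This is a sketch of the general technique, not the package's LRUCache class: `TinyLRUCache`, `get_or_load`, and `fake_load` are hypothetical names, and the loader function stands in for reading an artifact from disk.

```python
from collections import OrderedDict
from typing import Any, Callable, List


class TinyLRUCache:
    """Least-recently-used cache; a stand-in, not the loader's actual cache."""

    def __init__(self, max_size: int = 100) -> None:
        self.max_size = max_size
        self._items: "OrderedDict[str, Any]" = OrderedDict()
        self.hits = 0
        self.misses = 0

    def get_or_load(self, key: str, load: Callable[[str], Any]) -> Any:
        if key in self._items:
            self.hits += 1
            self._items.move_to_end(key)      # mark as most recently used
            return self._items[key]
        self.misses += 1
        value = load(key)                     # deserialize only on first access
        self._items[key] = value
        if len(self._items) > self.max_size:
            self._items.popitem(last=False)   # evict the least recently used entry
        return value


disk_reads: List[str] = []


def fake_load(artifact_id: str) -> str:
    """Pretend to read an artifact from disk and record the access."""
    disk_reads.append(artifact_id)
    return f"object for {artifact_id}"


cache = TinyLRUCache(max_size=2)
cache.get_or_load("a", fake_load)
cache.get_or_load("a", fake_load)   # served from cache, no second read
cache.get_or_load("b", fake_load)
cache.get_or_load("c", fake_load)   # cache is full, so "a" is evicted
cache.get_or_load("a", fake_load)   # read again after eviction
print(disk_reads)                   # ['a', 'b', 'c', 'a']
```

A bounded cache like this trades memory for I/O: a larger size keeps more deserialized models resident, while a smaller one forces re-reads of evicted artifacts.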
- load_fold_models(step_index: int, branch_path: List[int] | None = None, pipeline_id: str | None = None) -> List[Tuple[int, Any]]
Load all fold-specific model artifacts for CV averaging.
Returns models for all folds at the specified step, sorted by fold_id.
- Parameters:
step_index – Step number where the models are located
branch_path – Optional branch path filter
pipeline_id – Optional pipeline ID filter
- Returns:
List of (fold_id, loaded_model) tuples, sorted by fold_id
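One way the (fold_id, loaded_model) list might be consumed for CV averaging is shown below. This is a sketch under stated assumptions: a plain arithmetic mean is assumed (the library may weight folds differently), the function `average_fold_predictions` is hypothetical, and simple callables stand in for real models with a predict() method.

```python
from typing import Callable, List, Sequence, Tuple

# Stand-in for a loaded model: a callable mapping inputs to predictions
FoldModel = Callable[[Sequence[float]], List[float]]


def average_fold_predictions(fold_models: List[Tuple[int, FoldModel]],
                             X: Sequence[float]) -> List[float]:
    """Average per-sample predictions over all fold models (plain mean)."""
    if not fold_models:
        raise ValueError("no fold models to average")
    # Process folds in fold_id order, mirroring the documented sort order
    ordered = sorted(fold_models, key=lambda fm: fm[0])
    per_fold = [predict(X) for _, predict in ordered]
    n_folds = len(per_fold)
    return [sum(sample) / n_folds for sample in zip(*per_fold)]


# Hypothetical fold models, given out of order on purpose
fold_models = [
    (1, lambda X: [x + 1.0 for x in X]),
    (0, lambda X: [x - 1.0 for x in X]),
    (2, lambda X: [x * 1.0 for x in X]),
]
print(average_fold_predictions(fold_models, [10.0, 20.0]))   # [10.0, 20.0]
```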
- load_for_step(step_index: int, branch_path: List[int] | None = None, source_index: int | None = None, fold_id: int | None = None, pipeline_id: str | None = None) -> List[Tuple[str, Any]]
Load all artifacts for a step context.
Returns artifacts matching the specified step, branch path, source, and fold. If branch_path is provided, includes both branch-specific and shared (pre-branch) artifacts.
- Parameters:
step_index – Step number to load
branch_path – Optional branch path filter
source_index – Optional source index filter
fold_id – Optional fold ID filter
pipeline_id – Optional pipeline ID filter
- Returns:
List of (artifact_id, loaded_object) tuples
- load_meta_model_for_prediction(artifact_id: str, X: Any = None) -> Tuple[Any, List[Tuple[str, Any]], List[str]]
Load a meta-model and its sources, ready for prediction.
This method loads the complete stacking ensemble and validates that all components are compatible for prediction.
- Parameters:
artifact_id – Meta-model artifact ID
X – Optional input features for validation
- Returns:
Tuple of (meta_model, source_models, feature_columns) where source_models is a list of (artifact_id, model) tuples in the correct order for feature construction
- Raises:
KeyError – If artifact or source models not found
ValueError – If artifact is not a meta-model
- load_meta_model_with_sources(artifact_id: str, validate_branch: bool = True) -> Tuple[Any, List[Tuple[str, Any]], List[str]]
Load a meta-model and its source models.
For stacking, loads the meta-model and all source models it depends on, preserving the feature column order as specified in meta_config.
- Parameters:
artifact_id – Meta-model artifact ID
validate_branch – If True, validate branch context matches
- Returns:
Tuple of (meta_model, [(source_id, source_model), …], feature_columns) where source_models are in the correct order for feature construction
- Raises:
KeyError – If artifact not found
ValueError – If artifact is not a meta-model or if branch validation fails
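Why the order of source models matters for stacking can be seen in a small sketch: each source model contributes one feature column, and the meta-model expects those columns in the order it was trained with. Everything below is illustrative; `stack_features`, the lambda models, and `meta_model` are hypothetical stand-ins, and the artifact IDs are borrowed from the register_meta_model() example.

```python
from typing import Callable, List, Sequence, Tuple

Predictor = Callable[[Sequence[float]], List[float]]


def stack_features(source_models: List[Tuple[str, Predictor]],
                   X: Sequence[float]) -> List[List[float]]:
    """Build one row per sample and one column per source model, in list order."""
    columns = [predict(X) for _, predict in source_models]
    return [list(row) for row in zip(*columns)]


# Hypothetical (artifact_id, model) pairs in feature-column order
source_models = [
    ("0001:3:all", lambda X: [2.0 * x for x in X]),
    ("0001:4:all", lambda X: [x + 1.0 for x in X]),
]
feature_columns = ["PLSRegression_pred", "RandomForestRegressor_pred"]


def meta_model(rows: List[List[float]]) -> List[float]:
    """Hypothetical meta-model: equal-weight blend of the two source columns."""
    return [0.5 * a + 0.5 * b for a, b in rows]


features = stack_features(source_models, [1.0, 3.0])
print(features)               # [[2.0, 2.0], [6.0, 4.0]]
print(meta_model(features))   # [2.0, 5.0]
```

Swapping the two entries of `source_models` would swap the columns, which is harmless for this symmetric blend but would silently corrupt predictions for a meta-model with unequal weights.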
- load_with_dependencies(artifact_id: str) -> Dict[str, Any]
Load an artifact and all its transitive dependencies.
Returns a dictionary mapping artifact IDs to loaded objects, in topological order (dependencies before dependents).
- Parameters:
artifact_id – Starting artifact ID
- Returns:
Dictionary of {artifact_id: loaded_object}
- Raises:
KeyError – If artifact or dependency not found
ValueError – If cycle detected in dependencies
- preload_artifacts(artifact_ids: List[str] | None = None, artifact_types: List[ArtifactType] | None = None) -> int
Preload artifacts into cache.
Useful for warming the cache before prediction or when you know which artifacts will be needed.
- Parameters:
artifact_ids – Specific artifact IDs to preload (default: all)
artifact_types – Filter by artifact types (default: all)
- Returns:
Number of artifacts loaded
- class nirs4all.pipeline.storage.artifacts.ArtifactRecord(artifact_id: str, content_hash: str, path: str, chain_path: str = '', source_index: int | None = None, pipeline_id: str = '', branch_path: List[int] = <factory>, step_index: int = 0, substep_index: int | None = None, fold_id: int | None = None, artifact_type: ArtifactType = ArtifactType.MODEL, class_name: str = '', custom_name: str = '', depends_on: List[str] = <factory>, format: str = 'joblib', format_version: str = '', nirs4all_version: str = '', size_bytes: int = 0, created_at: str = <factory>, params: Dict[str, Any] = <factory>, meta_config: MetaModelConfig | None = None, version: int = 3)
Bases: object
Complete artifact metadata for manifest storage (V3).
This record contains all metadata needed to:
- Uniquely identify an artifact via operator chain
- Load the artifact from centralized storage
- Resolve dependencies for stacking/transfer
- Track serialization format and library versions
- V3 Format:
artifact_id: “{pipeline_id}${chain_hash}:{fold_id}”
chain_path: Full operator chain path string
- artifact_id
Unique, deterministic ID based on chain hash. Format: “{pipeline_id}${chain_hash}:{fold_id}”
- artifact_type
Type classification (model, transformer, etc.)
- meta_config
Configuration for meta-models
- artifact_type: ArtifactType = 'model'
- property chain_hash: str
Get chain hash from artifact ID (V3 format).
- Returns:
Chain hash portion of the artifact ID, or empty if not V3 format
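The V3 layout “{pipeline_id}${chain_hash}:{fold_id}” can be taken apart with ordinary string operations. The function below is a hypothetical sketch, not the package's parse_artifact_id_v3(); it assumes the fold part is either a non-negative integer or the literal "all" used for shared artifacts.

```python
from typing import Optional, Tuple


def split_v3_artifact_id(artifact_id: str) -> Optional[Tuple[str, str, Optional[int]]]:
    """Split "{pipeline_id}${chain_hash}:{fold_id}" into its three parts.

    Returns None when the string does not follow that layout. A fold of
    "all" (shared artifact) is reported as None.
    """
    pipeline_id, sep, rest = artifact_id.partition("$")
    if not sep:
        return None                      # no "$": not a V3-style ID
    chain_hash, sep, fold = rest.rpartition(":")
    if not sep or not chain_hash:
        return None                      # missing ":fold" suffix or empty hash
    if fold == "all":
        return pipeline_id, chain_hash, None
    if not fold.isdigit():
        return None
    return pipeline_id, chain_hash, int(fold)


print(split_v3_artifact_id("0001_pls$abc123def456:0"))
# ('0001_pls', 'abc123def456', 0)
print(split_v3_artifact_id("0001_pls$7f8e9d0c1b2a:all"))
# ('0001_pls', '7f8e9d0c1b2a', None)
print(split_v3_artifact_id("0001:4:0"))
# None
```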
- classmethod from_dict(data: Dict[str, Any]) -> ArtifactRecord
Create ArtifactRecord from dictionary.
- Parameters:
data – Dictionary from YAML manifest
- Returns:
ArtifactRecord instance
- get_branch_path_str() -> str
Get branch path as string.
- Returns:
Colon-separated branch indices or empty string
- get_fold_str() -> str
Get fold ID as string.
- Returns:
Fold ID as string or “all” for shared artifacts
- property is_branch_specific: bool
Check if artifact is branch-specific.
- Returns:
True if artifact belongs to a specific branch path
- property is_fold_specific: bool
Check if artifact is fold-specific.
- Returns:
True if artifact belongs to a specific CV fold
- property is_meta_model: bool
Check if artifact is a meta-model.
- Returns:
True if artifact is a stacking meta-model
- property is_source_specific: bool
Check if artifact is source-specific.
- Returns:
True if artifact belongs to a specific source in a multi-source pipeline
- matches_context(step_index: int | None = None, branch_path: List[int] | None = None, source_index: int | None = None, fold_id: int | None = None) -> bool
Check if artifact matches a given context.
- Parameters:
step_index – Step to match (None = any)
branch_path – Branch path to match (None = any)
source_index – Source index to match (None = any)
fold_id – Fold ID to match (None = any)
- Returns:
True if artifact matches all specified filters
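The "None = any" convention can be sketched with a cut-down record. `MiniRecord` is a hypothetical stand-in for ArtifactRecord, and strict equality on every supplied filter is an assumption; the real method may apply extra rules (for example, for shared artifacts).

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class MiniRecord:
    """Cut-down, hypothetical stand-in for ArtifactRecord."""
    step_index: int = 0
    branch_path: List[int] = field(default_factory=list)
    source_index: Optional[int] = None
    fold_id: Optional[int] = None

    def matches_context(self,
                        step_index: Optional[int] = None,
                        branch_path: Optional[List[int]] = None,
                        source_index: Optional[int] = None,
                        fold_id: Optional[int] = None) -> bool:
        # A filter left at None is skipped; every filter that is set must match.
        if step_index is not None and self.step_index != step_index:
            return False
        if branch_path is not None and self.branch_path != branch_path:
            return False
        if source_index is not None and self.source_index != source_index:
            return False
        if fold_id is not None and self.fold_id != fold_id:
            return False
        return True


record = MiniRecord(step_index=3, branch_path=[0], fold_id=1)
print(record.matches_context())                          # True
print(record.matches_context(step_index=3, fold_id=1))   # True
print(record.matches_context(step_index=3, fold_id=2))   # False
print(record.matches_context(branch_path=[1]))           # False
```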
- meta_config: MetaModelConfig | None = None
- property short_hash: str
Get short version of content hash for filenames.
- Returns:
First 12 characters of hash (after sha256: prefix if present)
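The rule for short_hash is simple enough to sketch directly. The function below is a hypothetical standalone version of that rule, not the property itself; the sample payload is made up.

```python
import hashlib


def short_hash(content_hash: str, length: int = 12) -> str:
    """Drop an optional "sha256:" prefix and keep the first `length` characters."""
    prefix = "sha256:"
    if content_hash.startswith(prefix):
        content_hash = content_hash[len(prefix):]
    return content_hash[:length]


# Hypothetical payload, hashed only to obtain a realistic digest
digest = hashlib.sha256(b"example artifact bytes").hexdigest()
print(short_hash("sha256:" + digest) == digest[:12])   # True
print(short_hash(digest) == digest[:12])               # True
print(len(short_hash(digest)))                         # 12
```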
- class nirs4all.pipeline.storage.artifacts.ArtifactRegistry(workspace: Path, dataset: str, manifest_manager: Any | None = None, pipeline_id: str = '')
Bases: object
Central registry for artifact management (V3).
Provides:
- Chain-based ID generation for complete execution path tracking
- Content-addressed storage with deduplication
- Dependency graph for stacking/transfer
- Cleanup utilities
V3 Key Changes:
- Uses OperatorChain for artifact identification
- Chain hash-based artifact IDs for deterministic identification
- Chain path stored in ArtifactRecord for complete traceability
- Lookup by chain path for prediction replay
The registry coordinates between:
- Centralized binaries at workspace/binaries/<dataset>/
- Per-run manifests with artifact references
- Dependency tracking for complex pipelines
- workspace
Workspace root path
- dataset
Current dataset name
- binaries_dir
Path to centralized binaries
- dependency_graph
Dependency tracking graph
- pipeline_id
Current pipeline identifier for chain generation
- cleanup_failed_run() -> int
Clean up artifacts from a failed run.
Deletes artifacts registered during the current run. Called automatically on exception.
- Returns:
Number of artifacts cleaned up
- delete_orphaned_artifacts(dry_run: bool = True, scan_all_manifests: bool = True) -> Tuple[List[str], int]
Delete artifacts not referenced by any manifest.
- Parameters:
dry_run – If True, only report what would be deleted
scan_all_manifests – If True, scan all manifests before deletion
- Returns:
Tuple of (deleted_files, bytes_freed)
- delete_pipeline_artifacts(pipeline_id: str, delete_files: bool = False) -> int
Delete all artifacts for a specific pipeline.
- Parameters:
pipeline_id – Pipeline to delete artifacts for
delete_files – If True, also delete the binary files from disk
- Returns:
Number of artifacts deleted
- export_to_manifest() -> Dict[str, Any]
Export registry to manifest V3 format.
- Returns:
Dictionary suitable for manifest artifacts section
- find_orphaned_artifacts(scan_all_manifests: bool = True) -> List[str]
Find artifact files not referenced by any manifest.
Scans binaries directory and compares with all referenced artifacts from manifests in the workspace.
- Parameters:
scan_all_manifests – If True, scan all manifests in workspace/runs/. If False, only check against in-memory registry.
- Returns:
List of orphaned filenames
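The scan-and-compare idea behind orphan detection amounts to a set difference between files on disk and files referenced by manifests. The sketch below is self-contained and does not use the registry: `find_orphans` and the file names are hypothetical, and the referenced set is passed in directly instead of being read from manifests.

```python
import tempfile
from pathlib import Path
from typing import Iterable, List


def find_orphans(binaries_dir: Path, referenced: Iterable[str]) -> List[str]:
    """Return filenames in binaries_dir that no manifest references."""
    referenced_names = set(referenced)
    return sorted(
        p.name for p in binaries_dir.iterdir()
        if p.is_file() and p.name not in referenced_names
    )


with tempfile.TemporaryDirectory() as tmp:
    binaries = Path(tmp)
    # Hypothetical binary files; only two of them are referenced
    for name in ("model_aaa.joblib", "scaler_bbb.joblib", "stale_ccc.joblib"):
        (binaries / name).write_bytes(b"")
    orphans = find_orphans(binaries, ["model_aaa.joblib", "scaler_bbb.joblib"])

print(orphans)   # ['stale_ccc.joblib']
```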
- generate_id(chain: OperatorChain | str, fold_id: int | None = None, pipeline_id: str | None = None) -> str
Generate deterministic V3 artifact ID from operator chain.
V3 Format: {pipeline_id}${chain_hash}:{fold_id}
- Parameters:
chain – OperatorChain or chain path string
fold_id – CV fold (None for shared)
pipeline_id – Pipeline identifier (uses self.pipeline_id if None)
- Returns:
V3 Artifact ID string
Examples
>>> registry.generate_id(chain, fold_id=0)
'0001_pls$a1b2c3d4e5f6:0'
>>> registry.generate_id("s1.MinMaxScaler>s3.PLS", fold_id=None)
'0001_pls$7f8e9d0c1b2a:all'
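How a deterministic ID of this shape could be composed is sketched below. The hashing step is an assumption: the chain hash is taken here as the first 12 hex characters of the SHA-256 of the chain path string, which matches the length of the hashes in the examples but may not be what compute_chain_hash() actually does. `sketch_artifact_id` is a hypothetical name.

```python
import hashlib
from typing import Optional


def sketch_artifact_id(pipeline_id: str, chain_path: str,
                       fold_id: Optional[int] = None) -> str:
    """Compose "{pipeline_id}${chain_hash}:{fold_id}".

    ASSUMPTION: the chain hash is the first 12 hex characters of the SHA-256
    of the chain path string; the library's hashing may differ.
    """
    chain_hash = hashlib.sha256(chain_path.encode("utf-8")).hexdigest()[:12]
    fold = "all" if fold_id is None else str(fold_id)   # "all" marks shared artifacts
    return f"{pipeline_id}${chain_hash}:{fold}"


first = sketch_artifact_id("0001_pls", "s1.MinMaxScaler>s3.PLS", fold_id=0)
again = sketch_artifact_id("0001_pls", "s1.MinMaxScaler>s3.PLS", fold_id=0)
shared = sketch_artifact_id("0001_pls", "s1.MinMaxScaler>s3.PLS")
print(first == again)            # True: same inputs give the same ID
print(shared.endswith(":all"))   # True
```

Because the ID depends only on the pipeline ID, the chain path, and the fold, a prediction run can recompute it without consulting any counter or run-specific state.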
- get_all_records() -> List[ArtifactRecord]
Get all registered artifacts.
- Returns:
List of all ArtifactRecords
- get_artifacts_for_step(pipeline_id: str, step_index: int, branch_path: List[int] | None = None, fold_id: int | None = None) -> List[ArtifactRecord]
Get all artifacts for a specific step context.
- Parameters:
pipeline_id – Pipeline to query
step_index – Step number
branch_path – Optional branch filter
fold_id – Optional fold filter
- Returns:
List of matching ArtifactRecords
- get_by_chain(chain: OperatorChain | str, fold_id: int | None = None) -> ArtifactRecord | None
Get artifact by exact chain path match.
- Parameters:
chain – OperatorChain or chain path string
fold_id – Optional fold ID to filter (None = any fold)
- Returns:
ArtifactRecord or None if not found
- get_chain_prefix(prefix: str, branch_path: List[int] | None = None, source_index: int | None = None) -> List[ArtifactRecord]
Get all artifacts whose chain path starts with the given prefix.
Useful for finding all artifacts in a chain for prediction replay.
- Parameters:
prefix – Chain path prefix to match
branch_path – Optional branch path filter
source_index – Optional source index filter
- Returns:
List of matching ArtifactRecords
- get_dependencies(artifact_id: str) -> List[str]
Get direct dependencies of an artifact.
- Parameters:
artifact_id – Artifact to query
- Returns:
List of artifact IDs
- get_fold_models(pipeline_id: str, step_index: int, branch_path: List[int] | None = None) -> List[ArtifactRecord]
Get all fold-specific model artifacts for CV averaging.
- Parameters:
pipeline_id – Pipeline to query
step_index – Model step number
branch_path – Optional branch filter
- Returns:
List of per-fold model ArtifactRecords
- get_stats(scan_all_manifests: bool = True) -> Dict[str, Any]
Get storage statistics.
- Parameters:
scan_all_manifests – If True, scan all manifests for accurate stats
- Returns:
Dictionary with storage stats including:
total_artifacts: Number of registered artifacts
unique_files: Number of unique binary files
total_size_bytes: Total size of all artifacts
deduplication_ratio: Ratio of saved space from deduplication
by_type: Count of artifacts by type
orphaned_count: Number of orphaned files
disk_usage_bytes: Actual disk usage in binaries directory
- import_from_manifest(manifest: Dict[str, Any], results_dir: Path) -> None
Import artifact records from a manifest.
Loads V3 format manifests into the registry, building all indexes including the chain_path index for V3 lookups.
- Parameters:
manifest – Manifest dictionary
results_dir – Path to results directory
- load_artifact(record: ArtifactRecord) -> Any
Load artifact binary from disk.
- Parameters:
record – ArtifactRecord with path and format
- Returns:
Deserialized object
- Raises:
FileNotFoundError – If artifact file doesn’t exist
- purge_dataset_artifacts(confirm: bool = False) -> Tuple[int, int]
Delete ALL artifacts for this dataset.
This is a destructive operation that removes all artifacts in the binaries directory for this dataset, regardless of manifest references.
- Parameters:
confirm – Must be True to actually delete files
- Returns:
Tuple of (files_deleted, bytes_freed)
- Raises:
ValueError – If confirm is False
- register(obj: Any, artifact_id: str, artifact_type: ArtifactType, depends_on: List[str] | None = None, params: Dict[str, Any] | None = None, meta_config: MetaModelConfig | None = None, format_hint: str | None = None, custom_name: str | None = None, chain_path: str = '', source_index: int | None = None) -> ArtifactRecord
Register and persist an artifact.
Serializes the object, stores in centralized binaries (with deduplication), and creates an ArtifactRecord.
Note: This method accepts pre-generated artifact IDs for backward compatibility. For new code, use register_with_chain() which generates IDs from OperatorChain.
- Parameters:
obj – Object to persist (model, transformer, etc.)
artifact_id – Pre-generated artifact ID (V3 format: pipeline$hash:fold)
artifact_type – Classification (model, transformer, etc.)
depends_on – List of artifact IDs this depends on
params – Model parameters for inspection
meta_config – Meta-model configuration (for stacking)
format_hint – Optional serialization format hint
custom_name – User-defined name for the artifact (e.g., “Q5_PLS_10”)
chain_path – V3 operator chain path (required for full traceability)
source_index – Multi-source index (None for single source)
- Returns:
ArtifactRecord with full metadata
- Raises:
ValueError – If object cannot be serialized or if meta-model dependencies are missing
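Content-addressed storage with deduplication, as mentioned for register(), generally means naming each file after a hash of its serialized bytes so that identical payloads share one file. The sketch below shows that idea only. It uses the standard-library pickle module as a stand-in serializer (the record's default format is 'joblib'), and `store_deduplicated` and its file-naming scheme are hypothetical.

```python
import hashlib
import pickle
import tempfile
from pathlib import Path
from typing import Any, Tuple


def store_deduplicated(obj: Any, binaries_dir: Path) -> Tuple[str, Path, bool]:
    """Serialize obj and store it under a name derived from its content hash.

    Returns (content_hash, path, written); written is False when an identical
    payload was already on disk and nothing new was stored.
    """
    payload = pickle.dumps(obj)   # stand-in serializer, not the library's format
    content_hash = hashlib.sha256(payload).hexdigest()
    path = binaries_dir / f"{content_hash[:12]}.pkl"   # hypothetical naming scheme
    if path.exists():
        return content_hash, path, False
    path.write_bytes(payload)
    return content_hash, path, True


with tempfile.TemporaryDirectory() as tmp:
    binaries = Path(tmp)
    first = store_deduplicated({"coef": [1.0, 2.0]}, binaries)
    second = store_deduplicated({"coef": [1.0, 2.0]}, binaries)   # identical content
    third = store_deduplicated({"coef": [3.0]}, binaries)
    n_files = len(list(binaries.iterdir()))

print(first[2], second[2], third[2], n_files)   # True False True 2
```

Two records can therefore point at the same file, which is why deleting a file safely requires checking every manifest that might reference it.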
- register_meta_model(obj: Any, artifact_id: str, source_model_ids: List[str], feature_columns: List[str] | None = None, params: Dict[str, Any] | None = None, format_hint: str | None = None) -> ArtifactRecord
Register a stacking meta-model with source model references.
Convenience method for registering meta-models that automatically:
- Creates the MetaModelConfig with ordered source model references
- Sets up dependency tracking to source models
- Validates that all source models exist
- Parameters:
obj – The meta-model object to persist
artifact_id – Pre-generated artifact ID for the meta-model
source_model_ids – Ordered list of source model artifact IDs
feature_columns – Optional feature column names matching source order
params – Optional meta-model parameters
format_hint – Optional serialization format hint
- Returns:
ArtifactRecord for the registered meta-model
- Raises:
ValueError – If any source model is not found in the registry
Example
>>> meta_config_record = registry.register_meta_model(
...     obj=ridge_meta_model,
...     artifact_id="0001:5:all",
...     source_model_ids=["0001:3:all", "0001:4:all"],
...     feature_columns=["PLSRegression_pred", "RandomForestRegressor_pred"]
... )
- register_with_chain(obj: Any, chain: OperatorChain | str, artifact_type: ArtifactType, step_index: int, branch_path: List[int] | None = None, source_index: int | None = None, fold_id: int | None = None, substep_index: int | None = None, depends_on: List[str] | None = None, params: Dict[str, Any] | None = None, meta_config: MetaModelConfig | None = None, format_hint: str | None = None, custom_name: str | None = None, pipeline_id: str | None = None) -> ArtifactRecord
Register and persist an artifact using V3 chain-based identification.
This is the primary registration method for V3. It generates a deterministic artifact ID from the operator chain and stores the chain path for later lookup.
- Parameters:
obj – Object to persist (model, transformer, etc.)
chain – OperatorChain or chain path string
artifact_type – Classification (model, transformer, etc.)
step_index – Pipeline step index (1-based)
branch_path – List of branch indices (empty for non-branching)
source_index – Multi-source index (None for single source)
fold_id – CV fold (None for shared artifacts)
substep_index – Substep index when a step holds a list of operators, e.g. [model1, model2]
depends_on – List of artifact IDs this depends on
params – Model parameters for inspection
meta_config – Meta-model configuration (for stacking)
format_hint – Optional serialization format hint
custom_name – User-defined name for the artifact
- Returns:
ArtifactRecord with full metadata
- Raises:
ValueError – If object cannot be serialized or if meta-model dependencies are missing
- resolve(artifact_id: str) -> ArtifactRecord | None
Resolve artifact ID to record.
- Parameters:
artifact_id – Artifact ID to resolve
- Returns:
ArtifactRecord or None if not found
- resolve_by_hash(content_hash: str) -> ArtifactRecord | None
Resolve content hash to artifact record.
- Parameters:
content_hash – Content hash to look up
- Returns:
ArtifactRecord or None if not found
- resolve_dependencies(artifact_id: str) -> List[ArtifactRecord]
Get all transitive dependencies as records.
- Parameters:
artifact_id – Starting artifact
- Returns:
List of ArtifactRecords in topological order
- class nirs4all.pipeline.storage.artifacts.ArtifactType(value)
Classification of artifact types.
Each type has specific handling:
- model: Trained ML models (sklearn, tensorflow, pytorch, etc.)
- transformer: Fitted preprocessors (scalers, feature extractors)
- splitter: Train/test split configuration (for reproducibility)
- encoder: Label encoders, y-scalers
- meta_model: Stacking meta-models with source model dependencies
- ENCODER = 'encoder'
- META_MODEL = 'meta_model'
- MODEL = 'model'
- SPLITTER = 'splitter'
- TRANSFORMER = 'transformer'
- class nirs4all.pipeline.storage.artifacts.DependencyGraph
Bases: object
Tracks artifact dependencies for stacking and transfer.
Maintains a directed graph where edges represent “depends on” relationships. Supports transitive dependency resolution with cycle detection.
- add_dependencies(artifact_id: str, depends_on: List[str]) None[source]
Add multiple dependencies at once.
- Parameters:
artifact_id – The dependent artifact
depends_on – List of artifacts being depended upon
- add_dependency(artifact_id: str, depends_on: str) None[source]
Add a dependency relationship.
- Parameters:
artifact_id – The dependent artifact
depends_on – The artifact being depended upon
- get_dependencies(artifact_id: str) List[str][source]
Get direct dependencies of an artifact.
- Parameters:
artifact_id – Artifact to query
- Returns:
List of artifact IDs this artifact depends on
- get_dependents(artifact_id: str) List[str][source]
Get artifacts that directly depend on this artifact.
- Parameters:
artifact_id – Artifact to query
- Returns:
List of artifact IDs that depend on this artifact
- remove_artifact(artifact_id: str) None[source]
Remove an artifact and its edges from the graph.
- Parameters:
artifact_id – Artifact to remove
- resolve_dependencies(artifact_id: str, max_depth: int = 100) List[str][source]
Get all transitive dependencies (topologically sorted).
Returns dependencies in order suitable for loading - dependencies before dependents.
- Parameters:
artifact_id – Starting artifact
max_depth – Maximum recursion depth (guards against runaway recursion if the graph contains a cycle)
- Returns:
List of all dependencies in topological order
- Raises:
ValueError – If cycle detected or max depth exceeded
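The resolution order described above can be illustrated with a small standalone sketch. It is not the nirs4all implementation: `TinyDependencyGraph` is a hypothetical stand-in, and it assumes the starting artifact itself is excluded from the returned list.

```python
from typing import Dict, List, Set


class TinyDependencyGraph:
    """Hypothetical stand-in for DependencyGraph, for illustration only."""

    def __init__(self) -> None:
        # artifact_id -> artifact IDs it directly depends on
        self._edges: Dict[str, List[str]] = {}

    def add_dependency(self, artifact_id: str, depends_on: str) -> None:
        deps = self._edges.setdefault(artifact_id, [])
        if depends_on not in deps:
            deps.append(depends_on)

    def resolve_dependencies(self, artifact_id: str, max_depth: int = 100) -> List[str]:
        ordered: List[str] = []       # dependencies before dependents
        done: Set[str] = set()        # fully processed nodes
        in_progress: Set[str] = set() # nodes on the current DFS path

        def visit(node: str, depth: int) -> None:
            if depth > max_depth:
                raise ValueError(f"max depth {max_depth} exceeded at {node!r}")
            if node in in_progress:
                raise ValueError(f"dependency cycle detected at {node!r}")
            if node in done:
                return
            in_progress.add(node)
            for dep in self._edges.get(node, []):
                visit(dep, depth + 1)
            in_progress.discard(node)
            done.add(node)
            ordered.append(node)

        # ASSUMPTION: the starting artifact is not part of its own result.
        for dep in self._edges.get(artifact_id, []):
            visit(dep, 1)
        return ordered


graph = TinyDependencyGraph()
graph.add_dependency("meta", "pls")
graph.add_dependency("meta", "rf")
graph.add_dependency("pls", "scaler")
graph.add_dependency("rf", "scaler")
order = graph.resolve_dependencies("meta")
```

Here `order` lists `scaler` before both models that depend on it, which is the order in which the artifacts would need to be loaded.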
- class nirs4all.pipeline.storage.artifacts.ExecutionPath(pipeline_id: str, chain_path: str = '', branch_path: List[int] = None, step_index: int = 0, source_index: int | None = None, fold_id: int | None = None, substep_index: int | None = None)[source]
Bases: object
Represents the execution context for an artifact (V3).
Captures all context needed to uniquely identify an artifact within a pipeline execution.
- classmethod from_artifact_id_v3(artifact_id: str, chain_path: str = '') ExecutionPath[source]
Create ExecutionPath from V3 artifact ID string.
- Parameters:
artifact_id – V3 artifact ID to parse
chain_path – Full chain path (required for complete reconstruction)
- Returns:
ExecutionPath instance
- class nirs4all.pipeline.storage.artifacts.MetaModelConfig(source_models: List[Dict[str, Any]] = <factory>, feature_columns: List[str] = <factory>)[source]
Bases: object
Configuration for meta-model source tracking.
Stores the ordered source models that feed into a stacking meta-model, along with their feature column mapping.
- class nirs4all.pipeline.storage.artifacts.OperatorChain(nodes: List[OperatorNode] = <factory>, pipeline_id: str = '')[source]
Bases: object
Ordered sequence of OperatorNodes representing the full execution path.
The OperatorChain captures the complete path of operators from input to the current artifact, enabling deterministic artifact identification and replay.
- nodes
Ordered list of OperatorNode objects in the chain
- append(node: OperatorNode) OperatorChain[source]
Return new chain with node appended.
- Parameters:
node – OperatorNode to append
- Returns:
New OperatorChain with the node appended
- copy() OperatorChain[source]
Create a deep copy of this chain.
- Returns:
New OperatorChain with copied nodes
- extend(other: OperatorChain) OperatorChain[source]
Return new chain with another chain’s nodes appended.
- Parameters:
other – OperatorChain to append
- Returns:
New OperatorChain with all nodes from both chains
- filter_branch(target_branch_path: List[int]) OperatorChain[source]
Return chain with only nodes matching the branch path.
Includes nodes that:
  - Have no branch path (shared/pre-branch artifacts)
  - Have a branch path that is a prefix of or equal to target
- Parameters:
target_branch_path – Branch path to filter for
- Returns:
New OperatorChain with only matching nodes
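The prefix rule for branch filtering can be sketched as follows. `Node` is a hypothetical, reduced stand-in for OperatorNode, and the functions work on plain lists rather than on an OperatorChain.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class Node:
    """Hypothetical, reduced stand-in for OperatorNode."""
    step_index: int
    operator_class: str
    branch_path: Tuple[int, ...] = ()


def is_on_branch(node_branch: Tuple[int, ...], target: List[int]) -> bool:
    # An empty branch path (shared/pre-branch node) is a prefix of everything.
    # Otherwise the node's path must equal the leading part of the target.
    return tuple(target[: len(node_branch)]) == tuple(node_branch)


def filter_branch(nodes: List[Node], target: List[int]) -> List[Node]:
    return [n for n in nodes if is_on_branch(n.branch_path, target)]


nodes = [
    Node(1, "MinMaxScaler"),        # shared, before any branching
    Node(3, "SNV", (0,)),           # branch 0
    Node(3, "Detrend", (1,)),       # branch 1
    Node(5, "PLS", (0, 1)),         # nested branch 0 -> 1
    Node(5, "RF", (0, 2)),          # nested branch 0 -> 2
]
kept = [n.operator_class for n in filter_branch(nodes, [0, 1])]
```

With target `[0, 1]`, the shared scaler, the branch-0 SNV, and the nested PLS are kept; the sibling branches are dropped.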
- filter_source(source_index: int) OperatorChain[source]
Return chain with only nodes for the specified source.
Includes nodes that:
  - Have no source_index (single source)
  - Have matching source_index
- Parameters:
source_index – Source index to filter for
- Returns:
New OperatorChain with only matching nodes
- filter_step(step_index: int) OperatorChain[source]
Return chain with only nodes at the specified step.
- Parameters:
step_index – Step index to filter for
- Returns:
New OperatorChain with only matching nodes
- classmethod from_dict(data: Dict[str, Any]) OperatorChain[source]
Create OperatorChain from dictionary.
- Parameters:
data – Dictionary representation
- Returns:
OperatorChain instance
- classmethod from_path(path: str, pipeline_id: str = '') OperatorChain[source]
Parse OperatorChain from a path string.
- Parameters:
path – Chain path string like “s1.MinMaxScaler>s3.SNV[br=0]”
pipeline_id – Pipeline identifier
- Returns:
OperatorChain instance
- get_branch_path() List[int][source]
Get the branch path from the last node.
- Returns:
Branch path of the last node, or empty list if no nodes
- get_last_node() OperatorNode | None[source]
Get the last node in the chain.
- Returns:
Last OperatorNode or None if chain is empty
- get_nodes_at_step(step_index: int) List[OperatorNode][source]
Get all nodes at a specific step.
- Parameters:
step_index – Step index to filter
- Returns:
List of nodes at that step
- merge_with_prefix(prefix_chain: OperatorChain, step_offset: int = 0) OperatorChain[source]
Merge this chain with a prefix chain for bundle import.
Used when importing a bundle into a pipeline, where the bundle’s chain needs to be prefixed with the import context’s chain.
- Parameters:
prefix_chain – Chain to prepend (the import context)
step_offset – Offset to add to step indices in this chain
- Returns:
New merged OperatorChain
Example
>>> bundle_chain = OperatorChain.from_path("s1.Scaler>s3.PLS")
>>> import_chain = OperatorChain.from_path("s1.Import")
>>> merged = bundle_chain.merge_with_prefix(import_chain, step_offset=1)
# Result: "s1.Import>s2.Scaler>s4.PLS"
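The step arithmetic behind the merge can be reproduced with plain tuples. This is only a sketch of the documented result: nodes are reduced to hypothetical `(step_index, operator_class)` pairs, and qualifiers such as branch or source are ignored.

```python
from typing import List, Tuple

Node = Tuple[int, str]  # (step_index, operator_class), a simplification


def merge_with_prefix(chain: List[Node], prefix: List[Node], step_offset: int = 0) -> List[Node]:
    # Shift every step of the imported chain, then put the prefix in front.
    shifted = [(step + step_offset, name) for step, name in chain]
    return list(prefix) + shifted


def to_path(nodes: List[Node]) -> str:
    return ">".join(f"s{step}.{name}" for step, name in nodes)


bundle = [(1, "Scaler"), (3, "PLS")]
import_context = [(1, "Import")]
merged_path = to_path(merge_with_prefix(bundle, import_context, step_offset=1))
```

`merged_path` matches the result given in the example above: the prefix keeps its step numbers, and the bundle's steps move from 1 and 3 to 2 and 4.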
- nodes: List[OperatorNode]
- remap_steps(step_mapping: Dict[int, int]) OperatorChain[source]
Create new chain with remapped step indices.
- Parameters:
step_mapping – Mapping from old step index to new step index
- Returns:
New OperatorChain with remapped steps
- to_dict() Dict[str, Any][source]
Convert to dictionary for serialization.
- Returns:
Dictionary representation
- to_hash(length: int = 12) str[source]
Compute deterministic hash of the chain path.
- Parameters:
length – Number of hex characters to return (default: 12)
- Returns:
Truncated SHA256 hash of the chain path
- to_path() str[source]
Generate full path string from all nodes.
Format: node1>node2>node3
- Returns:
Chain path string
Examples
>>> chain = OperatorChain([
...     OperatorNode(1, "MinMaxScaler"),
...     OperatorNode(3, "SNV", branch_path=[0])
... ])
>>> chain.to_path()
's1.MinMaxScaler>s3.SNV[br=0]'
- with_pipeline_id(pipeline_id: str) OperatorChain[source]
Create a copy of this chain with a new pipeline ID.
- Parameters:
pipeline_id – New pipeline ID to set
- Returns:
New OperatorChain with the specified pipeline_id
- class nirs4all.pipeline.storage.artifacts.OperatorNode(step_index: int, operator_class: str, branch_path: List[int] = <factory>, source_index: int | None = None, fold_id: int | None = None, substep_index: int | None = None, operator_name: str | None = None)[source]
Bases: object
Represents a single operator in the execution chain.
An OperatorNode captures all the context needed to identify a specific operator execution within a pipeline, including its position, branch context, and source index for multi-source processing.
- classmethod from_dict(data: Dict[str, Any]) OperatorNode[source]
Create OperatorNode from dictionary.
- Parameters:
data – Dictionary representation
- Returns:
OperatorNode instance
- classmethod from_key(key: str) OperatorNode[source]
Parse an OperatorNode from its key string representation.
- Parameters:
key – Key string like “s3.SNV[br=0,src=1]”
- Returns:
OperatorNode instance
- Raises:
ValueError – If key format is invalid
- matches_context(step_index: int | None = None, branch_path: List[int] | None = None, source_index: int | None = None, fold_id: int | None = None) bool[source]
Check if this node matches the given context filters.
None values are treated as “match any”.
- Parameters:
step_index – Step number to match (None = any)
branch_path – Branch path to match (None = any)
source_index – Source index to match (None = any)
fold_id – Fold ID to match (None = any)
- Returns:
True if node matches all specified filters
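The "None means match any" convention can be sketched with a dictionary standing in for a node. This is an illustration under assumptions: each specified filter is compared by plain equality, whereas the real method may treat `branch_path` differently.

```python
from typing import Any, Dict, List, Optional


def matches_context(
    node: Dict[str, Any],
    step_index: Optional[int] = None,
    branch_path: Optional[List[int]] = None,
    source_index: Optional[int] = None,
    fold_id: Optional[int] = None,
) -> bool:
    filters = {
        "step_index": step_index,
        "branch_path": branch_path,
        "source_index": source_index,
        "fold_id": fold_id,
    }
    # Filters left as None are skipped; every other filter must match.
    # ASSUMPTION: matching is exact equality for all fields.
    return all(
        node.get(name) == wanted
        for name, wanted in filters.items()
        if wanted is not None
    )


node = {"step_index": 3, "branch_path": [0], "source_index": 1, "fold_id": None}
any_context = matches_context(node)
same_step = matches_context(node, step_index=3)
other_branch = matches_context(node, step_index=3, branch_path=[1])
```

Calling it with no filters always matches, and adding filters only narrows the result.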
- to_dict() Dict[str, Any][source]
Convert to dictionary for serialization.
- Returns:
Dictionary representation suitable for YAML/JSON
- to_key() str[source]
Generate compact key string for this node.
Format: s{step}.{Class}[qualifiers]
- Qualifiers (only if present):
br={branch_path} - Branch context
src={source_index} - Multi-source index
sub={substep_index} - Substep index
- Returns:
Compact key string for this operator node
Examples
>>> OperatorNode(1, "MinMaxScaler").to_key()
's1.MinMaxScaler'
>>> OperatorNode(3, "SNV", branch_path=[0]).to_key()
's3.SNV[br=0]'
>>> OperatorNode(3, "SNV", branch_path=[0], source_index=1).to_key()
's3.SNV[br=0,src=1]'
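A standalone function can reproduce the documented key format. It is a sketch, not the library code: the examples above only show single-index branch paths, so the `"."` separator used here for nested branch indices is a placeholder assumption.

```python
from typing import List, Optional


def to_key(
    step_index: int,
    operator_class: str,
    branch_path: Optional[List[int]] = None,
    source_index: Optional[int] = None,
    substep_index: Optional[int] = None,
) -> str:
    qualifiers = []
    if branch_path:
        # ASSUMPTION: nested indices are joined with "."; the documented
        # examples only cover a single branch index.
        qualifiers.append("br=" + ".".join(str(b) for b in branch_path))
    if source_index is not None:
        qualifiers.append(f"src={source_index}")
    if substep_index is not None:
        qualifiers.append(f"sub={substep_index}")

    key = f"s{step_index}.{operator_class}"
    if qualifiers:
        # Qualifiers appear only when present, in br, src, sub order.
        key += "[" + ",".join(qualifiers) + "]"
    return key


plain_key = to_key(1, "MinMaxScaler")
branch_key = to_key(3, "SNV", branch_path=[0])
branch_source_key = to_key(3, "SNV", branch_path=[0], source_index=1)
```

The three keys correspond to the three documented examples.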
- with_fold(fold_id: int) OperatorNode[source]
Create a copy of this node with a specific fold ID.
- Parameters:
fold_id – The fold ID to set
- Returns:
New OperatorNode with the specified fold_id
- with_source(source_index: int) OperatorNode[source]
Create a copy of this node with a specific source index.
- Parameters:
source_index – The source index to set
- Returns:
New OperatorNode with the specified source_index
- nirs4all.pipeline.storage.artifacts.artifact_id_matches_context(artifact_id: str, pipeline_id: str | None = None, branch_path: List[int] | None = None, step_index: int | None = None, fold_id: int | None = None) bool[source]
Check if a V3 artifact ID matches a given context.
Partial matching is supported - only specified parameters are checked. Note: branch_path and step_index matching requires ArtifactRecord access.
- Parameters:
artifact_id – V3 artifact ID to check
pipeline_id – Expected pipeline ID (None = don’t check)
branch_path – Expected branch path (ignored for V3 - use ArtifactRecord)
step_index – Expected step index (ignored for V3 - use ArtifactRecord)
fold_id – Expected fold ID (None = don’t check)
- Returns:
True if artifact matches specified criteria, False otherwise
- nirs4all.pipeline.storage.artifacts.compute_chain_hash(chain_path: str, length: int = 12) str[source]
Compute deterministic hash from chain path string.
- Parameters:
chain_path – Full operator chain path
length – Number of hex characters (default: 12)
- Returns:
Truncated SHA256 hash
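A truncated SHA256 of a chain path can be computed with the standard library alone. The sketch below assumes the path is encoded as UTF-8 before hashing, which the documentation does not state, and `chain_hash` is a hypothetical name.

```python
import hashlib


def chain_hash(chain_path: str, length: int = 12) -> str:
    # ASSUMPTION: the chain path is hashed as UTF-8 bytes.
    digest = hashlib.sha256(chain_path.encode("utf-8")).hexdigest()
    return digest[:length]


first = chain_hash("s1.MinMaxScaler>s3.SNV[br=0]")
again = chain_hash("s1.MinMaxScaler>s3.SNV[br=0]")
other = chain_hash("s1.MinMaxScaler>s3.SNV[br=1]")
```

The same path always yields the same 12-character hex string, while a change in any node (here the branch index) yields a different one.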
- nirs4all.pipeline.storage.artifacts.compute_content_hash(content: bytes) str[source]
Compute SHA256 hash of binary content.
- Parameters:
content – Binary content to hash
- Returns:
Full SHA256 hash with “sha256:” prefix
- nirs4all.pipeline.storage.artifacts.extract_fold_id_from_artifact_id(artifact_id: str) int | None[source]
Extract fold ID from artifact ID (V2 or V3).
- Parameters:
artifact_id – Full artifact ID
- Returns:
Fold ID or None if “all”
- nirs4all.pipeline.storage.artifacts.extract_pipeline_id_from_artifact_id(artifact_id: str) str[source]
Extract pipeline ID from artifact ID (V2 or V3).
- Parameters:
artifact_id – Full artifact ID
- Returns:
Pipeline ID component
- nirs4all.pipeline.storage.artifacts.generate_artifact_id_v3(pipeline_id: str, chain: OperatorChain | str, fold_id: int | None = None) str[source]
Generate V3 artifact ID from chain.
Format: {pipeline_id}${chain_hash}:{fold_id}
- Parameters:
pipeline_id – Pipeline identifier
chain – Operator chain object or chain path string for this artifact
fold_id – Fold ID (None for shared artifacts)
- Returns:
V3 artifact ID string
Examples
>>> generate_artifact_id_v3("0001_pls", chain, None)
'0001_pls$a1b2c3d4e5f6:all'
>>> generate_artifact_id_v3("0001_pls", chain, 0)
'0001_pls$a1b2c3d4e5f6:0'
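The ID layout can be sketched as simple string formatting. Unlike the real function, this hypothetical helper takes an already computed chain hash instead of a chain, so that it stays self-contained.

```python
from typing import Optional


def format_artifact_id_v3(pipeline_id: str, chain_hash: str, fold_id: Optional[int] = None) -> str:
    # Shared artifacts (fold_id=None) use the literal "all" as fold component.
    fold_part = "all" if fold_id is None else str(fold_id)
    return f"{pipeline_id}${chain_hash}:{fold_part}"


shared_id = format_artifact_id_v3("0001_pls", "a1b2c3d4e5f6", None)
fold_id_0 = format_artifact_id_v3("0001_pls", "a1b2c3d4e5f6", 0)
```

Note that fold `0` is a valid fold and must not be confused with `None`, which is why the check is `is None` rather than a truthiness test.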
- nirs4all.pipeline.storage.artifacts.generate_filename(artifact_type: str, class_name: str, content_hash: str, extension: str = 'joblib') str[source]
Generate artifact filename from components.
New format: <type>_<class>_<short_hash>.<ext>
- Parameters:
artifact_type – Artifact type (model, transformer, etc.)
class_name – Python class name
content_hash – Full SHA256 hash (will be truncated)
extension – File extension (default: joblib)
- Returns:
Filename string
Examples
>>> generate_filename("model", "PLSRegression", "abc123def456")
"model_PLSRegression_abc123def456.joblib"
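The filename pattern can be reproduced in a few lines. This sketch assumes the hash is truncated to 12 characters (the default of get_short_hash) and that an optional `sha256:` prefix is dropped first; `make_filename` and `hash_length` are hypothetical names.

```python
def make_filename(
    artifact_type: str,
    class_name: str,
    content_hash: str,
    extension: str = "joblib",
    hash_length: int = 12,  # ASSUMPTION: same default as get_short_hash
) -> str:
    # Drop an optional "sha256:" style prefix, then truncate the digest.
    digest = content_hash.split(":", 1)[-1]
    return f"{artifact_type}_{class_name}_{digest[:hash_length]}.{extension}"


name = make_filename("model", "PLSRegression", "abc123def456")
prefixed = make_filename("transformer", "SNV", "sha256:" + "0f" * 32, extension="pkl")
```

Because the name is derived from the content hash, two artifacts with identical content map to the same filename.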
- nirs4all.pipeline.storage.artifacts.get_binaries_path(workspace: Path, dataset: str) Path[source]
Get the centralized binaries directory for a dataset.
New architecture stores artifacts at workspace/binaries/<dataset>/
- Parameters:
workspace – Workspace root path
dataset – Dataset name
- Returns:
Path to binaries directory
- nirs4all.pipeline.storage.artifacts.get_short_hash(content_hash: str, length: int = 12) str[source]
Extract short hash from full content hash.
- Parameters:
content_hash – Full hash (with or without sha256: prefix)
length – Number of characters to return (default: 12)
- Returns:
Short hash string
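The relationship between the full content hash and its short form can be sketched with `hashlib`. The function names below are hypothetical stand-ins, not the library functions.

```python
import hashlib

PREFIX = "sha256:"


def full_content_hash(content: bytes) -> str:
    # Full hex digest, tagged with the algorithm name.
    return PREFIX + hashlib.sha256(content).hexdigest()


def short_hash(content_hash: str, length: int = 12) -> str:
    # Accept the hash with or without the "sha256:" prefix.
    if content_hash.startswith(PREFIX):
        content_hash = content_hash[len(PREFIX):]
    return content_hash[:length]


full = full_content_hash(b"serialized model bytes")
short_from_prefixed = short_hash(full)
short_from_bare = short_hash(full[len(PREFIX):])
```

Both calls to `short_hash` return the same 12 characters, so callers do not need to know which form they hold.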
- nirs4all.pipeline.storage.artifacts.is_v3_artifact_id(artifact_id: str) bool[source]
Check if an artifact ID is in V3 format.
- Parameters:
artifact_id – Artifact ID to check
- Returns:
True if V3 format, False otherwise
- nirs4all.pipeline.storage.artifacts.parse_artifact_id(artifact_id: str) Tuple[str, List[int], int, int | None, int | None][source]
Parse an artifact ID into its components (V3 only).
V3 format: {pipeline_id}${chain_hash}:{fold_id}
- Parameters:
artifact_id – V3 artifact ID to parse
- Returns:
Tuple of (pipeline_id, branch_path, step_index, fold_id, sub_index). For V3 IDs, step_index is always 0 and branch_path is always empty; use the ArtifactRecord for full information.
- Raises:
ValueError – If artifact ID format is not V3
- nirs4all.pipeline.storage.artifacts.parse_artifact_id_v3(artifact_id: str) Tuple[str, str, int | None][source]
Parse V3 artifact ID into components.
- Parameters:
artifact_id – V3 artifact ID string
- Returns:
Tuple of (pipeline_id, chain_hash, fold_id)
- Raises:
ValueError – If format is invalid
Examples
>>> parse_artifact_id_v3("0001_pls$a1b2c3d4e5f6:all")
('0001_pls', 'a1b2c3d4e5f6', None)
>>> parse_artifact_id_v3("0001_pls$a1b2c3d4e5f6:0")
('0001_pls', 'a1b2c3d4e5f6', 0)
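Parsing the `{pipeline_id}${chain_hash}:{fold_id}` layout can be sketched with string partitioning. This is an illustration, not the library code, and it assumes the pipeline ID contains no `$` character.

```python
from typing import Optional, Tuple


def parse_v3(artifact_id: str) -> Tuple[str, str, Optional[int]]:
    # ASSUMPTION: the first "$" ends the pipeline ID, the last ":" starts the fold.
    pipeline_id, dollar, rest = artifact_id.partition("$")
    chain_hash, colon, fold_part = rest.rpartition(":")
    if not (dollar and colon and pipeline_id and chain_hash and fold_part):
        raise ValueError(f"not a V3 artifact ID: {artifact_id!r}")
    if fold_part == "all":
        return pipeline_id, chain_hash, None
    try:
        return pipeline_id, chain_hash, int(fold_part)
    except ValueError:
        raise ValueError(f"invalid fold component in {artifact_id!r}") from None


shared = parse_v3("0001_pls$a1b2c3d4e5f6:all")
fold_zero = parse_v3("0001_pls$a1b2c3d4e5f6:0")
```

The `all` fold component maps back to `None`, mirroring how shared artifacts are encoded when the ID is generated.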
- nirs4all.pipeline.storage.artifacts.parse_filename(filename: str) Tuple[str, str, str] | None[source]
Parse artifact filename into components.
Handles new format: <type>_<class>_<short_hash>.<ext>
Also handles legacy format: <class>_<short_hash>.<ext>
- Parameters:
filename – Filename to parse
- Returns:
Tuple of (artifact_type, class_name, short_hash) or None if invalid