nirs4all.pipeline.storage.artifacts.artifact_loader module
Artifact Loader V3 - Chain-based artifact loading for prediction replay.
This module provides the ArtifactLoader class which loads artifacts using the V3 chain-based identification system. It supports:
Loading by V3 artifact ID (pipeline$hash:fold)
Loading by operator chain path
Loading by step/branch/source/fold context
Transitive dependency resolution for stacking
Per-fold model loading for CV averaging
LRU caching for efficient reuse
V3 Key Features:
Chain path indexing for deterministic artifact lookup
Source index tracking for multi-source pipelines
Unified handling of branching, stacking, and bundles
The loader works with centralized storage at workspace/binaries/<dataset>/ and reads artifact metadata from V3 manifests.
- class nirs4all.pipeline.storage.artifacts.artifact_loader.ArtifactLoader(workspace: Path, dataset: str, results_dir: Path | None = None, cache_size: int = 100)
Bases: object
Load artifacts using V3 chain-based identification.
This class provides efficient loading of artifacts from centralized storage, with support for:
Direct loading by V3 artifact ID (pipeline$hash:fold)
Chain path-based loading for deterministic replay
Context-based loading (step/branch/source/fold)
Dependency resolution for stacking meta-models
Per-fold model loading for cross-validation ensembles
LRU caching to avoid redundant I/O
V3 Key Features:
Chain path indexing for O(1) lookup by chain
Source index support for multi-source pipelines
Branch path filtering using chain metadata
The loader is lazy: artifacts are deserialized only when accessed via load_by_id() or related methods.
- workspace
Workspace root path
- dataset
Dataset name
- binaries_dir
Path to centralized binaries
- results_dir
Path to results directory (for manifest reference)
Example
>>> loader = ArtifactLoader.from_manifest(manifest, results_dir)
>>> model = loader.load_by_id("0001_pls$abc123def456:0")
>>> artifacts = loader.load_by_chain("s1.MinMaxScaler>s3.PLS[br=0]")
- DEFAULT_CACHE_SIZE = 100
- find_artifact_by_custom_name(custom_name: str, step_index: int | None = None, fold_id: int | None = None, branch_path: List[int] | None = None) → ArtifactRecord | None
Find an artifact by its custom_name.
Used for reverse lookup when only the model name is known but not the artifact_id. Useful for legacy compatibility.
- Parameters:
custom_name – User-defined model name (e.g., “Q5_PLS_10”)
step_index – Optional filter by step
fold_id – Optional filter by fold
branch_path – Optional filter by branch
- Returns:
ArtifactRecord if found, None otherwise
- classmethod from_manifest(manifest: Dict[str, Any], results_dir: Path) → ArtifactLoader
Create an ArtifactLoader from a pipeline manifest.
Factory method for easy creation from manifest data.
- Parameters:
manifest – Pipeline manifest dictionary
results_dir – Path to results directory (manifest.yaml’s parent)
- Returns:
Initialized ArtifactLoader instance
- get_all_records() → List[ArtifactRecord]
Get all artifact records.
- Returns:
List of all ArtifactRecords
- get_artifacts_by_chain_filter(step_index: int | None = None, branch_path: List[int] | None = None, source_index: int | None = None, fold_id: int | None = None) → List[ArtifactRecord]
Get artifact records matching chain-based filters.
Uses the chain_path information stored in V3 records to filter.
- Parameters:
step_index – Filter by step index
branch_path – Filter by branch path
source_index – Filter by source index
fold_id – Filter by fold ID
- Returns:
List of matching ArtifactRecords
- get_cache_info() → Dict[str, Any]
Get information about the current cache state.
- Returns:
Dictionary with cache statistics
- get_record(artifact_id: str) → ArtifactRecord | None
Get artifact record by ID.
- Parameters:
artifact_id – Artifact ID
- Returns:
ArtifactRecord or None if not found
- get_record_by_chain(chain_path: str) → ArtifactRecord | None
Get artifact record by chain path.
- Parameters:
chain_path – Operator chain path
- Returns:
ArtifactRecord or None if not found
- get_step_binaries(step_id: int, branch_id: int | None = None, branch_path: List[int] | None = None) → List[Tuple[str, Any]]
Legacy-compatible method for loading step binaries.
This method provides backward compatibility with the BinaryLoader API. Prefer using load_for_step() for new code.
Returns names in a format compatible with controller lookup patterns:
For models with fold_id: “ClassName_<op_num>” where op_num = step*100 + fold
For shared models: “ClassName_<op_num>” where op_num = step*100
For y_transformers (ENCODER type): “y_ClassName_<op_num>”
For x_transformers (TRANSFORMER type): “ClassName_<op_num>”
- Parameters:
step_id – Step identifier (supports int or “step_substep” format)
branch_id – Optional branch ID (converts to branch_path [branch_id])
branch_path – Optional full branch path for nested branches (takes precedence over branch_id)
- Returns:
List of (name, loaded_object) tuples
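The naming rules documented for get_step_binaries() can be sketched as a small helper. This is only an illustration: `legacy_binary_name` and its parameters are hypothetical and not part of nirs4all, and the type strings are assumed to match the ENCODER/TRANSFORMER labels mentioned in the description.

```python
from typing import Optional


def legacy_binary_name(class_name: str, step: int,
                       fold_id: Optional[int] = None,
                       artifact_type: str = "MODEL") -> str:
    """Build a legacy-style binary name (hypothetical helper, not nirs4all API)."""
    # op_num = step*100 + fold for per-fold artifacts, step*100 for shared ones
    op_num = step * 100 + (fold_id if fold_id is not None else 0)
    # y-transformers (ENCODER type) get a "y_" prefix; other types do not
    prefix = "y_" if artifact_type == "ENCODER" else ""
    return f"{prefix}{class_name}_{op_num}"


print(legacy_binary_name("PLSRegression", step=4, fold_id=2))
print(legacy_binary_name("MinMaxScaler", step=1, artifact_type="ENCODER"))
```

Note that under these rules a fold-0 model and a shared model at the same step produce the same op_num, which is one reason the artifact_id-based methods are described as more exact.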
- get_step_binaries_by_artifact_ids(artifact_ids: List[str]) → List[Tuple[str, Any]]
Load multiple artifacts by their deterministic artifact_ids.
This method is used in prediction mode when model_artifact_id is available in the prediction record. It provides deterministic loading that works correctly with custom model names.
- Parameters:
artifact_ids – List of artifact IDs to load
- Returns:
List of (name, loaded_object) tuples
- Raises:
KeyError – If any artifact_id is not found
Example
>>> artifact_ids = ["abc123:4:0", "abc123:4:1"]
>>> binaries = loader.get_step_binaries_by_artifact_ids(artifact_ids)
- has_binaries_for_step(step_number: int, substep_number: int | None = None, branch_id: int | None = None) → bool
Check if binaries exist for a specific step.
Legacy-compatible method for checking artifact availability.
- Parameters:
step_number – The main step number
substep_number – Ignored (kept for compatibility)
branch_id – Optional branch ID to check
- Returns:
True if artifacts exist for this step
- import_from_manifest(manifest: Dict[str, Any], results_dir: Path | None = None) → None
Import artifact records from a V3 manifest.
Builds all indexes including chain_path index for V3 lookups.
- Parameters:
manifest – Manifest dictionary
results_dir – Optional results directory override
- load_by_artifact_id(artifact_id: str) → Tuple[str, Any]
Load a single artifact by its deterministic artifact_id.
This method provides deterministic artifact loading using the artifact_id stored in predictions. Unlike name-based loading which can be ambiguous with custom model names, artifact_id-based loading is always exact.
- Parameters:
artifact_id – The deterministic artifact ID (e.g., “0001:4:0” for fold 0 or “0001:4:all” for shared artifacts)
- Returns:
Tuple of (name, loaded_object) where name is built from custom_name if available, otherwise from class_name.
- Raises:
KeyError – If artifact_id not found in registry
FileNotFoundError – If artifact file doesn’t exist on disk
Example
>>> loader = ArtifactLoader.from_manifest(manifest, results_dir)
>>> name, model = loader.load_by_artifact_id("abc123:4:0")
>>> predictions = model.predict(X_new)
- load_by_chain(chain: str, fold_id: int | None = None) → Any | None
Load artifact by exact chain path match.
- Parameters:
chain – Operator chain path string (e.g., “s1.MinMaxScaler>s3.PLS[br=0]”)
fold_id – Optional fold ID filter
- Returns:
Loaded artifact object or None if not found
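To make the chain path string more concrete, here is a minimal parsing sketch. The grammar is inferred solely from the example “s1.MinMaxScaler>s3.PLS[br=0]” and is an assumption: the real format may allow other node shapes or attributes. `parse_chain_path` is a hypothetical helper, not part of nirs4all.

```python
import re
from typing import Dict, List, Tuple

# Assumed node shape: "s<step>.<ClassName>" with an optional "[key=value,...]" suffix.
_NODE = re.compile(
    r"^s(?P<step>\d+)\.(?P<name>[A-Za-z_]\w*)(?:\[(?P<attrs>[^\]]*)\])?$"
)


def parse_chain_path(chain: str) -> List[Tuple[int, str, Dict[str, str]]]:
    """Split a chain path into (step, operator name, attributes) nodes."""
    nodes = []
    for part in chain.split(">"):
        match = _NODE.match(part.strip())
        if match is None:
            raise ValueError(f"Unrecognized chain node: {part!r}")
        attrs: Dict[str, str] = {}
        if match.group("attrs"):
            # Attributes are kept as raw strings; their meaning is not interpreted here
            for pair in match.group("attrs").split(","):
                key, _, value = pair.partition("=")
                attrs[key.strip()] = value.strip()
        nodes.append((int(match.group("step")), match.group("name"), attrs))
    return nodes


print(parse_chain_path("s1.MinMaxScaler>s3.PLS[br=0]"))
```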
- load_by_chain_prefix(prefix: str, branch_path: List[int] | None = None, source_index: int | None = None) → List[Tuple[str, Any]]
Load all artifacts whose chain path starts with the given prefix.
Useful for loading all artifacts in a chain for prediction replay.
- Parameters:
prefix – Chain path prefix to match
branch_path – Optional branch path filter
source_index – Optional source index filter
- Returns:
List of (artifact_id, loaded_object) tuples
- load_by_id(artifact_id: str) → Any
Load a single artifact by its V3 ID.
Uses LRU cache to avoid redundant disk I/O. Artifacts are loaded lazily on first access.
- Parameters:
artifact_id – V3 artifact identifier (pipeline$hash:fold)
- Returns:
Deserialized artifact object
- Raises:
KeyError – If artifact ID not found
FileNotFoundError – If artifact file doesn’t exist
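The V3 identifier layout (pipeline$hash:fold) can be illustrated with a small splitter. This sketch assumes the three parts are separated by the first “$” and the last “:”, as in the documented example “0001_pls$abc123def456:0”. `split_v3_id` and `V3ArtifactId` are hypothetical names, and the fold part is kept as text because the set of valid fold markers is not specified here.

```python
from typing import NamedTuple


class V3ArtifactId(NamedTuple):
    pipeline: str
    content_hash: str
    fold: str  # kept as text; no assumption about which fold markers are valid


def split_v3_id(artifact_id: str) -> V3ArtifactId:
    """Split 'pipeline$hash:fold' into its parts (hypothetical helper)."""
    pipeline, sep, rest = artifact_id.partition("$")
    if not sep:
        raise ValueError(f"Missing '$' in artifact ID: {artifact_id!r}")
    content_hash, sep, fold = rest.rpartition(":")
    if not sep:
        raise ValueError(f"Missing ':' in artifact ID: {artifact_id!r}")
    return V3ArtifactId(pipeline, content_hash, fold)


print(split_v3_id("0001_pls$abc123def456:0"))
```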
- load_fold_models(step_index: int, branch_path: List[int] | None = None, pipeline_id: str | None = None) → List[Tuple[int, Any]]
Load all fold-specific model artifacts for CV averaging.
Returns models for all folds at the specified step, sorted by fold_id.
- Parameters:
step_index – Step number where the models are located
branch_path – Optional branch path filter
pipeline_id – Optional pipeline ID filter
- Returns:
List of (fold_id, loaded_model) tuples, sorted by fold_id
- load_for_step(step_index: int, branch_path: List[int] | None = None, source_index: int | None = None, fold_id: int | None = None, pipeline_id: str | None = None) → List[Tuple[str, Any]]
Load all artifacts for a step context.
Returns artifacts matching the specified step, branch path, source, and fold. If branch_path is provided, includes both branch-specific and shared (pre-branch) artifacts.
- Parameters:
step_index – Step number to load
branch_path – Optional branch path filter
source_index – Optional source index filter
fold_id – Optional fold ID filter
pipeline_id – Optional pipeline ID filter
- Returns:
List of (artifact_id, loaded_object) tuples
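The branch rule described for load_for_step() (branch-specific plus shared artifacts) can be sketched with plain records. This is an illustration under stated assumptions: `Record` is a hypothetical stand-in for ArtifactRecord, its field names are guesses, and an empty branch_path is assumed to mark a shared (pre-branch) artifact.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Record:
    """Hypothetical stand-in for ArtifactRecord; field names are assumptions."""
    artifact_id: str
    step_index: int
    branch_path: List[int] = field(default_factory=list)


def select_for_step(records: List[Record], step_index: int,
                    branch_path: Optional[List[int]] = None) -> List[Record]:
    """Keep records for a step, including shared ones when a branch is given."""
    selected = []
    for rec in records:
        if rec.step_index != step_index:
            continue
        # Assumption: an empty branch_path marks a shared (pre-branch) artifact,
        # so it passes any branch filter; non-empty paths must match exactly.
        if branch_path is not None and rec.branch_path and rec.branch_path != branch_path:
            continue
        selected.append(rec)
    return selected


records = [Record("a", 3), Record("b", 3, [0]), Record("c", 3, [1]), Record("d", 4, [0])]
print([r.artifact_id for r in select_for_step(records, 3, branch_path=[0])])
```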
- load_meta_model_for_prediction(artifact_id: str, X: Any = None) → Tuple[Any, List[Tuple[str, Any]], List[str]]
Load a meta-model and its sources, ready for prediction.
This method loads the complete stacking ensemble and validates that all components are compatible for prediction.
- Parameters:
artifact_id – Meta-model artifact ID
X – Optional input features for validation
- Returns:
Tuple of (meta_model, source_models, feature_columns) where source_models is a list of (artifact_id, model) tuples in the correct order for feature construction
- Raises:
KeyError – If artifact or source models not found
ValueError – If artifact is not a meta-model
- load_meta_model_with_sources(artifact_id: str, validate_branch: bool = True) → Tuple[Any, List[Tuple[str, Any]], List[str]]
Load a meta-model and its source models.
For stacking, loads the meta-model and all source models it depends on, preserving the feature column order as specified in meta_config.
- Parameters:
artifact_id – Meta-model artifact ID
validate_branch – If True, validate branch context matches
- Returns:
Tuple of (meta_model, [(source_id, source_model), …], feature_columns) where source_models are in the correct order for feature construction
- Raises:
KeyError – If artifact not found
ValueError – If artifact is not a meta-model or if branch validation fails
- load_with_dependencies(artifact_id: str) → Dict[str, Any]
Load an artifact and all its transitive dependencies.
Returns a dictionary mapping artifact IDs to loaded objects, in topological order (dependencies before dependents).
- Parameters:
artifact_id – Starting artifact ID
- Returns:
Dictionary of {artifact_id: loaded_object}
- Raises:
KeyError – If artifact or dependency not found
ValueError – If cycle detected in dependencies
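The documented behaviour (dependencies before dependents, KeyError for unknown IDs, ValueError on cycles) matches a depth-first topological traversal. The sketch below is one way to get that ordering, not the library's implementation. `load_in_dependency_order`, the `depends_on` mapping, and the `load_one` callback are all hypothetical.

```python
from typing import Any, Callable, Dict, List, Set


def load_in_dependency_order(start_id: str,
                             depends_on: Dict[str, List[str]],
                             load_one: Callable[[str], Any]) -> Dict[str, Any]:
    """Load start_id and its transitive dependencies, dependencies first."""
    loaded: Dict[str, Any] = {}   # dicts preserve insertion order
    in_progress: Set[str] = set() # IDs on the current DFS path

    def visit(artifact_id: str) -> None:
        if artifact_id in loaded:
            return
        if artifact_id in in_progress:
            raise ValueError(f"Cycle detected at {artifact_id!r}")
        if artifact_id not in depends_on:
            raise KeyError(artifact_id)
        in_progress.add(artifact_id)
        for dep in depends_on[artifact_id]:
            visit(dep)
        in_progress.discard(artifact_id)
        # All dependencies are loaded by now, so this entry comes after them
        loaded[artifact_id] = load_one(artifact_id)

    visit(start_id)
    return loaded


graph = {"meta": ["pls", "rf"], "pls": ["scaler"], "rf": ["scaler"], "scaler": []}
result = load_in_dependency_order("meta", graph, load_one=lambda aid: f"<{aid}>")
print(list(result))
```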
- preload_artifacts(artifact_ids: List[str] | None = None, artifact_types: List[ArtifactType] | None = None) → int
Preload artifacts into cache.
Useful for warming the cache before prediction or when you know which artifacts will be needed.
- Parameters:
artifact_ids – Specific artifact IDs to preload (default: all)
artifact_types – Filter by artifact types (default: all)
- Returns:
Number of artifacts loaded
- class nirs4all.pipeline.storage.artifacts.artifact_loader.LRUCache(max_size: int = 100)
Bases: object
Simple LRU cache with configurable max size.
Uses OrderedDict for O(1) access and LRU eviction.
- get(key: str) → Any | None
Get item from cache, moving to end (most recently used).
- Parameters:
key – Cache key
- Returns:
Cached value or None if not found
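For readers unfamiliar with the OrderedDict approach, here is a minimal sketch of an LRU cache in that style. It is not the nirs4all implementation: `SimpleLRU` is a hypothetical class, and its `put` method is an assumption, since only `get` is documented here.

```python
from collections import OrderedDict
from typing import Any, Optional


class SimpleLRU:
    """Minimal LRU cache backed by OrderedDict (illustrative only)."""

    def __init__(self, max_size: int = 100):
        self.max_size = max_size
        self._items: "OrderedDict[str, Any]" = OrderedDict()

    def get(self, key: str) -> Optional[Any]:
        if key not in self._items:
            return None
        # Mark as most recently used by moving the key to the end
        self._items.move_to_end(key)
        return self._items[key]

    def put(self, key: str, value: Any) -> None:
        self._items[key] = value
        self._items.move_to_end(key)
        # Evict the least recently used entry (the first one) when over capacity
        if len(self._items) > self.max_size:
            self._items.popitem(last=False)


cache = SimpleLRU(max_size=2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")     # "a" becomes most recently used
cache.put("c", 3)  # evicts "b", the least recently used key
print(cache.get("a"), cache.get("b"), cache.get("c"))
```

Both `move_to_end` and `popitem(last=False)` run in constant time, which is what makes this structure a common choice for small in-process caches.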