nirs4all.pipeline.storage.artifacts.artifact_loader module

Artifact Loader V3 - Chain-based artifact loading for prediction replay.

This module provides the ArtifactLoader class which loads artifacts using the V3 chain-based identification system. It supports:

  • Loading by V3 artifact ID (pipeline$hash:fold)

  • Loading by operator chain path

  • Loading by step/branch/source/fold context

  • Transitive dependency resolution for stacking

  • Per-fold model loading for CV averaging

  • LRU caching for efficient reuse

V3 Key Features:

  • Chain path indexing for deterministic artifact lookup

  • Source index tracking for multi-source pipelines

  • Unified handling of branching, stacking, and bundles

The loader works with centralized storage at workspace/binaries/<dataset>/ and reads artifact metadata from V3 manifests.
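
The V3 ID layout (pipeline$hash:fold) can be illustrated with a small parser. This is a minimal sketch and not part of nirs4all: the helper name parse_v3_artifact_id and the ParsedArtifactId tuple are hypothetical, and mapping a non-numeric fold part to None is an assumption.

```python
from typing import NamedTuple, Optional


class ParsedArtifactId(NamedTuple):
    """Hypothetical container for the three parts of a V3 artifact ID."""
    pipeline: str
    digest: str
    fold: Optional[int]  # None when the fold part is not an integer (assumption)


def parse_v3_artifact_id(artifact_id: str) -> ParsedArtifactId:
    """Split 'pipeline$hash:fold' into its parts (illustrative only)."""
    pipeline, sep, rest = artifact_id.partition("$")
    if not sep:
        raise ValueError(f"missing '$' in artifact ID: {artifact_id!r}")
    digest, sep, fold_part = rest.rpartition(":")
    if not sep:
        raise ValueError(f"missing ':fold' suffix in artifact ID: {artifact_id!r}")
    fold = int(fold_part) if fold_part.isdigit() else None
    return ParsedArtifactId(pipeline, digest, fold)


# The ID below is the one used in the ArtifactLoader example.
parsed = parse_v3_artifact_id("0001_pls$abc123def456:0")
print(parsed.pipeline, parsed.digest, parsed.fold)
```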

class nirs4all.pipeline.storage.artifacts.artifact_loader.ArtifactLoader(workspace: Path, dataset: str, results_dir: Path | None = None, cache_size: int = 100)[source]

Bases: object

Load artifacts using V3 chain-based identification.

This class provides efficient loading of artifacts from centralized storage, with support for:

  • Direct loading by V3 artifact ID (pipeline$hash:fold)

  • Chain path-based loading for deterministic replay

  • Context-based loading (step/branch/source/fold)

  • Dependency resolution for stacking meta-models

  • Per-fold model loading for cross-validation ensemble

  • LRU caching to avoid redundant I/O

V3 Key Features:

  • Chain path indexing for O(1) lookup by chain

  • Source index support for multi-source pipelines

  • Branch path filtering using chain metadata

The loader uses lazy loading - artifacts are only deserialized when actually accessed via load_by_id() or related methods.

workspace

Workspace root path

dataset

Dataset name

binaries_dir

Path to centralized binaries

results_dir

Path to results directory (for manifest reference)

Example

>>> loader = ArtifactLoader.from_manifest(manifest, results_dir)
>>> model = loader.load_by_id("0001_pls$abc123def456:0")
>>> artifacts = loader.load_by_chain("s1.MinMaxScaler>s3.PLS[br=0]")
DEFAULT_CACHE_SIZE = 100
clear_cache() None[source]

Clear the object cache to free memory.

find_artifact_by_custom_name(custom_name: str, step_index: int | None = None, fold_id: int | None = None, branch_path: List[int] | None = None) ArtifactRecord | None[source]

Find an artifact by its custom_name.

Used for reverse lookup when only the model name is known but not the artifact_id. Useful for legacy compatibility.

Parameters:
  • custom_name – User-defined model name (e.g., “Q5_PLS_10”)

  • step_index – Optional filter by step

  • fold_id – Optional filter by fold

  • branch_path – Optional filter by branch

Returns:

ArtifactRecord if found, None otherwise

classmethod from_manifest(manifest: Dict[str, Any], results_dir: Path) ArtifactLoader[source]

Create an ArtifactLoader from a pipeline manifest.

Factory method for easy creation from manifest data.

Parameters:
  • manifest – Pipeline manifest dictionary

  • results_dir – Path to results directory (manifest.yaml’s parent)

Returns:

Initialized ArtifactLoader instance

get_all_records() List[ArtifactRecord][source]

Get all artifact records.

Returns:

List of all ArtifactRecords

get_artifacts_by_chain_filter(step_index: int | None = None, branch_path: List[int] | None = None, source_index: int | None = None, fold_id: int | None = None) List[ArtifactRecord][source]

Get artifact records matching chain-based filters.

Uses the chain_path information stored in V3 records to filter.

Parameters:
  • step_index – Filter by step index

  • branch_path – Filter by branch path

  • source_index – Filter by source index

  • fold_id – Filter by fold ID

Returns:

List of matching ArtifactRecords
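
The optional-filter semantics described above (a filter left as None matches everything) can be sketched as follows. The Record dataclass is a hypothetical stand-in for ArtifactRecord, and comparing branch_path by exact equality is an assumption; the real loader may match branches differently.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Record:
    """Hypothetical stand-in for ArtifactRecord (fields are assumptions)."""
    artifact_id: str
    step_index: int
    branch_path: List[int]
    source_index: Optional[int]
    fold_id: Optional[int]


def filter_records(records, step_index=None, branch_path=None,
                   source_index=None, fold_id=None):
    """Keep records that satisfy every filter that is not None."""
    def keep(rec: Record) -> bool:
        return (
            (step_index is None or rec.step_index == step_index)
            and (branch_path is None or rec.branch_path == branch_path)
            and (source_index is None or rec.source_index == source_index)
            and (fold_id is None or rec.fold_id == fold_id)
        )
    return [rec for rec in records if keep(rec)]


records = [
    Record("a", 1, [], 0, None),
    Record("b", 3, [0], 0, 0),
    Record("c", 3, [0], 0, 1),
    Record("d", 3, [1], 0, 1),
]
matched_ids = [r.artifact_id for r in filter_records(records, step_index=3, fold_id=1)]
print(matched_ids)
```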

get_cache_info() Dict[str, Any][source]

Get information about the current cache state.

Returns:

Dictionary with cache statistics

get_record(artifact_id: str) ArtifactRecord | None[source]

Get artifact record by ID.

Parameters:

artifact_id – Artifact ID

Returns:

ArtifactRecord or None if not found

get_record_by_chain(chain_path: str) ArtifactRecord | None[source]

Get artifact record by chain path.

Parameters:

chain_path – Operator chain path

Returns:

ArtifactRecord or None if not found

get_step_binaries(step_id: int, branch_id: int | None = None, branch_path: List[int] | None = None) List[Tuple[str, Any]][source]

Legacy-compatible method for loading step binaries.

This method provides backward compatibility with the BinaryLoader API. Prefer using load_for_step() for new code.

Returns names in a format compatible with controller lookup patterns:

  • For models with fold_id: “ClassName_<op_num>” where op_num = step*100 + fold

  • For shared models: “ClassName_<op_num>” where op_num = step*100

  • For y_transformers (ENCODER type): “y_ClassName_<op_num>”

  • For x_transformers (TRANSFORMER type): “ClassName_<op_num>”

Parameters:
  • step_id – Step identifier (supports int or “step_substep” format)

  • branch_id – Optional branch ID (converts to branch_path [branch_id])

  • branch_path – Optional full branch path for nested branches (takes precedence over branch_id)

Returns:

List of (name, loaded_object) tuples
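
The naming rules above reduce to a small amount of arithmetic. The sketch below only illustrates those rules; legacy_binary_name is a hypothetical helper, not a nirs4all function, and the class names are example values.

```python
from typing import Optional


def legacy_binary_name(class_name: str, step: int,
                       fold: Optional[int] = None,
                       is_y_transformer: bool = False) -> str:
    """Build a legacy-style name: [y_]ClassName_<op_num> (illustrative only)."""
    # op_num = step*100 + fold for per-fold models, step*100 for shared ones.
    op_num = step * 100 + (fold if fold is not None else 0)
    prefix = "y_" if is_y_transformer else ""
    return f"{prefix}{class_name}_{op_num}"


print(legacy_binary_name("PLSRegression", 3, fold=2))
print(legacy_binary_name("PLSRegression", 3))
print(legacy_binary_name("MinMaxScaler", 1, is_y_transformer=True))
```

Under this formula, a fold-0 model and a shared model at the same step produce the same op_num.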

get_step_binaries_by_artifact_ids(artifact_ids: List[str]) List[Tuple[str, Any]][source]

Load multiple artifacts by their deterministic artifact_ids.

This method is used in prediction mode when model_artifact_id is available in the prediction record. It provides deterministic loading that works correctly with custom model names.

Parameters:

artifact_ids – List of artifact IDs to load

Returns:

List of (name, loaded_object) tuples

Raises:

KeyError – If any artifact_id is not found

Example

>>> artifact_ids = ["abc123:4:0", "abc123:4:1"]
>>> binaries = loader.get_step_binaries_by_artifact_ids(artifact_ids)
has_binaries_for_step(step_number: int, substep_number: int | None = None, branch_id: int | None = None) bool[source]

Check if binaries exist for a specific step.

Legacy-compatible method for checking artifact availability.

Parameters:
  • step_number – The main step number

  • substep_number – Ignored (kept for compatibility)

  • branch_id – Optional branch ID to check

Returns:

True if artifacts exist for this step

import_from_manifest(manifest: Dict[str, Any], results_dir: Path | None = None) None[source]

Import artifact records from a V3 manifest.

Builds all indexes including chain_path index for V3 lookups.

Parameters:
  • manifest – Manifest dictionary

  • results_dir – Optional results directory override

load_by_artifact_id(artifact_id: str) Tuple[str, Any][source]

Load a single artifact by its deterministic artifact_id.

This method provides deterministic artifact loading using the artifact_id stored in predictions. Unlike name-based loading which can be ambiguous with custom model names, artifact_id-based loading is always exact.

Parameters:

artifact_id – The deterministic artifact ID (e.g., “0001:4:0” for fold 0 or “0001:4:all” for shared artifacts)

Returns:

Tuple of (name, loaded_object) where name is built from custom_name if available, otherwise from class_name.

Raises:

Example

>>> loader = ArtifactLoader.from_manifest(manifest, results_dir)
>>> name, model = loader.load_by_artifact_id("abc123:4:0")
>>> predictions = model.predict(X_new)
load_by_chain(chain: str, fold_id: int | None = None) Any | None[source]

Load artifact by exact chain path match.

Parameters:
  • chain – Operator chain path string (e.g., “s1.MinMaxScaler>s3.PLS[br=0]”)

  • fold_id – Optional fold ID filter

Returns:

Loaded artifact object or None if not found

load_by_chain_prefix(prefix: str, branch_path: List[int] | None = None, source_index: int | None = None) List[Tuple[str, Any]][source]

Load all artifacts whose chain path starts with the given prefix.

Useful for loading all artifacts in a chain for prediction replay.

Parameters:
  • prefix – Chain path prefix to match

  • branch_path – Optional branch path filter

  • source_index – Optional source index filter

Returns:

List of (artifact_id, loaded_object) tuples
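
Prefix matching over chain paths can be pictured with a plain dictionary keyed by chain path. This is a sketch under assumptions: the index contents and artifact IDs are made up for illustration, and ids_for_prefix is a hypothetical helper that returns IDs only, without loading anything.

```python
from typing import Dict, List

# Hypothetical chain-path index: chain path -> artifact ID (values are made up).
chain_index: Dict[str, str] = {
    "s1.MinMaxScaler": "0001_pls$aaa111:all",
    "s1.MinMaxScaler>s3.PLS[br=0]": "0001_pls$bbb222:0",
    "s1.MinMaxScaler>s3.PLS[br=1]": "0001_pls$ccc333:0",
    "s2.SNV>s3.PLS[br=0]": "0001_pls$ddd444:0",
}


def ids_for_prefix(index: Dict[str, str], prefix: str) -> List[str]:
    """Return artifact IDs whose chain path starts with the given prefix."""
    return [aid for chain, aid in index.items() if chain.startswith(prefix)]


# Exact lookup is a single dictionary access; prefix lookup scans the keys.
exact = chain_index.get("s1.MinMaxScaler>s3.PLS[br=0]")
under_scaler = ids_for_prefix(chain_index, "s1.MinMaxScaler")
print(exact, under_scaler)
```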

load_by_id(artifact_id: str) Any[source]

Load a single artifact by its V3 ID.

Uses LRU cache to avoid redundant disk I/O. Artifacts are loaded lazily on first access.

Parameters:

artifact_id – V3 artifact identifier (pipeline$hash:fold)

Returns:

Deserialized artifact object

Raises:
load_fold_models(step_index: int, branch_path: List[int] | None = None, pipeline_id: str | None = None) List[Tuple[int, Any]][source]

Load all fold-specific model artifacts for CV averaging.

Returns models for all folds at the specified step, sorted by fold_id.

Parameters:
  • step_index – Step number at which the models are located

  • branch_path – Optional branch path filter

  • pipeline_id – Optional pipeline ID filter

Returns:

List of (fold_id, loaded_model) tuples, sorted by fold_id
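
A list of (fold_id, model) tuples in this shape can be combined into a single CV-averaged prediction. The sketch below assumes an unweighted mean and models exposing predict(X); ConstantModel is a dummy stand-in for loaded models, and average_fold_predictions is a hypothetical helper, not the library's averaging logic.

```python
from typing import Any, List, Sequence, Tuple


class ConstantModel:
    """Dummy model standing in for a loaded fold model."""

    def __init__(self, value: float) -> None:
        self.value = value

    def predict(self, X: Sequence[Any]) -> List[float]:
        return [self.value for _ in X]


def average_fold_predictions(fold_models: List[Tuple[int, Any]],
                             X: Sequence[Any]) -> List[float]:
    """Unweighted mean of per-fold predictions (assumed averaging scheme)."""
    if not fold_models:
        raise ValueError("no fold models to average")
    per_fold = [model.predict(X) for _, model in fold_models]
    n_folds = len(per_fold)
    # zip(*per_fold) groups the predictions for each sample across folds.
    return [sum(sample_preds) / n_folds for sample_preds in zip(*per_fold)]


fold_models = [(0, ConstantModel(1.0)), (1, ConstantModel(2.0)), (2, ConstantModel(3.0))]
averaged = average_fold_predictions(fold_models, X=[[0.1], [0.2]])
print(averaged)
```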

load_for_step(step_index: int, branch_path: List[int] | None = None, source_index: int | None = None, fold_id: int | None = None, pipeline_id: str | None = None) List[Tuple[str, Any]][source]

Load all artifacts for a step context.

Returns artifacts matching the specified step, branch path, source, and fold. If branch_path is provided, includes both branch-specific and shared (pre-branch) artifacts.

Parameters:
  • step_index – Step number to load

  • branch_path – Optional branch path filter

  • source_index – Optional source index filter

  • fold_id – Optional fold ID filter

  • pipeline_id – Optional pipeline ID filter

Returns:

List of (artifact_id, loaded_object) tuples

load_meta_model_for_prediction(artifact_id: str, X: Any = None) Tuple[Any, List[Tuple[str, Any]], List[str]][source]

Load a meta-model and its sources, ready for prediction.

This method loads the complete stacking ensemble and validates that all components are compatible for prediction.

Parameters:
  • artifact_id – Meta-model artifact ID

  • X – Optional input features for validation

Returns:

Tuple of (meta_model, source_models, feature_columns), where source_models is a list of (artifact_id, model) tuples in the correct order for feature construction

Raises:
  • KeyError – If artifact or source models not found

  • ValueError – If artifact is not a meta-model

load_meta_model_with_sources(artifact_id: str, validate_branch: bool = True) Tuple[Any, List[Tuple[str, Any]], List[str]][source]

Load a meta-model and its source models.

For stacking, loads the meta-model and all source models it depends on, preserving the feature column order as specified in meta_config.

Parameters:
  • artifact_id – Meta-model artifact ID

  • validate_branch – If True, validate branch context matches

Returns:

Tuple of (meta_model, [(source_id, source_model), …], feature_columns) where source_models are in the correct order for feature construction

Raises:
  • KeyError – If artifact not found

  • ValueError – If artifact is not a meta-model or if branch validation fails
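
The reason source order matters is that each source model contributes one column of the meta-model's input. The sketch below shows that feature construction with dummy models; ScaleModel, WeightedMeta, and stack_predict are hypothetical names, and using one prediction column per source model is an assumption about the stacking layout.

```python
from typing import Any, List, Sequence, Tuple


class ScaleModel:
    """Dummy source model: multiplies the first feature by a factor."""

    def __init__(self, factor: float) -> None:
        self.factor = factor

    def predict(self, X: Sequence[Sequence[float]]) -> List[float]:
        return [self.factor * row[0] for row in X]


class WeightedMeta:
    """Dummy meta-model: weighted sum of its input columns."""

    def __init__(self, weights: Sequence[float]) -> None:
        self.weights = list(weights)

    def predict(self, X: Sequence[Sequence[float]]) -> List[float]:
        return [sum(w * v for w, v in zip(self.weights, row)) for row in X]


def stack_predict(meta_model: Any, source_models: List[Tuple[str, Any]],
                  X: Sequence[Sequence[float]]) -> List[float]:
    """Build meta-features column by column, in source order, then predict."""
    columns = [model.predict(X) for _, model in source_models]
    meta_features = [list(row) for row in zip(*columns)]
    return meta_model.predict(meta_features)


sources = [("src_a", ScaleModel(2.0)), ("src_b", ScaleModel(3.0))]
meta = WeightedMeta([1.0, 10.0])
X_new = [[1.0], [2.0]]
stacked = stack_predict(meta, sources, X_new)
swapped = stack_predict(meta, list(reversed(sources)), X_new)
print(stacked, swapped)
```

Reversing the source list changes the result, which is why the returned order has to be preserved.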

load_with_dependencies(artifact_id: str) Dict[str, Any][source]

Load an artifact and all its transitive dependencies.

Returns a dictionary mapping artifact IDs to loaded objects, in topological order (dependencies before dependents).

Parameters:

artifact_id – Starting artifact ID

Returns:

Dictionary of {artifact_id: loaded_object}

Raises:
  • KeyError – If artifact or dependency not found

  • ValueError – If cycle detected in dependencies
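
Topological ordering with cycle detection can be sketched with a depth-first traversal. This is an illustration of the general technique, not the loader's code: resolve_load_order is a hypothetical helper, and the depends_on mapping (artifact ID to list of dependency IDs) is an assumed representation of the manifest's dependency data.

```python
from typing import Dict, List


def resolve_load_order(artifact_id: str,
                       depends_on: Dict[str, List[str]]) -> List[str]:
    """Return IDs in dependency-first order, starting from artifact_id."""
    order: List[str] = []
    state: Dict[str, str] = {}  # id -> "visiting" or "done"

    def visit(node: str) -> None:
        if state.get(node) == "done":
            return
        if state.get(node) == "visiting":
            raise ValueError(f"cycle detected at {node!r}")
        if node not in depends_on:
            raise KeyError(node)
        state[node] = "visiting"
        for dep in depends_on[node]:
            visit(dep)
        state[node] = "done"
        order.append(node)  # appended only after all of its dependencies

    visit(artifact_id)
    return order


graph = {"meta": ["pls", "rf"], "pls": ["scaler"], "rf": ["scaler"], "scaler": []}
load_order = resolve_load_order("meta", graph)
print(load_order)
```

Loading artifacts in this order and storing them in a dict keeps dependencies ahead of dependents, since Python dicts preserve insertion order.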

preload_artifacts(artifact_ids: List[str] | None = None, artifact_types: List[ArtifactType] | None = None) int[source]

Preload artifacts into cache.

Useful for warming the cache before prediction or when you know which artifacts will be needed.

Parameters:
  • artifact_ids – Specific artifact IDs to preload (default: all)

  • artifact_types – Filter by artifact types (default: all)

Returns:

Number of artifacts loaded

set_cache_size(max_size: int) None[source]

Set the maximum cache size.

If the new size is smaller than the current number of cached items, the oldest items are evicted.

Parameters:

max_size – New maximum cache size

class nirs4all.pipeline.storage.artifacts.artifact_loader.LRUCache(max_size: int = 100)[source]

Bases: object

Simple LRU cache with configurable max size.

Uses OrderedDict for O(1) access and LRU eviction.
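
The OrderedDict technique can be sketched in a few lines. SketchLRUCache below is a hypothetical illustration of the approach, not the module's LRUCache source; it mirrors only the get, put, and contains behaviour described in this section.

```python
from collections import OrderedDict
from typing import Any, Optional


class SketchLRUCache:
    """Minimal LRU cache built on OrderedDict (illustrative only)."""

    def __init__(self, max_size: int = 100) -> None:
        if max_size < 1:
            raise ValueError("max_size must be at least 1")
        self.max_size = max_size
        self._items: "OrderedDict[str, Any]" = OrderedDict()

    def get(self, key: str) -> Optional[Any]:
        if key not in self._items:
            return None
        self._items.move_to_end(key)  # mark as most recently used
        return self._items[key]

    def put(self, key: str, value: Any) -> None:
        if key in self._items:
            self._items.move_to_end(key)
        self._items[key] = value
        while len(self._items) > self.max_size:
            self._items.popitem(last=False)  # evict least recently used

    def contains(self, key: str) -> bool:
        return key in self._items  # does not change the LRU order


cache = SketchLRUCache(max_size=2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")      # "a" becomes most recently used
cache.put("c", 3)   # evicts "b", the least recently used key
print(cache.contains("a"), cache.contains("b"), cache.contains("c"))
```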

clear() None[source]

Clear all cached items.

contains(key: str) bool[source]

Check if key is in cache without updating LRU order.

get(key: str) Any | None[source]

Get item from cache, moving to end (most recently used).

Parameters:

key – Cache key

Returns:

Cached value or None if not found

put(key: str, value: Any) None[source]

Put item in cache, evicting oldest if at capacity.

Parameters:
  • key – Cache key

  • value – Value to cache

remove(key: str) None[source]

Remove item from cache.

property size: int

Current cache size.

property stats: Dict[str, Any]

Get cache statistics.