nirs4all.pipeline.storage.artifacts package

Module contents

Artifact management module (V3).

This module provides the V3 artifacts system with:

  • ArtifactRecord: Complete artifact metadata dataclass with chain tracking

  • ArtifactType: Enum for artifact classification

  • ArtifactRegistry: Central registry for artifact management

  • ArtifactLoader: Load artifacts by ID or execution context

  • OperatorNode/OperatorChain: V3 operator path tracking

  • Utility functions for ID generation and path handling

class nirs4all.pipeline.storage.artifacts.ArtifactLoader(workspace: Path, dataset: str, results_dir: Path | None = None, cache_size: int = 100)[source]

Bases: object

Load artifacts using V3 chain-based identification.

This class provides efficient loading of artifacts from centralized storage, with support for:

  • Direct loading by V3 artifact ID (pipeline$hash:fold)

  • Chain path-based loading for deterministic replay

  • Context-based loading (step/branch/source/fold)

  • Dependency resolution for stacking meta-models

  • Per-fold model loading for cross-validation ensemble

  • LRU caching to avoid redundant I/O

V3 Key Features:

  • Chain path indexing for O(1) lookup by chain

  • Source index support for multi-source pipelines

  • Branch path filtering using chain metadata

The loader uses lazy loading - artifacts are only deserialized when actually accessed via load_by_id() or related methods.

workspace

Workspace root path

dataset

Dataset name

binaries_dir

Path to centralized binaries

results_dir

Path to results directory (for manifest reference)

Example

>>> loader = ArtifactLoader.from_manifest(manifest, results_dir)
>>> model = loader.load_by_id("0001_pls$abc123def456:0")
>>> artifacts = loader.load_by_chain("s1.MinMaxScaler>s3.PLS[br=0]")
DEFAULT_CACHE_SIZE = 100
clear_cache() None[source]

Clear the object cache to free memory.

find_artifact_by_custom_name(custom_name: str, step_index: int | None = None, fold_id: int | None = None, branch_path: List[int] | None = None) ArtifactRecord | None[source]

Find an artifact by its custom_name.

Used for reverse lookup when only the model name is known but not the artifact_id. Useful for legacy compatibility.

Parameters:
  • custom_name – User-defined model name (e.g., “Q5_PLS_10”)

  • step_index – Optional filter by step

  • fold_id – Optional filter by fold

  • branch_path – Optional filter by branch

Returns:

ArtifactRecord if found, None otherwise

classmethod from_manifest(manifest: Dict[str, Any], results_dir: Path) ArtifactLoader[source]

Create an ArtifactLoader from a pipeline manifest.

Factory method for easy creation from manifest data.

Parameters:
  • manifest – Pipeline manifest dictionary

  • results_dir – Path to results directory (manifest.yaml’s parent)

Returns:

Initialized ArtifactLoader instance

get_all_records() List[ArtifactRecord][source]

Get all artifact records.

Returns:

List of all ArtifactRecords

get_artifacts_by_chain_filter(step_index: int | None = None, branch_path: List[int] | None = None, source_index: int | None = None, fold_id: int | None = None) List[ArtifactRecord][source]

Get artifact records matching chain-based filters.

Uses the chain_path information stored in V3 records to filter.

Parameters:
  • step_index – Filter by step index

  • branch_path – Filter by branch path

  • source_index – Filter by source index

  • fold_id – Filter by fold ID

Returns:

List of matching ArtifactRecords

get_cache_info() Dict[str, Any][source]

Get information about the current cache state.

Returns:

Dictionary with cache statistics

get_record(artifact_id: str) ArtifactRecord | None[source]

Get artifact record by ID.

Parameters:

artifact_id – Artifact ID

Returns:

ArtifactRecord or None if not found

get_record_by_chain(chain_path: str) ArtifactRecord | None[source]

Get artifact record by chain path.

Parameters:

chain_path – Operator chain path

Returns:

ArtifactRecord or None if not found

get_step_binaries(step_id: int, branch_id: int | None = None, branch_path: List[int] | None = None) List[Tuple[str, Any]][source]

Legacy-compatible method for loading step binaries.

This method provides backward compatibility with the BinaryLoader API. Prefer using load_for_step() for new code.

Returns names in a format compatible with controller lookup patterns:

  • For models with fold_id: “ClassName_<op_num>” where op_num = step*100 + fold

  • For shared models: “ClassName_<op_num>” where op_num = step*100

  • For y_transformers (ENCODER type): “y_ClassName_<op_num>”

  • For x_transformers (TRANSFORMER type): “ClassName_<op_num>”

Parameters:
  • step_id – Step identifier (supports int or “step_substep” format)

  • branch_id – Optional branch ID (converts to branch_path [branch_id])

  • branch_path – Optional full branch path for nested branches (takes precedence over branch_id)

Returns:

List of (name, loaded_object) tuples
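The naming rules listed above can be illustrated with a small standalone sketch. The helper `legacy_binary_name` is hypothetical and is not part of nirs4all; it also assumes that transformers and encoders use the same op_num formula as models, which the description above does not state explicitly.

```python
from typing import Optional


def legacy_binary_name(class_name: str, step: int,
                       fold_id: Optional[int] = None,
                       is_y_transformer: bool = False) -> str:
    """Build a legacy-style "ClassName_<op_num>" name (hypothetical helper)."""
    # op_num = step*100 + fold for fold-specific artifacts, step*100 otherwise.
    op_num = step * 100 + (fold_id if fold_id is not None else 0)
    # Assumption: y-transformers only differ by the "y_" prefix.
    prefix = "y_" if is_y_transformer else ""
    return f"{prefix}{class_name}_{op_num}"


print(legacy_binary_name("PLSRegression", step=3, fold_id=2))  # PLSRegression_302
print(legacy_binary_name("PLSRegression", step=3))             # PLSRegression_300
print(legacy_binary_name("MinMaxScaler", step=1, is_y_transformer=True))  # y_MinMaxScaler_100
```

Note that under this arithmetic a fold-0 artifact and a shared artifact at the same step produce the same op_num, which is one reason ID-based loading is less ambiguous than name-based loading.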

get_step_binaries_by_artifact_ids(artifact_ids: List[str]) List[Tuple[str, Any]][source]

Load multiple artifacts by their deterministic artifact_ids.

This method is used in prediction mode when model_artifact_id is available in the prediction record. It provides deterministic loading that works correctly with custom model names.

Parameters:

artifact_ids – List of artifact IDs to load

Returns:

List of (name, loaded_object) tuples

Raises:

KeyError – If any artifact_id is not found

Example

>>> artifact_ids = ["abc123:4:0", "abc123:4:1"]
>>> binaries = loader.get_step_binaries_by_artifact_ids(artifact_ids)
has_binaries_for_step(step_number: int, substep_number: int | None = None, branch_id: int | None = None) bool[source]

Check if binaries exist for a specific step.

Legacy-compatible method for checking artifact availability.

Parameters:
  • step_number – The main step number

  • substep_number – Ignored (kept for compatibility)

  • branch_id – Optional branch ID to check

Returns:

True if artifacts exist for this step

import_from_manifest(manifest: Dict[str, Any], results_dir: Path | None = None) None[source]

Import artifact records from a V3 manifest.

Builds all indexes including chain_path index for V3 lookups.

Parameters:
  • manifest – Manifest dictionary

  • results_dir – Optional results directory override

load_by_artifact_id(artifact_id: str) Tuple[str, Any][source]

Load a single artifact by its deterministic artifact_id.

This method provides deterministic artifact loading using the artifact_id stored in predictions. Unlike name-based loading which can be ambiguous with custom model names, artifact_id-based loading is always exact.

Parameters:

artifact_id – The deterministic artifact ID (e.g., “0001:4:0” for fold 0 or “0001:4:all” for shared artifacts)

Returns:

Tuple of (name, loaded_object) where name is built from custom_name if available, otherwise from class_name.

Raises:

Example

>>> loader = ArtifactLoader.from_manifest(manifest, results_dir)
>>> name, model = loader.load_by_artifact_id("abc123:4:0")
>>> predictions = model.predict(X_new)
load_by_chain(chain: str, fold_id: int | None = None) Any | None[source]

Load artifact by exact chain path match.

Parameters:
  • chain – Operator chain path string (e.g., “s1.MinMaxScaler>s3.PLS[br=0]”)

  • fold_id – Optional fold ID filter

Returns:

Loaded artifact object or None if not found

load_by_chain_prefix(prefix: str, branch_path: List[int] | None = None, source_index: int | None = None) List[Tuple[str, Any]][source]

Load all artifacts whose chain path starts with the given prefix.

Useful for loading all artifacts in a chain for prediction replay.

Parameters:
  • prefix – Chain path prefix to match

  • branch_path – Optional branch path filter

  • source_index – Optional source index filter

Returns:

List of (artifact_id, loaded_object) tuples
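Prefix matching on chain paths can be pictured with a plain dictionary. This is only a sketch: `match_chain_prefix`, the index layout, and the artifact IDs below are illustrative assumptions, not the loader's internal data structures.

```python
from typing import Dict, List


def match_chain_prefix(chain_index: Dict[str, str], prefix: str) -> List[str]:
    """Return artifact IDs whose chain path starts with `prefix`, ordered by path."""
    return [artifact_id
            for path, artifact_id in sorted(chain_index.items())
            if path.startswith(prefix)]


# Hypothetical chain_path -> artifact_id index.
chain_index = {
    "s1.MinMaxScaler": "0001_pls$aaa:all",
    "s1.MinMaxScaler>s3.PLS": "0001_pls$bbb:0",
    "s2.StandardScaler>s3.PLS": "0001_pls$ccc:0",
}

print(match_chain_prefix(chain_index, "s1.MinMaxScaler"))
# ['0001_pls$aaa:all', '0001_pls$bbb:0']
```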

load_by_id(artifact_id: str) Any[source]

Load a single artifact by its V3 ID.

Uses LRU cache to avoid redundant disk I/O. Artifacts are loaded lazily on first access.

Parameters:

artifact_id – V3 artifact identifier (pipeline$hash:fold)

Returns:

Deserialized artifact object

Raises:
load_fold_models(step_index: int, branch_path: List[int] | None = None, pipeline_id: str | None = None) List[Tuple[int, Any]][source]

Load all fold-specific model artifacts for CV averaging.

Returns models for all folds at the specified step, sorted by fold_id.

Parameters:
  • step_index – Step number of the model step

  • branch_path – Optional branch path filter

  • pipeline_id – Optional pipeline ID filter

Returns:

List of (fold_id, loaded_model) tuples, sorted by fold_id
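A list of (fold_id, model) tuples is typically consumed by averaging the per-fold predictions. The sketch below uses a stand-in `ConstantModel` class and a hypothetical `average_fold_predictions` helper; it assumes each model exposes a `predict(X)` method and makes no claim about how nirs4all itself combines folds.

```python
from typing import Any, List, Sequence, Tuple


class ConstantModel:
    """Stand-in for a fitted model; always predicts the same value."""

    def __init__(self, value: float) -> None:
        self.value = value

    def predict(self, X: Sequence[Any]) -> List[float]:
        return [self.value for _ in X]


def average_fold_predictions(fold_models: List[Tuple[int, Any]],
                             X: Sequence[Any]) -> List[float]:
    """Average predictions of all fold models, sample by sample."""
    if not fold_models:
        raise ValueError("no fold models to average")
    # Sort by fold_id so the result does not depend on input order.
    ordered = sorted(fold_models, key=lambda pair: pair[0])
    per_fold = [model.predict(X) for _, model in ordered]
    return [sum(values) / len(per_fold) for values in zip(*per_fold)]


fold_models = [(1, ConstantModel(3.0)), (0, ConstantModel(1.0))]
print(average_fold_predictions(fold_models, X=[[0.1], [0.2]]))  # [2.0, 2.0]
```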

load_for_step(step_index: int, branch_path: List[int] | None = None, source_index: int | None = None, fold_id: int | None = None, pipeline_id: str | None = None) List[Tuple[str, Any]][source]

Load all artifacts for a step context.

Returns artifacts matching the specified step, branch path, source, and fold. If branch_path is provided, includes both branch-specific and shared (pre-branch) artifacts.

Parameters:
  • step_index – Step number to load

  • branch_path – Optional branch path filter

  • source_index – Optional source index filter

  • fold_id – Optional fold ID filter

  • pipeline_id – Optional pipeline ID filter

Returns:

List of (artifact_id, loaded_object) tuples

load_meta_model_for_prediction(artifact_id: str, X: Any = None) Tuple[Any, List[Tuple[str, Any]], List[str]][source]

Load a meta-model and its sources, ready for prediction.

This method loads the complete stacking ensemble and validates that all components are compatible for prediction.

Parameters:
  • artifact_id – Meta-model artifact ID

  • X – Optional input features for validation

Returns:

Tuple of (meta_model, source_models, feature_columns) where source_models is list of (artifact_id, model) tuples in the correct order for feature construction

Raises:
  • KeyError – If artifact or source models not found

  • ValueError – If artifact is not a meta-model

load_meta_model_with_sources(artifact_id: str, validate_branch: bool = True) Tuple[Any, List[Tuple[str, Any]], List[str]][source]

Load a meta-model and its source models.

For stacking, loads the meta-model and all source models it depends on, preserving the feature column order as specified in meta_config.

Parameters:
  • artifact_id – Meta-model artifact ID

  • validate_branch – If True, validate branch context matches

Returns:

Tuple of (meta_model, [(source_id, source_model), …], feature_columns) where source_models are in the correct order for feature construction

Raises:
  • KeyError – If artifact not found

  • ValueError – If artifact is not a meta-model or if branch validation fails
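The reason source order matters can be shown with a toy stacking setup: each source model contributes one feature column, and the meta-model interprets columns by position. The classes and the `predict_with_stack` helper below are illustrative stand-ins, not nirs4all objects.

```python
from typing import Any, List, Sequence, Tuple


class ConstantModel:
    """Stand-in source model that predicts a fixed value."""

    def __init__(self, value: float) -> None:
        self.value = value

    def predict(self, X: Sequence[Any]) -> List[float]:
        return [self.value for _ in X]


class WeightedMeta:
    """Stand-in meta-model: weighted sum of its input columns."""

    def __init__(self, weights: List[float]) -> None:
        self.weights = weights

    def predict(self, features: List[List[float]]) -> List[float]:
        return [sum(w * v for w, v in zip(self.weights, row)) for row in features]


def predict_with_stack(meta_model: Any,
                       source_models: List[Tuple[str, Any]],
                       X: Sequence[Any]) -> List[float]:
    """Build one feature column per source model, in order, then call the meta-model."""
    columns = [model.predict(X) for _, model in source_models]
    features = [list(row) for row in zip(*columns)]
    return meta_model.predict(features)


meta = WeightedMeta([1.0, 10.0])
sources = [("src_a", ConstantModel(1.0)), ("src_b", ConstantModel(2.0))]
print(predict_with_stack(meta, sources, X=[0, 0]))        # [21.0, 21.0]
print(predict_with_stack(meta, sources[::-1], X=[0, 0]))  # [12.0, 12.0]
```

Swapping the two sources changes the result, which is why the ordered list returned here should be used as-is when constructing meta-model inputs.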

load_with_dependencies(artifact_id: str) Dict[str, Any][source]

Load an artifact and all its transitive dependencies.

Returns a dictionary mapping artifact IDs to loaded objects, in topological order (dependencies before dependents).

Parameters:

artifact_id – Starting artifact ID

Returns:

Dictionary of {artifact_id: loaded_object}

Raises:
  • KeyError – If artifact or dependency not found

  • ValueError – If cycle detected in dependencies

preload_artifacts(artifact_ids: List[str] | None = None, artifact_types: List[ArtifactType] | None = None) int[source]

Preload artifacts into cache.

Useful for warming the cache before prediction or when you know which artifacts will be needed.

Parameters:
  • artifact_ids – Specific artifact IDs to preload (default: all)

  • artifact_types – Filter by artifact types (default: all)

Returns:

Number of artifacts loaded

set_cache_size(max_size: int) None[source]

Set the maximum cache size.

If the new size is smaller than the current cache, the oldest items are evicted.

Parameters:

max_size – New maximum cache size
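The caching behaviour described for the loader (lazy loading, LRU eviction, shrinking the cache) can be sketched with `collections.OrderedDict`. `LazyLRUCache` is a hypothetical illustration of the general technique; the actual cache implementation in ArtifactLoader may differ.

```python
from collections import OrderedDict
from typing import Any, Callable, List


class LazyLRUCache:
    """Load values on first access and keep at most `max_size` of them."""

    def __init__(self, loader: Callable[[str], Any], max_size: int = 100) -> None:
        self._loader = loader
        self._max_size = max_size
        self._items: "OrderedDict[str, Any]" = OrderedDict()

    def get(self, key: str) -> Any:
        if key in self._items:
            self._items.move_to_end(key)  # mark as most recently used
            return self._items[key]
        value = self._loader(key)  # deserialization would happen here
        self._items[key] = value
        self._evict()
        return value

    def set_max_size(self, max_size: int) -> None:
        self._max_size = max_size
        self._evict()

    def _evict(self) -> None:
        # Drop least recently used entries until the cache fits.
        while len(self._items) > self._max_size:
            self._items.popitem(last=False)

    def keys(self) -> List[str]:
        return list(self._items)


loads: List[str] = []


def fake_load(key: str) -> str:
    loads.append(key)
    return f"object-for-{key}"


cache = LazyLRUCache(fake_load, max_size=3)
for key in ("a", "b", "c", "a"):
    cache.get(key)
cache.set_max_size(2)
print(cache.keys())  # ['c', 'a']
print(loads)         # ['a', 'b', 'c']
```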

class nirs4all.pipeline.storage.artifacts.ArtifactRecord(artifact_id: str, content_hash: str, path: str, chain_path: str = '', source_index: int | None = None, pipeline_id: str = '', branch_path: List[int] = <factory>, step_index: int = 0, substep_index: int | None = None, fold_id: int | None = None, artifact_type: ArtifactType = ArtifactType.MODEL, class_name: str = '', custom_name: str = '', depends_on: List[str] = <factory>, format: str = 'joblib', format_version: str = '', nirs4all_version: str = '', size_bytes: int = 0, created_at: str = <factory>, params: Dict[str, Any] = <factory>, meta_config: MetaModelConfig | None = None, version: int = 3)[source]

Bases: object

Complete artifact metadata for manifest storage (V3).

This record contains all metadata needed to:

  • Uniquely identify an artifact via operator chain

  • Load the artifact from centralized storage

  • Resolve dependencies for stacking/transfer

  • Track serialization format and library versions

V3 Format:

artifact_id: “{pipeline_id}${chain_hash}:{fold_id}”

chain_path: Full operator chain path string

artifact_id

Unique, deterministic ID based on chain hash. Format: “{pipeline_id}${chain_hash}:{fold_id}”

Type:

str

content_hash

SHA256 hash of binary content (for deduplication)

Type:

str

path

Relative path in binaries/<dataset>/ directory

Type:

str

# Chain tracking (V3)
chain_path

Serialized operator chain path

Type:

str

source_index

Multi-source index (None for single source)

Type:

int | None

# Context
pipeline_id

Parent pipeline ID (e.g., “0001_pls_abc123”)

Type:

str

branch_path

Branch hierarchy as list of indices (empty = pre-branch)

Type:

List[int]

step_index

Logical step index within execution

Type:

int

substep_index

Index within substep (for [model1, model2])

Type:

int | None

fold_id

CV fold identifier (None = shared across folds)

Type:

int | None

# Classification
artifact_type

Type classification (model, transformer, etc.)

Type:

nirs4all.pipeline.storage.artifacts.types.ArtifactType

class_name

Python class name (e.g., “PLSRegression”)

Type:

str

custom_name

User-defined name for the artifact

Type:

str

# Dependencies
depends_on

List of artifact_ids this artifact depends on

Type:

List[str]

# Serialization
format

Serialization format (joblib, pickle, keras, etc.)

Type:

str

format_version

Library version string

Type:

str

nirs4all_version

nirs4all version that created this artifact

Type:

str

size_bytes

Size of serialized binary in bytes

Type:

int

created_at

ISO timestamp of creation

Type:

str

# Metadata
params

Hyperparameters for models

Type:

Dict[str, Any]

meta_config

Configuration for meta-models

Type:

nirs4all.pipeline.storage.artifacts.types.MetaModelConfig | None

version

Schema version (3 for V3)

Type:

int

artifact_id: str
artifact_type: ArtifactType = 'model'
branch_path: List[int]
property chain_hash: str

Get chain hash from artifact ID (V3 format).

Returns:

Chain hash portion of the artifact ID, or empty if not V3 format
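Given the documented V3 format “{pipeline_id}${chain_hash}:{fold_id}”, extracting the chain hash amounts to simple string splitting. The function below is a hypothetical standalone sketch of that idea, not the property's actual implementation.

```python
def chain_hash_from_id(artifact_id: str) -> str:
    """Return the chain hash of a V3 ID, or "" if the ID has no "$" separator."""
    if "$" not in artifact_id:
        return ""  # not V3 format, e.g. a legacy "0001:4:0" style ID
    # Everything after the first "$" is "<chain_hash>:<fold_id>".
    _, _, rest = artifact_id.partition("$")
    return rest.partition(":")[0]


print(chain_hash_from_id("0001_pls$abc123def456:0"))    # abc123def456
print(chain_hash_from_id("0001_pls$7f8e9d0c1b2a:all"))  # 7f8e9d0c1b2a
print(repr(chain_hash_from_id("0001:4:0")))             # ''
```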

chain_path: str = ''
class_name: str = ''
content_hash: str
created_at: str
custom_name: str = ''
depends_on: List[str]
fold_id: int | None = None
format: str = 'joblib'
format_version: str = ''
classmethod from_dict(data: Dict[str, Any]) ArtifactRecord[source]

Create ArtifactRecord from dictionary.

Parameters:

data – Dictionary from YAML manifest

Returns:

ArtifactRecord instance

get_branch_path_str() str[source]

Get branch path as string.

Returns:

Colon-separated branch indices or empty string

get_fold_str() str[source]

Get fold ID as string.

Returns:

Fold ID as string or “all” for shared artifacts

property is_branch_specific: bool

Check if artifact is branch-specific.

Returns:

True if artifact belongs to a specific branch path

property is_fold_specific: bool

Check if artifact is fold-specific.

Returns:

True if artifact belongs to a specific CV fold

property is_meta_model: bool

Check if artifact is a meta-model.

Returns:

True if artifact is a stacking meta-model

property is_source_specific: bool

Check if artifact is source-specific.

Returns:

True if artifact belongs to a specific source in multi-source

matches_context(step_index: int | None = None, branch_path: List[int] | None = None, source_index: int | None = None, fold_id: int | None = None) bool[source]

Check if artifact matches a given context.

Parameters:
  • step_index – Step to match (None = any)

  • branch_path – Branch path to match (None = any)

  • source_index – Source index to match (None = any)

  • fold_id – Fold ID to match (None = any)

Returns:

True if artifact matches all specified filters
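The “None = any” filter convention can be sketched with a reduced record type. `RecordSketch` is a hypothetical stand-in carrying only the four context fields, and it assumes exact equality for every filter; the real method may treat shared or pre-branch artifacts differently.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class RecordSketch:
    """Reduced stand-in for an artifact record (context fields only)."""
    step_index: int = 0
    branch_path: List[int] = field(default_factory=list)
    source_index: Optional[int] = None
    fold_id: Optional[int] = None

    def matches_context(self,
                        step_index: Optional[int] = None,
                        branch_path: Optional[List[int]] = None,
                        source_index: Optional[int] = None,
                        fold_id: Optional[int] = None) -> bool:
        # A filter left as None matches anything; others must be equal.
        checks = [
            (step_index, self.step_index),
            (branch_path, self.branch_path),
            (source_index, self.source_index),
            (fold_id, self.fold_id),
        ]
        return all(wanted is None or wanted == actual for wanted, actual in checks)


record = RecordSketch(step_index=3, branch_path=[0], fold_id=1)
print(record.matches_context())                         # True
print(record.matches_context(step_index=3, fold_id=1))  # True
print(record.matches_context(step_index=3, fold_id=0))  # False
```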

meta_config: MetaModelConfig | None = None
nirs4all_version: str = ''
params: Dict[str, Any]
path: str
pipeline_id: str = ''
property short_hash: str

Get short version of content hash for filenames.

Returns:

First 12 characters of hash (after sha256: prefix if present)
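As a rough illustration of the behaviour described for short_hash, the following standalone function strips an optional sha256: prefix and truncates to 12 characters. It is a sketch of the documented behaviour, not the library's code.

```python
def short_hash(content_hash: str, length: int = 12) -> str:
    """Return the first `length` characters of a hash, ignoring a "sha256:" prefix."""
    prefix = "sha256:"
    if content_hash.startswith(prefix):
        content_hash = content_hash[len(prefix):]
    return content_hash[:length]


print(short_hash("sha256:0123456789abcdef0123"))  # 0123456789ab
print(short_hash("fedcba9876543210"))             # fedcba987654
```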

size_bytes: int = 0
source_index: int | None = None
step_index: int = 0
substep_index: int | None = None
to_dict() Dict[str, Any][source]

Convert to dictionary for YAML serialization.

Handles enum conversion and nested dataclass serialization.

Returns:

Dictionary suitable for YAML safe_dump

version: int = 3
class nirs4all.pipeline.storage.artifacts.ArtifactRegistry(workspace: Path, dataset: str, manifest_manager: Any | None = None, pipeline_id: str = '')[source]

Bases: object

Central registry for artifact management (V3).

Provides:

  • Chain-based ID generation for complete execution path tracking

  • Content-addressed storage with deduplication

  • Dependency graph for stacking/transfer

  • Cleanup utilities

V3 Key Changes:

  • Uses OperatorChain for artifact identification

  • Chain hash-based artifact IDs for deterministic identification

  • Chain path stored in ArtifactRecord for complete traceability

  • Lookup by chain path for prediction replay

The registry coordinates between:

  • Centralized binaries at workspace/binaries/<dataset>/

  • Per-run manifests with artifact references

  • Dependency tracking for complex pipelines

workspace

Workspace root path

dataset

Current dataset name

binaries_dir

Path to centralized binaries

dependency_graph

Dependency tracking graph

pipeline_id

Current pipeline identifier for chain generation

cleanup_failed_run() int[source]

Clean up artifacts from a failed run.

Deletes artifacts registered during the current run. Called automatically on exception.

Returns:

Number of artifacts cleaned up

delete_orphaned_artifacts(dry_run: bool = True, scan_all_manifests: bool = True) Tuple[List[str], int][source]

Delete artifacts not referenced by any manifest.

Parameters:
  • dry_run – If True, only report what would be deleted

  • scan_all_manifests – If True, scan all manifests before deletion

Returns:

Tuple of (deleted_files, bytes_freed)

delete_pipeline_artifacts(pipeline_id: str, delete_files: bool = False) int[source]

Delete all artifacts for a specific pipeline.

Parameters:
  • pipeline_id – Pipeline to delete artifacts for

  • delete_files – If True, also delete the binary files from disk

Returns:

Number of artifacts deleted

end_run() None[source]

End run tracking (successful completion).

export_to_manifest() Dict[str, Any][source]

Export registry to manifest V3 format.

Returns:

Dictionary suitable for manifest artifacts section

find_orphaned_artifacts(scan_all_manifests: bool = True) List[str][source]

Find artifact files not referenced by any manifest.

Scans binaries directory and compares with all referenced artifacts from manifests in the workspace.

Parameters:

scan_all_manifests – If True, scan all manifests in workspace/runs/. If False, only check against in-memory registry.

Returns:

List of orphaned filenames
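Conceptually, orphan detection is a set difference between the files on disk and the files referenced by manifests. The sketch below demonstrates that idea in a temporary directory; `find_orphans` and the filenames are hypothetical, and manifest scanning is replaced by a plain list of referenced names.

```python
import tempfile
from pathlib import Path
from typing import Iterable, List


def find_orphans(binaries_dir: Path, referenced: Iterable[str]) -> List[str]:
    """Return sorted filenames in `binaries_dir` that are not in `referenced`."""
    referenced_names = set(referenced)
    return sorted(
        entry.name
        for entry in binaries_dir.iterdir()
        if entry.is_file() and entry.name not in referenced_names
    )


with tempfile.TemporaryDirectory() as tmp:
    binaries = Path(tmp)
    for name in ("model_aaa.joblib", "scaler_bbb.joblib", "stale_ccc.joblib"):
        (binaries / name).write_bytes(b"x")
    # In a real workspace the referenced names would come from manifests.
    orphans = find_orphans(binaries, ["model_aaa.joblib", "scaler_bbb.joblib"])

print(orphans)  # ['stale_ccc.joblib']
```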

generate_id(chain: OperatorChain | str, fold_id: int | None = None, pipeline_id: str | None = None) str[source]

Generate deterministic V3 artifact ID from operator chain.

V3 Format: {pipeline_id}${chain_hash}:{fold_id}

Parameters:
  • chain – OperatorChain or chain path string

  • fold_id – CV fold (None for shared)

  • pipeline_id – Pipeline identifier (uses self.pipeline_id if None)

Returns:

V3 Artifact ID string

Examples

>>> registry.generate_id(chain, fold_id=0)
'0001_pls$a1b2c3d4e5f6:0'
>>> registry.generate_id("s1.MinMaxScaler>s3.PLS", fold_id=None)
'0001_pls$7f8e9d0c1b2a:all'
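One way to obtain a deterministic ID of this shape is to hash the chain path string. The sketch below assumes SHA-256 truncated to 12 hex characters, which merely matches the length of the example hashes above; the hash function and truncation actually used by the registry are not stated in this reference, and `make_artifact_id` is a hypothetical name.

```python
import hashlib
from typing import Optional


def make_artifact_id(pipeline_id: str, chain_path: str,
                     fold_id: Optional[int] = None, hash_len: int = 12) -> str:
    """Build "{pipeline_id}${chain_hash}:{fold_id}" from a chain path string."""
    # Assumption: SHA-256 of the UTF-8 chain path, truncated to `hash_len` hex chars.
    chain_hash = hashlib.sha256(chain_path.encode("utf-8")).hexdigest()[:hash_len]
    fold = "all" if fold_id is None else str(fold_id)
    return f"{pipeline_id}${chain_hash}:{fold}"


first = make_artifact_id("0001_pls", "s1.MinMaxScaler>s3.PLS", fold_id=0)
again = make_artifact_id("0001_pls", "s1.MinMaxScaler>s3.PLS", fold_id=0)
shared = make_artifact_id("0001_pls", "s1.MinMaxScaler>s3.PLS")

print(first == again)           # True: same inputs give the same ID
print(shared.endswith(":all"))  # True: shared artifacts use "all" as fold
```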
get_all_records() List[ArtifactRecord][source]

Get all registered artifacts.

Returns:

List of all ArtifactRecords

get_artifacts_for_step(pipeline_id: str, step_index: int, branch_path: List[int] | None = None, fold_id: int | None = None) List[ArtifactRecord][source]

Get all artifacts for a specific step context.

Parameters:
  • pipeline_id – Pipeline to query

  • step_index – Step number

  • branch_path – Optional branch filter

  • fold_id – Optional fold filter

Returns:

List of matching ArtifactRecords

get_by_chain(chain: OperatorChain | str, fold_id: int | None = None) ArtifactRecord | None[source]

Get artifact by exact chain path match.

Parameters:
  • chain – OperatorChain or chain path string

  • fold_id – Optional fold ID to filter (None = any fold)

Returns:

ArtifactRecord or None if not found

get_chain_prefix(prefix: str, branch_path: List[int] | None = None, source_index: int | None = None) List[ArtifactRecord][source]

Get all artifacts whose chain path starts with the given prefix.

Useful for finding all artifacts in a chain for prediction replay.

Parameters:
  • prefix – Chain path prefix to match

  • branch_path – Optional branch path filter

  • source_index – Optional source index filter

Returns:

List of matching ArtifactRecords

get_dependencies(artifact_id: str) List[str][source]

Get direct dependencies of an artifact.

Parameters:

artifact_id – Artifact to query

Returns:

List of artifact IDs

get_fold_models(pipeline_id: str, step_index: int, branch_path: List[int] | None = None) List[ArtifactRecord][source]

Get all fold-specific model artifacts for CV averaging.

Parameters:
  • pipeline_id – Pipeline to query

  • step_index – Model step number

  • branch_path – Optional branch filter

Returns:

List of per-fold model ArtifactRecords

get_stats(scan_all_manifests: bool = True) Dict[str, Any][source]

Get storage statistics.

Parameters:

scan_all_manifests – If True, scan all manifests for accurate stats

Returns:

Dictionary with storage stats including:

  • total_artifacts: Number of registered artifacts

  • unique_files: Number of unique binary files

  • total_size_bytes: Total size of all artifacts

  • deduplication_ratio: Ratio of saved space from deduplication

  • by_type: Count of artifacts by type

  • orphaned_count: Number of orphaned files

  • disk_usage_bytes: Actual disk usage in binaries directory

import_from_manifest(manifest: Dict[str, Any], results_dir: Path) None[source]

Import artifact records from a manifest.

Loads V3 format manifests into the registry, building all indexes including the chain_path index for V3 lookups.

Parameters:
  • manifest – Manifest dictionary

  • results_dir – Path to results directory

load_artifact(record: ArtifactRecord) Any[source]

Load artifact binary from disk.

Parameters:

record – ArtifactRecord with path and format

Returns:

Deserialized object

Raises:

FileNotFoundError – If artifact file doesn’t exist

purge_dataset_artifacts(confirm: bool = False) Tuple[int, int][source]

Delete ALL artifacts for this dataset.

This is a destructive operation that removes all artifacts in the binaries directory for this dataset, regardless of manifest references.

Parameters:

confirm – Must be True to actually delete files

Returns:

Tuple of (files_deleted, bytes_freed)

Raises:

ValueError – If confirm is False

register(obj: Any, artifact_id: str, artifact_type: ArtifactType, depends_on: List[str] | None = None, params: Dict[str, Any] | None = None, meta_config: MetaModelConfig | None = None, format_hint: str | None = None, custom_name: str | None = None, chain_path: str = '', source_index: int | None = None) ArtifactRecord[source]

Register and persist an artifact.

Serializes the object, stores in centralized binaries (with deduplication), and creates an ArtifactRecord.

Note: This method accepts pre-generated artifact IDs for backward compatibility. For new code, use register_with_chain() which generates IDs from OperatorChain.

Parameters:
  • obj – Object to persist (model, transformer, etc.)

  • artifact_id – Pre-generated artifact ID (V3 format: pipeline$hash:fold)

  • artifact_type – Classification (model, transformer, etc.)

  • depends_on – List of artifact IDs this depends on

  • params – Model parameters for inspection

  • meta_config – Meta-model configuration (for stacking)

  • format_hint – Optional serialization format hint

  • custom_name – User-defined name for the artifact (e.g., “Q5_PLS_10”)

  • chain_path – V3 operator chain path (required for full traceability)

  • source_index – Multi-source index (None for single source)

Returns:

ArtifactRecord with full metadata

Raises:

ValueError – If object cannot be serialized or if meta-model dependencies are missing
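Content-addressed storage with deduplication generally means naming the file after a hash of its bytes, so identical objects map to the same file. The sketch below uses the standard-library `pickle` module instead of joblib, and the filename scheme and `store_deduplicated` helper are assumptions made for illustration only.

```python
import hashlib
import pickle
import tempfile
from pathlib import Path
from typing import Any


def store_deduplicated(obj: Any, binaries_dir: Path) -> Path:
    """Serialize `obj` and write it under a name derived from its content hash."""
    payload = pickle.dumps(obj)
    digest = hashlib.sha256(payload).hexdigest()
    target = binaries_dir / f"{digest[:12]}.pkl"  # hypothetical naming scheme
    if not target.exists():  # identical content is written only once
        target.write_bytes(payload)
    return target


with tempfile.TemporaryDirectory() as tmp:
    binaries = Path(tmp)
    first = store_deduplicated({"n_components": 10}, binaries)
    second = store_deduplicated({"n_components": 10}, binaries)
    third = store_deduplicated({"n_components": 12}, binaries)
    file_count = len(list(binaries.iterdir()))

print(first == second, file_count)  # True 2
```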

register_meta_model(obj: Any, artifact_id: str, source_model_ids: List[str], feature_columns: List[str] | None = None, params: Dict[str, Any] | None = None, format_hint: str | None = None) ArtifactRecord[source]

Register a stacking meta-model with source model references.

Convenience method for registering meta-models that automatically:

  • Creates the MetaModelConfig with ordered source model references

  • Sets up dependency tracking to source models

  • Validates that all source models exist

Parameters:
  • obj – The meta-model object to persist

  • artifact_id – Pre-generated artifact ID for the meta-model

  • source_model_ids – Ordered list of source model artifact IDs

  • feature_columns – Optional feature column names matching source order

  • params – Optional meta-model parameters

  • format_hint – Optional serialization format hint

Returns:

ArtifactRecord for the registered meta-model

Raises:

ValueError – If any source model is not found in the registry

Example

>>> meta_config_record = registry.register_meta_model(
...     obj=ridge_meta_model,
...     artifact_id="0001:5:all",
...     source_model_ids=["0001:3:all", "0001:4:all"],
...     feature_columns=["PLSRegression_pred", "RandomForestRegressor_pred"]
... )
register_with_chain(obj: Any, chain: OperatorChain | str, artifact_type: ArtifactType, step_index: int, branch_path: List[int] | None = None, source_index: int | None = None, fold_id: int | None = None, substep_index: int | None = None, depends_on: List[str] | None = None, params: Dict[str, Any] | None = None, meta_config: MetaModelConfig | None = None, format_hint: str | None = None, custom_name: str | None = None, pipeline_id: str | None = None) ArtifactRecord[source]

Register and persist an artifact using V3 chain-based identification.

This is the primary registration method for V3. It generates a deterministic artifact ID from the operator chain and stores the chain path for later lookup.

Parameters:
  • obj – Object to persist (model, transformer, etc.)

  • chain – OperatorChain or chain path string

  • artifact_type – Classification (model, transformer, etc.)

  • step_index – Pipeline step index (1-based)

  • branch_path – List of branch indices (empty for non-branching)

  • source_index – Multi-source index (None for single source)

  • fold_id – CV fold (None for shared artifacts)

  • substep_index – Substep index for [model1, model2]

  • depends_on – List of artifact IDs this depends on

  • params – Model parameters for inspection

  • meta_config – Meta-model configuration (for stacking)

  • format_hint – Optional serialization format hint

  • custom_name – User-defined name for the artifact

Returns:

ArtifactRecord with full metadata

Raises:

ValueError – If object cannot be serialized or if meta-model dependencies are missing

resolve(artifact_id: str) ArtifactRecord | None[source]

Resolve artifact ID to record.

Parameters:

artifact_id – Artifact ID to resolve

Returns:

ArtifactRecord or None if not found

resolve_by_hash(content_hash: str) ArtifactRecord | None[source]

Resolve content hash to artifact record.

Parameters:

content_hash – Content hash to look up

Returns:

ArtifactRecord or None if not found

resolve_dependencies(artifact_id: str) List[ArtifactRecord][source]

Get all transitive dependencies as records.

Parameters:

artifact_id – Starting artifact

Returns:

List of ArtifactRecords in topological order

start_run() None[source]

Start tracking a new run for cleanup purposes.

class nirs4all.pipeline.storage.artifacts.ArtifactType(value)[source]

Bases: str, Enum

Classification of artifact types.

Each type has specific handling:

  • model: Trained ML models (sklearn, tensorflow, pytorch, etc.)

  • transformer: Fitted preprocessors (scalers, feature extractors)

  • splitter: Train/test split configuration (for reproducibility)

  • encoder: Label encoders, y-scalers

  • meta_model: Stacking meta-models with source model dependencies

ENCODER = 'encoder'
META_MODEL = 'meta_model'
MODEL = 'model'
SPLITTER = 'splitter'
TRANSFORMER = 'transformer'
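Because the enum derives from both str and Enum, its members compare equal to plain strings and can be rebuilt from their values, which is convenient for manifest serialization. The sketch below defines a local `ArtifactKind` enum mirroring the values listed above rather than importing the real class.

```python
import json
from enum import Enum


class ArtifactKind(str, Enum):
    """Local mirror of the documented values (not the nirs4all class)."""
    MODEL = "model"
    TRANSFORMER = "transformer"
    SPLITTER = "splitter"
    ENCODER = "encoder"
    META_MODEL = "meta_model"


# Round trip: value -> member, and member compared with a plain string.
print(ArtifactKind("meta_model") is ArtifactKind.META_MODEL)      # True
print(ArtifactKind.MODEL == "model")                              # True
print(json.dumps({"artifact_type": ArtifactKind.ENCODER.value}))  # {"artifact_type": "encoder"}
```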
class nirs4all.pipeline.storage.artifacts.DependencyGraph[source]

Bases: object

Tracks artifact dependencies for stacking and transfer.

Maintains a directed graph where edges represent “depends on” relationships. Supports transitive dependency resolution with cycle detection.

add_dependencies(artifact_id: str, depends_on: List[str]) None[source]

Add multiple dependencies at once.

Parameters:
  • artifact_id – The dependent artifact

  • depends_on – List of artifacts being depended upon

add_dependency(artifact_id: str, depends_on: str) None[source]

Add a dependency relationship.

Parameters:
  • artifact_id – The dependent artifact

  • depends_on – The artifact being depended upon

clear() None[source]

Clear all dependencies.

get_dependencies(artifact_id: str) List[str][source]

Get direct dependencies of an artifact.

Parameters:

artifact_id – Artifact to query

Returns:

List of artifact IDs this artifact depends on

get_dependents(artifact_id: str) List[str][source]

Get artifacts that directly depend on this artifact.

Parameters:

artifact_id – Artifact to query

Returns:

List of artifact IDs that depend on this artifact

remove_artifact(artifact_id: str) None[source]

Remove an artifact and its edges from the graph.

Parameters:

artifact_id – Artifact to remove

resolve_dependencies(artifact_id: str, max_depth: int = 100) List[str][source]

Get all transitive dependencies (topologically sorted).

Returns dependencies in an order suitable for loading: dependencies before dependents.

Parameters:
  • artifact_id – Starting artifact

  • max_depth – Maximum recursion depth (guards against unbounded recursion if the graph contains a cycle)

Returns:

List of all dependencies in topological order

Raises:

ValueError – If cycle detected or max depth exceeded
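Transitive resolution with cycle detection is commonly done with a depth-first search that tracks nodes currently on the recursion stack. The function below is a generic sketch over a plain adjacency dictionary; it assumes the starting artifact itself is excluded from the result, which this reference does not confirm.

```python
from typing import Dict, List, Set


def resolve_dependencies(graph: Dict[str, List[str]], start: str,
                         max_depth: int = 100) -> List[str]:
    """Return all transitive dependencies of `start`, dependencies first."""
    ordered: List[str] = []
    done: Set[str] = set()
    in_progress: Set[str] = set()

    def visit(node: str, depth: int) -> None:
        if depth > max_depth:
            raise ValueError(f"max depth {max_depth} exceeded at {node!r}")
        if node in in_progress:
            raise ValueError(f"dependency cycle detected at {node!r}")
        if node in done:
            return
        in_progress.add(node)
        for dep in graph.get(node, []):
            visit(dep, depth + 1)
        in_progress.discard(node)
        done.add(node)
        ordered.append(node)  # appended only after all its dependencies

    visit(start, 0)
    return ordered[:-1]  # assumption: drop the starting artifact itself


graph = {"meta": ["pls", "rf"], "pls": ["scaler"], "rf": ["scaler"]}
print(resolve_dependencies(graph, "meta"))  # ['scaler', 'pls', 'rf']
```

A graph such as `{"a": ["b"], "b": ["a"]}` raises ValueError in this sketch because "a" is revisited while still in progress.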

class nirs4all.pipeline.storage.artifacts.ExecutionPath(pipeline_id: str, chain_path: str = '', branch_path: List[int] = None, step_index: int = 0, source_index: int | None = None, fold_id: int | None = None, substep_index: int | None = None)[source]

Bases: object

Represents the execution context for an artifact (V3).

Captures all context needed to uniquely identify an artifact within a pipeline execution.

pipeline_id

Pipeline identifier (e.g., “0001_pls_abc123”)

Type:

str

chain_path

Full operator chain path string

Type:

str

branch_path

List of branch indices for nested branching

Type:

List[int]

step_index

Logical step number within current branch

Type:

int

source_index

Multi-source index (None for single source)

Type:

int | None

fold_id

CV fold identifier (None for shared artifacts)

Type:

int | None

substep_index

Substep index (for [model1, model2])

Type:

int | None

branch_path: List[int] = None
chain_path: str = ''
fold_id: int | None = None
classmethod from_artifact_id_v3(artifact_id: str, chain_path: str = '') ExecutionPath[source]

Create ExecutionPath from V3 artifact ID string.

Parameters:
  • artifact_id – V3 artifact ID to parse

  • chain_path – Full chain path (required for complete reconstruction)

Returns:

ExecutionPath instance

pipeline_id: str
source_index: int | None = None
step_index: int = 0
substep_index: int | None = None
to_artifact_id() str[source]

Convert execution path to V3 artifact ID string.

Returns:

V3 artifact ID in format “{pipeline_id}${chain_hash}:{fold_id}”

Return type:

str

class nirs4all.pipeline.storage.artifacts.MetaModelConfig(source_models: List[Dict[str, Any]] = <factory>, feature_columns: List[str] = <factory>)[source]

Bases: object

Configuration for meta-model source tracking.

Stores the ordered source models that feed into a stacking meta-model, along with their feature column mapping.

source_models

Ordered list of source model artifact IDs with feature indices

Type:

List[Dict[str, Any]]

feature_columns

Feature column names in the meta-model input order

Type:

List[str]

feature_columns: List[str]
classmethod from_dict(data: Dict[str, Any]) MetaModelConfig[source]

Create from dictionary.

source_models: List[Dict[str, Any]]
to_dict() Dict[str, Any][source]

Convert to dictionary for YAML serialization.
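As a rough illustration of the to_dict/from_dict round trip, the following standalone dataclass mirrors the two documented fields. The class name and the keys inside each source model entry ("artifact_id", "feature_index") are hypothetical; the actual schema of those dictionaries is not specified here.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List


@dataclass
class MetaModelConfigSketch:
    # Same field names and defaults as the documented class; everything else
    # in this sketch is an assumption.
    source_models: List[Dict[str, Any]] = field(default_factory=list)
    feature_columns: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Plain dict of built-in types, suitable for YAML serialization."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "MetaModelConfigSketch":
        """Rebuild the config, tolerating missing keys."""
        return cls(
            source_models=list(data.get("source_models", [])),
            feature_columns=list(data.get("feature_columns", [])),
        )


config = MetaModelConfigSketch(
    source_models=[
        {"artifact_id": "0001_pls$a1b2c3d4e5f6:0", "feature_index": 0},  # hypothetical keys
        {"artifact_id": "0001_pls$0f1e2d3c4b5a:0", "feature_index": 1},
    ],
    feature_columns=["pls_pred", "rf_pred"],
)
restored = MetaModelConfigSketch.from_dict(config.to_dict())
```

Keeping source_models as an ordered list matters because the meta-model expects its input features in the same order at prediction time.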

class nirs4all.pipeline.storage.artifacts.OperatorChain(nodes: List[OperatorNode] = <factory>, pipeline_id: str = '')[source]

Bases: object

Ordered sequence of OperatorNodes representing the full execution path.

The OperatorChain captures the complete path of operators from input to the current artifact, enabling deterministic artifact identification and replay.

nodes

Ordered list of OperatorNode objects in the chain

Type:

List[nirs4all.pipeline.storage.artifacts.operator_chain.OperatorNode]

pipeline_id

Pipeline identifier this chain belongs to

Type:

str

append(node: OperatorNode) OperatorChain[source]

Return new chain with node appended.

Parameters:

node – OperatorNode to append

Returns:

New OperatorChain with the node appended

copy() OperatorChain[source]

Create a deep copy of this chain.

Returns:

New OperatorChain with copied nodes

extend(other: OperatorChain) OperatorChain[source]

Return new chain with another chain’s nodes appended.

Parameters:

other – OperatorChain to append

Returns:

New OperatorChain with all nodes from both chains

filter_branch(target_branch_path: List[int]) OperatorChain[source]

Return chain with only nodes matching the branch path.

Includes nodes that:
  • Have no branch path (shared/pre-branch artifacts)

  • Have a branch path that is a prefix of or equal to the target

Parameters:

target_branch_path – Branch path to filter for

Returns:

New OperatorChain with only matching nodes
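The prefix rule described for filter_branch can be sketched on plain tuples. This is not the library implementation: the (label, branch_path) tuple layout and both function names are assumptions used only to show which nodes survive the filter.

```python
from typing import List, Sequence, Tuple

NodeSketch = Tuple[str, List[int]]  # (operator label, branch_path), hypothetical layout


def is_on_branch(node_branch: Sequence[int], target: Sequence[int]) -> bool:
    """True if node_branch is empty, a prefix of target, or equal to target."""
    return list(target[:len(node_branch)]) == list(node_branch)


def filter_branch_sketch(nodes: List[NodeSketch], target: Sequence[int]) -> List[NodeSketch]:
    """Keep only the nodes that lie on the path leading to the target branch."""
    return [node for node in nodes if is_on_branch(node[1], target)]


nodes = [
    ("MinMaxScaler", []),   # shared, before any branching
    ("SNV", [0]),           # branch 0
    ("Detrend", [1]),       # branch 1
    ("PLS", [0, 1]),        # nested branch 1 inside branch 0
]
kept = filter_branch_sketch(nodes, [0, 1])
```

With target [0, 1] the shared scaler, the branch 0 node, and the nested node are kept, while the sibling branch [1] is dropped.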

filter_source(source_index: int) OperatorChain[source]

Return chain with only nodes for the specified source.

Includes nodes that:
  • Have no source_index (single source)

  • Have a matching source_index

Parameters:

source_index – Source index to filter for

Returns:

New OperatorChain with only matching nodes

filter_step(step_index: int) OperatorChain[source]

Return chain with only nodes at the specified step.

Parameters:

step_index – Step index to filter for

Returns:

New OperatorChain with only matching nodes

classmethod from_dict(data: Dict[str, Any]) OperatorChain[source]

Create OperatorChain from dictionary.

Parameters:

data – Dictionary representation

Returns:

OperatorChain instance

classmethod from_path(path: str, pipeline_id: str = '') OperatorChain[source]

Parse OperatorChain from a path string.

Parameters:
  • path – Chain path string like “s1.MinMaxScaler>s3.SNV[br=0]”

  • pipeline_id – Pipeline identifier

Returns:

OperatorChain instance

get_branch_path() List[int][source]

Get the branch path from the last node.

Returns:

Branch path of the last node, or empty list if no nodes

get_last_node() OperatorNode | None[source]

Get the last node in the chain.

Returns:

Last OperatorNode or None if chain is empty

get_nodes_at_step(step_index: int) List[OperatorNode][source]

Get all nodes at a specific step.

Parameters:

step_index – Step index to filter

Returns:

List of nodes at that step

is_empty() bool[source]

Check if chain has no nodes.

Returns:

True if chain is empty

merge_with_prefix(prefix_chain: OperatorChain, step_offset: int = 0) OperatorChain[source]

Merge this chain with a prefix chain for bundle import.

Used when importing a bundle into a pipeline, where the bundle’s chain needs to be prefixed with the import context’s chain.

Parameters:
  • prefix_chain – Chain to prepend (the import context)

  • step_offset – Offset to add to step indices in this chain

Returns:

New merged OperatorChain

Example

>>> bundle_chain = OperatorChain.from_path("s1.Scaler>s3.PLS")
>>> import_chain = OperatorChain.from_path("s1.Import")
>>> merged = bundle_chain.merge_with_prefix(import_chain, step_offset=1)
# Result: "s1.Import>s2.Scaler>s4.PLS"
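The documented result can be reproduced with a small standalone sketch that works on (step_index, operator_class) tuples instead of OperatorNode objects. The tuple layout and helper names are assumptions; the sketch only shows that the offset is applied to the bundle's own steps and that the prefix nodes come first.

```python
from typing import List, Tuple

Node = Tuple[int, str]  # (step_index, operator_class), hypothetical layout


def merge_with_prefix_sketch(chain: List[Node], prefix: List[Node],
                             step_offset: int = 0) -> List[Node]:
    """Shift the steps of `chain` by step_offset, then prepend `prefix`."""
    shifted = [(step + step_offset, name) for step, name in chain]
    return list(prefix) + shifted


def to_path_sketch(nodes: List[Node]) -> str:
    """Render nodes as 's{step}.{Class}' keys joined by '>'."""
    return ">".join(f"s{step}.{name}" for step, name in nodes)


bundle = [(1, "Scaler"), (3, "PLS")]
import_context = [(1, "Import")]
merged_path = to_path_sketch(merge_with_prefix_sketch(bundle, import_context, step_offset=1))
```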
nodes: List[OperatorNode]
pipeline_id: str = ''
remap_steps(step_mapping: Dict[int, int]) OperatorChain[source]

Create new chain with remapped step indices.

Parameters:

step_mapping – Mapping from old step index to new step index

Returns:

New OperatorChain with remapped steps

to_dict() Dict[str, Any][source]

Convert to dictionary for serialization.

Returns:

Dictionary representation

to_hash(length: int = 12) str[source]

Compute deterministic hash of the chain path.

Parameters:

length – Number of hex characters to return (default: 12)

Returns:

Truncated SHA256 hash of the chain path

to_path() str[source]

Generate full path string from all nodes.

Format: node1>node2>node3

Returns:

Chain path string

Examples

>>> chain = OperatorChain([
...     OperatorNode(1, "MinMaxScaler"),
...     OperatorNode(3, "SNV", branch_path=[0])
... ])
>>> chain.to_path()
's1.MinMaxScaler>s3.SNV[br=0]'
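Because the path is simply the node keys joined by ">", converting between a path string and its keys is a join/split pair. The sketch below works on key strings only and assumes that no operator key contains the ">" character; the function names are illustrative.

```python
from typing import List


def keys_to_path(keys: List[str]) -> str:
    """Join node keys into a chain path string."""
    return ">".join(keys)


def path_to_keys(path: str) -> List[str]:
    """Split a chain path back into node keys (empty path gives an empty list)."""
    return path.split(">") if path else []


keys = ["s1.MinMaxScaler", "s3.SNV[br=0]"]
path = keys_to_path(keys)
```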
with_pipeline_id(pipeline_id: str) OperatorChain[source]

Create a copy of this chain with a new pipeline ID.

Parameters:

pipeline_id – New pipeline ID to set

Returns:

New OperatorChain with the specified pipeline_id

class nirs4all.pipeline.storage.artifacts.OperatorNode(step_index: int, operator_class: str, branch_path: List[int] = <factory>, source_index: int | None = None, fold_id: int | None = None, substep_index: int | None = None, operator_name: str | None = None)[source]

Bases: object

Represents a single operator in the execution chain.

An OperatorNode captures all the context needed to identify a specific operator execution within a pipeline, including its position, branch context, and source index for multi-source processing.

step_index

Pipeline step number (1-based)

Type:

int

operator_class

Class name of the operator (e.g., “MinMaxScaler”, “PLS”)

Type:

str

branch_path

Branch indices path (e.g., [0] for branch 0, [0, 1] for nested)

Type:

List[int]

source_index

Index for multi-source transformers (None for single source)

Type:

int | None

fold_id

Fold number for CV models (None for shared artifacts)

Type:

int | None

substep_index

Index within a substep (for [model1, model2] at same step)

Type:

int | None

operator_name

Instance name if different from class name

Type:

str | None

branch_path: List[int]
fold_id: int | None = None
classmethod from_dict(data: Dict[str, Any]) OperatorNode[source]

Create OperatorNode from dictionary.

Parameters:

data – Dictionary representation

Returns:

OperatorNode instance

classmethod from_key(key: str) OperatorNode[source]

Parse an OperatorNode from its key string representation.

Parameters:

key – Key string like “s3.SNV[br=0,src=1]”

Returns:

OperatorNode instance

Raises:

ValueError – If key format is invalid

matches_context(step_index: int | None = None, branch_path: List[int] | None = None, source_index: int | None = None, fold_id: int | None = None) bool[source]

Check if this node matches the given context filters.

None values are treated as “match any”.

Parameters:
  • step_index – Step number to match (None = any)

  • branch_path – Branch path to match (None = any)

  • source_index – Source index to match (None = any)

  • fold_id – Fold ID to match (None = any)

Returns:

True if node matches all specified filters
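The "None means match any" rule can be sketched with a small standalone dataclass. The class name is hypothetical, and comparing branch_path by exact equality is an assumption; the library may apply a different rule for nested branches.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class NodeSketch:
    step_index: int
    operator_class: str
    branch_path: List[int] = field(default_factory=list)
    source_index: Optional[int] = None
    fold_id: Optional[int] = None

    def matches_context(self, step_index: Optional[int] = None,
                        branch_path: Optional[List[int]] = None,
                        source_index: Optional[int] = None,
                        fold_id: Optional[int] = None) -> bool:
        """Each filter left as None is skipped; the others must all be equal."""
        checks = [
            (step_index, self.step_index),
            (branch_path, self.branch_path),
            (source_index, self.source_index),
            (fold_id, self.fold_id),
        ]
        return all(wanted is None or wanted == actual for wanted, actual in checks)


node = NodeSketch(3, "SNV", branch_path=[0], source_index=1)
```

Calling node.matches_context() with no arguments returns True, since no filter is specified.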

operator_class: str
operator_name: str | None = None
source_index: int | None = None
step_index: int
substep_index: int | None = None
to_dict() Dict[str, Any][source]

Convert to dictionary for serialization.

Returns:

Dictionary representation suitable for YAML/JSON

to_key() str[source]

Generate compact key string for this node.

Format: s{step}.{Class}[qualifiers]

Qualifiers (only if present):

  • br={branch_path} – Branch context

  • src={source_index} – Multi-source index

  • sub={substep_index} – Substep index

Returns:

Compact key string for this operator node

Examples

>>> OperatorNode(1, "MinMaxScaler").to_key()
's1.MinMaxScaler'
>>> OperatorNode(3, "SNV", branch_path=[0]).to_key()
's3.SNV[br=0]'
>>> OperatorNode(3, "SNV", branch_path=[0], source_index=1).to_key()
's3.SNV[br=0,src=1]'
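A minimal sketch of how such a key could be assembled is shown below. It reproduces the three documented outputs, but it is not the library code: the function name is illustrative, and joining nested branch indices with "-" is purely an assumption, since the rendering of nested branch paths is not shown here.

```python
from typing import List, Optional


def to_key_sketch(step_index: int, operator_class: str,
                  branch_path: Optional[List[int]] = None,
                  source_index: Optional[int] = None,
                  substep_index: Optional[int] = None) -> str:
    """Build 's{step}.{Class}[qualifiers]', omitting absent qualifiers."""
    qualifiers = []
    if branch_path:
        # Assumption: nested indices are joined with "-" (not confirmed).
        qualifiers.append("br=" + "-".join(str(b) for b in branch_path))
    if source_index is not None:
        qualifiers.append(f"src={source_index}")
    if substep_index is not None:
        qualifiers.append(f"sub={substep_index}")
    suffix = "[" + ",".join(qualifiers) + "]" if qualifiers else ""
    return f"s{step_index}.{operator_class}{suffix}"


key = to_key_sketch(3, "SNV", branch_path=[0], source_index=1)
```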
with_fold(fold_id: int) OperatorNode[source]

Create a copy of this node with a specific fold ID.

Parameters:

fold_id – The fold ID to set

Returns:

New OperatorNode with the specified fold_id

with_source(source_index: int) OperatorNode[source]

Create a copy of this node with a specific source index.

Parameters:

source_index – The source index to set

Returns:

New OperatorNode with the specified source_index

nirs4all.pipeline.storage.artifacts.artifact_id_matches_context(artifact_id: str, pipeline_id: str | None = None, branch_path: List[int] | None = None, step_index: int | None = None, fold_id: int | None = None) bool[source]

Check if a V3 artifact ID matches a given context.

Partial matching is supported: only the specified parameters are checked. Note: branch_path and step_index are not encoded in a V3 artifact ID, so matching on them requires access to the ArtifactRecord.

Parameters:
  • artifact_id – V3 artifact ID to check

  • pipeline_id – Expected pipeline ID (None = don’t check)

  • branch_path – Expected branch path (ignored for V3 - use ArtifactRecord)

  • step_index – Expected step index (ignored for V3 - use ArtifactRecord)

  • fold_id – Expected fold ID (None = don’t check)

Returns:

True if artifact matches specified criteria, False otherwise

nirs4all.pipeline.storage.artifacts.compute_chain_hash(chain_path: str, length: int = 12) str[source]

Compute deterministic hash from chain path string.

Parameters:
  • chain_path – Full operator chain path

  • length – Number of hex characters (default: 12)

Returns:

Truncated SHA256 hash

nirs4all.pipeline.storage.artifacts.compute_content_hash(content: bytes) str[source]

Compute SHA256 hash of binary content.

Parameters:

content – Binary content to hash

Returns:

Full SHA256 hash with “sha256:” prefix

Return type:

str
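A prefixed content hash of this kind can be produced with the standard library alone. The sketch below is an assumption about the output shape ("sha256:" followed by the 64-character hex digest), not a copy of the library function.

```python
import hashlib


def content_hash_sketch(content: bytes) -> str:
    """Return 'sha256:' followed by the full hex digest of the content."""
    return "sha256:" + hashlib.sha256(content).hexdigest()


digest = content_hash_sketch(b"serialized model bytes")
```

Identical bytes always produce the same hash, so the hash can serve as a deduplication key for stored binaries.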

nirs4all.pipeline.storage.artifacts.extract_fold_id_from_artifact_id(artifact_id: str) int | None[source]

Extract fold ID from artifact ID (V2 or V3).

Parameters:

artifact_id – Full artifact ID

Returns:

Fold ID, or None if the fold component is “all” (shared artifact)

nirs4all.pipeline.storage.artifacts.extract_pipeline_id_from_artifact_id(artifact_id: str) str[source]

Extract pipeline ID from artifact ID (V2 or V3).

Parameters:

artifact_id – Full artifact ID

Returns:

Pipeline ID component

nirs4all.pipeline.storage.artifacts.generate_artifact_id_v3(pipeline_id: str, chain: OperatorChain | str, fold_id: int | None = None) str[source]

Generate V3 artifact ID from chain.

Format: {pipeline_id}${chain_hash}:{fold_id}

Parameters:
  • pipeline_id – Pipeline identifier

  • chain – Operator chain object or chain path string for this artifact

  • fold_id – Fold ID (None for shared artifacts)

Returns:

V3 artifact ID string

Examples

>>> generate_artifact_id_v3("0001_pls", chain, None)
'0001_pls$a1b2c3d4e5f6:all'
>>> generate_artifact_id_v3("0001_pls", chain, 0)
'0001_pls$a1b2c3d4e5f6:0'
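The ID format can be sketched end to end with hashlib. This is an illustration under assumptions: the chain path is hashed as UTF-8 text, the function name is hypothetical, and the hash values in the examples above are placeholders rather than real digests of any particular chain.

```python
import hashlib
from typing import Optional


def artifact_id_sketch(pipeline_id: str, chain_path: str,
                       fold_id: Optional[int] = None, length: int = 12) -> str:
    """Compose '{pipeline_id}${chain_hash}:{fold_id}' from a chain path string."""
    # Assumption: the path string is encoded as UTF-8 before hashing.
    chain_hash = hashlib.sha256(chain_path.encode("utf-8")).hexdigest()[:length]
    fold = "all" if fold_id is None else str(fold_id)
    return f"{pipeline_id}${chain_hash}:{fold}"


shared_id = artifact_id_sketch("0001_pls", "s1.MinMaxScaler>s3.PLS")
fold_id_0 = artifact_id_sketch("0001_pls", "s1.MinMaxScaler>s3.PLS", fold_id=0)
```

Since the hash depends only on the chain path, the shared artifact and its per-fold variants differ only in the part after the colon.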
nirs4all.pipeline.storage.artifacts.generate_filename(artifact_type: str, class_name: str, content_hash: str, extension: str = 'joblib') str[source]

Generate artifact filename from components.

New format: <type>_<class>_<short_hash>.<ext>

Parameters:
  • artifact_type – Artifact type (model, transformer, etc.)

  • class_name – Python class name

  • content_hash – Full SHA256 hash (will be truncated)

  • extension – File extension (default: joblib)

Returns:

Filename string

Examples

>>> generate_filename("model", "PLSRegression", "abc123def456")
'model_PLSRegression_abc123def456.joblib'
nirs4all.pipeline.storage.artifacts.get_binaries_path(workspace: Path, dataset: str) Path[source]

Get the centralized binaries directory for a dataset.

New architecture stores artifacts at workspace/binaries/<dataset>/

Parameters:
  • workspace – Workspace root path

  • dataset – Dataset name

Returns:

Path to binaries directory

nirs4all.pipeline.storage.artifacts.get_short_hash(content_hash: str, length: int = 12) str[source]

Extract short hash from full content hash.

Parameters:
  • content_hash – Full hash (with or without sha256: prefix)

  • length – Number of characters to return (default: 12)

Returns:

Short hash string
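Shortening a hash amounts to dropping the optional prefix and truncating. The sketch below shows that logic on plain strings; the function name is illustrative and the input values are made up.

```python
def short_hash_sketch(content_hash: str, length: int = 12) -> str:
    """Strip an optional 'sha256:' prefix, then keep the first `length` characters."""
    prefix = "sha256:"
    digest = content_hash[len(prefix):] if content_hash.startswith(prefix) else content_hash
    return digest[:length]


short = short_hash_sketch("sha256:abc123def4567890")
```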

nirs4all.pipeline.storage.artifacts.is_v3_artifact_id(artifact_id: str) bool[source]

Check if an artifact ID is in V3 format.

Parameters:

artifact_id – Artifact ID to check

Returns:

True if V3 format, False otherwise

nirs4all.pipeline.storage.artifacts.parse_artifact_id(artifact_id: str) Tuple[str, List[int], int, int | None, int | None][source]

Parse an artifact ID into its components (V3 only).

V3 format: {pipeline_id}${chain_hash}:{fold_id}

Parameters:

artifact_id – V3 artifact ID to parse

Returns:

Tuple of (pipeline_id, branch_path, step_index, fold_id, sub_index). For V3 IDs, step_index is 0 and branch_path is empty; use ArtifactRecord for the full context.

Raises:

ValueError – If artifact ID format is not V3

nirs4all.pipeline.storage.artifacts.parse_artifact_id_v3(artifact_id: str) Tuple[str, str, int | None][source]

Parse V3 artifact ID into components.

Parameters:

artifact_id – V3 artifact ID string

Returns:

Tuple of (pipeline_id, chain_hash, fold_id)

Raises:

ValueError – If format is invalid

Examples

>>> parse_artifact_id_v3("0001_pls$a1b2c3d4e5f6:all")
('0001_pls', 'a1b2c3d4e5f6', None)
>>> parse_artifact_id_v3("0001_pls$a1b2c3d4e5f6:0")
('0001_pls', 'a1b2c3d4e5f6', 0)
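The parsing shown in these examples can be approximated with two string partitions. The sketch below is not the library parser: it assumes the pipeline ID contains no "$" and the chain hash contains no ":", and it performs only minimal validation.

```python
from typing import Optional, Tuple


def parse_v3_sketch(artifact_id: str) -> Tuple[str, str, Optional[int]]:
    """Split '{pipeline_id}${chain_hash}:{fold_id}' into its three parts."""
    pipeline_id, dollar, rest = artifact_id.partition("$")
    chain_hash, colon, fold = rest.partition(":")
    if not (dollar and colon and pipeline_id and chain_hash and fold):
        raise ValueError(f"Not a V3 artifact ID: {artifact_id!r}")
    # "all" marks a shared artifact; int() raises ValueError on anything else invalid.
    fold_id = None if fold == "all" else int(fold)
    return pipeline_id, chain_hash, fold_id


parsed = parse_v3_sketch("0001_pls$a1b2c3d4e5f6:all")
```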
nirs4all.pipeline.storage.artifacts.parse_filename(filename: str) Tuple[str, str, str] | None[source]

Parse artifact filename into components.

Handles the new format <type>_<class>_<short_hash>.<ext> as well as the legacy format <class>_<short_hash>.<ext>.

Parameters:

filename – Filename to parse

Returns:

Tuple of (artifact_type, class_name, short_hash) or None if invalid
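One way to tell the two filename formats apart is to split off the hash from the right and then test whether the leading token is a known artifact type. The sketch below follows that idea under several assumptions: the set of known types is a hypothetical subset, legacy names are given an empty artifact type, and the function name is illustrative. The library may resolve these cases differently.

```python
from pathlib import Path
from typing import Optional, Tuple

KNOWN_TYPES = {"model", "transformer"}  # hypothetical subset of artifact types


def parse_filename_sketch(filename: str) -> Optional[Tuple[str, str, str]]:
    """Return (artifact_type, class_name, short_hash), or None if unparseable."""
    stem = Path(filename).stem                    # drop the extension
    head, sep, short_hash = stem.rpartition("_")  # hash is the last token
    if not sep or not head or not short_hash:
        return None
    artifact_type, sep, class_name = head.partition("_")
    if sep and class_name and artifact_type in KNOWN_TYPES:
        return artifact_type, class_name, short_hash
    # Legacy format: no type token (assumption: report an empty type).
    return "", head, short_hash


new_style = parse_filename_sketch("model_PLSRegression_abc123def456.joblib")
legacy = parse_filename_sketch("PLSRegression_abc123def456.joblib")
```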

nirs4all.pipeline.storage.artifacts.validate_artifact_id(artifact_id: str) bool[source]

Validate artifact ID format (V3 only).

Parameters:

artifact_id – Artifact ID to validate

Returns:

True if valid V3 format, False otherwise