nirs4all.pipeline.storage.artifacts.operator_chain module

Operator Chain - V3 artifact identification system.

This module provides the core data structures for the V3 artifact system:

  • OperatorNode: Represents a single operator in the execution path

  • OperatorChain: Full path of operators that produced an artifact

The Operator Chain is the fundamental identifier for any artifact. It encodes the complete chain of operators that produced the artifact, enabling:

  • Complete chain tracking from input to output

  • Deterministic replay of any execution path

  • Unified handling of branching, multi-source, and stacking

Chain Path Format:

“s{step}.{ClassName}[br={branch},src={source}]>s{step}.{ClassName}[…]”

Examples

  • “s1.MinMaxScaler[src=0]” - Single transformer at step 1, source 0

  • “s1.MinMaxScaler>s3.SNV[br=0]>s4.PLS[br=0]” - Chain through branch 0

  • “s4.PLS[br=0]+s4.RF[br=1]>s5.Ridge” - Meta-model combining branches
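To make the path format concrete, the following is a minimal standalone sketch of a parser for it. It does not import nirs4all: parse_node_key and parse_chain_path are hypothetical helpers, and treating "+" as the separator between merged branch outputs is an assumption drawn from the third example above. The real parsing entry points are OperatorNode.from_key and OperatorChain.from_path, which may behave differently.

```python
import re
from typing import Dict, List

# Hypothetical pattern for one node key: s{step}.{ClassName}[k=v,...]
_NODE_RE = re.compile(r"^s(?P<step>\d+)\.(?P<cls>\w+)(?:\[(?P<quals>[^\]]*)\])?$")


def parse_node_key(key: str) -> Dict[str, object]:
    """Split one node key into step, class name, and qualifiers."""
    match = _NODE_RE.match(key)
    if match is None:
        raise ValueError(f"Invalid node key: {key!r}")
    qualifiers: Dict[str, str] = {}
    if match.group("quals"):
        for item in match.group("quals").split(","):
            name, _, value = item.partition("=")
            qualifiers[name] = value
    return {
        "step": int(match.group("step")),
        "operator_class": match.group("cls"),
        "qualifiers": qualifiers,
    }


def parse_chain_path(path: str) -> List[List[Dict[str, object]]]:
    """Return one list per '>' segment; each holds the '+'-joined nodes."""
    return [
        [parse_node_key(key) for key in segment.split("+")]
        for segment in path.split(">")
    ]


stages = parse_chain_path("s4.PLS[br=0]+s4.RF[br=1]>s5.Ridge")
print(len(stages))                     # 2 stages: the merged branches, then Ridge
print(stages[0][1]["operator_class"])  # RF
```

Qualifier values are kept as strings here; how the library decodes nested branch paths inside br=... is not specified in this page, so the sketch does not attempt it.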

class nirs4all.pipeline.storage.artifacts.operator_chain.OperatorChain(nodes: List[OperatorNode] = <factory>, pipeline_id: str = '')

Bases: object

Ordered sequence of OperatorNodes representing the full execution path.

The OperatorChain captures the complete path of operators from input to the current artifact, enabling deterministic artifact identification and replay.

nodes

Ordered list of OperatorNode objects in the chain

Type:

List[nirs4all.pipeline.storage.artifacts.operator_chain.OperatorNode]

pipeline_id

Pipeline identifier this chain belongs to

Type:

str

append(node: OperatorNode) -> OperatorChain

Return new chain with node appended.

Parameters:

node – OperatorNode to append

Returns:

New OperatorChain with the node appended

copy() -> OperatorChain

Create a deep copy of this chain.

Returns:

New OperatorChain with copied nodes

extend(other: OperatorChain) -> OperatorChain

Return new chain with another chain’s nodes appended.

Parameters:

other – OperatorChain to append

Returns:

New OperatorChain with all nodes from both chains

filter_branch(target_branch_path: List[int]) -> OperatorChain

Return chain with only nodes matching the branch path.

Includes nodes that:

  • Have no branch path (shared/pre-branch artifacts)

  • Have a branch path that is a prefix of or equal to target

Parameters:

target_branch_path – Branch path to filter for

Returns:

New OperatorChain with only matching nodes
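The inclusion rule described for filter_branch can be sketched as a single predicate. This is an illustration only: branch_matches is a hypothetical helper, not part of nirs4all, and it assumes branch paths are plain lists of integers compared element by element.

```python
from typing import List


def branch_matches(node_branch: List[int], target: List[int]) -> bool:
    """True if the node is shared (no branch) or its path is a prefix of target."""
    # An empty path is a prefix of every path, so shared nodes always match.
    return node_branch == target[: len(node_branch)]


target = [0, 1]
print(branch_matches([], target))         # True  (pre-branch node)
print(branch_matches([0], target))        # True  (prefix of the target)
print(branch_matches([0, 1], target))     # True  (equal to the target)
print(branch_matches([1], target))        # False (sibling branch)
print(branch_matches([0, 1, 2], target))  # False (deeper than the target)
```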

filter_source(source_index: int) -> OperatorChain

Return chain with only nodes for the specified source.

Includes nodes that:

  • Have no source_index (single source)

  • Have matching source_index

Parameters:

source_index – Source index to filter for

Returns:

New OperatorChain with only matching nodes

filter_step(step_index: int) -> OperatorChain

Return chain with only nodes at the specified step.

Parameters:

step_index – Step index to filter for

Returns:

New OperatorChain with only matching nodes

classmethod from_dict(data: Dict[str, Any]) -> OperatorChain

Create OperatorChain from dictionary.

Parameters:

data – Dictionary representation

Returns:

OperatorChain instance

classmethod from_path(path: str, pipeline_id: str = '') -> OperatorChain

Parse OperatorChain from a path string.

Parameters:
  • path – Chain path string like “s1.MinMaxScaler>s3.SNV[br=0]”

  • pipeline_id – Pipeline identifier

Returns:

OperatorChain instance

get_branch_path() -> List[int]

Get the branch path from the last node.

Returns:

Branch path of the last node, or empty list if no nodes

get_last_node() -> OperatorNode | None

Get the last node in the chain.

Returns:

Last OperatorNode or None if chain is empty

get_nodes_at_step(step_index: int) -> List[OperatorNode]

Get all nodes at a specific step.

Parameters:

step_index – Step index to filter

Returns:

List of nodes at that step

is_empty() -> bool

Check if chain has no nodes.

Returns:

True if chain is empty

merge_with_prefix(prefix_chain: OperatorChain, step_offset: int = 0) -> OperatorChain

Merge this chain with a prefix chain for bundle import.

Used when importing a bundle into a pipeline, where the bundle’s chain needs to be prefixed with the import context’s chain.

Parameters:
  • prefix_chain – Chain to prepend (the import context)

  • step_offset – Offset to add to step indices in this chain

Returns:

New merged OperatorChain

Example

>>> bundle_chain = OperatorChain.from_path("s1.Scaler>s3.PLS")
>>> import_chain = OperatorChain.from_path("s1.Import")
>>> merged = bundle_chain.merge_with_prefix(import_chain, step_offset=1)
# Result: "s1.Import>s2.Scaler>s4.PLS"

nodes: List[OperatorNode]

pipeline_id: str = ''

remap_steps(step_mapping: Dict[int, int]) -> OperatorChain

Create new chain with remapped step indices.

Parameters:

step_mapping – Mapping from old step index to new step index

Returns:

New OperatorChain with remapped steps
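As an illustration of what remapping step indices means, here is a small standalone sketch. Node is a hypothetical stand-in for OperatorNode, and keeping steps that are absent from the mapping unchanged is an assumption; this page does not say how remap_steps treats unmapped steps.

```python
from dataclasses import dataclass, replace
from typing import Dict, List


@dataclass(frozen=True)
class Node:  # hypothetical stand-in for OperatorNode
    step_index: int
    operator_class: str


def remap_steps(nodes: List[Node], step_mapping: Dict[int, int]) -> List[Node]:
    """Return new nodes; steps absent from the mapping are kept (assumption)."""
    return [
        replace(node, step_index=step_mapping.get(node.step_index, node.step_index))
        for node in nodes
    ]


chain = [Node(1, "MinMaxScaler"), Node(3, "SNV"), Node(4, "PLS")]
remapped = remap_steps(chain, {3: 2, 4: 3})
print([n.step_index for n in remapped])  # [1, 2, 3]
print([n.step_index for n in chain])     # [1, 3, 4], the input is not modified
```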

to_dict() -> Dict[str, Any]

Convert to dictionary for serialization.

Returns:

Dictionary representation

to_hash(length: int = 12) -> str

Compute deterministic hash of the chain path.

Parameters:

length – Number of hex characters to return (default: 12)

Returns:

Truncated SHA256 hash of the chain path

to_path() -> str

Generate full path string from all nodes.

Format: node1>node2>node3

Returns:

Chain path string

Examples

>>> chain = OperatorChain([
...     OperatorNode(1, "MinMaxScaler"),
...     OperatorNode(3, "SNV", branch_path=[0])
... ])
>>> chain.to_path()
's1.MinMaxScaler>s3.SNV[br=0]'

with_pipeline_id(pipeline_id: str) -> OperatorChain

Create a copy of this chain with a new pipeline ID.

Parameters:

pipeline_id – New pipeline ID to set

Returns:

New OperatorChain with the specified pipeline_id

class nirs4all.pipeline.storage.artifacts.operator_chain.OperatorNode(step_index: int, operator_class: str, branch_path: List[int] = <factory>, source_index: int | None = None, fold_id: int | None = None, substep_index: int | None = None, operator_name: str | None = None)

Bases: object

Represents a single operator in the execution chain.

An OperatorNode captures all the context needed to identify a specific operator execution within a pipeline, including its position, branch context, and source index for multi-source processing.

step_index

Pipeline step number (1-based)

Type:

int

operator_class

Class name of the operator (e.g., “MinMaxScaler”, “PLS”)

Type:

str

branch_path

Branch indices path (e.g., [0] for branch 0, [0, 1] for nested)

Type:

List[int]

source_index

Index for multi-source transformers (None for single source)

Type:

int | None

fold_id

Fold number for CV models (None for shared artifacts)

Type:

int | None

substep_index

Index within a substep (for [model1, model2] at same step)

Type:

int | None

operator_name

Instance name if different from class name

Type:

str | None

branch_path: List[int]

fold_id: int | None = None

classmethod from_dict(data: Dict[str, Any]) -> OperatorNode

Create OperatorNode from dictionary.

Parameters:

data – Dictionary representation

Returns:

OperatorNode instance

classmethod from_key(key: str) -> OperatorNode

Parse an OperatorNode from its key string representation.

Parameters:

key – Key string like “s3.SNV[br=0,src=1]”

Returns:

OperatorNode instance

Raises:

ValueError – If key format is invalid

matches_context(step_index: int | None = None, branch_path: List[int] | None = None, source_index: int | None = None, fold_id: int | None = None) -> bool

Check if this node matches the given context filters.

None values are treated as “match any”.

Parameters:
  • step_index – Step number to match (None = any)

  • branch_path – Branch path to match (None = any)

  • source_index – Source index to match (None = any)

  • fold_id – Fold ID to match (None = any)

Returns:

True if node matches all specified filters
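The "None means match any" rule can be sketched without the library as follows. matches_filters is a hypothetical helper operating on a plain dict, and exact equality for branch_path is an assumption; the actual method may compare branch paths differently.

```python
from typing import Any, Dict


def matches_filters(node: Dict[str, Any], **filters: Any) -> bool:
    """True if every filter that is not None equals the node's value."""
    return all(
        node.get(name) == wanted
        for name, wanted in filters.items()
        if wanted is not None  # None is a wildcard and is skipped
    )


node = {"step_index": 3, "branch_path": [0], "source_index": 1, "fold_id": None}
print(matches_filters(node))                                  # True (no filters)
print(matches_filters(node, step_index=3, source_index=None))  # True
print(matches_filters(node, step_index=3, source_index=0))     # False
print(matches_filters(node, branch_path=[1]))                  # False
```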

operator_class: str

operator_name: str | None = None

source_index: int | None = None

step_index: int

substep_index: int | None = None

to_dict() -> Dict[str, Any]

Convert to dictionary for serialization.

Returns:

Dictionary representation suitable for YAML/JSON

to_key() -> str

Generate compact key string for this node.

Format: s{step}.{Class}[qualifiers]

Qualifiers (only if present):

  • br={branch_path} - Branch context

  • src={source_index} - Multi-source index

  • sub={substep_index} - Substep index

Returns:

Compact key string for this operator node

Examples

>>> OperatorNode(1, "MinMaxScaler").to_key()
's1.MinMaxScaler'
>>> OperatorNode(3, "SNV", branch_path=[0]).to_key()
's3.SNV[br=0]'
>>> OperatorNode(3, "SNV", branch_path=[0], source_index=1).to_key()
's3.SNV[br=0,src=1]'

with_fold(fold_id: int) -> OperatorNode

Create a copy of this node with a specific fold ID.

Parameters:

fold_id – The fold ID to set

Returns:

New OperatorNode with the specified fold_id

with_source(source_index: int) -> OperatorNode

Create a copy of this node with a specific source index.

Parameters:

source_index – The source index to set

Returns:

New OperatorNode with the specified source_index

nirs4all.pipeline.storage.artifacts.operator_chain.compute_chain_hash(chain_path: str, length: int = 12) -> str

Compute deterministic hash from chain path string.

Parameters:
  • chain_path – Full operator chain path

  • length – Number of hex characters (default: 12)

Returns:

Truncated SHA256 hash
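A truncated SHA-256 of a chain path can be computed with the standard library alone, as in the sketch below. chain_hash_sketch is a hypothetical name, and encoding the path as UTF-8 before hashing is an assumption, so its output is not guaranteed to equal what compute_chain_hash returns.

```python
import hashlib


def chain_hash_sketch(chain_path: str, length: int = 12) -> str:
    """Truncated SHA-256 hex digest of the path (UTF-8 encoding is an assumption)."""
    return hashlib.sha256(chain_path.encode("utf-8")).hexdigest()[:length]


h1 = chain_hash_sketch("s1.MinMaxScaler>s3.SNV[br=0]")
h2 = chain_hash_sketch("s1.MinMaxScaler>s3.SNV[br=0]")
h3 = chain_hash_sketch("s1.MinMaxScaler>s3.SNV[br=1]")
print(len(h1), h1 == h2)  # 12 True
print(h1 == h3)           # False
```

The same path always yields the same prefix, which is what makes the hash usable as a stable identifier. A shorter length makes collisions between different paths more likely.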

nirs4all.pipeline.storage.artifacts.operator_chain.generate_artifact_id_v3(pipeline_id: str, chain: OperatorChain | str, fold_id: int | None = None) -> str

Generate V3 artifact ID from chain.

Format: {pipeline_id}${chain_hash}:{fold_id}

Parameters:
  • pipeline_id – Pipeline identifier

  • chain – Operator chain object or chain path string for this artifact

  • fold_id – Fold ID (None for shared artifacts, which is written as “all” in the ID)

Returns:

V3 artifact ID string

Examples

>>> generate_artifact_id_v3("0001_pls", chain, None)
'0001_pls$a1b2c3d4e5f6:all'
>>> generate_artifact_id_v3("0001_pls", chain, 0)
'0001_pls$a1b2c3d4e5f6:0'

nirs4all.pipeline.storage.artifacts.operator_chain.is_v3_artifact_id(artifact_id: str) -> bool

Check if an artifact ID is in V3 format.

Parameters:

artifact_id – Artifact ID to check

Returns:

True if V3 format, False otherwise

nirs4all.pipeline.storage.artifacts.operator_chain.parse_artifact_id_v3(artifact_id: str) -> Tuple[str, str, int | None]

Parse V3 artifact ID into components.

Parameters:

artifact_id – V3 artifact ID string

Returns:

Tuple of (pipeline_id, chain_hash, fold_id)

Raises:

ValueError – If format is invalid

Examples

>>> parse_artifact_id_v3("0001_pls$a1b2c3d4e5f6:all")
('0001_pls', 'a1b2c3d4e5f6', None)
>>> parse_artifact_id_v3("0001_pls$a1b2c3d4e5f6:0")
('0001_pls', 'a1b2c3d4e5f6', 0)
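The ID format and its parsing can be illustrated with a standalone round trip. make_id and split_id are hypothetical names that mirror the documented format {pipeline_id}${chain_hash}:{fold_id}; the validation shown is an assumption and may be looser or stricter than what the library enforces.

```python
from typing import Optional, Tuple


def make_id(pipeline_id: str, chain_hash: str, fold_id: Optional[int] = None) -> str:
    """Build an ID; a missing fold is written as 'all', as in the examples above."""
    fold = "all" if fold_id is None else str(fold_id)
    return f"{pipeline_id}${chain_hash}:{fold}"


def split_id(artifact_id: str) -> Tuple[str, str, Optional[int]]:
    """Split an ID back into (pipeline_id, chain_hash, fold_id)."""
    pipeline_id, dollar, rest = artifact_id.partition("$")
    chain_hash, colon, fold = rest.rpartition(":")
    if not (dollar and colon and pipeline_id and chain_hash):
        raise ValueError(f"Not a V3-style artifact ID: {artifact_id!r}")
    # int() raises ValueError itself if the fold part is not numeric.
    return pipeline_id, chain_hash, None if fold == "all" else int(fold)


shared = make_id("0001_pls", "a1b2c3d4e5f6")
print(shared)                                       # 0001_pls$a1b2c3d4e5f6:all
print(split_id(shared))                             # ('0001_pls', 'a1b2c3d4e5f6', None)
print(split_id(make_id("0001_pls", "a1b2c3d4e5f6", 0)))  # ('0001_pls', 'a1b2c3d4e5f6', 0)
```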