nirs4all.pipeline.storage.artifacts.operator_chain module
Operator Chain - V3 artifact identification system.
This module provides the core data structures for the V3 artifact system:
- OperatorNode: Represents a single operator in the execution path
- OperatorChain: Full path of operators that produced an artifact
The Operator Chain is the fundamental identifier for any artifact. It encodes the complete chain of operators that produced the artifact, enabling:
- Complete chain tracking from input to output
- Deterministic replay of any execution path
- Unified handling of branching, multi-source, and stacking
- Chain Path Format:
“s{step}.{ClassName}[br={branch},src={source}]>s{step}.{ClassName}[…]”
Examples
“s1.MinMaxScaler[src=0]” - Single transformer at step 1, source 0
“s1.MinMaxScaler>s3.SNV[br=0]>s4.PLS[br=0]” - Chain through branch 0
“s4.PLS[br=0]+s4.RF[br=1]>s5.Ridge” - Meta-model combining branches
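The path grammar above can be explored with a small standalone parser. This is a minimal sketch, not the nirs4all implementation: `parse_node_key` and `parse_chain_path` are hypothetical helper names, qualifier values are kept as opaque strings, and only `>`-separated chains are handled (the `+` meta-model syntax is out of scope here).

```python
import re
from typing import Dict, List, Tuple

# One node key: s{step}.{ClassName} followed by an optional [k=v,...] block.
_NODE_RE = re.compile(r"^s(\d+)\.([A-Za-z_]\w*)(?:\[([^\]]*)\])?$")

ParsedNode = Tuple[int, str, Dict[str, str]]


def parse_node_key(key: str) -> ParsedNode:
    """Split a node key into (step, class name, qualifiers)."""
    match = _NODE_RE.match(key)
    if match is None:
        raise ValueError(f"invalid node key: {key!r}")
    step, class_name, raw_qualifiers = match.groups()
    qualifiers: Dict[str, str] = {}
    if raw_qualifiers:
        for item in raw_qualifiers.split(","):
            name, _, value = item.partition("=")
            qualifiers[name] = value
    return int(step), class_name, qualifiers


def parse_chain_path(path: str) -> List[ParsedNode]:
    """Parse a '>'-separated chain path into a list of node tuples."""
    return [parse_node_key(part) for part in path.split(">")]


nodes = parse_chain_path("s1.MinMaxScaler>s3.SNV[br=0]>s4.PLS[br=0]")
for step, class_name, qualifiers in nodes:
    print(step, class_name, qualifiers)
```

In real code, OperatorChain.from_path and OperatorNode.from_key documented in this module are the entry points to use; the sketch only illustrates how the string format decomposes.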
- class nirs4all.pipeline.storage.artifacts.operator_chain.OperatorChain(nodes: List[OperatorNode] = <factory>, pipeline_id: str = '')[source]
Bases: object
Ordered sequence of OperatorNodes representing the full execution path.
The OperatorChain captures the complete path of operators from input to the current artifact, enabling deterministic artifact identification and replay.
- nodes
Ordered list of OperatorNode objects in the chain
- append(node: OperatorNode) OperatorChain[source]
Return new chain with node appended.
- Parameters:
node – OperatorNode to append
- Returns:
New OperatorChain with the node appended
- copy() OperatorChain[source]
Create a deep copy of this chain.
- Returns:
New OperatorChain with copied nodes
- extend(other: OperatorChain) OperatorChain[source]
Return new chain with another chain’s nodes appended.
- Parameters:
other – OperatorChain to append
- Returns:
New OperatorChain with all nodes from both chains
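Both append and extend are documented as returning a new chain. A minimal sketch of that non-mutating pattern follows, using a hypothetical stand-in class `MiniChain` that stores plain key strings instead of the library's OperatorNode objects.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class MiniChain:
    """Hypothetical stand-in for a chain; nodes are plain key strings."""

    nodes: Tuple[str, ...] = ()

    def append(self, node: str) -> "MiniChain":
        # Build a new object; self.nodes is never modified.
        return MiniChain(self.nodes + (node,))

    def extend(self, other: "MiniChain") -> "MiniChain":
        # Concatenate both node sequences into a fresh chain.
        return MiniChain(self.nodes + other.nodes)

    def to_path(self) -> str:
        return ">".join(self.nodes)


base = MiniChain().append("s1.MinMaxScaler")
longer = base.extend(MiniChain(("s3.SNV[br=0]", "s4.PLS[br=0]")))
print(base.to_path())    # s1.MinMaxScaler
print(longer.to_path())  # s1.MinMaxScaler>s3.SNV[br=0]>s4.PLS[br=0]
```

Because the original chain is left untouched, the same prefix can be shared safely by several branches that each append their own operators.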
- filter_branch(target_branch_path: List[int]) OperatorChain[source]
Return chain with only nodes matching the branch path.
Includes nodes that:
- Have no branch path (shared/pre-branch artifacts)
- Have a branch path that is a prefix of or equal to target
- Parameters:
target_branch_path – Branch path to filter for
- Returns:
New OperatorChain with only matching nodes
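The inclusion rule for filter_branch can be written as a small predicate. This is a sketch under assumptions: `branch_matches` is a hypothetical helper, and nodes are modelled as (key, branch_path) tuples rather than OperatorNode instances.

```python
from typing import List, Sequence, Tuple


def branch_matches(node_branch: Sequence[int], target: Sequence[int]) -> bool:
    """True if node_branch is empty, or a prefix of (or equal to) target."""
    if len(node_branch) > len(target):
        return False
    return list(target[: len(node_branch)]) == list(node_branch)


# Illustrative nodes: (key, branch_path)
nodes: List[Tuple[str, List[int]]] = [
    ("s1.MinMaxScaler", []),   # shared, pre-branch
    ("s3.SNV", [0]),           # branch 0
    ("s3.RF", [1]),            # branch 1
    ("s5.PLS", [0, 2]),        # nested under branch 0
]

kept_outer = [key for key, branch in nodes if branch_matches(branch, [0])]
kept_nested = [key for key, branch in nodes if branch_matches(branch, [0, 2])]
print(kept_outer)   # ['s1.MinMaxScaler', 's3.SNV']
print(kept_nested)  # ['s1.MinMaxScaler', 's3.SNV', 's5.PLS']
```

Note that a node deeper than the target (here `[0, 2]` against target `[0]`) is excluded, since its branch path is not a prefix of the target.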
- filter_source(source_index: int) OperatorChain[source]
Return chain with only nodes for the specified source.
Includes nodes that:
- Have no source_index (single source)
- Have matching source_index
- Parameters:
source_index – Source index to filter for
- Returns:
New OperatorChain with only matching nodes
- filter_step(step_index: int) OperatorChain[source]
Return chain with only nodes at the specified step.
- Parameters:
step_index – Step index to filter for
- Returns:
New OperatorChain with only matching nodes
- classmethod from_dict(data: Dict[str, Any]) OperatorChain[source]
Create OperatorChain from dictionary.
- Parameters:
data – Dictionary representation
- Returns:
OperatorChain instance
- classmethod from_path(path: str, pipeline_id: str = '') OperatorChain[source]
Parse OperatorChain from a path string.
- Parameters:
path – Chain path string like “s1.MinMaxScaler>s3.SNV[br=0]”
pipeline_id – Pipeline identifier
- Returns:
OperatorChain instance
- get_branch_path() List[int][source]
Get the branch path from the last node.
- Returns:
Branch path of the last node, or empty list if no nodes
- get_last_node() OperatorNode | None[source]
Get the last node in the chain.
- Returns:
Last OperatorNode or None if chain is empty
- get_nodes_at_step(step_index: int) List[OperatorNode][source]
Get all nodes at a specific step.
- Parameters:
step_index – Step index to filter
- Returns:
List of nodes at that step
- merge_with_prefix(prefix_chain: OperatorChain, step_offset: int = 0) OperatorChain[source]
Merge this chain with a prefix chain for bundle import.
Used when importing a bundle into a pipeline, where the bundle’s chain needs to be prefixed with the import context’s chain.
- Parameters:
prefix_chain – Chain to prepend (the import context)
step_offset – Offset to add to step indices in this chain
- Returns:
New merged OperatorChain
Example
>>> bundle_chain = OperatorChain.from_path("s1.Scaler>s3.PLS")
>>> import_chain = OperatorChain.from_path("s1.Import")
>>> merged = bundle_chain.merge_with_prefix(import_chain, step_offset=1)
# Result: "s1.Import>s2.Scaler>s4.PLS"
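The arithmetic behind this example can be reproduced without the library. The sketch below assumes nodes are simple (step_index, operator_class) tuples and that merging means shifting the bundle's steps by step_offset and then prepending the prefix nodes unchanged; `merge_with_prefix_sketch` and `to_path_sketch` are hypothetical names.

```python
from typing import List, Tuple

Node = Tuple[int, str]  # (step_index, operator_class)


def merge_with_prefix_sketch(
    chain: List[Node], prefix: List[Node], step_offset: int = 0
) -> List[Node]:
    """Shift the chain's steps by step_offset, then prepend the prefix."""
    shifted = [(step + step_offset, name) for step, name in chain]
    return list(prefix) + shifted


def to_path_sketch(nodes: List[Node]) -> str:
    return ">".join(f"s{step}.{name}" for step, name in nodes)


bundle = [(1, "Scaler"), (3, "PLS")]
import_context = [(1, "Import")]
merged = merge_with_prefix_sketch(bundle, import_context, step_offset=1)
print(to_path_sketch(merged))  # s1.Import>s2.Scaler>s4.PLS
```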
- nodes: List[OperatorNode]
- remap_steps(step_mapping: Dict[int, int]) OperatorChain[source]
Create new chain with remapped step indices.
- Parameters:
step_mapping – Mapping from old step index to new step index
- Returns:
New OperatorChain with remapped steps
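As an illustration of step remapping, the sketch below applies a mapping to (step_index, operator_class) tuples. It assumes that steps missing from the mapping keep their original index; this page does not say how the library treats that case, and `remap_steps_sketch` is a hypothetical name.

```python
from typing import Dict, List, Tuple

Node = Tuple[int, str]  # (step_index, operator_class)


def remap_steps_sketch(nodes: List[Node], step_mapping: Dict[int, int]) -> List[Node]:
    """Return new nodes with step indices translated through step_mapping."""
    # Assumption: unmapped steps are left unchanged.
    return [(step_mapping.get(step, step), name) for step, name in nodes]


original = [(1, "MinMaxScaler"), (3, "SNV"), (4, "PLS")]
remapped = remap_steps_sketch(original, {1: 2, 3: 5})
print(remapped)  # [(2, 'MinMaxScaler'), (5, 'SNV'), (4, 'PLS')]
```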
- to_dict() Dict[str, Any][source]
Convert to dictionary for serialization.
- Returns:
Dictionary representation
- to_hash(length: int = 12) str[source]
Compute deterministic hash of the chain path.
- Parameters:
length – Number of hex characters to return (default: 12)
- Returns:
Truncated SHA256 hash of the chain path
- to_path() str[source]
Generate full path string from all nodes.
Format: node1>node2>node3
- Returns:
Chain path string
Examples
>>> chain = OperatorChain([
...     OperatorNode(1, "MinMaxScaler"),
...     OperatorNode(3, "SNV", branch_path=[0])
... ])
>>> chain.to_path()
's1.MinMaxScaler>s3.SNV[br=0]'
- with_pipeline_id(pipeline_id: str) OperatorChain[source]
Create a copy of this chain with a new pipeline ID.
- Parameters:
pipeline_id – New pipeline ID to set
- Returns:
New OperatorChain with the specified pipeline_id
- class nirs4all.pipeline.storage.artifacts.operator_chain.OperatorNode(step_index: int, operator_class: str, branch_path: List[int] = <factory>, source_index: int | None = None, fold_id: int | None = None, substep_index: int | None = None, operator_name: str | None = None)[source]
Bases: object
Represents a single operator in the execution chain.
An OperatorNode captures all the context needed to identify a specific operator execution within a pipeline, including its position, branch context, and source index for multi-source processing.
- classmethod from_dict(data: Dict[str, Any]) OperatorNode[source]
Create OperatorNode from dictionary.
- Parameters:
data – Dictionary representation
- Returns:
OperatorNode instance
- classmethod from_key(key: str) OperatorNode[source]
Parse an OperatorNode from its key string representation.
- Parameters:
key – Key string like “s3.SNV[br=0,src=1]”
- Returns:
OperatorNode instance
- Raises:
ValueError – If key format is invalid
- matches_context(step_index: int | None = None, branch_path: List[int] | None = None, source_index: int | None = None, fold_id: int | None = None) bool[source]
Check if this node matches the given context filters.
None values are treated as “match any”.
- Parameters:
step_index – Step number to match (None = any)
branch_path – Branch path to match (None = any)
source_index – Source index to match (None = any)
fold_id – Fold ID to match (None = any)
- Returns:
True if node matches all specified filters
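The "None means match any" convention can be sketched with a small stand-in class. `MiniNode` is hypothetical, and the sketch assumes every non-None filter is compared by plain equality; the library may match branch paths differently (for example by prefix).

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class MiniNode:
    """Hypothetical stand-in for an operator node (not the library class)."""

    step_index: int
    operator_class: str
    branch_path: List[int] = field(default_factory=list)
    source_index: Optional[int] = None
    fold_id: Optional[int] = None

    def matches_context(
        self,
        step_index: Optional[int] = None,
        branch_path: Optional[List[int]] = None,
        source_index: Optional[int] = None,
        fold_id: Optional[int] = None,
    ) -> bool:
        # Pair each filter with the node's own value; None filters are skipped.
        # Assumption: every non-None filter is compared by plain equality.
        checks = [
            (step_index, self.step_index),
            (branch_path, self.branch_path),
            (source_index, self.source_index),
            (fold_id, self.fold_id),
        ]
        return all(wanted is None or wanted == actual for wanted, actual in checks)


node = MiniNode(3, "SNV", branch_path=[0], source_index=1)
print(node.matches_context())                                 # True
print(node.matches_context(step_index=3))                     # True
print(node.matches_context(step_index=4))                     # False
print(node.matches_context(branch_path=[0], source_index=1))  # True
```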
- to_dict() Dict[str, Any][source]
Convert to dictionary for serialization.
- Returns:
Dictionary representation suitable for YAML/JSON
- to_key() str[source]
Generate compact key string for this node.
Format: s{step}.{Class}[qualifiers]
- Qualifiers (only if present):
br={branch_path} - Branch context
src={source_index} - Multi-source index
sub={substep_index} - Substep index
- Returns:
Compact key string for this operator node
Examples
>>> OperatorNode(1, "MinMaxScaler").to_key()
's1.MinMaxScaler'
>>> OperatorNode(3, "SNV", branch_path=[0]).to_key()
's3.SNV[br=0]'
>>> OperatorNode(3, "SNV", branch_path=[0], source_index=1).to_key()
's3.SNV[br=0,src=1]'
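A key of this shape could be assembled as follows. This is a sketch, not the library's code: `node_key_sketch` is a hypothetical function, the qualifier order (br, src, sub) follows the listing above, and the "-" separator used for multi-level branch paths is an arbitrary placeholder because this page only shows single-level branches.

```python
from typing import List, Optional


def node_key_sketch(
    step_index: int,
    operator_class: str,
    branch_path: Optional[List[int]] = None,
    source_index: Optional[int] = None,
    substep_index: Optional[int] = None,
) -> str:
    """Build 's{step}.{Class}[qualifiers]', omitting absent qualifiers."""
    qualifiers: List[str] = []
    if branch_path:
        # Assumption: "-" as the multi-level separator is a placeholder only.
        qualifiers.append("br=" + "-".join(str(b) for b in branch_path))
    if source_index is not None:
        qualifiers.append(f"src={source_index}")
    if substep_index is not None:
        qualifiers.append(f"sub={substep_index}")
    suffix = "[" + ",".join(qualifiers) + "]" if qualifiers else ""
    return f"s{step_index}.{operator_class}{suffix}"


print(node_key_sketch(1, "MinMaxScaler"))                           # s1.MinMaxScaler
print(node_key_sketch(3, "SNV", branch_path=[0]))                   # s3.SNV[br=0]
print(node_key_sketch(3, "SNV", branch_path=[0], source_index=1))   # s3.SNV[br=0,src=1]
```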
- with_fold(fold_id: int) OperatorNode[source]
Create a copy of this node with a specific fold ID.
- Parameters:
fold_id – The fold ID to set
- Returns:
New OperatorNode with the specified fold_id
- with_source(source_index: int) OperatorNode[source]
Create a copy of this node with a specific source index.
- Parameters:
source_index – The source index to set
- Returns:
New OperatorNode with the specified source_index
- nirs4all.pipeline.storage.artifacts.operator_chain.compute_chain_hash(chain_path: str, length: int = 12) str[source]
Compute deterministic hash from chain path string.
- Parameters:
chain_path – Full operator chain path
length – Number of hex characters (default: 12)
- Returns:
Truncated SHA256 hash
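A truncated SHA256 hash of a path string can be computed with the standard library alone. The sketch below assumes the path is encoded as UTF-8 and that truncation keeps the leading hex characters; `chain_hash_sketch` is a hypothetical name, and the exact bytes hashed by the library may differ.

```python
import hashlib


def chain_hash_sketch(chain_path: str, length: int = 12) -> str:
    """Return the first `length` hex characters of SHA256(chain_path)."""
    # Assumption: UTF-8 encoding of the path string.
    digest = hashlib.sha256(chain_path.encode("utf-8")).hexdigest()
    return digest[:length]


first = chain_hash_sketch("s1.MinMaxScaler>s3.SNV[br=0]")
second = chain_hash_sketch("s1.MinMaxScaler>s3.SNV[br=0]")
print(first, first == second)  # same input always yields the same hash
```

Twelve hex characters carry 48 bits, which keeps IDs short while making accidental collisions between chains within one pipeline unlikely.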
- nirs4all.pipeline.storage.artifacts.operator_chain.generate_artifact_id_v3(pipeline_id: str, chain: OperatorChain | str, fold_id: int | None = None) str[source]
Generate V3 artifact ID from chain.
Format: {pipeline_id}${chain_hash}:{fold_id}, where the fold part is written as "all" when fold_id is None (shared artifacts).
- Parameters:
pipeline_id – Pipeline identifier
chain – Operator chain object or chain path string for this artifact
fold_id – Fold ID (None for shared artifacts)
- Returns:
V3 artifact ID string
Examples
>>> generate_artifact_id_v3("0001_pls", chain, None)
'0001_pls$a1b2c3d4e5f6:all'
>>> generate_artifact_id_v3("0001_pls", chain, 0)
'0001_pls$a1b2c3d4e5f6:0'
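The ID layout can be reproduced from a chain path string with a few lines of standard-library code. This is a sketch under assumptions: `artifact_id_sketch` is a hypothetical name, the hash is taken as the first 12 hex characters of a UTF-8 SHA256 digest, and only the string form of the chain is handled.

```python
import hashlib
from typing import Optional


def artifact_id_sketch(
    pipeline_id: str, chain_path: str, fold_id: Optional[int] = None, length: int = 12
) -> str:
    """Assemble '{pipeline_id}${chain_hash}:{fold}' from a chain path string."""
    # Assumption: UTF-8 encoding and leading-character truncation.
    chain_hash = hashlib.sha256(chain_path.encode("utf-8")).hexdigest()[:length]
    # Shared artifacts (fold_id is None) use the literal "all".
    fold_part = "all" if fold_id is None else str(fold_id)
    return f"{pipeline_id}${chain_hash}:{fold_part}"


shared_id = artifact_id_sketch("0001_pls", "s1.MinMaxScaler>s4.PLS")
fold_id_0 = artifact_id_sketch("0001_pls", "s1.MinMaxScaler>s4.PLS", fold_id=0)
print(shared_id)
print(fold_id_0)
```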
- nirs4all.pipeline.storage.artifacts.operator_chain.is_v3_artifact_id(artifact_id: str) bool[source]
Check if an artifact ID is in V3 format.
- Parameters:
artifact_id – Artifact ID to check
- Returns:
True if V3 format, False otherwise
- nirs4all.pipeline.storage.artifacts.operator_chain.parse_artifact_id_v3(artifact_id: str) Tuple[str, str, int | None][source]
Parse V3 artifact ID into components.
- Parameters:
artifact_id – V3 artifact ID string
- Returns:
Tuple of (pipeline_id, chain_hash, fold_id)
- Raises:
ValueError – If format is invalid
Examples
>>> parse_artifact_id_v3("0001_pls$a1b2c3d4e5f6:all")
('0001_pls', 'a1b2c3d4e5f6', None)
>>> parse_artifact_id_v3("0001_pls$a1b2c3d4e5f6:0")
('0001_pls', 'a1b2c3d4e5f6', 0)
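The reverse direction, splitting an ID back into its parts, might look like the sketch below. `parse_artifact_id_sketch` and `looks_like_v3_sketch` are hypothetical names; the sketch assumes the pipeline ID contains no "$", and the format check is only a stand-in for is_v3_artifact_id, whose real rules may be stricter or looser.

```python
from typing import Optional, Tuple


def parse_artifact_id_sketch(artifact_id: str) -> Tuple[str, str, Optional[int]]:
    """Split '{pipeline_id}${chain_hash}:{fold}' into its three components."""
    pipeline_id, dollar, rest = artifact_id.partition("$")
    chain_hash, colon, fold_part = rest.rpartition(":")
    if not (dollar and colon and pipeline_id and chain_hash and fold_part):
        raise ValueError(f"invalid V3 artifact ID: {artifact_id!r}")
    # "all" marks a shared artifact; anything else must be an integer fold.
    fold_id = None if fold_part == "all" else int(fold_part)
    return pipeline_id, chain_hash, fold_id


def looks_like_v3_sketch(artifact_id: str) -> bool:
    """Format check built on the parser above (an assumption, see lead-in)."""
    try:
        parse_artifact_id_sketch(artifact_id)
    except ValueError:
        return False
    return True


print(parse_artifact_id_sketch("0001_pls$a1b2c3d4e5f6:all"))  # ('0001_pls', 'a1b2c3d4e5f6', None)
print(parse_artifact_id_sketch("0001_pls$a1b2c3d4e5f6:0"))    # ('0001_pls', 'a1b2c3d4e5f6', 0)
print(looks_like_v3_sketch("not-an-id"))                      # False
```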