nirs4all.pipeline.trace package

Module contents

Execution Trace module for nirs4all pipeline (V3).

This module provides data structures and utilities for recording the exact execution path through a pipeline, enabling deterministic prediction replay.

V3 improvements:
  • OperatorChain tracking for complete execution path

  • Branch and source indexes for artifact lookup

  • Chain-based artifact identification

  • Proper recording of branch substeps

Key Components:
  • ExecutionTrace: Complete trace of a pipeline execution path

  • ExecutionStep: Record of a single step’s execution with chain tracking

  • StepArtifacts: Artifacts produced by a single step with V3 indexes

  • TraceRecorder: Records traces with chain/branch stacks

  • TraceBasedExtractor: Extracts minimal pipeline from trace

  • MinimalPipeline: Minimal pipeline ready for prediction replay

Design Principles:
  1. Controller-Agnostic: Works with any controller type

  2. Deterministic: Same chain -> same artifacts

  3. Complete: Full execution path tracking

  4. Composable: Same infrastructure for predict, retrain, transfer, export

Usage:
>>> from nirs4all.pipeline.trace import TraceRecorder, ExecutionTrace
>>>
>>> # During training with V3 chain tracking
>>> recorder = TraceRecorder(pipeline_uid="0001_pls_abc123")
>>> recorder.start_step(step_index=1, operator_type="transform", operator_class="SNV")
>>> chain = recorder.build_chain_for_artifact(1, "SNV")
>>> recorder.record_artifact("0001$abc123:all", chain_path=chain.to_path())
>>> recorder.end_step()
>>> trace = recorder.finalize()
class nirs4all.pipeline.trace.ExecutionStep(step_index: int, operator_type: str = '', operator_class: str = '', operator_config: Dict[str, Any] = <factory>, execution_mode: StepExecutionMode = StepExecutionMode.TRAIN, artifacts: StepArtifacts = <factory>, branch_path: List[int] = <factory>, branch_name: str = '', duration_ms: float = 0.0, metadata: Dict[str, Any] = <factory>, input_chain_path: str = '', output_chain_paths: List[str] = <factory>, source_count: int = 1, produces_branches: bool = False, substep_index: int | None = None, input_shape: Tuple[int, int] | None = None, output_shape: Tuple[int, int] | None = None, input_features_shape: List[Tuple[int, int, int]] | None = None, output_features_shape: List[Tuple[int, int, int]] | None = None)[source]

Bases: object

Record of a single step’s execution in the trace (V3).

Captures all information needed to replay this step during prediction, including operator configuration, execution mode, and produced artifacts.

V3 additions:
  • input_chain_path: Operator chain up to this step’s input

  • output_chain_paths: Chains produced by this step (for branching)

  • source_count: Number of X sources at this step

  • produces_branches: Whether this is a branch operator

step_index

1-based step number in the pipeline

Type:

int

operator_type

Type of operation (e.g., “transform”, “model”, “splitter”)

Type:

str

operator_class

Class name of the operator (e.g., “PLSRegression”, “SNV”)

Type:

str

operator_config

Serialized operator configuration

Type:

Dict[str, Any]

execution_mode

How the step was executed (train/predict/skip)

Type:

nirs4all.pipeline.trace.execution_trace.StepExecutionMode

artifacts

Artifacts produced by this step

Type:

nirs4all.pipeline.trace.execution_trace.StepArtifacts

branch_path

Branch indices if in a branch context

Type:

List[int]

branch_name

Human-readable branch name

Type:

str

duration_ms

Execution duration in milliseconds

Type:

float

metadata

Additional step-specific metadata

Type:

Dict[str, Any]

# V3 chain tracking
input_chain_path

Serialized operator chain up to this step’s input

Type:

str

output_chain_paths

List of chains produced by this step

Type:

List[str]

source_count

Number of X sources processed

Type:

int

produces_branches

True if this is a branch operator

Type:

bool

substep_index

Index of this operator within a multi-operator step (e.g., 0 or 1 for [model1, model2]); None when the step has no substeps

Type:

int | None

add_output_chain(chain_path: str) None[source]

Add an output chain path to this step.

Parameters:

chain_path – Operator chain path to add

artifacts: StepArtifacts
branch_name: str = ''
branch_path: List[int]
duration_ms: float = 0.0
execution_mode: StepExecutionMode = 'train'
classmethod from_dict(data: Dict[str, Any]) ExecutionStep[source]

Create ExecutionStep from dictionary.

Parameters:

data – Dictionary from manifest

Returns:

ExecutionStep instance

has_artifacts() bool[source]

Check if this step produced any artifacts.

Returns:

True if the step has at least one artifact

input_chain_path: str = ''
input_features_shape: List[Tuple[int, int, int]] | None = None
input_shape: Tuple[int, int] | None = None
metadata: Dict[str, Any]
operator_class: str = ''
operator_config: Dict[str, Any]
operator_type: str = ''
output_chain_paths: List[str]
output_features_shape: List[Tuple[int, int, int]] | None = None
output_shape: Tuple[int, int] | None = None
produces_branches: bool = False
source_count: int = 1
step_index: int
substep_index: int | None = None
to_dict() Dict[str, Any][source]

Convert to dictionary for YAML serialization.

Returns:

Dictionary suitable for manifest storage
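
The to_dict()/from_dict() pair above describes a round trip through manifest storage. A minimal standalone sketch of that pattern follows. StepRecord is a hypothetical, reduced stand-in for ExecutionStep (it is not the library class), and JSON from the standard library is used here in place of YAML only to keep the sketch dependency-free. The point it illustrates is that tuple-typed fields such as input_shape come back from a serialized form as lists and need converting back.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class StepRecord:
    """Hypothetical reduced stand-in for ExecutionStep (assumption, not the real class)."""

    step_index: int
    operator_class: str = ""
    output_chain_paths: List[str] = field(default_factory=list)
    input_shape: Optional[Tuple[int, int]] = None

    def to_dict(self) -> Dict[str, Any]:
        # Emit only plain containers so any serializer can store the result.
        return {
            "step_index": self.step_index,
            "operator_class": self.operator_class,
            "output_chain_paths": list(self.output_chain_paths),
            "input_shape": list(self.input_shape) if self.input_shape is not None else None,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "StepRecord":
        shape = data.get("input_shape")
        return cls(
            step_index=data["step_index"],
            operator_class=data.get("operator_class", ""),
            output_chain_paths=list(data.get("output_chain_paths", [])),
            # Serialized shapes come back as lists; restore the tuple type.
            input_shape=tuple(shape) if shape is not None else None,
        )


original = StepRecord(1, "SNV", ["s1.SNV"], (100, 2151))
restored = StepRecord.from_dict(json.loads(json.dumps(original.to_dict())))
```

In this sketch, restored compares equal to original because from_dict() undoes the list conversion that serialization introduces.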

class nirs4all.pipeline.trace.ExecutionTrace(trace_id: str = <factory>, pipeline_uid: str = '', created_at: str = <factory>, steps: List[ExecutionStep] = <factory>, model_step_index: int | None = None, fold_weights: Dict[int, float] | None = None, preprocessing_chain: str = '', metadata: Dict[str, Any] = <factory>)[source]

Bases: object

Complete trace of a pipeline execution path.

Records the exact sequence of steps and artifacts that produced a prediction, enabling deterministic replay for prediction, transfer, and export.

The trace is controller-agnostic: it records what happened without encoding specific controller logic, so any controller (existing or custom) can be replayed using the same infrastructure.

trace_id

Unique identifier for this trace

Type:

str

pipeline_uid

Parent pipeline UID

Type:

str

created_at

ISO timestamp of trace creation

Type:

str

steps

Ordered list of execution steps

Type:

List[nirs4all.pipeline.trace.execution_trace.ExecutionStep]

model_step_index

Index of the model step that produced predictions

Type:

int | None

fold_weights

Per-fold weights for CV ensemble (None for single model)

Type:

Dict[int, float] | None

preprocessing_chain

Summary of preprocessing steps for quick reference

Type:

str

metadata

Additional trace metadata (e.g., dataset info, run parameters)

Type:

Dict[str, Any]

add_step(step: ExecutionStep) None[source]

Add a step to the trace.

Parameters:

step – ExecutionStep to add

created_at: str
finalize(preprocessing_chain: str | None = None, metadata: Dict[str, Any] | None = None) None[source]

Finalize the trace with summary information.

Call this after all steps have been recorded to add summary info.

Parameters:
  • preprocessing_chain – Summary string of preprocessing (e.g., “SNV>SG>MinMax”)

  • metadata – Additional metadata to merge

fold_weights: Dict[int, float] | None = None
classmethod from_dict(data: Dict[str, Any]) ExecutionTrace[source]

Create ExecutionTrace from dictionary.

Parameters:

data – Dictionary from manifest

Returns:

ExecutionTrace instance

get_artifact_ids() List[str][source]

Get all artifact IDs in this trace.

Returns:

List of all artifact IDs across all steps

get_artifacts_by_step(step_index: int) StepArtifacts | None[source]

Get artifacts for a specific step.

Parameters:

step_index – 1-based step index

Returns:

StepArtifacts or None if step not found

get_fold_artifact_ids() Dict[int, str][source]

Get per-fold model artifact IDs.

Returns:

Dictionary of fold_id -> artifact_id

get_model_artifact_id() str | None[source]

Get the primary model artifact ID.

Returns:

Model artifact ID or None if no model step

get_step(step_index: int) ExecutionStep | None[source]

Get a step by its index.

Parameters:

step_index – 1-based step index to find

Returns:

ExecutionStep or None if not found

get_steps_before(step_index: int) List[ExecutionStep][source]

Get all steps before a given step index.

Parameters:

step_index – 1-based step index (exclusive)

Returns:

List of steps with step_index < given index

get_steps_up_to_model() List[ExecutionStep][source]

Get all steps up to and including the model step.

Returns:

List of steps needed to reproduce the prediction

metadata: Dict[str, Any]
model_step_index: int | None = None
pipeline_uid: str = ''
preprocessing_chain: str = ''
set_model_step(step_index: int, fold_weights: Dict[int, float] | None = None) None[source]

Set the model step index and optional fold weights.

Parameters:
  • step_index – Index of the model step

  • fold_weights – Optional per-fold weights for CV
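
The reference does not state how fold_weights are applied at prediction time. One plausible reading, sketched below purely as an assumption, is a weighted average of per-fold predictions with the weights normalized to sum to one. weighted_fold_average is a hypothetical helper and plain Python lists stand in for prediction arrays.

```python
from typing import Dict, List


def weighted_fold_average(
    fold_predictions: Dict[int, List[float]],
    fold_weights: Dict[int, float],
) -> List[float]:
    """Average per-fold predictions using normalized fold weights (assumed semantics)."""
    total = sum(fold_weights[fold_id] for fold_id in fold_predictions)
    if total == 0:
        raise ValueError("fold weights sum to zero")
    n_samples = len(next(iter(fold_predictions.values())))
    averaged = [0.0] * n_samples
    for fold_id, predictions in fold_predictions.items():
        weight = fold_weights[fold_id] / total
        for i, value in enumerate(predictions):
            averaged[i] += weight * value
    return averaged


predictions = {0: [1.0, 2.0], 1: [3.0, 4.0]}
weights = {0: 1.0, 1: 3.0}
combined = weighted_fold_average(predictions, weights)
```

With these inputs fold 1 carries three quarters of the total weight, so each combined value lies closer to fold 1's prediction than to fold 0's.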

steps: List[ExecutionStep]
to_dict() Dict[str, Any][source]

Convert to dictionary for YAML serialization.

Returns:

Dictionary suitable for manifest storage

trace_id: str
class nirs4all.pipeline.trace.MinimalPipeline(trace_id: str = '', pipeline_uid: str = '', steps: List[MinimalPipelineStep] = <factory>, artifact_map: Dict[int, StepArtifacts] = <factory>, model_step_index: int | None = None, fold_weights: Dict[int, float] | None = None, preprocessing_chain: str = '', metadata: Dict[str, Any] = <factory>)[source]

Bases: object

Minimal pipeline extracted from an execution trace.

Contains only the steps needed to replay a prediction, with artifact mappings for each step. Used by MinimalPredictor for efficient prediction.

trace_id

ID of the source execution trace

Type:

str

pipeline_uid

UID of the parent pipeline

Type:

str

steps

Ordered list of minimal steps to execute

Type:

List[nirs4all.pipeline.trace.extractor.MinimalPipelineStep]

artifact_map

Mapping of step_index to the StepArtifacts recorded for that step

Type:

Dict[int, nirs4all.pipeline.trace.execution_trace.StepArtifacts]

model_step_index

Index of the model step

Type:

int | None

fold_weights

Per-fold weights for CV ensemble

Type:

Dict[int, float] | None

preprocessing_chain

Summary of preprocessing steps

Type:

str

metadata

Additional metadata from trace

Type:

Dict[str, Any]

artifact_map: Dict[int, StepArtifacts]
fold_weights: Dict[int, float] | None = None
get_all_chain_paths() Dict[str, str][source]

Get all artifacts indexed by chain path.

Returns:

Dict mapping chain_path to artifact_id

get_artifact_by_chain(chain_path: str) str | None[source]

Get artifact ID by V3 chain path across all steps.

Parameters:

chain_path – Operator chain path

Returns:

Artifact ID or None if not found
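
A lookup "across all steps" can be pictured as scanning each step's chain index in turn. The sketch below assumes a simplified artifact_map in which each step maps directly to a chain_path -> artifact_id dictionary (the role played by StepArtifacts.by_chain). find_artifact_by_chain is a hypothetical helper, and the step-order scan is an assumption rather than the documented behaviour.

```python
from typing import Dict, Optional


def find_artifact_by_chain(
    artifact_map: Dict[int, Dict[str, str]],
    chain_path: str,
) -> Optional[str]:
    """Return the first artifact ID indexed under chain_path, scanning steps in order."""
    for step_index in sorted(artifact_map):
        artifact_id = artifact_map[step_index].get(chain_path)
        if artifact_id is not None:
            return artifact_id
    return None


# Simplified stand-in for MinimalPipeline.artifact_map (assumed layout).
artifact_map = {
    1: {"s1.SNV": "0001$abc123:all"},
    3: {"s1.SNV>s3.PLS[br=0]": "0001$def456:0"},
}
found = find_artifact_by_chain(artifact_map, "s1.SNV>s3.PLS[br=0]")
missing = find_artifact_by_chain(artifact_map, "s9.Unknown")
```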

get_artifact_ids() List[str][source]

Get all artifact IDs in the minimal pipeline.

Returns:

List of all artifact IDs across all steps

get_artifacts_for_step(step_index: int) StepArtifacts | None[source]

Get artifacts for a specific step.

Parameters:

step_index – 1-based step index

Returns:

StepArtifacts or None if not found

get_step(step_index: int) MinimalPipelineStep | None[source]

Get a step by its index.

Parameters:

step_index – 1-based step index

Returns:

MinimalPipelineStep or None if not found

get_step_count() int[source]

Get the number of steps in the minimal pipeline.

Returns:

Number of steps

get_step_indices() List[int][source]

Get all step indices in execution order.

Returns:

List of step indices

has_step(step_index: int) bool[source]

Check if a step is included in the minimal pipeline.

Parameters:

step_index – 1-based step index

Returns:

True if step is included

metadata: Dict[str, Any]
model_step_index: int | None = None
pipeline_uid: str = ''
preprocessing_chain: str = ''
steps: List[MinimalPipelineStep]
trace_id: str = ''
class nirs4all.pipeline.trace.MinimalPipelineStep(step_index: int, step_config: Any = None, execution_mode: StepExecutionMode = StepExecutionMode.PREDICT, artifacts: StepArtifacts = <factory>, operator_type: str = '', operator_class: str = '', branch_path: List[int] = <factory>, branch_name: str = '', substep_index: int | None = None, depends_on: Set[int] = <factory>)[source]

Bases: object

A step in the minimal pipeline for prediction replay.

Contains the step configuration and metadata needed to replay the step during prediction, without encoding controller-specific logic.

step_index

1-based step index from original pipeline

Type:

int

step_config

The pipeline step configuration (dict or object)

Type:

Any

execution_mode

How to execute this step (train/predict/skip)

Type:

nirs4all.pipeline.trace.execution_trace.StepExecutionMode

artifacts

Artifacts for this step (from trace)

Type:

nirs4all.pipeline.trace.execution_trace.StepArtifacts

operator_type

Type of operator (for logging/debugging)

Type:

str

operator_class

Class name of operator

Type:

str

branch_path

Branch path if in branch context

Type:

List[int]

branch_name

Human-readable branch name

Type:

str

depends_on

Indices of steps this step depends on

Type:

Set[int]

artifacts: StepArtifacts
branch_name: str = ''
branch_path: List[int]
depends_on: Set[int]
execution_mode: StepExecutionMode = 'predict'
get_artifact_by_chain(chain_path: str) str | None[source]

Get artifact ID by V3 chain path.

Parameters:

chain_path – Operator chain path

Returns:

Artifact ID or None if not found

get_artifact_ids() List[str][source]

Get all artifact IDs for this step.

Returns:

List of artifact IDs

get_artifacts_by_chain() Dict[str, str][source]

Get all artifacts indexed by chain path.

Returns:

Dict mapping chain_path to artifact_id

has_artifacts() bool[source]

Check if this step has associated artifacts.

Returns:

True if artifacts are available for this step

operator_class: str = ''
operator_type: str = ''
step_config: Any = None
step_index: int
substep_index: int | None = None
class nirs4all.pipeline.trace.StepArtifacts(artifact_ids: List[str] = <factory>, primary_artifact_id: str | None = None, fold_artifact_ids: Dict[int, str] = <factory>, primary_artifacts: Dict[str, str] = <factory>, by_branch: Dict[Tuple[int, ...], List[str]] = <factory>, by_source: Dict[int, List[str]] = <factory>, by_chain: Dict[str, str] = <factory>, metadata: Dict[str, Any] = <factory>)[source]

Bases: object

Artifacts produced by a single step (V3).

Records all artifacts created during step execution, with V3 indexes for efficient lookup by chain path, branch, source, and fold.

artifact_ids

List of artifact IDs produced by this step

Type:

List[str]

primary_artifact_id

Main artifact (e.g., model) if applicable

Type:

str | None

fold_artifact_ids

Per-fold artifacts for CV models

Type:

Dict[int, str]

# V3 indexes
primary_artifacts

Map of chain_path to artifact_id for shared artifacts

Type:

Dict[str, str]

by_branch

Artifacts indexed by branch path tuple

Type:

Dict[Tuple[int, …], List[str]]

by_source

Artifacts indexed by source index

Type:

Dict[int, List[str]]

by_chain

Artifacts indexed by chain path

Type:

Dict[str, str]

metadata

Additional artifact metadata (types, paths, etc.)

Type:

Dict[str, Any]

add_artifact(artifact_id: str, is_primary: bool = False, chain_path: str | None = None, branch_path: List[int] | None = None, source_index: int | None = None) None[source]

Add an artifact ID to this step’s artifacts (V3).

Parameters:
  • artifact_id – The artifact ID to add

  • is_primary – Whether this is the primary artifact

  • chain_path – V3 operator chain path

  • branch_path – Branch path for indexing

  • source_index – Source index for multi-source indexing

add_fold_artifact(fold_id: int, artifact_id: str, chain_path: str | None = None, branch_path: List[int] | None = None) None[source]

Add a fold-specific artifact.

Parameters:
  • fold_id – CV fold index

  • artifact_id – Artifact ID for this fold

  • chain_path – V3 operator chain path

  • branch_path – Branch path for indexing

artifact_ids: List[str]
by_branch: Dict[Tuple[int, ...], List[str]]
by_chain: Dict[str, str]
by_source: Dict[int, List[str]]
fold_artifact_ids: Dict[int, str]
classmethod from_dict(data: Dict[str, Any]) StepArtifacts[source]

Create StepArtifacts from dictionary.

Parameters:

data – Dictionary from manifest

Returns:

StepArtifacts instance

get_artifact_by_chain(chain_path: str) str | None[source]

Get artifact ID by exact chain path match.

Parameters:

chain_path – Operator chain path

Returns:

Artifact ID or None if not found

get_artifacts_for_branch(branch_path: List[int]) List[str][source]

Get artifact IDs matching a branch path.

Includes artifacts from:
  • Exact branch match

  • Empty branch (shared/pre-branch)

  • Parent branches (for nested branches)

Parameters:

branch_path – Target branch path

Returns:

List of matching artifact IDs
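
The three matching cases listed above (exact branch, empty branch, parent branches) can all be expressed as a single prefix test: an indexed branch path matches when it is a prefix of the target path. The sketch below is a hypothetical reimplementation over a plain by_branch dictionary, written to illustrate the rule, and is not taken from the library source.

```python
from typing import Dict, List, Tuple


def artifacts_for_branch(
    by_branch: Dict[Tuple[int, ...], List[str]],
    branch_path: List[int],
) -> List[str]:
    """Collect IDs whose indexed branch path is a prefix of branch_path.

    The empty tuple (shared/pre-branch), every parent branch, and the exact
    branch are all prefixes of the target, so one test covers all three cases.
    """
    target = tuple(branch_path)
    matched: List[str] = []
    for indexed_path, artifact_ids in by_branch.items():
        if target[: len(indexed_path)] == indexed_path:
            matched.extend(artifact_ids)
    return matched


# Illustrative index; the artifact names are made up for this example.
index = {
    (): ["shared_scaler"],
    (0,): ["branch0_snv"],
    (0, 1): ["branch0_1_pls"],
    (1,): ["branch1_snv"],
}
nested = artifacts_for_branch(index, [0, 1])
top = artifacts_for_branch(index, [0])
```

Here the nested lookup for [0, 1] picks up the shared, parent, and exact entries, while the sibling branch (1,) is excluded.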

get_artifacts_for_source(source_index: int) List[str][source]

Get artifact IDs for a specific source.

Parameters:

source_index – Source index to filter

Returns:

List of artifact IDs for that source

merge(other: StepArtifacts) None[source]

Merge another StepArtifacts into this one.

Used when multiple substeps share the same step_index and their artifacts need to be combined in the artifact_map.

Parameters:

other – StepArtifacts to merge into this one
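
The exact merge rules are not spelled out in this reference. The sketch below shows one reasonable set of rules as an assumption: append unseen artifact IDs, keep an existing primary artifact, and let the incoming chain index update the current one. Artifacts is a hypothetical, reduced stand-in for StepArtifacts.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Artifacts:
    """Hypothetical reduced stand-in for StepArtifacts (assumption)."""

    artifact_ids: List[str] = field(default_factory=list)
    primary_artifact_id: Optional[str] = None
    by_chain: Dict[str, str] = field(default_factory=dict)

    def merge(self, other: "Artifacts") -> None:
        # Append IDs not already present, preserving insertion order.
        for artifact_id in other.artifact_ids:
            if artifact_id not in self.artifact_ids:
                self.artifact_ids.append(artifact_id)
        # Keep an existing primary artifact; adopt the other's only if unset.
        if self.primary_artifact_id is None:
            self.primary_artifact_id = other.primary_artifact_id
        # Chain paths are unique keys, so a plain update combines the indexes.
        self.by_chain.update(other.by_chain)


first = Artifacts(["a1"], None, {"s3.PLS[sub=0]": "a1"})
second = Artifacts(["a1", "a2"], "a2", {"s3.RF[sub=1]": "a2"})
first.merge(second)
```

The chain path strings used here are illustrative only; their format is not confirmed by this reference.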

metadata: Dict[str, Any]
primary_artifact_id: str | None = None
primary_artifacts: Dict[str, str]
to_dict() Dict[str, Any][source]

Convert to dictionary for YAML serialization.

Returns:

Dictionary suitable for manifest storage

class nirs4all.pipeline.trace.StepExecutionMode(value)[source]

Bases: str, Enum

Mode of step execution.

TRAIN

Step fitted on data (creates new artifacts)

PREDICT

Step uses pre-fitted artifacts

SKIP

Step was skipped (no-op)

PREDICT = 'predict'
SKIP = 'skip'
TRAIN = 'train'
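
Because StepExecutionMode derives from both str and Enum, its members compare equal to their plain string values, which is why the dataclass defaults earlier in this page render as 'train' and 'predict'. The standalone sketch below illustrates that pattern using only the standard library. Mode is a hypothetical stand-in defined here, not the library class.

```python
import json
from enum import Enum


class Mode(str, Enum):
    """Hypothetical stand-in mirroring the three documented values."""

    TRAIN = "train"
    PREDICT = "predict"
    SKIP = "skip"


# Members compare equal to their string values.
is_train = Mode.TRAIN == "train"

# A plain string read back from a stored manifest converts to a member by value.
restored = Mode("predict")

# str-based members serialize as ordinary strings with the json module.
payload = json.dumps({"execution_mode": Mode.SKIP})
```

This is a common reason to mix str into an Enum: values can be written to and read from text formats without a custom encoder.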
class nirs4all.pipeline.trace.TraceBasedExtractor(include_skipped: bool = False, preserve_order: bool = True)[source]

Bases: object

Extract minimal pipeline from execution trace.

The extractor analyzes an ExecutionTrace to determine which steps are needed for prediction replay and builds a MinimalPipeline with the correct artifact mappings.

The extractor is controller-agnostic: it uses trace metadata to identify steps without encoding knowledge of controller types.

include_skipped

Whether to include skipped steps in minimal pipeline

preserve_order

Whether to preserve original step order

Example

>>> extractor = TraceBasedExtractor()
>>> trace = manifest_manager.load_execution_trace(pipeline_uid, trace_id)
>>> minimal = extractor.extract(trace, full_pipeline_steps)
>>> print(f"Minimal pipeline has {minimal.get_step_count()} steps")
extract(trace: ExecutionTrace, full_pipeline: List[Any] | None = None, up_to_model: bool = True) MinimalPipeline[source]

Extract minimal pipeline from execution trace.

Analyzes the trace to determine which steps are needed for prediction and builds a MinimalPipeline with artifact mappings.

Parameters:
  • trace – ExecutionTrace to extract from

  • full_pipeline – Optional full pipeline steps (for step configs)

  • up_to_model – If True, only include steps up to model step

Returns:

MinimalPipeline with steps and artifact mappings
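
The interaction between up_to_model and the extractor's include_skipped option can be sketched as a simple filter over recorded steps. The code below is a hypothetical illustration under those two documented flags only. Step and select_steps are names invented for this example, and the real extractor also builds artifact mappings, which are omitted here.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Step:
    """Hypothetical reduced step record (assumption)."""

    step_index: int
    execution_mode: str = "train"


def select_steps(
    steps: List[Step],
    model_step_index: Optional[int],
    up_to_model: bool = True,
    include_skipped: bool = False,
) -> List[Step]:
    """Keep the steps needed for replay, preserving their original order."""
    selected: List[Step] = []
    for step in steps:
        past_model = model_step_index is not None and step.step_index > model_step_index
        if up_to_model and past_model:
            continue
        if not include_skipped and step.execution_mode == "skip":
            continue
        selected.append(step)
    return selected


recorded = [Step(1), Step(2, "skip"), Step(3), Step(4)]
minimal = [s.step_index for s in select_steps(recorded, model_step_index=3)]
with_skipped = [
    s.step_index for s in select_steps(recorded, model_step_index=3, include_skipped=True)
]
full = [s.step_index for s in select_steps(recorded, model_step_index=3, up_to_model=False)]
```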

extract_for_branch(trace: ExecutionTrace, branch_path: List[int], full_pipeline: List[Any] | None = None) MinimalPipeline[source]

Extract minimal pipeline for a specific branch.

Includes shared steps (before branching) plus branch-specific steps.

Parameters:
  • trace – ExecutionTrace to extract from

  • branch_path – Branch path to extract (e.g., [0] for first branch)

  • full_pipeline – Optional full pipeline steps

Returns:

MinimalPipeline with steps for the specified branch

extract_for_branch_name(trace: ExecutionTrace, branch_name: str, full_pipeline: List[Any] | None = None) MinimalPipeline[source]

Extract minimal pipeline for a specific branch by name.

More reliable than extract_for_branch for nested branches where branch_id doesn’t map directly to branch_path. Uses branch_name for matching since it’s unique and stored in both predictions and trace.

Includes shared steps (before branching) plus branch-specific steps.

Parameters:
  • trace – ExecutionTrace to extract from

  • branch_name – Branch name to match (e.g., “branch_0_branch_0”)

  • full_pipeline – Optional full pipeline steps

Returns:

MinimalPipeline with steps for the specified branch

extract_for_step(trace: ExecutionTrace, target_step_index: int, full_pipeline: List[Any] | None = None) MinimalPipeline[source]

Extract minimal pipeline up to a specific step.

Useful for partial prediction or when targeting a specific model in a multi-model pipeline.

Parameters:
  • trace – ExecutionTrace to extract from

  • target_step_index – Target step index (inclusive)

  • full_pipeline – Optional full pipeline steps

Returns:

MinimalPipeline with steps up to target

get_required_artifact_ids(trace: ExecutionTrace, up_to_model: bool = True) List[str][source]

Get list of artifact IDs required for prediction.

Useful for pre-loading artifacts or validating artifact availability.

Parameters:
  • trace – ExecutionTrace to analyze

  • up_to_model – If True, only include artifacts up to model step

Returns:

List of artifact IDs needed for prediction

get_step_dependency_graph(trace: ExecutionTrace) Dict[int, Set[int]][source]

Build dependency graph from execution trace.

The dependency graph maps each step to the set of steps it depends on. This is inferred from the trace execution order and branch structure.

Parameters:

trace – ExecutionTrace to analyze

Returns:

Dictionary mapping step_index to set of dependency step indices
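
How dependencies are inferred is described only as coming from "execution order and branch structure". One way to read that, offered here strictly as an assumption, is that a step depends on every earlier step whose branch path is a prefix of its own (shared steps and steps in the same or a parent branch). dependency_graph below is a hypothetical helper operating on (step_index, branch_path) pairs.

```python
from typing import Dict, List, Set, Tuple


def dependency_graph(
    steps: List[Tuple[int, Tuple[int, ...]]],
) -> Dict[int, Set[int]]:
    """Map each step index to the earlier steps it depends on (assumed rule)."""
    graph: Dict[int, Set[int]] = {}
    seen: List[Tuple[int, Tuple[int, ...]]] = []
    for step_index, branch_path in steps:
        deps = {
            earlier_index
            for earlier_index, earlier_path in seen
            # An earlier step is a dependency when its branch path is a prefix
            # of this step's path; substeps sharing an index are excluded.
            if branch_path[: len(earlier_path)] == earlier_path
            and earlier_index != step_index
        }
        graph.setdefault(step_index, set()).update(deps)
        seen.append((step_index, branch_path))
    return graph


# Step 1 is shared; steps 2 and 4 run in branch 0, step 3 in branch 1.
graph = dependency_graph([(1, ()), (2, (0,)), (3, (1,)), (4, (0,))])
```

Under this rule step 4 depends on steps 1 and 2 but not on step 3, which ran in a sibling branch.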

validate_trace_for_prediction(trace: ExecutionTrace) Tuple[bool, List[str]][source]

Validate that a trace has all information needed for prediction.

Checks that:
  • Model step is recorded

  • All steps up to model have recorded artifacts (if applicable)

  • No critical information is missing

Parameters:

trace – ExecutionTrace to validate

Returns:

Tuple of (is_valid, list of issues)
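
The (is_valid, issues) return shape lets callers both branch on validity and report every problem found. The sketch below illustrates that shape with a reduced check. validate_for_prediction is a hypothetical helper working on plain dictionaries, and it only verifies the model step, which is narrower than the checks listed above.

```python
from typing import Any, Dict, List, Optional, Tuple


def validate_for_prediction(
    model_step_index: Optional[int],
    steps: List[Dict[str, Any]],
) -> Tuple[bool, List[str]]:
    """Return (is_valid, issues) for a simplified trace description."""
    issues: List[str] = []
    if model_step_index is None:
        issues.append("no model step recorded")
        return False, issues

    by_index = {step["step_index"]: step for step in steps}
    model_step = by_index.get(model_step_index)
    if model_step is None:
        issues.append(f"model step {model_step_index} is missing from the trace")
    elif not model_step.get("artifact_ids"):
        issues.append(f"model step {model_step_index} has no artifacts")
    return not issues, issues


good = validate_for_prediction(
    2,
    [
        {"step_index": 1, "artifact_ids": ["scaler"]},
        {"step_index": 2, "artifact_ids": ["model"]},
    ],
)
bad = validate_for_prediction(None, [])
```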

class nirs4all.pipeline.trace.TraceRecorder(pipeline_uid: str = '', pipeline_id: str = '', metadata: Dict[str, Any] | None = None)[source]

Bases: object

Records execution traces during pipeline execution (V3).

Builds an ExecutionTrace by recording step starts, artifact creations, and step completions. Designed for use within the pipeline executor.

V3 improvements:
  • Maintains a chain stack for tracking full operator chain

  • Maintains a branch stack for automatic branch path management

  • Tracks source index for multi-source pipelines

  • Records branch substeps individually

trace

The ExecutionTrace being built

current_step

The step currently being executed

step_start_time

Time when current step started (for duration)

pipeline_id

Pipeline identifier for chain generation

Example

>>> recorder = TraceRecorder(pipeline_uid="0001_pls_abc123")
>>> recorder.start_step(step_index=1, operator_type="transform", operator_class="SNV")
>>> recorder.record_artifact(artifact_id="0001$abc123:all", chain_path="s1.SNV")
>>> recorder.end_step()
>>> recorder.enter_branch(0)
>>> recorder.start_step(step_index=3, operator_type="model", operator_class="PLS")
>>> recorder.record_artifact(artifact_id="0001$def456:0", chain_path="s1.SNV>s3.PLS[br=0]")
>>> recorder.end_step(is_model=True)
>>> recorder.exit_branch()
>>> trace = recorder.finalize(preprocessing_chain="SNV")
add_step_metadata(key: str, value: Any) None[source]

Add metadata to the current step.

Parameters:
  • key – Metadata key

  • value – Metadata value

build_chain_for_artifact(step_index: int, operator_class: str, source_index: int | None = None, fold_id: int | None = None, substep_index: int | None = None) OperatorChain[source]

Build an operator chain for an artifact.

Creates a chain based on current context plus the specified operator.

Parameters:
  • step_index – Step index of the operator

  • operator_class – Class name of the operator

  • source_index – Source index for multi-source

  • fold_id – Fold ID for CV models

  • substep_index – Substep index within step

Returns:

OperatorChain for the artifact

current_branch_path() List[int][source]

Get current branch path.

Returns:

Copy of current branch path

current_chain() OperatorChain[source]

Get current operator chain without modifying stack.

Returns:

Current OperatorChain

end_step(is_model: bool = False, fold_weights: Dict[int, float] | None = None, skip_trace: bool = False) None[source]

End the current step and add it to the trace.

Parameters:
  • is_model – Whether this is the model step

  • fold_weights – Per-fold weights for CV models

  • skip_trace – If True, don’t add this step to the trace

enter_branch(branch_id: int) List[int][source]

Enter a branch context.

Parameters:

branch_id – Branch index to enter

Returns:

New branch path after entering

exit_branch() List[int][source]

Exit current branch context.

Returns:

The exited branch path

Raises:

RuntimeError – If not in a branch context
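
The enter_branch()/exit_branch() contract behaves like a stack of branch indices. The sketch below is a hypothetical stand-in written to illustrate the documented return values and the RuntimeError. It assumes that nested branches append to the path and that "the exited branch path" means the full path as it was before leaving.

```python
from typing import List


class BranchStack:
    """Hypothetical stand-in for the recorder's branch bookkeeping (assumption)."""

    def __init__(self) -> None:
        self._path: List[int] = []

    def enter_branch(self, branch_id: int) -> List[int]:
        self._path.append(branch_id)
        return list(self._path)  # return a copy so callers cannot mutate state

    def exit_branch(self) -> List[int]:
        if not self._path:
            raise RuntimeError("exit_branch() called outside a branch context")
        exited = list(self._path)
        self._path.pop()
        return exited

    def in_branch(self) -> bool:
        return bool(self._path)


stack = BranchStack()
outer = stack.enter_branch(0)
inner = stack.enter_branch(2)
exited = stack.exit_branch()
still_inside = stack.in_branch()
```

After the single exit_branch() call the stack is still inside branch 0, so a second call is needed to return to the shared context.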

finalize(preprocessing_chain: str | None = None, metadata: Dict[str, Any] | None = None) ExecutionTrace[source]

Finalize and return the completed trace.

Parameters:
  • preprocessing_chain – Summary string of preprocessing

  • metadata – Additional metadata to merge

Returns:

The completed ExecutionTrace

get_current_step_index() int | None[source]

Get the current step index.

Returns:

Current step index or None if no step active

has_model_step() bool[source]

Check if a model step has been recorded.

Returns:

True if model step index is set

in_branch() bool[source]

Check if currently in a branch context.

Returns:

True if in a branch

mark_step_skipped(step_index: int) None[source]

Record that a step was skipped.

Parameters:

step_index – Index of the skipped step

pop_chain() OperatorChain[source]

Pop and return the current chain.

Returns:

The popped OperatorChain

Raises:

RuntimeError – If trying to pop the root chain

push_chain(node: OperatorNode) OperatorChain[source]

Push new node onto the chain stack.

Creates a new chain with the node appended and pushes it.

Parameters:

node – OperatorNode to append

Returns:

The new extended chain
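
push_chain(), pop_chain(), and current_chain() together describe a stack of progressively longer chains with a protected root. The sketch below illustrates that idea with a chain modelled as a tuple of node labels. ChainStack is a hypothetical stand-in, the real OperatorChain and OperatorNode types are not reproduced, and the ">" separator is taken from the chain paths shown in the TraceRecorder example.

```python
from typing import List, Tuple

# Hypothetical simplification: a chain is an immutable tuple of node labels.
Chain = Tuple[str, ...]


class ChainStack:
    """Hypothetical stand-in for the recorder's chain stack (assumption)."""

    def __init__(self) -> None:
        self._stack: List[Chain] = [()]  # the root chain is empty

    def current_chain(self) -> Chain:
        return self._stack[-1]

    def push_chain(self, node: str) -> Chain:
        # Build a new chain rather than mutating the current one.
        extended = self.current_chain() + (node,)
        self._stack.append(extended)
        return extended

    def pop_chain(self) -> Chain:
        if len(self._stack) == 1:
            raise RuntimeError("cannot pop the root chain")
        return self._stack.pop()


chains = ChainStack()
chains.push_chain("s1.SNV")
chains.push_chain("s3.PLS")
path = ">".join(chains.current_chain())
popped = chains.pop_chain()
after_pop = chains.current_chain()
```

Keeping each chain immutable means a popped chain stays valid for any artifact already recorded against it.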

record_artifact(artifact_id: str, is_primary: bool = False, fold_id: int | None = None, chain_path: str | None = None, branch_path: List[int] | None = None, source_index: int | None = None, metadata: Dict[str, Any] | None = None) None[source]

Record an artifact created during the current step (V3).

Parameters:
  • artifact_id – The artifact ID

  • is_primary – Whether this is the primary artifact

  • fold_id – CV fold ID if fold-specific artifact

  • chain_path – V3 operator chain path

  • branch_path – Branch path for indexing

  • source_index – Source index for multi-source

  • metadata – Additional artifact metadata

record_input_shapes(input_shape: tuple | None = None, features_shape: List[tuple] | None = None) None[source]

Record input shapes for the current step.

Parameters:
  • input_shape – 2D layout shape (samples, features)

  • features_shape – List of 3D shapes per source (samples, processings, features)

record_output_shapes(output_shape: tuple | None = None, features_shape: List[tuple] | None = None) None[source]

Record output shapes for the current step.

Parameters:
  • output_shape – 2D layout shape (samples, features)

  • features_shape – List of 3D shapes per source (samples, processings, features)

reset_chain_to(chain: OperatorChain) None[source]

Reset chain stack to a specific chain.

Useful when entering a new branch context.

Parameters:

chain – Chain to reset to

start_branch_step(step_index: int, branch_count: int, operator_config: Dict[str, Any] | None = None) ExecutionStep[source]

Start recording a branch step.

Parameters:
  • step_index – Step index of the branch

  • branch_count – Number of branches

  • operator_config – Branch configuration

Returns:

The created ExecutionStep for the branch

start_branch_substep(parent_step_index: int, branch_id: int, operator_type: str, operator_class: str, substep_index: int = 0, operator_config: Dict[str, Any] | None = None, branch_name: str | None = None) ExecutionStep[source]

Start recording a substep within a branch.

Note: This method assumes enter_branch() has already been called for this branch, so current_branch_path() already includes the branch_id.

Parameters:
  • parent_step_index – Parent branch step index

  • branch_id – Branch index this substep belongs to (for metadata only)

  • operator_type – Type of operator

  • operator_class – Class name of operator

  • substep_index – Index within the branch’s substeps

  • operator_config – Operator configuration

  • branch_name – Human-readable branch name

Returns:

The created ExecutionStep

start_step(step_index: int, operator_type: str = '', operator_class: str = '', operator_config: Dict[str, Any] | None = None, execution_mode: StepExecutionMode = StepExecutionMode.TRAIN, branch_path: List[int] | None = None, branch_name: str = '', source_count: int = 1, produces_branches: bool = False, substep_index: int | None = None) ExecutionStep[source]

Start recording a new step (V3).

Parameters:
  • step_index – 1-based step index

  • operator_type – Type of operator (e.g., “transform”, “model”)

  • operator_class – Class name of operator

  • operator_config – Serialized operator configuration

  • execution_mode – Train/predict/skip mode

  • branch_path – Branch indices (uses current if None)

  • branch_name – Human-readable branch name

  • source_count – Number of X sources at this step

  • produces_branches – Whether this is a branch operator

  • substep_index – Index of this operator within a multi-operator step (None when the step has no substeps)

Returns:

The created ExecutionStep

property trace_id: str

Get the trace ID.

Returns:

Trace ID string