nirs4all.pipeline.trace.execution_trace module

Execution Trace V3 - Records the exact path through pipeline that produced a prediction.

This module provides the core data structures for recording execution traces, which enable deterministic prediction replay and pipeline extraction.

V3 improvements:
  • OperatorChain tracking for complete execution path

  • Per-branch and per-source artifact indexing

  • Support for nested branches and multi-source pipelines

  • Chain-based artifact lookup for deterministic replay

Key Classes:
  • StepArtifacts: Artifacts produced by a single step with V3 indexes

  • ExecutionStep: Record of a single step’s execution with chain tracking

  • ExecutionTrace: Complete trace of a pipeline execution path

Architecture:

During training, each step execution is recorded in the trace:
  1. Step starts -> record step_index, operator info, input chain

  2. Step completes -> record artifacts and output chains

  3. Model produces prediction -> trace_id is attached to prediction

During prediction, the trace is used to:
  1. Identify the minimal set of steps needed

  2. Load the correct artifacts for each step via chain lookup

  3. Execute only required steps via existing controllers
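The record-then-replay flow described above can be sketched with simplified stand-in classes. `MiniStep` and `MiniTrace` are hypothetical names used for illustration only, not the nirs4all classes. The chain-path strings, artifact IDs, and prediction values are made-up placeholders. Treating "every step up to the model step" as the minimal set is an assumption that holds only for a linear pipeline.

```python
import uuid
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class MiniStep:
    """Hypothetical stand-in for one recorded step (not the nirs4all class)."""
    step_index: int                      # 1-based position in the pipeline
    operator_class: str
    input_chain_path: str = ""
    by_chain: Dict[str, str] = field(default_factory=dict)  # chain path -> artifact ID


@dataclass
class MiniTrace:
    """Hypothetical stand-in for a trace: an ID, ordered steps, the model step."""
    trace_id: str = field(default_factory=lambda: uuid.uuid4().hex)
    steps: List[MiniStep] = field(default_factory=list)
    model_step_index: Optional[int] = None


# Training: record each step, then attach the trace ID to the prediction.
trace = MiniTrace()
trace.steps.append(MiniStep(1, "SNV", "", {"SNV": "art-snv"}))
trace.steps.append(
    MiniStep(2, "PLSRegression", "SNV", {"SNV>PLSRegression": "art-pls"})
)
trace.model_step_index = 2
prediction = {"y_pred": [0.0], "trace_id": trace.trace_id}  # placeholder values

# Prediction: find the trace, keep the required steps, resolve their artifacts.
traces = {trace.trace_id: trace}
found = traces[prediction["trace_id"]]
required = [s for s in found.steps if s.step_index <= found.model_step_index]
to_load = [aid for s in required for aid in s.by_chain.values()]
```

Because the prediction carries only the `trace_id`, everything needed for replay is recovered from the stored trace rather than from the original pipeline definition.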

class nirs4all.pipeline.trace.execution_trace.ExecutionStep(step_index: int, operator_type: str = '', operator_class: str = '', operator_config: Dict[str, Any] = <factory>, execution_mode: StepExecutionMode = StepExecutionMode.TRAIN, artifacts: StepArtifacts = <factory>, branch_path: List[int] = <factory>, branch_name: str = '', duration_ms: float = 0.0, metadata: Dict[str, Any] = <factory>, input_chain_path: str = '', output_chain_paths: List[str] = <factory>, source_count: int = 1, produces_branches: bool = False, substep_index: int | None = None, input_shape: Tuple[int, int] | None = None, output_shape: Tuple[int, int] | None = None, input_features_shape: List[Tuple[int, int, int]] | None = None, output_features_shape: List[Tuple[int, int, int]] | None = None)

Bases: object

Record of a single step’s execution in the trace (V3).

Captures all information needed to replay this step during prediction, including operator configuration, execution mode, and produced artifacts.

V3 additions:
  • input_chain_path: Operator chain up to this step’s input

  • output_chain_paths: Chains produced by this step (for branching)

  • source_count: Number of X sources at this step

  • produces_branches: Whether this is a branch operator

step_index

1-based step number in the pipeline

Type:

int

operator_type

Type of operation (e.g., “transform”, “model”, “splitter”)

Type:

str

operator_class

Class name of the operator (e.g., “PLSRegression”, “SNV”)

Type:

str

operator_config

Serialized operator configuration

Type:

Dict[str, Any]

execution_mode

How the step was executed (train/predict/skip)

Type:

nirs4all.pipeline.trace.execution_trace.StepExecutionMode

artifacts

Artifacts produced by this step

Type:

nirs4all.pipeline.trace.execution_trace.StepArtifacts

branch_path

Branch indices if in a branch context

Type:

List[int]

branch_name

Human-readable branch name

Type:

str

duration_ms

Execution duration in milliseconds

Type:

float

metadata

Additional step-specific metadata

Type:

Dict[str, Any]

# V3 chain tracking
input_chain_path

Serialized operator chain up to this step’s input

Type:

str

output_chain_paths

List of chains produced by this step

Type:

List[str]

source_count

Number of X sources processed

Type:

int

produces_branches

True if this is a branch operator

Type:

bool

substep_index

Index of this operator within a multi-operator step (e.g., for a step defined as [model1, model2])

Type:

int | None

add_output_chain(chain_path: str) -> None

Add an output chain path to this step.

Parameters:

chain_path – Operator chain path to add

artifacts: StepArtifacts
branch_name: str = ''
branch_path: List[int]
duration_ms: float = 0.0
execution_mode: StepExecutionMode = 'train'
classmethod from_dict(data: Dict[str, Any]) -> ExecutionStep

Create ExecutionStep from dictionary.

Parameters:

data – Dictionary from manifest

Returns:

ExecutionStep instance

has_artifacts() -> bool

Check if this step produced any artifacts.

Returns:

True if the step has at least one artifact

input_chain_path: str = ''
input_features_shape: List[Tuple[int, int, int]] | None = None
input_shape: Tuple[int, int] | None = None
metadata: Dict[str, Any]
operator_class: str = ''
operator_config: Dict[str, Any]
operator_type: str = ''
output_chain_paths: List[str]
output_features_shape: List[Tuple[int, int, int]] | None = None
output_shape: Tuple[int, int] | None = None
produces_branches: bool = False
source_count: int = 1
step_index: int
substep_index: int | None = None
to_dict() -> Dict[str, Any]

Convert to dictionary for YAML serialization.

Returns:

Dictionary suitable for manifest storage

class nirs4all.pipeline.trace.execution_trace.ExecutionTrace(trace_id: str = <factory>, pipeline_uid: str = '', created_at: str = <factory>, steps: List[ExecutionStep] = <factory>, model_step_index: int | None = None, fold_weights: Dict[int, float] | None = None, preprocessing_chain: str = '', metadata: Dict[str, Any] = <factory>)

Bases: object

Complete trace of a pipeline execution path.

Records the exact sequence of steps and artifacts that produced a prediction, enabling deterministic replay for prediction, transfer, and export.

The trace is controller-agnostic: it records what happened without encoding specific controller logic, so any controller (existing or custom) can be replayed using the same infrastructure.

trace_id

Unique identifier for this trace

Type:

str

pipeline_uid

Parent pipeline UID

Type:

str

created_at

ISO timestamp of trace creation

Type:

str

steps

Ordered list of execution steps

Type:

List[nirs4all.pipeline.trace.execution_trace.ExecutionStep]

model_step_index

Index of the model step that produced predictions

Type:

int | None

fold_weights

Per-fold weights for CV ensemble (None for single model)

Type:

Dict[int, float] | None

preprocessing_chain

Summary of preprocessing steps for quick reference

Type:

str

metadata

Additional trace metadata (e.g., dataset info, run parameters)

Type:

Dict[str, Any]

add_step(step: ExecutionStep) -> None

Add a step to the trace.

Parameters:

step – ExecutionStep to add

created_at: str
finalize(preprocessing_chain: str | None = None, metadata: Dict[str, Any] | None = None) -> None

Finalize the trace with summary information.

Call this after all steps have been recorded to add summary info.

Parameters:
  • preprocessing_chain – Summary string of preprocessing (e.g., “SNV>SG>MinMax”)

  • metadata – Additional metadata to merge

fold_weights: Dict[int, float] | None = None
classmethod from_dict(data: Dict[str, Any]) -> ExecutionTrace

Create ExecutionTrace from dictionary.

Parameters:

data – Dictionary from manifest

Returns:

ExecutionTrace instance

get_artifact_ids() -> List[str]

Get all artifact IDs in this trace.

Returns:

List of all artifact IDs across all steps

get_artifacts_by_step(step_index: int) -> StepArtifacts | None

Get artifacts for a specific step.

Parameters:

step_index – 1-based step index

Returns:

StepArtifacts or None if step not found

get_fold_artifact_ids() -> Dict[int, str]

Get per-fold model artifact IDs.

Returns:

Dictionary of fold_id -> artifact_id

get_model_artifact_id() -> str | None

Get the primary model artifact ID.

Returns:

Model artifact ID or None if no model step

get_step(step_index: int) -> ExecutionStep | None

Get a step by its index.

Parameters:

step_index – 1-based step index to find

Returns:

ExecutionStep or None if not found

get_steps_before(step_index: int) -> List[ExecutionStep]

Get all steps before a given step index.

Parameters:

step_index – 1-based step index (exclusive)

Returns:

List of steps with step_index < given index

get_steps_up_to_model() -> List[ExecutionStep]

Get all steps up to and including the model step.

Returns:

List of steps needed to reproduce the prediction
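The three step queries above differ mainly in their bounds. The sketch below restates them as free functions over a hypothetical `MiniStep` stand-in. It is not the nirs4all implementation. Returning the first match in `get_step` and returning an empty list when no model step is set are both assumptions.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class MiniStep:
    """Hypothetical stand-in for a recorded step (not the nirs4all class)."""
    step_index: int          # 1-based, as in the attribute description
    operator_class: str


def get_step(steps: List[MiniStep], step_index: int) -> Optional[MiniStep]:
    # Assumption: the first step with a matching index is returned.
    return next((s for s in steps if s.step_index == step_index), None)


def get_steps_before(steps: List[MiniStep], step_index: int) -> List[MiniStep]:
    # Exclusive bound: only steps with step_index < the given index.
    return [s for s in steps if s.step_index < step_index]


def get_steps_up_to_model(
    steps: List[MiniStep], model_step_index: Optional[int]
) -> List[MiniStep]:
    # Assumption: with no model step recorded, nothing is returned.
    if model_step_index is None:
        return []
    # Inclusive bound: the model step itself is part of the result.
    return [s for s in steps if s.step_index <= model_step_index]


steps = [MiniStep(1, "SNV"), MiniStep(2, "MinMaxScaler"), MiniStep(3, "PLSRegression")]
before_model = [s.operator_class for s in get_steps_before(steps, 3)]
up_to_model = [s.operator_class for s in get_steps_up_to_model(steps, 3)]
missing = get_step(steps, 7)
```

Here `before_model` holds only the two preprocessing steps, `up_to_model` also includes the model step, and `missing` is `None`.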

metadata: Dict[str, Any]
model_step_index: int | None = None
pipeline_uid: str = ''
preprocessing_chain: str = ''
set_model_step(step_index: int, fold_weights: Dict[int, float] | None = None) -> None

Set the model step index and optional fold weights.

Parameters:
  • step_index – Index of the model step

  • fold_weights – Optional per-fold weights for CV
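This reference does not say how the per-fold weights are consumed. One plausible reading, shown here purely as an assumption, is a weighted average of the per-fold predictions. `weighted_fold_average` is a hypothetical helper, not part of nirs4all, and the numbers are illustrative.

```python
from typing import Dict, List


def weighted_fold_average(
    fold_preds: Dict[int, List[float]],
    fold_weights: Dict[int, float],
) -> List[float]:
    """Hypothetical helper: combine per-fold predictions by weighted mean."""
    # Normalize by the total weight so the weights need not sum to 1.
    total = sum(fold_weights[f] for f in fold_preds)
    n_samples = len(next(iter(fold_preds.values())))
    return [
        sum(fold_weights[f] * fold_preds[f][i] for f in fold_preds) / total
        for i in range(n_samples)
    ]


# Two folds, two samples each (illustrative values only).
preds = {0: [1.0, 2.0], 1: [3.0, 4.0]}
weights = {0: 0.25, 1: 0.75}
combined = weighted_fold_average(preds, weights)
```

With these inputs the second fold dominates, so each combined value lies closer to that fold's prediction.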

steps: List[ExecutionStep]
to_dict() -> Dict[str, Any]

Convert to dictionary for YAML serialization.

Returns:

Dictionary suitable for manifest storage

trace_id: str
class nirs4all.pipeline.trace.execution_trace.StepArtifacts(artifact_ids: List[str] = <factory>, primary_artifact_id: str | None = None, fold_artifact_ids: Dict[int, str] = <factory>, primary_artifacts: Dict[str, str] = <factory>, by_branch: Dict[Tuple[int, ...], List[str]] = <factory>, by_source: Dict[int, List[str]] = <factory>, by_chain: Dict[str, str] = <factory>, metadata: Dict[str, Any] = <factory>)

Bases: object

Artifacts produced by a single step (V3).

Records all artifacts created during step execution, with V3 indexes for efficient lookup by chain path, branch, source, and fold.

artifact_ids

List of artifact IDs produced by this step

Type:

List[str]

primary_artifact_id

Main artifact (e.g., model) if applicable

Type:

str | None

fold_artifact_ids

Per-fold artifacts for CV models

Type:

Dict[int, str]

# V3 indexes
primary_artifacts

Map of chain_path to artifact_id for shared artifacts

Type:

Dict[str, str]

by_branch

Artifacts indexed by branch path tuple

Type:

Dict[Tuple[int, ...], List[str]]

by_source

Artifacts indexed by source index

Type:

Dict[int, List[str]]

by_chain

Artifacts indexed by chain path

Type:

Dict[str, str]

metadata

Additional artifact metadata (types, paths, etc.)

Type:

Dict[str, Any]

add_artifact(artifact_id: str, is_primary: bool = False, chain_path: str | None = None, branch_path: List[int] | None = None, source_index: int | None = None) -> None

Add an artifact ID to this step’s artifacts (V3).

Parameters:
  • artifact_id – The artifact ID to add

  • is_primary – Whether this is the primary artifact

  • chain_path – V3 operator chain path

  • branch_path – Branch path for indexing

  • source_index – Source index for multi-source indexing
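To illustrate how one call could populate several indexes at once, here is a minimal sketch on a hypothetical `MiniArtifacts` stand-in. It is not the nirs4all implementation. Which indexes are updated, and filing artifacts without a `branch_path` under the empty tuple, are assumptions. The artifact IDs and chain-path strings are made up.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class MiniArtifacts:
    """Hypothetical stand-in for a step's artifact record."""
    artifact_ids: List[str] = field(default_factory=list)
    primary_artifact_id: Optional[str] = None
    by_branch: Dict[Tuple[int, ...], List[str]] = field(default_factory=dict)
    by_source: Dict[int, List[str]] = field(default_factory=dict)
    by_chain: Dict[str, str] = field(default_factory=dict)

    def add_artifact(
        self,
        artifact_id: str,
        is_primary: bool = False,
        chain_path: Optional[str] = None,
        branch_path: Optional[List[int]] = None,
        source_index: Optional[int] = None,
    ) -> None:
        # Flat list: every artifact is recorded once.
        if artifact_id not in self.artifact_ids:
            self.artifact_ids.append(artifact_id)
        if is_primary:
            self.primary_artifact_id = artifact_id
        # Chain index: one artifact per exact chain path.
        if chain_path is not None:
            self.by_chain[chain_path] = artifact_id
        # Assumption: no branch_path means "shared", keyed by the empty tuple.
        key = tuple(branch_path or ())
        self.by_branch.setdefault(key, []).append(artifact_id)
        # Source index: only filled for multi-source artifacts.
        if source_index is not None:
            self.by_source.setdefault(source_index, []).append(artifact_id)


arts = MiniArtifacts()
arts.add_artifact("scaler", source_index=0)
arts.add_artifact("pls", is_primary=True, chain_path="scaler>pls", branch_path=[0])
```

Branch paths arrive as lists but are stored as tuples because dictionary keys must be hashable, which matches the `Dict[Tuple[int, ...], List[str]]` type of `by_branch`.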

add_fold_artifact(fold_id: int, artifact_id: str, chain_path: str | None = None, branch_path: List[int] | None = None) -> None

Add a fold-specific artifact.

Parameters:
  • fold_id – CV fold index

  • artifact_id – Artifact ID for this fold

  • chain_path – V3 operator chain path

  • branch_path – Branch path for indexing

artifact_ids: List[str]
by_branch: Dict[Tuple[int, ...], List[str]]
by_chain: Dict[str, str]
by_source: Dict[int, List[str]]
fold_artifact_ids: Dict[int, str]
classmethod from_dict(data: Dict[str, Any]) -> StepArtifacts

Create StepArtifacts from dictionary.

Parameters:

data – Dictionary from manifest

Returns:

StepArtifacts instance

get_artifact_by_chain(chain_path: str) -> str | None

Get artifact ID by exact chain path match.

Parameters:

chain_path – Operator chain path

Returns:

Artifact ID or None if not found

get_artifacts_for_branch(branch_path: List[int]) -> List[str]

Get artifact IDs matching a branch path.

Includes artifacts from:
  • Exact branch match

  • Empty branch (shared/pre-branch)

  • Parent branches (for nested branches)

Parameters:

branch_path – Target branch path

Returns:

List of matching artifact IDs
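The three inclusion rules above reduce to a single prefix test: a stored branch key matches when it is a prefix of the target path. The sketch below shows that reading with a hypothetical free function, `artifacts_for_branch`, over a plain dictionary shaped like `by_branch`. It is an illustration under that assumption, not the library's code, and the artifact IDs are made up.

```python
from typing import Dict, List, Tuple


def artifacts_for_branch(
    by_branch: Dict[Tuple[int, ...], List[str]],
    branch_path: List[int],
) -> List[str]:
    """Hypothetical helper: collect IDs whose branch key is a prefix of the target."""
    target = tuple(branch_path)
    matches: List[str] = []
    for key, ids in by_branch.items():
        # () is a prefix of every path (shared), a parent is a proper prefix,
        # and an exact match is the full-length prefix.
        if target[:len(key)] == key:
            matches.extend(ids)
    return matches


index = {
    (): ["scaler"],          # shared, pre-branch
    (0,): ["snv_b0"],        # parent branch 0
    (0, 1): ["pls_b0_1"],    # nested branch 0 -> 1
    (1,): ["msc_b1"],        # sibling branch, must not match
}
found = artifacts_for_branch(index, [0, 1])
```

The sibling branch `(1,)` is excluded because it is not a prefix of `(0, 1)`, while the shared and parent entries are both included.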

get_artifacts_for_source(source_index: int) -> List[str]

Get artifact IDs for a specific source.

Parameters:

source_index – Source index to filter

Returns:

List of artifact IDs for that source

merge(other: StepArtifacts) -> None

Merge another StepArtifacts into this one.

Used when multiple substeps share the same step_index and their artifacts need to be combined in the artifact_map.

Parameters:

other – StepArtifacts to merge into this one
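As a rough picture of what combining two substeps' records could look like, the sketch below merges a reduced, hypothetical `MiniArtifacts` stand-in in place. The real conflict rules are not documented here. Skipping duplicate IDs and letting the incoming entry win on a chain-path collision are assumptions, and the IDs and chain paths are made up.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class MiniArtifacts:
    """Hypothetical, reduced stand-in for a step's artifact record."""
    artifact_ids: List[str] = field(default_factory=list)
    by_chain: Dict[str, str] = field(default_factory=dict)
    by_source: Dict[int, List[str]] = field(default_factory=dict)

    def merge(self, other: "MiniArtifacts") -> None:
        # Assumption: keep first-seen order and skip duplicate IDs.
        for aid in other.artifact_ids:
            if aid not in self.artifact_ids:
                self.artifact_ids.append(aid)
        # Assumption: on a chain-path collision the incoming entry wins.
        self.by_chain.update(other.by_chain)
        # List-valued indexes are extended per key, again without duplicates.
        for src, ids in other.by_source.items():
            bucket = self.by_source.setdefault(src, [])
            for aid in ids:
                if aid not in bucket:
                    bucket.append(aid)


# Two substeps that share one step_index, e.g. a step defined as [model1, model2].
a = MiniArtifacts(["m1"], {"s1>m1": "m1"}, {0: ["m1"]})
b = MiniArtifacts(["m2", "m1"], {"s1>m2": "m2"}, {0: ["m2"], 1: ["m1"]})
a.merge(b)
```

The merge mutates `a` and leaves `b` untouched, which matches the `None` return type in the signature.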

metadata: Dict[str, Any]
primary_artifact_id: str | None = None
primary_artifacts: Dict[str, str]
to_dict() -> Dict[str, Any]

Convert to dictionary for YAML serialization.

Returns:

Dictionary suitable for manifest storage

class nirs4all.pipeline.trace.execution_trace.StepExecutionMode(value)

Bases: str, Enum

Mode of step execution.

TRAIN

Step fitted on data (creates new artifacts)

PREDICT

Step uses pre-fitted artifacts

SKIP

Step was skipped (no-op)

PREDICT = 'predict'
SKIP = 'skip'
TRAIN = 'train'
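Because the enum's bases are `str` and `Enum`, its members behave like plain strings, which is why a default can be displayed as `'train'`. The sketch below uses a hypothetical `Mode` class with the same bases and values to show standard Python behavior. It does not import nirs4all.

```python
from enum import Enum


class Mode(str, Enum):
    """Hypothetical mirror of an enum declared with `str, Enum` bases."""
    TRAIN = "train"
    PREDICT = "predict"
    SKIP = "skip"


# Members compare equal to their plain string values.
is_equal = Mode.TRAIN == "train"

# A member can be rebuilt from the string stored in a manifest.
restored = Mode("predict")

# .value yields the plain string, suitable for YAML or JSON output.
serialized = Mode.SKIP.value
```

This makes round-tripping through a dictionary straightforward: store `.value` when writing and call the enum with the stored string when reading.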