nirs4all.pipeline.trace.execution_trace module
Execution Trace V3 - Records the exact path through pipeline that produced a prediction.
This module provides the core data structures for recording execution traces, which enable deterministic prediction replay and pipeline extraction.
V3 improvements:
- OperatorChain tracking for complete execution path
- Per-branch and per-source artifact indexing
- Support for nested branches and multi-source pipelines
- Chain-based artifact lookup for deterministic replay
- Key Classes:
StepArtifacts: Artifacts produced by a single step with V3 indexes
ExecutionStep: Record of a single step’s execution with chain tracking
ExecutionTrace: Complete trace of a pipeline execution path
- Architecture:
During training, each step execution is recorded in the trace:
1. Step starts -> record step_index, operator info, input chain
2. Step completes -> record artifacts and output chains
3. Model produces prediction -> trace_id is attached to prediction
During prediction, the trace is used to:
1. Identify the minimal set of steps needed
2. Load the correct artifacts for each step via chain lookup
3. Execute only required steps via existing controllers
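The record-then-replay flow described above can be illustrated with a small, self-contained sketch. It does not import nirs4all: `MiniStep`, `MiniTrace`, `record_training`, and `plan_replay` are hypothetical stand-ins, the `>`-joined chain format and the `artifact-N` IDs are assumptions made for illustration, and the real classes carry many more fields.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class MiniStep:
    """Hypothetical stand-in for one recorded step (not the nirs4all class)."""
    step_index: int
    operator_class: str
    input_chain_path: str = ""
    output_chain_paths: List[str] = field(default_factory=list)
    artifact_by_chain: Dict[str, str] = field(default_factory=dict)


@dataclass
class MiniTrace:
    """Hypothetical stand-in for a trace: ordered steps plus the model step."""
    trace_id: str
    steps: List[MiniStep] = field(default_factory=list)
    model_step_index: Optional[int] = None


def record_training(operators: List[str], trace_id: str) -> MiniTrace:
    """Training side: record each step, its chains, and the artifact it produced."""
    trace = MiniTrace(trace_id=trace_id)
    chain = ""
    for index, name in enumerate(operators, start=1):  # 1-based step indexes
        step = MiniStep(step_index=index, operator_class=name, input_chain_path=chain)
        # Assumed chain format: operator names joined with ">"
        chain = f"{chain}>{name}" if chain else name
        step.output_chain_paths.append(chain)
        step.artifact_by_chain[chain] = f"artifact-{index}"
        trace.steps.append(step)
    # In this sketch the last operator is treated as the model
    trace.model_step_index = len(operators)
    return trace


def plan_replay(trace: MiniTrace) -> List[str]:
    """Prediction side: list the artifacts to load, in order, up to the model step."""
    needed = [s for s in trace.steps if s.step_index <= trace.model_step_index]
    return [s.artifact_by_chain[s.output_chain_paths[0]] for s in needed]


trace = record_training(["SNV", "SG", "PLS"], trace_id="demo")
replay_plan = plan_replay(trace)
```

The point of the sketch is that prediction never re-fits anything: it only walks the recorded steps and resolves each artifact through the chain path stored at training time.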
- class nirs4all.pipeline.trace.execution_trace.ExecutionStep(step_index: int, operator_type: str = '', operator_class: str = '', operator_config: Dict[str, ~typing.Any]=<factory>, execution_mode: StepExecutionMode = StepExecutionMode.TRAIN, artifacts: StepArtifacts = <factory>, branch_path: List[int] = <factory>, branch_name: str = '', duration_ms: float = 0.0, metadata: Dict[str, ~typing.Any]=<factory>, input_chain_path: str = '', output_chain_paths: List[str] = <factory>, source_count: int = 1, produces_branches: bool = False, substep_index: int | None = None, input_shape: Tuple[int, int] | None=None, output_shape: Tuple[int, int] | None=None, input_features_shape: Tuple[int, int, int]] | None=None, output_features_shape: Tuple[int, int, int]] | None=None)[source]
Bases: object
Record of a single step’s execution in the trace (V3).
Captures all information needed to replay this step during prediction, including operator configuration, execution mode, and produced artifacts.
V3 additions:
- input_chain_path: Operator chain up to this step’s input
- output_chain_paths: Chains produced by this step (for branching)
- source_count: Number of X sources at this step
- produces_branches: Whether this is a branch operator
- execution_mode
How the step was executed (train/predict/skip)
- artifacts
Artifacts produced by this step
- add_output_chain(chain_path: str) None[source]
Add an output chain path to this step.
- Parameters:
chain_path – Operator chain path to add
- artifacts: StepArtifacts
- execution_mode: StepExecutionMode = 'train'
- classmethod from_dict(data: Dict[str, Any]) ExecutionStep[source]
Create ExecutionStep from dictionary.
- Parameters:
data – Dictionary from manifest
- Returns:
ExecutionStep instance
- class nirs4all.pipeline.trace.execution_trace.ExecutionTrace(trace_id: str = <factory>, pipeline_uid: str = '', created_at: str = <factory>, steps: List[ExecutionStep] = <factory>, model_step_index: int | None = None, fold_weights: Dict[int, float] | None=None, preprocessing_chain: str = '', metadata: Dict[str, ~typing.Any]=<factory>)[source]
Bases: object
Complete trace of a pipeline execution path.
Records the exact sequence of steps and artifacts that produced a prediction, enabling deterministic replay for prediction, transfer, and export.
The trace is controller-agnostic: it records what happened without encoding specific controller logic, so any controller (existing or custom) can be replayed using the same infrastructure.
- steps
Ordered list of execution steps
- fold_weights
Per-fold weights for CV ensemble (None for single model)
- add_step(step: ExecutionStep) None[source]
Add a step to the trace.
- Parameters:
step – ExecutionStep to add
- finalize(preprocessing_chain: str | None = None, metadata: Dict[str, Any] | None = None) None[source]
Finalize the trace with summary information.
Call this after all steps have been recorded to add summary info.
- Parameters:
preprocessing_chain – Summary string of preprocessing (e.g., “SNV>SG>MinMax”)
metadata – Additional metadata to merge
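As a rough illustration of what "finalize with summary information" could look like, the sketch below uses a hypothetical `TraceSummary` class. It assumes that a `None` argument leaves the existing value untouched and that metadata is merged with `dict.update`; the source only states that metadata is merged, so both details are assumptions rather than documented behavior.

```python
from typing import Any, Dict, Optional


class TraceSummary:
    """Hypothetical stand-in holding only the two fields finalize() touches."""

    def __init__(self) -> None:
        self.preprocessing_chain: str = ""
        self.metadata: Dict[str, Any] = {}

    def finalize(
        self,
        preprocessing_chain: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> None:
        # Assumption: None means "keep whatever was recorded so far"
        if preprocessing_chain is not None:
            self.preprocessing_chain = preprocessing_chain
        # Assumption: new keys are merged in, existing keys are overwritten
        if metadata:
            self.metadata.update(metadata)


summary = TraceSummary()
summary.metadata["dataset"] = "demo"
summary.finalize(preprocessing_chain="SNV>SG>MinMax", metadata={"n_folds": 5})
```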
- classmethod from_dict(data: Dict[str, Any]) ExecutionTrace[source]
Create ExecutionTrace from dictionary.
- Parameters:
data – Dictionary from manifest
- Returns:
ExecutionTrace instance
- get_artifact_ids() List[str][source]
Get all artifact IDs in this trace.
- Returns:
List of all artifact IDs across all steps
- get_artifacts_by_step(step_index: int) StepArtifacts | None[source]
Get artifacts for a specific step.
- Parameters:
step_index – 1-based step index
- Returns:
StepArtifacts or None if step not found
- get_fold_artifact_ids() Dict[int, str][source]
Get per-fold model artifact IDs.
- Returns:
Dictionary of fold_id -> artifact_id
- get_model_artifact_id() str | None[source]
Get the primary model artifact ID.
- Returns:
Model artifact ID or None if no model step
- get_step(step_index: int) ExecutionStep | None[source]
Get a step by its index.
- Parameters:
step_index – 1-based step index to find
- Returns:
ExecutionStep or None if not found
- get_steps_before(step_index: int) List[ExecutionStep][source]
Get all steps before a given step index.
- Parameters:
step_index – 1-based step index (exclusive)
- Returns:
List of steps with step_index < given index
- get_steps_up_to_model() List[ExecutionStep][source]
Get all steps up to and including the model step.
- Returns:
List of steps needed to reproduce the prediction
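The two filters above (`get_steps_before` is exclusive, `get_steps_up_to_model` is inclusive) can be sketched on a plain list. `IndexedStep`, `steps_before`, and `steps_up_to_model` are hypothetical helpers, and returning every step when no model step is set is an assumption, not documented behavior.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class IndexedStep:
    """Hypothetical stand-in carrying only a 1-based index and an operator name."""
    step_index: int
    operator_class: str


def steps_before(steps: List[IndexedStep], step_index: int) -> List[IndexedStep]:
    """Exclusive bound: keep steps whose index is strictly below step_index."""
    return [s for s in steps if s.step_index < step_index]


def steps_up_to_model(
    steps: List[IndexedStep], model_step_index: Optional[int]
) -> List[IndexedStep]:
    """Inclusive bound: keep steps up to and including the model step."""
    if model_step_index is None:
        # Assumption: with no model step recorded, nothing is filtered out
        return list(steps)
    return [s for s in steps if s.step_index <= model_step_index]


recorded = [
    IndexedStep(1, "SNV"),
    IndexedStep(2, "SG"),
    IndexedStep(3, "PLS"),
    IndexedStep(4, "Chart"),  # a post-model step that replay does not need
]
before_model = [s.operator_class for s in steps_before(recorded, 3)]
up_to_model = [s.operator_class for s in steps_up_to_model(recorded, 3)]
```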
- set_model_step(step_index: int, fold_weights: Dict[int, float] | None = None) None[source]
Set the model step index and optional fold weights.
- Parameters:
step_index – Index of the model step
fold_weights – Optional per-fold weights for CV
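The source does not say how `fold_weights` are applied at prediction time. One plausible reading, shown here purely as an assumption, is a normalized weighted average of the per-fold predictions, with equal weights when `fold_weights` is `None`. The helper `weighted_fold_average` is hypothetical.

```python
from typing import Dict, List, Optional


def weighted_fold_average(
    fold_predictions: Dict[int, List[float]],
    fold_weights: Optional[Dict[int, float]] = None,
) -> List[float]:
    """Combine per-fold predictions into one ensemble prediction."""
    fold_ids = sorted(fold_predictions)
    if fold_weights is None:
        # Assumption: no weights means every fold counts equally
        weights = {fold_id: 1.0 for fold_id in fold_ids}
    else:
        weights = {fold_id: fold_weights[fold_id] for fold_id in fold_ids}
    total = sum(weights.values())
    n_samples = len(fold_predictions[fold_ids[0]])
    return [
        sum(weights[f] * fold_predictions[f][i] for f in fold_ids) / total
        for i in range(n_samples)
    ]


fold_preds = {0: [1.0, 2.0], 1: [3.0, 4.0]}
weighted = weighted_fold_average(fold_preds, {0: 0.25, 1: 0.75})
unweighted = weighted_fold_average(fold_preds)
```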
- steps: List[ExecutionStep]
- class nirs4all.pipeline.trace.execution_trace.StepArtifacts(artifact_ids: ~typing.List[str] = <factory>, primary_artifact_id: str | None = None, fold_artifact_ids: ~typing.Dict[int, str] = <factory>, primary_artifacts: ~typing.Dict[str, str] = <factory>, by_branch: ~typing.Dict[~typing.Tuple[int, ...], ~typing.List[str]] = <factory>, by_source: ~typing.Dict[int, ~typing.List[str]] = <factory>, by_chain: ~typing.Dict[str, str] = <factory>, metadata: ~typing.Dict[str, ~typing.Any] = <factory>)[source]
Bases: object
Artifacts produced by a single step (V3).
Records all artifacts created during step execution, with V3 indexes for efficient lookup by chain path, branch, source, and fold.
- add_artifact(artifact_id: str, is_primary: bool = False, chain_path: str | None = None, branch_path: List[int] | None = None, source_index: int | None = None) None[source]
Add an artifact ID to this step’s artifacts (V3).
- Parameters:
artifact_id – The artifact ID to add
is_primary – Whether this is the primary artifact
chain_path – V3 operator chain path
branch_path – Branch path for indexing
source_index – Source index for multi-source indexing
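To make the role of the optional arguments concrete, here is a minimal sketch of how one call might populate several indexes at once. The field names (`artifact_ids`, `primary_artifact_id`, `by_branch`, `by_source`, `by_chain`) come from the class signature above; the `ArtifactIndex` class and its update logic are assumptions, not the library's implementation.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class ArtifactIndex:
    """Hypothetical stand-in mirroring the documented StepArtifacts index fields."""
    artifact_ids: List[str] = field(default_factory=list)
    primary_artifact_id: Optional[str] = None
    by_branch: Dict[Tuple[int, ...], List[str]] = field(default_factory=dict)
    by_source: Dict[int, List[str]] = field(default_factory=dict)
    by_chain: Dict[str, str] = field(default_factory=dict)

    def add_artifact(
        self,
        artifact_id: str,
        is_primary: bool = False,
        chain_path: Optional[str] = None,
        branch_path: Optional[List[int]] = None,
        source_index: Optional[int] = None,
    ) -> None:
        # Assumption: the flat list holds each ID once
        if artifact_id not in self.artifact_ids:
            self.artifact_ids.append(artifact_id)
        if is_primary:
            self.primary_artifact_id = artifact_id
        # Each optional argument feeds exactly one index
        if chain_path is not None:
            self.by_chain[chain_path] = artifact_id
        if branch_path is not None:
            # Lists are not hashable, so the branch path is stored as a tuple key
            self.by_branch.setdefault(tuple(branch_path), []).append(artifact_id)
        if source_index is not None:
            self.by_source.setdefault(source_index, []).append(artifact_id)


index = ArtifactIndex()
index.add_artifact("a1", is_primary=True, chain_path="SNV", branch_path=[0], source_index=0)
index.add_artifact("a2", chain_path="SNV>SG", branch_path=[0, 1], source_index=1)
```

In this sketch, an exact chain lookup is then a plain dictionary access, for example `index.by_chain.get("SNV>SG")`.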
- add_fold_artifact(fold_id: int, artifact_id: str, chain_path: str | None = None, branch_path: List[int] | None = None) None[source]
Add a fold-specific artifact.
- Parameters:
fold_id – CV fold index
artifact_id – Artifact ID for this fold
chain_path – V3 operator chain path
branch_path – Branch path for indexing
- classmethod from_dict(data: Dict[str, Any]) StepArtifacts[source]
Create StepArtifacts from dictionary.
- Parameters:
data – Dictionary from manifest
- Returns:
StepArtifacts instance
- get_artifact_by_chain(chain_path: str) str | None[source]
Get artifact ID by exact chain path match.
- Parameters:
chain_path – Operator chain path
- Returns:
Artifact ID or None if not found
- get_artifacts_for_branch(branch_path: List[int]) List[str][source]
Get artifact IDs matching a branch path.
Includes artifacts from:
- Exact branch match
- Empty branch (shared/pre-branch)
- Parent branches (for nested branches)
- Parameters:
branch_path – Target branch path
- Returns:
List of matching artifact IDs
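The three matching rules above reduce to a single prefix test: a recorded branch path matches when it is a prefix of the target path, which covers the empty path, every parent, and the exact path. The sketch below assumes shared artifacts are stored under the empty tuple and that results follow insertion order; `artifacts_for_branch` is a hypothetical free function, not the method itself.

```python
from typing import Dict, List, Tuple


def artifacts_for_branch(
    by_branch: Dict[Tuple[int, ...], List[str]], branch_path: List[int]
) -> List[str]:
    """Collect artifact IDs whose recorded branch path is a prefix of branch_path."""
    target = tuple(branch_path)
    matches: List[str] = []
    for recorded_path, artifact_ids in by_branch.items():
        # () is a prefix of everything; a longer recorded path can never match
        if recorded_path == target[: len(recorded_path)]:
            matches.extend(artifact_ids)
    return matches


by_branch = {
    (): ["shared"],      # pre-branch artifacts (assumed to use the empty tuple)
    (0,): ["b0"],        # parent branch
    (0, 1): ["b0_1"],    # nested branch
    (1,): ["b1"],        # sibling branch, must not leak into branch [0, 1]
}
nested_matches = artifacts_for_branch(by_branch, [0, 1])
parent_matches = artifacts_for_branch(by_branch, [0])
```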
- get_artifacts_for_source(source_index: int) List[str][source]
Get artifact IDs for a specific source.
- Parameters:
source_index – Source index to filter
- Returns:
List of artifact IDs for that source
- merge(other: StepArtifacts) None[source]
Merge another StepArtifacts into this one.
Used when multiple substeps share the same step_index and their artifacts need to be combined in the artifact_map.
- Parameters:
other – StepArtifacts to merge into this one
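A merge of two substeps' artifacts could look like the following sketch. The conflict rules used here (IDs deduplicated, the existing primary artifact kept, dictionary entries from `other` overwriting on key collision) are assumptions chosen for illustration; the source does not specify them. `MergeableArtifacts` is a hypothetical stand-in with a subset of the documented fields.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class MergeableArtifacts:
    """Hypothetical stand-in with a subset of the documented StepArtifacts fields."""
    artifact_ids: List[str] = field(default_factory=list)
    primary_artifact_id: Optional[str] = None
    fold_artifact_ids: Dict[int, str] = field(default_factory=dict)
    by_chain: Dict[str, str] = field(default_factory=dict)

    def merge(self, other: "MergeableArtifacts") -> None:
        # Assumption: keep first-seen order and skip IDs already present
        for artifact_id in other.artifact_ids:
            if artifact_id not in self.artifact_ids:
                self.artifact_ids.append(artifact_id)
        # Assumption: an existing primary artifact is never replaced
        if self.primary_artifact_id is None:
            self.primary_artifact_id = other.primary_artifact_id
        # Assumption: on key collision, the entry from `other` wins
        self.fold_artifact_ids.update(other.fold_artifact_ids)
        self.by_chain.update(other.by_chain)


substep_a = MergeableArtifacts(
    artifact_ids=["a1"], primary_artifact_id="a1", by_chain={"SNV": "a1"}
)
substep_b = MergeableArtifacts(
    artifact_ids=["a1", "a2"],
    primary_artifact_id="a2",
    fold_artifact_ids={0: "a2"},
    by_chain={"SNV>SG": "a2"},
)
substep_a.merge(substep_b)
```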