nirs4all.pipeline.trace.recorder module
Trace Recorder V3 - Records execution traces during pipeline execution.
This module provides the TraceRecorder class which is responsible for building ExecutionTrace objects during pipeline execution.
V3 improvements:
- Chain stack for tracking operator chain through execution
- Branch stack for automatic branch path management
- Proper recording of branch substeps as individual steps
- Support for multi-source artifact tracking
The recorder is designed to be controller-agnostic: it records step execution and artifact creation without knowing about specific controller types.
- Usage:
1. Create a TraceRecorder at the start of pipeline execution
2. Call start_step() when a step begins
3. Call record_artifact() when artifacts are created
4. Call end_step() when a step completes
5. Call finalize() to get the completed trace
- class nirs4all.pipeline.trace.recorder.TraceRecorder(pipeline_uid: str = '', pipeline_id: str = '', metadata: Dict[str, Any] | None = None)
Bases: object
Records execution traces during pipeline execution (V3).
Builds an ExecutionTrace by recording step starts, artifact creations, and step completions. Designed for use within the pipeline executor.
V3 improvements:
- Maintains a chain stack for tracking full operator chain
- Maintains a branch stack for automatic branch path management
- Tracks source index for multi-source pipelines
- Records branch substeps individually
- trace
The ExecutionTrace being built
- current_step
The step currently being executed
- step_start_time
Time when current step started (for duration)
- pipeline_id
Pipeline identifier for chain generation
Example
>>> recorder = TraceRecorder(pipeline_uid="0001_pls_abc123")
>>> recorder.start_step(step_index=1, operator_type="transform", operator_class="SNV")
>>> recorder.record_artifact(artifact_id="0001$abc123:all", chain_path="s1.SNV")
>>> recorder.end_step()
>>> recorder.enter_branch(0)
>>> recorder.start_step(step_index=3, operator_type="transform", operator_class="PLS")
>>> recorder.record_artifact(artifact_id="0001$def456:0", chain_path="s1.SNV>s3.PLS[br=0]")
>>> recorder.end_step(is_model=True)
>>> recorder.exit_branch()
>>> trace = recorder.finalize(preprocessing_chain="SNV>MinMax")
- add_step_metadata(key: str, value: Any) -> None
Add metadata to the current step.
- Parameters:
key – Metadata key
value – Metadata value
- build_chain_for_artifact(step_index: int, operator_class: str, source_index: int | None = None, fold_id: int | None = None, substep_index: int | None = None) -> OperatorChain
Build an operator chain for an artifact.
Creates a chain based on current context plus the specified operator.
- Parameters:
step_index – Step index of the operator
operator_class – Class name of the operator
source_index – Source index for multi-source pipelines
fold_id – Fold ID for CV models
substep_index – Substep index within step
- Returns:
OperatorChain for the artifact
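The idea of "current context plus the specified operator" can be sketched in a few lines. This is a minimal illustration, not the library's implementation: `Node`, `build_chain_path`, and the comma-joined rendering of nested branch paths are hypothetical, and the string format is only inferred from the `chain_path` values shown in the class example (`"s1.SNV"`, `"s1.SNV>s3.PLS[br=0]"`).

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass(frozen=True)
class Node:
    """Hypothetical stand-in for an operator node (not the library's OperatorNode)."""
    step_index: int
    operator_class: str
    branch_path: Tuple[int, ...] = ()

    def key(self) -> str:
        # Assumed format, inferred from the strings "s1.SNV" and "s3.PLS[br=0]".
        base = f"s{self.step_index}.{self.operator_class}"
        if self.branch_path:
            # Joining nested branch indices with "," is an assumption.
            base += "[br=" + ",".join(str(b) for b in self.branch_path) + "]"
        return base


def build_chain_path(context: List[Node], step_index: int, operator_class: str,
                     branch_path: Tuple[int, ...] = ()) -> str:
    """Append one operator to a copy of the context chain and render the path."""
    nodes = list(context) + [Node(step_index, operator_class, branch_path)]
    return ">".join(node.key() for node in nodes)


context = [Node(1, "SNV")]
path = build_chain_path(context, 3, "PLS", branch_path=(0,))
print(path)  # s1.SNV>s3.PLS[br=0]
```

The context list is copied rather than mutated, which mirrors the documented behaviour of building a chain without changing the recorder's own stack.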
- current_branch_path() -> List[int]
Get current branch path.
- Returns:
Copy of current branch path
- current_chain() -> OperatorChain
Get current operator chain without modifying stack.
- Returns:
Current OperatorChain
- end_step(is_model: bool = False, fold_weights: Dict[int, float] | None = None, skip_trace: bool = False) -> None
End the current step and add it to the trace.
- Parameters:
is_model – Whether this is the model step
fold_weights – Per-fold weights for CV models
skip_trace – If True, don’t add this step to the trace
- enter_branch(branch_id: int) -> List[int]
Enter a branch context.
- Parameters:
branch_id – Branch index to enter
- Returns:
New branch path after entering
- exit_branch() -> List[int]
Exit current branch context.
- Returns:
The exited branch path
- Raises:
RuntimeError – If not in a branch context
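The enter/exit contract described above behaves like a simple stack of branch indices. The sketch below is a toy model under stated assumptions, not nirs4all code: `BranchStack` is a hypothetical class, and it assumes that "the exited branch path" means the full path as it was just before leaving the branch.

```python
from typing import List


class BranchStack:
    """Toy model of a branch path stack (hypothetical, not the library's class)."""

    def __init__(self) -> None:
        self._path: List[int] = []

    def enter_branch(self, branch_id: int) -> List[int]:
        # Push the branch index and return a copy of the new path.
        self._path.append(branch_id)
        return list(self._path)

    def exit_branch(self) -> List[int]:
        # Leaving a branch that was never entered is an error.
        if not self._path:
            raise RuntimeError("exit_branch() called outside a branch context")
        exited = list(self._path)  # assumed: the path before popping
        self._path.pop()
        return exited

    def current_branch_path(self) -> List[int]:
        # Return a copy so callers cannot mutate internal state.
        return list(self._path)


stack = BranchStack()
outer = stack.enter_branch(0)            # [0]
inner = stack.enter_branch(2)            # [0, 2]
exited = stack.exit_branch()             # [0, 2]
remaining = stack.current_branch_path()  # [0]
```

Returning copies keeps a caller's saved path stable even if further branches are entered afterwards.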
- finalize(preprocessing_chain: str | None = None, metadata: Dict[str, Any] | None = None) -> ExecutionTrace
Finalize and return the completed trace.
- Parameters:
preprocessing_chain – Summary string of preprocessing
metadata – Additional metadata to merge
- Returns:
The completed ExecutionTrace
- get_current_step_index() -> int | None
Get the current step index.
- Returns:
Current step index or None if no step active
- has_model_step() -> bool
Check if a model step has been recorded.
- Returns:
True if model step index is set
- mark_step_skipped(step_index: int) -> None
Record that a step was skipped.
- Parameters:
step_index – Index of the skipped step
- pop_chain() -> OperatorChain
Pop and return the current chain.
- Returns:
The popped OperatorChain
- Raises:
RuntimeError – If trying to pop the root chain
- push_chain(node: OperatorNode) -> OperatorChain
Push new node onto the chain stack.
Creates a new chain with the node appended and pushes it.
- Parameters:
node – OperatorNode to append
- Returns:
The new extended chain
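The push/pop behaviour, including the guard on the root chain, can be illustrated with a small stand-in. This is a sketch under assumptions, not the recorder's actual code: `ChainStack` is hypothetical, a chain is modelled as a tuple of strings instead of an `OperatorChain`, and the root is assumed to be an empty chain.

```python
from typing import List, Tuple

Chain = Tuple[str, ...]  # hypothetical stand-in for OperatorChain


class ChainStack:
    """Toy chain stack whose bottom entry (the root chain) can never be popped."""

    def __init__(self) -> None:
        self._stack: List[Chain] = [()]  # assumed: the root is an empty chain

    def current_chain(self) -> Chain:
        # Peek at the top of the stack without modifying it.
        return self._stack[-1]

    def push_chain(self, node: str) -> Chain:
        # Build a new chain with the node appended, then push it.
        extended = self._stack[-1] + (node,)
        self._stack.append(extended)
        return extended

    def pop_chain(self) -> Chain:
        # Only the root chain left: refuse to pop it.
        if len(self._stack) <= 1:
            raise RuntimeError("cannot pop the root chain")
        return self._stack.pop()


chains = ChainStack()
chains.push_chain("s1.SNV")
top = chains.push_chain("s3.PLS")   # ("s1.SNV", "s3.PLS")
popped = chains.pop_chain()         # ("s1.SNV", "s3.PLS")
current = chains.current_chain()    # ("s1.SNV",)
```

Because each push stores a new immutable tuple, earlier chains remain valid after later pushes and pops.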
- record_artifact(artifact_id: str, is_primary: bool = False, fold_id: int | None = None, chain_path: str | None = None, branch_path: List[int] | None = None, source_index: int | None = None, metadata: Dict[str, Any] | None = None) -> None
Record an artifact created during the current step (V3).
- Parameters:
artifact_id – The artifact ID
is_primary – Whether this is the primary artifact
fold_id – CV fold ID if fold-specific artifact
chain_path – V3 operator chain path
branch_path – Branch path for indexing
source_index – Source index for multi-source pipelines
metadata – Additional artifact metadata
- record_input_shapes(input_shape: tuple | None = None, features_shape: List[tuple] | None = None) -> None
Record input shapes for the current step.
- Parameters:
input_shape – 2D layout shape (samples, features)
features_shape – List of 3D shapes per source (samples, processings, features)
- record_output_shapes(output_shape: tuple | None = None, features_shape: List[tuple] | None = None) -> None
Record output shapes for the current step.
- Parameters:
output_shape – 2D layout shape (samples, features)
features_shape – List of 3D shapes per source (samples, processings, features)
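To make the two shape arguments concrete, the sketch below derives a 2D shape from a list of per-source 3D shapes. The relationship used here is an assumption for illustration only: it supposes that the 2D layout flattens processings and features within each source and concatenates sources along the feature axis. The helper `flattened_2d_shape` is hypothetical and not part of the documented API.

```python
from typing import List, Tuple

Shape3D = Tuple[int, int, int]  # (samples, processings, features)


def flattened_2d_shape(features_shape: List[Shape3D]) -> Tuple[int, int]:
    """Derive (samples, features) from per-source 3D shapes (assumed layout rule)."""
    if not features_shape:
        raise ValueError("at least one source shape is required")
    samples = features_shape[0][0]
    if any(shape[0] != samples for shape in features_shape):
        raise ValueError("all sources must share the same sample count")
    # Each source contributes processings * features columns.
    width = sum(processings * features for _, processings, features in features_shape)
    return (samples, width)


features_shape = [(100, 2, 500), (100, 1, 30)]
input_shape = flattened_2d_shape(features_shape)
print(input_shape)  # (100, 1030)
```

Values shaped like these two are what a caller would pass as `input_shape` and `features_shape` (or `output_shape` and `features_shape`) when recording a step.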
- reset_chain_to(chain: OperatorChain) -> None
Reset chain stack to a specific chain.
Useful when entering a new branch context.
- Parameters:
chain – Chain to reset to
- start_branch_step(step_index: int, branch_count: int, operator_config: Dict[str, Any] | None = None) -> ExecutionStep
Start recording a branch step.
- Parameters:
step_index – Step index of the branch
branch_count – Number of branches
operator_config – Branch configuration
- Returns:
The created ExecutionStep for the branch
- start_branch_substep(parent_step_index: int, branch_id: int, operator_type: str, operator_class: str, substep_index: int = 0, operator_config: Dict[str, Any] | None = None, branch_name: str | None = None) -> ExecutionStep
Start recording a substep within a branch.
Note: This method assumes enter_branch() has already been called for this branch, so current_branch_path() already includes the branch_id.
- Parameters:
parent_step_index – Parent branch step index
branch_id – Branch index this substep belongs to (for metadata only)
operator_type – Type of operator
operator_class – Class name of operator
substep_index – Index within the branch’s substeps
operator_config – Operator configuration
branch_name – Human-readable branch name
- Returns:
The created ExecutionStep
- start_step(step_index: int, operator_type: str = '', operator_class: str = '', operator_config: Dict[str, Any] | None = None, execution_mode: StepExecutionMode = StepExecutionMode.TRAIN, branch_path: List[int] | None = None, branch_name: str = '', source_count: int = 1, produces_branches: bool = False, substep_index: int | None = None) -> ExecutionStep
Start recording a new step (V3).
- Parameters:
step_index – 1-based step index
operator_type – Type of operator (e.g., “transform”, “model”)
operator_class – Class name of operator
operator_config – Serialized operator configuration
execution_mode – Train/predict/skip mode
branch_path – Branch indices (uses current if None)
branch_name – Human-readable branch name
source_count – Number of X sources at this step
produces_branches – Whether this is a branch operator
substep_index – Substep index within the step (None if the step has no substeps)
- Returns:
The created ExecutionStep