nirs4all.pipeline.trace.recorder module

Trace Recorder V3 - Records execution traces during pipeline execution.

This module provides the TraceRecorder class which is responsible for building ExecutionTrace objects during pipeline execution.

V3 improvements:
  • Chain stack for tracking the operator chain through execution

  • Branch stack for automatic branch path management

  • Proper recording of branch substeps as individual steps

  • Support for multi-source artifact tracking

The recorder is designed to be controller-agnostic: it records step execution and artifact creation without knowing about specific controller types.

Usage:
  1. Create a TraceRecorder at the start of pipeline execution

  2. Call start_step() when a step begins

  3. Call record_artifact() when artifacts are created

  4. Call end_step() when a step completes

  5. Call finalize() to get the completed trace
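
The five usage steps above can be sketched with a small stand-in class. This is a minimal illustration, not the library's implementation: ToyRecorder and ToyStep are hypothetical names, the methods take only a subset of the documented parameters, and raising when record_artifact() is called outside a step is a choice made for this sketch.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class ToyStep:
    """Hypothetical stand-in for ExecutionStep."""
    step_index: int
    operator_class: str
    artifacts: List[str] = field(default_factory=list)
    duration_s: float = 0.0


class ToyRecorder:
    """Hypothetical stand-in that mirrors the five usage steps."""

    def __init__(self, pipeline_uid: str = "") -> None:
        self.pipeline_uid = pipeline_uid
        self.steps: List[ToyStep] = []
        self.current_step: Optional[ToyStep] = None
        self.step_start_time: Optional[float] = None

    def start_step(self, step_index: int, operator_class: str = "") -> ToyStep:
        # Remember the step and the time it started (used for the duration).
        self.current_step = ToyStep(step_index, operator_class)
        self.step_start_time = time.perf_counter()
        return self.current_step

    def record_artifact(self, artifact_id: str) -> None:
        # Sketch-only choice: artifacts must belong to an active step.
        if self.current_step is None:
            raise RuntimeError("record_artifact() called outside a step")
        self.current_step.artifacts.append(artifact_id)

    def end_step(self, skip_trace: bool = False) -> None:
        if self.current_step is None or self.step_start_time is None:
            return
        self.current_step.duration_s = time.perf_counter() - self.step_start_time
        if not skip_trace:
            self.steps.append(self.current_step)
        self.current_step = None
        self.step_start_time = None

    def finalize(self) -> Dict[str, Any]:
        # The real method returns an ExecutionTrace; a dict is used here.
        return {"pipeline_uid": self.pipeline_uid, "steps": list(self.steps)}


recorder = ToyRecorder(pipeline_uid="0001_pls_abc123")   # 1. create
recorder.start_step(step_index=1, operator_class="SNV")  # 2. step begins
recorder.record_artifact("0001$abc123:all")              # 3. artifact created
recorder.end_step()                                      # 4. step completes
trace = recorder.finalize()                              # 5. completed trace
```

Keeping the active step in a single current_step slot is what makes the start/record/end ordering matter: an artifact is always attributed to whichever step is open at the time.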

class nirs4all.pipeline.trace.recorder.TraceRecorder(pipeline_uid: str = '', pipeline_id: str = '', metadata: Dict[str, Any] | None = None)

Bases: object

Records execution traces during pipeline execution (V3).

Builds an ExecutionTrace by recording step starts, artifact creations, and step completions. Designed for use within the pipeline executor.

V3 improvements:
  • Maintains a chain stack for tracking the full operator chain

  • Maintains a branch stack for automatic branch path management

  • Tracks source index for multi-source pipelines

  • Records branch substeps individually

trace

The ExecutionTrace being built

current_step

The step currently being executed

step_start_time

Time when current step started (for duration)

pipeline_id

Pipeline identifier for chain generation

Example

>>> recorder = TraceRecorder(pipeline_uid="0001_pls_abc123")
>>> recorder.start_step(step_index=1, operator_type="transform", operator_class="SNV")
>>> recorder.record_artifact(artifact_id="0001$abc123:all", chain_path="s1.SNV")
>>> recorder.end_step()
>>> recorder.enter_branch(0)
>>> recorder.start_step(step_index=3, operator_type="model", operator_class="PLS")
>>> recorder.record_artifact(artifact_id="0001$def456:0", chain_path="s1.SNV>s3.PLS[br=0]")
>>> recorder.end_step(is_model=True)
>>> recorder.exit_branch()
>>> trace = recorder.finalize(preprocessing_chain="SNV>MinMax")

add_step_metadata(key: str, value: Any) -> None

Add metadata to the current step.

Parameters:
  • key – Metadata key

  • value – Metadata value

build_chain_for_artifact(step_index: int, operator_class: str, source_index: int | None = None, fold_id: int | None = None, substep_index: int | None = None) -> OperatorChain

Build an operator chain for an artifact.

Creates a chain based on current context plus the specified operator.

Parameters:
  • step_index – Step index of the operator

  • operator_class – Class name of the operator

  • source_index – Source index for multi-source

  • fold_id – Fold ID for CV models

  • substep_index – Substep index within step

Returns:

OperatorChain for the artifact
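
To make the idea of "current context plus the specified operator" concrete, the sketch below formats a chain path string in the shape used by the class example ("s1.SNV" and "s1.SNV>s3.PLS[br=0]"). The function format_chain_path is hypothetical and is not part of the module; it only covers step index, operator class, and a single branch index, because this page does not show how source, fold, or substep qualifiers are encoded.

```python
from typing import List, Optional, Tuple

# Hypothetical node representation: (step_index, operator_class).
Node = Tuple[int, str]


def format_chain_path(nodes: List[Node], branch_id: Optional[int] = None) -> str:
    """Join nodes as 's<step>.<class>' with '>' and append an optional branch tag.

    Assumption: the '[br=N]' suffix follows the last node, which matches the
    single branched example shown on this page.
    """
    path = ">".join(f"s{step_index}.{operator_class}" for step_index, operator_class in nodes)
    if branch_id is not None:
        path += f"[br={branch_id}]"
    return path


context = [(1, "SNV")]                # chain recorded so far
with_model = context + [(3, "PLS")]   # context plus the new operator

plain_path = format_chain_path(context)
branched_path = format_chain_path(with_model, branch_id=0)
```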

current_branch_path() -> List[int]

Get current branch path.

Returns:

Copy of current branch path

current_chain() -> OperatorChain

Get current operator chain without modifying stack.

Returns:

Current OperatorChain

end_step(is_model: bool = False, fold_weights: Dict[int, float] | None = None, skip_trace: bool = False) -> None

End the current step and add it to the trace.

Parameters:
  • is_model – Whether this is the model step

  • fold_weights – Per-fold weights for CV models

  • skip_trace – If True, don’t add this step to the trace

enter_branch(branch_id: int) -> List[int]

Enter a branch context.

Parameters:

branch_id – Branch index to enter

Returns:

New branch path after entering

exit_branch() -> List[int]

Exit current branch context.

Returns:

The exited branch path

Raises:

RuntimeError – If not in a branch context
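
The branch-related methods (enter_branch(), exit_branch(), current_branch_path(), in_branch()) behave like operations on a stack of branch indices. The following is a minimal sketch of that behavior as documented here; ToyBranchStack is a hypothetical class, and the internal list is an assumption about how such a stack could be stored.

```python
from typing import List


class ToyBranchStack:
    """Hypothetical stand-in for the recorder's branch stack."""

    def __init__(self) -> None:
        self._path: List[int] = []

    def enter_branch(self, branch_id: int) -> List[int]:
        # Returns the new branch path after entering.
        self._path.append(branch_id)
        return list(self._path)

    def exit_branch(self) -> List[int]:
        # Returns the path that was just exited; fails outside a branch.
        if not self._path:
            raise RuntimeError("Not in a branch context")
        exited = list(self._path)
        self._path.pop()
        return exited

    def current_branch_path(self) -> List[int]:
        # A copy, so callers cannot mutate the internal stack.
        return list(self._path)

    def in_branch(self) -> bool:
        return bool(self._path)


branches = ToyBranchStack()
outer = branches.enter_branch(0)   # [0]
inner = branches.enter_branch(2)   # [0, 2] (nested branch)
exited = branches.exit_branch()    # [0, 2]
remaining = branches.current_branch_path()
```

Returning copies from every method keeps a caller's stored branch path stable even after later enter/exit calls.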

finalize(preprocessing_chain: str | None = None, metadata: Dict[str, Any] | None = None) -> ExecutionTrace

Finalize and return the completed trace.

Parameters:
  • preprocessing_chain – Summary string of preprocessing

  • metadata – Additional metadata to merge

Returns:

The completed ExecutionTrace

get_current_step_index() -> int | None

Get the current step index.

Returns:

Current step index or None if no step active

has_model_step() -> bool

Check if a model step has been recorded.

Returns:

True if model step index is set

in_branch() -> bool

Check if currently in a branch context.

Returns:

True if in a branch

mark_step_skipped(step_index: int) -> None

Record that a step was skipped.

Parameters:

step_index – Index of the skipped step

pop_chain() -> OperatorChain

Pop and return the current chain.

Returns:

The popped OperatorChain

Raises:

RuntimeError – If trying to pop the root chain

push_chain(node: OperatorNode) -> OperatorChain

Push new node onto the chain stack.

Creates a new chain with the node appended and pushes it.

Parameters:

node – OperatorNode to append

Returns:

The new extended chain
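
The chain stack methods (current_chain(), push_chain(), pop_chain(), reset_chain_to()) can be pictured as a stack of immutable chains with a root entry that is never popped. The sketch below is an illustration under stated assumptions: ToyChainStack is hypothetical, a chain is modeled as a tuple of strings rather than an OperatorChain of OperatorNode objects, the root chain is assumed to be empty, and reset_chain_to() is assumed to replace the whole stack with the given chain.

```python
from typing import List, Tuple

# Hypothetical immutable stand-in for OperatorChain.
Chain = Tuple[str, ...]


class ToyChainStack:
    """Hypothetical stand-in for the recorder's chain stack."""

    def __init__(self) -> None:
        self._stack: List[Chain] = [()]  # assumed: the root chain is empty

    def current_chain(self) -> Chain:
        # Peek only; the stack is left untouched.
        return self._stack[-1]

    def push_chain(self, node: str) -> Chain:
        # Build a new chain with the node appended, then push it.
        extended = self._stack[-1] + (node,)
        self._stack.append(extended)
        return extended

    def pop_chain(self) -> Chain:
        if len(self._stack) == 1:
            raise RuntimeError("Cannot pop the root chain")
        return self._stack.pop()

    def reset_chain_to(self, chain: Chain) -> None:
        # Assumed semantics: the given chain becomes the new root.
        self._stack = [chain]


chains = ToyChainStack()
after_snv = chains.push_chain("s1.SNV")
after_pls = chains.push_chain("s3.PLS")
popped = chains.pop_chain()
current = chains.current_chain()
```

Because each push creates a new tuple instead of mutating the previous one, a chain handed out earlier stays valid after later pushes and pops.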

record_artifact(artifact_id: str, is_primary: bool = False, fold_id: int | None = None, chain_path: str | None = None, branch_path: List[int] | None = None, source_index: int | None = None, metadata: Dict[str, Any] | None = None) -> None

Record an artifact created during the current step (V3).

Parameters:
  • artifact_id – The artifact ID

  • is_primary – Whether this is the primary artifact

  • fold_id – CV fold ID if fold-specific artifact

  • chain_path – V3 operator chain path

  • branch_path – Branch path for indexing

  • source_index – Source index for multi-source

  • metadata – Additional artifact metadata

record_input_shapes(input_shape: tuple | None = None, features_shape: List[tuple] | None = None) -> None

Record input shapes for the current step.

Parameters:
  • input_shape – 2D layout shape (samples, features)

  • features_shape – List of 3D shapes per source (samples, processings, features)

record_output_shapes(output_shape: tuple | None = None, features_shape: List[tuple] | None = None) -> None

Record output shapes for the current step.

Parameters:
  • output_shape – 2D layout shape (samples, features)

  • features_shape – List of 3D shapes per source (samples, processings, features)
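
The two shape arguments describe the same data in different layouts. The sketch below shows one plausible relationship between them, assuming the 2D layout concatenates every processing of every source along the feature axis; this page does not state that rule, so treat it as an assumption. The helper flattened_2d_shape is hypothetical.

```python
from typing import List, Tuple

Shape3D = Tuple[int, int, int]  # (samples, processings, features)


def flattened_2d_shape(features_shape: List[Shape3D]) -> Tuple[int, int]:
    """Derive a (samples, features) shape from per-source 3D shapes.

    Assumption: all sources share the sample axis, and processings and
    sources are concatenated along the feature axis.
    """
    if not features_shape:
        raise ValueError("at least one source shape is required")
    sample_counts = {samples for samples, _, _ in features_shape}
    if len(sample_counts) != 1:
        raise ValueError("all sources must share the same sample count")
    n_samples = sample_counts.pop()
    n_features = sum(processings * features for _, processings, features in features_shape)
    return (n_samples, n_features)


# Two sources: 2 processings x 500 features, and 1 processing x 30 features.
features_shape = [(100, 2, 500), (100, 1, 30)]
input_shape = flattened_2d_shape(features_shape)
```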

reset_chain_to(chain: OperatorChain) -> None

Reset chain stack to a specific chain.

Useful when entering a new branch context.

Parameters:

chain – Chain to reset to

start_branch_step(step_index: int, branch_count: int, operator_config: Dict[str, Any] | None = None) -> ExecutionStep

Start recording a branch step.

Parameters:
  • step_index – Step index of the branch

  • branch_count – Number of branches

  • operator_config – Branch configuration

Returns:

The created ExecutionStep for the branch

start_branch_substep(parent_step_index: int, branch_id: int, operator_type: str, operator_class: str, substep_index: int = 0, operator_config: Dict[str, Any] | None = None, branch_name: str | None = None) -> ExecutionStep

Start recording a substep within a branch.

Note: This method assumes enter_branch() has already been called for this branch, so current_branch_path() already includes the branch_id.

Parameters:
  • parent_step_index – Parent branch step index

  • branch_id – Branch index this substep belongs to (for metadata only)

  • operator_type – Type of operator

  • operator_class – Class name of operator

  • substep_index – Index within the branch’s substeps

  • operator_config – Operator configuration

  • branch_name – Human-readable branch name

Returns:

The created ExecutionStep
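
The note above implies a call order: enter_branch() first, then start_branch_substep() for each operator in that branch, then exit_branch(). A possible driver loop is sketched below using only the method names and keyword arguments documented on this page. The function record_branch_substeps and the CallLog fake are hypothetical, and closing the parent branch step with end_step() before the substeps start is an assumption, not documented behavior.

```python
from typing import Any, List, Sequence, Tuple

# Hypothetical operator description: (operator_type, operator_class).
Operator = Tuple[str, str]


def record_branch_substeps(recorder: Any, step_index: int,
                           branches: Sequence[Sequence[Operator]]) -> None:
    """Record one branch step, then each branch's operators as substeps."""
    recorder.start_branch_step(step_index=step_index, branch_count=len(branches))
    recorder.end_step()  # assumption: the parent step is closed before substeps
    for branch_id, operators in enumerate(branches):
        recorder.enter_branch(branch_id)  # must precede start_branch_substep()
        try:
            for substep_index, (operator_type, operator_class) in enumerate(operators):
                recorder.start_branch_substep(
                    parent_step_index=step_index,
                    branch_id=branch_id,
                    operator_type=operator_type,
                    operator_class=operator_class,
                    substep_index=substep_index,
                )
                recorder.end_step()
        finally:
            recorder.exit_branch()  # always leave the branch context


class CallLog:
    """Hypothetical fake recorder that only logs method names and kwargs."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, tuple, dict]] = []

    def __getattr__(self, name: str):
        def method(*args: Any, **kwargs: Any) -> None:
            self.calls.append((name, args, kwargs))
        return method


log = CallLog()
record_branch_substeps(log, step_index=2, branches=[[("transform", "SNV")],
                                                    [("transform", "MSC")]])
call_names = [name for name, _, _ in log.calls]
```

The try/finally around each branch keeps the branch stack balanced even if recording a substep raises.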

start_step(step_index: int, operator_type: str = '', operator_class: str = '', operator_config: Dict[str, Any] | None = None, execution_mode: StepExecutionMode = StepExecutionMode.TRAIN, branch_path: List[int] | None = None, branch_name: str = '', source_count: int = 1, produces_branches: bool = False, substep_index: int | None = None) -> ExecutionStep

Start recording a new step (V3).

Parameters:
  • step_index – 1-based step index

  • operator_type – Type of operator (e.g., “transform”, “model”)

  • operator_class – Class name of operator

  • operator_config – Serialized operator configuration

  • execution_mode – Train/predict/skip mode

  • branch_path – Branch indices (uses current if None)

  • branch_name – Human-readable branch name

  • source_count – Number of X sources at this step

  • produces_branches – Whether this is a branch operator

  • substep_index – Index of the substep within the step

Returns:

The created ExecutionStep

property trace_id: str

Get the trace ID.

Returns:

Trace ID string