nirs4all.pipeline.trace package
Submodules
- nirs4all.pipeline.trace.execution_trace module
ExecutionStep: step_index, operator_type, operator_class, operator_config, execution_mode, artifacts, branch_path, branch_name, duration_ms, metadata, input_chain_path, output_chain_paths, source_count, produces_branches, substep_index, input_shape, output_shape, input_features_shape, output_features_shape, add_output_chain(), from_dict(), has_artifacts(), to_dict()
ExecutionTrace: trace_id, pipeline_uid, created_at, steps, model_step_index, fold_weights, preprocessing_chain, metadata, add_step(), finalize(), from_dict(), get_artifact_ids(), get_artifacts_by_step(), get_fold_artifact_ids(), get_model_artifact_id(), get_step(), get_steps_before(), get_steps_up_to_model(), set_model_step(), to_dict()
StepArtifacts: artifact_ids, primary_artifact_id, fold_artifact_ids, primary_artifacts, by_branch, by_source, by_chain, metadata, add_artifact(), add_fold_artifact(), from_dict(), get_artifact_by_chain(), get_artifacts_for_branch(), get_artifacts_for_source(), merge(), to_dict()
StepExecutionMode
- nirs4all.pipeline.trace.extractor module
MinimalPipeline: trace_id, pipeline_uid, steps, artifact_map, model_step_index, fold_weights, preprocessing_chain, metadata, get_all_chain_paths(), get_artifact_by_chain(), get_artifact_ids(), get_artifacts_for_step(), get_step(), get_step_count(), get_step_indices(), has_step()
MinimalPipelineStep: step_index, step_config, execution_mode, artifacts, operator_type, operator_class, branch_path, branch_name, depends_on, substep_index, get_artifact_by_chain(), get_artifact_ids(), get_artifacts_by_chain(), has_artifacts()
TraceBasedExtractor: include_skipped, preserve_order, extract(), extract_for_branch(), extract_for_branch_name(), extract_for_step(), get_required_artifact_ids(), get_step_dependency_graph(), validate_trace_for_prediction()
- nirs4all.pipeline.trace.recorder module
TraceRecorder: trace, current_step, step_start_time, pipeline_id, trace_id, add_step_metadata(), build_chain_for_artifact(), current_branch_path(), current_chain(), end_step(), enter_branch(), exit_branch(), finalize(), get_current_step_index(), has_model_step(), in_branch(), mark_step_skipped(), pop_chain(), push_chain(), record_artifact(), record_input_shapes(), record_output_shapes(), reset_chain_to(), start_branch_step(), start_branch_substep(), start_step()
Module contents
Execution Trace module for nirs4all pipeline (V3).
This module provides data structures and utilities for recording the exact execution path through a pipeline, enabling deterministic prediction replay.
V3 improvements:
- OperatorChain tracking for complete execution path
- Branch and source indexes for artifact lookup
- Chain-based artifact identification
- Proper recording of branch substeps
- Key Components:
ExecutionTrace: Complete trace of a pipeline execution path
ExecutionStep: Record of a single step’s execution with chain tracking
StepArtifacts: Artifacts produced by a single step with V3 indexes
TraceRecorder: Records traces with chain/branch stacks
TraceBasedExtractor: Extracts minimal pipeline from trace
MinimalPipeline: Minimal pipeline ready for prediction replay
- Design Principles:
Controller-Agnostic: Works with any controller type
Deterministic: Same chain -> same artifacts
Complete: Full execution path tracking
Composable: Same infrastructure for predict, retrain, transfer, export
- Usage:
>>> from nirs4all.pipeline.trace import TraceRecorder, ExecutionTrace
>>>
>>> # During training with V3 chain tracking
>>> recorder = TraceRecorder(pipeline_uid="0001_pls_abc123")
>>> recorder.start_step(step_index=1, operator_type="transform", operator_class="SNV")
>>> chain = recorder.build_chain_for_artifact(1, "SNV")
>>> recorder.record_artifact("0001$abc123:all", chain_path=chain.to_path())
>>> recorder.end_step()
>>> trace = recorder.finalize()
- class nirs4all.pipeline.trace.ExecutionStep(step_index: int, operator_type: str = '', operator_class: str = '', operator_config: Dict[str, Any] = <factory>, execution_mode: StepExecutionMode = StepExecutionMode.TRAIN, artifacts: StepArtifacts = <factory>, branch_path: List[int] = <factory>, branch_name: str = '', duration_ms: float = 0.0, metadata: Dict[str, Any] = <factory>, input_chain_path: str = '', output_chain_paths: List[str] = <factory>, source_count: int = 1, produces_branches: bool = False, substep_index: int | None = None, input_shape: Tuple[int, int] | None = None, output_shape: Tuple[int, int] | None = None, input_features_shape: List[Tuple[int, int, int]] | None = None, output_features_shape: List[Tuple[int, int, int]] | None = None)[source]
Bases: object
Record of a single step’s execution in the trace (V3).
Captures all information needed to replay this step during prediction, including operator configuration, execution mode, and produced artifacts.
V3 additions:
- input_chain_path: Operator chain up to this step’s input
- output_chain_paths: Chains produced by this step (for branching)
- source_count: Number of X sources at this step
- produces_branches: Whether this is a branch operator
- execution_mode
How the step was executed (train/predict/skip)
- artifacts
Artifacts produced by this step
- add_output_chain(chain_path: str) None[source]
Add an output chain path to this step.
- Parameters:
chain_path – Operator chain path to add
- artifacts: StepArtifacts
- execution_mode: StepExecutionMode = 'train'
- classmethod from_dict(data: Dict[str, Any]) ExecutionStep[source]
Create ExecutionStep from dictionary.
- Parameters:
data – Dictionary from manifest
- Returns:
ExecutionStep instance
- class nirs4all.pipeline.trace.ExecutionTrace(trace_id: str = <factory>, pipeline_uid: str = '', created_at: str = <factory>, steps: List[ExecutionStep] = <factory>, model_step_index: int | None = None, fold_weights: Dict[int, float] | None = None, preprocessing_chain: str = '', metadata: Dict[str, Any] = <factory>)[source]
Bases: object
Complete trace of a pipeline execution path.
Records the exact sequence of steps and artifacts that produced a prediction, enabling deterministic replay for prediction, transfer, and export.
The trace is controller-agnostic: it records what happened without encoding specific controller logic, so any controller (existing or custom) can be replayed using the same infrastructure.
- steps
Ordered list of execution steps
- fold_weights
Per-fold weights for CV ensemble (None for single model)
- add_step(step: ExecutionStep) None[source]
Add a step to the trace.
- Parameters:
step – ExecutionStep to add
- finalize(preprocessing_chain: str | None = None, metadata: Dict[str, Any] | None = None) None[source]
Finalize the trace with summary information.
Call this after all steps have been recorded to add summary info.
- Parameters:
preprocessing_chain – Summary string of preprocessing (e.g., “SNV>SG>MinMax”)
metadata – Additional metadata to merge
- classmethod from_dict(data: Dict[str, Any]) ExecutionTrace[source]
Create ExecutionTrace from dictionary.
- Parameters:
data – Dictionary from manifest
- Returns:
ExecutionTrace instance
- get_artifact_ids() List[str][source]
Get all artifact IDs in this trace.
- Returns:
List of all artifact IDs across all steps
- get_artifacts_by_step(step_index: int) StepArtifacts | None[source]
Get artifacts for a specific step.
- Parameters:
step_index – 1-based step index
- Returns:
StepArtifacts or None if step not found
- get_fold_artifact_ids() Dict[int, str][source]
Get per-fold model artifact IDs.
- Returns:
Dictionary of fold_id -> artifact_id
- get_model_artifact_id() str | None[source]
Get the primary model artifact ID.
- Returns:
Model artifact ID or None if no model step
- get_step(step_index: int) ExecutionStep | None[source]
Get a step by its index.
- Parameters:
step_index – 1-based step index to find
- Returns:
ExecutionStep or None if not found
- get_steps_before(step_index: int) List[ExecutionStep][source]
Get all steps before a given step index.
- Parameters:
step_index – 1-based step index (exclusive)
- Returns:
List of steps with step_index < given index
- get_steps_up_to_model() List[ExecutionStep][source]
Get all steps up to and including the model step.
- Returns:
List of steps needed to reproduce the prediction
- set_model_step(step_index: int, fold_weights: Dict[int, float] | None = None) None[source]
Set the model step index and optional fold weights.
- Parameters:
step_index – Index of the model step
fold_weights – Optional per-fold weights for CV
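This page does not describe how fold_weights are consumed at prediction time. A minimal sketch, assuming they drive a weighted average of per-fold predictions; the helper name `combine_fold_predictions`, the equal-weight fallback, and the normalization are illustrative assumptions, not part of nirs4all:

```python
from typing import Dict, List, Optional


def combine_fold_predictions(
    fold_predictions: Dict[int, List[float]],
    fold_weights: Optional[Dict[int, float]] = None,
) -> List[float]:
    """Weighted average of per-fold predictions (hypothetical helper)."""
    if not fold_predictions:
        raise ValueError("no fold predictions to combine")
    # Assumption: missing weights mean every fold counts equally.
    if fold_weights is None:
        fold_weights = {fold_id: 1.0 for fold_id in fold_predictions}
    total = sum(fold_weights[fold_id] for fold_id in fold_predictions)
    if total == 0:
        raise ValueError("fold weights sum to zero")
    n_samples = len(next(iter(fold_predictions.values())))
    combined = [0.0] * n_samples
    for fold_id, preds in fold_predictions.items():
        # Normalize so the weights used always sum to 1.
        share = fold_weights[fold_id] / total
        for i, value in enumerate(preds):
            combined[i] += share * value
    return combined
```

With `fold_weights=None` this reduces to a plain mean over folds, which matches the "None for single model" convention only loosely; a single model would simply have one entry.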
- steps: List[ExecutionStep]
- class nirs4all.pipeline.trace.MinimalPipeline(trace_id: str = '', pipeline_uid: str = '', steps: List[MinimalPipelineStep] = <factory>, artifact_map: Dict[int, StepArtifacts] = <factory>, model_step_index: int | None = None, fold_weights: Dict[int, float] | None = None, preprocessing_chain: str = '', metadata: Dict[str, Any] = <factory>)[source]
Bases: object
Minimal pipeline extracted from an execution trace.
Contains only the steps needed to replay a prediction, with artifact mappings for each step. Used by MinimalPredictor for efficient prediction.
- steps
Ordered list of minimal steps to execute
- artifact_map
Mapping of step_index to the StepArtifacts recorded for that step
- Type:
Dict[int, nirs4all.pipeline.trace.execution_trace.StepArtifacts]
- artifact_map: Dict[int, StepArtifacts]
- get_all_chain_paths() Dict[str, str][source]
Get all artifacts indexed by chain path.
- Returns:
Dict mapping chain_path to artifact_id
- get_artifact_by_chain(chain_path: str) str | None[source]
Get artifact ID by V3 chain path across all steps.
- Parameters:
chain_path – Operator chain path
- Returns:
Artifact ID or None if not found
- get_artifact_ids() List[str][source]
Get all artifact IDs in the minimal pipeline.
- Returns:
List of all artifact IDs across all steps
- get_artifacts_for_step(step_index: int) StepArtifacts | None[source]
Get artifacts for a specific step.
- Parameters:
step_index – 1-based step index
- Returns:
StepArtifacts or None if not found
- get_step(step_index: int) MinimalPipelineStep | None[source]
Get a step by its index.
- Parameters:
step_index – 1-based step index
- Returns:
MinimalPipelineStep or None if not found
- get_step_count() int[source]
Get the number of steps in the minimal pipeline.
- Returns:
Number of steps
- get_step_indices() List[int][source]
Get all step indices in execution order.
- Returns:
List of step indices
- has_step(step_index: int) bool[source]
Check if a step is included in the minimal pipeline.
- Parameters:
step_index – 1-based step index
- Returns:
True if step is included
- steps: List[MinimalPipelineStep]
- class nirs4all.pipeline.trace.MinimalPipelineStep(step_index: int, step_config: Any = None, execution_mode: StepExecutionMode = StepExecutionMode.PREDICT, artifacts: StepArtifacts = <factory>, operator_type: str = '', operator_class: str = '', branch_path: List[int] = <factory>, branch_name: str = '', substep_index: int | None = None, depends_on: Set[int] = <factory>)[source]
Bases: object
A step in the minimal pipeline for prediction replay.
Contains the step configuration and metadata needed to replay the step during prediction, without encoding controller-specific logic.
- step_config
The pipeline step configuration (dict or object)
- Type:
Any
- execution_mode
How to execute this step (train/predict/skip)
- artifacts
Artifacts for this step (from trace)
- artifacts: StepArtifacts
- execution_mode: StepExecutionMode = 'predict'
- get_artifact_by_chain(chain_path: str) str | None[source]
Get artifact ID by V3 chain path.
- Parameters:
chain_path – Operator chain path
- Returns:
Artifact ID or None if not found
- get_artifact_ids() List[str][source]
Get all artifact IDs for this step.
- Returns:
List of artifact IDs
- get_artifacts_by_chain() Dict[str, str][source]
Get all artifacts indexed by chain path.
- Returns:
Dict mapping chain_path to artifact_id
- class nirs4all.pipeline.trace.StepArtifacts(artifact_ids: List[str] = <factory>, primary_artifact_id: str | None = None, fold_artifact_ids: Dict[int, str] = <factory>, primary_artifacts: Dict[str, str] = <factory>, by_branch: Dict[Tuple[int, ...], List[str]] = <factory>, by_source: Dict[int, List[str]] = <factory>, by_chain: Dict[str, str] = <factory>, metadata: Dict[str, Any] = <factory>)[source]
Bases: object
Artifacts produced by a single step (V3).
Records all artifacts created during step execution, with V3 indexes for efficient lookup by chain path, branch, source, and fold.
- add_artifact(artifact_id: str, is_primary: bool = False, chain_path: str | None = None, branch_path: List[int] | None = None, source_index: int | None = None) None[source]
Add an artifact ID to this step’s artifacts (V3).
- Parameters:
artifact_id – The artifact ID to add
is_primary – Whether this is the primary artifact
chain_path – V3 operator chain path
branch_path – Branch path for indexing
source_index – Source index for multi-source indexing
- add_fold_artifact(fold_id: int, artifact_id: str, chain_path: str | None = None, branch_path: List[int] | None = None) None[source]
Add a fold-specific artifact.
- Parameters:
fold_id – CV fold index
artifact_id – Artifact ID for this fold
chain_path – V3 operator chain path
branch_path – Branch path for indexing
- classmethod from_dict(data: Dict[str, Any]) StepArtifacts[source]
Create StepArtifacts from dictionary.
- Parameters:
data – Dictionary from manifest
- Returns:
StepArtifacts instance
- get_artifact_by_chain(chain_path: str) str | None[source]
Get artifact ID by exact chain path match.
- Parameters:
chain_path – Operator chain path
- Returns:
Artifact ID or None if not found
- get_artifacts_for_branch(branch_path: List[int]) List[str][source]
Get artifact IDs matching a branch path.
Includes artifacts from:
- Exact branch match
- Empty branch (shared/pre-branch)
- Parent branches (for nested branches)
- Parameters:
branch_path – Target branch path
- Returns:
List of matching artifact IDs
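The three matching cases above collapse into a single prefix test: an artifact's recorded branch matches when it is a prefix of the target path. A minimal sketch over a `by_branch`-shaped dictionary, assuming that reading of the rule (the function `artifacts_for_branch` is a stand-in written for illustration, not the library's code):

```python
from typing import Dict, List, Tuple


def artifacts_for_branch(
    by_branch: Dict[Tuple[int, ...], List[str]],
    branch_path: List[int],
) -> List[str]:
    """Collect artifact IDs whose recorded branch is a prefix of branch_path."""
    target = tuple(branch_path)
    matches: List[str] = []
    for recorded, artifact_ids in by_branch.items():
        # () is a prefix of every path (shared / pre-branch artifacts),
        # a parent branch is a proper prefix, an exact match is a full prefix.
        if target[: len(recorded)] == recorded:
            matches.extend(artifact_ids)
    return matches
```

Sibling branches never match each other under this rule, because neither path is a prefix of the other.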
- get_artifacts_for_source(source_index: int) List[str][source]
Get artifact IDs for a specific source.
- Parameters:
source_index – Source index to filter
- Returns:
List of artifact IDs for that source
- merge(other: StepArtifacts) None[source]
Merge another StepArtifacts into this one.
Used when multiple substeps share the same step_index and their artifacts need to be combined in the artifact_map.
- Parameters:
other – StepArtifacts to merge into this one
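The exact merge rules are not spelled out here. A minimal sketch on a reduced stand-in class, assuming that artifact IDs are combined without duplicates, that an existing primary artifact is kept, and that the other object's chain entries win on key collisions (all three choices, and the class `ArtifactsSketch`, are assumptions made for illustration):

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ArtifactsSketch:
    """Reduced stand-in for StepArtifacts with only three of its fields."""

    artifact_ids: List[str] = field(default_factory=list)
    primary_artifact_id: Optional[str] = None
    by_chain: Dict[str, str] = field(default_factory=dict)

    def merge(self, other: "ArtifactsSketch") -> None:
        # Assumption: keep insertion order and skip IDs already present.
        for artifact_id in other.artifact_ids:
            if artifact_id not in self.artifact_ids:
                self.artifact_ids.append(artifact_id)
        # Assumption: the first primary artifact seen is retained.
        if self.primary_artifact_id is None:
            self.primary_artifact_id = other.primary_artifact_id
        # Assumption: entries from `other` override on equal chain paths.
        self.by_chain.update(other.by_chain)
```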
- class nirs4all.pipeline.trace.StepExecutionMode(value)[source]
Mode of step execution.
- TRAIN
Step fitted on data (creates new artifacts)
- PREDICT
Step uses pre-fitted artifacts
- SKIP
Step was skipped (no-op)
- PREDICT = 'predict'
- SKIP = 'skip'
- TRAIN = 'train'
- class nirs4all.pipeline.trace.TraceBasedExtractor(include_skipped: bool = False, preserve_order: bool = True)[source]
Bases: object
Extract minimal pipeline from execution trace.
The extractor analyzes an ExecutionTrace to determine which steps are needed for prediction replay and builds a MinimalPipeline with the correct artifact mappings.
The extractor is controller-agnostic: it uses trace metadata to identify steps without encoding knowledge of controller types.
- include_skipped
Whether to include skipped steps in minimal pipeline
- preserve_order
Whether to preserve original step order
Example
>>> extractor = TraceBasedExtractor()
>>> trace = manifest_manager.load_execution_trace(pipeline_uid, trace_id)
>>> minimal = extractor.extract(trace, full_pipeline_steps)
>>> print(f"Minimal pipeline has {minimal.get_step_count()} steps")
- extract(trace: ExecutionTrace, full_pipeline: List[Any] | None = None, up_to_model: bool = True) MinimalPipeline[source]
Extract minimal pipeline from execution trace.
Analyzes the trace to determine which steps are needed for prediction and builds a MinimalPipeline with artifact mappings.
- Parameters:
trace – ExecutionTrace to extract from
full_pipeline – Optional full pipeline steps (for step configs)
up_to_model – If True, only include steps up to model step
- Returns:
MinimalPipeline with steps and artifact mappings
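The interaction between up_to_model and the include_skipped attribute can be pictured as a simple filter over recorded steps. A minimal sketch, assuming steps past the model index are dropped and skipped steps are excluded unless requested; `StepRecord` and `select_steps` are hypothetical names used only for this illustration:

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class StepRecord:
    """Hypothetical reduced step: index plus execution mode string."""

    step_index: int
    execution_mode: str = "train"


def select_steps(
    steps: List[StepRecord],
    model_step_index: Optional[int],
    up_to_model: bool = True,
    include_skipped: bool = False,
) -> List[StepRecord]:
    """Keep the steps a prediction replay would need, in original order."""
    selected: List[StepRecord] = []
    for step in steps:
        past_model = (
            model_step_index is not None and step.step_index > model_step_index
        )
        if up_to_model and past_model:
            continue
        if step.execution_mode == "skip" and not include_skipped:
            continue
        selected.append(step)
    return selected
```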
- extract_for_branch(trace: ExecutionTrace, branch_path: List[int], full_pipeline: List[Any] | None = None) MinimalPipeline[source]
Extract minimal pipeline for a specific branch.
Includes shared steps (before branching) plus branch-specific steps.
- Parameters:
trace – ExecutionTrace to extract from
branch_path – Branch path to extract (e.g., [0] for first branch)
full_pipeline – Optional full pipeline steps
- Returns:
MinimalPipeline with steps for the specified branch
- extract_for_branch_name(trace: ExecutionTrace, branch_name: str, full_pipeline: List[Any] | None = None) MinimalPipeline[source]
Extract minimal pipeline for a specific branch by name.
More reliable than extract_for_branch for nested branches where branch_id doesn’t map directly to branch_path. Uses branch_name for matching since it’s unique and stored in both predictions and trace.
Includes shared steps (before branching) plus branch-specific steps.
- Parameters:
trace – ExecutionTrace to extract from
branch_name – Branch name to match (e.g., “branch_0_branch_0”)
full_pipeline – Optional full pipeline steps
- Returns:
MinimalPipeline with steps for the specified branch
- extract_for_step(trace: ExecutionTrace, target_step_index: int, full_pipeline: List[Any] | None = None) MinimalPipeline[source]
Extract minimal pipeline up to a specific step.
Useful for partial prediction or when targeting a specific model in a multi-model pipeline.
- Parameters:
trace – ExecutionTrace to extract from
target_step_index – Target step index (inclusive)
full_pipeline – Optional full pipeline steps
- Returns:
MinimalPipeline with steps up to target
- get_required_artifact_ids(trace: ExecutionTrace, up_to_model: bool = True) List[str][source]
Get list of artifact IDs required for prediction.
Useful for pre-loading artifacts or validating artifact availability.
- Parameters:
trace – ExecutionTrace to analyze
up_to_model – If True, only include artifacts up to model step
- Returns:
List of artifact IDs needed for prediction
- get_step_dependency_graph(trace: ExecutionTrace) Dict[int, Set[int]][source]
Build dependency graph from execution trace.
The dependency graph maps each step to the set of steps it depends on. This is inferred from the trace execution order and branch structure.
- Parameters:
trace – ExecutionTrace to analyze
- Returns:
Dictionary mapping step_index to set of dependency step indices
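How dependencies are inferred is only summarized as "execution order and branch structure". One plausible reading, sketched here under the assumption that a step depends on every earlier step recorded on the shared trunk or on an ancestor (or the same) branch; the function `build_dependency_graph` and its input layout are assumptions for illustration:

```python
from typing import Dict, List, Set, Tuple


def build_dependency_graph(
    steps: List[Tuple[int, Tuple[int, ...]]],
) -> Dict[int, Set[int]]:
    """Map each step index to the earlier steps it is assumed to depend on.

    `steps` holds (step_index, branch_path) pairs in execution order.
    """
    graph: Dict[int, Set[int]] = {}
    seen: List[Tuple[int, Tuple[int, ...]]] = []
    for step_index, branch_path in steps:
        deps = graph.setdefault(step_index, set())
        for earlier_index, earlier_branch in seen:
            # Assumption: an earlier step feeds this one when its branch
            # path is a prefix of the current step's branch path.
            same_lineage = branch_path[: len(earlier_branch)] == earlier_branch
            if same_lineage and earlier_index != step_index:
                deps.add(earlier_index)
        seen.append((step_index, branch_path))
    return graph
```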
- validate_trace_for_prediction(trace: ExecutionTrace) Tuple[bool, List[str]][source]
Validate that a trace has all information needed for prediction.
Checks that:
- Model step is recorded
- All steps up to model have recorded artifacts (if applicable)
- No critical information is missing
- Parameters:
trace – ExecutionTrace to validate
- Returns:
Tuple of (is_valid, list of issues)
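The (is_valid, issues) return shape lends itself to an accumulate-then-report pattern. A minimal sketch over a plain dictionary, assuming "if applicable" means steps executed in train mode; the dictionary keys and the function `validate_trace` are hypothetical and do not reflect the manifest format:

```python
from typing import Any, Dict, List, Tuple


def validate_trace(trace: Dict[str, Any]) -> Tuple[bool, List[str]]:
    """Return (is_valid, issues) for a dict-shaped trace (illustrative only)."""
    issues: List[str] = []
    model_index = trace.get("model_step_index")
    if model_index is None:
        issues.append("no model step recorded")
    for step in trace.get("steps", []):
        # Steps after the model are irrelevant for prediction replay.
        if model_index is not None and step["step_index"] > model_index:
            continue
        # Assumption: only trained steps are expected to carry artifacts.
        if step.get("execution_mode") == "train" and not step.get("artifact_ids"):
            issues.append(f"step {step['step_index']} has no artifacts")
    return (not issues, issues)
```

Collecting every issue before returning, rather than failing on the first one, lets a caller report all problems with a stored trace at once.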
- class nirs4all.pipeline.trace.TraceRecorder(pipeline_uid: str = '', pipeline_id: str = '', metadata: Dict[str, Any] | None = None)[source]
Bases: object
Records execution traces during pipeline execution (V3).
Builds an ExecutionTrace by recording step starts, artifact creations, and step completions. Designed for use within the pipeline executor.
V3 improvements:
- Maintains a chain stack for tracking full operator chain
- Maintains a branch stack for automatic branch path management
- Tracks source index for multi-source pipelines
- Records branch substeps individually
- trace
The ExecutionTrace being built
- current_step
The step currently being executed
- step_start_time
Time when current step started (for duration)
- pipeline_id
Pipeline identifier for chain generation
Example
>>> recorder = TraceRecorder(pipeline_uid="0001_pls_abc123")
>>> recorder.start_step(step_index=1, operator_type="transform", operator_class="SNV")
>>> recorder.record_artifact(artifact_id="0001$abc123:all", chain_path="s1.SNV")
>>> recorder.end_step()
>>> recorder.enter_branch(0)
>>> recorder.start_step(step_index=3, operator_type="transform", operator_class="PLS")
>>> recorder.record_artifact(artifact_id="0001$def456:0", chain_path="s1.SNV>s3.PLS[br=0]")
>>> recorder.end_step(is_model=True)
>>> recorder.exit_branch()
>>> trace = recorder.finalize(preprocessing_chain="SNV>MinMax")
- add_step_metadata(key: str, value: Any) None[source]
Add metadata to the current step.
- Parameters:
key – Metadata key
value – Metadata value
- build_chain_for_artifact(step_index: int, operator_class: str, source_index: int | None = None, fold_id: int | None = None, substep_index: int | None = None) OperatorChain[source]
Build an operator chain for an artifact.
Creates a chain based on current context plus the specified operator.
- Parameters:
step_index – Step index of the operator
operator_class – Class name of the operator
source_index – Source index for multi-source
fold_id – Fold ID for CV models
substep_index – Substep index within step
- Returns:
OperatorChain for the artifact
- current_branch_path() List[int][source]
Get current branch path.
- Returns:
Copy of current branch path
- current_chain() OperatorChain[source]
Get current operator chain without modifying stack.
- Returns:
Current OperatorChain
- end_step(is_model: bool = False, fold_weights: Dict[int, float] | None = None, skip_trace: bool = False) None[source]
End the current step and add it to the trace.
- Parameters:
is_model – Whether this is the model step
fold_weights – Per-fold weights for CV models
skip_trace – If True, don’t add this step to the trace
- enter_branch(branch_id: int) List[int][source]
Enter a branch context.
- Parameters:
branch_id – Branch index to enter
- Returns:
New branch path after entering
- exit_branch() List[int][source]
Exit current branch context.
- Returns:
The exited branch path
- Raises:
RuntimeError – If not in a branch context
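The enter/exit contract documented above behaves like a stack of branch indices. A minimal stand-alone sketch of that behavior, assuming nested branches simply extend the path; the class `BranchStack` is written for illustration and is not the recorder's implementation:

```python
from typing import List


class BranchStack:
    """Illustrative branch-path stack mirroring the documented contract."""

    def __init__(self) -> None:
        self._path: List[int] = []

    def enter_branch(self, branch_id: int) -> List[int]:
        self._path.append(branch_id)
        # Return a copy so callers cannot mutate the internal state.
        return list(self._path)

    def exit_branch(self) -> List[int]:
        if not self._path:
            raise RuntimeError("exit_branch() called outside a branch context")
        exited = list(self._path)
        self._path.pop()
        return exited

    def in_branch(self) -> bool:
        return bool(self._path)

    def current_branch_path(self) -> List[int]:
        return list(self._path)
```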
- finalize(preprocessing_chain: str | None = None, metadata: Dict[str, Any] | None = None) ExecutionTrace[source]
Finalize and return the completed trace.
- Parameters:
preprocessing_chain – Summary string of preprocessing
metadata – Additional metadata to merge
- Returns:
The completed ExecutionTrace
- get_current_step_index() int | None[source]
Get the current step index.
- Returns:
Current step index or None if no step active
- has_model_step() bool[source]
Check if a model step has been recorded.
- Returns:
True if model step index is set
- mark_step_skipped(step_index: int) None[source]
Record that a step was skipped.
- Parameters:
step_index – Index of the skipped step
- pop_chain() OperatorChain[source]
Pop and return the current chain.
- Returns:
The popped OperatorChain
- Raises:
RuntimeError – If trying to pop the root chain
- push_chain(node: OperatorNode) OperatorChain[source]
Push new node onto the chain stack.
Creates a new chain with the node appended and pushes it.
- Parameters:
node – OperatorNode to append
- Returns:
The new extended chain
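The push/pop pair can be read as a stack of immutable chains with a root that is never removed. A minimal sketch using tuples of node labels; the path format (nodes joined with ">") is inferred only from the "s1.SNV>s3.PLS[br=0]" string in the class example, and `ChainStack` and `to_path` are hypothetical stand-ins for OperatorChain:

```python
from typing import List, Tuple

# A chain is modelled as an immutable tuple of node labels (assumption).
Chain = Tuple[str, ...]


class ChainStack:
    """Illustrative chain stack with an unpoppable empty root chain."""

    def __init__(self) -> None:
        self._stack: List[Chain] = [()]

    def current_chain(self) -> Chain:
        # Read the top of the stack without modifying it.
        return self._stack[-1]

    def push_chain(self, node: str) -> Chain:
        # Build a new chain rather than mutating the current one.
        extended = self._stack[-1] + (node,)
        self._stack.append(extended)
        return extended

    def pop_chain(self) -> Chain:
        if len(self._stack) == 1:
            raise RuntimeError("cannot pop the root chain")
        return self._stack.pop()


def to_path(chain: Chain) -> str:
    """Join node labels with '>' (format inferred from the docstring example)."""
    return ">".join(chain)
```

Because each pushed chain is a new tuple, popping restores the previous chain exactly, which is convenient when leaving a branch.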
- record_artifact(artifact_id: str, is_primary: bool = False, fold_id: int | None = None, chain_path: str | None = None, branch_path: List[int] | None = None, source_index: int | None = None, metadata: Dict[str, Any] | None = None) None[source]
Record an artifact created during the current step (V3).
- Parameters:
artifact_id – The artifact ID
is_primary – Whether this is the primary artifact
fold_id – CV fold ID if fold-specific artifact
chain_path – V3 operator chain path
branch_path – Branch path for indexing
source_index – Source index for multi-source
metadata – Additional artifact metadata
- record_input_shapes(input_shape: tuple | None = None, features_shape: List[tuple] | None = None) None[source]
Record input shapes for the current step.
- Parameters:
input_shape – 2D layout shape (samples, features)
features_shape – List of 3D shapes per source (samples, processings, features)
- record_output_shapes(output_shape: tuple | None = None, features_shape: List[tuple] | None = None) None[source]
Record output shapes for the current step.
- Parameters:
output_shape – 2D layout shape (samples, features)
features_shape – List of 3D shapes per source (samples, processings, features)
- reset_chain_to(chain: OperatorChain) None[source]
Reset chain stack to a specific chain.
Useful when entering a new branch context.
- Parameters:
chain – Chain to reset to
- start_branch_step(step_index: int, branch_count: int, operator_config: Dict[str, Any] | None = None) ExecutionStep[source]
Start recording a branch step.
- Parameters:
step_index – Step index of the branch
branch_count – Number of branches
operator_config – Branch configuration
- Returns:
The created ExecutionStep for the branch
- start_branch_substep(parent_step_index: int, branch_id: int, operator_type: str, operator_class: str, substep_index: int = 0, operator_config: Dict[str, Any] | None = None, branch_name: str | None = None) ExecutionStep[source]
Start recording a substep within a branch.
Note: This method assumes enter_branch() has already been called for this branch, so current_branch_path() already includes the branch_id.
- Parameters:
parent_step_index – Parent branch step index
branch_id – Branch index this substep belongs to (for metadata only)
operator_type – Type of operator
operator_class – Class name of operator
substep_index – Index within the branch’s substeps
operator_config – Operator configuration
branch_name – Human-readable branch name
- Returns:
The created ExecutionStep
- start_step(step_index: int, operator_type: str = '', operator_class: str = '', operator_config: Dict[str, Any] | None = None, execution_mode: StepExecutionMode = StepExecutionMode.TRAIN, branch_path: List[int] | None = None, branch_name: str = '', source_count: int = 1, produces_branches: bool = False, substep_index: int | None = None) ExecutionStep[source]
Start recording a new step (V3).
- Parameters:
step_index – 1-based step index
operator_type – Type of operator (e.g., “transform”, “model”)
operator_class – Class name of operator
operator_config – Serialized operator configuration
execution_mode – Train/predict/skip mode
branch_path – Branch indices (uses current if None)
branch_name – Human-readable branch name
source_count – Number of X sources at this step
produces_branches – Whether this is a branch operator
substep_index – Index of this substep within its parent step (None for a regular step)
- Returns:
The created ExecutionStep