nirs4all.pipeline.trace.extractor module

Trace-Based Extractor - Extract minimal pipeline from execution trace.

This module provides the TraceBasedExtractor class, which extracts the minimal set of pipeline steps needed to replay a prediction from an ExecutionTrace.

The extractor is controller-agnostic: it uses the recorded trace information to identify which steps are needed, without encoding knowledge of specific controller types. This enables the same infrastructure to work with any controller (existing or custom).

Key Features:
  • Extracts minimal pipeline (only steps needed for prediction)

  • Builds artifact map for step-based artifact injection

  • Creates dependency graph for step execution order

  • Supports branch-aware extraction for multi-branch pipelines

Usage:
>>> from nirs4all.pipeline.trace import TraceBasedExtractor, ExecutionTrace
>>>
>>> # Load trace from manifest (manifest_manager, pipeline_uid and trace_id are assumed to be defined by the caller)
>>> trace = manifest_manager.load_execution_trace(pipeline_uid, trace_id)
>>>
>>> # Extract minimal pipeline
>>> extractor = TraceBasedExtractor()
>>> minimal = extractor.extract(trace, full_pipeline_steps)
>>>
>>> # Use minimal pipeline for prediction
>>> for step_info in minimal.steps:
...     artifacts = minimal.get_artifacts_for_step(step_info.step_index)
...     # Execute step with artifacts
class nirs4all.pipeline.trace.extractor.MinimalPipeline(trace_id: str = '', pipeline_uid: str = '', steps: List[MinimalPipelineStep] = <factory>, artifact_map: Dict[int, StepArtifacts] = <factory>, model_step_index: int | None = None, fold_weights: Dict[int, float] | None = None, preprocessing_chain: str = '', metadata: Dict[str, Any] = <factory>)

Bases: object

Minimal pipeline extracted from an execution trace.

Contains only the steps needed to replay a prediction, with artifact mappings for each step. Used by MinimalPredictor for efficient prediction.

trace_id

ID of the source execution trace

Type:

str

pipeline_uid

UID of the parent pipeline

Type:

str

steps

Ordered list of minimal steps to execute

Type:

List[nirs4all.pipeline.trace.extractor.MinimalPipelineStep]

artifact_map

Mapping of step_index to the StepArtifacts recorded for that step

Type:

Dict[int, nirs4all.pipeline.trace.execution_trace.StepArtifacts]

model_step_index

Index of the model step

Type:

int | None

fold_weights

Per-fold weights for CV ensemble

Type:

Dict[int, float] | None

preprocessing_chain

Summary of preprocessing steps

Type:

str

metadata

Additional metadata from trace

Type:

Dict[str, Any]

artifact_map: Dict[int, StepArtifacts]
fold_weights: Dict[int, float] | None = None
get_all_chain_paths() -> Dict[str, str]

Get all artifacts indexed by chain path.

Returns:

Dict mapping chain_path to artifact_id

get_artifact_by_chain(chain_path: str) -> str | None

Get artifact ID by V3 chain path across all steps.

Parameters:

chain_path – Operator chain path

Returns:

Artifact ID or None if not found
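
The two chain-path lookups above can be pictured with a small sketch. It is not the library's implementation: the per-step dictionaries, the path strings and the artifact IDs are all invented for illustration, and the helper names all_chain_paths and artifact_by_chain are hypothetical stand-ins for get_all_chain_paths and get_artifact_by_chain.

```python
from typing import Dict, Optional

# Hypothetical per-step data: step_index -> {chain_path: artifact_id}.
# The path strings and artifact IDs are invented for illustration only.
artifacts_by_step: Dict[int, Dict[str, str]] = {
    1: {"s1.scaler": "art-001"},
    3: {"s3.model.fold0": "art-007", "s3.model.fold1": "art-008"},
}


def all_chain_paths(by_step: Dict[int, Dict[str, str]]) -> Dict[str, str]:
    """Merge the per-step mappings into one chain_path -> artifact_id dict."""
    merged: Dict[str, str] = {}
    for step_index in sorted(by_step):
        merged.update(by_step[step_index])
    return merged


def artifact_by_chain(
    by_step: Dict[int, Dict[str, str]], chain_path: str
) -> Optional[str]:
    """Return the first artifact ID registered under chain_path, else None."""
    for step_index in sorted(by_step):
        artifact_id = by_step[step_index].get(chain_path)
        if artifact_id is not None:
            return artifact_id
    return None
```

A missing path yields None rather than raising, which mirrors the documented "Artifact ID or None if not found" contract.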

get_artifact_ids() -> List[str]

Get all artifact IDs in the minimal pipeline.

Returns:

List of all artifact IDs across all steps

get_artifacts_for_step(step_index: int) -> StepArtifacts | None

Get artifacts for a specific step.

Parameters:

step_index – 1-based step index

Returns:

StepArtifacts or None if not found

get_step(step_index: int) -> MinimalPipelineStep | None

Get a step by its index.

Parameters:

step_index – 1-based step index

Returns:

MinimalPipelineStep or None if not found

get_step_count() -> int

Get the number of steps in the minimal pipeline.

Returns:

Number of steps

get_step_indices() -> List[int]

Get all step indices in execution order.

Returns:

List of step indices

has_step(step_index: int) -> bool

Check if a step is included in the minimal pipeline.

Parameters:

step_index – 1-based step index

Returns:

True if step is included
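
Because step indices are the 1-based indices of the original pipeline, a minimal pipeline may plausibly contain gaps (for example steps 1, 2 and 4). The sketch below illustrates how the step accessors behave in that case. SketchStep and SketchPipeline are hypothetical stand-ins that model only the step index; they are not the real MinimalPipelineStep or MinimalPipeline classes.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class SketchStep:
    """Hypothetical stand-in for MinimalPipelineStep (only the index is modelled)."""
    step_index: int


@dataclass
class SketchPipeline:
    """Hypothetical stand-in for MinimalPipeline's step accessors."""
    steps: List[SketchStep] = field(default_factory=list)

    def get_step(self, step_index: int) -> Optional[SketchStep]:
        # Linear scan by original index; positions in the list are not indices.
        return next((s for s in self.steps if s.step_index == step_index), None)

    def has_step(self, step_index: int) -> bool:
        return self.get_step(step_index) is not None

    def get_step_indices(self) -> List[int]:
        return [s.step_index for s in self.steps]

    def get_step_count(self) -> int:
        return len(self.steps)


# Steps 1, 2 and 4 were kept; step 3 was not needed for prediction.
pipeline = SketchPipeline(steps=[SketchStep(1), SketchStep(2), SketchStep(4)])
```

With this data, get_step(3) returns None even though the pipeline holds three steps, so callers should look steps up by index rather than by list position.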

metadata: Dict[str, Any]
model_step_index: int | None = None
pipeline_uid: str = ''
preprocessing_chain: str = ''
steps: List[MinimalPipelineStep]
trace_id: str = ''
class nirs4all.pipeline.trace.extractor.MinimalPipelineStep(step_index: int, step_config: Any = None, execution_mode: StepExecutionMode = StepExecutionMode.PREDICT, artifacts: StepArtifacts = <factory>, operator_type: str = '', operator_class: str = '', branch_path: List[int] = <factory>, branch_name: str = '', substep_index: int | None = None, depends_on: Set[int] = <factory>)

Bases: object

A step in the minimal pipeline for prediction replay.

Contains the step configuration and metadata needed to replay the step during prediction, without encoding controller-specific logic.

step_index

1-based step index from original pipeline

Type:

int

step_config

The pipeline step configuration (dict or object)

Type:

Any

execution_mode

How to execute this step (train/predict/skip)

Type:

nirs4all.pipeline.trace.execution_trace.StepExecutionMode

artifacts

Artifacts for this step (from trace)

Type:

nirs4all.pipeline.trace.execution_trace.StepArtifacts

operator_type

Type of operator (for logging/debugging)

Type:

str

operator_class

Class name of operator

Type:

str

branch_path

Branch path if in branch context

Type:

List[int]

branch_name

Human-readable branch name

Type:

str

depends_on

Indices of steps this step depends on

Type:

Set[int]

artifacts: StepArtifacts
branch_name: str = ''
branch_path: List[int]
depends_on: Set[int]
execution_mode: StepExecutionMode = StepExecutionMode.PREDICT
get_artifact_by_chain(chain_path: str) -> str | None

Get artifact ID by V3 chain path.

Parameters:

chain_path – Operator chain path

Returns:

Artifact ID or None if not found

get_artifact_ids() -> List[str]

Get all artifact IDs for this step.

Returns:

List of artifact IDs

get_artifacts_by_chain() -> Dict[str, str]

Get all artifacts indexed by chain path.

Returns:

Dict mapping chain_path to artifact_id

has_artifacts() -> bool

Check if this step has associated artifacts.

Returns:

True if artifacts are available for this step

operator_class: str = ''
operator_type: str = ''
step_config: Any = None
step_index: int
substep_index: int | None = None
class nirs4all.pipeline.trace.extractor.TraceBasedExtractor(include_skipped: bool = False, preserve_order: bool = True)

Bases: object

Extract minimal pipeline from execution trace.

The extractor analyzes an ExecutionTrace to determine which steps are needed for prediction replay and builds a MinimalPipeline with the correct artifact mappings.

The extractor is controller-agnostic: it uses trace metadata to identify steps without encoding knowledge of controller types.

include_skipped

Whether to include skipped steps in minimal pipeline

preserve_order

Whether to preserve original step order

Example

>>> extractor = TraceBasedExtractor()
>>> trace = manifest_manager.load_execution_trace(pipeline_uid, trace_id)
>>> minimal = extractor.extract(trace, full_pipeline_steps)
>>> print(f"Minimal pipeline has {minimal.get_step_count()} steps")
extract(trace: ExecutionTrace, full_pipeline: List[Any] | None = None, up_to_model: bool = True) -> MinimalPipeline

Extract minimal pipeline from execution trace.

Analyzes the trace to determine which steps are needed for prediction and builds a MinimalPipeline with artifact mappings.

Parameters:
  • trace – ExecutionTrace to extract from

  • full_pipeline – Optional full pipeline steps (for step configs)

  • up_to_model – If True, only include steps up to model step

Returns:

MinimalPipeline with steps and artifact mappings
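
As a rough illustration of how up_to_model and include_skipped might interact, the sketch below filters a list of recorded steps. It is an assumption-laden simplification, not the extractor's actual logic: TraceStep and select_steps are hypothetical names, and the real ExecutionTrace records carry far more information.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class TraceStep:
    """Hypothetical trace record; the real ExecutionTrace step type may differ."""
    step_index: int
    skipped: bool = False


def select_steps(
    steps: List[TraceStep],
    model_step_index: Optional[int],
    up_to_model: bool = True,
    include_skipped: bool = False,
) -> List[int]:
    """Return the indices of the steps kept for prediction replay."""
    selected: List[int] = []
    for step in sorted(steps, key=lambda s: s.step_index):
        if step.skipped and not include_skipped:
            continue  # drop steps that did nothing during training
        past_model = model_step_index is not None and step.step_index > model_step_index
        if up_to_model and past_model:
            continue  # nothing after the model is needed to predict
        selected.append(step.step_index)
    return selected


recorded = [TraceStep(1), TraceStep(2, skipped=True), TraceStep(3), TraceStep(4)]
```

With the model at step 3, the defaults keep steps 1 and 3; passing up_to_model=False additionally keeps step 4.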

extract_for_branch(trace: ExecutionTrace, branch_path: List[int], full_pipeline: List[Any] | None = None) -> MinimalPipeline

Extract minimal pipeline for a specific branch.

Includes shared steps (before branching) plus branch-specific steps.

Parameters:
  • trace – ExecutionTrace to extract from

  • branch_path – Branch path to extract (e.g., [0] for first branch)

  • full_pipeline – Optional full pipeline steps

Returns:

MinimalPipeline with steps for the specified branch
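
One way to picture "shared steps plus branch-specific steps" is a prefix test on branch paths. The sketch below assumes that shared steps carry an empty branch_path and that a step belongs to the target branch when its path is a prefix of the target path. Both points are assumptions made for illustration, and on_branch is a hypothetical helper, not part of the library.

```python
from typing import List, Tuple


def on_branch(step_path: List[int], target_path: List[int]) -> bool:
    """True if step_path is empty (shared) or a prefix of target_path."""
    return step_path == target_path[: len(step_path)]


# (step_index, branch_path) pairs, invented for illustration.
trace_steps: List[Tuple[int, List[int]]] = [
    (1, []),      # shared preprocessing, before any branching
    (2, [0]),     # first top-level branch
    (3, [1]),     # second top-level branch
    (4, [0, 1]),  # nested branch inside the first branch
    (5, [0, 0]),  # sibling nested branch
]


def steps_for_branch(target_path: List[int]) -> List[int]:
    """Indices of shared steps plus steps on the lineage of target_path."""
    return [index for index, path in trace_steps if on_branch(path, target_path)]
```

Under these assumptions, targeting [0, 1] keeps the shared step, its parent branch step and the nested step, while sibling branches are dropped.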

extract_for_branch_name(trace: ExecutionTrace, branch_name: str, full_pipeline: List[Any] | None = None) -> MinimalPipeline

Extract minimal pipeline for a specific branch by name.

More reliable than extract_for_branch for nested branches, where branch_id does not map directly to branch_path. Matching uses branch_name, which is unique and stored in both the predictions and the trace.

Includes shared steps (before branching) plus branch-specific steps.

Parameters:
  • trace – ExecutionTrace to extract from

  • branch_name – Branch name to match (e.g., "branch_0_branch_0")

  • full_pipeline – Optional full pipeline steps

Returns:

MinimalPipeline with steps for the specified branch

extract_for_step(trace: ExecutionTrace, target_step_index: int, full_pipeline: List[Any] | None = None) -> MinimalPipeline

Extract minimal pipeline up to a specific step.

Useful for partial prediction or when targeting a specific model in a multi-model pipeline.

Parameters:
  • trace – ExecutionTrace to extract from

  • target_step_index – Target step index (inclusive)

  • full_pipeline – Optional full pipeline steps

Returns:

MinimalPipeline with steps up to target

get_required_artifact_ids(trace: ExecutionTrace, up_to_model: bool = True) -> List[str]

Get list of artifact IDs required for prediction.

Useful for pre-loading artifacts or validating artifact availability.

Parameters:
  • trace – ExecutionTrace to analyze

  • up_to_model – If True, only include artifacts up to model step

Returns:

List of artifact IDs needed for prediction
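
A typical use of the returned list is an availability check before prediction starts. The sketch below shows that check in isolation. The required list stands in for the result of get_required_artifact_ids, the on_disk set stands in for whatever artifact store is in use, and find_missing_artifacts is a hypothetical helper; all IDs are invented.

```python
from typing import Iterable, List


def find_missing_artifacts(
    required_ids: Iterable[str], available_ids: Iterable[str]
) -> List[str]:
    """Return required IDs that are absent from the store, in request order."""
    available = set(available_ids)
    return [artifact_id for artifact_id in required_ids if artifact_id not in available]


# Stand-in for extractor.get_required_artifact_ids(trace).
required = ["art-001", "art-007", "art-008"]
# Stand-in for the IDs actually present in the artifact store.
on_disk = {"art-001", "art-008", "art-999"}

missing = find_missing_artifacts(required, on_disk)
```

An empty result means every required artifact can be loaded; otherwise the list names exactly what must be restored first.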

get_step_dependency_graph(trace: ExecutionTrace) -> Dict[int, Set[int]]

Build dependency graph from execution trace.

The dependency graph maps each step to the set of steps it depends on. This is inferred from the trace execution order and branch structure.

Parameters:

trace – ExecutionTrace to analyze

Returns:

Dictionary mapping step_index to set of dependency step indices
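
The exact inference rule is not spelled out here, so the following is only one plausible reading: each step depends on the nearest earlier step that lies on its own branch lineage (an empty path or a prefix of its branch path). The function build_dependency_graph and the sample data are hypothetical and serve only to show the shape of the returned mapping.

```python
from typing import Dict, List, Set, Tuple


def build_dependency_graph(
    steps: List[Tuple[int, List[int]]]
) -> Dict[int, Set[int]]:
    """Map each step_index to the set of step indices it depends on."""
    graph: Dict[int, Set[int]] = {}
    seen: List[Tuple[int, List[int]]] = []  # earlier steps, in execution order
    for step_index, branch_path in steps:
        deps: Set[int] = set()
        # Walk backwards to the nearest earlier step on the same lineage.
        for prev_index, prev_path in reversed(seen):
            if prev_path == branch_path[: len(prev_path)]:
                deps.add(prev_index)
                break
        graph[step_index] = deps
        seen.append((step_index, branch_path))
    return graph


# (step_index, branch_path) pairs in execution order, invented for illustration.
trace_steps = [(1, []), (2, [0]), (3, [1]), (4, [0]), (5, [1])]
graph = build_dependency_graph(trace_steps)
```

In this example both branches hang off the shared step 1, and later steps on each branch depend only on their own branch predecessor.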

validate_trace_for_prediction(trace: ExecutionTrace) -> Tuple[bool, List[str]]

Validate that a trace has all information needed for prediction.

Checks that:
  • Model step is recorded

  • All steps up to model have recorded artifacts (if applicable)

  • No critical information is missing

Parameters:

trace – ExecutionTrace to validate

Returns:

Tuple of (is_valid, list of issues)
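
The (is_valid, issues) return shape lends itself to collecting every problem before reporting. The sketch below shows that pattern for the first two checks listed above. It is not the library's validator: validate_sketch, its parameters and the message strings are hypothetical, and the real method inspects an ExecutionTrace rather than plain dictionaries.

```python
from typing import Dict, List, Optional, Tuple


def validate_sketch(
    model_step_index: Optional[int],
    artifacts_by_step: Dict[int, List[str]],
    steps_needing_artifacts: List[int],
) -> Tuple[bool, List[str]]:
    """Collect all issues instead of stopping at the first one."""
    issues: List[str] = []
    if model_step_index is None:
        issues.append("model step is not recorded")
    for step_index in steps_needing_artifacts:
        # Steps after the model are irrelevant for prediction replay.
        if model_step_index is not None and step_index > model_step_index:
            continue
        if not artifacts_by_step.get(step_index):
            issues.append(f"step {step_index} has no recorded artifacts")
    return (not issues, issues)


# Step 3 is the model but has no artifacts recorded.
ok, problems = validate_sketch(3, {1: ["art-001"], 3: []}, [1, 3])
```

Returning the full list lets a caller log or display every issue at once, and is_valid is simply True when that list is empty.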