nirs4all.pipeline.trace.extractor module
Trace-Based Extractor - Extract minimal pipeline from execution trace.
This module provides the TraceBasedExtractor class which extracts the minimal set of pipeline steps needed to replay a prediction from an ExecutionTrace.
The extractor is controller-agnostic: it uses the recorded trace information to identify which steps are needed, without encoding knowledge of specific controller types. This enables the same infrastructure to work with any controller (existing or custom).
- Key Features:
Extracts minimal pipeline (only steps needed for prediction)
Builds artifact map for step-based artifact injection
Creates dependency graph for step execution order
Supports branch-aware extraction for multi-branch pipelines
- Usage:
>>> from nirs4all.pipeline.trace import TraceBasedExtractor, ExecutionTrace
>>>
>>> # Load trace from manifest
>>> trace = manifest_manager.load_execution_trace(pipeline_uid, trace_id)
>>>
>>> # Extract minimal pipeline
>>> extractor = TraceBasedExtractor()
>>> minimal = extractor.extract(trace, full_pipeline_steps)
>>>
>>> # Use minimal pipeline for prediction
>>> for step_info in minimal.steps:
...     artifacts = minimal.get_artifacts_for_step(step_info.step_index)
...     # Execute step with artifacts
- class nirs4all.pipeline.trace.extractor.MinimalPipeline(trace_id: str = '', pipeline_uid: str = '', steps: List[MinimalPipelineStep] = <factory>, artifact_map: Dict[int, ~nirs4all.pipeline.trace.execution_trace.StepArtifacts]=<factory>, model_step_index: int | None = None, fold_weights: Dict[int, float] | None=None, preprocessing_chain: str = '', metadata: Dict[str, ~typing.Any]=<factory>)[source]
Bases: object
Minimal pipeline extracted from an execution trace.
Contains only the steps needed to replay a prediction, with artifact mappings for each step. Used by MinimalPredictor for efficient prediction.
- steps
Ordered list of minimal steps to execute
- artifact_map
Mapping of step_index to the StepArtifacts recorded for that step
- Type:
Dict[int, nirs4all.pipeline.trace.execution_trace.StepArtifacts]
- artifact_map: Dict[int, StepArtifacts]
- get_all_chain_paths() Dict[str, str][source]
Get all artifacts indexed by chain path.
- Returns:
Dict mapping chain_path to artifact_id
- get_artifact_by_chain(chain_path: str) str | None[source]
Get artifact ID by V3 chain path across all steps.
- Parameters:
chain_path – Operator chain path
- Returns:
Artifact ID or None if not found
- get_artifact_ids() List[str][source]
Get all artifact IDs in the minimal pipeline.
- Returns:
List of all artifact IDs across all steps
- get_artifacts_for_step(step_index: int) StepArtifacts | None[source]
Get artifacts for a specific step.
- Parameters:
step_index – 1-based step index
- Returns:
StepArtifacts or None if not found
- get_step(step_index: int) MinimalPipelineStep | None[source]
Get a step by its index.
- Parameters:
step_index – 1-based step index
- Returns:
MinimalPipelineStep or None if not found
- get_step_count() int[source]
Get the number of steps in the minimal pipeline.
- Returns:
Number of steps
- get_step_indices() List[int][source]
Get all step indices in execution order.
- Returns:
List of step indices
- has_step(step_index: int) bool[source]
Check if a step is included in the minimal pipeline.
- Parameters:
step_index – 1-based step index
- Returns:
True if step is included
- steps: List[MinimalPipelineStep]
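The lookup methods above can be illustrated with a small stand-in container. This is a sketch, not the nirs4all implementation: `SketchStep`, `SketchPipeline`, and the artifact IDs are hypothetical, and only the documented behaviour (1-based indices, `None` on a miss, indices returned in execution order) is taken from the reference.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class SketchStep:
    """Hypothetical stand-in for MinimalPipelineStep."""
    step_index: int  # 1-based, as documented above
    artifact_ids: List[str] = field(default_factory=list)


@dataclass
class SketchPipeline:
    """Hypothetical stand-in for MinimalPipeline."""
    steps: List[SketchStep] = field(default_factory=list)

    def get_step(self, step_index: int) -> Optional[SketchStep]:
        # Linear scan; returns None when the step is not in the minimal pipeline.
        return next((s for s in self.steps if s.step_index == step_index), None)

    def has_step(self, step_index: int) -> bool:
        return self.get_step(step_index) is not None

    def get_step_indices(self) -> List[int]:
        # Steps are stored in execution order, so indices come out in that order.
        return [s.step_index for s in self.steps]

    def get_artifact_ids(self) -> List[str]:
        # Flatten artifact IDs across all steps.
        return [a for s in self.steps for a in s.artifact_ids]


# Step 2 is absent: in this example it was not needed for prediction.
pipeline = SketchPipeline(steps=[
    SketchStep(1, ["scaler_a"]),
    SketchStep(3, ["model_a", "model_b"]),
])
```

Because the minimal pipeline may skip indices, callers would typically check `has_step` (or test `get_step` for `None`) rather than assume a contiguous range.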
- class nirs4all.pipeline.trace.extractor.MinimalPipelineStep(step_index: int, step_config: Any = None, execution_mode: StepExecutionMode = StepExecutionMode.PREDICT, artifacts: StepArtifacts = <factory>, operator_type: str = '', operator_class: str = '', branch_path: List[int] = <factory>, branch_name: str = '', substep_index: int | None = None, depends_on: Set[int] = <factory>)[source]
Bases: object
A step in the minimal pipeline for prediction replay.
Contains the step configuration and metadata needed to replay the step during prediction, without encoding controller-specific logic.
- step_config
The pipeline step configuration (dict or object)
- Type:
Any
- execution_mode
How to execute this step (train/predict/skip)
- artifacts
Artifacts for this step (from trace)
- artifacts: StepArtifacts
- execution_mode: StepExecutionMode = 'predict'
- get_artifact_by_chain(chain_path: str) str | None[source]
Get artifact ID by V3 chain path.
- Parameters:
chain_path – Operator chain path
- Returns:
Artifact ID or None if not found
- get_artifact_ids() List[str][source]
Get all artifact IDs for this step.
- Returns:
List of artifact IDs
- get_artifacts_by_chain() Dict[str, str][source]
Get all artifacts indexed by chain path.
- Returns:
Dict mapping chain_path to artifact_id
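A chain-path lookup can be pictured as a dictionary lookup per step, with the pipeline-level variant scanning all steps. The sketch below assumes each step exposes a plain `chain_path -> artifact_id` dict; the path strings, artifact IDs, and helper names are invented for illustration and do not reflect the actual V3 chain path format.

```python
from typing import Dict, List, Optional

# Each dict stands in for one step's chain_path -> artifact_id mapping.
# The path strings and IDs are hypothetical.
steps_by_chain: List[Dict[str, str]] = [
    {"s1.scaler": "artifact_001"},
    {"s3.model": "artifact_002"},
]


def find_artifact(chain_path: str) -> Optional[str]:
    """Return the artifact ID for chain_path, or None if no step has it."""
    for chain_map in steps_by_chain:
        if chain_path in chain_map:
            return chain_map[chain_path]
    return None


def all_chain_paths() -> Dict[str, str]:
    """Merge the per-step mappings into one chain_path -> artifact_id dict."""
    merged: Dict[str, str] = {}
    for chain_map in steps_by_chain:
        merged.update(chain_map)
    return merged
```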
- class nirs4all.pipeline.trace.extractor.TraceBasedExtractor(include_skipped: bool = False, preserve_order: bool = True)[source]
Bases: object
Extract minimal pipeline from execution trace.
The extractor analyzes an ExecutionTrace to determine which steps are needed for prediction replay and builds a MinimalPipeline with the correct artifact mappings.
The extractor is controller-agnostic: it uses trace metadata to identify steps without encoding knowledge of controller types.
- include_skipped
Whether to include skipped steps in minimal pipeline
- preserve_order
Whether to preserve original step order
Example
>>> extractor = TraceBasedExtractor()
>>> trace = manifest_manager.load_execution_trace(pipeline_uid, trace_id)
>>> minimal = extractor.extract(trace, full_pipeline_steps)
>>> print(f"Minimal pipeline has {minimal.get_step_count()} steps")
- extract(trace: ExecutionTrace, full_pipeline: List[Any] | None = None, up_to_model: bool = True) MinimalPipeline[source]
Extract minimal pipeline from execution trace.
Analyzes the trace to determine which steps are needed for prediction and builds a MinimalPipeline with artifact mappings.
- Parameters:
trace – ExecutionTrace to extract from
full_pipeline – Optional full pipeline steps (for step configs)
up_to_model – If True, only include steps up to model step
- Returns:
MinimalPipeline with steps and artifact mappings
- extract_for_branch(trace: ExecutionTrace, branch_path: List[int], full_pipeline: List[Any] | None = None) MinimalPipeline[source]
Extract minimal pipeline for a specific branch.
Includes shared steps (before branching) plus branch-specific steps.
- Parameters:
trace – ExecutionTrace to extract from
branch_path – Branch path to extract (e.g., [0] for first branch)
full_pipeline – Optional full pipeline steps
- Returns:
MinimalPipeline with steps for the specified branch
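One way to picture "shared steps plus branch-specific steps" is a prefix test on branch paths. The sketch below is an assumption about how such a filter could work, not the library's code: steps are reduced to `(step_index, branch_path)` tuples, an empty path marks a shared step, and `select_branch_steps` is a hypothetical helper.

```python
from typing import List, Tuple


def select_branch_steps(
    steps: List[Tuple[int, List[int]]], branch_path: List[int]
) -> List[int]:
    """Return indices of steps that are shared or lie on branch_path."""
    selected = []
    for step_index, step_branch in steps:
        # A step is kept when its own path is a prefix of (or equal to) the
        # requested path. The empty path is a prefix of every path, so
        # pre-branch steps are always kept.
        if step_branch == branch_path[:len(step_branch)]:
            selected.append(step_index)
    return selected


# One shared step followed by two interleaved branches.
steps = [(1, []), (2, [0]), (3, [1]), (4, [0]), (5, [1])]
branch_zero = select_branch_steps(steps, [0])
branch_one = select_branch_steps(steps, [1])
```

Here `branch_zero` keeps the shared step 1 plus steps 2 and 4, while `branch_one` keeps steps 1, 3, and 5.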
- extract_for_branch_name(trace: ExecutionTrace, branch_name: str, full_pipeline: List[Any] | None = None) MinimalPipeline[source]
Extract minimal pipeline for a specific branch by name.
More reliable than extract_for_branch for nested branches where branch_id doesn’t map directly to branch_path. Uses branch_name for matching since it’s unique and stored in both predictions and trace.
Includes shared steps (before branching) plus branch-specific steps.
- Parameters:
trace – ExecutionTrace to extract from
branch_name – Branch name to match (e.g., “branch_0_branch_0”)
full_pipeline – Optional full pipeline steps
- Returns:
MinimalPipeline with steps for the specified branch
- extract_for_step(trace: ExecutionTrace, target_step_index: int, full_pipeline: List[Any] | None = None) MinimalPipeline[source]
Extract minimal pipeline up to a specific step.
Useful for partial prediction or when targeting a specific model in a multi-model pipeline.
- Parameters:
trace – ExecutionTrace to extract from
target_step_index – Target step index (inclusive)
full_pipeline – Optional full pipeline steps
- Returns:
MinimalPipeline with steps up to target
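The inclusive bound on `target_step_index` amounts to a simple truncation. The helper below is a hypothetical sketch that operates on bare step indices only and ignores branches and artifacts.

```python
from typing import List


def steps_up_to(step_indices: List[int], target_step_index: int) -> List[int]:
    """Keep steps whose index is at most target_step_index (inclusive)."""
    return [i for i in step_indices if i <= target_step_index]


recorded = [1, 2, 3, 4, 5]
partial = steps_up_to(recorded, 3)
```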
- get_required_artifact_ids(trace: ExecutionTrace, up_to_model: bool = True) List[str][source]
Get list of artifact IDs required for prediction.
Useful for pre-loading artifacts or validating artifact availability.
- Parameters:
trace – ExecutionTrace to analyze
up_to_model – If True, only include artifacts up to model step
- Returns:
List of artifact IDs needed for prediction
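A pre-loading or availability check built on this method might look like the sketch below. It assumes the trace can be reduced to a `step_index -> artifact IDs` dict plus a model step index; `required_artifact_ids`, `missing_artifacts`, and the artifact IDs are hypothetical stand-ins, not nirs4all functions.

```python
from typing import Dict, List, Optional, Set


def required_artifact_ids(
    step_artifacts: Dict[int, List[str]],
    model_step_index: Optional[int],
    up_to_model: bool = True,
) -> List[str]:
    """Collect artifact IDs in step order, optionally stopping at the model step."""
    ids: List[str] = []
    for step_index in sorted(step_artifacts):
        if up_to_model and model_step_index is not None and step_index > model_step_index:
            break  # steps after the model are not needed for prediction
        ids.extend(step_artifacts[step_index])
    return ids


def missing_artifacts(required: List[str], available: Set[str]) -> List[str]:
    """Return the required IDs that are absent from the available set."""
    return [a for a in required if a not in available]


recorded = {1: ["scaler_a"], 2: ["model_a"], 3: ["report_a"]}
needed = required_artifact_ids(recorded, model_step_index=2)
absent = missing_artifacts(needed, available={"scaler_a"})
```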
- get_step_dependency_graph(trace: ExecutionTrace) Dict[int, Set[int]][source]
Build dependency graph from execution trace.
The dependency graph maps each step to the set of steps it depends on. This is inferred from the trace execution order and branch structure.
- Parameters:
trace – ExecutionTrace to analyze
- Returns:
Dictionary mapping step_index to set of dependency step indices
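The reference does not spell out how dependencies are inferred, so the following is only one plausible rule, stated as an assumption: each step depends on the nearest earlier step whose branch path is a prefix of its own. Steps are again reduced to hypothetical `(step_index, branch_path)` tuples in execution order.

```python
from typing import Dict, List, Set, Tuple


def build_dependency_graph(
    steps: List[Tuple[int, List[int]]]
) -> Dict[int, Set[int]]:
    """Map each step index to the set of step indices it depends on."""
    graph: Dict[int, Set[int]] = {}
    for pos, (step_index, branch) in enumerate(steps):
        deps: Set[int] = set()
        # Walk backwards to the closest earlier step on the same branch lineage.
        for prev_index, prev_branch in reversed(steps[:pos]):
            if prev_branch == branch[:len(prev_branch)]:
                deps.add(prev_index)
                break
        graph[step_index] = deps
    return graph


steps = [(1, []), (2, [0]), (3, [1]), (4, [0]), (5, [1])]
graph = build_dependency_graph(steps)
```

Under this rule the first step has no dependencies, both branches hang off the shared step 1, and later steps chain within their own branch.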
- validate_trace_for_prediction(trace: ExecutionTrace) Tuple[bool, List[str]][source]
Validate that a trace has all information needed for prediction.
Checks that:
- Model step is recorded
- All steps up to model have recorded artifacts (if applicable)
- No critical information is missing
- Parameters:
trace – ExecutionTrace to validate
- Returns:
Tuple of (is_valid, list of issues)
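The `(is_valid, issues)` return shape can be sketched as follows. This is a simplified illustration under stated assumptions: the trace is reduced to a model step index and a `step_index -> artifact IDs` dict, and the "if applicable" clause is modelled by a hypothetical set of steps that are expected to have artifacts. The function name and issue messages are invented.

```python
from typing import Dict, List, Optional, Set, Tuple


def validate_for_prediction(
    model_step_index: Optional[int],
    step_artifacts: Dict[int, List[str]],
    steps_expecting_artifacts: Set[int],
) -> Tuple[bool, List[str]]:
    """Return (is_valid, issues) for a simplified trace description."""
    issues: List[str] = []
    if model_step_index is None:
        issues.append("no model step recorded")
        return False, issues
    for step_index in sorted(steps_expecting_artifacts):
        if step_index > model_step_index:
            continue  # steps after the model are not needed for prediction
        if not step_artifacts.get(step_index):
            issues.append(f"step {step_index} has no recorded artifacts")
    return not issues, issues


ok, ok_issues = validate_for_prediction(3, {1: ["scaler_a"], 3: ["model_a"]}, {1, 3})
bad, bad_issues = validate_for_prediction(3, {1: ["scaler_a"]}, {1, 3})
```

Returning the full list of issues, rather than failing on the first one, lets a caller report every problem with a trace in a single pass.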