nirs4all.pipeline.config.context module
Context classes for pipeline execution.
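This module provides typed context components that replace the Dict[str, Any] context pattern used throughout the pipeline system. It separates three distinct concerns (data selection, pipeline state, and step metadata) and adds a composite context and an artifact provider interface: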
DataSelector: Immutable data selection parameters for dataset.x() and dataset.y()
PipelineState: Mutable pipeline state that evolves through transformations
StepMetadata: Metadata for controller coordination and step tracking
ExecutionContext: Composite context with custom data extensibility
ArtifactProvider: Interface for providing artifacts during prediction replay
The separation enables:
- Type safety throughout the codebase
- Clear interfaces between components
- Better testability
- Explicit controller communication
- Future extensibility via custom dict
Example
>>> selector = DataSelector(partition="train", processing=[["raw"]])
>>> state = PipelineState(y_processing="numeric")
>>> metadata = StepMetadata(keyword="transform")
>>> context = ExecutionContext(selector=selector, state=state, metadata=metadata)
>>> new_context = context.with_partition("test")
- class nirs4all.pipeline.config.context.ArtifactProvider[source]
Bases: ABC
Abstract interface for providing artifacts during prediction replay.
The ArtifactProvider enables controller-agnostic artifact injection: controllers request artifacts by step index rather than by name matching, which is deterministic and works with any controller type.
This interface is used during prediction mode to provide pre-loaded artifacts (transformers, models, etc.) to controllers without requiring them to know about the artifact storage system.
- Implementations:
MapArtifactProvider: In-memory dictionary-based provider
LoaderArtifactProvider: Wraps ArtifactLoader for lazy loading
Example
>>> provider = MapArtifactProvider(artifact_map)
>>> artifacts = provider.get_artifacts_for_step(step_index=2)
>>> for artifact_id, obj in artifacts:
...     process(obj)
- abstractmethod get_artifact(step_index: int, fold_id: int | None = None) Any | None[source]
Get a single artifact for a step.
- Parameters:
step_index – 1-based step index
fold_id – Optional fold ID for fold-specific artifacts
- Returns:
Artifact object or None if not found
- get_artifact_by_chain(chain_path: str) Any | None[source]
Get artifact by V3 chain path (optional V3 method).
- Parameters:
chain_path – Full operator chain path (e.g., “s1.MinMaxScaler>s3.PLS”)
- Returns:
Artifact object or None if not found
- get_artifacts_for_chain_prefix(chain_prefix: str) List[Tuple[str, Any]][source]
Get all artifacts matching a chain path prefix (optional V3 method).
- Parameters:
chain_prefix – Chain path prefix to match
- Returns:
List of (chain_path, artifact_object) tuples
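As a rough illustration of prefix lookup over chain paths, the sketch below filters a plain dictionary with `str.startswith`. The function name `artifacts_for_chain_prefix` and the `store` dictionary are hypothetical; the matching rules an actual provider applies (for example, whether matching is aware of the `>` separator) are not specified here and may differ.

```python
from typing import Any, Dict, List, Tuple


def artifacts_for_chain_prefix(
    artifacts_by_chain: Dict[str, Any], chain_prefix: str
) -> List[Tuple[str, Any]]:
    """Return (chain_path, artifact) pairs whose path starts with chain_prefix.

    Assumption: plain string-prefix matching, sorted by chain path.
    """
    matches = [
        (path, obj)
        for path, obj in artifacts_by_chain.items()
        if path.startswith(chain_prefix)
    ]
    return sorted(matches, key=lambda pair: pair[0])


# Hypothetical store keyed by chain path; strings stand in for fitted objects.
store = {
    "s1.MinMaxScaler": "scaler",
    "s1.MinMaxScaler>s3.PLS": "pls",
    "s2.SNV": "snv",
}
matches = artifacts_for_chain_prefix(store, "s1.MinMaxScaler")
```

With this store, the prefix `"s1.MinMaxScaler"` selects both the scaler and the PLS model chained after it, while `"s2.SNV"` is left out.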
- abstractmethod get_artifacts_for_step(step_index: int, branch_path: List[int] | None = None, branch_id: int | None = None, source_index: int | None = None, substep_index: int | None = None) List[Tuple[str, Any]][source]
Get all artifacts for a step.
- Parameters:
step_index – 1-based step index
branch_path – Optional branch path filter
branch_id – Optional branch ID filter
source_index – Optional source/dataset index filter for multi-source
substep_index – Optional substep index filter for branch substeps
- Returns:
List of (artifact_id, artifact_object) tuples
- abstractmethod get_fold_artifacts(step_index: int, branch_path: List[int] | None = None) List[Tuple[int, Any]][source]
Get all fold-specific artifacts for a step.
- Parameters:
step_index – 1-based step index
branch_path – Optional branch path filter
- Returns:
List of (fold_id, artifact_object) tuples, sorted by fold_id
- get_primary_artifact(step_index: int) Any | None[source]
Get the primary artifact for a step.
The primary artifact is typically the main model or transformer for the step. Default implementation returns the first artifact.
- Parameters:
step_index – 1-based step index
- Returns:
Primary artifact object or None if not found
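The following is a minimal sketch of what a custom provider might look like. To stay self-contained it defines a stand-in base class, `ArtifactProviderSketch`, that mirrors the three abstract methods and the default `get_primary_artifact` behavior described above; in real code one would presumably subclass `ArtifactProvider` itself. `ListBackedProvider` and its `folds` argument are hypothetical names introduced for illustration.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple


class ArtifactProviderSketch(ABC):
    """Stand-in mirroring the abstract methods documented above."""

    @abstractmethod
    def get_artifact(self, step_index: int, fold_id: Optional[int] = None) -> Optional[Any]: ...

    @abstractmethod
    def get_artifacts_for_step(
        self, step_index: int, branch_path: Optional[List[int]] = None,
        branch_id: Optional[int] = None, source_index: Optional[int] = None,
        substep_index: Optional[int] = None,
    ) -> List[Tuple[str, Any]]: ...

    @abstractmethod
    def get_fold_artifacts(
        self, step_index: int, branch_path: Optional[List[int]] = None
    ) -> List[Tuple[int, Any]]: ...

    def get_primary_artifact(self, step_index: int) -> Optional[Any]:
        # Documented default: return the first artifact for the step.
        artifacts = self.get_artifacts_for_step(step_index)
        return artifacts[0][1] if artifacts else None


class ListBackedProvider(ArtifactProviderSketch):
    """Hypothetical provider: per-step lists plus per-step fold dictionaries."""

    def __init__(
        self,
        by_step: Dict[int, List[Tuple[str, Any]]],
        folds: Optional[Dict[int, Dict[int, Any]]] = None,
    ) -> None:
        self._by_step = by_step
        self._folds = folds or {}

    def get_artifact(self, step_index, fold_id=None):
        if fold_id is not None:
            return self._folds.get(step_index, {}).get(fold_id)
        return self.get_primary_artifact(step_index)

    def get_artifacts_for_step(self, step_index, branch_path=None, branch_id=None,
                               source_index=None, substep_index=None):
        # The filters are accepted for signature compatibility but ignored here.
        return list(self._by_step.get(step_index, []))

    def get_fold_artifacts(self, step_index, branch_path=None):
        fold_items = self._folds.get(step_index, {}).items()
        return sorted(fold_items, key=lambda pair: pair[0])


provider = ListBackedProvider(
    {1: [("a1", "scaler")], 2: [("a2", "model")]},
    folds={2: {1: "model_fold1", 0: "model_fold0"}},
)
```

Controllers then only need the step index: `provider.get_artifact(1)` yields the scaler stand-in, and `provider.get_fold_artifacts(2)` yields the fold models ordered by fold ID.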
- class nirs4all.pipeline.config.context.DataSelector(partition: str = 'all', processing: List[List[str]] = <factory>, layout: str = '2d', concat_source: bool = True, fold_id: int | None = None, include_augmented: bool = False, y: str | None = None, branch_id: int | None = None, branch_path: List[int] = <factory>, branch_name: str | None = None, _extra: Dict[str, ~typing.Any]=<factory>)[source]
Bases: MutableMapping
Mutable data selection parameters for dataset operations.
This class replaces the dict-based Selector pattern used by dataset.x() and dataset.y(). It implements the MutableMapping protocol, so it can be used as a dictionary. It supports arbitrary keys via an internal dict to allow flexibility.
- branch_id
Optional branch identifier for pipeline branching (0-indexed). DEPRECATED: Use branch_path instead for nested branch support.
- Type:
int | None
- branch_path
List of branch indices for nested branching (e.g., [0, 2] for branch 2 inside branch 0). Empty list means pre-branch/shared artifacts.
- Type:
List[int]
Example
>>> selector = DataSelector(partition="train", processing=[["raw"]])
>>> selector["y"] = "scaled"  # Direct modification
>>> selector["custom_key"] = "value"  # Arbitrary keys supported
>>> print(selector["partition"])
- copy() DataSelector[source]
Create a deep copy of the selector.
- with_augmented(include_augmented: bool) DataSelector[source]
Create new selector with updated include_augmented flag.
- Parameters:
include_augmented – Whether to include augmented samples
- Returns:
New DataSelector with updated include_augmented
- with_branch(branch_id: int | None = None, branch_name: str | None = None, branch_path: List[int] | None = None) DataSelector[source]
Create new selector with updated branch information.
- Parameters:
branch_id – Branch identifier (0-indexed). DEPRECATED: Use branch_path.
branch_name – Human-readable branch name
branch_path – List of branch indices for nested branching
- Returns:
New DataSelector with updated branch info
- with_fold(fold_id: int | None) DataSelector[source]
Create new selector with updated fold_id.
- Parameters:
fold_id – New fold identifier
- Returns:
New DataSelector with updated fold_id
- with_layout(layout: str) DataSelector[source]
Create new selector with updated layout.
- Parameters:
layout – New layout value
- Returns:
New DataSelector with updated layout
- with_partition(partition: str) DataSelector[source]
Create new selector with updated partition.
- Parameters:
partition – New partition value
- Returns:
New DataSelector with updated partition
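To make the combination of typed fields, dictionary-style access, and the `with_*` methods more concrete, here is a small sketch of how such a class could be built. `SelectorSketch` is a hypothetical stand-in with only three of the documented fields; it is not the library's implementation, and details such as how deletion of a named field is handled are assumptions.

```python
import copy
from collections.abc import MutableMapping
from dataclasses import dataclass, field, fields
from typing import Any, Dict, Iterator, List, Optional


@dataclass
class SelectorSketch(MutableMapping):
    """Stand-in: typed fields plus an internal dict for arbitrary keys."""

    partition: str = "all"
    processing: List[List[str]] = field(default_factory=list)
    fold_id: Optional[int] = None
    _extra: Dict[str, Any] = field(default_factory=dict)

    def _named(self) -> List[str]:
        # Names of the typed fields, excluding the internal dict.
        return [f.name for f in fields(self) if f.name != "_extra"]

    def __getitem__(self, key: str) -> Any:
        if key in self._named():
            return getattr(self, key)
        return self._extra[key]

    def __setitem__(self, key: str, value: Any) -> None:
        if key in self._named():
            setattr(self, key, value)
        else:
            self._extra[key] = value

    def __delitem__(self, key: str) -> None:
        # Assumption: only arbitrary keys can be deleted.
        del self._extra[key]

    def __iter__(self) -> Iterator[str]:
        yield from self._named()
        yield from self._extra

    def __len__(self) -> int:
        return len(self._named()) + len(self._extra)

    def with_partition(self, partition: str) -> "SelectorSketch":
        # Copy first, then update, so the original selector is untouched.
        new = copy.deepcopy(self)
        new.partition = partition
        return new


selector = SelectorSketch(partition="train", processing=[["raw"]])
selector["custom_key"] = "value"
test_selector = selector.with_partition("test")
```

The point of the copy-then-update pattern is that `selector` still reports `"train"` after `with_partition("test")`, and the copy carries its own `processing` list and custom keys.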
- class nirs4all.pipeline.config.context.ExecutionContext(selector: DataSelector | None = None, state: PipelineState | None = None, metadata: StepMetadata | None = None, custom: Dict[str, Any] | None = None, aggregate_column: str | None = None)[source]
Bases: object
Composite execution context with extensibility.
This class combines the three context components and provides:
- Immutable data selection via DataSelector
- Mutable state tracking via PipelineState
- Controller coordination via StepMetadata
- Custom data storage for controller-specific needs
The context supports deep copying for controller isolation while sharing processing chains between selector and operations.
- selector
Immutable data selector
- state
Mutable pipeline state
- metadata
Mutable step metadata
- custom
Dict for controller-specific custom data
- aggregate_column
Sample aggregation column for prediction aggregation.
- None: No aggregation (default)
- ‘y’: Aggregate by y_true values
- str: Aggregate by specified metadata column
Example
>>> context = ExecutionContext(
...     selector=DataSelector(partition="train"),
...     state=PipelineState(y_processing="numeric"),
...     metadata=StepMetadata(keyword="transform")
... )
>>> context.custom["my_controller"] = {"threshold": 0.5}
>>> train_ctx = context.with_partition("train")
- copy() ExecutionContext[source]
Create a deep copy of this context.
This preserves the copy semantics expected by controllers.
- Returns:
Deep copy of ExecutionContext
- get_selector() DataSelector[source]
Get the data selector.
- Returns:
DataSelector instance
- with_branch(branch_id: int | None = None, branch_name: str | None = None) ExecutionContext[source]
Create new context with updated branch information.
- Parameters:
branch_id – Branch identifier (0-indexed)
branch_name – Human-readable branch name
- Returns:
New ExecutionContext with updated branch info
- with_layout(layout: str) ExecutionContext[source]
Create new context with updated layout.
- Parameters:
layout – New layout value
- Returns:
New ExecutionContext with updated layout
- with_metadata(**kwargs) ExecutionContext[source]
Create new context with updated metadata fields.
- Parameters:
**kwargs – Metadata fields to update
- Returns:
New ExecutionContext with updated metadata
- with_partition(partition: str) ExecutionContext[source]
Create new context with updated partition.
- Parameters:
partition – New partition value
- Returns:
New ExecutionContext with updated partition
- with_processing(processing: List[List[str]]) ExecutionContext[source]
Create new context with updated processing chains.
- Parameters:
processing – New processing chains
- Returns:
New ExecutionContext with updated processing
- with_step_number(step_number: int) ExecutionContext[source]
Create new context with updated step number.
- Parameters:
step_number – New step number
- Returns:
New ExecutionContext with updated step number
- with_y(y_processing: str) ExecutionContext[source]
Create new context with updated y processing.
- Parameters:
y_processing – New y processing value
- Returns:
New ExecutionContext with updated y processing
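The isolation that the `with_*` methods and `copy()` aim for can be sketched with simplified stand-ins. `SelectorStub`, `StateStub`, and `ContextSketch` below are hypothetical and cover only a few fields; the sketch does not model the sharing of processing chains mentioned in the class description.

```python
import copy
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class SelectorStub:
    partition: str = "all"


@dataclass
class StateStub:
    y_processing: str = "numeric"
    step_number: int = 0


@dataclass
class ContextSketch:
    """Stand-in composite: selector + state + controller-specific custom data."""

    selector: SelectorStub = field(default_factory=SelectorStub)
    state: StateStub = field(default_factory=StateStub)
    custom: Dict[str, Any] = field(default_factory=dict)

    def copy(self) -> "ContextSketch":
        return copy.deepcopy(self)

    def with_partition(self, partition: str) -> "ContextSketch":
        new = self.copy()
        new.selector.partition = partition
        return new

    def with_step_number(self, step_number: int) -> "ContextSketch":
        new = self.copy()
        new.state.step_number = step_number
        return new


base = ContextSketch(selector=SelectorStub(partition="train"))
base.custom["my_controller"] = {"threshold": 0.5}

# A controller works on its own copy; edits do not leak back into `base`.
test_ctx = base.with_partition("test").with_step_number(3)
test_ctx.custom["my_controller"]["threshold"] = 0.9
```

Because each `with_*` call deep-copies before updating, `base` keeps its partition, step number, and custom threshold even after `test_ctx` is modified.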
- class nirs4all.pipeline.config.context.LoaderArtifactProvider(loader: Any, trace: Any | None = None)[source]
Bases: ArtifactProvider
Artifact provider backed by an ArtifactLoader.
Wraps an ArtifactLoader to provide artifacts on-demand with lazy loading and caching. Used when loading from a manifest for prediction.
- loader
The underlying ArtifactLoader
- trace
Optional ExecutionTrace for step-to-artifact mapping
- get_artifact(step_index: int, fold_id: int | None = None) Any | None[source]
Get a single artifact for a step.
If trace is available, uses trace to find artifact IDs. Otherwise, uses loader’s step-based lookup.
- Parameters:
step_index – 1-based step index
fold_id – Optional fold ID for fold-specific artifacts
- Returns:
Artifact object or None if not found
- get_artifact_by_chain(chain_path: str) Any | None[source]
Get artifact by V3 chain path.
- Parameters:
chain_path – Full operator chain path (e.g., “s1.MinMaxScaler>s3.PLS”)
- Returns:
Artifact object or None if not found
- get_artifacts_for_chain_prefix(chain_prefix: str) List[Tuple[str, Any]][source]
Get all artifacts matching a chain path prefix.
- Parameters:
chain_prefix – Chain path prefix to match
- Returns:
List of (chain_path, artifact_object) tuples
- get_artifacts_for_step(step_index: int, branch_path: List[int] | None = None, branch_id: int | None = None, source_index: int | None = None, substep_index: int | None = None) List[Tuple[str, Any]][source]
Get all artifacts for a step.
- Parameters:
step_index – 1-based step index
branch_path – Optional branch path filter
branch_id – Optional branch ID filter
source_index – Optional source index filter for multi-source pipelines
substep_index – Optional substep index filter (not used in loader provider)
- Returns:
List of (artifact_id, artifact_object) tuples
- get_fold_artifacts(step_index: int, branch_path: List[int] | None = None) List[Tuple[int, Any]][source]
Get all fold-specific artifacts for a step.
- Parameters:
step_index – 1-based step index
branch_path – Optional branch path filter
- Returns:
List of (fold_id, artifact_object) tuples, sorted by fold_id
- class nirs4all.pipeline.config.context.MapArtifactProvider(artifact_map: Dict[int, List[Tuple[str, Any]]], fold_weights: Dict[int, float] | None = None, primary_artifacts: Dict[int, str] | None = None)[source]
Bases: ArtifactProvider
In-memory artifact provider backed by a dictionary.
Provides artifacts from a pre-loaded dictionary mapping step indices to artifacts. Used when artifacts are resolved from an ExecutionTrace or when loading from a bundle.
- artifact_map
Dictionary mapping step_index to list of (artifact_id, object) tuples
- fold_weights
Optional fold weights for CV ensemble averaging
Example
>>> artifact_map = {
...     1: [("0001:1:all", snv_transformer)],
...     2: [("0001:2:0", model_fold0), ("0001:2:1", model_fold1)]
... }
>>> provider = MapArtifactProvider(artifact_map)
>>> transformer = provider.get_artifact(step_index=1)
- get_artifact(step_index: int, fold_id: int | None = None) Any | None[source]
Get a single artifact for a step.
If fold_id is specified, returns the fold-specific artifact. Otherwise, returns the primary or first artifact.
- Parameters:
step_index – 1-based step index
fold_id – Optional fold ID for fold-specific artifacts
- Returns:
Artifact object or None if not found
- get_artifacts_for_step(step_index: int, branch_path: List[int] | None = None, branch_id: int | None = None, source_index: int | None = None, substep_index: int | None = None) List[Tuple[str, Any]][source]
Get all artifacts for a step.
- Parameters:
step_index – 1-based step index
branch_path – Optional branch path filter (not used in map provider)
branch_id – Optional branch ID filter (not used in map provider)
source_index – Optional source/dataset index filter (not used in map provider)
substep_index – Optional substep index filter (not used in map provider)
- Returns:
List of (artifact_id, artifact_object) tuples
- get_fold_artifacts(step_index: int, branch_path: List[int] | None = None) List[Tuple[int, Any]][source]
Get all fold-specific artifacts for a step.
- Parameters:
step_index – 1-based step index
branch_path – Optional branch path filter (not used in map provider)
- Returns:
List of (fold_id, artifact_object) tuples, sorted by fold_id
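The `fold_weights` attribute is described as supporting CV ensemble averaging. As a sketch of what weighted averaging over per-fold predictions could look like, the hypothetical helper below combines prediction lists keyed by fold ID. How nirs4all actually combines fold models is not described in this reference, so the normalization and the equal-weight fallback are assumptions.

```python
from typing import Dict, List, Optional, Sequence


def weighted_fold_average(
    fold_predictions: Dict[int, Sequence[float]],
    fold_weights: Optional[Dict[int, float]] = None,
) -> List[float]:
    """Average per-fold predictions sample by sample.

    Assumptions: every fold predicts the same samples in the same order,
    weights are normalized by their sum, and missing weights mean equal weights.
    """
    if not fold_predictions:
        raise ValueError("no fold predictions given")
    fold_ids = sorted(fold_predictions)
    if fold_weights is None:
        weights = {fold_id: 1.0 for fold_id in fold_ids}
    else:
        weights = {fold_id: fold_weights[fold_id] for fold_id in fold_ids}
    total = sum(weights.values())
    if total <= 0:
        raise ValueError("fold weights must sum to a positive value")
    n_samples = len(fold_predictions[fold_ids[0]])
    return [
        sum(weights[f] * fold_predictions[f][i] for f in fold_ids) / total
        for i in range(n_samples)
    ]


predictions = {0: [1.0, 2.0], 1: [3.0, 4.0]}
weighted = weighted_fold_average(predictions, {0: 0.25, 1: 0.75})
unweighted = weighted_fold_average(predictions)
```

In this example the weighted result leans toward fold 1, giving `[2.5, 3.5]`, while the unweighted result is the plain mean `[2.0, 3.0]`.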
- class nirs4all.pipeline.config.context.PipelineState(y_processing: str = 'numeric', step_number: int = 0, mode: str = 'train')[source]
Bases: object
Mutable pipeline state that evolves through execution.
This class tracks state that changes as the pipeline executes:
- Y transformation state (e.g., “encoded_LabelEncoder_001”)
- Current step number in execution
Unlike DataSelector, this is mutable because state must evolve.
Example
>>> state = PipelineState(y_processing="numeric")
>>> state.step_number = 2  # Mutable update
>>> state.y_processing = "encoded_LabelEncoder_001"
- copy() PipelineState[source]
Create a deep copy of this state.
- Returns:
Deep copy of PipelineState
- class nirs4all.pipeline.config.context.RuntimeContext(saver: Any = None, manifest_manager: Any = None, artifact_loader: Any = None, artifact_provider: ArtifactProvider | None = None, artifact_registry: Any = None, pipeline_uid: str | None = None, step_runner: Any = None, step_number: int = 0, operation_count: int = 0, substep_number: int = -1, processing_counter: int = 0, artifact_load_counter: Dict[int, int]=<factory>, target_model: Dict[str, ~typing.Any] | None=None, explainer: Any = None, trace_recorder: Any = None, retrain_config: Any = None)[source]
Bases: object
Runtime infrastructure components for pipeline execution.
This class holds references to infrastructure components that are needed during execution but are not part of the data flow or pipeline state. It replaces the “God Object” pattern of passing the runner everywhere.
- saver
SimulationSaver for file operations
- Type:
Any
- manifest_manager
ManifestManager for pipeline tracking
- Type:
Any
- artifact_loader
ArtifactLoader for predict/explain modes
- Type:
Any
- artifact_provider
ArtifactProvider for controller-agnostic artifact injection (Phase 3)
- Type:
ArtifactProvider | None
- artifact_registry
ArtifactRegistry for artifact management (v2 system)
- Type:
Any
- step_runner
StepRunner for executing sub-steps
- Type:
Any
- trace_recorder
TraceRecorder for recording execution traces (Phase 2)
- Type:
Any
- retrain_config
RetrainConfig for retrain mode control (Phase 7)
- Type:
Any
- artifact_provider: ArtifactProvider | None = None
- get_execution_trace() Any | None[source]
Get the current execution trace.
Returns the trace object that has been built during execution. This can be used to generate post-execution diagrams with actual shapes.
- Returns:
ExecutionTrace object or None if no trace recorder
- get_trace_id() str | None[source]
Get the current trace ID.
- Returns:
Trace ID or None if no trace recorder
- next_artifact_load_index(source_index: int) int[source]
Get the next artifact load index for a source during prediction.
This counter tracks how many artifacts have been loaded for each source across all sub-operations within a step (e.g., feature_augmentation).
- Parameters:
source_index – The source index to track.
- Returns:
The next artifact index to load for this source.
- Return type:
int
- next_processing_index() int[source]
Get the next unique processing index for artifact identification.
This counter persists across all sub-operations within a step (e.g., feature_augmentation), ensuring each transformer gets a unique substep_index. The counter is reset at the start of each step.
- Returns:
A unique processing index within the current step.
- Return type:
int
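The two counters described above can be pictured with a small stand-in. `CounterSketch` reuses the field names `processing_counter` and `artifact_load_counter` from the class signature, but the return-then-increment behavior, the starting value of 0, and the `reset_for_new_step` helper are assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class CounterSketch:
    """Stand-in for the per-step counters held by the runtime context."""

    processing_counter: int = 0
    artifact_load_counter: Dict[int, int] = field(default_factory=dict)

    def next_processing_index(self) -> int:
        # Assumption: return the current value, then advance the counter.
        index = self.processing_counter
        self.processing_counter += 1
        return index

    def next_artifact_load_index(self, source_index: int) -> int:
        # One independent counter per source index.
        index = self.artifact_load_counter.get(source_index, 0)
        self.artifact_load_counter[source_index] = index + 1
        return index

    def reset_for_new_step(self) -> None:
        # Hypothetical helper: counters start over at each new step.
        self.processing_counter = 0
        self.artifact_load_counter.clear()


counters = CounterSketch()
first = counters.next_processing_index()
second = counters.next_processing_index()
source0_a = counters.next_artifact_load_index(0)
source0_b = counters.next_artifact_load_index(0)
source1_a = counters.next_artifact_load_index(1)
counters.reset_for_new_step()
after_reset = counters.next_processing_index()
```

Each sub-operation within a step receives a distinct index, each source advances independently, and the numbering restarts once the step changes.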
- record_input_shapes(input_shape: tuple | None = None, features_shape: List[tuple] | None = None) None[source]
Record input shapes for the current step.
- Parameters:
input_shape – 2D layout shape (samples, features)
features_shape – List of 3D shapes per source (samples, processings, features)
- record_output_shapes(output_shape: tuple | None = None, features_shape: List[tuple] | None = None) None[source]
Record output shapes for the current step.
- Parameters:
output_shape – 2D layout shape (samples, features)
features_shape – List of 3D shapes per source (samples, processings, features)
- record_step_artifact(artifact_id: str, is_primary: bool = False, fold_id: int | None = None, chain_path: str | None = None, branch_path: List[int] | None = None, source_index: int | None = None, metadata: Dict[str, Any] | None = None) None[source]
Record an artifact created during the current step (V3).
- Parameters:
artifact_id – The V3 artifact ID
is_primary – Whether this is the primary artifact (e.g., model)
fold_id – CV fold ID if fold-specific artifact
chain_path – V3 operator chain path for this artifact
branch_path – Branch path for indexing
source_index – Source index for multi-source artifacts
metadata – Additional artifact metadata
- record_step_end(is_model: bool = False, fold_weights: Dict[int, float] | None = None, skip_trace: bool = False) None[source]
Record the end of a step execution.
- Parameters:
is_model – Whether this is the model step
fold_weights – Per-fold weights for CV models
skip_trace – If True, don’t add this step to the trace
- record_step_start(step_index: int, operator_type: str = '', operator_class: str = '', operator_config: Dict[str, Any] | None = None, branch_path: List[int] | None = None, branch_name: str = '', mode: str = 'train') None[source]
Record the start of a step execution in the trace.
- Parameters:
step_index – 1-based step index
operator_type – Type of operator (e.g., “transform”, “model”)
operator_class – Class name of operator
operator_config – Serialized operator configuration
branch_path – Branch indices if in branch context
branch_name – Human-readable branch name
mode – Execution mode (“train”, “predict”, “explain”)
- should_train_step(step_index: int, is_model: bool = False) bool[source]
Determine if a step should train based on retrain configuration.
- Phase 7 Feature:
When a retrain_config is present, this method delegates to it to determine whether a step should train or use existing artifacts.
- Parameters:
step_index – 1-based step index
is_model – Whether this is the model step
- Returns:
True if the step should train, False if it should use existing artifacts
- class nirs4all.pipeline.config.context.StepMetadata(keyword: str = '', step_id: str = '', augment_sample: bool = False, add_feature: bool = False, replace_processing: bool = False, target_samples: List[int] = <factory>, target_features: List[int] = <factory>)[source]
Bases: object
Metadata for controller coordination and step tracking.
This class handles:
- Controller coordination flags (augment_sample, add_feature, replace_processing)
- Step identification (step_id, keyword)
- Target specification for augmentation operations
These are ephemeral flags set/cleared between steps for controller communication.
Example
>>> metadata = StepMetadata(keyword="transform", step_id="001")
>>> metadata.augment_sample = True
>>> metadata.target_samples = [42]
- copy() StepMetadata[source]
Create a deep copy of this metadata.
- Returns:
Deep copy of StepMetadata
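One way to picture the set-and-clear lifecycle of these flags is the sketch below. `MetadataSketch` is a stand-in with a subset of the documented fields, and `clear_flags` is a hypothetical helper; how the pipeline actually resets flags between steps is not shown in this reference.

```python
import copy
from dataclasses import dataclass, field
from typing import List


@dataclass
class MetadataSketch:
    """Stand-in with a subset of the documented StepMetadata fields."""

    keyword: str = ""
    step_id: str = ""
    augment_sample: bool = False
    target_samples: List[int] = field(default_factory=list)

    def copy(self) -> "MetadataSketch":
        return copy.deepcopy(self)


def clear_flags(metadata: MetadataSketch) -> None:
    # Hypothetical helper: reset coordination flags before the next step.
    metadata.augment_sample = False
    metadata.target_samples = []


metadata = MetadataSketch(keyword="transform", step_id="001")
metadata.augment_sample = True
metadata.target_samples = [42]

# A snapshot taken via copy() is unaffected by the later reset.
snapshot = metadata.copy()
clear_flags(metadata)
```

After `clear_flags`, the live metadata is back to its defaults while `snapshot` still holds the flag and targets that were set for the earlier step.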