nirs4all.pipeline.config.context module

Context classes for pipeline execution.

This module provides typed context components that replace the Dict[str, Any] context pattern used throughout the pipeline system. It separates the following concerns:

  1. DataSelector: Data selection parameters for dataset.x() and dataset.y() (a mutable mapping whose with_* methods return new selectors)

  2. PipelineState: Mutable pipeline state that evolves through transformations

  3. StepMetadata: Metadata for controller coordination and step tracking

  4. ExecutionContext: Composite context with custom data extensibility

  5. ArtifactProvider: Interface for providing artifacts during prediction replay

The separation enables:
  • Type safety throughout the codebase

  • Clear interfaces between components

  • Better testability

  • Explicit controller communication

  • Future extensibility via the custom dict

Example

>>> selector = DataSelector(partition="train", processing=[["raw"]])
>>> state = PipelineState(y_processing="numeric")
>>> metadata = StepMetadata(keyword="transform")
>>> context = ExecutionContext(selector=selector, state=state, metadata=metadata)
>>> new_context = context.with_partition("test")
class nirs4all.pipeline.config.context.ArtifactProvider[source]

Bases: ABC

Abstract interface for providing artifacts during prediction replay.

The ArtifactProvider enables controller-agnostic artifact injection: controllers request artifacts by step index rather than by name matching, which is deterministic and works with any controller type.

This interface is used during prediction mode to provide pre-loaded artifacts (transformers, models, etc.) to controllers without requiring them to know about the artifact storage system.

Implementations:
  • MapArtifactProvider: In-memory dictionary-based provider

  • LoaderArtifactProvider: Wraps ArtifactLoader for lazy loading

Example

>>> provider = MapArtifactProvider(artifact_map)
>>> artifacts = provider.get_artifacts_for_step(step_index=2)
>>> for artifact_id, obj in artifacts:
...     process(obj)
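
The step-index contract described above can be sketched with a small stand-in. This is an illustration only, not the nirs4all implementation: SketchProvider and DictProvider are hypothetical names, only two of the abstract methods are modeled, and the optional filter arguments are left out.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Tuple


class SketchProvider(ABC):
    """Hypothetical stand-in for the provider interface (two methods only)."""

    @abstractmethod
    def get_artifacts_for_step(self, step_index: int) -> List[Tuple[str, Any]]:
        ...

    @abstractmethod
    def has_artifacts_for_step(self, step_index: int) -> bool:
        ...


class DictProvider(SketchProvider):
    """Looks artifacts up by 1-based step index, never by operator name."""

    def __init__(self, artifact_map: Dict[int, List[Tuple[str, Any]]]) -> None:
        self._map = artifact_map

    def get_artifacts_for_step(self, step_index: int) -> List[Tuple[str, Any]]:
        # Return a copy so callers cannot mutate the stored list.
        return list(self._map.get(step_index, []))

    def has_artifacts_for_step(self, step_index: int) -> bool:
        return bool(self._map.get(step_index))


# Placeholder strings stand in for fitted transformers and models.
provider = DictProvider({2: [("a", "scaler"), ("b", "model")]})
step2 = provider.get_artifacts_for_step(2)
```

Because the lookup key is the step index, a controller does not need to know which class produced the artifact or how it was stored.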
abstractmethod get_artifact(step_index: int, fold_id: int | None = None) Any | None[source]

Get a single artifact for a step.

Parameters:
  • step_index – 1-based step index

  • fold_id – Optional fold ID for fold-specific artifacts

Returns:

Artifact object or None if not found

get_artifact_by_chain(chain_path: str) Any | None[source]

Get artifact by V3 chain path (optional V3 method).

Parameters:

chain_path – Full operator chain path (e.g., “s1.MinMaxScaler>s3.PLS”)

Returns:

Artifact object or None if not found

get_artifacts_for_chain_prefix(chain_prefix: str) List[Tuple[str, Any]][source]

Get all artifacts matching a chain path prefix (optional V3 method).

Parameters:

chain_prefix – Chain path prefix to match

Returns:

List of (chain_path, artifact_object) tuples
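
Prefix lookup can be pictured as a filter over chain paths. The sketch below assumes plain string-prefix matching, which may differ from the rule the library applies; artifacts_for_prefix is a hypothetical helper and the "s2.SNV" path is invented for illustration.

```python
from typing import Any, Dict, List, Tuple


def artifacts_for_prefix(
    by_chain: Dict[str, Any], chain_prefix: str
) -> List[Tuple[str, Any]]:
    """Return (chain_path, artifact) pairs whose path starts with the prefix."""
    return [
        (path, obj)
        for path, obj in sorted(by_chain.items())
        if path.startswith(chain_prefix)
    ]


# Placeholder strings stand in for fitted objects.
by_chain = {
    "s1.MinMaxScaler": "scaler",
    "s1.MinMaxScaler>s3.PLS": "pls",
    "s2.SNV": "snv",
}
matches = artifacts_for_prefix(by_chain, "s1.MinMaxScaler")
```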

abstractmethod get_artifacts_for_step(step_index: int, branch_path: List[int] | None = None, branch_id: int | None = None, source_index: int | None = None, substep_index: int | None = None) List[Tuple[str, Any]][source]

Get all artifacts for a step.

Parameters:
  • step_index – 1-based step index

  • branch_path – Optional branch path filter

  • branch_id – Optional branch ID filter

  • source_index – Optional source/dataset index filter for multi-source pipelines

  • substep_index – Optional substep index filter for branch substeps

Returns:

List of (artifact_id, artifact_object) tuples

abstractmethod get_fold_artifacts(step_index: int, branch_path: List[int] | None = None) List[Tuple[int, Any]][source]

Get all fold-specific artifacts for a step.

Parameters:
  • step_index – 1-based step index

  • branch_path – Optional branch path filter

Returns:

List of (fold_id, artifact_object) tuples, sorted by fold_id

get_primary_artifact(step_index: int) Any | None[source]

Get the primary artifact for a step.

The primary artifact is typically the main model or transformer for the step. Default implementation returns the first artifact.

Parameters:

step_index – 1-based step index

Returns:

Primary artifact object or None if not found
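
The "first artifact" fallback can be sketched as follows. The primary_id argument is an assumption added for illustration (MapArtifactProvider accepts a primary_artifacts mapping, but how it is consulted is not documented here); primary_artifact is a hypothetical helper.

```python
from typing import Any, List, Optional, Tuple


def primary_artifact(
    artifacts: List[Tuple[str, Any]], primary_id: Optional[str] = None
) -> Optional[Any]:
    """Pick the artifact marked primary, else fall back to the first one."""
    if primary_id is not None:
        for artifact_id, obj in artifacts:
            if artifact_id == primary_id:
                return obj
    # Default behaviour: first artifact, or None when the step has none.
    return artifacts[0][1] if artifacts else None


step_artifacts = [("a", "scaler"), ("b", "model")]
default_choice = primary_artifact(step_artifacts)
explicit_choice = primary_artifact(step_artifacts, primary_id="b")
```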

abstractmethod has_artifacts_for_step(step_index: int) bool[source]

Check if artifacts exist for a step.

Parameters:

step_index – 1-based step index

Returns:

True if artifacts are available for this step

class nirs4all.pipeline.config.context.DataSelector(partition: str = 'all', processing: List[List[str]] = <factory>, layout: str = '2d', concat_source: bool = True, fold_id: int | None = None, include_augmented: bool = False, y: str | None = None, branch_id: int | None = None, branch_path: List[int] = <factory>, branch_name: str | None = None, _extra: Dict[str, Any] = <factory>)[source]

Bases: MutableMapping

Mutable data selection parameters for dataset operations.

This class replaces the dict-based Selector pattern used by dataset.x() and dataset.y(). It implements the MutableMapping protocol, so it can be used as a dictionary. It supports arbitrary keys via an internal dict to allow flexibility.

partition

Data partition to select (“train”, “test”, “all”, “val”)

Type:

str

processing

List of processing chains (one per data source)

Type:

List[List[str]]

layout

Data layout for X retrieval (“2d”, “3d”, “4d”)

Type:

str

concat_source

Whether to concatenate multiple sources

Type:

bool

fold_id

Optional fold identifier for cross-validation

Type:

int | None

include_augmented

Whether to include augmented samples

Type:

bool

y

Optional target processing version (e.g. “numeric”, “scaled”)

Type:

str | None

branch_id

Optional branch identifier for pipeline branching (0-indexed). DEPRECATED: use branch_path instead for nested branch support.

Type:

int | None

branch_path

List of branch indices for nested branching (e.g., [0, 2] for branch 2 inside branch 0). Empty list means pre-branch/shared artifacts.

Type:

List[int]

branch_name

Optional human-readable branch name for tracking

Type:

str | None

Example

>>> selector = DataSelector(partition="train", processing=[["raw"]])
>>> selector["y"] = "scaled"  # Direct modification
>>> selector["custom_key"] = "value"  # Arbitrary keys supported
>>> print(selector["partition"])
__delitem__(key: str) None[source]

Delete extra key or set field to None.

__getitem__(key: str) Any[source]

Get field value or extra key.

__iter__() Iterator[str][source]

Iterate over non-None fields and extra keys.

__len__() int[source]

Count of non-None fields and extra keys.

__setitem__(key: str, value: Any) None[source]

Set field value or extra key.
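
The mapping behaviour documented for these special methods can be approximated with a dataclass that subclasses MutableMapping. This is a sketch under assumptions, not the library's code: MappingSelector is a hypothetical stand-in with only two fields, and the iteration order (fields first, then extra keys) is a choice made for the example.

```python
from collections.abc import MutableMapping
from dataclasses import dataclass, field, fields
from typing import Any, Dict, Iterator, List, Optional


@dataclass
class MappingSelector(MutableMapping):
    """Hypothetical stand-in: named fields plus an open-ended extras dict."""

    partition: str = "all"
    y: Optional[str] = None
    _extra: Dict[str, Any] = field(default_factory=dict)

    def _field_names(self) -> List[str]:
        # Public dataclass fields, in declaration order.
        return [f.name for f in fields(self) if not f.name.startswith("_")]

    def __getitem__(self, key: str) -> Any:
        if key in self._field_names():
            return getattr(self, key)
        return self._extra[key]

    def __setitem__(self, key: str, value: Any) -> None:
        if key in self._field_names():
            setattr(self, key, value)
        else:
            self._extra[key] = value

    def __delitem__(self, key: str) -> None:
        if key in self._field_names():
            setattr(self, key, None)  # fields are cleared, not removed
        else:
            del self._extra[key]

    def __iter__(self) -> Iterator[str]:
        # Only fields holding a value are visible, followed by extra keys.
        for name in self._field_names():
            if getattr(self, name) is not None:
                yield name
        yield from self._extra

    def __len__(self) -> int:
        return sum(1 for _ in self)


sel = MappingSelector(partition="train")
sel["y"] = "scaled"
sel["custom_key"] = "value"
keys_before = list(sel)
del sel["y"]
keys_after = list(sel)
```

Deleting a declared field resets it to None, so it drops out of iteration and the length, while deleting an extra key removes it entirely.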

branch_id: int | None = None
branch_name: str | None = None
branch_path: List[int]
concat_source: bool = True
copy() DataSelector[source]

Create a deep copy of the selector.

fold_id: int | None = None
include_augmented: bool = False
layout: str = '2d'
partition: str = 'all'
processing: List[List[str]]
with_augmented(include_augmented: bool) DataSelector[source]

Create new selector with updated include_augmented flag.

Parameters:

include_augmented – Whether to include augmented samples

Returns:

New DataSelector with updated include_augmented

with_branch(branch_id: int | None = None, branch_name: str | None = None, branch_path: List[int] | None = None) DataSelector[source]

Create new selector with updated branch information.

Parameters:
  • branch_id – Branch identifier (0-indexed). DEPRECATED: Use branch_path.

  • branch_name – Human-readable branch name

  • branch_path – List of branch indices for nested branching

Returns:

New DataSelector with updated branch info

with_fold(fold_id: int | None) DataSelector[source]

Create new selector with updated fold_id.

Parameters:

fold_id – New fold identifier

Returns:

New DataSelector with updated fold_id

with_layout(layout: str) DataSelector[source]

Create new selector with updated layout.

Parameters:

layout – New layout value

Returns:

New DataSelector with updated layout

with_partition(partition: str) DataSelector[source]

Create new selector with updated partition.

Parameters:

partition – New partition value

Returns:

New DataSelector with updated partition

with_processing(processing: List[List[str]]) DataSelector[source]

Create new selector with updated processing chains.

Parameters:

processing – New processing chains

Returns:

New DataSelector with updated processing
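
The with_* methods all follow a copy-then-update pattern. The sketch below shows that pattern on a hypothetical CopyingSelector; it assumes each with_* call starts from a deep copy, which matches the documented copy() semantics but is not confirmed for every method.

```python
from copy import deepcopy
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class CopyingSelector:
    """Hypothetical stand-in with three of the documented fields."""

    partition: str = "all"
    processing: List[List[str]] = field(default_factory=list)
    fold_id: Optional[int] = None

    def copy(self) -> "CopyingSelector":
        return deepcopy(self)

    def with_partition(self, partition: str) -> "CopyingSelector":
        new = self.copy()
        new.partition = partition
        return new

    def with_fold(self, fold_id: Optional[int]) -> "CopyingSelector":
        new = self.copy()
        new.fold_id = fold_id
        return new


base = CopyingSelector(partition="train", processing=[["raw"]])
test_fold0 = base.with_partition("test").with_fold(0)
# Mutating the derived selector's chains leaves the original untouched.
test_fold0.processing[0].append("snv")
```

Calls can be chained because each one returns a fresh selector, and the original stays usable for other partitions or folds.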

y: str | None = None
class nirs4all.pipeline.config.context.ExecutionContext(selector: DataSelector | None = None, state: PipelineState | None = None, metadata: StepMetadata | None = None, custom: Dict[str, Any] | None = None, aggregate_column: str | None = None)[source]

Bases: object

Composite execution context with extensibility.

This class combines the three context components and provides:
  • Data selection via DataSelector

  • Mutable state tracking via PipelineState

  • Controller coordination via StepMetadata

  • Custom data storage for controller-specific needs

The context supports deep copying for controller isolation while sharing processing chains between selector and operations.

selector

Data selector (a DataSelector instance)

state

Mutable pipeline state

metadata

Mutable step metadata

custom

Dict for controller-specific custom data

aggregate_column

Sample aggregation column for prediction aggregation.
  • None: No aggregation (default)

  • ‘y’: Aggregate by y_true values

  • str: Aggregate by the specified metadata column

Example

>>> context = ExecutionContext(
...     selector=DataSelector(partition="train"),
...     state=PipelineState(y_processing="numeric"),
...     metadata=StepMetadata(keyword="transform")
... )
>>> context.custom["my_controller"] = {"threshold": 0.5}
>>> test_ctx = context.with_partition("test")
copy() ExecutionContext[source]

Create a deep copy of this context.

This preserves the copy semantics expected by controllers.

Returns:

Deep copy of ExecutionContext
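
Deep copying is what isolates one controller's changes from another's. A minimal sketch, assuming copy() is equivalent to copy.deepcopy; SketchContext is a hypothetical stand-in that keeps only a partition and the custom dict.

```python
from copy import deepcopy
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class SketchContext:
    """Hypothetical stand-in for a context carrying controller-specific data."""

    partition: str = "all"
    custom: Dict[str, Any] = field(default_factory=dict)

    def copy(self) -> "SketchContext":
        return deepcopy(self)


parent = SketchContext(
    partition="train", custom={"my_controller": {"threshold": 0.5}}
)
child = parent.copy()
# Nested values are duplicated too, so this edit stays local to the child.
child.custom["my_controller"]["threshold"] = 0.9
```

A shallow copy would share the nested dict, and the edit on the child would leak back into the parent.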

get_selector() DataSelector[source]

Get the data selector.

Returns:

DataSelector instance

with_branch(branch_id: int | None = None, branch_name: str | None = None) ExecutionContext[source]

Create new context with updated branch information.

Parameters:
  • branch_id – Branch identifier (0-indexed)

  • branch_name – Human-readable branch name

Returns:

New ExecutionContext with updated branch info

with_layout(layout: str) ExecutionContext[source]

Create new context with updated layout.

Parameters:

layout – New layout value

Returns:

New ExecutionContext with updated layout

with_metadata(**kwargs) ExecutionContext[source]

Create new context with updated metadata fields.

Parameters:

**kwargs – Metadata fields to update

Returns:

New ExecutionContext with updated metadata

with_partition(partition: str) ExecutionContext[source]

Create new context with updated partition.

Parameters:

partition – New partition value

Returns:

New ExecutionContext with updated partition

with_processing(processing: List[List[str]]) ExecutionContext[source]

Create new context with updated processing chains.

Parameters:

processing – New processing chains

Returns:

New ExecutionContext with updated processing

with_step_number(step_number: int) ExecutionContext[source]

Create new context with updated step number.

Parameters:

step_number – New step number

Returns:

New ExecutionContext with updated step number

with_y(y_processing: str) ExecutionContext[source]

Create new context with updated y processing.

Parameters:

y_processing – New y processing value

Returns:

New ExecutionContext with updated y processing

class nirs4all.pipeline.config.context.LoaderArtifactProvider(loader: Any, trace: Any | None = None)[source]

Bases: ArtifactProvider

Artifact provider backed by an ArtifactLoader.

Wraps an ArtifactLoader to provide artifacts on-demand with lazy loading and caching. Used when loading from a manifest for prediction.

loader

The underlying ArtifactLoader

trace

Optional ExecutionTrace for step-to-artifact mapping

get_artifact(step_index: int, fold_id: int | None = None) Any | None[source]

Get a single artifact for a step.

If trace is available, uses trace to find artifact IDs. Otherwise, uses loader’s step-based lookup.

Parameters:
  • step_index – 1-based step index

  • fold_id – Optional fold ID for fold-specific artifacts

Returns:

Artifact object or None if not found

get_artifact_by_chain(chain_path: str) Any | None[source]

Get artifact by V3 chain path.

Parameters:

chain_path – Full operator chain path (e.g., “s1.MinMaxScaler>s3.PLS”)

Returns:

Artifact object or None if not found

get_artifacts_for_chain_prefix(chain_prefix: str) List[Tuple[str, Any]][source]

Get all artifacts matching a chain path prefix.

Parameters:

chain_prefix – Chain path prefix to match

Returns:

List of (chain_path, artifact_object) tuples

get_artifacts_for_step(step_index: int, branch_path: List[int] | None = None, branch_id: int | None = None, source_index: int | None = None, substep_index: int | None = None) List[Tuple[str, Any]][source]

Get all artifacts for a step.

Parameters:
  • step_index – 1-based step index

  • branch_path – Optional branch path filter

  • branch_id – Optional branch ID filter

  • source_index – Optional source index filter for multi-source pipelines

  • substep_index – Optional substep index filter (not used in loader provider)

Returns:

List of (artifact_id, artifact_object) tuples

get_fold_artifacts(step_index: int, branch_path: List[int] | None = None) List[Tuple[int, Any]][source]

Get all fold-specific artifacts for a step.

Parameters:
  • step_index – 1-based step index

  • branch_path – Optional branch path filter

Returns:

List of (fold_id, artifact_object) tuples, sorted by fold_id

has_artifacts_for_step(step_index: int) bool[source]

Check if artifacts exist for a step.

Parameters:

step_index – 1-based step index

Returns:

True if artifacts are available for this step
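
The "lazy loading and caching" behaviour mentioned in the class description can be sketched independently of ArtifactLoader. Everything here is hypothetical: LazyArtifactCache, its get method, and fake_load are illustrative names, and the real loader's API is not shown in this reference.

```python
from typing import Any, Callable, Dict, List


class LazyArtifactCache:
    """Hypothetical cache: load each artifact once, on first request."""

    def __init__(self, load: Callable[[str], Any]) -> None:
        self._load = load
        self._cache: Dict[str, Any] = {}

    def get(self, artifact_id: str) -> Any:
        if artifact_id not in self._cache:
            self._cache[artifact_id] = self._load(artifact_id)
        return self._cache[artifact_id]


calls: List[str] = []


def fake_load(artifact_id: str) -> str:
    # Stand-in for reading a serialized object from disk.
    calls.append(artifact_id)
    return f"object:{artifact_id}"


cache = LazyArtifactCache(fake_load)
first = cache.get("0001:1:all")
second = cache.get("0001:1:all")
```

The second request is served from memory, so the underlying load function runs only once per artifact ID.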

class nirs4all.pipeline.config.context.MapArtifactProvider(artifact_map: Dict[int, List[Tuple[str, Any]]], fold_weights: Dict[int, float] | None = None, primary_artifacts: Dict[int, str] | None = None)[source]

Bases: ArtifactProvider

In-memory artifact provider backed by a dictionary.

Provides artifacts from a pre-loaded dictionary mapping step indices to artifacts. Used when artifacts are resolved from an ExecutionTrace or when loading from a bundle.

artifact_map

Dictionary mapping step_index to list of (artifact_id, object) tuples

fold_weights

Optional fold weights for CV ensemble averaging

Example

>>> artifact_map = {
...     1: [("0001:1:all", snv_transformer)],
...     2: [("0001:2:0", model_fold0), ("0001:2:1", model_fold1)]
... }
>>> provider = MapArtifactProvider(artifact_map)
>>> transformer = provider.get_artifact(step_index=1)
get_artifact(step_index: int, fold_id: int | None = None) Any | None[source]

Get a single artifact for a step.

If fold_id is specified, returns the fold-specific artifact. Otherwise, returns the primary or first artifact.

Parameters:
  • step_index – 1-based step index

  • fold_id – Optional fold ID for fold-specific artifacts

Returns:

Artifact object or None if not found

get_artifacts_for_step(step_index: int, branch_path: List[int] | None = None, branch_id: int | None = None, source_index: int | None = None, substep_index: int | None = None) List[Tuple[str, Any]][source]

Get all artifacts for a step.

Parameters:
  • step_index – 1-based step index

  • branch_path – Optional branch path filter (not used in map provider)

  • branch_id – Optional branch ID filter (not used in map provider)

  • source_index – Optional source/dataset index filter (not used in map provider)

  • substep_index – Optional substep index filter (not used in map provider)

Returns:

List of (artifact_id, artifact_object) tuples

get_fold_artifacts(step_index: int, branch_path: List[int] | None = None) List[Tuple[int, Any]][source]

Get all fold-specific artifacts for a step.

Parameters:
  • step_index – 1-based step index

  • branch_path – Optional branch path filter (not used in map provider)

Returns:

List of (fold_id, artifact_object) tuples, sorted by fold_id

get_fold_weights() Dict[int, float][source]

Get fold weights for CV ensemble averaging.

Returns:

Dictionary mapping fold_id to weight
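
One common way to use such weights is a weighted mean of per-fold predictions. The sketch below shows that calculation in plain Python; it is an assumption about how the weights are consumed, and weighted_fold_average is a hypothetical helper rather than a nirs4all function.

```python
from typing import Dict, List


def weighted_fold_average(
    fold_predictions: Dict[int, List[float]], fold_weights: Dict[int, float]
) -> List[float]:
    """Average per-fold predictions, weighting each fold by its weight."""
    total = sum(fold_weights[f] for f in fold_predictions)
    if total == 0:
        raise ValueError("fold weights must not sum to zero")
    n_samples = len(next(iter(fold_predictions.values())))
    return [
        sum(fold_weights[f] * preds[i] for f, preds in fold_predictions.items())
        / total
        for i in range(n_samples)
    ]


averaged = weighted_fold_average(
    {0: [1.0, 2.0], 1: [3.0, 4.0]},
    {0: 0.25, 1: 0.75},
)
```

Dividing by the sum of the weights means they do not have to be normalized beforehand.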

has_artifacts_for_step(step_index: int) bool[source]

Check if artifacts exist for a step.

Parameters:

step_index – 1-based step index

Returns:

True if artifacts are available for this step

class nirs4all.pipeline.config.context.PipelineState(y_processing: str = 'numeric', step_number: int = 0, mode: str = 'train')[source]

Bases: object

Mutable pipeline state that evolves through execution.

This class tracks state that changes as the pipeline executes:
  • Y transformation state (e.g., “encoded_LabelEncoder_001”)

  • Current step number in execution

This state is mutable by design because it must evolve as steps execute.

y_processing

Current y transformation identifier

Type:

str

step_number

Current step number (1-indexed)

Type:

int

mode

Execution mode (“train”, “predict”, “explain”)

Type:

str

Example

>>> state = PipelineState(y_processing="numeric")
>>> state.step_number = 2  # Mutable update
>>> state.y_processing = "encoded_LabelEncoder_001"
copy() PipelineState[source]

Create a deep copy of this state.

Returns:

Deep copy of PipelineState

mode: str = 'train'
step_number: int = 0
y_processing: str = 'numeric'
class nirs4all.pipeline.config.context.RuntimeContext(saver: Any = None, manifest_manager: Any = None, artifact_loader: Any = None, artifact_provider: ArtifactProvider | None = None, artifact_registry: Any = None, pipeline_uid: str | None = None, step_runner: Any = None, step_number: int = 0, operation_count: int = 0, substep_number: int = -1, processing_counter: int = 0, artifact_load_counter: Dict[int, int] = <factory>, target_model: Dict[str, Any] | None = None, explainer: Any = None, trace_recorder: Any = None, retrain_config: Any = None)[source]

Bases: object

Runtime infrastructure components for pipeline execution.

This class holds references to infrastructure components that are needed during execution but are not part of the data flow or pipeline state. It replaces the “God Object” pattern of passing the runner everywhere.

saver

SimulationSaver for file operations

Type:

Any

manifest_manager

ManifestManager for pipeline tracking

Type:

Any

artifact_loader

ArtifactLoader for predict/explain modes

Type:

Any

artifact_provider

ArtifactProvider for controller-agnostic artifact injection (Phase 3)

Type:

nirs4all.pipeline.config.context.ArtifactProvider | None

artifact_registry

ArtifactRegistry for artifact management (v2 system)

Type:

Any

pipeline_uid

Current pipeline unique identifier

Type:

str | None

step_runner

StepRunner for executing sub-steps

Type:

Any

operation_count

Counter for operation IDs

Type:

int

substep_number

Current substep number

Type:

int

trace_recorder

TraceRecorder for recording execution traces (Phase 2)

Type:

Any

retrain_config

RetrainConfig for retrain mode control (Phase 7)

Type:

Any

artifact_load_counter: Dict[int, int]
artifact_loader: Any = None
artifact_provider: ArtifactProvider | None = None
artifact_registry: Any = None
explainer: Any = None
get_execution_trace() Any | None[source]

Get the current execution trace.

Returns the trace object that has been built during execution. This can be used to generate post-execution diagrams with actual shapes.

Returns:

ExecutionTrace object or None if no trace recorder

get_trace_id() str | None[source]

Get the current trace ID.

Returns:

Trace ID or None if no trace recorder

manifest_manager: Any = None
next_artifact_load_index(source_index: int) int[source]

Get the next artifact load index for a source during prediction.

This counter tracks how many artifacts have been loaded for each source across all sub-operations within a step (e.g., feature_augmentation).

Parameters:

source_index – The source index to track.

Returns:

The next artifact index to load for this source.

Return type:

int
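
A per-source counter of this kind can be sketched with a dictionary keyed by source index. The sketch assumes indices start at 0 and grow by one per call, which this reference does not state; LoadCounter and next_index are hypothetical names.

```python
from collections import defaultdict
from typing import Dict


class LoadCounter:
    """Hypothetical per-source counter: yields 0, 1, 2, ... for each source."""

    def __init__(self) -> None:
        self._counts: Dict[int, int] = defaultdict(int)

    def next_index(self, source_index: int) -> int:
        index = self._counts[source_index]
        self._counts[source_index] = index + 1
        return index


counter = LoadCounter()
sequence = [
    counter.next_index(0),
    counter.next_index(0),
    counter.next_index(1),  # a different source keeps its own count
    counter.next_index(0),
]
```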

next_op() int[source]

Get the next operation ID.

next_processing_index() int[source]

Get the next unique processing index for artifact identification.

This counter persists across all sub-operations within a step (e.g., feature_augmentation), ensuring each transformer gets a unique substep_index. The counter is reset at the start of each step.

Returns:

A unique processing index within the current step.

Return type:

int
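
The step-scoped counter described above pairs an incrementing index with a reset at each step boundary. A minimal sketch, assuming the first index in a step is 0; ProcessingCounter is a hypothetical stand-in for the counter held by RuntimeContext.

```python
class ProcessingCounter:
    """Hypothetical step-scoped counter with an explicit reset."""

    def __init__(self) -> None:
        self.value = 0

    def next_index(self) -> int:
        index = self.value
        self.value += 1
        return index

    def reset(self) -> None:
        self.value = 0


counter = ProcessingCounter()
step1 = [counter.next_index() for _ in range(3)]  # three transformers in step 1
counter.reset()  # a new step begins
step2 = [counter.next_index() for _ in range(2)]
```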

operation_count: int = 0
pipeline_uid: str | None = None
processing_counter: int = 0
record_input_shapes(input_shape: tuple | None = None, features_shape: List[tuple] | None = None) None[source]

Record input shapes for the current step.

Parameters:
  • input_shape – 2D layout shape (samples, features)

  • features_shape – List of 3D shapes per source (samples, processings, features)

record_output_shapes(output_shape: tuple | None = None, features_shape: List[tuple] | None = None) None[source]

Record output shapes for the current step.

Parameters:
  • output_shape – 2D layout shape (samples, features)

  • features_shape – List of 3D shapes per source (samples, processings, features)

record_step_artifact(artifact_id: str, is_primary: bool = False, fold_id: int | None = None, chain_path: str | None = None, branch_path: List[int] | None = None, source_index: int | None = None, metadata: Dict[str, Any] | None = None) None[source]

Record an artifact created during the current step (V3).

Parameters:
  • artifact_id – The V3 artifact ID

  • is_primary – Whether this is the primary artifact (e.g., model)

  • fold_id – CV fold ID if fold-specific artifact

  • chain_path – V3 operator chain path for this artifact

  • branch_path – Branch path for indexing

  • source_index – Source index for multi-source artifacts

  • metadata – Additional artifact metadata

record_step_end(is_model: bool = False, fold_weights: Dict[int, float] | None = None, skip_trace: bool = False) None[source]

Record the end of a step execution.

Parameters:
  • is_model – Whether this is the model step

  • fold_weights – Per-fold weights for CV models

  • skip_trace – If True, don’t add this step to the trace

record_step_start(step_index: int, operator_type: str = '', operator_class: str = '', operator_config: Dict[str, Any] | None = None, branch_path: List[int] | None = None, branch_name: str = '', mode: str = 'train') None[source]

Record the start of a step execution in the trace.

Parameters:
  • step_index – 1-based step index

  • operator_type – Type of operator (e.g., “transform”, “model”)

  • operator_class – Class name of operator

  • operator_config – Serialized operator configuration

  • branch_path – Branch indices if in branch context

  • branch_name – Human-readable branch name

  • mode – Execution mode (“train”, “predict”, “explain”)

reset_processing_counter() None[source]

Reset the processing counter at the start of each step.

retrain_config: Any = None
saver: Any = None
should_train_step(step_index: int, is_model: bool = False) bool[source]

Determine if a step should train based on retrain configuration.

Phase 7 Feature:

When a retrain_config is present, this method delegates to it to determine whether a step should train or use existing artifacts.

Parameters:
  • step_index – 1-based step index

  • is_model – Whether this is the model step

Returns:

True if the step should train, False if it should use existing artifacts
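
The delegation can be sketched as below. Two assumptions are made and are not confirmed by this reference: that the retrain configuration exposes a method with the same name and arguments, and that a step trains when no configuration is set. FrozenPreprocessing is a hypothetical policy used only for the example.

```python
from typing import Any, Optional


class FrozenPreprocessing:
    """Hypothetical retrain policy: reuse preprocessing, retrain the model."""

    def should_train_step(self, step_index: int, is_model: bool = False) -> bool:
        return is_model


def should_train_step(
    retrain_config: Optional[Any], step_index: int, is_model: bool = False
) -> bool:
    if retrain_config is None:
        # Assumption: with no retrain configuration, every step trains.
        return True
    return retrain_config.should_train_step(step_index, is_model)


policy = FrozenPreprocessing()
no_config = should_train_step(None, step_index=1)
scaler_step = should_train_step(policy, step_index=1)
model_step = should_train_step(policy, step_index=3, is_model=True)
```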

step_number: int = 0
step_runner: Any = None
substep_number: int = -1
target_model: Dict[str, Any] | None = None
trace_recorder: Any = None
class nirs4all.pipeline.config.context.StepMetadata(keyword: str = '', step_id: str = '', augment_sample: bool = False, add_feature: bool = False, replace_processing: bool = False, target_samples: List[int] = <factory>, target_features: List[int] = <factory>)[source]

Bases: object

Metadata for controller coordination and step tracking.

This class handles:
  • Controller coordination flags (augment_sample, add_feature, replace_processing)

  • Step identification (step_id, keyword)

  • Target specification for augmentation operations

These are ephemeral flags set/cleared between steps for controller communication.

keyword

Step keyword (e.g., “model”, “transform”)

Type:

str

step_id

Step identifier (e.g., “001”, “002”)

Type:

str

augment_sample

Flag for sample augmentation mode

Type:

bool

add_feature

Flag for feature augmentation mode

Type:

bool

replace_processing

Flag to replace processing chains

Type:

bool

target_samples

Target sample IDs for augmentation

Type:

List[int]

target_features

Target feature indices for augmentation

Type:

List[int]

Example

>>> metadata = StepMetadata(keyword="transform", step_id="001")
>>> metadata.augment_sample = True
>>> metadata.target_samples = [42]
add_feature: bool = False
augment_sample: bool = False
copy() StepMetadata[source]

Create a deep copy of this metadata.

Returns:

Deep copy of StepMetadata

keyword: str = ''
replace_processing: bool = False
reset_ephemeral_flags() None[source]

Reset ephemeral flags after step execution.

Clears augment_sample, add_feature, replace_processing flags and target lists to prevent leakage between steps.
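
The reset can be pictured on a dataclass that mirrors the documented fields. SketchMetadata is a hypothetical stand-in; the sketch assumes that keyword and step_id survive the reset, since only the flags and target lists are described as cleared.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class SketchMetadata:
    """Hypothetical stand-in mirroring the documented StepMetadata fields."""

    keyword: str = ""
    step_id: str = ""
    augment_sample: bool = False
    add_feature: bool = False
    replace_processing: bool = False
    target_samples: List[int] = field(default_factory=list)
    target_features: List[int] = field(default_factory=list)

    def reset_ephemeral_flags(self) -> None:
        # Clear per-step signals; identification fields are left alone.
        self.augment_sample = False
        self.add_feature = False
        self.replace_processing = False
        self.target_samples = []
        self.target_features = []


meta = SketchMetadata(keyword="transform", step_id="001")
meta.augment_sample = True
meta.target_samples = [42]
meta.reset_ephemeral_flags()
```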

step_id: str = ''
target_features: List[int]
target_samples: List[int]