nirs4all.controllers.data.source_branch module
Source Branch Controller for per-source pipeline execution.
This controller enables per-source pipeline execution for multi-source datasets. Each data source (e.g., NIR, markers, Raman) can have its own independent preprocessing pipeline.
Unlike regular branching (branch), which creates parallel paths that all process the same data, source branching assigns each source to a specific processing pipeline based on its name or index.
Phase 10 Implementation: - Parse source_branch configurations - Create per-source execution contexts - Execute source-specific pipelines - Support prediction mode - Integration with merge_sources
Example
>>> # Different preprocessing per source
>>> pipeline = [
... {"source_branch": {
... "NIR": [SNV(), SavitzkyGolay()],
... "markers": [VarianceThreshold(), MinMaxScaler()],
... }},
... {"merge_sources": "concat"}, # Combine sources after
... PLSRegression(n_components=10)
... ]
>>>
>>> # Automatic source branching (same empty pipeline per source - isolation only)
>>> pipeline = [
... {"source_branch": "auto"},
... {"merge_sources": "concat"},
... PLSRegression(n_components=10)
... ]
Keywords: “source_branch” Priority: 5 (same as BranchController)
- class nirs4all.controllers.data.source_branch.SourceBranchConfigParser[source]
Bases:
objectParser for source_branch step configurations.
Handles multiple syntax formats for source branching and normalizes them to SourceBranchConfig.
- Supported syntaxes:
Simple string: “auto” (isolate each source)
Dict with source names: {“NIR”: [steps], “markers”: [steps]}
Dict with indices: {0: [steps], 1: [steps]}
Dict with special keys: {“_default_”: [steps], “_merge_after_”: False}
- classmethod parse(raw_config: Any) SourceBranchConfig[source]
Parse raw source_branch configuration into SourceBranchConfig.
- Parameters:
raw_config – The value from {“source_branch”: raw_config}
- Returns:
Normalized SourceBranchConfig instance.
- Raises:
ValueError – If configuration format is invalid.
- class nirs4all.controllers.data.source_branch.SourceBranchController[source]
Bases:
OperatorControllerController for per-source pipeline execution.
This controller enables per-source pipeline execution for multi-source datasets. Each data source gets its own independent processing pipeline.
- Key behaviors:
Creates per-source execution contexts
Executes source-specific pipelines
Stores source contexts for subsequent steps or auto-merge
Optionally auto-merges sources after processing
- Unlike regular BranchController:
Operates on the data provenance dimension (sources), not execution paths
Each source’s data is isolated during its pipeline execution
Sources can have completely different preprocessing chains
Designed for multi-modal data (NIR, markers, Raman, etc.)
- execute(step_info: ParsedStep, dataset: SpectroDataset, context: ExecutionContext, runtime_context: RuntimeContext, source: int = -1, mode: str = 'train', loaded_binaries: List[Tuple[str, Any]] | None = None, prediction_store: Any | None = None) Tuple[ExecutionContext, StepOutput][source]
Execute source branch step.
For each source, runs a specific sub-pipeline (if defined) and updates the processing context. Uses existing infrastructure:
Get source names and current processing chains
For each source with a defined pipeline: - Create a context with processing limited to that source - Run the sub-pipeline steps - Collect artifacts
Update context with new processing chains
Optionally auto-merge sources
The TransformerController will naturally apply transforms only to the source whose processing is in the context.
- Parameters:
step_info – Parsed step containing source_branch configuration
dataset – Dataset to operate on (must have multiple sources)
context – Pipeline execution context
runtime_context – Runtime infrastructure context
source – Data source index
mode – Execution mode (“train” or “predict”)
loaded_binaries – Pre-loaded binary objects for prediction mode
prediction_store – External prediction store
- Returns:
Tuple of (updated_context, StepOutput with artifacts)
- Raises:
ValueError – If dataset has only one source.
- classmethod matches(step: Any, operator: Any, keyword: str) bool[source]
Check if the step matches the source_branch controller.
- Parameters:
step – Original step configuration
operator – Deserialized operator
keyword – Step keyword
- Returns:
True if keyword is “source_branch”