Controller System
The controller system is the core dispatch mechanism in nirs4all. It routes pipeline steps to appropriate handlers based on keywords and operator types.
Overview
Every pipeline step is processed by a controller. The system uses a priority-based registry to match steps to controllers:
StepParser normalizes the step into a ParsedStep
ControllerRouter queries all registered controllers
The highest-priority matching controller executes the step
# These all get routed to appropriate controllers
pipeline = [
    MinMaxScaler(),                      # → TransformerMixinController
    {"y_processing": MinMaxScaler()},    # → YProcessingController
    KFold(n_splits=5),                   # → SplitterController
    {"branch": [[A], [B]]},              # → BranchController
    {"merge": "predictions"},            # → MergeController
    PLSRegression(n_components=10),      # → ModelController
]
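The dispatch rule described in the overview (query every controller, keep the matching one with the highest priority) can be sketched in plain Python. This is a simplified, hypothetical model, not nirs4all's actual ControllerRouter: the toy classes and the `route` function are assumptions made for illustration only.

```python
from typing import Any, List, Optional


class ToyController:
    """Stand-in for a controller; a lower priority value wins."""
    priority = 100

    @classmethod
    def matches(cls, step: Any, operator: Any, keyword: str) -> bool:
        return False


class ToyModelController(ToyController):
    priority = 20

    @classmethod
    def matches(cls, step, operator, keyword):
        return keyword == "model"


class ToyFallbackController(ToyController):
    priority = 1000  # catch-all, only used when nothing else matches

    @classmethod
    def matches(cls, step, operator, keyword):
        return True


def route(controllers: List[type], step, operator, keyword) -> Optional[type]:
    """Return the matching controller with the lowest priority value."""
    candidates = [c for c in controllers if c.matches(step, operator, keyword)]
    return min(candidates, key=lambda c: c.priority) if candidates else None


registry = [ToyFallbackController, ToyModelController]
chosen = route(registry, {"model": "pls"}, "pls", "model")
print(chosen.__name__)  # ToyModelController
```

Both toy controllers match the `model` step, but the one with the smaller priority value is selected; the catch-all only handles steps that nothing else claims.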
Controller Base Class
All controllers inherit from OperatorController:
from abc import ABC, abstractmethod
from typing import Any, Tuple, Optional, List, TYPE_CHECKING

if TYPE_CHECKING:
    from nirs4all.pipeline.config.context import ExecutionContext, RuntimeContext
    from nirs4all.pipeline.steps.parser import ParsedStep
    from nirs4all.data.dataset import SpectroDataset


class OperatorController(ABC):
    """Base class for pipeline operators."""

    priority: int = 100  # Lower = higher priority

    @classmethod
    @abstractmethod
    def matches(cls, step: Any, operator: Any, keyword: str) -> bool:
        """Check if this controller should handle the step."""
        raise NotImplementedError

    @classmethod
    @abstractmethod
    def use_multi_source(cls) -> bool:
        """Check if controller supports multi-source datasets."""
        return False

    @classmethod
    def supports_prediction_mode(cls) -> bool:
        """Check if controller should execute during prediction."""
        return False

    @abstractmethod
    def execute(
        self,
        step_info: "ParsedStep",
        dataset: "SpectroDataset",
        context: "ExecutionContext",
        runtime_context: "RuntimeContext",
        source: int = -1,
        mode: str = "train",
        loaded_binaries: Optional[List[Tuple[str, Any]]] = None,
        prediction_store: Optional[Any] = None
    ) -> Tuple["ExecutionContext", Any]:
        """Execute the step."""
        raise NotImplementedError
Controller Registry
Controllers are registered globally using the @register_controller decorator:
from nirs4all.controllers.controller import OperatorController
from nirs4all.controllers.registry import register_controller, CONTROLLER_REGISTRY


@register_controller
class MyController(OperatorController):
    priority = 50

    @classmethod
    def matches(cls, step, operator, keyword):
        return keyword == "my_keyword"

    # ... rest of implementation
The registry automatically sorts controllers by priority and prevents duplicate registration.
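A registration decorator with these two properties (priority ordering, no duplicates) could look roughly like the following. This is a minimal sketch under stated assumptions: `TOY_REGISTRY` and `toy_register` are hypothetical names, and the real implementation in nirs4all/controllers/registry.py may differ.

```python
from typing import List

TOY_REGISTRY: List[type] = []  # hypothetical stand-in for CONTROLLER_REGISTRY


def toy_register(cls: type) -> type:
    """Add cls once, then keep the list ordered by ascending priority value."""
    if cls not in TOY_REGISTRY:
        TOY_REGISTRY.append(cls)
        TOY_REGISTRY.sort(key=lambda c: c.priority)
    return cls  # returning cls lets the function be used as a class decorator


@toy_register
class GenericController:
    priority = 100


@toy_register
class SpecificController:
    priority = 30


toy_register(SpecificController)  # a second registration is ignored

print([c.__name__ for c in TOY_REGISTRY])
# ['SpecificController', 'GenericController']
```

Because the list is re-sorted on every registration, import order does not affect which controller is consulted first.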
Keyword System
Keyword Types
The parser recognizes three categories of keywords:
Reserved Keywords (not treated as operators):
params, metadata, steps, name, finetune_params, train_params, model_params
Serialization Operators (checked first):
class, function, module, object, pipeline, instance
Workflow Keywords:
Priority keywords (checked in order): model, preprocessing, feature_augmentation, y_processing, sample_augmentation
Custom keywords: Any other non-reserved key
Keyword Prioritization
When multiple potential keywords exist in a step:
# "model" wins (priority keyword)
{"model": SVC(), "my_custom": lambda x: x}
# "my_custom" is used (no priority keyword present)
{"my_custom": lambda x: x, "params": {...}}
# "class" wins (serialization operator)
{"class": "sklearn.svm.SVC", "model": SVC()}
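The selection order illustrated above can be written as a small function. This is a sketch, not the actual StepParser code: `pick_keyword` is a hypothetical helper, the keyword lists are copied from this section, and two details are assumptions (serialization keys are tried in the listed order, and the first non-reserved key is taken as the custom keyword).

```python
from typing import Any, Dict, Optional

# Keyword sets copied from the lists in this section.
RESERVED = {"params", "metadata", "steps", "name",
            "finetune_params", "train_params", "model_params"}
SERIALIZATION = ["class", "function", "module", "object", "pipeline", "instance"]
PRIORITY = ["model", "preprocessing", "feature_augmentation",
            "y_processing", "sample_augmentation"]


def pick_keyword(step: Dict[str, Any]) -> Optional[str]:
    """Hypothetical helper: choose the key that identifies the operator."""
    for key in SERIALIZATION:   # 1. serialization operators are checked first
        if key in step:
            return key
    for key in PRIORITY:        # 2. then priority keywords, in order
        if key in step:
            return key
    for key in step:            # 3. otherwise the first custom, non-reserved key
        if key not in RESERVED:
            return key
    return None                 # only reserved keys were present


print(pick_keyword({"model": "svc", "my_custom": 1}))                # model
print(pick_keyword({"my_custom": 1, "params": {}}))                  # my_custom
print(pick_keyword({"class": "sklearn.svm.SVC", "model": "svc"}))    # class
```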
Built-in Controllers
Transform Controllers
| Controller | Priority | Keywords/Matches |
|---|---|---|
|  | 100 | sklearn TransformerMixin objects |
|  | 50 |  |
|  | 50 |  |
Model Controllers
| Controller | Priority | Keywords/Matches |
|---|---|---|
|  | 100 | sklearn estimators, |
|  | 30 | TensorFlow/Keras models |
|  | 30 | PyTorch models |
|  | 30 | JAX/Flax models |
|  | 20 |  |
Data Controllers
| Controller | Priority | Keywords/Matches |
|---|---|---|
|  | 5 |  |
|  | 5 |  |
|  | 5 |  |
Splitter Controllers
| Controller | Priority | Keywords/Matches |
|---|---|---|
|  | 100 | sklearn splitters (KFold, etc.) |
Writing Custom Controllers
Step-by-Step Guide
Create the Controller Class
from nirs4all.controllers.controller import OperatorController
from nirs4all.controllers.registry import register_controller
from nirs4all.pipeline.execution.result import StepOutput


@register_controller
class SmoothingController(OperatorController):
    """Custom controller for spectral smoothing."""

    priority = 45  # Between data (5) and generic (100)

    @classmethod
    def matches(cls, step, operator, keyword):
        """Match on 'smoothing' or 'smooth' keywords."""
        if isinstance(step, dict):
            return 'smoothing' in step or 'smooth' in step
        return keyword in ['smoothing', 'smooth']

    @classmethod
    def use_multi_source(cls):
        """Support multi-source datasets."""
        return True

    @classmethod
    def supports_prediction_mode(cls):
        """Execute during prediction to transform new data."""
        return True

    def execute(self, step_info, dataset, context, runtime_context,
                source=-1, mode="train", loaded_binaries=None,
                prediction_store=None):
        """Apply smoothing to spectral data."""
        smoother = step_info.operator
        params = step_info.metadata.get('params', {})

        # Get current features
        X = dataset.X

        # Apply smoothing
        if callable(smoother):
            X_smoothed = smoother(X, **params)
        else:
            X_smoothed = smoother.fit_transform(X)

        # Update dataset
        dataset.X = X_smoothed

        # Return updated context and empty artifacts
        return context, StepOutput()
Import the Module
The controller is registered when imported:
# In your script or __init__.py
import my_custom_controllers # Registers all controllers
Use in Pipeline
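from scipy.signal import savgol_filter
from sklearn.cross_decomposition import PLSRegression
from sklearn.preprocessing import StandardScaler

pipeline = [
    # "params" are forwarded as keyword arguments, so the names must match
    # savgol_filter's signature (window_length, polyorder)
    {"smoothing": savgol_filter, "params": {"window_length": 5, "polyorder": 2}},
    {"preprocessing": StandardScaler()},
    {"model": PLSRegression()},
]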
Important Considerations
Keyword Rules
Avoid reserved keywords: params, metadata, steps, name, finetune_params, train_params
Don’t use serialization keywords: class, function, module, object, pipeline, instance
Priority keywords take precedence: If your step has model, that wins
Priority Guidelines
| Priority Range | Use Case |
|---|---|
| 1-10 | Critical operations (branch, merge) |
| 20-50 | Specific operator types (framework-specific models) |
| 50-80 | Custom business logic |
| 80-100 | Generic fallbacks |
| 1000+ | Catch-all (DummyController) |
Prediction Mode
If your controller modifies features (as preprocessing steps do), have supports_prediction_mode() return True so that it also executes when loading bundles or running predictions.
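The effect of this flag can be pictured as a filter applied when the pipeline runs in predict mode. The sketch below is an assumption about the general idea, not the actual nirs4all runner: `controllers_to_run` and both toy controllers are hypothetical.

```python
from typing import List


class ToyScalerController:
    @classmethod
    def supports_prediction_mode(cls) -> bool:
        return True   # transforms features, so new data needs it too


class ToyOutlierController:
    @classmethod
    def supports_prediction_mode(cls) -> bool:
        return False  # training-only step


def controllers_to_run(controllers: List[type], mode: str) -> List[type]:
    """Hypothetical filter: in predict mode keep only prediction-capable controllers."""
    if mode != "predict":
        return list(controllers)
    return [c for c in controllers if c.supports_prediction_mode()]


steps = [ToyScalerController, ToyOutlierController]
print([c.__name__ for c in controllers_to_run(steps, "train")])
# ['ToyScalerController', 'ToyOutlierController']
print([c.__name__ for c in controllers_to_run(steps, "predict")])
# ['ToyScalerController']
```

A feature-modifying controller that returns False here would be skipped at prediction time, so the model would receive data in a different form than it was trained on.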
Real-World Examples
Example 1: Baseline Correction
@register_controller
class BaselineCorrectionController(OperatorController):
    priority = 40

    @classmethod
    def matches(cls, step, operator, keyword):
        return keyword in ["baseline_correction", "baseline"]

    @classmethod
    def use_multi_source(cls):
        return True

    @classmethod
    def supports_prediction_mode(cls):
        return True

    def execute(self, step_info, dataset, context, runtime_context,
                source=-1, mode="train", loaded_binaries=None,
                prediction_store=None):
        method = step_info.metadata.get('params', {}).get('method', 'als')
        # Apply baseline correction...
        return context, StepOutput()
Usage:
pipeline = [
    {"baseline_correction": my_baseline_fn, "params": {"method": "als"}}
]
Example 2: Outlier Detection
@register_controller
class OutlierDetectionController(OperatorController):
    priority = 35

    @classmethod
    def matches(cls, step, operator, keyword):
        return keyword in ["outlier_detection", "outliers"]

    @classmethod
    def use_multi_source(cls):
        return True

    @classmethod
    def supports_prediction_mode(cls):
        return False  # Only during training

    def execute(self, step_info, dataset, context, runtime_context,
                source=-1, mode="train", loaded_binaries=None,
                prediction_store=None):
        detector = step_info.operator
        # Mark outliers in metadata...
        return context, StepOutput()
Usage:
from sklearn.ensemble import IsolationForest

pipeline = [
    {"outlier_detection": IsolationForest(), "params": {"contamination": 0.1}}
]
Controller Methods Reference
matches(cls, step, operator, keyword) -> bool
Determines if this controller should handle the step.
Parameters:
step: Original step configuration (dict, string, or object)
operator: Deserialized operator (if any)
keyword: Extracted keyword (e.g., “model”, “preprocessing”)
Returns: True if this controller should handle the step
use_multi_source(cls) -> bool
Indicates if the controller supports multi-source datasets.
Returns: True if the controller can process datasets with multiple feature sources
supports_prediction_mode(cls) -> bool
Indicates if the controller should execute during prediction mode.
Returns: True if the controller should run when loading bundles or making predictions on new data
execute(...) -> Tuple[ExecutionContext, StepOutput]
Executes the step logic.
Parameters:
step_info: Parsed step containing operator, keyword, and metadata
dataset: SpectroDataset to operate on
context: ExecutionContext with pipeline state
runtime_context: RuntimeContext with infrastructure (workspace, logging)
source: Data source index (-1 for all sources)
mode: “train” or “predict”
loaded_binaries: Pre-loaded artifacts for prediction mode
prediction_store: External store for model predictions
Returns: Tuple of (updated context, StepOutput with artifacts)
Testing Custom Controllers
Test your controller with:
def test_my_controller_matches():
    """Verify keyword matching."""
    assert MyController.matches(
        {"my_keyword": lambda x: x},
        None,
        "my_keyword"
    )
    assert not MyController.matches(
        {"other": lambda x: x},
        None,
        "other"
    )


def test_my_controller_registration():
    """Verify registration in registry."""
    from nirs4all.controllers.registry import CONTROLLER_REGISTRY
    assert MyController in CONTROLLER_REGISTRY
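def test_my_controller_priority():
    """Verify priority ordering."""
    from nirs4all.controllers.registry import CONTROLLER_REGISTRY
    names = [c.__name__ for c in CONTROLLER_REGISTRY]
    assert "MyController" in names
    # Verify position relative to other controllers. This assumes the registry
    # is kept in ascending priority order (lower value = higher priority).
    priorities = [c.priority for c in CONTROLLER_REGISTRY]
    assert priorities == sorted(priorities)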
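The execute() method can also be exercised without building a real dataset or context, by passing lightweight stand-ins. The following is a self-contained sketch: `ToyScaleController` is a hypothetical controller, and the stand-ins assume only the attributes used in this guide's examples (`step_info.metadata`, `dataset.X`).

```python
from types import SimpleNamespace


class ToyScaleController:
    """Hypothetical controller that multiplies every feature by a factor."""

    def execute(self, step_info, dataset, context, runtime_context,
                source=-1, mode="train", loaded_binaries=None,
                prediction_store=None):
        factor = step_info.metadata.get("params", {}).get("factor", 1.0)
        dataset.X = [[value * factor for value in row] for row in dataset.X]
        return context, None  # a real controller would return a StepOutput


def test_toy_controller_execute():
    """Call execute() with stand-ins instead of real nirs4all objects."""
    step_info = SimpleNamespace(operator=None, keyword="scale",
                                metadata={"params": {"factor": 2.0}})
    dataset = SimpleNamespace(X=[[1.0, 2.0], [3.0, 4.0]])
    context = object()

    new_context, _ = ToyScaleController().execute(step_info, dataset, context, None)

    assert new_context is context                   # context is passed through
    assert dataset.X == [[2.0, 4.0], [6.0, 8.0]]    # features were updated in place


test_toy_controller_execute()
```

Stand-ins like these keep the test focused on the controller's own logic; an integration test with a real SpectroDataset is still worth adding separately.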
See Also
Architecture Overview - High-level architecture overview
Writing a Pipeline in nirs4all - Complete pipeline syntax reference
Pipeline Branching - User-facing branching documentation
Examples - Working examples
Source files:
nirs4all/controllers/registry.py - Controller registration
nirs4all/controllers/controller.py - Base controller class
nirs4all/controllers/transforms/ - Transform controllers
nirs4all/controllers/models/ - Model controllers