Pipeline Architecture Overview
This document provides a high-level overview of the nirs4all pipeline architecture for developers. It explains the flow of execution, key components, and how to extend the system with custom controllers and keywords.
Architecture Philosophy
The pipeline module is designed around a layered architecture with separation of concerns:
Orchestration: Managing multiple datasets and pipeline configurations.
Execution: Running a specific sequence of steps on a specific dataset.
Step Logic: The actual implementation of a pipeline step (model training, preprocessing, etc.).
Key Components
1. PipelineRunner (nirs4all/pipeline/runner.py)
Role: The public entry point (Facade). Responsibility:
Provides a simple API for users (run(), predict(), explain()).
Initializes the environment (workspace, logging).
Delegates work to the PipelineOrchestrator.
2. PipelineOrchestrator (nirs4all/pipeline/execution/orchestrator.py)
Role: The high-level manager. Responsibility:
Iterates over all provided Datasets.
Iterates over all provided Pipeline Configurations.
Manages global results (aggregating predictions across runs).
Instantiates a PipelineExecutor for each (Dataset, Pipeline) pair.
3. PipelineExecutor (nirs4all/pipeline/execution/executor.py)
Role: The sequence runner. Responsibility:
Executes a list of steps sequentially.
Manages the ExecutionContext (state propagation).
Handles artifact management (saving models, logs) for a single run.
Catches errors and handles the “continue on error” logic.
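The split between orchestrator and executor can be pictured with a toy model: the orchestrator creates one executor per (Dataset, Pipeline) pair, and each executor runs its steps in order, either re-raising or recording a failing step. This is a simplified sketch, not the nirs4all code; ToyOrchestrator, ToyExecutor, the continue_on_error flag, and the representation of steps as plain callables are all hypothetical.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Tuple

Step = Callable[[Any], Any]  # hypothetical: a step maps data -> data


@dataclass
class ToyExecutor:
    """Runs one pipeline (a list of steps) on one dataset, in order."""
    steps: List[Step]
    continue_on_error: bool = False  # hypothetical flag name
    errors: List[str] = field(default_factory=list)

    def run(self, data: Any) -> Any:
        for index, step in enumerate(self.steps):
            try:
                data = step(data)
            except Exception as exc:
                if not self.continue_on_error:
                    raise
                # Record the failure and pass the previous data to the next step.
                self.errors.append(f"step {index}: {exc}")
        return data


@dataclass
class ToyOrchestrator:
    """Creates one executor per (dataset, pipeline) pair and collects the results."""
    continue_on_error: bool = False

    def run(self, datasets: Dict[str, Any],
            pipelines: Dict[str, List[Step]]) -> Dict[Tuple[str, str], Any]:
        results: Dict[Tuple[str, str], Any] = {}
        for ds_name, data in datasets.items():
            for pipe_name, steps in pipelines.items():
                executor = ToyExecutor(steps, self.continue_on_error)
                results[(ds_name, pipe_name)] = executor.run(data)
        return results
```

With continue_on_error=True a failing step is recorded in ToyExecutor.errors and the output of the previous step is carried forward; with the default False, the exception propagates. How nirs4all actually configures this behavior is not described in this document.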
4. StepRunner (nirs4all/pipeline/steps/step_runner.py)
Role: The unit executor. Responsibility:
Parses the raw step definition (dict, string, object) using StepParser.
Routes the step to the appropriate Controller using ControllerRouter.
Executes the Controller.
5. StepParser (nirs4all/pipeline/steps/parser.py)
Role: Step configuration parser. Responsibility:
Normalizes different step syntaxes (dict, string, instance, list) into a canonical ParsedStep format.
Extracts keywords from step configurations (both standard and custom).
Identifies step types (workflow, serialized, subpipeline, direct).
Deserializes operators when needed.
6. ControllerRouter (nirs4all/pipeline/steps/router.py)
Role: Controller selection. Responsibility:
Matches parsed steps to appropriate controllers using priority-based selection.
Queries each controller’s matches() method.
Returns the highest-priority matching controller.
7. Controllers (nirs4all/controllers/)
Role: The business logic. Responsibility:
Implements the actual logic for a step (e.g., ModelController, PreprocessingController).
Interacts with the SpectroDataset.
Updates the ExecutionContext.
Returns artifacts (files, objects) to be saved.
8. Controller Registry (nirs4all/controllers/registry.py)
Role: Controller discovery and registration. Responsibility:
Maintains the global CONTROLLER_REGISTRY list.
Provides the @register_controller decorator for automatic registration.
Ensures controllers are sorted by priority.
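The registration mechanism described above can be sketched in a few lines. This is a simplified illustration, not the code in nirs4all/controllers/registry.py: ToyController, TOY_REGISTRY, and toy_register_controller are hypothetical stand-ins for OperatorController, CONTROLLER_REGISTRY, and @register_controller.

```python
from typing import Any, List, Type


class ToyController:
    """Minimal stand-in for a controller base class (hypothetical)."""
    priority: int = 100  # lower number = higher priority; 100 is the documented default

    @classmethod
    def matches(cls, step: Any, operator: Any, keyword: str) -> bool:
        return False


# Module-level list kept sorted by priority (plays the role of CONTROLLER_REGISTRY).
TOY_REGISTRY: List[Type[ToyController]] = []


def toy_register_controller(cls: Type[ToyController]) -> Type[ToyController]:
    """Class decorator: add the class once, re-sort by priority, return it unchanged."""
    if cls not in TOY_REGISTRY:
        TOY_REGISTRY.append(cls)
        TOY_REGISTRY.sort(key=lambda c: c.priority)
    return cls


@toy_register_controller
class CatchAll(ToyController):
    priority = 1000


@toy_register_controller
class Chart(ToyController):
    priority = 10
```

Registration happens as a side effect of defining (that is, importing) the decorated class. Because Python's list sort is stable, controllers with equal priority keep their registration order in this sketch; whether nirs4all breaks ties the same way is not stated here.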
Data Flow
The data flow relies on two main objects passed through the layers:
SpectroDataset: The data itself. It is mutable but typically modified via “views” or internal state updates managed by controllers.
ExecutionContext: A composite object containing:
DataSelector: Immutable configuration for how to read data (e.g., “train” partition, “raw” source).
PipelineState: Mutable state tracking (e.g., current Y-transformation).
StepMetadata: Ephemeral flags for communication between steps.
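One way to model this composite is with dataclasses: a frozen DataSelector, a mutable PipelineState, and a StepMetadata holder. The field names (partition, source, y_transformation, flags) and the with_partition helper are illustrative assumptions; the custom dict mirrors the context.custom extension point mentioned later in this document.

```python
from dataclasses import dataclass, field, replace
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class DataSelector:
    """Immutable read configuration (illustrative fields)."""
    partition: str = "train"
    source: str = "raw"


@dataclass
class PipelineState:
    """Mutable state tracked across steps (illustrative field)."""
    y_transformation: Optional[str] = None


@dataclass
class StepMetadata:
    """Ephemeral flags that one step can leave for the next."""
    flags: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ExecutionContext:
    """Composite of the three parts, plus a free-form dict for controller data."""
    selector: DataSelector = field(default_factory=DataSelector)
    state: PipelineState = field(default_factory=PipelineState)
    metadata: StepMetadata = field(default_factory=StepMetadata)
    custom: Dict[str, Any] = field(default_factory=dict)

    def with_partition(self, partition: str) -> "ExecutionContext":
        """Return a new context reading another partition; the frozen selector is replaced, not mutated."""
        return replace(self, selector=replace(self.selector, partition=partition))
```

dataclasses.replace makes a shallow copy, so the new context shares the same PipelineState object and state updates remain visible through both; whether nirs4all shares or copies these parts is not specified here.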
Execution Flow Diagram
graph TD
User -->|calls| Runner[PipelineRunner]
Runner -->|delegates to| Orch[PipelineOrchestrator]
Orch -->|loops datasets & configs| Exec[PipelineExecutor]
Exec -->|loops steps| StepRun[StepRunner]
StepRun -->|parses| Parser[StepParser]
Parser -->|extracts keyword| ParsedStep[ParsedStep]
StepRun -->|routes| Router[ControllerRouter]
Router -->|queries registry| Reg[CONTROLLER_REGISTRY]
Router -->|selects by priority| Ctrl[Controller]
Ctrl -->|modifies| Context[ExecutionContext]
Ctrl -->|reads/writes| Data[SpectroDataset]
Keyword and Controller System
Step Configuration Parsing
The StepParser handles multiple step syntaxes and normalizes them into a canonical ParsedStep format:
Dictionary steps: Extracts keyword and operator
{"model": SVC(), "params": {"C": 1.0}}
{"preprocessing": StandardScaler}
{"my_custom_op": custom_function, "params": {...}}
String steps: Interpreted as class paths or keywords
"sklearn.preprocessing.StandardScaler"  # Class path
"chart_2d"  # Keyword
Direct instances: Object passed directly
StandardScaler()
Nested lists: Subpipelines
[[step1, step2], step3]
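The four step types named under StepParser (workflow, serialized, subpipeline, direct) can be told apart with a few type checks. classify_step is a hypothetical simplification, not StepParser's actual logic; in particular, treating dotted strings as class paths and other strings as keywords is an assumption based on the two string examples above.

```python
from typing import Any

# Serialization operators as listed in this document.
SERIALIZATION_KEYS = {"class", "function", "module", "object", "pipeline", "instance"}


def classify_step(step: Any) -> str:
    """Return a coarse step type for the syntaxes listed above (simplified sketch)."""
    if isinstance(step, list):
        return "subpipeline"  # nested lists
    if isinstance(step, dict):
        if any(key in SERIALIZATION_KEYS for key in step):
            return "serialized"  # e.g. {"class": "sklearn.svm.SVC"}
        return "workflow"  # e.g. {"model": SVC(), "params": {...}}
    if isinstance(step, str):
        # Assumption: dotted strings are class paths, other strings are keywords.
        return "serialized" if "." in step else "workflow"
    return "direct"  # e.g. StandardScaler()
```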
Keyword Types
The parser recognizes three categories of keywords:
Reserved Keywords (not treated as operators):
params, metadata, steps, name, finetune_params, train_params
Serialization Operators (checked first):
class, function, module, object, pipeline, instance
Workflow Keywords:
Priority keywords (standard operations, checked in order):
model, preprocessing, feature_augmentation, y_processing, sample_augmentation
Custom keywords: Any other non-reserved key
Keyword Prioritization
When multiple potential keywords exist in a step:
Serialization operators take precedence over everything
Priority workflow keywords take precedence over custom keywords
First custom keyword is chosen when multiple custom keywords exist
Example:
# "model" wins (priority keyword)
{"model": SVC(), "my_custom": lambda x: x}
# "my_custom" is used (no priority keyword present)
{"my_custom": lambda x: x, "params": {...}}
# "class" wins (serialization operator)
{"class": "sklearn.svm.SVC", "model": SVC()}
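The three prioritization rules can be expressed as a single function. extract_keyword is a hypothetical sketch: the keyword lists are copied from this section, but the order in which serialization operators are checked against each other, and the use of dict insertion order to define the "first" custom keyword, are assumptions.

```python
from typing import Any, Dict, Optional

RESERVED = {"params", "metadata", "steps", "name", "finetune_params", "train_params"}
SERIALIZATION = ["class", "function", "module", "object", "pipeline", "instance"]
PRIORITY = ["model", "preprocessing", "feature_augmentation", "y_processing", "sample_augmentation"]


def extract_keyword(step: Dict[str, Any]) -> Optional[str]:
    """Pick the operator keyword of a dict step using the three rules above."""
    # 1. Serialization operators take precedence over everything.
    for key in SERIALIZATION:
        if key in step:
            return key
    # 2. Priority workflow keywords, checked in their fixed order.
    for key in PRIORITY:
        if key in step:
            return key
    # 3. Otherwise, the first non-reserved key in insertion order.
    for key in step:
        if key not in RESERVED:
            return key
    return None  # only reserved keys present
```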
Controller Matching
The ControllerRouter selects controllers based on:
Priority: Lower numbers = higher priority (default=100)
Matching: The controller’s matches() method returns True
Selection: The highest-priority (lowest-number) matching controller is chosen
Example controller priorities in the system:
DummyController: 1000 (catch-all, lowest priority)
Model controllers: 20-50 (varies by controller)
Transform controllers: 30-60 (varies by controller)
Chart controllers: 10 (high priority for specific matching)
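The selection rule (filter by matches(), then take the lowest priority number) can be sketched with plain classes. The controllers below are hypothetical stand-ins that reuse the example priorities listed above; the chart matching rule keyword.startswith("chart") is an invented illustration, and select_controller is not the ControllerRouter API.

```python
from typing import Any, List, Optional, Type


class Controller:
    priority = 100  # documented default

    @classmethod
    def matches(cls, step: Any, operator: Any, keyword: str) -> bool:
        return False


class ChartController(Controller):
    priority = 10

    @classmethod
    def matches(cls, step: Any, operator: Any, keyword: str) -> bool:
        return keyword.startswith("chart")  # illustrative rule only


class DummyController(Controller):
    priority = 1000  # catch-all, lowest priority

    @classmethod
    def matches(cls, step: Any, operator: Any, keyword: str) -> bool:
        return True


def select_controller(registry: List[Type[Controller]], step: Any,
                      operator: Any, keyword: str) -> Optional[Type[Controller]]:
    """Return the matching controller with the lowest priority number, or None."""
    candidates = [c for c in registry if c.matches(step, operator, keyword)]
    return min(candidates, key=lambda c: c.priority, default=None)
```

min() returns the first of several equally ranked candidates, so in this sketch registry order breaks ties.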
How to Add a New Feature
Adding a Custom Controller with Custom Keywords
The system is designed to be fully extensible. You can add custom controllers with custom keywords without modifying core nirs4all code.
Step-by-Step Guide
Create a Controller Class
Inherit from OperatorController and implement the required methods:
from nirs4all.controllers.controller import OperatorController
from nirs4all.controllers.registry import register_controller
from typing import Any, Tuple, List
@register_controller
class SmoothingController(OperatorController):
    """Custom controller for spectral smoothing operations."""
    priority = 45  # Lower = higher priority (default is 100)
    @classmethod
    def matches(cls, step: Any, operator: Any, keyword: str) -> bool:
        """Define when this controller should be used."""
        # Match on custom keywords
        if isinstance(step, dict):
            return 'smoothing' in step or 'smooth' in step
        return keyword in ['smoothing', 'smooth']
    @classmethod
    def use_multi_source(cls) -> bool:
        """Return True if controller supports multi-source datasets."""
        return True
    @classmethod
    def supports_prediction_mode(cls) -> bool:
        """Return True if controller should run during prediction."""
        return True
    def execute(
        self,
        step_info: 'ParsedStep',
        dataset: 'SpectroDataset',
        context: 'ExecutionContext',
        runtime_context: 'RuntimeContext',
        source: int = -1,
        mode: str = "train",
        loaded_binaries: Any = None,
        prediction_store: Any = None
    ) -> Tuple['ExecutionContext', List[Tuple[str, bytes]]]:
        """Execute the smoothing operation."""
        smoother = step_info.operator
        params = step_info.metadata.get('params', {})
        # Your custom logic here
        # - Modify dataset
        # - Update context
        # - Generate artifacts
        return context, []  # Return updated context and artifacts
Import Your Controller Module
The controller is automatically registered when the module is imported:
# At the top of your script or in __init__.py
import my_custom_controllers  # Registers all controllers in this module
Use Your Custom Keyword in Pipelines
from scipy.signal import savgol_filter
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
pipeline = [
    # Your custom keyword!
    {"smoothing": savgol_filter, "params": {"window_length": 5, "polyorder": 2}},
    # Or with a lambda
    {"smooth": lambda x: savgol_filter(x, 5, 2)},
    # Standard steps still work
    {"preprocessing": StandardScaler()},
    {"model": SVC()},
]
Important Considerations
Reserved Keywords: Avoid using these as your custom keywords:
params, metadata, steps, name, finetune_params, train_params
Serialization Keywords: These have special meaning:
class, function, module, object, pipeline, instance
Priority Keywords: These take precedence when a single step contains multiple keywords:
model, preprocessing, feature_augmentation, y_processing, sample_augmentation
Priority System: Set your controller’s priority appropriately:
Lower number = higher priority
Default is 100
Specific controllers should have priority 10-50
Generic fallbacks should have priority 80-100
Catch-all controllers use 1000+
Multiple Matching Controllers: If multiple controllers match, the one with the lowest priority number wins.
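The naming rules above can be enforced with a small guard, for example in a unit test for a module that defines custom keywords. check_custom_keyword is a hypothetical helper, not part of nirs4all; rejecting priority keywords outright is a design choice of this sketch, since the document only says they take precedence.

```python
RESERVED = {"params", "metadata", "steps", "name", "finetune_params", "train_params"}
SERIALIZATION = {"class", "function", "module", "object", "pipeline", "instance"}
PRIORITY = {"model", "preprocessing", "feature_augmentation", "y_processing", "sample_augmentation"}


def check_custom_keyword(keyword: str) -> None:
    """Raise ValueError if keyword collides with a name that has special meaning."""
    groups = (("reserved", RESERVED), ("serialization", SERIALIZATION), ("priority", PRIORITY))
    for group, names in groups:
        if keyword in names:
            raise ValueError(f"{keyword!r} is a {group} keyword; choose another name")
```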
Real-World Examples
Example 1: Baseline Correction Controller
from nirs4all.controllers.controller import OperatorController
from nirs4all.controllers.registry import register_controller
@register_controller
class BaselineCorrectionController(OperatorController):
    priority = 40
    @classmethod
    def matches(cls, step, operator, keyword):
        return keyword in ["baseline_correction", "baseline"]
    @classmethod
    def use_multi_source(cls):
        return True
    @classmethod
    def supports_prediction_mode(cls):
        return True
    def execute(self, step_info, dataset, context, runtime_context,
                source=-1, mode="train", loaded_binaries=None, prediction_store=None):
        # Apply baseline correction to spectra
        method = step_info.metadata.get('params', {}).get('method', 'als')
        # ... implementation ...
        return context, []
# Usage (my_baseline_fn is your own baseline-correction function):
pipeline = [
    {"baseline_correction": my_baseline_fn, "params": {"method": "als"}}
]
Example 2: Outlier Detection Controller
from nirs4all.controllers.controller import OperatorController
from nirs4all.controllers.registry import register_controller
from sklearn.ensemble import IsolationForest
@register_controller
class OutlierDetectionController(OperatorController):
    priority = 35
    @classmethod
    def matches(cls, step, operator, keyword):
        return keyword in ["outlier_detection", "outliers"]
    @classmethod
    def use_multi_source(cls):
        return True
    @classmethod
    def supports_prediction_mode(cls):
        return False  # Only runs during training
    def execute(self, step_info, dataset, context, runtime_context,
                source=-1, mode="train", loaded_binaries=None, prediction_store=None):
        # Detect and mark outliers
        detector = step_info.operator
        # ... implementation ...
        return context, []
# Usage:
pipeline = [
    {"outlier_detection": IsolationForest(), "params": {"contamination": 0.1}}
]
Adding a New Step Type (Without Custom Controller)
If you want to use an existing controller with a new operator type, you don’t need a custom controller. Just ensure your operator matches an existing controller’s pattern:
# Uses TransformerMixinController (has fit() and transform())
from sklearn.preprocessing import StandardScaler, RobustScaler
pipeline = [
    StandardScaler(),  # Direct instance
    {"preprocessing": RobustScaler()},  # Dict with keyword
    {"class": "sklearn.preprocessing.StandardScaler"},  # Serialized
]
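The comment above says these steps are handled by TransformerMixinController because the operators expose fit() and transform(). A duck-typed check of that kind can be sketched as follows; looks_like_transformer and MeanCenterer are hypothetical, and the real controller's matching rule (it may, for instance, check for scikit-learn's TransformerMixin instead) is not shown in this document.

```python
from typing import Any, List


def looks_like_transformer(operator: Any) -> bool:
    """True if the operator has callable fit() and transform() methods."""
    return (callable(getattr(operator, "fit", None))
            and callable(getattr(operator, "transform", None)))


class MeanCenterer:
    """A custom operator with no sklearn dependency that still exposes fit/transform."""

    def fit(self, X: List[List[float]], y=None) -> "MeanCenterer":
        n = len(X)
        # Column means, computed by transposing the rows with zip(*X).
        self.mean_ = [sum(col) / n for col in zip(*X)]
        return self

    def transform(self, X: List[List[float]]) -> List[List[float]]:
        return [[v - m for v, m in zip(row, self.mean_)] for row in X]
```

Whether an operator like this is actually routed to TransformerMixinController depends on that controller's real matches() rule.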
Adding a New Execution Mode
Update PipelineRunner to expose the mode.
Update PipelineExecutor/StepRunner if the mode requires special handling (like predict or explain).
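One kind of mode-specific handling is already visible in the controller examples: supports_prediction_mode() suggests that training-only controllers (such as OutlierDetectionController above) are skipped when predicting. The sketch below shows that gating under stated assumptions: steps_to_run, PredictCapable, and TrainOnly are hypothetical, "train" is taken from the default of execute()'s mode parameter, and "predict" as a mode string is assumed; how explain mode is treated is not stated in this document.

```python
from typing import List


class PredictCapable:
    @classmethod
    def supports_prediction_mode(cls) -> bool:
        return True


class TrainOnly:
    @classmethod
    def supports_prediction_mode(cls) -> bool:
        return False  # training-only, like OutlierDetectionController


def steps_to_run(controllers: List[type], mode: str) -> List[type]:
    """Keep every controller outside 'predict' mode; in 'predict' mode drop training-only ones."""
    if mode == "predict":
        return [c for c in controllers if c.supports_prediction_mode()]
    return list(controllers)
```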
Common Patterns
Registry Pattern: Controllers are discovered automatically via the @register_controller decorator.
Priority Pattern: Controllers compete for steps based on priority; the highest priority (lowest number) wins.
Parser Pattern: Multiple step syntaxes are normalized to the canonical ParsedStep format.
Builder Pattern: ExecutorBuilder constructs the executor with its dependencies.
Facade Pattern: PipelineRunner hides complexity from users.
Context Object: ExecutionContext encapsulates the state passed from step to step; each controller receives the context and returns it, possibly updated.
Strategy Pattern: Controllers implement different strategies for handling different operator types.
Extension Points
The system provides multiple extension points:
Custom Controllers: Add new controllers with @register_controller.
Custom Keywords: Use any non-reserved keyword in step dictionaries.
Custom Operators: Any Python callable or class can be an operator.
Custom Context Data: Use the context.custom dict for controller-specific data.
Custom Artifacts: Controllers can return any serializable artifacts.
Testing Custom Extensions
When adding custom controllers, consider testing:
Keyword extraction: Verify your keyword is extracted from step dicts.
Controller registration: Verify @register_controller adds your controller to the registry.
Priority ordering: Verify your controller’s priority relative to others.
Routing: Verify ControllerRouter selects your controller for your keywords.
Execution: Verify your controller executes correctly and modifies the context/dataset.
Prediction mode: Verify supports_prediction_mode() behavior is correct.
See tests/unit/pipeline/steps/test_custom_extensibility.py for comprehensive examples.
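A pytest-style sketch of several of these checks, written against a toy registry so that it runs without nirs4all. REGISTRY, register, and route are hypothetical; a real test would import your controller module and exercise CONTROLLER_REGISTRY and ControllerRouter, whose exact APIs are not shown in this document.

```python
from typing import Any, List

REGISTRY: List[type] = []  # hypothetical stand-in for CONTROLLER_REGISTRY


def register(cls: type) -> type:
    REGISTRY.append(cls)
    REGISTRY.sort(key=lambda c: c.priority)
    return cls


@register
class Fallback:
    priority = 1000

    @classmethod
    def matches(cls, step: Any, operator: Any, keyword: str) -> bool:
        return True


@register
class SmoothingController:
    priority = 45

    @classmethod
    def matches(cls, step: Any, operator: Any, keyword: str) -> bool:
        return keyword in ("smoothing", "smooth")

    @classmethod
    def supports_prediction_mode(cls) -> bool:
        return True


def route(step: Any, operator: Any, keyword: str) -> type:
    # REGISTRY is sorted, so the first match has the lowest priority number.
    return next(c for c in REGISTRY if c.matches(step, operator, keyword))


def test_registration():
    assert SmoothingController in REGISTRY


def test_priority_ordering():
    assert REGISTRY.index(SmoothingController) < REGISTRY.index(Fallback)


def test_routing():
    assert route({"smoothing": None}, None, "smoothing") is SmoothingController
    assert route({"model": None}, None, "model") is Fallback


def test_prediction_mode():
    assert SmoothingController.supports_prediction_mode() is True
```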