Pipeline Architecture Overview

This document provides a high-level overview of the nirs4all pipeline architecture for developers. It explains the flow of execution, key components, and how to extend the system with custom controllers and keywords.

Architecture Philosophy

The pipeline module is designed around a layered architecture with separation of concerns:

  1. Orchestration: Managing multiple datasets and pipeline configurations.

  2. Execution: Running a specific sequence of steps on a specific dataset.

  3. Step Logic: The actual implementation of a pipeline step (model training, preprocessing, etc.).

Key Components

1. PipelineRunner (nirs4all/pipeline/runner.py)

Role: The public entry point (Facade).

Responsibility:

  • Provides a simple API for users (run(), predict(), explain()).

  • Initializes the environment (workspace, logging).

  • Delegates work to the Orchestrator.

2. PipelineOrchestrator (nirs4all/pipeline/execution/orchestrator.py)

Role: The high-level manager.

Responsibility:

  • Iterates over all provided Datasets.

  • Iterates over all provided Pipeline Configurations.

  • Manages global results (aggregating predictions across runs).

  • Instantiates a PipelineExecutor for each (Dataset, Pipeline) pair.
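The nested iteration described above can be pictured with a small, self-contained sketch. It does not use the real PipelineOrchestrator; `execute_pair` and `orchestrate` are hypothetical stand-ins that only illustrate the "one executor per (Dataset, Pipeline) pair" idea.

```python
from itertools import product


def execute_pair(dataset_name, pipeline_name):
    """Hypothetical stand-in for one PipelineExecutor run; returns a fake record."""
    return {"dataset": dataset_name, "pipeline": pipeline_name, "status": "ok"}


def orchestrate(datasets, pipelines):
    """Run every pipeline configuration on every dataset and aggregate the results."""
    results = []
    for dataset_name, pipeline_name in product(datasets, pipelines):
        # One executor invocation per (dataset, pipeline) pair
        results.append(execute_pair(dataset_name, pipeline_name))
    return results


runs = orchestrate(["wheat", "corn"], ["pls", "svm"])
print(len(runs))  # 2 datasets x 2 pipelines = 4 runs
```

The dataset and pipeline names are placeholders; in the real system these would be dataset and pipeline configuration objects.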

3. PipelineExecutor (nirs4all/pipeline/execution/executor.py)

Role: The sequence runner.

Responsibility:

  • Executes a list of steps sequentially.

  • Manages the ExecutionContext (state propagation).

  • Handles artifact management (saving models, logs) for a single run.

  • Catches errors and handles the “continue on error” logic.
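As a rough illustration of sequential execution with "continue on error" handling, the sketch below threads a context through a list of callables. The function `run_steps`, its `continue_on_error` flag, and the dict-based context are assumptions made for the example, not the actual PipelineExecutor API.

```python
def run_steps(steps, context, continue_on_error=False):
    """Run steps in order, passing the context returned by each step to the next."""
    errors = []
    for index, step in enumerate(steps):
        try:
            context = step(context)
        except Exception as exc:
            if not continue_on_error:
                raise  # abort the whole run on the first failure
            errors.append((index, repr(exc)))  # record the failure and keep going
    return context, errors


def add_scaling(ctx):
    return {**ctx, "scaled": True}


def broken_step(ctx):
    raise ValueError("bad step")


def add_model(ctx):
    return {**ctx, "model": "fitted"}


final_context, step_errors = run_steps(
    [add_scaling, broken_step, add_model], {}, continue_on_error=True
)
print(final_context)   # the failing step left the context untouched
print(step_errors[0][0])  # index of the step that failed
```

With `continue_on_error=False` (the default in this sketch) the same step list would re-raise the `ValueError` from the second step.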

4. StepRunner (nirs4all/pipeline/steps/step_runner.py)

Role: The unit executor.

Responsibility:

  • Parses the raw step definition (dict, string, object) using StepParser.

  • Routes the step to the appropriate Controller using ControllerRouter.

  • Executes the Controller.

5. StepParser (nirs4all/pipeline/steps/parser.py)

Role: Step configuration parser.

Responsibility:

  • Normalizes different step syntaxes (dict, string, instance, list) into a canonical ParsedStep format.

  • Extracts keywords from step configurations (both standard and custom).

  • Identifies step types (workflow, serialized, subpipeline, direct).

  • Deserializes operators when needed.

6. ControllerRouter (nirs4all/pipeline/steps/router.py)

Role: Controller selection.

Responsibility:

  • Matches parsed steps to appropriate controllers using priority-based selection.

  • Queries each controller’s matches() method.

  • Returns the highest-priority matching controller.

7. Controllers (nirs4all/controllers/)

Role: The business logic.

Responsibility:

  • Implements the actual logic for a step (e.g., ModelController, PreprocessingController).

  • Interacts with the SpectroDataset.

  • Updates the ExecutionContext.

  • Returns artifacts (files, objects) to be saved.

8. Controller Registry (nirs4all/controllers/registry.py)

Role: Controller discovery and registration.

Responsibility:

  • Maintains the global CONTROLLER_REGISTRY list.

  • Provides the @register_controller decorator for automatic registration.

  • Ensures controllers are sorted by priority.
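A decorator-based registry of this kind can be sketched in a few lines. The code below is a stand-alone approximation: the names `CONTROLLER_REGISTRY` and `register_controller` mirror the ones above, but the implementation and the three example classes are assumptions, not the code in nirs4all/controllers/registry.py.

```python
CONTROLLER_REGISTRY = []  # stand-in for the global registry list


def register_controller(cls):
    """Class decorator: add the controller and keep the list sorted by priority."""
    CONTROLLER_REGISTRY.append(cls)
    # Lower number = higher priority, so an ascending sort puts the winners first
    CONTROLLER_REGISTRY.sort(key=lambda controller: controller.priority)
    return cls


class BaseController:
    priority = 100  # default priority


@register_controller
class FallbackController(BaseController):
    priority = 1000  # catch-all, lowest priority


@register_controller
class ChartController(BaseController):
    priority = 10


@register_controller
class DefaultController(BaseController):
    pass  # inherits the default priority of 100


names = [controller.__name__ for controller in CONTROLLER_REGISTRY]
print(names)  # ['ChartController', 'DefaultController', 'FallbackController']
```

Because the decorator returns the class unchanged, registration happens as a side effect of importing the module that defines the controller.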

Data Flow

The data flow relies on two main objects passed through the layers:

  1. SpectroDataset: The data itself. It is mutable but typically modified via “views” or internal state updates managed by controllers.

  2. ExecutionContext: A composite object containing:

    • DataSelector: Immutable configuration for how to read data (e.g., “train” partition, “raw” source).

    • PipelineState: Mutable state tracking (e.g., current Y-transformation).

    • StepMetadata: Ephemeral flags for communication between steps.
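One way to picture this composite is with dataclasses, where the selector is frozen and the state is not. This is only a sketch: the class names come from the list above, but every field name (`partition`, `source`, `y_transformation`, `flags`) is a hypothetical choice made for illustration.

```python
from dataclasses import FrozenInstanceError, dataclass, field, replace


@dataclass(frozen=True)
class DataSelector:
    """Immutable description of which data to read (field names are hypothetical)."""
    partition: str = "train"
    source: str = "raw"


@dataclass
class PipelineState:
    """Mutable state carried through the run."""
    y_transformation: str = "none"


@dataclass
class StepMetadata:
    """Ephemeral flags exchanged between steps."""
    flags: dict = field(default_factory=dict)


@dataclass
class ExecutionContext:
    selector: DataSelector = field(default_factory=DataSelector)
    state: PipelineState = field(default_factory=PipelineState)
    metadata: StepMetadata = field(default_factory=StepMetadata)


context = ExecutionContext()
context.state.y_transformation = "standard_scaled"  # mutable state: in-place update

# The selector cannot be edited in place; a modified copy is created instead
test_selector = replace(context.selector, partition="test")

try:
    context.selector.partition = "test"
except FrozenInstanceError:
    print("DataSelector is immutable")
```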

Execution Flow Diagram

    graph TD
    User -->|calls| Runner[PipelineRunner]
    Runner -->|delegates to| Orch[PipelineOrchestrator]
    Orch -->|loops datasets & configs| Exec[PipelineExecutor]
    Exec -->|loops steps| StepRun[StepRunner]
    StepRun -->|parses| Parser[StepParser]
    Parser -->|extracts keyword| ParsedStep[ParsedStep]
    StepRun -->|routes| Router[ControllerRouter]
    Router -->|queries registry| Reg[CONTROLLER_REGISTRY]
    Router -->|selects by priority| Ctrl[Controller]
    Ctrl -->|modifies| Context[ExecutionContext]
    Ctrl -->|reads/writes| Data[SpectroDataset]
    

Keyword and Controller System

Step Configuration Parsing

The StepParser handles multiple step syntaxes and normalizes them into a canonical ParsedStep format:

  1. Dictionary steps: Extracts keyword and operator

    {"model": SVC(), "params": {"C": 1.0}}
    {"preprocessing": StandardScaler}
    {"my_custom_op": custom_function, "params": {...}}
    
  2. String steps: Interpreted as class paths or keywords

    "sklearn.preprocessing.StandardScaler"  # Class path
    "chart_2d"                               # Keyword
    
  3. Direct instances: Object passed directly

    StandardScaler()
    
  4. Nested lists: Subpipelines

    [[step1, step2], step3]
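The four syntaxes above could be told apart with a small dispatch on the Python type of the step. The sketch below is an assumption about how such a classification might look, not the StepParser implementation. In particular, treating a dotted string as a class path and an undotted string as a keyword is a simplification chosen for the example.

```python
SERIALIZATION_KEYS = ("class", "function", "module", "object", "pipeline", "instance")


def classify_step(step):
    """Return a coarse step type: subpipeline, serialized, workflow, or direct."""
    if isinstance(step, list):
        return "subpipeline"  # nested list of steps
    if isinstance(step, dict):
        if any(key in step for key in SERIALIZATION_KEYS):
            return "serialized"
        return "workflow"  # keyword-driven dict such as {"model": ...}
    if isinstance(step, str):
        # Assumption: a dotted string is a class path, anything else is a keyword
        return "serialized" if "." in step else "workflow"
    return "direct"  # an operator instance passed as-is


print(classify_step([["step1", "step2"], "step3"]))
print(classify_step("sklearn.preprocessing.StandardScaler"))
print(classify_step("chart_2d"))
```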
    

Keyword Types

The parser recognizes three categories of keywords:

  1. Reserved Keywords (not treated as operators):

    • params, metadata, steps, name, finetune_params, train_params

  2. Serialization Operators (checked first):

    • class, function, module, object, pipeline, instance

  3. Workflow Keywords:

    • Priority keywords (standard operations, checked in order):

      1. model

      2. preprocessing

      3. feature_augmentation

      4. y_processing

      5. sample_augmentation

    • Custom keywords: Any other non-reserved key

Keyword Prioritization

When multiple potential keywords exist in a step:

  1. Serialization operators take precedence over everything

  2. Priority workflow keywords take precedence over custom keywords

  3. The first custom keyword is chosen when multiple custom keywords exist

Example:

# "model" wins (priority keyword)
{"model": SVC(), "my_custom": lambda x: x}

# "my_custom" is used (no priority keyword present)
{"my_custom": lambda x: x, "params": {...}}

# "class" wins (serialization operator)
{"class": "sklearn.svm.SVC", "model": SVC()}
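The precedence rules can be expressed as three successive lookups. The function below is a minimal sketch under the rules stated in this section; `select_keyword` is a hypothetical name, and the real parser may differ in details such as how it orders custom keywords.

```python
RESERVED = {"params", "metadata", "steps", "name", "finetune_params", "train_params"}
SERIALIZATION = ("class", "function", "module", "object", "pipeline", "instance")
PRIORITY = (
    "model",
    "preprocessing",
    "feature_augmentation",
    "y_processing",
    "sample_augmentation",
)


def select_keyword(step):
    """Pick the keyword of a dict step following the precedence rules."""
    for key in SERIALIZATION:  # 1. serialization operators win over everything
        if key in step:
            return key
    for key in PRIORITY:  # 2. priority workflow keywords, checked in order
        if key in step:
            return key
    for key in step:  # 3. first non-reserved key in insertion order
        if key not in RESERVED:
            return key
    return None  # only reserved keys were present


print(select_keyword({"model": "svc", "my_custom": "fn"}))          # model
print(select_keyword({"my_custom": "fn", "params": {}}))            # my_custom
print(select_keyword({"class": "sklearn.svm.SVC", "model": "svc"}))  # class
```

Strings are used as placeholder operators here so the snippet runs without scikit-learn.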

Controller Matching

The ControllerRouter selects controllers based on:

  1. Priority: Lower numbers = higher priority (default=100)

  2. Matching: Controller’s matches() method returns True

  3. Selection: Highest-priority matching controller is chosen

Example controller priorities in the system:

  • DummyController: 1000 (catch-all, lowest priority)

  • Model controllers: varies (20-50)

  • Transform controllers: varies (30-60)

  • Chart controllers: 10 (high priority for specific matching)
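Priority-based selection amounts to filtering the registry by `matches()` and taking the candidate with the smallest priority number. The sketch below uses three toy controllers whose priorities echo the list above; `select_controller` is a hypothetical helper, not the ControllerRouter API.

```python
class DummyController:
    priority = 1000  # catch-all

    @classmethod
    def matches(cls, step, operator, keyword):
        return True


class ChartController:
    priority = 10

    @classmethod
    def matches(cls, step, operator, keyword):
        return keyword == "chart_2d"


class ModelController:
    priority = 30

    @classmethod
    def matches(cls, step, operator, keyword):
        return keyword == "model"


def select_controller(registry, step, operator, keyword):
    """Return the matching controller with the lowest priority number."""
    candidates = [c for c in registry if c.matches(step, operator, keyword)]
    if not candidates:
        raise LookupError(f"no controller matches keyword {keyword!r}")
    return min(candidates, key=lambda c: c.priority)


registry = [DummyController, ChartController, ModelController]
print(select_controller(registry, {}, None, "model").__name__)    # ModelController
print(select_controller(registry, {}, None, "unknown").__name__)  # DummyController
```

The catch-all always matches, so it is only selected when no more specific controller does.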

How to Add a New Feature

Adding a Custom Controller with Custom Keywords

The system is designed to be fully extensible. You can add custom controllers with custom keywords without modifying core nirs4all code.

Step-by-Step Guide

  1. Create a Controller Class

    Inherit from OperatorController and implement required methods:

    from nirs4all.controllers.controller import OperatorController
    from nirs4all.controllers.registry import register_controller
    from typing import Any, Tuple, List
    
    @register_controller
    class SmoothingController(OperatorController):
        """Custom controller for spectral smoothing operations."""
    
        priority = 45  # Lower = higher priority (default is 100)
    
        @classmethod
        def matches(cls, step: Any, operator: Any, keyword: str) -> bool:
            """Define when this controller should be used."""
            # Match on custom keywords
            if isinstance(step, dict):
                return 'smoothing' in step or 'smooth' in step
            return keyword in ['smoothing', 'smooth']
    
        @classmethod
        def use_multi_source(cls) -> bool:
            """Return True if controller supports multi-source datasets."""
            return True
    
        @classmethod
        def supports_prediction_mode(cls) -> bool:
            """Return True if controller should run during prediction."""
            return True
    
        def execute(
            self,
            step_info: 'ParsedStep',
            dataset: 'SpectroDataset',
            context: 'ExecutionContext',
            runtime_context: 'RuntimeContext',
            source: int = -1,
            mode: str = "train",
            loaded_binaries: Any = None,
            prediction_store: Any = None
        ) -> Tuple['ExecutionContext', List[Tuple[str, bytes]]]:
            """Execute the smoothing operation."""
            smoother = step_info.operator
            params = step_info.metadata.get('params', {})
    
            # Your custom logic here
            # - Modify dataset
            # - Update context
            # - Generate artifacts
    
            return context, []  # Return updated context and artifacts
    
  2. Import Your Controller Module

    The controller is automatically registered when the module is imported:

    # At the top of your script or in __init__.py
    import my_custom_controllers  # Registers all controllers in this module
    
  3. Use Your Custom Keyword in Pipelines

    from scipy.signal import savgol_filter
    from sklearn.preprocessing import StandardScaler
    from sklearn.svm import SVC
    
    pipeline = [
        # Your custom keyword!
        # (savgol_filter names its window argument window_length)
        {"smoothing": savgol_filter, "params": {"window_length": 5, "polyorder": 2}},
    
        # Or with lambda
        {"smooth": lambda x: savgol_filter(x, 5, 2)},
    
        # Standard steps still work
        {"preprocessing": StandardScaler()},
        {"model": SVC()},
    ]
    

Important Considerations

  • Reserved Keywords: Avoid using these as your custom keywords:

    • params, metadata, steps, name, finetune_params, train_params

  • Serialization Keywords: These have special meaning:

    • class, function, module, object, pipeline, instance

  • Priority Keywords: These take precedence when a step contains multiple keywords:

    • model, preprocessing, feature_augmentation, y_processing, sample_augmentation

  • Priority System: Set your controller’s priority appropriately:

    • Lower number = higher priority

    • Default is 100

    • Specific controllers should have priority 10-50

    • Generic fallbacks should have priority 80-100

    • Catch-all controllers use 1000+

  • Multiple Matching Controllers: If multiple controllers match, the one with the lowest priority number wins.

Real-World Examples

Example 1: Baseline Correction Controller

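from nirs4all.controllers.controller import OperatorController
from nirs4all.controllers.registry import register_controller

@register_controller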
class BaselineCorrectionController(OperatorController):
    priority = 40

    @classmethod
    def matches(cls, step, operator, keyword):
        return keyword in ["baseline_correction", "baseline"]

    @classmethod
    def use_multi_source(cls):
        return True

    @classmethod
    def supports_prediction_mode(cls):
        return True

    def execute(self, step_info, dataset, context, runtime_context,
                source=-1, mode="train", loaded_binaries=None, prediction_store=None):
        # Apply baseline correction to spectra
        method = step_info.metadata.get('params', {}).get('method', 'als')
        # ... implementation ...
        return context, []

# Usage (my_baseline_fn stands for your own baseline-correction callable):
pipeline = [
    {"baseline_correction": my_baseline_fn, "params": {"method": "als"}}
]

Example 2: Outlier Detection Controller

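from sklearn.ensemble import IsolationForest

from nirs4all.controllers.controller import OperatorController
from nirs4all.controllers.registry import register_controller

@register_controller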
class OutlierDetectionController(OperatorController):
    priority = 35

    @classmethod
    def matches(cls, step, operator, keyword):
        return keyword in ["outlier_detection", "outliers"]

    @classmethod
    def use_multi_source(cls):
        return True

    @classmethod
    def supports_prediction_mode(cls):
        return False  # Only runs during training

    def execute(self, step_info, dataset, context, runtime_context,
                source=-1, mode="train", loaded_binaries=None, prediction_store=None):
        # Detect and mark outliers
        detector = step_info.operator
        # ... implementation ...
        return context, []

# Usage:
pipeline = [
    {"outlier_detection": IsolationForest(), "params": {"contamination": 0.1}}
]

Adding a New Step Type (Without Custom Controller)

If you want to use an existing controller with a new operator type, you don’t need a custom controller. Just ensure your operator matches an existing controller’s pattern:

# Uses TransformerMixinController (has fit() and transform())
from sklearn.preprocessing import StandardScaler, RobustScaler

pipeline = [
    StandardScaler(),  # Direct instance
    {"preprocessing": RobustScaler()},  # Dict with keyword
    {"class": "sklearn.preprocessing.StandardScaler"},  # Serialized
]

Adding a New Execution Mode

  1. Update PipelineRunner to expose the mode.

  2. Update PipelineExecutor / StepRunner if the mode requires special handling (like predict or explain).
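Mode-specific handling often comes down to deciding which controllers run in a given mode. The sketch below shows one plausible shape for that check, built on the `supports_prediction_mode()` hook shown earlier. The helper `controllers_for_mode` and the rule that "predict" and "explain" are both gated by that hook are assumptions for illustration, not a description of how PipelineExecutor or StepRunner actually behave.

```python
class ScalerController:
    @classmethod
    def supports_prediction_mode(cls):
        return True  # must be re-applied to new data


class OutlierController:
    @classmethod
    def supports_prediction_mode(cls):
        return False  # only meaningful during training


def controllers_for_mode(controllers, mode):
    """Return the controllers that should run in the given mode."""
    if mode == "train":
        return list(controllers)
    if mode in ("predict", "explain"):
        # Assumption: both non-training modes are gated by the same hook
        return [c for c in controllers if c.supports_prediction_mode()]
    raise ValueError(f"unknown mode: {mode!r}")


steps = [ScalerController, OutlierController]
print([c.__name__ for c in controllers_for_mode(steps, "train")])
print([c.__name__ for c in controllers_for_mode(steps, "predict")])
```

Raising on an unknown mode keeps a newly exposed mode from silently falling back to training behavior.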

Common Patterns

  • Registry Pattern: Controllers are discovered automatically via @register_controller decorator.

  • Priority Pattern: Controllers compete for steps based on priority; highest priority (lowest number) wins.

  • Parser Pattern: Multiple step syntaxes are normalized to canonical ParsedStep format.

  • Builder Pattern: ExecutorBuilder constructs the executor with dependencies.

  • Facade Pattern: PipelineRunner hides complexity from users.

  • Context Object: ExecutionContext encapsulates state; each controller receives it and returns the updated context, which is passed to the next step.

  • Strategy Pattern: Controllers implement different strategies for handling different operator types.

Extension Points

The system provides multiple extension points:

  1. Custom Controllers: Add new controllers with @register_controller

  2. Custom Keywords: Use any non-reserved keyword in step dictionaries

  3. Custom Operators: Any Python callable or class can be an operator

  4. Custom Context Data: Use context.custom dict for controller-specific data

  5. Custom Artifacts: Controllers can return any serializable artifacts

Testing Custom Extensions

When adding custom controllers, consider testing:

  1. Keyword extraction: Verify your keyword is extracted from step dicts

  2. Controller registration: Verify @register_controller adds to registry

  3. Priority ordering: Verify your controller’s priority relative to others

  4. Routing: Verify ControllerRouter selects your controller for your keywords

  5. Execution: Verify your controller executes correctly and modifies context/dataset

  6. Prediction mode: Verify supports_prediction_mode() behavior is correct

See tests/unit/pipeline/steps/test_custom_extensibility.py for comprehensive examples.
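As a starting point, several items of the checklist can be covered with plain assertion-based test functions. The sketch below is self-contained and uses stand-in versions of the registry and a smoothing controller; in a real test you would import `register_controller`, `CONTROLLER_REGISTRY`, and your controller from their actual modules instead.

```python
REGISTRY = []  # stand-in for CONTROLLER_REGISTRY


def register_controller(cls):
    """Stand-in decorator: register the class and keep the list priority-sorted."""
    REGISTRY.append(cls)
    REGISTRY.sort(key=lambda controller: controller.priority)
    return cls


@register_controller
class SmoothingController:
    priority = 45

    @classmethod
    def matches(cls, step, operator, keyword):
        return keyword in ("smoothing", "smooth")

    @classmethod
    def supports_prediction_mode(cls):
        return True


def test_registration():
    assert SmoothingController in REGISTRY


def test_matching():
    assert SmoothingController.matches({}, None, "smooth")
    assert not SmoothingController.matches({}, None, "model")


def test_priority_and_prediction_mode():
    assert SmoothingController.priority < 100  # beats default-priority controllers
    assert SmoothingController.supports_prediction_mode() is True


# Run the checks directly; a test runner such as pytest would discover them by name
passed = []
for test in (test_registration, test_matching, test_priority_and_prediction_mode):
    test()
    passed.append(test.__name__)
print(f"{len(passed)} checks passed")
```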