# Source code for nirs4all.analysis.selector

"""
Transfer Preprocessing Selector.

Main class for transfer-optimized preprocessing selection. Evaluates
preprocessings and their combinations to find those that best align
source and target datasets while preserving predictive information.

The public entry point is ``TransferPreprocessingSelector``; its ``fit``
method accepts either raw source/target arrays or a nirs4all
``DatasetConfigs`` object and returns a ranked ``TransferSelectionResults``.

Supports two modes for preprocessing generation:

1. Combinatoric mode (default): builds stacked permutations from the
   top-ranked base preprocessings.
2. Generator mode: expands a nirs4all generator DSL specification
   (``preprocessing_spec``) for flexible, constraint-based definitions.
"""

import time
from typing import Any, Dict, List, Optional, Tuple

import numpy as np

from nirs4all.analysis.presets import PRESETS
from nirs4all.analysis.results import TransferResult, TransferSelectionResults
from nirs4all.analysis.transfer_metrics import (
    TransferMetrics,
    TransferMetricsComputer,
    compute_transfer_score,
)
from nirs4all.analysis.transfer_utils import (
    apply_augmentation,
    apply_pipeline,
    apply_preprocessing_objects,
    apply_stacked_pipeline,
    generate_augmentation_combinations,
    generate_top_k_stacked_pipelines,
    get_base_preprocessings,
    get_transform_name,
    get_transform_signature,
    normalize_preprocessing,
    validate_datasets,
)
from nirs4all.core.logging import get_logger

# Module-level logger; TransferPreprocessingSelector._log routes messages here
# (DEBUG for verbosity level >= 2, INFO otherwise).
logger = get_logger(__name__)


class TransferPreprocessingSelector:
    """
    Select preprocessing for optimal transfer between datasets.

    This class evaluates preprocessing methods to find those that best
    minimize distributional distance between source and target datasets
    while preserving predictive information.

    Supports two modes for preprocessing generation:

    1. **Combinatoric mode** (default): Uses simple permutations from base
       preprocessings. Stage 1 evaluates all singles, Stage 2 generates
       permutations from top-K candidates.
    2. **Generator mode**: Uses nirs4all's generator DSL for flexible,
       constraint-based specification. Enable by providing
       ``preprocessing_spec``.

    Stages:
        1. Single Preprocessing (required): Evaluate all base preprocessings.
        1b. Generator Stacked (optional): If using generator with stacked specs.
        2. Stacking (optional): Evaluate depth-2+ combinations of top-K.
        2b. Generator Augmented (optional): If using generator with
            augmentation specs.
        3. Augmentation (optional): Evaluate feature concatenation.
        4. Validation (optional): Supervised validation with proxy models.

    Args:
        preset: Preset configuration ('fast', 'balanced', 'thorough', 'full',
            'exhaustive') or None for manual configuration. Default is 'fast'.
            Every setting the preset defines overrides the matching keyword
            argument, so pass ``preset=None`` to control those settings
            yourself.
        preprocessings: Custom preprocessings dict (name -> transform) or
            None for the base set.
        n_components: PCA components for metric computation.
        k_neighbors: Neighbors for trustworthiness metric.

    Stage 2 (stacking):
        run_stage2: Enable stacking evaluation.
        stage2_top_k: Number of top candidates for stacking (None = all).
        stage2_max_depth: Maximum stacking depth.
        stage2_exhaustive: Exhaustive-stacking flag (may be set by a preset).
            NOTE(review): stored but not read by the Stage 2 code in this
            module - confirm where it is consumed.

    Stage 3 (augmentation):
        run_stage3: Enable augmentation evaluation.
        stage3_top_k: Number of top candidates for augmentation.
        stage3_max_order: Maximum augmentation order (2 or 3).

    Stage 4 (validation):
        run_stage4: Enable supervised validation (needs ``y_source``).
        stage4_top_k: Number of candidates to validate.
        stage4_cv_folds: Cross-validation folds.
        stage4_models: Proxy models, any of 'ridge', 'pls', 'knn', 'xgb'.
            Defaults to ['ridge', 'pls'].

    Scoring:
        metric_weights: Optional weights forwarded to
            ``compute_transfer_score``.

    Generator integration:
        preprocessing_spec: Generator specification dict for flexible
            preprocessing definition. Uses nirs4all.pipeline.config.generator.
            Supports keywords like `_or_`, `arrange`, `pick`, `_mutex_`, etc.
        use_generator: Enable generator mode. Auto-detected if
            preprocessing_spec is provided. Set to False to disable even
            with preprocessing_spec.

    Parallelization:
        n_jobs: Number of parallel jobs for preprocessing evaluation.
            - n_jobs=-1: Use all available CPU cores (default)
            - n_jobs=-2, -3, ...: All cores but 1, 2, ... (joblib convention)
            - n_jobs=1 (or 0): Sequential execution (useful for debugging)
            - n_jobs=N: Use N cores

    Other:
        verbose: Verbosity level (0=silent, 1=progress, 2=detailed).
        random_state: Random seed for reproducibility.

    Attributes:
        results_: Results of the last ``fit`` call (None before fitting).
        raw_metrics_: Transfer metrics of the unprocessed data from the
            last ``fit`` call.

    Example:
        >>> # Quick usage with default fast preset
        >>> selector = TransferPreprocessingSelector()
        >>> results = selector.fit(X_source, X_target)
        >>> print(results.best.name)
        'snv'

        >>> # With balanced preset for stacking
        >>> selector = TransferPreprocessingSelector(preset='balanced')
        >>> results = selector.fit(X_source, X_target)
        >>> print(results.to_pipeline_spec())
        'snv>d1'

        >>> # Generator mode: constrained stacking
        >>> selector = TransferPreprocessingSelector(
        ...     preprocessing_spec={
        ...         "_or_": ["snv", "msc", "d1", "d2", "savgol"],
        ...         "arrange": 2,
        ...         "_mutex_": [["d1", "d2"]],  # Don't stack derivatives
        ...     },
        ... )
        >>> results = selector.fit(X_source, X_target)

        >>> # Custom configuration
        >>> selector = TransferPreprocessingSelector(
        ...     preset=None,
        ...     run_stage2=True,
        ...     stage2_top_k=10,
        ...     stage2_max_depth=2,
        ...     n_components=20,
        ... )
    """

    def __init__(
        self,
        preset: Optional[str] = "fast",
        preprocessings: Optional[Dict[str, Any]] = None,
        n_components: int = 10,
        k_neighbors: int = 10,
        # Stage 2
        run_stage2: bool = False,
        stage2_top_k: Optional[int] = 5,
        stage2_max_depth: int = 2,
        stage2_exhaustive: bool = False,
        # Stage 3
        run_stage3: bool = False,
        stage3_top_k: int = 5,
        stage3_max_order: int = 2,
        # Stage 4
        run_stage4: bool = False,
        stage4_top_k: int = 10,
        stage4_cv_folds: int = 3,
        stage4_models: Optional[List[str]] = None,
        # Metric weights
        metric_weights: Optional[Dict[str, float]] = None,
        # Generator integration
        preprocessing_spec: Optional[Dict[str, Any]] = None,
        use_generator: Optional[bool] = None,
        # Parallelization
        n_jobs: int = -1,
        # Other
        verbose: int = 1,
        random_state: int = 0,
    ):
        # Apply preset if specified. For every key the preset defines, the
        # preset value wins over the keyword argument passed by the caller.
        if preset is not None:
            if preset not in PRESETS:
                raise ValueError(
                    f"Unknown preset: {preset}. Available: {list(PRESETS.keys())}"
                )
            preset_config = PRESETS[preset]
            n_components = preset_config.get("n_components", n_components)
            run_stage2 = preset_config.get("run_stage2", run_stage2)
            stage2_top_k = preset_config.get("stage2_top_k", stage2_top_k)
            stage2_max_depth = preset_config.get("stage2_max_depth", stage2_max_depth)
            stage2_exhaustive = preset_config.get("stage2_exhaustive", stage2_exhaustive)
            run_stage3 = preset_config.get("run_stage3", run_stage3)
            stage3_top_k = preset_config.get("stage3_top_k", stage3_top_k)
            stage3_max_order = preset_config.get("stage3_max_order", stage3_max_order)
            run_stage4 = preset_config.get("run_stage4", run_stage4)
            stage4_top_k = preset_config.get("stage4_top_k", stage4_top_k)
            stage4_cv_folds = preset_config.get("stage4_cv_folds", stage4_cv_folds)

        self.preset = preset
        # An empty dict also falls back to the base set.
        self.preprocessings = preprocessings or get_base_preprocessings()
        self.n_components = n_components
        self.k_neighbors = k_neighbors

        # Stage configs
        self.run_stage2 = run_stage2
        self.stage2_top_k = stage2_top_k
        self.stage2_max_depth = stage2_max_depth
        self.stage2_exhaustive = stage2_exhaustive
        self.run_stage3 = run_stage3
        self.stage3_top_k = stage3_top_k
        self.stage3_max_order = stage3_max_order
        self.run_stage4 = run_stage4
        self.stage4_top_k = stage4_top_k
        self.stage4_cv_folds = stage4_cv_folds
        self.stage4_models = stage4_models or ["ridge", "pls"]
        self.metric_weights = metric_weights
        self.verbose = verbose
        self.random_state = random_state

        # Parallelization
        self.n_jobs = n_jobs
        self._effective_n_jobs = self._compute_effective_n_jobs(n_jobs)

        # Generator integration
        self.preprocessing_spec = preprocessing_spec
        # Auto-detect generator mode if preprocessing_spec is provided
        self.use_generator = (
            use_generator if use_generator is not None
            else (preprocessing_spec is not None)
        )

        # Expanded preprocessing lists (populated by fit when use_generator=True);
        # they hold transform objects rather than names.
        self._expanded_singles: Optional[List[Any]] = None
        self._expanded_stacked: Optional[List[List[Any]]] = None
        self._expanded_augmented: Optional[List[List[Any]]] = None

        # Initialize metrics computer
        self.metrics_computer = TransferMetricsComputer(
            n_components=n_components,
            k_neighbors=k_neighbors,
            random_state=random_state,
        )

        # Results storage
        self.results_: Optional[TransferSelectionResults] = None
        self.raw_metrics_: Optional[TransferMetrics] = None

    def _log(self, msg: str, level: int = 1) -> None:
        """Log ``msg`` if ``self.verbose >= level`` (level >= 2 -> DEBUG, else INFO)."""
        if self.verbose >= level:
            if level >= 2:
                logger.debug(msg)
            else:
                logger.info(msg)

    @staticmethod
    def _compute_effective_n_jobs(n_jobs: int) -> int:
        """
        Compute effective number of parallel jobs.

        Follows the joblib/scikit-learn convention for negative values:
        -1 means all CPUs, -2 all but one, and so on. The result is never
        below 1, so 0 and very negative values mean sequential execution.

        Args:
            n_jobs: User-specified n_jobs value.

        Returns:
            Effective number of workers to use (>= 1).
        """
        import os

        n_cpus = os.cpu_count() or 1
        if n_jobs < 0:
            return max(1, n_cpus + 1 + n_jobs)
        return max(1, n_jobs)

    def _parallel_evaluate_preprocessings(
        self,
        X_source: np.ndarray,
        X_target: np.ndarray,
        pp_items: List[Tuple[str, Any]],
        pipeline_type: str = "single",
    ) -> List[TransferResult]:
        """
        Evaluate preprocessings, using ThreadPoolExecutor when n_jobs > 1.

        Uses ThreadPoolExecutor instead of ProcessPoolExecutor because:
        1. NumPy/scipy operations release the GIL, enabling true parallelism
        2. No need to serialize/pickle sklearn transformers
        3. Shared memory avoids copying large datasets to workers

        Results are returned in the order of ``pp_items`` in both the
        sequential and the parallel path. Callers rank with stable sorts, so
        ties are then broken reproducibly.

        Supports both object-based and string-based preprocessings:
        - Objects: Applied directly via apply_preprocessing_objects
        - Strings: Resolved via preprocessings dict (legacy support)

        Args:
            X_source: Source dataset.
            X_target: Target dataset.
            pp_items: List of (name, transform_or_transforms) tuples.
                For object-based: (display_name, transform_object_or_list)
                For string-based: (name, name_or_list_of_names)
            pipeline_type: Type of pipeline ('single', 'stacked', 'augmented').

        Returns:
            List of TransferResult for each preprocessing that evaluated
            successfully (failures are logged at level 2 and dropped).
        """
        from concurrent.futures import ThreadPoolExecutor

        def evaluate_single(args: Tuple[str, Any]) -> Optional[TransferResult]:
            """Evaluate one (name, item) pair; return None on any failure."""
            pp_name, pp_item = args
            try:
                # Track the actual transforms used for storing in result
                actual_transforms: List[Any] = []

                if pipeline_type == "augmented":
                    # pp_item is a list of components (objects or strings)
                    components = pp_item if isinstance(pp_item, list) else [pp_item]
                    X_src_pp = apply_augmentation(X_source, components, self.preprocessings)
                    X_tgt_pp = apply_augmentation(X_target, components, self.preprocessings)
                    comp_names = [
                        get_transform_name(c) if not isinstance(c, str) else c
                        for c in components
                    ]
                    # NOTE: string components that are not direct keys of
                    # self.preprocessings (e.g. stacked "a>b" names) map to None.
                    actual_transforms = [
                        c if not isinstance(c, str) else self.preprocessings.get(c)
                        for c in components
                    ]
                elif pipeline_type == "stacked":
                    if isinstance(pp_item, list) and len(pp_item) > 0 and not isinstance(pp_item[0], str):
                        # Object-based stacked pipeline
                        X_src_pp = apply_preprocessing_objects(X_source, pp_item)
                        X_tgt_pp = apply_preprocessing_objects(X_target, pp_item)
                        comp_names = [get_transform_name(t) for t in pp_item]
                        actual_transforms = list(pp_item)
                    elif isinstance(pp_item, str) and ">" in pp_item:
                        # String-based stacked pipeline
                        X_src_pp = apply_stacked_pipeline(X_source, pp_item, self.preprocessings)
                        X_tgt_pp = apply_stacked_pipeline(X_target, pp_item, self.preprocessings)
                        comp_names = pp_item.split(">")
                        actual_transforms = [self.preprocessings[n] for n in comp_names]
                    else:
                        # Single transform passed as stacked
                        transforms = [pp_item] if not isinstance(pp_item, list) else pp_item
                        X_src_pp = apply_preprocessing_objects(X_source, transforms)
                        X_tgt_pp = apply_preprocessing_objects(X_target, transforms)
                        comp_names = [get_transform_name(t) for t in transforms]
                        actual_transforms = list(transforms)
                else:
                    # Single preprocessing
                    if isinstance(pp_item, str):
                        # String name - resolve from dict
                        transforms = [self.preprocessings[pp_item]]
                    elif isinstance(pp_item, list):
                        transforms = pp_item
                    else:
                        transforms = [pp_item]
                    actual_transforms = transforms
                    X_src_pp = apply_pipeline(X_source, transforms)
                    X_tgt_pp = apply_pipeline(X_target, transforms)
                    comp_names = [pp_name]

                # Compute transfer metrics (trustworthiness skipped for speed)
                metrics = self.metrics_computer.compute(
                    X_src_pp, X_tgt_pp, compute_trust=False
                )
                score = compute_transfer_score(
                    metrics,
                    raw_metrics=self.raw_metrics_,
                    weights=self.metric_weights,
                )
                # Relative centroid-distance reduction vs. the raw baseline;
                # the epsilon guards against a zero baseline distance.
                raw_centroid = self.raw_metrics_.centroid_distance
                improvement_pct = (
                    (raw_centroid - metrics.centroid_distance) / (raw_centroid + 1e-10)
                ) * 100

                return TransferResult(
                    name=pp_name,
                    pipeline_type=pipeline_type,
                    components=comp_names,
                    transfer_score=score,
                    metrics=metrics.to_dict(),
                    improvement_pct=improvement_pct,
                    transforms=actual_transforms if actual_transforms else None,
                )
            except Exception as e:
                self._log(f" Warning: Failed to evaluate {pp_name}: {e}", level=2)
                return None

        results: List[TransferResult] = []

        if self._effective_n_jobs == 1:
            # Sequential path: simplest to debug, no thread overhead.
            for pp_item in pp_items:
                self._log(f" Evaluating: {pp_item[0]}", level=2)
                result = evaluate_single(pp_item)
                if result is not None:
                    results.append(result)
            return results

        with ThreadPoolExecutor(max_workers=self._effective_n_jobs) as executor:
            # Consume futures in submission order (not as_completed) so the
            # output order does not depend on thread scheduling.
            submitted = [
                (pp_item[0], executor.submit(evaluate_single, pp_item))
                for pp_item in pp_items
            ]
            for pp_name, future in submitted:
                try:
                    result = future.result()
                except Exception as e:
                    self._log(f" Warning: Failed to evaluate {pp_name}: {e}", level=2)
                    continue
                if result is not None:
                    results.append(result)
        return results

    def _expand_preprocessing_spec(
        self,
    ) -> Tuple[List[Any], List[List[Any]], List[List[Any]]]:
        """
        Expand preprocessing_spec using the nirs4all generator.

        Handles both object-based and string-based preprocessing specs.
        Objects are used directly; strings are resolved via the
        preprocessings dict (the string "None" and None are dropped).

        Multi-element expansions are classified as augmentations when the
        top-level spec uses ``pick`` without ``arrange``, and as stacked
        pipelines otherwise. Each output list is de-duplicated by transform
        signature (order-independent for augmentations), keeping the first
        occurrence.

        Returns:
            Tuple of:
            - single_preprocessings: List of single transforms [SNV(), D1()]
            - stacked_pipelines: List of stacked transforms [[SNV(), D1()], ...]
            - augmented_combinations: List of augmentation combos [[SNV(), D1()], ...]
        """
        from nirs4all.pipeline.config.generator import (
            ARRANGE_KEYWORD,
            PICK_KEYWORD,
            expand_spec,
        )

        if self.preprocessing_spec is None:
            return [], [], []

        expanded = expand_spec(self.preprocessing_spec, seed=self.random_state)

        # Determine if the spec used pick (augmentation) or arrange (stacking)
        spec = self.preprocessing_spec
        uses_pick = PICK_KEYWORD in spec if isinstance(spec, dict) else False
        uses_arrange = ARRANGE_KEYWORD in spec if isinstance(spec, dict) else False

        singles: List[Any] = []
        stacked: List[List[Any]] = []
        augmented: List[List[Any]] = []

        def to_transform(item: Any) -> Any:
            """Convert a spec item (object or string) to a transform, or None."""
            if item is None:
                return None
            if isinstance(item, str):
                if item == "None":
                    return None
                return normalize_preprocessing(item, self.preprocessings)
            return item

        for item in expanded:
            if isinstance(item, list):
                transforms = [t for t in (to_transform(x) for x in item) if t is not None]
                if not transforms:
                    # Skip empty combinations (all None)
                    continue
                if len(transforms) == 1:
                    singles.append(transforms[0])
                elif uses_pick and not uses_arrange:
                    # Augmentation: keep as separate transforms
                    augmented.append(transforms)
                else:
                    # Stacking: keep as ordered sequence
                    stacked.append(transforms)
            else:
                transform = to_transform(item)
                if transform is not None:
                    singles.append(transform)

        # Remove duplicates while preserving order (using signature for identity)
        seen_singles: set = set()
        unique_singles: List[Any] = []
        for t in singles:
            sig = get_transform_signature(t)
            if sig not in seen_singles:
                seen_singles.add(sig)
                unique_singles.append(t)
        singles = unique_singles

        seen_stacked: set = set()
        unique_stacked: List[List[Any]] = []
        for pipeline in stacked:
            sig = ">".join(get_transform_signature(t) for t in pipeline)
            if sig not in seen_stacked:
                seen_stacked.add(sig)
                unique_stacked.append(pipeline)
        stacked = unique_stacked

        # For augmented, use order-independent key for deduplication
        seen_aug: set = set()
        unique_augmented: List[List[Any]] = []
        for combo in augmented:
            key = tuple(sorted(get_transform_signature(t) for t in combo))
            if key not in seen_aug:
                seen_aug.add(key)
                unique_augmented.append(combo)
        augmented = unique_augmented

        self._log(
            f" Generator expanded: {len(singles)} singles, "
            f"{len(stacked)} stacked, {len(augmented)} augmented",
            level=1
        )

        return singles, stacked, augmented

    def _prepare_generator_preprocessings(self) -> None:
        """
        Prepare preprocessing lists from generator spec.

        Expands the preprocessing_spec and populates:
        - self._expanded_singles
        - self._expanded_stacked
        - self._expanded_augmented

        All three are reset to None when generator mode is off or no spec
        was given.
        """
        if not self.use_generator or self.preprocessing_spec is None:
            self._expanded_singles = None
            self._expanded_stacked = None
            self._expanded_augmented = None
            return

        self._log("Expanding preprocessing specification...")
        (
            self._expanded_singles,
            self._expanded_stacked,
            self._expanded_augmented,
        ) = self._expand_preprocessing_spec()
def fit(
    self,
    X_source_or_config,
    X_target: Optional[np.ndarray] = None,
    y_source: Optional[np.ndarray] = None,
    y_target: Optional[np.ndarray] = None,
) -> TransferSelectionResults:
    """
    Run transfer-optimized preprocessing selection.

    Supports two calling conventions:

    1. **Raw arrays** (original API):
       selector.fit(X_source, X_target, y_source, y_target)
    2. **DatasetConfigs** (nirs4all-native API):
       selector.fit(dataset_config)
       - Single dataset: Uses train as source, test as target
       - Multiple datasets: Uses the first dataset as source and the
         remaining datasets stacked together as target

    Args:
        X_source_or_config: Either:
            - np.ndarray: Source dataset (n_samples_src, n_features)
            - DatasetConfigs: nirs4all dataset configuration
        X_target: Target dataset (required if X_source_or_config is an array).
        y_source: Optional source targets; Stage 4 runs only when given.
        y_target: Optional target labels. Accepted for API symmetry; the
            stages run here do not use it.

    Returns:
        TransferSelectionResults with ranked recommendations (highest
        transfer score first; results with NaN/None scores removed).

    Raises:
        ValueError: If ``X_target`` is omitted while ``X_source_or_config``
            is a NumPy array, or if a configuration cannot be parsed.

    Example:
        >>> # Using DatasetConfigs (recommended nirs4all way)
        >>> selector = TransferPreprocessingSelector(preset="balanced")
        >>> results = selector.fit(DatasetConfigs(data_path))
        >>> pp_list = results.to_preprocessing_list(top_k=10)

        >>> # Using raw arrays
        >>> results = selector.fit(X_train, X_test, y_train)
    """
    # Determine calling convention
    if X_target is None:
        if isinstance(X_source_or_config, np.ndarray):
            # An array without a target is a forgotten argument, not a
            # dataset configuration: fail with an actionable message.
            raise ValueError(
                "X_target is required when X_source is given as an array. "
                "Call fit(X_source, X_target, ...) or pass a DatasetConfigs object."
            )
        # DatasetConfigs mode
        X_source, X_target, y_source, y_target = self._extract_data_from_dataset_configs(
            X_source_or_config
        )
    else:
        # Raw arrays mode
        X_source = X_source_or_config

    # Validate inputs
    X_source, X_target = validate_datasets(X_source, X_target)

    timing: Dict[str, float] = {}
    all_results: List[TransferResult] = []

    # Prepare generator-based preprocessings if enabled
    if self.use_generator:
        t0 = time.time()
        self._prepare_generator_preprocessings()
        timing["generator"] = time.time() - t0

    # Compute raw baseline metrics (used to normalize every later score)
    self._log("Computing baseline (raw) metrics...")
    t0 = time.time()
    self.raw_metrics_ = self.metrics_computer.compute(
        X_source, X_target, compute_trust=True
    )
    raw_metrics_dict = self.raw_metrics_.to_dict()
    timing["baseline"] = time.time() - t0

    # Stage 1: Single preprocessing evaluation
    self._log("\n=== Stage 1: Single Preprocessing Evaluation ===")
    t0 = time.time()
    stage1_results = self._stage1_single_evaluation(X_source, X_target)
    timing["stage1"] = time.time() - t0
    all_results.extend(stage1_results)
    self._log(
        f"Stage 1 complete: {len(stage1_results)} preprocessings "
        f"in {timing['stage1']:.2f}s"
    )

    # Generator mode: Evaluate pre-generated stacked pipelines
    gen_stacked_results: List[TransferResult] = []
    if self.use_generator and self._expanded_stacked:
        self._log("\n=== Stage 1b: Generator-Based Stacked Evaluation ===")
        t0 = time.time()
        gen_stacked_results = self._evaluate_generator_stacked(X_source, X_target)
        timing["stage1b_gen_stacked"] = time.time() - t0
        all_results.extend(gen_stacked_results)
        self._log(
            f"Stage 1b complete: {len(gen_stacked_results)} generator-stacked "
            f"in {timing['stage1b_gen_stacked']:.2f}s"
        )

    # Stage 2: Stacking evaluation
    if self.run_stage2:
        if not (self.use_generator and self._expanded_stacked):
            # Combinatoric mode
            self._log("\n=== Stage 2: Stacking Evaluation ===")
            t0 = time.time()
            stage2_results = self._stage2_stacking_evaluation(
                X_source, X_target, stage1_results
            )
            timing["stage2"] = time.time() - t0
            all_results.extend(stage2_results)
            self._log(
                f"Stage 2 complete: {len(stage2_results)} stacked "
                f"in {timing['stage2']:.2f}s"
            )
        else:
            # Generator mode with stage2 enabled: extend with top-K
            self._log("\n=== Stage 2: Extended Stacking Evaluation ===")
            t0 = time.time()
            stage2_results = self._stage2_stacking_evaluation(
                X_source, X_target, stage1_results + gen_stacked_results
            )
            timing["stage2"] = time.time() - t0
            all_results.extend(stage2_results)
            self._log(
                f"Stage 2 complete: {len(stage2_results)} additional stacked "
                f"in {timing['stage2']:.2f}s"
            )

    # Generator mode: Evaluate pre-generated augmentation combinations
    if self.use_generator and self._expanded_augmented:
        self._log("\n=== Stage 2b: Generator-Based Augmentation Evaluation ===")
        t0 = time.time()
        gen_aug_results = self._evaluate_generator_augmented(X_source, X_target)
        timing["stage2b_gen_aug"] = time.time() - t0
        all_results.extend(gen_aug_results)
        self._log(
            f"Stage 2b complete: {len(gen_aug_results)} generator-augmented "
            f"in {timing['stage2b_gen_aug']:.2f}s"
        )

    # Stage 3: Augmentation evaluation
    if self.run_stage3:
        self._log("\n=== Stage 3: Augmentation Evaluation ===")
        t0 = time.time()
        stage3_results = self._stage3_augmentation_evaluation(
            X_source, X_target, all_results
        )
        timing["stage3"] = time.time() - t0
        all_results.extend(stage3_results)
        self._log(
            f"Stage 3 complete: {len(stage3_results)} augmented "
            f"in {timing['stage3']:.2f}s"
        )

    # Stage 4: Supervised validation (needs source targets)
    if self.run_stage4:
        if y_source is None:
            # Make the skip visible instead of silently ignoring run_stage4.
            self._log("\n=== Stage 4 skipped: y_source is required for supervised validation ===")
        else:
            self._log("\n=== Stage 4: Supervised Validation ===")
            t0 = time.time()
            all_results = self._stage4_supervised_validation(
                X_source, y_source, all_results
            )
            timing["stage4"] = time.time() - t0
            self._log(f"Stage 4 complete in {timing['stage4']:.2f}s")

    # Filter out results with NaN transfer scores (invalid preprocessing results)
    valid_results = [
        r for r in all_results
        if r.transfer_score is not None and not np.isnan(r.transfer_score)
    ]
    invalid_count = len(all_results) - len(valid_results)
    if invalid_count > 0:
        self._log(f" Filtered {invalid_count} results with invalid transfer scores")
    all_results = valid_results

    # Sort by transfer score (higher is better); stable, so ties keep order
    all_results.sort(key=lambda r: r.transfer_score, reverse=True)

    self.results_ = TransferSelectionResults(
        ranking=all_results,
        raw_metrics=raw_metrics_dict,
        timing=timing,
    )

    if self.verbose >= 1:
        self._log("\n" + self.results_.summary())

    return self.results_
def _log_parallel_info(self) -> None:
    """Log the worker count when evaluation will run on more than one thread."""
    if self._effective_n_jobs > 1:
        self._log(f" Parallel evaluation with {self._effective_n_jobs} workers...")

def _names_resolvable(self, name: str) -> bool:
    """
    Return True if ``name`` can be rebuilt from ``self.preprocessings``.

    A name is resolvable when it is a non-empty string and every
    ``">"``-separated part is a key of ``self.preprocessings``.
    """
    return (
        isinstance(name, str)
        and bool(name)
        and all(part in self.preprocessings for part in name.split(">"))
    )

def _stage1_single_evaluation(
    self,
    X_source: np.ndarray,
    X_target: np.ndarray,
) -> List[TransferResult]:
    """
    Stage 1: Evaluate all single preprocessings.

    In generator mode (with a non-empty expansion) the expanded transform
    objects are evaluated; otherwise every entry of ``self.preprocessings``.

    Args:
        X_source: Source dataset.
        X_target: Target dataset.

    Returns:
        List of TransferResult for each preprocessing.
    """
    if self.use_generator and self._expanded_singles:
        # Object-based: use expanded transforms directly
        pp_items = [(get_transform_name(t), t) for t in self._expanded_singles]
        self._log(f" Using generator-specified preprocessings: {len(pp_items)} items")
    else:
        pp_items = list(self.preprocessings.items())

    self._log_parallel_info()
    return self._parallel_evaluate_preprocessings(
        X_source, X_target, pp_items, pipeline_type="single"
    )

def _evaluate_generator_stacked(
    self,
    X_source: np.ndarray,
    X_target: np.ndarray,
) -> List[TransferResult]:
    """
    Evaluate stacked pipelines from generator specification.

    Args:
        X_source: Source dataset.
        X_target: Target dataset.

    Returns:
        List of TransferResult for each stacked pipeline (named by joining
        the transform names with ">").
    """
    if not self._expanded_stacked:
        return []

    self._log(
        f" Evaluating {len(self._expanded_stacked)} generator-stacked pipelines..."
    )
    self._log_parallel_info()

    pp_items = [
        (">".join(get_transform_name(t) for t in transforms), transforms)
        for transforms in self._expanded_stacked
    ]
    return self._parallel_evaluate_preprocessings(
        X_source, X_target, pp_items, pipeline_type="stacked"
    )

def _evaluate_generator_augmented(
    self,
    X_source: np.ndarray,
    X_target: np.ndarray,
) -> List[TransferResult]:
    """
    Evaluate augmentation combinations from generator specification.

    Args:
        X_source: Source dataset.
        X_target: Target dataset.

    Returns:
        List of TransferResult for each augmented combination (named by
        joining the transform names with "+").
    """
    if not self._expanded_augmented:
        return []

    self._log(
        f" Evaluating {len(self._expanded_augmented)} generator-augmented "
        "combinations..."
    )
    self._log_parallel_info()

    pp_items = [
        ("+".join(get_transform_name(t) for t in transforms), transforms)
        for transforms in self._expanded_augmented
    ]
    return self._parallel_evaluate_preprocessings(
        X_source, X_target, pp_items, pipeline_type="augmented"
    )

def _stage2_stacking_evaluation(
    self,
    X_source: np.ndarray,
    X_target: np.ndarray,
    stage1_results: List[TransferResult],
) -> List[TransferResult]:
    """
    Stage 2: Evaluate stacked pipeline combinations.

    Takes the top ``stage2_top_k`` results by transfer score (all of them
    when ``stage2_top_k`` is None) and builds name-based stacked pipelines
    up to ``stage2_max_depth``.

    Args:
        X_source: Source dataset.
        X_target: Target dataset.
        stage1_results: Results from Stage 1 (and 1b in generator mode).

    Returns:
        List of TransferResult for stacked pipelines.
    """
    stage1_sorted = sorted(stage1_results, key=lambda r: r.transfer_score, reverse=True)
    if self.stage2_top_k is not None:
        top_k_results = stage1_sorted[:self.stage2_top_k]
    else:
        top_k_results = stage1_sorted
    top_k_names = [r.name for r in top_k_results]

    self._log(
        f" Generating stacked combinations from top-{len(top_k_names)}: "
        f"{top_k_names[:5]}..."
    )

    stacked_pipelines = generate_top_k_stacked_pipelines(
        top_k_names,
        self.preprocessings,
        max_depth=self.stage2_max_depth,
    )

    self._log(f" Evaluating {len(stacked_pipelines)} stacked pipelines...")
    self._log_parallel_info()

    # (name, name): the evaluator resolves "a>b" names via self.preprocessings
    pp_items = [(pp_name, pp_name) for pp_name, _, _ in stacked_pipelines]
    return self._parallel_evaluate_preprocessings(
        X_source, X_target, pp_items, pipeline_type="stacked"
    )

def _stage3_augmentation_evaluation(
    self,
    X_source: np.ndarray,
    X_target: np.ndarray,
    previous_results: List[TransferResult],
) -> List[TransferResult]:
    """
    Stage 3: Evaluate feature augmentation combinations.

    Candidates are the top ``stage3_top_k`` non-augmented results by
    transfer score. Results that are themselves augmentations (present when
    generator Stage 2b ran) are excluded, so components are always single or
    stacked pipelines - the Stage 1/2 inputs this stage is designed for.

    Args:
        X_source: Source dataset.
        X_target: Target dataset.
        previous_results: Results from earlier stages.

    Returns:
        List of TransferResult for augmented pipelines.
    """
    candidates = [r for r in previous_results if r.pipeline_type != "augmented"]
    sorted_results = sorted(candidates, key=lambda r: r.transfer_score, reverse=True)
    top_k_names = [r.name for r in sorted_results[:self.stage3_top_k]]

    self._log(
        f" Generating augmentation combinations from top-{len(top_k_names)}..."
    )

    aug_combinations = generate_augmentation_combinations(
        top_k_names,
        max_order=self.stage3_max_order,
    )

    self._log(f" Evaluating {len(aug_combinations)} augmented pipelines...")
    self._log_parallel_info()

    pp_items = [(aug_name, components) for aug_name, components in aug_combinations]
    return self._parallel_evaluate_preprocessings(
        X_source, X_target, pp_items, pipeline_type="augmented"
    )

def _apply_result_preprocessing(
    self,
    X: np.ndarray,
    result: TransferResult,
) -> np.ndarray:
    """
    Re-apply the preprocessing recorded in ``result`` to ``X`` (Stage 4).

    Name-based lookup is used whenever the names can be rebuilt from
    ``self.preprocessings``. Otherwise the transform objects stored on the
    result are applied, with the same helper the evaluator used. This is
    typically needed for generator-mode results, whose display names come
    from ``get_transform_name``.
    """
    transforms = result.transforms
    has_transforms = bool(transforms) and all(t is not None for t in transforms)

    if result.pipeline_type == "augmented":
        if has_transforms and not all(self._names_resolvable(c) for c in result.components):
            return apply_augmentation(X, list(transforms), self.preprocessings)
        return apply_augmentation(X, result.components, self.preprocessings)

    if has_transforms and not self._names_resolvable(result.name):
        if result.pipeline_type == "single":
            return apply_pipeline(X, list(transforms))
        return apply_preprocessing_objects(X, list(transforms))
    return apply_stacked_pipeline(X, result.name, self.preprocessings)

def _score_proxy_models(
    self,
    X: np.ndarray,
    y: np.ndarray,
    is_classification: bool,
) -> List[float]:
    """
    Cross-validate each configured proxy model on ``(X, y)``.

    Each model in ``self.stage4_models`` contributes its mean CV score,
    clipped to [0, 1]. Ridge and PLS are always scored with R^2. KNN and
    XGBoost use accuracy for classification and R^2 otherwise. A model that
    fails, or whose optional dependency is missing, is skipped and the reason
    is logged at verbosity level 2.

    Returns:
        List of per-model scores (possibly empty).
    """
    from sklearn.cross_decomposition import PLSRegression
    from sklearn.linear_model import RidgeCV
    from sklearn.model_selection import cross_val_score

    def cv_mean(model, scoring: str) -> float:
        cv_scores = cross_val_score(
            model, X, y, cv=self.stage4_cv_folds, scoring=scoring
        )
        return float(np.clip(np.mean(cv_scores), 0, 1))

    scores: List[float] = []

    if "ridge" in self.stage4_models:
        try:
            scores.append(cv_mean(RidgeCV(alphas=[0.1, 1.0, 10.0, 100.0]), "r2"))
        except Exception as e:
            self._log(f" ridge proxy failed: {e}", level=2)

    if "pls" in self.stage4_models:
        try:
            n_components = min(10, X.shape[1], X.shape[0] - 1)
            scores.append(cv_mean(PLSRegression(n_components=n_components), "r2"))
        except Exception as e:
            self._log(f" pls proxy failed: {e}", level=2)

    if "knn" in self.stage4_models:
        try:
            from sklearn.neighbors import KNeighborsClassifier, KNeighborsRegressor

            if is_classification:
                model, scoring = KNeighborsClassifier(n_neighbors=5), "accuracy"
            else:
                model, scoring = KNeighborsRegressor(n_neighbors=5), "r2"
            scores.append(cv_mean(model, scoring))
        except Exception as e:
            self._log(f" knn proxy failed: {e}", level=2)

    if "xgb" in self.stage4_models:
        try:
            from xgboost import XGBClassifier, XGBRegressor
        except ImportError:
            self._log(" xgb proxy skipped: xgboost is not installed", level=2)
        else:
            try:
                params = dict(
                    n_estimators=50, max_depth=3, verbosity=0, n_jobs=1,
                    random_state=self.random_state,
                )
                if is_classification:
                    model, scoring = XGBClassifier(**params), "accuracy"
                else:
                    model, scoring = XGBRegressor(**params), "r2"
                scores.append(cv_mean(model, scoring))
            except Exception as e:
                self._log(f" xgb proxy failed: {e}", level=2)

    return scores

def _stage4_supervised_validation(
    self,
    X_source: np.ndarray,
    y_source: np.ndarray,
    previous_results: List[TransferResult],
) -> List[TransferResult]:
    """
    Stage 4: Supervised validation with proxy models.

    The top ``stage4_top_k`` results by transfer score are re-preprocessed
    and standardized. When there are more features than samples they are
    PCA-reduced to at most 50 components. They are then scored with the
    configured proxy models. ``signal_score`` is the mean proxy score, or 0.0
    if no model succeeded.

    Args:
        X_source: Source dataset.
        y_source: Source targets.
        previous_results: Results from previous stages.

    Returns:
        The validated top-K results (in transfer-score order, with
        ``signal_score`` and the original ``transforms`` preserved),
        followed by the remaining, non-validated results. A candidate whose
        validation fails is kept unchanged.
    """
    # Imported here (not in the helper) so a missing scikit-learn still
    # fails loudly at the start of the stage.
    from sklearn.decomposition import PCA
    from sklearn.preprocessing import StandardScaler

    sorted_results = sorted(previous_results, key=lambda r: r.transfer_score, reverse=True)
    top_k = min(self.stage4_top_k, len(sorted_results))
    candidates = sorted_results[:top_k]

    self._log(f" Validating top-{top_k} candidates with proxy models...")

    # Heuristic: fewer than 10 distinct target values => classification
    is_classification = len(np.unique(y_source)) < 10

    validated_results = []
    for result in candidates:
        self._log(f" Validating: {result.name}", level=2)
        try:
            X_pp = self._apply_result_preprocessing(X_source, result)
            X_scaled = StandardScaler().fit_transform(X_pp)

            # Reduce dimensionality when there are more features than samples
            if X_scaled.shape[1] > X_scaled.shape[0]:
                n_comp = min(50, X_scaled.shape[0] - 1)
                pca = PCA(n_components=n_comp, random_state=self.random_state)
                X_scaled = pca.fit_transform(X_scaled)

            scores = self._score_proxy_models(X_scaled, y_source, is_classification)
            signal_score = float(np.mean(scores)) if scores else 0.0

            validated_results.append(
                TransferResult(
                    name=result.name,
                    pipeline_type=result.pipeline_type,
                    components=result.components,
                    transfer_score=result.transfer_score,
                    metrics=result.metrics,
                    improvement_pct=result.improvement_pct,
                    signal_score=signal_score,
                    # Keep the evaluated transform objects on the new result.
                    transforms=result.transforms,
                )
            )
        except Exception as e:
            self._log(f" Warning: Failed to validate {result.name}: {e}", level=2)
            validated_results.append(result)

    # Add remaining non-validated results
    validated_results.extend(sorted_results[top_k:])
    return validated_results
def fit_from_configs(
    self,
    config_source,
    config_target,
    partition: str = "train",
) -> TransferSelectionResults:
    """
    Fit using two dataset configurations: one for the source, one for the target.

    Both configurations are read from the same ``partition``. The source is
    extracted before the target, and the resulting arrays and targets are
    passed to :meth:`fit` in raw-array mode.

    Args:
        config_source: DatasetConfigs, SpectroDataset, dict or array for the
            source domain.
        config_target: Same kinds of object for the target domain.
        partition: Partition to read from each configuration
            ('train' or 'test').

    Returns:
        TransferSelectionResults with ranked recommendations.

    Example:
        >>> from nirs4all.data.config import DatasetConfigs
        >>> config_src = DatasetConfigs("path/to/source.json")
        >>> config_tgt = DatasetConfigs("path/to/target.json")
        >>> selector = TransferPreprocessingSelector()
        >>> results = selector.fit_from_configs(config_src, config_tgt)
    """
    (src_X, src_y), (tgt_X, tgt_y) = [
        self._extract_data_from_config(cfg, partition)
        for cfg in (config_source, config_target)
    ]
    return self.fit(src_X, tgt_X, src_y, tgt_y)
def _extract_data_from_config(
    self,
    config,
    partition: str = "train",
) -> Tuple[np.ndarray, Optional[np.ndarray]]:
    """
    Extract X, y from various nirs4all data structures.

    Inputs are recognised in this order:

    1. Objects exposing ``x()`` and ``y()`` (e.g. SpectroDataset). Both are
       called with ``{"partition": partition}``. If ``x()`` returns a tuple,
       only its first element is used.
    2. Objects exposing ``get_datasets()`` or ``iter_datasets()`` (e.g.
       DatasetConfigs). The first dataset is extracted recursively.
    3. ``dict``. X is taken from the first non-None value among ``"X"``,
       ``"x"`` and ``"samples"``. y is taken from ``"y"``, ``"Y"`` or
       ``"targets"``.
    4. ``np.ndarray``. It is returned unchanged as X, with ``y=None``.

    Args:
        config: Data configuration object.
        partition: Partition to extract ('train' or 'test').

    Returns:
        Tuple of (X, y) where y may be None.

    Raises:
        ValueError: If no samples can be extracted, a DatasetConfigs holds
            no dataset, or the config type is unsupported.
    """
    # Handle SpectroDataset (uses x() and y() methods with selector dict)
    if hasattr(config, "x") and hasattr(config, "y"):
        try:
            samples = config.x({"partition": partition})
        except Exception as e:
            raise ValueError(
                f"Could not extract samples from SpectroDataset: {e}"
            ) from e
        if samples is None:
            raise ValueError("Could not extract samples from SpectroDataset")
        X = samples[0] if isinstance(samples, tuple) else np.asarray(samples)

        try:
            y = config.y({"partition": partition})
            y = np.asarray(y).ravel() if y is not None else None
        except Exception:
            # Targets are optional: any failure is treated as "no y".
            y = None
        return np.asarray(X), y

    # Handle DatasetConfigs
    if hasattr(config, "get_datasets") or hasattr(config, "iter_datasets"):
        if hasattr(config, "get_dataset_at"):
            dataset = config.get_dataset_at(0)
        elif hasattr(config, "get_datasets"):
            datasets = config.get_datasets()
            dataset = datasets[0] if datasets else None
        else:
            # Use a default so an empty iterator reaches the ValueError below
            # instead of leaking StopIteration to the caller.
            dataset = next(iter(config.iter_datasets()), None)
        if dataset is None:
            raise ValueError("Could not extract dataset from DatasetConfigs")
        return self._extract_data_from_config(dataset, partition)

    # Handle dict-like config
    if isinstance(config, dict):
        X = None
        for key in ("X", "x", "samples"):
            if key in config and config[key] is not None:
                X = config[key]
                break
        y = None
        for key in ("y", "Y", "targets"):
            if key in config and config[key] is not None:
                y = config[key]
                break
        if X is None:
            raise ValueError("Dict config must have 'X', 'x' or 'samples' key")
        return np.asarray(X), np.asarray(y) if y is not None else None

    # Handle raw numpy array
    if isinstance(config, np.ndarray):
        return config, None

    raise ValueError(
        f"Unsupported config type: {type(config)}. "
        "Expected DatasetConfigs, SpectroDataset, dict, or numpy array."
    )


def _extract_data_from_dataset_configs(
    self,
    config,
) -> Tuple[np.ndarray, np.ndarray, Optional[np.ndarray], Optional[np.ndarray]]:
    """
    Extract X_source, X_target, y_source, y_target from DatasetConfigs.

    Handles these cases:

    1. A single dataset, or a bare object exposing ``get_samples``: the train
       partition is the source and the test partition is the target.
    2. Multiple datasets: all samples (train + test) of the FIRST dataset
       form the source. All samples of the remaining datasets are stacked
       row-wise to form the target.

    In case 2, ``y_target`` is only returned when every target dataset
    provides targets. Otherwise its rows would not line up with
    ``X_target``, so it is None.

    Args:
        config: DatasetConfigs or similar nirs4all data structure.

    Returns:
        Tuple of (X_source, X_target, y_source, y_target).

    Raises:
        ValueError: If the config holds no datasets or has an unsupported type.
    """
    # Handle DatasetConfigs
    if hasattr(config, "get_datasets") or hasattr(config, "iter_datasets"):
        if hasattr(config, "get_datasets"):
            datasets = config.get_datasets()
        else:
            datasets = list(config.iter_datasets())

        if not datasets:
            raise ValueError("DatasetConfigs contains no datasets")

        if len(datasets) == 1:
            # Single dataset: train as source, test as target
            dataset = datasets[0]
            X_source, y_source = self._extract_samples_from_dataset(
                dataset, partition="train"
            )
            X_target, y_target = self._extract_samples_from_dataset(
                dataset, partition="test"
            )
            return X_source, X_target, y_source, y_target

        # Multiple datasets. Keep X and y index-aligned (None marks a dataset
        # without targets) so each y stays paired with its own samples.
        extracted = [
            self._extract_samples_from_dataset(ds, partition="all")
            for ds in datasets
        ]
        all_X = [X for X, _ in extracted]
        all_y = [y for _, y in extracted]

        X_source = all_X[0]
        y_source = all_y[0]
        X_target = np.vstack(all_X[1:])
        target_ys = all_y[1:]
        y_target = (
            np.concatenate(target_ys)
            if all(y is not None for y in target_ys)
            else None
        )
        return X_source, X_target, y_source, y_target

    # Handle single SpectroDataset
    if hasattr(config, "get_samples"):
        X_source, y_source = self._extract_samples_from_dataset(
            config, partition="train"
        )
        X_target, y_target = self._extract_samples_from_dataset(
            config, partition="test"
        )
        return X_source, X_target, y_source, y_target

    raise ValueError(
        f"Cannot extract data from {type(config)}. "
        "Expected DatasetConfigs or SpectroDataset."
    )


def _extract_samples_from_dataset(
    self,
    dataset,
    partition: str = "train",
) -> Tuple[np.ndarray, Optional[np.ndarray]]:
    """
    Extract X, y from a SpectroDataset for a given partition.

    ``dataset.x`` and ``dataset.y`` are called with ``{"partition": ...}``.
    If ``x()`` returns a tuple, only its first element is used.

    With ``partition="all"``, the train and test partitions are stacked
    row-wise, skipping any partition whose samples are missing. y is built
    from the same partitions. It is None unless every included partition has
    targets, so its length always matches X.

    Args:
        dataset: SpectroDataset instance.
        partition: Partition to extract ('train', 'test', or 'all').

    Returns:
        Tuple of (X, y) where y may be None.

    Raises:
        ValueError: If no samples are available for the requested partition.
    """
    if partition == "all":
        X_parts = []
        y_parts = []
        for part in ("train", "test"):
            try:
                X_part = dataset.x({"partition": part})
            except Exception:
                X_part = None
            if X_part is None:
                continue
            # Handle tuple returns (multi-source)
            if isinstance(X_part, tuple):
                X_part = X_part[0]
            X_parts.append(np.asarray(X_part))

            try:
                y_part = dataset.y({"partition": part})
                y_part = np.asarray(y_part).ravel() if y_part is not None else None
            except Exception:
                # Targets are optional: any failure is treated as "no y".
                y_part = None
            y_parts.append(y_part)

        if not X_parts:
            raise ValueError("Dataset has no samples")

        X = np.vstack(X_parts) if len(X_parts) > 1 else X_parts[0]
        y = (
            np.concatenate(y_parts)
            if all(y_part is not None for y_part in y_parts)
            else None
        )
        return X, y

    try:
        samples = dataset.x({"partition": partition})
    except Exception as e:
        raise ValueError(f"No samples found for partition '{partition}'") from e
    if samples is None:
        raise ValueError(f"No samples found for partition '{partition}'")
    X = samples[0] if isinstance(samples, tuple) else np.asarray(samples)

    try:
        y_data = dataset.y({"partition": partition})
        y = np.asarray(y_data).ravel() if y_data is not None else None
    except Exception:
        # Targets are optional: any failure is treated as "no y".
        y = None

    return np.asarray(X), y
def get_preprocessing_by_name(self, name: str) -> Any:
    """
    Look up a preprocessing transform in ``self.preprocessings`` by name.

    A name containing ``">"`` (e.g. ``"snv>d1"``) is treated as a stacked
    pipeline. It is split on ``">"``, and the transform for each component
    is returned in order.

    Args:
        name: Preprocessing name (e.g., "snv", "snv>d1").

    Returns:
        Transformer for a single name (None if the name is unknown), or a
        list of transformers for a stacked name.

    Raises:
        KeyError: If any component of a stacked name is unknown.
    """
    registry = self.preprocessings
    if ">" not in name:
        return registry.get(name)
    return [registry[step] for step in name.split(">")]