Source code for nirs4all.data.synthetic.targets

"""
Target generation for synthetic NIRS datasets.

This module provides tools for generating target variables for regression
and classification tasks, with configurable distributions and class separation.

Example:
    >>> from nirs4all.data.synthetic.targets import TargetGenerator
    >>>
    >>> generator = TargetGenerator(random_state=42)
    >>>
    >>> # Regression targets
    >>> y = generator.regression(
    ...     n_samples=100,
    ...     concentrations=C,  # From spectra generation
    ...     distribution="lognormal",
    ...     range=(0, 100)
    ... )
    >>>
    >>> # Classification with separable classes
    >>> y = generator.classification(
    ...     n_samples=100,
    ...     concentrations=C,
    ...     n_classes=3,
    ...     separation=2.0
    ... )
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, List, Literal, Optional, Tuple, Union

import numpy as np
from scipy import stats


[docs] @dataclass class ClassSeparationConfig: """ Configuration for class separation in classification tasks. Attributes: separation: Separation factor (higher = more separable). Values around 0.5-1.0 create overlapping classes. Values around 2.0-3.0 create well-separated classes. method: How to create class differences: - "component": Different component concentration profiles per class. - "shift": Systematic spectral shifts between classes. - "intensity": Different overall intensity levels. noise: Noise level to add to class boundaries. """ separation: float = 1.5 method: Literal["component", "shift", "intensity"] = "component" noise: float = 0.1
[docs] class TargetGenerator: """ Generate target variables for synthetic NIRS datasets. This class creates both regression targets (continuous values correlated with component concentrations) and classification targets (discrete labels with controllable class separation). Attributes: rng: NumPy random generator for reproducibility. Args: random_state: Random seed for reproducibility. Example: >>> generator = TargetGenerator(random_state=42) >>> >>> # Generate concentrations first (from SyntheticNIRSGenerator) >>> C = np.random.rand(100, 5) # 5 components >>> >>> # Regression targets scaled to percentage >>> y = generator.regression( ... n_samples=100, ... concentrations=C, ... component=0, # Use first component ... range=(0, 100) ... ) >>> >>> # Multi-class classification >>> y = generator.classification( ... n_samples=100, ... concentrations=C, ... n_classes=4, ... separation=2.0 ... ) """ def __init__(self, random_state: Optional[int] = None) -> None: """ Initialize the target generator. Args: random_state: Random seed for reproducibility. """ self.rng = np.random.default_rng(random_state) self._random_state = random_state
[docs] def regression( self, n_samples: int, concentrations: Optional[np.ndarray] = None, *, distribution: Literal["uniform", "normal", "lognormal", "bimodal"] = "uniform", range: Optional[Tuple[float, float]] = None, component: Optional[Union[int, str, List[int]]] = None, component_names: Optional[List[str]] = None, correlation: float = 0.9, noise: float = 0.1, transform: Optional[Literal["log", "sqrt"]] = None, ) -> np.ndarray: """ Generate regression target values. Args: n_samples: Number of samples. concentrations: Component concentration matrix (n_samples, n_components). If None, generates random base values. distribution: Target value distribution. range: (min, max) for scaling targets. component: Which component(s) to use as target: - None: Weighted combination of all components - int: Use component at that index - str: Use component with that name (requires component_names) - List[int]: Multi-output using specified component indices component_names: Names of components (for string component selection). correlation: Correlation between concentrations and targets (0-1). noise: Noise level to add. transform: Optional transformation ('log', 'sqrt'). Returns: Target values array. Shape (n_samples,) for single target, or (n_samples, n_targets) for multi-output. Example: >>> y = generator.regression( ... 100, C, ... distribution="lognormal", ... range=(5, 50), ... component="protein", ... component_names=["water", "protein", "lipid"] ... ) """ # Generate base values from concentrations or random if concentrations is not None: base = self._concentrations_to_base( concentrations, component, component_names ) else: base = self.rng.uniform(0, 1, size=n_samples) if range is not None: base = base.reshape(-1, 1) if base.ndim == 1 else base # Apply distribution transformation y = self._apply_distribution(base, distribution) # Scale to range if range is not None: y = self._scale_to_range(y, range) # Add noise (maintaining correlation) if noise > 0 and correlation < 1.0: y = self._add_controlled_noise(y, correlation, noise) # Apply optional transformation if transform == "log": y = np.log1p(np.maximum(y, 0)) elif transform == "sqrt": y = np.sqrt(np.maximum(y, 0)) # Flatten if single target if y.ndim > 1 and y.shape[1] == 1: y = y.ravel() return y
[docs] def classification( self, n_samples: int, concentrations: Optional[np.ndarray] = None, *, n_classes: int = 2, class_weights: Optional[List[float]] = None, separation: float = 1.5, separation_method: Literal["component", "threshold", "cluster"] = "component", class_names: Optional[List[str]] = None, return_proba: bool = False, ) -> Union[np.ndarray, Tuple[np.ndarray, np.ndarray]]: """ Generate classification target labels with controllable class separation. The separation parameter controls how distinguishable classes are in feature space. Higher values create more separable classes. Args: n_samples: Number of samples. concentrations: Component concentration matrix. n_classes: Number of classes to generate. class_weights: Class proportions (should sum to 1.0). If None, uses balanced classes. separation: Class separation factor: - 0.5-1.0: Overlapping classes (challenging) - 1.5-2.0: Moderate separation (realistic) - 2.5+: Well-separated classes (easy) separation_method: How to create class differences: - "component": Each class has distinct component profiles - "threshold": Classes based on concentration thresholds - "cluster": K-means-like cluster assignment class_names: Optional string labels for classes. return_proba: If True, also return class probabilities. Returns: If return_proba=False: Integer class labels (n_samples,). If return_proba=True: Tuple of (labels, probabilities). Example: >>> # Binary classification with balanced classes >>> y = generator.classification(100, C, n_classes=2) >>> >>> # 3-class with imbalanced weights >>> y = generator.classification( ... 100, C, ... n_classes=3, ... class_weights=[0.5, 0.3, 0.2], ... separation=2.0 ... ) """ if n_classes < 2: raise ValueError(f"n_classes must be >= 2, got {n_classes}") if class_weights is not None: if len(class_weights) != n_classes: raise ValueError( f"class_weights length ({len(class_weights)}) must match " f"n_classes ({n_classes})" ) if abs(sum(class_weights) - 1.0) > 0.01: raise ValueError( f"class_weights must sum to 1.0, got {sum(class_weights)}" ) # Generate class labels based on method if separation_method == "component": labels, proba = self._classify_by_component_profile( n_samples, concentrations, n_classes, class_weights, separation ) elif separation_method == "threshold": labels, proba = self._classify_by_threshold( n_samples, concentrations, n_classes, class_weights, separation ) elif separation_method == "cluster": labels, proba = self._classify_by_clustering( n_samples, concentrations, n_classes, class_weights, separation ) else: raise ValueError(f"Unknown separation_method: '{separation_method}'") if return_proba: return labels, proba return labels
def _concentrations_to_base( self, concentrations: np.ndarray, component: Optional[Union[int, str, List[int]]], component_names: Optional[List[str]], ) -> np.ndarray: """Extract base values from concentration matrix.""" if component is None: # Weighted combination of all components weights = self.rng.dirichlet(np.ones(concentrations.shape[1])) return concentrations @ weights elif isinstance(component, str): if component_names is None: raise ValueError( "component_names required when component is specified as string" ) idx = component_names.index(component) return concentrations[:, idx] elif isinstance(component, int): return concentrations[:, component] elif isinstance(component, list): return concentrations[:, component] else: raise ValueError(f"Invalid component specification: {component}") def _apply_distribution( self, base: np.ndarray, distribution: str, ) -> np.ndarray: """Transform base values to target distribution.""" if base.ndim == 1: base = base.reshape(-1, 1) n_samples, n_targets = base.shape if distribution == "uniform": # Already uniform - just ensure range [0, 1] return base / base.max(axis=0, keepdims=True) elif distribution == "normal": # Transform to approximate normal via inverse CDF # Rank transform to uniform, then to normal result = np.zeros_like(base) for j in range(n_targets): ranks = stats.rankdata(base[:, j]) / (n_samples + 1) result[:, j] = stats.norm.ppf(ranks) return result elif distribution == "lognormal": # Log-normal: positively skewed # Ensure positive base values base_pos = np.maximum(base, 1e-10) return np.exp(base_pos * 2 - 1) # Scale and shift elif distribution == "bimodal": # Create bimodal distribution result = np.zeros_like(base) for j in range(n_targets): # Split samples into two modes mid = np.median(base[:, j]) low_mask = base[:, j] <= mid high_mask = ~low_mask # Shift modes apart result[low_mask, j] = base[low_mask, j] * 0.5 result[high_mask, j] = base[high_mask, j] * 0.5 + 0.5 return result else: raise ValueError(f"Unknown distribution: '{distribution}'") def _scale_to_range( self, y: np.ndarray, range: Tuple[float, float], ) -> np.ndarray: """Scale values to specified range.""" min_val, max_val = range # Handle edge case of constant values y_min, y_max = y.min(), y.max() if y_max - y_min < 1e-10: return np.full_like(y, (min_val + max_val) / 2) # Linear scaling return (y - y_min) / (y_max - y_min) * (max_val - min_val) + min_val def _add_controlled_noise( self, y: np.ndarray, target_correlation: float, noise_std: float, ) -> np.ndarray: """Add noise while maintaining target correlation with original values.""" # The correlation controls how much of the signal vs noise # Higher correlation = less noise influence signal_weight = target_correlation noise_weight = np.sqrt(1 - target_correlation**2) noise = self.rng.normal(0, noise_std, size=y.shape) y_noisy = signal_weight * y + noise_weight * noise * np.std(y) return y_noisy def _classify_by_component_profile( self, n_samples: int, concentrations: Optional[np.ndarray], n_classes: int, class_weights: Optional[List[float]], separation: float, ) -> Tuple[np.ndarray, np.ndarray]: """ Classify samples based on component concentration profiles. Each class is characterized by a different dominant component or combination of components. """ if concentrations is None: # Generate random concentrations concentrations = self.rng.dirichlet( np.ones(n_classes), size=n_samples ) n_components = concentrations.shape[1] # Create class centroids in component space # Each class emphasizes different components centroids = np.zeros((n_classes, n_components)) for c in range(n_classes): # Primary component for this class primary = c % n_components centroids[c, primary] = separation # Add some values to other components for i in range(n_components): if i != primary: centroids[c, i] = self.rng.uniform(0, 0.3) # Compute distances to each centroid distances = np.zeros((n_samples, n_classes)) for c in range(n_classes): diff = concentrations - centroids[c] distances[:, c] = np.sqrt((diff ** 2).sum(axis=1)) # Convert distances to probabilities (inverse distance weighting) inv_dist = 1 / (distances + 1e-10) proba = inv_dist / inv_dist.sum(axis=1, keepdims=True) # Apply class weights by adjusting probabilities if class_weights is not None: weights = np.array(class_weights) proba = proba * weights proba = proba / proba.sum(axis=1, keepdims=True) # Assign labels (with some randomness based on separation) if separation >= 2.0: # High separation - deterministic assignment labels = proba.argmax(axis=1) else: # Lower separation - probabilistic assignment labels = np.array([ self.rng.choice(n_classes, p=p) for p in proba ]) return labels.astype(np.int32), proba def _classify_by_threshold( self, n_samples: int, concentrations: Optional[np.ndarray], n_classes: int, class_weights: Optional[List[float]], separation: float, ) -> Tuple[np.ndarray, np.ndarray]: """ Classify samples using concentration thresholds. Classes are assigned based on whether key concentrations are above or below certain thresholds. """ if concentrations is None: concentrations = self.rng.uniform(0, 1, size=(n_samples, n_classes)) # Use first component for thresholding values = concentrations[:, 0] # Determine thresholds based on class weights if class_weights is None: # Uniform thresholds percentiles = np.linspace(0, 100, n_classes + 1)[1:-1] else: # Weighted thresholds cumsum = np.cumsum(class_weights[:-1]) percentiles = cumsum * 100 thresholds = [np.percentile(values, p) for p in percentiles] # Add noise to thresholds based on separation threshold_noise = (1 - separation / 3) * np.std(values) * 0.5 # Assign labels labels = np.zeros(n_samples, dtype=np.int32) for i, threshold in enumerate(thresholds): noisy_threshold = threshold + self.rng.normal(0, threshold_noise) labels[values > noisy_threshold] = i + 1 # Compute approximate probabilities proba = np.zeros((n_samples, n_classes)) for c in range(n_classes): proba[labels == c, c] = 0.8 + self.rng.uniform(0, 0.2, size=(labels == c).sum()) for other in range(n_classes): if other != c: proba[labels == c, other] = self.rng.uniform( 0, 0.2 / (n_classes - 1), size=(labels == c).sum() ) proba = proba / proba.sum(axis=1, keepdims=True) return labels, proba def _classify_by_clustering( self, n_samples: int, concentrations: Optional[np.ndarray], n_classes: int, class_weights: Optional[List[float]], separation: float, ) -> Tuple[np.ndarray, np.ndarray]: """ Classify samples using k-means-like clustering in component space. """ if concentrations is None: concentrations = self.rng.uniform(0, 1, size=(n_samples, 5)) # Simple k-means-like assignment # Initialize centroids n_components = concentrations.shape[1] centroid_indices = self.rng.choice( n_samples, size=n_classes, replace=False ) centroids = concentrations[centroid_indices].copy() # Spread centroids based on separation for i in range(n_classes): direction = self.rng.normal(0, 1, size=n_components) direction = direction / np.linalg.norm(direction) centroids[i] += direction * separation * 0.2 # Assign to nearest centroid distances = np.zeros((n_samples, n_classes)) for c in range(n_classes): distances[:, c] = np.sqrt( ((concentrations - centroids[c]) ** 2).sum(axis=1) ) labels = distances.argmin(axis=1) # Adjust for class weights if specified if class_weights is not None: # Re-balance assignments labels = self._rebalance_labels(labels, n_classes, class_weights) # Compute probabilities from distances inv_dist = 1 / (distances + 1e-10) proba = inv_dist / inv_dist.sum(axis=1, keepdims=True) return labels.astype(np.int32), proba def _rebalance_labels( self, labels: np.ndarray, n_classes: int, class_weights: List[float], ) -> np.ndarray: """Rebalance label distribution to match target weights.""" n_samples = len(labels) target_counts = [int(w * n_samples) for w in class_weights] # Adjust to ensure sum equals n_samples diff = n_samples - sum(target_counts) for i in range(abs(diff)): target_counts[i % n_classes] += 1 if diff > 0 else -1 # Reassign labels to match target counts new_labels = labels.copy() current_counts = [np.sum(labels == c) for c in range(n_classes)] for c in range(n_classes): excess = current_counts[c] - target_counts[c] if excess > 0: # Move excess samples to other classes class_samples = np.where(labels == c)[0] samples_to_move = self.rng.choice( class_samples, size=excess, replace=False ) # Find classes that need more samples for sample_idx in samples_to_move: for other_c in range(n_classes): if current_counts[other_c] < target_counts[other_c]: new_labels[sample_idx] = other_c current_counts[c] -= 1 current_counts[other_c] += 1 break return new_labels
[docs] def generate_regression_targets( n_samples: int, concentrations: Optional[np.ndarray] = None, *, random_state: Optional[int] = None, distribution: str = "uniform", range: Optional[Tuple[float, float]] = None, ) -> np.ndarray: """ Convenience function for generating regression targets. Args: n_samples: Number of samples. concentrations: Component concentrations (optional). random_state: Random seed. distribution: Target distribution type. range: Value range (min, max). Returns: Target values array. """ generator = TargetGenerator(random_state=random_state) return generator.regression( n_samples=n_samples, concentrations=concentrations, distribution=distribution, range=range, )
[docs] def generate_classification_targets( n_samples: int, concentrations: Optional[np.ndarray] = None, *, random_state: Optional[int] = None, n_classes: int = 2, class_weights: Optional[List[float]] = None, separation: float = 1.5, ) -> np.ndarray: """ Convenience function for generating classification targets. Args: n_samples: Number of samples. concentrations: Component concentrations (optional). random_state: Random seed. n_classes: Number of classes. class_weights: Class proportions. separation: Class separation factor. Returns: Integer class labels array. """ generator = TargetGenerator(random_state=random_state) return generator.classification( n_samples=n_samples, concentrations=concentrations, n_classes=n_classes, class_weights=class_weights, separation=separation, )
[docs] @dataclass class NonLinearTargetConfig: """ Configuration for non-linear target complexity. Attributes: nonlinear_interactions: Type of non-linear interaction. interaction_strength: Blend factor (0=linear, 1=fully non-linear). hidden_factors: Latent variables not in spectra. polynomial_degree: Degree for polynomial interactions. signal_to_confound_ratio: Predictability from spectra. n_confounders: Confounding variables. spectral_masking: Signal in noisy regions. temporal_drift: Relationship changes over samples. n_regimes: Number of relationship regimes. regime_method: How to partition into regimes. regime_overlap: Transition zone smoothness. noise_heteroscedasticity: Per-regime noise variation. """ # Proposition 1: Non-linear interactions nonlinear_interactions: Literal["none", "polynomial", "synergistic", "antagonistic"] = "none" interaction_strength: float = 0.5 hidden_factors: int = 0 polynomial_degree: int = 2 # Proposition 2: Confounders signal_to_confound_ratio: float = 1.0 n_confounders: int = 0 spectral_masking: float = 0.0 temporal_drift: bool = False # Proposition 3: Multi-regime n_regimes: int = 1 regime_method: Literal["concentration", "spectral", "random"] = "concentration" regime_overlap: float = 0.2 noise_heteroscedasticity: float = 0.0
[docs] class NonLinearTargetProcessor: """ Process targets with non-linear relationships, confounders, and multi-regime landscapes. This class implements three propositions for making synthetic targets harder to predict: 1. **Non-linear interactions**: Polynomial, synergistic, or antagonistic effects. 2. **Spectral-target decoupling**: Confounders and partial predictability. 3. **Multi-regime landscapes**: Different relationships in different regions. Args: config: NonLinearTargetConfig with all settings. random_state: Random seed for reproducibility. Example: >>> config = NonLinearTargetConfig( ... nonlinear_interactions="polynomial", ... interaction_strength=0.7, ... n_regimes=3 ... ) >>> processor = NonLinearTargetProcessor(config, random_state=42) >>> y_complex = processor.process(C, y_base) """ def __init__( self, config: NonLinearTargetConfig, random_state: Optional[int] = None, ) -> None: self.config = config self.rng = np.random.default_rng(random_state) self._random_state = random_state
[docs] def process( self, concentrations: np.ndarray, y_base: np.ndarray, spectra: Optional[np.ndarray] = None, ) -> np.ndarray: """ Apply all configured complexity to base targets. Args: concentrations: Component concentration matrix (n_samples, n_components). y_base: Base target values (n_samples,) or (n_samples, n_targets). spectra: Optional spectra matrix for spectral-based regimes. Returns: Transformed target values with added complexity. """ y = y_base.copy() n_samples = len(y) # Ensure 1D for processing was_1d = y.ndim == 1 if was_1d: y = y.reshape(-1, 1) # Step 1: Apply non-linear interactions if self.config.nonlinear_interactions != "none": y = self._apply_nonlinear_interactions(concentrations, y) # Step 2: Add hidden factors (unpredictable component) if self.config.hidden_factors > 0: y = self._add_hidden_factors(y) # Step 3: Apply multi-regime transformation if self.config.n_regimes > 1: y = self._apply_multi_regime(concentrations, y, spectra) # Step 4: Apply confounders if self.config.n_confounders > 0: y = self._apply_confounders(concentrations, y) # Step 5: Apply temporal drift if self.config.temporal_drift: y = self._apply_temporal_drift(y) # Step 6: Apply signal-to-confound ratio (add irreducible noise) if self.config.signal_to_confound_ratio < 1.0: y = self._apply_signal_ratio(y) # Step 7: Apply heteroscedastic noise if self.config.noise_heteroscedasticity > 0: y = self._apply_heteroscedastic_noise(y, concentrations) # Restore original shape if was_1d and y.shape[1] == 1: y = y.ravel() return y
def _apply_nonlinear_interactions( self, C: np.ndarray, y: np.ndarray, ) -> np.ndarray: """Apply non-linear interaction terms.""" n_samples, n_components = C.shape strength = self.config.interaction_strength if self.config.nonlinear_interactions == "polynomial": # Generate polynomial features y_nonlinear = self._polynomial_transform(C, y) elif self.config.nonlinear_interactions == "synergistic": # Synergistic: combinations enhance effect y_nonlinear = self._synergistic_transform(C, y) elif self.config.nonlinear_interactions == "antagonistic": # Antagonistic: saturation/inhibition effects y_nonlinear = self._antagonistic_transform(C, y) else: return y # Blend linear and non-linear based on strength y_blended = (1 - strength) * y + strength * y_nonlinear return y_blended def _polynomial_transform( self, C: np.ndarray, y: np.ndarray, ) -> np.ndarray: """Generate polynomial interaction terms.""" n_samples, n_components = C.shape degree = self.config.polynomial_degree # Normalize concentrations to avoid numerical issues C_norm = (C - C.mean(axis=0)) / (C.std(axis=0) + 1e-8) # Build polynomial features poly_terms = [] # Quadratic terms: C_i^2 for i in range(n_components): poly_terms.append(C_norm[:, i] ** 2) # Interaction terms: C_i * C_j for i in range(n_components): for j in range(i + 1, n_components): poly_terms.append(C_norm[:, i] * C_norm[:, j]) # Cubic terms if degree >= 3 if degree >= 3: for i in range(n_components): poly_terms.append(C_norm[:, i] ** 3) for i in range(min(n_components, 3)): for j in range(i + 1, min(n_components, 3)): poly_terms.append(C_norm[:, i] ** 2 * C_norm[:, j]) # Combine terms with random weights poly_features = np.column_stack(poly_terms) if poly_terms else np.zeros((n_samples, 1)) weights = self.rng.standard_normal(poly_features.shape[1]) weights = weights / np.linalg.norm(weights) # Normalize # Combine with base y nonlinear_component = poly_features @ weights nonlinear_component = nonlinear_component.reshape(-1, 1) # Scale to match y range y_range = y.max() - y.min() if y_range > 0: nonlinear_component = ( (nonlinear_component - nonlinear_component.mean()) / (nonlinear_component.std() + 1e-8) * y.std() + y.mean() ) return nonlinear_component if y.shape[1] == 1 else np.tile(nonlinear_component, (1, y.shape[1])) def _synergistic_transform( self, C: np.ndarray, y: np.ndarray, ) -> np.ndarray: """Synergistic effects: combinations enhance the target non-linearly.""" n_samples, n_components = C.shape # Select pairs of components for synergy n_pairs = min(3, n_components * (n_components - 1) // 2) synergy_terms = [] for _ in range(n_pairs): i, j = self.rng.choice(n_components, 2, replace=False) # Synergistic term: sqrt(C_i * C_j) * (C_i + C_j) term = np.sqrt(C[:, i] * C[:, j] + 1e-8) * (C[:, i] + C[:, j]) synergy_terms.append(term) if synergy_terms: synergy = np.column_stack(synergy_terms) weights = self.rng.uniform(0.5, 1.5, len(synergy_terms)) synergy_effect = (synergy * weights).sum(axis=1, keepdims=True) # Scale and add to y synergy_effect = ( (synergy_effect - synergy_effect.mean()) / (synergy_effect.std() + 1e-8) * y.std() ) return y + synergy_effect return y def _antagonistic_transform( self, C: np.ndarray, y: np.ndarray, ) -> np.ndarray: """Antagonistic effects: saturation and inhibition (Michaelis-Menten-like).""" n_samples, n_components = C.shape # Apply Michaelis-Menten kinetics to primary component primary_idx = self.rng.integers(0, n_components) C_primary = C[:, primary_idx] # Vmax and Km parameters Vmax = y.max() * 1.2 Km = np.median(C_primary) * 0.5 # Michaelis-Menten: V = Vmax * [S] / (Km + [S]) mm_term = Vmax * C_primary / (Km + C_primary + 1e-8) # Optional competitive inhibition from another component if n_components > 1: inhibitor_idx = (primary_idx + 1) % n_components Ki = np.median(C[:, inhibitor_idx]) inhibition_factor = 1 / (1 + C[:, inhibitor_idx] / (Ki + 1e-8)) mm_term = mm_term * inhibition_factor mm_term = mm_term.reshape(-1, 1) # Scale to match y mm_term = ( (mm_term - mm_term.mean()) / (mm_term.std() + 1e-8) * y.std() + y.mean() ) return mm_term if y.shape[1] == 1 else np.tile(mm_term, (1, y.shape[1])) def _add_hidden_factors(self, y: np.ndarray) -> np.ndarray: """Add latent factors that affect y but have no spectral signature.""" n_samples = y.shape[0] n_hidden = self.config.hidden_factors # Generate hidden factors hidden = self.rng.standard_normal((n_samples, n_hidden)) # Random weights for hidden factor contribution weights = self.rng.uniform(0.1, 0.3, n_hidden) # Hidden effect (unobservable from spectra) hidden_effect = (hidden * weights).sum(axis=1, keepdims=True) hidden_effect = hidden_effect / (hidden_effect.std() + 1e-8) * y.std() * 0.3 return y + hidden_effect def _apply_multi_regime( self, C: np.ndarray, y: np.ndarray, spectra: Optional[np.ndarray], ) -> np.ndarray: """Apply different target functions in different regimes.""" n_samples = y.shape[0] n_regimes = self.config.n_regimes # Assign samples to regimes regime_assignments = self._assign_regimes(C, spectra) # Generate different transformation functions per regime y_transformed = np.zeros_like(y) for regime_id in range(n_regimes): mask = regime_assignments == regime_id if not np.any(mask): continue # Each regime has a different relationship y_regime = y[mask].copy() C_regime = C[mask] # Regime-specific transformation if regime_id == 0: # Linear (baseline) y_regime = y_regime elif regime_id == 1: # Quadratic emphasis if C_regime.shape[1] > 0: quad_term = (C_regime[:, 0] ** 2).reshape(-1, 1) quad_term = (quad_term - quad_term.mean()) / (quad_term.std() + 1e-8) y_regime = y_regime + quad_term * y_regime.std() * 0.5 elif regime_id == 2: # Inverse relationship y_regime = y_regime.max() + y_regime.min() - y_regime else: # Random non-linear mixing component_idx = regime_id % C_regime.shape[1] ratio_term = C_regime[:, component_idx] / ( C_regime[:, (component_idx + 1) % C_regime.shape[1]] + 1e-8 ) ratio_term = ratio_term.reshape(-1, 1) ratio_term = (ratio_term - ratio_term.mean()) / (ratio_term.std() + 1e-8) y_regime = y_regime + ratio_term * y_regime.std() * 0.3 y_transformed[mask] = y_regime # Apply overlap smoothing if self.config.regime_overlap > 0: y_transformed = self._smooth_regime_boundaries( y_transformed, regime_assignments, y ) return y_transformed def _assign_regimes( self, C: np.ndarray, spectra: Optional[np.ndarray], ) -> np.ndarray: """Assign samples to regimes based on method.""" n_samples = C.shape[0] n_regimes = self.config.n_regimes if self.config.regime_method == "random": return self.rng.integers(0, n_regimes, size=n_samples) elif self.config.regime_method == "concentration": # Use first principal direction of concentrations C_centered = C - C.mean(axis=0) if C.shape[1] > 1: # Simple projection onto first component projection = C_centered.sum(axis=1) else: projection = C_centered[:, 0] # Quantile-based assignment percentiles = np.linspace(0, 100, n_regimes + 1) thresholds = [np.percentile(projection, p) for p in percentiles] assignments = np.zeros(n_samples, dtype=int) for i in range(n_regimes - 1): assignments[projection > thresholds[i + 1]] = i + 1 return assignments elif self.config.regime_method == "spectral": if spectra is None: # Fall back to concentration-based return self._assign_regimes(C, None) # Use spectral intensity for regime assignment intensity = spectra.mean(axis=1) percentiles = np.linspace(0, 100, n_regimes + 1) thresholds = [np.percentile(intensity, p) for p in percentiles] assignments = np.zeros(n_samples, dtype=int) for i in range(n_regimes - 1): assignments[intensity > thresholds[i + 1]] = i + 1 return assignments return np.zeros(n_samples, dtype=int) def _smooth_regime_boundaries( self, y_transformed: np.ndarray, regime_assignments: np.ndarray, y_original: np.ndarray, ) -> np.ndarray: """Smooth transitions between regimes.""" # Simple implementation: blend with original at boundaries overlap = self.config.regime_overlap n_regimes = self.config.n_regimes # For each regime boundary, create a soft transition # This is a simplified version - more sophisticated approaches possible blend_factor = np.ones((y_transformed.shape[0], 1)) for regime in range(n_regimes): mask = regime_assignments == regime if np.sum(mask) == 0: continue # Reduce blend at edges (simple approach) edge_samples = int(np.sum(mask) * overlap) if edge_samples > 0: regime_indices = np.where(mask)[0] for idx in regime_indices[:edge_samples]: blend_factor[idx] = 0.5 + 0.5 * ( np.where(regime_indices == idx)[0][0] / edge_samples ) return blend_factor * y_transformed + (1 - blend_factor) * y_original def _apply_confounders( self, C: np.ndarray, y: np.ndarray, ) -> np.ndarray: """Apply confounding variables that affect spectra and target differently.""" n_samples = y.shape[0] n_confounders = self.config.n_confounders # Generate confounders correlated with concentrations confounder_effect = np.zeros((n_samples, 1)) for i in range(n_confounders): # Each confounder is partially correlated with a component component_idx = i % C.shape[1] base_confounder = 0.5 * C[:, component_idx] + 0.5 * self.rng.standard_normal(n_samples) # Non-linear transformation of confounder effect on y # (different from its effect on spectra which is linear) effect = np.sin(base_confounder * np.pi) + base_confounder ** 2 effect = effect.reshape(-1, 1) effect = (effect - effect.mean()) / (effect.std() + 1e-8) confounder_effect += effect * 0.2 / n_confounders return y + confounder_effect * y.std() def _apply_temporal_drift(self, y: np.ndarray) -> np.ndarray: """Apply temporal drift - relationship changes over samples.""" n_samples = y.shape[0] # Time index (assume samples are in temporal order) t = np.linspace(0, 1, n_samples).reshape(-1, 1) # Drift function: gradual shift and scale change drift_shift = 0.2 * np.sin(2 * np.pi * t) # Oscillating shift drift_scale = 1 + 0.15 * t # Gradual scale increase y_drifted = (y - y.mean()) * drift_scale + y.mean() + drift_shift * y.std() return y_drifted def _apply_signal_ratio(self, y: np.ndarray) -> np.ndarray: """Add irreducible noise based on signal-to-confound ratio.""" n_samples = y.shape[0] ratio = self.config.signal_to_confound_ratio # Calculate noise variance to achieve target ratio # If ratio = 0.7, then 30% of variance is unexplainable unexplainable_var = (1 - ratio) * np.var(y) noise_std = np.sqrt(unexplainable_var) noise = self.rng.standard_normal(y.shape) * noise_std return y + noise def _apply_heteroscedastic_noise( self, y: np.ndarray, C: np.ndarray, ) -> np.ndarray: """Apply noise that varies based on concentration/regime.""" n_samples = y.shape[0] hetero_strength = self.config.noise_heteroscedasticity # Noise level depends on first concentration component if C.shape[1] > 0: noise_scale = 1 + hetero_strength * (C[:, 0] - C[:, 0].mean()) / (C[:, 0].std() + 1e-8) noise_scale = np.clip(noise_scale, 0.2, 3.0).reshape(-1, 1) else: noise_scale = np.ones((n_samples, 1)) base_noise = self.rng.standard_normal(y.shape) * y.std() * 0.1 heteroscedastic_noise = base_noise * noise_scale return y + heteroscedastic_noise