"""
Synthetic NIRS Spectra Generator.
A physically-motivated synthetic NIRS spectra generator based on Beer-Lambert law,
with realistic instrumental effects and noise models.
Key features:
- Voigt profile peak shapes (Gaussian + Lorentzian convolution)
- Realistic NIR band positions from known spectroscopic databases
- Configurable baseline, scattering, and instrumental effects
- Batch/session effects for domain adaptation research
- Controllable outlier/artifact generation
- Instrument archetype simulation (Phase 2)
- Measurement mode physics (transmittance, reflectance, ATR) (Phase 2)
- Detector response and noise models (Phase 2)
- Multi-sensor stitching (Phase 2)
- Multi-scan averaging/denoising (Phase 2)
References:
- Workman Jr, J., & Weyer, L. (2012). Practical Guide and Spectral Atlas for
Interpretive Near-Infrared Spectroscopy. CRC Press.
- Burns, D. A., & Ciurczak, E. W. (2007). Handbook of Near-Infrared Analysis.
CRC Press.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Tuple, Union
import numpy as np
from scipy.ndimage import gaussian_filter1d, median_filter
from scipy.signal import savgol_filter
from .components import ComponentLibrary
from ._constants import (
COMPLEXITY_PARAMS,
DEFAULT_REALISTIC_COMPONENTS,
DEFAULT_WAVELENGTH_END,
DEFAULT_WAVELENGTH_START,
DEFAULT_WAVELENGTH_STEP,
)
from .instruments import (
InstrumentArchetype,
InstrumentSimulator,
MultiScanConfig,
MultiSensorConfig,
SensorConfig,
get_instrument_archetype,
)
from .measurement_modes import (
MeasurementMode,
MeasurementModeSimulator,
create_transmittance_simulator,
)
from .detectors import (
DetectorConfig,
DetectorSimulator,
get_default_noise_config,
)
from .environmental import (
EnvironmentalEffectsConfig,
EnvironmentalEffectsSimulator,
TemperatureConfig,
MoistureConfig,
)
from .scattering import (
ScatteringEffectsConfig,
ScatteringEffectsSimulator,
ParticleSizeConfig,
EMSCConfig,
)
if TYPE_CHECKING:
from nirs4all.data.dataset import SpectroDataset
[docs]
class SyntheticNIRSGenerator:
"""
Generator for synthetic NIRS spectra with realistic instrumental effects.
This generator implements a physically-motivated model based on Beer-Lambert law
with additional effects for baseline, scattering, instrumental response, and noise.
Model:
A_i(λ) = L_i * Σ_k c_ik * ε_k(λ) + baseline_i(λ) + scatter_i(λ) + noise_i(λ)
where:
- c_ik: concentration of component k in sample i
- ε_k(λ): molar absorptivity of component k (Voigt profiles)
- L_i: optical path length factor
- baseline: polynomial baseline drift
- scatter: multiplicative/additive scattering effects
- noise: wavelength-dependent Gaussian noise
Phase 2 Features:
- Instrument archetype simulation (FOSS, Bruker, etc.)
- Measurement mode physics (transmittance, reflectance, ATR)
- Detector response curves and noise models
- Multi-sensor stitching (combining signals from different wavelength ranges)
- Multi-scan averaging/denoising (simulating multiple scans per sample)
Phase 3 Features:
- Temperature effects on spectral bands (O-H, N-H, C-H shifts)
- Moisture and water activity effects
- Particle size effects (EMSC-style scattering)
- Scattering coefficient generation (Kubelka-Munk)
Attributes:
wavelengths: Array of wavelength values in nm.
n_wavelengths: Number of wavelength points.
library: ComponentLibrary containing spectral components.
E: Precomputed component spectra matrix (n_components, n_wavelengths).
params: Dictionary of effect parameters based on complexity level.
instrument: Optional InstrumentArchetype for realistic simulation.
measurement_mode_simulator: Optional measurement mode simulator.
environmental_simulator: Optional Phase 3 environmental effects simulator.
scattering_effects_simulator: Optional Phase 3 scattering effects simulator.
Args:
wavelength_start: Start wavelength in nm.
wavelength_end: End wavelength in nm.
wavelength_step: Wavelength step in nm.
component_library: Optional ComponentLibrary. If None, generates
predefined components for realistic mode or random for simple mode.
complexity: Complexity level controlling noise, scatter, etc.
Options: 'simple', 'realistic', 'complex'.
instrument: Instrument archetype name or InstrumentArchetype object.
If provided, uses instrument-specific wavelength range, detector, etc.
measurement_mode: Measurement mode (transmittance, reflectance, etc.).
multi_sensor_config: Configuration for multi-sensor stitching.
multi_scan_config: Configuration for multi-scan averaging.
environmental_config: Phase 3 configuration for temperature/moisture effects.
scattering_effects_config: Phase 3 configuration for particle size/scattering.
random_state: Random seed for reproducibility.
Example:
>>> generator = SyntheticNIRSGenerator(random_state=42)
>>> X, Y, E = generator.generate(n_samples=1000)
>>> print(X.shape, Y.shape, E.shape)
(1000, 751) (1000, 5) (5, 751)
>>> # With instrument simulation (Phase 2)
>>> generator = SyntheticNIRSGenerator(
... instrument="foss_xds",
... measurement_mode="reflectance",
... random_state=42
... )
>>> X, Y, E = generator.generate(n_samples=500)
>>> # With environmental effects (Phase 3)
>>> from nirs4all.data.synthetic import EnvironmentalEffectsConfig
>>> env_config = EnvironmentalEffectsConfig(
... enable_temperature=True,
... enable_moisture=True
... )
>>> generator = SyntheticNIRSGenerator(
... environmental_config=env_config,
... random_state=42
... )
>>> X, Y, E = generator.generate(n_samples=500, include_environmental_effects=True)
>>> # Create a SpectroDataset directly
>>> dataset = generator.create_dataset(n_train=800, n_test=200)
See Also:
ComponentLibrary: For managing spectral components.
InstrumentArchetype: For instrument-specific simulation.
MeasurementModeSimulator: For measurement mode physics.
EnvironmentalEffectsSimulator: For temperature/moisture effects (Phase 3).
ScatteringEffectsSimulator: For particle size/scattering effects (Phase 3).
"""
def __init__(
self,
wavelength_start: float = DEFAULT_WAVELENGTH_START,
wavelength_end: float = DEFAULT_WAVELENGTH_END,
wavelength_step: float = DEFAULT_WAVELENGTH_STEP,
component_library: Optional[ComponentLibrary] = None,
complexity: Literal["simple", "realistic", "complex"] = "realistic",
instrument: Optional[Union[str, InstrumentArchetype]] = None,
measurement_mode: Optional[Union[str, MeasurementMode]] = None,
multi_sensor_config: Optional[MultiSensorConfig] = None,
multi_scan_config: Optional[MultiScanConfig] = None,
environmental_config: Optional[EnvironmentalEffectsConfig] = None,
scattering_effects_config: Optional[ScatteringEffectsConfig] = None,
random_state: Optional[int] = None,
) -> None:
"""
Initialize the synthetic NIRS generator.
Args:
wavelength_start: Start wavelength in nm.
wavelength_end: End wavelength in nm.
wavelength_step: Wavelength step in nm.
component_library: Optional ComponentLibrary instance.
If None, creates appropriate library based on complexity.
complexity: Complexity level: 'simple', 'realistic', or 'complex'.
instrument: Instrument archetype name (e.g., 'foss_xds', 'bruker_mpa')
or InstrumentArchetype object. If provided, instrument-specific
parameters are used for wavelength range and noise.
measurement_mode: Measurement mode ('transmittance', 'reflectance',
'atr', 'transflectance'). If None, uses 'transmittance'.
multi_sensor_config: Configuration for multi-sensor stitching.
If None and instrument has multi_sensor, uses instrument defaults.
multi_scan_config: Configuration for multi-scan averaging.
If None and instrument has multi_scan, uses instrument defaults.
environmental_config: Phase 3 configuration for environmental effects
(temperature and moisture). If provided, enables simulation of
temperature-induced band shifts and moisture effects.
scattering_effects_config: Phase 3 configuration for scattering effects
(particle size and EMSC-style distortions). If provided, enables
simulation of particle-size-dependent scattering.
random_state: Random seed for reproducibility.
Raises:
ValueError: If complexity is not a valid option.
"""
if complexity not in COMPLEXITY_PARAMS:
valid = list(COMPLEXITY_PARAMS.keys())
raise ValueError(f"complexity must be one of {valid}, got '{complexity}'")
self.complexity = complexity
self.rng = np.random.default_rng(random_state)
self._random_state = random_state
# Phase 2: Instrument archetype setup
self.instrument: Optional[InstrumentArchetype] = None
self.instrument_simulator: Optional[InstrumentSimulator] = None
self.multi_sensor_config = multi_sensor_config
self.multi_scan_config = multi_scan_config
if instrument is not None:
if isinstance(instrument, str):
self.instrument = get_instrument_archetype(instrument)
else:
self.instrument = instrument
# Use instrument's wavelength range if not explicitly specified
if wavelength_start == DEFAULT_WAVELENGTH_START:
wavelength_start = self.instrument.wavelength_range[0]
if wavelength_end == DEFAULT_WAVELENGTH_END:
wavelength_end = self.instrument.wavelength_range[1]
# Get multi-sensor config from instrument if not provided
if self.multi_sensor_config is None and self.instrument.multi_sensor is not None:
self.multi_sensor_config = self.instrument.multi_sensor
# Get multi-scan config from instrument if not provided
if self.multi_scan_config is None and self.instrument.multi_scan is not None:
self.multi_scan_config = self.instrument.multi_scan
# Create instrument simulator
self.instrument_simulator = InstrumentSimulator(
self.instrument, random_state=random_state
)
self.wavelength_start = wavelength_start
self.wavelength_end = wavelength_end
self.wavelength_step = wavelength_step
# Generate wavelength grid
self.wavelengths = np.arange(
wavelength_start, wavelength_end + wavelength_step, wavelength_step
)
self.n_wavelengths = len(self.wavelengths)
# Phase 2: Measurement mode setup
self.measurement_mode: Optional[MeasurementMode] = None
self.measurement_mode_simulator: Optional[MeasurementModeSimulator] = None
if measurement_mode is not None:
if isinstance(measurement_mode, str):
self.measurement_mode = MeasurementMode(measurement_mode.lower())
else:
self.measurement_mode = measurement_mode
# Create measurement mode simulator
self.measurement_mode_simulator = create_transmittance_simulator(
scattering_enabled=True, random_state=random_state
)
# Phase 2: Detector simulator
self.detector_simulator: Optional[DetectorSimulator] = None
if self.instrument is not None:
detector_type = self.instrument.detector_type
noise_config = get_default_noise_config(detector_type)
detector_config = DetectorConfig(
detector_type=detector_type,
noise_model=noise_config,
apply_response_curve=True,
)
self.detector_simulator = DetectorSimulator(detector_config, random_state)
# Set up component library
if component_library is not None:
self.library = component_library
else:
# Use predefined components for realistic/complex mode
if complexity in ("realistic", "complex"):
self.library = ComponentLibrary.from_predefined(
DEFAULT_REALISTIC_COMPONENTS,
random_state=random_state,
)
else:
# Generate random components for simple mode
self.library = ComponentLibrary(random_state=random_state)
self.library.generate_random_library(n_components=5)
# Precompute component spectra (pure component matrix E)
self.E = self.library.compute_all(self.wavelengths)
# Set complexity-dependent parameters
self.params = COMPLEXITY_PARAMS[complexity].copy()
# Phase 3: Environmental effects simulator
self.environmental_config = environmental_config
self.environmental_simulator: Optional[EnvironmentalEffectsSimulator] = None
if environmental_config is not None:
self.environmental_simulator = EnvironmentalEffectsSimulator(
environmental_config, random_state=random_state
)
# Phase 3: Scattering effects simulator
self.scattering_effects_config = scattering_effects_config
self.scattering_effects_simulator: Optional[ScatteringEffectsSimulator] = None
if scattering_effects_config is not None:
self.scattering_effects_simulator = ScatteringEffectsSimulator(
scattering_effects_config, random_state=random_state
)
[docs]
def generate_concentrations(
self,
n_samples: int,
method: Literal["dirichlet", "uniform", "lognormal", "correlated"] = "dirichlet",
alpha: Optional[np.ndarray] = None,
correlation_matrix: Optional[np.ndarray] = None,
) -> np.ndarray:
"""
Generate concentration matrix using specified distribution.
Args:
n_samples: Number of samples to generate.
method: Concentration generation method:
- 'dirichlet': Compositional data (concentrations sum to ~1).
- 'uniform': Independent uniform [0, 1] values.
- 'lognormal': Log-normal distributed, normalized.
- 'correlated': Multivariate with specified correlations.
alpha: Dirichlet concentration parameters (only for 'dirichlet' method).
Shape: (n_components,). Higher values = more uniform distribution.
correlation_matrix: Correlation structure for 'correlated' method.
Shape: (n_components, n_components).
Returns:
Concentration matrix of shape (n_samples, n_components).
Raises:
ValueError: If method is unknown.
Example:
>>> generator = SyntheticNIRSGenerator(random_state=42)
>>> C = generator.generate_concentrations(100, method='dirichlet')
>>> print(C.shape, C.sum(axis=1).mean()) # Should sum to ~1
"""
n_components = self.library.n_components
if method == "dirichlet":
if alpha is None:
alpha = np.ones(n_components) * 2.0
C = self.rng.dirichlet(alpha, size=n_samples)
elif method == "uniform":
C = self.rng.uniform(0, 1, size=(n_samples, n_components))
elif method == "lognormal":
C = self.rng.lognormal(mean=0, sigma=0.5, size=(n_samples, n_components))
C = C / C.sum(axis=1, keepdims=True) # Normalize
elif method == "correlated":
C = self._generate_correlated_concentrations(
n_samples, n_components, correlation_matrix
)
else:
valid = ["dirichlet", "uniform", "lognormal", "correlated"]
raise ValueError(f"Unknown concentration method: '{method}'. Use one of {valid}")
return C
def _generate_correlated_concentrations(
self,
n_samples: int,
n_components: int,
correlation_matrix: Optional[np.ndarray] = None,
) -> np.ndarray:
"""
Generate correlated concentrations using Cholesky decomposition.
Args:
n_samples: Number of samples.
n_components: Number of components.
correlation_matrix: Desired correlation structure.
Returns:
Concentration matrix with specified correlations.
"""
if correlation_matrix is None:
# Create default correlation structure
correlation_matrix = np.eye(n_components)
for i in range(n_components):
for j in range(i + 1, n_components):
corr = self.rng.uniform(-0.3, 0.5)
correlation_matrix[i, j] = corr
correlation_matrix[j, i] = corr
# Ensure positive definiteness
eigvals, eigvecs = np.linalg.eigh(correlation_matrix)
eigvals = np.maximum(eigvals, 0.01)
correlation_matrix = eigvecs @ np.diag(eigvals) @ eigvecs.T
L = np.linalg.cholesky(correlation_matrix)
Z = self.rng.standard_normal((n_samples, n_components))
C = Z @ L.T
# Transform to positive values and normalize
C = np.abs(C)
C = C / C.sum(axis=1, keepdims=True)
return C
def _apply_beer_lambert(self, C: np.ndarray) -> np.ndarray:
"""
Apply Beer-Lambert law: A = C @ E.
Args:
C: Concentration matrix (n_samples, n_components).
Returns:
Absorbance matrix (n_samples, n_wavelengths).
"""
return C @ self.E
def _apply_path_length(self, A: np.ndarray) -> np.ndarray:
"""
Apply random path length variation to absorbance.
Args:
A: Absorbance matrix (n_samples, n_wavelengths).
Returns:
Modified absorbance matrix.
"""
n_samples = A.shape[0]
L = self.rng.normal(1.0, self.params["path_length_std"], size=n_samples)
L = np.maximum(L, 0.5) # Ensure positive
return A * L[:, np.newaxis]
def _generate_baseline(self, n_samples: int) -> np.ndarray:
"""
Generate polynomial baseline drift.
Args:
n_samples: Number of samples.
Returns:
Baseline array (n_samples, n_wavelengths).
"""
x = (self.wavelengths - self.wavelengths.mean()) / (np.ptp(self.wavelengths) / 2)
amp = self.params["baseline_amplitude"]
baseline = np.zeros((n_samples, self.n_wavelengths))
for i in range(n_samples):
b0 = self.rng.normal(0, amp)
b1 = self.rng.normal(0, amp * 0.5)
b2 = self.rng.normal(0, amp * 0.3)
b3 = self.rng.normal(0, amp * 0.1)
baseline[i] = b0 + b1 * x + b2 * x**2 + b3 * x**3
return baseline
def _apply_global_slope(self, A: np.ndarray) -> np.ndarray:
"""
Apply global slope effect commonly observed in NIR spectra.
This simulates the typical upward trend in absorbance with increasing
wavelength, caused by scattering effects and instrumental factors.
Args:
A: Absorbance matrix (n_samples, n_wavelengths).
Returns:
Modified absorbance matrix.
"""
n_samples = A.shape[0]
# Normalized wavelength (0 to 1 across the range)
wl_range = np.ptp(self.wavelengths)
x_norm = (self.wavelengths - self.wavelengths.min()) / wl_range
# Generate slopes: mean + sample-specific variation
slope_mean = self.params["global_slope_mean"]
slope_std = self.params["global_slope_std"]
slopes = self.rng.normal(slope_mean, slope_std, size=n_samples)
# Scale to actual wavelength range (slope is per 1000nm)
scale_factor = wl_range / 1000.0
# Apply slope to each sample
for i in range(n_samples):
A[i] += slopes[i] * scale_factor * x_norm
return A
def _apply_scatter(self, A: np.ndarray) -> np.ndarray:
"""
Apply multiplicative/additive scatter effects (SNV/MSC-like before correction).
Args:
A: Absorbance matrix (n_samples, n_wavelengths).
Returns:
Modified absorbance matrix.
"""
n_samples = A.shape[0]
# Multiplicative scatter
alpha = self.rng.normal(1.0, self.params["scatter_alpha_std"], size=n_samples)
alpha = np.maximum(alpha, 0.7)
# Additive offset
beta = self.rng.normal(0, self.params["scatter_beta_std"], size=n_samples)
# Apply
A_scattered = A * alpha[:, np.newaxis] + beta[:, np.newaxis]
# Add tilt
x = (self.wavelengths - self.wavelengths.mean()) / np.ptp(self.wavelengths)
gamma = self.rng.normal(0, self.params["tilt_std"], size=n_samples)
tilt = gamma[:, np.newaxis] * x[np.newaxis, :]
A_scattered += tilt
return A_scattered
def _apply_wavelength_shift(self, A: np.ndarray) -> np.ndarray:
"""
Apply wavelength calibration shifts/stretches via interpolation.
Args:
A: Absorbance matrix (n_samples, n_wavelengths).
Returns:
Modified absorbance matrix.
"""
n_samples = A.shape[0]
shifts = self.rng.normal(0, self.params["shift_std"], size=n_samples)
stretches = self.rng.normal(1.0, self.params["stretch_std"], size=n_samples)
A_shifted = np.zeros_like(A)
for i in range(n_samples):
wl_shifted = stretches[i] * self.wavelengths + shifts[i]
A_shifted[i] = np.interp(self.wavelengths, wl_shifted, A[i])
return A_shifted
def _apply_instrumental_response(self, A: np.ndarray) -> np.ndarray:
"""
Apply instrumental broadening via Gaussian convolution.
Args:
A: Absorbance matrix (n_samples, n_wavelengths).
Returns:
Modified absorbance matrix.
"""
fwhm = self.params["instrumental_fwhm"]
# Convert FWHM to sigma in wavelength step units
sigma_wl = fwhm / (2 * np.sqrt(2 * np.log(2)))
sigma_pts = sigma_wl / self.wavelength_step
A_convolved = np.zeros_like(A)
for i in range(A.shape[0]):
A_convolved[i] = gaussian_filter1d(A[i], sigma_pts)
return A_convolved
def _add_noise(self, A: np.ndarray) -> np.ndarray:
"""
Add wavelength-dependent Gaussian noise.
Args:
A: Absorbance matrix (n_samples, n_wavelengths).
Returns:
Modified absorbance matrix with noise.
"""
sigma_base = self.params["noise_base"]
sigma_signal = self.params["noise_signal_dep"]
# Heteroscedastic noise: higher noise at higher absorbance
sigma = sigma_base + sigma_signal * np.abs(A)
noise = self.rng.normal(0, sigma)
return A + noise
def _add_artifacts(self, A: np.ndarray, metadata: Dict[str, Any]) -> np.ndarray:
"""
Add random artifacts (spikes, dead bands, saturation).
Args:
A: Absorbance matrix (n_samples, n_wavelengths).
metadata: Dictionary to store artifact information.
Returns:
Modified absorbance matrix.
"""
n_samples = A.shape[0]
artifact_prob = self.params["artifact_prob"]
artifact_types: List[Optional[str]] = []
for i in range(n_samples):
if self.rng.random() < artifact_prob:
artifact_type = self.rng.choice(["spike", "dead_band", "saturation"])
artifact_types.append(artifact_type)
if artifact_type == "spike":
n_spikes = self.rng.integers(1, 4)
spike_indices = self.rng.choice(
self.n_wavelengths, n_spikes, replace=False
)
spike_values = self.rng.uniform(0.5, 1.5, n_spikes)
A[i, spike_indices] += spike_values * np.sign(
self.rng.standard_normal(n_spikes)
)
elif artifact_type == "dead_band":
start_idx = self.rng.integers(0, self.n_wavelengths - 20)
width = self.rng.integers(10, 30)
end_idx = min(start_idx + width, self.n_wavelengths)
A[i, start_idx:end_idx] += self.rng.normal(
0, 0.05, end_idx - start_idx
)
elif artifact_type == "saturation":
threshold = self.rng.uniform(0.8, 1.2)
A[i] = np.clip(A[i], -np.inf, threshold)
else:
artifact_types.append(None)
metadata["artifact_types"] = artifact_types
return A
[docs]
def generate_batch_effects(
self,
n_batches: int,
samples_per_batch: List[int],
) -> Tuple[np.ndarray, np.ndarray]:
"""
Generate batch/session effects for domain adaptation research.
Args:
n_batches: Number of measurement batches/sessions.
samples_per_batch: List of sample counts per batch.
Returns:
Tuple of:
- batch_offsets: Wavelength-dependent offsets per batch.
- batch_gains: Multiplicative gains per batch.
"""
batch_offsets = []
batch_gains = []
for _ in range(n_batches):
# Baseline offset per batch (slow drift)
x = (self.wavelengths - self.wavelengths.mean()) / np.ptp(self.wavelengths)
offset = self.rng.normal(0, 0.02) + self.rng.normal(0, 0.01) * x
batch_offsets.append(offset)
# Gain variation per batch
gain = self.rng.normal(1.0, 0.03)
batch_gains.append(gain)
return np.array(batch_offsets), np.array(batch_gains)
# ========================================================================
# Phase 2: Multi-Sensor Stitching
# ========================================================================
def _apply_multi_sensor_stitching(
self,
spectra: np.ndarray,
wavelengths: np.ndarray,
) -> np.ndarray:
"""
Apply multi-sensor stitching effects.
Real NIR instruments often use multiple sensors to cover a wide
wavelength range (e.g., Si for 400-1100nm, InGaAs for 900-1700nm).
This creates stitching artifacts at the junction regions.
Args:
spectra: Input spectra (n_samples, n_wavelengths).
wavelengths: Wavelength array (nm).
Returns:
Spectra with multi-sensor stitching effects applied.
"""
if self.multi_sensor_config is None:
return spectra
config = self.multi_sensor_config
n_samples, n_wl = spectra.shape
result = spectra.copy()
# Check if enabled
if not config.enabled:
return result
# Get sensor configurations
sensors = config.sensors
if len(sensors) < 2:
return result
# Get artifact intensity for noise level
artifact_intensity = config.artifact_intensity if config.add_stitch_artifacts else 0.0
# Process each junction between sensors
for i in range(len(sensors) - 1):
sensor1 = sensors[i]
sensor2 = sensors[i + 1]
# Find overlap region
overlap_start = max(sensor1.wavelength_range[0], sensor2.wavelength_range[0])
overlap_end = min(sensor1.wavelength_range[1], sensor2.wavelength_range[1])
if overlap_start >= overlap_end:
# No overlap - this is a hard transition
junction_wl = (sensor1.wavelength_range[1] + sensor2.wavelength_range[0]) / 2
junction_idx = np.argmin(np.abs(wavelengths - junction_wl))
# Add stitching offset artifact
if config.add_stitch_artifacts:
offset = self.rng.normal(0, artifact_intensity, n_samples)
result[:, junction_idx:] += offset[:, np.newaxis]
# Add stitching noise
noise_width = min(10, n_wl - junction_idx)
noise = self.rng.normal(0, artifact_intensity * 0.5, (n_samples, noise_width))
result[:, junction_idx:junction_idx + noise_width] += noise
else:
# Overlap region - apply blending
result = self._blend_overlap_region(
result, wavelengths, overlap_start, overlap_end,
config.stitch_method, artifact_intensity
)
return result
def _blend_overlap_region(
self,
spectra: np.ndarray,
wavelengths: np.ndarray,
overlap_start: float,
overlap_end: float,
method: str,
noise_level: float,
) -> np.ndarray:
"""
Blend spectra in the overlap region between two sensors.
Args:
spectra: Input spectra.
wavelengths: Wavelength array.
overlap_start: Start of overlap region (nm).
overlap_end: End of overlap region (nm).
method: Blending method ('weighted', 'average', 'first', 'last').
noise_level: Amount of noise to add in overlap region.
Returns:
Blended spectra.
"""
n_samples = spectra.shape[0]
result = spectra.copy()
# Find overlap indices
overlap_mask = (wavelengths >= overlap_start) & (wavelengths <= overlap_end)
overlap_indices = np.where(overlap_mask)[0]
if len(overlap_indices) == 0:
return result
# Add sensor-specific characteristics in overlap region
if method == "weighted":
# Linear blend weights
weights = np.linspace(0, 1, len(overlap_indices))
# Apply slight gain difference between sensors
gain_diff = self.rng.normal(0, 0.02, n_samples)
for i, idx in enumerate(overlap_indices):
result[:, idx] += gain_diff * (weights[i] - 0.5)
elif method == "average":
# Simulate averaging two sensor signals (adds noise)
noise = self.rng.normal(0, noise_level * 0.5, (n_samples, len(overlap_indices)))
result[:, overlap_mask] += noise
elif method == "optimal":
# Simulate optimal combination (lower noise than individual sensors)
noise = self.rng.normal(0, noise_level * 0.3, (n_samples, len(overlap_indices)))
result[:, overlap_mask] += noise
# Add stitching artifacts
stitching_noise = self.rng.normal(0, noise_level, (n_samples, len(overlap_indices)))
result[:, overlap_mask] += stitching_noise
return result
# ========================================================================
# Phase 2: Multi-Scan Averaging
# ========================================================================
def _simulate_multi_scan_averaging(
self,
spectra: np.ndarray,
) -> np.ndarray:
"""
Simulate the effect of averaging multiple scans per sample.
Real NIR instruments often acquire multiple scans per sample and
average them to reduce noise. This method simulates that process.
Args:
spectra: Input spectra (n_samples, n_wavelengths).
Returns:
Spectra after simulating multi-scan averaging.
"""
if self.multi_scan_config is None:
return spectra
config = self.multi_scan_config
# Check if enabled
if not config.enabled:
return spectra
n_samples, n_wl = spectra.shape
n_scans = config.n_scans
# Generate individual scan noise
# More scans = lower effective noise (sqrt(n) reduction)
scan_noise_std = config.scan_to_scan_noise
# Generate all scans for each sample
all_scans = np.zeros((n_samples, n_scans, n_wl))
for scan_idx in range(n_scans):
# Each scan has independent noise
scan_noise = self.rng.normal(0, scan_noise_std, (n_samples, n_wl))
# Add wavelength jitter between scans (small shifts)
if config.wavelength_jitter > 0:
jitter = self.rng.normal(0, config.wavelength_jitter, n_samples)
# Add as a small baseline shift effect
scan_noise += jitter[:, np.newaxis] * 0.001
all_scans[:, scan_idx, :] = spectra + scan_noise
# Apply averaging/denoising method
method = config.averaging_method.lower()
if method == "mean":
result = np.mean(all_scans, axis=1)
elif method == "median":
result = np.median(all_scans, axis=1)
elif method == "weighted":
# Weight scans by inverse variance (more stable scans get higher weight)
scan_variances = np.var(all_scans, axis=2, keepdims=True)
weights = 1.0 / (scan_variances + 1e-10)
weights = weights / weights.sum(axis=1, keepdims=True)
result = np.sum(all_scans * weights, axis=1)
elif method == "savgol":
# Apply Savitzky-Golay filter across scans
result = np.mean(all_scans, axis=1) # Start with mean
for i in range(n_samples):
if n_wl >= 11: # Minimum points for window_length=11
result[i] = savgol_filter(result[i], window_length=11, polyorder=3)
else:
# Default to mean
result = np.mean(all_scans, axis=1)
# Apply outlier rejection if enabled
if config.discard_outliers:
result = self._reject_scan_outliers(
all_scans, result, config.outlier_threshold
)
return result
def _reject_scan_outliers(
self,
all_scans: np.ndarray,
averaged: np.ndarray,
threshold: float,
) -> np.ndarray:
"""
Reject outlier scans and recalculate average.
Args:
all_scans: All scan data (n_samples, n_scans, n_wavelengths).
averaged: Current averaged result.
threshold: Z-score threshold for outlier detection.
Returns:
Averaged spectra with outliers rejected.
"""
n_samples, n_scans, n_wl = all_scans.shape
result = averaged.copy()
for i in range(n_samples):
# Calculate deviation from mean for each scan
deviations = np.abs(all_scans[i] - averaged[i])
mean_deviation = np.mean(deviations, axis=1)
# Z-score for each scan
z_scores = (mean_deviation - np.mean(mean_deviation)) / (np.std(mean_deviation) + 1e-10)
# Mask outlier scans
valid_scans = z_scores < threshold
if np.sum(valid_scans) >= 2: # Need at least 2 valid scans
result[i] = np.mean(all_scans[i, valid_scans], axis=0)
return result
# ========================================================================
# Phase 2: Detector Effects
# ========================================================================
def _apply_detector_effects(
self,
spectra: np.ndarray,
wavelengths: np.ndarray,
) -> np.ndarray:
"""
Apply detector-specific effects.
Args:
spectra: Input spectra.
wavelengths: Wavelength array.
Returns:
Spectra with detector effects applied.
"""
if self.detector_simulator is None:
return spectra
return self.detector_simulator.apply(spectra, wavelengths)
[docs]
def generate(
self,
n_samples: int = 1000,
concentration_method: Literal[
"dirichlet", "uniform", "lognormal", "correlated"
] = "dirichlet",
include_batch_effects: bool = False,
n_batches: int = 1,
include_instrument_effects: bool = True,
include_multi_sensor: bool = True,
include_multi_scan: bool = True,
include_environmental_effects: bool = True,
include_scattering_effects: bool = True,
temperatures: Optional[np.ndarray] = None,
return_metadata: bool = False,
) -> Union[
Tuple[np.ndarray, np.ndarray, np.ndarray],
Tuple[np.ndarray, np.ndarray, np.ndarray, Dict[str, Any]],
]:
"""
Generate synthetic NIRS spectra.
This is the main generation method that creates synthetic spectra
by applying all physical effects in sequence.
Args:
n_samples: Number of spectra to generate.
concentration_method: Method for generating concentrations.
Options: 'dirichlet', 'uniform', 'lognormal', 'correlated'.
include_batch_effects: Whether to add batch/session effects.
n_batches: Number of batches (only if include_batch_effects=True).
include_instrument_effects: Whether to apply instrument-specific
effects (detector response, noise). Only applies if instrument
was specified during initialization.
include_multi_sensor: Whether to apply multi-sensor stitching
effects. Only applies if multi_sensor_config is set.
include_multi_scan: Whether to simulate multi-scan averaging.
Only applies if multi_scan_config is set.
include_environmental_effects: Whether to apply Phase 3 temperature
and moisture effects. Only applies if environmental_config is set.
include_scattering_effects: Whether to apply Phase 3 particle size
and EMSC-style scattering effects. Only applies if
scattering_effects_config is set.
temperatures: Optional array of temperatures (°C) for each sample.
If None and environmental effects are enabled, random temperatures
are generated based on the configuration. Shape: (n_samples,).
return_metadata: Whether to return additional metadata dictionary.
Returns:
If return_metadata=False:
Tuple of (X, Y, E):
- X: Spectra matrix (n_samples, n_wavelengths)
- Y: Concentration matrix (n_samples, n_components)
- E: Component spectra (n_components, n_wavelengths)
If return_metadata=True:
Tuple of (X, Y, E, metadata):
- metadata: Dictionary with generation details
Example:
>>> generator = SyntheticNIRSGenerator(random_state=42)
>>> X, Y, E = generator.generate(n_samples=500)
>>> print(f"Spectra: {X.shape}, Targets: {Y.shape}")
Spectra: (500, 751), Targets: (500, 5)
>>> # With instrument simulation (Phase 2)
>>> generator = SyntheticNIRSGenerator(
... instrument="foss_xds",
... random_state=42
... )
>>> X, Y, E = generator.generate(n_samples=500)
>>> # With environmental effects (Phase 3)
>>> from nirs4all.data.synthetic import EnvironmentalEffectsConfig
>>> env_config = EnvironmentalEffectsConfig()
>>> generator = SyntheticNIRSGenerator(
... environmental_config=env_config,
... random_state=42
... )
>>> X, Y, E = generator.generate(n_samples=500, include_environmental_effects=True)
>>> # With metadata
>>> X, Y, E, meta = generator.generate(100, return_metadata=True)
>>> print(meta.keys())
"""
metadata: Dict[str, Any] = {
"n_samples": n_samples,
"n_components": self.library.n_components,
"n_wavelengths": self.n_wavelengths,
"component_names": self.library.component_names,
"wavelengths": self.wavelengths.copy(),
"complexity": self.complexity,
"concentration_method": concentration_method,
"instrument": self.instrument.name if self.instrument else None,
"multi_sensor": self.multi_sensor_config is not None,
"multi_scan": self.multi_scan_config is not None,
"environmental_effects": self.environmental_config is not None,
"scattering_effects": self.scattering_effects_config is not None,
}
# 1. Generate concentrations
C = self.generate_concentrations(n_samples, method=concentration_method)
# 2. Apply Beer-Lambert law
A = self._apply_beer_lambert(C)
# 3. Apply path length variation
A = self._apply_path_length(A)
# 4. Generate and add baseline
baseline = self._generate_baseline(n_samples)
A = A + baseline
# 5. Apply global slope (typical NIR upward trend)
A = self._apply_global_slope(A)
# 6. Apply scatter effects
A = self._apply_scatter(A)
# 7. Apply batch effects if requested
if include_batch_effects and n_batches > 1:
samples_per_batch = [n_samples // n_batches] * n_batches
samples_per_batch[-1] += n_samples % n_batches
batch_offsets, batch_gains = self.generate_batch_effects(
n_batches, samples_per_batch
)
batch_ids = []
idx = 0
for batch_id, n_in_batch in enumerate(samples_per_batch):
batch_ids.extend([batch_id] * n_in_batch)
A[idx : idx + n_in_batch] = (
A[idx : idx + n_in_batch] * batch_gains[batch_id]
+ batch_offsets[batch_id]
)
idx += n_in_batch
metadata["batch_ids"] = np.array(batch_ids)
metadata["batch_offsets"] = batch_offsets
metadata["batch_gains"] = batch_gains
# 8. Apply wavelength shift/stretch
A = self._apply_wavelength_shift(A)
# 9. Apply instrumental response
A = self._apply_instrumental_response(A)
# 10. Phase 2: Apply detector effects
if include_instrument_effects and self.detector_simulator is not None:
A = self._apply_detector_effects(A, self.wavelengths)
else:
# 10. Add noise (legacy method if no detector simulator)
A = self._add_noise(A)
# 11. Phase 2: Apply multi-sensor stitching effects
if include_multi_sensor and self.multi_sensor_config is not None:
A = self._apply_multi_sensor_stitching(A, self.wavelengths)
metadata["multi_sensor_config"] = {
"n_sensors": len(self.multi_sensor_config.sensors),
"stitch_method": self.multi_sensor_config.stitch_method,
}
# 12. Phase 2: Simulate multi-scan averaging
if include_multi_scan and self.multi_scan_config is not None:
A = self._simulate_multi_scan_averaging(A)
metadata["multi_scan_config"] = {
"n_scans": self.multi_scan_config.n_scans,
"averaging_method": self.multi_scan_config.averaging_method,
}
# 13. Phase 3: Apply environmental effects (temperature, moisture)
if include_environmental_effects and self.environmental_simulator is not None:
# Generate or use provided temperatures
if temperatures is None:
temp_config = self.environmental_config.temperature
if temp_config is not None:
# Generate temperatures around sample_temperature with variation
base_temp = temp_config.sample_temperature
variation = temp_config.temperature_variation
if variation > 0:
temperatures = self.rng.normal(base_temp, variation, n_samples)
else:
temperatures = np.full(n_samples, base_temp)
A = self.environmental_simulator.apply(A, self.wavelengths, sample_temperatures=temperatures)
metadata["environmental_config"] = {
"enable_temperature": self.environmental_config.enable_temperature,
"enable_moisture": self.environmental_config.enable_moisture,
"temperatures": temperatures,
}
# 14. Phase 3: Apply scattering effects (particle size, EMSC)
if include_scattering_effects and self.scattering_effects_simulator is not None:
A = self.scattering_effects_simulator.apply(A, self.wavelengths)
metadata["scattering_effects_config"] = {
"enable_particle_size": self.scattering_effects_config.enable_particle_size,
"enable_emsc": self.scattering_effects_config.enable_emsc,
}
# 15. Add artifacts
A = self._add_artifacts(A, metadata)
if return_metadata:
return A, C, self.E.copy(), metadata
else:
return A, C, self.E.copy()
[docs]
def create_dataset(
self,
n_train: int = 800,
n_test: int = 200,
target_component: Optional[Union[str, int]] = None,
**generate_kwargs: Any,
) -> SpectroDataset:
"""
Create a SpectroDataset from synthetic spectra.
This method generates synthetic spectra and wraps them in a
SpectroDataset object ready for use with nirs4all pipelines.
Args:
n_train: Number of training samples.
n_test: Number of test samples.
target_component: Which component to use as target.
- If None: uses all components as multi-output target.
- If str: uses the component with that name.
- If int: uses the component at that index.
**generate_kwargs: Additional arguments passed to generate().
Returns:
SpectroDataset with train/test partitions.
Example:
>>> generator = SyntheticNIRSGenerator(random_state=42)
>>> dataset = generator.create_dataset(
... n_train=800,
... n_test=200,
... target_component="protein"
... )
>>> print(f"Train: {dataset.n_train}, Test: {dataset.n_test}")
"""
from nirs4all.data import SpectroDataset
# Generate all samples
n_total = n_train + n_test
X, C, _E = self.generate(n_samples=n_total, **generate_kwargs)
# Determine target
if target_component is None:
y = C
elif isinstance(target_component, str):
comp_idx = self.library.component_names.index(target_component)
y = C[:, comp_idx]
else:
y = C[:, target_component]
# Create dataset
dataset = SpectroDataset(name="synthetic_nirs")
# Create wavelength headers
headers = [str(int(wl)) for wl in self.wavelengths]
# Add training samples
dataset.add_samples(
X[:n_train],
indexes={"partition": "train"},
headers=headers,
header_unit="nm",
)
dataset.add_targets(y[:n_train])
# Add test samples
dataset.add_samples(
X[n_train:],
indexes={"partition": "test"},
headers=headers,
header_unit="nm",
)
dataset.add_targets(y[n_train:])
return dataset
[docs]
def __repr__(self) -> str:
"""Return string representation of the generator."""
parts = [
f"wavelengths={self.wavelength_start}-{self.wavelength_end}nm",
f"n_wavelengths={self.n_wavelengths}",
f"n_components={self.library.n_components}",
f"complexity='{self.complexity}'",
]
if self.instrument:
parts.append(f"instrument='{self.instrument.name}'")
if self.multi_sensor_config:
parts.append(f"multi_sensor=True({len(self.multi_sensor_config.sensors)} sensors)")
if self.multi_scan_config:
parts.append(f"multi_scan=True({self.multi_scan_config.n_scans} scans)")
if self.environmental_config:
env_effects = []
if self.environmental_config.enable_temperature:
env_effects.append("temp")
if self.environmental_config.enable_moisture:
env_effects.append("moisture")
parts.append(f"environmental=True({'+'.join(env_effects)})")
if self.scattering_effects_config:
scatter_effects = []
if self.scattering_effects_config.enable_particle_size:
scatter_effects.append("particle")
if self.scattering_effects_config.enable_emsc:
scatter_effects.append("emsc")
parts.append(f"scattering=True({'+'.join(scatter_effects)})")
return f"SyntheticNIRSGenerator({', '.join(parts)})"