import numpy as np
import pywt
import scipy
from scipy import signal
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import StandardScaler, scale
from sklearn.utils import check_array
from sklearn.utils.validation import FLOAT_DTYPES, check_is_fitted
[docs]
class Wavelet(TransformerMixin, BaseEstimator):
"""
Single level Discrete Wavelet Transform.
Performs a discrete wavelet transform on `data`, using a `wavelet` function.
Parameters
----------
wavelet : Wavelet object or name, default='haar'
Wavelet to use: ['Haar', 'Daubechies', 'Symlets', 'Coiflets', 'Biorthogonal',
'Reverse biorthogonal', 'Discrete Meyer (FIR Approximation)'...]
mode : str, optional, default='periodization'
Signal extension mode.
"""
def __init__(self, wavelet: str = "haar", mode: str = "periodization", *, copy: bool = True):
self.copy = copy
self.wavelet = wavelet
self.mode = mode
def _reset(self):
pass
[docs]
def fit(self, X, y=None):
"""
Verify the X data compliance with wavelet transform.
Parameters
----------
X : array-like, spectra
The data to transform.
y : None
Ignored.
Raises
------
ValueError
If the input X is a sparse matrix.
Returns
-------
Wavelet
The fitted object.
"""
if scipy.sparse.issparse(X):
raise ValueError("Wavelets does not support scipy.sparse input")
return self
def _more_tags(self):
return {"allow_nan": False}
[docs]
class Haar(Wavelet):
"""
Shortcut to the Wavelet haar transform.
"""
def __init__(self, *, copy: bool = True):
super().__init__("haar", "periodization", copy=copy)
[docs]
def savgol(
spectra: np.ndarray,
window_length: int = 11,
polyorder: int = 3,
deriv: int = 0,
delta: float = 1.0,
) -> np.ndarray:
"""
Perform Savitzky–Golay filtering on the data (also calculates derivatives).
This function is a wrapper for scipy.signal.savgol_filter.
Args:
spectra (numpy.ndarray): NIRS data matrix.
window_length (int): Size of the filter window in samples (default 11).
polyorder (int): Order of the polynomial estimation (default 3).
deriv (int): Order of the derivation (default 0).
delta (float): Sampling distance of the data.
Returns:
numpy.ndarray: NIRS data smoothed with Savitzky-Golay filtering.
"""
return signal.savgol_filter(spectra, window_length, polyorder, deriv, delta=delta)
[docs]
class SavitzkyGolay(TransformerMixin, BaseEstimator):
"""
A class for smoothing and differentiating data using the Savitzky-Golay filter.
Parameters:
-----------
window_length : int, optional (default=11)
The length of the window used for smoothing.
polyorder : int, optional (default=3)
The order of the polynomial used for fitting the samples within the window.
deriv : int, optional (default=0)
The order of the derivative to compute.
delta : float, optional (default=1.0)
The sampling distance of the data.
copy : bool, optional (default=True)
Whether to copy the input data.
Methods:
--------
fit(X, y=None)
Fits the transformer to the data X.
transform(X, copy=None)
Applies the Savitzky-Golay filter to the data X.
"""
def __init__(
self,
window_length: int = 11,
polyorder: int = 3,
deriv: int = 0,
delta: float = 1.0,
*,
copy: bool = True
):
self.copy = copy
self.window_length = window_length
self.polyorder = polyorder
self.deriv = deriv
self.delta = delta
def _reset(self):
pass
[docs]
def fit(self, X, y=None):
"""
Verify the X data compliance with Savitzky-Golay filter.
Parameters
----------
X : array-like
The data to transform.
y : None
Ignored.
Raises
------
ValueError
If the input X is a sparse matrix.
Returns
-------
SavitzkyGolay
The fitted object.
"""
if scipy.sparse.issparse(X):
raise ValueError("SavitzkyGolay does not support scipy.sparse input")
return self
def _more_tags(self):
return {"allow_nan": False}
[docs]
class MultiplicativeScatterCorrection(TransformerMixin, BaseEstimator):
def __init__(self, scale=True, *, copy=True):
self.copy = copy
self.scale = scale
def _reset(self):
if hasattr(self, "scaler_"):
del self.scaler_
del self.a_
del self.b_
[docs]
def fit(self, X, y=None):
self._reset()
return self.partial_fit(X, y)
[docs]
def partial_fit(self, X, y=None):
if scipy.sparse.issparse(X):
raise TypeError("Normalization does not support scipy.sparse input")
first_pass = not hasattr(self, "mean_")
# X = self._validate_data(X, reset=first_pass, dtype=FLOAT_DTYPES, estimator=self)
tmp_x = X
if self.scale:
scaler = StandardScaler(with_std=False)
scaler.fit(X)
self.scaler_ = scaler
tmp_x = scaler.transform(X)
reference = np.mean(tmp_x, axis=1)
a = np.empty(X.shape[1], dtype=float)
b = np.empty(X.shape[1], dtype=float)
for col in range(X.shape[1]):
a[col], b[col] = np.polyfit(reference, tmp_x[:, col], deg=1)
self.a_ = a
self.b_ = b
return self
def _more_tags(self):
return {"allow_nan": False}
[docs]
def msc(spectra, scaled=True):
"""Performs multiplicative scatter correction to the mean.
Args:
spectra (numpy.ndarray): NIRS data matrix.
scaled (bool): Whether to scale the data. Defaults to True.
Returns:
numpy.ndarray: Scatter-corrected NIR spectra.
"""
if scaled:
spectra = scale(spectra, with_std=False, axis=0) # StandardScaler / demean
reference = np.mean(spectra, axis=1)
for col in range(spectra.shape[1]):
a, b = np.polyfit(reference, spectra[:, col], deg=1)
spectra[:, col] = (spectra[:, col] - b) / a
return spectra
[docs]
class ExtendedMultiplicativeScatterCorrection(TransformerMixin, BaseEstimator):
"""
Extended Multiplicative Scatter Correction (EMSC).
EMSC extends MSC by including polynomial terms to model chemical
and physical light scattering effects.
Parameters
----------
degree : int, default=2
Degree of polynomial for modeling interference.
scale : bool, default=True
Whether to scale the data before correction.
copy : bool, default=True
Whether to copy input data.
"""
def __init__(self, degree: int = 2, scale: bool = True, *, copy: bool = True):
self.copy = copy
self.scale = scale
self.degree = degree
def _reset(self):
if hasattr(self, "scaler_"):
del self.scaler_
del self.reference_
del self.wavelengths_
[docs]
def fit(self, X, y=None):
self._reset()
return self.partial_fit(X, y)
[docs]
def partial_fit(self, X, y=None):
if scipy.sparse.issparse(X):
raise TypeError("EMSC does not support scipy.sparse input")
first_pass = not hasattr(self, "reference_")
tmp_x = X.copy() if self.copy else X
if self.scale:
scaler = StandardScaler(with_std=False)
scaler.fit(X)
self.scaler_ = scaler
tmp_x = scaler.transform(tmp_x)
# Compute mean reference spectrum
self.reference_ = np.mean(tmp_x, axis=0)
# Create wavelength indices for polynomial terms
self.wavelengths_ = np.arange(X.shape[1])
return self
def _more_tags(self):
return {"allow_nan": False}
[docs]
class AreaNormalization(TransformerMixin, BaseEstimator):
"""
Area normalization of spectra.
Normalizes each spectrum by dividing by its total area (sum of absolute values).
This removes intensity variations while preserving spectral shape.
Parameters
----------
method : str, default='sum'
Method for computing area: 'sum' (sum of values), 'abs_sum' (sum of absolute values),
or 'trapz' (trapezoidal integration).
copy : bool, default=True
Whether to copy input data.
"""
def __init__(self, method: str = 'sum', *, copy: bool = True):
self.copy = copy
self.method = method
def _reset(self):
pass
[docs]
def fit(self, X, y=None):
if scipy.sparse.issparse(X):
raise ValueError("AreaNormalization does not support scipy.sparse input")
if self.method not in ['sum', 'abs_sum', 'trapz']:
raise ValueError(f"method must be 'sum', 'abs_sum', or 'trapz', got {self.method}")
return self
def _more_tags(self):
return {"allow_nan": False}
[docs]
def first_derivative(
spectra: np.ndarray,
delta: float = 1.0,
edge_order: int = 2,
) -> np.ndarray:
"""
First numerical derivative along feature axis using central differences.
Args:
spectra (numpy.ndarray): NIRS data matrix (n_samples, n_features).
delta (float): Sampling step along the feature axis.
edge_order (int): 1 or 2, order of accuracy at the boundaries.
Returns:
numpy.ndarray: First derivative dX/dλ with same shape as input.
"""
return np.gradient(spectra, delta, axis=1, edge_order=edge_order)
[docs]
class FirstDerivative(TransformerMixin, BaseEstimator):
"""
First numerical derivative using numpy.gradient.
Parameters
----------
delta : float, default=1.0
Sampling step along the feature axis.
edge_order : int, default=2
1 or 2, order of accuracy at the boundaries.
copy : bool, default=True
Whether to copy input.
"""
def __init__(self, delta: float = 1.0, edge_order: int = 2, *, copy: bool = True):
self.copy = copy
self.delta = delta
self.edge_order = edge_order
def _reset(self):
pass
[docs]
def fit(self, X, y=None):
if scipy.sparse.issparse(X):
raise ValueError("FirstDerivative does not support scipy.sparse input")
return self
def _more_tags(self):
return {"allow_nan": False}
[docs]
def second_derivative(
spectra: np.ndarray,
delta: float = 1.0,
edge_order: int = 2,
) -> np.ndarray:
"""
Second numerical derivative along feature axis.
Args:
spectra (numpy.ndarray): NIRS data matrix (n_samples, n_features).
delta (float): Sampling step along the feature axis.
edge_order (int): 1 or 2, order of accuracy at the boundaries.
Returns:
numpy.ndarray: Second derivative d²X/dλ² with same shape as input.
"""
d1 = np.gradient(spectra, delta, axis=1, edge_order=edge_order)
return np.gradient(d1, delta, axis=1, edge_order=edge_order)
def _compute_entropy(x: np.ndarray, n_bins: int = 10) -> float:
"""Compute entropy of a 1D array."""
from scipy.stats import entropy as scipy_entropy
hist, _ = np.histogram(x, bins=n_bins, density=True)
hist = hist[hist > 0]
return scipy_entropy(hist) if len(hist) > 0 else 0.0
[docs]
class WaveletFeatures(TransformerMixin, BaseEstimator):
"""
Discrete Wavelet Transform feature extractor for spectral data.
Decomposes spectra into approximation (smooth trends) and detail (sharp
features) coefficients at multiple scales, then extracts statistical
features from each level. This captures both global baseline variations
and local absorption peaks.
Scientific basis:
- Multi-resolution analysis captures features at different scales
- Daubechies wavelets (db4) are well-suited for smooth signals
- Wavelet coefficients are partially decorrelated
Parameters
----------
wavelet : str, default='db4'
Wavelet to use (e.g., 'haar', 'db4', 'coif3', 'sym4').
max_level : int, default=5
Maximum decomposition level.
n_coeffs_per_level : int, default=10
Number of top coefficients (by magnitude) to extract per level.
copy : bool, default=True
Whether to copy input data.
Attributes
----------
actual_level_ : int
Actual decomposition level used (may be less than max_level
depending on signal length).
n_features_out_ : int
Number of output features.
References
----------
Mallat (1989). A theory for multiresolution signal decomposition:
the wavelet representation. IEEE PAMI.
"""
def __init__(
self,
wavelet: str = 'db4',
max_level: int = 5,
n_coeffs_per_level: int = 10,
*,
copy: bool = True
):
self.wavelet = wavelet
self.max_level = max_level
self.n_coeffs_per_level = n_coeffs_per_level
self.copy = copy
def _reset(self):
if hasattr(self, 'actual_level_'):
del self.actual_level_
del self.n_features_out_
del self.feature_names_
[docs]
def fit(self, X, y=None):
"""
Fit the wavelet feature extractor.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data.
y : None
Ignored.
Returns
-------
self : WaveletFeatures
Fitted transformer.
"""
if scipy.sparse.issparse(X):
raise ValueError("WaveletFeatures does not support scipy.sparse input")
self._reset()
n_features = X.shape[1]
max_level_possible = pywt.dwt_max_level(n_features, self.wavelet)
self.actual_level_ = min(self.max_level, max_level_possible)
# Generate feature names and count total features
self.feature_names_ = []
# Approximation coefficients: 4 stats + n_coeffs
for stat in ['mean', 'std', 'energy', 'entropy']:
self.feature_names_.append(f"wf_approx_{stat}")
for i in range(self.n_coeffs_per_level):
self.feature_names_.append(f"wf_approx_coef_{i}")
# Detail coefficients at each level: 4 stats + n_coeffs per level
for level in range(1, self.actual_level_ + 1):
for stat in ['mean', 'std', 'energy', 'entropy']:
self.feature_names_.append(f"wf_d{level}_{stat}")
for i in range(self.n_coeffs_per_level):
self.feature_names_.append(f"wf_d{level}_coef_{i}")
self.n_features_out_ = len(self.feature_names_)
return self
[docs]
def get_feature_names_out(self, input_features=None):
"""Get output feature names."""
check_is_fitted(self, 'feature_names_')
return np.array(self.feature_names_)
def _more_tags(self):
return {"allow_nan": False}
[docs]
class WaveletPCA(TransformerMixin, BaseEstimator):
"""
Multi-scale PCA on wavelet coefficients.
Applies PCA separately to each wavelet decomposition level, creating
a compact multi-scale representation where each scale contributes a
few principal components. This preserves frequency-specific information
while reducing dimensionality.
Scientific basis:
- Combines multi-resolution analysis with decorrelation
- Each scale captures different frequency information
- PCA per scale reduces redundancy within each frequency band
- Results in a compact, interpretable feature set
Parameters
----------
wavelet : str, default='db4'
Wavelet to use (e.g., 'haar', 'db4', 'coif3', 'sym4').
max_level : int, default=4
Maximum decomposition level.
n_components_per_level : int, default=3
Number of PCA components to keep per decomposition level.
whiten : bool, default=True
Whether to whiten the PCA components.
copy : bool, default=True
Whether to copy input data.
Attributes
----------
actual_level_ : int
Actual decomposition level used.
pcas_ : dict
Fitted PCA objects per level.
scalers_ : dict
Fitted StandardScaler objects per level.
n_features_out_ : int
Number of output features.
References
----------
Trygg & Wold (1998). PLS regression on wavelet compressed NIR spectra.
"""
def __init__(
self,
wavelet: str = 'db4',
max_level: int = 4,
n_components_per_level: int = 3,
whiten: bool = True,
*,
copy: bool = True
):
self.wavelet = wavelet
self.max_level = max_level
self.n_components_per_level = n_components_per_level
self.whiten = whiten
self.copy = copy
def _reset(self):
if hasattr(self, 'actual_level_'):
del self.actual_level_
del self.pcas_
del self.scalers_
del self.feature_names_
del self.n_features_out_
[docs]
def fit(self, X, y=None):
"""
Fit the wavelet-PCA transformer.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data.
y : None
Ignored.
Returns
-------
self : WaveletPCA
Fitted transformer.
"""
from sklearn.decomposition import PCA
if scipy.sparse.issparse(X):
raise ValueError("WaveletPCA does not support scipy.sparse input")
self._reset()
n_samples, n_features = X.shape
max_level_possible = pywt.dwt_max_level(n_features, self.wavelet)
self.actual_level_ = min(self.max_level, max_level_possible)
# Decompose all samples to get coefficient arrays
all_coeffs = {i: [] for i in range(self.actual_level_ + 1)}
for i in range(n_samples):
coeffs = pywt.wavedec(X[i], self.wavelet, level=self.actual_level_)
for level_idx, c in enumerate(coeffs):
all_coeffs[level_idx].append(c)
# Fit PCA for each level
self.pcas_ = {}
self.scalers_ = {}
self.feature_names_ = []
for level_idx in range(self.actual_level_ + 1):
level_data = np.array(all_coeffs[level_idx])
n_coeffs = level_data.shape[1]
n_comps = min(self.n_components_per_level, n_coeffs, n_samples - 1)
if n_comps > 0:
scaler = StandardScaler()
level_scaled = scaler.fit_transform(level_data)
pca = PCA(n_components=n_comps, whiten=self.whiten)
pca.fit(level_scaled)
self.scalers_[level_idx] = scaler
self.pcas_[level_idx] = pca
level_name = 'approx' if level_idx == 0 else f'd{level_idx}'
for j in range(n_comps):
self.feature_names_.append(f"wpca_{level_name}_pc{j}")
self.n_features_out_ = len(self.feature_names_)
return self
[docs]
def get_feature_names_out(self, input_features=None):
"""Get output feature names."""
check_is_fitted(self, 'feature_names_')
return np.array(self.feature_names_)
def _more_tags(self):
return {"allow_nan": False}
[docs]
class WaveletSVD(TransformerMixin, BaseEstimator):
"""
Multi-scale SVD on wavelet coefficients.
Applies Truncated SVD separately to each wavelet decomposition level,
creating a compact multi-scale representation. Similar to WaveletPCA
but uses SVD which doesn't center data and works better for sparse data.
Scientific basis:
- Combines multi-resolution analysis with dimensionality reduction
- Each scale captures different frequency information
- SVD per scale reduces redundancy within each frequency band
- Results in a compact feature set
Parameters
----------
wavelet : str, default='db4'
Wavelet to use (e.g., 'haar', 'db4', 'coif3', 'sym4').
max_level : int, default=4
Maximum decomposition level.
n_components_per_level : int, default=3
Number of SVD components to keep per decomposition level.
copy : bool, default=True
Whether to copy input data.
Attributes
----------
actual_level_ : int
Actual decomposition level used.
svds_ : dict
Fitted TruncatedSVD objects per level.
n_features_out_ : int
Number of output features.
References
----------
Trygg & Wold (1998). PLS regression on wavelet compressed NIR spectra.
"""
def __init__(
self,
wavelet: str = 'db4',
max_level: int = 4,
n_components_per_level: int = 3,
*,
copy: bool = True
):
self.wavelet = wavelet
self.max_level = max_level
self.n_components_per_level = n_components_per_level
self.copy = copy
def _reset(self):
if hasattr(self, 'actual_level_'):
del self.actual_level_
del self.svds_
del self.feature_names_
del self.n_features_out_
[docs]
def fit(self, X, y=None):
"""
Fit the wavelet-SVD transformer.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data.
y : None
Ignored.
Returns
-------
self : WaveletSVD
Fitted transformer.
"""
from sklearn.decomposition import TruncatedSVD
if scipy.sparse.issparse(X):
raise ValueError("WaveletSVD does not support scipy.sparse input")
self._reset()
n_samples, n_features = X.shape
max_level_possible = pywt.dwt_max_level(n_features, self.wavelet)
self.actual_level_ = min(self.max_level, max_level_possible)
# Decompose all samples to get coefficient arrays
all_coeffs = {i: [] for i in range(self.actual_level_ + 1)}
for i in range(n_samples):
coeffs = pywt.wavedec(X[i], self.wavelet, level=self.actual_level_)
for level_idx, c in enumerate(coeffs):
all_coeffs[level_idx].append(c)
# Fit SVD for each level
self.svds_ = {}
self.feature_names_ = []
for level_idx in range(self.actual_level_ + 1):
level_data = np.array(all_coeffs[level_idx])
n_coeffs = level_data.shape[1]
# TruncatedSVD requires n_components < min(n_samples, n_features)
n_comps = min(self.n_components_per_level, n_coeffs - 1, n_samples - 1)
if n_comps > 0:
svd = TruncatedSVD(n_components=n_comps)
svd.fit(level_data)
self.svds_[level_idx] = svd
level_name = 'approx' if level_idx == 0 else f'd{level_idx}'
for j in range(n_comps):
self.feature_names_.append(f"wsvd_{level_name}_sv{j}")
self.n_features_out_ = len(self.feature_names_)
return self
[docs]
def get_feature_names_out(self, input_features=None):
"""Get output feature names."""
check_is_fitted(self, 'feature_names_')
return np.array(self.feature_names_)
def _more_tags(self):
return {"allow_nan": False}
[docs]
class SecondDerivative(TransformerMixin, BaseEstimator):
"""
Second numerical derivative using numpy.gradient.
Parameters
----------
delta : float, default=1.0
Sampling step along the feature axis.
edge_order : int, default=2
1 or 2, order of accuracy at the boundaries.
copy : bool, default=True
Whether to copy input.
"""
def __init__(self, delta: float = 1.0, edge_order: int = 2, *, copy: bool = True):
self.copy = copy
self.delta = delta
self.edge_order = edge_order
def _reset(self):
pass
[docs]
def fit(self, X, y=None):
if scipy.sparse.issparse(X):
raise ValueError("SecondDerivative does not support scipy.sparse input")
return self
def _more_tags(self):
return {"allow_nan": False}
[docs]
def reflectance_to_absorbance(
spectra: np.ndarray,
min_value: float = 1e-8,
) -> np.ndarray:
"""
Convert reflectance spectra to absorbance.
Applies the Beer-Lambert law: A = -log10(R) = log10(1/R)
where R is reflectance and A is absorbance.
Args:
spectra (numpy.ndarray): Reflectance NIRS data matrix (n_samples, n_features).
Values should be in range (0, 1] or as percentages (0, 100].
min_value (float): Minimum value to clamp reflectance to avoid log(0).
Default is 1e-8.
Returns:
numpy.ndarray: Absorbance spectra with same shape as input.
"""
X = np.clip(spectra, min_value, None)
return -np.log10(X)
[docs]
class ReflectanceToAbsorbance(TransformerMixin, BaseEstimator):
"""
Convert reflectance spectra to absorbance using Beer-Lambert law.
Applies the transformation: A = -log10(R) = log10(1/R)
where R is reflectance and A is absorbance.
This is a fundamental transformation in NIR spectroscopy, as absorbance
is linearly related to concentration (Beer-Lambert law), while reflectance
is not.
Parameters
----------
min_value : float, default=1e-8
Minimum value to clamp reflectance to avoid log(0).
Values below this threshold will be set to min_value before
applying the log transform.
percent : bool, default=False
If True, assumes input reflectance is in percentage (0-100)
and divides by 100 before conversion.
copy : bool, default=True
Whether to copy input data.
Notes
-----
- Input reflectance values should be positive.
- For reflectance in range (0, 1], output absorbance is non-negative.
- For reflectance > 1 (e.g., percentage values), set percent=True.
Examples
--------
>>> from nirs4all.operators.transforms.nirs import ReflectanceToAbsorbance
>>> import numpy as np
>>> R = np.array([[0.5, 0.25, 0.1], [0.8, 0.4, 0.2]])
>>> transformer = ReflectanceToAbsorbance()
>>> A = transformer.fit_transform(R)
>>> # A ≈ [[0.301, 0.602, 1.0], [0.097, 0.398, 0.699]]
"""
def __init__(self, min_value: float = 1e-8, percent: bool = False, *, copy: bool = True):
self.copy = copy
self.min_value = min_value
self.percent = percent
def _reset(self):
pass
[docs]
def fit(self, X, y=None):
"""
Fit the transformer (no-op, included for API compatibility).
Parameters
----------
X : array-like of shape (n_samples, n_features)
Reflectance spectra.
y : None
Ignored.
Returns
-------
self : ReflectanceToAbsorbance
Fitted transformer.
"""
if scipy.sparse.issparse(X):
raise ValueError("ReflectanceToAbsorbance does not support scipy.sparse input")
return self
def _more_tags(self):
return {"allow_nan": False}
# =============================================================================
# PyBaselines Wrapper - General baseline correction
# =============================================================================
# Registry of available pybaselines methods organized by category
PYBASELINES_METHODS = {
# Whittaker-based methods
'asls': ('whittaker', 'asls'), # Asymmetric Least Squares
'iasls': ('whittaker', 'iasls'), # Improved Asymmetric Least Squares
'airpls': ('whittaker', 'airpls'), # Adaptive Iteratively Reweighted PLS
'arpls': ('whittaker', 'arpls'), # Asymmetrically Reweighted PLS
'drpls': ('whittaker', 'drpls'), # Doubly Reweighted PLS
'iarpls': ('whittaker', 'iarpls'), # Improved ARPLS
'aspls': ('whittaker', 'aspls'), # Adaptive Smoothness PLS
'psalsa': ('whittaker', 'psalsa'), # Peaked Signal's Asymmetric Least Squares
'derpsalsa': ('whittaker', 'derpsalsa'), # Derivative PSALSA
# Polynomial methods
'poly': ('polynomial', 'poly'), # Regular polynomial
'modpoly': ('polynomial', 'modpoly'), # Modified polynomial
'imodpoly': ('polynomial', 'imodpoly'), # Improved modified polynomial
'penalized_poly': ('polynomial', 'penalized_poly'), # Penalized polynomial
'loess': ('polynomial', 'loess'), # Locally estimated scatterplot smoothing
'quant_reg': ('polynomial', 'quant_reg'), # Quantile regression
# Morphological methods
'mor': ('morphological', 'mor'), # Morphological
'imor': ('morphological', 'imor'), # Improved morphological
'mormol': ('morphological', 'mormol'), # Morphological and mollified
'amormol': ('morphological', 'amormol'), # Averaging morphological and mollified
'rolling_ball': ('morphological', 'rolling_ball'), # Rolling ball
'mwmv': ('morphological', 'mwmv'), # Moving window minimum value
'tophat': ('morphological', 'tophat'), # Top-hat transform
'mpspline': ('morphological', 'mpspline'), # Morphological penalized spline
'jbcd': ('morphological', 'jbcd'), # Joint baseline correction and denoising
# Spline methods
'mixture_model': ('spline', 'mixture_model'), # Mixture model
'irsqr': ('spline', 'irsqr'), # Iteratively reweighted spline quantile regression
'corner_cutting': ('spline', 'corner_cutting'), # Corner-cutting
'pspline_asls': ('spline', 'pspline_asls'), # Penalized spline ASLS
'pspline_iasls': ('spline', 'pspline_iasls'), # Penalized spline IASLS
'pspline_airpls': ('spline', 'pspline_airpls'), # Penalized spline airPLS
'pspline_arpls': ('spline', 'pspline_arpls'), # Penalized spline arPLS
'pspline_drpls': ('spline', 'pspline_drpls'), # Penalized spline drPLS
'pspline_iarpls': ('spline', 'pspline_iarpls'), # Penalized spline iarPLS
'pspline_aspls': ('spline', 'pspline_aspls'), # Penalized spline asPLS
'pspline_psalsa': ('spline', 'pspline_psalsa'), # Penalized spline PSALSA
'pspline_derpsalsa': ('spline', 'pspline_derpsalsa'), # Penalized spline derPSALSA
# Smooth methods
'noise_median': ('smooth', 'noise_median'), # Noise median
'snip': ('smooth', 'snip'), # Statistics-sensitive Non-linear Iterative Peak-clipping
'swima': ('smooth', 'swima'), # Small-Window Moving Average
'ipsa': ('smooth', 'ipsa'), # Iterative Polynomial Smoothing Algorithm
# Classification methods (require training data or special handling)
'dietrich': ('classification', 'dietrich'), # Dietrich's method
'golotvin': ('classification', 'golotvin'), # Golotvin's method
'std_distribution': ('classification', 'std_distribution'), # Standard distribution
'fastchrom': ('classification', 'fastchrom'), # FastChrom
'cwt_br': ('classification', 'cwt_br'), # Continuous wavelet transform
# Optimizers (iterative methods)
'collab_pls': ('optimizers', 'collab_pls'), # Collaborative PLS
'optimize_extended_range': ('optimizers', 'optimize_extended_range'),
'adaptive_minmax': ('optimizers', 'adaptive_minmax'), # Adaptive min-max
# Misc methods
'interp_pts': ('misc', 'interp_pts'), # Interpolation between points
'beads': ('misc', 'beads'), # Baseline estimation and denoising with sparsity
}
[docs]
def pybaseline_correction(
spectra: np.ndarray,
method: str = 'asls',
**kwargs
) -> np.ndarray:
"""
Apply baseline correction using pybaselines library.
This is a general wrapper for all pybaselines methods, allowing
flexible baseline correction with various algorithms.
Args:
spectra (numpy.ndarray): NIRS data matrix (n_samples, n_features).
method (str): Baseline correction method. Available methods:
Whittaker: 'asls', 'iasls', 'airpls', 'arpls', 'drpls', 'iarpls',
'aspls', 'psalsa', 'derpsalsa'
Polynomial: 'poly', 'modpoly', 'imodpoly', 'penalized_poly', 'loess', 'quant_reg'
Morphological: 'mor', 'imor', 'mormol', 'amormol', 'rolling_ball',
'mwmv', 'tophat', 'mpspline', 'jbcd'
Spline: 'mixture_model', 'irsqr', 'corner_cutting', 'pspline_asls', etc.
Smooth: 'noise_median', 'snip', 'swima', 'ipsa'
Classification: 'dietrich', 'golotvin', 'std_distribution', 'fastchrom', 'cwt_br'
Optimizers: 'collab_pls', 'optimize_extended_range', 'adaptive_minmax'
Misc: 'interp_pts', 'beads'
**kwargs: Additional parameters passed to the specific baseline method.
Returns:
numpy.ndarray: Baseline-corrected spectra with same shape as input.
Raises:
ImportError: If pybaselines is not installed.
ValueError: If an unknown method is specified.
Examples
--------
>>> from nirs4all.operators.transforms.nirs import pybaseline_correction
>>> corrected = pybaseline_correction(spectra, method='airpls', lam=1e5)
>>> corrected = pybaseline_correction(spectra, method='imodpoly', poly_order=3)
>>> corrected = pybaseline_correction(spectra, method='snip', max_half_window=30)
"""
try:
import pybaselines
except ImportError:
raise ImportError(
"pybaselines is required for baseline correction. "
"Install it with: pip install pybaselines"
)
method_lower = method.lower()
if method_lower not in PYBASELINES_METHODS:
available = ', '.join(sorted(PYBASELINES_METHODS.keys()))
raise ValueError(
f"Unknown baseline method '{method}'. "
f"Available methods: {available}"
)
module_name, func_name = PYBASELINES_METHODS[method_lower]
# Import the specific module and function
module = getattr(pybaselines, module_name)
baseline_func = getattr(module, func_name)
corrected = np.empty_like(spectra)
for i in range(spectra.shape[0]):
baseline, _ = baseline_func(spectra[i], **kwargs)
corrected[i] = spectra[i] - baseline
return corrected
[docs]
class PyBaselineCorrection(TransformerMixin, BaseEstimator):
"""
General baseline correction using pybaselines library.
A flexible wrapper for the pybaselines library that provides access to
numerous baseline correction algorithms. This transformer allows easy
integration of any pybaselines method into sklearn pipelines.
Parameters
----------
method : str, default='asls'
The baseline correction method to use. Available methods by category:
**Whittaker-based** (smooth baselines with asymmetric weighting):
- 'asls': Asymmetric Least Squares
- 'iasls': Improved Asymmetric Least Squares
- 'airpls': Adaptive Iteratively Reweighted PLS
- 'arpls': Asymmetrically Reweighted PLS
- 'drpls': Doubly Reweighted PLS
- 'iarpls': Improved ARPLS
- 'aspls': Adaptive Smoothness PLS
- 'psalsa': Peaked Signal's Asymmetric Least Squares
- 'derpsalsa': Derivative PSALSA
**Polynomial** (polynomial fitting):
- 'poly': Regular polynomial
- 'modpoly': Modified polynomial
- 'imodpoly': Improved modified polynomial
- 'penalized_poly': Penalized polynomial
- 'loess': Locally estimated scatterplot smoothing
- 'quant_reg': Quantile regression
**Morphological** (morphological operations):
- 'mor': Morphological
- 'imor': Improved morphological
- 'mormol': Morphological and mollified
- 'amormol': Averaging morphological and mollified
- 'rolling_ball': Rolling ball algorithm
- 'mwmv': Moving window minimum value
- 'tophat': Top-hat transform
- 'mpspline': Morphological penalized spline
- 'jbcd': Joint baseline correction and denoising
**Spline** (spline-based methods):
- 'mixture_model': Mixture model
- 'irsqr': Iteratively reweighted spline quantile regression
- 'corner_cutting': Corner-cutting
- 'pspline_asls', 'pspline_iasls', 'pspline_airpls', etc.
**Smooth** (smoothing-based):
- 'noise_median': Noise median
- 'snip': Statistics-sensitive Non-linear Iterative Peak-clipping
- 'swima': Small-Window Moving Average
- 'ipsa': Iterative Polynomial Smoothing Algorithm
**Misc**:
- 'beads': Baseline estimation and denoising with sparsity
- 'interp_pts': Interpolation between points
copy : bool, default=True
Whether to copy input data.
**method_params : dict
Additional parameters passed to the specific baseline method.
Common parameters include:
- lam (float): Smoothness parameter for Whittaker methods
- p (float): Asymmetry parameter for ASLS-type methods
- poly_order (int): Polynomial order for polynomial methods
- max_half_window (int): Window size for morphological/smooth methods
- max_iter (int): Maximum iterations
- tol (float): Convergence tolerance
Attributes
----------
n_features_in_ : int
Number of features seen during fit.
Examples
--------
>>> from nirs4all.operators.transforms.nirs import PyBaselineCorrection
>>> import numpy as np
Basic usage with ASLS:
>>> transformer = PyBaselineCorrection(method='asls', lam=1e6, p=0.01)
>>> corrected = transformer.fit_transform(spectra)
Using airPLS:
>>> transformer = PyBaselineCorrection(method='airpls', lam=1e5)
>>> corrected = transformer.fit_transform(spectra)
Using improved modified polynomial:
>>> transformer = PyBaselineCorrection(method='imodpoly', poly_order=3)
>>> corrected = transformer.fit_transform(spectra)
Using SNIP for Raman-like data:
>>> transformer = PyBaselineCorrection(method='snip', max_half_window=40)
>>> corrected = transformer.fit_transform(spectra)
Using rolling ball:
>>> transformer = PyBaselineCorrection(method='rolling_ball', half_window=50)
>>> corrected = transformer.fit_transform(spectra)
In a pipeline:
>>> from sklearn.pipeline import Pipeline
>>> from sklearn.preprocessing import StandardScaler
>>> pipeline = Pipeline([
... ('baseline', PyBaselineCorrection(method='airpls', lam=1e5)),
... ('scale', StandardScaler()),
... ])
References
----------
pybaselines documentation: https://pybaselines.readthedocs.io/
"""
def __init__(
self,
method: str = 'asls',
*,
copy: bool = True,
**method_params
):
self.method = method
self.copy = copy
self.method_params = method_params
def _reset(self):
if hasattr(self, 'n_features_in_'):
del self.n_features_in_
[docs]
def fit(self, X, y=None):
"""
Fit the transformer (validates method and stores number of features).
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data.
y : None
Ignored.
Returns
-------
self : PyBaselineCorrection
Fitted transformer.
"""
if scipy.sparse.issparse(X):
raise ValueError("PyBaselineCorrection does not support scipy.sparse input")
# Validate method
method_lower = self.method.lower()
if method_lower not in PYBASELINES_METHODS:
available = ', '.join(sorted(PYBASELINES_METHODS.keys()))
raise ValueError(
f"Unknown baseline method '{self.method}'. "
f"Available methods: {available}"
)
self._reset()
self.n_features_in_ = X.shape[1]
return self
[docs]
def get_params(self, deep=True):
"""Get parameters for this estimator."""
params = {
'method': self.method,
'copy': self.copy,
}
params.update(self.method_params)
return params
[docs]
def set_params(self, **params):
"""Set parameters for this estimator."""
method = params.pop('method', None)
copy = params.pop('copy', None)
if method is not None:
self.method = method
if copy is not None:
self.copy = copy
self.method_params.update(params)
return self
def _more_tags(self):
return {"allow_nan": False}
[docs]
@staticmethod
def list_methods():
"""
List all available baseline correction methods.
Returns
-------
dict
Dictionary with method categories as keys and list of methods as values.
"""
categories = {}
for method, (module, _) in PYBASELINES_METHODS.items():
if module not in categories:
categories[module] = []
categories[module].append(method)
return categories
class _BaselineMethodAlias(PyBaselineCorrection):
"""
Base class for convenience baseline method aliases.
This class properly handles get_params/set_params for sklearn clone()
compatibility by storing method-specific parameters as instance attributes.
"""
_method_name = None # Override in subclasses
def __init__(self, *, copy: bool = True, **method_params):
# Store parameters as instance attributes for sklearn compatibility
self.copy = copy
self._method_param_names = list(method_params.keys())
for key, value in method_params.items():
setattr(self, key, value)
# Initialize parent with the fixed method
super().__init__(method=self._method_name, copy=copy, **method_params)
def get_params(self, deep=True):
"""Get parameters for this estimator (excluding 'method' for subclasses)."""
params = {'copy': self.copy}
for key in self._method_param_names:
params[key] = getattr(self, key)
return params
def set_params(self, **params):
"""Set parameters for this estimator."""
copy = params.pop('copy', None)
if copy is not None:
self.copy = copy
for key, value in params.items():
if key in self._method_param_names:
setattr(self, key, value)
self.method_params[key] = value
return self
# Convenience aliases for common methods
[docs]
class AirPLS(_BaselineMethodAlias):
"""
Adaptive Iteratively Reweighted Penalized Least Squares baseline correction.
A robust baseline correction method that adaptively adjusts weights
based on the difference between the fitted baseline and the data.
Parameters
----------
lam : float, default=1e6
Smoothness parameter. Larger values produce smoother baselines.
max_iter : int, default=50
Maximum number of iterations.
tol : float, default=1e-3
Convergence tolerance.
copy : bool, default=True
Whether to copy input data.
References
----------
Zhang, Z.M., et al. (2010). Baseline correction using adaptive iteratively
reweighted penalized least squares. Analyst, 135(5), 1138-1146.
"""
_method_name = 'airpls'
def __init__(self, lam: float = 1e6, max_iter: int = 50, tol: float = 1e-3, *, copy: bool = True):
super().__init__(copy=copy, lam=lam, max_iter=max_iter, tol=tol)
[docs]
class ArPLS(_BaselineMethodAlias):
"""
Asymmetrically Reweighted Penalized Least Squares baseline correction.
Parameters
----------
lam : float, default=1e6
Smoothness parameter.
max_iter : int, default=50
Maximum number of iterations.
tol : float, default=1e-3
Convergence tolerance.
copy : bool, default=True
Whether to copy input data.
References
----------
Baek, S.J., et al. (2015). Baseline correction using asymmetrically
reweighted penalized least squares smoothing. Analyst, 140(1), 250-257.
"""
_method_name = 'arpls'
def __init__(self, lam: float = 1e6, max_iter: int = 50, tol: float = 1e-3, *, copy: bool = True):
super().__init__(copy=copy, lam=lam, max_iter=max_iter, tol=tol)
[docs]
class IModPoly(_BaselineMethodAlias):
"""
Improved Modified Polynomial baseline correction.
A polynomial-based baseline correction that iteratively fits and
removes points above the baseline.
Parameters
----------
poly_order : int, default=5
Polynomial order for fitting.
max_iter : int, default=250
Maximum number of iterations.
tol : float, default=1e-3
Convergence tolerance.
copy : bool, default=True
Whether to copy input data.
References
----------
Zhao, J., et al. (2007). Automated autofluorescence background subtraction
algorithm for biomedical Raman spectroscopy. Applied Spectroscopy, 61(11), 1225-1232.
"""
_method_name = 'imodpoly'
def __init__(self, poly_order: int = 5, max_iter: int = 250, tol: float = 1e-3, *, copy: bool = True):
super().__init__(copy=copy, poly_order=poly_order, max_iter=max_iter, tol=tol)
[docs]
class ModPoly(_BaselineMethodAlias):
"""
Modified Polynomial baseline correction.
Parameters
----------
poly_order : int, default=5
Polynomial order for fitting.
max_iter : int, default=250
Maximum number of iterations.
tol : float, default=1e-3
Convergence tolerance.
copy : bool, default=True
Whether to copy input data.
References
----------
Lieber, C.A. and Mahadevan-Jansen, A. (2003). Automated method for subtraction
of fluorescence from biological Raman spectra. Applied Spectroscopy, 57(11), 1363-1367.
"""
_method_name = 'modpoly'
def __init__(self, poly_order: int = 5, max_iter: int = 250, tol: float = 1e-3, *, copy: bool = True):
super().__init__(copy=copy, poly_order=poly_order, max_iter=max_iter, tol=tol)
[docs]
class SNIP(_BaselineMethodAlias):
"""
Statistics-sensitive Non-linear Iterative Peak-clipping baseline correction.
Particularly effective for spectra with many peaks (e.g., Raman, XRF).
Parameters
----------
max_half_window : int, default=40
Maximum half-window size for the algorithm.
decreasing : bool, default=True
Whether to use decreasing window sizes.
smooth_half_window : int or None, default=None
Half-window for smoothing. None means no smoothing.
copy : bool, default=True
Whether to copy input data.
References
----------
Ryan, C.G., et al. (1988). SNIP, a statistics-sensitive background treatment
for the quantitative analysis of PIXE spectra in geoscience applications.
Nuclear Instruments and Methods in Physics Research B, 34(3), 396-402.
"""
_method_name = 'snip'
def __init__(self, max_half_window: int = 40, decreasing: bool = True,
smooth_half_window: int = None, *, copy: bool = True):
super().__init__(copy=copy, max_half_window=max_half_window,
decreasing=decreasing, smooth_half_window=smooth_half_window)
[docs]
class RollingBall(_BaselineMethodAlias):
"""
Rolling Ball baseline correction.
A morphological approach that simulates rolling a ball beneath the spectrum.
Parameters
----------
half_window : int, default=50
Half-window size for the rolling ball.
smooth_half_window : int or None, default=None
Half-window for smoothing. None means no smoothing.
copy : bool, default=True
Whether to copy input data.
References
----------
Kneen, M.A. and Annegarn, H.J. (1996). Algorithm for fitting XRF, SEM and
PIXE X-ray spectra backgrounds. Nuclear Instruments and Methods in Physics
Research B, 109, 209-213.
"""
_method_name = 'rolling_ball'
def __init__(self, half_window: int = 50, smooth_half_window: int = None, *, copy: bool = True):
super().__init__(copy=copy, half_window=half_window, smooth_half_window=smooth_half_window)
[docs]
class IASLS(_BaselineMethodAlias):
"""
Improved Asymmetric Least Squares baseline correction.
An improvement over ASLS that uses a different weighting scheme.
Parameters
----------
lam : float, default=1e6
Smoothness parameter.
p : float, default=0.01
Asymmetry parameter.
lam_1 : float, default=1e-4
First derivative smoothing parameter.
max_iter : int, default=50
Maximum number of iterations.
tol : float, default=1e-3
Convergence tolerance.
copy : bool, default=True
Whether to copy input data.
References
----------
He, S., et al. (2014). Baseline correction for Raman spectra using an
improved asymmetric least squares method. Analytical Methods, 6(12), 4402-4407.
"""
_method_name = 'iasls'
def __init__(self, lam: float = 1e6, p: float = 0.01, lam_1: float = 1e-4,
max_iter: int = 50, tol: float = 1e-3, *, copy: bool = True):
super().__init__(copy=copy, lam=lam, p=p, lam_1=lam_1, max_iter=max_iter, tol=tol)
[docs]
class BEADS(_BaselineMethodAlias):
"""
Baseline Estimation And Denoising with Sparsity.
Simultaneously estimates baseline and removes noise using sparsity constraints.
Parameters
----------
lam_0 : float, default=1.0
Regularization parameter for the baseline.
lam_1 : float, default=1.0
Regularization parameter for the first derivative.
lam_2 : float, default=1.0
Regularization parameter for the second derivative.
max_iter : int, default=50
Maximum number of iterations.
tol : float, default=1e-2
Convergence tolerance.
copy : bool, default=True
Whether to copy input data.
References
----------
Ning, X., et al. (2014). Chromatogram baseline estimation and denoising
using sparsity (BEADS). Chemometrics and Intelligent Laboratory Systems, 139, 156-167.
"""
_method_name = 'beads'
def __init__(self, lam_0: float = 1.0, lam_1: float = 1.0, lam_2: float = 1.0,
max_iter: int = 50, tol: float = 1e-2, *, copy: bool = True):
super().__init__(copy=copy, lam_0=lam_0, lam_1=lam_1, lam_2=lam_2, max_iter=max_iter, tol=tol)
# Keep asls_baseline function for backward compatibility
[docs]
def asls_baseline(
spectra: np.ndarray,
lam: float = 1e6,
p: float = 0.01,
max_iter: int = 50,
tol: float = 1e-3,
) -> np.ndarray:
"""
Compute baseline using Asymmetric Least Squares Smoothing.
This is a convenience wrapper around pybaseline_correction with method='asls'.
Args:
spectra (numpy.ndarray): NIRS data matrix (n_samples, n_features).
lam (float): Smoothness parameter (lambda). Default is 1e6.
p (float): Asymmetry parameter (0 < p < 1). Default is 0.01.
max_iter (int): Maximum number of iterations. Default is 50.
tol (float): Convergence tolerance. Default is 1e-3.
Returns:
numpy.ndarray: Baseline-corrected spectra with same shape as input.
"""
return pybaseline_correction(spectra, method='asls', lam=lam, p=p, max_iter=max_iter, tol=tol)
[docs]
class ASLSBaseline(_BaselineMethodAlias):
"""
Asymmetric Least Squares (AsLS) baseline correction.
Convenience class for ASLS baseline correction. This is equivalent to
PyBaselineCorrection(method='asls', ...).
Parameters
----------
lam : float, default=1e6
Smoothness parameter (lambda).
p : float, default=0.01
Asymmetry parameter (0 < p < 1).
max_iter : int, default=50
Maximum number of iterations.
tol : float, default=1e-3
Convergence tolerance.
copy : bool, default=True
Whether to copy input data.
References
----------
Eilers, P.H.C. and Boelens, H.F.M. (2005). Baseline Correction with
Asymmetric Least Squares Smoothing.
"""
_method_name = 'asls'
def __init__(self, lam: float = 1e6, p: float = 0.01, max_iter: int = 50,
tol: float = 1e-3, *, copy: bool = True):
super().__init__(copy=copy, lam=lam, p=p, max_iter=max_iter, tol=tol)