"""Fractional Convolutional Kernel PLS (FCK-PLS) regressor for nirs4all.
A sklearn-compatible implementation of FCK-PLS that uses fractional order
convolutional filters to build spectral features, then applies PLS regression.
This approach is particularly suited for NIRS data where derivative-like
features at various fractional orders can capture different spectral signatures.
Supports both NumPy (CPU) and JAX (GPU/TPU) backends.
References
----------
.. [1] Podlubny, I. (1999). Fractional differential equations. Academic Press.
.. [2] Chen, Y., Petras, I., & Xue, D. (2009). Fractional order control -
A tutorial. In Proc. American Control Conference (pp. 1397-1411).
Mathematical formulation
------------------------
Let
- X ∈ ℝ^{n×p} be the input matrix of n samples and p features
(e.g. NIRS spectra, treated as 1D signals over wavelength),
- Y ∈ ℝ^{n×q} be the response matrix.
FCK-PLS builds an explicit feature map Φ : ℝ^p → ℝ^{D} by convolving each
spectrum with a bank of L fractional filters { h_ℓ }_{ℓ=1,…,L}, and then
applies PLS in this expanded feature space.
Fractional filter bank
----------------------
Each filter h_ℓ ∈ ℝ^{k} (k odd) is defined by parameters (α_ℓ, σ_ℓ) that
control its fractional "order" and scale. Conceptually, h_ℓ approximates
a 1D operator whose frequency response has the form
H_ℓ(ω) ∝ |ω|^{α_ℓ} exp(−σ_ℓ ω²),
so that:
- α_ℓ ≈ 0 corresponds to a smoothing operator,
- α_ℓ ≈ 1 to a first-derivative-like operator,
- α_ℓ ≈ 2 to a second-derivative-like operator,
with intermediate values giving fractional-order behavior. In practice,
h_ℓ is implemented as a discrete, symmetric 1D kernel constructed from
(α_ℓ, σ_ℓ, k) and normalized for numerical stability.
For a single spectrum x ∈ ℝ^p, the convolution with filter ℓ is
f_ℓ = x * h_ℓ, f_ℓ ∈ ℝ^{p′},
where * denotes 1D discrete convolution along the wavelength axis (with
either "same" or "valid" output length p′).
The feature map Φ stacks all convolved signals:
Φ(x) = [ f_1ᵀ, f_2ᵀ, …, f_Lᵀ ]ᵀ ∈ ℝ^{D},
D = L · p′.
Collecting all samples, we form the feature-expanded matrix
X_feat ∈ ℝ^{n×D}, row i = Φ(x_i)ᵀ.
PLS in feature space
--------------------
On X_feat and Y, FCK-PLS applies a standard PLS regression:
- find loading matrix W_feat ∈ ℝ^{D×r},
- scores T = X_feat W_feat ∈ ℝ^{n×r},
- regression matrix C ∈ ℝ^{r×q},
such that:
1) the covariance between T and Y is maximized (PLS objective), and
2) the regression Y ≈ T C is well fitted in least-squares sense.
Equivalently, one can define a kernel in the original input space
K_{ij} = Φ(x_i)ᵀ Φ(x_j),
and view FCK-PLS as a (linear) Kernel PLS in the feature space induced
by the fractional convolutional map Φ. In this implementation, the
feature map is explicit (X_feat is computed directly) and a standard
PLSRegression is applied.
Prediction for new data X* proceeds as follows:
1) apply the same preprocessing and fractional convolutional featurizer
to get X*_feat,
2) compute scores T* = X*_feat W_feat,
3) output Ŷ* = T* C (with inverse scaling if standardization is used).
By tuning { (α_ℓ, σ_ℓ) } and the number of components r, FCK-PLS can
adaptively emphasize different fractional smooth/derivative behaviors
and scales in the spectra, providing a flexible family of
preprocessing+PLS models specialized to 1D spectral data.
"""
from __future__ import annotations
from functools import partial
from typing import Literal, Sequence, Union
import numpy as np
from numpy.typing import ArrayLike, NDArray
from scipy.signal import fftconvolve
from sklearn.base import BaseEstimator, RegressorMixin, TransformerMixin
from sklearn.cross_decomposition import PLSRegression
from sklearn.preprocessing import StandardScaler
from sklearn.utils.validation import check_is_fitted
def _check_jax_available():
"""Check if JAX is available for GPU acceleration."""
try:
import jax
return True
except ImportError:
return False
# =============================================================================
# Fractional Filter Construction
# =============================================================================
[docs]
def fractional_kernel_1d(
alpha: float,
sigma: float,
kernel_size: int,
) -> NDArray[np.floating]:
"""Build a 1D discrete kernel for fractional smoothing/derivative.
This kernel approximates fractional order operators by combining a
Gaussian envelope with a fractional power modulation. The result
captures derivative-like behavior at non-integer orders.
Parameters
----------
alpha : float
Fractional order in [0, 2]:
- 0: Pure smoothing (Gaussian-like)
- 1: First-derivative-like behavior
- 2: Second-derivative-like behavior
Intermediate values provide fractional derivatives.
sigma : float
Scale parameter controlling the width of the kernel.
Larger sigma = wider filter, more smoothing.
kernel_size : int
Number of points in the kernel (should be odd).
Returns
-------
h : ndarray of shape (kernel_size,)
Normalized discrete filter.
Notes
-----
This is a heuristic approximation suitable for spectral data.
For mathematically rigorous fractional derivatives, use FFT-based
implementations with frequency domain multiplication by |ω|^α.
"""
k = kernel_size
idx = np.arange(k) - (k - 1) / 2.0 # Symmetric indices centered at 0
# Gaussian envelope for spatial localization
sigma_safe = max(sigma, 1e-6)
gaussian = np.exp(-0.5 * (idx / sigma_safe) ** 2)
# Handle alpha = 0 case (pure smoothing)
if alpha < 1e-10:
h = gaussian.copy()
else:
# Fractional power modulation with sign preservation
sign = np.sign(idx)
frac = np.abs(idx) ** alpha
frac[np.abs(idx) < 1e-10] = 0.0 # Handle center point
# Combine: Gaussian envelope * signed fractional power
h = gaussian * (frac * sign)
# Zero-mean for derivative-like behavior (when alpha > 0)
if alpha > 0.1:
h = h - h.mean()
# L1 normalization for stability
norm = np.sum(np.abs(h))
if norm > 1e-12:
h = h / norm
return h
[docs]
def fractional_kernel_grrunwald_letnikov(
alpha: float,
kernel_size: int,
) -> NDArray[np.floating]:
"""Build Grünwald-Letnikov fractional derivative kernel.
This is a more mathematically rigorous approximation of the fractional
derivative operator using the Grünwald-Letnikov definition.
Parameters
----------
alpha : float
Fractional order (can be any real number).
kernel_size : int
Number of points in the kernel.
Returns
-------
h : ndarray of shape (kernel_size,)
Grünwald-Letnikov coefficients.
Notes
-----
The Grünwald-Letnikov definition:
D^α f(x) ≈ lim_{h→0} (1/h^α) Σ_{j=0}^{n} (-1)^j C(α,j) f(x - jh)
where C(α,j) = Γ(α+1) / (Γ(j+1) * Γ(α-j+1))
"""
from scipy.special import gammaln
coeffs = np.zeros(kernel_size, dtype=np.float64)
for j in range(kernel_size):
# Binomial-like coefficient: (-1)^j * C(α, j)
# C(α, j) = Γ(α+1) / (Γ(j+1) * Γ(α-j+1))
# Using log-gamma for numerical stability
arg1 = alpha + 1
arg2 = j + 1
arg3 = alpha - j + 1
# Check for valid gamma arguments (no non-positive integers)
args_valid = True
for arg in [arg1, arg2, arg3]:
if arg <= 0 and float(arg).is_integer():
args_valid = False
break
if args_valid:
try:
# Use log-gamma to avoid overflow
log_coeff = gammaln(arg1) - gammaln(arg2) - gammaln(arg3)
coeff = np.exp(log_coeff)
if np.isfinite(coeff):
coeffs[j] = ((-1) ** j) * coeff
else:
coeffs[j] = 0.0
except (ValueError, OverflowError, RuntimeWarning):
coeffs[j] = 0.0
else:
coeffs[j] = 0.0
# Ensure no NaN or Inf
coeffs = np.nan_to_num(coeffs, nan=0.0, posinf=0.0, neginf=0.0)
# Normalize
norm = np.sum(np.abs(coeffs))
if norm > 1e-12:
coeffs = coeffs / norm
return coeffs
# =============================================================================
# Fractional Convolutional Featurizer
# =============================================================================
[docs]
class FractionalConvFeaturizer(BaseEstimator, TransformerMixin):
"""Convolutional featurizer using a bank of fractional filters.
Builds features by convolving input spectra with multiple fractional
order filters at different scales. This captures derivative-like
information at various fractional orders, which can be useful for
identifying spectral features.
Parameters
----------
alphas : sequence of float, default=(0.0, 0.5, 1.0, 1.5, 2.0)
Fractional orders for the filter bank.
- 0: Smoothing/identity-like
- 0.5: Half-derivative
- 1: First derivative
- 1.5: Fractional between 1st and 2nd derivative
- 2: Second derivative
sigmas : sequence of float, default=(2.0,)
Scale parameters. If single value, same sigma for all alphas.
If same length as alphas, pairs (alpha[i], sigma[i]).
kernel_size : int, default=15
Size of convolution kernels (should be odd).
mode : str, default='same'
Convolution mode:
- 'same': Output same length as input
- 'valid': Output shorter (no padding)
kernel_type : str, default='heuristic'
Type of fractional kernel:
- 'heuristic': Gaussian-modulated fractional power
- 'grunwald': Grünwald-Letnikov coefficients
Attributes
----------
kernels_ : list of ndarray
Precomputed convolution kernels.
n_kernels_ : int
Number of kernels in the filter bank.
"""
def __init__(
self,
alphas: Sequence[float] = (0.0, 0.5, 1.0, 1.5, 2.0),
sigmas: Sequence[float] = (2.0,),
kernel_size: int = 15,
mode: Literal['same', 'valid'] = 'same',
kernel_type: Literal['heuristic', 'grunwald'] = 'heuristic',
):
"""Initialize the fractional convolutional featurizer."""
self.alphas = list(alphas)
self.sigmas = list(sigmas)
self.kernel_size = kernel_size
self.mode = mode
self.kernel_type = kernel_type
[docs]
def fit(self, X, y=None):
"""Precompute convolution kernels.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training data (used only for validation).
y : ignored
Returns
-------
self : FractionalConvFeaturizer
"""
# Ensure odd kernel size
if self.kernel_size % 2 == 0:
raise ValueError("kernel_size must be odd")
# Handle sigma broadcasting
if len(self.sigmas) == 1 and len(self.alphas) > 1:
sigmas = [self.sigmas[0]] * len(self.alphas)
else:
sigmas = self.sigmas
if len(sigmas) != len(self.alphas):
raise ValueError(
"sigmas must have length 1 or same length as alphas. "
f"Got {len(sigmas)} sigmas and {len(self.alphas)} alphas."
)
# Build kernels
self.kernels_ = []
for alpha, sigma in zip(self.alphas, sigmas):
if self.kernel_type == 'heuristic':
h = fractional_kernel_1d(alpha, sigma, self.kernel_size)
elif self.kernel_type == 'grunwald':
h = fractional_kernel_grrunwald_letnikov(alpha, self.kernel_size)
else:
raise ValueError(
f"kernel_type must be 'heuristic' or 'grunwald', "
f"got '{self.kernel_type}'"
)
self.kernels_.append(h)
self.n_kernels_ = len(self.kernels_)
return self
[docs]
def get_kernel_info(self) -> dict:
"""Get information about the filter bank.
Returns
-------
info : dict
Dictionary containing kernel parameters and shapes.
"""
check_is_fitted(self, ['kernels_'])
return {
'n_kernels': self.n_kernels_,
'alphas': self.alphas,
'sigmas': self.sigmas if len(self.sigmas) == len(self.alphas)
else [self.sigmas[0]] * len(self.alphas),
'kernel_size': self.kernel_size,
'kernel_type': self.kernel_type,
'mode': self.mode,
}
# =============================================================================
# JAX Backend for Fractional Convolution
# =============================================================================
def _get_jax_fckpls_functions():
"""Lazy import and create JAX FCK-PLS functions."""
import jax
import jax.numpy as jnp
from jax import lax
jax.config.update("jax_enable_x64", True)
@jax.jit
def convolve_1d_jax(x, kernel):
"""1D convolution using JAX."""
# Use lax.conv_general_dilated for 1D convolution
# Reshape for conv: (batch, spatial, channels)
x_reshaped = x.reshape(1, -1, 1)
kernel_reshaped = kernel.reshape(-1, 1, 1)
# Padding for 'same' mode
pad = kernel.shape[0] // 2
result = lax.conv_general_dilated(
x_reshaped,
kernel_reshaped,
window_strides=(1,),
padding=((pad, pad),),
dimension_numbers=('NWC', 'WIO', 'NWC'),
)
return result.ravel()
def apply_filter_bank_jax(X, kernels):
"""Apply filter bank using JAX.
Note: This function is not JIT-compiled because the number
of kernels is dynamic.
"""
X = jnp.asarray(X)
n_samples, n_features = X.shape
n_kernels = len(kernels)
results = []
for h in kernels:
h_jax = jnp.asarray(h)
# Apply to each sample
convolved = jax.vmap(lambda x: convolve_1d_jax(x, h_jax))(X)
results.append(convolved)
return jnp.hstack(results)
return {
'convolve_1d': convolve_1d_jax,
'apply_filter_bank': apply_filter_bank_jax,
}
_JAX_FCKPLS_FUNCS = None
def _get_cached_jax_fckpls():
"""Get cached JAX FCK-PLS functions."""
global _JAX_FCKPLS_FUNCS
if _JAX_FCKPLS_FUNCS is None:
_JAX_FCKPLS_FUNCS = _get_jax_fckpls_functions()
return _JAX_FCKPLS_FUNCS
# =============================================================================
# FCK-PLS Estimator Class
# =============================================================================
[docs]
class FCKPLS(BaseEstimator, RegressorMixin):
"""Fractional Convolutional Kernel PLS (FCK-PLS).
FCK-PLS builds spectral features by convolving input spectra with a bank
of fractional order filters, then applies PLS regression on the expanded
feature space. This approach captures derivative-like information at
various fractional orders.
The pipeline is:
1. Optional standardization of X and Y
2. FractionalConvFeaturizer: X -> X_feat (feature expansion)
3. PLSRegression: X_feat, Y -> predictions
Parameters
----------
n_components : int, default=10
Number of PLS components to extract.
alphas : sequence of float, default=(0.0, 0.5, 1.0, 1.5, 2.0)
Fractional orders for the filter bank.
sigmas : sequence of float, default=(2.0,)
Scale parameters for fractional kernels.
kernel_size : int, default=15
Size of convolution kernels (must be odd).
mode : str, default='same'
Convolution mode: 'same' or 'valid'.
kernel_type : str, default='heuristic'
Fractional kernel type: 'heuristic' or 'grunwald'.
standardize : bool, default=True
Whether to standardize X and Y before fitting.
backend : str, default='numpy'
Computational backend:
- 'numpy': NumPy/SciPy backend (CPU)
- 'jax': JAX backend (supports GPU/TPU)
Attributes
----------
n_features_in_ : int
Number of input features.
n_features_out_ : int
Number of features after convolution.
featurizer_ : FractionalConvFeaturizer
The fitted fractional featurizer.
pls_ : PLSRegression
The fitted PLS model.
Examples
--------
>>> from nirs4all.operators.models.sklearn.fckpls import FCKPLS
>>> import numpy as np
>>> # Generate spectral data
>>> np.random.seed(42)
>>> X = np.random.randn(100, 200) # 100 samples, 200 wavelengths
>>> y = X[:, 50:60].mean(axis=1) + 0.1 * np.random.randn(100)
>>> # Fit FCK-PLS with default fractional orders
>>> model = FCKPLS(n_components=10, alphas=(0.0, 0.5, 1.0, 1.5, 2.0))
>>> model.fit(X, y)
FCKPLS(...)
>>> predictions = model.predict(X)
>>> # Use specific fractional orders
>>> model2 = FCKPLS(n_components=10, alphas=(0.0, 1.0, 2.0), sigmas=(3.0,))
>>> model2.fit(X, y)
Notes
-----
The fractional order α controls the type of spectral feature extracted:
- α ≈ 0: Smoothed spectrum (low-pass filtering)
- α ≈ 1: First derivative-like (highlights slopes)
- α ≈ 2: Second derivative-like (highlights peaks/valleys)
- Fractional α: Intermediate behavior
The sigma parameter controls the scale of the filter. Larger sigma
captures broader spectral features; smaller sigma captures local details.
FCK-PLS can be computationally expensive with many filters and large
spectra. Consider using the JAX backend for GPU acceleration.
See Also
--------
SIMPLS : Standard PLS without feature expansion.
IntervalPLS : PLS with wavelength interval selection.
"""
# Explicitly declare estimator type for sklearn compatibility (e.g., StackingRegressor)
_estimator_type = "regressor"
def __init__(
self,
n_components: int = 10,
alphas: Sequence[float] = (0.0, 0.5, 1.0, 1.5, 2.0),
sigmas: Sequence[float] = (2.0,),
kernel_size: int = 15,
mode: Literal['same', 'valid'] = 'same',
kernel_type: Literal['heuristic', 'grunwald'] = 'heuristic',
standardize: bool = True,
backend: str = 'numpy',
):
"""Initialize FCK-PLS regressor."""
self.n_components = n_components
self.alphas = alphas
self.sigmas = sigmas
self.kernel_size = kernel_size
self.mode = mode
self.kernel_type = kernel_type
self.standardize = standardize
self.backend = backend
[docs]
def fit(
self,
X: ArrayLike,
y: ArrayLike,
) -> "FCKPLS":
"""Fit the FCK-PLS model.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Training spectra.
y : array-like of shape (n_samples,) or (n_samples, n_targets)
Target values.
Returns
-------
self : FCKPLS
Fitted estimator.
Raises
------
ValueError
If parameters are invalid.
ImportError
If JAX backend requested but not installed.
"""
if self.backend not in ('numpy', 'jax'):
raise ValueError(
f"backend must be 'numpy' or 'jax', got '{self.backend}'"
)
if self.backend == 'jax' and not _check_jax_available():
raise ImportError(
"JAX is required for FCKPLS with backend='jax'. "
"Install with: pip install jax"
)
X = np.asarray(X, dtype=np.float64)
y = np.asarray(y, dtype=np.float64)
self._y_1d = y.ndim == 1
if self._y_1d:
y = y.reshape(-1, 1)
n_samples, n_features = X.shape
n_targets = y.shape[1]
self.n_features_in_ = n_features
# Standardization
if self.standardize:
self.x_scaler_ = StandardScaler(with_mean=True, with_std=True)
self.y_scaler_ = StandardScaler(with_mean=True, with_std=True)
else:
self.x_scaler_ = None
self.y_scaler_ = None
X_proc = X.copy()
Y_proc = y.copy()
if self.x_scaler_ is not None:
X_proc = self.x_scaler_.fit_transform(X_proc)
if self.y_scaler_ is not None:
Y_proc = self.y_scaler_.fit_transform(Y_proc)
# Build featurizer
self.featurizer_ = FractionalConvFeaturizer(
alphas=self.alphas,
sigmas=self.sigmas,
kernel_size=self.kernel_size,
mode=self.mode,
kernel_type=self.kernel_type,
)
self.featurizer_.fit(X_proc)
# Apply featurizer
if self.backend == 'jax':
import jax.numpy as jnp
jax_funcs = _get_cached_jax_fckpls()
X_feat = np.asarray(
jax_funcs['apply_filter_bank'](X_proc, self.featurizer_.kernels_)
)
else:
X_feat = self.featurizer_.transform(X_proc)
self.n_features_out_ = X_feat.shape[1]
# Limit components
max_components = min(n_samples - 1, X_feat.shape[1])
n_comp = min(self.n_components, max_components)
# Fit PLS on expanded features
self.pls_ = PLSRegression(n_components=n_comp)
self.pls_.fit(X_feat, Y_proc)
return self
[docs]
def predict(self, X: ArrayLike) -> NDArray[np.floating]:
"""Predict using the FCK-PLS model.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Samples to predict.
Returns
-------
y_pred : ndarray of shape (n_samples,) or (n_samples, n_targets)
Predicted values.
"""
check_is_fitted(self, ['featurizer_', 'pls_'])
X = np.asarray(X, dtype=np.float64)
# Preprocess
X_proc = X.copy()
if self.x_scaler_ is not None:
X_proc = self.x_scaler_.transform(X_proc)
# Apply featurizer
if self.backend == 'jax':
import jax.numpy as jnp
jax_funcs = _get_cached_jax_fckpls()
X_feat = np.asarray(
jax_funcs['apply_filter_bank'](X_proc, self.featurizer_.kernels_)
)
else:
X_feat = self.featurizer_.transform(X_proc)
# PLS predict
Y_proc = self.pls_.predict(X_feat)
# Inverse transform
if self.y_scaler_ is not None:
Y = self.y_scaler_.inverse_transform(Y_proc)
else:
Y = Y_proc
if self._y_1d:
Y = Y.ravel()
return Y
[docs]
def get_filter_info(self) -> dict:
"""Get information about the fractional filter bank.
Returns
-------
info : dict
Dictionary containing filter parameters.
"""
check_is_fitted(self, ['featurizer_'])
return self.featurizer_.get_kernel_info()
[docs]
def get_fractional_features(self, X: ArrayLike) -> NDArray[np.floating]:
"""Get the fractional convolution features.
Parameters
----------
X : array-like of shape (n_samples, n_features)
Input spectra.
Returns
-------
X_feat : ndarray of shape (n_samples, n_features_out)
Fractional convolution features.
"""
check_is_fitted(self, ['featurizer_'])
X = np.asarray(X, dtype=np.float64)
X_proc = X.copy()
if self.x_scaler_ is not None:
X_proc = self.x_scaler_.transform(X_proc)
if self.backend == 'jax':
import jax.numpy as jnp
jax_funcs = _get_cached_jax_fckpls()
X_feat = np.asarray(
jax_funcs['apply_filter_bank'](X_proc, self.featurizer_.kernels_)
)
else:
X_feat = self.featurizer_.transform(X_proc)
return X_feat
[docs]
def get_params(self, deep: bool = True) -> dict:
"""Get parameters for this estimator."""
return {
'n_components': self.n_components,
'alphas': self.alphas,
'sigmas': self.sigmas,
'kernel_size': self.kernel_size,
'mode': self.mode,
'kernel_type': self.kernel_type,
'standardize': self.standardize,
'backend': self.backend,
}
[docs]
def set_params(self, **params) -> "FCKPLS":
"""Set the parameters of this estimator."""
for key, value in params.items():
setattr(self, key, value)
return self
[docs]
def __repr__(self) -> str:
"""Return string representation."""
return (
f"FCKPLS(n_components={self.n_components}, "
f"alphas={self.alphas}, "
f"sigmas={self.sigmas}, "
f"kernel_size={self.kernel_size}, "
f"mode='{self.mode}', "
f"kernel_type='{self.kernel_type}', "
f"backend='{self.backend}')"
)
# Aliases
FractionalPLS = FCKPLS