"""Refactored FeatureSource using component-based architecture."""
import numpy as np
from typing import List, Optional
from nirs4all.data.types import InputFeatures, ProcessingList, SampleIndices
from nirs4all.data._features import (
ArrayStorage,
ProcessingManager,
HeaderManager,
LayoutTransformer,
UpdateStrategy,
AugmentationHandler,
LayoutType,
)
class FeatureSource:
    """Manages a 3D numpy array of features using modular components.

    This class provides efficient storage and manipulation of feature data with
    multiple processing stages. Each sample can have multiple processing versions
    (e.g., raw, normalized, filtered), all stored in a single aligned 3D array.

    The implementation uses a component-based architecture for better modularity:
        - ArrayStorage: Manages the 3D numpy array
        - ProcessingManager: Tracks processing IDs and their indices
        - HeaderManager: Manages feature headers and units
        - LayoutTransformer: Transforms arrays to different layouts
        - UpdateStrategy: Handles update operation logic
        - AugmentationHandler: Manages sample augmentation

    Note:
        ``padding`` and ``pad_value`` are constructor arguments that are
        forwarded to ``ArrayStorage``; they are not stored as attributes on
        the ``FeatureSource`` instance itself.
    """

    def __init__(self, padding: bool = True, pad_value: float = 0.0) -> None:
        """Initialize an empty FeatureSource.

        Args:
            padding: If True, allow padding features to match existing dimensions.
            pad_value: Value to use for padding missing features.
        """
        self._storage = ArrayStorage(padding=padding, pad_value=pad_value)
        self._processing_mgr = ProcessingManager()
        self._header_mgr = HeaderManager()
        self._layout_transformer = LayoutTransformer()
        self._update_strategy = UpdateStrategy()
        self._augmentation_handler = AugmentationHandler()

    def __repr__(self) -> str:
        """Return a debug representation with shape, dtype and processing IDs."""
        return (
            f"FeatureSource(shape={self._storage.shape}, "
            f"dtype={self._storage.dtype}, "
            f"processing_ids={self._processing_mgr.processing_ids})"
        )

    def __str__(self) -> str:
        """Return a summary: shape, processing IDs and basic statistics.

        Statistics (min, max, mean, variance) are computed over the whole
        stored array and rounded to 3 decimals. When the array is empty they
        are all reported as 0.0, which avoids calling reductions on an empty
        array.
        """
        array = self._storage.array
        if array.size > 0:
            mean_value = round(float(np.mean(array)), 3)
            variance_value = round(float(np.var(array)), 3)
            min_value = round(float(np.min(array)), 3)
            max_value = round(float(np.max(array)), 3)
        else:
            mean_value = variance_value = min_value = max_value = 0.0
        return (
            f"{self._storage.shape}, "
            f"processings={self._processing_mgr.processing_ids}, "
            f"min={min_value}, max={max_value}, "
            f"mean={mean_value}, var={variance_value}"
        )

    @property
    def headers(self) -> Optional[List[str]]:
        """Get the feature headers.

        Returns:
            List of header strings, or None if not set.
        """
        return self._header_mgr.headers

    @property
    def header_unit(self) -> str:
        """Get the unit type of the headers.

        Returns:
            Unit type string ("cm-1", "nm", "none", "text", "index").
        """
        return self._header_mgr.header_unit

    @property
    def num_samples(self) -> int:
        """Get the number of samples.

        Returns:
            Number of samples (first dimension of array).
        """
        return self._storage.num_samples

    @property
    def num_processings(self) -> int:
        """Get the number of processing stages.

        Returns:
            Number of processings as tracked by the processing manager
            (second dimension of array).
        """
        return self._processing_mgr.num_processings

    @property
    def num_features(self) -> int:
        """Get the number of features per processing.

        Returns:
            Number of features (third dimension of array).
        """
        return self._storage.num_features

    @property
    def num_2d_features(self) -> int:
        """Get total features when flattened to 2D.

        Returns:
            Product of the storage's processings and features dimensions.
        """
        # Both factors are read from the storage (not the processing manager)
        # so the product reflects the array as it is actually stored.
        return self._storage.num_processings * self._storage.num_features

    @property
    def processing_ids(self) -> List[str]:
        """Get the list of processing identifiers.

        Returns:
            The processing ID list as exposed by the processing manager.
        """
        return self._processing_mgr.processing_ids

    def add_samples(
        self,
        new_samples: np.ndarray,
        headers: Optional[List[str]] = None
    ) -> None:
        """Add new samples to the feature source.

        Only allowed when there is at most one processing (raw). Headers, when
        provided, replace the current ones while keeping the current unit.

        Args:
            new_samples: 2D array of shape (n_samples, n_features).
            headers: Optional list of feature header names.

        Raises:
            ValueError: If the dataset already has multiple processings, or if
                new_samples is not 2D.
        """
        if self.num_processings > 1:
            raise ValueError(
                "Cannot add new samples to a dataset that already has been processed."
            )
        if new_samples.ndim != 2:
            raise ValueError(
                f"new_samples must be a 2D array, got {new_samples.ndim} dimensions"
            )
        self._storage.add_samples(new_samples)
        # Only update headers if provided, and preserve existing unit
        if headers is not None:
            current_unit = self._header_mgr.header_unit
            self._header_mgr.set_headers(headers, unit=current_unit)

    def add_samples_batch_3d(self, data: np.ndarray) -> None:
        """Add multiple samples with 3D data in a single operation.

        Intended for bulk insertion of augmented samples where each sample may
        have multiple processings; the whole batch is handed to the storage in
        one call instead of sample by sample.

        Args:
            data: 3D array of shape (n_samples, n_processings, n_features).

        Raises:
            ValueError: If data is not 3D. The storage may raise as well if
                the dimensions don't match existing processings/features.
        """
        if data.ndim != 3:
            raise ValueError(f"data must be a 3D array, got {data.ndim} dimensions")
        self._storage.add_samples_batch(data)

    def update_features(
        self,
        source_processings: ProcessingList,
        features: InputFeatures,
        processings: ProcessingList
    ) -> None:
        """Add new features or replace existing ones.

        Does nothing when the normalized feature list is empty.

        Args:
            source_processings: List of existing processing names to replace.
                Empty string "" means add new.
            features: List of feature arrays, each of shape (n_samples, n_features),
                or single array.
            processings: List of target processing names for the data.

        Example:
            # Add new 'savgol' and 'detrend', replace 'raw' with 'msc'
            update_features(["", "raw", ""],
                            [savgol_data, msc_data, detrend_data],
                            ["savgol", "msc", "detrend"])
        """
        # Normalize features to list of arrays
        feature_list = self._normalize_features_input(features)
        if not feature_list:
            return

        # Categorize operations
        replacements, additions = self._update_strategy.categorize_operations(
            feature_list,
            source_processings,
            processings,
            self._processing_mgr._processing_id_to_index
        )

        # Check if we should resize features
        should_resize, new_num_features = self._update_strategy.should_resize_features(
            replacements,
            additions,
            self._storage.num_features
        )
        if should_resize:
            self._storage.resize_features(new_num_features)
            # Existing headers no longer match the new feature dimension.
            self._header_mgr.clear_headers()

        # Apply operations: replacements first, then additions
        self._apply_replacements(replacements)
        self._apply_additions(additions)

    def reset_features(
        self,
        features: np.ndarray,
        processings: List[str]
    ) -> None:
        """Reset features and processings.

        Replaces all features and processings with new data and clears headers.

        Args:
            features: New feature data (2D or 3D).
            processings: List of new processing names.
        """
        # Reset storage
        self._storage.reset_data(features)
        # Reset processing manager
        self._processing_mgr.reset_processings(processings)
        # Clear headers as dimensions likely changed
        self._header_mgr.clear_headers()

    def _normalize_features_input(self, features: InputFeatures) -> List[np.ndarray]:
        """Normalize various feature input formats to list of arrays.

        Args:
            features: A single array, a list of arrays, or a list of lists
                (multi-source case).

        Returns:
            List of numpy arrays. Empty list for an empty list or for any
            unrecognized input.
        """
        if isinstance(features, np.ndarray):
            return [features]
        if isinstance(features, list):
            if not features:
                return []
            # Check if it's list of lists (multi-source case)
            if isinstance(features[0], list):
                # Only the first source is used; any further sources are ignored.
                return list(features[0])
            # Check if it's list of arrays
            if isinstance(features[0], np.ndarray):
                return list(features)
        return []

    def _apply_replacements(self, replacements: List) -> None:
        """Apply replacement operations.

        For each operation the data of the target processing slot is
        overwritten, and the processing is renamed when the new name differs
        from the current one.

        Args:
            replacements: List of ReplacementOperation objects.
        """
        for replacement in replacements:
            self._storage.update_processing(replacement.proc_idx, replacement.new_data)
            # Update processing name if different
            old_name = self._processing_mgr.processing_ids[replacement.proc_idx]
            if replacement.new_proc_name != old_name:
                self._processing_mgr.rename_processing(
                    old_name,
                    replacement.new_proc_name
                )

    def _apply_additions(self, additions: List) -> None:
        """Apply addition operations.

        Each addition appends its data to the storage and registers its name
        with the processing manager, in that order.

        Args:
            additions: List of AdditionOperation objects.
        """
        for addition in additions:
            self._storage.add_processing(addition.new_data)
            self._processing_mgr.add_processing(addition.new_proc_name)

    def augment_samples(
        self,
        sample_indices: List[int],
        data: np.ndarray,
        processings: List[str],
        count_list: List[int]
    ) -> None:
        """Create augmented samples by duplicating existing samples.

        Does nothing when validation reports zero augmentations.

        Args:
            sample_indices: List of sample indices to augment.
            data: Augmented feature data of shape (total_augmented_samples, n_features).
            processings: Processing names for the augmented data.
            count_list: Number of augmentations per sample.
        """
        # Validate inputs
        total_augmentations = self._augmentation_handler.validate_augmentation_inputs(
            sample_indices,
            data,
            count_list,
            self.num_samples
        )
        if total_augmentations == 0:
            return

        # Normalize processings
        proc_list = self._augmentation_handler.normalize_processings(processings)

        # Augment samples in storage (duplicates existing samples)
        self._storage.augment_samples(sample_indices, count_list, new_proc_data=None)

        # Add new processings for augmented samples; names that are already
        # registered are skipped.
        for proc_name in proc_list:
            if not self._processing_mgr.has_processing(proc_name):
                self._add_new_processing_for_augmentation(
                    proc_name,
                    data,
                    total_augmentations
                )

    def _add_new_processing_for_augmentation(
        self,
        proc_name: str,
        data: np.ndarray,
        total_augmentations: int
    ) -> None:
        """Add a new processing for augmented samples only.

        Args:
            proc_name: Name for the new processing.
            data: Processing data for augmented samples.
            total_augmentations: Number of augmented samples.
        """
        # Add processing to storage (expands array and adds data for augmented samples)
        self._storage._add_processing_for_augmented(data, total_augmentations)
        # Register the new processing
        self._processing_mgr.add_processing(proc_name)

    def x(self, indices: SampleIndices, layout: str) -> np.ndarray:
        """Retrieve feature data in specified layout.

        Args:
            indices: Sample indices to retrieve.
            layout: Output format:
                - "2d": Flatten to (samples, processings * features)
                - "2d_interleaved": Transpose then flatten to
                  (samples, features * processings)
                - "3d": Keep as (samples, processings, features)
                - "3d_transpose": Transpose to (samples, features, processings)

        Returns:
            Feature array in requested layout. For empty ``indices`` an empty
            array built by the layout transformer is returned.

        Raises:
            ValueError: If layout is unknown.
        """
        # len() rather than truthiness: indices may be a numpy array, whose
        # truth value is ambiguous.
        if len(indices) == 0:
            return self._layout_transformer.get_empty_array(
                layout,
                self.num_processings,
                self.num_features,
                self._storage.dtype
            )

        # Get data from storage
        selected_data = self._storage.get_data(np.array(indices))

        # Transform to requested layout
        return self._layout_transformer.transform(
            selected_data,
            layout,
            self.num_processings,
            self.num_features
        )