"""
Auto-detection for file parameters.
This module provides enhanced auto-detection capabilities for CSV files,
including delimiter detection, decimal separator detection, header detection,
and signal type inference from headers.
Phase 8 Implementation - Dataset Configuration Roadmap
Section 8.2: Auto-Detection Improvements
"""
import csv
import io
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
import numpy as np
from nirs4all.core.logging import get_logger
logger = get_logger(__name__)
@dataclass
class DetectionResult:
    """Result of auto-detection.

    Attributes:
        delimiter: Detected field delimiter.
        decimal_separator: Detected decimal separator.
        has_header: Whether the file has a header row.
        header_unit: Detected unit type for headers.
        signal_type: Detected signal type.
        encoding: Detected file encoding.
        n_columns: Detected number of columns.
        n_rows: Estimated number of rows.
        confidence: Confidence scores for each detected parameter.
        warnings: List of detection warnings.
    """

    delimiter: str = ";"
    decimal_separator: str = "."
    has_header: bool = True
    header_unit: str = "cm-1"
    signal_type: Optional[str] = None
    encoding: str = "utf-8"
    n_columns: int = 0
    n_rows: int = 0
    # Mutable defaults go through default_factory so instances never share them.
    confidence: Dict[str, float] = field(default_factory=dict)
    warnings: List[str] = field(default_factory=list)

    def to_params(self) -> Dict[str, Any]:
        """Convert to loading parameters dictionary.

        Returns:
            Dictionary holding the delimiter, decimal separator, header flag,
            header unit, signal type and encoding. The diagnostic fields
            (``n_columns``, ``n_rows``, ``confidence``, ``warnings``) are
            not included.
        """
        return {
            "delimiter": self.delimiter,
            "decimal_separator": self.decimal_separator,
            "has_header": self.has_header,
            "header_unit": self.header_unit,
            "signal_type": self.signal_type,
            "encoding": self.encoding,
        }
class AutoDetector:
    """Auto-detect file parameters.

    Provides methods to detect CSV delimiters, decimal separators,
    header presence, header units, and signal types from file content.

    Example:
        ```python
        detector = AutoDetector()
        result = detector.detect("path/to/file.csv")
        print(f"Delimiter: {result.delimiter}")
        print(f"Has header: {result.has_header}")
        print(f"Signal type: {result.signal_type}")
        ```
    """

    # Common delimiters in order of priority
    DELIMITERS = [",", ";", "\t", "|", " "]

    # Encodings tried in order when decoding bytes or files. latin-1 maps
    # every byte value, so it never fails and therefore must stay last.
    _FALLBACK_ENCODINGS = ("utf-8", "latin-1")

    # Patterns for header unit detection
    HEADER_PATTERNS = {
        "nm": [
            r"^\d{3,4}(?:\.\d+)?$",  # 400, 450.5, 1200
            r"^\d{3,4}(?:\.\d+)?nm$",  # 400nm
        ],
        "cm-1": [
            r"^\d{4,5}(?:\.\d+)?$",  # 4000, 10000.5
            r"^\d{4,5}(?:\.\d+)?cm-1$",  # 4000cm-1
            r"^\d{4,5}(?:\.\d+)?wavenumber$",  # Explicit wavenumber
        ],
        "text": [
            r"^[a-zA-Z]",  # Starts with letter
            r"^feature_\d+$",  # feature_1, feature_2
            r"^[xX]_?\d+$",  # X1, x_1
        ],
        "index": [
            r"^\d{1,3}$",  # 1, 10, 100 (small numbers, likely indices)
        ],
    }

    # Signal type patterns in header values
    SIGNAL_TYPE_PATTERNS = {
        "absorbance": [
            r"abs(orbance)?",
            r"log\s*\(?1/[RT]\)?",
            r"A\s*=",
        ],
        "reflectance": [
            r"reflect(ance)?",
            r"^R$",
            r"R\s*%",
        ],
        "transmittance": [
            r"transmit(tance)?",
            r"^T$",
            r"T\s*%",
        ],
    }

    def __init__(
        self,
        sample_lines: int = 50,
        min_confidence: float = 0.6,
    ):
        """Initialize detector.

        Args:
            sample_lines: Number of lines to sample for detection.
            min_confidence: Minimum confidence threshold for detection.
                Stored on the instance; none of the detection methods in
                this class currently consult it.
        """
        self.sample_lines = sample_lines
        self.min_confidence = min_confidence

    def detect(
        self,
        source: Union[str, Path, bytes, io.StringIO],
        known_params: Optional[Dict[str, Any]] = None,
    ) -> "DetectionResult":
        """Detect file parameters.

        Every parameter present in ``known_params`` is copied to the result
        with confidence 1.0 instead of being detected.

        Args:
            source: Path to file, file content as bytes, or StringIO.
            known_params: Optional known parameters to skip detection for.

        Returns:
            DetectionResult with detected parameters. When the content is
            empty or cannot be parsed, the defaults are returned together
            with an entry in ``warnings``.
        """
        result = DetectionResult()
        known_params = known_params or {}

        # Read content, decoding with the caller-supplied encoding first.
        content, encoding = self._read_content(source, known_params.get("encoding"))
        result.encoding = known_params.get("encoding", encoding)
        if not content:
            result.warnings.append("Empty file content")
            return result

        # Get sample lines
        lines = self._get_sample_lines(content)
        if not lines:
            result.warnings.append("No data lines found")
            return result

        # Detect delimiter
        if "delimiter" in known_params:
            result.delimiter = known_params["delimiter"]
            result.confidence["delimiter"] = 1.0
        else:
            result.delimiter, conf = self._detect_delimiter(lines)
            result.confidence["delimiter"] = conf

        # Parse with detected delimiter
        parsed_rows = self._parse_lines(lines, result.delimiter)
        if not parsed_rows:
            result.warnings.append("Could not parse lines")
            return result
        result.n_columns = max(len(row) for row in parsed_rows)
        result.n_rows = len(parsed_rows)

        # Detect decimal separator
        if "decimal_separator" in known_params:
            result.decimal_separator = known_params["decimal_separator"]
            result.confidence["decimal_separator"] = 1.0
        else:
            result.decimal_separator, conf = self._detect_decimal_separator(parsed_rows)
            result.confidence["decimal_separator"] = conf

        # Detect header
        if "has_header" in known_params:
            result.has_header = known_params["has_header"]
            result.confidence["has_header"] = 1.0
        else:
            result.has_header, conf = self._detect_header(
                parsed_rows,
                result.decimal_separator,
            )
            result.confidence["has_header"] = conf

        # Detect header unit
        if "header_unit" in known_params:
            result.header_unit = known_params["header_unit"]
            result.confidence["header_unit"] = 1.0
        elif result.has_header:
            result.header_unit, conf = self._detect_header_unit(parsed_rows[0])
            result.confidence["header_unit"] = conf

        # Detect signal type
        if "signal_type" in known_params:
            result.signal_type = known_params["signal_type"]
            result.confidence["signal_type"] = 1.0
        elif result.has_header:
            result.signal_type, conf = self._detect_signal_type_from_header(parsed_rows[0])
            result.confidence["signal_type"] = conf

        if result.signal_type is None:
            # Try to infer from data values
            result.signal_type, conf = self._detect_signal_type_from_values(
                parsed_rows[1:] if result.has_header else parsed_rows,
                result.decimal_separator,
            )
            result.confidence["signal_type"] = conf
        return result

    @staticmethod
    def _strip_bom(content: str) -> str:
        """Drop a leading byte-order mark so it does not pollute the first cell."""
        return content[1:] if content.startswith("\ufeff") else content

    def _read_content(
        self,
        source: Union[str, Path, bytes, io.StringIO],
        encoding: Optional[str] = None,
    ) -> Tuple[str, str]:
        """Read content from source.

        Args:
            source: Path, bytes, or StringIO.
            encoding: Optional encoding to try before the built-in fallbacks.

        Returns:
            Tuple of (content, encoding). A missing path or a directory
            yields ``("", "utf-8")``. StringIO content is always reported
            as ``"utf-8"``.
        """
        if isinstance(source, io.StringIO):
            source.seek(0)
            return self._strip_bom(source.read()), "utf-8"

        candidates = list(self._FALLBACK_ENCODINGS)
        if encoding:
            candidates = [encoding] + [enc for enc in candidates if enc != encoding]

        if isinstance(source, (bytes, bytearray)):
            for candidate in candidates:
                try:
                    return self._strip_bom(source.decode(candidate)), candidate
                except (UnicodeError, LookupError):
                    # Wrong or unknown encoding: fall through to the next one.
                    continue
            # Safety net only; latin-1 above accepts any byte sequence.
            return self._strip_bom(source.decode("utf-8", errors="replace")), "utf-8"

        path = Path(source)
        if not path.exists() or path.is_dir():
            return "", "utf-8"

        # Text mode is kept on purpose: it normalises line endings to "\n".
        for candidate in candidates:
            try:
                with open(path, "r", encoding=candidate) as f:
                    return self._strip_bom(f.read()), candidate
            except (UnicodeError, LookupError):
                continue

        # Safety net only; latin-1 above accepts any byte sequence.
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            return self._strip_bom(f.read()), "utf-8"

    def _get_sample_lines(self, content: str) -> List[str]:
        """Get sample lines from content.

        Only the first ``sample_lines`` physical lines are inspected; blank
        lines among them count towards that limit but are not returned.

        Args:
            content: File content.

        Returns:
            List of stripped, non-empty sample lines.
        """
        limit = max(self.sample_lines, 0)
        # maxsplit avoids splitting the remainder of a large file.
        head = content.split("\n", limit)[:limit]
        stripped = (line.strip() for line in head)
        return [line for line in stripped if line]

    def _parse_lines(self, lines: List[str], delimiter: str) -> List[List[str]]:
        """Parse lines with delimiter.

        Args:
            lines: List of lines.
            delimiter: Field delimiter.

        Returns:
            List of parsed rows (lists of fields) with all-blank rows
            removed. An empty list is returned when the csv module cannot
            parse the sample.
        """
        reader = csv.reader(io.StringIO("\n".join(lines)), delimiter=delimiter)
        try:
            # csv.Error is raised lazily while iterating, so the whole
            # comprehension has to sit inside the try block.
            return [row for row in reader if any(cell.strip() for cell in row)]
        except csv.Error:
            return []

    def _detect_delimiter(self, lines: List[str]) -> Tuple[str, float]:
        """Detect the field delimiter.

        Args:
            lines: Sample lines.

        Returns:
            Tuple of (delimiter, confidence). ``";"`` is returned with
            confidence 0.0 when no candidate scores above zero.
        """
        best_delim = ";"
        max_score = 0.0
        for delim in self.DELIMITERS:
            score = self._score_delimiter(lines, delim)
            # Strict comparison: on ties the earlier (higher priority) wins.
            if score > max_score:
                max_score = score
                best_delim = delim
        confidence = min(max_score / 10, 1.0)  # Normalize score to confidence
        return best_delim, confidence

    def _score_delimiter(self, lines: List[str], delim: str) -> float:
        """Score a delimiter based on consistency.

        Args:
            lines: Sample lines.
            delim: Delimiter to test.

        Returns:
            Score (higher is better); 0.0 when nothing could be parsed and
            0.1 when the delimiter yields a single column.
        """
        from collections import Counter

        if not lines:
            return 0.0
        try:
            reader = csv.reader(io.StringIO("\n".join(lines)), delimiter=delim)
            col_counts = [len(row) for row in reader if row]
        except csv.Error:
            return 0.0
        if not col_counts:
            return 0.0

        # Most common column count
        most_common_count, frequency = Counter(col_counts).most_common(1)[0]

        # Penalize if only 1 column
        if most_common_count == 1:
            return 0.1

        # Score based on consistency and number of columns
        consistency = frequency / len(col_counts)
        col_bonus = min(most_common_count / 10, 5)  # Prefer more columns, max bonus 5
        return consistency * 5 + col_bonus

    def _detect_decimal_separator(
        self,
        parsed_rows: List[List[str]],
    ) -> Tuple[str, float]:
        """Detect the decimal separator.

        Args:
            parsed_rows: Parsed data rows; the first row is skipped as a
                potential header.

        Returns:
            Tuple of (separator, confidence).
        """
        # *_count: cells containing only that character of the two;
        # *_valid: those of them that parse as a float with it as separator.
        dot_count = comma_count = 0
        dot_valid = comma_valid = 0
        for row in parsed_rows[1:]:  # Skip potential header
            for cell in row:
                cell = cell.strip()
                if not cell:
                    continue
                has_dot = "." in cell
                has_comma = "," in cell
                if has_dot and not has_comma:
                    dot_count += 1
                    if self._is_numeric(cell, "."):
                        dot_valid += 1
                elif has_comma and not has_dot:
                    comma_count += 1
                    if self._is_numeric(cell, ","):
                        comma_valid += 1

        # Prefer dot as it's more common in scientific data
        if dot_valid >= comma_valid:
            return ".", min((dot_valid + 1) / (max(dot_count, 1) + comma_count + 1), 1.0)
        return ",", min((comma_valid + 1) / (max(comma_count, 1) + dot_count + 1), 1.0)

    def _detect_header(
        self,
        parsed_rows: List[List[str]],
        decimal_sep: str,
    ) -> Tuple[bool, float]:
        """Detect if file has a header row.

        Compares the share of numeric cells in the first row with the
        average share in up to nine following rows.

        NOTE(review): a fully numeric first row (for example a wavelength
        axis) has the same ratio as numeric data rows and is therefore
        reported as "no header".

        Args:
            parsed_rows: Parsed data rows.
            decimal_sep: Detected decimal separator.

        Returns:
            Tuple of (has_header, confidence).
        """
        if len(parsed_rows) < 2:
            return True, 0.5  # Default to True with low confidence

        first_row = parsed_rows[0]
        data_rows = parsed_rows[1:min(10, len(parsed_rows))]

        # Count numeric values in first row vs data rows
        first_numeric = sum(1 for cell in first_row if self._is_numeric(cell, decimal_sep))
        first_ratio = first_numeric / len(first_row) if first_row else 0

        data_numeric_ratios = [
            sum(1 for cell in row if self._is_numeric(cell, decimal_sep)) / len(row)
            for row in data_rows
            if row
        ]
        if not data_numeric_ratios:
            return True, 0.5

        # float() keeps the returned confidence a plain Python float.
        avg_data_ratio = float(np.mean(data_numeric_ratios))

        # If first row has significantly fewer numeric values, it's likely a header
        if first_ratio < avg_data_ratio - 0.3:
            confidence = min((avg_data_ratio - first_ratio) * 2, 1.0)
            return True, confidence

        # If ratios are similar, probably no header
        if abs(first_ratio - avg_data_ratio) < 0.1:
            return False, 0.7
        return True, 0.5  # Default with uncertainty

    def _is_numeric(self, value: str, decimal_sep: str = ".") -> bool:
        """Check if a string value is numeric.

        Args:
            value: Value to check.
            decimal_sep: Decimal separator.

        Returns:
            True if the stripped value parses as a float (scientific
            notation included) once ``decimal_sep`` is mapped to ``"."``.
        """
        value = value.strip()
        if not value:
            return False
        # Guard against an empty separator: str.replace("", ".") would
        # insert a dot between every character.
        if decimal_sep and decimal_sep != ".":
            value = value.replace(decimal_sep, ".")
        try:
            float(value)
        except ValueError:
            return False
        return True

    def _detect_header_unit(self, header_row: List[str]) -> Tuple[str, float]:
        """Detect the unit type from header values.

        Args:
            header_row: First row (header) values.

        Returns:
            Tuple of (unit_type, confidence); ``("text", 0.5)`` when no
            pattern matches any cell.
        """
        scores = dict.fromkeys(self.HEADER_PATTERNS, 0)
        for cell in header_row:
            cell = cell.strip()
            if not cell:
                continue
            for unit, patterns in self.HEADER_PATTERNS.items():
                # Each cell scores at most once per unit.
                if any(re.match(pattern, cell, re.IGNORECASE) for pattern in patterns):
                    scores[unit] += 1

        if not any(scores.values()):
            return "text", 0.5  # Default

        # Find best match (ties go to the unit declared first)
        best_unit = max(scores, key=scores.get)
        best_score = scores[best_unit]

        # Calculate confidence
        total = sum(scores.values())
        confidence = best_score / total if total > 0 else 0.5

        # Special case: distinguish between nm and cm-1 based on value ranges
        if best_unit in ["nm", "cm-1"]:
            numeric_values = []
            for cell in header_row:
                try:
                    numeric_values.append(float(cell.strip()))
                except ValueError:
                    continue
            if numeric_values:
                min_val = min(numeric_values)
                max_val = max(numeric_values)
                # Typical wavelength range: 350-2500 nm
                # Typical wavenumber range: 400-12500 cm-1
                if 350 <= min_val <= 2500 and max_val <= 2500:
                    return "nm", 0.8
                if 400 <= min_val <= 12500 and max_val <= 12500 and min_val > 2500:
                    return "cm-1", 0.8
        return best_unit, confidence

    def _detect_signal_type_from_header(
        self,
        header_row: List[str],
    ) -> Tuple[Optional[str], float]:
        """Detect signal type from header content.

        Patterns are searched in the space-joined header and in every
        individual cell. The per-cell pass is what lets anchored patterns
        such as ``^R$`` match a single column of a multi-column header.

        Args:
            header_row: First row (header) values.

        Returns:
            Tuple of (signal_type or None, confidence).
        """
        targets = [" ".join(header_row).lower()]
        targets.extend(cell.strip().lower() for cell in header_row if cell.strip())
        for signal_type, patterns in self.SIGNAL_TYPE_PATTERNS.items():
            for pattern in patterns:
                if any(re.search(pattern, target, re.IGNORECASE) for target in targets):
                    return signal_type, 0.8
        return None, 0.0

    def _detect_signal_type_from_values(
        self,
        data_rows: List[List[str]],
        decimal_sep: str,
    ) -> Tuple[Optional[str], float]:
        """Detect signal type from data values.

        Args:
            data_rows: Data rows (excluding header).
            decimal_sep: Decimal separator.

        Returns:
            Tuple of (signal_type or None, confidence).
        """
        # Collect numeric values
        values = []
        for row in data_rows[:20]:  # Sample first 20 rows
            for cell in row:
                text = cell.strip()
                if decimal_sep and decimal_sep != ".":
                    text = text.replace(decimal_sep, ".")
                try:
                    val = float(text)
                except ValueError:
                    continue
                # NaN would make min()/max() depend on cell order.
                if val == val:
                    values.append(val)
        if not values:
            return None, 0.0

        min_val = min(values)
        max_val = max(values)

        # Reflectance/Transmittance as a fraction: typically 0-1 range.
        # Checked before absorbance because this range lies entirely inside
        # the absorbance range and would otherwise never be reached.
        if 0 <= min_val <= 0.1 and 0.8 <= max_val <= 1.0:
            return "reflectance", 0.5
        # Absorbance: typically 0-3 range
        if 0 <= min_val <= 0.5 and 0.5 <= max_val <= 5:
            return "absorbance", 0.6
        # Reflectance/Transmittance in percent: typically 0-100 range
        if 0 <= min_val <= 10 and 80 <= max_val <= 100:
            return "reflectance%", 0.5
        return None, 0.0
def detect_file_parameters(
    source: Union[str, Path, bytes, io.StringIO],
    known_params: Optional[Dict[str, Any]] = None,
    sample_lines: int = 50,
) -> "DetectionResult":
    """Convenience function to detect file parameters.

    Builds an ``AutoDetector`` with the requested sample size and returns
    the result of its ``detect`` method.

    Args:
        source: Path to file or file content (bytes or StringIO).
        known_params: Optional known parameters.
        sample_lines: Number of lines to sample.

    Returns:
        DetectionResult with detected parameters.
    """
    detector = AutoDetector(sample_lines=sample_lines)
    return detector.detect(source, known_params)
def detect_signal_type(
    header: Optional[List[str]] = None,
    data: Optional[np.ndarray] = None,
) -> Tuple[Optional[str], float]:
    """Detect signal type from header and/or data.

    The header is tried first; the data value range is only consulted when
    the header gives no match with confidence of at least 0.6.

    Args:
        header: Optional list of header values.
        data: Optional data array. NaN entries are ignored.

    Returns:
        Tuple of (signal_type or None, confidence). ``(None, 0.0)`` is
        returned when nothing matches, including for empty or all-NaN data.
    """
    # Try header first
    if header:
        signal_type, conf = AutoDetector()._detect_signal_type_from_header(header)
        if signal_type and conf >= 0.6:
            return signal_type, conf

    # Try data values
    if data is not None:
        values = np.asarray(data, dtype=float).ravel()
        values = values[~np.isnan(values)]
        if values.size == 0:
            # np.nanmin/np.nanmax cannot reduce an empty or all-NaN array.
            return None, 0.0
        min_val = float(values.min())
        max_val = float(values.max())

        # Reflectance/Transmittance as a fraction: typically 0-1 range.
        # Checked before absorbance because this range lies entirely inside
        # the absorbance range and would otherwise never be reached.
        if 0 <= min_val <= 0.1 and 0.8 <= max_val <= 1.0:
            return "reflectance", 0.5
        # Absorbance: typically 0-3 range
        if 0 <= min_val <= 0.5 and 0.5 <= max_val <= 5:
            return "absorbance", 0.6
        # Reflectance/Transmittance in percent: typically 0-100 range
        if 0 <= min_val <= 10 and 80 <= max_val <= 100:
            return "reflectance%", 0.5
    return None, 0.0