Source code for nirs4all.data.parsers.folder_parser

"""
Folder parser for dataset configuration.

This parser handles folder paths, scanning for data files matching
standard naming conventions (Xcal, Xval, etc.).
"""

from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from .base import BaseParser, ParserResult


# File naming patterns for auto-detection
FILE_PATTERNS = {
    "train_x": [
        "Xcal", "X_cal", "Cal_X", "calX",
        "train_X", "trainX", "X_train", "Xtrain"
    ],
    "test_x": [
        "Xval", "X_val", "val_X", "valX",
        "Xtest", "X_test", "test_X", "testX"
    ],
    "train_y": [
        "Ycal", "Y_cal", "Cal_Y", "calY",
        "train_Y", "trainY", "Y_train", "Ytrain"
    ],
    "test_y": [
        "Ytest", "Y_test", "test_Y", "testY",
        "Yval", "Y_val", "val_Y", "valY"
    ],
    "train_group": [
        "Mcal", "M_cal", "Cal_M", "calM",
        "train_M", "trainM", "M_train", "Mtrain",
        "Metacal", "Meta_cal", "Cal_Meta", "calMeta",
        "train_Meta", "trainMeta", "Meta_train", "Metatrain",
        "metadatacal", "metadata_cal", "Cal_metadata", "calMetadata",
        "train_metadata", "trainMetadata", "metadata_train", "metadatatrain"
    ],
    "test_group": [
        "Mtest", "M_test", "test_M", "testM",
        "Mval", "M_val", "val_M", "valM",
        "Metatest", "Meta_test", "test_Meta", "testMeta",
        "Metaval", "Meta_val", "val_Meta", "valMeta",
        "metadatatest", "metadata_test", "test_metadata", "testMetadata",
        "metadataval", "metadata_val", "val_metadata", "valMetadata"
    ],
}


[docs] class FolderParser(BaseParser): """Parser for folder-based dataset configuration. This parser scans a folder for data files matching standard naming conventions and creates a configuration dictionary. Supported file formats: - CSV files (.csv) - Compressed CSV files (.csv.gz, .csv.zip) Multi-source detection: - If multiple files match the same pattern (e.g., Xcal_NIR.csv, Xcal_MIR.csv), they are treated as multi-source data. """ # Supported file extensions SUPPORTED_EXTENSIONS = {'.csv', '.csv.gz', '.csv.zip', '.gz', '.zip'}
[docs] def can_parse(self, input_data: Any) -> bool: """Check if input is a folder path. Args: input_data: The input to check. Returns: True if input is a string path to an existing directory. """ if isinstance(input_data, str): # Check if it looks like a file path (has JSON/YAML extension) lower_path = input_data.lower() if lower_path.endswith(('.json', '.yaml', '.yml')): return False # Check if it's a directory return Path(input_data).is_dir() if isinstance(input_data, Path): return input_data.is_dir() # Also handle dict with 'folder' key if isinstance(input_data, dict): return 'folder' in input_data return False
[docs] def parse(self, input_data: Any) -> ParserResult: """Parse a folder path into a configuration. Args: input_data: Folder path (str, Path) or dict with 'folder' key. Returns: ParserResult with configuration from scanned files. """ # Extract folder path and optional params if isinstance(input_data, dict): folder_path = input_data.get('folder') global_params = input_data.get('params') or input_data.get('global_params') else: folder_path = input_data global_params = None if folder_path is None: return ParserResult( success=False, errors=["No folder path provided"], source_type="folder" ) path = Path(folder_path) if not path.exists(): return ParserResult( success=False, errors=[f"Folder does not exist: {folder_path}"], source_type="folder" ) if not path.is_dir(): return ParserResult( success=False, errors=[f"Path is not a directory: {folder_path}"], source_type="folder" ) # Scan folder for data files config, warnings = self._scan_folder(path, global_params) # Check if any data was found has_train = config.get('train_x') is not None has_test = config.get('test_x') is not None if not has_train and not has_test: return ParserResult( success=False, errors=[ f"No data files found in folder: {folder_path}. " "Expected files matching patterns like Xcal.csv, Xval.csv, etc." ], source_type="folder" ) # Extract dataset name dataset_name = self._extract_name_from_path(path) return ParserResult( success=True, config=config, dataset_name=dataset_name, warnings=warnings, source_type="folder" )
def _scan_folder( self, folder: Path, global_params: Optional[Dict[str, Any]] = None ) -> Tuple[Dict[str, Any], List[str]]: """Scan folder for data files. Args: folder: Path to folder to scan. global_params: Optional global loading parameters. Returns: Tuple of (config_dict, warnings_list). """ config = { "train_x": None, "train_x_filter": None, "train_x_params": None, "train_y": None, "train_y_filter": None, "train_y_params": None, "train_group": None, "train_group_filter": None, "train_group_params": None, "train_params": None, "test_x": None, "test_x_filter": None, "test_x_params": None, "test_y": None, "test_y_filter": None, "test_y_params": None, "test_group": None, "test_group_filter": None, "test_group_params": None, "test_params": None, "global_params": global_params } warnings = [] # Get all files in folder all_files = list(folder.glob("*")) # Match files to patterns for key, patterns in FILE_PATTERNS.items(): matched_files = [] for pattern in patterns: pattern_lower = pattern.lower() for file_path in all_files: if not file_path.is_file(): continue # Check if pattern matches (case-insensitive) if pattern_lower in file_path.name.lower(): # Check file extension if self._has_supported_extension(file_path): file_posix = file_path.as_posix() if file_posix not in matched_files: matched_files.append(file_posix) # Assign matches to config if len(matched_files) == 1: config[key] = matched_files[0] elif len(matched_files) > 1: # Multi-source detected warnings.append( f"Multiple files matched for {key}: {len(matched_files)} sources detected." ) config[key] = matched_files return config, warnings def _has_supported_extension(self, path: Path) -> bool: """Check if file has a supported extension. Args: path: Path to check. Returns: True if file extension is supported. """ name = path.name.lower() # Check for compound extensions first if name.endswith('.csv.gz') or name.endswith('.csv.zip'): return True # Check simple extensions return path.suffix.lower() in self.SUPPORTED_EXTENSIONS