Source code for nirs4all.data.parsers.files_parser

"""
Files parser for dataset configuration.

This parser handles the new 'files' syntax defined in the specification.
Implemented in Phase 4 to support partition assignment.

The files syntax allows specifying multiple files with column/row selection
and partition assignment within a single configuration.

Example:
    files:
      - path: data/measurements.csv
        partition: train
        columns:
          features: "2:-1"
          targets: -1
          metadata: [0, 1]

    # Or with complex partition:
    files:
      - path: data/all_data.csv
        partition:
          column: "split"
          train_values: ["train"]
          test_values: ["test"]

The sources syntax (Phase 6) allows specifying multiple feature sources
for sensor fusion or multi-instrument datasets:

Example:
    sources:
      - name: "NIR"
        files:
          - path: data/NIR_train.csv
            partition: train
          - path: data/NIR_test.csv
            partition: test
        params:
          header_unit: nm
          signal_type: absorbance
      - name: "MIR"
        train_x: data/MIR_train.csv
        test_x: data/MIR_test.csv
        params:
          header_unit: cm-1
          signal_type: absorbance

    targets:
      path: data/targets.csv
      link_by: sample_id
"""

from pathlib import Path
from typing import Any, Dict, List, Optional, Union

from .base import BaseParser, ParserResult
from ..schema import (
    DatasetConfigSchema,
    FileConfig,
    ColumnConfig,
    PartitionConfig,
    LoadingParams,
    PartitionType,
    SourceConfig,
    SourceFileConfig,
    SharedTargetsConfig,
    SharedMetadataConfig,
    VariationConfig,
    VariationFileConfig,
    PreprocessingApplied,
    VariationMode,
)


[docs] class FilesParser(BaseParser): """Parser for new 'files' syntax configuration. The files syntax provides: - Flexible column selection (by index, name, regex, range) - Row selection and filtering - Partition assignment per file or via partition config - Key-based sample linking across files """
[docs] def can_parse(self, input_data: Any) -> bool: """Check if this is a files-format configuration. Args: input_data: The input to check. Returns: True if input has 'files' key with non-empty list. """ if not isinstance(input_data, dict): return False files = input_data.get('files') if files is None: return False return isinstance(files, list) and len(files) > 0
[docs] def parse(self, input_data: Dict[str, Any]) -> ParserResult: """Parse a files-format configuration. Args: input_data: Dictionary configuration to parse. Returns: ParserResult with parsed configuration. """ files_list = input_data.get('files', []) errors = [] warnings = [] parsed_files = [] # Parse global settings name = input_data.get('name') description = input_data.get('description') task_type = input_data.get('task_type') signal_type = input_data.get('signal_type') global_params = input_data.get('global_params') # Parse global partition config (applies if file doesn't specify partition) global_partition = input_data.get('partition') # Parse each file configuration for idx, file_config in enumerate(files_list): try: parsed_file = self._parse_single_file( file_config, idx, global_partition ) parsed_files.append(parsed_file) except Exception as e: errors.append(f"Error parsing file {idx}: {e}") if not parsed_files and not errors: errors.append("No valid files found in 'files' configuration.") # Organize files by partition train_files = [] test_files = [] predict_files = [] for pf in parsed_files: partition = pf.get('_resolved_partition', 'train') if partition == 'train': train_files.append(pf) elif partition == 'test': test_files.append(pf) elif partition == 'predict': predict_files.append(pf) else: warnings.append( f"Unknown partition '{partition}' for file {pf.get('path')}, " f"defaulting to 'train'." ) train_files.append(pf) # Build config schema data config_data = {} if name: config_data['name'] = name if description: config_data['description'] = description if task_type: config_data['task_type'] = task_type if global_params: if isinstance(global_params, dict): config_data['global_params'] = LoadingParams(**global_params) else: config_data['global_params'] = global_params # Convert file lists to train_x/test_x format for now # (for backward compatibility with existing loaders) if train_files: if len(train_files) == 1: config_data['train_x'] = train_files[0].get('path') else: config_data['train_x'] = [f.get('path') for f in train_files] if test_files: if len(test_files) == 1: config_data['test_x'] = test_files[0].get('path') else: config_data['test_x'] = [f.get('path') for f in test_files] # Store parsed files for advanced processing config_data['files'] = [ self._to_file_config(pf) for pf in parsed_files ] return ParserResult( success=len(errors) == 0, config=DatasetConfigSchema(**config_data) if not errors else None, errors=errors, warnings=warnings, source_type="files" )
def _parse_single_file( self, file_config: Union[str, Dict[str, Any]], index: int, global_partition: Optional[Union[str, Dict[str, Any]]] = None, ) -> Dict[str, Any]: """Parse a single file configuration. Args: file_config: File configuration (path string or dict). index: File index in list. global_partition: Global partition config to use if file doesn't specify one. Returns: Parsed file configuration dict. """ # Handle simple string path if isinstance(file_config, str): resolved_partition = self._resolve_partition_from_path( file_config, global_partition ) return { 'path': file_config, '_resolved_partition': resolved_partition, } # Handle full dict configuration if not isinstance(file_config, dict): raise ValueError( f"File config must be string or dict, got {type(file_config)}" ) path = file_config.get('path') if not path: raise ValueError(f"File config at index {index} missing 'path' key") # Parse partition file_partition = file_config.get('partition') resolved_partition = self._resolve_partition( file_partition, path, global_partition ) # Parse columns columns = file_config.get('columns') parsed_columns = None if columns: if isinstance(columns, dict): parsed_columns = ColumnConfig(**columns) else: parsed_columns = columns # Parse params params = file_config.get('params') parsed_params = None if params: if isinstance(params, dict): parsed_params = LoadingParams(**params) else: parsed_params = params return { 'path': path, 'partition': file_partition, '_resolved_partition': resolved_partition, 'columns': parsed_columns, 'params': parsed_params, 'link_by': file_config.get('link_by'), 'rows': file_config.get('rows'), } def _resolve_partition( self, file_partition: Optional[Union[str, Dict[str, Any]]], path: str, global_partition: Optional[Union[str, Dict[str, Any]]], ) -> str: """Resolve partition assignment for a file. Priority: file_partition > global_partition > path inference. """ # If file has explicit partition if file_partition is not None: if isinstance(file_partition, str): return file_partition.lower() elif isinstance(file_partition, dict): # Dict partition means column-based or complex - defer to loader return 'mixed' elif isinstance(file_partition, PartitionType): return file_partition.value # If global partition is set if global_partition is not None: if isinstance(global_partition, str): return global_partition.lower() elif isinstance(global_partition, dict): return 'mixed' # Infer from path return self._resolve_partition_from_path(path, None) def _resolve_partition_from_path( self, path: str, fallback: Optional[str], ) -> str: """Infer partition from file path naming convention. Args: path: File path. fallback: Fallback partition if cannot infer. Returns: Partition name ('train', 'test', or 'predict'). """ path_lower = Path(path).stem.lower() # Training patterns train_patterns = ('train', 'cal', 'calibration', 'xcal', 'xtrain') for pattern in train_patterns: if pattern in path_lower: return 'train' # Test patterns test_patterns = ('test', 'val', 'validation', 'xval', 'xtest') for pattern in test_patterns: if pattern in path_lower: return 'test' # Predict patterns predict_patterns = ('predict', 'unknown', 'new') for pattern in predict_patterns: if pattern in path_lower: return 'predict' # Default to train if cannot infer return fallback if fallback else 'train' def _to_file_config(self, parsed: Dict[str, Any]) -> FileConfig: """Convert parsed file dict to FileConfig model.""" partition_type = None partition_str = parsed.get('_resolved_partition') if partition_str and partition_str != 'mixed': try: partition_type = PartitionType(partition_str) except ValueError: pass return FileConfig( path=parsed['path'], partition=partition_type, columns=parsed.get('columns'), params=parsed.get('params'), link_by=parsed.get('link_by'), )
[docs] class SourcesParser(BaseParser): """Parser for multi-source 'sources' syntax configuration. The sources syntax provides: - Named feature sources (e.g., NIR, MIR spectrometers) - Per-source loading parameters - Automatic source alignment by sample key - Shared targets and metadata across sources Example configuration: sources: - name: "NIR" files: - path: data/NIR_train.csv partition: train - path: data/NIR_test.csv partition: test params: header_unit: nm signal_type: absorbance - name: "MIR" train_x: data/MIR_train.csv test_x: data/MIR_test.csv params: header_unit: cm-1 targets: path: data/targets.csv link_by: sample_id metadata: path: data/metadata.csv link_by: sample_id """
[docs] def can_parse(self, input_data: Any) -> bool: """Check if this is a sources-format configuration. Args: input_data: The input to check. Returns: True if input has 'sources' key with non-empty list. """ if not isinstance(input_data, dict): return False sources = input_data.get('sources') if sources is None: return False return isinstance(sources, list) and len(sources) > 0
[docs] def parse(self, input_data: Dict[str, Any]) -> ParserResult: """Parse a sources-format configuration. Converts the sources syntax to a DatasetConfigSchema that can be further converted to legacy format for backward compatibility. Args: input_data: Dictionary configuration to parse. Returns: ParserResult with parsed configuration. """ sources_list = input_data.get('sources', []) errors = [] warnings = [] parsed_sources = [] # Parse global settings name = input_data.get('name') description = input_data.get('description') task_type = input_data.get('task_type') global_params = input_data.get('global_params') # Parse each source configuration for idx, source_config in enumerate(sources_list): try: parsed_source = self._parse_single_source(source_config, idx, global_params) parsed_sources.append(parsed_source) except Exception as e: errors.append(f"Error parsing source {idx}: {e}") if not parsed_sources and not errors: errors.append("No valid sources found in 'sources' configuration.") # Validate source names are unique source_names = [s.name for s in parsed_sources] if len(source_names) != len(set(source_names)): errors.append( f"Duplicate source names found: {source_names}. " f"Each source must have a unique name." ) # Parse shared targets shared_targets = None targets_config = input_data.get('targets') if targets_config: try: shared_targets = self._parse_shared_targets(targets_config) except Exception as e: errors.append(f"Error parsing targets: {e}") # Parse shared metadata shared_metadata = None metadata_config = input_data.get('metadata') if metadata_config: try: shared_metadata = self._parse_shared_metadata(metadata_config) except Exception as e: errors.append(f"Error parsing metadata: {e}") if errors: return ParserResult( success=False, errors=errors, warnings=warnings, source_type="sources" ) # Build config schema data config_data = { 'sources': parsed_sources, } if name: config_data['name'] = name if description: config_data['description'] = description if task_type: config_data['task_type'] = task_type if global_params: if isinstance(global_params, dict): config_data['global_params'] = LoadingParams(**global_params) else: config_data['global_params'] = global_params if shared_targets: config_data['shared_targets'] = shared_targets if shared_metadata: config_data['shared_metadata'] = shared_metadata # Create schema object try: schema = DatasetConfigSchema(**config_data) except Exception as e: return ParserResult( success=False, errors=[f"Failed to create config schema: {e}"], warnings=warnings, source_type="sources" ) # Extract dataset name dataset_name = name if not dataset_name: # Use first source name as dataset name if parsed_sources: dataset_name = f"multisource_{parsed_sources[0].name}" else: dataset_name = "multisource_dataset" # Add warning about multi-source warnings.append( f"Multi-source dataset with {len(parsed_sources)} source(s): " f"{[s.name for s in parsed_sources]}" ) return ParserResult( success=True, config=schema, dataset_name=dataset_name, errors=[], warnings=warnings, source_type="sources" )
def _parse_single_source( self, source_config: Dict[str, Any], index: int, global_params: Optional[Dict[str, Any]] = None, ) -> SourceConfig: """Parse a single source configuration. Args: source_config: Source configuration dict. index: Source index in list. global_params: Global parameters to merge with source params. Returns: Parsed SourceConfig. Raises: ValueError: If source configuration is invalid. """ if not isinstance(source_config, dict): raise ValueError( f"Source config must be a dict, got {type(source_config)}" ) # Name is required name = source_config.get('name') if not name: name = f"source_{index}" # Parse source params source_params = source_config.get('params') if source_params and isinstance(source_params, dict): # Merge with global params if global_params: merged = global_params.copy() merged.update(source_params) source_params = LoadingParams(**merged) else: source_params = LoadingParams(**source_params) elif global_params: source_params = LoadingParams(**global_params) else: source_params = None # Parse files list files_list = source_config.get('files') parsed_files = None if files_list: parsed_files = [] for f in files_list: if isinstance(f, str): parsed_files.append(f) elif isinstance(f, dict): parsed_files.append(SourceFileConfig(**f)) else: parsed_files.append(f) # Get direct paths train_x = source_config.get('train_x') test_x = source_config.get('test_x') # Get link_by link_by = source_config.get('link_by') return SourceConfig( name=name, files=parsed_files, train_x=train_x, test_x=test_x, params=source_params, link_by=link_by, ) def _parse_shared_targets( self, targets_config: Union[str, Dict[str, Any], List[Any]], ) -> Union[SharedTargetsConfig, List[SharedTargetsConfig]]: """Parse shared targets configuration. Args: targets_config: Targets configuration (path, dict, or list). Returns: Parsed SharedTargetsConfig or list of them. """ if isinstance(targets_config, str): return SharedTargetsConfig(path=targets_config) if isinstance(targets_config, dict): return SharedTargetsConfig(**targets_config) if isinstance(targets_config, list): return [ SharedTargetsConfig(**t) if isinstance(t, dict) else SharedTargetsConfig(path=t) if isinstance(t, str) else t for t in targets_config ] raise ValueError(f"Invalid targets config type: {type(targets_config)}") def _parse_shared_metadata( self, metadata_config: Union[str, Dict[str, Any], List[Any]], ) -> Union[SharedMetadataConfig, List[SharedMetadataConfig]]: """Parse shared metadata configuration. Args: metadata_config: Metadata configuration (path, dict, or list). Returns: Parsed SharedMetadataConfig or list of them. """ if isinstance(metadata_config, str): return SharedMetadataConfig(path=metadata_config) if isinstance(metadata_config, dict): return SharedMetadataConfig(**metadata_config) if isinstance(metadata_config, list): return [ SharedMetadataConfig(**m) if isinstance(m, dict) else SharedMetadataConfig(path=m) if isinstance(m, str) else m for m in metadata_config ] raise ValueError(f"Invalid metadata config type: {type(metadata_config)}")
[docs] class VariationsParser(BaseParser): """Parser for feature variations 'variations' syntax configuration. The variations syntax provides: - Named feature variations (e.g., raw, snv, derivative) - Per-variation loading parameters - Preprocessing provenance tracking - Multiple variation modes (separate, concat, select, compare) Example configuration: variations: - name: "raw" files: - path: data/spectra_raw.csv partition: train - path: data/spectra_raw_test.csv partition: test - name: "snv" description: "SNV preprocessed spectra" preprocessing_applied: - type: "SNV" software: "OPUS 8.0" train_x: data/spectra_snv_train.csv test_x: data/spectra_snv_test.csv variation_mode: separate # or concat, select, compare variation_select: ["raw", "snv"] # only for mode=select targets: path: data/targets.csv link_by: sample_id """
[docs] def can_parse(self, input_data: Any) -> bool: """Check if this is a variations-format configuration. Args: input_data: The input to check. Returns: True if input has 'variations' key with non-empty list. """ if not isinstance(input_data, dict): return False variations = input_data.get('variations') if variations is None: return False return isinstance(variations, list) and len(variations) > 0
[docs] def parse(self, input_data: Dict[str, Any]) -> ParserResult: """Parse a variations-format configuration. Converts the variations syntax to a DatasetConfigSchema that can be further converted to legacy format for backward compatibility. Args: input_data: Dictionary configuration to parse. Returns: ParserResult with parsed configuration. """ variations_list = input_data.get('variations', []) errors = [] warnings = [] parsed_variations = [] # Parse global settings name = input_data.get('name') description = input_data.get('description') task_type = input_data.get('task_type') signal_type = input_data.get('signal_type') global_params = input_data.get('global_params') variation_mode = input_data.get('variation_mode', 'separate') variation_select = input_data.get('variation_select') variation_prefix = input_data.get('variation_prefix') # Parse each variation configuration for idx, variation_config in enumerate(variations_list): try: parsed_variation = self._parse_single_variation( variation_config, idx, global_params ) parsed_variations.append(parsed_variation) except Exception as e: errors.append(f"Error parsing variation {idx}: {e}") if not parsed_variations and not errors: errors.append("No valid variations found in 'variations' configuration.") # Validate variation names are unique variation_names = [v.name for v in parsed_variations] if len(variation_names) != len(set(variation_names)): errors.append( f"Duplicate variation names found: {variation_names}. " f"Each variation must have a unique name." ) # Parse shared targets shared_targets = None targets_config = input_data.get('targets') if targets_config: try: shared_targets = self._parse_shared_targets(targets_config) except Exception as e: errors.append(f"Error parsing targets: {e}") # Parse shared metadata shared_metadata = None metadata_config = input_data.get('metadata') if metadata_config: try: shared_metadata = self._parse_shared_metadata(metadata_config) except Exception as e: errors.append(f"Error parsing metadata: {e}") if errors: return ParserResult( success=False, errors=errors, warnings=warnings, source_type="variations" ) # Build config schema data config_data = { 'variations': parsed_variations, } if name: config_data['name'] = name if description: config_data['description'] = description if task_type: config_data['task_type'] = task_type if signal_type: config_data['signal_type'] = signal_type if global_params: if isinstance(global_params, dict): config_data['global_params'] = LoadingParams(**global_params) else: config_data['global_params'] = global_params if variation_mode: config_data['variation_mode'] = variation_mode if variation_select: config_data['variation_select'] = variation_select if variation_prefix is not None: config_data['variation_prefix'] = variation_prefix if shared_targets: config_data['shared_targets'] = shared_targets if shared_metadata: config_data['shared_metadata'] = shared_metadata # Create schema object try: schema = DatasetConfigSchema(**config_data) except Exception as e: return ParserResult( success=False, errors=[f"Failed to create config schema: {e}"], warnings=warnings, source_type="variations" ) # Extract dataset name dataset_name = name if not dataset_name: # Use first variation name as dataset name if parsed_variations: dataset_name = f"variations_{parsed_variations[0].name}" else: dataset_name = "variations_dataset" # Add info about variations mode_str = variation_mode if isinstance(variation_mode, str) else variation_mode.value warnings.append( f"Variation dataset with {len(parsed_variations)} variation(s): " f"{[v.name for v in parsed_variations]}, mode: {mode_str}" ) return ParserResult( success=True, config=schema, dataset_name=dataset_name, errors=[], warnings=warnings, source_type="variations" )
def _parse_single_variation( self, variation_config: Dict[str, Any], index: int, global_params: Optional[Dict[str, Any]] = None, ) -> VariationConfig: """Parse a single variation configuration. Args: variation_config: Variation configuration dict. index: Variation index in list. global_params: Global parameters to merge with variation params. Returns: Parsed VariationConfig. Raises: ValueError: If variation configuration is invalid. """ if not isinstance(variation_config, dict): raise ValueError( f"Variation config must be a dict, got {type(variation_config)}" ) # Name is required name = variation_config.get('name') if not name: name = f"variation_{index}" # Description is optional description = variation_config.get('description') # Parse variation params variation_params = variation_config.get('params') if variation_params and isinstance(variation_params, dict): # Merge with global params if global_params: merged = global_params.copy() merged.update(variation_params) variation_params = LoadingParams(**merged) else: variation_params = LoadingParams(**variation_params) elif global_params: variation_params = LoadingParams(**global_params) else: variation_params = None # Parse files list files_list = variation_config.get('files') parsed_files = None if files_list: parsed_files = [] for f in files_list: if isinstance(f, str): parsed_files.append(f) elif isinstance(f, dict): parsed_files.append(VariationFileConfig(**f)) else: parsed_files.append(f) # Get direct paths train_x = variation_config.get('train_x') test_x = variation_config.get('test_x') # Parse preprocessing_applied preprocessing_applied = None preprocessing_list = variation_config.get('preprocessing_applied') if preprocessing_list: preprocessing_applied = [] for p in preprocessing_list: if isinstance(p, dict): preprocessing_applied.append(PreprocessingApplied(**p)) elif isinstance(p, PreprocessingApplied): preprocessing_applied.append(p) return VariationConfig( name=name, description=description, files=parsed_files, train_x=train_x, test_x=test_x, params=variation_params, preprocessing_applied=preprocessing_applied, ) def _parse_shared_targets( self, targets_config: Union[str, Dict[str, Any], List[Any]], ) -> Union[SharedTargetsConfig, List[SharedTargetsConfig]]: """Parse shared targets configuration. Args: targets_config: Targets configuration (path, dict, or list). Returns: Parsed SharedTargetsConfig or list of them. """ if isinstance(targets_config, str): return SharedTargetsConfig(path=targets_config) if isinstance(targets_config, dict): return SharedTargetsConfig(**targets_config) if isinstance(targets_config, list): return [ SharedTargetsConfig(**t) if isinstance(t, dict) else SharedTargetsConfig(path=t) if isinstance(t, str) else t for t in targets_config ] raise ValueError(f"Invalid targets config type: {type(targets_config)}") def _parse_shared_metadata( self, metadata_config: Union[str, Dict[str, Any], List[Any]], ) -> Union[SharedMetadataConfig, List[SharedMetadataConfig]]: """Parse shared metadata configuration. Args: metadata_config: Metadata configuration (path, dict, or list). Returns: Parsed SharedMetadataConfig or list of them. """ if isinstance(metadata_config, str): return SharedMetadataConfig(path=metadata_config) if isinstance(metadata_config, dict): return SharedMetadataConfig(**metadata_config) if isinstance(metadata_config, list): return [ SharedMetadataConfig(**m) if isinstance(m, dict) else SharedMetadataConfig(path=m) if isinstance(m, str) else m for m in metadata_config ] raise ValueError(f"Invalid metadata config type: {type(metadata_config)}")