Source code for nirs4all.cli.commands.dataset

"""
Dataset CLI commands for nirs4all.

Provides commands for dataset validation, inspection, and diagnostics.

Phase 8 Implementation - Dataset Configuration Roadmap
Section 8.4: Error Handling & Diagnostics - Validation CLI
"""

import json
import sys
from pathlib import Path
from typing import Optional

from nirs4all.core.logging import get_logger

logger = get_logger(__name__)


[docs] def dataset_validate(args): """Validate a dataset configuration file.""" from nirs4all.data.schema.validation import ( ConfigValidator, DiagnosticReport, DiagnosticBuilder, ErrorRegistry, ) from nirs4all.data.parsers.normalizer import ConfigNormalizer config_path = args.config_file verbose = args.verbose check_files = args.check_files output_format = args.format # Check file exists first if not Path(config_path).exists(): if output_format == "json": print(json.dumps({"error": f"File not found: {config_path}", "is_valid": False})) else: print(f"Error: Configuration file not found: {config_path}") sys.exit(1) # Create diagnostic report report = DiagnosticReport(config_path=config_path) builder = DiagnosticBuilder() try: # Load and normalize config normalizer = ConfigNormalizer() config, dataset_name = normalizer.normalize(config_path) if config is None: report.add(builder.create( ErrorRegistry.E100, details="Could not parse configuration file" )) else: # Validate configuration validator = ConfigValidator(check_file_existence=check_files) result = validator.validate(config) # Convert validation errors/warnings to diagnostic messages for error in result.errors: report.add(DiagnosticBuilder().create( ErrorRegistry.E100, details=str(error), location=config_path )) for warning in result.warnings: report.add(DiagnosticBuilder().create( ErrorRegistry.E204, path=config_path, encoding="", location=config_path )) if verbose and config: # Add info about detected configuration print(f"\nDataset name: {dataset_name}") if "train_x" in config: print(f"Training features: {config['train_x']}") if "test_x" in config: print(f"Test features: {config['test_x']}") if "task_type" in config: print(f"Task type: {config['task_type']}") except FileNotFoundError as e: report.add(builder.file_not_found(str(e))) except ValueError as e: report.add(builder.create( ErrorRegistry.E100, details=str(e), location=config_path )) except Exception as e: report.add(builder.create( ErrorRegistry.E900, error=str(e), location=config_path )) if verbose: import traceback traceback.print_exc() # Output results if output_format == "json": print(json.dumps(report.to_dict(), indent=2)) else: print(report) # Exit with appropriate code sys.exit(0 if report.is_valid else 1)
[docs] def dataset_inspect(args): """Inspect a dataset configuration and show details.""" from nirs4all.data.parsers.normalizer import ConfigNormalizer from nirs4all.data.detection import detect_file_parameters config_path = args.config_file detect_params = args.detect # Check file exists if not Path(config_path).exists(): print(f"Error: File not found: {config_path}") sys.exit(1) # Load configuration normalizer = ConfigNormalizer() config, dataset_name = normalizer.normalize(config_path) if config is None: print(f"Error: Could not load configuration from {config_path}") sys.exit(1) print(f"Dataset Configuration: {config_path}") print("=" * 60) print(f"\nDataset Name: {dataset_name}") # Show data sources print("\nData Sources:") for key in ["train_x", "train_y", "train_group", "test_x", "test_y", "test_group"]: if key in config and config[key]: value = config[key] if isinstance(value, list): print(f" {key}:") for i, v in enumerate(value): print(f" [{i}] {v}") else: print(f" {key}: {value}") # Show parameters if "global_params" in config and config["global_params"]: print("\nGlobal Parameters:") for key, value in config["global_params"].items(): print(f" {key}: {value}") # Task and aggregation if "task_type" in config: print(f"\nTask Type: {config['task_type']}") if "aggregate" in config: print(f"Aggregation: {config['aggregate']}") if "aggregate_method" in config: print(f"Aggregation Method: {config['aggregate_method']}") # Auto-detect file parameters if requested if detect_params: print("\nAuto-detected Parameters:") train_x = config.get("train_x") if train_x: if isinstance(train_x, list): train_x = train_x[0] if isinstance(train_x, str) and Path(train_x).exists(): result = detect_file_parameters(train_x) print(f" Delimiter: '{result.delimiter}' (confidence: {result.confidence.get('delimiter', 0):.2f})") print(f" Decimal separator: '{result.decimal_separator}' (confidence: {result.confidence.get('decimal_separator', 0):.2f})") print(f" Has header: {result.has_header} (confidence: {result.confidence.get('has_header', 0):.2f})") print(f" Header unit: {result.header_unit} (confidence: {result.confidence.get('header_unit', 0):.2f})") if result.signal_type: print(f" Signal type: {result.signal_type} (confidence: {result.confidence.get('signal_type', 0):.2f})") if result.warnings: print(" Warnings:") for warning in result.warnings: print(f" - {warning}")
[docs] def dataset_export(args): """Export dataset configuration to normalized format.""" from nirs4all.data.parsers.normalizer import ConfigNormalizer from nirs4all.data.serialization import ConfigSerializer, SerializationFormat config_path = args.config_file output_path = args.output output_format = args.format # Load configuration normalizer = ConfigNormalizer() config, dataset_name = normalizer.normalize(config_path) if config is None: print(f"Error: Could not load configuration from {config_path}") sys.exit(1) # Serialize serializer = ConfigSerializer(normalize=True) fmt = SerializationFormat.YAML if output_format == "yaml" else SerializationFormat.JSON if output_path: serializer.save(config, output_path, format=fmt) print(f"Exported to: {output_path}") else: if fmt == SerializationFormat.YAML: print(serializer.to_yaml(config)) else: print(serializer.to_json(config))
[docs] def dataset_diff(args): """Compare two dataset configurations.""" from nirs4all.data.parsers.normalizer import ConfigNormalizer from nirs4all.data.serialization import diff_configs config1_path = args.config1 config2_path = args.config2 output_format = args.format # Load configurations normalizer = ConfigNormalizer() config1, name1 = normalizer.normalize(config1_path) config2, name2 = normalizer.normalize(config2_path) if config1 is None: print(f"Error: Could not load configuration from {config1_path}") sys.exit(1) if config2 is None: print(f"Error: Could not load configuration from {config2_path}") sys.exit(1) # Diff diff = diff_configs(config1, config2) if output_format == "json": result = { "config1": config1_path, "config2": config2_path, "is_identical": diff.is_identical(), "added": list(diff.added.keys()), "removed": list(diff.removed.keys()), "changed": list(diff.changed.keys()), } print(json.dumps(result, indent=2)) else: print(f"Comparing: {config1_path} vs {config2_path}") print("=" * 60) print(diff)
[docs] def add_dataset_commands(subparsers): """Add dataset commands to CLI.""" # Dataset command group dataset_parser = subparsers.add_parser( 'dataset', help='Dataset configuration commands' ) dataset_subparsers = dataset_parser.add_subparsers(dest='dataset_command') # dataset validate validate_parser = dataset_subparsers.add_parser( 'validate', help='Validate a dataset configuration file' ) validate_parser.add_argument( 'config_file', type=str, help='Path to the dataset configuration file (JSON, YAML, or folder path)' ) validate_parser.add_argument( '--check-files', action='store_true', default=True, help='Check that referenced data files exist (default: true)' ) validate_parser.add_argument( '--no-check-files', action='store_false', dest='check_files', help='Do not check if referenced data files exist' ) validate_parser.add_argument( '-v', '--verbose', action='store_true', help='Show detailed validation information' ) validate_parser.add_argument( '-f', '--format', type=str, choices=['text', 'json'], default='text', help='Output format (default: text)' ) validate_parser.set_defaults(func=dataset_validate) # dataset inspect inspect_parser = dataset_subparsers.add_parser( 'inspect', help='Inspect a dataset configuration' ) inspect_parser.add_argument( 'config_file', type=str, help='Path to the dataset configuration file' ) inspect_parser.add_argument( '--detect', action='store_true', help='Auto-detect file parameters (delimiter, encoding, etc.)' ) inspect_parser.set_defaults(func=dataset_inspect) # dataset export export_parser = dataset_subparsers.add_parser( 'export', help='Export dataset configuration to normalized format' ) export_parser.add_argument( 'config_file', type=str, help='Path to the dataset configuration file' ) export_parser.add_argument( '-o', '--output', type=str, help='Output file path (prints to stdout if not specified)' ) export_parser.add_argument( '-f', '--format', type=str, choices=['yaml', 'json'], default='yaml', help='Output format (default: yaml)' ) export_parser.set_defaults(func=dataset_export) # dataset diff diff_parser = dataset_subparsers.add_parser( 'diff', help='Compare two dataset configurations' ) diff_parser.add_argument( 'config1', type=str, help='Path to first configuration file' ) diff_parser.add_argument( 'config2', type=str, help='Path to second configuration file' ) diff_parser.add_argument( '-f', '--format', type=str, choices=['text', 'json'], default='text', help='Output format (default: text)' ) diff_parser.set_defaults(func=dataset_diff)