"""
Configuration validation with JSON Schema for nirs4all.
Provides validation functions for pipeline and dataset configuration files,
with detailed error messages including line numbers and suggestions.
"""
import json
from pathlib import Path
from typing import Tuple, List, Any, Dict, Optional, Union
import yaml
from nirs4all.core.logging import get_logger
logger = get_logger(__name__)
[docs]
class ConfigValidationError(Exception):
    """Error signalling that a configuration did not pass validation.

    The exception text is ``"Configuration validation failed"``, followed by
    ``" for <config_path>"`` when a path is supplied, then ``": "`` and all
    error messages joined with ``"; "``.

    Attributes:
        errors: List of validation error messages.
        config_path: Path to the configuration file (if any).
    """

    def __init__(self, errors: List[str], config_path: Optional[str] = None):
        self.errors = errors
        self.config_path = config_path
        # The location fragment is only present for a truthy path.
        location = f" for {config_path}" if config_path else ""
        details = '; '.join(errors)
        super().__init__(f"Configuration validation failed{location}: {details}")
# =============================================================================
# JSON Schemas for configuration validation
# =============================================================================
# Schema for component specification (sklearn, TF, PyTorch, JAX objects)
COMPONENT_SCHEMA = {
    # A component is an object naming either a class or a function by its
    # dotted path, with optional constructor parameters.
    "type": "object",
    "properties": {
        "class": {"type": "string", "description": "Full module path to the class"},
        "params": {"type": "object", "description": "Constructor parameters"},
        "function": {"type": "string", "description": "Full module path to a function"},
    },
    # At least one of "class" / "function" must be present.
    "anyOf": [
        {"required": ["class"]},
        {"required": ["function"]},
    ],
}

# Schema for a pipeline step
STEP_SCHEMA = {
    # The alternatives below point at "#/$defs/component". That JSON pointer is
    # resolved against the root of the schema being validated, so the target
    # has to exist here; without it every "$ref" below is unresolvable.
    "$defs": {
        "component": COMPONENT_SCHEMA,
    },
    # "oneOf": a step must match exactly one of these shapes.
    "oneOf": [
        # Simple component: {"class": "...", "params": {...}}
        COMPONENT_SCHEMA,
        # Model step: {"model": {...}}
        {
            "type": "object",
            "properties": {
                "model": {"$ref": "#/$defs/component"},
                "name": {"type": "string"},
                "finetune_params": {"type": "object"},
            },
            "required": ["model"],
        },
        # Preprocessing step: {"preprocessing": {...}}
        {
            "type": "object",
            "properties": {
                "preprocessing": {"$ref": "#/$defs/component"},
            },
            "required": ["preprocessing"],
        },
        # Y processing step: {"y_processing": {...}}
        {
            "type": "object",
            "properties": {
                "y_processing": {"$ref": "#/$defs/component"},
            },
            "required": ["y_processing"],
        },
        # Feature augmentation: {"feature_augmentation": {...}}
        {
            "type": "object",
            "properties": {
                "feature_augmentation": {"type": "object"},
            },
            "required": ["feature_augmentation"],
        },
        # Generator syntax: {"_or_": [...], ...}
        {
            "type": "object",
            "properties": {
                "_or_": {"type": "array"},
                "count": {"type": "integer", "minimum": 1},
            },
            "required": ["_or_"],
        },
        # Range generator: {"_range_": [...], ...}
        {
            "type": "object",
            "properties": {
                "_range_": {"type": "array", "minItems": 2, "maxItems": 3},
                "param": {"type": "string"},
            },
            "required": ["_range_"],
        },
        # Branch step: {"branch": {...}}
        {
            "type": "object",
            "properties": {
                "branch": {"type": "object"},
            },
            "required": ["branch"],
        },
    ],
}
# Schema for pipeline configuration file
PIPELINE_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "nirs4all Pipeline Configuration",
    "description": "Schema for nirs4all pipeline configuration files",
    "type": "object",
    "properties": {
        # The only mandatory key: a non-empty array whose items are objects.
        "pipeline": {
            "type": "array",
            "description": "List of pipeline steps",
            "items": {
                "type": "object",
                "description": "A pipeline step (preprocessing, model, splitter, etc.)",
            },
            "minItems": 1,
        },
        # Optional free-text metadata.
        "name": {"type": "string", "description": "Pipeline name"},
        "description": {"type": "string", "description": "Pipeline description"},
    },
    "required": ["pipeline"],
}
# Schema for dataset configuration file
DATASET_SCHEMA = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "nirs4all Dataset Configuration",
    "description": "Schema for nirs4all dataset configuration files",
    "type": "object",
    "properties": {
        # Data file paths: each accepts one path or a list of paths.
        # train_x is spelled out because its list form carries a description.
        "train_x": {
            "oneOf": [
                {"type": "string", "description": "Path to training features file"},
                {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Paths to multiple training feature files (multi-source)",
                },
            ]
        },
        **{
            key: {
                "oneOf": [
                    {"type": "string", "description": text},
                    {"type": "array", "items": {"type": "string"}},
                ]
            }
            for key, text in (
                ("train_y", "Path to training targets file"),
                ("test_x", "Path to test/validation features file"),
                ("test_y", "Path to test/validation targets file"),
                ("train_group", "Path to training metadata file"),
                ("test_group", "Path to test/validation metadata file"),
            )
        },
        # Dataset metadata
        "name": {"type": "string", "description": "Dataset name"},
        "task_type": {
            "type": "string",
            "enum": ["regression", "binary_classification", "multiclass_classification", "auto"],
            "description": "Type of ML task",
        },
        "signal_type": {
            "type": "string",
            "enum": ["absorbance", "reflectance", "reflectance%", "transmittance", "transmittance%", "auto"],
            "description": "Type of spectral signal",
        },
        # Aggregation settings
        "aggregate": {
            "oneOf": [
                {"type": "string", "description": "Metadata column name for aggregation"},
                {"type": "boolean", "description": "True to aggregate by y values"},
            ]
        },
        "aggregate_method": {
            "type": "string",
            "enum": ["mean", "median", "vote"],
            "description": "Method for aggregating predictions",
        },
        "aggregate_exclude_outliers": {
            "type": "boolean",
            "description": "Whether to exclude outliers before aggregation",
        },
        # Loading parameters
        "global_params": {
            "type": "object",
            "properties": {
                "header_unit": {
                    "type": "string",
                    "enum": ["cm-1", "nm", "none", "text", "index"],
                    "description": "Unit of wavelength headers",
                },
                "signal_type": {"type": "string"},
                "delimiter": {"type": "string"},
                "decimal_separator": {"type": "string"},
                "has_header": {"type": "boolean"},
                "na_policy": {"type": "string", "enum": ["drop", "fill", "error"]},
            },
        },
        # Per-file loading overrides: "<role>_params" objects.
        **{
            f"{role}_params": {
                "type": "object",
                "description": f"Override parameters for {role} loading",
            }
            for role in ("train_x", "train_y", "test_x", "test_y", "train_group", "test_group")
        },
        # Filters: "<role>_filter" strings.
        **{
            f"{role}_filter": {
                "type": "string",
                "description": f"Filter expression for {role}",
            }
            for role in ("train_x", "train_y", "test_x", "test_y")
        },
    },
    # A dataset config must name at least one data source.
    "anyOf": [
        {"required": ["train_x"]},
        {"required": ["test_x"]},
        {"required": ["folder"]},
    ],
}
# =============================================================================
# Validation Functions
# =============================================================================
def _load_config_content(config_path: str) -> Tuple[Dict[str, Any], str]:
    """Load and parse a configuration file.

    The format is chosen from the file extension: ``.json`` is parsed with
    ``json.loads``, ``.yaml``/``.yml`` with ``yaml.safe_load``.

    Args:
        config_path: Path to JSON or YAML configuration file.

    Returns:
        Tuple of (parsed_config, file_format), where ``file_format`` is the
        lower-cased file suffix (``'.json'``, ``'.yaml'`` or ``'.yml'``).

    Raises:
        FileNotFoundError: If file doesn't exist.
        ValueError: If the path is not a file, the extension is unsupported,
            the file cannot be read or decoded, is empty, is invalid
            JSON/YAML, or its top-level value is not a mapping.
    """
    path = Path(config_path)
    if not path.exists():
        raise FileNotFoundError(f"Configuration file not found: {config_path}")
    if not path.is_file():
        raise ValueError(f"Path is not a file: {config_path}")

    suffix = path.suffix.lower()
    if suffix not in ('.json', '.yaml', '.yml'):
        raise ValueError(
            f"Unsupported file format: {suffix}\n"
            f"Expected .json, .yaml, or .yml"
        )

    # Only the read itself can fail with an OS error, so the try is kept tight.
    try:
        # 'utf-8-sig' decodes plain UTF-8 and also drops a leading byte-order
        # mark; with plain 'utf-8' the BOM reaches json.loads, which rejects it.
        with open(path, 'r', encoding='utf-8-sig') as f:
            content = f.read()
    except UnicodeDecodeError as exc:
        # Still a ValueError for callers, but now names the offending file.
        raise ValueError(
            f"Error reading file {config_path}: file is not valid UTF-8 ({exc})"
        ) from exc
    except (IOError, OSError) as exc:
        raise ValueError(f"Error reading file {config_path}: {exc}") from exc

    if not content.strip():
        raise ValueError(f"Configuration file is empty: {config_path}")

    if suffix == '.json':
        try:
            config = json.loads(content)
        except json.JSONDecodeError as exc:
            raise ValueError(
                f"Invalid JSON in {config_path}\n"
                f"Error at line {exc.lineno}, column {exc.colno}:\n"
                f" {exc.msg}"
            ) from exc
    else:
        try:
            config = yaml.safe_load(content)
        except yaml.YAMLError as exc:
            # Not every YAMLError carries a position; report it when present.
            mark = getattr(exc, 'problem_mark', None)
            if mark:
                raise ValueError(
                    f"Invalid YAML in {config_path}\n"
                    f"Error at line {mark.line + 1}, column {mark.column + 1}:\n"
                    f" {getattr(exc, 'problem', 'Unknown error')}"
                ) from exc
            raise ValueError(f"Invalid YAML in {config_path}: {exc}") from exc

    # A document such as "null" or a comments-only YAML file parses to None.
    if config is None:
        raise ValueError(f"Configuration file is empty or null: {config_path}")
    if not isinstance(config, dict):
        raise ValueError(
            f"Configuration must be a dictionary/object.\n"
            f"Got: {type(config).__name__}"
        )
    return config, suffix
def _validate_against_schema(
    config: Dict[str, Any],
    schema: Dict[str, Any],
    config_type: str
) -> List[str]:
    """Validate config against JSON schema.

    Uses ``jsonschema.Draft7Validator`` when the ``jsonschema`` package can be
    imported. Otherwise a warning is logged and the reduced checks of
    ``_basic_validate`` are used instead.

    Args:
        config: Configuration dictionary.
        schema: JSON schema to validate against.
        config_type: Type name for error messages ('pipeline' or 'dataset').

    Returns:
        List of validation error messages (empty if valid), ordered by the
        location of the offending value in the configuration.
    """
    # Only the import is guarded, so an ImportError cannot be confused with a
    # failure that happens during validation itself.
    try:
        from jsonschema import Draft7Validator
    except ImportError:
        # jsonschema not installed - do basic validation
        logger.warning("jsonschema not installed. Using basic validation only.")
        return list(_basic_validate(config, config_type))

    def _location_key(error):
        # Array indices compare as numbers (item 2 before item 10) and keys as
        # text. The leading tag keeps ints and strings from being compared
        # with each other when they share a position.
        return [
            (0, part, "") if isinstance(part, int) else (1, 0, str(part))
            for part in error.path
        ]

    validator = Draft7Validator(schema)
    errors = []
    for error in sorted(validator.iter_errors(config), key=_location_key):
        path = ".".join(str(p) for p in error.path) if error.path else "(root)"
        errors.append(f"At '{path}': {error.message}")
    return errors
def _basic_validate(config: Dict[str, Any], config_type: str) -> List[str]:
    """Run the reduced checks used when jsonschema is unavailable.

    For ``'pipeline'`` the ``'pipeline'`` key must exist and hold a non-empty
    list. Any other ``config_type`` is handled as a dataset: one of
    ``'train_x'``, ``'test_x'`` or ``'folder'`` must be present, and
    ``'task_type'`` / ``'signal_type'``, when given, must be known values.

    Args:
        config: Configuration dictionary.
        config_type: 'pipeline' or 'dataset'.

    Returns:
        List of validation error messages.
    """
    if config_type == 'pipeline':
        # The pipeline checks are mutually exclusive: at most one message.
        if 'pipeline' not in config:
            return ["Missing required key: 'pipeline'"]
        steps = config['pipeline']
        if not isinstance(steps, list):
            return ["'pipeline' must be a list of steps"]
        if len(steps) == 0:
            return ["'pipeline' list cannot be empty"]
        return []

    # dataset
    problems: List[str] = []
    if not any(source in config for source in ['train_x', 'test_x', 'folder']):
        problems.append("Missing data source: need 'train_x', 'test_x', or 'folder'")

    # Validate task_type if present
    valid_task_types = ['regression', 'binary_classification', 'multiclass_classification', 'auto']
    if 'task_type' in config and config['task_type'] not in valid_task_types:
        problems.append(
            f"Invalid task_type: '{config['task_type']}'. "
            f"Valid values: {valid_task_types}"
        )

    # Validate signal_type if present
    valid_signal_types = ['absorbance', 'reflectance', 'reflectance%', 'transmittance', 'transmittance%', 'auto']
    if 'signal_type' in config and config['signal_type'] not in valid_signal_types:
        problems.append(
            f"Invalid signal_type: '{config['signal_type']}'. "
            f"Valid values: {valid_signal_types}"
        )
    return problems
def _check_file_paths(config: Dict[str, Any], base_path: Optional[Path] = None) -> List[str]:
    """Report data files named in a dataset config that are not on disk.

    Looks at ``train_x``, ``train_y``, ``test_x``, ``test_y``, ``train_group``
    and ``test_group``. Each may hold a single path or a list of paths;
    entries that are not strings are ignored.

    Args:
        config: Dataset configuration dictionary.
        base_path: Base path for resolving relative paths.

    Returns:
        List of warning messages for missing files.
    """
    missing: List[str] = []
    for key in ('train_x', 'train_y', 'test_x', 'test_y', 'train_group', 'test_group'):
        if key not in config:
            continue
        entry = config[key]
        candidates = entry if isinstance(entry, list) else [entry]
        for candidate in candidates:
            if not isinstance(candidate, str):
                continue
            resolved = Path(candidate)
            # Relative entries are anchored at base_path when one is given.
            if base_path and not resolved.is_absolute():
                resolved = base_path / resolved
            if not resolved.exists():
                # The message shows the path exactly as written in the config.
                missing.append(f"File not found for '{key}': {candidate}")
    return missing
[docs]
def validate_pipeline_config(
    config_source: Union[str, Path, Dict[str, Any]],
    check_class_paths: bool = False
) -> Tuple[bool, List[str], List[str]]:
    """Validate a pipeline configuration.

    The configuration is checked against ``PIPELINE_SCHEMA``. A warning is
    added for every null step. With ``check_class_paths`` enabled, each step
    that has a top-level ``'class'`` entry is additionally checked by
    importing its module and looking up the class name on it.

    Args:
        config_source: Path to config file (``str`` or ``pathlib.Path``), or
            config dictionary.
        check_class_paths: If True, verify that class paths are importable.

    Returns:
        Tuple of (is_valid, errors, warnings). Problems while loading a file
        are returned as a single error rather than raised.
    """
    import importlib

    errors: List[str] = []
    warnings: List[str] = []

    # Load config if path
    if isinstance(config_source, (str, Path)):
        try:
            config, _ = _load_config_content(str(config_source))
        except (FileNotFoundError, ValueError) as exc:
            return False, [str(exc)], []
    else:
        config = config_source

    # Validate against schema
    errors.extend(_validate_against_schema(config, PIPELINE_SCHEMA, 'pipeline'))

    # Additional validations
    if 'pipeline' in config and isinstance(config['pipeline'], list):
        for number, step in enumerate(config['pipeline'], start=1):
            if step is None:
                warnings.append(f"Step {number} is null (will be skipped)")
            if not (check_class_paths and isinstance(step, dict) and 'class' in step):
                continue

            class_path = step['class']
            if not isinstance(class_path, str):
                # Nothing here guarantees 'class' is a string; report it
                # instead of failing on ``.rsplit`` below.
                errors.append(
                    f"Step {number}: 'class' must be a string, "
                    f"got {type(class_path).__name__}"
                )
                continue

            # Try to import the class
            try:
                module_path, class_name = class_path.rsplit('.', 1)
                module = importlib.import_module(module_path)
            except (ValueError, TypeError, ImportError) as exc:
                # ValueError: no dot in the path, or an empty module name.
                # TypeError: a relative module name such as "..pkg".
                errors.append(
                    f"Step {number}: Cannot import class '{class_path}': {exc}"
                )
            else:
                if not hasattr(module, class_name):
                    errors.append(
                        f"Step {number}: Class '{class_name}' not found in module '{module_path}'"
                    )

    return len(errors) == 0, errors, warnings
[docs]
def validate_dataset_config(
    config_source: Union[str, Path, Dict[str, Any]],
    check_files: bool = True
) -> Tuple[bool, List[str], List[str]]:
    """Validate a dataset configuration.

    The configuration is checked against ``DATASET_SCHEMA``. If that yields no
    errors and ``check_files`` is set, missing data files are reported as
    warnings. Relative data paths are resolved against the configuration
    file's directory when a file path was given; for a dictionary no base
    directory is applied.

    Args:
        config_source: Path to config file (``str`` or ``pathlib.Path``), or
            config dictionary.
        check_files: If True, check that referenced data files exist.

    Returns:
        Tuple of (is_valid, errors, warnings). Problems while loading a file
        are returned as a single error rather than raised.
    """
    errors: List[str] = []
    warnings: List[str] = []
    base_path: Optional[Path] = None

    # Load config if path. Path objects are accepted too; previously they
    # fell through and were validated as if they were the config itself.
    if isinstance(config_source, (str, Path)):
        try:
            config, _ = _load_config_content(str(config_source))
        except (FileNotFoundError, ValueError) as exc:
            return False, [str(exc)], []
        base_path = Path(config_source).parent
    else:
        config = config_source

    # Validate against schema
    errors.extend(_validate_against_schema(config, DATASET_SCHEMA, 'dataset'))

    # Check file paths only for a schema-valid config
    if check_files and not errors:
        warnings.extend(_check_file_paths(config, base_path))

    return len(errors) == 0, errors, warnings
[docs]
def validate_config_file(
    config_path: str,
    config_type: Optional[str] = None,
    check_files: bool = True,
    check_class_paths: bool = False
) -> Tuple[bool, List[str], List[str]]:
    """Validate a configuration file, auto-detecting type if not specified.

    Auto-detection treats a config with a ``'pipeline'`` key as a pipeline,
    and one with ``'train_x'``, ``'test_x'`` or ``'folder'`` as a dataset. An
    explicit ``config_type`` other than ``'pipeline'`` is handled as a
    dataset.

    Args:
        config_path: Path to the configuration file.
        config_type: 'pipeline', 'dataset', or None for auto-detection.
        check_files: For dataset configs, check if data files exist.
        check_class_paths: For pipeline configs, verify class imports.

    Returns:
        Tuple of (is_valid, errors, warnings).
    """
    # Load config first
    try:
        config, _ = _load_config_content(config_path)
    except (FileNotFoundError, ValueError) as exc:
        return False, [str(exc)], []

    # Auto-detect type if not specified
    if config_type is None:
        if 'pipeline' in config:
            config_type = 'pipeline'
        elif any(k in config for k in ['train_x', 'test_x', 'folder']):
            config_type = 'dataset'
        else:
            return False, [
                "Cannot determine configuration type.\n"
                "Pipeline configs should have a 'pipeline' key.\n"
                "Dataset configs should have 'train_x', 'test_x', or 'folder'."
            ], []

    # Validate based on type
    if config_type == 'pipeline':
        return validate_pipeline_config(config, check_class_paths=check_class_paths)

    # The dataset validator receives the parsed dict, so it cannot know where
    # the file lives. The file check is therefore done here with the config
    # file's directory as base; otherwise relative data paths would be
    # resolved against the current working directory.
    is_valid, errors, warnings = validate_dataset_config(config, check_files=False)
    if check_files and is_valid:
        warnings = list(warnings) + _check_file_paths(config, Path(config_path).parent)
    return is_valid, errors, warnings
[docs]
def get_validation_summary(
    is_valid: bool,
    errors: List[str],
    warnings: List[str],
    config_path: Optional[str] = None
) -> str:
    """Render validation results as a multi-line, human-readable report.

    The report has an optional header naming the file, one status line, then
    an "Errors:" and a "Warnings:" section for whichever lists are non-empty.

    Args:
        is_valid: Whether validation passed.
        errors: List of error messages.
        warnings: List of warning messages.
        config_path: Optional path for context.

    Returns:
        Formatted summary string.
    """
    # Header (file name plus a 60-character rule) only when a path is known.
    report = [f"Validation results for: {config_path}", "=" * 60] if config_path else []
    report.append("✓ Configuration is valid" if is_valid else "✗ Configuration has errors")

    # Section titles start with a newline so a blank line precedes them.
    if errors:
        report.append("\nErrors:")
        report.extend(f" - {error}" for error in errors)
    if warnings:
        report.append("\nWarnings:")
        report.extend(f" - {warning}" for warning in warnings)
    return "\n".join(report)