"""Schema validation for generator specifications and expanded configurations.
This module provides comprehensive validation for:
- Generator specification syntax (before expansion)
- Expanded configuration structure (after expansion)
- Semantic validation of keyword usage
Classes:
ValidationError: Exception containing validation failure details
ValidationResult: Dataclass with validation outcome
ValidationSeverity: Enum for error severity levels
Functions:
validate_spec: Validate a generator specification
validate_config: Validate an expanded configuration
validate_expanded_configs: Validate a list of expanded configs
"""
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Set, Union
from ..keywords import (
OR_KEYWORD,
RANGE_KEYWORD,
SIZE_KEYWORD,
COUNT_KEYWORD,
PICK_KEYWORD,
ARRANGE_KEYWORD,
THEN_PICK_KEYWORD,
THEN_ARRANGE_KEYWORD,
ALL_KEYWORDS,
PURE_OR_KEYS,
PURE_RANGE_KEYS,
)
[docs]
class ValidationSeverity(Enum):
    """How serious a reported validation issue is.

    Members are listed from most to least severe; each value is the
    lowercase label used when an issue is rendered as text.
    """

    # Critical issue that will cause failure.
    ERROR = "error"
    # Potential issue that may cause unexpected behavior.
    WARNING = "warning"
    # Informational, non-blocking suggestion.
    INFO = "info"
[docs]
@dataclass
class ValidationError(Exception):
    """Exception for validation failures with detailed context.

    Instances are used both as raisable exceptions and as plain records
    collected inside a validation result.

    Attributes:
        message: Human-readable error description.
        path: JSONPath-like location of the error (e.g., "root._or_[0]").
        severity: Error severity level.
        code: Machine-readable error code.
        suggestion: Optional suggestion for fixing the error.
    """

    message: str
    path: str = ""
    severity: ValidationSeverity = ValidationSeverity.ERROR
    code: str = ""
    suggestion: Optional[str] = None

    # NOTE(review): the dataclass-generated __eq__ also makes instances
    # unhashable; left as-is because callers may rely on field equality.

    def __post_init__(self) -> None:
        """Forward the message to ``Exception`` so ``args`` is populated."""
        # The dataclass-generated __init__ never calls Exception.__init__, so
        # without this ``args`` stays empty. Pickling and copying rebuild an
        # exception from ``args`` and would then fail on the missing
        # ``message`` argument.
        super().__init__(self.message)

    def __str__(self) -> str:
        """Format the error as ``[SEVERITY] message at path``."""
        location = f" at {self.path}" if self.path else ""
        return f"[{self.severity.value.upper()}] {self.message}{location}"

    def __repr__(self) -> str:
        """Return a constructor-like representation (suggestion omitted)."""
        return (
            f"ValidationError(message={self.message!r}, path={self.path!r}, "
            f"severity={self.severity}, code={self.code!r})"
        )
[docs]
@dataclass
class ValidationResult:
    """Accumulated outcome of validating a spec or configuration.

    Attributes:
        is_valid: True while no ERROR-severity issue has been recorded
            (warnings and info do not affect it).
        errors: Issues recorded with ERROR severity.
        warnings: Issues recorded with WARNING severity.
        info: Issues recorded with any other severity.
        node_count: Number of nodes validated.
        generator_count: Number of generator nodes found.
    """

    is_valid: bool = True
    errors: List["ValidationError"] = field(default_factory=list)
    warnings: List["ValidationError"] = field(default_factory=list)
    info: List["ValidationError"] = field(default_factory=list)
    node_count: int = 0
    generator_count: int = 0

    def add_error(self, error: "ValidationError") -> None:
        """Record an issue in the list matching its severity.

        Only ERROR-severity issues flip ``is_valid`` to False.
        """
        level = error.severity
        if level == ValidationSeverity.ERROR:
            self.errors.append(error)
            self.is_valid = False
            return
        if level == ValidationSeverity.WARNING:
            self.warnings.append(error)
            return
        self.info.append(error)

    def merge(self, other: "ValidationResult") -> "ValidationResult":
        """Fold ``other`` into this result in place and return ``self``."""
        self.node_count += other.node_count
        self.generator_count += other.generator_count
        self.errors.extend(other.errors)
        self.warnings.extend(other.warnings)
        self.info.extend(other.info)
        self.is_valid = self.is_valid and other.is_valid
        return self

    def __str__(self) -> str:
        """Return a one-line summary: status, warning count, node counts."""
        status = (
            "VALID" if self.is_valid
            else f"INVALID ({len(self.errors)} errors)"
        )
        segments = [f"ValidationResult: {status}"]
        if self.warnings:
            # The warning segment is omitted entirely when there are none.
            segments.append(f"{len(self.warnings)} warnings")
        segments.append(
            f"{self.node_count} nodes, {self.generator_count} generators"
        )
        return " | ".join(segments)
# =============================================================================
# Specification Validation (before expansion)
# =============================================================================
[docs]
def validate_spec(
    spec: Any,
    path: str = "root",
    strict: bool = False,
    custom_validators: Optional[List[Callable]] = None
) -> ValidationResult:
    """Validate a generator specification before expansion.

    Walks the specification recursively. Lists are validated element by
    element, dicts are checked as (possibly generator) nodes, and any other
    value is accepted as a valid scalar leaf.

    Args:
        spec: The specification to validate (can be any type).
        path: JSONPath-like location used in reported issues.
        strict: Forwarded to the node validators, which use it to report
            some warnings as errors.
        custom_validators: Optional list of callables invoked as
            ``validator(node, path)`` on every dict node reached by this
            function. A truthy return value is merged as a
            ValidationResult.

    Returns:
        ValidationResult containing validation outcome.

    Examples:
        >>> result = validate_spec({"_or_": ["A", "B"]})
        >>> result.is_valid
        True
        >>> result = validate_spec({"_or_": "not a list"})
        >>> result.is_valid
        False
        >>> result.errors[0].message
        '_or_ must be a list, got str'
    """
    # Scalars are always valid and count as a single node.
    if not isinstance(spec, (dict, list)):
        leaf = ValidationResult()
        leaf.node_count = 1
        return leaf

    if isinstance(spec, list):
        outcome = ValidationResult()
        outcome.node_count = 1
        for index, element in enumerate(spec):
            outcome.merge(
                validate_spec(
                    element, f"{path}[{index}]", strict, custom_validators
                )
            )
        return outcome

    # Dict node: structural checks first, then descend into plain children.
    outcome = _validate_dict_spec(spec, path, strict)
    for name, child in spec.items():
        # Values under generator keywords are handled by the node validators,
        # so they are not descended into here.
        if name in ALL_KEYWORDS or not isinstance(child, (dict, list)):
            continue
        outcome.merge(
            validate_spec(child, f"{path}.{name}", strict, custom_validators)
        )

    for check in custom_validators or ():
        extra = check(spec, path)
        if extra:
            outcome.merge(extra)
    return outcome
def _validate_dict_spec(spec: Dict[str, Any], path: str, strict: bool) -> ValidationResult:
    """Validate one dictionary node of a specification.

    Dispatches to the OR or RANGE validator when the node carries the
    matching keyword, rejects nodes carrying both, and otherwise reports
    modifier keywords that appear without a generator keyword.

    Args:
        spec: Dictionary node to validate.
        path: Current path for error reporting.
        strict: Whether to treat warnings as errors.

    Returns:
        ValidationResult for this node.
    """
    outcome = ValidationResult()
    outcome.node_count = 1

    is_or_node = OR_KEYWORD in spec
    is_range_node = RANGE_KEYWORD in spec

    # The two generator keywords are mutually exclusive within a node.
    if is_or_node and is_range_node:
        outcome.add_error(ValidationError(
            message="Cannot have both _or_ and _range_ in the same node",
            path=path,
            code="CONFLICTING_KEYWORDS",
            suggestion="Use separate nodes for _or_ and _range_"
        ))
        return outcome

    if is_or_node or is_range_node:
        outcome.generator_count = 1
        node_validator = _validate_or_spec if is_or_node else _validate_range_spec
        outcome.merge(node_validator(spec, path, strict))
        return outcome

    # Plain dict: modifier keywords have nothing to modify here.
    modifier_keywords = {
        SIZE_KEYWORD, PICK_KEYWORD, ARRANGE_KEYWORD,
        THEN_PICK_KEYWORD, THEN_ARRANGE_KEYWORD
    }
    stray = set(spec.keys()) & modifier_keywords
    if stray:
        level = ValidationSeverity.ERROR if strict else ValidationSeverity.WARNING
        outcome.add_error(ValidationError(
            message=f"Modifier keywords {stray} without _or_",
            path=path,
            severity=level,
            code="ORPHANED_MODIFIERS",
            suggestion="Add _or_ keyword or remove orphaned modifiers"
        ))
    return outcome
def _validate_or_spec(spec: Dict[str, Any], path: str, strict: bool) -> ValidationResult:
    """Validate a node that contains the ``_or_`` keyword.

    Checks, in order: the type and emptiness of the choice list, the
    size/pick/arrange values, that then_pick/then_arrange accompany a
    selection keyword, the count value, conflicting selection keywords,
    and finally each dict/list choice recursively.

    Args:
        spec: Dictionary node containing _or_.
        path: Current path for error reporting.
        strict: Whether to treat warnings as errors.

    Returns:
        ValidationResult for this OR node.
    """
    outcome = ValidationResult()
    choices = spec[OR_KEYWORD]
    or_path = f"{path}.{OR_KEYWORD}"
    # Severity used for issues that are only warnings outside strict mode.
    soft_level = ValidationSeverity.ERROR if strict else ValidationSeverity.WARNING

    # Nothing else can be checked if the choices are not a list.
    if not isinstance(choices, list):
        outcome.add_error(ValidationError(
            message=f"_or_ must be a list, got {type(choices).__name__}",
            path=or_path,
            code="INVALID_OR_TYPE"
        ))
        return outcome

    if not choices:
        outcome.add_error(ValidationError(
            message="Empty _or_ list will generate no configurations",
            path=or_path,
            severity=soft_level,
            code="EMPTY_OR"
        ))

    # Selection keywords present on this node, in size/pick/arrange order.
    selectors = [
        key for key in (SIZE_KEYWORD, PICK_KEYWORD, ARRANGE_KEYWORD)
        if key in spec
    ]
    for key in selectors:
        outcome.merge(
            _validate_size_spec(spec[key], key, len(choices), f"{path}.{key}")
        )

    # A second-stage keyword is meaningless without a first-stage selection.
    for key in (THEN_PICK_KEYWORD, THEN_ARRANGE_KEYWORD):
        if key in spec and not selectors:
            outcome.add_error(ValidationError(
                message=f"{key} requires pick, arrange, or size to be specified",
                path=f"{path}.{key}",
                code="ORPHANED_THEN_KEYWORD"
            ))

    if COUNT_KEYWORD in spec:
        count = spec[COUNT_KEYWORD]
        count_path = f"{path}.{COUNT_KEYWORD}"
        if not isinstance(count, int):
            outcome.add_error(ValidationError(
                message=f"count must be an integer, got {type(count).__name__}",
                path=count_path,
                code="INVALID_COUNT_TYPE"
            ))
        elif count < 0:
            outcome.add_error(ValidationError(
                message=f"count must be non-negative, got {count}",
                path=count_path,
                code="NEGATIVE_COUNT"
            ))

    if len(selectors) > 1:
        outcome.add_error(ValidationError(
            message="Cannot use size, pick, and arrange together",
            path=path,
            severity=soft_level,
            code="CONFLICTING_SELECTION",
            suggestion="Use only one of: size (legacy), pick (combinations), or arrange (permutations)"
        ))

    # NOTE(review): this check can never report anything. The difference is
    # only computed when every key is already in PURE_OR_KEYS, so it is
    # always empty. Kept as-is until the intended rule for "unknown keys"
    # is confirmed.
    node_keys = set(spec.keys())
    if node_keys.issubset(PURE_OR_KEYS):
        unknown = node_keys - PURE_OR_KEYS
        if unknown:
            outcome.add_error(ValidationError(
                message=f"Unknown keys in OR node: {unknown}",
                path=path,
                severity=soft_level,
                code="UNKNOWN_OR_KEYS"
            ))

    # Scalar choices need no validation; containers are validated in depth.
    for index, choice in enumerate(choices):
        if isinstance(choice, (dict, list)):
            outcome.merge(validate_spec(choice, f"{or_path}[{index}]", strict))
    return outcome
def _validate_range_spec(spec: Dict[str, Any], path: str, strict: bool) -> ValidationResult:
    """Validate a node that contains the ``_range_`` keyword.

    Two syntaxes are accepted for the range value:

    - array: ``[start, end]`` or ``[start, end, step]``
    - dict: ``{"from": ..., "to": ..., "step": ...}`` (``step`` optional)

    Once either syntax is well-formed, the same bounds checks are applied:
    a zero step is an error, and a step pointing away from ``end`` is
    reported as an empty range.

    Args:
        spec: Dictionary node containing _range_.
        path: Current path for error reporting.
        strict: Whether to treat warnings as errors. Applies to the
            empty-range report, which is a warning otherwise.

    Returns:
        ValidationResult for this range node.
    """
    result = ValidationResult()
    range_value = spec[RANGE_KEYWORD]
    range_path = f"{path}.{RANGE_KEYWORD}"
    # (start, end, step), set only when the syntax checks below all pass.
    bounds = None

    if isinstance(range_value, list):
        if len(range_value) not in (2, 3):
            result.add_error(ValidationError(
                message=f"Range array must have 2 or 3 elements, got {len(range_value)}",
                path=range_path,
                code="INVALID_RANGE_LENGTH"
            ))
        elif not all(isinstance(x, (int, float)) for x in range_value):
            result.add_error(ValidationError(
                message="Range array elements must be numeric",
                path=range_path,
                code="INVALID_RANGE_ELEMENTS"
            ))
        else:
            step = range_value[2] if len(range_value) == 3 else 1
            bounds = (range_value[0], range_value[1], step)
    elif isinstance(range_value, dict):
        missing = {"from", "to"} - set(range_value.keys())
        if missing:
            result.add_error(ValidationError(
                message=f"Range dict missing required keys: {missing}",
                path=range_path,
                code="MISSING_RANGE_KEYS"
            ))
        all_numeric = True
        for key in ("from", "to", "step"):
            if key in range_value and not isinstance(range_value[key], (int, float)):
                all_numeric = False
                result.add_error(ValidationError(
                    message=f"Range '{key}' must be numeric",
                    path=f"{range_path}.{key}",
                    code="INVALID_RANGE_VALUE"
                ))
        if not missing and all_numeric:
            # A missing step is treated as 1, mirroring the array syntax
            # above. NOTE(review): confirm the expander uses the same default.
            bounds = (
                range_value["from"],
                range_value["to"],
                range_value.get("step", 1),
            )
    else:
        result.add_error(ValidationError(
            message=f"Range spec must be array or dict, got {type(range_value).__name__}",
            path=range_path,
            code="INVALID_RANGE_TYPE"
        ))

    # Bounds checks shared by both syntaxes.
    if bounds is not None:
        start, end, step = bounds
        if step == 0:
            result.add_error(ValidationError(
                message="Range step cannot be zero",
                path=range_path,
                code="ZERO_STEP"
            ))
        elif (end < start and step > 0) or (end > start and step < 0):
            result.add_error(ValidationError(
                message="Range will produce no values (step direction mismatch)",
                path=range_path,
                severity=ValidationSeverity.WARNING if not strict else ValidationSeverity.ERROR,
                code="EMPTY_RANGE"
            ))

    if COUNT_KEYWORD in spec:
        count = spec[COUNT_KEYWORD]
        if not isinstance(count, int):
            result.add_error(ValidationError(
                message=f"count must be an integer, got {type(count).__name__}",
                path=f"{path}.{COUNT_KEYWORD}",
                code="INVALID_COUNT_TYPE"
            ))
        elif count < 0:
            result.add_error(ValidationError(
                message=f"count must be non-negative, got {count}",
                path=f"{path}.{COUNT_KEYWORD}",
                code="NEGATIVE_COUNT"
            ))

    # OR-only modifiers have no meaning on a range node. Other extra keys
    # are tolerated. Iterating the dict keeps the report order stable.
    valid_range_keys = {RANGE_KEYWORD, COUNT_KEYWORD}
    for key in spec:
        if key in valid_range_keys:
            continue
        if key in PURE_OR_KEYS and key != OR_KEYWORD:
            result.add_error(ValidationError(
                message=f"OR modifier '{key}' not valid with _range_",
                path=f"{path}.{key}",
                code="INVALID_RANGE_MODIFIER"
            ))
    return result
def _validate_size_spec(
    spec: Any,
    key_name: str,
    max_size: int,
    path: str
) -> ValidationResult:
    """Validate a size/pick/arrange specification.

    Accepted forms:

    - a single non-negative integer (a value above ``max_size`` is only a
      warning);
    - a 2-tuple ``(from, to)`` of non-negative integers with ``from <= to``;
    - a 2-element list of non-negative integers.

    Args:
        spec: The size specification value.
        key_name: Name of the key (size/pick/arrange), used in messages.
        max_size: Maximum valid size (length of _or_ list).
        path: Current path for error reporting.

    Returns:
        ValidationResult for this size spec.
    """
    result = ValidationResult()

    # Single integer
    if isinstance(spec, int):
        if spec < 0:
            result.add_error(ValidationError(
                message=f"{key_name} must be non-negative, got {spec}",
                path=path,
                code="NEGATIVE_SIZE"
            ))
        elif spec > max_size:
            result.add_error(ValidationError(
                message=f"{key_name}={spec} exceeds available choices ({max_size})",
                path=path,
                severity=ValidationSeverity.WARNING,
                code="SIZE_EXCEEDS_CHOICES"
            ))
        return result

    # Anything that is neither an int nor a pair container is rejected.
    if not isinstance(spec, (tuple, list)):
        result.add_error(ValidationError(
            message=f"{key_name} must be int, tuple, or list, got {type(spec).__name__}",
            path=path,
            code="INVALID_SIZE_TYPE"
        ))
        return result

    if len(spec) != 2:
        result.add_error(ValidationError(
            message=f"{key_name} tuple/list must have 2 elements, got {len(spec)}",
            path=path,
            code="INVALID_SIZE_LENGTH"
        ))
        return result

    if isinstance(spec, tuple):
        # Range specification (from, to)
        from_val, to_val = spec
        if not isinstance(from_val, int) or not isinstance(to_val, int):
            result.add_error(ValidationError(
                message=f"{key_name} range must contain integers",
                path=path,
                code="INVALID_SIZE_RANGE_TYPE"
            ))
        elif from_val < 0 or to_val < 0:
            result.add_error(ValidationError(
                message=f"{key_name} range values must be non-negative",
                path=path,
                code="NEGATIVE_SIZE_RANGE"
            ))
        elif from_val > to_val:
            result.add_error(ValidationError(
                message=f"{key_name} range start ({from_val}) > end ({to_val})",
                path=path,
                code="INVERTED_SIZE_RANGE"
            ))
    else:
        # A list may be nested [outer, inner] or a range. Each element must
        # be a non-negative integer under either reading, matching the
        # int and tuple forms above.
        for i, val in enumerate(spec):
            if not isinstance(val, int):
                result.add_error(ValidationError(
                    message=f"{key_name}[{i}] must be an integer, got {type(val).__name__}",
                    path=f"{path}[{i}]",
                    code="INVALID_NESTED_SIZE"
                ))
            elif val < 0:
                result.add_error(ValidationError(
                    message=f"{key_name}[{i}] must be non-negative, got {val}",
                    path=f"{path}[{i}]",
                    code="NEGATIVE_SIZE"
                ))
    return result
# =============================================================================
# Configuration Validation (after expansion)
# =============================================================================
[docs]
def validate_config(
    config: Any,
    schema: Optional[Dict[str, Any]] = None,
    required_keys: Optional[Set[str]] = None,
    forbidden_keys: Optional[Set[str]] = None,
    path: str = "root"
) -> ValidationResult:
    """Validate an expanded configuration.

    This validates configurations after expansion, checking for
    structural correctness and optionally against a schema.

    Args:
        config: The expanded configuration to validate.
        schema: Optional schema definition for validation. It is applied
            to non-dict configurations as well, so an array or scalar
            schema is no longer silently ignored.
        required_keys: Optional keys that must be present (any iterable of
            strings; only checked for dict configurations).
        forbidden_keys: Optional keys that must not be present (any
            iterable of strings; only checked for dict configurations).
        path: JSONPath-like location for error reporting.

    Returns:
        ValidationResult containing validation outcome.

    Examples:
        >>> config = {"class": "MyClass", "params": {"n": 5}}
        >>> result = validate_config(config, required_keys={"class"})
        >>> result.is_valid
        True
    """
    result = ValidationResult()
    result.node_count = 1

    if not isinstance(config, dict):
        # Key-based checks do not apply to non-dict configs, but a schema
        # still does.
        if schema:
            if schema.get("type") == "object":
                # Kept as a dedicated code for callers matching on it.
                result.add_error(ValidationError(
                    message=f"Expected object, got {type(config).__name__}",
                    path=path,
                    code="TYPE_MISMATCH"
                ))
            else:
                result.merge(_validate_against_schema(config, schema, path))
        return result

    present_keys = set(config.keys())

    # Check required keys (set() accepts lists/tuples passed by callers).
    if required_keys:
        missing = set(required_keys) - present_keys
        if missing:
            result.add_error(ValidationError(
                message=f"Missing required keys: {missing}",
                path=path,
                code="MISSING_REQUIRED_KEYS"
            ))

    # Check forbidden keys
    if forbidden_keys:
        present = set(forbidden_keys) & present_keys
        if present:
            result.add_error(ValidationError(
                message=f"Forbidden keys present: {present}",
                path=path,
                code="FORBIDDEN_KEYS_PRESENT"
            ))

    # Generator keywords should be gone after expansion. Only the top level
    # of the configuration is inspected here.
    unexpanded = {OR_KEYWORD, RANGE_KEYWORD} & present_keys
    if unexpanded:
        result.add_error(ValidationError(
            message=f"Unexpanded generator keywords found: {unexpanded}",
            path=path,
            severity=ValidationSeverity.WARNING,
            code="UNEXPANDED_KEYWORDS",
            suggestion="Ensure expand_spec() was called on this configuration"
        ))

    # Schema validation if provided
    if schema:
        result.merge(_validate_against_schema(config, schema, path))
    return result
[docs]
def validate_expanded_configs(
    configs: List[Any],
    schema: Optional[Dict[str, Any]] = None,
    min_count: int = 0,
    max_count: Optional[int] = None
) -> ValidationResult:
    """Validate a list of expanded configurations.

    Checks the list length against the optional bounds, then validates
    every entry and merges the per-entry outcomes.

    Args:
        configs: List of expanded configurations.
        schema: Optional schema for each configuration.
        min_count: Minimum number of configurations required.
        max_count: Maximum number of configurations allowed
            (``None`` disables the upper bound).

    Returns:
        ValidationResult for the entire list.
    """
    outcome = ValidationResult()

    # Anything other than a list is rejected outright.
    if not isinstance(configs, list):
        outcome.add_error(ValidationError(
            message=f"Expected list of configs, got {type(configs).__name__}",
            path="root",
            code="NOT_A_LIST"
        ))
        return outcome

    total = len(configs)
    if total < min_count:
        outcome.add_error(ValidationError(
            message=f"Too few configurations: {total} < {min_count}",
            path="root",
            code="TOO_FEW_CONFIGS"
        ))
    if max_count is not None and total > max_count:
        outcome.add_error(ValidationError(
            message=f"Too many configurations: {total} > {max_count}",
            path="root",
            code="TOO_MANY_CONFIGS"
        ))

    # Entries are still validated when a count constraint has failed.
    for position, entry in enumerate(configs):
        outcome.merge(
            validate_config(entry, schema=schema, path=f"configs[{position}]")
        )
    return outcome
def _validate_against_schema(
    config: Dict[str, Any],
    schema: Dict[str, Any],
    path: str
) -> ValidationResult:
    """Validate a value against a small schema definition.

    Supported schema keys:

    - type: Expected type ("string", "number", "integer", "boolean",
      "array", "object")
    - required: List of required keys (dict values only)
    - properties: Dict of property schemas (dict values only)
    - items: Schema applied to each element (list values only)

    Args:
        config: Value to validate. Recursive calls pass property values and
            list items, so it is not necessarily a dict.
        schema: Schema definition.
        path: Current path for error reporting.

    Returns:
        ValidationResult for schema validation.
    """
    outcome = ValidationResult()

    # A type mismatch makes every further check meaningless, so stop there.
    wanted_type = schema.get("type")
    if wanted_type and not _check_type(config, wanted_type):
        outcome.add_error(ValidationError(
            message=f"Type mismatch: expected {wanted_type}, got {type(config).__name__}",
            path=path,
            code="SCHEMA_TYPE_MISMATCH"
        ))
        return outcome

    if isinstance(config, dict):
        required_names = schema.get("required", [])
        if required_names:
            absent = set(required_names) - set(config.keys())
            if absent:
                outcome.add_error(ValidationError(
                    message=f"Missing required properties: {absent}",
                    path=path,
                    code="SCHEMA_MISSING_REQUIRED"
                ))

        property_schemas = schema.get("properties", {})
        if property_schemas:
            for name, sub_schema in property_schemas.items():
                # Absent properties are the job of "required", not this loop.
                if name not in config:
                    continue
                outcome.merge(
                    _validate_against_schema(
                        config[name], sub_schema, f"{path}.{name}"
                    )
                )

    element_schema = schema.get("items")
    if element_schema and isinstance(config, list):
        for index, element in enumerate(config):
            outcome.merge(
                _validate_against_schema(
                    element, element_schema, f"{path}[{index}]"
                )
            )
    return outcome
def _check_type(value: Any, expected: str) -> bool:
"""Check if value matches expected type string.
Args:
value: Value to check.
expected: Type string ("string", "number", "integer", etc.)
Returns:
True if type matches, False otherwise.
"""
type_map = {
"string": str,
"number": (int, float),
"integer": int,
"boolean": bool,
"array": list,
"object": dict,
"null": type(None),
}
expected_types = type_map.get(expected)
if expected_types is None:
return True # Unknown type, assume valid
return isinstance(value, expected_types)