| """ |
| Validation engine for detecting and reporting issues in generated schemas. |
| Checks for: invalid JSON, missing keys, type mismatches, cross-layer inconsistencies. |
| """ |
|
|
| import json |
| from typing import Any, Dict, List, Optional, Tuple |
| from schemas import ValidationResult, GeneratedConfig, DBTable, APIEndpoint, UIPage, Role, FieldType |
|
|
|
|
| class Validator: |
| """Comprehensive validation for generated configurations.""" |
| |
| def __init__(self): |
| self.result = ValidationResult() |
| |
| def validate_json(self, json_str: str) -> Tuple[bool, Optional[Dict[str, Any]]]: |
| """Validate if string is valid JSON.""" |
| try: |
| data = json.loads(json_str) |
| return True, data |
| except json.JSONDecodeError as e: |
| self.result.add_error(f"Invalid JSON: {str(e)}") |
| return False, None |
| |
| def validate_required_fields(self, data: Dict[str, Any], required_fields: List[str]) -> bool: |
| """Check if all required fields are present.""" |
| missing = [f for f in required_fields if f not in data or data[f] is None] |
| if missing: |
| self.result.add_error(f"Missing required fields: {', '.join(missing)}") |
| return False |
| return True |
| |
| def validate_generated_config_structure(self, config_dict: Dict[str, Any]) -> bool: |
| """Validate top-level structure of GeneratedConfig.""" |
| required_fields = [ |
| "app_name", "app_description", "database_schema", |
| "api_schema", "ui_schema", "auth_config", "roles", "business_logic" |
| ] |
| return self.validate_required_fields(config_dict, required_fields) |
| |
| def validate_database_schema(self, db_schema: List[Dict[str, Any]]) -> bool: |
| """Validate database schema integrity.""" |
| if not isinstance(db_schema, list): |
| self.result.add_error("database_schema must be a list") |
| return False |
| |
| table_names = set() |
| is_valid = True |
| |
| for i, table in enumerate(db_schema): |
| if not isinstance(table, dict): |
| self.result.add_error(f"Table {i} is not a dict") |
| is_valid = False |
| continue |
| |
| |
| if "name" not in table or "fields" not in table or "primary_key" not in table: |
| self.result.add_error(f"Table {i}: missing name, fields, or primary_key") |
| is_valid = False |
| continue |
| |
| table_names.add(table["name"]) |
| |
| |
| if not isinstance(table["fields"], list): |
| self.result.add_error(f"Table '{table['name']}': fields must be a list") |
| is_valid = False |
| continue |
| |
| |
| for field in table["fields"]: |
| if not self._validate_field(field, table["name"]): |
| is_valid = False |
| |
| |
| for table in db_schema: |
| if "relations" in table and table["relations"]: |
| for field, related_table in table["relations"].items(): |
| if related_table not in table_names: |
| self.result.add_warning( |
| f"Foreign key in {table['name']}.{field} references non-existent table: {related_table}" |
| ) |
| |
| return is_valid |
| |
| def _validate_field(self, field: Dict[str, Any], table_name: str) -> bool: |
| """Validate a single field.""" |
| if not isinstance(field, dict): |
| self.result.add_error(f"Field in {table_name} is not a dict") |
| return False |
| |
| required = ["name", "type"] |
| if not all(k in field for k in required): |
| self.result.add_error(f"Field in {table_name}: missing name or type") |
| return False |
| |
| field_type = field["type"] |
| valid_types = [t.value for t in FieldType] |
| if field_type not in valid_types: |
| self.result.add_error( |
| f"Field '{field['name']}' in {table_name}: invalid type '{field_type}'" |
| ) |
| return False |
| |
| return True |
| |
| def validate_api_schema(self, api_schema: List[Dict[str, Any]]) -> bool: |
| """Validate API schema structure.""" |
| if not isinstance(api_schema, list): |
| self.result.add_error("api_schema must be a list") |
| return False |
| |
| valid_methods = ["GET", "POST", "PUT", "DELETE", "PATCH"] |
| is_valid = True |
| |
| for i, endpoint in enumerate(api_schema): |
| if not isinstance(endpoint, dict): |
| self.result.add_error(f"Endpoint {i} is not a dict") |
| is_valid = False |
| continue |
| |
| |
| if "path" not in endpoint or "method" not in endpoint: |
| self.result.add_error(f"Endpoint {i}: missing path or method") |
| is_valid = False |
| continue |
| |
| |
| if endpoint["method"] not in valid_methods: |
| self.result.add_error( |
| f"Endpoint {endpoint['path']}: invalid method '{endpoint['method']}'" |
| ) |
| is_valid = False |
| |
| return is_valid |
| |
| def validate_ui_schema(self, ui_schema: List[Dict[str, Any]]) -> bool: |
| """Validate UI schema structure.""" |
| if not isinstance(ui_schema, list): |
| self.result.add_error("ui_schema must be a list") |
| return False |
| |
| is_valid = True |
| |
| for i, page in enumerate(ui_schema): |
| if not isinstance(page, dict): |
| self.result.add_error(f"Page {i} is not a dict") |
| is_valid = False |
| continue |
| |
| required = ["path", "title", "components"] |
| if not all(k in page for k in required): |
| self.result.add_error(f"Page {i}: missing path, title, or components") |
| is_valid = False |
| continue |
| |
| if not isinstance(page["components"], list): |
| self.result.add_error(f"Page {i}: components must be a list") |
| is_valid = False |
| |
| return is_valid |
| |
| def validate_cross_layer_consistency(self, config_dict: Dict[str, Any]) -> bool: |
| """Validate consistency between API, DB, and UI layers.""" |
| is_valid = True |
| |
| |
| api_fields = set() |
| for endpoint in config_dict.get("api_schema", []): |
| if endpoint.get("request_body"): |
| api_fields.update(endpoint["request_body"].keys()) |
| if endpoint.get("response_body"): |
| api_fields.update(endpoint["response_body"].keys()) |
| |
| |
| db_fields = set() |
| for table in config_dict.get("database_schema", []): |
| for field in table.get("fields", []): |
| if isinstance(field, dict): |
| db_fields.add(field.get("name")) |
| elif hasattr(field, "name"): |
| db_fields.add(field.name) |
| |
| |
| ui_fields = set() |
| for page in config_dict.get("ui_schema", []): |
| for component in page.get("components", []): |
| if isinstance(component, dict): |
| ui_fields.update(component.get("fields", {}).keys() if isinstance(component.get("fields"), dict) else []) |
| |
| |
| if api_fields and db_fields: |
| |
| missing_db_fields = api_fields - db_fields |
| if missing_db_fields and len(missing_db_fields) > 3: |
| self.result.add_warning( |
| f"API references fields not in DB: {missing_db_fields}" |
| ) |
| |
| return is_valid |
| |
| def validate_roles_and_permissions(self, roles: List[Dict[str, Any]], |
| api_schema: List[Dict[str, Any]]) -> bool: |
| """Validate roles match API requirements.""" |
| is_valid = True |
| |
| if not isinstance(roles, list): |
| self.result.add_error("roles must be a list") |
| return False |
| |
| role_names = set() |
| for role in roles: |
| if not isinstance(role, dict) or "name" not in role: |
| self.result.add_error("Each role must be a dict with 'name'") |
| is_valid = False |
| continue |
| role_names.add(role["name"]) |
| |
| |
| for endpoint in api_schema: |
| required_role = endpoint.get("required_role") |
| if required_role and required_role not in role_names: |
| self.result.add_warning( |
| f"API endpoint {endpoint.get('path')} requires role '{required_role}' which doesn't exist" |
| ) |
| |
| return is_valid |
| |
| def validate_no_hallucinations(self, config_dict: Dict[str, Any]) -> bool: |
| """Detect hallucinated or nonsensical fields.""" |
| is_valid = True |
| |
| |
| for key, value in config_dict.items(): |
| if isinstance(value, str): |
| |
| if any(p in value.lower() for p in ["todo", "placeholder", "tbd", "fix me"]): |
| self.result.add_warning(f"Potential placeholder in {key}: {value}") |
| |
| |
| if key == "business_logic" and not isinstance(value, dict): |
| self.result.add_error(f"{key} should be a dict, got {type(value)}") |
| is_valid = False |
| |
| return is_valid |
| |
| def validate_complete(self, config_dict: Dict[str, Any]) -> ValidationResult: |
| """Run complete validation suite.""" |
| self.result = ValidationResult() |
| |
| |
| if not self.validate_generated_config_structure(config_dict): |
| return self.result |
| |
| |
| self.validate_database_schema(config_dict.get("database_schema", [])) |
| self.validate_api_schema(config_dict.get("api_schema", [])) |
| self.validate_ui_schema(config_dict.get("ui_schema", [])) |
| |
| |
| self.validate_cross_layer_consistency(config_dict) |
| self.validate_roles_and_permissions( |
| config_dict.get("roles", []), |
| config_dict.get("api_schema", []) |
| ) |
| |
| |
| self.validate_no_hallucinations(config_dict) |
| |
| return self.result |
|
|