| """ | |
| Data validation utilities for GEPA optimizer | |
| """ | |
| from typing import List, Dict, Any, Optional, Tuple | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| class DataValidator: | |
| """ | |
| Validates datasets for completeness and GEPA compatibility | |
| """ | |
| def __init__(self): | |
| self.required_fields = ['input', 'output'] | |
| self.optional_fields = ['metadata', 'id', 'tags'] | |

    def validate_dataset(self, dataset: List[Dict[str, Any]]) -> Tuple[bool, List[str]]:
        """
        Validate an entire dataset.

        Args:
            dataset: List of data items to validate

        Returns:
            Tuple[bool, List[str]]: (is_valid, list_of_errors)
        """
        errors = []

        # Basic dataset checks: verify the type first so a non-list input gets
        # an accurate error message instead of "empty"
        if not isinstance(dataset, list):
            errors.append("Dataset must be a list")
            return False, errors

        if not dataset:
            errors.append("Dataset is empty")
            return False, errors

        # Validate each item
        for idx, item in enumerate(dataset):
            item_errors = self.validate_item(item, idx)
            errors.extend(item_errors)

        # Check for minimum dataset size
        if len(dataset) < 2:
            errors.append("Dataset should have at least 2 items for a proper train/val split")

        # Log validation results
        if errors:
            logger.warning(f"Dataset validation failed with {len(errors)} errors")
        else:
            logger.info(f"Dataset validation passed for {len(dataset)} items")

        return len(errors) == 0, errors

    def validate_item(self, item: Dict[str, Any], index: Optional[int] = None) -> List[str]:
        """
        Validate a single dataset item.

        Args:
            item: Single data item to validate
            index: Optional item index for error reporting

        Returns:
            List[str]: List of validation errors
        """
        errors = []
        item_ref = f"item {index}" if index is not None else "item"

        # Check that the item is a dictionary before inspecting its fields
        if not isinstance(item, dict):
            errors.append(f"{item_ref}: Must be a dictionary")
            return errors

        # Check the required 'input' field
        if 'input' not in item:
            errors.append(f"{item_ref}: Missing required 'input' field")
        elif not isinstance(item['input'], str):
            errors.append(f"{item_ref}: 'input' field must be a string")
        elif not item['input'].strip():
            errors.append(f"{item_ref}: 'input' field cannot be empty")

        # Check the 'output' field type when present (it may be empty, but it
        # must be a string if provided for supervised learning)
        if 'output' in item and not isinstance(item['output'], str):
            errors.append(f"{item_ref}: 'output' field must be a string")

        # Validate metadata if present
        if 'metadata' in item and not isinstance(item['metadata'], dict):
            errors.append(f"{item_ref}: 'metadata' field must be a dictionary")

        return errors

    def validate_gepa_format(self, gepa_data: List[Dict[str, Any]]) -> Tuple[bool, List[str]]:
        """
        Validate data in GEPA format.

        Args:
            gepa_data: Data in GEPA format

        Returns:
            Tuple[bool, List[str]]: (is_valid, list_of_errors)
        """
        errors = []

        if not gepa_data:
            errors.append("GEPA dataset is empty")
            return False, errors

        for idx, item in enumerate(gepa_data):
            # Guard against non-dict items so the membership checks below
            # cannot raise
            if not isinstance(item, dict):
                errors.append(f"GEPA item {idx}: Must be a dictionary")
                continue
            if 'input' not in item:
                errors.append(f"GEPA item {idx}: Missing 'input' field")
            if 'expected_output' not in item:
                errors.append(f"GEPA item {idx}: Missing 'expected_output' field")
            if 'metadata' not in item:
                errors.append(f"GEPA item {idx}: Missing 'metadata' field")
            elif not isinstance(item['metadata'], dict):
                errors.append(f"GEPA item {idx}: 'metadata' must be a dictionary")

        return len(errors) == 0, errors
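
    # Example of an item that validate_gepa_format accepts. The field names
    # ('input', 'expected_output', 'metadata') are the ones checked above; the
    # concrete values are purely illustrative:
    #
    #     {
    #         "input": "Summarize the following paragraph ...",
    #         "expected_output": "A short summary ...",
    #         "metadata": {"source": "example"}
    #     }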

    def validate_split(self, trainset: List[Dict], valset: List[Dict]) -> Tuple[bool, List[str]]:
        """
        Validate train/validation split.

        Args:
            trainset: Training data
            valset: Validation data

        Returns:
            Tuple[bool, List[str]]: (is_valid, list_of_errors)
        """
        errors = []

        if not trainset:
            errors.append("Training set is empty")
        if not valset:
            errors.append("Validation set is empty")

        # Check proportions
        total_size = len(trainset) + len(valset)
        if total_size > 0:
            train_ratio = len(trainset) / total_size
            if train_ratio < 0.5:
                errors.append(f"Training set too small: {train_ratio:.2%} of total data")
            elif train_ratio > 0.95:
                errors.append(f"Validation set too small: {1 - train_ratio:.2%} of total data")

        return len(errors) == 0, errors

    def get_dataset_stats(self, dataset: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Get statistics about the dataset.

        Args:
            dataset: Dataset to analyze

        Returns:
            Dict[str, Any]: Dataset statistics
        """
        if not dataset:
            return {'total_items': 0, 'valid': False}

        stats = {
            'total_items': len(dataset),
            # Only count 'output' on dict items so malformed entries cannot raise
            'has_output': sum(
                1 for item in dataset
                if isinstance(item, dict) and item.get('output')
            ),
            'avg_input_length': 0,
            'avg_output_length': 0,
            'empty_inputs': 0,
            'empty_outputs': 0
        }

        input_lengths = []
        output_lengths = []

        for item in dataset:
            if isinstance(item, dict):
                input_text = item.get('input', '')
                output_text = item.get('output', '')

                if isinstance(input_text, str):
                    input_lengths.append(len(input_text))
                    if not input_text.strip():
                        stats['empty_inputs'] += 1

                if isinstance(output_text, str):
                    output_lengths.append(len(output_text))
                    if not output_text.strip():
                        stats['empty_outputs'] += 1

        if input_lengths:
            stats['avg_input_length'] = sum(input_lengths) / len(input_lengths)
        if output_lengths:
            stats['avg_output_length'] = sum(output_lengths) / len(output_lengths)

        # Determine whether the dataset looks valid: non-empty and fewer than
        # 50% empty inputs
        stats['valid'] = (
            stats['total_items'] > 0 and
            stats['empty_inputs'] < stats['total_items'] * 0.5
        )

        return stats
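

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only): the toy dataset below is
    # hypothetical and exists solely to exercise the DataValidator methods
    # defined above when the module is executed directly.
    validator = DataValidator()

    toy_dataset = [
        {"input": "What is 2 + 2?", "output": "4"},
        {"input": "Name the capital of France.", "output": "Paris"},
    ]

    # Whole-dataset validation
    is_valid, errors = validator.validate_dataset(toy_dataset)
    print(f"dataset valid: {is_valid}, errors: {errors}")

    # Train/validation split validation (50/50 split of the toy data)
    trainset, valset = toy_dataset[:1], toy_dataset[1:]
    split_ok, split_errors = validator.validate_split(trainset, valset)
    print(f"split valid: {split_ok}, errors: {split_errors}")

    # Basic dataset statistics
    print(validator.get_dataset_stats(toy_dataset))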