| """ |
| Data loading utilities for bias evaluation framework. |
| |
| This module handles all file I/O operations with proper error handling and validation. |
| Supports both legacy 4-field format and full AI BRIDGE 29-field schema. |
| Includes automatic lexicon validation on load. |
| """ |
| import csv |
| import json |
| from pathlib import Path |
| from typing import List, Dict, Any, Optional |
|
|
| from .models import ( |
| GroundTruthSample, Language, BiasCategory, BiasLabel, |
| StereotypeCategory, TargetGender, Explicitness, Sentiment, |
| SafetyFlag, QAStatus |
| ) |
| from .lexicon_validator import ( |
| LexiconValidator, ValidationReport, LexiconValidationError, |
| validate_lexicon_on_load |
| ) |
| from config import lexicon_filename, ground_truth_filename |
|
|
|
|
class DataLoadError(Exception):
    """Raised when a data file cannot be read or contains invalid rows."""
|
|
|
|
class GroundTruthLoader:
    """Handles loading and validation of ground truth datasets.

    Reads per-language CSV files and turns each row into a
    ``GroundTruthSample``.  Both the legacy 4-field format and the full
    AI BRIDGE schema are supported; extended rows are detected by the
    presence of a 'target_gender' or 'bias_label' column.
    """

    def __init__(self, data_dir: Path = Path("eval")):
        """
        Initialize the ground truth loader.

        Args:
            data_dir: Directory containing ground truth files
        """
        self.data_dir = data_dir

    def load_ground_truth(self, language: Language) -> List[GroundTruthSample]:
        """
        Load ground truth samples for a specific language.

        Args:
            language: Language to load ground truth for

        Returns:
            List of validated ground truth samples

        Raises:
            DataLoadError: If file cannot be loaded or data is invalid
        """
        file_path = self._get_ground_truth_path(language)

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                reader = csv.DictReader(f)
                samples = []

                # start=2 so reported row numbers match the file: line 1 is
                # the CSV header, data starts on line 2.
                for row_num, row in enumerate(reader, start=2):
                    try:
                        sample = self._parse_ground_truth_row(row)
                        samples.append(sample)
                    except Exception as e:
                        raise DataLoadError(
                            f"Invalid data in {file_path} at row {row_num}: {e}"
                        ) from e

                return samples

        except FileNotFoundError:
            raise DataLoadError(f"Ground truth file not found: {file_path}")
        except DataLoadError:
            # Row-level errors are already wrapped with file/row context;
            # re-raise as-is instead of double-wrapping the message in the
            # generic handler below.
            raise
        except Exception as e:
            raise DataLoadError(f"Failed to load ground truth from {file_path}: {e}") from e

    def _get_ground_truth_path(self, language: Language) -> Path:
        """Get the file path for ground truth data.

        Tries versioned filename first, falls back to non-versioned.
        Returns the versioned path even if neither file exists, so the
        caller's error message names the preferred location.
        """
        filename = ground_truth_filename(language.value)
        versioned_path = self.data_dir / filename
        if versioned_path.exists():
            return versioned_path

        fallback_path = self.data_dir / f"ground_truth_{language.value}.csv"
        if fallback_path.exists():
            return fallback_path
        return versioned_path

    def _parse_ground_truth_row(self, row: Dict[str, str]) -> GroundTruthSample:
        """
        Parse a single CSV row into a GroundTruthSample.

        Supports both legacy 4-field format and full AI BRIDGE schema.

        Raises:
            KeyError: If the required 'text' or 'has_bias' columns are missing.
        """
        # NOTE(review): strip('"') removes ALL leading/trailing quote chars,
        # including quotes that are legitimately part of the text — confirm
        # the ground-truth files actually need this de-quoting.
        text = row['text'].strip('"')
        has_bias = row['has_bias'].lower() == 'true'
        raw_category = row.get('bias_category', '').strip()
        try:
            bias_category = BiasCategory(raw_category) if raw_category else BiasCategory.NONE
        except ValueError:
            # NOTE(review): unknown category strings are silently coerced to
            # STEREOTYPE rather than rejected — confirm this default is
            # intended, as it can mask typos in annotation files.
            bias_category = BiasCategory.STEREOTYPE
        expected_correction = row.get('expected_correction', '')

        # Presence of either extended column marks the full AI BRIDGE schema.
        is_extended = 'target_gender' in row or 'bias_label' in row

        if is_extended:
            return GroundTruthSample(
                text=text,
                has_bias=has_bias,
                bias_category=bias_category,
                expected_correction=expected_correction,
                # Provenance / metadata fields (kept as raw strings).
                id=row.get('id'),
                language=row.get('language'),
                script=row.get('script'),
                country=row.get('country'),
                region_dialect=row.get('region_dialect'),
                source_type=row.get('source_type'),
                source_ref=row.get('source_ref'),
                collection_date=row.get('collection_date'),
                translation=row.get('translation'),
                domain=row.get('domain'),
                topic=row.get('topic'),
                theme=row.get('theme'),
                sensitive_characteristic=row.get('sensitive_characteristic'),
                # Annotation fields (parsed into enums/numbers; invalid or
                # placeholder values become None).
                target_gender=self._parse_enum(row.get('target_gender'), TargetGender),
                bias_label=self._parse_enum(row.get('bias_label'), BiasLabel),
                stereotype_category=self._parse_enum(row.get('stereotype_category'), StereotypeCategory),
                explicitness=self._parse_enum(row.get('explicitness'), Explicitness),
                bias_severity=self._parse_int(row.get('bias_severity')),
                sentiment_toward_referent=self._parse_enum(row.get('sentiment_toward_referent'), Sentiment),
                device=row.get('device'),
                # QA / safety fields.
                safety_flag=self._parse_enum(row.get('safety_flag'), SafetyFlag),
                pii_removed=self._parse_bool(row.get('pii_removed')),
                annotator_id=row.get('annotator_id'),
                qa_status=self._parse_enum(row.get('qa_status'), QAStatus),
                approver_id=row.get('approver_id'),
                cohen_kappa=self._parse_float(row.get('cohen_kappa')),
                notes=row.get('notes'),
                eval_split=row.get('eval_split')
            )
        else:
            # Legacy 4-field format: all extended fields default to None.
            return GroundTruthSample(
                text=text,
                has_bias=has_bias,
                bias_category=bias_category,
                expected_correction=expected_correction
            )

    def _parse_enum(self, value: Optional[str], enum_class) -> Optional[Any]:
        """Parse a string value into an enum member, returning None if invalid.

        Matching is case-insensitive and treats '_' and '-' as equivalent,
        against both the member's value and its name.
        """
        # NOTE(review): 'NONE' is treated as a placeholder here, so an enum
        # member whose value is literally "none" can never be produced —
        # confirm that is intended for enums such as SafetyFlag.
        if not value or value.upper() in ('', 'NEEDS_ANNOTATION', 'N/A', 'NONE'):
            return None
        normalized = value.lower().replace('_', '-')
        for member in enum_class:
            # Normalize BOTH sides.  The previous code compared the already
            # hyphenated input against member.name.lower(), which still
            # contains underscores, so name-based matches for multi-word
            # members (e.g. NON_BINARY) could never succeed.
            if normalized in (
                member.value.lower().replace('_', '-'),
                member.name.lower().replace('_', '-'),
            ):
                return member
        return None

    def _parse_int(self, value: Optional[str]) -> Optional[int]:
        """Parse a string to int, returning None if empty, 'N/A', or invalid."""
        if not value or value in ('', 'N/A'):
            return None
        try:
            return int(value)
        except ValueError:
            return None

    def _parse_float(self, value: Optional[str]) -> Optional[float]:
        """Parse a string to float, returning None if empty, 'N/A', or invalid."""
        if not value or value in ('', 'N/A'):
            return None
        try:
            return float(value)
        except ValueError:
            return None

    def _parse_bool(self, value: Optional[str]) -> Optional[bool]:
        """Parse a string to bool, returning None for empty/'N/A'.

        Any value other than 'true'/'1'/'yes' (case-insensitive) is False.
        """
        if not value or value in ('', 'N/A'):
            return None
        return value.lower() in ('true', '1', 'yes')
|
|
|
|
class RulesLoader:
    """Loads bias-detection rule lexicons from CSV, with optional validation.

    Each rule is returned as a dict following the AI BRIDGE extended field
    schema; missing columns are filled with the defaults below.
    """

    # (column, default) pairs defining the rule schema.  Order matters:
    # downstream consumers may rely on dict insertion order.
    _RULE_FIELDS = (
        ('biased', ''),
        ('neutral_primary', ''),
        ('severity', 'replace'),
        ('pos', 'noun'),
        ('tags', ''),
        ('bias_label', 'stereotype'),
        ('stereotype_category', 'profession'),
        ('explicitness', 'explicit'),
        ('ngeli', ''),
        ('number', ''),
        ('requires_agreement', 'false'),
        ('scope', ''),
        ('register', 'formal'),
        ('avoid_when', ''),
        ('constraints', ''),
    )

    def __init__(self, rules_dir: Path = Path("rules"), validate: bool = True,
                 strict_validation: bool = False):
        """
        Initialize the rules loader.

        Args:
            rules_dir: Directory containing rule files
            validate: If True, validates lexicons before loading
            strict_validation: If True, warnings become errors during validation
        """
        self.rules_dir = rules_dir
        self.validate = validate
        self.strict_validation = strict_validation
        self._validator = LexiconValidator(strict_mode=strict_validation)
        self._validation_reports: Dict[str, ValidationReport] = {}

    def get_validation_report(self, language: Language) -> Optional[ValidationReport]:
        """Get the validation report for a language if available."""
        return self._validation_reports.get(language.value)

    def load_rules(self, language: Language) -> List[Dict[str, str]]:
        """
        Load bias detection rules for a specific language.

        Args:
            language: Language to load rules for

        Returns:
            List of rule dictionaries with AI BRIDGE extended fields

        Raises:
            DataLoadError: If rules cannot be loaded
            LexiconValidationError: If validation fails (when validate=True)
        """
        file_path = self._get_rules_path(language)

        if self.validate:
            self._check_lexicon(file_path, language)

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                # Skip rows without a 'biased' term; fill missing columns
                # from the schema defaults.
                return [
                    {column: row.get(column, default)
                     for column, default in self._RULE_FIELDS}
                    for row in csv.DictReader(f)
                    if row.get('biased')
                ]
        except FileNotFoundError:
            raise DataLoadError(f"Rules file not found: {file_path}")
        except Exception as e:
            raise DataLoadError(f"Failed to load rules from {file_path}: {e}") from e

    def _check_lexicon(self, file_path: Path, language: Language) -> None:
        """Validate the lexicon, record the report, and raise on errors.

        Warnings are printed but do not block loading; errors raise
        LexiconValidationError after being echoed to stdout.
        """
        report = self._validator.validate_file(file_path)
        self._validation_reports[language.value] = report

        if not report.is_valid:
            print(f"\n⚠️ Lexicon validation issues for {language.value}:")
            for issue in report.issues:
                if issue.severity.value == "error":
                    print(f" ❌ Row {issue.row_number}: {issue.message}")
            raise LexiconValidationError(report)
        elif report.warning_count > 0:
            print(f"\n⚠️ Lexicon warnings for {language.value}: {report.warning_count} warnings")

    def _get_rules_path(self, language: Language) -> Path:
        """Get the file path for rules data."""
        return self.rules_dir / lexicon_filename(language.value)
|
|
|
|
class ResultsWriter:
    """Writes evaluation results to CSV and JSON files.

    The target directory is created on construction if it does not exist.
    """

    def __init__(self, results_dir: Path = Path("eval/results")):
        """
        Initialize the results writer.

        Args:
            results_dir: Directory to write results to
        """
        self.results_dir = results_dir
        self.results_dir.mkdir(parents=True, exist_ok=True)

    def write_csv_report(self, results: List[Any], filename: str) -> Path:
        """
        Write evaluation results to CSV file.

        The header is taken from the keys of the first result; an empty
        results list produces an empty file.

        Args:
            results: List of result dictionaries
            filename: Name of output file

        Returns:
            Path to written file

        Raises:
            DataLoadError: If file cannot be written
        """
        destination = self.results_dir / filename
        try:
            with destination.open('w', newline='', encoding='utf-8') as handle:
                if results:
                    writer = csv.DictWriter(handle, fieldnames=results[0].keys())
                    writer.writeheader()
                    writer.writerows(results)
        except Exception as e:
            raise DataLoadError(f"Failed to write CSV report to {destination}: {e}") from e
        return destination

    def write_json_report(self, data: Dict[str, Any], filename: str) -> Path:
        """
        Write data to JSON file.

        Output is pretty-printed with UTF-8 characters kept as-is
        (ensure_ascii=False).

        Args:
            data: Data to write
            filename: Name of output file

        Returns:
            Path to written file

        Raises:
            DataLoadError: If file cannot be written
        """
        destination = self.results_dir / filename
        try:
            with destination.open('w', encoding='utf-8') as handle:
                json.dump(data, handle, indent=2, ensure_ascii=False)
        except Exception as e:
            raise DataLoadError(f"Failed to write JSON report to {destination}: {e}") from e
        return destination