# eval/data_loader.py
# (Hugging Face Space metadata: deployed by juakazike, commit d7d1833 verified,
#  "Deploy testing UI for expert validation")
"""
Data loading utilities for bias evaluation framework.
This module handles all file I/O operations with proper error handling and validation.
Supports both legacy 4-field format and full AI BRIDGE 29-field schema.
Includes automatic lexicon validation on load.
"""
import csv
import json
from pathlib import Path
from typing import List, Dict, Any, Optional
from .models import (
GroundTruthSample, Language, BiasCategory, BiasLabel,
StereotypeCategory, TargetGender, Explicitness, Sentiment,
SafetyFlag, QAStatus
)
from .lexicon_validator import (
LexiconValidator, ValidationReport, LexiconValidationError,
validate_lexicon_on_load
)
from config import lexicon_filename, ground_truth_filename
class DataLoadError(Exception):
    """Raised when ground truth, rules, or result files cannot be read or written."""
class GroundTruthLoader:
"""Handles loading and validation of ground truth datasets."""
def __init__(self, data_dir: Path = Path("eval")):
"""
Initialize the ground truth loader.
Args:
data_dir: Directory containing ground truth files
"""
self.data_dir = data_dir
def load_ground_truth(self, language: Language) -> List[GroundTruthSample]:
"""
Load ground truth samples for a specific language.
Args:
language: Language to load ground truth for
Returns:
List of validated ground truth samples
Raises:
DataLoadError: If file cannot be loaded or data is invalid
"""
file_path = self._get_ground_truth_path(language)
try:
with open(file_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
samples = []
for row_num, row in enumerate(reader, start=2): # Start at 2 for header
try:
sample = self._parse_ground_truth_row(row)
samples.append(sample)
except Exception as e:
raise DataLoadError(
f"Invalid data in {file_path} at row {row_num}: {e}"
) from e
return samples
except FileNotFoundError:
raise DataLoadError(f"Ground truth file not found: {file_path}")
except Exception as e:
raise DataLoadError(f"Failed to load ground truth from {file_path}: {e}") from e
def _get_ground_truth_path(self, language: Language) -> Path:
"""Get the file path for ground truth data."""
filename = ground_truth_filename(language.value)
return self.data_dir / filename
def _parse_ground_truth_row(self, row: Dict[str, str]) -> GroundTruthSample:
"""
Parse a single CSV row into a GroundTruthSample.
Supports both legacy 4-field format and full AI BRIDGE schema.
"""
# Core required fields
text = row['text'].strip('"')
has_bias = row['has_bias'].lower() == 'true'
bias_category = BiasCategory(row['bias_category'])
expected_correction = row.get('expected_correction', '')
# Check if this is AI BRIDGE extended format
is_extended = 'target_gender' in row or 'bias_label' in row
if is_extended:
return GroundTruthSample(
text=text,
has_bias=has_bias,
bias_category=bias_category,
expected_correction=expected_correction,
# AI BRIDGE metadata fields
id=row.get('id'),
language=row.get('language'),
script=row.get('script'),
country=row.get('country'),
region_dialect=row.get('region_dialect'),
source_type=row.get('source_type'),
source_ref=row.get('source_ref'),
collection_date=row.get('collection_date'),
translation=row.get('translation'),
domain=row.get('domain'),
topic=row.get('topic'),
theme=row.get('theme'),
sensitive_characteristic=row.get('sensitive_characteristic'),
# AI BRIDGE bias annotation fields
target_gender=self._parse_enum(row.get('target_gender'), TargetGender),
bias_label=self._parse_enum(row.get('bias_label'), BiasLabel),
stereotype_category=self._parse_enum(row.get('stereotype_category'), StereotypeCategory),
explicitness=self._parse_enum(row.get('explicitness'), Explicitness),
bias_severity=self._parse_int(row.get('bias_severity')),
sentiment_toward_referent=self._parse_enum(row.get('sentiment_toward_referent'), Sentiment),
device=row.get('device'),
# Quality and safety fields
safety_flag=self._parse_enum(row.get('safety_flag'), SafetyFlag),
pii_removed=self._parse_bool(row.get('pii_removed')),
annotator_id=row.get('annotator_id'),
qa_status=self._parse_enum(row.get('qa_status'), QAStatus),
approver_id=row.get('approver_id'),
cohen_kappa=self._parse_float(row.get('cohen_kappa')),
notes=row.get('notes'),
eval_split=row.get('eval_split')
)
else:
# Legacy 4-field format
return GroundTruthSample(
text=text,
has_bias=has_bias,
bias_category=bias_category,
expected_correction=expected_correction
)
def _parse_enum(self, value: Optional[str], enum_class) -> Optional[Any]:
"""Parse a string value into an enum, returning None if invalid."""
if not value or value.upper() in ('', 'NEEDS_ANNOTATION', 'N/A', 'NONE'):
return None
try:
# Handle both value and name matching
value_lower = value.lower().replace('_', '-')
for member in enum_class:
if member.value.lower() == value_lower or member.name.lower() == value_lower:
return member
return None
except (ValueError, KeyError):
return None
def _parse_int(self, value: Optional[str]) -> Optional[int]:
"""Parse a string to int, returning None if invalid."""
if not value or value in ('', 'N/A'):
return None
try:
return int(value)
except ValueError:
return None
def _parse_float(self, value: Optional[str]) -> Optional[float]:
"""Parse a string to float, returning None if invalid."""
if not value or value in ('', 'N/A'):
return None
try:
return float(value)
except ValueError:
return None
def _parse_bool(self, value: Optional[str]) -> Optional[bool]:
"""Parse a string to bool, returning None if invalid."""
if not value or value in ('', 'N/A'):
return None
return value.lower() in ('true', '1', 'yes')
class RulesLoader:
"""Handles loading bias detection rules from CSV files with validation."""
def __init__(self, rules_dir: Path = Path("rules"), validate: bool = True,
strict_validation: bool = False):
"""
Initialize the rules loader.
Args:
rules_dir: Directory containing rule files
validate: If True, validates lexicons before loading
strict_validation: If True, warnings become errors during validation
"""
self.rules_dir = rules_dir
self.validate = validate
self.strict_validation = strict_validation
self._validator = LexiconValidator(strict_mode=strict_validation)
self._validation_reports: Dict[str, ValidationReport] = {}
def get_validation_report(self, language: Language) -> Optional[ValidationReport]:
"""Get the validation report for a language if available."""
return self._validation_reports.get(language.value)
def load_rules(self, language: Language) -> List[Dict[str, str]]:
"""
Load bias detection rules for a specific language.
Args:
language: Language to load rules for
Returns:
List of rule dictionaries with AI BRIDGE extended fields
Raises:
DataLoadError: If rules cannot be loaded
LexiconValidationError: If validation fails (when validate=True)
"""
file_path = self._get_rules_path(language)
# Validate lexicon before loading
if self.validate:
report = self._validator.validate_file(file_path)
self._validation_reports[language.value] = report
if not report.is_valid:
# Log validation issues
print(f"\n⚠️ Lexicon validation issues for {language.value}:")
for issue in report.issues:
if issue.severity.value == "error":
print(f" ❌ Row {issue.row_number}: {issue.message}")
raise LexiconValidationError(report)
elif report.warning_count > 0:
print(f"\n⚠️ Lexicon warnings for {language.value}: {report.warning_count} warnings")
try:
with open(file_path, 'r', encoding='utf-8') as f:
reader = csv.DictReader(f)
rules = []
for row in reader:
# Include rules with biased term (neutral_primary can be empty for deletion patterns)
if row.get('biased'):
rule = {
'biased': row['biased'],
'neutral_primary': row.get('neutral_primary', ''),
'severity': row.get('severity', 'replace'),
'pos': row.get('pos', 'noun'),
'tags': row.get('tags', ''),
# AI BRIDGE extended fields
'bias_label': row.get('bias_label', 'stereotype'),
'stereotype_category': row.get('stereotype_category', 'profession'),
'explicitness': row.get('explicitness', 'explicit'),
# Language-specific fields
'ngeli': row.get('ngeli', ''),
'number': row.get('number', ''),
'requires_agreement': row.get('requires_agreement', 'false'),
'scope': row.get('scope', ''),
'register': row.get('register', 'formal'),
}
rules.append(rule)
return rules
except FileNotFoundError:
raise DataLoadError(f"Rules file not found: {file_path}")
except Exception as e:
raise DataLoadError(f"Failed to load rules from {file_path}: {e}") from e
def _get_rules_path(self, language: Language) -> Path:
"""Get the file path for rules data."""
filename = lexicon_filename(language.value)
return self.rules_dir / filename
class ResultsWriter:
    """Handles writing evaluation results to CSV and JSON files."""

    def __init__(self, results_dir: Path = Path("eval/results")):
        """
        Initialize the results writer.

        Args:
            results_dir: Directory to write results to (created if missing,
                including parents).
        """
        self.results_dir = results_dir
        self.results_dir.mkdir(parents=True, exist_ok=True)

    def write_csv_report(self, results: List[Dict[str, Any]], filename: str) -> Path:
        """
        Write evaluation results to CSV file.

        Column order is taken from the first row's keys; all rows are expected
        to share those keys. An empty results list yields an empty file (no
        header).

        Args:
            results: List of result dictionaries (fixed: was annotated
                List[Any], but DictWriter requires mapping rows)
            filename: Name of output file

        Returns:
            Path to written file

        Raises:
            DataLoadError: If file cannot be written
        """
        file_path = self.results_dir / filename
        try:
            # newline='' is required so csv handles line endings itself.
            with open(file_path, 'w', newline='', encoding='utf-8') as f:
                if results:
                    writer = csv.DictWriter(f, fieldnames=list(results[0].keys()))
                    writer.writeheader()
                    writer.writerows(results)
            return file_path
        except Exception as e:
            raise DataLoadError(f"Failed to write CSV report to {file_path}: {e}") from e

    def write_json_report(self, data: Dict[str, Any], filename: str) -> Path:
        """
        Write data to JSON file.

        Args:
            data: Data to write (must be JSON-serializable)
            filename: Name of output file

        Returns:
            Path to written file

        Raises:
            DataLoadError: If file cannot be written
        """
        file_path = self.results_dir / filename
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                # ensure_ascii=False keeps non-Latin scripts human-readable.
                json.dump(data, f, indent=2, ensure_ascii=False)
            return file_path
        except Exception as e:
            raise DataLoadError(f"Failed to write JSON report to {file_path}: {e}") from e