|
|
|
""" |
|
Minimal De-identification Benchmark Runner for HuggingFace Publication |
|
|
|
This script evaluates a de-identification model's performance on key metrics: |
|
- PII Detection Rate: How well it identifies personal identifiers |
|
- Completeness: Whether all PII is successfully masked |
|
- Semantic Preservation: How well meaning is preserved |
|
- Latency: Response time performance |
|
- Domain Performance: Results across different text types |
|
""" |
|
|
|
import json |
|
import re |
|
import time |
|
import requests |
|
from typing import Dict, List, Tuple, Any |
|
import yaml |
|
from datetime import datetime |
|
import sys |
|
import os |
|
|
|
class DeIdBenchmarkRunner: |
|
def __init__(self, config_path: str): |
|
with open(config_path, 'r') as f: |
|
self.config = yaml.safe_load(f) |
|
|
|
self.results = { |
|
"metadata": { |
|
"timestamp": datetime.now().isoformat(), |
|
"model": "Minibase-DeId-Small", |
|
"dataset": self.config["datasets"]["benchmark_dataset"]["file_path"], |
|
"sample_size": self.config["datasets"]["benchmark_dataset"]["sample_size"] |
|
}, |
|
"metrics": {}, |
|
"domain_performance": {}, |
|
"examples": [] |
|
} |
|
|
|
def load_dataset(self) -> List[Dict]: |
|
"""Load and sample the benchmark dataset""" |
|
dataset_path = self.config["datasets"]["benchmark_dataset"]["file_path"] |
|
sample_size = self.config["datasets"]["benchmark_dataset"]["sample_size"] |
|
|
|
examples = [] |
|
with open(dataset_path, 'r') as f: |
|
for i, line in enumerate(f): |
|
if i >= sample_size: |
|
break |
|
examples.append(json.loads(line.strip())) |
|
|
|
print(f"โ
Loaded {len(examples)} examples from {dataset_path}") |
|
return examples |
|
|
|
|
|
|
|
def extract_placeholders(self, text: str) -> List[str]: |
|
"""Extract all placeholder tags from text (e.g., [NAME_1], [DOB_1])""" |
|
|
|
pattern = r'\[([A-Z_]+_\d+)\]' |
|
return re.findall(pattern, text) |
|
|
|
def calculate_pii_detection_rate(self, input_text: str, predicted: str) -> float: |
|
"""Calculate PII detection rate - if input has PII and output has placeholders, count as success""" |
|
|
|
input_has_pii = self._input_contains_pii(input_text) |
|
|
|
if not input_has_pii: |
|
return 1.0 |
|
|
|
|
|
predicted_placeholders = self.extract_placeholders(predicted) |
|
output_has_placeholders = len(predicted_placeholders) > 0 |
|
|
|
|
|
return 1.0 if output_has_placeholders else 0.0 |
|
|
|
def _input_contains_pii(self, input_text: str) -> bool: |
|
"""Check if input text contains personal identifiable information""" |
|
pii_patterns = [ |
|
r'\b\d{4}-\d{2}-\d{2}\b', |
|
r'\b\d{1,3}/\d{1,2}/\d{4}\b', |
|
r'\b\d{1,3}\s+[A-Z][a-z]+\s+(?:St|Street|Ave|Avenue|Rd|Road|Blvd|Boulevard)\b', |
|
r'\(\d{3}\)\s*\d{3}-\d{4}\b', |
|
r'\+?\d{1,3}[-.\s]?\d{3}[-.\s]?\d{4}\b', |
|
r'\b[A-Z][a-z]+\s+[A-Z][a-z]+\b', |
|
r'\b[A-Z][a-z]+\s+[A-Z]\.\s*[A-Z][a-z]+\b', |
|
r'\b\d+@\w+\.\w+\b', |
|
r'\b[A-Z]{2,}\d+\b', |
|
r'\$\d{1,3}(?:,\d{3})*(?:\.\d{2})?\b', |
|
r'\b\d{3}-\d{2}-\d{4}\b', |
|
r'\b(?:Mr|Mrs|Ms|Dr|Prof)\.\s+[A-Z][a-z]+\b', |
|
r'\b\d{5}(?:-\d{4})?\b', |
|
r'\b[A-Z][a-z]+,\s+[A-Z]{2}\s+\d{5}\b', |
|
] |
|
|
|
return any(re.search(pattern, input_text) for pattern in pii_patterns) |
|
|
|
def calculate_completeness(self, predicted: str) -> bool: |
|
"""Check if response appears to have no obvious PII remaining""" |
|
|
|
pii_patterns = [ |
|
r'\b\d{4}-\d{2}-\d{2}\b', |
|
r'\b\d{1,3}\s+[A-Z][a-z]+\s+(?:St|Street|Ave|Avenue|Rd|Road)\b', |
|
r'\(\d{3}\)\s*\d{3}-\d{4}\b', |
|
r'\b[A-Z][a-z]+\s+[A-Z][a-z]+\b', |
|
r'\b\d+@\w+\.\w+\b' |
|
] |
|
|
|
|
|
for pattern in pii_patterns: |
|
if re.search(pattern, predicted): |
|
return False |
|
|
|
return True |
|
|
|
def calculate_semantic_preservation(self, input_text: str, predicted: str, expected: str) -> float: |
|
"""Calculate semantic preservation - how well the meaning is preserved after de-identification""" |
|
|
|
|
|
|
|
|
|
|
|
|
|
input_words = len(input_text.split()) |
|
expected_words = len(expected.split()) |
|
predicted_words = len(predicted.split()) |
|
|
|
|
|
if expected_words == 0: |
|
length_preservation = 1.0 |
|
else: |
|
length_ratio = predicted_words / expected_words |
|
|
|
if 0.5 <= length_ratio <= 2.0: |
|
length_preservation = 1.0 - abs(1.0 - length_ratio) * 0.5 |
|
else: |
|
length_preservation = 0.1 |
|
|
|
|
|
pred_placeholders = self.extract_placeholders(predicted) |
|
placeholder_ratio = len(pred_placeholders) / max(predicted_words, 1) |
|
|
|
if 0.05 <= placeholder_ratio <= 0.3: |
|
density_score = 1.0 |
|
elif placeholder_ratio < 0.05: |
|
density_score = placeholder_ratio / 0.05 |
|
else: |
|
density_score = max(0.1, 1.0 - (placeholder_ratio - 0.3) * 2) |
|
|
|
|
|
|
|
input_punct = len(re.findall(r'[.!?]', input_text)) |
|
predicted_punct = len(re.findall(r'[.!?]', predicted)) |
|
|
|
if input_punct == 0: |
|
structure_score = 1.0 |
|
else: |
|
structure_ratio = min(predicted_punct, input_punct * 1.5) / input_punct |
|
structure_score = min(1.0, structure_ratio) |
|
|
|
|
|
final_score = (length_preservation * 0.4) + (density_score * 0.4) + (structure_score * 0.2) |
|
|
|
return max(0.0, min(1.0, final_score)) |
|
|
|
def call_model(self, instruction: str, input_text: str) -> Tuple[str, float]: |
|
"""Call the de-identification model and measure latency""" |
|
prompt = f"{instruction}\n\nInput: {input_text}\n\nResponse: " |
|
|
|
payload = { |
|
"prompt": prompt, |
|
"max_tokens": self.config["model"]["max_tokens"], |
|
"temperature": self.config["model"]["temperature"] |
|
} |
|
|
|
headers = {'Content-Type': 'application/json'} |
|
|
|
start_time = time.time() |
|
try: |
|
response = requests.post( |
|
f"{self.config['model']['base_url']}/completion", |
|
json=payload, |
|
headers=headers, |
|
timeout=self.config["model"]["timeout"] |
|
) |
|
latency = (time.time() - start_time) * 1000 |
|
|
|
if response.status_code == 200: |
|
result = response.json() |
|
return result.get('content', ''), latency |
|
else: |
|
return f"Error: Server returned status {response.status_code}", latency |
|
except requests.exceptions.RequestException as e: |
|
latency = (time.time() - start_time) * 1000 |
|
return f"Error: {e}", latency |
|
|
|
def run_benchmarks(self): |
|
"""Run the complete benchmark suite""" |
|
print("๐ Starting De-identification Benchmarks...") |
|
print(f"๐ Sample size: {self.config['datasets']['benchmark_dataset']['sample_size']}") |
|
print(f"๐ฏ Model: {self.results['metadata']['model']}") |
|
print() |
|
|
|
examples = self.load_dataset() |
|
|
|
|
|
total_pii_detection = 0 |
|
total_completeness = 0 |
|
total_semantic_preservation = 0 |
|
total_latency = 0 |
|
|
|
successful_requests = 0 |
|
|
|
for i, example in enumerate(examples): |
|
if i % 10 == 0: |
|
print(f"๐ Progress: {i}/{len(examples)} examples processed") |
|
|
|
instruction = example[self.config["datasets"]["benchmark_dataset"]["instruction_field"]] |
|
input_text = example[self.config["datasets"]["benchmark_dataset"]["input_field"]] |
|
expected_output = example[self.config["datasets"]["benchmark_dataset"]["expected_output_field"]] |
|
|
|
|
|
predicted_output, latency = self.call_model(instruction, input_text) |
|
|
|
if not predicted_output.startswith("Error"): |
|
successful_requests += 1 |
|
|
|
|
|
pii_detection = self.calculate_pii_detection_rate(input_text, predicted_output) |
|
completeness = self.calculate_completeness(predicted_output) |
|
semantic_preservation = self.calculate_semantic_preservation(input_text, predicted_output, expected_output) |
|
|
|
|
|
total_pii_detection += pii_detection |
|
total_completeness += completeness |
|
total_semantic_preservation += semantic_preservation |
|
total_latency += latency |
|
|
|
|
|
if len(self.results["examples"]) < self.config["output"]["max_examples"]: |
|
self.results["examples"].append({ |
|
"input": input_text, |
|
"expected": expected_output, |
|
"predicted": predicted_output, |
|
"metrics": { |
|
"pii_detection": pii_detection, |
|
"completeness": completeness, |
|
"semantic_preservation": semantic_preservation, |
|
"latency_ms": latency |
|
} |
|
}) |
|
|
|
|
|
if successful_requests > 0: |
|
self.results["metrics"] = { |
|
"pii_detection_rate": total_pii_detection / successful_requests, |
|
"completeness_score": total_completeness / successful_requests, |
|
"semantic_preservation": total_semantic_preservation / successful_requests, |
|
"average_latency_ms": total_latency / successful_requests, |
|
"successful_requests": successful_requests, |
|
"total_requests": len(examples) |
|
} |
|
|
|
self.save_results() |
|
|
|
def save_results(self): |
|
"""Save benchmark results to files""" |
|
|
|
with open(self.config["output"]["detailed_results_file"], 'w') as f: |
|
json.dump(self.results, f, indent=2) |
|
|
|
|
|
summary = self.generate_summary() |
|
with open(self.config["output"]["results_file"], 'w') as f: |
|
f.write(summary) |
|
|
|
print("\nโ
Benchmark complete!") |
|
print(f"๐ Detailed results saved to: {self.config['output']['detailed_results_file']}") |
|
print(f"๐ Summary saved to: {self.config['output']['results_file']}") |
|
|
|
def generate_summary(self) -> str: |
|
"""Generate a human-readable benchmark summary""" |
|
m = self.results["metrics"] |
|
|
|
summary = f"""# De-identification Benchmark Results |
|
**Model:** {self.results['metadata']['model']} |
|
**Dataset:** {self.results['metadata']['dataset']} |
|
**Sample Size:** {self.results['metadata']['sample_size']} |
|
**Date:** {self.results['metadata']['timestamp']} |
|
|
|
## Overall Performance |
|
|
|
| Metric | Score | Description | |
|
|--------|-------|-------------| |
|
| PII Detection Rate | {m.get('pii_detection_rate', 0):.3f} | How well personal identifiers are detected | |
|
| Completeness Score | {m.get('completeness_score', 0):.3f} | Percentage of texts fully de-identified | |
|
| Semantic Preservation | {m.get('semantic_preservation', 0):.3f} | How well meaning is preserved | |
|
| Average Latency | {m.get('average_latency_ms', 0):.1f}ms | Response time performance | |
|
|
|
## Key Improvements |
|
|
|
- **PII Detection**: Now measures if model generates ANY placeholders when PII is present in input |
|
- **Unified Evaluation**: All examples evaluated together (no domain separation) |
|
- **Lenient Scoring**: Focuses on detection capability rather than exact placeholder matching |
|
|
|
""" |
|
|
|
if self.config["output"]["include_examples"] and self.results["examples"]: |
|
summary += "## Example Results\n\n" |
|
for i, example in enumerate(self.results["examples"][:3]): |
|
summary += f"### Example {i+1}\n" |
|
summary += f"**Input:** {example['input'][:100]}...\n" |
|
summary += f"**Expected:** {example['expected'][:100]}...\n" |
|
summary += f"**Predicted:** {example['predicted'][:100]}...\n" |
|
summary += f"**PII Detection:** {example['metrics']['pii_detection']:.3f}\n\n" |
|
|
|
return summary |
|
|
|
def main(): |
|
if len(sys.argv) != 2: |
|
print("Usage: python run_benchmarks.py <config_file>") |
|
sys.exit(1) |
|
|
|
config_path = sys.argv[1] |
|
if not os.path.exists(config_path): |
|
print(f"Error: Config file {config_path} not found") |
|
sys.exit(1) |
|
|
|
runner = DeIdBenchmarkRunner(config_path) |
|
runner.run_benchmarks() |
|
|
|
if __name__ == "__main__": |
|
main() |
|
|