#!/usr/bin/env python3
"""
Gradio app for validating dataset mentions from a stratified validation sample.

This app allows users to:
1. Review dataset mentions with context
2. Validate as dataset or non-dataset
3. Compare extraction model vs judge (GPT-5.2)
4. Track validation progress with live statistics

Adapted from annotation_app.py for the direct_judge validation workflow.
Configured for Hugging Face Spaces deployment.
"""
import gradio as gr
import json
import re
import os
import argparse
from pathlib import Path
from dotenv import load_dotenv

# Load .env for local development
load_dotenv()

try:
    from gradio_pdf import PDF as gr_pdf
except ImportError:
    gr_pdf = None

from typing import Dict, List, Tuple, Optional
from datetime import datetime
from huggingface_hub import HfApi, login
from datasets import Dataset, load_dataset
class ValidationAnnotator:
    """
    Handle validation annotation logic and state management.

    Note: This works with stratified validation samples from direct_judge outputs.
    No 4o data is available - only judge (GPT-5.2) verdicts are shown.
    """

    def __init__(self, input_file: str, hf_dataset_repo: Optional[str] = None, hf_token: Optional[str] = None,
                 pdf_dir: Optional[str] = None, pdf_url_base: Optional[str] = None, pdf_repo_id: Optional[str] = None):
        self.input_file = Path(input_file)
        self.output_file = self.input_file.parent / f"{self.input_file.stem}_human_validated.jsonl"

        # HF Datasets integration
        self.hf_dataset_repo = hf_dataset_repo
        self.hf_token = hf_token or os.getenv("HF_TOKEN")

        # PDF configuration
        self.pdf_dir = Path(pdf_dir) if pdf_dir else None
        self.pdf_url_base = pdf_url_base
        self.pdf_repo_id = pdf_repo_id
        if self.pdf_dir and not self.pdf_dir.exists():
            print(f"⚠️ PDF directory not found: {self.pdf_dir}")

        self.hf_enabled = False
        # Try to enable HF Datasets if credentials were provided
        if self.hf_dataset_repo and self.hf_token:
            try:
                login(token=self.hf_token, add_to_git_credential=False)
                self.hf_api = HfApi()
                self.hf_enabled = True
                print(f"✅ HF Datasets enabled: {self.hf_dataset_repo}")
            except Exception as e:
                print(f"⚠️ HF Datasets disabled: {e}")
                self.hf_enabled = False

        # Load data
        self.records = self._load_records()
        self.annotations = self._load_annotations()

        # Build chunk index for navigation
        self._build_chunk_index()

        # Current position
        self.current_idx = 0

        # Filter state
        self.current_filter = "All"  # Options: "All", "named", "descriptive", "vague", "non-dataset"
        self.filtered_indices = list(range(len(self.records)))  # All records by default

        # Move to the first unannotated record
        self._find_next_unannotated()
    def _load_records(self) -> List[Dict]:
        """Load records from the input JSONL file."""
        records = []
        with open(self.input_file, 'r', encoding='utf-8') as f:
            for line in f:
                if line.strip():
                    records.append(json.loads(line))
        return records
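
    # Illustrative sketch of one input record, with field names inferred from how
    # they are read elsewhere in this file (values are made up):
    # {"sample_id": 42, "chunk_id": "doc123_chunk_7", "text": "UNHCR Household Survey 2023",
    #  "document": "doc123", "stratum": "named", "extraction_tag": "named",
    #  "extraction_confidence": 0.91, "judge_tag": "named", "judge_confidence": 0.97,
    #  "judge_reasoning": "...", "full_context": "...", "is_primary": true,
    #  "source_document": "unhcr/doc123.pdf", "page_number": 4}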
    def _build_chunk_index(self):
        """Build an index mapping chunk_id to record indices."""
        self.chunk_ids = []  # Ordered list of unique chunk_ids
        self.chunk_to_indices = {}  # chunk_id -> list of record indices
        for idx, record in enumerate(self.records):
            chunk_id = record.get('chunk_id', f'unknown_{idx}')
            if chunk_id not in self.chunk_to_indices:
                self.chunk_ids.append(chunk_id)
                self.chunk_to_indices[chunk_id] = []
            self.chunk_to_indices[chunk_id].append(idx)
        self.total_chunks = len(self.chunk_ids)
        self.total_datasets = len(self.records)

    def _get_chunk_info(self, idx: int) -> Tuple[int, int, int]:
        """Get chunk info for a given record index.

        Returns: (chunk_number, dataset_in_chunk, total_in_chunk)
        """
        if idx >= len(self.records):
            return (0, 0, 0)
        record = self.records[idx]
        chunk_id = record.get('chunk_id', f'unknown_{idx}')
        chunk_number = self.chunk_ids.index(chunk_id) + 1 if chunk_id in self.chunk_ids else 0
        chunk_indices = self.chunk_to_indices.get(chunk_id, [idx])
        dataset_in_chunk = chunk_indices.index(idx) + 1 if idx in chunk_indices else 1
        total_in_chunk = len(chunk_indices)
        return (chunk_number, dataset_in_chunk, total_in_chunk)
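
    # Worked example: for three records with chunk_ids ["A", "A", "B"],
    # _get_chunk_info(1) returns (1, 2, 2) - chunk "A" is chunk #1 and record 1
    # is the 2nd of its 2 datasets - while _get_chunk_info(2) returns (2, 1, 1).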
    def _load_annotations(self) -> Dict:
        """Load existing annotations from the local file and/or HF Datasets."""
        annotations = {}
        # Try loading from HF Datasets first (cloud backup)
        if self.hf_enabled:
            try:
                dataset = load_dataset(self.hf_dataset_repo, split="train", token=self.hf_token)
                for item in dataset:
                    annotations[item['sample_id']] = item
                print(f"✅ Loaded {len(annotations)} annotations from HF Datasets")
            except Exception as e:
                print(f"⚠️ Could not load from HF Datasets: {e}")
        # Also load from the local file (may have newer annotations)
        if self.output_file.exists():
            local_count = 0
            with open(self.output_file, 'r', encoding='utf-8') as f:
                for line in f:
                    if line.strip():
                        ann = json.loads(line)
                        annotations[ann['sample_id']] = ann
                        local_count += 1
            if local_count > 0:
                print(f"✅ Loaded {local_count} annotations from local file")
        return annotations
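
    # Merge order matters here: HF annotations load first and the local file
    # second, so for a duplicated sample_id the local annotation wins - the
    # local file is treated as the fresher source.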
    def _save_annotation(self, sample_id: int, verdict: str, notes: str = ""):
        """Save a single annotation to file and optionally to HF Datasets."""
        record = self.records[self.current_idx]
        # Determine whether the extraction/judge said dataset.
        # Dataset = any tag that is NOT "non-dataset" (includes named, descriptive, vague)
        extraction_is_dataset = record['extraction_tag'] != 'non-dataset'
        judge_is_dataset = record['judge_tag'] != 'non-dataset'
        human_is_dataset = verdict == 'dataset'
        annotation = {
            'sample_id': sample_id,
            'text': record['text'],
            'document': record['document'],
            'stratum': record['stratum'],
            # Human annotation
            'human_verdict': verdict,  # 'dataset' or 'non-dataset'
            'human_notes': notes,
            'annotated_at': datetime.now().isoformat(),
            # Original extraction
            'extraction_tag': record['extraction_tag'],
            'extraction_confidence': record['extraction_confidence'],
            # Judge (GPT-5.2)
            'judge_tag': record['judge_tag'],
            'judge_confidence': record['judge_confidence'],
            'judge_reasoning': record.get('judge_reasoning', ''),
            'judge_data_type': record.get('judge_data_type', ''),
            # Computed agreements
            'human_agrees_extraction': human_is_dataset == extraction_is_dataset,
            'human_agrees_judge': human_is_dataset == judge_is_dataset,
            'extraction_agrees_judge': extraction_is_dataset == judge_is_dataset,
        }
        # Update in-memory annotations
        self.annotations[sample_id] = annotation
        # Append to the local file
        with open(self.output_file, 'a', encoding='utf-8') as f:
            f.write(json.dumps(annotation, ensure_ascii=False) + '\n')
        # Push to HF Datasets (cloud backup; note this call blocks until the upload finishes)
        if self.hf_enabled:
            try:
                self._push_to_hf_datasets()
            except Exception as e:
                print(f"⚠️ Failed to push to HF Datasets: {e}")
    def _push_to_hf_datasets(self):
        """Push all annotations to HF Datasets."""
        if not self.hf_enabled or not self.annotations:
            return
        try:
            # Convert the annotations dict to a list
            annotations_list = list(self.annotations.values())
            # Create dataset
            dataset = Dataset.from_list(annotations_list)
            # Push to the Hub
            dataset.push_to_hub(
                self.hf_dataset_repo,
                token=self.hf_token,
                private=True  # Keep annotations private by default
            )
            print(f"✅ Pushed {len(annotations_list)} annotations to HF Datasets")
        except Exception as e:
            print(f"⚠️ Error pushing to HF Datasets: {e}")
            raise
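
    # Design note: every save re-uploads the full annotation set rather than
    # appending, so (assuming push_to_hub's usual replace semantics) the HF
    # dataset is always a complete snapshot; the trade-off is one full upload
    # per annotation.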
    def _split_sentences(self, text: str) -> list:
        """Split text into sentences using a simple rule-based approach."""
        # Split on period/question/exclamation followed by whitespace, or on paragraph breaks
        chunks = re.split(r'(?<=[.!?])\s+|\n\s*\n', text)
        return [c.strip() for c in chunks if c.strip()]
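
    # Illustrative behavior: "We used DHS data. See Fig. 2.\n\nNew paragraph."
    # splits into ["We used DHS data.", "See Fig.", "2.", "New paragraph."] -
    # note the false split after the abbreviation "Fig.", a known limitation of
    # this simple rule-based splitter.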
    def _extract_context(self, text: str, dataset_name: str, context_sentences: int = 2) -> list:
        """
        Extract context around a dataset mention and format it for highlighting.

        Uses sentence-based windowing: returns the sentence containing the dataset
        plus context_sentences before and after (default: ±2 sentences).

        Returns:
            List of tuples: [(text, label), ...] where label is "DATASET" for the dataset name
        """
        if not text:
            return [(f"[No context available for '{dataset_name}']", None)]
        # Normalize text: collapse excessive whitespace but preserve sentence structure
        text = re.sub(r'\s+', ' ', text).strip()
        dataset_name_clean = re.sub(r'\s+', ' ', dataset_name).strip()
        # Split into sentences
        sentences = self._split_sentences(text)
        if not sentences:
            return [(text[:500] + "..." if len(text) > 500 else text, None)]
        # Create a regex that matches the name with flexible whitespace
        name_parts = dataset_name_clean.split()
        if not name_parts:
            return [(text[:500] + "..." if len(text) > 500 else text, None)]
        pattern_str = r'\s+'.join([re.escape(part) for part in name_parts])
        pattern = re.compile(pattern_str, re.IGNORECASE)
        # Find the sentence containing the dataset name
        target_idx = None
        for i, sent in enumerate(sentences):
            if pattern.search(sent):
                target_idx = i
                break
        if target_idx is None:
            # Fallback: return truncated text without highlighting
            return [(text[:500] + "..." if len(text) > 500 else text, None)]
        # Take ±context_sentences around the match
        start_idx = max(0, target_idx - context_sentences)
        end_idx = min(len(sentences), target_idx + context_sentences + 1)
        # Join the context sentences
        context_text = " ".join(sentences[start_idx:end_idx])
        # Add ellipsis indicators
        prefix = "..." if start_idx > 0 else ""
        suffix = "..." if end_idx < len(sentences) else ""
        # Find the dataset name within the context for highlighting
        match = pattern.search(context_text)
        if not match:
            # Return without highlighting if it is somehow not found
            return [(prefix + context_text + suffix, None)]
        # Build the highlighted output
        before = prefix + context_text[:match.start()]
        dataset = context_text[match.start():match.end()]
        after = context_text[match.end():] + suffix
        return [
            (before, None),
            (dataset, "DATASET"),
            (after, None)
        ]
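
    # Illustrative output, in the (text, label) format gr.HighlightedText expects:
    #   [("... data were drawn from the ", None),
    #    ("UNHCR Household Survey 2023", "DATASET"),
    #    (" across three camps. ...", None)]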
    def set_filter(self, filter_value: str):
        """Set the current filter and update the filtered indices.

        When 'All' is selected: show all records, including siblings.
        When a specific tag is selected: show only primary samples with that tag (no siblings).
        """
        self.current_filter = filter_value
        if filter_value == "All":
            # Show all records, including siblings
            self.filtered_indices = list(range(len(self.records)))
        else:
            # Filter by extraction_tag only (not judge_tag)
            # AND exclude siblings (only show primary samples)
            self.filtered_indices = [
                i for i, record in enumerate(self.records)
                if record.get('extraction_tag') == filter_value
                and record.get('is_primary', True)  # Only primary samples, not siblings
            ]
        # Always jump to the first unannotated record in the new filtered set, for determinism
        self._find_next_unannotated()
    def _is_annotated(self, idx: int) -> bool:
        """Check whether a record has been annotated."""
        sample_id = self.records[idx].get('sample_id', idx)
        return sample_id in self.annotations

    def _should_skip(self, idx: int) -> bool:
        """Check whether a record is a one-word vague/descriptive mention that should be skipped."""
        if idx >= len(self.records):
            return False
        record = self.records[idx]
        text = record.get('text', '')
        word_count = len(text.split())
        ext_tag = record.get('extraction_tag', '')
        judge_tag = record.get('judge_tag', '')
        # Skip one-word vague/descriptive mentions
        skip_tags = {'vague', 'descriptive'}
        if word_count == 1 and (ext_tag in skip_tags or judge_tag in skip_tags):
            return True
        return False
    def _find_next_unannotated(self):
        """Find the next unannotated record within the current filtered set."""
        if not self.filtered_indices:
            self.current_idx = len(self.records)
            return
        for idx in self.filtered_indices:
            if not self._is_annotated(idx) and not self._should_skip(idx):
                self.current_idx = idx
                return
        # All filtered records are annotated or skippable: fall back to the first
        # filtered record so the user still sees something. With no filtered
        # records at all, move past the end to trigger the completion screen.
        if self.filtered_indices:
            self.current_idx = self.filtered_indices[0]
        else:
            self.current_idx = len(self.records)
    def get_current_display(self) -> Tuple[str, list, str, str, str, str, Dict, Optional[str], int]:
        """Get the current record for display."""
        if self.current_idx >= len(self.records):
            return ("🎉 All samples validated!", [], "", "",
                    f"Progress: {len(self.annotations)}/{len(self.records)} (100%)",
                    "✅ Complete", {}, None, 1)
        record = self.records[self.current_idx]
        # Get context with highlighting
        context = self._extract_context(
            record.get('full_context', '') or record.get('usage_context', ''),
            record['text']
        )
        # Build AI verdicts (judge only - no 4o in direct_judge).
        # Dataset = any tag that is NOT "non-dataset" (includes named, descriptive, vague)

        # Extraction model verdict
        ext_tag = record['extraction_tag']
        ext_is_dataset = ext_tag != 'non-dataset'
        ext_emoji = "✅" if ext_is_dataset else "❌"
        ai_verdicts_str = f"### Extraction Model:\n"
        ai_verdicts_str += f"**Verdict:** {ext_emoji} {'Dataset' if ext_is_dataset else 'Non-Dataset'}\n"
        ai_verdicts_str += f"**Tag:** `{ext_tag}`\n"
        ai_verdicts_str += f"**Confidence:** {record['extraction_confidence']:.1%}\n"

        # Judge (GPT-5.2) verdict
        judge_tag = record['judge_tag']
        judge_is_dataset = judge_tag != 'non-dataset'
        judge_emoji = "✅" if judge_is_dataset else "❌"
        ai_verdicts_str += f"\n### Judge (GPT-5.2):\n"
        ai_verdicts_str += f"**Verdict:** {judge_emoji} {'Dataset' if judge_is_dataset else 'Non-Dataset'}\n"
        ai_verdicts_str += f"**Tag:** `{judge_tag}`\n"
        ai_verdicts_str += f"**Confidence:** {record['judge_confidence']:.1%}\n"
        if record.get('judge_data_type'):
            ai_verdicts_str += f"**Data Type:** {record['judge_data_type']}\n"
        if record.get('judge_reasoning'):
            # Truncate long reasoning so the panel stays compact (500-char cap is arbitrary)
            reasoning = record['judge_reasoning']
            if len(reasoning) > 500:
                reasoning = reasoning[:500] + "..."
            ai_verdicts_str += f"\n*Reasoning:* {reasoning}"

        # Metadata
        metadata_parts = []
        metadata_parts.append(f"- **Stratum:** `{record['stratum']}`")
        # metadata_parts.append(f"- **Document:** `{record['document']}...`")
        if record.get("source_document"):
            metadata_parts.append(f"- **Source File:** `{record.get('source_document')}`")
        if record.get("page_number"):
            metadata_parts.append(f"- **Page(s):** {record.get('page_number')}")
        is_primary = record.get('is_primary', True)
        metadata_parts.append(f"- **Type:** {'Primary sample' if is_primary else 'Sibling (same chunk)'}")
        if record.get('geography'):
            geo = record['geography']
            if isinstance(geo, dict):
                geo = geo.get('text', str(geo))
            metadata_parts.append(f"- **Geography:** {geo}")
        metadata_str = "\n".join(metadata_parts)

        # Get chunk info
        chunk_num, ds_in_chunk, total_in_chunk = self._get_chunk_info(self.current_idx)

        # Progress: annotated/total datasets
        annotated = len(self.annotations)
        progress = f"Datasets: {annotated}/{self.total_datasets} ({annotated/self.total_datasets*100:.1f}%)"

        # Status
        is_annotated = self._is_annotated(self.current_idx)
        if is_annotated:
            ann = self.annotations.get(record.get('sample_id', self.current_idx), {})
            status = f"✅ Validated as: {ann.get('human_verdict', 'unknown')}"
        else:
            status = "⏳ Pending Validation"

        # Navigation info with chunk details
        nav = {
            'chunk_info': f"Input Text: {chunk_num}/{self.total_chunks}",
            'dataset_in_chunk': f"Dataset: {ds_in_chunk}/{total_in_chunk} in this chunk",
            'record_info': f"Overall: {self.current_idx + 1}/{self.total_datasets}",
            'can_prev': self.current_idx > 0,
            'can_next': self.current_idx < self.total_datasets - 1
        }

        # PDF source path and page
        source_doc = record.get("source_document")
        page_num = record.get("page_number")
        pdf_value = None
        # Convert page_num to int and add 1 (the data is 0-indexed)
        try:
            if page_num:
                page_num = int(page_num) + 1
            else:
                page_num = 1
        except (ValueError, TypeError):
            page_num = 1
        if source_doc and self.pdf_dir:
            # Local PDF directory
            pdf_path = self.pdf_dir / source_doc
            if pdf_path.exists():
                pdf_value = str(pdf_path.absolute())
                print(f"📄 Found PDF for sample {self.current_idx}: {pdf_value} (Page {page_num})", flush=True)
            else:
                print(f"⚠️ PDF file not found: {pdf_path}", flush=True)
        elif source_doc and self.pdf_repo_id:
            # Server-side caching via the HF Hub (avoids CORS/frontend download issues)
            # Remove a leading slash if present
            source_doc_clean = source_doc.lstrip('/')
            try:
                from huggingface_hub import hf_hub_download
                print(f"📥 Downloading/caching PDF from {self.pdf_repo_id}: {source_doc_clean}", flush=True)
                pdf_path_cached = hf_hub_download(
                    repo_id=self.pdf_repo_id,
                    filename=source_doc_clean,
                    repo_type="dataset",
                    token=self.hf_token
                )
                pdf_value = str(pdf_path_cached)
                print(f"📦 Cached local path: {pdf_value}", flush=True)
            except Exception as e:
                print(f"❌ Failed to download PDF: {e}", flush=True)
                # Fall back to a URL if the download fails and a URL base is available
                if self.pdf_url_base:
                    pdf_value = f"{self.pdf_url_base.rstrip('/')}/{source_doc_clean}"
                    print(f"⚠️ Falling back to remote URL: {pdf_value}", flush=True)
        elif source_doc and self.pdf_url_base:
            # Remote PDF via URL (e.g., HF Datasets)
            # Remove any leading slashes from source_doc
            source_doc_clean = source_doc.lstrip('/')
            pdf_value = f"{self.pdf_url_base.rstrip('/')}/{source_doc_clean}"
            print(f"🌐 Using remote PDF for sample {self.current_idx}: {pdf_value} (Page {page_num})", flush=True)
        elif source_doc:
            print(f"ℹ️ PDF source specified ({source_doc}) but no pdf_dir or pdf_url_base provided.", flush=True)
        return record['text'], context, metadata_str, ai_verdicts_str, progress, status, nav, pdf_value, page_num
    def annotate(self, verdict: str, notes: str = "") -> Tuple[str, list, str, str, str, str]:
        """Annotate the current record and move to the next one."""
        if self.current_idx < len(self.records):
            record = self.records[self.current_idx]
            sample_id = record.get('sample_id', self.current_idx)
            self._save_annotation(sample_id, verdict, notes)
            self.next_record()
        return self.get_current_display()[:6]

    def next_record(self):
        """Move to the next record in the filtered set."""
        if not self.filtered_indices:
            return
        try:
            current_pos = self.filtered_indices.index(self.current_idx)
            if current_pos < len(self.filtered_indices) - 1:
                self.current_idx = self.filtered_indices[current_pos + 1]
        except ValueError:
            # Current index not in the filtered set (maybe the filter changed); jump to the first
            self.current_idx = self.filtered_indices[0]

    def prev_record(self):
        """Move to the previous record in the filtered set."""
        if not self.filtered_indices:
            return
        try:
            current_pos = self.filtered_indices.index(self.current_idx)
            if current_pos > 0:
                self.current_idx = self.filtered_indices[current_pos - 1]
        except ValueError:
            # Current index not in the filtered set; jump to the first
            self.current_idx = self.filtered_indices[0]

    def skip_to_next_unannotated(self):
        """Skip to the next unannotated record (also skipping one-word vague/descriptive mentions).

        Note: this scans the full record list, ignoring the current filter.
        """
        for i in range(self.current_idx + 1, len(self.records)):
            if not self._is_annotated(i) and not self._should_skip(i):
                self.current_idx = i
                return
    def get_statistics(self) -> str:
        """Get the current annotation statistics as markdown."""
        if not self.annotations:
            return "_No annotations yet_"
        total = len(self.annotations)
        human_dataset = sum(1 for a in self.annotations.values() if a['human_verdict'] == 'dataset')
        human_non = total - human_dataset
        agrees_ext = sum(1 for a in self.annotations.values() if a['human_agrees_extraction'])
        agrees_judge = sum(1 for a in self.annotations.values() if a['human_agrees_judge'])
        stats = f"""**Annotated:** {total}/{len(self.records)}

**Human Verdicts:**
- Dataset: {human_dataset}
- Non-Dataset: {human_non}

**Agreement Rates:**
- Extraction Model: {agrees_ext/total*100:.1f}%
- Judge (GPT-5.2): {agrees_judge/total*100:.1f}%
"""
        return stats
def create_app(input_file: str, hf_dataset_repo: Optional[str] = None, hf_token: Optional[str] = None,
               pdf_dir: Optional[str] = None, pdf_url_base: Optional[str] = None, pdf_repo_id: Optional[str] = None):
    """Create and configure the Gradio app."""
    annotator = ValidationAnnotator(input_file, hf_dataset_repo, hf_token, pdf_dir, pdf_url_base, pdf_repo_id)

    # Custom CSS for the green accept button and the dark mode toggle
    css = """
    #accept_btn {
        background-color: #22c55e !important;
        color: white !important;
    }
    #accept_btn:hover {
        background-color: #16a34a !important;
    }
    #theme_toggle {
        position: fixed;
        top: 10px;
        right: 10px;
        z-index: 1000;
        padding: 8px 16px;
        border-radius: 20px;
        cursor: pointer;
        font-size: 14px;
    }
    """

    # JavaScript for the dark mode toggle
    js = """
    function toggleDarkMode() {
        const body = document.body;
        const isDark = body.classList.contains('dark');
        if (isDark) {
            body.classList.remove('dark');
            localStorage.setItem('theme', 'light');
            document.getElementById('theme_toggle').textContent = '🌙 Dark Mode';
        } else {
            body.classList.add('dark');
            localStorage.setItem('theme', 'dark');
            document.getElementById('theme_toggle').textContent = '☀️ Light Mode';
        }
    }

    // Apply the saved theme on load
    document.addEventListener('DOMContentLoaded', function() {
        const savedTheme = localStorage.getItem('theme');
        if (savedTheme === 'dark') {
            document.body.classList.add('dark');
            const btn = document.getElementById('theme_toggle');
            if (btn) btn.textContent = '☀️ Light Mode';
        }
        // Force a resize when switching to the Annotate tab to help the PDF viewer
        document.body.addEventListener('click', function(e) {
            if (e.target && e.target.innerText && e.target.innerText.includes('Annotate')) {
                console.log('Annotate tab clicked - forcing resize');
                setTimeout(() => {
                    window.dispatchEvent(new Event('resize'));
                    // Also try to find any canvases and nudge them
                    document.querySelectorAll('canvas').forEach(c => {
                        c.dispatchEvent(new Event('resize'));
                    });
                }, 500);
            }
        }, true);
    });
    """
    with gr.Blocks(title="Dataset Annotation Tool", css=css, js=js) as app:
        # Theme toggle button
        gr.HTML('<button id="theme_toggle" onclick="toggleDarkMode()">🌙 Dark Mode</button>')
        gr.Markdown("# Dataset Annotation Tool")

        with gr.Tabs():
            # Tab 1: Introduction and instructions
            with gr.Tab("📖 Introduction & Instructions"):
                gr.Markdown("""
## Welcome to the Dataset Annotation Tool

This tool helps validate dataset mentions extracted from UNHCR and ReliefWeb documents. Your annotations will improve the accuracy of our dataset extraction model.

### What You'll Be Annotating

You'll review **candidate dataset mentions** that our AI model has identified in humanitarian documents. Your task is to determine whether each mention is:

- ✅ **A Dataset**: A collection of data that can be referenced, analyzed, or used (e.g., surveys, databases, statistical reports)
- ❌ **Not a Dataset**: A document title, framework, strategy, or general reference that doesn't represent actual data

### About the Data

- **Source**: UNHCR and ReliefWeb PDF documents
- **Sampling**: Stratified sample across different mention types (named, descriptive, vague)
- **AI Models**:
  - **Extraction Model**: Fine-tuned model that identified these mentions
  - **Judge (GPT-5.2)**: LLM-based validator that reviewed the extractions

### How to Annotate

1. **Review the Mention**: Read the **Dataset Name** and examine the **Context** (highlighted in yellow)
2. **Check Metadata**: Review the document source, stratum, and geography information
3. **Compare AI Predictions** (Optional): Toggle "🤖 Show what the AI thinks" to see model predictions
4. **Make Your Decision**:
   - Click **✅ DATASET** (green) if it's a valid dataset
   - Click **❌ NOT A DATASET** (red) if it's not a dataset
5. **Add Notes** (Optional): Document your reasoning for ambiguous cases
6. **Navigate**: Use the Previous/Next buttons or skip to unannotated samples
7. **Save Progress**:
   - Click **💾 Download Annotations** to back up locally
   - Auto-backup to HF Datasets (if configured)

### What Makes Something a Dataset?

✅ **IS a Dataset:**
- Survey data (e.g., "UNHCR Household Survey 2023")
- Statistical databases (e.g., "Population Statistics Database")
- Assessment results with data (e.g., "Needs Assessment 2024" when cited as a data source)
- Index datasets (e.g., "Multidimensional Poverty Index")
- Monitoring data (e.g., "Protection Monitoring Data")

❌ **NOT a Dataset:**
- Report titles (e.g., "Global Trends Report 2024" as a publication)
- Frameworks/strategies (e.g., "Global Compact on Refugees")
- Assessment activities (e.g., "Rapid Assessment" as the activity itself)
- General document references

### Tips for Accuracy

- **Context is key**: The same term can be a dataset or not depending on usage
- **Look for data indicators**: Numbers, statistics, "based on", "source:", "data from"
- **When in doubt**: Add a note explaining your reasoning
- **Be consistent**: Use the same criteria throughout your annotation session

### Your Impact

Your annotations will:
- Improve model precision and recall
- Help identify patterns in false positives/negatives
- Create training data for the next model version
- Support better dataset discovery in humanitarian documents

---

**Ready to start?** Click the **"Annotate"** tab above to begin!
""")
            # Get initial values for a robust first render
            init_name, init_context, init_metadata, init_ai, init_progress, init_status, init_nav, init_pdf, init_page = annotator.get_current_display()
            init_chunk_info = init_nav.get('chunk_info', '')
            init_dataset_in_chunk = init_nav.get('dataset_in_chunk', '')
            init_stats = annotator.get_statistics()
            # Tab 2: Annotation interface
            with gr.Tab("✏️ Annotate") as annotate_tab:
                gr.Markdown("Review and annotate dataset mentions. The PDF viewer is below for reference.")

                # Top section: annotation controls
                with gr.Row():
                    # Dataset info & context
                    with gr.Column(scale=3):
                        dataset_name = gr.Textbox(label="Dataset Name", value=init_name, interactive=False, max_lines=2)
                        context_box = gr.HighlightedText(
                            label="Context (±2 sentences, dataset highlighted)",
                            value=init_context,
                            color_map={"DATASET": "yellow"},
                            show_legend=False,
                            combine_adjacent=True
                        )
                        metadata_box = gr.Markdown(init_metadata, label="Metadata")
                        show_ai_checkbox = gr.Checkbox(label="🤖 Show what the AI thinks", value=False)
                        ai_verdicts_box = gr.Markdown(init_ai, label="AI Analysis", visible=False)

                    # Controls & progress
                    with gr.Column(scale=2):
                        # Filter dropdown
                        filter_dropdown = gr.Dropdown(
                            choices=["All", "named", "descriptive", "vague", "non-dataset"],
                            value="All",
                            label="🔍 Filter by Tag Type",
                            interactive=True
                        )
                        progress_box = gr.Textbox(label="Progress", value=init_progress, interactive=False, lines=1)
                        chunk_info_box = gr.Textbox(label="Input Text Position", value=init_chunk_info, interactive=False, lines=1)
                        dataset_in_chunk_box = gr.Textbox(label="Dataset in Chunk", value=init_dataset_in_chunk, interactive=False, lines=1)
                        status_box = gr.Textbox(label="Status", value=init_status, interactive=False, lines=1)
                        notes_box = gr.Textbox(
                            label="Notes (optional)",
                            placeholder="Add any comments about this dataset...",
                            lines=3
                        )
                        with gr.Row():
                            accept_btn = gr.Button("✅ DATASET", variant="primary", size="lg", elem_id="accept_btn")
                            reject_btn = gr.Button("❌ NOT A DATASET", variant="stop", size="lg")
                        gr.Markdown("---")
                        with gr.Row():
                            prev_btn = gr.Button("⬅️ Previous", size="sm")
                            next_btn = gr.Button("Next ➡️", size="sm")
                            skip_btn = gr.Button("⏭️ Skip to Next Unannotated", size="sm")
                        with gr.Accordion("📊 Live Statistics", open=False):
                            stats_box = gr.Markdown(init_stats)
                        # Download button for manual backup
                        download_btn = gr.DownloadButton(
                            "💾 Download Annotations",
                            value=str(annotator.output_file) if annotator.output_file.exists() else None,
                            size="sm",
                            variant="secondary"
                        )
                        # HF Datasets status
                        if annotator.hf_enabled:
                            gr.Markdown(f"☁️ **Auto-backup enabled:** [{annotator.hf_dataset_repo}](https://huggingface.co/datasets/{annotator.hf_dataset_repo})")
                        else:
                            gr.Markdown("⚠️ **Auto-backup disabled**")
                        gr.Markdown(f"**Input:** `{Path(input_file).name}`")

                gr.Markdown("---")

                # Bottom section: PDF viewer (full width)
                with gr.Row():
                    with gr.Column(scale=1):
                        if gr_pdf is None:
                            gr.Markdown("### ⚠️ `gradio-pdf` not found\nPlease run `uv pip install gradio-pdf` and restart.")
                            pdf_viewer = gr.HTML(visible=False)
                        else:
                            # Use the gradio-pdf component
                            pdf_viewer = gr_pdf(
                                label="Source Document",
                                height=1000,
                                visible=True
                            )
                        refresh_pdf_btn = gr.Button("🔄 Reload PDF Viewer", size="sm")

                # Hidden file component to authorize file serving
                if annotator.pdf_dir:
                    gr.File(value=None, visible=False, interactive=False)
        nav_state = gr.State({})

        def update_display():
            print(f"📡 Updating display for index {annotator.current_idx}...", flush=True)
            name, context, metadata, ai_verdicts, progress, status, nav, pdf_path, page_num = annotator.get_current_display()
            chunk_info = nav.get('chunk_info', '')
            dataset_in_chunk = nav.get('dataset_in_chunk', '')
            stats = annotator.get_statistics()
            # Use gr.update for the gradio_pdf component
            pdf_update = gr.update(value=pdf_path, starting_page=page_num)
            print(f"🖼️ PDF update: path={pdf_path}, page={page_num}", flush=True)
            return name, context, metadata, ai_verdicts, progress, chunk_info, dataset_in_chunk, status, nav, stats, pdf_update

        def accept_and_next(notes):
            name, context, metadata, ai_verdicts, progress, status = annotator.annotate('dataset', notes)
            _, _, _, _, _, _, nav, pdf_value, page_num = annotator.get_current_display()
            chunk_info = nav.get('chunk_info', '')
            dataset_in_chunk = nav.get('dataset_in_chunk', '')
            stats = annotator.get_statistics()
            # Use gr.update for the gradio_pdf component
            pdf_update = gr.update(value=pdf_value, starting_page=page_num)
            return name, context, metadata, ai_verdicts, progress, chunk_info, dataset_in_chunk, status, "", nav, stats, pdf_update

        def reject_and_next(notes):
            name, context, metadata, ai_verdicts, progress, status = annotator.annotate('non-dataset', notes)
            _, _, _, _, _, _, nav, pdf_value, page_num = annotator.get_current_display()
            chunk_info = nav.get('chunk_info', '')
            dataset_in_chunk = nav.get('dataset_in_chunk', '')
            stats = annotator.get_statistics()
            # Use gr.update for the gradio_pdf component
            pdf_update = gr.update(value=pdf_value, starting_page=page_num)
            return name, context, metadata, ai_verdicts, progress, chunk_info, dataset_in_chunk, status, "", nav, stats, pdf_update

        def go_next():
            annotator.next_record()
            return update_display()

        def go_prev():
            annotator.prev_record()
            return update_display()

        def skip_unannotated():
            annotator.skip_to_next_unannotated()
            return update_display()

        def toggle_ai_verdicts(show_ai):
            if show_ai:
                # Get the current AI verdicts content
                display_data = annotator.get_current_display()
                ai_verdicts = display_data[3]  # ai_verdicts_str is the 4th value
                return gr.update(visible=True, value=ai_verdicts)
            return gr.update(visible=False)

        def get_download_file():
            """Return the path to the annotations file for download."""
            if annotator.output_file.exists():
                return str(annotator.output_file)
            return None

        # Output lists, including chunk_info and dataset_in_chunk;
        # outputs_annotate additionally clears the notes box
        outputs_list = [dataset_name, context_box, metadata_box, ai_verdicts_box, progress_box, chunk_info_box, dataset_in_chunk_box, status_box, nav_state, stats_box, pdf_viewer]
        outputs_annotate = [dataset_name, context_box, metadata_box, ai_verdicts_box, progress_box, chunk_info_box, dataset_in_chunk_box, status_box, notes_box, nav_state, stats_box, pdf_viewer]

        accept_btn.click(accept_and_next, inputs=[notes_box], outputs=outputs_annotate).then(
            get_download_file, outputs=[download_btn]
        )
        reject_btn.click(reject_and_next, inputs=[notes_box], outputs=outputs_annotate).then(
            get_download_file, outputs=[download_btn]
        )
        next_btn.click(go_next, outputs=outputs_list)
        prev_btn.click(go_prev, outputs=outputs_list)
        skip_btn.click(skip_unannotated, outputs=outputs_list)

        def apply_filter(filter_value):
            annotator.set_filter(filter_value)
            return update_display()

        filter_dropdown.change(apply_filter, inputs=[filter_dropdown], outputs=outputs_list)
        show_ai_checkbox.change(toggle_ai_verdicts, inputs=[show_ai_checkbox], outputs=[ai_verdicts_box])

        def initial_load_no_pdf():
            """Initial load without the PDF, to avoid the blank-page bug on first render.

            The PDF is loaded when the user first clicks the Annotate tab."""
            print("🚀 Initial app load - PDF set to None (will load on tab select)", flush=True)
            name, context, metadata, ai_verdicts, progress, status, nav, pdf_path, page_num = annotator.get_current_display()
            chunk_info = nav.get('chunk_info', '')
            dataset_in_chunk = nav.get('dataset_in_chunk', '')
            stats = annotator.get_statistics()
            # Return None for the PDF to avoid the initial render bug
            pdf_update = gr.update(value=None)
            return name, context, metadata, ai_verdicts, progress, chunk_info, dataset_in_chunk, status, nav, stats, pdf_update

        # Load data when the app starts - WITHOUT the PDF, to avoid the blank-page bug
        app.load(initial_load_no_pdf, outputs=outputs_list)
        # When the Annotate tab is selected, load the PDF (this is the "second update" that triggers a proper render)
        annotate_tab.select(update_display, outputs=outputs_list)
        refresh_pdf_btn.click(update_display, outputs=outputs_list)

    return app
# For Hugging Face Spaces deployment
if __name__ == "__main__":
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Dataset Annotation Tool")
    parser.add_argument("--input", "-i", type=str, default="validation_sample_filtering_retained.jsonl",
                        help="Input JSONL file (default: validation_sample_filtering_retained.jsonl)")
    parser.add_argument("--pdf-dir", "-p", type=str, default=None,
                        help="Directory containing local PDF files (optional)")
    parser.add_argument("--pdf-url-base", "-u", type=str, default=None,
                        help="Base URL for remote PDFs (if not using local files)")
    args = parser.parse_args()

    # Check that the input file exists
    input_file = args.input
    if not Path(input_file).exists():
        raise FileNotFoundError(
            f"Input file '{input_file}' not found. "
            "Please ensure the data file is in the repository."
        )

    # Get HF credentials from the environment (set in Space secrets)
    hf_dataset_repo = os.getenv("HF_DATASET_REPO")  # e.g., "username/reliefweb-annotations"
    hf_token = os.getenv("HF_TOKEN")  # HF write token
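
    # For a Spaces deployment these would typically be set as Space secrets
    # (names match the os.getenv calls in this file; values are illustrative):
    #   HF_DATASET_REPO=username/validation-annotations
    #   HF_TOKEN=hf_xxxxxxxx
    #   HF_RELIEFWEB_PDFS_REPO=username/reliefweb-pdfs   (read below)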
    # Determine the PDF source: command-line args take priority, then env vars
    pdf_dir = args.pdf_dir
    pdf_url_base = args.pdf_url_base

    # If no explicit PDF source, check for the HF PDF repo environment variable
    pdf_repo_id = None
    if not pdf_dir and not pdf_url_base:
        hf_pdf_repo = os.getenv("HF_RELIEFWEB_PDFS_REPO")  # e.g., "ai4data/reliefweb-pdfs"
        if hf_pdf_repo:
            # Handle both formats: repo ID or full URL
            if hf_pdf_repo.startswith("https://"):
                # Already a full URL; use it directly (ensure it ends with /)
                pdf_url_base = hf_pdf_repo.rstrip('/') + '/'
            else:
                # Repo ID format - enables server-side caching
                pdf_repo_id = hf_pdf_repo
                # Also set the URL base as a fallback
                pdf_url_base = f"https://huggingface.co/datasets/{hf_pdf_repo}/resolve/main/"
            print(f"📚 Using HF PDF repository: {hf_pdf_repo}", flush=True)
            if pdf_repo_id:
                print(f"   🚀 Server-side caching ENABLED for repo: {pdf_repo_id}", flush=True)
                print(f"   PDF URL base (fallback): {pdf_url_base}", flush=True)
        else:
            print("⚠️ No PDF source configured. Set --pdf-dir, --pdf-url-base, or HF_RELIEFWEB_PDFS_REPO.", flush=True)

    # Create and launch the app
    app = create_app(input_file, hf_dataset_repo, hf_token, pdf_dir, pdf_url_base, pdf_repo_id)

    # Ensure allowed paths are absolute for Gradio (only needed for local files)
    allowed = []
    if pdf_dir:
        pdf_dir_parent = str(Path(pdf_dir).parent.resolve())
        allowed = [pdf_dir_parent]
        print(f"🚀 Launching with allowed_paths: {allowed}", flush=True)
        print(f"📁 PDF directory exists: {Path(pdf_dir).exists()}", flush=True)
    elif pdf_repo_id:
        # When caching from the HF Hub, Gradio needs access to the HF cache directory
        # (typically ~/.cache/huggingface/hub). Allowing the user's home directory
        # is a simple, robust choice that covers local cache locations.
        home_dir = str(Path.home().resolve())
        allowed = [home_dir]
        print(f"🚀 Launching with cached HF PDFs - allowing access to: {allowed}", flush=True)
    else:
        print("🚀 Launching with remote PDF URLs (no local allowed_paths needed)", flush=True)

    app.launch(allowed_paths=allowed, ssr_mode=False)
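
# Illustrative local invocations (flags as defined in the argparse setup above;
# paths are examples):
#   python app.py --input validation_sample_filtering_retained.jsonl --pdf-dir ./pdfs
#   python app.py -i sample.jsonl -u https://example.org/pdfs/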