| """Verification service for BibTeX entries. |
| |
| This service extracts the core verification logic from app.py, |
| making it reusable for both Gradio UI and FastAPI endpoints. |
| """ |
| import tempfile |
| import threading |
| from pathlib import Path |
| from concurrent.futures import ThreadPoolExecutor, as_completed |
| from dataclasses import dataclass |
| from typing import Optional |
|
|
| from src.parsers import BibParser |
| from src.fetchers import ( |
| ArxivFetcher, |
| ScholarFetcher, |
| CrossRefFetcher, |
| SemanticScholarFetcher, |
| OpenAlexFetcher, |
| DBLPFetcher, |
| ) |
| from src.analyzers import MetadataComparator, DuplicateDetector |
| from src.report.generator import EntryReport |
| from src.config.workflow import get_default_workflow |
| from src.utils.normalizer import TextNormalizer |
| from src.core.config import settings |
| from src.core.logging import get_logger |
| from src.core.exceptions import ParserException, FetcherException |
|
|
| logger = get_logger(__name__) |
|
|
|
|
| @dataclass |
| class VerificationResult: |
| """Result of BibTeX verification.""" |
|
|
| entry_reports: list[EntryReport] |
| duplicate_groups: list |
| verified_count: int |
| warning_count: int |
| error_count: int |
| total_count: int |
|
|
| @property |
| def success_rate(self) -> float: |
| """Calculate success rate.""" |
| if self.total_count == 0: |
| return 0.0 |
| return (self.verified_count / self.total_count) * 100 |
|
|
|
|
| class VerificationService: |
| """Service for verifying BibTeX entries against academic databases.""" |
|
|
| def __init__(self): |
| """Initialize verification service.""" |
| self.parser = BibParser() |
| self.arxiv_fetcher = ArxivFetcher() |
| self.crossref_fetcher = CrossRefFetcher() |
| self.scholar_fetcher = ScholarFetcher() |
| self.semantic_scholar_fetcher = SemanticScholarFetcher() |
| self.openalex_fetcher = OpenAlexFetcher() |
| self.dblp_fetcher = DBLPFetcher() |
| self.comparator = MetadataComparator() |
| self.duplicate_detector = DuplicateDetector() |
| logger.info("VerificationService initialized") |
|
|
| def verify_bibtex_string( |
| self, |
| bibtex_content: str, |
| progress_callback: Optional[callable] = None, |
| ) -> VerificationResult: |
| """Verify BibTeX content from string. |
| |
| Args: |
| bibtex_content: BibTeX content as string |
| progress_callback: Optional callback for progress updates (progress, desc) |
| |
| Returns: |
| VerificationResult with all verification data |
| |
| Raises: |
| ParserException: If BibTeX parsing fails |
| FetcherException: If fetching fails |
| """ |
| if not bibtex_content.strip(): |
| raise ParserException("Empty BibTeX content provided") |
|
|
| logger.info("Starting BibTeX verification") |
|
|
| |
| try: |
| if progress_callback: |
| progress_callback(0, "Parsing BibTeX...") |
|
|
| |
| with tempfile.NamedTemporaryFile( |
| mode="w", suffix=".bib", delete=False, encoding="utf-8" |
| ) as f: |
| f.write(bibtex_content) |
| temp_bib_path = f.name |
|
|
| entries = self.parser.parse_file(temp_bib_path) |
| Path(temp_bib_path).unlink() |
|
|
| if not entries: |
| raise ParserException("No valid BibTeX entries found") |
|
|
| logger.info(f"Parsed {len(entries)} BibTeX entries") |
|
|
| except Exception as e: |
| logger.error(f"BibTeX parsing failed: {e}") |
| raise ParserException(f"Failed to parse BibTeX: {str(e)}") |
|
|
| |
| duplicate_groups = self.duplicate_detector.find_duplicates(entries) |
| if duplicate_groups: |
| logger.warning(f"Found {len(duplicate_groups)} duplicate groups") |
|
|
| |
| workflow_config = get_default_workflow() |
|
|
| |
| entry_reports = [] |
| progress_lock = threading.Lock() |
| verified_count = 0 |
| warning_count = 0 |
| error_count = 0 |
|
|
| if progress_callback: |
| progress_callback(0.1, "Initializing fetchers...") |
|
|
| def process_single_entry(entry, idx, total): |
| """Process a single BibTeX entry.""" |
| comparison_result = None |
| all_results = [] |
|
|
| for step in workflow_config.get_enabled_steps(): |
| result = None |
|
|
| try: |
| if step.name == "arxiv_id" and entry.has_arxiv and self.arxiv_fetcher: |
| arxiv_meta = self.arxiv_fetcher.fetch_by_id(entry.arxiv_id) |
| if arxiv_meta: |
| result = self.comparator.compare_with_arxiv(entry, arxiv_meta) |
|
|
| elif step.name == "crossref_doi" and entry.doi and self.crossref_fetcher: |
| crossref_result = self.crossref_fetcher.search_by_doi(entry.doi) |
| if crossref_result: |
| result = self.comparator.compare_with_crossref(entry, crossref_result) |
|
|
| elif step.name == "semantic_scholar" and entry.title and self.semantic_scholar_fetcher: |
| ss_result = ( |
| self.semantic_scholar_fetcher.fetch_by_doi(entry.doi) |
| if entry.doi |
| else None |
| ) |
| if not ss_result: |
| ss_result = self.semantic_scholar_fetcher.search_by_title(entry.title) |
| if ss_result: |
| result = self.comparator.compare_with_semantic_scholar(entry, ss_result) |
|
|
| elif step.name == "dblp" and entry.title and self.dblp_fetcher: |
| dblp_result = self.dblp_fetcher.search_by_title(entry.title) |
| if dblp_result: |
| result = self.comparator.compare_with_dblp(entry, dblp_result) |
|
|
| elif step.name == "openalex" and entry.title and self.openalex_fetcher: |
| oa_result = ( |
| self.openalex_fetcher.fetch_by_doi(entry.doi) |
| if entry.doi |
| else None |
| ) |
| if not oa_result: |
| oa_result = self.openalex_fetcher.search_by_title(entry.title) |
| if oa_result: |
| result = self.comparator.compare_with_openalex(entry, oa_result) |
|
|
| elif step.name == "arxiv_title" and entry.title and self.arxiv_fetcher: |
| results = self.arxiv_fetcher.search_by_title(entry.title, max_results=3) |
| if results: |
| best_result = None |
| best_sim = 0.0 |
| norm1 = TextNormalizer.normalize_for_comparison(entry.title) |
| for r in results: |
| sim = TextNormalizer.similarity_ratio( |
| norm1, |
| TextNormalizer.normalize_for_comparison(r.title), |
| ) |
| if sim > best_sim: |
| best_sim, best_result = sim, r |
| if best_result and best_sim > 0.5: |
| result = self.comparator.compare_with_arxiv(entry, best_result) |
|
|
| elif step.name == "crossref_title" and entry.title and self.crossref_fetcher: |
| crossref_result = self.crossref_fetcher.search_by_title(entry.title) |
| if crossref_result: |
| result = self.comparator.compare_with_crossref(entry, crossref_result) |
|
|
| elif step.name == "google_scholar" and entry.title and self.scholar_fetcher: |
| scholar_result = self.scholar_fetcher.search_by_title(entry.title) |
| if scholar_result: |
| result = self.comparator.compare_with_scholar(entry, scholar_result) |
|
|
| except Exception as e: |
| logger.warning(f"Error in step {step.name} for entry {entry.key}: {e}") |
| continue |
|
|
| if result: |
| all_results.append(result) |
| if result.is_match: |
| comparison_result = result |
| break |
|
|
| |
| if not comparison_result and all_results: |
| all_results.sort(key=lambda r: r.confidence, reverse=True) |
| comparison_result = all_results[0] |
| elif not comparison_result: |
| comparison_result = self.comparator.create_unable_result( |
| entry, "Unable to find this paper in any data source" |
| ) |
|
|
| return EntryReport(entry=entry, comparison=comparison_result) |
|
|
| |
| max_workers = min(settings.max_workers, len(entries)) |
| logger.info(f"Processing {len(entries)} entries with {max_workers} workers") |
|
|
| with ThreadPoolExecutor(max_workers=max_workers) as executor: |
| future_to_entry = { |
| executor.submit(process_single_entry, e, i, len(entries)): (e, i) |
| for i, e in enumerate(entries) |
| } |
|
|
| for future in as_completed(future_to_entry): |
| entry, idx = future_to_entry[future] |
| try: |
| entry_report = future.result() |
| with progress_lock: |
| entry_reports.append(entry_report) |
|
|
| if entry_report.comparison and entry_report.comparison.is_match: |
| verified_count += 1 |
| elif entry_report.comparison and entry_report.comparison.has_issues: |
| warning_count += 1 |
| else: |
| error_count += 1 |
|
|
| if progress_callback: |
| progress_callback( |
| 0.1 + (0.9 * (idx + 1) / len(entries)), |
| f"Verifying entries {idx + 1}/{len(entries)}...", |
| ) |
|
|
| except Exception as e: |
| with progress_lock: |
| error_count += 1 |
| logger.error(f"Error processing entry {entry.key}: {e}") |
|
|
| logger.info( |
| f"Verification complete: {verified_count} verified, " |
| f"{warning_count} warnings, {error_count} errors" |
| ) |
|
|
| return VerificationResult( |
| entry_reports=entry_reports, |
| duplicate_groups=duplicate_groups, |
| verified_count=verified_count, |
| warning_count=warning_count, |
| error_count=error_count, |
| total_count=len(entries), |
| ) |
|
|