Spaces:
Paused
Paused
| import os, re, sqlite3, datetime, requests | |
| from pathlib import Path | |
| from typing import Optional, List, Dict | |
# Root directory under which the per-foundation download folders are created.
DATA_DIR: Path = Path("data")
# Default SQLite file used by the provenance helpers when no db_path is given.
PROV_DB: str = "provenance.db"
| # ---------- SQLite provenance ---------- | |
def init_provenance_db(db_path: str = PROV_DB):
    """Create the ``retrieved_docs`` provenance table if it does not exist.

    Args:
        db_path: Path of the SQLite database file to open.

    Raises:
        sqlite3.Error: If the database cannot be opened or the statement
            fails.
    """
    conn = sqlite3.connect(db_path)
    try:
        c = conn.cursor()
        c.execute("""
            CREATE TABLE IF NOT EXISTS retrieved_docs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                foundation_id INTEGER NOT NULL,
                year INTEGER,
                title TEXT,
                doc_type TEXT,
                file_path TEXT,
                source_url TEXT,
                fetched_at TEXT DEFAULT CURRENT_TIMESTAMP
            )""")
        conn.commit()
    finally:
        # Close even when the statement fails so the connection is not leaked.
        conn.close()
def log_provenance(foundation_id: int, year: Optional[int], title: str,
                   doc_type: str, file_path: str, source_url: str,
                   db_path: str = PROV_DB):
    """Insert one row describing a retrieved document into ``retrieved_docs``.

    Args:
        foundation_id: Identifier of the foundation the document belongs to.
        year: Report year, or ``None`` when unknown.
        title: Title of the document.
        doc_type: Document type label.
        file_path: Where the document was stored locally.
        source_url: URL the document was fetched from.
        db_path: Path of the SQLite database file to write to.

    Raises:
        sqlite3.Error: If the insert fails, e.g. because the table has not
            been created yet.
    """
    # NOTE(review): this is a naive local-time ISO string; rows that rely on
    # the column's CURRENT_TIMESTAMP default would presumably be UTC instead.
    fetched_at = datetime.datetime.now().isoformat()
    conn = sqlite3.connect(db_path)
    try:
        c = conn.cursor()
        c.execute("""INSERT INTO retrieved_docs
            (foundation_id, year, title, doc_type, file_path, source_url, fetched_at)
            VALUES (?,?,?,?,?,?,?)""",
                  (foundation_id, year, title, doc_type, file_path, source_url,
                   fetched_at))
        conn.commit()
    finally:
        # Close even when the insert fails so the connection is not leaked.
        conn.close()
| # ---------- Filesystem ---------- | |
def safe_filename(name: str) -> str:
    """Turn *name* into a string that is safe to use as a file name.

    Runs of characters other than word characters, ``-``, ``.`` and space
    become a single ``_``; runs of whitespace become a single ``_``; leading
    and trailing underscores are removed. The result may be empty.
    """
    without_specials = re.sub(r"[^\w\-. ]+", "_", name)
    without_spaces = re.sub(r"\s+", "_", without_specials)
    return without_spaces.strip("_")
def ensure_foundation_year_dir(fid: int, year: Optional[int]) -> Path:
    """Create (if needed) and return the storage directory for a foundation.

    The directory is ``DATA_DIR/<fid>_data``, with a ``<year>`` subdirectory
    appended when *year* is truthy. Missing parents are created and an
    already existing directory is not an error.
    """
    foundation_dir = DATA_DIR / f"{fid}_data"
    target_dir = foundation_dir / str(year) if year else foundation_dir
    target_dir.mkdir(parents=True, exist_ok=True)
    return target_dir
def download_pdf(url: str, save_dir: Path, preferred_name: Optional[str] = None) -> str:
    """Download *url* into *save_dir* and return the path of the saved file.

    The file name is *preferred_name* when given, otherwise the last segment
    of the URL path (``document`` when that segment is empty). A ``.pdf``
    suffix is appended when missing and the name is passed through
    ``safe_filename``.

    Args:
        url: Address of the document to fetch.
        save_dir: Existing directory to write the file into.
        preferred_name: Optional file name to use instead of the URL's.

    Returns:
        The path of the written file, as a string.

    Raises:
        requests.HTTPError: If the server responds with an error status.
        requests.RequestException: For other request failures.
        OSError: If the target file cannot be written.
    """
    from urllib.parse import urlsplit  # local import keeps the block self-contained

    # Use only the URL *path*: a "/" inside the query string or fragment must
    # not be mistaken for the start of the file name.
    url_name = urlsplit(url).path.rsplit("/", 1)[-1]
    filename = preferred_name or url_name or "document"
    if not filename.lower().endswith(".pdf"):
        filename += ".pdf"
    filename = safe_filename(filename)
    target = save_dir / filename
    # The context manager releases the streamed connection on every exit path.
    with requests.get(url, stream=True, timeout=30) as r:
        r.raise_for_status()
        with open(target, "wb") as f:
            for chunk in r.iter_content(8192):
                if chunk:
                    f.write(chunk)
    return str(target)
| # ---------- SerpAPI search ---------- | |
def serpapi_search(query: str, num_results: int = 20, serpapi_key: Optional[str] = None) -> List[Dict]:
    """Run a Google search through SerpAPI and return its organic results.

    Args:
        query: Search query string.
        num_results: Value sent as the ``num`` request parameter.
        serpapi_key: API key; falls back to the ``SERPAPI_KEY`` environment
            variable when empty or ``None``.

    Returns:
        The ``organic_results`` list of the JSON response, or an empty list
        when the response has no such key.

    Raises:
        RuntimeError: If no API key is available.
        requests.HTTPError: If the API responds with an error status.
    """
    api_key = serpapi_key if serpapi_key else os.getenv("SERPAPI_KEY")
    if not api_key:
        raise RuntimeError("SERPAPI_KEY not set (add it in HF Space Secrets).")
    response = requests.get(
        "https://serpapi.com/search",
        params={
            "engine": "google",
            "q": query,
            "num": num_results,
            "api_key": api_key,
        },
        timeout=20,
    )
    response.raise_for_status()
    payload = response.json()
    return payload.get("organic_results", [])
| def _is_pdf_link(link: str) -> bool: | |
| l = link.lower() | |
| return l.endswith(".pdf") or (".pdf" in l) | |
def score_candidate(item: Dict, foundation_name: str, year: Optional[int]) -> float:
    """Score how likely a search result is the wanted annual-report PDF.

    Points are additive:
      * +2.0 when the title contains a report keyword,
      * +1.5 when the first 10 characters of the foundation name occur in
        the title or link,
      * +1.5 when *year* is given and occurs in the title or link,
      * +1.0 when the link looks like a PDF.

    Args:
        item: Search result; its ``title`` and ``link`` entries are read and
            treated as empty when missing or ``None``.
        foundation_name: Name of the foundation being searched for.
        year: Report year, or ``None`` to skip the year bonus.

    Returns:
        The score; higher means a better candidate.
    """
    title = (item.get("title") or "").lower()
    link = (item.get("link") or "").lower()
    score = 0.0

    # "rapport" also covers "rapport annuel", so it needs no separate entry.
    report_keywords = ("annual", "report", "jahresbericht", "rapport")
    if any(keyword in title for keyword in report_keywords):
        score += 2

    # An empty prefix must be skipped: "" is a substring of every string, so
    # a blank name would otherwise give the bonus to every result.
    name_prefix = foundation_name.strip().lower()[:10]
    if name_prefix and (name_prefix in title or name_prefix in link):
        score += 1.5

    if year:
        year_text = str(year)
        if year_text in title or year_text in link:
            score += 1.5

    if _is_pdf_link(link):
        score += 1.0
    return score
def find_best_report_url(foundation_name: str, year: Optional[int], extra_terms: Optional[str], serpapi_key: Optional[str]) -> Optional[Dict]:
    """Search for a foundation's annual report and return the best result.

    Builds a query from the foundation name, the optional year and extra
    terms plus fixed ``filetype:``/``site:`` filters, runs it through
    ``serpapi_search`` and ranks the results with ``score_candidate``.

    Returns:
        The highest-scoring result dict (the earliest one on ties), or
        ``None`` when the search returned nothing.
    """
    year_part = f' {year}' if year else ''
    extra_part = f' {extra_terms}' if extra_terms else ''
    query = (
        f'{foundation_name} annual report'
        + year_part
        + extra_part
        + ' filetype:pdf site:org | site:ch | site:foundation | site:stiftung | site:fondation'
    )
    candidates = serpapi_search(query, num_results=20, serpapi_key=serpapi_key)
    if not candidates:
        return None
    # max() keeps the first of equally scored items, the same pick as taking
    # element 0 of a stable descending sort.
    return max(candidates, key=lambda hit: score_candidate(hit, foundation_name, year))