File size: 3,844 Bytes
c175b07
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import os, re, sqlite3, datetime, requests
from pathlib import Path
from typing import Optional, List, Dict

# Root directory (relative to the current working directory) under which
# ensure_foundation_year_dir() creates the per-foundation folders.
DATA_DIR = Path("data")
# Default SQLite database file used by the provenance helpers below.
PROV_DB = "provenance.db"

# ---------- SQLite provenance ----------
def init_provenance_db(db_path: str = PROV_DB):
    """Create the ``retrieved_docs`` provenance table if it does not exist.

    Args:
        db_path: Path of the SQLite database file to open. Defaults to
            ``PROV_DB``.

    Raises:
        sqlite3.Error: If the database cannot be opened or the statement
            fails.
    """
    conn = sqlite3.connect(db_path)
    try:
        c = conn.cursor()
        c.execute("""
    CREATE TABLE IF NOT EXISTS retrieved_docs (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        foundation_id INTEGER NOT NULL,
        year INTEGER,
        title TEXT,
        doc_type TEXT,
        file_path TEXT,
        source_url TEXT,
        fetched_at TEXT DEFAULT CURRENT_TIMESTAMP
    )""")
        conn.commit()
    finally:
        # Always release the connection, even when the DDL or commit raises;
        # otherwise the handle stays open until garbage collection.
        conn.close()

def log_provenance(foundation_id: int, year: Optional[int], title: str,
                   doc_type: str, file_path: str, source_url: str,
                   db_path: str = PROV_DB):
    """Insert one row describing a retrieved document into ``retrieved_docs``.

    Args:
        foundation_id: Identifier of the foundation the document belongs to.
        year: Report year, or ``None`` when unknown.
        title: Document title.
        doc_type: Free-form document type label.
        file_path: Where the document was saved.
        source_url: URL the document was fetched from.
        db_path: Path of the SQLite database file. Defaults to ``PROV_DB``.

    Raises:
        sqlite3.Error: If the database cannot be opened or the insert fails
            (for example when the table has not been created yet).
    """
    conn = sqlite3.connect(db_path)
    try:
        c = conn.cursor()
        # fetched_at is written explicitly as a naive local-time ISO string.
        # NOTE(review): the column default is CURRENT_TIMESTAMP, which SQLite
        # documents as UTC -- confirm which convention readers expect.
        c.execute("""INSERT INTO retrieved_docs
        (foundation_id, year, title, doc_type, file_path, source_url, fetched_at)
        VALUES (?,?,?,?,?,?,?)""",
            (foundation_id, year, title, doc_type, file_path, source_url,
             datetime.datetime.now().isoformat()))
        conn.commit()
    finally:
        # Close the connection on every path so a failed insert does not
        # leak the handle.
        conn.close()

# ---------- Filesystem ----------
def safe_filename(name: str) -> str:
    """Return ``name`` reduced to a filesystem-friendly form.

    Runs of characters other than word characters, ``-``, ``.`` and spaces
    become a single underscore; remaining whitespace runs become a single
    underscore; leading and trailing underscores are removed.
    """
    scrubbed = re.sub(r"[^\w\-. ]+", "_", name)
    underscored = re.sub(r"\s+", "_", scrubbed)
    return underscored.strip("_")

def ensure_foundation_year_dir(fid: int, year: Optional[int]) -> Path:
    """Create (if needed) and return the storage directory for a foundation.

    The directory is ``DATA_DIR/<fid>_data``, with a ``<year>`` subdirectory
    appended when ``year`` is truthy. Missing parents are created and an
    existing directory is not an error.
    """
    segments = [f"{fid}_data"]
    if year:
        segments.append(str(year))
    target_dir = DATA_DIR.joinpath(*segments)
    target_dir.mkdir(parents=True, exist_ok=True)
    return target_dir

def download_pdf(url: str, save_dir: Path, preferred_name: Optional[str] = None) -> str:
    """Download ``url`` into ``save_dir`` and return the saved file's path.

    The file name is ``preferred_name`` when given, otherwise the last path
    segment of ``url`` with any query string and fragment removed. A ``.pdf``
    suffix is appended when missing and the result is passed through
    ``safe_filename``.

    Args:
        url: Address of the document to fetch.
        save_dir: Existing directory to write the file into.
        preferred_name: Optional file name to use instead of the URL-derived
            one.

    Returns:
        The path of the written file, as a string.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.RequestException: On other request failures.
        OSError: If the target file cannot be written.
    """
    # Drop the fragment and query *before* taking the last "/" segment, so a
    # query value containing "/" (e.g. "...a.pdf?x=1/2") cannot be mistaken
    # for the file name.
    url_path = url.partition("#")[0].partition("?")[0]
    filename = preferred_name or url_path.rsplit("/", 1)[-1]
    if not filename.lower().endswith(".pdf"):
        filename += ".pdf"
    filename = safe_filename(filename)
    if filename.lower() == ".pdf":
        # URL ended in "/" (or the name sanitised away entirely): avoid
        # writing a hidden, nameless ".pdf" file.
        filename = "download.pdf"
    target = save_dir / filename
    # Using the response as a context manager returns the streamed
    # connection to the pool even if the status check or the write fails.
    with requests.get(url, stream=True, timeout=30) as r:
        r.raise_for_status()
        with open(target, "wb") as f:
            for chunk in r.iter_content(8192):
                if chunk:
                    f.write(chunk)
    return str(target)

# ---------- SerpAPI search ----------
def serpapi_search(query: str, num_results: int = 20, serpapi_key: Optional[str] = None) -> List[Dict]:
    """Run a Google search through SerpAPI and return its organic results.

    Args:
        query: Search query string.
        num_results: Value sent as the ``num`` request parameter.
        serpapi_key: API key; when falsy, the ``SERPAPI_KEY`` environment
            variable is used instead.

    Returns:
        The ``organic_results`` list from the JSON response, or an empty
        list when that key is absent.

    Raises:
        RuntimeError: If no API key is available.
        requests.HTTPError: If the API answers with an error status.
    """
    api_key = serpapi_key if serpapi_key else os.getenv("SERPAPI_KEY")
    if not api_key:
        raise RuntimeError("SERPAPI_KEY not set (add it in HF Space Secrets).")
    response = requests.get(
        "https://serpapi.com/search",
        params={
            "engine": "google",
            "q": query,
            "num": num_results,
            "api_key": api_key,
        },
        timeout=20,
    )
    response.raise_for_status()
    payload = response.json()
    return payload.get("organic_results", [])

def _is_pdf_link(link: str) -> bool:
    """Return True when ``.pdf`` occurs anywhere in ``link``, ignoring case."""
    # A link that ends with ".pdf" necessarily contains it, so a single
    # containment check covers both cases.
    return ".pdf" in link.lower()

def score_candidate(item: Dict, foundation_name: str, year: Optional[int]) -> float:
    """Score a search result by how likely it is the wanted annual report.

    Points are added independently:
      * 2   -- the title contains a report keyword;
      * 1.5 -- the first ten characters of the lower-cased foundation name
               occur in the title or link;
      * 1.5 -- ``year`` is truthy and occurs in the title or link;
      * 1.0 -- the link looks like a PDF.

    Args:
        item: Search result; its ``title`` and ``link`` entries are read and
            missing or falsy values are treated as empty strings.
        foundation_name: Name of the foundation being searched for.
        year: Report year, or ``None`` to skip the year check.

    Returns:
        The accumulated score.
    """
    title = (item.get("title") or "").lower()
    link = (item.get("link") or "").lower()
    searchable = (title, link)
    report_keywords = ["annual", "report", "jahresbericht", "rapport", "rapport annuel"]
    name_prefix = foundation_name.lower()[:10]

    total = 0.0
    if any(keyword in title for keyword in report_keywords):
        total += 2
    if any(name_prefix in text for text in searchable):
        total += 1.5
    if year and any(str(year) in text for text in searchable):
        total += 1.5
    if _is_pdf_link(link):
        total += 1.0
    return total

def find_best_report_url(foundation_name: str, year: Optional[int], extra_terms: Optional[str], serpapi_key: Optional[str]) -> Optional[Dict]:
    """Search for a foundation's annual report and return the best hit.

    Builds a PDF-restricted query from the foundation name, the optional
    year and optional extra terms, runs it through ``serpapi_search`` and
    ranks the results with ``score_candidate``.

    Args:
        foundation_name: Name of the foundation.
        year: Report year; omitted from the query when falsy.
        extra_terms: Additional query text; omitted when falsy.
        serpapi_key: API key forwarded to ``serpapi_search``.

    Returns:
        The highest-scoring result (the earliest one on ties), or ``None``
        when the search returns nothing.
    """
    pieces = [f'{foundation_name} annual report']
    if year:
        pieces.append(f' {year}')
    if extra_terms:
        pieces.append(f' {extra_terms}')
    pieces.append(' filetype:pdf site:org | site:ch | site:foundation | site:stiftung | site:fondation')
    query = "".join(pieces)

    candidates = serpapi_search(query, num_results=20, serpapi_key=serpapi_key)
    if not candidates:
        return None
    # max() returns the first maximal element, matching the head of a stable
    # descending sort.
    return max(candidates, key=lambda hit: score_candidate(hit, foundation_name, year))