Spaces:
Sleeping
Sleeping
| """Build a provider-backed fallback snapshot cache.""" | |
| from __future__ import annotations | |
| import json | |
| from pathlib import Path | |
| import sys | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from typing import Dict, List | |
| import requests | |
# Directory one level above the folder that contains this file.
ROOT = Path(__file__).resolve().parent.parent
# ROOT is placed on sys.path before the import below; presumably this lets the
# `server` package resolve when this file is executed directly as a script.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
from server.cases import EPSS_URL, NVD_CVE_URL, OSV_VULN_URL, _extract_cve_id
# Output locations: one JSON file per snapshot, plus a single index file.
SNAPSHOT_DIR = ROOT / "data" / "snapshots"
INDEX_PATH = ROOT / "data" / "snapshot_index.json"
# GitHub API URL that get_candidate_ids queries for the repository file tree.
PYPA_TREE_URL = "https://api.github.com/repos/pypa/advisory-database/git/trees/main?recursive=1"
def get_candidate_ids(limit: int = 200) -> List[str]:
    """Collect advisory identifiers from the tree listing at PYPA_TREE_URL.

    Only tree entries whose path starts with ``vulns/`` and ends with
    ``.yaml`` are considered; the identifier is the file name without that
    extension, and it is kept only when it begins with ``PYSEC-`` or
    ``GHSA-``.

    Args:
        limit: Base size of the result; at most ``limit * 4`` identifiers
            are returned, in the order the tree lists them.

    Returns:
        The matching identifiers, truncated to ``limit * 4`` items.

    Raises:
        requests.HTTPError: If the tree request returns an error status.
    """
    reply = requests.get(PYPA_TREE_URL, timeout=30)
    reply.raise_for_status()
    entries = reply.json().get("tree", [])

    extension = ".yaml"
    entry_paths = (entry.get("path", "") for entry in entries)
    # File name (text after the last "/") with the extension sliced off.
    stems = (
        entry_path.rpartition("/")[2][: -len(extension)]
        for entry_path in entry_paths
        if entry_path.startswith("vulns/") and entry_path.endswith(extension)
    )
    matches = [stem for stem in stems if stem.startswith(("PYSEC-", "GHSA-"))]
    return matches[: limit * 4]
def fetch_json(url: str, *, params: Dict[str, str] | None = None) -> Dict:
    """Issue a GET request and return the decoded JSON body.

    Args:
        url: Address to request.
        params: Optional query-string parameters (keyword-only).

    Returns:
        Whatever ``response.json()`` decodes from the reply.

    Raises:
        requests.HTTPError: If the reply carries an error status.
    """
    reply = requests.get(url, timeout=20, params=params)
    reply.raise_for_status()
    payload = reply.json()
    return payload
def _warn_enrichment_failure(source: str, cve_id: str, error: Exception) -> None:
    """Report a failed optional lookup on stderr without aborting the snapshot."""
    print(f"warning: {source} lookup failed for {cve_id}: {error!r}", file=sys.stderr)


def _apply_nvd_fields(snapshot: Dict, cve_id: str) -> None:
    """Fill ``severity`` and ``nvd_description`` from the NVD_CVE_URL reply."""
    nvd = fetch_json(NVD_CVE_URL, params={"cveId": cve_id})
    vulnerability = (nvd.get("vulnerabilities") or [{}])[0].get("cve", {})
    metrics = vulnerability.get("metrics", {})

    severity = None
    # Keys are tried in this order; the first one that yields a severity wins.
    for key in ("cvssMetricV40", "cvssMetricV31", "cvssMetricV30", "cvssMetricV2"):
        entries = metrics.get(key) or []
        if not entries:
            # A missing, null or empty metric list must not abort the
            # remaining enrichment (indexing [0] would raise here).
            continue
        item = entries[0]
        severity = (
            item.get("cvssData", {}).get("baseSeverity")
            or item.get("baseSeverity")
        )
        if severity:
            break

    descriptions = vulnerability.get("descriptions", [])
    snapshot["severity"] = severity or snapshot["severity"]
    # Prefer the English description; otherwise fall back to the first one.
    snapshot["nvd_description"] = next(
        (
            desc.get("value", "")
            for desc in descriptions
            if desc.get("lang") == "en"
        ),
        descriptions[0].get("value", "") if descriptions else "",
    )


def _apply_epss_fields(snapshot: Dict, cve_id: str) -> None:
    """Fill ``epss_score`` and ``epss_percentile`` from the EPSS_URL reply."""
    epss = fetch_json(EPSS_URL, params={"cve": cve_id})
    item = (epss.get("data") or [{}])[0]
    snapshot["epss_score"] = float(item.get("epss", 0.0) or 0.0)
    snapshot["epss_percentile"] = float(item.get("percentile", 0.0) or 0.0)


def build_snapshot(osv_id: str) -> Dict | None:
    """Build a snapshot dict for one advisory, enriched when a CVE id exists.

    The OSV record is fetched first. Records without an ``affected`` entry
    yield ``None``. When ``_extract_cve_id`` finds a CVE id, the snapshot is
    additionally enriched with NVD and EPSS data; each enrichment is
    best-effort, so a failure is reported on stderr and the defaults
    (``"MEDIUM"``, ``""``, ``0.0``) are kept.

    Args:
        osv_id: Identifier substituted into ``OSV_VULN_URL``.

    Returns:
        The snapshot dict, or ``None`` if the record has no ``affected`` data.

    Raises:
        requests.HTTPError: If the initial OSV request returns an error status.
    """
    osv = fetch_json(OSV_VULN_URL.format(osv_id=osv_id))
    if not osv.get("affected"):
        return None

    cve_id = _extract_cve_id(osv)
    snapshot = {
        "id": osv.get("id"),
        "summary": osv.get("summary"),
        "details": osv.get("details"),
        "aliases": osv.get("aliases", []),
        "references": osv.get("references", []),
        "affected": osv.get("affected", []),
        "severity": "MEDIUM",
        "nvd_description": "",
        "epss_score": 0.0,
        "epss_percentile": 0.0,
    }
    if not cve_id:
        return snapshot

    enrichments = (("NVD", _apply_nvd_fields), ("EPSS", _apply_epss_fields))
    for source, apply_fields in enrichments:
        try:
            apply_fields(snapshot, cve_id)
        except Exception as error:
            # Deliberately broad: enrichment is optional, so any failure is
            # reported instead of silently swallowed, and the snapshot
            # keeps whatever fields were already set.
            _warn_enrichment_failure(source, cve_id, error)
    return snapshot
def main(target_count: int = 200) -> None:
    """Build snapshots concurrently and write them, plus an index, to disk.

    Candidate ids come from ``get_candidate_ids`` and are processed by
    ``build_snapshot`` on a thread pool. Each non-empty snapshot is written
    to ``SNAPSHOT_DIR/<osv_id>.json``. Once ``target_count`` snapshots have
    been saved, pending work is cancelled. Finally an index sorted by
    ``osv_id`` is written to ``INDEX_PATH``.

    Args:
        target_count: Number of snapshots to save before stopping.
    """
    SNAPSHOT_DIR.mkdir(parents=True, exist_ok=True)
    # More candidates than targets are requested, since a candidate may
    # yield no snapshot or fail.
    candidates = get_candidate_ids(target_count)[: max(target_count + 40, 240)]
    saved = []
    failures = 0
    with ThreadPoolExecutor(max_workers=12) as executor:
        futures = {executor.submit(build_snapshot, osv_id): osv_id for osv_id in candidates}
        for future in as_completed(futures):
            if len(saved) >= target_count:
                # Drop queued work; the `with` exit still waits for tasks
                # that are already running.
                executor.shutdown(wait=False, cancel_futures=True)
                break
            osv_id = futures[future]
            try:
                snapshot = future.result()
            except Exception as error:
                # One bad advisory must not stop the run, but it should not
                # disappear without a trace either.
                failures += 1
                print(
                    f"warning: snapshot build failed for {osv_id}: {error!r}",
                    file=sys.stderr,
                )
                continue
            if not snapshot:
                continue
            out_path = SNAPSHOT_DIR / f"{osv_id}.json"
            out_path.write_text(
                json.dumps(snapshot, indent=2, sort_keys=True),
                encoding="utf-8",
            )
            saved.append(
                {
                    "osv_id": osv_id,
                    "file": str(out_path.relative_to(ROOT)),
                    "cve_id": _extract_cve_id(snapshot),
                    "package": (snapshot.get("affected") or [{}])[0].get("package", {}).get("name", ""),
                }
            )
    INDEX_PATH.parent.mkdir(parents=True, exist_ok=True)
    saved = sorted(saved, key=lambda item: item["osv_id"])
    INDEX_PATH.write_text(
        json.dumps({"count": len(saved), "snapshots": saved}, indent=2),
        encoding="utf-8",
    )
    print(f"Saved {len(saved)} snapshots to {SNAPSHOT_DIR}")
    if failures:
        print(f"warning: {failures} snapshot builds failed", file=sys.stderr)
# Script entry point: run main() with its default target_count when this file
# is executed directly rather than imported.
if __name__ == "__main__":
    main()