#!/usr/bin/env python3
# mcp/alerts.py
"""MedGenesis – saved‑query alert helper (async).
Monitors previously saved queries (PubMed/arXiv) and notifies when new
papers appear. Stores a lightweight JSON DB on disk (**locally inside
Space**), keeping the latest *N* paper links per query.
Changes vs. legacy version
~~~~~~~~~~~~~~~~~~~~~~~~~~
* **Thread‑safe JSON IO** with `asyncio.Lock` (handles simultaneous
Streamlit sessions).
* Exponential‑back‑off retry around `orchestrate_search` so one bad
request doesn’t kill the whole loop.
* Maximum DB size of 100 queries; oldest evicted automatically.
* Ensures atomic file write via `Path.write_text` to a temp file.
DB location: `<project_root>/data/medgen_alerts.json` (created on first run).
"""
from __future__ import annotations
import asyncio, json, tempfile
from pathlib import Path
from typing import Dict, List
from mcp.orchestrator import orchestrate_search

_DB_PATH = (Path(__file__).parent / "data" / "medgen_alerts.json").resolve()
_DB_PATH.parent.mkdir(parents=True, exist_ok=True)

_MAX_IDS = 30       # keep last N paper links per query
_MAX_QUERIES = 100  # protect against runaway DB growth

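# NOTE: an asyncio.Lock serialises tasks within a single event loop / process;
# it does not guard the JSON file against separate worker processes.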
_lock = asyncio.Lock()
# ---------------------------------------------------------------------
# Internal JSON helpers
# ---------------------------------------------------------------------
def _read_db() -> Dict[str, List[str]]:
    if _DB_PATH.exists():
        try:
            return json.loads(_DB_PATH.read_text())
        except json.JSONDecodeError:
            return {}
    return {}
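# NOTE: `_write_db` relies on `Path.replace` (an atomic rename on POSIX
# filesystems). Because the temp file is created with `dir=_DB_PATH.parent`,
# it lives on the same filesystem as the DB, so readers see either the old
# file or the complete new one, never a partial write.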
def _write_db(data: Dict[str, List[str]]):
    # Atomic write: dump to a temp file in the same directory, close it,
    # then replace the real DB in one rename.
    with tempfile.NamedTemporaryFile("w", delete=False, dir=_DB_PATH.parent) as tmp:
        json.dump(data, tmp, indent=2)
    Path(tmp.name).replace(_DB_PATH)
# ---------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------
async def check_alerts(queries: List[str]) -> Dict[str, List[str]]:
    """Return `{query: [fresh_links]}` for queries with new papers."""
    async with _lock:  # serialize DB read
        db = _read_db()

    new_links_map: Dict[str, List[str]] = {}

    async def _process(q: str):
        # Retry orchestrate_search up to 3x with exponential back-off (2 s, 4 s, 8 s)
        delay = 2
        for _ in range(3):
            try:
                res = await orchestrate_search(q)
                break
            except Exception:
                await asyncio.sleep(delay)
                delay *= 2
        else:
            return  # all retries failed – give up on this query
        links = [p["link"] for p in res["papers"]]
        prev = set(db.get(q, []))
        fresh = [link for link in links if link not in prev]
        if fresh:
            new_links_map[q] = fresh
        db[q] = links[:_MAX_IDS]

    await asyncio.gather(*[_process(q) for q in queries])

    # Trim DB if it exceeds the query cap, dropping the oldest entries first
    if len(db) > _MAX_QUERIES:
        for key in list(db.keys())[: len(db) - _MAX_QUERIES]:
            db.pop(key, None)

    async with _lock:  # serialize DB write
        _write_db(db)
    return new_links_map
# ---------------------------------------------------------------------
# CLI demo
# ---------------------------------------------------------------------
if __name__ == "__main__":
    async def _demo():
        demo_map = await check_alerts(["glioblastoma CRISPR"])
        print("Fresh:", demo_map)

    asyncio.run(_demo())
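
# ---------------------------------------------------------------------
# Usage sketch (illustrative, not part of the module API)
# ---------------------------------------------------------------------
# A minimal sketch of how a Streamlit page might poll this helper on each
# rerun. The `saved_queries` session key and the sidebar layout are
# hypothetical; only `check_alerts` comes from this module.
#
#     import asyncio
#     import streamlit as st
#     from mcp.alerts import check_alerts
#
#     saved = st.session_state.get("saved_queries", [])
#     if saved:
#         fresh = asyncio.run(check_alerts(saved))
#         for query, links in fresh.items():
#             st.sidebar.success(f"{len(links)} new paper(s) for '{query}'")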