#!/usr/bin/env python3
# mcp/alerts.py
"""MedGenesis – saved‑query alert helper (async).

Monitors previously saved queries (PubMed/arXiv) and notifies when new
papers appear.  Stores a lightweight JSON DB on disk (**locally inside
Space**), keeping the latest *N* paper links per query.

Changes vs. legacy version
~~~~~~~~~~~~~~~~~~~~~~~~~~
* **Thread‑safe JSON IO** with `asyncio.Lock` (handles simultaneous
  Streamlit sessions).
* Exponential‑back‑off retry around `orchestrate_search` so one bad
  request doesn’t kill the whole loop.
* Maximum DB size of 100 queries; oldest evicted automatically.
* Atomic file writes: dump to a temp file, then `Path.replace` into place.

DB location: `<project_root>/mcp/data/medgen_alerts.json` (created on first run).
"""
from __future__ import annotations

import asyncio, json, tempfile
from pathlib import Path
from typing import Dict, List

from mcp.orchestrator import orchestrate_search

_DB_PATH = (Path(__file__).parent / "data" / "medgen_alerts.json").resolve()
_DB_PATH.parent.mkdir(parents=True, exist_ok=True)

_MAX_IDS     = 30   # keep last N paper links per query
_MAX_QUERIES = 100  # protect against runaway DB growth

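# One module-level lock; `check_alerts` holds it across its whole
# read-modify-write so simultaneous sessions cannot lose each other's updates.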
_lock = asyncio.Lock()

# ---------------------------------------------------------------------
# Internal JSON helpers
# ---------------------------------------------------------------------

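# On-disk DB shape (illustrative values, not from a real run):
#   {"glioblastoma CRISPR": ["https://pubmed.ncbi.nlm.nih.gov/...", ...]}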
def _read_db() -> Dict[str, List[str]]:
    if _DB_PATH.exists():
        try:
            return json.loads(_DB_PATH.read_text())
        except json.JSONDecodeError:
            return {}
    return {}


def _write_db(data: Dict[str, List[str]]):
    # Atomic write: dump to a temp file in the same directory (so the final
    # rename stays on one filesystem), close it, then replace the target.
    with tempfile.NamedTemporaryFile(
        "w", delete=False, dir=_DB_PATH.parent, suffix=".tmp"
    ) as tmp:
        json.dump(data, tmp, indent=2)
        tmp_name = tmp.name
    Path(tmp_name).replace(_DB_PATH)

# ---------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------
async def check_alerts(queries: List[str]) -> Dict[str, List[str]]:
    """Return `{query: [fresh_links]}` for queries with new papers."""
    async with _lock:  # hold across read-modify-write to avoid lost updates
        db = _read_db()
        new_links_map: Dict[str, List[str]] = {}

        async def _process(q: str):
            # Retry orchestrate_search up to 3× with exponential back-off (2 s, 4 s)
            delay = 2
            for _ in range(3):
                try:
                    res = await orchestrate_search(q)
                    break
                except Exception:
                    await asyncio.sleep(delay)
                    delay *= 2
            else:
                return  # give up on this query

            links = [p["link"] for p in res["papers"]]
            prev = set(db.get(q, []))
            fresh = [link for link in links if link not in prev]
            if fresh:
                new_links_map[q] = fresh
            db[q] = links[:_MAX_IDS]

        await asyncio.gather(*(_process(q) for q in queries))

        # Evict the oldest queries beyond the cap (dicts keep insertion
        # order, so the earliest-added keys come first).
        if len(db) > _MAX_QUERIES:
            for key in list(db.keys())[:-_MAX_QUERIES]:
                db.pop(key, None)

        _write_db(db)

    return new_links_map


# ---------------------------------------------------------------------
# CLI demo
# ---------------------------------------------------------------------
if __name__ == "__main__":
    async def _demo():
        demo_map = await check_alerts(["glioblastoma CRISPR"])
        print("Fresh:", demo_map)
    asyncio.run(_demo())
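
# ---------------------------------------------------------------------
# Usage sketch (hedged): how a Streamlit page might surface alerts.
# `st.session_state["saved_queries"]` is an assumption about the host
# app, not something this module defines.
# ---------------------------------------------------------------------
#   import streamlit as st
#
#   saved = st.session_state.get("saved_queries", [])
#   fresh = asyncio.run(check_alerts(saved))
#   for q, links in fresh.items():
#       st.info(f"{len(links)} new paper(s) for '{q}'")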