|
import requests |
|
import uuid |
|
import json |
|
import re |
|
import xml.etree.ElementTree as ET |
|
from bs4 import BeautifulSoup |
|
from datetime import datetime |
|
import os |
|
import openai |
|
import urllib.parse |
|
from dotenv import load_dotenv |
|
import time |
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
def get_openai_api_key(): |
|
"""Get OpenAI API key from environment variables""" |
|
api_key = os.environ.get("OPENAI_API_KEY") |
|
if not api_key: |
|
raise ValueError("OPENAI_API_KEY environment variable is not set") |
|
return api_key |
|
|
|
|
|
openai.api_key = get_openai_api_key() |
|
|
|
|
|
SYSTEM_PROMPT = """You are an advanced clinical AI assistant designed to aid healthcare professionals. |
|
Follow these guidelines in all responses: |
|
1. **Answer Directly First**: Begin by providing your best answer based on available information. If information is limited, provide your assessment based on what is known, and indicate areas of uncertainty. |
|
2. **Follow with Clarifying Questions**: After giving your initial assessment, include specific follow-up questions that would help refine your answer. These should be clearly labeled in a separate "Follow-up Questions:" section. |
|
3. Professional tone: Maintain a clear, respectful, and professional tone appropriate for medical consultation. |
|
4. Evidence-based practice: Base all responses on current medical evidence and guidelines. |
|
5. Transparency: Clearly distinguish between established medical facts, clinical guidance, and areas of uncertainty. |
|
6. Structured analysis: Present information in a clear, organized manner following clinical reasoning patterns. |
|
7. Citation: Always cite specific sources for medical claims when available using the [PMID:123456] format, where 123456 is the actual PubMed ID number. |
|
8. Limitations: Acknowledge the limits of AI medical advice and recommend in-person consultation when appropriate. |
|
9. Comprehensive approach: Consider differential diagnoses and relevant contextual factors. |
|
10. Patient-centered: Focus on clinically relevant information while maintaining respect for the patient. |
|
For each consultation: |
|
1. Provide an initial assessment based on available information (as per guideline 1). |
|
2. Include specific follow-up questions (as per guideline 2). |
|
3. Provide differential diagnosis with likelihood assessment. |
|
4. Suggest appropriate next steps (testing, treatment, referral). |
|
5. Include reasoning for your conclusions. |
|
6. Cite medical literature or guidelines supporting your assessment using [PMID:123456]. |
|
IMPORTANT: Your primary duty is to support clinical decision-making, not replace clinical judgment. |
|
""" |
|
|
|
FOLLOW_UP_PROMPT = """Continue this medical consultation based on the previous discussion. |
|
Consider the information already gathered and the tentative diagnosis/plan. |
|
When responding to the follow-up: |
|
1. Directly address the follow-up question with evidence-based information. |
|
2. Reference relevant details from the prior conversation. |
|
3. If additional information would be helpful, include specific follow-up questions in a clearly labeled "Follow-up Questions:" section. |
|
4. Update recommendations if appropriate based on new information. |
|
5. Maintain the same structured approach with transparent reasoning. |
|
6. Cite additional medical literature or guidelines when relevant using [PMID:123456]. |
|
Remember that this is an ongoing consultation where continuity of care is important. |
|
""" |
|
|
|
|
|
def extract_and_link_sources(text, evidence_snippets): |
|
""" |
|
Replace [PMID:123456] citation placeholders with actual links to PubMed articles. |
|
Also handles DOI citations and other citation formats for compatibility. |
|
|
|
Args: |
|
text (str): Text containing citations |
|
evidence_snippets (list): List of evidence snippets with metadata |
|
|
|
Returns: |
|
tuple: (text with citations replaced with links, map of source IDs to metadata) |
|
""" |
|
|
|
pmid_pattern = r'\[PMID:(\d+)\]' |
|
|
|
doi_pattern = r'\[DOI:(10\.\d+\/[^\]]+)\]' |
|
|
|
source_pattern = r'\[([\w\d:_\-\.+]+)\]' |
|
|
|
|
|
pmid_matches = re.findall(pmid_pattern, text) |
|
|
|
doi_matches = re.findall(doi_pattern, text) |
|
|
|
source_matches = re.findall(source_pattern, text) |
|
|
|
|
|
source_matches = [s for s in source_matches if not (s.startswith('PMID:') or s.startswith('DOI:'))] |
|
|
|
|
|
source_map = {} |
|
|
|
|
|
for pmid in pmid_matches: |
|
for snippet in evidence_snippets: |
|
|
|
if 'pmid' in snippet and snippet['pmid'] == pmid: |
|
source_map[f"PMID:{pmid}"] = { |
|
"id": snippet["id"], |
|
"title": snippet["title"].strip(), |
|
"url": snippet["url"], |
|
"citation": snippet["citation"], |
|
"pmid": pmid |
|
} |
|
break |
|
|
|
elif snippet["id"] == f"PMID:{pmid}": |
|
source_map[f"PMID:{pmid}"] = { |
|
"id": snippet["id"], |
|
"title": snippet["title"].strip(), |
|
"url": snippet["url"], |
|
"citation": snippet["citation"], |
|
"pmid": pmid |
|
} |
|
break |
|
|
|
|
|
for doi in doi_matches: |
|
for snippet in evidence_snippets: |
|
|
|
if 'doi' in snippet and snippet['doi'] == doi: |
|
source_map[f"DOI:{doi}"] = { |
|
"id": snippet.get("id", f"DOI:{doi}"), |
|
"title": snippet["title"].strip(), |
|
"url": snippet["url"], |
|
"citation": snippet["citation"], |
|
"doi": doi |
|
} |
|
break |
|
|
|
elif snippet.get("id") == f"DOI:{doi}": |
|
source_map[f"DOI:{doi}"] = { |
|
"id": snippet["id"], |
|
"title": snippet["title"].strip(), |
|
"url": snippet["url"], |
|
"citation": snippet["citation"], |
|
"doi": doi |
|
} |
|
break |
|
|
|
|
|
for source_id_match in source_matches: |
|
if source_id_match not in source_map and source_id_match != "source_id": |
|
for snippet in evidence_snippets: |
|
if source_id_match == snippet["id"]: |
|
source_map[source_id_match] = { |
|
"id": snippet["id"], |
|
"title": snippet["title"].strip(), |
|
"url": snippet["url"], |
|
"citation": snippet["citation"], |
|
"pmid": snippet.get("pmid", ""), |
|
"doi": snippet.get("doi", "") |
|
} |
|
break |
|
|
|
|
|
linked_text = text |
|
for pmid_key in [f"PMID:{pmid}" for pmid in pmid_matches]: |
|
if pmid_key in source_map: |
|
            source_data = source_map[pmid_key]

            # Truncate long titles so the inline link stays readable

            short_title = source_data['title'][:60] + "..." if len(source_data['title']) > 60 else source_data['title']

            replacement = f"[{short_title}]({source_data['url']})"

            # str.replace avoids re.sub's special handling of backslashes in replacement strings

            linked_text = linked_text.replace(f"[{pmid_key}]", replacement)
|
|
|
|
|
for doi_key in [f"DOI:{doi}" for doi in doi_matches]: |
|
if doi_key in source_map: |
|
            source_data = source_map[doi_key]

            short_title = source_data['title'][:60] + "..." if len(source_data['title']) > 60 else source_data['title']

            replacement = f"[{short_title}]({source_data['url']})"

            linked_text = linked_text.replace(f"[{doi_key}]", replacement)
|
|
|
|
|
for source_id_key, source_data in source_map.items(): |
|
if not (source_id_key.startswith("PMID:") or source_id_key.startswith("DOI:")): |
|
            replacement = f"[{source_data['title']}]({source_data['url']})"

            linked_text = linked_text.replace(f"[{source_id_key}]", replacement)
|
|
|
|
|
if "source_id" in source_matches: |
|
|
|
if evidence_snippets and "source_id" not in source_map: |
|
snippet = evidence_snippets[0] |
|
if snippet.get("url") and snippet.get("title"): |
|
source_map["source_id"] = { |
|
"id": snippet["id"], |
|
"title": snippet["title"].strip(), |
|
"url": snippet["url"], |
|
"citation": snippet["citation"], |
|
"pmid": snippet.get("pmid", ""), |
|
"doi": snippet.get("doi", "") |
|
} |
|
replacement = f"[{snippet['title']}]({snippet['url']})" |
|
                linked_text = linked_text.replace("[source_id]", replacement)
|
|
|
|
|
linked_text = re.sub(r'\[source_id\]', "[Medical Reference]", linked_text) |
|
linked_text = re.sub(r'\[PMID:(\d+)\]', r'[PubMed Article]', linked_text) |
|
linked_text = re.sub(r'\[DOI:(10\.\d+\/[^\]]+)\]', r'[Europe PMC Article]', linked_text) |
|
|
|
return linked_text, source_map |
|
|
|
|
|
def fetch_from_pubmed_api(query, max_results=3, api_key=None): |
|
"""Fetch medical evidence from PubMed API using E-utilities""" |
|
results = [] |
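
    # Strip greetings and first-person framing so the search term carries clinical content only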
|
|
|
|
|
cleaned_query = re.sub(r'^(hi|hello|hey|greetings|good morning|good afternoon|good evening)[,\.]?\s+', '', query.lower()) |
|
cleaned_query = re.sub(r"(i'?m|i am)\s+a\s+\d+[-\s]year[-\s]old", '', cleaned_query) |
|
cleaned_query = re.sub(r'(my name is|i am|i have been|i\'ve been|i was|i have|i\'ve had|i feel|i\'m feeling|i experienced)', '', cleaned_query) |
|
|
|
|
|
symptom_patterns = [ |
|
r'(muscle weakness)', r'(fatigue)', r'(rash)', r'(pain)', r'(swelling)', |
|
r'(difficulty breathing|shortness of breath)', r'(fever)', r'(headache)', |
|
r'(nausea|vomiting)', r'(dizziness)', r'(numbness)', r'(tingling)' |
|
] |
|
|
|
medical_terms = [] |
|
for pattern in symptom_patterns: |
|
matches = re.findall(pattern, query.lower()) |
|
if matches: |
|
medical_terms.extend(matches) |
|
|
|
|
|
if medical_terms: |
|
search_query = " AND ".join(medical_terms) |
|
|
|
if cleaned_query: |
|
search_query = f"({search_query}) OR ({cleaned_query})" |
|
else: |
|
|
|
search_query = cleaned_query |
|
|
|
|
|
|
|
|
|
|
base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/" |
|
|
|
|
|
search_params = { |
|
"db": "pubmed", |
|
"term": encoded_query, |
|
"retmax": max_results, |
|
"retmode": "json", |
|
"sort": "relevance" |
|
} |
|
|
|
|
|
if api_key: |
|
search_params["api_key"] = api_key |
|
|
|
try: |
|
|
|
search_response = requests.get(f"{base_url}esearch.fcgi", params=search_params) |
|
|
|
if search_response.status_code != 200: |
|
return [] |
|
|
|
search_data = search_response.json() |
|
|
|
if "esearchresult" in search_data and "idlist" in search_data["esearchresult"]: |
|
ids = search_data["esearchresult"]["idlist"] |
|
|
|
if ids: |
|
|
|
fetch_params = { |
|
"db": "pubmed", |
|
"id": ",".join(ids), |
|
"retmode": "xml" |
|
} |
|
if api_key: |
|
fetch_params["api_key"] = api_key |
|
|
|
fetch_response = requests.get(f"{base_url}efetch.fcgi", params=fetch_params) |
|
|
|
if fetch_response.status_code != 200: |
|
return [] |
|
|
|
try: |
|
|
|
root = ET.fromstring(fetch_response.text) |
|
|
|
for article in root.findall(".//PubmedArticle"): |
|
try: |
|
pmid = article.findtext(".//PMID") |
|
title = article.findtext(".//ArticleTitle") or "No title available" |
|
|
|
|
|
abstract_elements = article.findall(".//AbstractText") |
|
abstract = " ".join([(elem.text or "") for elem in abstract_elements]) |
|
|
|
|
|
authors = [] |
|
for author in article.findall(".//Author"): |
|
last_name = author.findtext(".//LastName") or "" |
|
initials = author.findtext(".//Initials") or "" |
|
if last_name and initials: |
|
authors.append(f"{last_name} {initials}") |
|
|
|
author_str = ", ".join(authors[:3]) |
|
if len(authors) > 3: |
|
author_str += " et al." |
|
|
|
|
|
journal = article.findtext(".//Journal/Title") or "Journal not specified" |
|
year = article.findtext(".//PubDate/Year") or "N/A" |
|
|
|
|
|
citation = f"{author_str}. ({year}). {title}. {journal}. PMID: {pmid}" |
|
|
|
|
|
url = f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/" |
|
|
|
|
|
pmc_id = article.findtext(".//ArticleId[@IdType='pmc']") |
|
has_free_text = bool(pmc_id) or article.findtext(".//PublicationStatus") == "epublish" |
|
|
|
|
|
if pmc_id: |
|
url = f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmc_id}/" |
|
|
|
results.append({ |
|
"id": f"pubmed:{pmid}", |
|
"title": title, |
|
"text": abstract[:800] + "..." if len(abstract) > 800 else abstract, |
|
"citation": citation, |
|
"url": url, |
|
"source_type": "PubMed" + (" (Free Full Text)" if has_free_text else ""), |
|
"is_open_access": has_free_text |
|
}) |
|
except Exception: |
|
continue |
|
except ET.ParseError: |
|
return [] |
|
|
|
return results |
|
except Exception: |
|
return [] |
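

# The two-step E-utilities flow used above, with illustrative values:
#   1) esearch.fcgi?db=pubmed&term=...&retmode=json  ->  {"esearchresult": {"idlist": ["123", ...]}}
#   2) efetch.fcgi?db=pubmed&id=123,...&retmode=xml  ->  PubmedArticleSet XML parsed with ElementTree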
|
|
|
def fetch_from_pmc_api(query, max_results=2, api_key=None): |
|
"""Fetch free full text articles from PubMed Central (PMC)""" |
|
results = [] |
|
|
|
|
|
cleaned_query = re.sub(r'^(hi|hello|hey|greetings|good morning|good afternoon|good evening)[,\.]?\s+', '', query.lower()) |
|
cleaned_query = re.sub(r"(i'?m|i am)\s+a\s+\d+[-\s]year[-\s]old", '', cleaned_query) |
|
cleaned_query = re.sub(r'(my name is|i am|i have been|i\'ve been|i was|i have|i\'ve had|i feel|i\'m feeling|i experienced)', '', cleaned_query) |
|
|
|
|
|
    search_query = cleaned_query + " AND free full text[filter]"
|
|
|
|
|
base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/" |
|
|
|
|
|
search_params = { |
|
"db": "pmc", |
|
"term": encoded_query, |
|
"retmax": max_results, |
|
"retmode": "json", |
|
"sort": "relevance" |
|
} |
|
|
|
|
|
if api_key: |
|
search_params["api_key"] = api_key |
|
|
|
try: |
|
|
|
search_response = requests.get(f"{base_url}esearch.fcgi", params=search_params) |
|
|
|
if search_response.status_code != 200: |
|
return [] |
|
|
|
search_data = search_response.json() |
|
|
|
if "esearchresult" in search_data and "idlist" in search_data["esearchresult"]: |
|
ids = search_data["esearchresult"]["idlist"] |
|
|
|
if ids: |
|
|
|
fetch_params = { |
|
"db": "pmc", |
|
"id": ",".join(ids), |
|
"retmode": "xml" |
|
} |
|
if api_key: |
|
fetch_params["api_key"] = api_key |
|
|
|
fetch_response = requests.get(f"{base_url}efetch.fcgi", params=fetch_params) |
|
|
|
if fetch_response.status_code != 200: |
|
return [] |
|
|
|
try: |
|
|
|
root = ET.fromstring(fetch_response.text) |
|
|
|
for article in root.findall(".//article"): |
|
try: |
|
|
|
article_id_elements = article.findall(".//article-id") |
|
pmc_id = None |
|
for id_elem in article_id_elements: |
|
if id_elem.get("pub-id-type") == "pmc": |
|
pmc_id = id_elem.text |
|
|
|
if not pmc_id: |
|
continue |
|
|
|
|
|
title_elem = article.find(".//article-title") |
|
title = "".join(title_elem.itertext()) if title_elem is not None else "No title available" |
|
|
|
|
|
abstract_elem = article.find(".//abstract") |
|
abstract = "" |
|
if abstract_elem is not None: |
|
for p in abstract_elem.findall(".//p"): |
|
abstract += " ".join(p.itertext()) + " " |
|
|
|
|
|
if not abstract: |
|
body = article.find(".//body") |
|
if body is not None: |
|
paragraphs = body.findall(".//p") |
|
abstract = " ".join([" ".join(p.itertext()) for p in paragraphs[:3]]) |
|
|
|
|
|
journal_elem = article.find(".//journal-title") |
|
journal = "".join(journal_elem.itertext()) if journal_elem is not None else "PMC Journal" |
|
|
|
year_elem = article.find(".//pub-date/year") |
|
year = year_elem.text if year_elem is not None else "N/A" |
|
|
|
|
|
authors = [] |
|
for contrib in article.findall(".//contrib[@contrib-type='author']"): |
|
surname = contrib.find(".//surname") |
|
given_names = contrib.find(".//given-names") |
|
if surname is not None and given_names is not None: |
|
authors.append(f"{surname.text} {given_names.text[0] if given_names.text else ''}") |
|
|
|
author_str = ", ".join(authors[:3]) |
|
if len(authors) > 3: |
|
author_str += " et al." |
|
|
|
|
|
citation = f"{author_str}. ({year}). {title}. {journal}. PMC{pmc_id}" |
|
|
|
|
|
url = f"https://www.ncbi.nlm.nih.gov/pmc/articles/PMC{pmc_id}/" |
|
|
|
results.append({ |
|
"id": f"pmc:{pmc_id}", |
|
"title": title, |
|
"text": abstract[:800] + "..." if len(abstract) > 800 else abstract, |
|
"citation": citation, |
|
"url": url, |
|
"source_type": "PubMed Central (Open Access)", |
|
"is_open_access": True |
|
}) |
|
except Exception: |
|
continue |
|
except ET.ParseError: |
|
return [] |
|
|
|
return results |
|
except Exception: |
|
return [] |
|
|
|
def fetch_from_who_api(query, max_results=1): |
|
"""Fetch information from WHO guidelines - using web scraping as alternative to API""" |
|
try: |
|
|
|
search_url = f"https://www.who.int/publications/search-results?indexTerms={query.replace(' ', '+')}" |
|
response = requests.get(search_url) |
|
|
|
if response.status_code == 200: |
|
soup = BeautifulSoup(response.text, 'html.parser') |
|
results = [] |
|
|
|
|
|
articles = soup.select('.search-results article')[:max_results] |
|
|
|
for article in articles: |
|
title_elem = article.select_one('h3') |
|
title = title_elem.text.strip() if title_elem else "WHO Guideline" |
|
|
|
desc_elem = article.select_one('.search-description') |
|
description = desc_elem.text.strip() if desc_elem else "" |
|
|
|
link_elem = article.select_one('a') |
|
link = "https://www.who.int" + link_elem['href'] if link_elem and 'href' in link_elem.attrs else "" |
|
|
|
date_elem = article.select_one('.search-meta') |
|
date = date_elem.text.strip() if date_elem else "Date not specified" |
|
|
|
|
|
                who_id = link.rstrip('/').split('/')[-1] if link else f"who-{uuid.uuid4().hex[:8]}"
|
|
|
results.append({ |
|
"id": f"who:{who_id}", |
|
"title": title, |
|
"text": description[:800] + "..." if len(description) > 800 else description, |
|
"citation": f"World Health Organization. ({date}). {title}.", |
|
"url": link, |
|
"source_type": "WHO Guidelines", |
|
"is_open_access": True |
|
}) |
|
|
|
return results |
|
return [] |
|
except Exception: |
|
return [] |
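

# Note: fetch_from_who_api scrapes the WHO publications search page (selectors
# .search-results article, .search-description, .search-meta) and returns [] if that markup changes.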
|
|
|
def fetch_from_core_api(query, max_results=2, api_key=None): |
|
"""Fetch open access research papers from CORE API""" |
|
results = [] |
|
|
|
|
|
cleaned_query = re.sub(r'^(hi|hello|hey|greetings|good morning|good afternoon|good evening)[,\.]?\s+', '', query.lower()) |
|
cleaned_query = re.sub(r"(i'?m|i am)\s+a\s+\d+[-\s]year[-\s]old", '', cleaned_query) |
|
cleaned_query = re.sub(r'(my name is|i am|i have been|i\'ve been|i was|i have|i\'ve had|i feel|i\'m feeling|i experienced)', '', cleaned_query) |
|
|
|
|
|
symptom_patterns = [ |
|
r'(muscle weakness)', r'(fatigue)', r'(rash)', r'(pain)', r'(swelling)', |
|
r'(difficulty breathing|shortness of breath)', r'(fever)', r'(headache)', |
|
r'(nausea|vomiting)', r'(dizziness)', r'(numbness)', r'(tingling)' |
|
] |
|
|
|
medical_terms = [] |
|
for pattern in symptom_patterns: |
|
matches = re.findall(pattern, query.lower()) |
|
if matches: |
|
medical_terms.extend(matches) |
|
|
|
|
|
if medical_terms: |
|
search_query = cleaned_query + " " + " ".join(medical_terms) |
|
else: |
|
search_query = cleaned_query |
|
|
|
|
|
base_url = "https://core.ac.uk/api/v3/search/works" |
|
|
|
|
|
search_params = { |
|
"q": search_query, |
|
"limit": max_results * 2, |
|
"offset": 0, |
|
"fields": ["title", "abstract", "authors", "year", "downloadUrl", "sourceFulltextUrl", "doi", "fullText"] |
|
} |
|
|
|
|
|
    headers = {"Content-Type": "application/json"}

    if api_key:

        headers["Authorization"] = f"Bearer {api_key}"
|
|
|
try: |
|
response = requests.post(base_url, json=search_params, headers=headers) |
|
|
|
if response.status_code != 200: |
|
return [] |
|
|
|
data = response.json() |
|
|
|
if "results" in data: |
|
filtered_articles = [] |
|
|
|
|
|
for article in data["results"]: |
|
try: |
|
|
|
score = 0 |
|
|
|
|
|
if article.get("downloadUrl") or article.get("sourceFulltextUrl"): |
|
score += 3 |
|
|
|
|
|
if article.get("fullText"): |
|
score += 2 |
|
|
|
|
|
if article.get("abstract") and len(article.get("abstract")) > 100: |
|
score += 1 |
|
|
|
|
|
for term in medical_terms: |
|
if term in (article.get("title", "") + article.get("abstract", "")).lower(): |
|
score += 2 |
|
|
|
|
|
filtered_articles.append((score, article)) |
|
|
|
except Exception: |
|
continue |
|
|
|
|
|
filtered_articles.sort(reverse=True, key=lambda x: x[0]) |
|
top_articles = [article for score, article in filtered_articles[:max_results]] |
|
|
|
|
|
for article in top_articles: |
|
try: |
|
|
|
title = article.get("title", "No title available") |
|
abstract = article.get("abstract", "") |
|
|
|
|
|
full_text = article.get("fullText", "") |
|
text_content = "" |
|
|
|
if full_text: |
|
|
|
text_content = f"[FULL TEXT AVAILABLE] {full_text[:1500]}..." |
|
else: |
|
|
|
text_content = abstract |
|
|
|
authors = article.get("authors", []) |
|
year = article.get("year", "N/A") |
|
|
|
|
|
author_str = ", ".join([f"{author.get('name', '')}" for author in authors[:3]]) |
|
if len(authors) > 3: |
|
author_str += " et al." |
|
|
|
|
|
url = "" |
|
download_available = False |
|
|
|
if article.get("downloadUrl"): |
|
url = article.get("downloadUrl") |
|
download_available = True |
|
elif article.get("sourceFulltextUrl"): |
|
url = article.get("sourceFulltextUrl") |
|
download_available = True |
|
elif article.get("doi"): |
|
url = f"https://doi.org/{article.get('doi')}" |
|
|
|
|
|
citation = f"{author_str}. ({year}). {title}." |
|
if article.get("doi"): |
|
citation += f" DOI: {article['doi']}" |
|
|
|
|
|
core_id = article.get("id", str(uuid.uuid4())) |
|
|
|
|
|
source_type = "CORE Open Access" |
|
if download_available: |
|
source_type += " (Full Text Available)" |
|
elif full_text: |
|
source_type += " (Full Text Excerpt Included)" |
|
else: |
|
source_type += " (Abstract Only)" |
|
|
|
results.append({ |
|
"id": f"core:{core_id}", |
|
"title": title, |
|
"text": text_content[:800] + "..." if len(text_content) > 800 else text_content, |
|
"citation": citation, |
|
"url": url, |
|
"source_type": source_type, |
|
"is_open_access": True |
|
}) |
|
except Exception: |
|
continue |
|
|
|
return results |
|
except Exception: |
|
return [] |
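

# fetch_from_core_api ranks candidates by a simple relevance score (full-text availability,
# abstract length, extracted-term hits) and keeps the top max_results.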
|
|
|
|
|
def enhanced_search_pubmed(query, retmax=3, api_key=None): |
|
""" |
|
Enhanced PubMed search using E-utilities API with improved parsing and error handling. |
|
|
|
Args: |
|
query (str): Search query string |
|
retmax (int): Maximum number of results to return |
|
api_key (str, optional): NCBI API key for higher rate limits |
|
|
|
Returns: |
|
list: List of article dictionaries with title, abstract, PMID, URL |
|
""" |
|
results = [] |
|
|
|
|
|
base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/" |
|
|
|
|
|
|
|
    # NCBI E-utilities allow ~3 requests/sec without an API key and ~10/sec with one
    time.sleep(0.33 if api_key is None else 0.1)
|
|
|
try: |
|
|
|
search_params = { |
|
"db": "pubmed", |
|
"term": query, |
|
"retmax": retmax, |
|
"retmode": "json", |
|
"sort": "relevance" |
|
} |
|
|
|
if api_key: |
|
search_params["api_key"] = api_key |
|
|
|
search_response = requests.get(f"{base_url}esearch.fcgi", params=search_params) |
|
|
|
if search_response.status_code != 200: |
|
print(f"PubMed search error: {search_response.status_code}") |
|
return [] |
|
|
|
search_data = search_response.json() |
|
|
|
if "esearchresult" not in search_data or "idlist" not in search_data["esearchresult"]: |
|
print("No results found or invalid response format") |
|
return [] |
|
|
|
pmids = search_data["esearchresult"]["idlist"] |
|
|
|
if not pmids: |
|
print("No PMIDs found for the query") |
|
return [] |
|
|
|
|
|
        # Pause again before the efetch request to respect the rate limit
        time.sleep(0.33 if api_key is None else 0.1)
|
|
|
|
|
fetch_params = { |
|
"db": "pubmed", |
|
"id": ",".join(pmids), |
|
"retmode": "xml", |
|
"rettype": "abstract" |
|
} |
|
|
|
if api_key: |
|
fetch_params["api_key"] = api_key |
|
|
|
fetch_response = requests.get(f"{base_url}efetch.fcgi", params=fetch_params) |
|
|
|
if fetch_response.status_code != 200: |
|
print(f"PubMed fetch error: {fetch_response.status_code}") |
|
return [] |
|
|
|
|
|
root = ET.fromstring(fetch_response.text) |
|
|
|
for article in root.findall(".//PubmedArticle"): |
|
try: |
|
|
|
pmid = article.findtext(".//PMID") |
|
if not pmid: |
|
continue |
|
|
|
|
|
title = article.findtext(".//ArticleTitle") or "No title available" |
|
|
|
|
|
abstract_sections = [] |
|
for abstract_text in article.findall(".//AbstractText"): |
|
label = abstract_text.get("Label", "") |
|
text = abstract_text.text or "" |
|
|
|
if label and text: |
|
abstract_sections.append(f"{label}: {text}") |
|
elif text: |
|
abstract_sections.append(text) |
|
|
|
|
|
if not abstract_sections: |
|
abstract_text = article.findtext(".//Abstract/AbstractText") |
|
if abstract_text: |
|
abstract_sections.append(abstract_text) |
|
|
|
|
|
abstract = " ".join(abstract_sections) or "Abstract not available" |
|
|
|
|
|
authors = [] |
|
for author in article.findall(".//Author"): |
|
last_name = author.findtext(".//LastName") or "" |
|
initials = author.findtext(".//Initials") or "" |
|
if last_name and initials: |
|
authors.append(f"{last_name} {initials}") |
|
|
|
|
|
author_text = "" |
|
if authors: |
|
if len(authors) == 1: |
|
author_text = authors[0] |
|
elif len(authors) == 2: |
|
author_text = f"{authors[0]} & {authors[1]}" |
|
else: |
|
author_text = f"{authors[0]} et al." |
|
|
|
|
|
journal = article.findtext(".//Journal/Title") or "Unknown Journal" |
|
year = article.findtext(".//PubDate/Year") or "" |
|
|
|
|
|
url = f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/" |
|
|
|
|
|
citation = f"{author_text}{' ' if author_text else ''}({year}). {title}. {journal}. PMID: {pmid}" |
|
|
|
|
|
pmc_id = article.findtext(".//ArticleId[@IdType='pmc']") |
|
has_full_text = bool(pmc_id) |
|
full_text_url = f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmc_id}/" if pmc_id else None |
|
|
|
|
|
result = { |
|
"pmid": pmid, |
|
"title": title, |
|
"abstract": abstract, |
|
"authors": authors, |
|
"journal": journal, |
|
"year": year, |
|
"url": url, |
|
"full_text_url": full_text_url, |
|
"has_full_text": has_full_text, |
|
"citation": citation |
|
} |
|
|
|
results.append(result) |
|
|
|
except Exception as e: |
|
print(f"Error parsing article {pmid}: {str(e)}") |
|
continue |
|
|
|
return results |
|
|
|
except Exception as e: |
|
print(f"Error in PubMed search: {str(e)}") |
|
return [] |
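

# Example (illustrative): enhanced_search_pubmed("metformin type 2 diabetes", retmax=2)
# returns dicts with keys pmid, title, abstract, authors, journal, year, url,
# full_text_url, has_full_text, and citation.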
|
|
|
|
|
def search_europe_pmc(query, max_results=3, use_extracted_terms=False, extracted_terms=None): |
|
""" |
|
Search Europe PMC for biomedical articles, with a focus on retrieving full text when available. |
|
Europe PMC provides more open access content than standard PubMed. |
|
|
|
Args: |
|
query (str): Search query string |
|
max_results (int): Maximum number of results to return |
|
use_extracted_terms (bool): Whether to use the extracted medical terms |
|
extracted_terms (list): List of extracted medical terms from the query |
|
|
|
Returns: |
|
list: List of article dictionaries with title, abstract, PMID, URL, and full text URL |
|
""" |
|
results = [] |
|
|
|
|
|
    # Brief pause to stay well under Europe PMC's request rate
    time.sleep(2.0)
|
|
|
try: |
|
|
|
base_url = "https://www.ebi.ac.uk/europepmc/webservices/rest/search" |
|
|
|
|
|
search_query = query |
|
if use_extracted_terms and extracted_terms and len(extracted_terms) > 0: |
|
|
|
terms_query = " AND ".join(extracted_terms) |
|
search_query = terms_query |
|
print(f"Searching Europe PMC with extracted terms: {terms_query}") |
|
|
|
|
|
search_params = { |
|
"query": f"({search_query}) AND OPEN_ACCESS:y", |
|
"format": "json", |
|
"pageSize": max_results, |
|
"resultType": "core" |
|
} |
|
|
|
print(f"Searching Europe PMC with query: {search_query}") |
|
response = requests.get(base_url, params=search_params) |
|
|
|
if response.status_code != 200: |
|
print(f"Europe PMC search error: {response.status_code}") |
|
|
|
search_params["query"] = search_query |
|
response = requests.get(base_url, params=search_params) |
|
if response.status_code != 200: |
|
return [] |
|
|
|
data = response.json() |
|
|
|
|
|
hit_count = data.get("hitCount", 0) |
|
if hit_count == 0: |
|
print("No Europe PMC results found") |
|
|
|
if use_extracted_terms and extracted_terms: |
|
print("Retrying Europe PMC search with original query") |
|
return search_europe_pmc(query, max_results, False, None) |
|
return [] |
|
|
|
|
|
articles = data.get("resultList", {}).get("result", []) |
|
|
|
for article in articles: |
|
try: |
|
|
|
pmid = article.get("pmid") |
|
doi = article.get("doi") |
|
title = article.get("title", "No title available") |
|
abstract = article.get("abstractText", "Abstract not available") |
|
journal = article.get("journalTitle", "Unknown Journal") |
|
pub_year = article.get("pubYear", "") |
|
|
|
|
|
is_open_access = article.get("isOpenAccess") == "Y" |
|
|
|
|
|
full_text_url = None |
|
full_text_urls = article.get("fullTextUrlList", {}).get("fullTextUrl", []) |
|
for url_entry in full_text_urls: |
|
if url_entry.get("availability") == "Open access" or url_entry.get("documentStyle") == "pdf": |
|
full_text_url = url_entry.get("url") |
|
break |
|
|
|
|
|
if not full_text_url and pmid: |
|
full_text_url = f"https://europepmc.org/article/MED/{pmid}" |
|
elif not full_text_url and doi: |
|
full_text_url = f"https://doi.org/{doi}" |
|
|
|
|
|
author_list = article.get("authorList", {}).get("author", []) |
|
authors = [] |
|
|
|
for author in author_list: |
|
last_name = author.get("lastName", "") |
|
initials = author.get("initials", "") |
|
if last_name: |
|
authors.append(f"{last_name} {initials}") |
|
|
|
|
|
author_text = "" |
|
if authors: |
|
if len(authors) == 1: |
|
author_text = authors[0] |
|
elif len(authors) == 2: |
|
author_text = f"{authors[0]} & {authors[1]}" |
|
else: |
|
author_text = f"{authors[0]} et al." |
|
|
|
|
|
citation = f"{author_text}{' ' if author_text else ''}({pub_year}). {title}. {journal}." |
|
if pmid: |
|
citation += f" PMID: {pmid}" |
|
if doi: |
|
citation += f" DOI: {doi}" |
|
|
|
|
|
url = full_text_url if full_text_url else ( |
|
f"https://europepmc.org/article/MED/{pmid}" if pmid else ( |
|
f"https://doi.org/{doi}" if doi else "" |
|
) |
|
) |
|
|
|
|
|
source_type = "Europe PMC" + (" (Open Access)" if is_open_access else "") |
|
|
|
|
|
result = { |
|
"pmid": pmid, |
|
"doi": doi, |
|
"title": title, |
|
"abstract": abstract, |
|
"authors": authors, |
|
"journal": journal, |
|
"year": pub_year, |
|
"url": url, |
|
"full_text_url": full_text_url, |
|
"has_full_text": is_open_access or full_text_url is not None, |
|
"citation": citation, |
|
"source_type": source_type, |
|
"is_open_access": is_open_access |
|
} |
|
|
|
results.append(result) |
|
|
|
except Exception as e: |
|
print(f"Error parsing Europe PMC article: {str(e)}") |
|
continue |
|
|
|
print(f"Found {len(results)} Europe PMC articles") |
|
return results |
|
|
|
except Exception as e: |
|
print(f"Error in Europe PMC search: {str(e)}") |
|
return [] |
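

# Note: search_europe_pmc first restricts to OPEN_ACCESS:y and, on an HTTP error,
# retries the same query without the open-access filter; a no-hit term search
# falls back to the original free-text query.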
|
|
|
|
|
def fetch_medical_evidence(query, max_results=3): |
|
""" |
|
Fetch medical evidence using a multi-source approach: |
|
1. Search with extracted medical terms in PubMed |
|
2. Search with extracted medical terms in Europe PMC |
|
3. Search with the original query in PubMed |
|
4. Search with the original query in Europe PMC |
|
|
|
This provides better coverage and relevance from multiple sources. |
|
|
|
Args: |
|
query (str): The user's original query |
|
        max_results (int): Maximum number of results to return (default: 3)
|
|
|
Returns: |
|
list: Combined and deduplicated results from all searches |
|
""" |
|
|
|
pubmed_api_key = os.environ.get("PUBMED_API_KEY") |
|
|
|
|
|
medical_terms = extract_medical_terms(query) |
|
has_medical_terms = len(medical_terms) > 0 |
|
|
|
|
|
terms_pubmed_results = [] |
|
full_pubmed_results = [] |
|
terms_europepmc_results = [] |
|
full_europepmc_results = [] |
|
|
|
|
|
if has_medical_terms: |
|
|
|
terms_query = ", ".join(medical_terms) |
|
print(f"Searching PubMed with extracted terms: {terms_query}") |
|
|
|
|
|
terms_pubmed_results = enhanced_search_pubmed(terms_query, retmax=2, api_key=pubmed_api_key) |
|
|
|
|
|
print(f"Searching Europe PMC with extracted terms") |
|
terms_europepmc_results = search_europe_pmc(query, max_results=2, |
|
use_extracted_terms=True, |
|
extracted_terms=medical_terms) |
|
|
|
|
|
print(f"Searching PubMed with full query") |
|
full_pubmed_results = enhanced_search_pubmed(query, retmax=2, api_key=pubmed_api_key) |
|
|
|
print(f"Searching Europe PMC with full query") |
|
full_europepmc_results = search_europe_pmc(query, max_results=2) |
|
|
|
|
|
all_results = [] |
|
seen_pmids = set() |
|
seen_dois = set() |
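
    # Merge in priority order: term-based PubMed, term-based Europe PMC, full-query PubMed,
    # full-query Europe PMC; dedupe across sources by PMID and DOI.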
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for result in terms_pubmed_results: |
|
pmid = result.get("pmid") |
|
if pmid and pmid not in seen_pmids and len(all_results) < max_results: |
|
seen_pmids.add(pmid) |
|
|
|
all_results.append({ |
|
"id": f"PMID:{pmid}", |
|
"title": result["title"], |
|
"text": result["abstract"], |
|
"citation": result["citation"], |
|
"url": result["url"], |
|
"source_type": "PubMed" + (" (Full Text Available)" if result.get("has_full_text") else ""), |
|
"is_open_access": result.get("has_full_text", False), |
|
"pmid": pmid |
|
}) |
|
|
|
|
|
for result in terms_europepmc_results: |
|
|
|
pmid = result.get("pmid") |
|
doi = result.get("doi") |
|
|
|
|
|
if (pmid and pmid in seen_pmids) or (doi and doi in seen_dois): |
|
continue |
|
|
|
|
|
if len(all_results) >= max_results: |
|
break |
|
|
|
|
|
if pmid: |
|
seen_pmids.add(pmid) |
|
if doi: |
|
seen_dois.add(doi) |
|
|
|
|
|
article_id = f"PMID:{pmid}" if pmid else (f"DOI:{doi}" if doi else str(uuid.uuid4())[:8]) |
|
|
|
|
|
all_results.append({ |
|
"id": article_id, |
|
"title": result["title"], |
|
"text": result["abstract"], |
|
"citation": result["citation"], |
|
"url": result["url"], |
|
"source_type": result["source_type"], |
|
"is_open_access": result["is_open_access"], |
|
"pmid": pmid, |
|
"doi": doi |
|
}) |
|
|
|
|
|
for result in full_pubmed_results: |
|
pmid = result.get("pmid") |
|
if pmid and pmid not in seen_pmids and len(all_results) < max_results: |
|
seen_pmids.add(pmid) |
|
all_results.append({ |
|
"id": f"PMID:{pmid}", |
|
"title": result["title"], |
|
"text": result["abstract"], |
|
"citation": result["citation"], |
|
"url": result["url"], |
|
"source_type": "PubMed" + (" (Full Text Available)" if result.get("has_full_text") else ""), |
|
"is_open_access": result.get("has_full_text", False), |
|
"pmid": pmid |
|
}) |
|
|
|
|
|
for result in full_europepmc_results: |
|
pmid = result.get("pmid") |
|
doi = result.get("doi") |
|
|
|
|
|
if (pmid and pmid in seen_pmids) or (doi and doi in seen_dois): |
|
continue |
|
|
|
|
|
if len(all_results) >= max_results: |
|
break |
|
|
|
|
|
if pmid: |
|
seen_pmids.add(pmid) |
|
if doi: |
|
seen_dois.add(doi) |
|
|
|
|
|
article_id = f"PMID:{pmid}" if pmid else (f"DOI:{doi}" if doi else str(uuid.uuid4())[:8]) |
|
|
|
|
|
all_results.append({ |
|
"id": article_id, |
|
"title": result["title"], |
|
"text": result["abstract"], |
|
"citation": result["citation"], |
|
"url": result["url"], |
|
"source_type": result["source_type"], |
|
"is_open_access": result["is_open_access"], |
|
"pmid": pmid, |
|
"doi": doi |
|
}) |
|
|
|
|
|
return all_results[:max_results] |
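

# Example (illustrative): fetch_medical_evidence("progressive muscle weakness and rash")
# returns up to max_results deduplicated snippets shaped like
#   {"id": "PMID:12345678", "title": ..., "text": ..., "citation": ..., "url": ...,
#    "source_type": ..., "is_open_access": ..., "pmid": ..., "doi": ...}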
|
|
|
|
|
def parse_doctor_response(response_text): |
|
"""Parse the doctor agent's response into structured components""" |
|
|
|
response_text = re.sub(r'^Direct Answer:\s*', '', response_text) |
|
|
|
|
|
parsed = { |
|
"main_response": response_text, |
|
"diagnosis": "", |
|
"treatment": "", |
|
"reasoning": [], |
|
"sources": [], |
|
"follow_up_questions": [] |
|
} |
|
|
|
|
|
diagnosis_match = re.search(r'(?i)diagnosis:?\s*(.*?)(?:\n\n|\n[A-Z]|\Z)', response_text, re.DOTALL) |
|
if diagnosis_match: |
|
parsed["diagnosis"] = diagnosis_match.group(1).strip() |
|
|
|
|
|
treatment_match = re.search(r'(?i)(treatment|recommendations|plan):?\s*(.*?)(?:\n\n|\n[A-Z]|\Z)', response_text, re.DOTALL) |
|
if treatment_match: |
|
parsed["treatment"] = treatment_match.group(2).strip() |
|
|
|
|
|
follow_up_match = re.search(r'(?i)(?:follow[ -]?up questions|additional questions|clarifying questions):?\s*(.*?)(?:\n\n|\n(?:reasoning|sources):|\Z)', response_text, re.DOTALL) |
|
if follow_up_match: |
|
follow_up_text = follow_up_match.group(1).strip() |
|
|
|
follow_up_text = re.sub(r'^\*\*\s*', '', follow_up_text) |
|
|
|
|
|
if '\n-' in follow_up_text or '\n•' in follow_up_text or '\n*' in follow_up_text: |
|
|
|
bullet_items = re.split(r'\n\s*[-•*]\s*', follow_up_text) |
|
|
|
questions = [] |
|
for item in bullet_items: |
|
if item.strip(): |
|
|
|
cleaned_item = re.sub(r'^\s*\*\*\s*|\s*\*\*\s*$', '', item.strip()) |
|
questions.append(cleaned_item) |
|
parsed["follow_up_questions"] = questions |
|
elif '\n1.' in follow_up_text or re.search(r'\n\d+\.', follow_up_text): |
|
|
|
numbered_items = re.split(r'\n\s*\d+\.\s*', follow_up_text) |
|
|
|
questions = [] |
|
for item in numbered_items: |
|
if item.strip(): |
|
|
|
cleaned_item = re.sub(r'^\s*\*\*\s*|\s*\*\*\s*$', '', item.strip()) |
|
questions.append(cleaned_item) |
|
parsed["follow_up_questions"] = questions |
|
else: |
|
|
|
cleaned_text = re.sub(r'^\s*\*\*\s*|\s*\*\*\s*$', '', follow_up_text) |
|
parsed["follow_up_questions"] = [cleaned_text] |
|
|
|
|
|
reasoning_match = re.search(r'(?i)reasoning:?\s*(.*?)(?:\n\n\Z|\n(?:sources|follow)|\Z)', response_text, re.DOTALL) |
|
if reasoning_match: |
|
reasoning_text = reasoning_match.group(1).strip() |
|
|
|
reasoning_text = re.sub(r'^\*\*\s*', '', reasoning_text) |
|
|
|
|
|
if '\n-' in reasoning_text: |
|
|
|
reasoning_points = [] |
|
lines = reasoning_text.split('\n-') |
|
|
|
|
|
if lines and lines[0].strip(): |
|
|
|
first_item = re.sub(r'^\s*\*\*\s*|\s*\*\*\s*$', '', lines[0].strip()) |
|
if first_item: |
|
reasoning_points.append(first_item) |
|
|
|
|
|
for i in range(1, len(lines)): |
|
if lines[i].strip(): |
|
|
|
cleaned_item = re.sub(r'^\s*[-*]*\s*|\s*\*\*\s*$', '', lines[i].strip()) |
|
if cleaned_item: |
|
reasoning_points.append(cleaned_item) |
|
|
|
parsed["reasoning"] = reasoning_points |
|
else: |
|
|
|
cleaned_text = re.sub(r'^\s*\*\*\s*|\s*\*\*\s*$', '', reasoning_text) |
|
parsed["reasoning"] = [cleaned_text] |
|
|
|
|
|
sources_match = re.search(r'(?i)(sources|references):?\s*(.*?)(?:\n\n\Z|\Z)', response_text, re.DOTALL) |
|
if sources_match: |
|
sources_text = sources_match.group(2).strip() |
|
|
|
if '\n' in sources_text: |
|
parsed["sources"] = [item.strip() for item in sources_text.split('\n') if item.strip()] |
|
else: |
|
parsed["sources"] = [sources_text] |
|
|
|
|
|
if parsed["sources"]: |
|
|
|
        main_response_lines = []

        for line in parsed["main_response"].split('\n'):

            # Drop technical reference lines (URLs, PMIDs, DOIs); they belong in the Sources section

            if re.match(r'^URL:\s*https?://', line.strip()):

                continue

            if re.match(r'^(PMID|DOI):', line.strip()):

                continue

            main_response_lines.append(line)
|
|
|
parsed["main_response"] = '\n'.join(main_response_lines) |
|
|
|
|
|
citation_matches = re.findall(r'\[([\w\d:]+)\]', response_text) |
|
for citation in citation_matches: |
|
if citation not in parsed["sources"]: |
|
parsed["sources"].append(citation) |
|
|
|
return parsed |
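

# parse_doctor_response returns the keys main_response, diagnosis, treatment,
# reasoning (list), sources (list), and follow_up_questions (list).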
|
|
|
|
|
def doctor_agent(messages): |
|
"""Call the LLM to get a structured response using OpenAI API v0.28.1""" |
|
try: |
|
response = openai.ChatCompletion.create( |
|
model="gpt-4o-mini", |
|
messages=messages, |
|
temperature=0.3 |
|
) |
|
return response.choices[0].message['content'] |
|
except Exception as e: |
|
return f"I'm sorry, there was an error processing your request. Please try again. Error: {str(e)}" |
|
|
|
|
|
def orchestrator_chat(history, query, use_rag, is_follow_up=False): |
|
"""Handle a single turn of conversation with the doctor agent""" |
|
|
|
if is_follow_up: |
|
system = {"role": "system", "content": FOLLOW_UP_PROMPT} |
|
else: |
|
system = {"role": "system", "content": SYSTEM_PROMPT} |
|
|
|
|
|
print(f"Using {'FOLLOW_UP_PROMPT' if is_follow_up else 'SYSTEM_PROMPT'} with query: {query}") |
|
|
|
msgs = [system] + history |
|
|
|
|
|
evidence_snippets = [] |
|
if use_rag: |
|
|
|
evidence_snippets = fetch_medical_evidence(query) |
|
|
|
|
|
if evidence_snippets: |
|
evidence_text = "MEDICAL EVIDENCE FROM MULTIPLE SOURCES:\n\n" |
|
|
|
for i, snippet in enumerate(evidence_snippets): |
|
|
|
pmid = snippet.get("pmid", "") |
|
doi = snippet.get("doi", "") |
|
|
|
evidence_text += f"--- ARTICLE {i+1} ---\n" |
|
|
|
|
|
if pmid: |
|
evidence_text += f"PMID: {pmid}\n" |
|
if doi: |
|
evidence_text += f"DOI: {doi}\n" |
|
|
|
evidence_text += f"Title: {snippet['title']}\n" |
|
evidence_text += f"Source: {snippet['source_type']}\n" |
|
evidence_text += f"Content: {snippet['text']}\n" |
|
evidence_text += f"Citation: {snippet['citation']}\n" |
|
evidence_text += f"URL: {snippet['url']}\n\n" |
|
|
|
|
|
evidence_text += """CITATION INSTRUCTIONS: |
|
1. IMPORTANT: Provide a direct answer first before asking follow-up questions. Even with limited information, give your best assessment. |
|
2. You MUST cite 2-3 different sources in your response. Use no more than 3 sources and no fewer than 2 sources. |
|
3. When citing information from these articles, use the following formats: |
|
• For PubMed articles: [PMID:123456] where 123456 is the actual PubMed ID |
|
• For Europe PMC articles without PMID: [DOI:10.xxxx/yyyy] where 10.xxxx/yyyy is the DOI |
|
|
|
Example: "Recent studies have shown improved outcomes with early intervention [PMID:34567890]." |
|
Example: "Current guidelines recommend a multidisciplinary approach [DOI:10.1234/abcd]." |
|
4. Focus on specific details from the abstracts - extract actual findings, statistics, or recommendations. |
|
5. When multiple sources support a claim, cite all of them for stronger evidence. |
|
Example: "This approach is supported by multiple studies [PMID:12345678][PMID:87654321]." |
|
6. Include full citations in your Sources section with clickable URLs. |
|
7. If the abstracts have conflicting information, acknowledge this and present both perspectives with citations. |
|
8. Use the most recent sources when available, especially for treatment recommendations. |
|
9. If full text is available (marked as "Open Access" or "Full Text Available"), prioritize information from those sources as they contain more complete data. |
|
10. Europe PMC sources often provide more complete full text access, so give them equal consideration to PubMed sources. |
|
11. After your direct answer, include specific follow-up questions in a clearly labeled "Follow-up Questions:" section. |
|
""" |
|
|
|
msgs.append({"role": "system", "content": evidence_text}) |
|
else: |
|
|
|
no_evidence_msg = ("Note: No specific medical evidence was found for this query in PubMed or Europe PMC. " |
|
"Please rely on your general medical knowledge and be sure to recommend " |
|
"appropriate diagnostic steps and medical consultation.") |
|
msgs.append({"role": "system", "content": no_evidence_msg}) |
|
|
|
|
|
if use_rag: |
|
output_instructions = """ |
|
Please structure your response clearly. |
|
**Priority 1: Direct Answer First** |
|
Begin by providing your best assessment based on the available information without using "Direct Answer:" as a heading. Just start your response directly with the answer. If the query lacks some details, offer your initial thoughts based on what is known, while acknowledging areas of uncertainty. |
|
|
|
**Priority 2: Follow-up Questions** |
|
After your direct answer, include a clearly labeled "Follow-up Questions:" section with specific questions that would help refine your assessment. |
|
|
|
**Main Response Structure:** |
|
1. A direct answer to the patient's concerns WITHOUT the heading "Direct Answer:". |
|
2. If appropriate, a clear diagnosis or differential diagnosis with likelihood assessments. |
|
3. Recommendations for a treatment plan or next steps. |
|
4. IMPORTANT: You MUST cite between 2-3 different medical evidence sources using either: |
|
• [PMID:123456] format for PubMed articles |
|
• [DOI:10.xxxx/yyyy] format for Europe PMC articles without PMID |
|
|
|
Use no more than 3 sources and no fewer than 2 sources. |
|
|
|
**After your main response, ALWAYS include these sections:** |
|
- **Follow-up Questions**: Specific numbered questions starting from 1, not bullets. |
|
Do NOT start the first question with asterisks (**). Format each question properly with just a number. |
|
- **Reasoning**: Provide a detailed, in-depth explanation of your clinical reasoning. Use bullet points for clarity. Aim for comprehensive insights that would be valuable to a healthcare professional. |
|
Do NOT start the first point with asterisks (**). Format each bullet point properly. |
|
- **Sources**: A list of all references cited in your main response (2-3 sources), formatted as: |
|
- PMID: 12345678 - Author et al. (Year). Title. Journal. |
|
URL: https://pubmed.ncbi.nlm.nih.gov/12345678/ |
|
- DOI: 10.xxxx/yyyy - Author et al. (Year). Title. Journal. |
|
URL: https://doi.org/10.xxxx/yyyy |
|
|
|
**IMPORTANT FORMATTING NOTES:** |
|
1. Do NOT include technical information like URLs, PMIDs or DOIs in the main answer - these belong in the Sources section only. |
|
2. For follow-up questions, use numbered format (1. 2. 3.) not bullet points. |
|
3. Number the follow-up questions starting from 1, not from any other number. |
|
4. NEVER use markdown formatting like ** (asterisks) at the beginning of any points, questions, or lines. |
|
5. Make sure all bullet points and numbered items are clean, with no markdown formatting. |
|
|
|
IMPORTANT: Only cite sources that were provided in the evidence. Do not fabricate references, PMIDs, or DOIs. |
|
""" |
|
else: |
|
|
|
output_instructions = """ |
|
Please structure your response clearly. |
|
**Priority 1: Direct Answer First** |
|
Begin by providing your best assessment based on the available information without using "Direct Answer:" as a heading. Just start your response directly with the answer. If the query lacks some details, offer your initial thoughts based on what is known, while acknowledging areas of uncertainty. |
|
|
|
**Priority 2: Follow-up Questions** |
|
After your direct answer, include a clearly labeled "Follow-up Questions:" section with specific questions that would help refine your assessment. |
|
|
|
**Main Response Structure:** |
|
1. A direct answer to the patient's concerns WITHOUT the heading "Direct Answer:". |
|
2. If appropriate, a clear diagnosis or differential diagnosis. |
|
3. Recommendations for a treatment plan or next steps. |
|
|
|
**After your main response, ALWAYS include these sections:** |
|
- **Follow-up Questions**: Specific questions to gather additional information, numbered starting from 1 (not bullet points). |
|
Do NOT start the first question with asterisks (**). Format each question properly with just a number. |
|
- **Reasoning**: Provide a detailed, in-depth explanation of your clinical reasoning. Use bullet points for clarity. Aim for comprehensive insights that would be valuable to a healthcare professional. |
|
Do NOT start the first bullet point with asterisks (**). Format each point properly. |
|
|
|
**IMPORTANT FORMATTING NOTES:** |
|
1. For follow-up questions, use numbered format (1. 2. 3.) not bullet points. |
|
2. Number the follow-up questions starting from 1, not from any other number. |
|
3. NEVER use markdown formatting like ** (asterisks) at the beginning of any points, questions, or lines. |
|
4. Make sure all bullet points and numbered items are clean, with no markdown formatting. |
|
|
|
IMPORTANT: Since database search is disabled, do not include citations or sources in your response. |
|
""" |
|
|
|
msgs.append({"role": "system", "content": output_instructions}) |
|
msgs.append({"role": "user", "content": query}) |
|
|
|
|
|
response = doctor_agent(msgs) |
|
|
|
|
|
response = re.sub(r'^Direct Answer:\s*', '', response) |
|
|
|
|
|
response = re.sub(r'\n\s*\*\*\s*', '\n', response) |
|
|
|
|
|
    explanation = None

    follow_up_questions = ""
|
|
|
if use_rag: |
|
|
|
linked_response, source_map = extract_and_link_sources(response, evidence_snippets) |
|
|
|
|
|
parsed_response = parse_doctor_response(linked_response) |
|
|
|
|
|
main_response = parsed_response["main_response"] |
|
|
|
|
|
reasoning = parsed_response.get("reasoning", []) |
|
if reasoning: |
|
if isinstance(reasoning, list): |
|
|
|
formatted_reasons = [] |
|
for r in reasoning: |
|
|
|
if r.strip().startswith("-") or r.strip().startswith("•"): |
|
formatted_reasons.append(r) |
|
else: |
|
formatted_reasons.append(f"- {r}") |
|
explanation = "\n".join(formatted_reasons) |
|
else: |
|
explanation = reasoning |
|
|
|
|
|
questions = parsed_response.get("follow_up_questions", []) |
|
if questions: |
|
if isinstance(questions, list): |
|
|
|
formatted_questions = [] |
|
for i, q in enumerate(questions): |
|
if q: |
|
|
|
if re.match(r'^\d+\.', q.strip()): |
|
formatted_questions.append(q) |
|
else: |
|
formatted_questions.append(f"{i+1}. {q}") |
|
follow_up_questions = "\n".join(formatted_questions) |
|
else: |
|
follow_up_questions = questions |
|
|
|
|
|
print(f"Follow-up questions generated: {follow_up_questions}") |
|
else: |
|
|
|
parsed_response = parse_doctor_response(response) |
|
main_response = parsed_response["main_response"] |
|
|
|
|
|
reasoning = parsed_response.get("reasoning", []) |
|
if reasoning: |
|
if isinstance(reasoning, list): |
|
|
|
formatted_reasons = [] |
|
for r in reasoning: |
|
if r: |
|
|
|
if r.strip().startswith("-") or r.strip().startswith("•"): |
|
formatted_reasons.append(r) |
|
else: |
|
formatted_reasons.append(f"- {r}") |
|
explanation = "\n".join(formatted_reasons) |
|
else: |
|
explanation = reasoning |
|
|
|
|
|
questions = parsed_response.get("follow_up_questions", []) |
|
if questions: |
|
if isinstance(questions, list): |
|
|
|
formatted_questions = [] |
|
for i, q in enumerate(questions): |
|
if q: |
|
|
|
if re.match(r'^\s*\d+\.\s*', q.strip()): |
|
formatted_questions.append(q) |
|
else: |
|
|
|
q_cleaned = re.sub(r'^\s*[-•*]\s*', '', q.strip()) |
|
formatted_questions.append(f"{i+1}. {q_cleaned}") |
|
follow_up_questions = "\n".join(formatted_questions) |
|
else: |
|
follow_up_questions = questions |
|
|
|
|
|
print(f"Follow-up questions generated: {follow_up_questions}") |
|
|
|
|
|
return main_response, explanation, follow_up_questions, evidence_snippets |
|
|
|
|
|
def run_consultation(use_rag=True): |
|
"""Run an interactive medical consultation""" |
|
history = [] |
|
print("\n===== MEDICAL AI ASSISTANT =====") |
|
print("Type 'exit' to end or 'next' for a new case.\n") |
|
|
|
if use_rag: |
|
print("Using medical evidence from: PubMed, Europe PMC, and other medical databases") |
|
print("Sources marked with 🔓 provide full text access\n") |
|
|
|
consultation_id = str(uuid.uuid4())[:8] |
|
print(f"Consultation ID: {consultation_id}") |
|
|
|
query = input("\nYou: ") |
|
while query.lower() != "exit": |
|
|
|
is_follow_up = len(history) > 0 |
|
|
|
|
|
if use_rag: |
|
print("\nSearching medical databases...") |
|
|
|
|
|
        reply, explanation, follow_up_questions, evidence = orchestrator_chat(history, query, use_rag, is_follow_up)

        # Record the turn so later queries are treated as follow-ups with full context

        history.append({"role": "user", "content": query})

        history.append({"role": "assistant", "content": reply})
|
|
|
|
|
print("\n" + "=" * 30) |
|
print("AI RESPONSE") |
|
print("=" * 30) |
|
print(reply) |
|
|
|
|
|
print("\n" + "=" * 30) |
|
print("DETAILED EXPLANATION") |
|
print("=" * 30) |
|
|
|
if explanation and explanation.strip() and explanation.strip() != "="*50: |
|
print(explanation) |
|
else: |
|
print("No detailed explanation or sources were generated for this response.") |
|
|
|
|
|
if follow_up_questions and follow_up_questions.strip(): |
|
print("\n" + "=" * 30) |
|
print("FOLLOW-UP QUESTIONS") |
|
print("=" * 30) |
|
print(follow_up_questions) |
|
|
|
|
|
if evidence: |
|
print("\nLEGEND: 🔓 = Open Access (full text available)") |
|
|
|
|
|
next_action = input("\nFollow-up? (or 'next' for new case, 'exit' to end): ") |
|
|
|
if next_action.lower() == "exit": |
|
break |
|
elif next_action.lower() == "next": |
|
|
|
history = [] |
|
consultation_id = str(uuid.uuid4())[:8] |
|
print(f"\nNew Consultation ID: {consultation_id}") |
|
query = input("\nYou: ") |
|
else: |
|
|
|
query = next_action |
|
|
|
print("\nConsultation ended.") |
|
|
|
|
|
def save_consultation(history, consultation_id): |
|
"""Save the consultation history to a file""" |
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
filename = f"consultation_{consultation_id}_{timestamp}.json" |
|
|
|
with open(filename, 'w') as f: |
|
json.dump(history, f, indent=2) |
|
|
|
print(f"Consultation saved to {filename}") |
|
|
|
|
|
if __name__ == "__main__": |
|
print("\nInitializing Medical AI Assistant...") |
|
run_consultation(use_rag=True) |
|
|
|
|
|
def extract_medical_terms(query, max_terms=5): |
|
""" |
|
Extract key medical terms from a user query to improve search relevance. |
|
Uses pattern matching and medical term extraction to identify important medical concepts. |
|
|
|
Args: |
|
query (str): The user's original query text |
|
max_terms (int): Maximum number of terms to extract |
|
|
|
Returns: |
|
list: List of extracted medical terms |
|
""" |
|
|
|
cleaned_query = re.sub(r'^(hi|hello|hey|greetings|good morning|good afternoon|good evening)[,\.]?\s+', '', query.lower()) |
|
cleaned_query = re.sub(r"(i'?m|i am)\s+a\s+\d+[-\s]year[-\s]old", '', cleaned_query) |
|
cleaned_query = re.sub(r'(my name is|i am|i have been|i\'ve been|i was|i have|i\'ve had|i feel|i\'m feeling|i experienced)', '', cleaned_query) |
|
|
|
|
|
medical_patterns = [ |
|
|
|
r'(muscle weakness)', r'(fatigue)', r'(rash)', r'(pain)', r'(swelling)', |
|
r'(difficulty breathing|shortness of breath)', r'(fever)', r'(headache)', |
|
r'(nausea|vomiting)', r'(dizziness)', r'(numbness)', r'(tingling)', |
|
r'(cough)', r'(sore throat)', r'(runny nose)', r'(congestion)', |
|
r'(chest pain)', r'(back pain)', r'(joint pain)', r'(abdominal pain)', |
|
|
|
|
|
r'(diabetes)', r'(hypertension|high blood pressure)', r'(asthma)', |
|
r'(cancer)', r'(arthritis)', r'(depression)', r'(anxiety)', |
|
r'(heart disease|cardiovascular disease)', r'(stroke)', r'(alzheimer)', |
|
|
|
|
|
r'(heart)', r'(lung)', r'(kidney)', r'(liver)', r'(brain)', r'(skin)', |
|
r'(stomach)', r'(intestine)', r'(bone)', r'(muscle)', r'(nerve)', |
|
|
|
|
|
r'(chronic)', r'(acute)', r'(infection)', r'(inflammation)', r'(syndrome)', |
|
r'(disorder)', r'(disease)', r'(condition)', r'(symptom)', r'(diagnosis)', |
|
r'(treatment)', r'(medication)', r'(therapy)', r'(surgery)' |
|
] |
|
|
|
|
|
medical_terms = set() |
|
for pattern in medical_patterns: |
|
matches = re.findall(pattern, query.lower()) |
|
if matches: |
|
for match in matches: |
|
if isinstance(match, tuple): |
|
for term in match: |
|
if term and term.strip(): |
|
medical_terms.add(term.strip()) |
|
else: |
|
if match and match.strip(): |
|
medical_terms.add(match.strip()) |
|
|
|
|
|
if len(medical_terms) == 0: |
|
|
|
word_pattern = r'\b([a-zA-Z]+(?:\s+[a-zA-Z]+){0,2})\b' |
|
words = re.findall(word_pattern, cleaned_query) |
|
medical_terms = set(words[:max_terms]) |
|
|
|
|
|
result = list(medical_terms)[:max_terms] |
|
return result |
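

# Example: extract_medical_terms("Hi, I'm a 45-year-old with muscle weakness and a rash")
# returns terms such as ["muscle weakness", "rash"] (set order is not guaranteed).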
|
|
|
|
|
SEARCH_PUBMED_SCHEMA = { |
|
"name": "search_pubmed", |
|
"description": "Search PubMed for medical articles related to a given query, with proper citation formatting.", |
|
"parameters": { |
|
"type": "object", |
|
"properties": { |
|
"query": { |
|
"type": "string", |
|
"description": "The medical query to search for in PubMed" |
|
}, |
|
"retmax": { |
|
"type": "integer", |
|
"description": "Maximum number of results to return (default: 3)", |
|
"default": 3 |
|
}, |
|
"api_key": { |
|
"type": "string", |
|
"description": "Optional NCBI API key to increase rate limits (3 req/sec without key, 10 req/sec with key)", |
|
"default": None |
|
} |
|
}, |
|
"required": ["query"] |
|
} |
|
} |
|
|
|
|
|
EXAMPLE_MESSAGES = [ |
|
{"role": "system", "content": SYSTEM_PROMPT}, |
|
{"role": "user", "content": "I've been experiencing persistent headaches, fatigue, and dizziness for the past two weeks. What could be causing this?"} |
|
] |
|
|
|
|
|
EXAMPLE_FUNCTION_CALL = { |
|
"name": "search_pubmed", |
|
"arguments": { |
|
"query": "headaches, fatigue, dizziness", |
|
"retmax": 3 |
|
} |
|
} |
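

# Minimal sketch (not wired up in this module) of passing SEARCH_PUBMED_SCHEMA to the
# v0.28-era function-calling API; the control flow below is illustrative only:
#
#   response = openai.ChatCompletion.create(
#       model="gpt-4o-mini",
#       messages=EXAMPLE_MESSAGES,
#       functions=[SEARCH_PUBMED_SCHEMA],
#       function_call="auto",
#   )
#   call = response.choices[0].message.get("function_call")
#   if call and call["name"] == "search_pubmed":
#       args = json.loads(call["arguments"])
#       articles = enhanced_search_pubmed(args["query"], retmax=args.get("retmax", 3))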
|
|
|
|
|
def enhance_medical_query(original_query): |
|
""" |
|
Uses LLM to enhance a medical query for better search results. |
|
This function is prepared for future use but is not currently enabled. |
|
|
|
Args: |
|
original_query (str): The original user query |
|
|
|
Returns: |
|
str: An enhanced query optimized for medical search |
|
""" |
|
try: |
|
|
|
system_prompt = """You are a medical search query optimizer. |
|
Your job is to take a user's medical question and rewrite it to be more effective for searching |
|
medical databases like PubMed and Europe PMC. |
|
|
|
Guidelines: |
|
1. Extract key medical terms, conditions, symptoms, and treatments |
|
2. Use proper medical terminology where possible |
|
3. Structure the query for optimal search performance |
|
4. Return ONLY the enhanced query without explanation |
|
5. Keep the query concise but comprehensive |
|
""" |
|
|
|
|
|
enhanced_response = openai.ChatCompletion.create( |
|
model="gpt-3.5-turbo", |
|
messages=[ |
|
{"role": "system", "content": system_prompt}, |
|
{"role": "user", "content": f"Optimize this medical query for database search: {original_query}"} |
|
], |
|
temperature=0.3, |
|
max_tokens=100 |
|
) |
|
|
|
enhanced_query = enhanced_response.choices[0].message['content'].strip() |
|
print(f"Enhanced query: {enhanced_query}") |
|
return enhanced_query |
|
|
|
except Exception as e: |
|
print(f"Error enhancing query: {str(e)}") |
|
|
|
return original_query |


if __name__ == "__main__":

    print("\nInitializing Medical AI Assistant...")

    run_consultation(use_rag=True)