import requests
import uuid
import json
import re
import xml.etree.ElementTree as ET
from bs4 import BeautifulSoup
from datetime import datetime
import os
import openai
import urllib.parse
from dotenv import load_dotenv
import time
# Load environment variables
load_dotenv()
# Initialize OpenAI API key
def get_openai_api_key():
"""Get OpenAI API key from environment variables"""
api_key = os.environ.get("OPENAI_API_KEY")
if not api_key:
raise ValueError("OPENAI_API_KEY environment variable is not set")
return api_key
# Set OpenAI API key
openai.api_key = get_openai_api_key()
# System prompts
SYSTEM_PROMPT = """You are an advanced clinical AI assistant designed to aid healthcare professionals.
Follow these guidelines in all responses:
1. **Answer Directly First**: Begin by providing your best answer based on available information. If information is limited, provide your assessment based on what is known, and indicate areas of uncertainty.
2. **Follow with Clarifying Questions**: After giving your initial assessment, include specific follow-up questions that would help refine your answer. These should be clearly labeled in a separate "Follow-up Questions:" section.
3. Professional tone: Maintain a clear, respectful, and professional tone appropriate for medical consultation.
4. Evidence-based practice: Base all responses on current medical evidence and guidelines.
5. Transparency: Clearly distinguish between established medical facts, clinical guidance, and areas of uncertainty.
6. Structured analysis: Present information in a clear, organized manner following clinical reasoning patterns.
7. Citation: Always cite specific sources for medical claims when available using the [PMID:123456] format, where 123456 is the actual PubMed ID number.
8. Limitations: Acknowledge the limits of AI medical advice and recommend in-person consultation when appropriate.
9. Comprehensive approach: Consider differential diagnoses and relevant contextual factors.
10. Patient-centered: Focus on clinically relevant information while maintaining respect for the patient.
For each consultation:
1. Provide an initial assessment based on available information (as per guideline 1).
2. Include specific follow-up questions (as per guideline 2).
3. Provide differential diagnosis with likelihood assessment.
4. Suggest appropriate next steps (testing, treatment, referral).
5. Include reasoning for your conclusions.
6. Cite medical literature or guidelines supporting your assessment using [PMID:123456].
IMPORTANT: Your primary duty is to support clinical decision-making, not replace clinical judgment.
"""
FOLLOW_UP_PROMPT = """Continue this medical consultation based on the previous discussion.
Consider the information already gathered and the tentative diagnosis/plan.
When responding to the follow-up:
1. Directly address the follow-up question with evidence-based information.
2. Reference relevant details from the prior conversation.
3. If additional information would be helpful, include specific follow-up questions in a clearly labeled "Follow-up Questions:" section.
4. Update recommendations if appropriate based on new information.
5. Maintain the same structured approach with transparent reasoning.
6. Cite additional medical literature or guidelines when relevant using [PMID:123456].
Remember that this is an ongoing consultation where continuity of care is important.
"""
# Function to extract source IDs and replace them with actual links
def extract_and_link_sources(text, evidence_snippets):
"""
Replace [PMID:123456] citation placeholders with actual links to PubMed articles.
Also handles DOI citations and other citation formats for compatibility.
Args:
text (str): Text containing citations
evidence_snippets (list): List of evidence snippets with metadata
Returns:
tuple: (text with citations replaced with links, map of source IDs to metadata)
"""
# Look for [PMID:123456] format first (preferred)
pmid_pattern = r'\[PMID:(\d+)\]'
# Look for [DOI:10.xxxx/yyyy] format for Europe PMC articles
doi_pattern = r'\[DOI:(10\.\d+\/[^\]]+)\]'
# Also look for older [source_id] format for compatibility
source_pattern = r'\[([\w\d:_\-\.+]+)\]'
# Find all PMID citations
pmid_matches = re.findall(pmid_pattern, text)
# Find all DOI citations
doi_matches = re.findall(doi_pattern, text)
# Find all other citation formats
source_matches = re.findall(source_pattern, text)
# Remove PMID and DOI matches from source matches to avoid duplicates
source_matches = [s for s in source_matches if not (s.startswith('PMID:') or s.startswith('DOI:'))]
# Create source map
source_map = {}
# Process PMID citations first
for pmid in pmid_matches:
for snippet in evidence_snippets:
# Check if this is a direct PMID match
if 'pmid' in snippet and snippet['pmid'] == pmid:
source_map[f"PMID:{pmid}"] = {
"id": snippet["id"],
"title": snippet["title"].strip(),
"url": snippet["url"],
"citation": snippet["citation"],
"pmid": pmid
}
break
# Also check the ID field which might contain PMID
elif snippet["id"] == f"PMID:{pmid}":
source_map[f"PMID:{pmid}"] = {
"id": snippet["id"],
"title": snippet["title"].strip(),
"url": snippet["url"],
"citation": snippet["citation"],
"pmid": pmid
}
break
# Process DOI citations
for doi in doi_matches:
for snippet in evidence_snippets:
# Check if this is a direct DOI match
if 'doi' in snippet and snippet['doi'] == doi:
source_map[f"DOI:{doi}"] = {
"id": snippet.get("id", f"DOI:{doi}"),
"title": snippet["title"].strip(),
"url": snippet["url"],
"citation": snippet["citation"],
"doi": doi
}
break
# Also check the ID field which might contain DOI
elif snippet.get("id") == f"DOI:{doi}":
source_map[f"DOI:{doi}"] = {
"id": snippet["id"],
"title": snippet["title"].strip(),
"url": snippet["url"],
"citation": snippet["citation"],
"doi": doi
}
break
# Process other citation formats for backward compatibility
for source_id_match in source_matches:
if source_id_match not in source_map and source_id_match != "source_id":
for snippet in evidence_snippets:
if source_id_match == snippet["id"]:
source_map[source_id_match] = {
"id": snippet["id"],
"title": snippet["title"].strip(),
"url": snippet["url"],
"citation": snippet["citation"],
"pmid": snippet.get("pmid", ""),
"doi": snippet.get("doi", "")
}
break
# Replace PMID citations with links
linked_text = text
for pmid_key in [f"PMID:{pmid}" for pmid in pmid_matches]:
if pmid_key in source_map:
source_data = source_map[pmid_key]
safe_key = re.escape(pmid_key)
pattern = f"\\[{safe_key}\\]"
# Create a replacement with title and URL
short_title = source_data['title'][:60] + "..." if len(source_data['title']) > 60 else source_data['title']
replacement = f"[{short_title}]({source_data['url']})"
linked_text = re.sub(f"\\[{safe_key}\\]", replacement, linked_text)
# Replace DOI citations with links
for doi_key in [f"DOI:{doi}" for doi in doi_matches]:
if doi_key in source_map:
source_data = source_map[doi_key]
safe_key = re.escape(doi_key)
pattern = f"\\[{safe_key}\\]"
# Create a replacement with title and URL
short_title = source_data['title'][:60] + "..." if len(source_data['title']) > 60 else source_data['title']
replacement = f"[{short_title}]({source_data['url']})"
linked_text = re.sub(f"\\[{safe_key}\\]", replacement, linked_text)
# Replace other citation formats
for source_id_key, source_data in source_map.items():
if not (source_id_key.startswith("PMID:") or source_id_key.startswith("DOI:")):
safe_id = re.escape(source_id_key)
pattern = f"\\[{safe_id}\\]"
replacement = f"[{source_data['title']}]({source_data['url']})"
linked_text = re.sub(pattern, replacement, linked_text)
# Handle generic [source_id] placeholder
if "source_id" in source_matches:
# Use the first snippet available if we have any
if evidence_snippets and "source_id" not in source_map:
snippet = evidence_snippets[0] # Use the first snippet
if snippet.get("url") and snippet.get("title"):
source_map["source_id"] = {
"id": snippet["id"],
"title": snippet["title"].strip(),
"url": snippet["url"],
"citation": snippet["citation"],
"pmid": snippet.get("pmid", ""),
"doi": snippet.get("doi", "")
}
replacement = f"[{snippet['title']}]({snippet['url']})"
linked_text = re.sub(r'\[source_id\]', replacement, linked_text)
# Final fallback for any remaining placeholders
linked_text = re.sub(r'\[source_id\]', "[Medical Reference]", linked_text)
linked_text = re.sub(r'\[PMID:(\d+)\]', r'[PubMed Article]', linked_text)
linked_text = re.sub(r'\[DOI:(10\.\d+\/[^\]]+)\]', r'[Europe PMC Article]', linked_text)
return linked_text, source_map
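# Example usage (a minimal sketch; the snippet below is hypothetical):
#
#   snippets = [{
#       "id": "PMID:123456",
#       "title": "Statin therapy for primary prevention",
#       "url": "https://pubmed.ncbi.nlm.nih.gov/123456/",
#       "citation": "Doe J. (2020). Statin therapy for primary prevention. J Med.",
#       "pmid": "123456",
#   }]
#   text = "Statins reduce cardiovascular events [PMID:123456]."
#   linked, source_map = extract_and_link_sources(text, snippets)
#   # linked now contains a Markdown link in place of the [PMID:123456] tag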
# Implement PubMed API integration for medical evidence retrieval
def fetch_from_pubmed_api(query, max_results=3, api_key=None):
"""Fetch medical evidence from PubMed API using E-utilities"""
results = []
# Clean up the query for better results
cleaned_query = re.sub(r'^(hi|hello|hey|greetings|good morning|good afternoon|good evening)[,\.]?\s+', '', query.lower())
cleaned_query = re.sub(r"(i'?m|i am)\s+a\s+\d+[-\s]year[-\s]old", '', cleaned_query)
cleaned_query = re.sub(r'(my name is|i am|i have been|i\'ve been|i was|i have|i\'ve had|i feel|i\'m feeling|i experienced)', '', cleaned_query)
# Try to extract key medical symptoms
symptom_patterns = [
r'(muscle weakness)', r'(fatigue)', r'(rash)', r'(pain)', r'(swelling)',
r'(difficulty breathing|shortness of breath)', r'(fever)', r'(headache)',
r'(nausea|vomiting)', r'(dizziness)', r'(numbness)', r'(tingling)'
]
medical_terms = []
for pattern in symptom_patterns:
matches = re.findall(pattern, query.lower())
if matches:
medical_terms.extend(matches)
# If we found medical terms, prioritize them in the search
if medical_terms:
search_query = " AND ".join(medical_terms)
# Add the complete cleaned query as a less weighted part
if cleaned_query:
search_query = f"({search_query}) OR ({cleaned_query})"
else:
# If no medical terms found, use the cleaned query
search_query = cleaned_query
    # requests URL-encodes query parameters itself, so pass the raw query string
    # (pre-quoting here would double-encode the term)
# Base URL for PubMed E-utilities
base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/"
# Search parameters
search_params = {
"db": "pubmed",
"term": encoded_query,
"retmax": max_results,
"retmode": "json",
"sort": "relevance"
}
# Add API key if provided (increases rate limits)
if api_key:
search_params["api_key"] = api_key
try:
# First get article IDs
search_response = requests.get(f"{base_url}esearch.fcgi", params=search_params)
if search_response.status_code != 200:
return []
search_data = search_response.json()
if "esearchresult" in search_data and "idlist" in search_data["esearchresult"]:
ids = search_data["esearchresult"]["idlist"]
if ids:
# Fetch article details
fetch_params = {
"db": "pubmed",
"id": ",".join(ids),
"retmode": "xml"
}
if api_key:
fetch_params["api_key"] = api_key
fetch_response = requests.get(f"{base_url}efetch.fcgi", params=fetch_params)
if fetch_response.status_code != 200:
return []
try:
# Parse XML response
root = ET.fromstring(fetch_response.text)
for article in root.findall(".//PubmedArticle"):
try:
pmid = article.findtext(".//PMID")
title = article.findtext(".//ArticleTitle") or "No title available"
# Extract abstract
abstract_elements = article.findall(".//AbstractText")
abstract = " ".join([(elem.text or "") for elem in abstract_elements])
# Extract authors
authors = []
for author in article.findall(".//Author"):
last_name = author.findtext(".//LastName") or ""
initials = author.findtext(".//Initials") or ""
if last_name and initials:
authors.append(f"{last_name} {initials}")
author_str = ", ".join(authors[:3])
if len(authors) > 3:
author_str += " et al."
# Extract journal and date
journal = article.findtext(".//Journal/Title") or "Journal not specified"
year = article.findtext(".//PubDate/Year") or "N/A"
# Create citation
citation = f"{author_str}. ({year}). {title}. {journal}. PMID: {pmid}"
# Create direct access URL
url = f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/"
# Check if free full text is available via PMC
pmc_id = article.findtext(".//ArticleId[@IdType='pmc']")
has_free_text = bool(pmc_id) or article.findtext(".//PublicationStatus") == "epublish"
# If PMC ID is available, use that URL instead as it provides full text
if pmc_id:
url = f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmc_id}/"
results.append({
"id": f"pubmed:{pmid}",
"title": title,
"text": abstract[:800] + "..." if len(abstract) > 800 else abstract,
"citation": citation,
"url": url,
"source_type": "PubMed" + (" (Free Full Text)" if has_free_text else ""),
"is_open_access": has_free_text
})
except Exception:
continue
except ET.ParseError:
return []
return results
except Exception:
return []
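# Example usage (a sketch; performs a live E-utilities request, so results vary):
#
#   articles = fetch_from_pubmed_api("muscle weakness fatigue", max_results=3)
#   for a in articles:
#       print(a["title"], "->", a["url"])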
def fetch_from_pmc_api(query, max_results=2, api_key=None):
"""Fetch free full text articles from PubMed Central (PMC)"""
results = []
# Clean up the query for better results
cleaned_query = re.sub(r'^(hi|hello|hey|greetings|good morning|good afternoon|good evening)[,\.]?\s+', '', query.lower())
cleaned_query = re.sub(r"(i'?m|i am)\s+a\s+\d+[-\s]year[-\s]old", '', cleaned_query)
cleaned_query = re.sub(r'(my name is|i am|i have been|i\'ve been|i was|i have|i\'ve had|i feel|i\'m feeling|i experienced)', '', cleaned_query)
    # Restrict the search to free full text; requests handles URL encoding itself
    search_query = cleaned_query + " AND free full text[filter]"
# Base URL for E-utilities
base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/"
# Search parameters - specifically targeting PMC for open access articles
search_params = {
"db": "pmc",
"term": encoded_query,
"retmax": max_results,
"retmode": "json",
"sort": "relevance"
}
# Add API key if provided
if api_key:
search_params["api_key"] = api_key
try:
# First get article IDs
search_response = requests.get(f"{base_url}esearch.fcgi", params=search_params)
if search_response.status_code != 200:
return []
search_data = search_response.json()
if "esearchresult" in search_data and "idlist" in search_data["esearchresult"]:
ids = search_data["esearchresult"]["idlist"]
if ids:
# Fetch article details
fetch_params = {
"db": "pmc",
"id": ",".join(ids),
"retmode": "xml"
}
if api_key:
fetch_params["api_key"] = api_key
fetch_response = requests.get(f"{base_url}efetch.fcgi", params=fetch_params)
if fetch_response.status_code != 200:
return []
try:
# Parse XML response for PMC articles
root = ET.fromstring(fetch_response.text)
for article in root.findall(".//article"):
try:
# Get PMC ID
article_id_elements = article.findall(".//article-id")
pmc_id = None
for id_elem in article_id_elements:
if id_elem.get("pub-id-type") == "pmc":
pmc_id = id_elem.text
if not pmc_id:
continue
# Get article title
title_elem = article.find(".//article-title")
title = "".join(title_elem.itertext()) if title_elem is not None else "No title available"
# Extract abstract
abstract_elem = article.find(".//abstract")
abstract = ""
if abstract_elem is not None:
for p in abstract_elem.findall(".//p"):
abstract += " ".join(p.itertext()) + " "
# If no abstract, try to get from first paragraphs
if not abstract:
body = article.find(".//body")
if body is not None:
paragraphs = body.findall(".//p")
abstract = " ".join([" ".join(p.itertext()) for p in paragraphs[:3]])
# Extract journal and date information
journal_elem = article.find(".//journal-title")
journal = "".join(journal_elem.itertext()) if journal_elem is not None else "PMC Journal"
year_elem = article.find(".//pub-date/year")
year = year_elem.text if year_elem is not None else "N/A"
# Extract authors
authors = []
for contrib in article.findall(".//contrib[@contrib-type='author']"):
surname = contrib.find(".//surname")
given_names = contrib.find(".//given-names")
if surname is not None and given_names is not None:
authors.append(f"{surname.text} {given_names.text[0] if given_names.text else ''}")
author_str = ", ".join(authors[:3])
if len(authors) > 3:
author_str += " et al."
# Create citation
citation = f"{author_str}. ({year}). {title}. {journal}. PMC{pmc_id}"
# Create URL for direct access to full text
url = f"https://www.ncbi.nlm.nih.gov/pmc/articles/PMC{pmc_id}/"
results.append({
"id": f"pmc:{pmc_id}",
"title": title,
"text": abstract[:800] + "..." if len(abstract) > 800 else abstract,
"citation": citation,
"url": url,
"source_type": "PubMed Central (Open Access)",
"is_open_access": True
})
except Exception:
continue
except ET.ParseError:
return []
return results
except Exception:
return []
def fetch_from_who_api(query, max_results=1):
"""Fetch information from WHO guidelines - using web scraping as alternative to API"""
try:
# WHO search URL (as they don't have a public API, we use web scraping)
search_url = f"https://www.who.int/publications/search-results?indexTerms={query.replace(' ', '+')}"
response = requests.get(search_url)
if response.status_code == 200:
soup = BeautifulSoup(response.text, 'html.parser')
results = []
# Extract article information
articles = soup.select('.search-results article')[:max_results]
for article in articles:
title_elem = article.select_one('h3')
title = title_elem.text.strip() if title_elem else "WHO Guideline"
desc_elem = article.select_one('.search-description')
description = desc_elem.text.strip() if desc_elem else ""
link_elem = article.select_one('a')
link = "https://www.who.int" + link_elem['href'] if link_elem and 'href' in link_elem.attrs else ""
date_elem = article.select_one('.search-meta')
date = date_elem.text.strip() if date_elem else "Date not specified"
# Generate a unique ID based on the URL
who_id = link.split('/')[-1] if link else f"who-{uuid.uuid4().hex[:8]}"
results.append({
"id": f"who:{who_id}",
"title": title,
"text": description[:800] + "..." if len(description) > 800 else description,
"citation": f"World Health Organization. ({date}). {title}.",
"url": link,
"source_type": "WHO Guidelines",
"is_open_access": True # WHO guidelines are freely accessible
})
return results
return []
except Exception:
return []
def fetch_from_core_api(query, max_results=2, api_key=None):
"""Fetch open access research papers from CORE API"""
results = []
# Clean up the query for better results
cleaned_query = re.sub(r'^(hi|hello|hey|greetings|good morning|good afternoon|good evening)[,\.]?\s+', '', query.lower())
cleaned_query = re.sub(r"(i'?m|i am)\s+a\s+\d+[-\s]year[-\s]old", '', cleaned_query)
cleaned_query = re.sub(r'(my name is|i am|i have been|i\'ve been|i was|i have|i\'ve had|i feel|i\'m feeling|i experienced)', '', cleaned_query)
# Extract medical terms for better search
symptom_patterns = [
r'(muscle weakness)', r'(fatigue)', r'(rash)', r'(pain)', r'(swelling)',
r'(difficulty breathing|shortness of breath)', r'(fever)', r'(headache)',
r'(nausea|vomiting)', r'(dizziness)', r'(numbness)', r'(tingling)'
]
medical_terms = []
for pattern in symptom_patterns:
matches = re.findall(pattern, query.lower())
if matches:
medical_terms.extend(matches)
# If we found medical terms, enhance the query
if medical_terms:
search_query = cleaned_query + " " + " ".join(medical_terms)
else:
search_query = cleaned_query
# Base URL for CORE API
base_url = "https://core.ac.uk/api/v3/search/works"
# Search parameters with medical focus
search_params = {
"q": search_query,
"limit": max_results * 2, # Get more results to filter for the best ones
"offset": 0,
"fields": ["title", "abstract", "authors", "year", "downloadUrl", "sourceFulltextUrl", "doi", "fullText"]
}
# Headers with API key
headers = {
"Authorization": f"Bearer {api_key}" if api_key else None,
"Content-Type": "application/json"
}
try:
response = requests.post(base_url, json=search_params, headers=headers)
if response.status_code != 200:
return []
data = response.json()
if "results" in data:
filtered_articles = []
# First pass: Collect and score all articles
for article in data["results"]:
try:
# Score articles for relevance (higher is better)
score = 0
# Has downloadUrl or sourceFulltextUrl (direct access)
if article.get("downloadUrl") or article.get("sourceFulltextUrl"):
score += 3
# Has full text in the response
if article.get("fullText"):
score += 2
# Has abstract
if article.get("abstract") and len(article.get("abstract")) > 100:
score += 1
# Medical relevance - check title and abstract for medical terms
for term in medical_terms:
if term in (article.get("title", "") + article.get("abstract", "")).lower():
score += 2
# Store with score for later filtering
filtered_articles.append((score, article))
except Exception:
continue
# Sort by score (highest first) and take the top results
filtered_articles.sort(reverse=True, key=lambda x: x[0])
top_articles = [article for score, article in filtered_articles[:max_results]]
# Second pass: Process the top articles in detail
for article in top_articles:
try:
# Extract article information
title = article.get("title", "No title available")
abstract = article.get("abstract", "")
# Try to use full text if available, otherwise use abstract
full_text = article.get("fullText", "")
text_content = ""
if full_text:
# If full text is available, use a summarized version (first part)
text_content = f"[FULL TEXT AVAILABLE] {full_text[:1500]}..."
else:
# Use abstract if no full text
text_content = abstract
authors = article.get("authors", [])
year = article.get("year", "N/A")
# Format authors
author_str = ", ".join([f"{author.get('name', '')}" for author in authors[:3]])
if len(authors) > 3:
author_str += " et al."
# Get the best available URL - prioritize direct download links
url = ""
download_available = False
if article.get("downloadUrl"):
url = article.get("downloadUrl")
download_available = True
elif article.get("sourceFulltextUrl"):
url = article.get("sourceFulltextUrl")
download_available = True
elif article.get("doi"):
url = f"https://doi.org/{article.get('doi')}"
# Create citation
citation = f"{author_str}. ({year}). {title}."
if article.get("doi"):
citation += f" DOI: {article['doi']}"
# Generate a unique ID
core_id = article.get("id", str(uuid.uuid4()))
# Create source type with clarity about data availability
source_type = "CORE Open Access"
if download_available:
source_type += " (Full Text Available)"
elif full_text:
source_type += " (Full Text Excerpt Included)"
else:
source_type += " (Abstract Only)"
results.append({
"id": f"core:{core_id}",
"title": title,
"text": text_content[:800] + "..." if len(text_content) > 800 else text_content,
"citation": citation,
"url": url,
"source_type": source_type,
"is_open_access": True # All CORE articles are open access
})
except Exception:
continue
return results
except Exception:
return []
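# Example usage (a sketch; assumes a CORE API key stored in a CORE_API_KEY
# environment variable, which this module does not otherwise define):
#
#   core_key = os.environ.get("CORE_API_KEY")
#   papers = fetch_from_core_api("chronic fatigue syndrome", max_results=2, api_key=core_key)
#   for p in papers:
#       print(p["source_type"], "-", p["title"])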
# Enhanced PubMed search function
def enhanced_search_pubmed(query, retmax=3, api_key=None):
"""
Enhanced PubMed search using E-utilities API with improved parsing and error handling.
Args:
query (str): Search query string
retmax (int): Maximum number of results to return
api_key (str, optional): NCBI API key for higher rate limits
Returns:
list: List of article dictionaries with title, abstract, PMID, URL
"""
results = []
# Base URLs for PubMed E-utilities
base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/"
# Rate limiting - sleep to avoid hitting rate limits
# NCBI allows 3 requests/second without API key, 10 with key
time.sleep(0.33 if api_key is None else 0.1)
try:
# Step 1: Use ESearch to get PMIDs
search_params = {
"db": "pubmed",
"term": query,
"retmax": retmax,
"retmode": "json",
"sort": "relevance"
}
if api_key:
search_params["api_key"] = api_key
search_response = requests.get(f"{base_url}esearch.fcgi", params=search_params)
if search_response.status_code != 200:
print(f"PubMed search error: {search_response.status_code}")
return []
search_data = search_response.json()
if "esearchresult" not in search_data or "idlist" not in search_data["esearchresult"]:
print("No results found or invalid response format")
return []
pmids = search_data["esearchresult"]["idlist"]
if not pmids:
print("No PMIDs found for the query")
return []
# Rate limiting before second request
time.sleep(0.33 if api_key is None else 0.1)
# Step 2: Use EFetch to get article details with abstracts
fetch_params = {
"db": "pubmed",
"id": ",".join(pmids),
"retmode": "xml",
"rettype": "abstract"
}
if api_key:
fetch_params["api_key"] = api_key
fetch_response = requests.get(f"{base_url}efetch.fcgi", params=fetch_params)
if fetch_response.status_code != 200:
print(f"PubMed fetch error: {fetch_response.status_code}")
return []
# Step 3: Parse XML response
root = ET.fromstring(fetch_response.text)
for article in root.findall(".//PubmedArticle"):
try:
# Extract PMID
pmid = article.findtext(".//PMID")
if not pmid:
continue
# Extract title
title = article.findtext(".//ArticleTitle") or "No title available"
# Extract abstract sections with labels if available
abstract_sections = []
for abstract_text in article.findall(".//AbstractText"):
label = abstract_text.get("Label", "")
text = abstract_text.text or ""
if label and text:
abstract_sections.append(f"{label}: {text}")
elif text:
abstract_sections.append(text)
# If no structured abstract, try to get the plain abstract
if not abstract_sections:
abstract_text = article.findtext(".//Abstract/AbstractText")
if abstract_text:
abstract_sections.append(abstract_text)
# Join all abstract sections
abstract = " ".join(abstract_sections) or "Abstract not available"
# Extract authors
authors = []
for author in article.findall(".//Author"):
last_name = author.findtext(".//LastName") or ""
initials = author.findtext(".//Initials") or ""
if last_name and initials:
authors.append(f"{last_name} {initials}")
# Format authors for citation
author_text = ""
if authors:
if len(authors) == 1:
author_text = authors[0]
elif len(authors) == 2:
author_text = f"{authors[0]} & {authors[1]}"
else:
author_text = f"{authors[0]} et al."
# Extract journal and publication year
journal = article.findtext(".//Journal/Title") or "Unknown Journal"
year = article.findtext(".//PubDate/Year") or ""
# Create direct URL to PubMed article
url = f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/"
# Create citation
citation = f"{author_text}{' ' if author_text else ''}({year}). {title}. {journal}. PMID: {pmid}"
# Check for full text availability
pmc_id = article.findtext(".//ArticleId[@IdType='pmc']")
has_full_text = bool(pmc_id)
full_text_url = f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmc_id}/" if pmc_id else None
# Create result object
result = {
"pmid": pmid,
"title": title,
"abstract": abstract,
"authors": authors,
"journal": journal,
"year": year,
"url": url,
"full_text_url": full_text_url,
"has_full_text": has_full_text,
"citation": citation
}
results.append(result)
except Exception as e:
print(f"Error parsing article {pmid}: {str(e)}")
continue
return results
except Exception as e:
print(f"Error in PubMed search: {str(e)}")
return []
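# Example usage (a sketch; hits the live NCBI API and is subject to its rate limits):
#
#   results = enhanced_search_pubmed("type 2 diabetes metformin", retmax=3,
#                                    api_key=os.environ.get("PUBMED_API_KEY"))
#   for r in results:
#       print(r["pmid"], r["citation"])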
# Europe PMC search function
def search_europe_pmc(query, max_results=3, use_extracted_terms=False, extracted_terms=None):
"""
Search Europe PMC for biomedical articles, with a focus on retrieving full text when available.
Europe PMC provides more open access content than standard PubMed.
Args:
query (str): Search query string
max_results (int): Maximum number of results to return
use_extracted_terms (bool): Whether to use the extracted medical terms
extracted_terms (list): List of extracted medical terms from the query
Returns:
list: List of article dictionaries with title, abstract, PMID, URL, and full text URL
"""
results = []
# Rate limiting - Europe PMC allows 30 requests per minute per IP
time.sleep(2.0) # Conservative rate limiting
try:
# Europe PMC API base URL
base_url = "https://www.ebi.ac.uk/europepmc/webservices/rest/search"
# Construct search query based on parameters
search_query = query
if use_extracted_terms and extracted_terms and len(extracted_terms) > 0:
# Join terms with AND for better search
terms_query = " AND ".join(extracted_terms)
search_query = terms_query
print(f"Searching Europe PMC with extracted terms: {terms_query}")
# Search parameters - specifically looking for open access when possible
search_params = {
"query": f"({search_query}) AND OPEN_ACCESS:y", # Prioritize open access
"format": "json",
"pageSize": max_results,
"resultType": "core" # Get core metadata
}
print(f"Searching Europe PMC with query: {search_query}")
response = requests.get(base_url, params=search_params)
if response.status_code != 200:
print(f"Europe PMC search error: {response.status_code}")
# Try again without open access restriction if no results
search_params["query"] = search_query
response = requests.get(base_url, params=search_params)
if response.status_code != 200:
return []
data = response.json()
# Check if we have results
hit_count = data.get("hitCount", 0)
if hit_count == 0:
print("No Europe PMC results found")
# If we used extracted terms and got no results, try with the original query
if use_extracted_terms and extracted_terms:
print("Retrying Europe PMC search with original query")
return search_europe_pmc(query, max_results, False, None)
return []
# Process results
articles = data.get("resultList", {}).get("result", [])
for article in articles:
try:
# Extract basic metadata
pmid = article.get("pmid")
doi = article.get("doi")
title = article.get("title", "No title available")
abstract = article.get("abstractText", "Abstract not available")
journal = article.get("journalTitle", "Unknown Journal")
pub_year = article.get("pubYear", "")
# Check if it's open access
is_open_access = article.get("isOpenAccess") == "Y"
# Get full text URL if available
full_text_url = None
full_text_urls = article.get("fullTextUrlList", {}).get("fullTextUrl", [])
for url_entry in full_text_urls:
if url_entry.get("availability") == "Open access" or url_entry.get("documentStyle") == "pdf":
full_text_url = url_entry.get("url")
break
# If no specific full text URL found but we have a PMID, create Europe PMC link
if not full_text_url and pmid:
full_text_url = f"https://europepmc.org/article/MED/{pmid}"
elif not full_text_url and doi:
full_text_url = f"https://doi.org/{doi}"
# Get authors
author_list = article.get("authorList", {}).get("author", [])
authors = []
for author in author_list:
last_name = author.get("lastName", "")
initials = author.get("initials", "")
if last_name:
authors.append(f"{last_name} {initials}")
# Format author citation
author_text = ""
if authors:
if len(authors) == 1:
author_text = authors[0]
elif len(authors) == 2:
author_text = f"{authors[0]} & {authors[1]}"
else:
author_text = f"{authors[0]} et al."
# Create citation
citation = f"{author_text}{' ' if author_text else ''}({pub_year}). {title}. {journal}."
if pmid:
citation += f" PMID: {pmid}"
if doi:
citation += f" DOI: {doi}"
# Create a direct URL to access the article
url = full_text_url if full_text_url else (
f"https://europepmc.org/article/MED/{pmid}" if pmid else (
f"https://doi.org/{doi}" if doi else ""
)
)
# Create source type with OA indicator
source_type = "Europe PMC" + (" (Open Access)" if is_open_access else "")
# Format for compatibility with existing code
result = {
"pmid": pmid, # May be None for some articles
"doi": doi, # Alternative identifier
"title": title,
"abstract": abstract,
"authors": authors,
"journal": journal,
"year": pub_year,
"url": url,
"full_text_url": full_text_url,
"has_full_text": is_open_access or full_text_url is not None,
"citation": citation,
"source_type": source_type,
"is_open_access": is_open_access
}
results.append(result)
except Exception as e:
print(f"Error parsing Europe PMC article: {str(e)}")
continue
print(f"Found {len(results)} Europe PMC articles")
return results
except Exception as e:
print(f"Error in Europe PMC search: {str(e)}")
return []
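# Example usage (a sketch; queries the live Europe PMC REST API):
#
#   hits = search_europe_pmc("asthma biologics", max_results=3)
#   for h in hits:
#       print(h.get("pmid") or h.get("doi"), h["title"], h["is_open_access"])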
# Enhanced RAG System with focused PubMed searches
def fetch_medical_evidence(query, max_results=3):
"""
Fetch medical evidence using a multi-source approach:
1. Search with extracted medical terms in PubMed
2. Search with extracted medical terms in Europe PMC
3. Search with the original query in PubMed
4. Search with the original query in Europe PMC
This provides better coverage and relevance from multiple sources.
Args:
query (str): The user's original query
        max_results (int): Maximum number of results to return (default: 3)
Returns:
list: Combined and deduplicated results from all searches
"""
# Define API key if available
pubmed_api_key = os.environ.get("PUBMED_API_KEY")
# Step 1: Extract medical terms from the query
medical_terms = extract_medical_terms(query)
has_medical_terms = len(medical_terms) > 0
# Initialize results containers
terms_pubmed_results = []
full_pubmed_results = []
terms_europepmc_results = []
full_europepmc_results = []
# Only use extracted terms if we found any
if has_medical_terms:
# Join terms with commas for PubMed
terms_query = ", ".join(medical_terms)
print(f"Searching PubMed with extracted terms: {terms_query}")
# Search PubMed with extracted terms
terms_pubmed_results = enhanced_search_pubmed(terms_query, retmax=2, api_key=pubmed_api_key)
# Search Europe PMC with extracted terms
print(f"Searching Europe PMC with extracted terms")
terms_europepmc_results = search_europe_pmc(query, max_results=2,
use_extracted_terms=True,
extracted_terms=medical_terms)
    # Step 2: Search with the full original query in both sources
    print("Searching PubMed with full query")
    full_pubmed_results = enhanced_search_pubmed(query, retmax=2, api_key=pubmed_api_key)
    print("Searching Europe PMC with full query")
full_europepmc_results = search_europe_pmc(query, max_results=2)
# Step 3: Combine results, ensuring no duplicates by PMID or DOI
all_results = []
seen_pmids = set()
seen_dois = set()
# Process results in order of preference:
# 1. Terms search from PubMed (if available)
# 2. Terms search from Europe PMC (if available)
# 3. Full query from PubMed
# 4. Full query from Europe PMC
# Add results from terms search first (often more relevant)
for result in terms_pubmed_results:
pmid = result.get("pmid")
if pmid and pmid not in seen_pmids and len(all_results) < max_results:
seen_pmids.add(pmid)
# Format for compatibility with existing code
all_results.append({
"id": f"PMID:{pmid}",
"title": result["title"],
"text": result["abstract"],
"citation": result["citation"],
"url": result["url"],
"source_type": "PubMed" + (" (Full Text Available)" if result.get("has_full_text") else ""),
"is_open_access": result.get("has_full_text", False),
"pmid": pmid # Keep the original PMID for direct access
})
# Add Europe PMC terms results
for result in terms_europepmc_results:
# Some Europe PMC articles may not have a PMID, use DOI as fallback
pmid = result.get("pmid")
doi = result.get("doi")
# Skip if we've already seen this article via PMID or DOI
if (pmid and pmid in seen_pmids) or (doi and doi in seen_dois):
continue
# Skip if we've reached our max
if len(all_results) >= max_results:
break
# Add to seen IDs
if pmid:
seen_pmids.add(pmid)
if doi:
seen_dois.add(doi)
# Determine ID format (prefer PMID if available, fall back to DOI)
article_id = f"PMID:{pmid}" if pmid else (f"DOI:{doi}" if doi else str(uuid.uuid4())[:8])
# Add to results
all_results.append({
"id": article_id,
"title": result["title"],
"text": result["abstract"],
"citation": result["citation"],
"url": result["url"],
"source_type": result["source_type"],
"is_open_access": result["is_open_access"],
"pmid": pmid, # May be None
"doi": doi # May be None
})
# Add full query PubMed results if we still need more
for result in full_pubmed_results:
pmid = result.get("pmid")
if pmid and pmid not in seen_pmids and len(all_results) < max_results:
seen_pmids.add(pmid)
all_results.append({
"id": f"PMID:{pmid}",
"title": result["title"],
"text": result["abstract"],
"citation": result["citation"],
"url": result["url"],
"source_type": "PubMed" + (" (Full Text Available)" if result.get("has_full_text") else ""),
"is_open_access": result.get("has_full_text", False),
"pmid": pmid
})
# Add full query Europe PMC results if we still need more
for result in full_europepmc_results:
pmid = result.get("pmid")
doi = result.get("doi")
# Skip if we've already seen this article via PMID or DOI
if (pmid and pmid in seen_pmids) or (doi and doi in seen_dois):
continue
# Skip if we've reached our max
if len(all_results) >= max_results:
break
# Add to seen IDs
if pmid:
seen_pmids.add(pmid)
if doi:
seen_dois.add(doi)
# Determine ID format (prefer PMID if available, fall back to DOI)
article_id = f"PMID:{pmid}" if pmid else (f"DOI:{doi}" if doi else str(uuid.uuid4())[:8])
# Add to results
all_results.append({
"id": article_id,
"title": result["title"],
"text": result["abstract"],
"citation": result["citation"],
"url": result["url"],
"source_type": result["source_type"],
"is_open_access": result["is_open_access"],
"pmid": pmid, # May be None
"doi": doi # May be None
})
# Ensure we have exactly max_results results (or fewer if not enough found)
return all_results[:max_results]
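# Example usage (a sketch; runs live PubMed and Europe PMC searches):
#
#   evidence = fetch_medical_evidence("persistent headache and dizziness", max_results=3)
#   for snippet in evidence:
#       print(snippet["id"], snippet["source_type"])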
# Function to parse doctor agent responses
def parse_doctor_response(response_text):
"""Parse the doctor agent's response into structured components"""
# First, remove "Direct Answer:" prefix that might appear at the beginning of the response
response_text = re.sub(r'^Direct Answer:\s*', '', response_text)
# Initialize structure
parsed = {
"main_response": response_text,
"diagnosis": "",
"treatment": "",
"reasoning": [],
"sources": [],
"follow_up_questions": []
}
# Try to extract diagnosis
diagnosis_match = re.search(r'(?i)diagnosis:?\s*(.*?)(?:\n\n|\n[A-Z]|\Z)', response_text, re.DOTALL)
if diagnosis_match:
parsed["diagnosis"] = diagnosis_match.group(1).strip()
# Try to extract treatment/recommendations
treatment_match = re.search(r'(?i)(treatment|recommendations|plan):?\s*(.*?)(?:\n\n|\n[A-Z]|\Z)', response_text, re.DOTALL)
if treatment_match:
parsed["treatment"] = treatment_match.group(2).strip()
# Try to extract follow-up questions
follow_up_match = re.search(r'(?i)(?:follow[ -]?up questions|additional questions|clarifying questions):?\s*(.*?)(?:\n\n|\n(?:reasoning|sources):|\Z)', response_text, re.DOTALL)
if follow_up_match:
follow_up_text = follow_up_match.group(1).strip()
# Remove any leading markdown formatting (like ** for bold)
follow_up_text = re.sub(r'^\*\*\s*', '', follow_up_text)
# Check if questions are formatted as a list
if '\n-' in follow_up_text or '\n•' in follow_up_text or '\n*' in follow_up_text:
# Split on any bullet point marker
bullet_items = re.split(r'\n\s*[-•*]\s*', follow_up_text)
# Remove any empty items and ensure first item is properly formatted
questions = []
for item in bullet_items:
if item.strip():
# Remove any markdown formatting from each item
cleaned_item = re.sub(r'^\s*\*\*\s*|\s*\*\*\s*$', '', item.strip())
questions.append(cleaned_item)
parsed["follow_up_questions"] = questions
elif '\n1.' in follow_up_text or re.search(r'\n\d+\.', follow_up_text):
# Split on numbered items
numbered_items = re.split(r'\n\s*\d+\.\s*', follow_up_text)
# Clean each item and remove any empty ones
questions = []
for item in numbered_items:
if item.strip():
# Remove any markdown formatting
cleaned_item = re.sub(r'^\s*\*\*\s*|\s*\*\*\s*$', '', item.strip())
questions.append(cleaned_item)
parsed["follow_up_questions"] = questions
else:
# Just use the raw text if no clear list format is detected
cleaned_text = re.sub(r'^\s*\*\*\s*|\s*\*\*\s*$', '', follow_up_text)
parsed["follow_up_questions"] = [cleaned_text]
# Try to extract reasoning if present
reasoning_match = re.search(r'(?i)reasoning:?\s*(.*?)(?:\n\n\Z|\n(?:sources|follow)|\Z)', response_text, re.DOTALL)
if reasoning_match:
reasoning_text = reasoning_match.group(1).strip()
# Remove any leading markdown formatting (like ** for bold)
reasoning_text = re.sub(r'^\*\*\s*', '', reasoning_text)
# Split into bullet points if present
if '\n-' in reasoning_text:
# Split by newline + dash, but ensure we don't lose any content
reasoning_points = []
lines = reasoning_text.split('\n-')
# Process the first item which might not have a dash prefix
if lines and lines[0].strip():
# Clean up any leading/trailing asterisks
first_item = re.sub(r'^\s*\*\*\s*|\s*\*\*\s*$', '', lines[0].strip())
if first_item:
reasoning_points.append(first_item)
# Process the rest of the items
for i in range(1, len(lines)):
if lines[i].strip():
# Clean up leading/trailing asterisks and dashes
cleaned_item = re.sub(r'^\s*[-*]*\s*|\s*\*\*\s*$', '', lines[i].strip())
if cleaned_item:
reasoning_points.append(cleaned_item)
parsed["reasoning"] = reasoning_points
else:
# If there are no bullet points, still clean up any markdown
cleaned_text = re.sub(r'^\s*\*\*\s*|\s*\*\*\s*$', '', reasoning_text)
parsed["reasoning"] = [cleaned_text]
# Extract sources/references
sources_match = re.search(r'(?i)(sources|references):?\s*(.*?)(?:\n\n\Z|\Z)', response_text, re.DOTALL)
if sources_match:
sources_text = sources_match.group(2).strip()
# Split into individual sources
if '\n' in sources_text:
parsed["sources"] = [item.strip() for item in sources_text.split('\n') if item.strip()]
else:
parsed["sources"] = [sources_text]
# Clean up the main response - remove URLs, PMIDs and DOIs from the text if they're already in the sources section
if parsed["sources"]:
        # Remove standalone URL, PMID, and DOI lines (they belong in the Sources section)
        main_response_lines = []
        for line in parsed["main_response"].split('\n'):
            # Skip lines containing only a URL reference
            if re.match(r'^URL:\s*https?://', line.strip()):
                continue
            # Skip lines that display a bare PMID or DOI
            if re.match(r'^(PMID|DOI):', line.strip()):
                continue
            main_response_lines.append(line)
parsed["main_response"] = '\n'.join(main_response_lines)
# Extract citations in the text (format: [source_id])
citation_matches = re.findall(r'\[([\w\d:]+)\]', response_text)
for citation in citation_matches:
if citation not in parsed["sources"]:
parsed["sources"].append(citation)
return parsed
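# Example usage (a minimal sketch with a hypothetical model response):
#
#   sample = ("The presentation suggests iron-deficiency anemia.\n\n"
#             "Follow-up Questions:\n1. Any history of heavy menstrual bleeding?\n\n"
#             "Reasoning:\n- Fatigue plus pallor points toward anemia.")
#   parsed = parse_doctor_response(sample)
#   # parsed["follow_up_questions"] -> ["1. Any history of heavy menstrual bleeding?"]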
# Enhanced Doctor Agent call with structured output
def doctor_agent(messages):
"""Call the LLM to get a structured response using OpenAI API v0.28.1"""
try:
response = openai.ChatCompletion.create(
model="gpt-4o-mini",
messages=messages,
temperature=0.3
)
return response.choices[0].message['content']
except Exception as e:
return f"I'm sorry, there was an error processing your request. Please try again. Error: {str(e)}"
# Single orchestrator turn with enhanced reasoning and citation tracking
def orchestrator_chat(history, query, use_rag, is_follow_up=False):
"""Handle a single turn of conversation with the doctor agent"""
# Select appropriate system prompt based on whether this is a follow-up
if is_follow_up:
system = {"role": "system", "content": FOLLOW_UP_PROMPT}
else:
system = {"role": "system", "content": SYSTEM_PROMPT}
# Debug - Print prompt type
print(f"Using {'FOLLOW_UP_PROMPT' if is_follow_up else 'SYSTEM_PROMPT'} with query: {query}")
msgs = [system] + history
# Evidence gathering
evidence_snippets = []
if use_rag:
# Only fetch and format evidence if RAG is enabled
evidence_snippets = fetch_medical_evidence(query)
# Format evidence for the model
if evidence_snippets:
evidence_text = "MEDICAL EVIDENCE FROM MULTIPLE SOURCES:\n\n"
for i, snippet in enumerate(evidence_snippets):
# Format the evidence with clear PMID or DOI for citation
pmid = snippet.get("pmid", "")
doi = snippet.get("doi", "")
evidence_text += f"--- ARTICLE {i+1} ---\n"
# Include the appropriate identifiers
if pmid:
evidence_text += f"PMID: {pmid}\n"
if doi:
evidence_text += f"DOI: {doi}\n"
evidence_text += f"Title: {snippet['title']}\n"
evidence_text += f"Source: {snippet['source_type']}\n"
evidence_text += f"Content: {snippet['text']}\n"
evidence_text += f"Citation: {snippet['citation']}\n"
evidence_text += f"URL: {snippet['url']}\n\n"
# Enhanced instructions for better source utilization
evidence_text += """CITATION INSTRUCTIONS:
1. IMPORTANT: Provide a direct answer first before asking follow-up questions. Even with limited information, give your best assessment.
2. You MUST cite 2-3 different sources in your response. Use no more than 3 sources and no fewer than 2 sources.
3. When citing information from these articles, use the following formats:
• For PubMed articles: [PMID:123456] where 123456 is the actual PubMed ID
• For Europe PMC articles without PMID: [DOI:10.xxxx/yyyy] where 10.xxxx/yyyy is the DOI
Example: "Recent studies have shown improved outcomes with early intervention [PMID:34567890]."
Example: "Current guidelines recommend a multidisciplinary approach [DOI:10.1234/abcd]."
4. Focus on specific details from the abstracts - extract actual findings, statistics, or recommendations.
5. When multiple sources support a claim, cite all of them for stronger evidence.
Example: "This approach is supported by multiple studies [PMID:12345678][PMID:87654321]."
6. Include full citations in your Sources section with clickable URLs.
7. If the abstracts have conflicting information, acknowledge this and present both perspectives with citations.
8. Use the most recent sources when available, especially for treatment recommendations.
9. If full text is available (marked as "Open Access" or "Full Text Available"), prioritize information from those sources as they contain more complete data.
10. Europe PMC sources often provide more complete full text access, so give them equal consideration to PubMed sources.
11. After your direct answer, include specific follow-up questions in a clearly labeled "Follow-up Questions:" section.
"""
msgs.append({"role": "system", "content": evidence_text})
else:
# If no evidence was found, inform the model
no_evidence_msg = ("Note: No specific medical evidence was found for this query in PubMed or Europe PMC. "
"Please rely on your general medical knowledge and be sure to recommend "
"appropriate diagnostic steps and medical consultation.")
msgs.append({"role": "system", "content": no_evidence_msg})
# Add instructions for structured output
if use_rag:
output_instructions = """
Please structure your response clearly.
**Priority 1: Direct Answer First**
Begin by providing your best assessment based on the available information without using "Direct Answer:" as a heading. Just start your response directly with the answer. If the query lacks some details, offer your initial thoughts based on what is known, while acknowledging areas of uncertainty.
**Priority 2: Follow-up Questions**
After your direct answer, include a clearly labeled "Follow-up Questions:" section with specific questions that would help refine your assessment.
**Main Response Structure:**
1. A direct answer to the patient's concerns WITHOUT the heading "Direct Answer:".
2. If appropriate, a clear diagnosis or differential diagnosis with likelihood assessments.
3. Recommendations for a treatment plan or next steps.
4. IMPORTANT: You MUST cite between 2-3 different medical evidence sources using either:
• [PMID:123456] format for PubMed articles
• [DOI:10.xxxx/yyyy] format for Europe PMC articles without PMID
Use no more than 3 sources and no fewer than 2 sources.
**After your main response, ALWAYS include these sections:**
- **Follow-up Questions**: Specific numbered questions starting from 1, not bullets.
Do NOT start the first question with asterisks (**). Format each question properly with just a number.
- **Reasoning**: Provide a detailed, in-depth explanation of your clinical reasoning. Use bullet points for clarity. Aim for comprehensive insights that would be valuable to a healthcare professional.
Do NOT start the first point with asterisks (**). Format each bullet point properly.
- **Sources**: A list of all references cited in your main response (2-3 sources), formatted as:
- PMID: 12345678 - Author et al. (Year). Title. Journal.
URL: https://pubmed.ncbi.nlm.nih.gov/12345678/
- DOI: 10.xxxx/yyyy - Author et al. (Year). Title. Journal.
URL: https://doi.org/10.xxxx/yyyy
**IMPORTANT FORMATTING NOTES:**
1. Do NOT include technical information like URLs, PMIDs or DOIs in the main answer - these belong in the Sources section only.
2. For follow-up questions, use numbered format (1. 2. 3.) not bullet points.
3. Number the follow-up questions starting from 1, not from any other number.
4. NEVER use markdown formatting like ** (asterisks) at the beginning of any points, questions, or lines.
5. Make sure all bullet points and numbered items are clean, with no markdown formatting.
IMPORTANT: Only cite sources that were provided in the evidence. Do not fabricate references, PMIDs, or DOIs.
"""
else:
# Different instructions when RAG is disabled - no mention of sources or citations
output_instructions = """
Please structure your response clearly.
**Priority 1: Direct Answer First**
Begin by providing your best assessment based on the available information without using "Direct Answer:" as a heading. Just start your response directly with the answer. If the query lacks some details, offer your initial thoughts based on what is known, while acknowledging areas of uncertainty.
**Priority 2: Follow-up Questions**
After your direct answer, include a clearly labeled "Follow-up Questions:" section with specific questions that would help refine your assessment.
**Main Response Structure:**
1. A direct answer to the patient's concerns WITHOUT the heading "Direct Answer:".
2. If appropriate, a clear diagnosis or differential diagnosis.
3. Recommendations for a treatment plan or next steps.
**After your main response, ALWAYS include these sections:**
- **Follow-up Questions**: Specific questions to gather additional information, numbered starting from 1 (not bullet points).
Do NOT start the first question with asterisks (**). Format each question properly with just a number.
- **Reasoning**: Provide a detailed, in-depth explanation of your clinical reasoning. Use bullet points for clarity. Aim for comprehensive insights that would be valuable to a healthcare professional.
Do NOT start the first bullet point with asterisks (**). Format each point properly.
**IMPORTANT FORMATTING NOTES:**
1. For follow-up questions, use numbered format (1. 2. 3.) not bullet points.
2. Number the follow-up questions starting from 1, not from any other number.
3. NEVER use markdown formatting like ** (asterisks) at the beginning of any points, questions, or lines.
4. Make sure all bullet points and numbered items are clean, with no markdown formatting.
IMPORTANT: Since database search is disabled, do not include citations or sources in your response.
"""
msgs.append({"role": "system", "content": output_instructions})
msgs.append({"role": "user", "content": query})
# Get response from doctor agent
response = doctor_agent(msgs)
# Remove "Direct Answer:" prefix if it appears
response = re.sub(r'^Direct Answer:\s*', '', response)
# Remove any markdown formatting (** for bold) that might appear at the beginning of lines
response = re.sub(r'\n\s*\*\*\s*', '\n', response)
# Extract and process sources
    explanation = None
    follow_up_questions = ""
if use_rag:
# Process the response to replace source placeholders with actual links
linked_response, source_map = extract_and_link_sources(response, evidence_snippets)
# Parse the response
parsed_response = parse_doctor_response(linked_response)
# Get the main response
main_response = parsed_response["main_response"]
# Extract reasoning for display
reasoning = parsed_response.get("reasoning", [])
if reasoning:
if isinstance(reasoning, list):
# Check if each reasoning point already starts with a bullet point
formatted_reasons = []
for r in reasoning:
# If item already starts with bullet, don't add another
if r.strip().startswith("-") or r.strip().startswith("•"):
formatted_reasons.append(r)
else:
formatted_reasons.append(f"- {r}")
explanation = "\n".join(formatted_reasons)
else:
explanation = reasoning
# Extract follow-up questions
questions = parsed_response.get("follow_up_questions", [])
if questions:
if isinstance(questions, list):
# Format as a numbered list but check if already numbered
formatted_questions = []
for i, q in enumerate(questions):
if q:
# Check if question already starts with a number
if re.match(r'^\d+\.', q.strip()):
formatted_questions.append(q)
else:
formatted_questions.append(f"{i+1}. {q}")
follow_up_questions = "\n".join(formatted_questions)
else:
follow_up_questions = questions
# Debug: Print follow-up questions
print(f"Follow-up questions generated: {follow_up_questions}")
else:
# If RAG is disabled, just parse the response without source processing
parsed_response = parse_doctor_response(response)
main_response = parsed_response["main_response"]
# Extract reasoning
reasoning = parsed_response.get("reasoning", [])
if reasoning:
if isinstance(reasoning, list):
# Check if each reasoning point already starts with a bullet point
formatted_reasons = []
for r in reasoning:
if r: # Ensure 'r' is not None or empty before stripping
# If item already starts with bullet, don't add another
if r.strip().startswith("-") or r.strip().startswith("•"):
formatted_reasons.append(r)
else:
formatted_reasons.append(f"- {r}")
explanation = "\n".join(formatted_reasons)
else:
explanation = reasoning
# Extract follow-up questions
questions = parsed_response.get("follow_up_questions", [])
if questions:
if isinstance(questions, list):
# Format as a numbered list starting with 1, but check if already numbered
formatted_questions = []
for i, q in enumerate(questions):
if q: # Ensure 'q' is not None or empty
# Check if question already starts with a number
if re.match(r'^\s*\d+\.\s*', q.strip()):
formatted_questions.append(q)
else:
# Remove any leading bullet points before adding numbers
q_cleaned = re.sub(r'^\s*[-•*]\s*', '', q.strip())
formatted_questions.append(f"{i+1}. {q_cleaned}")
follow_up_questions = "\n".join(formatted_questions)
else:
follow_up_questions = questions
# Debug: Print follow-up questions
print(f"Follow-up questions generated: {follow_up_questions}")
# Return four values: main response, explanation, follow-up questions, and evidence
return main_response, explanation, follow_up_questions, evidence_snippets
# Enhanced interactive loop with better handling of consultations
def run_consultation(use_rag=True):
"""Run an interactive medical consultation"""
history = []
print("\n===== MEDICAL AI ASSISTANT =====")
print("Type 'exit' to end or 'next' for a new case.\n")
if use_rag:
print("Using medical evidence from: PubMed, Europe PMC, and other medical databases")
print("Sources marked with 🔓 provide full text access\n")
consultation_id = str(uuid.uuid4())[:8]
print(f"Consultation ID: {consultation_id}")
query = input("\nYou: ")
while query.lower() != "exit":
# Track if this is a follow-up question
is_follow_up = len(history) > 0
# Inform user that evidence is being fetched if RAG is enabled
if use_rag:
print("\nSearching medical databases...")
# Process query
        reply, explanation, follow_up_questions, evidence = orchestrator_chat(history, query, use_rag, is_follow_up)
        # Record the exchange so follow-up turns retain the conversation context
        history.append({"role": "user", "content": query})
        history.append({"role": "assistant", "content": reply})
# Display the AI response
print("\n" + "=" * 30)
print("AI RESPONSE")
print("=" * 30)
print(reply)
# Always show explanation/reasoning
print("\n" + "=" * 30)
print("DETAILED EXPLANATION")
print("=" * 30)
# Ensure explanation is not empty before printing, or print a default message
        if explanation and explanation.strip():
print(explanation)
else:
print("No detailed explanation or sources were generated for this response.")
# Display follow-up questions if available
if follow_up_questions and follow_up_questions.strip():
print("\n" + "=" * 30)
print("FOLLOW-UP QUESTIONS")
print("=" * 30)
print(follow_up_questions)
# Add Open Access Legend if evidence sources were found
if evidence:
print("\nLEGEND: 🔓 = Open Access (full text available)")
# Check if we need to continue with follow-up or start a new case
next_action = input("\nFollow-up? (or 'next' for new case, 'exit' to end): ")
if next_action.lower() == "exit":
break
elif next_action.lower() == "next":
# Start a new consultation
history = []
consultation_id = str(uuid.uuid4())[:8]
print(f"\nNew Consultation ID: {consultation_id}")
query = input("\nYou: ")
else:
# Continue with follow-up
query = next_action
print("\nConsultation ended.")
# Save consultation to file
def save_consultation(history, consultation_id):
"""Save the consultation history to a file"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"consultation_{consultation_id}_{timestamp}.json"
with open(filename, 'w') as f:
json.dump(history, f, indent=2)
print(f"Consultation saved to {filename}")
# Extract medical terms from user query for better search
def extract_medical_terms(query, max_terms=5):
"""
Extract key medical terms from a user query to improve search relevance.
Uses pattern matching and medical term extraction to identify important medical concepts.
Args:
query (str): The user's original query text
max_terms (int): Maximum number of terms to extract
Returns:
list: List of extracted medical terms
"""
# Clean up query first
cleaned_query = re.sub(r'^(hi|hello|hey|greetings|good morning|good afternoon|good evening)[,\.]?\s+', '', query.lower())
cleaned_query = re.sub(r"(i'?m|i am)\s+a\s+\d+[-\s]year[-\s]old", '', cleaned_query)
cleaned_query = re.sub(r'(my name is|i am|i have been|i\'ve been|i was|i have|i\'ve had|i feel|i\'m feeling|i experienced)', '', cleaned_query)
# Common medical symptoms and conditions to look for
medical_patterns = [
# Symptoms
r'(muscle weakness)', r'(fatigue)', r'(rash)', r'(pain)', r'(swelling)',
r'(difficulty breathing|shortness of breath)', r'(fever)', r'(headache)',
r'(nausea|vomiting)', r'(dizziness)', r'(numbness)', r'(tingling)',
r'(cough)', r'(sore throat)', r'(runny nose)', r'(congestion)',
r'(chest pain)', r'(back pain)', r'(joint pain)', r'(abdominal pain)',
# Conditions
r'(diabetes)', r'(hypertension|high blood pressure)', r'(asthma)',
r'(cancer)', r'(arthritis)', r'(depression)', r'(anxiety)',
r'(heart disease|cardiovascular disease)', r'(stroke)', r'(alzheimer)',
# Body parts/systems
r'(heart)', r'(lung)', r'(kidney)', r'(liver)', r'(brain)', r'(skin)',
r'(stomach)', r'(intestine)', r'(bone)', r'(muscle)', r'(nerve)',
# Other medical terms
r'(chronic)', r'(acute)', r'(infection)', r'(inflammation)', r'(syndrome)',
r'(disorder)', r'(disease)', r'(condition)', r'(symptom)', r'(diagnosis)',
r'(treatment)', r'(medication)', r'(therapy)', r'(surgery)'
]
# Extract terms
medical_terms = set()
for pattern in medical_patterns:
matches = re.findall(pattern, query.lower())
if matches:
for match in matches:
if isinstance(match, tuple): # Some regex groups return tuples
for term in match:
if term and term.strip():
medical_terms.add(term.strip())
else:
if match and match.strip():
medical_terms.add(match.strip())
# If we didn't find specific medical terms, extract general noun phrases
if len(medical_terms) == 0:
# Look for phrases that might be medical conditions
word_pattern = r'\b([a-zA-Z]+(?:\s+[a-zA-Z]+){0,2})\b'
words = re.findall(word_pattern, cleaned_query)
medical_terms = set(words[:max_terms])
# Convert set to list and limit number of terms
result = list(medical_terms)[:max_terms]
return result
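# Example usage (a minimal sketch; exact output depends on set ordering):
#
#   terms = extract_medical_terms("I've had chest pain and shortness of breath")
#   # -> e.g. ["chest pain", "shortness of breath", "pain"]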
# JSON schema describing the search_pubmed function (for function-calling / API documentation)
SEARCH_PUBMED_SCHEMA = {
"name": "search_pubmed",
"description": "Search PubMed for medical articles related to a given query, with proper citation formatting.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The medical query to search for in PubMed"
},
"retmax": {
"type": "integer",
"description": "Maximum number of results to return (default: 3)",
"default": 3
},
"api_key": {
"type": "string",
"description": "Optional NCBI API key to increase rate limits (3 req/sec without key, 10 req/sec with key)",
"default": None
}
},
"required": ["query"]
}
}
# Example messages array showing usage
EXAMPLE_MESSAGES = [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": "I've been experiencing persistent headaches, fatigue, and dizziness for the past two weeks. What could be causing this?"}
]
# Example function call that would be made by the model
EXAMPLE_FUNCTION_CALL = {
"name": "search_pubmed",
"arguments": {
"query": "headaches, fatigue, dizziness",
"retmax": 3
}
}
# Function to enhance medical queries using LLM
def enhance_medical_query(original_query):
"""
Uses LLM to enhance a medical query for better search results.
This function is prepared for future use but is not currently enabled.
Args:
original_query (str): The original user query
Returns:
str: An enhanced query optimized for medical search
"""
try:
# System prompt for query enhancement
system_prompt = """You are a medical search query optimizer.
Your job is to take a user's medical question and rewrite it to be more effective for searching
medical databases like PubMed and Europe PMC.
Guidelines:
1. Extract key medical terms, conditions, symptoms, and treatments
2. Use proper medical terminology where possible
3. Structure the query for optimal search performance
4. Return ONLY the enhanced query without explanation
5. Keep the query concise but comprehensive
"""
# Call OpenAI to enhance the query
enhanced_response = openai.ChatCompletion.create(
model="gpt-3.5-turbo", # Using a smaller model for speed and cost efficiency
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"Optimize this medical query for database search: {original_query}"}
],
temperature=0.3,
max_tokens=100
)
enhanced_query = enhanced_response.choices[0].message['content'].strip()
print(f"Enhanced query: {enhanced_query}")
return enhanced_query
except Exception as e:
print(f"Error enhancing query: {str(e)}")
# Fall back to original query if there's an error
return original_query
# Main entry point: placed at the end of the module so every function above,
# including extract_medical_terms, is defined before the consultation loop runs
if __name__ == "__main__":
    print("\nInitializing Medical AI Assistant...")
    run_consultation(use_rag=True)