import requests import uuid import json import re import xml.etree.ElementTree as ET from bs4 import BeautifulSoup from datetime import datetime import os import openai import urllib.parse from dotenv import load_dotenv import time # Load environment variables load_dotenv() # Initialize OpenAI API key def get_openai_api_key(): """Get OpenAI API key from environment variables""" api_key = os.environ.get("OPENAI_API_KEY") if not api_key: raise ValueError("OPENAI_API_KEY environment variable is not set") return api_key # Set OpenAI API key openai.api_key = get_openai_api_key() # System prompts SYSTEM_PROMPT = """You are an advanced clinical AI assistant designed to aid healthcare professionals. Follow these guidelines in all responses: 1. **Answer Directly First**: Begin by providing your best answer based on available information. If information is limited, provide your assessment based on what is known, and indicate areas of uncertainty. 2. **Follow with Clarifying Questions**: After giving your initial assessment, include specific follow-up questions that would help refine your answer. These should be clearly labeled in a separate "Follow-up Questions:" section. 3. Professional tone: Maintain a clear, respectful, and professional tone appropriate for medical consultation. 4. Evidence-based practice: Base all responses on current medical evidence and guidelines. 5. Transparency: Clearly distinguish between established medical facts, clinical guidance, and areas of uncertainty. 6. Structured analysis: Present information in a clear, organized manner following clinical reasoning patterns. 7. Citation: Always cite specific sources for medical claims when available using the [PMID:123456] format, where 123456 is the actual PubMed ID number. 8. Limitations: Acknowledge the limits of AI medical advice and recommend in-person consultation when appropriate. 9. Comprehensive approach: Consider differential diagnoses and relevant contextual factors. 10. Patient-centered: Focus on clinically relevant information while maintaining respect for the patient. For each consultation: 1. Provide an initial assessment based on available information (as per guideline 1). 2. Include specific follow-up questions (as per guideline 2). 3. Provide differential diagnosis with likelihood assessment. 4. Suggest appropriate next steps (testing, treatment, referral). 5. Include reasoning for your conclusions. 6. Cite medical literature or guidelines supporting your assessment using [PMID:123456]. IMPORTANT: Your primary duty is to support clinical decision-making, not replace clinical judgment. """ FOLLOW_UP_PROMPT = """Continue this medical consultation based on the previous discussion. Consider the information already gathered and the tentative diagnosis/plan. When responding to the follow-up: 1. Directly address the follow-up question with evidence-based information. 2. Reference relevant details from the prior conversation. 3. If additional information would be helpful, include specific follow-up questions in a clearly labeled "Follow-up Questions:" section. 4. Update recommendations if appropriate based on new information. 5. Maintain the same structured approach with transparent reasoning. 6. Cite additional medical literature or guidelines when relevant using [PMID:123456]. Remember that this is an ongoing consultation where continuity of care is important. """ # Function to extract source IDs and replace them with actual links def extract_and_link_sources(text, evidence_snippets): """ Replace [PMID:123456] citation placeholders with actual links to PubMed articles. Also handles DOI citations and other citation formats for compatibility. Args: text (str): Text containing citations evidence_snippets (list): List of evidence snippets with metadata Returns: tuple: (text with citations replaced with links, map of source IDs to metadata) """ # Look for [PMID:123456] format first (preferred) pmid_pattern = r'\[PMID:(\d+)\]' # Look for [DOI:10.xxxx/yyyy] format for Europe PMC articles doi_pattern = r'\[DOI:(10\.\d+\/[^\]]+)\]' # Also look for older [source_id] format for compatibility source_pattern = r'\[([\w\d:_\-\.+]+)\]' # Find all PMID citations pmid_matches = re.findall(pmid_pattern, text) # Find all DOI citations doi_matches = re.findall(doi_pattern, text) # Find all other citation formats source_matches = re.findall(source_pattern, text) # Remove PMID and DOI matches from source matches to avoid duplicates source_matches = [s for s in source_matches if not (s.startswith('PMID:') or s.startswith('DOI:'))] # Create source map source_map = {} # Process PMID citations first for pmid in pmid_matches: for snippet in evidence_snippets: # Check if this is a direct PMID match if 'pmid' in snippet and snippet['pmid'] == pmid: source_map[f"PMID:{pmid}"] = { "id": snippet["id"], "title": snippet["title"].strip(), "url": snippet["url"], "citation": snippet["citation"], "pmid": pmid } break # Also check the ID field which might contain PMID elif snippet["id"] == f"PMID:{pmid}": source_map[f"PMID:{pmid}"] = { "id": snippet["id"], "title": snippet["title"].strip(), "url": snippet["url"], "citation": snippet["citation"], "pmid": pmid } break # Process DOI citations for doi in doi_matches: for snippet in evidence_snippets: # Check if this is a direct DOI match if 'doi' in snippet and snippet['doi'] == doi: source_map[f"DOI:{doi}"] = { "id": snippet.get("id", f"DOI:{doi}"), "title": snippet["title"].strip(), "url": snippet["url"], "citation": snippet["citation"], "doi": doi } break # Also check the ID field which might contain DOI elif snippet.get("id") == f"DOI:{doi}": source_map[f"DOI:{doi}"] = { "id": snippet["id"], "title": snippet["title"].strip(), "url": snippet["url"], "citation": snippet["citation"], "doi": doi } break # Process other citation formats for backward compatibility for source_id_match in source_matches: if source_id_match not in source_map and source_id_match != "source_id": for snippet in evidence_snippets: if source_id_match == snippet["id"]: source_map[source_id_match] = { "id": snippet["id"], "title": snippet["title"].strip(), "url": snippet["url"], "citation": snippet["citation"], "pmid": snippet.get("pmid", ""), "doi": snippet.get("doi", "") } break # Replace PMID citations with links linked_text = text for pmid_key in [f"PMID:{pmid}" for pmid in pmid_matches]: if pmid_key in source_map: source_data = source_map[pmid_key] safe_key = re.escape(pmid_key) pattern = f"\\[{safe_key}\\]" # Create a replacement with title and URL short_title = source_data['title'][:60] + "..." if len(source_data['title']) > 60 else source_data['title'] replacement = f"[{short_title}]({source_data['url']})" linked_text = re.sub(f"\\[{safe_key}\\]", replacement, linked_text) # Replace DOI citations with links for doi_key in [f"DOI:{doi}" for doi in doi_matches]: if doi_key in source_map: source_data = source_map[doi_key] safe_key = re.escape(doi_key) pattern = f"\\[{safe_key}\\]" # Create a replacement with title and URL short_title = source_data['title'][:60] + "..." if len(source_data['title']) > 60 else source_data['title'] replacement = f"[{short_title}]({source_data['url']})" linked_text = re.sub(f"\\[{safe_key}\\]", replacement, linked_text) # Replace other citation formats for source_id_key, source_data in source_map.items(): if not (source_id_key.startswith("PMID:") or source_id_key.startswith("DOI:")): safe_id = re.escape(source_id_key) pattern = f"\\[{safe_id}\\]" replacement = f"[{source_data['title']}]({source_data['url']})" linked_text = re.sub(pattern, replacement, linked_text) # Handle generic [source_id] placeholder if "source_id" in source_matches: # Use the first snippet available if we have any if evidence_snippets and "source_id" not in source_map: snippet = evidence_snippets[0] # Use the first snippet if snippet.get("url") and snippet.get("title"): source_map["source_id"] = { "id": snippet["id"], "title": snippet["title"].strip(), "url": snippet["url"], "citation": snippet["citation"], "pmid": snippet.get("pmid", ""), "doi": snippet.get("doi", "") } replacement = f"[{snippet['title']}]({snippet['url']})" linked_text = re.sub(r'\[source_id\]', replacement, linked_text) # Final fallback for any remaining placeholders linked_text = re.sub(r'\[source_id\]', "[Medical Reference]", linked_text) linked_text = re.sub(r'\[PMID:(\d+)\]', r'[PubMed Article]', linked_text) linked_text = re.sub(r'\[DOI:(10\.\d+\/[^\]]+)\]', r'[Europe PMC Article]', linked_text) return linked_text, source_map # Implement PubMed API integration for medical evidence retrieval def fetch_from_pubmed_api(query, max_results=3, api_key=None): """Fetch medical evidence from PubMed API using E-utilities""" results = [] # Clean up the query for better results cleaned_query = re.sub(r'^(hi|hello|hey|greetings|good morning|good afternoon|good evening)[,\.]?\s+', '', query.lower()) cleaned_query = re.sub(r"(i'?m|i am)\s+a\s+\d+[-\s]year[-\s]old", '', cleaned_query) cleaned_query = re.sub(r'(my name is|i am|i have been|i\'ve been|i was|i have|i\'ve had|i feel|i\'m feeling|i experienced)', '', cleaned_query) # Try to extract key medical symptoms symptom_patterns = [ r'(muscle weakness)', r'(fatigue)', r'(rash)', r'(pain)', r'(swelling)', r'(difficulty breathing|shortness of breath)', r'(fever)', r'(headache)', r'(nausea|vomiting)', r'(dizziness)', r'(numbness)', r'(tingling)' ] medical_terms = [] for pattern in symptom_patterns: matches = re.findall(pattern, query.lower()) if matches: medical_terms.extend(matches) # If we found medical terms, prioritize them in the search if medical_terms: search_query = " AND ".join(medical_terms) # Add the complete cleaned query as a less weighted part if cleaned_query: search_query = f"({search_query}) OR ({cleaned_query})" else: # If no medical terms found, use the cleaned query search_query = cleaned_query # Encode the query for the API encoded_query = urllib.parse.quote(search_query) # Base URL for PubMed E-utilities base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/" # Search parameters search_params = { "db": "pubmed", "term": encoded_query, "retmax": max_results, "retmode": "json", "sort": "relevance" } # Add API key if provided (increases rate limits) if api_key: search_params["api_key"] = api_key try: # First get article IDs search_response = requests.get(f"{base_url}esearch.fcgi", params=search_params) if search_response.status_code != 200: return [] search_data = search_response.json() if "esearchresult" in search_data and "idlist" in search_data["esearchresult"]: ids = search_data["esearchresult"]["idlist"] if ids: # Fetch article details fetch_params = { "db": "pubmed", "id": ",".join(ids), "retmode": "xml" } if api_key: fetch_params["api_key"] = api_key fetch_response = requests.get(f"{base_url}efetch.fcgi", params=fetch_params) if fetch_response.status_code != 200: return [] try: # Parse XML response root = ET.fromstring(fetch_response.text) for article in root.findall(".//PubmedArticle"): try: pmid = article.findtext(".//PMID") title = article.findtext(".//ArticleTitle") or "No title available" # Extract abstract abstract_elements = article.findall(".//AbstractText") abstract = " ".join([(elem.text or "") for elem in abstract_elements]) # Extract authors authors = [] for author in article.findall(".//Author"): last_name = author.findtext(".//LastName") or "" initials = author.findtext(".//Initials") or "" if last_name and initials: authors.append(f"{last_name} {initials}") author_str = ", ".join(authors[:3]) if len(authors) > 3: author_str += " et al." # Extract journal and date journal = article.findtext(".//Journal/Title") or "Journal not specified" year = article.findtext(".//PubDate/Year") or "N/A" # Create citation citation = f"{author_str}. ({year}). {title}. {journal}. PMID: {pmid}" # Create direct access URL url = f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/" # Check if free full text is available via PMC pmc_id = article.findtext(".//ArticleId[@IdType='pmc']") has_free_text = bool(pmc_id) or article.findtext(".//PublicationStatus") == "epublish" # If PMC ID is available, use that URL instead as it provides full text if pmc_id: url = f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmc_id}/" results.append({ "id": f"pubmed:{pmid}", "title": title, "text": abstract[:800] + "..." if len(abstract) > 800 else abstract, "citation": citation, "url": url, "source_type": "PubMed" + (" (Free Full Text)" if has_free_text else ""), "is_open_access": has_free_text }) except Exception: continue except ET.ParseError: return [] return results except Exception: return [] def fetch_from_pmc_api(query, max_results=2, api_key=None): """Fetch free full text articles from PubMed Central (PMC)""" results = [] # Clean up the query for better results cleaned_query = re.sub(r'^(hi|hello|hey|greetings|good morning|good afternoon|good evening)[,\.]?\s+', '', query.lower()) cleaned_query = re.sub(r"(i'?m|i am)\s+a\s+\d+[-\s]year[-\s]old", '', cleaned_query) cleaned_query = re.sub(r'(my name is|i am|i have been|i\'ve been|i was|i have|i\'ve had|i feel|i\'m feeling|i experienced)', '', cleaned_query) # Encode for API encoded_query = urllib.parse.quote(cleaned_query + " AND free full text[filter]") # Base URL for E-utilities base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/" # Search parameters - specifically targeting PMC for open access articles search_params = { "db": "pmc", "term": encoded_query, "retmax": max_results, "retmode": "json", "sort": "relevance" } # Add API key if provided if api_key: search_params["api_key"] = api_key try: # First get article IDs search_response = requests.get(f"{base_url}esearch.fcgi", params=search_params) if search_response.status_code != 200: return [] search_data = search_response.json() if "esearchresult" in search_data and "idlist" in search_data["esearchresult"]: ids = search_data["esearchresult"]["idlist"] if ids: # Fetch article details fetch_params = { "db": "pmc", "id": ",".join(ids), "retmode": "xml" } if api_key: fetch_params["api_key"] = api_key fetch_response = requests.get(f"{base_url}efetch.fcgi", params=fetch_params) if fetch_response.status_code != 200: return [] try: # Parse XML response for PMC articles root = ET.fromstring(fetch_response.text) for article in root.findall(".//article"): try: # Get PMC ID article_id_elements = article.findall(".//article-id") pmc_id = None for id_elem in article_id_elements: if id_elem.get("pub-id-type") == "pmc": pmc_id = id_elem.text if not pmc_id: continue # Get article title title_elem = article.find(".//article-title") title = "".join(title_elem.itertext()) if title_elem is not None else "No title available" # Extract abstract abstract_elem = article.find(".//abstract") abstract = "" if abstract_elem is not None: for p in abstract_elem.findall(".//p"): abstract += " ".join(p.itertext()) + " " # If no abstract, try to get from first paragraphs if not abstract: body = article.find(".//body") if body is not None: paragraphs = body.findall(".//p") abstract = " ".join([" ".join(p.itertext()) for p in paragraphs[:3]]) # Extract journal and date information journal_elem = article.find(".//journal-title") journal = "".join(journal_elem.itertext()) if journal_elem is not None else "PMC Journal" year_elem = article.find(".//pub-date/year") year = year_elem.text if year_elem is not None else "N/A" # Extract authors authors = [] for contrib in article.findall(".//contrib[@contrib-type='author']"): surname = contrib.find(".//surname") given_names = contrib.find(".//given-names") if surname is not None and given_names is not None: authors.append(f"{surname.text} {given_names.text[0] if given_names.text else ''}") author_str = ", ".join(authors[:3]) if len(authors) > 3: author_str += " et al." # Create citation citation = f"{author_str}. ({year}). {title}. {journal}. PMC{pmc_id}" # Create URL for direct access to full text url = f"https://www.ncbi.nlm.nih.gov/pmc/articles/PMC{pmc_id}/" results.append({ "id": f"pmc:{pmc_id}", "title": title, "text": abstract[:800] + "..." if len(abstract) > 800 else abstract, "citation": citation, "url": url, "source_type": "PubMed Central (Open Access)", "is_open_access": True }) except Exception: continue except ET.ParseError: return [] return results except Exception: return [] def fetch_from_who_api(query, max_results=1): """Fetch information from WHO guidelines - using web scraping as alternative to API""" try: # WHO search URL (as they don't have a public API, we use web scraping) search_url = f"https://www.who.int/publications/search-results?indexTerms={query.replace(' ', '+')}" response = requests.get(search_url) if response.status_code == 200: soup = BeautifulSoup(response.text, 'html.parser') results = [] # Extract article information articles = soup.select('.search-results article')[:max_results] for article in articles: title_elem = article.select_one('h3') title = title_elem.text.strip() if title_elem else "WHO Guideline" desc_elem = article.select_one('.search-description') description = desc_elem.text.strip() if desc_elem else "" link_elem = article.select_one('a') link = "https://www.who.int" + link_elem['href'] if link_elem and 'href' in link_elem.attrs else "" date_elem = article.select_one('.search-meta') date = date_elem.text.strip() if date_elem else "Date not specified" # Generate a unique ID based on the URL who_id = link.split('/')[-1] if link else f"who-{uuid.uuid4().hex[:8]}" results.append({ "id": f"who:{who_id}", "title": title, "text": description[:800] + "..." if len(description) > 800 else description, "citation": f"World Health Organization. ({date}). {title}.", "url": link, "source_type": "WHO Guidelines", "is_open_access": True # WHO guidelines are freely accessible }) return results return [] except Exception: return [] def fetch_from_core_api(query, max_results=2, api_key=None): """Fetch open access research papers from CORE API""" results = [] # Clean up the query for better results cleaned_query = re.sub(r'^(hi|hello|hey|greetings|good morning|good afternoon|good evening)[,\.]?\s+', '', query.lower()) cleaned_query = re.sub(r"(i'?m|i am)\s+a\s+\d+[-\s]year[-\s]old", '', cleaned_query) cleaned_query = re.sub(r'(my name is|i am|i have been|i\'ve been|i was|i have|i\'ve had|i feel|i\'m feeling|i experienced)', '', cleaned_query) # Extract medical terms for better search symptom_patterns = [ r'(muscle weakness)', r'(fatigue)', r'(rash)', r'(pain)', r'(swelling)', r'(difficulty breathing|shortness of breath)', r'(fever)', r'(headache)', r'(nausea|vomiting)', r'(dizziness)', r'(numbness)', r'(tingling)' ] medical_terms = [] for pattern in symptom_patterns: matches = re.findall(pattern, query.lower()) if matches: medical_terms.extend(matches) # If we found medical terms, enhance the query if medical_terms: search_query = cleaned_query + " " + " ".join(medical_terms) else: search_query = cleaned_query # Base URL for CORE API base_url = "https://core.ac.uk/api/v3/search/works" # Search parameters with medical focus search_params = { "q": search_query, "limit": max_results * 2, # Get more results to filter for the best ones "offset": 0, "fields": ["title", "abstract", "authors", "year", "downloadUrl", "sourceFulltextUrl", "doi", "fullText"] } # Headers with API key headers = { "Authorization": f"Bearer {api_key}" if api_key else None, "Content-Type": "application/json" } try: response = requests.post(base_url, json=search_params, headers=headers) if response.status_code != 200: return [] data = response.json() if "results" in data: filtered_articles = [] # First pass: Collect and score all articles for article in data["results"]: try: # Score articles for relevance (higher is better) score = 0 # Has downloadUrl or sourceFulltextUrl (direct access) if article.get("downloadUrl") or article.get("sourceFulltextUrl"): score += 3 # Has full text in the response if article.get("fullText"): score += 2 # Has abstract if article.get("abstract") and len(article.get("abstract")) > 100: score += 1 # Medical relevance - check title and abstract for medical terms for term in medical_terms: if term in (article.get("title", "") + article.get("abstract", "")).lower(): score += 2 # Store with score for later filtering filtered_articles.append((score, article)) except Exception: continue # Sort by score (highest first) and take the top results filtered_articles.sort(reverse=True, key=lambda x: x[0]) top_articles = [article for score, article in filtered_articles[:max_results]] # Second pass: Process the top articles in detail for article in top_articles: try: # Extract article information title = article.get("title", "No title available") abstract = article.get("abstract", "") # Try to use full text if available, otherwise use abstract full_text = article.get("fullText", "") text_content = "" if full_text: # If full text is available, use a summarized version (first part) text_content = f"[FULL TEXT AVAILABLE] {full_text[:1500]}..." else: # Use abstract if no full text text_content = abstract authors = article.get("authors", []) year = article.get("year", "N/A") # Format authors author_str = ", ".join([f"{author.get('name', '')}" for author in authors[:3]]) if len(authors) > 3: author_str += " et al." # Get the best available URL - prioritize direct download links url = "" download_available = False if article.get("downloadUrl"): url = article.get("downloadUrl") download_available = True elif article.get("sourceFulltextUrl"): url = article.get("sourceFulltextUrl") download_available = True elif article.get("doi"): url = f"https://doi.org/{article.get('doi')}" # Create citation citation = f"{author_str}. ({year}). {title}." if article.get("doi"): citation += f" DOI: {article['doi']}" # Generate a unique ID core_id = article.get("id", str(uuid.uuid4())) # Create source type with clarity about data availability source_type = "CORE Open Access" if download_available: source_type += " (Full Text Available)" elif full_text: source_type += " (Full Text Excerpt Included)" else: source_type += " (Abstract Only)" results.append({ "id": f"core:{core_id}", "title": title, "text": text_content[:800] + "..." if len(text_content) > 800 else text_content, "citation": citation, "url": url, "source_type": source_type, "is_open_access": True # All CORE articles are open access }) except Exception: continue return results except Exception: return [] # Enhanced PubMed search function def enhanced_search_pubmed(query, retmax=3, api_key=None): """ Enhanced PubMed search using E-utilities API with improved parsing and error handling. Args: query (str): Search query string retmax (int): Maximum number of results to return api_key (str, optional): NCBI API key for higher rate limits Returns: list: List of article dictionaries with title, abstract, PMID, URL """ results = [] # Base URLs for PubMed E-utilities base_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/" # Rate limiting - sleep to avoid hitting rate limits # NCBI allows 3 requests/second without API key, 10 with key time.sleep(0.33 if api_key is None else 0.1) try: # Step 1: Use ESearch to get PMIDs search_params = { "db": "pubmed", "term": query, "retmax": retmax, "retmode": "json", "sort": "relevance" } if api_key: search_params["api_key"] = api_key search_response = requests.get(f"{base_url}esearch.fcgi", params=search_params) if search_response.status_code != 200: print(f"PubMed search error: {search_response.status_code}") return [] search_data = search_response.json() if "esearchresult" not in search_data or "idlist" not in search_data["esearchresult"]: print("No results found or invalid response format") return [] pmids = search_data["esearchresult"]["idlist"] if not pmids: print("No PMIDs found for the query") return [] # Rate limiting before second request time.sleep(0.33 if api_key is None else 0.1) # Step 2: Use EFetch to get article details with abstracts fetch_params = { "db": "pubmed", "id": ",".join(pmids), "retmode": "xml", "rettype": "abstract" } if api_key: fetch_params["api_key"] = api_key fetch_response = requests.get(f"{base_url}efetch.fcgi", params=fetch_params) if fetch_response.status_code != 200: print(f"PubMed fetch error: {fetch_response.status_code}") return [] # Step 3: Parse XML response root = ET.fromstring(fetch_response.text) for article in root.findall(".//PubmedArticle"): try: # Extract PMID pmid = article.findtext(".//PMID") if not pmid: continue # Extract title title = article.findtext(".//ArticleTitle") or "No title available" # Extract abstract sections with labels if available abstract_sections = [] for abstract_text in article.findall(".//AbstractText"): label = abstract_text.get("Label", "") text = abstract_text.text or "" if label and text: abstract_sections.append(f"{label}: {text}") elif text: abstract_sections.append(text) # If no structured abstract, try to get the plain abstract if not abstract_sections: abstract_text = article.findtext(".//Abstract/AbstractText") if abstract_text: abstract_sections.append(abstract_text) # Join all abstract sections abstract = " ".join(abstract_sections) or "Abstract not available" # Extract authors authors = [] for author in article.findall(".//Author"): last_name = author.findtext(".//LastName") or "" initials = author.findtext(".//Initials") or "" if last_name and initials: authors.append(f"{last_name} {initials}") # Format authors for citation author_text = "" if authors: if len(authors) == 1: author_text = authors[0] elif len(authors) == 2: author_text = f"{authors[0]} & {authors[1]}" else: author_text = f"{authors[0]} et al." # Extract journal and publication year journal = article.findtext(".//Journal/Title") or "Unknown Journal" year = article.findtext(".//PubDate/Year") or "" # Create direct URL to PubMed article url = f"https://pubmed.ncbi.nlm.nih.gov/{pmid}/" # Create citation citation = f"{author_text}{' ' if author_text else ''}({year}). {title}. {journal}. PMID: {pmid}" # Check for full text availability pmc_id = article.findtext(".//ArticleId[@IdType='pmc']") has_full_text = bool(pmc_id) full_text_url = f"https://www.ncbi.nlm.nih.gov/pmc/articles/{pmc_id}/" if pmc_id else None # Create result object result = { "pmid": pmid, "title": title, "abstract": abstract, "authors": authors, "journal": journal, "year": year, "url": url, "full_text_url": full_text_url, "has_full_text": has_full_text, "citation": citation } results.append(result) except Exception as e: print(f"Error parsing article {pmid}: {str(e)}") continue return results except Exception as e: print(f"Error in PubMed search: {str(e)}") return [] # Europe PMC search function def search_europe_pmc(query, max_results=3, use_extracted_terms=False, extracted_terms=None): """ Search Europe PMC for biomedical articles, with a focus on retrieving full text when available. Europe PMC provides more open access content than standard PubMed. Args: query (str): Search query string max_results (int): Maximum number of results to return use_extracted_terms (bool): Whether to use the extracted medical terms extracted_terms (list): List of extracted medical terms from the query Returns: list: List of article dictionaries with title, abstract, PMID, URL, and full text URL """ results = [] # Rate limiting - Europe PMC allows 30 requests per minute per IP time.sleep(2.0) # Conservative rate limiting try: # Europe PMC API base URL base_url = "https://www.ebi.ac.uk/europepmc/webservices/rest/search" # Construct search query based on parameters search_query = query if use_extracted_terms and extracted_terms and len(extracted_terms) > 0: # Join terms with AND for better search terms_query = " AND ".join(extracted_terms) search_query = terms_query print(f"Searching Europe PMC with extracted terms: {terms_query}") # Search parameters - specifically looking for open access when possible search_params = { "query": f"({search_query}) AND OPEN_ACCESS:y", # Prioritize open access "format": "json", "pageSize": max_results, "resultType": "core" # Get core metadata } print(f"Searching Europe PMC with query: {search_query}") response = requests.get(base_url, params=search_params) if response.status_code != 200: print(f"Europe PMC search error: {response.status_code}") # Try again without open access restriction if no results search_params["query"] = search_query response = requests.get(base_url, params=search_params) if response.status_code != 200: return [] data = response.json() # Check if we have results hit_count = data.get("hitCount", 0) if hit_count == 0: print("No Europe PMC results found") # If we used extracted terms and got no results, try with the original query if use_extracted_terms and extracted_terms: print("Retrying Europe PMC search with original query") return search_europe_pmc(query, max_results, False, None) return [] # Process results articles = data.get("resultList", {}).get("result", []) for article in articles: try: # Extract basic metadata pmid = article.get("pmid") doi = article.get("doi") title = article.get("title", "No title available") abstract = article.get("abstractText", "Abstract not available") journal = article.get("journalTitle", "Unknown Journal") pub_year = article.get("pubYear", "") # Check if it's open access is_open_access = article.get("isOpenAccess") == "Y" # Get full text URL if available full_text_url = None full_text_urls = article.get("fullTextUrlList", {}).get("fullTextUrl", []) for url_entry in full_text_urls: if url_entry.get("availability") == "Open access" or url_entry.get("documentStyle") == "pdf": full_text_url = url_entry.get("url") break # If no specific full text URL found but we have a PMID, create Europe PMC link if not full_text_url and pmid: full_text_url = f"https://europepmc.org/article/MED/{pmid}" elif not full_text_url and doi: full_text_url = f"https://doi.org/{doi}" # Get authors author_list = article.get("authorList", {}).get("author", []) authors = [] for author in author_list: last_name = author.get("lastName", "") initials = author.get("initials", "") if last_name: authors.append(f"{last_name} {initials}") # Format author citation author_text = "" if authors: if len(authors) == 1: author_text = authors[0] elif len(authors) == 2: author_text = f"{authors[0]} & {authors[1]}" else: author_text = f"{authors[0]} et al." # Create citation citation = f"{author_text}{' ' if author_text else ''}({pub_year}). {title}. {journal}." if pmid: citation += f" PMID: {pmid}" if doi: citation += f" DOI: {doi}" # Create a direct URL to access the article url = full_text_url if full_text_url else ( f"https://europepmc.org/article/MED/{pmid}" if pmid else ( f"https://doi.org/{doi}" if doi else "" ) ) # Create source type with OA indicator source_type = "Europe PMC" + (" (Open Access)" if is_open_access else "") # Format for compatibility with existing code result = { "pmid": pmid, # May be None for some articles "doi": doi, # Alternative identifier "title": title, "abstract": abstract, "authors": authors, "journal": journal, "year": pub_year, "url": url, "full_text_url": full_text_url, "has_full_text": is_open_access or full_text_url is not None, "citation": citation, "source_type": source_type, "is_open_access": is_open_access } results.append(result) except Exception as e: print(f"Error parsing Europe PMC article: {str(e)}") continue print(f"Found {len(results)} Europe PMC articles") return results except Exception as e: print(f"Error in Europe PMC search: {str(e)}") return [] # Enhanced RAG System with focused PubMed searches def fetch_medical_evidence(query, max_results=3): """ Fetch medical evidence using a multi-source approach: 1. Search with extracted medical terms in PubMed 2. Search with extracted medical terms in Europe PMC 3. Search with the original query in PubMed 4. Search with the original query in Europe PMC This provides better coverage and relevance from multiple sources. Args: query (str): The user's original query max_results (int): Maximum number of results to return (now set to 3) Returns: list: Combined and deduplicated results from all searches """ # Define API key if available pubmed_api_key = os.environ.get("PUBMED_API_KEY") # Step 1: Extract medical terms from the query medical_terms = extract_medical_terms(query) has_medical_terms = len(medical_terms) > 0 # Initialize results containers terms_pubmed_results = [] full_pubmed_results = [] terms_europepmc_results = [] full_europepmc_results = [] # Only use extracted terms if we found any if has_medical_terms: # Join terms with commas for PubMed terms_query = ", ".join(medical_terms) print(f"Searching PubMed with extracted terms: {terms_query}") # Search PubMed with extracted terms terms_pubmed_results = enhanced_search_pubmed(terms_query, retmax=2, api_key=pubmed_api_key) # Search Europe PMC with extracted terms print(f"Searching Europe PMC with extracted terms") terms_europepmc_results = search_europe_pmc(query, max_results=2, use_extracted_terms=True, extracted_terms=medical_terms) # Search with the full original query in both sources print(f"Searching PubMed with full query") full_pubmed_results = enhanced_search_pubmed(query, retmax=2, api_key=pubmed_api_key) print(f"Searching Europe PMC with full query") full_europepmc_results = search_europe_pmc(query, max_results=2) # Step 3: Combine results, ensuring no duplicates by PMID or DOI all_results = [] seen_pmids = set() seen_dois = set() # Process results in order of preference: # 1. Terms search from PubMed (if available) # 2. Terms search from Europe PMC (if available) # 3. Full query from PubMed # 4. Full query from Europe PMC # Add results from terms search first (often more relevant) for result in terms_pubmed_results: pmid = result.get("pmid") if pmid and pmid not in seen_pmids and len(all_results) < max_results: seen_pmids.add(pmid) # Format for compatibility with existing code all_results.append({ "id": f"PMID:{pmid}", "title": result["title"], "text": result["abstract"], "citation": result["citation"], "url": result["url"], "source_type": "PubMed" + (" (Full Text Available)" if result.get("has_full_text") else ""), "is_open_access": result.get("has_full_text", False), "pmid": pmid # Keep the original PMID for direct access }) # Add Europe PMC terms results for result in terms_europepmc_results: # Some Europe PMC articles may not have a PMID, use DOI as fallback pmid = result.get("pmid") doi = result.get("doi") # Skip if we've already seen this article via PMID or DOI if (pmid and pmid in seen_pmids) or (doi and doi in seen_dois): continue # Skip if we've reached our max if len(all_results) >= max_results: break # Add to seen IDs if pmid: seen_pmids.add(pmid) if doi: seen_dois.add(doi) # Determine ID format (prefer PMID if available, fall back to DOI) article_id = f"PMID:{pmid}" if pmid else (f"DOI:{doi}" if doi else str(uuid.uuid4())[:8]) # Add to results all_results.append({ "id": article_id, "title": result["title"], "text": result["abstract"], "citation": result["citation"], "url": result["url"], "source_type": result["source_type"], "is_open_access": result["is_open_access"], "pmid": pmid, # May be None "doi": doi # May be None }) # Add full query PubMed results if we still need more for result in full_pubmed_results: pmid = result.get("pmid") if pmid and pmid not in seen_pmids and len(all_results) < max_results: seen_pmids.add(pmid) all_results.append({ "id": f"PMID:{pmid}", "title": result["title"], "text": result["abstract"], "citation": result["citation"], "url": result["url"], "source_type": "PubMed" + (" (Full Text Available)" if result.get("has_full_text") else ""), "is_open_access": result.get("has_full_text", False), "pmid": pmid }) # Add full query Europe PMC results if we still need more for result in full_europepmc_results: pmid = result.get("pmid") doi = result.get("doi") # Skip if we've already seen this article via PMID or DOI if (pmid and pmid in seen_pmids) or (doi and doi in seen_dois): continue # Skip if we've reached our max if len(all_results) >= max_results: break # Add to seen IDs if pmid: seen_pmids.add(pmid) if doi: seen_dois.add(doi) # Determine ID format (prefer PMID if available, fall back to DOI) article_id = f"PMID:{pmid}" if pmid else (f"DOI:{doi}" if doi else str(uuid.uuid4())[:8]) # Add to results all_results.append({ "id": article_id, "title": result["title"], "text": result["abstract"], "citation": result["citation"], "url": result["url"], "source_type": result["source_type"], "is_open_access": result["is_open_access"], "pmid": pmid, # May be None "doi": doi # May be None }) # Ensure we have exactly max_results results (or fewer if not enough found) return all_results[:max_results] # Function to parse doctor agent responses def parse_doctor_response(response_text): """Parse the doctor agent's response into structured components""" # First, remove "Direct Answer:" prefix that might appear at the beginning of the response response_text = re.sub(r'^Direct Answer:\s*', '', response_text) # Initialize structure parsed = { "main_response": response_text, "diagnosis": "", "treatment": "", "reasoning": [], "sources": [], "follow_up_questions": [] } # Try to extract diagnosis diagnosis_match = re.search(r'(?i)diagnosis:?\s*(.*?)(?:\n\n|\n[A-Z]|\Z)', response_text, re.DOTALL) if diagnosis_match: parsed["diagnosis"] = diagnosis_match.group(1).strip() # Try to extract treatment/recommendations treatment_match = re.search(r'(?i)(treatment|recommendations|plan):?\s*(.*?)(?:\n\n|\n[A-Z]|\Z)', response_text, re.DOTALL) if treatment_match: parsed["treatment"] = treatment_match.group(2).strip() # Try to extract follow-up questions follow_up_match = re.search(r'(?i)(?:follow[ -]?up questions|additional questions|clarifying questions):?\s*(.*?)(?:\n\n|\n(?:reasoning|sources):|\Z)', response_text, re.DOTALL) if follow_up_match: follow_up_text = follow_up_match.group(1).strip() # Remove any leading markdown formatting (like ** for bold) follow_up_text = re.sub(r'^\*\*\s*', '', follow_up_text) # Check if questions are formatted as a list if '\n-' in follow_up_text or '\n•' in follow_up_text or '\n*' in follow_up_text: # Split on any bullet point marker bullet_items = re.split(r'\n\s*[-•*]\s*', follow_up_text) # Remove any empty items and ensure first item is properly formatted questions = [] for item in bullet_items: if item.strip(): # Remove any markdown formatting from each item cleaned_item = re.sub(r'^\s*\*\*\s*|\s*\*\*\s*$', '', item.strip()) questions.append(cleaned_item) parsed["follow_up_questions"] = questions elif '\n1.' in follow_up_text or re.search(r'\n\d+\.', follow_up_text): # Split on numbered items numbered_items = re.split(r'\n\s*\d+\.\s*', follow_up_text) # Clean each item and remove any empty ones questions = [] for item in numbered_items: if item.strip(): # Remove any markdown formatting cleaned_item = re.sub(r'^\s*\*\*\s*|\s*\*\*\s*$', '', item.strip()) questions.append(cleaned_item) parsed["follow_up_questions"] = questions else: # Just use the raw text if no clear list format is detected cleaned_text = re.sub(r'^\s*\*\*\s*|\s*\*\*\s*$', '', follow_up_text) parsed["follow_up_questions"] = [cleaned_text] # Try to extract reasoning if present reasoning_match = re.search(r'(?i)reasoning:?\s*(.*?)(?:\n\n\Z|\n(?:sources|follow)|\Z)', response_text, re.DOTALL) if reasoning_match: reasoning_text = reasoning_match.group(1).strip() # Remove any leading markdown formatting (like ** for bold) reasoning_text = re.sub(r'^\*\*\s*', '', reasoning_text) # Split into bullet points if present if '\n-' in reasoning_text: # Split by newline + dash, but ensure we don't lose any content reasoning_points = [] lines = reasoning_text.split('\n-') # Process the first item which might not have a dash prefix if lines and lines[0].strip(): # Clean up any leading/trailing asterisks first_item = re.sub(r'^\s*\*\*\s*|\s*\*\*\s*$', '', lines[0].strip()) if first_item: reasoning_points.append(first_item) # Process the rest of the items for i in range(1, len(lines)): if lines[i].strip(): # Clean up leading/trailing asterisks and dashes cleaned_item = re.sub(r'^\s*[-*]*\s*|\s*\*\*\s*$', '', lines[i].strip()) if cleaned_item: reasoning_points.append(cleaned_item) parsed["reasoning"] = reasoning_points else: # If there are no bullet points, still clean up any markdown cleaned_text = re.sub(r'^\s*\*\*\s*|\s*\*\*\s*$', '', reasoning_text) parsed["reasoning"] = [cleaned_text] # Extract sources/references sources_match = re.search(r'(?i)(sources|references):?\s*(.*?)(?:\n\n\Z|\Z)', response_text, re.DOTALL) if sources_match: sources_text = sources_match.group(2).strip() # Split into individual sources if '\n' in sources_text: parsed["sources"] = [item.strip() for item in sources_text.split('\n') if item.strip()] else: parsed["sources"] = [sources_text] # Clean up the main response - remove URLs, PMIDs and DOIs from the text if they're already in the sources section if parsed["sources"]: # Remove URL lines main_response_lines = [] skip_lines = 0 for line in parsed["main_response"].split('\n'): if skip_lines > 0: skip_lines -= 1 continue # Skip lines with just URLs if re.match(r'^URL:\s*https?://', line.strip()): skip_lines = 0 continue # Skip lines with PMIDs or DOIs being displayed alone if re.match(r'^(PMID|DOI):', line.strip()): skip_lines = 0 continue main_response_lines.append(line) parsed["main_response"] = '\n'.join(main_response_lines) # Extract citations in the text (format: [source_id]) citation_matches = re.findall(r'\[([\w\d:]+)\]', response_text) for citation in citation_matches: if citation not in parsed["sources"]: parsed["sources"].append(citation) return parsed # Enhanced Doctor Agent call with structured output def doctor_agent(messages): """Call the LLM to get a structured response using OpenAI API v0.28.1""" try: response = openai.ChatCompletion.create( model="gpt-4o-mini", messages=messages, temperature=0.3 ) return response.choices[0].message['content'] except Exception as e: return f"I'm sorry, there was an error processing your request. Please try again. Error: {str(e)}" # Single orchestrator turn with enhanced reasoning and citation tracking def orchestrator_chat(history, query, use_rag, is_follow_up=False): """Handle a single turn of conversation with the doctor agent""" # Select appropriate system prompt based on whether this is a follow-up if is_follow_up: system = {"role": "system", "content": FOLLOW_UP_PROMPT} else: system = {"role": "system", "content": SYSTEM_PROMPT} # Debug - Print prompt type print(f"Using {'FOLLOW_UP_PROMPT' if is_follow_up else 'SYSTEM_PROMPT'} with query: {query}") msgs = [system] + history # Evidence gathering evidence_snippets = [] if use_rag: # Only fetch and format evidence if RAG is enabled evidence_snippets = fetch_medical_evidence(query) # Format evidence for the model if evidence_snippets: evidence_text = "MEDICAL EVIDENCE FROM MULTIPLE SOURCES:\n\n" for i, snippet in enumerate(evidence_snippets): # Format the evidence with clear PMID or DOI for citation pmid = snippet.get("pmid", "") doi = snippet.get("doi", "") evidence_text += f"--- ARTICLE {i+1} ---\n" # Include the appropriate identifiers if pmid: evidence_text += f"PMID: {pmid}\n" if doi: evidence_text += f"DOI: {doi}\n" evidence_text += f"Title: {snippet['title']}\n" evidence_text += f"Source: {snippet['source_type']}\n" evidence_text += f"Content: {snippet['text']}\n" evidence_text += f"Citation: {snippet['citation']}\n" evidence_text += f"URL: {snippet['url']}\n\n" # Enhanced instructions for better source utilization evidence_text += """CITATION INSTRUCTIONS: 1. IMPORTANT: Provide a direct answer first before asking follow-up questions. Even with limited information, give your best assessment. 2. You MUST cite 2-3 different sources in your response. Use no more than 3 sources and no fewer than 2 sources. 3. When citing information from these articles, use the following formats: • For PubMed articles: [PMID:123456] where 123456 is the actual PubMed ID • For Europe PMC articles without PMID: [DOI:10.xxxx/yyyy] where 10.xxxx/yyyy is the DOI Example: "Recent studies have shown improved outcomes with early intervention [PMID:34567890]." Example: "Current guidelines recommend a multidisciplinary approach [DOI:10.1234/abcd]." 4. Focus on specific details from the abstracts - extract actual findings, statistics, or recommendations. 5. When multiple sources support a claim, cite all of them for stronger evidence. Example: "This approach is supported by multiple studies [PMID:12345678][PMID:87654321]." 6. Include full citations in your Sources section with clickable URLs. 7. If the abstracts have conflicting information, acknowledge this and present both perspectives with citations. 8. Use the most recent sources when available, especially for treatment recommendations. 9. If full text is available (marked as "Open Access" or "Full Text Available"), prioritize information from those sources as they contain more complete data. 10. Europe PMC sources often provide more complete full text access, so give them equal consideration to PubMed sources. 11. After your direct answer, include specific follow-up questions in a clearly labeled "Follow-up Questions:" section. """ msgs.append({"role": "system", "content": evidence_text}) else: # If no evidence was found, inform the model no_evidence_msg = ("Note: No specific medical evidence was found for this query in PubMed or Europe PMC. " "Please rely on your general medical knowledge and be sure to recommend " "appropriate diagnostic steps and medical consultation.") msgs.append({"role": "system", "content": no_evidence_msg}) # Add instructions for structured output if use_rag: output_instructions = """ Please structure your response clearly. **Priority 1: Direct Answer First** Begin by providing your best assessment based on the available information without using "Direct Answer:" as a heading. Just start your response directly with the answer. If the query lacks some details, offer your initial thoughts based on what is known, while acknowledging areas of uncertainty. **Priority 2: Follow-up Questions** After your direct answer, include a clearly labeled "Follow-up Questions:" section with specific questions that would help refine your assessment. **Main Response Structure:** 1. A direct answer to the patient's concerns WITHOUT the heading "Direct Answer:". 2. If appropriate, a clear diagnosis or differential diagnosis with likelihood assessments. 3. Recommendations for a treatment plan or next steps. 4. IMPORTANT: You MUST cite between 2-3 different medical evidence sources using either: • [PMID:123456] format for PubMed articles • [DOI:10.xxxx/yyyy] format for Europe PMC articles without PMID Use no more than 3 sources and no fewer than 2 sources. **After your main response, ALWAYS include these sections:** - **Follow-up Questions**: Specific numbered questions starting from 1, not bullets. Do NOT start the first question with asterisks (**). Format each question properly with just a number. - **Reasoning**: Provide a detailed, in-depth explanation of your clinical reasoning. Use bullet points for clarity. Aim for comprehensive insights that would be valuable to a healthcare professional. Do NOT start the first point with asterisks (**). Format each bullet point properly. - **Sources**: A list of all references cited in your main response (2-3 sources), formatted as: - PMID: 12345678 - Author et al. (Year). Title. Journal. URL: https://pubmed.ncbi.nlm.nih.gov/12345678/ - DOI: 10.xxxx/yyyy - Author et al. (Year). Title. Journal. URL: https://doi.org/10.xxxx/yyyy **IMPORTANT FORMATTING NOTES:** 1. Do NOT include technical information like URLs, PMIDs or DOIs in the main answer - these belong in the Sources section only. 2. For follow-up questions, use numbered format (1. 2. 3.) not bullet points. 3. Number the follow-up questions starting from 1, not from any other number. 4. NEVER use markdown formatting like ** (asterisks) at the beginning of any points, questions, or lines. 5. Make sure all bullet points and numbered items are clean, with no markdown formatting. IMPORTANT: Only cite sources that were provided in the evidence. Do not fabricate references, PMIDs, or DOIs. """ else: # Different instructions when RAG is disabled - no mention of sources or citations output_instructions = """ Please structure your response clearly. **Priority 1: Direct Answer First** Begin by providing your best assessment based on the available information without using "Direct Answer:" as a heading. Just start your response directly with the answer. If the query lacks some details, offer your initial thoughts based on what is known, while acknowledging areas of uncertainty. **Priority 2: Follow-up Questions** After your direct answer, include a clearly labeled "Follow-up Questions:" section with specific questions that would help refine your assessment. **Main Response Structure:** 1. A direct answer to the patient's concerns WITHOUT the heading "Direct Answer:". 2. If appropriate, a clear diagnosis or differential diagnosis. 3. Recommendations for a treatment plan or next steps. **After your main response, ALWAYS include these sections:** - **Follow-up Questions**: Specific questions to gather additional information, numbered starting from 1 (not bullet points). Do NOT start the first question with asterisks (**). Format each question properly with just a number. - **Reasoning**: Provide a detailed, in-depth explanation of your clinical reasoning. Use bullet points for clarity. Aim for comprehensive insights that would be valuable to a healthcare professional. Do NOT start the first bullet point with asterisks (**). Format each point properly. **IMPORTANT FORMATTING NOTES:** 1. For follow-up questions, use numbered format (1. 2. 3.) not bullet points. 2. Number the follow-up questions starting from 1, not from any other number. 3. NEVER use markdown formatting like ** (asterisks) at the beginning of any points, questions, or lines. 4. Make sure all bullet points and numbered items are clean, with no markdown formatting. IMPORTANT: Since database search is disabled, do not include citations or sources in your response. """ msgs.append({"role": "system", "content": output_instructions}) msgs.append({"role": "user", "content": query}) # Get response from doctor agent response = doctor_agent(msgs) # Remove "Direct Answer:" prefix if it appears response = re.sub(r'^Direct Answer:\s*', '', response) # Remove any markdown formatting (** for bold) that might appear at the beginning of lines response = re.sub(r'\n\s*\*\*\s*', '\n', response) # Extract and process sources explanation = None evidence = None follow_up_questions = "" if use_rag: # Process the response to replace source placeholders with actual links linked_response, source_map = extract_and_link_sources(response, evidence_snippets) # Parse the response parsed_response = parse_doctor_response(linked_response) # Get the main response main_response = parsed_response["main_response"] # Extract reasoning for display reasoning = parsed_response.get("reasoning", []) if reasoning: if isinstance(reasoning, list): # Check if each reasoning point already starts with a bullet point formatted_reasons = [] for r in reasoning: # If item already starts with bullet, don't add another if r.strip().startswith("-") or r.strip().startswith("•"): formatted_reasons.append(r) else: formatted_reasons.append(f"- {r}") explanation = "\n".join(formatted_reasons) else: explanation = reasoning # Extract follow-up questions questions = parsed_response.get("follow_up_questions", []) if questions: if isinstance(questions, list): # Format as a numbered list but check if already numbered formatted_questions = [] for i, q in enumerate(questions): if q: # Check if question already starts with a number if re.match(r'^\d+\.', q.strip()): formatted_questions.append(q) else: formatted_questions.append(f"{i+1}. {q}") follow_up_questions = "\n".join(formatted_questions) else: follow_up_questions = questions # Debug: Print follow-up questions print(f"Follow-up questions generated: {follow_up_questions}") else: # If RAG is disabled, just parse the response without source processing parsed_response = parse_doctor_response(response) main_response = parsed_response["main_response"] # Extract reasoning reasoning = parsed_response.get("reasoning", []) if reasoning: if isinstance(reasoning, list): # Check if each reasoning point already starts with a bullet point formatted_reasons = [] for r in reasoning: if r: # Ensure 'r' is not None or empty before stripping # If item already starts with bullet, don't add another if r.strip().startswith("-") or r.strip().startswith("•"): formatted_reasons.append(r) else: formatted_reasons.append(f"- {r}") explanation = "\n".join(formatted_reasons) else: explanation = reasoning # Extract follow-up questions questions = parsed_response.get("follow_up_questions", []) if questions: if isinstance(questions, list): # Format as a numbered list starting with 1, but check if already numbered formatted_questions = [] for i, q in enumerate(questions): if q: # Ensure 'q' is not None or empty # Check if question already starts with a number if re.match(r'^\s*\d+\.\s*', q.strip()): formatted_questions.append(q) else: # Remove any leading bullet points before adding numbers q_cleaned = re.sub(r'^\s*[-•*]\s*', '', q.strip()) formatted_questions.append(f"{i+1}. {q_cleaned}") follow_up_questions = "\n".join(formatted_questions) else: follow_up_questions = questions # Debug: Print follow-up questions print(f"Follow-up questions generated: {follow_up_questions}") # Return four values: main response, explanation, follow-up questions, and evidence return main_response, explanation, follow_up_questions, evidence_snippets # Enhanced interactive loop with better handling of consultations def run_consultation(use_rag=True): """Run an interactive medical consultation""" history = [] print("\n===== MEDICAL AI ASSISTANT =====") print("Type 'exit' to end or 'next' for a new case.\n") if use_rag: print("Using medical evidence from: PubMed, Europe PMC, and other medical databases") print("Sources marked with 🔓 provide full text access\n") consultation_id = str(uuid.uuid4())[:8] print(f"Consultation ID: {consultation_id}") query = input("\nYou: ") while query.lower() != "exit": # Track if this is a follow-up question is_follow_up = len(history) > 0 # Inform user that evidence is being fetched if RAG is enabled if use_rag: print("\nSearching medical databases...") # Process query reply, explanation, follow_up_questions, evidence = orchestrator_chat(history, query, use_rag, is_follow_up) # Display the AI response print("\n" + "=" * 30) print("AI RESPONSE") print("=" * 30) print(reply) # Always show explanation/reasoning print("\n" + "=" * 30) print("DETAILED EXPLANATION") print("=" * 30) # Ensure explanation is not empty before printing, or print a default message if explanation and explanation.strip() and explanation.strip() != "="*50: print(explanation) else: print("No detailed explanation or sources were generated for this response.") # Display follow-up questions if available if follow_up_questions and follow_up_questions.strip(): print("\n" + "=" * 30) print("FOLLOW-UP QUESTIONS") print("=" * 30) print(follow_up_questions) # Add Open Access Legend if evidence sources were found if evidence: print("\nLEGEND: 🔓 = Open Access (full text available)") # Check if we need to continue with follow-up or start a new case next_action = input("\nFollow-up? (or 'next' for new case, 'exit' to end): ") if next_action.lower() == "exit": break elif next_action.lower() == "next": # Start a new consultation history = [] consultation_id = str(uuid.uuid4())[:8] print(f"\nNew Consultation ID: {consultation_id}") query = input("\nYou: ") else: # Continue with follow-up query = next_action print("\nConsultation ended.") # Save consultation to file def save_consultation(history, consultation_id): """Save the consultation history to a file""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"consultation_{consultation_id}_{timestamp}.json" with open(filename, 'w') as f: json.dump(history, f, indent=2) print(f"Consultation saved to {filename}") # Main entry point if __name__ == "__main__": print("\nInitializing Medical AI Assistant...") run_consultation(use_rag=True) # Extract medical terms from user query for better search def extract_medical_terms(query, max_terms=5): """ Extract key medical terms from a user query to improve search relevance. Uses pattern matching and medical term extraction to identify important medical concepts. Args: query (str): The user's original query text max_terms (int): Maximum number of terms to extract Returns: list: List of extracted medical terms """ # Clean up query first cleaned_query = re.sub(r'^(hi|hello|hey|greetings|good morning|good afternoon|good evening)[,\.]?\s+', '', query.lower()) cleaned_query = re.sub(r"(i'?m|i am)\s+a\s+\d+[-\s]year[-\s]old", '', cleaned_query) cleaned_query = re.sub(r'(my name is|i am|i have been|i\'ve been|i was|i have|i\'ve had|i feel|i\'m feeling|i experienced)', '', cleaned_query) # Common medical symptoms and conditions to look for medical_patterns = [ # Symptoms r'(muscle weakness)', r'(fatigue)', r'(rash)', r'(pain)', r'(swelling)', r'(difficulty breathing|shortness of breath)', r'(fever)', r'(headache)', r'(nausea|vomiting)', r'(dizziness)', r'(numbness)', r'(tingling)', r'(cough)', r'(sore throat)', r'(runny nose)', r'(congestion)', r'(chest pain)', r'(back pain)', r'(joint pain)', r'(abdominal pain)', # Conditions r'(diabetes)', r'(hypertension|high blood pressure)', r'(asthma)', r'(cancer)', r'(arthritis)', r'(depression)', r'(anxiety)', r'(heart disease|cardiovascular disease)', r'(stroke)', r'(alzheimer)', # Body parts/systems r'(heart)', r'(lung)', r'(kidney)', r'(liver)', r'(brain)', r'(skin)', r'(stomach)', r'(intestine)', r'(bone)', r'(muscle)', r'(nerve)', # Other medical terms r'(chronic)', r'(acute)', r'(infection)', r'(inflammation)', r'(syndrome)', r'(disorder)', r'(disease)', r'(condition)', r'(symptom)', r'(diagnosis)', r'(treatment)', r'(medication)', r'(therapy)', r'(surgery)' ] # Extract terms medical_terms = set() for pattern in medical_patterns: matches = re.findall(pattern, query.lower()) if matches: for match in matches: if isinstance(match, tuple): # Some regex groups return tuples for term in match: if term and term.strip(): medical_terms.add(term.strip()) else: if match and match.strip(): medical_terms.add(match.strip()) # If we didn't find specific medical terms, extract general noun phrases if len(medical_terms) == 0: # Look for phrases that might be medical conditions word_pattern = r'\b([a-zA-Z]+(?:\s+[a-zA-Z]+){0,2})\b' words = re.findall(word_pattern, cleaned_query) medical_terms = set(words[:max_terms]) # Convert set to list and limit number of terms result = list(medical_terms)[:max_terms] return result # JSON schema for the search_pubmed function for API documentation SEARCH_PUBMED_SCHEMA = { "name": "search_pubmed", "description": "Search PubMed for medical articles related to a given query, with proper citation formatting.", "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "The medical query to search for in PubMed" }, "retmax": { "type": "integer", "description": "Maximum number of results to return (default: 3)", "default": 3 }, "api_key": { "type": "string", "description": "Optional NCBI API key to increase rate limits (3 req/sec without key, 10 req/sec with key)", "default": None } }, "required": ["query"] } } # Example messages array showing usage EXAMPLE_MESSAGES = [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": "I've been experiencing persistent headaches, fatigue, and dizziness for the past two weeks. What could be causing this?"} ] # Example function call that would be made by the model EXAMPLE_FUNCTION_CALL = { "name": "search_pubmed", "arguments": { "query": "headaches, fatigue, dizziness", "retmax": 3 } } # Function to enhance medical queries using LLM def enhance_medical_query(original_query): """ Uses LLM to enhance a medical query for better search results. This function is prepared for future use but is not currently enabled. Args: original_query (str): The original user query Returns: str: An enhanced query optimized for medical search """ try: # System prompt for query enhancement system_prompt = """You are a medical search query optimizer. Your job is to take a user's medical question and rewrite it to be more effective for searching medical databases like PubMed and Europe PMC. Guidelines: 1. Extract key medical terms, conditions, symptoms, and treatments 2. Use proper medical terminology where possible 3. Structure the query for optimal search performance 4. Return ONLY the enhanced query without explanation 5. Keep the query concise but comprehensive """ # Call OpenAI to enhance the query enhanced_response = openai.ChatCompletion.create( model="gpt-3.5-turbo", # Using a smaller model for speed and cost efficiency messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": f"Optimize this medical query for database search: {original_query}"} ], temperature=0.3, max_tokens=100 ) enhanced_query = enhanced_response.choices[0].message['content'].strip() print(f"Enhanced query: {enhanced_query}") return enhanced_query except Exception as e: print(f"Error enhancing query: {str(e)}") # Fall back to original query if there's an error return original_query