# Core imports import os import logging import torch from typing import Dict, List, Any import gradio as gr from huggingface_hub import login from transformers import AutoTokenizer, AutoModelForCausalLM from sentence_transformers import SentenceTransformer import faiss import numpy as np from tqdm import tqdm from datetime import datetime from dataclasses import dataclass, field from dotenv import load_dotenv import wandb from peft import get_peft_model, LoraConfig, TaskType, prepare_model_for_kbit_training import bitsandbytes as bnb from accelerate import infer_auto_device_map, init_empty_weights from transformers import BitsAndBytesConfig # Load environment variables load_dotenv() # Set up logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Retrieve secrets securely from environment variables kaggle_username = os.getenv("KAGGLE_USERNAME") kaggle_key = os.getenv("KAGGLE_KEY") hf_token = os.getenv("HF_TOKEN") wandb_key = os.getenv("WANDB_API_KEY") # Log in to Hugging Face if hf_token: login(token=hf_token) else: logger.warning("Hugging Face token not found in environment variables.") @dataclass class MedicalConfig: """Enhanced configuration for medical chatbot""" # LoRA parameters LORA_WEIGHTS_PATH: str = "medical_lora_weights" LORA_R: int = 16 LORA_ALPHA: int = 32 LORA_DROPOUT: float = 0.1 LORA_TARGET_MODULES: List[str] = field(default_factory=lambda: ["q_proj", "v_proj", "k_proj", "o_proj"]) # Training parameters TRAINING_BATCH_SIZE: int = 4 LEARNING_RATE: float = 2e-5 NUM_EPOCHS: int = 3 MAX_LENGTH: int = 2048 INDEX_BATCH_SIZE: int = 32 # Medical specific parameters EMERGENCY_KEYWORDS: List[str] = field(default_factory=lambda: [ 'chest pain', 'breathing difficulty', 'stroke', 'heart attack', 'unconscious', 'severe bleeding', 'seizure', 'anaphylaxis', 'severe burn', 'choking', 'severe head injury', 'spinal injury', 'drowning', 'electric shock', 'severe allergic reaction', 'poisoning', 'overdose', 'self-harm', 'suicidal thoughts', 'severe trauma' ]) URGENT_KEYWORDS: List[str] = field(default_factory=lambda: [ 'infection', 'high fever', 'severe pain', 'vomiting', 'dehydration', 'anxiety attack', 'panic attack', 'mental health crisis', 'broken bone', 'deep cut', 'asthma attack', 'migraine', 'severe rash', 'eye injury', 'dental emergency', 'pregnancy complications', 'severe back pain', 'severe abdominal pain', 'concussion', 'severe allergies' ]) # UK Healthcare specific EMERGENCY_NUMBERS: List[str] = field(default_factory=lambda: ["999", "112", "111"]) GP_SERVICES: Dict[str, Dict[str, str]] = field(default_factory=lambda: { "EMERGENCY": { "name": "A&E", "wait_time": "4 hours target", "when_to_use": "Life-threatening emergencies" }, "URGENT": { "name": "Urgent Care Center", "wait_time": "2-4 hours typically", "when_to_use": "Urgent but not life-threatening conditions" }, "NON_URGENT": { "name": "GP Practice", "wait_time": "Same day to 2 weeks", "when_to_use": "Routine medical care" } }) # Cultural considerations CULTURAL_CONTEXTS: List[Dict[str, str]] = field(default_factory=lambda: [ { "group": "South Asian", "considerations": [ "Different presentation of skin conditions", "Higher diabetes risk", "Cultural dietary practices", "Language preferences" ] }, { "group": "African/Caribbean", "considerations": [ "Different presentation of skin conditions", "Higher hypertension risk", "Specific hair/scalp conditions", "Cultural health beliefs" ] }, { "group": "Middle Eastern", "considerations": [ "Cultural modesty requirements", "Ramadan considerations", "Gender preferences for healthcare providers", "Traditional medicine practices" ] } ]) class GPUOptimizedRAG: def __init__( self, model_path: str = "google/gemma-7b", embedding_model: str = "sentence-transformers/all-MiniLM-L6-v2", config: MedicalConfig = MedicalConfig(), use_cpu_fallback: bool = False ): """Initialize RAG with T4 optimization""" try: # Initialize conversation memory self.conversation_memory = { 'name': 'Pearly', 'role': 'GP Medical Assistant', 'style': 'professional, empathetic, and clear', 'system_prompt': None, 'past_interactions': [] } # Log GPU info if torch.cuda.is_available(): logger.info(f"GPU: {torch.cuda.get_device_name(0)}") logger.info(f"GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.2f}GB") # Initialize tokenizer self.tokenizer = AutoTokenizer.from_pretrained( model_path, trust_remote_code=True ) logger.info("Tokenizer loaded successfully") # Initialize model with T4 optimizations self.model = AutoModelForCausalLM.from_pretrained( model_path, torch_dtype=torch.float16, # Use fp16 for memory efficiency device_map="auto", # Let accelerate handle memory mapping trust_remote_code=True, max_memory={0: "14GB"}, # Reserve 14GB for model, leaving 2GB for other operations load_in_8bit=True, # Use 8-bit quantization for additional memory savings ) logger.info(f"Model loaded successfully on {self.model.device}") # Set up device self.device = torch.device("cuda") self.use_cpu_fallback = use_cpu_fallback # Initialize embedding model self.embedding_model = SentenceTransformer(embedding_model) self.embedding_model.to(self.device) logger.info("Embedding model loaded successfully") # Add clinical quality metrics self.clinical_metrics = { 'terminology_accuracy': 0.0, 'assessment_accuracy': 0.0, 'guideline_adherence': 0.0, 'symptom_recognition': 0.0 } # Initialize other components self.config = config # Setup FAISS index with GPU support self.embedding_dim = self.embedding_model.get_sentence_embedding_dimension() if torch.cuda.is_available() and not use_cpu_fallback: try: self.index = faiss.IndexFlatIP(self.embedding_dim) res = faiss.StandardGpuResources() self.index = faiss.index_cpu_to_gpu(res, 0, self.index) logger.info("FAISS GPU index initialized successfully") except Exception as e: logger.warning(f"GPU FAISS initialization failed: {e}, using CPU index") self.index = faiss.IndexFlatIP(self.embedding_dim) else: self.index = faiss.IndexFlatIP(self.embedding_dim) # Setup LoRA after model initialization self.setup_lora() except Exception as e: logger.error(f"Error in initialization: {e}") raise def setup_lora(self): """Configure and apply LoRA with T4 optimization""" try: # Prepare model for k-bit training model = prepare_model_for_kbit_training(self.model) lora_config = LoraConfig( r=self.config.LORA_R, lora_alpha=self.config.LORA_ALPHA, target_modules=self.config.LORA_TARGET_MODULES, lora_dropout=self.config.LORA_DROPOUT, bias="none", task_type=TaskType.CAUSAL_LM, inference_mode=False, ) self.model = get_peft_model(model, lora_config) logger.info("LoRA configuration applied successfully") # Monitor memory after LoRA setup if torch.cuda.is_available(): monitor_gpu_memory() except Exception as e: logger.error(f"Error setting up LoRA: {e}") if torch.cuda.is_available(): torch.cuda.empty_cache() raise def evaluate_clinical_quality(self, response: str, expected_elements: List[str]) -> Dict[str, float]: """Add clinical quality evaluation matching test requirements""" quality_metrics = { 'terminology_accuracy': self._evaluate_terminology(response, expected_elements), 'assessment_accuracy': self._evaluate_assessment(response), 'guideline_adherence': self._evaluate_guidelines(response), 'symptom_recognition': self._evaluate_symptoms(response, expected_elements) } return quality_metrics def prepare_documents(self, documents: List[Dict]): """Enhanced document preparation with improved batching and memory management""" self.documents = documents embeddings = [] try: for i in tqdm(range(0, len(documents), self.config.INDEX_BATCH_SIZE), desc="Processing documents"): batch = documents[i:i + self.config.INDEX_BATCH_SIZE] texts = [doc['text'] for doc in batch] with torch.amp.autocast(device_type='cuda'): batch_embeddings = self.embedding_model.encode( texts, convert_to_tensor=True, show_progress_bar=False, batch_size=8 ) embeddings.append(batch_embeddings.cpu().numpy()) all_embeddings = np.vstack(embeddings) self.index.add(all_embeddings) logger.info(f"Indexed {len(documents)} documents successfully") except Exception as e: logger.error(f"Error preparing documents: {e}") raise def assess_urgency(self, symptoms: str) -> Dict[str, Any]: """Enhanced symptom assessment with detailed analysis""" symptoms_lower = symptoms.lower() # Initialize response assessment = { 'level': 'NON-URGENT', 'reasons': [], 'recommendations': [], 'follow_up_needed': False } # Check emergency keywords emergency_matches = [kw for kw in self.config.EMERGENCY_KEYWORDS if kw in symptoms_lower] if emergency_matches: assessment.update({ 'level': 'EMERGENCY', 'reasons': emergency_matches, 'recommendations': [ 'Call 999 immediately', 'Do not move if spinal injury suspected', 'Stay on the line for guidance' ], 'follow_up_needed': True }) return assessment # Check urgent keywords urgent_matches = [kw for kw in self.config.URGENT_KEYWORDS if kw in symptoms_lower] if urgent_matches: assessment.update({ 'level': 'URGENT', 'reasons': urgent_matches, 'recommendations': [ 'Visit urgent care center', 'Book emergency GP appointment', 'Monitor symptoms closely' ], 'follow_up_needed': True }) return assessment # Non-urgent default assessment.update({ 'recommendations': [ 'Book routine GP appointment', 'Monitor symptoms', 'Try self-care measures' ], 'follow_up_needed': False }) return assessment def generate_cultural_considerations(self, symptoms: str) -> List[str]: """Generate culturally-aware medical considerations""" considerations = [] symptoms_lower = symptoms.lower() for context in self.config.CULTURAL_CONTEXTS: relevant_considerations = [ cons for cons in context['considerations'] if any(keyword in symptoms_lower for keyword in cons.lower().split()) ] if relevant_considerations: considerations.extend([ f"{context['group']}: {consideration}" for consideration in relevant_considerations ]) return considerations if considerations else ["No specific cultural considerations identified"] def get_booking_template(self, urgency_level: str) -> str: """Get appropriate booking template based on urgency level""" service_info = self.config.GP_SERVICES[urgency_level] templates = { "EMERGENCY": f""" 🚨 EMERGENCY SERVICES REQUIRED 🚨 Service: {service_info['name']} Target Wait Time: {service_info['wait_time']} When to Use: {service_info['when_to_use']} IMMEDIATE ACTIONS: 1. 🚑 Call 999 (or 112) 2. đŸĨ Nearest A&E: [Location Placeholder] 3. 🚨 Stay on line for guidance Type '999' to initiate emergency call """, "URGENT": f""" ⚡ URGENT CARE NEEDED ⚡ Service: {service_info['name']} Expected Wait: {service_info['wait_time']} When to Use: {service_info['when_to_use']} OPTIONS: 1. đŸĨ Find nearest urgent care 2. 📅 Book urgent GP slot 3. 🔍 Locate walk-in clinic Reply with option number (1-3) """, "NON_URGENT": f""" 📋 ROUTINE CARE BOOKING 📋 Service: {service_info['name']} Typical Wait: {service_info['wait_time']} When to Use: {service_info['when_to_use']} OPTIONS: 1. 📅 Schedule GP visit 2. 👨‍⚕ī¸ Find local GP 3. ℹī¸ Self-care advice Reply with option number (1-3) """ } return templates.get(urgency_level, templates["NON_URGENT"]) def generate_response(self, query: str, chat_history: List[tuple] = None) -> Dict[str, Any]: """Generate response with enhanced conversational context and T4 optimization""" try: # Update conversation memory if chat_history: self.conversation_memory['past_interactions'] = chat_history[-3:] # Use mixed precision for T4 with torch.cuda.amp.autocast(): # Retrieve relevant documents with boosted weights for persona matches retrieved_docs = self.retrieve(query, k=7) # Separate documents by type medical_docs = [doc for doc in retrieved_docs if doc['document']['type'] in ['medical_qa', 'diagnosis']] persona_docs = [doc for doc in retrieved_docs if doc['document']['type'] in ['persona', 'conversation', 'GP_template']] # Build context with weighted emphasis on different document types medical_context = " ".join([doc['document']['text'] for doc in medical_docs]) persona_context = " ".join([doc['document']['text'] for doc in persona_docs]) # Assess urgency and get considerations urgency_assessment = self.assess_urgency(query) cultural_considerations = self.generate_cultural_considerations(query) # Build conversation history context history_context = "" if chat_history: history_context = "\n".join([f"Human: {h}\nPearly: {a}" for h, a in chat_history[-3:]]) # Add persona reminder persona_reminder = f""" I am {self.conversation_memory['name']}, a {self.conversation_memory['role']}. My communication style is {self.conversation_memory['style']}. """ # Create enhanced prompt with persona integration prompt = f"""Context: Medical Information: {medical_context} {persona_reminder} Previous Interactions: {history_context} Current Query: {query} Maintain my identity as {self.conversation_memory['name']}, the {self.conversation_memory['role']}, providing clear, professional guidance following NHS protocols. Urgency Level: {urgency_assessment['level']} Cultural Considerations: {', '.join(cultural_considerations)} Respond in a clear, caring manner, always referring to myself as {self.conversation_memory['name']}. Response:""" # Generate response with T4 optimizations inputs = self.tokenizer( prompt, return_tensors="pt", max_length=self.config.MAX_LENGTH, truncation=True ).to(self.device) outputs = self.model.generate( **inputs, max_new_tokens=512, do_sample=True, top_p=0.9, temperature=0.7, num_return_sequences=1, pad_token_id=self.tokenizer.eos_token_id, use_cache=True, # Enable KV cache low_cpu_mem_usage=True ) # Clean up CUDA cache after generation if torch.cuda.is_available(): torch.cuda.empty_cache() response = self.tokenizer.decode(outputs[0], skip_special_tokens=True) response = response.split("Response:")[-1].strip() # Add booking template for emergency/urgent cases if urgency_assessment['level'] in ["EMERGENCY", "URGENT"]: booking_template = self.get_booking_template(urgency_assessment['level']) response = f"{response}\n\n{booking_template}" return { 'response': response, 'urgency_assessment': urgency_assessment, 'cultural_considerations': cultural_considerations } except Exception as e: logger.error(f"Error generating response: {e}") if torch.cuda.is_available(): torch.cuda.empty_cache() # Clean up on error return { 'response': "I apologize, but I encountered an error. If this is an emergency, please call 999 immediately.", 'urgency_assessment': {'level': 'UNKNOWN'}, 'cultural_considerations': [] } def prepare_documents(self, documents: List[Dict]): """Enhanced document preparation with T4 optimization and test metrics""" try: self.documents = documents embeddings = [] batch_size = min(8, self.config.INDEX_BATCH_SIZE) # Smaller batch size for T4 # Initialize metrics for document processing total_processed = 0 successful_embeddings = 0 processing_times = [] for i in tqdm(range(0, len(documents), batch_size), desc="Processing documents"): batch = documents[i:i + batch_size] texts = [doc['text'] for doc in batch] start_time = time.time() # Use mixed precision for encoding with torch.cuda.amp.autocast(): batch_embeddings = self.embedding_model.encode( texts, convert_to_tensor=True, show_progress_bar=False, batch_size=batch_size ) # Move to CPU immediately to free GPU memory embeddings.append(batch_embeddings.cpu().numpy()) # Update metrics total_processed += len(batch) successful_embeddings += len(batch) processing_times.append(time.time() - start_time) # Clear CUDA cache periodically if i % 100 == 0 and torch.cuda.is_available(): torch.cuda.empty_cache() all_embeddings = np.vstack(embeddings) # Use GPU FAISS if available if torch.cuda.is_available(): res = faiss.StandardGpuResources() res.setTempMemory(64 * 1024 * 1024) # 64MB temp memory self.index = faiss.GpuIndexFlatIP(res, self.embedding_dim) self.index.add(all_embeddings) else: self.index = faiss.IndexFlatIP(self.embedding_dim) self.index.add(all_embeddings) # Log processing metrics avg_processing_time = np.mean(processing_times) success_rate = successful_embeddings / total_processed if total_processed > 0 else 0 logger.info(f"Indexed {len(documents)} documents successfully") logger.info(f"Average processing time per batch: {avg_processing_time:.2f}s") logger.info(f"Document processing success rate: {success_rate:.2%}") # Store metrics for reporting self.document_metrics = { 'total_documents': len(documents), 'successful_embeddings': successful_embeddings, 'average_processing_time': avg_processing_time, 'success_rate': success_rate } return self.document_metrics except Exception as e: logger.error(f"Error preparing documents: {e}") if torch.cuda.is_available(): torch.cuda.empty_cache() raise def retrieve(self, query: str, k: int = 5) -> List[Dict]: """Enhanced document retrieval with T4 optimization""" try: # Use mixed precision for query encoding with torch.cuda.amp.autocast(): query_embedding = self.embedding_model.encode( query, convert_to_tensor=True, show_progress_bar=False ) # Move to CPU for FAISS search query_embedding = query_embedding.cpu().numpy().reshape(1, -1) # Perform search scores, indices = self.index.search(query_embedding, k) # Filter and format results results = [ { 'document': self.documents[idx], 'score': float(score), 'relevance_metrics': { 'semantic_similarity': float(score), 'keyword_match': self._calculate_keyword_match(query, self.documents[idx]['text']) } } for score, idx in zip(scores[0], indices[0]) if score > 0.5 # Relevance threshold ] # Log retrieval metrics if results: avg_score = np.mean([r['score'] for r in results]) logger.info(f"Retrieved {len(results)} documents with average score: {avg_score:.3f}") return results except Exception as e: logger.error(f"Error in retrieval: {e}") if torch.cuda.is_available(): torch.cuda.empty_cache() return [] def _calculate_keyword_match(self, query: str, doc_text: str) -> float: """Calculate keyword match score between query and document""" query_words = set(query.lower().split()) doc_words = set(doc_text.lower().split()) matches = query_words.intersection(doc_words) return len(matches) / len(query_words) if query_words else 0.0 def generate_report(self, results: Dict) -> Dict: """Generate enhanced summary report with T4 metrics""" try: total_cases = sum(cat['total'] for cat in results.values()) total_correct = sum(cat['correct'] for cat in results.values()) # Basic performance metrics performance_metrics = { 'timestamp': datetime.now().isoformat(), 'triage_performance': { 'emergency_accuracy': results['emergency']['correct'] / results['emergency']['total'], 'urgent_accuracy': results['urgent']['correct'] / results['urgent']['total'], 'non_urgent_accuracy': results['non_urgent']['correct'] / results['non_urgent']['total'], 'overall_accuracy': total_correct / total_cases } } # Add document processing metrics if available if hasattr(self, 'document_metrics'): performance_metrics['document_processing'] = self.document_metrics # Add GPU metrics if available if torch.cuda.is_available(): gpu_metrics = { 'gpu_name': torch.cuda.get_device_name(0), 'gpu_memory_allocated': torch.cuda.memory_allocated() / 1024**2, # MB 'gpu_memory_cached': torch.cuda.memory_reserved() / 1024**2, # MB } performance_metrics['gpu_metrics'] = gpu_metrics return performance_metrics except Exception as e: logger.error(f"Error generating report: {e}") if torch.cuda.is_available(): torch.cuda.empty_cache() return { 'timestamp': datetime.now().isoformat(), 'error': str(e) } def enhance_response_generation(self): """Add test-aligned response enhancement""" self.response_enhancers = { 'demographic_sensitivity': self._enhance_demographic_sensitivity, 'cultural_competency': self._enhance_cultural_competency, 'clinical_quality': self._enhance_clinical_quality, 'follow_up_generation': self._enhance_follow_up } def _enhance_demographic_sensitivity(self, response: str, demographic: str) -> str: """Add demographic-specific enhancements matching test requirements""" demographic_patterns = { 'pediatric': ['age-appropriate', 'child-friendly', 'developmental'], 'elderly': ['mobility', 'cognitive', 'fall risk'], 'pregnant': ['trimester', 'fetal', 'pregnancy-safe'], 'chronic_condition': ['management', 'monitoring', 'ongoing care'] } return response # Placeholder implementation def process_appointment_booking(message, patient_info): """Process appointment booking queries""" return "I can help you book an appointment. Please provide further details." def create_gradio_interface(rag_system: GPUOptimizedRAG): """Create enhanced Gradio interface with appointment booking capabilities""" SYSTEM_MESSAGE = "You are Pearly, a friendly medical triaging Chatbot." def process_chat_response(response_data: Dict[str, Any], message: str, history: List[tuple]) -> str: """Format chat response based on context, handle appointments, and maintain persona""" try: if not history or message.lower().startswith(("hi", "hello", "hey", "good")): return "Hi! I'm Pearly, your medical triaging assistant. I'm here to help assess your symptoms and provide guidance. How may I assist you today?" urgency_level = response_data['urgency_assessment']['level'] response_text = response_data['response'] if urgency_level == "EMERGENCY": return f"🚨 EMERGENCY ALERT 🚨\n\n{response_text}\n\nWould you like me to help connect you to emergency services?" elif urgency_level == "URGENT": return f"⚠ī¸ URGENT CARE NEEDED ⚠ī¸\n\n{response_text}\n\nWould you like help finding your nearest urgent care center?" else: return f"{response_text}\n\nWould you like help booking a GP appointment or finding more NHS resources?" except Exception as e: logger.error(f"Error processing chat response: {e}") return ( "I'm Pearly, and I apologize for the technical difficulty. For your safety:\n\n" "- Call 999 for emergencies\n" "- Call 111 for urgent medical advice\n" "- Visit NHS 111 online for non-urgent concerns\n\n" "Would you like to try asking your question again?" ) def chat(message: str, history: List[tuple]) -> tuple[str, List[tuple]]: """Enhanced chat function with better error handling and context awareness""" try: response_data = rag_system.generate_response(message, history) response = process_chat_response(response_data, message, history) history.append((message, response)) return history except Exception as e: logger.error(f"Error in chat: {e}") emergency_response = ( "I apologize for the technical difficulty. For your safety:\n\n" "- Call 999 for emergencies\n" "- Call 111 for urgent medical advice\n" "- Visit NHS 111 online for non-urgent concerns\n\n" "Would you like to try asking your question again?" ) history.append((message, emergency_response)) return history # Define interface for the chatbot with gr.Blocks() as interface: gr.HTML("

Pearly Medical Assistant

Hi! I'm Pearly, your GP medical assistant.

") chatbot = gr.Chatbot(value=[(None, "Hi! I'm Pearly, your GP medical assistant. How can I help you today?")]) msg = gr.Textbox(label="Your Message") submit = gr.Button("Send") submit.click(chat, inputs=[msg, chatbot], outputs=chatbot) return interface def monitor_gpu_memory(): """Monitor GPU memory usage""" if torch.cuda.is_available(): device = torch.cuda.current_device() allocated = torch.cuda.memory_allocated(device) / 1024**2 reserved = torch.cuda.memory_reserved(device) / 1024**2 logger.info(f"GPU Memory: Allocated: {allocated:.2f}MB, Reserved: {reserved:.2f}MB") def prepare_medical_documents(): """Prepare medical knowledge base documents with enhanced conversation flow""" try: logger.info("Loading medical and persona datasets...") datasets = { "persona": load_dataset("AlekseyKorshuk/persona-chat", split="train[:500]"), "medqa": load_dataset("medalpaca/medical_meadow_medqa", split="train[:500]"), "meddia": load_dataset("wasiqnauman/medical-diagnosis-synthetic", split="train[:500]") } documents = [] # Process Persona dataset for enhanced conversational style logger.info("Processing persona dataset...") for item in datasets["persona"]: if isinstance(item.get('personality'), list): personality = " ".join(item['personality']) documents.append({ 'text': f""" Conversation Style Guide: Personality: {personality} Role: Pearly - Medical Assistant Core Traits: Professional, empathetic, clear Key Behaviors: - Always introduce as Pearly - Show empathy for symptoms - Ask relevant follow-up questions - Offer practical assistance - Maintain professional tone while being approachable """, 'type': 'persona' }) # Process conversation examples with enhanced structure if isinstance(item.get('utterances'), list): for utterance in item['utterances']: if isinstance(utterance, dict) and 'history' in utterance: conversation = ' '.join(utterance['history']) documents.append({ 'text': f""" Medical Consultation Pattern: Conversation: {conversation} Key Elements: - Show understanding of symptoms - Ask clarifying questions - Provide clear guidance - Offer next steps - Check if assistance needed """, 'type': 'conversation_pattern' }) # Process MedQA dataset with enhanced medical context logger.info("Processing medical QA dataset...") for item in datasets["medqa"]: if 'input' in item and 'output' in item: input_text = item['input'] if input_text.startswith('Q:'): input_text = input_text[2:] documents.append({ 'text': f""" Medical Knowledge Base: Question: {input_text} Answer: {item['output']} Application: - Use information to inform recommendations - Adapt to patient's situation - Maintain clinical accuracy - Explain in clear terms """, 'type': 'medical_qa' }) # Process diagnosis dataset with structured guidance logger.info("Processing diagnosis dataset...") for item in datasets["meddia"]: if 'input' in item and 'output' in item: documents.append({ 'text': f""" Clinical Assessment Framework: Symptoms: {item['input']} Assessment and Plan: {item['output']} Response Structure: 1. Acknowledge symptoms 2. Ask about severity and duration 3. Inquire about related symptoms 4. Provide clear recommendations 5. Offer assistance with next steps """, 'type': 'diagnosis_guidance' }) # Add enhanced conversation templates conversation_templates = [ { 'text': """ Consultation Framework: 1. Initial Response: - Acknowledge the concern - Show empathy - Ask about duration/severity 2. Follow-up Questions: - Ask specific, relevant questions - Clarify symptoms - Check for related issues 3. Assessment: - Summarize findings - Explain reasoning - State level of concern 4. Recommendations: - Provide clear guidance - List specific actions - Offer assistance 5. Next Steps: - Suggest appropriate care level - Offer to help with appointments - Provide relevant resources 6. Safety Checks: - Verify understanding - Confirm action plan - Ensure patient comfort Response Patterns: Emergency: "I understand you're experiencing [symptom]. This requires immediate attention..." Urgent: "I'm concerned about [symptom]. To better assess the urgency..." Non-urgent: "I hear you about [symptom]. Let's understand this better..." Follow-ups: "Would you like help with [specific assistance]?" "Shall I help you [book/find/connect]?" Guidance Structure: 1. Acknowledge 2. Assess 3. Advise 4. Assist 5. Arrange follow-up """, 'type': 'conversation_framework' } ] documents.extend(conversation_templates) # Add your existing GP templates # Define GP templates inside the function GP_templates = [ { 'text': """ Role: GP Medical Assistant Context: Emergency situation - Chest Pain Style: Calm, clear, and authoritative Conversation Pattern: Patient Input: Chest pain symptoms Response Framework: 1. Initial acknowledgment 2. Immediate triage questions: - Duration of pain - Pain characteristics - Associated symptoms 3. Emergency guidance 4. Clear action steps Example Flow: Patient: I'm having chest pain Pearly: I understand you're experiencing chest pain. This requires immediate attention. How long have you been experiencing this pain? Patient: About 30 minutes now Pearly: Thank you for letting me know. Is the pain sharp or dull, and where exactly is it located? Also, are you experiencing any other symptoms like shortness of breath or sweating? Patient: It's sharp and in the center of my chest. Yes, I'm sweating a lot Pearly: Given your symptoms, this could be a serious cardiac event requiring immediate emergency care. IMMEDIATE ACTIONS: 1. Call 999 immediately - I can help you do this 2. Sit down and try to stay calm 3. If available and you're not allergic, take one adult aspirin (300mg) 4. Loosen any tight clothing 5. Keep the line open - I'll stay with you until help arrives Would you like me to connect you to emergency services now? """, 'type': 'GP_template' }, { 'text': """ Role: GP Medical Assistant Context: Non-emergency consultation - Persistent Cough Style: Empathetic and thorough Conversation Pattern: Patient Input: Cough symptoms Response Framework: 1. Show empathy 2. Gather specific details: - Type of cough - Duration - Associated symptoms 3. Risk assessment 4. Clear recommendations Example Flow: Patient: I've had a cough for 5 days now Pearly: I'm sorry to hear you've been unwell. To better understand your condition, is it a dry cough or are you producing any phlegm? Patient: It's mostly dry but sometimes I cough up clear phlegm Pearly: Thank you for that detail. Have you experienced any fever, and are you having any difficulty breathing? Patient: No fever but I feel a bit short of breath sometimes Pearly: I understand. A few more important questions: 1. Have you been exposed to anyone with COVID-19 or other respiratory infections? 2. Do you have any underlying health conditions? 3. Are your symptoms worse at any particular time of day? Would you like help booking an appointment with your GP? """, 'type': 'GP_template' }, { 'text': """ Role: GP Medical Assistant Context: Mental Health Support Style: Empathetic, patient, and supportive Conversation Pattern: Patient Input: Mental health concerns Response Framework: 1. Validate feelings 2. Gentle exploration: - Duration - Impact on daily life - Support systems 3. Clear support options 4. Immediate resources Example Flow: Patient: I've been feeling really anxious and can't sleep Pearly: I hear you, and I want you to know it's brave to speak up about this. How long have you been experiencing these feelings? Patient: About a month now, getting worse Pearly: Thank you for sharing that. To better understand your situation: 1. Are there any specific triggers you've noticed? 2. How is this affecting your daily activities? 3. Have you had any previous mental health support? Would you like information about immediate support services or help booking a GP appointment? """, 'type': 'GP_template' } ] # Add all templates to documents documents.extend(GP_templates) logger.info(f"Prepared {len(documents)} documents including:") logger.info(f"- {len([d for d in documents if d['type'] == 'persona'])} persona guides") logger.info(f"- {len([d for d in documents if d['type'] == 'conversation_pattern'])} conversation patterns") logger.info(f"- {len([d for d in documents if d['type'] == 'medical_qa'])} medical QA pairs") logger.info(f"- {len([d for d in documents if d['type'] == 'diagnosis_guidance'])} diagnosis guidelines") logger.info(f"- {len([d for d in documents if d['type'] == 'conversation_framework'])} conversation frameworks") logger.info(f"- {len([d for d in documents if d['type'] == 'GP_template'])} GP templates") return documents except Exception as e: logger.error(f"Error preparing medical documents: {e}") # Print sample data for debugging for dataset_name, dataset in datasets.items(): try: sample = dataset[0] logger.error(f"\nSample from {dataset_name}:") logger.error(f"Keys: {list(sample.keys())}") logger.error(f"Sample content: {str(sample)[:500]}") except Exception as debug_e: logger.error(f"Error inspecting {dataset_name}: {debug_e}") raise def generate_response(self, query: str, chat_history: List[tuple] = None) -> Dict[str, Any]: """Generate response with enhanced conversational context and persona""" try: # Update conversation memory if chat_history: self.conversation_memory['past_interactions'] = chat_history[-3:] # Retrieve relevant documents with boosted weights for persona matches retrieved_docs = self.retrieve(query, k=7) # Separate documents by type medical_docs = [doc for doc in retrieved_docs if doc['document']['type'] in ['medical_qa', 'diagnosis']] persona_docs = [doc for doc in retrieved_docs if doc['document']['type'] in ['persona', 'conversation', 'GP_template']] # Build context with weighted emphasis on different document types medical_context = " ".join([doc['document']['text'] for doc in medical_docs]) persona_context = " ".join([doc['document']['text'] for doc in persona_docs]) # Assess urgency and get considerations urgency_assessment = self.assess_urgency(query) cultural_considerations = self.generate_cultural_considerations(query) # Build conversation history context history_context = "" if chat_history: history_context = "\n".join([f"Human: {h}\nPearly: {a}" for h, a in chat_history[-3:]]) # Add persona reminder persona_reminder = f""" I am {self.conversation_memory['name']}, a {self.conversation_memory['role']}. My communication style is {self.conversation_memory['style']}. """ # Create enhanced prompt with persona integration prompt = f"""Context: Medical Information: {medical_context} {persona_reminder} Previous Interactions: {history_context} Current Query: {query} Maintain my identity as {self.conversation_memory['name']}, the {self.conversation_memory['role']}, providing clear, professional guidance following NHS protocols. Urgency Level: {urgency_assessment['level']} Cultural Considerations: {', '.join(cultural_considerations)} Respond in a clear, caring manner, always referring to myself as {self.conversation_memory['name']}. Response:""" # Generate response inputs = self.tokenizer( prompt, return_tensors="pt", max_length=self.config.MAX_LENGTH, truncation=True ).to(self.device) with torch.amp.autocast(device_type='cuda'): outputs = self.model.generate( **inputs, max_new_tokens=512, do_sample=True, top_p=0.9, temperature=0.7, num_return_sequences=1, pad_token_id=self.tokenizer.eos_token_id ) response = self.tokenizer.decode(outputs[0], skip_special_tokens=True) response = response.split("Response:")[-1].strip() # Add booking template for emergency/urgent cases if urgency_assessment['level'] in ["EMERGENCY", "URGENT"]: booking_template = self.get_booking_template(urgency_assessment['level']) response = f"{response}\n\n{booking_template}" return { 'response': response, 'urgency_assessment': urgency_assessment, 'cultural_considerations': cultural_considerations } except Exception as e: logger.error(f"Error generating response: {e}") return { 'response': "I apologize, but I encountered an error. If this is an emergency, please call 999 immediately.", 'urgency_assessment': {'level': 'UNKNOWN'}, 'cultural_considerations': [] } def check_urgency_accuracy(self, predicted: str, expected: str) -> float: """Check if urgency level matches expected""" return 1.0 if predicted == expected else 0.0 def check_action_accuracy(self, response: str, expected_actions: List[str]) -> float: """Check if recommended actions match expected""" if not expected_actions: return 1.0 found_actions = sum(1 for action in expected_actions if action.lower() in response.lower()) return found_actions / len(expected_actions) def assess_conversation_quality(self, response: str) -> float: """Assess conversation quality metrics""" metrics = { 'empathy': any(word in response.lower() for word in ['understand', 'hear you', 'sorry']), 'clarity': len(response.split('.')) <= 5, # Check for concise sentences 'follow_up': '?' in response, # Check for follow-up questions 'structure': any(word in response.lower() for word in ['first', 'then', 'next', 'finally']) } return sum(metrics.values()) / len(metrics) def check_cultural_sensitivity(self, response_data: Dict, context: str) -> float: """Check cultural sensitivity of response""" if not context: return 1.0 cultural_considerations = response_data.get('cultural_considerations', []) return 1.0 if any(context.lower() in cons.lower() for cons in cultural_considerations) else 0.0 def setup_wandb(config: MedicalConfig): """Setup Weights & Biases tracking""" try: wandb.init( project="medical-chatbot", config={ "learning_rate": config.LEARNING_RATE, "epochs": config.NUM_EPOCHS, "batch_size": config.TRAINING_BATCH_SIZE, "lora_r": config.LORA_R, "lora_alpha": config.LORA_ALPHA } ) logger.info("Weights & Biases initialized successfully") except Exception as e: logger.warning(f"Failed to initialize Weights & Biases: {e}") logger.warning("Continuing without wandb tracking") if __name__ == "__main__": try: # Initialize configuration config = MedicalConfig() # Monitor initial GPU state logger.info("Checking GPU availability and memory...") if torch.cuda.is_available(): for i in range(torch.cuda.device_count()): gpu_props = torch.cuda.get_device_properties(i) logger.info(f"GPU {i}: {gpu_props.name}") logger.info(f"Total Memory: {gpu_props.total_memory / 1e9:.2f}GB") else: logger.warning("No GPU detected, running on CPU") # Setup wandb tracking with error handling try: wandb_key = os.getenv("WANDB_API_KEY") if wandb_key: wandb_key = wandb_key.strip() setup_wandb(config) else: logger.warning("No WANDB_API_KEY found, skipping wandb initialization") except Exception as e: logger.warning(f"Failed to initialize Weights & Biases: {e}") logger.warning("Continuing without wandb tracking") # Initialize RAG system with T4 optimization logger.info("Initializing RAG system...") rag_system = GPUOptimizedRAG( config=config, use_cpu_fallback=False, # Use GPU by default model_path="google/gemma-7b", embedding_model="sentence-transformers/all-MiniLM-L6-v2" ) # Monitor GPU state after model loading if torch.cuda.is_available(): device = torch.cuda.current_device() allocated = torch.cuda.memory_allocated(device) / 1024**2 reserved = torch.cuda.memory_reserved(device) / 1024**2 logger.info(f"GPU Memory after model loading: Allocated: {allocated:.2f}MB, Reserved: {reserved:.2f}MB") # Prepare and index documents with progress monitoring logger.info("Preparing medical knowledge base...") try: medical_documents = prepare_medical_documents() document_metrics = rag_system.prepare_documents(medical_documents) logger.info(f"Document processing metrics: {document_metrics}") except Exception as e: logger.error(f"Error in document preparation: {e}") raise # Create and launch Gradio interface with error checking logger.info("Creating Gradio interface...") interface = create_gradio_interface(rag_system) if interface is None: raise ValueError("Failed to create Gradio interface - interface object is None") # Clear CUDA cache before launching interface if torch.cuda.is_available(): torch.cuda.empty_cache() logger.info("Cleared CUDA cache before launch") # Launch the interface with T4 memory management logger.info("Launching interface...") interface.launch( server_name="0.0.0.0", server_port=7860, share=True, enable_queue=True, # Enable queue for better memory management max_threads=4 # Limit concurrent processing ) except Exception as e: logger.error(f"Application startup error: {e}") # Clean up on error if torch.cuda.is_available(): torch.cuda.empty_cache() if 'wandb' in locals(): try: wandb.finish() except: pass raise finally: # Final cleanup if torch.cuda.is_available(): torch.cuda.empty_cache() logger.info("Application shutdown complete")