""" Database module for handling conversation persistence using Supabase. """ import os import json from datetime import datetime import requests from dotenv import load_dotenv # Load environment variables load_dotenv() class SupabaseClient: def __init__(self): """Initialize Supabase client with credentials from environment variables.""" self.supabase_url = os.getenv('SUPABASE_URL') self.supabase_key = os.getenv('SUPABASE_KEY') if not self.supabase_url or not self.supabase_key: raise ValueError("Supabase credentials not found in environment variables. " "Please set SUPABASE_URL and SUPABASE_KEY.") def _make_request(self, method, endpoint, data=None, params=None): """Make a request to Supabase API.""" url = f"{self.supabase_url}{endpoint}" headers = { "apikey": self.supabase_key, "Authorization": f"Bearer {self.supabase_key}", "Content-Type": "application/json", "Prefer": "return=representation" } if method == "GET": response = requests.get(url, headers=headers, params=params) elif method == "POST": response = requests.post(url, headers=headers, json=data) elif method == "PUT": response = requests.put(url, headers=headers, json=data) elif method == "DELETE": response = requests.delete(url, headers=headers, params=params) else: raise ValueError(f"Unsupported method: {method}") return response def create_conversation(self, consultation_id): """Create a new conversation record in the database.""" data = { "consultation_id": consultation_id, "created_at": datetime.now().isoformat(), "is_active": True } response = self._make_request( "POST", "/rest/v1/conversations", data=data ) if response.status_code not in (200, 201): raise Exception(f"Failed to create conversation: {response.text}") return response.json() def save_message(self, consultation_id, message): """Save a single message to the database.""" # Normalize message format for database storage message_data = { "consultation_id": consultation_id, "role": message.get("role"), "content": message.get("content"), "timestamp": datetime.now().isoformat() } # Handle additional fields if "explanation" in message: message_data["explanation"] = message.get("explanation") # Store follow-up questions if "follow_up_questions" in message: message_data["follow_up_questions"] = message.get("follow_up_questions") if "evidence" in message: # Convert evidence to string for storage try: message_data["evidence"] = json.dumps(message.get("evidence")) except Exception as e: # If JSON serialization fails, store as empty array message_data["evidence"] = "[]" print(f"Failed to serialize evidence: {str(e)}") try: response = self._make_request( "POST", "/rest/v1/messages", data=message_data ) if response.status_code not in (200, 201): print(f"Failed to save message: Status {response.status_code}, Response: {response.text}") # Attempt to log the message that failed to save print(f"Message data: {json.dumps(message_data)}") return None return response.json() except Exception as e: print(f"Exception saving message to database: {str(e)}") # Attempt to log the message that failed to save print(f"Message data: {json.dumps(message_data)}") return None def get_conversation_history(self, consultation_id): """Retrieve full conversation history for a given consultation ID.""" params = { "consultation_id": f"eq.{consultation_id}", "order": "timestamp.asc", "limit": "25" # Explicitly set a high limit for messages - 10000, if needed. } try: response = self._make_request( "GET", "/rest/v1/messages", params=params ) if response.status_code != 200: print(f"Failed to retrieve conversation history: Status {response.status_code}, Response: {response.text}") return [] # Convert database format back to application format messages = response.json() print(f"Retrieved {len(messages)} messages from database for consultation {consultation_id}") history = [] for msg in messages: message_dict = { "role": msg["role"], "content": msg["content"] } if msg.get("explanation"): message_dict["explanation"] = msg["explanation"] if msg.get("follow_up_questions"): message_dict["follow_up_questions"] = msg["follow_up_questions"] if msg.get("evidence"): # Parse evidence JSON string back to object try: message_dict["evidence"] = json.loads(msg["evidence"]) except Exception as e: print(f"Failed to parse evidence JSON: {str(e)}") message_dict["evidence"] = [] history.append(message_dict) return history except Exception as e: print(f"Exception retrieving conversation history: {str(e)}") return [] def delete_conversation(self, consultation_id): """Delete a completed conversation from the database.""" # First delete all messages msg_params = {"consultation_id": f"eq.{consultation_id}"} self._make_request("DELETE", "/rest/v1/messages", params=msg_params) # Then delete conversation record conv_params = {"consultation_id": f"eq.{consultation_id}"} response = self._make_request("DELETE", "/rest/v1/conversations", params=conv_params) if response.status_code != 200: raise Exception(f"Failed to delete conversation: {response.text}") return True def get_active_conversations(self): """Get list of active conversations.""" params = { "is_active": "eq.true", "order": "created_at.desc" } response = self._make_request( "GET", "/rest/v1/conversations", params=params ) if response.status_code != 200: raise Exception(f"Failed to retrieve active conversations: {response.text}") return response.json() # Initialize database client def get_db_client(): """Get database client instance.""" return SupabaseClient()