File size: 7,546 Bytes
3158c30 abc2d3f 3158c30 abc2d3f 3158c30 ad30c53 3158c30 11042ec 34c6a7d 11042ec 34c6a7d 11042ec 34c6a7d 11042ec 34c6a7d 11042ec 3158c30 6bae0d0 d87f9c0 3158c30 11042ec 0a90f16 11042ec 0a90f16 11042ec 34c6a7d ad30c53 34c6a7d 0a90f16 11042ec abc2d3f 11042ec 3158c30 abc2d3f 3158c30 abc2d3f 3158c30 abc2d3f 3158c30 34c6a7d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 |
"""
Database module for handling conversation persistence using Supabase.
"""
import os
import json
from datetime import datetime
import requests
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
class SupabaseClient:
    """Thin client that persists conversations through Supabase's REST API.

    Credentials come from the ``SUPABASE_URL`` and ``SUPABASE_KEY``
    environment variables. All traffic goes to the ``/rest/v1/conversations``
    and ``/rest/v1/messages`` endpoints.
    """

    # Seconds to wait on Supabase before giving up. Without a timeout,
    # ``requests`` waits indefinitely on a stalled connection.
    DEFAULT_TIMEOUT = 30

    # Statuses accepted as a successful DELETE. 200 is expected because every
    # request sends "Prefer: return=representation"; 204 (No Content) is also
    # a success and is accepted in case the server returns no body.
    _DELETE_OK = (200, 204)

    def __init__(self):
        """Initialize Supabase client with credentials from environment variables.

        Raises:
            ValueError: If ``SUPABASE_URL`` or ``SUPABASE_KEY`` is unset or empty.
        """
        self.supabase_url = os.getenv('SUPABASE_URL')
        self.supabase_key = os.getenv('SUPABASE_KEY')
        if not self.supabase_url or not self.supabase_key:
            raise ValueError("Supabase credentials not found in environment variables. "
                             "Please set SUPABASE_URL and SUPABASE_KEY.")

    @staticmethod
    def _dump_for_log(payload):
        """Render *payload* as JSON for a log line without ever raising.

        Failure handlers call this, so it must not fail on the very payload
        that may have caused the failure: non-serializable values are
        stringified, and anything still unencodable falls back to ``repr``.
        """
        try:
            return json.dumps(payload, default=str)
        except (TypeError, ValueError):
            return repr(payload)

    @staticmethod
    def _row_to_message(row):
        """Convert one ``messages`` row into the application's message dict.

        ``role`` and ``content`` are required (``KeyError`` if missing); the
        optional fields are copied only when present and truthy. ``evidence``
        is decoded from its stored JSON string, falling back to ``[]`` when
        it cannot be parsed.
        """
        message = {"role": row["role"], "content": row["content"]}
        for optional in ("explanation", "follow_up_questions"):
            if row.get(optional):
                message[optional] = row[optional]
        if row.get("evidence"):
            try:
                message["evidence"] = json.loads(row["evidence"])
            except Exception as e:
                print(f"Failed to parse evidence JSON: {str(e)}")
                message["evidence"] = []
        return message

    def _make_request(self, method, endpoint, data=None, params=None, timeout=None):
        """Send one HTTP request to the Supabase REST API.

        Args:
            method: One of ``"GET"``, ``"POST"``, ``"PUT"``, ``"DELETE"``.
            endpoint: Path appended to the base URL, e.g. ``"/rest/v1/messages"``.
            data: JSON-serializable body; sent for POST and PUT only.
            params: Query-string parameters; sent for GET and DELETE only.
            timeout: Seconds to wait for the server; ``None`` uses
                ``DEFAULT_TIMEOUT``.

        Returns:
            The ``requests.Response``. The status code is NOT checked here;
            callers decide what counts as success.

        Raises:
            ValueError: If *method* is not one of the supported verbs.
        """
        if method in ("GET", "DELETE"):
            payload = {"params": params}
        elif method in ("POST", "PUT"):
            payload = {"json": data}
        else:
            raise ValueError(f"Unsupported method: {method}")

        # Tolerate a trailing slash in SUPABASE_URL so we never build
        # "https://host//rest/v1/...".
        url = f"{self.supabase_url.rstrip('/')}{endpoint}"
        headers = {
            "apikey": self.supabase_key,
            "Authorization": f"Bearer {self.supabase_key}",
            "Content-Type": "application/json",
            # Ask the server to return the affected rows in the response body.
            "Prefer": "return=representation"
        }
        return requests.request(
            method,
            url,
            headers=headers,
            timeout=self.DEFAULT_TIMEOUT if timeout is None else timeout,
            **payload
        )

    def create_conversation(self, consultation_id):
        """Create a new conversation record in the database.

        Args:
            consultation_id: Identifier stored in the row's ``consultation_id``.

        Returns:
            The decoded JSON body of the server's response.

        Raises:
            Exception: If the server answers with anything other than 200/201.
        """
        data = {
            "consultation_id": consultation_id,
            # NOTE(review): naive local time, no UTC offset - confirm this
            # matches how the column is interpreted server-side.
            "created_at": datetime.now().isoformat(),
            "is_active": True
        }
        response = self._make_request("POST", "/rest/v1/conversations", data=data)
        if response.status_code not in (200, 201):
            raise Exception(f"Failed to create conversation: {response.text}")
        return response.json()

    def save_message(self, consultation_id, message):
        """Save a single message to the database (best effort).

        Args:
            consultation_id: Conversation the message belongs to.
            message: Dict with ``role`` and ``content`` and, optionally,
                ``explanation``, ``follow_up_questions`` and ``evidence``.

        Returns:
            The decoded JSON response on success, or ``None`` on any failure.
            Failures are printed, never raised.
        """
        # Normalize message format for database storage
        message_data = {
            "consultation_id": consultation_id,
            "role": message.get("role"),
            "content": message.get("content"),
            "timestamp": datetime.now().isoformat()
        }
        # Optional fields are forwarded only when the caller supplied them.
        for optional in ("explanation", "follow_up_questions"):
            if optional in message:
                message_data[optional] = message.get(optional)
        if "evidence" in message:
            # Evidence is stored as a JSON string; if it cannot be encoded,
            # store an empty array rather than losing the whole message.
            try:
                message_data["evidence"] = json.dumps(message.get("evidence"))
            except Exception as e:
                message_data["evidence"] = "[]"
                print(f"Failed to serialize evidence: {str(e)}")
        try:
            response = self._make_request(
                "POST",
                "/rest/v1/messages",
                data=message_data
            )
            if response.status_code not in (200, 201):
                print(f"Failed to save message: Status {response.status_code}, Response: {response.text}")
                # Log the payload defensively: it may be the reason we failed.
                print(f"Message data: {self._dump_for_log(message_data)}")
                return None
            return response.json()
        except Exception as e:
            print(f"Exception saving message to database: {str(e)}")
            # A plain json.dumps here would re-raise when the payload itself
            # is not serializable, breaking the "return None" contract.
            print(f"Message data: {self._dump_for_log(message_data)}")
            return None

    def get_conversation_history(self, consultation_id, limit=25):
        """Retrieve conversation history for a given consultation ID.

        Messages are requested in ascending ``timestamp`` order, so when a
        conversation holds more than *limit* messages the OLDEST *limit* are
        returned and newer ones are cut off.

        Args:
            consultation_id: Conversation whose messages are fetched.
            limit: Maximum number of messages to request (default 25, the
                previously hard-coded value). Pass ``None`` to send no
                ``limit`` parameter at all.

        Returns:
            A list of message dicts (``role``, ``content`` and any of
            ``explanation``, ``follow_up_questions``, ``evidence``), or ``[]``
            on any failure. Failures are printed, never raised.
        """
        params = {
            "consultation_id": f"eq.{consultation_id}",
            "order": "timestamp.asc"
        }
        if limit is not None:
            params["limit"] = str(limit)
        try:
            response = self._make_request(
                "GET",
                "/rest/v1/messages",
                params=params
            )
            if response.status_code != 200:
                print(f"Failed to retrieve conversation history: Status {response.status_code}, Response: {response.text}")
                return []
            # Convert database format back to application format
            messages = response.json()
            print(f"Retrieved {len(messages)} messages from database for consultation {consultation_id}")
            return [self._row_to_message(row) for row in messages]
        except Exception as e:
            print(f"Exception retrieving conversation history: {str(e)}")
            return []

    def delete_conversation(self, consultation_id):
        """Delete a completed conversation and all of its messages.

        Messages are removed first; the conversation record is deleted only
        if that succeeded, so a failure never leaves messages behind without
        their conversation.

        Args:
            consultation_id: Conversation to remove.

        Returns:
            ``True`` once both deletions succeeded.

        Raises:
            Exception: If either DELETE request is rejected by the server.
        """
        row_filter = {"consultation_id": f"eq.{consultation_id}"}

        # First delete all messages
        response = self._make_request("DELETE", "/rest/v1/messages", params=row_filter)
        if response.status_code not in self._DELETE_OK:
            raise Exception(f"Failed to delete conversation messages: {response.text}")

        # Then delete conversation record
        response = self._make_request("DELETE", "/rest/v1/conversations", params=row_filter)
        if response.status_code not in self._DELETE_OK:
            raise Exception(f"Failed to delete conversation: {response.text}")
        return True

    def get_active_conversations(self):
        """Get list of active conversations, newest first.

        Returns:
            The decoded JSON response for rows with ``is_active`` true,
            ordered by ``created_at`` descending.

        Raises:
            Exception: If the server answers with anything other than 200.
        """
        params = {
            "is_active": "eq.true",
            "order": "created_at.desc"
        }
        response = self._make_request(
            "GET",
            "/rest/v1/conversations",
            params=params
        )
        if response.status_code != 200:
            raise Exception(f"Failed to retrieve active conversations: {response.text}")
        return response.json()
# Initialize database client
def get_db_client():
    """Build and return a new ``SupabaseClient``.

    A fresh instance is constructed on every call; nothing is cached. Any
    error raised by the ``SupabaseClient`` constructor propagates to the
    caller.
    """
    client = SupabaseClient()
    return client
|