chat / src /database.py
Dhruv-Ty's picture
Update src/database.py
301fa31 verified
"""
Database module for handling conversation persistence using Supabase.
"""
import os
import json
from datetime import datetime
import requests
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
class SupabaseClient:
def __init__(self):
"""Initialize Supabase client with credentials from environment variables."""
self.supabase_url = os.getenv('SUPABASE_URL')
self.supabase_key = os.getenv('SUPABASE_KEY')
if not self.supabase_url or not self.supabase_key:
raise ValueError("Supabase credentials not found in environment variables. "
"Please set SUPABASE_URL and SUPABASE_KEY.")
def _make_request(self, method, endpoint, data=None, params=None):
"""Make a request to Supabase API."""
url = f"{self.supabase_url}{endpoint}"
headers = {
"apikey": self.supabase_key,
"Authorization": f"Bearer {self.supabase_key}",
"Content-Type": "application/json",
"Prefer": "return=representation"
}
if method == "GET":
response = requests.get(url, headers=headers, params=params)
elif method == "POST":
response = requests.post(url, headers=headers, json=data)
elif method == "PUT":
response = requests.put(url, headers=headers, json=data)
elif method == "DELETE":
response = requests.delete(url, headers=headers, params=params)
else:
raise ValueError(f"Unsupported method: {method}")
return response
def create_conversation(self, consultation_id):
"""Create a new conversation record in the database."""
data = {
"consultation_id": consultation_id,
"created_at": datetime.now().isoformat(),
"is_active": True
}
response = self._make_request(
"POST",
"/rest/v1/conversations",
data=data
)
if response.status_code not in (200, 201):
raise Exception(f"Failed to create conversation: {response.text}")
return response.json()
def save_message(self, consultation_id, message):
"""Save a single message to the database."""
# Normalize message format for database storage
message_data = {
"consultation_id": consultation_id,
"role": message.get("role"),
"content": message.get("content"),
"timestamp": datetime.now().isoformat()
}
# Handle additional fields
if "explanation" in message:
message_data["explanation"] = message.get("explanation")
# Store follow-up questions
if "follow_up_questions" in message:
message_data["follow_up_questions"] = message.get("follow_up_questions")
if "evidence" in message:
# Convert evidence to string for storage
try:
message_data["evidence"] = json.dumps(message.get("evidence"))
except Exception as e:
# If JSON serialization fails, store as empty array
message_data["evidence"] = "[]"
print(f"Failed to serialize evidence: {str(e)}")
try:
response = self._make_request(
"POST",
"/rest/v1/messages",
data=message_data
)
if response.status_code not in (200, 201):
print(f"Failed to save message: Status {response.status_code}, Response: {response.text}")
# Attempt to log the message that failed to save
print(f"Message data: {json.dumps(message_data)}")
return None
return response.json()
except Exception as e:
print(f"Exception saving message to database: {str(e)}")
# Attempt to log the message that failed to save
print(f"Message data: {json.dumps(message_data)}")
return None
def get_conversation_history(self, consultation_id):
"""Retrieve full conversation history for a given consultation ID."""
params = {
"consultation_id": f"eq.{consultation_id}",
"order": "timestamp.asc",
"limit": "25" # Explicitly set a high limit for messages - 10000, if needed.
}
try:
response = self._make_request(
"GET",
"/rest/v1/messages",
params=params
)
if response.status_code != 200:
print(f"Failed to retrieve conversation history: Status {response.status_code}, Response: {response.text}")
return []
# Convert database format back to application format
messages = response.json()
print(f"Retrieved {len(messages)} messages from database for consultation {consultation_id}")
history = []
for msg in messages:
message_dict = {
"role": msg["role"],
"content": msg["content"]
}
if msg.get("explanation"):
message_dict["explanation"] = msg["explanation"]
if msg.get("follow_up_questions"):
message_dict["follow_up_questions"] = msg["follow_up_questions"]
if msg.get("evidence"):
# Parse evidence JSON string back to object
try:
message_dict["evidence"] = json.loads(msg["evidence"])
except Exception as e:
print(f"Failed to parse evidence JSON: {str(e)}")
message_dict["evidence"] = []
history.append(message_dict)
return history
except Exception as e:
print(f"Exception retrieving conversation history: {str(e)}")
return []
def delete_conversation(self, consultation_id):
"""Delete a completed conversation from the database."""
# First delete all messages
msg_params = {"consultation_id": f"eq.{consultation_id}"}
self._make_request("DELETE", "/rest/v1/messages", params=msg_params)
# Then delete conversation record
conv_params = {"consultation_id": f"eq.{consultation_id}"}
response = self._make_request("DELETE", "/rest/v1/conversations", params=conv_params)
if response.status_code != 200:
raise Exception(f"Failed to delete conversation: {response.text}")
return True
def get_active_conversations(self):
"""Get list of active conversations."""
params = {
"is_active": "eq.true",
"order": "created_at.desc"
}
response = self._make_request(
"GET",
"/rest/v1/conversations",
params=params
)
if response.status_code != 200:
raise Exception(f"Failed to retrieve active conversations: {response.text}")
return response.json()
# Initialize database client
def get_db_client():
"""Get database client instance."""
return SupabaseClient()