|
""" |
|
Database module for handling conversation persistence using Supabase. |
|
""" |
|
|
|
import os |
|
import json |
|
from datetime import datetime |
|
import requests |
|
from dotenv import load_dotenv |
|
|
|
|
|
load_dotenv() |
|
|
|
class SupabaseClient:
    """Client for persisting conversations and messages through the Supabase API.

    Credentials are read from the ``SUPABASE_URL`` and ``SUPABASE_KEY``
    environment variables when the client is constructed.

    Error-handling convention used by the methods below:

    * ``create_conversation``, ``delete_conversation`` and
      ``get_active_conversations`` raise ``Exception`` on a non-success
      HTTP status (network errors from ``requests`` propagate unchanged).
    * ``save_message`` and ``get_conversation_history`` are best-effort:
      they print a diagnostic and return ``None`` / ``[]`` instead of raising.
    """

    # Seconds to wait for Supabase before giving up. ``requests`` has no
    # default timeout, so without this an unresponsive server would block
    # the caller forever.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        """Initialize Supabase client with credentials from environment variables.

        Raises:
            ValueError: If ``SUPABASE_URL`` or ``SUPABASE_KEY`` is unset or empty.
        """
        self.supabase_url = os.getenv('SUPABASE_URL')
        self.supabase_key = os.getenv('SUPABASE_KEY')

        if not self.supabase_url or not self.supabase_key:
            raise ValueError("Supabase credentials not found in environment variables. "
                             "Please set SUPABASE_URL and SUPABASE_KEY.")

    def _make_request(self, method, endpoint, data=None, params=None, timeout=None):
        """Send one HTTP request to the Supabase API and return the raw response.

        Args:
            method: One of ``"GET"``, ``"POST"``, ``"PUT"`` or ``"DELETE"``.
            endpoint: Path appended to the base URL, e.g. ``"/rest/v1/messages"``.
            data: JSON-serializable body; sent for POST and PUT only.
            params: Query-string parameters; sent for GET and DELETE only.
            timeout: Seconds to wait for the server. Defaults to
                ``REQUEST_TIMEOUT`` when ``None``.

        Returns:
            The ``requests`` response object. The status code is NOT checked
            here; every caller inspects it itself.

        Raises:
            ValueError: If ``method`` is not one of the supported verbs.
        """
        # Validate the verb first so an unsupported method never reaches the
        # network layer. GET/DELETE carry filters in the query string,
        # POST/PUT carry a JSON body.
        if method in ("GET", "DELETE"):
            payload_kwargs = {"params": params}
        elif method in ("POST", "PUT"):
            payload_kwargs = {"json": data}
        else:
            raise ValueError(f"Unsupported method: {method}")

        # Tolerate a base URL configured with a trailing slash; every endpoint
        # used in this class already starts with "/".
        url = f"{self.supabase_url.rstrip('/')}{endpoint}"
        headers = {
            "apikey": self.supabase_key,
            "Authorization": f"Bearer {self.supabase_key}",
            "Content-Type": "application/json",
            # Ask the API to echo the affected rows back in the response body.
            "Prefer": "return=representation",
        }

        return requests.request(
            method,
            url,
            headers=headers,
            timeout=self.REQUEST_TIMEOUT if timeout is None else timeout,
            **payload_kwargs,
        )

    @staticmethod
    def _dump_for_log(payload):
        """Render ``payload`` for a diagnostic message without ever raising."""
        try:
            return json.dumps(payload)
        except Exception:
            # The payload itself may be the reason the save failed (e.g. it
            # holds a non-JSON-serializable value); fall back to repr so the
            # error handler that called us cannot blow up.
            return repr(payload)

    @staticmethod
    def _row_to_message(row):
        """Convert one stored message row into the in-memory message dict."""
        entry = {
            "role": row["role"],
            "content": row["content"],
        }

        # Optional fields are only copied over when they hold a truthy value.
        for key in ("explanation", "follow_up_questions"):
            if row.get(key):
                entry[key] = row[key]

        raw_evidence = row.get("evidence")
        if raw_evidence:
            if isinstance(raw_evidence, (str, bytes, bytearray)):
                # ``save_message`` stores evidence as a JSON-encoded string.
                try:
                    entry["evidence"] = json.loads(raw_evidence)
                except Exception as e:
                    print(f"Failed to parse evidence JSON: {str(e)}")
                    entry["evidence"] = []
            else:
                # Already decoded by the API layer; keep it rather than
                # discarding it because json.loads() rejects non-strings.
                entry["evidence"] = raw_evidence

        return entry

    def create_conversation(self, consultation_id):
        """Create a new conversation record in the database.

        Args:
            consultation_id: Identifier stored in the ``consultation_id`` column.

        Returns:
            The decoded JSON body of the API response.

        Raises:
            Exception: If the API answers with a status other than 200 or 201.
        """
        data = {
            "consultation_id": consultation_id,
            # NOTE(review): naive local time, no UTC offset attached —
            # confirm this matches how the column is interpreted.
            "created_at": datetime.now().isoformat(),
            "is_active": True,
        }

        response = self._make_request("POST", "/rest/v1/conversations", data=data)

        if response.status_code not in (200, 201):
            raise Exception(f"Failed to create conversation: {response.text}")

        return response.json()

    def save_message(self, consultation_id, message):
        """Save a single message to the database (best-effort, never raises).

        Args:
            consultation_id: Identifier of the conversation the message belongs to.
            message: Mapping with ``role`` and ``content`` and, optionally,
                ``explanation``, ``follow_up_questions`` and ``evidence``.
                ``evidence`` is stored as a JSON-encoded string; if it cannot
                be serialized, ``"[]"`` is stored instead.

        Returns:
            The decoded JSON body of the API response, or ``None`` when the
            request failed or was rejected (a diagnostic is printed).
        """
        message_data = {
            "consultation_id": consultation_id,
            "role": message.get("role"),
            "content": message.get("content"),
            # NOTE(review): naive local time, see create_conversation.
            "timestamp": datetime.now().isoformat(),
        }

        # Optional fields are forwarded only when the caller supplied them.
        for key in ("explanation", "follow_up_questions"):
            if key in message:
                message_data[key] = message.get(key)

        if "evidence" in message:
            try:
                message_data["evidence"] = json.dumps(message.get("evidence"))
            except Exception as e:
                # Losing the evidence is preferable to losing the whole message.
                message_data["evidence"] = "[]"
                print(f"Failed to serialize evidence: {str(e)}")

        try:
            response = self._make_request(
                "POST",
                "/rest/v1/messages",
                data=message_data,
            )

            if response.status_code not in (200, 201):
                print(f"Failed to save message: Status {response.status_code}, Response: {response.text}")
                print(f"Message data: {self._dump_for_log(message_data)}")
                return None

            return response.json()
        except Exception as e:
            # Deliberately broad: this method's contract is to report the
            # failure and return None rather than propagate.
            print(f"Exception saving message to database: {str(e)}")
            print(f"Message data: {self._dump_for_log(message_data)}")
            return None

    def get_conversation_history(self, consultation_id, limit=25):
        """Retrieve stored messages for a consultation, oldest first.

        Args:
            consultation_id: Identifier of the conversation to load.
            limit: Maximum number of messages to request. Defaults to 25;
                pass ``None`` to request the history without a cap.
                NOTE(review): because rows are ordered by ascending
                timestamp, a cap keeps the EARLIEST messages — confirm that
                is what callers with long conversations expect.

        Returns:
            A list of message dicts with ``role`` and ``content`` plus any of
            ``explanation``, ``follow_up_questions`` and ``evidence`` that
            hold a value. Returns ``[]`` on any failure (a diagnostic is
            printed); this method never raises.
        """
        params = {
            "consultation_id": f"eq.{consultation_id}",
            "order": "timestamp.asc",
        }
        if limit is not None:
            params["limit"] = str(limit)

        try:
            response = self._make_request(
                "GET",
                "/rest/v1/messages",
                params=params,
            )

            if response.status_code != 200:
                print(f"Failed to retrieve conversation history: Status {response.status_code}, Response: {response.text}")
                return []

            messages = response.json()
            print(f"Retrieved {len(messages)} messages from database for consultation {consultation_id}")

            return [self._row_to_message(msg) for msg in messages]
        except Exception as e:
            # Deliberately broad: history loading is best-effort.
            print(f"Exception retrieving conversation history: {str(e)}")
            return []

    def delete_conversation(self, consultation_id):
        """Delete a conversation and all of its messages from the database.

        Messages are removed first, then the conversation record. If removing
        the messages fails, the conversation record is left in place so the
        two tables do not drift apart.

        Args:
            consultation_id: Identifier of the conversation to delete.

        Returns:
            ``True`` when both deletions succeeded.

        Raises:
            Exception: If either DELETE is answered with a status other than
                200 or 204.
        """
        filters = {"consultation_id": f"eq.{consultation_id}"}

        msg_response = self._make_request("DELETE", "/rest/v1/messages", params=filters)
        if msg_response.status_code not in (200, 204):
            raise Exception(f"Failed to delete conversation messages: {msg_response.text}")

        response = self._make_request("DELETE", "/rest/v1/conversations", params=filters)
        if response.status_code not in (200, 204):
            raise Exception(f"Failed to delete conversation: {response.text}")

        return True

    def get_active_conversations(self):
        """Get list of active conversations, newest first.

        Returns:
            The decoded JSON body of the API response for conversations whose
            ``is_active`` flag is true, ordered by ``created_at`` descending.

        Raises:
            Exception: If the API answers with a status other than 200.
        """
        params = {
            "is_active": "eq.true",
            "order": "created_at.desc",
        }

        response = self._make_request(
            "GET",
            "/rest/v1/conversations",
            params=params,
        )

        if response.status_code != 200:
            raise Exception(f"Failed to retrieve active conversations: {response.text}")

        return response.json()
|
|
|
|
|
def get_db_client():
    """Build and return a new ``SupabaseClient``.

    A fresh client is constructed on every call. Construction raises
    ``ValueError`` when the Supabase credentials are missing from the
    environment.
    """
    client = SupabaseClient()
    return client
|
|