| from typing import List, Dict, Optional |
| from supabase import create_client, Client |
| from datetime import datetime |
| from config.settings import settings |
| import logging |
|
|
| logger = logging.getLogger(__name__) |
|
|
| class SupabaseManager: |
| def __init__(self): |
| |
| settings.validate_required_env_vars() |
| |
| |
| self.supabase: Client = create_client(settings.SUPABASE_URL, settings.SUPABASE_ANON_KEY) |
| logger.info("SupabaseManager initialized successfully") |
| |
| async def get_active_users(self) -> List[Dict]: |
| """Get all users with active notifications""" |
| try: |
| response = self.supabase.table("users").select("*").eq("active", True).execute() |
| return response.data |
| except Exception as e: |
| print(f"Error getting active users: {e}") |
| return [] |
| |
| async def get_user_by_email(self, email: str) -> Optional[Dict]: |
| """Get user by email address""" |
| try: |
| response = self.supabase.table("users").select("*").eq("email", email).execute() |
| return response.data[0] if response.data else None |
| except Exception as e: |
| print(f"Error getting user by email: {e}") |
| return None |
| |
| async def save_user_preferences(self, email: str, preferences: str) -> Dict: |
| """Save or update user preferences""" |
| try: |
| |
| existing_user = await self.get_user_by_email(email) |
| |
| if existing_user: |
| |
| response = self.supabase.table("users").update({ |
| "preferences_text": preferences, |
| "updated_at": datetime.utcnow().isoformat(), |
| "active": True |
| }).eq("email", email).execute() |
| else: |
| |
| response = self.supabase.table("users").insert({ |
| "email": email, |
| "preferences_text": preferences, |
| "active": True |
| }).execute() |
| |
| return response.data[0] if response.data else {} |
| except Exception as e: |
| print(f"Error saving user preferences: {e}") |
| raise |
| |
| async def toggle_user_active(self, email: str, active: bool) -> bool: |
| """Toggle user active status""" |
| try: |
| response = self.supabase.table("users").update({ |
| "active": active, |
| "updated_at": datetime.utcnow().isoformat() |
| }).eq("email", email).execute() |
| return len(response.data) > 0 |
| except Exception as e: |
| print(f"Error toggling user active status: {e}") |
| return False |
| |
| |
| async def log_agent_run(self, user_id: str, challenges_found: int, notifications_sent: int, status: str = "completed", error_message: Optional[str] = None) -> bool: |
| """Log agent execution""" |
| try: |
| self.supabase.table("agent_runs").insert({ |
| "user_id": user_id, |
| "challenges_found": challenges_found, |
| "notifications_sent": notifications_sent, |
| "status": status, |
| "error_message": error_message |
| }).execute() |
| return True |
| except Exception as e: |
| print(f"Error logging agent run: {e}") |
| return False |
|
|