from typing import List, Dict, Optional from api.sources.agent.request_params.auth import UpdateBrandRequest from core.constants import Tables from core.interface.supabase_client import async_supabase_client, supabase_config, create_supabase_client from core.schemas.agent import AgentModel from core.schemas.wallet import WalletModel class AuthDataSource: @staticmethod async def login(email: str, password: str) -> Dict: supabase = create_supabase_client(supabase_config) response = supabase.auth.sign_in_with_password({ "email": email, "password": password }) res = {'refresh_token': response.session.refresh_token, 'access_token': response.session.access_token, 'uuid': response.user.id} return res @staticmethod async def login_passwordless(phone: str): supabase = create_supabase_client(supabase_config) supabase.auth.sign_in_with_otp({ "phone": phone }) @staticmethod async def verify_passwordless(phone: str, token: str): supabase = create_supabase_client(supabase_config) response = supabase.auth.verify_otp({ "phone": phone, "token": token, "type": "sms" }) print(response) is_new_user = False supabase = await async_supabase_client(supabase_config) db_response = await supabase.table(Tables.AGENT_TABLE) \ .select('*') \ .eq('phone', phone) \ .execute() if not db_response.data: is_new_user = True res = {'refresh_token': response.session.refresh_token, 'access_token': response.session.access_token, 'uuid': response.user.id} return res, is_new_user @staticmethod async def register(email: str, password: str) -> str: supabase = create_supabase_client(supabase_config) response = supabase.auth.sign_up({ "email": email, "password": password, }) uuid = response.user.id return uuid @staticmethod async def register_in_db(**kwargs): data = kwargs # data['wallet'] = WalletModel(amount=0) # data['setup_completed'] = False user: AgentModel = AgentModel(**data) supabase = await async_supabase_client(supabase_config) response = await supabase.table(Tables.AGENT_TABLE) \ .insert(user.to_json()) \ .execute() return response.data @staticmethod async def verify(uuid: str) -> Optional[dict]: supabase = await async_supabase_client(supabase_config) response = await supabase.table(Tables.AGENT_TABLE) \ .select('*') \ .eq('uuid', uuid) \ .execute() if response.data: res = response.data[0] res.pop('password_hash') return res else: return None @staticmethod async def refresh_auth_state(refresh_token: str): supabase = create_supabase_client(supabase_config) response = supabase.auth.refresh_session(refresh_token) res = {'refresh_token': response.session.refresh_token, 'access_token': response.session.access_token, 'uuid': response.user.id} return res @staticmethod async def does_user_exists(email: Optional[str], phone: Optional[str]): supabase = await async_supabase_client(supabase_config) response = await supabase.table(Tables.AGENT_TABLE) \ .select('*') \ .eq('email' if email else 'phone', email if email else phone) \ .execute() if bool(response.data): return response.data[0], bool(response.data[0]['setup_completed']) else: return None, False @staticmethod async def setup_completed(uuid: str): supabase = await async_supabase_client(supabase_config) data, count = await supabase.table(Tables.AGENT_TABLE) \ .update({'setup_completed': True}) \ .eq('uuid', uuid) \ .execute() @staticmethod async def setup_brand(brand_request: UpdateBrandRequest, agent_uuid: str): supabase = await async_supabase_client(supabase_config) # Step 1: Create or get the brand in the "brands" table brand_data = { "brand_name": brand_request.brand_name, "logo_url": brand_request.logo_url, "proof_id_url": brand_request.proof_id_url, } db_response = await supabase.table(Tables.BRAND_TABLE) \ .select('*') \ .eq("brand_name", brand_request.brand_name) \ .execute() if not db_response.data: brand_response = await supabase.table(Tables.BRAND_TABLE) \ .insert(brand_data) \ .execute() else: brand_response = await supabase.table(Tables.BRAND_TABLE) \ .update(brand_data) \ .eq('brand_name', brand_request.brand_name) \ .execute() brand_id = brand_response.data[0]['brand_id'] # Step 2: Add brand location in the "brand_locations" table location_data = { "brand_id": brand_id, "location_lat": brand_request.lat, "location_long": brand_request.long, } await supabase.table(Tables.BRAND_LOCATIONS_TABLE) \ .upsert(location_data) \ .execute() # Step 3: Link brand_id with agent in the "agents" table agent_data = { "brand_id": brand_id, } await supabase.table(Tables.AGENT_TABLE) \ .update(agent_data) \ .eq('uuid', agent_uuid) \ .execute() @staticmethod async def update_wallet_amount(uuid: str, amount: float): supabase = await async_supabase_client(supabase_config) await supabase.table(Tables.AGENT_TABLE) \ .update({"wallet": WalletModel(amount=amount).to_json()}) \ .eq('uuid', uuid) \ .execute() @staticmethod async def fetch_categories() -> List[dict]: return [ {"id": 1, "category": "General"}, {"id": 2, "category": "Restaurant Reservations"}, {"id": 3, "category": "Transportation"}, {"id": 4, "category": "Entertainment"}, {"id": 5, "category": "Tour Bookings"}, {"id": 6, "category": "Event Tickets"}, {"id": 7, "category": "Spa and Wellness"}, {"id": 8, "category": "Shopping Assistance"}, {"id": 9, "category": "Travel Planning"}, ]