Spaces:
Sleeping
Sleeping
from typing import List, Dict, Optional | |
from api.sources.agent.request_params.auth import UpdateBrandRequest | |
from core.constants import Tables | |
from core.interface.supabase_client import async_supabase_client, supabase_config, create_supabase_client | |
from core.schemas.agent import AgentModel | |
from core.schemas.wallet import WalletModel | |
class AuthDataSource: | |
async def login(email: str, password: str) -> Dict: | |
supabase = create_supabase_client(supabase_config) | |
response = supabase.auth.sign_in_with_password({ | |
"email": email, | |
"password": password | |
}) | |
res = {'refresh_token': response.session.refresh_token, 'access_token': response.session.access_token, | |
'uuid': response.user.id} | |
return res | |
async def login_passwordless(phone: str): | |
supabase = create_supabase_client(supabase_config) | |
supabase.auth.sign_in_with_otp({ | |
"phone": phone | |
}) | |
async def verify_passwordless(phone: str, token: str): | |
supabase = create_supabase_client(supabase_config) | |
response = supabase.auth.verify_otp({ | |
"phone": phone, | |
"token": token, | |
"type": "sms" | |
}) | |
print(response) | |
is_new_user = False | |
supabase = await async_supabase_client(supabase_config) | |
db_response = await supabase.table(Tables.AGENT_TABLE) \ | |
.select('*') \ | |
.eq('phone', phone) \ | |
.execute() | |
if not db_response.data: | |
is_new_user = True | |
res = {'refresh_token': response.session.refresh_token, 'access_token': response.session.access_token, | |
'uuid': response.user.id} | |
return res, is_new_user | |
async def register(email: str, password: str) -> str: | |
supabase = create_supabase_client(supabase_config) | |
response = supabase.auth.sign_up({ | |
"email": email, | |
"password": password, | |
}) | |
uuid = response.user.id | |
return uuid | |
async def register_in_db(**kwargs): | |
data = kwargs | |
# data['wallet'] = WalletModel(amount=0) | |
# data['setup_completed'] = False | |
user: AgentModel = AgentModel(**data) | |
supabase = await async_supabase_client(supabase_config) | |
response = await supabase.table(Tables.AGENT_TABLE) \ | |
.insert(user.to_json()) \ | |
.execute() | |
return response.data | |
async def verify(uuid: str) -> Optional[dict]: | |
supabase = await async_supabase_client(supabase_config) | |
response = await supabase.table(Tables.AGENT_TABLE) \ | |
.select('*') \ | |
.eq('uuid', uuid) \ | |
.execute() | |
if response.data: | |
res = response.data[0] | |
res.pop('password_hash') | |
return res | |
else: | |
return None | |
async def refresh_auth_state(refresh_token: str): | |
supabase = create_supabase_client(supabase_config) | |
response = supabase.auth.refresh_session(refresh_token) | |
res = {'refresh_token': response.session.refresh_token, 'access_token': response.session.access_token, | |
'uuid': response.user.id} | |
return res | |
async def does_user_exists(email: Optional[str], phone: Optional[str]): | |
supabase = await async_supabase_client(supabase_config) | |
response = await supabase.table(Tables.AGENT_TABLE) \ | |
.select('*') \ | |
.eq('email' if email else 'phone', email if email else phone) \ | |
.execute() | |
if bool(response.data): | |
return response.data[0], bool(response.data[0]['setup_completed']) | |
else: | |
return None, False | |
async def setup_completed(uuid: str): | |
supabase = await async_supabase_client(supabase_config) | |
data, count = await supabase.table(Tables.AGENT_TABLE) \ | |
.update({'setup_completed': True}) \ | |
.eq('uuid', uuid) \ | |
.execute() | |
async def setup_brand(brand_request: UpdateBrandRequest, agent_uuid: str): | |
supabase = await async_supabase_client(supabase_config) | |
# Step 1: Create or get the brand in the "brands" table | |
brand_data = { | |
"brand_name": brand_request.brand_name, | |
"logo_url": brand_request.logo_url, | |
"proof_id_url": brand_request.proof_id_url, | |
} | |
db_response = await supabase.table(Tables.BRAND_TABLE) \ | |
.select('*') \ | |
.eq("brand_name", brand_request.brand_name) \ | |
.execute() | |
if not db_response.data: | |
brand_response = await supabase.table(Tables.BRAND_TABLE) \ | |
.insert(brand_data) \ | |
.execute() | |
else: | |
brand_response = await supabase.table(Tables.BRAND_TABLE) \ | |
.update(brand_data) \ | |
.eq('brand_name', brand_request.brand_name) \ | |
.execute() | |
brand_id = brand_response.data[0]['brand_id'] | |
# Step 2: Add brand location in the "brand_locations" table | |
location_data = { | |
"brand_id": brand_id, | |
"location_lat": brand_request.lat, | |
"location_long": brand_request.long, | |
} | |
await supabase.table(Tables.BRAND_LOCATIONS_TABLE) \ | |
.upsert(location_data) \ | |
.execute() | |
# Step 3: Link brand_id with agent in the "agents" table | |
agent_data = { | |
"brand_id": brand_id, | |
} | |
await supabase.table(Tables.AGENT_TABLE) \ | |
.update(agent_data) \ | |
.eq('uuid', agent_uuid) \ | |
.execute() | |
async def update_wallet_amount(uuid: str, amount: float): | |
supabase = await async_supabase_client(supabase_config) | |
await supabase.table(Tables.AGENT_TABLE) \ | |
.update({"wallet": WalletModel(amount=amount).to_json()}) \ | |
.eq('uuid', uuid) \ | |
.execute() | |
async def fetch_categories() -> List[dict]: | |
return [ | |
{"id": 1, "category": "General"}, | |
{"id": 2, "category": "Restaurant Reservations"}, | |
{"id": 3, "category": "Transportation"}, | |
{"id": 4, "category": "Entertainment"}, | |
{"id": 5, "category": "Tour Bookings"}, | |
{"id": 6, "category": "Event Tickets"}, | |
{"id": 7, "category": "Spa and Wellness"}, | |
{"id": 8, "category": "Shopping Assistance"}, | |
{"id": 9, "category": "Travel Planning"}, | |
] | |