# yashmakan's picture
# files added
# bf2bf0e
from typing import List, Dict, Optional
from api.sources.agent.request_params.auth import UpdateBrandRequest
from core.constants import Tables
from core.interface.supabase_client import async_supabase_client, supabase_config, create_supabase_client
from core.schemas.agent import AgentModel
from core.schemas.wallet import WalletModel
class AuthDataSource:
    """Data-access helpers for agent authentication and onboarding.

    Every public method is a static coroutine. Auth operations go through the
    client returned by ``create_supabase_client``; table reads and writes go
    through the awaited ``async_supabase_client``.

    NOTE(review): the ``create_supabase_client`` auth calls below are not
    awaited, so they presumably run synchronously inside these coroutines and
    would block the event loop while in flight -- confirm, and consider moving
    them to the async client.
    """

    # Category names served by ``fetch_categories``; ids are 1-based positions.
    _CATEGORY_NAMES = (
        "General",
        "Restaurant Reservations",
        "Transportation",
        "Entertainment",
        "Tour Bookings",
        "Event Tickets",
        "Spa and Wellness",
        "Shopping Assistance",
        "Travel Planning",
    )

    @staticmethod
    def _session_tokens(response) -> Dict:
        """Build the token/uuid payload handed back to callers from an auth response."""
        return {
            'refresh_token': response.session.refresh_token,
            'access_token': response.session.access_token,
            'uuid': response.user.id,
        }

    @staticmethod
    async def login(email: str, password: str) -> Dict:
        """Sign in with email and password.

        Returns:
            Dict with ``refresh_token``, ``access_token`` and ``uuid``.
        """
        supabase = create_supabase_client(supabase_config)
        response = supabase.auth.sign_in_with_password({
            "email": email,
            "password": password,
        })
        return AuthDataSource._session_tokens(response)

    @staticmethod
    async def login_passwordless(phone: str) -> None:
        """Start a passwordless sign-in by requesting an OTP for ``phone``."""
        supabase = create_supabase_client(supabase_config)
        supabase.auth.sign_in_with_otp({
            "phone": phone,
        })

    @staticmethod
    async def verify_passwordless(phone: str, token: str):
        """Verify an SMS OTP and report whether the phone is new to the agent table.

        Returns:
            A ``(tokens, is_new_user)`` tuple: ``tokens`` is a dict with
            ``refresh_token``, ``access_token`` and ``uuid``; ``is_new_user``
            is True when no agent row has this phone number.
        """
        auth_client = create_supabase_client(supabase_config)
        response = auth_client.auth.verify_otp({
            "phone": phone,
            "token": token,
            "type": "sms",
        })
        # Deliberately not printing the auth response: it carries the
        # session's access and refresh tokens.

        db = await async_supabase_client(supabase_config)
        db_response = await db.table(Tables.AGENT_TABLE) \
            .select('*') \
            .eq('phone', phone) \
            .execute()
        is_new_user = not db_response.data

        return AuthDataSource._session_tokens(response), is_new_user

    @staticmethod
    async def register(email: str, password: str) -> str:
        """Sign up a new auth user and return the new user's id."""
        supabase = create_supabase_client(supabase_config)
        response = supabase.auth.sign_up({
            "email": email,
            "password": password,
        })
        return response.user.id

    @staticmethod
    async def register_in_db(**kwargs):
        """Insert an agent row built from ``kwargs``.

        ``kwargs`` are passed straight to ``AgentModel``; the model's
        ``to_json()`` output is what gets inserted.

        Returns:
            The ``data`` of the insert response.
        """
        user: AgentModel = AgentModel(**kwargs)
        supabase = await async_supabase_client(supabase_config)
        response = await supabase.table(Tables.AGENT_TABLE) \
            .insert(user.to_json()) \
            .execute()
        return response.data

    @staticmethod
    async def verify(uuid: str) -> Optional[dict]:
        """Fetch the agent row for ``uuid`` with ``password_hash`` stripped.

        Returns:
            The first matching row as a dict, or None when no row matches.
        """
        supabase = await async_supabase_client(supabase_config)
        response = await supabase.table(Tables.AGENT_TABLE) \
            .select('*') \
            .eq('uuid', uuid) \
            .execute()
        if not response.data:
            return None
        agent = response.data[0]
        # Tolerate rows that carry no password_hash key instead of raising.
        agent.pop('password_hash', None)
        return agent

    @staticmethod
    async def refresh_auth_state(refresh_token: str):
        """Refresh the session for ``refresh_token``.

        Returns:
            Dict with ``refresh_token``, ``access_token`` and ``uuid``.
        """
        supabase = create_supabase_client(supabase_config)
        response = supabase.auth.refresh_session(refresh_token)
        return AuthDataSource._session_tokens(response)

    @staticmethod
    async def does_user_exists(email: Optional[str], phone: Optional[str]):
        """Look up an agent by email, or by phone when no email is given.

        Returns:
            ``(row, setup_completed)`` for the first match, or
            ``(None, False)`` when nothing matches or no identifier was given.
        """
        # Without an identifier there is nothing meaningful to match on;
        # skip the query rather than filtering on an empty/None value.
        if not email and not phone:
            return None, False

        column, value = ('email', email) if email else ('phone', phone)
        supabase = await async_supabase_client(supabase_config)
        response = await supabase.table(Tables.AGENT_TABLE) \
            .select('*') \
            .eq(column, value) \
            .execute()
        if not response.data:
            return None, False
        agent = response.data[0]
        return agent, bool(agent['setup_completed'])

    @staticmethod
    async def setup_completed(uuid: str) -> None:
        """Set ``setup_completed`` to True on the agent row for ``uuid``."""
        supabase = await async_supabase_client(supabase_config)
        await supabase.table(Tables.AGENT_TABLE) \
            .update({'setup_completed': True}) \
            .eq('uuid', uuid) \
            .execute()

    @staticmethod
    async def setup_brand(brand_request: UpdateBrandRequest, agent_uuid: str) -> None:
        """Create or update a brand by name, upsert its location, and link it to the agent.

        Args:
            brand_request: Supplies ``brand_name``, ``logo_url``,
                ``proof_id_url``, ``lat`` and ``long``.
            agent_uuid: ``uuid`` of the agent row that receives the brand id.
        """
        supabase = await async_supabase_client(supabase_config)

        # Step 1: Create or update the brand in the brand table, keyed by name.
        brand_data = {
            "brand_name": brand_request.brand_name,
            "logo_url": brand_request.logo_url,
            "proof_id_url": brand_request.proof_id_url,
        }
        existing = await supabase.table(Tables.BRAND_TABLE) \
            .select('*') \
            .eq("brand_name", brand_request.brand_name) \
            .execute()
        if existing.data:
            brand_response = await supabase.table(Tables.BRAND_TABLE) \
                .update(brand_data) \
                .eq('brand_name', brand_request.brand_name) \
                .execute()
        else:
            brand_response = await supabase.table(Tables.BRAND_TABLE) \
                .insert(brand_data) \
                .execute()
        brand_id = brand_response.data[0]['brand_id']

        # Step 2: Upsert the brand's location in the brand-locations table.
        location_data = {
            "brand_id": brand_id,
            "location_lat": brand_request.lat,
            "location_long": brand_request.long,
        }
        await supabase.table(Tables.BRAND_LOCATIONS_TABLE) \
            .upsert(location_data) \
            .execute()

        # Step 3: Link the brand to the agent row.
        agent_data = {
            "brand_id": brand_id,
        }
        await supabase.table(Tables.AGENT_TABLE) \
            .update(agent_data) \
            .eq('uuid', agent_uuid) \
            .execute()

    @staticmethod
    async def update_wallet_amount(uuid: str, amount: float) -> None:
        """Overwrite the agent's ``wallet`` column with a wallet holding ``amount``."""
        supabase = await async_supabase_client(supabase_config)
        await supabase.table(Tables.AGENT_TABLE) \
            .update({"wallet": WalletModel(amount=amount).to_json()}) \
            .eq('uuid', uuid) \
            .execute()

    @staticmethod
    async def fetch_categories() -> List[dict]:
        """Return the fixed category list as fresh ``{"id", "category"}`` dicts."""
        return [
            {"id": index, "category": name}
            for index, name in enumerate(AuthDataSource._CATEGORY_NAMES, start=1)
        ]