Spaces:
Sleeping
Sleeping
from typing import List | |
from fastapi import APIRouter, HTTPException, Depends | |
from api.sources.user.data_sources.auth import AuthDataSource | |
from api.sources.user.request_params.auth import LoginRequest, RegisterRequest, VerifyRequest, UpdatePromptsRequest, RefreshRequest, \ | |
LoginPasswordlessRequest, VerifyPasswordlessRequest, UserExistsRequest | |
from core.depends.oauth import get_current_user | |
from core.schemas.prompt import PromptModel | |
router = APIRouter(prefix='/auth') | |
async def fetch_prompts(current_user: dict = Depends(get_current_user)): | |
prompts: List[PromptModel] = await AuthDataSource.fetch_prompts() | |
return {"result": prompts} | |
async def update_prompts(request: UpdatePromptsRequest, current_user: dict = Depends(get_current_user)): | |
uuid = current_user['id'] | |
default_amount = 50 | |
try: | |
await AuthDataSource.update_prompts(uuid=uuid, prompts=request.prompts['prompts']) | |
await AuthDataSource.update_wallet_amount(uuid=uuid, amount=default_amount) | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"Error: {e}") | |
return {"result": default_amount} | |
async def refresh_auth_state(request: RefreshRequest): | |
try: | |
res = await AuthDataSource.refresh_auth_state(request.refresh_token) | |
return {"result": res} | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"Error: {e}") | |
async def user_exists(request: UserExistsRequest): | |
print(request.email) | |
print(request.phone) | |
if not request.email and not request.phone: | |
raise HTTPException(status_code=500, detail=f"Pass either `email` or `phone`") | |
try: | |
user, is_new_user = await AuthDataSource.does_user_exists(email=request.email, phone=request.phone) | |
return {"result": { | |
"user_data": user, | |
"user_exists": is_new_user, | |
}} | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"Error: {e}") | |
async def login(request: LoginRequest): | |
try: | |
auth_data = await AuthDataSource.login(email=request.email, password=request.password) | |
user_data = await AuthDataSource.verify(uuid=auth_data['uuid']) | |
return {"result": { | |
"user_data": user_data, | |
"auth_data": auth_data | |
}} | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"Unable to login with following credentials.") | |
async def login_passwordless(request: LoginPasswordlessRequest): | |
try: | |
await AuthDataSource.login_passwordless(phone=request.phone) | |
return {"result": True} | |
except Exception as e: | |
print(e) | |
raise HTTPException(status_code=500, detail=f"Unable to login with following credentials.") | |
async def verify_passwordless(request: VerifyPasswordlessRequest): | |
try: | |
auth_data, is_new_user = await AuthDataSource.verify_passwordless(phone=request.phone, token=request.token) | |
return {"result": { | |
"auth_data": auth_data, | |
"is_new_user": is_new_user | |
}} | |
except Exception as e: | |
print(e) | |
raise HTTPException(status_code=500, detail=f"Unable to login with following credentials.") | |
async def register_user_info(request: RegisterRequest, current_user: dict = Depends(get_current_user)): | |
try: | |
uuid = current_user['id'] | |
data = request.dict() | |
data['uuid'] = uuid | |
await AuthDataSource.register_in_db(**data) | |
return {"result": uuid} | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"Error: {e}") | |
async def register(request: RegisterRequest): | |
try: | |
uuid = await AuthDataSource.register(email=request.email, | |
password=request.password_hash) | |
print(uuid) | |
await AuthDataSource.register_in_db(uuid, **request.dict()) | |
return {"result": uuid} | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"Error: {e}") | |
async def verify(request: VerifyRequest): | |
try: | |
data = await AuthDataSource.verify(uuid=request.uuid) | |
return {"result": data} | |
except Exception as e: | |
raise HTTPException(status_code=500, detail=f"Error: {e}") | |