Spaces:
Sleeping
Sleeping
from typing import Union | |
from datetime import timedelta | |
from request import RequestDefault as req | |
from response import ResponseDefault as res | |
from response import ResponseUser as res_login | |
from firebase_admin import credentials, auth, exceptions | |
import firebase_admin | |
import base64 | |
import auth.authentication as auth123 | |
import re | |
from repository import UserRepository, UserInfoRepository | |
from pathlib import Path | |
import cloudinary | |
import cloudinary.uploader | |
from fastapi import FastAPI, File, UploadFile | |
from function import support_function as sf | |
from dotenv import load_dotenv | |
from service import AuthService as authservice | |
import os | |
# Load variables from a .env file into the process environment before the
# settings below are read.
load_dotenv()

# Cloudinary credentials taken from the environment; each is None when unset.
CLOUDINARY_CLOUD_NAME = os.getenv("CLOUDINARY_CLOUD_NAME")
CLOUDINARY_API_KEY = os.getenv("CLOUDINARY_API_KEY")
CLOUDINARY_API_SECRET = os.getenv("CLOUDINARY_API_SECRET")

# Configure the Cloudinary SDK once, at import time.
cloudinary.config(
    cloud_name=CLOUDINARY_CLOUD_NAME,
    api_key=CLOUDINARY_API_KEY,
    api_secret=CLOUDINARY_API_SECRET,
)
import os | |
# Initialise the default Firebase app once per process.
#
# The service-account certificate is searched for in several locations, in
# order; the first one that loads wins. If every candidate fails, the error
# from the last attempt propagates, exactly as the final (unguarded) attempt
# did before.
#
# NOTE: `firebase_admin._apps` is a private attribute of the SDK; it is used
# here only to detect that an app was already initialised elsewhere.
if not firebase_admin._apps:
    _cert_candidates = (
        "../certificate/firebase_certificate.json",
        "firebase_certificate.json",
        str(Path(__file__).resolve().parent / 'app' / 'firebase_certificate.json'),
    )
    _last_candidate = len(_cert_candidates) - 1
    for _position, _cert_path in enumerate(_cert_candidates):
        try:
            cred = credentials.Certificate(_cert_path)
            fred = firebase_admin.initialize_app(cred)
        except Exception:
            # Fall through to the next location; unlike a bare `except:` this
            # does not swallow SystemExit / KeyboardInterrupt.
            if _position == _last_candidate:
                raise
        else:
            break
# Pattern for a syntactically plausible e-mail address: local part, "@",
# domain, and a 2-7 letter top-level domain.
# The TLD class is `[A-Za-z]`; the previous `[A-Z|a-z]` also accepted a
# literal "|" character.
regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b'


def check_email(email):
    """Return True when ``email`` fully matches the module's e-mail pattern.

    Args:
        email: value to validate; anything that is not a ``str`` is rejected.

    Returns:
        ``True`` if the whole string matches ``regex``, otherwise ``False``.
    """
    if not isinstance(email, str):
        # None / bytes / numbers cannot be valid addresses; report that
        # instead of letting ``re`` raise TypeError.
        return False
    return re.fullmatch(regex, email) is not None
def get_user(email):
    """Look up the Firebase user registered under ``email``.

    Returns:
        The record returned by ``auth.get_user_by_email``, or ``None`` when
        that call raises ``exceptions.FirebaseError``.
    """
    try:
        return auth.get_user_by_email(email)
    except exceptions.FirebaseError:
        return None
def create_user(email):
    """Create a Firebase Authentication user for ``email``.

    Returns:
        Whatever ``auth.create_user`` returns. Exceptions raised by the SDK
        are not handled here and propagate to the caller.
    """
    return auth.create_user(email=email)
def get_user1(email):
    """Look up the Firebase user registered under ``email``.

    Kept as an alias for existing callers; it delegates to ``get_user`` so
    the lookup and its error handling live in one place.

    Returns:
        The Firebase user record, or ``None`` when the lookup raises
        ``exceptions.FirebaseError``.
    """
    return get_user(email)
def create_firebase_user(request: req.RequestCreateFireBaseUserGoogle):
    """Return the Firebase profile for a Google-authenticated e-mail address.

    Despite the name, no account is created here: the request is validated,
    the Google token is verified, and the existing Firebase user is looked up.

    Args:
        request: carries ``email`` and ``token_google``.

    Returns:
        * the result of ``sf.check_email_empty_invalid`` when it is not ``True``;
        * ``res.ReponseError`` with status 400 when the token is empty or
          fails verification;
        * ``res.ResponseCreateFireBaseUser`` with status 200 when the user
          exists in Firebase;
        * ``res.ReponseError`` with status 500 when no user is found or an
          unexpected exception occurs.
    """
    try:
        email = request.email
        token_google = request.token_google

        email_check = sf.check_email_empty_invalid(email)
        if email_check is not True:
            return email_check

        if token_google is None or token_google == "":
            return res.ReponseError(
                status=400,
                data=res.Message(message="token google not empty")
            )

        # NOTE(review): only an explicit False is treated as a failed
        # verification, as before; confirm that verify_token_google never
        # signals failure with None.
        if authservice.verify_token_google(token_google) == False:  # noqa: E712
            return res.ReponseError(
                status=400,
                data=res.Message(message="Create user failed")
            )

        # The remote Firebase lookup happens only after the request has been
        # validated, so rejected requests no longer cost a network call.
        user = get_user(email)
        if not user:
            return res.ReponseError(
                status=500,
                data=res.Message(message="Error")
            )

        return res.ResponseCreateFireBaseUser(
            status=200,
            data=res.DataCreateFireBaseUser(
                localId=user.uid,
                email=user.email,
                displayName=user.display_name,
                photoUrl=user.photo_url,
            )
        )
    except Exception:
        # `except Exception` instead of a bare `except:` so that
        # SystemExit / KeyboardInterrupt are not swallowed.
        return res.ReponseError(
            status=500,
            data=res.Message(message="Server Error")
        )
def info_user(request: req.RequestInfoUser):
    """Return the Firebase profile of the user identified by ``request.user_id``.

    Args:
        request: carries ``user_id``.

    Returns:
        * the ``res.ReponseError`` produced by ``sf.check_email_service`` when
          it returns one;
        * ``res.ReponseError`` with status 404 when Firebase has no such user;
        * ``res.ResponseInfoUser`` with status 200 otherwise, with empty
          ``uid``/``email`` replaced by ``""`` and empty
          ``display_name``/``photo_url`` replaced by ``"N/A"``;
        * ``res.ReponseError`` with status 500 on any other exception.
    """
    try:
        resolved = sf.check_email_service(request.user_id)
        if isinstance(resolved, res.ReponseError):
            return resolved

        record = get_user(resolved)
        if record is None:
            return res.ReponseError(
                status=404,
                data=res.Message(message="User not found")
            )

        # Substitute placeholders for any profile field that is empty.
        profile = res.DataInfoUser(
            uid=record.uid or "",
            email=record.email or "",
            display_name=record.display_name or "N/A",
            photo_url=record.photo_url or "N/A",
        )
        return res.ResponseInfoUser(status=200, data=profile)
    except Exception as e:
        return res.ReponseError(
            status=500,
            data=res.Message(message="Server Error: " + str(e))
        )
def check_email_token(token):
    """Extract the identity stored in the ``sub`` claim of a JWT.

    The token is decoded with ``auth123.decodeJWT``; its ``sub`` value is
    base85-decoded and interpreted as ASCII text.

    Args:
        token: the encoded JWT.

    Returns:
        The decoded ``sub`` string, or ``False`` when the token cannot be
        decoded or the claim is missing or malformed.
    """
    try:
        decoded_token = auth123.decodeJWT(token)
        sub_value = decoded_token.get("sub")
        return base64.b85decode(sub_value.encode('ascii')).decode('ascii')
    except Exception:
        # Any decoding problem means "not a usable token". Catching Exception
        # rather than using a bare `except:` keeps SystemExit and
        # KeyboardInterrupt propagating.
        return False
def is_me(request: req.RequestIsMe):
    """Resolve the user id that owns the token in ``request``.

    Args:
        request: carries ``token``.

    Returns:
        * ``res.ReponseError`` with status 400 when the token is empty;
        * ``res.ReponseError`` with status 401 when the token cannot be decoded;
        * ``res.ReponseError`` with status 404 when no user matches the token;
        * ``res.ResponseIsMe`` with status 200 and the user's id otherwise;
        * ``res.ReponseError`` with status 500 on any unexpected exception.
    """
    try:
        token = request.token
        if token is None or token == "":
            return res.ReponseError(
                status=400,
                data=res.Message(message="token is empty")
            )

        email = check_email_token(token)
        if email is False:
            # Previously this path fell off the end of the function and
            # returned None, leaving the caller without any response object.
            return res.ReponseError(
                status=401,
                data=res.Message(message="Invalid token")
            )

        user = UserRepository.getUserByEmail(email)
        if user is None:
            # Guards the case where the repository finds nothing, which used
            # to surface as a generic 500 via AttributeError.
            return res.ReponseError(
                status=404,
                data=res.Message(message="User not found")
            )

        return res.ResponseIsMe(
            status=200,
            data=res.DataIsMe(user_id=user.id)
        )
    except Exception:
        return res.ReponseError(
            status=500,
            data=res.Message(message="Server Error")
        )
# File extensions (lower-case, without the dot) accepted for image uploads.
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg'}


def allowed_file(filename):
    """Return True when ``filename`` ends in an allowed image extension.

    The extension is the text after the last dot, compared
    case-insensitively against ``ALLOWED_EXTENSIONS``.

    Args:
        filename: name of the uploaded file; may be ``None`` or empty when
            the client sent no file name.

    Returns:
        ``True`` for an allowed extension, ``False`` otherwise (including
        missing, non-string, or dot-less names).
    """
    if not isinstance(filename, str) or '.' not in filename:
        # A missing name is simply "not allowed", not a TypeError.
        return False
    return filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def upload_image_service(request: req.RequestUpLoadImage):
    """Upload the image in ``request`` to Cloudinary under the user's e-mail.

    The upload is spooled to a temporary file in the working directory,
    sent to Cloudinary with the e-mail as ``public_id``, and the temporary
    file is always removed afterwards.

    Args:
        request: carries ``user_id`` and ``files`` (the uploaded file object,
            exposing ``filename`` and a readable ``file``).

    Returns:
        * the ``res.ReponseError`` produced by ``sf.check_email_service`` when
          it returns one;
        * ``res.ReponseError`` with status 415 when the file extension is not
          allowed;
        * ``res.ResponseUploadImage`` with status 200 and the ``secure_url``
          reported by Cloudinary on success;
        * ``res.ReponseError`` with status 500 on any unexpected exception.
    """
    try:
        user_id = request.user_id
        file = request.files

        email = sf.check_email_service(user_id)
        if isinstance(email, res.ReponseError):
            return email

        if not allowed_file(file.filename):
            return res.ReponseError(
                status=415,
                data=res.Message(message="File type not allow")
            )

        temp_file_path = f"temp_image_{email}.png"
        try:
            contents = file.file.read()
            with open(temp_file_path, "wb") as temp_file:
                temp_file.write(contents)
            upload_result = cloudinary.uploader.upload(temp_file_path, public_id=email)
        finally:
            # Clean up even when writing or uploading fails; previously a
            # failed upload left the temporary file behind on disk.
            if os.path.exists(temp_file_path):
                os.remove(temp_file_path)

        return res.ResponseUploadImage(
            status=200,
            data=res.DataUploadImage(url=upload_result["secure_url"])
        )
    except Exception:
        return res.ReponseError(
            status=500,
            data=res.Message(message="Server Error")
        )