benjolo's picture
Adding backend/ folder to HF space (#1)
2da7ed3 verified
raw
history blame
3.17 kB
from fastapi import Body, Request, HTTPException, status
from fastapi.encoders import jsonable_encoder
import sys
# sys.path.append('/Users/benolojo/DCU/CA4/ca400_FinalYearProject/2024-ca400-olojob2-majdap2/src/backend/src/')
from ..models.users import User, UpdateUser
from bson import ObjectId
import re
def get_collection_users(request: Request):
    """Return the ``user_records`` collection from the application's database.

    Args:
        request: Incoming request; ``request.app.database`` is indexed with
            the collection name.

    Returns:
        The object stored under ``request.app.database["user_records"]``
        (presumably a pymongo ``Collection`` -- the database handle is
        attached to the app outside this file; confirm there).
    """
    # The previous debug ``print(type(...))`` ran on every lookup and only
    # added noise to stdout, so it has been removed.
    return request.app.database["user_records"]
def create_user(request: Request, user: User = Body(...)):
    """Insert a new user record and return the stored document.

    Args:
        request: Incoming request used to reach the users collection.
        user: User payload; it is passed through ``jsonable_encoder`` before
            being inserted.

    Returns:
        The document read back from the collection by the ``_id`` that the
        insert reported.
    """
    # Resolve the collection once instead of once per operation.
    users = get_collection_users(request)
    document = jsonable_encoder(user)
    insert_result = users.insert_one(document)
    # Read the record back so the caller receives what was actually stored.
    return users.find_one({"_id": insert_result.inserted_id})
def list_users(request: Request, limit: int):
    """Return up to ``limit`` user documents as a list.

    Args:
        request: Incoming request used to reach the users collection.
        limit: Forwarded unchanged as the ``limit`` keyword of ``find``.

    Returns:
        A list built from the cursor returned by ``find``.

    Raises:
        HTTPException: 404 with detail ``"No users found!"`` when the query
            itself raises.
    """
    try:
        return list(get_collection_users(request).find(limit=limit))
    except Exception as exc:
        # ``except Exception`` (rather than a bare ``except``) lets
        # KeyboardInterrupt/SystemExit propagate; chaining keeps the real
        # cause visible in logs. The 404 status is kept so existing clients
        # see the same response.
        # NOTE(review): a 5xx status may describe a failed query better.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No users found!",
        ) from exc
def find_user(request: Request, user_id: str):
    """Look up a single user document by its ``user_id`` field.

    Args:
        request: Incoming request used to reach the users collection.
        user_id: Value matched against the ``user_id`` field.

    Returns:
        The matching document.

    Raises:
        HTTPException: 404 when the lookup yields nothing truthy.
    """
    record = get_collection_users(request).find_one({"user_id": user_id})
    if not record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with user_id {user_id} not found!",
        )
    return record
def find_user_name(request: Request, name: str):
    """Look up a single user document by name, ignoring letter case.

    Args:
        request: Incoming request used to reach the users collection.
        name: Name to match; it is regex-escaped and anchored at both ends,
            so the whole field must match.

    Returns:
        The matching document.

    Raises:
        HTTPException: 404 when the lookup yields nothing truthy.
    """
    # Anchored, escaped pattern: a case-insensitive exact match.
    name_pattern = re.compile('^' + re.escape(name) + '$', re.IGNORECASE)
    record = get_collection_users(request).find_one({"name": name_pattern})
    if not record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with name {name} not found!",
        )
    return record
def find_user_email(request: Request, email: str):
    """Look up a single user document by email address, ignoring letter case.

    Args:
        request: Incoming request used to reach the users collection.
        email: Address to match; it is regex-escaped and anchored at both
            ends, so the whole field must match.

    Returns:
        The matching document.

    Raises:
        HTTPException: 404 when the lookup yields nothing truthy.
    """
    # Anchored, escaped pattern: a case-insensitive exact match.
    email_pattern = re.compile('^' + re.escape(email) + '$', re.IGNORECASE)
    record = get_collection_users(request).find_one({"email": email_pattern})
    if not record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with Email Address {email} not found!",
        )
    return record
def update_user(request: Request, user_id: str, user: UpdateUser):
    """Update a user record from the non-``None`` fields of ``user``.

    Args:
        request: Incoming request used to reach the users collection.
        user_id: Value matched against the ``user_id`` field.
        user: Update payload; only fields whose dumped value is not ``None``
            are written with ``$set``.

    Returns:
        The user document as stored after the update. When the payload
        carries no fields, the current document is returned unchanged.

    Raises:
        HTTPException: 404 when no document has this ``user_id``, or when
            the update or lookup raises.
    """
    detail = f"User with user_id: '{user_id}' not found and updated!"
    try:
        users = get_collection_users(request)
        changes = {k: v for k, v in user.model_dump().items() if v is not None}
        if changes:
            update_result = users.update_one({"user_id": user_id}, {"$set": changes})
            # ``matched_count`` says whether the user exists. ``modified_count``
            # is also 0 when the submitted values equal the stored ones, which
            # previously turned a harmless no-op update into a 404.
            if update_result.matched_count == 0:
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=detail)
        existing_user = users.find_one({"user_id": user_id})
    except HTTPException:
        # Already the response we want; don't re-wrap it.
        raise
    except Exception as exc:
        # Kept as a 404 so clients see the same response as before, but no
        # longer a bare ``except`` and with the real cause chained.
        # NOTE(review): a 5xx status may describe a failed query better.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=detail) from exc
    if existing_user is None:
        # Previously this path fell off the end and returned None.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=detail)
    return existing_user
def delete_user(request: Request, user_id: str):
    """Delete the user document whose ``user_id`` field matches.

    Args:
        request: Incoming request used to reach the users collection.
        user_id: Value matched against the ``user_id`` field.

    Returns:
        A confirmation message naming the deleted ``user_id``.

    Raises:
        HTTPException: 404 when nothing was deleted, or when the delete
            operation raises.
    """
    detail = f"User with user_id {user_id} not found!"
    try:
        delete_result = get_collection_users(request).delete_one({"user_id": user_id})
    except Exception as exc:
        # Same 404 as before, but no longer a bare ``except`` and with the
        # real cause chained.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=detail) from exc
    if delete_result.deleted_count != 1:
        # Previously an unknown user_id fell off the end and returned None
        # instead of the 404 this function promises.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=detail)
    # Message text (including its spelling) is kept byte-identical because
    # clients may compare against it.
    return f"User with user_id {user_id} deleted sucessfully"