Spaces:
Paused
Paused
File size: 3,174 Bytes
2da7ed3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
from fastapi import Body, Request, HTTPException, status
from fastapi.encoders import jsonable_encoder
import sys
# sys.path.append('/Users/benolojo/DCU/CA4/ca400_FinalYearProject/2024-ca400-olojob2-majdap2/src/backend/src/')
from ..models.users import User, UpdateUser
from bson import ObjectId
import re
def get_collection_users(request: Request):
test = request.app.database["user_records"]
print(type(test))
return test
def create_user(request: Request, user: User = Body(...)):
user = jsonable_encoder(user)
new_user = get_collection_users(request).insert_one(user)
created_user = get_collection_users(request).find_one({"_id": new_user.inserted_id})
print("NEW ID IS:.........", new_user.inserted_id)
return created_user
def list_users(request: Request, limit: int):
try:
users = list(get_collection_users(request).find(limit = limit))
return users
except:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"No users found!")
def find_user(request: Request, user_id: str):
if (user := get_collection_users(request).find_one({"user_id": user_id})):
return user
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"User with user_id {user_id} not found!")
def find_user_name(request: Request, name: str):
# search for name in lowercase
if (user := get_collection_users(request).find_one({"name": re.compile('^' + re.escape(name) + '$', re.IGNORECASE)})):
return user
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"User with name {name} not found!")
def find_user_email(request: Request, email: str):
if (user := get_collection_users(request).find_one({"email": re.compile('^' + re.escape(email) + '$', re.IGNORECASE)})):
return user
else:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"User with Email Address {email} not found!")
''' Update user record based on user object/json'''
def update_user(request: Request, user_id: str, user: UpdateUser):
try:
user = {k: v for k, v in user.model_dump().items() if v is not None}
if len(user) >= 1:
update_result = get_collection_users(request).update_one({"user_id": user_id}, {"$set": user})
if update_result.modified_count == 0:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"User with user_id: '{user_id}' not found and updated!")
if (existing_users := get_collection_users(request).find_one({"user_id": user_id})) is not None:
return existing_users
except:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"User with user_id: '{user_id}' not found and updated!")
def delete_user(request: Request, user_id: str):
try:
deleted_user = get_collection_users(request).delete_one({"user_id": user_id})
if deleted_user.deleted_count == 1:
return f"User with user_id {user_id} deleted sucessfully"
except:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"User with user_id {user_id} not found!")
|