Spaces:
Paused
Paused
File size: 3,089 Bytes
ddc5bbd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
from fastapi import Body, Request, HTTPException, status
from fastapi.encoders import jsonable_encoder
import sys
from ..models.users import User, UpdateUser
from bson import ObjectId
import re
def create_user(collection, user: User = Body(...)):
    """Insert ``user`` into ``collection`` and return the stored document.

    Args:
        collection: Object exposing ``insert_one`` and ``find_one``;
            presumably a pymongo collection (confirm against the caller).
        user: The user model to persist. It is passed through
            ``jsonable_encoder`` before being inserted.

    Returns:
        The document read back from ``collection`` using the ``_id`` that
        ``insert_one`` reported.
    """
    payload = jsonable_encoder(user)
    inserted_id = collection.insert_one(payload).inserted_id
    # Read the record back so the caller receives what was actually stored.
    return collection.find_one({"_id": inserted_id})
def list_users(collection, limit: int):
    """Return up to ``limit`` user documents from ``collection``.

    Args:
        collection: Object exposing ``find(limit=...)``; presumably a pymongo
            collection (confirm against the caller).
        limit: Forwarded unchanged as the ``limit`` keyword of ``find``.

    Returns:
        A list with the documents produced by ``find``. An empty result is
        returned as an empty list, not as an error.

    Raises:
        HTTPException: 404 when the query itself raises an ordinary exception.
    """
    try:
        return list(collection.find(limit=limit))
    except Exception as exc:
        # Only ordinary errors are translated; KeyboardInterrupt/SystemExit
        # propagate untouched. Status code and message are kept as they were
        # so existing clients see the same response; the cause is chained
        # for debugging.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No users found!",
        ) from exc
def find_user(collection, user_id: str):
    """Fetch the user document whose ``user_id`` field equals ``user_id``.

    Args:
        collection: Object exposing ``find_one``; presumably a pymongo
            collection (confirm against the caller).
        user_id: Value matched against the ``user_id`` field.

    Returns:
        The document returned by ``find_one``.

    Raises:
        HTTPException: 404 when ``find_one`` returns nothing (or a falsy value).
    """
    match = collection.find_one({"user_id": user_id})
    if not match:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with user_id {user_id} not found!",
        )
    return match
def find_name_from_id(collection, user_id: str):
    """Return the ``name`` field of the user identified by ``user_id``.

    Args:
        collection: Object exposing ``find_one(filter, projection)``;
            presumably a pymongo collection (confirm against the caller).
        user_id: Value matched against the ``user_id`` field.

    Returns:
        The value stored under ``name`` in the matched record.

    Raises:
        HTTPException: 404 when the lookup yields nothing (or an empty record).
    """
    # Ask only for the name; the projection also suppresses "_id".
    name_only = {"name": 1, "_id": 0}
    record = collection.find_one({"user_id": user_id}, name_only)
    if record:
        return record['name']
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"User with user_id {user_id} not found!",
    )
def find_user_name(collection, name: str):
    """Look up a user by name, ignoring letter case.

    Args:
        collection: Object exposing ``find_one``; presumably a pymongo
            collection (confirm against the caller).
        name: Name to search for. It is escaped, so regex metacharacters in
            it are matched literally.

    Returns:
        The document returned by ``find_one``.

    Raises:
        HTTPException: 404 when no document is returned.
    """
    # Anchored, escaped pattern: a whole-value match that ignores case.
    whole_name = re.compile('^' + re.escape(name) + '$', re.IGNORECASE)
    found = collection.find_one({"name": whole_name})
    if not found:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with name {name} not found!",
        )
    return found
def find_user_email(collection, email: str):
    """Look up a user by email address, ignoring letter case.

    Args:
        collection: Object exposing ``find_one``; presumably a pymongo
            collection (confirm against the caller).
        email: Address to search for. It is escaped, so characters such as
            ``.`` and ``+`` are matched literally.

    Returns:
        The document returned by ``find_one``.

    Raises:
        HTTPException: 404 when no document is returned.
    """
    # Anchored, escaped pattern: a whole-value match that ignores case.
    whole_email = re.compile('^' + re.escape(email) + '$', re.IGNORECASE)
    found = collection.find_one({"email": whole_email})
    if found:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=f"User with Email Address {email} not found!",
    )
''' Update user record based on user object/json'''
def update_user(collection, user_id: str, user: UpdateUser):
    """Apply the non-``None`` fields of ``user`` to the stored user record.

    Args:
        collection: Object exposing ``update_one`` and ``find_one``;
            presumably a pymongo collection (confirm against the caller).
        user_id: Value matched against the ``user_id`` field.
        user: Partial update model; only fields whose dumped value is not
            ``None`` are written via ``$set``.

    Returns:
        The user document as read back after the update. When ``user``
        carries no fields to set, the current document is returned unchanged.

    Raises:
        HTTPException: 404 when no document matches ``user_id`` or when the
            underlying operations raise an ordinary exception.
    """
    detail = f"User with user_id: '{user_id}' not found and updated!"
    try:
        changes = {k: v for k, v in user.model_dump().items() if v is not None}
        matched = True
        if changes:
            result = collection.update_one({"user_id": user_id}, {"$set": changes})
            # matched_count, not modified_count: writing values identical to
            # the stored ones modifies nothing, yet the user does exist.
            matched = result.matched_count > 0
        existing = collection.find_one({"user_id": user_id}) if matched else None
    except Exception as exc:
        # Failures keep surfacing as the same 404 existing callers receive;
        # the original error is chained for debugging.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=detail) from exc
    if existing is None:
        # Previously this path fell through and returned None implicitly.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=detail)
    return existing
def delete_user(collection, user_id: str):
    """Delete the user record whose ``user_id`` field equals ``user_id``.

    Args:
        collection: Object exposing ``delete_one``; presumably a pymongo
            collection (confirm against the caller).
        user_id: Value matched against the ``user_id`` field.

    Returns:
        A confirmation message when exactly one document was deleted.

    Raises:
        HTTPException: 404 when nothing was deleted or when the delete
            operation raises an ordinary exception.
    """
    detail = f"User with user_id {user_id} not found!"
    try:
        deleted = collection.delete_one({"user_id": user_id}).deleted_count
    except Exception as exc:
        # Failures keep surfacing as the same 404 existing callers receive;
        # the original error is chained for debugging.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=detail) from exc
    if deleted != 1:
        # Previously a miss fell through and returned None without any error.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=detail)
    # Message text (including its spelling) is left untouched for callers
    # that may compare against it.
    return f"User with user_id {user_id} deleted sucessfully"
|