import re

from fastapi import Body, HTTPException, status
from fastapi.encoders import jsonable_encoder

from ..models.users import User, UpdateUser


def create_user(collection, user: User = Body(...)):
    user = jsonable_encoder(user)
    new_user = collection.insert_one(user)
    created_user = collection.find_one({"_id": new_user.inserted_id})
    return created_user


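def list_users(collection, limit: int):
    """Return up to `limit` user documents as a list."""
    try:
        return list(collection.find(limit=limit))
    except Exception as exc:
        # Any failure while querying is reported to the client as a 404.
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="No users found!") from exc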


def find_user(collection, user_id: str):
    if (user := collection.find_one({"user_id": user_id})):
        return user
    else:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"User with user_id {user_id} not found!")

def find_name_from_id(collection, user_id: str):
    # Look up the user by user_id and project only the "name" field.
    if (user_name := collection.find_one({"user_id": user_id}, {"name": 1, "_id": 0})):
        return user_name['name']  # the projected document holds just the name
    else:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"User with user_id {user_id} not found!")

def find_user_name(collection, name: str):
    # Case-insensitive exact match; re.escape keeps the name from being read as a regex.
    if (user := collection.find_one({"name": re.compile('^' + re.escape(name) + '$', re.IGNORECASE)})):
        return user
    else:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"User with name {name} not found!")


def find_user_email(collection, email: str):
    if (user := collection.find_one({"email": re.compile('^' + re.escape(email) + '$', re.IGNORECASE)})):
        return user
    else:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"User with Email Address {email} not found!")


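def update_user(collection, user_id: str, user: UpdateUser):
    """Update a user record from the non-None fields of an UpdateUser object/JSON."""
    detail = f"User with user_id: '{user_id}' not found and updated!"
    try:
        # Only send the fields the caller actually set.
        updates = {k: v for k, v in user.model_dump().items() if v is not None}
        if updates:
            update_result = collection.update_one({"user_id": user_id}, {"$set": updates})

            # matched_count is 0 only when no document has this user_id;
            # modified_count can also be 0 when the stored values are unchanged.
            if update_result.matched_count == 0:
                raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=detail)

        existing_user = collection.find_one({"user_id": user_id})
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=detail) from exc

    # Without this check a missing user would silently return None.
    if existing_user is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=detail)
    return existing_user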


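def delete_user(collection, user_id: str):
    detail = f"User with user_id {user_id} not found!"
    try:
        deleted_user = collection.delete_one({"user_id": user_id})
    except Exception as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=detail) from exc

    # deleted_count is 0 when no document matched, so report that as a 404
    # instead of silently returning None.
    if deleted_user.deleted_count != 1:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=detail)
    return f"User with user_id {user_id} deleted successfully"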