Spaces:
Build error
Build error
import os
from typing import Optional

import motor.motor_asyncio
from bson.objectid import ObjectId
# Connection string for the MongoDB server. The MONGO_DETAILS environment
# variable overrides it, so a deployment can point at a different server
# without editing this file; the local default is unchanged.
MONGO_DETAILS = os.environ.get("MONGO_DETAILS", "mongodb://localhost:27017")

# One Motor client is created at import time and shared by every helper below.
client = motor.motor_asyncio.AsyncIOMotorClient(MONGO_DETAILS)

# The database is literally named "database"; users live in its "users" collection.
database = client.database

user_collection = database.get_collection("users")
# helpers | |
def user_helper(user: dict) -> dict:
    """Convert a stored user document into a plain dict.

    Args:
        user: A user document. It must contain the keys ``_id``,
            ``username``, ``email``, ``password``, ``projectname`` and
            ``projectpath``; a missing key raises ``KeyError``.

    Returns:
        A new dict with the document's ``_id`` converted to a string under
        the key ``id`` and the remaining five fields copied unchanged. Any
        other keys in ``user`` are dropped.
    """
    # NOTE(review): the stored password is part of the returned dict. Confirm
    # that callers strip it before the result is sent to a client.
    return {
        "id": str(user["_id"]),
        "username": user["username"],
        "email": user["email"],
        "password": user["password"],
        "projectname": user["projectname"],
        "projectpath": user["projectpath"],
    }
# Retrieve all users present in the database | |
async def retrieve_users():
    """Return every document in the users collection as a list of dicts.

    Each document is passed through ``user_helper``. If the collection
    yields no documents the result is an empty list.
    """
    cursor = user_collection.find()
    return [user_helper(document) async for document in cursor]
# Add a new user to the database
async def add_user(user_data: dict) -> dict:
    """Insert a new user document and return it in serialised form.

    Args:
        user_data: The fields of the user to store. The dict is not
            modified by this call.

    Returns:
        The stored document, read back from the collection by its new
        ``_id`` and passed through ``user_helper``.
    """
    # insert_one() adds an "_id" key to the mapping it is given. Insert a
    # shallow copy so the caller's dict does not silently gain an ObjectId.
    result = await user_collection.insert_one(dict(user_data))
    new_user = await user_collection.find_one({"_id": result.inserted_id})
    return user_helper(new_user)
# Retrieve a user with a matching username
async def retrieve_user(username: str) -> Optional[dict]:
    """Look up a single user by username.

    Args:
        username: Value to match against the ``username`` field.

    Returns:
        The matching document passed through ``user_helper``, or ``None``
        when no document has that username.
    """
    user = await user_collection.find_one({"username": username})
    if user is None:
        return None
    return user_helper(user)
# Update a user with a matching username
async def update_user(username: str, data: dict) -> bool:
    """Apply ``data`` as a ``$set`` update to the user with this username.

    Args:
        username: Value to match against the ``username`` field.
        data: Field/value pairs to set on the matched document.

    Returns:
        ``True`` when a document with that username was matched by the
        update, ``False`` when ``data`` is empty or no such user exists.
    """
    # An empty request body has nothing to apply.
    if not data:
        return False

    # update_one() always returns an UpdateResult object, which is truthy
    # whether or not anything matched, so success is read from matched_count.
    # Filtering in the update itself also avoids a separate find_one() and
    # the window between that lookup and the write.
    result = await user_collection.update_one(
        {"username": username}, {"$set": data}
    )
    return result.matched_count > 0
# Delete a user from the database | |
async def delete_user(username: str) -> bool:
    """Delete one user document with the given username.

    Args:
        username: Value to match against the ``username`` field.

    Returns:
        ``True`` when a document was deleted, ``False`` when no document
        had that username.
    """
    # delete_one() reports how many documents it removed, so a prior
    # find_one() existence check is unnecessary.
    result = await user_collection.delete_one({"username": username})
    return result.deleted_count > 0