|
import os |
|
import uuid |
|
|
|
from pymongo import MongoClient |
|
from datetime import datetime |
|
|
|
|
|
class MongoDBManager: |
|
def __init__(self): |
|
mongodb_uri = str(os.getenv('MONGODB_URI')) |
|
db_name = str(os.getenv('DB_NAME')) |
|
collection_name = str(os.getenv('COLLECTION_NAME')) |
|
|
|
self.client = MongoClient(mongodb_uri) |
|
self.db = self.client[db_name] |
|
self.collection = self.db[collection_name] |
|
self.user_id = None |
|
|
|
def create_user_record(self): |
|
""" Create a new user record with empty answers. """ |
|
self.user_id = str(uuid.uuid4()) |
|
user_record = { |
|
"_id": self.user_id, |
|
"createdAt": datetime.now(), |
|
"questionnaireAnswers": {}, |
|
"questionAnswers": {} |
|
} |
|
self.collection.insert_one(user_record) |
|
|
|
def update_question_answers(self, answers): |
|
""" Update question answers for a specific user. """ |
|
self.create_user_record() |
|
update_result = self.collection.update_one( |
|
{"_id": self.user_id}, |
|
{"$set": {"questionAnswers": answers}} |
|
) |
|
return update_result.modified_count |
|
|
|
def update_questionnaire_answers(self, answers): |
|
""" Update questionnaire answers for a specific user. """ |
|
update_result = self.collection.update_one( |
|
{"_id": self.user_id}, |
|
{"$set": {"questionnaireAnswers": answers}} |
|
) |
|
return update_result.modified_count |
|
|
|
|
|
|
|
|