Spaces:
Sleeping
Sleeping
| from datetime import datetime | |
| from db.schema import Feedback | |
| from db.setup import db_setup | |
# Create operation (Ingest data into Firebase with unique user_id check)
def ingest(data: Feedback):
    """Push a feedback record to Firebase unless its user already has one.

    The ``feedback`` node is queried for children whose ``user_id`` equals
    ``data.user_id``. When that query returns something truthy, nothing is
    written; otherwise ``data.dict()`` is pushed as a new child of
    ``feedback``. The outcome is reported with ``print`` and the function
    returns ``None`` either way.

    Args:
        data: The feedback record to store.
    """
    root = db_setup()  # Ensure Firebase is initialized

    # Look up entries that were already stored for this user_id.
    same_user_query = root.child('feedback').order_by_child('user_id').equal_to(data.user_id)
    already_stored = same_user_query.get()

    # TODO: This should probably change. If the same user has multiple
    # feedbacks, we should allow it. -> change to update
    if already_stored:
        print(f"Feedback from user '{data.user_id}' already exists. Ingestion aborted.")
        return

    # Convert the Pydantic model to a plain dictionary before pushing.
    # NOTE(review): if Feedback carries datetime fields, .dict() presumably
    # leaves them as datetime objects -- confirm the client can serialize them.
    payload = data.dict()
    root.child('feedback').push(payload)
    print(f"Feedback data from user '{data.user_id}' pushed to Firebase!")
# Read operation (Fetch feedback data from Firebase)
def read(feedback_id: int):
    """Fetch the feedback entries whose ``id`` child equals ``feedback_id``.

    Args:
        feedback_id: Value to match against each entry's ``id`` child.

    Returns:
        The query result when it is truthy. Otherwise "Feedback not found!"
        is printed and ``None`` is returned.
    """
    root = db_setup()

    id_query = root.child('feedback').order_by_child('id').equal_to(feedback_id)
    matches = id_query.get()

    # Guard clause: report the miss and bail out early.
    if not matches:
        print("Feedback not found!")
        return None

    return matches
# Update operation (Update feedback data in Firebase)
def update(feedback_id: int, updated_data: Feedback):
    """Overwrite fields of the feedback entries whose ``id`` is ``feedback_id``.

    Every key returned by the ``id`` query is updated with
    ``updated_data.dict()``. The outcome is reported with ``print``; nothing
    is returned.

    Args:
        feedback_id: Value to match against each entry's ``id`` child.
        updated_data: Feedback whose dictionary form is written to each match.
    """
    root = db_setup()

    id_query = root.child('feedback').order_by_child('id').equal_to(feedback_id)
    matches = id_query.get()

    if not matches:
        print("Feedback not found to update!")
        return

    # The loop visits every key in the query result, not just the first one.
    for entry_key in matches:
        entry_ref = root.child('feedback').child(entry_key)
        entry_ref.update(updated_data.dict())
    print("Feedback data updated in Firebase!")
# Delete operation (Remove feedback data from Firebase)
def delete(feedback_id: int):
    """Remove the feedback entries whose ``id`` child equals ``feedback_id``.

    Every key returned by the ``id`` query is deleted from the ``feedback``
    node. The outcome is reported with ``print``; nothing is returned.

    Args:
        feedback_id: Value to match against each entry's ``id`` child.
    """
    root = db_setup()

    id_query = root.child('feedback').order_by_child('id').equal_to(feedback_id)
    matches = id_query.get()

    if not matches:
        print("Feedback not found to delete!")
        return

    # The loop visits every key in the query result, not just the first one.
    for entry_key in matches:
        entry_ref = root.child('feedback').child(entry_key)
        entry_ref.delete()
    print("Feedback data deleted from Firebase!")
def test():
    """Run one create / read / update / delete round trip against Firebase.

    Ingests a sample feedback with ``id=1``, reads it back (printing the
    result when one is returned), updates it with a modified answer for
    ``q1``, and finally deletes it.
    """
    # Create (Ingest)
    initial_responses = [
        {"q_id": "q1", "ans": 5},
        {"q_id": "q2", "ans": 3},
    ]
    sample = Feedback(
        id=1,
        user_id="user1234",
        time_stamp=datetime.now(),
        responses=initial_responses,
    )
    ingest(sample)

    # Read (Fetch)
    fetched = read(1)
    if fetched:
        print(fetched)

    # Update (Modify)
    revised_responses = [
        {"q_id": "q1", "ans": 4},  # Updated answer
        {"q_id": "q2", "ans": 3},
    ]
    revised = Feedback(
        id=1,
        user_id="user123",
        time_stamp=datetime.now(),
        responses=revised_responses,
    )
    update(1, revised)

    # Delete (Remove)
    delete(1)