Spaces:
Sleeping
Sleeping
File size: 4,490 Bytes
9a6b7dc 897c1d2 9a6b7dc 897c1d2 9a6b7dc 0759822 9a6b7dc e4a1a2b 0759822 9a6b7dc e4a1a2b 9a6b7dc 0759822 9a6b7dc 0759822 9a6b7dc e4a1a2b 9a6b7dc e4a1a2b 9a6b7dc e4a1a2b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
from datetime import datetime
from db.schema import Feedback, Response
from db.setup import db_setup
# Create operation (push a new feedback record to Firebase).
# NOTE: this function performs no user_id uniqueness check; each call adds a new entry.
def ingest(data: Feedback):
    """Append one feedback record under the ``feedback`` node.

    Args:
        data: Feedback model to store. Its ``time_stamp`` field is replaced
            by the result of ``.isoformat()`` before upload (presumably a
            ``datetime`` -- confirm against ``db.schema``).

    Returns:
        None. Progress is reported on stdout.
    """
    root = db_setup()  # Ensure Firebase is initialized
    print(f"Attempting to ingest feedback data from user '{data.user_id}'...")

    payload = data.model_dump()
    # Store the timestamp in its ISO string form rather than as a raw object.
    payload.update(time_stamp=payload["time_stamp"].isoformat())

    root.child('feedback').push(payload)
    print(f"Feedback data from user '{data.user_id}' pushed to Firebase!")
def save_feedback(data: Feedback):
    """Insert feedback for a user, or overwrite what that user already stored.

    Entries under ``feedback`` are queried by ``user_id``. When the query
    returns nothing, a new entry is pushed; otherwise every returned entry
    is updated with the new data.

    Args:
        data: Feedback model to store. ``time_stamp`` is converted with
            ``.isoformat()`` before upload.

    Returns:
        None. Progress is reported on stdout.
    """
    root = db_setup()  # Ensure Firebase is initialized
    print(f"Processing feedback for user '{data.user_id}'...")

    # Convert the Pydantic model to a plain dictionary for upload.
    record = data.model_dump()
    record["time_stamp"] = record["time_stamp"].isoformat()

    # Look up any feedback already stored for this user.
    matches = root.child("feedback").order_by_child("user_id").equal_to(data.user_id).get()

    if not matches:
        # Nothing stored yet: insert a new entry.
        root.child("feedback").push(record)
        print(f"(Ingestion) New feedback added for user '{data.user_id}'!")
        return

    # Existing feedback: update every matching entry in place.
    for entry_key in matches:
        root.child("feedback").child(entry_key).update(record)
    print(f"Feedback updated for user '{data.user_id}'!")
# Read operation (Fetch feedback data from Firebase)
def read(user_id: str):
    """Fetch feedback data for a specific user from Firebase.

    Args:
        user_id: Value matched against the ``user_id`` child of each entry
            under the ``feedback`` node.

    Returns:
        The first entry returned by the query, or ``None`` when the query
        result is empty.
    """
    ref = db_setup()
    matches = ref.child('feedback').order_by_child('user_id').equal_to(user_id).get()
    if not matches:
        return None  # No feedback found

    # Only the first match is returned; any further matches are ignored.
    return next(iter(matches.values()))
# Update operation (Update feedback data in Firebase)
def update(feedback_id: int, updated_data: Feedback):
    """Overwrite every feedback entry whose ``id`` child equals *feedback_id*.

    Args:
        feedback_id: Value matched against the ``id`` child of each entry
            under the ``feedback`` node.
        updated_data: Feedback model holding the new field values.

    Returns:
        None. The outcome (updated / not found) is reported on stdout.
    """
    ref = db_setup()
    matches = ref.child('feedback').order_by_child('id').equal_to(feedback_id).get()
    if not matches:
        print("Feedback not found to update!")
        return

    # Serialize exactly as ingest() and save_feedback() do: model_dump() plus
    # an ISO string for time_stamp, so the uploaded payload has the same shape
    # on every write path. Built once, since it does not depend on the entry.
    payload = updated_data.model_dump()
    payload["time_stamp"] = payload["time_stamp"].isoformat()

    # Every entry returned by the query is updated, not only the first.
    for key in matches:
        ref.child('feedback').child(key).update(payload)
    print("Feedback data updated in Firebase!")
# Delete operation (Remove feedback data from Firebase)
def delete(feedback_id: int):
    """Remove every feedback entry whose ``id`` child equals *feedback_id*.

    Args:
        feedback_id: Value matched against the ``id`` child of each entry
            under the ``feedback`` node.

    Returns:
        None. The outcome (deleted / not found) is reported on stdout.
    """
    root = db_setup()
    matches = root.child('feedback').order_by_child('id').equal_to(feedback_id).get()

    if not matches:
        print("Feedback not found to delete!")
        return

    # All entries returned by the query are removed, not only the first.
    for entry_key in matches:
        root.child('feedback').child(entry_key).delete()
    print("Feedback data deleted from Firebase!")
def reset_feedback_in_db(user_id: str):
    """Delete all stored feedback for a user, resetting their progress in Firebase.

    Args:
        user_id: Value matched against the ``user_id`` child of each entry
            under the ``feedback`` node.

    Returns:
        None.
    """
    root = db_setup()
    matches = root.child('feedback').order_by_child('user_id').equal_to(user_id).get()
    if not matches:
        return

    for entry_key in matches:
        root.child('feedback').child(entry_key).delete()
    # NOTE(review): the confirmation is assumed to be printed only when entries
    # existed -- verify against the intended behavior.
    print(f"Feedback data for user '{user_id}' reset in Firebase.")
def test():
    """Manual smoke test for the feedback helpers in this module.

    Only the read step is active. The create, update and delete steps are
    kept as commented-out examples and do not run.
    """
    # --- Create a feedback object (disabled) ---
    # feedback = Feedback(
    #     id=2,
    #     user_id='tt',
    #     time_stamp=datetime(2025, 1, 30, 21, 25, 15, 581994),
    #     responses=[
    #         Response(config_id='c_p_0_pop_low_easy', rating_v=1, rating_p0=1, rating_p1=1, comment='',
    #                  timestamp='2025-01-30T21:25:12.642038'),
    #         Response(config_id='c_p_1_pop_medium_medium', rating_v=1, rating_p0=1, rating_p1=1, comment='',
    #                  timestamp='2025-01-30T21:25:13.854227'),
    #         Response(config_id='c_p_2_pop_high_hard', rating_v=1, rating_p0=1, rating_p1=1, comment='',
    #                  timestamp='2025-01-30T21:25:15.581948'),
    #     ]
    # )
    # # Create (Ingest)
    # ingest(feedback)

    # --- Read (Fetch) ---
    fetched = read("cdSf")
    if fetched:
        print(fetched)

    # --- Update (Modify) (disabled) ---
    # updated_feedback = Feedback(
    #     id=1,
    #     user_id="user123",
    #     time_stamp=datetime.now(),
    #     responses=[
    #         {"q_id": "q1", "ans": 4},  # Updated answer
    #         {"q_id": "q2", "ans": 3}
    #     ]
    # )
    # update(1, updated_feedback)
    #
    # # Delete (Remove)
    # delete(1)
|