Spaces:
Sleeping
Sleeping
File size: 3,113 Bytes
0f4d874 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 |
import os
import uuid
import queue
import threading
import time
from dotenv import load_dotenv
from pymongo.server_api import ServerApi
from pymongo.mongo_client import MongoClient
# Load variables from the local .env file into the process environment so
# that the os.getenv("MONGO_STRING") lookup further down can see them.
load_dotenv(r"./.env")
# Thread-safe queue of pending database updates. Each item is a dict with
# 'uuid' and 'data' keys, produced by update_data_field and consumed by
# queue_processor.
update_queue = queue.Queue()
# Loop flag polled by the queue processor thread; cleanup() sets it to False
# to make the worker leave its loop.
is_running = True
def queue_processor():
    """Drain ``update_queue`` and apply each queued update to MongoDB.

    Runs until the module-level ``is_running`` flag becomes false. Every task
    is a dict with a ``'uuid'`` key (identifying the user document) and a
    ``'data'`` key (the value appended to that document's ``data`` array via
    ``$push``).

    Failures while applying a task are printed and the loop moves on to the
    next task; the failed task is not retried.
    """
    while is_running:
        try:
            # Wait at most one second so the loop re-checks is_running
            # regularly even when no work arrives.
            task = update_queue.get(timeout=1)
        except queue.Empty:
            # No tasks in queue, continue waiting
            continue

        try:
            user.update_one(
                {"uuid": str(task['uuid'])},
                {"$push": {"data": task['data']}},
            )
        except Exception as e:
            print(f"Error processing queue item: {e}")
        finally:
            # Always balance the successful get(), even when the update
            # failed; otherwise the queue's unfinished-task counter never
            # drops to zero and update_queue.join() would block forever.
            update_queue.task_done()
# Start the queue processor thread. It is created as a daemon thread, so it
# does not keep the interpreter alive at exit.
# NOTE(review): the worker reads the module-level `user` collection, which is
# only bound a few lines below. The worker blocks on the queue before touching
# `user`, so this looks harmless as long as nothing is enqueued before the
# module finishes importing — confirm.
queue_processor_thread = threading.Thread(target=queue_processor, daemon=True)
queue_processor_thread.start()
# MongoDB connection string taken from the environment; os.getenv returns
# None when MONGO_STRING is not set.
uri = os.getenv("MONGO_STRING")
# Client configured with server API version "1".
client = MongoClient(uri, server_api=ServerApi("1"))
# Handles to the "drishti" database and its "user" collection; the collection
# is what queue_processor, register_user and fetch_data_field operate on.
db = client.get_database("drishti")
user = db.user
def register_user(name: str, mail: str, number: str, password):
    """Insert a new user document and report the outcome.

    A fresh UUID4 string is generated as the user's identifier and the
    document is created active with an empty ``data`` list.

    Args:
        name: Stored under the ``name`` field.
        mail: Stored under the ``email`` field.
        number: Stored under the ``phone_number`` field.
        password: Stored under the ``password`` field exactly as received.

    Returns:
        ``{"status": "success", "uuid": <new uuid>}`` when the insert works,
        otherwise ``{"status": "failed", "error": <exception text>}``.
    """
    # NOTE(review): the password is persisted unchanged; if callers pass a
    # plaintext password it ends up in the database as-is — confirm that
    # hashing happens upstream.
    new_id = str(uuid.uuid4())
    document = dict(
        uuid=new_id,
        name=name,
        email=mail,
        phone_number=number,
        password=password,
        is_active=True,
        data=[],
    )
    try:
        user.insert_one(document)
    except Exception as exc:
        # Any failure is reported to the caller instead of being raised.
        return {"status": "failed", "error": str(exc)}
    return {"status": "success", "uuid": new_id}
def update_data_field(uuid_value, new_data):
    """Queue an append to a user's ``data`` array instead of writing directly.

    The request is placed on ``update_queue`` as a dict with ``'uuid'`` and
    ``'data'`` keys; the database write itself is performed later by the
    queue processor.

    Args:
        uuid_value: Identifier of the user document to update.
        new_data: Value to be appended to that document's ``data`` array.

    Returns:
        The fixed confirmation string ``"Update queued successfully!"``; it
        only confirms that the request was enqueued.
    """
    pending = dict(uuid=uuid_value, data=new_data)
    update_queue.put(pending)
    return "Update queued successfully!"
def fetch_data_field(uuid: str) -> dict:
    """Return the ``data`` field of the user document matching ``uuid``.

    Args:
        uuid: Identifier of the user; converted with ``str()`` before the
            lookup.

    Returns:
        The matched document's ``data`` value, or
        ``{"status": "failed", "error": ...}`` when no document matches.

    NOTE(review): register_user initialises ``data`` as a list, so the
    success path presumably returns a list despite the ``-> dict``
    annotation — confirm before relying on the annotation.
    """
    document = user.find_one({"uuid": str(uuid)})
    if not document:
        return {"status": "failed", "error": "No document matched the given UUID."}
    return document["data"]
# Shutdown helper for the background queue worker
def cleanup():
    """Ask the queue processor to stop and wait up to five seconds for it.

    Clears the module-level ``is_running`` flag, which the worker checks at
    the top of each loop iteration, then joins the worker thread with a
    bounded timeout so shutdown cannot hang indefinitely.
    """
    global is_running
    is_running = False
    worker = queue_processor_thread
    worker.join(timeout=5)
if __name__ == "__main__":
    # Manual smoke test: fetch one user's data, print it, and put the same
    # text on the clipboard. The worker thread is always stopped afterwards.
    try:
        from pyperclip import copy

        rlt = fetch_data_field("95009d58-db41-4c40-8a5e-07f8821a525f")
        print(rlt)
        # pyperclip.copy only accepts str, int, float and bool values, while
        # fetch_data_field returns a list or a dict, so copy the printed
        # text form instead of the raw object.
        copy(str(rlt))
    finally:
        cleanup()
# Example calls kept for manual testing:
# print(update_data_field("3cbb2a37-35b3-45f2-a4fd-499baa34880c",
#     {
#         "timestamp": "12:00",
#         "humidity": 50,
#         "temprature": 25,
#         "soil_moisture": 0.5
#     }
# )
# )
# print(register_user("Batu Khan", "batukhan99@gmail.com", "321654987", "abc123"))