from pymongo import MongoClient from datetime import datetime import pytz from prefect_utils import trigger_maintenance_request from prefect import task import pandas as pd import os import asyncio MONGODB_PASSWORD = os.getenv("MONGODB_PASSWORD") blinded_connection_string = os.getenv("blinded_connection_string") connection_string = blinded_connection_string.replace("", MONGODB_PASSWORD) @task def generate_empty_well(): dbclient = MongoClient(connection_string) db = dbclient["LCM-OT-2-SLD"] collection = db["wells"] rows = ['B', 'C', 'D', 'E', 'F', 'G', 'H'] columns = [str(i) for i in range(1, 13)] for row in rows: for col in columns: well = f"{row}{col}" metadata = { "well": well, "status": "empty", "project": "OT2" } #send_data_to_mongodb(collection="wells", data=metadata) query = {"well": well} update_data = {"$set": metadata} result = collection.update_one(query, update_data, upsert=True) # close connection dbclient.close() @task def update_used_wells(used_wells): dbclient = MongoClient(connection_string) db = dbclient["LCM-OT-2-SLD"] collection = db["wells"] for well in used_wells: metadata = { "well": well, "status": "used", "project": "OT2" } #send_data_to_mongodb(collection="wells", data=metadata) query = {"well": well} update_data = {"$set": metadata} result = collection.update_one(query, update_data, upsert=True) # close connection dbclient.close() def find_unused_wells(): dbclient = MongoClient(connection_string) db = dbclient["LCM-OT-2-SLD"] collection = db["wells"] query = {"status": "empty"} response = list(collection.find(query)) df = pd.DataFrame(response) # Extract the "well" column as a list #if "well" not in df.columns: # raise ValueError("No available wells.") # Sort obtained list def well_sort_key(well): row = well[0] # A, B, C, ... col = int(well[1:]) # 1, 2, ..., 12 return (row, col) if "well" in df.columns: empty_wells = sorted(df["well"].tolist(), key=well_sort_key) else: empty_wells = [] if len(empty_wells) == 1: maintenance_type = "wellplate_maintenance" if check_maintenance_status(maintenance_type) == 0: asyncio.run(trigger_maintenance_request(maintenance_type)) set_maintenance_status(maintenance_type,1) else: print("[DEBUG] a maintenance request has been sent") dbclient.close() # Check if there are any empty wells #if len(empty_wells) == 0: # raise ValueError("No empty wells found") #print(empty_wells) return empty_wells @task def save_result(result_data): dbclient = MongoClient(connection_string) db = dbclient["LCM-OT-2-SLD"] collection = db["result"] #collection = db["test_result"] result_data["timestamp"] = datetime.now(pytz.timezone("America/Toronto")) # UTC time insert_result = collection.insert_one(result_data) inserted_id = insert_result.inserted_id # close connection dbclient.close() return inserted_id def get_student_quota(student_id): with MongoClient(connection_string) as client: db = client["LCM-OT-2-SLD"] collection = db["student"] student = collection.find_one({"student_id": student_id}) if student is None: raise ValueError(f"Student ID '{student_id}' not found in the database.") return student.get("quota", 0) def decrement_student_quota(student_id): dbclient = MongoClient(connection_string) db = dbclient["LCM-OT-2-SLD"] collection = db["student"] student = collection.find_one({"student_id": student_id}) if not student: return f"Student ID {student_id} not found." if student.get("quota", 0) <= 0: return f"Student ID {student_id} has no remaining quota." result = collection.update_one( {"student_id": student_id, "quota": {"$gt": 0}}, {"$inc": {"quota": -1}} ) if result.modified_count > 0: return f"Student ID {student_id}'s quota update successfully." else: return f"Quota update failed for Student ID {student_id}." def add_student_quota(student_id, quota): """ Adds a new student with a given quota. :param student_id: The ID of the student. :param quota: The initial quota for the student. """ dbclient = MongoClient(connection_string) db = dbclient["LCM-OT-2-SLD"] collection = db["student"] student_data = {"student_id": student_id, "quota": quota} collection.update_one({"student_id": student_id}, {"$set": student_data}, upsert=True) dbclient.close() def check_maintenance_status(maintenance_type): dbclient = MongoClient(connection_string) db = dbclient["LCM-OT-2-SLD"] collection = db["HITL_status"] status_doc = collection.find_one({"name": maintenance_type}) dbclient.close() if status_doc is None: return 0 return status_doc.get("flag", 0) def set_maintenance_status(maintenance_type,new_status): #flag == 1, request sent; flag == 0, no request undergoing dbclient = MongoClient(connection_string) db = dbclient["LCM-OT-2-SLD"] collection = db["HITL_status"] collection.update_one( {"name": maintenance_type}, {"$set": {"flag": new_status, "timestamp": datetime.now(pytz.timezone("America/Toronto"))} }, upsert=True ) dbclient.close() def insert_maintenance_log(maintenance_type, operator="Unknown"): dbclient = MongoClient(connection_string) db = dbclient["LCM-OT-2-SLD"] collection = db["maintenance_logs"] log_data = { "maintenance_type": maintenance_type, "timestamp": datetime.now(pytz.timezone("America/Toronto")), "operator": operator } collection.insert_one(log_data) dbclient.close() if __name__ == "__main__": #generate_empty_well() find_unused_wells()