|
from pymongo import MongoClient |
|
from datetime import datetime |
|
import pytz |
|
from prefect_utils import trigger_maintenance_request |
|
from prefect import task |
|
import pandas as pd |
|
import os |
|
import asyncio |
|
|
|
# MongoDB credentials are read from the environment so that no secret is
# stored alongside the code.
MONGODB_PASSWORD = os.getenv("MONGODB_PASSWORD")

# Connection-string template; it is expected to contain the literal
# placeholder "<db_password>", which is substituted below.
blinded_connection_string = os.getenv("blinded_connection_string")

# Fail fast with an actionable message. Without this check a missing variable
# surfaces as an opaque AttributeError/TypeError raised from ``str.replace``.
_missing_env = [
    name
    for name, value in (
        ("MONGODB_PASSWORD", MONGODB_PASSWORD),
        ("blinded_connection_string", blinded_connection_string),
    )
    if value is None
]
if _missing_env:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing_env)
    )

# NOTE(review): if the password can contain URI-reserved characters it may
# need percent-encoding before substitution -- confirm how the secret is stored.
connection_string = blinded_connection_string.replace("<db_password>", MONGODB_PASSWORD)
|
|
|
|
|
@task
def generate_empty_well():
    """Upsert every well in rows B-H, columns 1-12 with status ``"empty"``.

    One document per well (``"B1"`` ... ``"H12"``; row A is not included) is
    written to the ``wells`` collection of the ``LCM-OT-2-SLD`` database.
    Existing documents are matched on their ``well`` field and overwritten
    with ``status="empty"`` and ``project="OT2"``; missing ones are created.

    The client is opened in a ``with`` block so the connection is released
    even when one of the writes raises.
    """
    rows = ["B", "C", "D", "E", "F", "G", "H"]
    columns = [str(i) for i in range(1, 13)]

    with MongoClient(connection_string) as dbclient:
        collection = dbclient["LCM-OT-2-SLD"]["wells"]
        for row in rows:
            for col in columns:
                well = f"{row}{col}"
                metadata = {
                    "well": well,
                    "status": "empty",
                    "project": "OT2",
                }
                # Upsert keyed on the well name so re-running resets the plate
                # instead of creating duplicates.
                collection.update_one({"well": well}, {"$set": metadata}, upsert=True)
|
|
|
@task
def update_used_wells(used_wells):
    """Mark each well in *used_wells* as ``"used"`` in the ``wells`` collection.

    :param used_wells: Iterable of well names (e.g. ``"B3"``). Each one is
        upserted, matched on its ``well`` field, with ``status="used"`` and
        ``project="OT2"``.

    The client is opened in a ``with`` block so the connection is released
    even when one of the writes raises.
    """
    with MongoClient(connection_string) as dbclient:
        collection = dbclient["LCM-OT-2-SLD"]["wells"]
        for well in used_wells:
            metadata = {
                "well": well,
                "status": "used",
                "project": "OT2",
            }
            collection.update_one({"well": well}, {"$set": metadata}, upsert=True)
|
|
|
def _well_sort_key(well):
    """Return ``(row_letter, column_number)`` so that ``"B2"`` sorts before ``"B10"``."""
    return (well[0], int(well[1:]))


def find_unused_wells():
    """Return the names of all wells whose status is ``"empty"``, in plate order.

    Wells are read from the ``wells`` collection of the ``LCM-OT-2-SLD``
    database and sorted by row letter, then numerically by column.

    When exactly one empty well remains, a ``"wellplate_maintenance"`` request
    is triggered, unless its maintenance flag is already non-zero, in which
    case only a debug line is printed. After triggering, the flag is set to 1.

    :return: Sorted list of well names; empty when no well is available.
    """
    # Only hold the connection for the query itself; the maintenance helpers
    # below open their own clients.
    with MongoClient(connection_string) as dbclient:
        collection = dbclient["LCM-OT-2-SLD"]["wells"]
        documents = list(collection.find({"status": "empty"}))

    # Documents without a "well" field are skipped rather than breaking the sort.
    empty_wells = sorted(
        (doc["well"] for doc in documents if "well" in doc),
        key=_well_sort_key,
    )

    # NOTE(review): the request fires only at exactly one remaining well; if the
    # count drops from two or more straight to zero no request is sent -- confirm
    # this is intended.
    if len(empty_wells) == 1:
        maintenance_type = "wellplate_maintenance"
        if check_maintenance_status(maintenance_type) == 0:
            asyncio.run(trigger_maintenance_request(maintenance_type))
            set_maintenance_status(maintenance_type, 1)
        else:
            print("[DEBUG] a maintenance request has been sent")

    return empty_wells
|
|
|
@task
def save_result(result_data):
    """Insert *result_data* into the ``result`` collection and return its id.

    :param result_data: Mutable mapping describing the result. It is modified
        in place: a ``"timestamp"`` key holding the current time in the
        ``America/Toronto`` timezone is added before insertion.
    :return: The ``inserted_id`` reported by the insert operation.

    The client is opened in a ``with`` block so the connection is released
    even when the insert raises.
    """
    with MongoClient(connection_string) as dbclient:
        collection = dbclient["LCM-OT-2-SLD"]["result"]
        result_data["timestamp"] = datetime.now(pytz.timezone("America/Toronto"))
        insert_result = collection.insert_one(result_data)

    return insert_result.inserted_id
|
|
|
def get_student_quota(student_id):
    """
    Looks up the remaining quota of a student.

    :param student_id: The ID of the student to look up.
    :return: The value of the student's ``quota`` field, or 0 when the
        document has no such field.
    :raises ValueError: If no student document matches ``student_id``.
    """
    with MongoClient(connection_string) as mongo:
        students = mongo["LCM-OT-2-SLD"]["student"]
        record = students.find_one({"student_id": student_id})

    # The lookup is finished, so the connection is no longer needed here.
    if record is None:
        raise ValueError(f"Student ID '{student_id}' not found in the database.")
    return record.get("quota", 0)
|
|
|
def decrement_student_quota(student_id):
    """
    Decrements a student's quota by one and reports the outcome as text.

    :param student_id: The ID of the student whose quota is decremented.
    :return: A human-readable status message: student not found, no remaining
        quota, update succeeded, or update failed.

    The client is opened in a ``with`` block so the connection is released on
    every return path.
    """
    with MongoClient(connection_string) as dbclient:
        collection = dbclient["LCM-OT-2-SLD"]["student"]

        student = collection.find_one({"student_id": student_id})
        if not student:
            return f"Student ID {student_id} not found."
        if student.get("quota", 0) <= 0:
            return f"Student ID {student_id} has no remaining quota."

        # The "$gt" filter re-checks the quota inside the update itself, so it
        # cannot go below zero even if it changed after the read above.
        result = collection.update_one(
            {"student_id": student_id, "quota": {"$gt": 0}},
            {"$inc": {"quota": -1}},
        )

    if result.modified_count > 0:
        return f"Student ID {student_id}'s quota update successfully."
    return f"Quota update failed for Student ID {student_id}."
|
|
|
def add_student_quota(student_id, quota):
    """
    Adds a new student with a given quota, or overwrites the quota of an
    existing one.

    The document is upserted, matched on ``student_id``, so calling this for
    an existing student replaces their quota instead of creating a duplicate.
    The client is opened in a ``with`` block so the connection is released
    even when the write raises.

    :param student_id: The ID of the student.
    :param quota: The quota to store for the student.
    """
    student_data = {"student_id": student_id, "quota": quota}
    with MongoClient(connection_string) as dbclient:
        collection = dbclient["LCM-OT-2-SLD"]["student"]
        collection.update_one(
            {"student_id": student_id},
            {"$set": student_data},
            upsert=True,
        )
|
|
|
def check_maintenance_status(maintenance_type):
    """
    Reads the stored flag for a maintenance type.

    :param maintenance_type: Name of the maintenance entry, matched against
        the ``name`` field of the ``HITL_status`` collection.
    :return: The document's ``flag`` value, or 0 when the document or the
        field does not exist.

    The client is opened in a ``with`` block so the connection is released
    even when the query raises.
    """
    with MongoClient(connection_string) as dbclient:
        collection = dbclient["LCM-OT-2-SLD"]["HITL_status"]
        status_doc = collection.find_one({"name": maintenance_type})

    if status_doc is None:
        return 0
    return status_doc.get("flag", 0)
|
|
|
def set_maintenance_status(maintenance_type, new_status):
    """
    Stores a new flag value for a maintenance type.

    The ``HITL_status`` document whose ``name`` equals ``maintenance_type`` is
    upserted with the new flag and the current time in the ``America/Toronto``
    timezone. The client is opened in a ``with`` block so the connection is
    released even when the write raises.

    :param maintenance_type: Name of the maintenance entry to update.
    :param new_status: Value to store in the ``flag`` field.
    """
    with MongoClient(connection_string) as dbclient:
        collection = dbclient["LCM-OT-2-SLD"]["HITL_status"]
        collection.update_one(
            {"name": maintenance_type},
            {
                "$set": {
                    "flag": new_status,
                    "timestamp": datetime.now(pytz.timezone("America/Toronto")),
                }
            },
            upsert=True,
        )
|
|
|
def insert_maintenance_log(maintenance_type, operator="Unknown"):
    """
    Appends an entry to the ``maintenance_logs`` collection.

    The entry records the maintenance type, the operator and the current time
    in the ``America/Toronto`` timezone. The client is opened in a ``with``
    block so the connection is released even when the insert raises.

    :param maintenance_type: Name of the maintenance that was performed.
    :param operator: Who performed it; defaults to ``"Unknown"``.
    """
    log_data = {
        "maintenance_type": maintenance_type,
        "timestamp": datetime.now(pytz.timezone("America/Toronto")),
        "operator": operator,
    }
    with MongoClient(connection_string) as dbclient:
        dbclient["LCM-OT-2-SLD"]["maintenance_logs"].insert_one(log_data)
|
|
|
|
|
if __name__ == "__main__":
    # Manual entry point: running this module as a script performs the
    # empty-well lookup (including its maintenance check) once.
    find_unused_wells()
|
|