openflexure-microscope / key_request.py
Kenzo-A-T's picture
Update key_request.py
ef0e367 verified
def show():
import random
import time
# import pymongo.mongo_client
from os import environ
import requests
import streamlit as st
MONGODB_URI = environ["MONGODB_URI"]
HIVEMQ_BASE_URL = environ["HIVEMQ_BASE_URL"]
HIVEMQ_API_TOKEN = environ["HIVEMQ_API_TOKEN"]
from pymongo.mongo_client import MongoClient
# SET UP ON DATABASE you need to make a variable for the time called the
# same as the microscope prior to the running of the program running the
# function update_variable_test() will work
microscope = "microscope2"
access_time = 180
database_name = "openflexure-microscope"
collection_name = "Cluster0"
microscopes = [
"microscope",
"microscope2",
"deltastagetransmission",
"deltastagereflection",
]
client = MongoClient(MONGODB_URI)
db = client[database_name]
collection = db[collection_name]
try:
client.admin.command("ping")
print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
st.write(e)
def check_variable(variable_name):
try:
document = collection.find_one({"variable_name": variable_name})
if document:
return document.get("value", "Variable not found.")
else:
return "Variable not found in the collection."
except Exception as e:
return f"An error occurred: {e}"
def create_user(username, password):
api_url = HIVEMQ_BASE_URL + "/mqtt/credentials"
headers = {
"Authorization": f"Bearer {HIVEMQ_API_TOKEN}",
"Content-Type": "application/json",
}
new_user = {"credentials": {"username": username, "password": password}}
requests.post(api_url, json=new_user, headers=headers)
def delete_user(username):
headers = {
"Authorization": f"Bearer {HIVEMQ_API_TOKEN}",
"Content-Type": "application/json",
}
api_url = HIVEMQ_BASE_URL + "/mqtt/credentials/username/" + username
requests.delete(api_url, headers=headers)
def role_user(username, role):
headers = {
"Authorization": f"Bearer {HIVEMQ_API_TOKEN}",
"Content-Type": "application/json",
}
api_url = HIVEMQ_BASE_URL + "/user/" + username + "/roles/" + role + "/attach"
requests.put(api_url, headers=headers)
def update_variable(variable_name, new_value):
try:
result = collection.update_one(
{"variable_name": variable_name},
{"$set": {"value": new_value}},
upsert=True,
)
if result.matched_count > 0:
return "Variable updated successfully."
else:
return "Variable created and updated successfully."
except Exception as e:
return f"An error occurred: {e}"
def update_variable_test():
update_variable(microscope, random.randint(1, 10))
def check_variable_test():
st.write(check_variable(microscope))
def get_current_time():
# api_url = "http://worldtimeapi.org/api/timezone/Etc/UTC"
# try:
# response = requests.get(api_url)
# response.raise_for_status()
# data = response.json()
# return data['unixtime']
# except requests.RequestException as e:
# return f"Error: {e}"
unix_time = int(time.time())
return unix_time
def button():
st.session_state.button_clicked = True
if "button_clicked" not in st.session_state:
st.session_state.button_clicked = False
if "previous_selected_value" not in st.session_state:
st.session_state.previous_selected_value = microscopes[1]
st.write(f"Keys will last {900/60} minutes before being overridable")
st.write("Usernames:")
st.code(
"""
microscope -> microscopeclientuser
microscope2 -> microscope2clientuser
deltastagereflection -> deltastagereflectionclientuser
deltastagetransmission -> deltastagetransmissionclientuser
"""
)
microscope = st.selectbox(
"Choose a microscope:", microscopes, index=microscopes.index("microscope2")
)
if microscope != st.session_state.get("previous_selected_value", microscope):
st.session_state.button_clicked = False
st.session_state["previous_selected_value"] = microscope
st.button(
"Request temporary access",
help="If somebody is using the microscope, you will need to wait",
on_click=button,
)
if 'last_key' not in st.session_state:
st.session_state.last_key = "No generated keys!"
st.success("Last key you generated (may not still be valid): "+st.session_state.last_key)
if st.session_state.button_clicked:
display_text = st.empty()
ctime = get_current_time()
var = check_variable(microscope)
if ctime >= var + access_time:
access_key = "Microscope" + str(random.randint(10000000, 99999999))
delete_user(microscope + "clientuser")
create_user(microscope + "clientuser", access_key)
if microscope == "microscope2":
role_user(microscope + "clientuser", "3")
elif microscope == "microscope":
role_user(microscope + "clientuser", "4")
elif microscope == "deltastagereflection":
role_user(microscope + "clientuser", "5")
elif microscope == "deltastagetransmission":
role_user(microscope + "clientuser", "6")
display_text.success(
"Access key: " + access_key
)
st.session_state.last_key = access_key
update_variable(microscope, ctime)
else:
while True:
if access_time - ctime + var <= 0:
display_text.success("Access key ready!")
break
if (access_time - ctime + var) % 60 < 10:
seconds = "0" + str((access_time - ctime + var) % 60)
else:
seconds = str((access_time - ctime + var) % 60)
display_text.error(
"Please wait "
+ str(
int(
(
access_time
- ctime
+ var
- (access_time - ctime + var) % 60
)
/ 60
)
)
+ ":"
+ seconds
)
ctime = ctime + 1
if ctime % 15 == 0:
ctime = get_current_time() + 1
time.sleep(1)
while True:
time.sleep(5)
cutime = get_current_time()
var = check_variable(microscope)
if cutime <= var + access_time:
display_text.error("The access key was taken!")
break
time.sleep(10)