|
def show(): |
|
import random |
|
import time |
|
|
|
|
|
from os import environ |
|
import requests |
|
import streamlit as st |
|
|
|
MONGODB_URI = environ["MONGODB_URI"] |
|
|
|
HIVEMQ_BASE_URL = environ["HIVEMQ_BASE_URL"] |
|
HIVEMQ_API_TOKEN = environ["HIVEMQ_API_TOKEN"] |
|
|
|
from pymongo.mongo_client import MongoClient |
|
|
|
|
|
|
|
|
|
microscope = "microscope2" |
|
access_time = 180 |
|
database_name = "openflexure-microscope" |
|
collection_name = "Cluster0" |
|
microscopes = [ |
|
"microscope", |
|
"microscope2", |
|
"deltastagetransmission", |
|
"deltastagereflection", |
|
] |
|
|
|
client = MongoClient(MONGODB_URI) |
|
db = client[database_name] |
|
collection = db[collection_name] |
|
|
|
try: |
|
client.admin.command("ping") |
|
print("Pinged your deployment. You successfully connected to MongoDB!") |
|
except Exception as e: |
|
st.write(e) |
|
|
|
def check_variable(variable_name): |
|
try: |
|
|
|
document = collection.find_one({"variable_name": variable_name}) |
|
if document: |
|
return document.get("value", "Variable not found.") |
|
else: |
|
return "Variable not found in the collection." |
|
except Exception as e: |
|
return f"An error occurred: {e}" |
|
|
|
def create_user(username, password): |
|
api_url = HIVEMQ_BASE_URL + "/mqtt/credentials" |
|
headers = { |
|
"Authorization": f"Bearer {HIVEMQ_API_TOKEN}", |
|
"Content-Type": "application/json", |
|
} |
|
|
|
new_user = {"credentials": {"username": username, "password": password}} |
|
|
|
requests.post(api_url, json=new_user, headers=headers) |
|
|
|
def delete_user(username): |
|
headers = { |
|
"Authorization": f"Bearer {HIVEMQ_API_TOKEN}", |
|
"Content-Type": "application/json", |
|
} |
|
|
|
api_url = HIVEMQ_BASE_URL + "/mqtt/credentials/username/" + username |
|
requests.delete(api_url, headers=headers) |
|
|
|
def role_user(username, role): |
|
headers = { |
|
"Authorization": f"Bearer {HIVEMQ_API_TOKEN}", |
|
"Content-Type": "application/json", |
|
} |
|
api_url = HIVEMQ_BASE_URL + "/user/" + username + "/roles/" + role + "/attach" |
|
requests.put(api_url, headers=headers) |
|
|
|
def update_variable(variable_name, new_value): |
|
try: |
|
result = collection.update_one( |
|
{"variable_name": variable_name}, |
|
{"$set": {"value": new_value}}, |
|
upsert=True, |
|
) |
|
if result.matched_count > 0: |
|
return "Variable updated successfully." |
|
else: |
|
return "Variable created and updated successfully." |
|
except Exception as e: |
|
return f"An error occurred: {e}" |
|
|
|
def update_variable_test(): |
|
update_variable(microscope, random.randint(1, 10)) |
|
|
|
def check_variable_test(): |
|
st.write(check_variable(microscope)) |
|
|
|
def get_current_time(): |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
unix_time = int(time.time()) |
|
return unix_time |
|
|
|
def button(): |
|
st.session_state.button_clicked = True |
|
|
|
if "button_clicked" not in st.session_state: |
|
st.session_state.button_clicked = False |
|
if "previous_selected_value" not in st.session_state: |
|
st.session_state.previous_selected_value = microscopes[1] |
|
|
|
st.write(f"Keys will last {900/60} minutes before being overridable") |
|
st.write("Usernames:") |
|
st.code( |
|
""" |
|
microscope -> microscopeclientuser |
|
microscope2 -> microscope2clientuser |
|
deltastagereflection -> deltastagereflectionclientuser |
|
deltastagetransmission -> deltastagetransmissionclientuser |
|
""" |
|
) |
|
|
|
microscope = st.selectbox( |
|
"Choose a microscope:", microscopes, index=microscopes.index("microscope2") |
|
) |
|
if microscope != st.session_state.get("previous_selected_value", microscope): |
|
|
|
st.session_state.button_clicked = False |
|
|
|
st.session_state["previous_selected_value"] = microscope |
|
|
|
st.button( |
|
"Request temporary access", |
|
help="If somebody is using the microscope, you will need to wait", |
|
on_click=button, |
|
) |
|
|
|
if 'last_key' not in st.session_state: |
|
st.session_state.last_key = "No generated keys!" |
|
st.success("Last key you generated (may not still be valid): "+st.session_state.last_key) |
|
|
|
if st.session_state.button_clicked: |
|
display_text = st.empty() |
|
ctime = get_current_time() |
|
var = check_variable(microscope) |
|
if ctime >= var + access_time: |
|
|
|
access_key = "Microscope" + str(random.randint(10000000, 99999999)) |
|
delete_user(microscope + "clientuser") |
|
create_user(microscope + "clientuser", access_key) |
|
if microscope == "microscope2": |
|
role_user(microscope + "clientuser", "3") |
|
elif microscope == "microscope": |
|
role_user(microscope + "clientuser", "4") |
|
elif microscope == "deltastagereflection": |
|
role_user(microscope + "clientuser", "5") |
|
elif microscope == "deltastagetransmission": |
|
role_user(microscope + "clientuser", "6") |
|
|
|
display_text.success( |
|
"Access key: " + access_key |
|
) |
|
st.session_state.last_key = access_key |
|
update_variable(microscope, ctime) |
|
|
|
else: |
|
while True: |
|
if access_time - ctime + var <= 0: |
|
display_text.success("Access key ready!") |
|
break |
|
if (access_time - ctime + var) % 60 < 10: |
|
seconds = "0" + str((access_time - ctime + var) % 60) |
|
else: |
|
seconds = str((access_time - ctime + var) % 60) |
|
display_text.error( |
|
"Please wait " |
|
+ str( |
|
int( |
|
( |
|
access_time |
|
- ctime |
|
+ var |
|
- (access_time - ctime + var) % 60 |
|
) |
|
/ 60 |
|
) |
|
) |
|
+ ":" |
|
+ seconds |
|
) |
|
|
|
ctime = ctime + 1 |
|
if ctime % 15 == 0: |
|
ctime = get_current_time() + 1 |
|
time.sleep(1) |
|
while True: |
|
time.sleep(5) |
|
cutime = get_current_time() |
|
var = check_variable(microscope) |
|
if cutime <= var + access_time: |
|
display_text.error("The access key was taken!") |
|
break |
|
time.sleep(10) |
|
|