File size: 7,308 Bytes
32b5cd7 6cca129 32b5cd7 6cca129 32b5cd7 ef0e367 32b5cd7 0972c77 32b5cd7 ea59ac4 32b5cd7 ea59ac4 32b5cd7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 |
def show():
import random
import time
# import pymongo.mongo_client
from os import environ
import requests
import streamlit as st
MONGODB_URI = environ["MONGODB_URI"]
HIVEMQ_BASE_URL = environ["HIVEMQ_BASE_URL"]
HIVEMQ_API_TOKEN = environ["HIVEMQ_API_TOKEN"]
from pymongo.mongo_client import MongoClient
# DATABASE SETUP: before running the program, the collection needs a
# document whose variable_name is the microscope's name; it holds that
# microscope's access time. Running update_variable_test() will create it.
# Page configuration and the MongoDB handle used by the helpers below.

# Initial microscope name; reassigned later from the selectbox choice.
microscope = "microscope2"
# Seconds that must pass after a key is issued before a new one is granted.
access_time = 180

database_name = "openflexure-microscope"
collection_name = "Cluster0"

# Options offered in the microscope selectbox.
microscopes = [
    "microscope",
    "microscope2",
    "deltastagetransmission",
    "deltastagereflection",
]

client = MongoClient(MONGODB_URI)
db = client[database_name]
collection = db[collection_name]

# Connectivity check: a failure is shown on the page and the script carries on.
try:
    client.admin.command("ping")
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as ping_error:
    st.write(ping_error)
def check_variable(variable_name):
    """Fetch the value stored for ``variable_name`` in the collection.

    Returns the document's ``value`` field. Problems are reported as
    message strings rather than raised: one message when no document
    matches, another when the document has no ``value`` field, and an
    "An error occurred" message when the lookup raises.
    """
    try:
        record = collection.find_one({"variable_name": variable_name})
        if not record:
            return "Variable not found in the collection."
        return record.get("value", "Variable not found.")
    except Exception as err:
        return f"An error occurred: {err}"
def create_user(username, password, timeout=10):
    """Create an MQTT credential pair through the HiveMQ REST endpoint.

    Sends a POST to ``HIVEMQ_BASE_URL + "/mqtt/credentials"`` carrying the
    username and password, authenticated with ``HIVEMQ_API_TOKEN``.

    Args:
        username: Name of the MQTT user to create.
        password: Password to assign to that user.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The ``requests.Response`` so callers can inspect the status if
        they wish; existing callers may keep ignoring it.

    Raises:
        requests.RequestException: On connection failure or timeout.
    """
    api_url = HIVEMQ_BASE_URL + "/mqtt/credentials"
    headers = {
        "Authorization": f"Bearer {HIVEMQ_API_TOKEN}",
        "Content-Type": "application/json",
    }
    new_user = {"credentials": {"username": username, "password": password}}
    # requests never times out by default; without this a stalled
    # connection would block the page indefinitely.
    return requests.post(api_url, json=new_user, headers=headers, timeout=timeout)
def delete_user(username, timeout=10):
    """Delete an MQTT credential through the HiveMQ REST endpoint.

    Sends a DELETE to
    ``HIVEMQ_BASE_URL + "/mqtt/credentials/username/" + username``,
    authenticated with ``HIVEMQ_API_TOKEN``. The HTTP status is not
    checked here, so deleting a user that does not exist is not treated
    as an error by this function.

    Args:
        username: Name of the MQTT user to delete.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The ``requests.Response``; existing callers may keep ignoring it.

    Raises:
        requests.RequestException: On connection failure or timeout.
    """
    headers = {
        "Authorization": f"Bearer {HIVEMQ_API_TOKEN}",
        "Content-Type": "application/json",
    }
    api_url = HIVEMQ_BASE_URL + "/mqtt/credentials/username/" + username
    # requests never times out by default; without this a stalled
    # connection would block the page indefinitely.
    return requests.delete(api_url, headers=headers, timeout=timeout)
def role_user(username, role, timeout=10):
    """Attach a role to an MQTT user through the HiveMQ REST endpoint.

    Sends a PUT to
    ``HIVEMQ_BASE_URL + "/user/" + username + "/roles/" + role + "/attach"``,
    authenticated with ``HIVEMQ_API_TOKEN``.

    Args:
        username: Name of the MQTT user receiving the role.
        role: Role identifier as a string (it is concatenated into the URL).
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The ``requests.Response``; existing callers may keep ignoring it.

    Raises:
        requests.RequestException: On connection failure or timeout.
    """
    headers = {
        "Authorization": f"Bearer {HIVEMQ_API_TOKEN}",
        "Content-Type": "application/json",
    }
    api_url = HIVEMQ_BASE_URL + "/user/" + username + "/roles/" + role + "/attach"
    # requests never times out by default; without this a stalled
    # connection would block the page indefinitely.
    return requests.put(api_url, headers=headers, timeout=timeout)
def update_variable(variable_name, new_value):
    """Store ``new_value`` under ``variable_name``, creating it if needed.

    Performs an upsert on the collection and returns a status message:
    one wording when an existing document matched, another when none
    did, and an "An error occurred" message when the write raises.
    """
    try:
        outcome = collection.update_one(
            {"variable_name": variable_name},
            {"$set": {"value": new_value}},
            upsert=True,
        )
        return (
            "Variable updated successfully."
            if outcome.matched_count > 0
            else "Variable created and updated successfully."
        )
    except Exception as err:
        return f"An error occurred: {err}"
def update_variable_test():
    """Write a random integer from 1 to 10 as the current microscope's value."""
    sample_value = random.randint(1, 10)
    update_variable(microscope, sample_value)
def check_variable_test():
    """Show the current microscope's stored value on the page."""
    current_value = check_variable(microscope)
    st.write(current_value)
def get_current_time():
    """Return the current Unix time as a whole number of seconds."""
    # Read from the host clock via time.time(); no remote time service
    # is queried.
    now = time.time()
    return int(now)
def button():
    """Button callback: record in session state that access was requested."""
    st.session_state["button_clicked"] = True
# ---------------------------------------------------------------------------
# Page flow: pick a microscope, request a temporary MQTT access key, and
# either issue one or show a countdown until the current key can be replaced.
# ---------------------------------------------------------------------------
import secrets  # function-scope import, like the file's other imports


def _last_claim_time(name):
    """Return when ``name`` was last claimed, in Unix seconds.

    ``check_variable`` reports problems as message strings instead of
    raising, and using such a string in arithmetic would raise TypeError.
    A "not found" message maps to 0 (never claimed); any other
    non-numeric result maps to ``None`` (the lookup failed).
    """
    stored = check_variable(name)
    if isinstance(stored, (int, float)):
        return stored
    if isinstance(stored, str) and stored.startswith("Variable not found"):
        return 0
    return None


# Role identifier passed to role_user() for each microscope's client user.
role_ids = {
    "microscope2": "3",
    "microscope": "4",
    "deltastagereflection": "5",
    "deltastagetransmission": "6",
}

if "button_clicked" not in st.session_state:
    st.session_state.button_clicked = False
if "previous_selected_value" not in st.session_state:
    st.session_state.previous_selected_value = microscopes[1]

# Derive the advertised lifetime from access_time so the message cannot
# disagree with the lock duration actually enforced below.
st.write(f"Keys will last {access_time / 60} minutes before being overridable")
st.write("Usernames:")
st.code(
    """
microscope -> microscopeclientuser
microscope2 -> microscope2clientuser
deltastagereflection -> deltastagereflectionclientuser
deltastagetransmission -> deltastagetransmissionclientuser
"""
)

microscope = st.selectbox(
    "Choose a microscope:", microscopes, index=microscopes.index("microscope2")
)

# Switching microscopes cancels a pending request made for the previous one.
if microscope != st.session_state.get("previous_selected_value", microscope):
    st.session_state.button_clicked = False
    st.session_state["previous_selected_value"] = microscope

st.button(
    "Request temporary access",
    help="If somebody is using the microscope, you will need to wait",
    on_click=button,
)

if "last_key" not in st.session_state:
    st.session_state.last_key = "No generated keys!"
st.success(
    "Last key you generated (may not still be valid): " + st.session_state.last_key
)

if st.session_state.button_clicked:
    display_text = st.empty()
    ctime = get_current_time()
    var = _last_claim_time(microscope)

    if var is None:
        # The stored claim time could not be read; do not issue a key blindly.
        display_text.error(
            "Could not read the microscope's access time. Please try again."
        )
    elif ctime >= var + access_time:
        # The previous key has expired: replace the client user's
        # credentials with a fresh key and record the claim time.
        # The key is an MQTT password, so draw it from the secrets module;
        # the result is still "Microscope" followed by eight digits.
        access_key = "Microscope" + str(10000000 + secrets.randbelow(90000000))
        client_user = microscope + "clientuser"
        delete_user(client_user)
        create_user(client_user, access_key)
        role = role_ids.get(microscope)
        if role is not None:
            role_user(client_user, role)
        display_text.success("Access key: " + access_key)
        st.session_state.last_key = access_key
        update_variable(microscope, ctime)
    else:
        # Somebody holds a valid key: count down until it can be replaced.
        # The clock is read on every tick so the display cannot drift.
        while True:
            remaining = int(var + access_time - get_current_time())
            if remaining <= 0:
                display_text.success("Access key ready!")
                break
            minutes, seconds = divmod(remaining, 60)
            display_text.error(f"Please wait {minutes}:{seconds:02d}")
            time.sleep(1)

        # The key is now free. Keep polling and report if another claim is
        # recorded before this session requests the key again.
        while True:
            time.sleep(5)
            cutime = get_current_time()
            var = _last_claim_time(microscope)
            # A failed read (None) is skipped and retried on the next poll.
            if var is not None and cutime <= var + access_time:
                display_text.error("The access key was taken!")
                break
            time.sleep(10)
|