File size: 6,529 Bytes
32b5cd7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
import base64
import io
import json
import os
import shutil
import time
from io import BytesIO
from queue import Queue
import paho.mqtt.client as mqtt
from PIL import Image
class MicroscopeDemo:
    """Client that drives a remote microscope over MQTT.

    Commands are published as JSON to ``<microscope>/command`` and replies
    are read from ``<microscope>/return``. The constructor connects and
    subscribes immediately; every command method blocks until the next reply
    has been received.
    """

    # Seconds to sleep between checks of the reply queue while waiting.
    _POLL_INTERVAL = 0.05
    # Replies whose JSON form is longer than this are truncated when echoed.
    _PREVIEW_LIMIT = 300

    def __init__(
        self,
        host,
        port,
        username,
        password,
        microscope,
        path_to_openflexure_stitching="OPTIONAL",
    ):
        """Connect to the broker and subscribe to the microscope's replies.

        Args:
            host: MQTT broker host name.
            port: MQTT broker port.
            username: Broker user name.
            password: Broker password.
            microscope: Topic prefix of the microscope to control.
            path_to_openflexure_stitching: Directory the stitching command is
                run from; only used by ``scan_and_stitch``.
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.microscope = microscope
        self.path_to_openflexure_stitching = path_to_openflexure_stitching

        self.client = mqtt.Client()
        self.client.tls_set()
        self.client.username_pw_set(self.username, self.password)

        # Replies are handed from the MQTT network thread to the caller
        # through this queue.
        self.receiveq = Queue()
        self.client.on_message = self._on_message

        self.client.connect(
            self.host, port=self.port, keepalive=60, bind_address=""
        )
        self.client.loop_start()
        self.client.subscribe(self.microscope + "/return", qos=2)

    @staticmethod
    def _preview(received, limit=_PREVIEW_LIMIT):
        """Return the text echoed for a reply, truncated when it is long."""
        serialized = json.dumps(received)
        if len(serialized) <= limit:
            return str(received)
        return serialized[:limit] + "..."

    def _on_message(self, client, userdata, message):
        """Queue a decoded reply and echo a short preview of it."""
        try:
            received = json.loads(message.payload.decode("utf-8"))
        except ValueError as e:
            # Covers both invalid UTF-8 and invalid JSON. Raising here would
            # propagate into the MQTT callback machinery, so the payload is
            # reported and skipped instead.
            print(f"Ignoring undecodable message (program will continue) {e}")
            return
        self.receiveq.put(received)
        try:
            print(self._preview(received))
        except Exception as e:
            # Echoing is best-effort only; the reply is already queued.
            print(f"Command printing error (program will continue) {e}")

    def _request(self, command):
        """Publish ``command`` (a dict) and block until the next reply."""
        self.client.publish(
            self.microscope + "/command",
            payload=json.dumps(command),
            qos=2,
            retain=False,
        )
        # Poll in short sleeps rather than blocking inside Queue.get().
        while self.receiveq.empty():
            time.sleep(self._POLL_INTERVAL)
        return self.receiveq.get()

    @staticmethod
    def _decode_image(encoded):
        """Turn a base64-encoded image string into a PIL image object."""
        return Image.open(BytesIO(base64.b64decode(encoded)))

    @staticmethod
    def _stitched_path(temp):
        """Return the path ``<temp>/<temp folder name>_stitched.jpg``.

        ``temp`` is made absolute first, which also drops any trailing
        separator, so the folder name is always the last path component.
        """
        temp_dir = os.path.abspath(temp)
        return os.path.join(
            temp_dir, os.path.basename(temp_dir) + "_stitched.jpg"
        )

    def scan_and_stitch(
        self, c1, c2, temp, ov=1200, foc=0, output="Downloads/stitched.jpeg"
    ):
        """Scan an area like ``scan()`` and stitch the tiles into one image.

        Args:
            c1, c2, ov, foc: Same meaning as in ``scan()``.
            temp: Working directory for the tiles. It is deleted and
                recreated on every call.
            output: Destination path of the stitched image.

        The stitching command is run from ``path_to_openflexure_stitching``;
        openflexure-stitching must be installed there.
        """
        reply = self._request(
            {"command": "scan", "c1": c1, "c2": c2, "ov": ov, "foc": foc}
        )
        encoded_images = reply["images"]

        # Use one absolute path everywhere: the stitcher runs from a
        # different working directory, so a relative path would point
        # somewhere else for it.
        temp_dir = os.path.abspath(temp)
        if os.path.isdir(temp_dir):
            shutil.rmtree(temp_dir)
        os.makedirs(temp_dir)

        for index, encoded in enumerate(encoded_images):
            with io.BytesIO(base64.b64decode(encoded)) as buffer:
                img = Image.open(buffer)
                save_options = {"format": "JPEG"}
                exif = img.info.get("exif")
                # Only forward EXIF data when the tile actually carries some.
                if exif is not None:
                    save_options["exif"] = exif
                img.save(
                    os.path.join(temp_dir, str(index) + ".jpeg"),
                    **save_options
                )

        # Paths are double-quoted so directories containing spaces work.
        # Edit this command if the stitching program is set up differently
        # on your machine.
        status = os.system(
            'cd "'
            + self.path_to_openflexure_stitching
            + '" && python -m venv .venv && .\\.venv\\Scripts\\activate && openflexure-stitch "'  # noqa: E501
            + temp_dir
            + '"'
        )
        if status != 0:
            print(f"Stitching command exited with status {status}")

        shutil.move(self._stitched_path(temp_dir), output)

    def move(self, x, y, z=False, relative=False):
        """Move to coordinates x, y (and z when it is an integer).

        When ``z`` is False the z value is left unchanged. When ``relative``
        is True the move is relative to the current position instead of
        absolute. Returns the microscope's reply.
        """
        return self._request(
            {"command": "move", "x": x, "y": y, "z": z, "relative": relative}
        )

    def scan(self, c1, c2, ov=1200, foc=0):
        """Scan the area between two corners and return a list of images.

        The corners can be given as "x1 y1", "x2, y2" or [x1, y1], [x2, y2].
        ``ov`` is the overlap between the images (useful for stitching) and
        ``foc`` is how much the microscope should focus between images
        (0 to disable).
        """
        reply = self._request(
            {"command": "scan", "c1": c1, "c2": c2, "ov": ov, "foc": foc}
        )
        return [self._decode_image(encoded) for encoded in reply["images"]]

    def focus(self, amount="fast"):
        """Focus by huge, fast, medium, fine, or any integer value.

        Returns the microscope's reply.
        """
        return self._request({"command": "focus", "amount": amount})

    def get_pos(
        self,
    ):
        """Return a dictionary with x, y, and z coordinates, e.g.
        {'x':1,'y':2,'z':3}."""
        return self._request({"command": "get_pos"})["pos"]

    def take_image(self):
        """Take a single image and return it as an image object."""
        reply = self._request({"command": "take_image"})
        return self._decode_image(reply["image"])

    def end_connection(self):
        """Stop the network loop and disconnect from the broker."""
        self.client.loop_stop()
        self.client.disconnect()
|