Spaces:
Runtime error
Runtime error
# | |
import gradio as gr | |
import os | |
from ultralytics import YOLO | |
from google.cloud import vision | |
# Google Cloud Vision credentials are read from the environment; a missing
# variable raises KeyError right here, while the module is being imported.
_api_key = os.environ["API_KEY"]
_project_id = os.environ["PROJECT_ID"]
# Shared OCR client, used by get_needle_value for text detection.
client = vision.ImageAnnotatorClient(client_options={"quota_project_id": _project_id, "api_key": _api_key})
# client = vision.ImageAnnotatorClient()
# Tolerance in degrees: check_tip replaces the tip keypoint when it deviates
# from the detected needle line by more than this.
AngTol = 10
import math | |
from scipy.spatial import KDTree | |
import io | |
from time import time | |
from PIL import Image, ImageDraw, ImageFilter | |
import numpy as np | |
import cv2 | |
import sys | |
sys.path.insert(0, ".") | |
import classical | |
from typing import Union | |
# Weights file loaded by get_load_PhModel (gauge corner detector).
modelPh = r'corners-best.pt'
# Side length, in pixels, of the square image get_corners rectifies each gauge to.
model1DIM = 640
# Weights file loaded by get_load_KpModel (dial key-point detector).
keypointModel = r'keypoints-best.pt'
# preprocessImg upsizes images whose longest side is below this many pixels.
minSz = 1280
# [image path, "detect gauge first" checkbox value] rows passed as examples to gr.Interface.
_examples = [["example0.jpg", True],["example1.jpg",True], ["example2.jpg",False], ["example3.jpg",True]]
def unwarp_image(warped_image, src_points, dst_points, output_width, output_height):
    """Rectify ``warped_image`` with the homography taking ``src_points`` to ``dst_points``.

    Args:
        warped_image: Image to rectify; anything ``np.array`` can convert.
        src_points: Point coordinates in ``warped_image``.
        dst_points: Matching coordinates in the output image, in the same order.
        output_width: Width of the output image in pixels.
        output_height: Height of the output image in pixels.

    Returns:
        The rectified image as a PIL image.
    """
    source = np.array(src_points).astype(np.float64)
    target = np.array(dst_points).astype(np.float64)
    # Only the 3x3 matrix is needed; the inlier mask is discarded.
    homography, _ = cv2.findHomography(source, target)
    pixels = np.array(warped_image)
    rectified = cv2.warpPerspective(pixels, homography, (output_width, output_height))
    return Image.fromarray(rectified)
model0 = None  # gauge-corner YOLO model, created on first use by get_load_PhModel


def get_load_PhModel():
    """Return the gauge-corner YOLO model, loading it from ``modelPh`` on first call.

    The instance is kept in the module-level ``model0`` so later calls reuse
    it instead of reading the weights again.
    """
    global model0
    if model0 is None:  # identity test; `== None` would go through __eq__
        tic = time()
        model0 = YOLO(modelPh)  # load a custom model
        print(f"model0 load took: {time()-tic:.2g}")
    return model0
def get_corners(results: list, img):
    """Rectify every gauge found in the first detection result.

    Args:
        results: Detection results; only ``results[0]`` is read, and its
            ``keypoints.xy.cpu()`` must iterate over one corner set per gauge.
        img: Source image handed to ``unwarp_image``.

    Returns:
        ``(planars, kps)``: the rectified ``model1DIM`` x ``model1DIM`` images
        and, in the same order, dicts mapping corner name to its coordinates.
    """
    # Corner order expected from the model; used as homography source points.
    corner_names = ("topLeft", "topRight", "bottomRight", "bottomLeft")
    planars, kps = [], []
    for corner_xy in results[0].keypoints.xy.cpu():
        keypoints = {name: point.numpy() for point, name in zip(corner_xy, corner_names)}
        side = model1DIM
        # Target square, listed in the same order as corner_names.
        target = np.array([(0, 0), (side, 0), (side, side), (0, side)])
        source = np.array(list(keypoints.values()))
        planars.append(unwarp_image(img, source, target, side, side))
        kps.append(keypoints)
    return planars, kps
model = None  # dial key-point YOLO model, created on first use by get_load_KpModel


def get_load_KpModel():
    """Return the dial key-point YOLO model, loading it from ``keypointModel`` on first call.

    The instance is kept in the module-level ``model`` so later calls reuse it
    instead of reading the weights again.
    """
    global model
    if model is None:  # identity test; `== None` would go through __eq__
        tic = time()
        model = YOLO(keypointModel)  # load a custom model
        print(f"model load took: {time()-tic:.2g}")
    return model
def preprocessImg(planar):
    """Return a square RGB copy of ``planar`` whose side is at least ``minSz``.

    A non-square image is stretched to a square of side ``max(width, height)``
    (the aspect ratio is not preserved). If that side is still below ``minSz``
    the image is then resized to ``minSz`` x ``minSz``.

    Args:
        planar: Image offering ``convert``, ``copy``, ``size`` and ``resize``
            (a PIL image in this file).

    Returns:
        The converted, possibly resized, image; ``planar`` itself is untouched.
    """
    img = planar.convert('RGB').copy()
    w, h = img.size
    side = max(w, h)
    # The former "nearly square" and "clearly non-square" branches did the
    # same resize, so a single check is enough.
    if w != h:
        img = img.resize((side, side))
    if side < minSz:
        img = img.resize((minSz, minSz))
    return img
def get_keypoints(results: list):
    """Extract the four dial key points from a single detection result.

    Args:
        results: List holding exactly one result whose ``keypoints.xy`` offers
            ``.cpu().squeeze()`` and iterates over the key points in the order
            start_kp, center, end_kp, tip.

    Returns:
        Dict mapping key-point name to its coordinates (via ``.numpy()``).

    Raises:
        gr.Error: If ``results`` does not hold exactly one entry, or if the
            ``center`` or ``tip`` key point is missing or is not a 2-element
            coordinate.
    """
    if len(results) != 1:
        raise gr.Error("found multiple dials. expected only 1")
    # ordering
    names = ("start_kp", "center", "end_kp", "tip")
    coords = results[0].keypoints.xy.cpu().squeeze()
    keypoints = {name: point.numpy() for point, name in zip(coords, names)}
    for required in ("center", "tip"):
        # zip() stops early when fewer points were produced, so the key may be
        # absent; report that as a user-facing error instead of a KeyError.
        point = keypoints.get(required)
        if point is None or len(point) != 2:
            raise gr.Error(f"{required} keypoint not found")
    return keypoints
def cosangle(a, b, ignoreRot=False):
    """Return the angle in degrees between the 2-D vectors ``a`` and ``b``.

    Args:
        a: First vector (two components).
        b: Second vector (two components).
        ignoreRot: When false (default) and the z-component of ``a x b`` is
            negative, ``360 - angle`` is returned, so the result lies in
            [0, 360). When true the unsigned angle in [0, 180] is returned.

    Returns:
        The angle in degrees. NaN if either vector has zero length.
    """
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    cos_ab = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
    # Rounding can push the ratio a hair outside [-1, 1], where arccos is NaN.
    angle2tip = np.rad2deg(np.arccos(np.clip(cos_ab, -1.0, 1.0)))
    # z-component of the cross product, written out because np.cross on
    # 2-element vectors is deprecated since NumPy 2.0.
    rotdir = a[0] * b[1] - a[1] * b[0] < 0
    if rotdir and not ignoreRot:
        return 360 - angle2tip
    return angle2tip
def calculate_sweep_angles(keypoints: dict):
    """Return the sweep angles, measured at the dial centre, from the scale start.

    Args:
        keypoints: Dict with ``start_kp``, ``center``, ``tip`` and ``end_kp``
            coordinates that support subtraction.

    Returns:
        ``(angle2tip, totalAngle)`` as computed by ``cosangle``: the angle from
        the start mark to the needle tip and from the start mark to the end mark.
    """
    def _from_center(name):
        return keypoints[name] - keypoints["center"]

    to_start = _from_center("start_kp")
    # sweep start -> tip
    angle2tip = cosangle(to_start, _from_center("tip"))
    # sweep start -> end
    totalAngle = cosangle(to_start, _from_center("end_kp"))
    return angle2tip, totalAngle
def get_text_from_image(client, path_or_img) -> list[dict]:
    """Run text detection on an image and return the annotations as plain dicts.

    Args:
        client: Object exposing ``text_detection(image=...)``; in this file a
            ``vision.ImageAnnotatorClient``.
        path_or_img: Either a filesystem path (``str`` or path-like) to an
            image file, or an image object with ``save(buffer, format="JPEG")``.

    Returns:
        One dict per annotation with keys ``"text"`` (its description) and
        ``"boxCorners"`` (list of ``(x, y)`` bounding-polygon vertices).

    Raises:
        Exception: If the response carries an error message.
    """
    if isinstance(path_or_img, (str, os.PathLike)):
        with open(path_or_img, "rb") as image_file:
            content = image_file.read()
    else:
        # In-memory image: encode it to JPEG bytes for the request.
        buf = io.BytesIO()
        path_or_img.save(buf, format="JPEG")
        content = buf.getvalue()
    image = vision.Image(content=content)
    response = client.text_detection(image=image)
    if response.error.message:
        raise Exception(
            "{}\nFor more info on error messages, check: "
            "https://cloud.google.com/apis/design/errors".format(response.error.message)
        )
    return [
        {
            "text": found.description,
            "boxCorners": [(vert.x, vert.y) for vert in found.bounding_poly.vertices],
        }
        for found in response.text_annotations
    ]
def median_point_of_bounding_box(x1, y1, x2, y2, x3, y3, x4, y4):
    """Return the centre of a four-corner box as ``(x, y)``.

    Despite the name, the centre is the arithmetic mean of the four corner
    coordinates on each axis.
    """
    corners = ((x1, y1), (x2, y2), (x3, y3), (x4, y4))
    x_median, y_median = (sum(axis) / len(axis) for axis in zip(*corners))
    return x_median, y_median
def to_numeric(text: str):
    """Parse ``text`` as a finite number, accepting ``,`` as the decimal separator.

    Returns:
        The parsed ``float``, or ``None`` when ``text`` is not a string, is not
        numeric, or parses to NaN / infinity (``float`` accepts "nan" and
        "inf", which are never valid scale values).
    """
    try:
        value = float(text.replace(",", "."))
    except (AttributeError, TypeError, ValueError):
        return None
    return value if math.isfinite(value) else None
def result_as_validvalue(contents: list[dict]) -> tuple[list[dict], list[str]]:
    """Split OCR annotations into numeric scale values and other text.

    Args:
        contents: Dicts with ``"text"`` and ``"boxCorners"`` as produced by
            ``get_text_from_image``.

    Returns:
        ``(valid, other)``. ``valid`` holds one dict per numeric annotation,
        sorted by ascending value, with keys ``text``, ``value``, ``mid`` (box
        centre), ``apchar`` (box area per character) and ``box``. ``other`` is
        the de-duplicated non-numeric, non-empty text.
    """
    valid = []
    other = []
    for found in contents:
        text = found["text"]
        if "\n" in text:
            # Multi-line annotations are skipped entirely.
            continue
        value = to_numeric(text)
        if value is None:
            # Empty text is dropped too: it used to fall through and divide
            # the box area by len("") == 0.
            if text != "":
                other.append(text)
            continue
        box = found["boxCorners"]
        mid = median_point_of_bounding_box(*np.array(box).flatten())
        area_per_char = cv2.contourArea(np.array(box)) / len(text)
        valid.append({"text": text, "value": value, "mid": mid, "apchar": area_per_char, "box": box})
    # only valid values, sorted min to max
    valid.sort(key=lambda e: e["value"])
    return valid, list(set(other))
def distance(a, b):
    """Return the Euclidean distance between the points ``a`` and ``b``."""
    delta = np.array(a) - np.array(b)
    return np.sqrt(np.square(delta).sum())
def determine_ocr_neighbors(keypoints, valid: list[dict], nearestIx) -> tuple[list, float]:
    """Annotate consecutive scale values with angular spacing and estimate units per degree.

    Args:
        keypoints: Dict with the dial ``"center"`` coordinate.
        valid: Non-empty list of scale-value dicts (keys ``value`` and ``mid``
            are read), in the order they appear around the dial. The first
            entry is annotated in place; the others are copied.
        nearestIx: One or two indices into ``valid`` naming the value(s)
            nearest to the needle tip.

    Returns:
        ``(values, rate)``. ``values`` mirrors ``valid`` with extra keys
        ``dang`` (angle in degrees from the previous value), ``ds`` (distance
        to the centre) and, except for the first entry, ``dv``, ``rot`` and
        ``dvda``. ``rate`` is the estimated value change per degree; it is NaN
        when fewer than two values are given, so the caller can reject the
        reading.

    Raises:
        ValueError: If fewer than six rates are available and ``nearestIx``
            holds neither one nor two indices.
    """
    center = np.array(keypoints["center"])

    def cosangle(a, b):
        # Unsigned angle in degrees plus rotation sign (-1 when a x b < 0).
        cos_ab = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
        # Clamp: rounding can leave the ratio just outside arccos' domain.
        ang = np.rad2deg(np.arccos(np.clip(cos_ab, -1.0, 1.0)))
        # Written out because np.cross on 2-element vectors is deprecated
        # since NumPy 2.0.
        rotdir = -1 if a[0] * b[1] - a[1] * b[0] < 0 else 1
        return ang, rotdir

    # compute angles between values
    values = [valid[0]]
    values[0]["dang"] = 0
    values[0]["ds"] = distance(center, values[0]["mid"])
    if len(valid) < 2:
        # No interval exists, hence no rate; previously rates[0] raised IndexError.
        return values, float("nan")

    rates = []
    for v in valid[1:]:
        prev = values[-1]
        u = v.copy()
        u["dv"] = v["value"] - prev["value"]
        a = np.array(prev["mid"]) - center
        b = np.array(v["mid"]) - center
        ang, rot = cosangle(a, b)
        u["rot"] = rot
        u["dang"] = ang
        # NOTE: two labels at the same angle make ang zero, so dvda is inf/NaN.
        u["dvda"] = u["dv"] / ang
        rates.append(u["dvda"])
        u["ds"] = distance(center, u["mid"])
        values.append(u)

    if nearestIx[0] == 0:
        # Duplicate the first interval's rate so index 0 has an entry of its own.
        rates.insert(0, rates[0])
    rates = np.array(rates)

    if len(rates) >= 6:
        # filter outlier rates, keeping the 5%..95% quantile band when non-empty
        ix = np.bitwise_and(rates > np.quantile(rates, 0.05), rates < np.quantile(rates, 0.95))
        if ix.any():
            rates = rates[ix]
        rate = rates.mean()
    elif len(nearestIx) == 2:
        # Weight each interval by exp(-2 * rank), rank being its distance in
        # steps from the pair of values that bracket the tip.
        n = [nearestIx[0], nearestIx[1]]
        rank = np.hstack([np.arange(0, n[0] + 1)[::-1], np.arange(n[1], len(rates)) - n[1]]).astype(float)
        weights = np.exp(-2 * rank)
        weights /= weights.sum()
        rate = np.average(rates, weights=weights)
    elif len(nearestIx) == 1:
        # There is one rate per interval, so the last value has no entry of
        # its own: clamp to the last interval instead of indexing past the end.
        rate = rates[min(nearestIx[0], len(rates) - 1)]
    else:
        raise ValueError("nearestIx must hold one or two indices")
    return values, rate
def vec_angle(v1, v2) -> float:
    """Return the signed angle in degrees, within [-180, 180], from ``v1`` to ``v2``.

    Both arguments are 2-D vectors. The sign is that of the z-component of
    ``v1 x v2``. The result is NaN if either vector has zero length.
    """
    vector1 = np.asarray(v1, dtype=float)
    vector2 = np.asarray(v2, dtype=float)
    vector1 = vector1 / np.linalg.norm(vector1)
    vector2 = vector2 / np.linalg.norm(vector2)
    # z-component of the cross product, written out because np.cross on
    # 2-element vectors is deprecated since NumPy 2.0.
    cross_z = vector1[0] * vector2[1] - vector1[1] * vector2[0]
    angle_rad = np.arctan2(cross_z, np.dot(vector1, vector2))
    return math.degrees(angle_rad)
def angles_from_tip(keypoints, values, nearestIx):
    """Store in each entry's ``"before"`` key its angular offset to the needle tip.

    Args:
        keypoints: Dict with ``"center"`` and ``"tip"`` coordinates.
        values: Scale-value dicts carrying ``mid`` and ``dang``; modified in
            place (``before`` is added, and ``dang`` of the upper neighbour is
            reset to 0).
        nearestIx: One or two indices of the value(s) nearest to the tip.

    Returns:
        ``values``. Entries from the lower neighbour down to index 0 get the
        tip angle plus the accumulated ``dang``; entries from the upper
        neighbour upward get the negated tip angle minus the accumulated
        ``dang``.
    """
    center = keypoints["center"]
    tip = keypoints["tip"] - center
    count = len(nearestIx)
    lower = nearestIx[0]
    single = count == 1
    bracketed = count == 2

    # Walk downward from the lower neighbour (or from the last value when the
    # tip lies beyond it).
    if bracketed or (single and lower == len(values) - 1):
        gap = abs(vec_angle(values[lower]["mid"] - center, tip))
        travelled = 0
        for idx in range(lower, -1, -1):
            values[idx]["before"] = gap + travelled
            travelled += values[idx]["dang"]

    # Walk upward from the upper neighbour (or from the first value when the
    # tip lies before it).
    if bracketed or (single and lower == 0):
        upper = lower if single else nearestIx[1]
        gap = abs(vec_angle(values[upper]["mid"] - center, tip))
        values[upper]["dang"] = 0
        travelled = 0
        for idx in range(upper, len(values)):
            travelled -= values[idx]["dang"]
            values[idx]["before"] = -gap + travelled
    return values
def sort_clockwise_with_start(coordinates, x_center, y_center, starting_index):
    """Order points by polar angle around a centre, starting at a chosen point.

    Args:
        coordinates: Iterable of ``(x, y)`` pairs.
        x_center: X coordinate of the centre.
        y_center: Y coordinate of the centre.
        starting_index: Index of the point that sorts first.

    Returns:
        ``(sorted_indices, angles)``: indices into ``coordinates`` ordered by
        increasing ``atan2`` angle measured from the starting point's angle
        (wrapped into one full turn), and the raw ``atan2`` angle in radians
        of every point in input order.
    """
    full_turn = 2 * math.pi
    angles = []
    for x, y in coordinates:
        angles.append(math.atan2(y - y_center, x - x_center))

    def _offset_from_start(i):
        return (angles[i] - angles[starting_index] + full_turn) % full_turn

    sorted_indices = sorted(range(len(angles)), key=_offset_from_start)
    return sorted_indices, angles
def remove_nonrange_value(valid):
    """Drop entries whose per-character box area is far from the mean.

    An entry is kept when its ``apchar`` deviates from the mean ``apchar`` by
    less than 50% of that mean. The entry whose text is ``"tip"`` is always
    kept and, having no ``apchar``, does not contribute to the mean.

    Returns:
        A new list with the retained entries in their original order.
    """
    cutoff = 0.5
    areas = [entry["apchar"] for entry in valid if "apchar" in entry]
    meanArea = np.mean(areas)
    kept = []
    for entry in valid:
        if entry["text"] == "tip" or abs(entry["apchar"] - meanArea) / meanArea < cutoff:
            kept.append(entry)
    return kept
def check_tip(img, keypoints):
    """Cross-check the tip key point against the needle line found by ``classical``.

    Among the line segments returned by ``classical.get_needle_line`` the one
    with an endpoint closest to the dial centre is chosen. If its direction
    differs from the centre-to-tip direction by more than ``AngTol`` degrees,
    ``keypoints["tip"]`` is overwritten with that segment's endpoint farther
    from the centre.

    Returns:
        ``True`` if the tip was replaced, ``False`` otherwise (including when
        no line was found).
    """
    lines = classical.get_needle_line(np.array(img))
    if lines is None or len(lines) == 0:
        return False
    if lines.ndim == 1:
        # A single segment: promote it to a one-row table.
        lines = np.expand_dims(lines, axis=0)

    def dist2(p, q):
        # Squared Euclidean distance between two 2-D points.
        return (p[0] - q[0]) ** 2 + (p[1] - q[1]) ** 2

    center = keypoints["center"]
    # For every segment, the squared distance of its nearer endpoint to the centre.
    nearest_end = [min(dist2(center, seg[:2]), dist2(center, seg[2:])) for seg in lines]
    order = np.argsort(nearest_end)
    needle = lines[order][0]
    direction = np.array([needle[0] - needle[2], needle[1] - needle[3]])
    tip = keypoints["tip"] - center
    ang = vec_angle(direction, tip)
    if not abs(ang) > AngTol:
        return False
    # furthest point from center is tip
    first_is_farther = dist2(needle[:2], center) > dist2(needle[2:], center)
    keypoints["tip"] = needle[:2] if first_is_farther else needle[2:]
    print("new point ", keypoints["tip"])
    return True
def get_needle_value(img, keypoints):
    """Read the gauge value the needle points at from OCR'd scale numbers.

    The scale numbers are OCR'd, ordered around the dial centre together with
    the needle tip, and the tip's neighbours are determined. Each value then
    yields an extrapolated reading (angle to the tip times units per degree,
    plus the value itself); the readings of the nearest value(s) are averaged.

    Args:
        img: Image of the dial, passed to ``get_text_from_image``.
        keypoints: Dict with ``"center"`` and ``"tip"`` coordinates.

    Returns:
        Dict with ``startValue`` and ``endValue`` (first and last value in
        angular order), ``tipvalue`` (needle reading, rounded to 2 decimals),
        ``unitPerDeg`` and ``otherText`` (non-numeric OCR text).

    Raises:
        gr.Error: If OCR finds no text, no number, or fewer than two usable
            numbers.
    """
    tic2 = time()
    contents = get_text_from_image(client, img)
    toc = time()
    print(f"ocr took: {toc-tic2:.1g}")
    if not contents:
        raise gr.Error("failed to get any text/number")
    valid, other = result_as_validvalue(contents)
    if not valid:
        raise gr.Error("failed to get any number")

    # Insert the tip as a pseudo value so the angular sort reveals its neighbours.
    valid.append({"text": "tip", "mid": keypoints["tip"]})
    ix, _ = sort_clockwise_with_start([e["mid"] for e in valid], *keypoints["center"], 0)
    valid = [valid[i] for i in ix]
    valid = remove_nonrange_value(valid)

    tip_pos = next(i for i, v in enumerate(valid) if v["text"] == "tip")
    valid.pop(tip_pos)
    # Check before touching valid[0] / valid[-1]: with fewer than two numbers
    # left, the steps below used to fail with an IndexError.
    if len(valid) < 2:
        raise gr.Error("failed to find at least 2 OCR number values")

    nearestIx = [tip_pos - 1, tip_pos]
    if len(valid) == nearestIx[1] or -1 == nearestIx[0]:
        # Tip lies outside the labelled range: keep only the closer end value.
        tip = keypoints["tip"] - keypoints["center"]
        b = valid[0]["mid"] - keypoints["center"]
        a = valid[-1]["mid"] - keypoints["center"]
        if abs(vec_angle(tip, a)) < abs(vec_angle(tip, b)):
            nearestIx = [len(valid) - 1]
        else:
            nearestIx = [0]
    nearestIx = np.array(nearestIx)

    values, rate = determine_ocr_neighbors(keypoints, valid, nearestIx)
    values = angles_from_tip(keypoints, values, nearestIx)

    c = keypoints["center"]
    tip = keypoints["tip"] - c
    tipValues = []
    for i, v in enumerate(values):
        ang = vec_angle(v["mid"] - c, tip)  # only reported in the log line
        before = v["before"]
        startValue = v["value"]
        needleVal = before * rate + startValue  # tip value seen from this scale value
        tipValues.append(needleVal)
        print(f"{i}, {ang:.2f}, {before:.2f}, @{needleVal:.2f}, {startValue}")
    tipValues = np.array(tipValues)

    return {
        "startValue": float(values[0]["value"]),
        "tipvalue": round(float(tipValues[nearestIx].mean()), 2),
        "endValue": float(values[-1]["value"]),
        "unitPerDeg": float(rate),
        "otherText": list(set(other)),
    }
# debug draw | |
def corners2bbox(C):
    """Return the axis-aligned bounding box of the points ``C``.

    Returns:
        ``(lower, upper)``: per-axis minimum and maximum of the points, each
        converted to an integer array.
    """
    points = np.array(C)
    lower = points.min(axis=0).astype(int)
    upper = points.max(axis=0).astype(int)
    return lower, upper
def debug(img, contents, keypoints):
    """Draw OCR boxes and key points onto ``img`` and save it as ``temp-ocr.jpg``.

    Every OCR annotation gets a red outline of its bounding box and a magenta
    dot at its centre. Each key point is drawn as a small filled square, using
    red, green, blue and magenta in the dict's iteration order. ``img`` is
    modified in place.
    """
    draw = ImageDraw.Draw(img)
    outline_colour = (255, 0, 0)
    for found in contents:
        corners = found["boxCorners"]
        top_left, bottom_right = corners2bbox(corners)
        draw.rectangle((*top_left, *bottom_right), fill=None, outline=outline_colour, width=1)
        centre = median_point_of_bounding_box(*np.array(corners).flatten())
        draw.point(centre, (255, 0, 255))

    palette = [(255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 0, 255)]
    for point, colour in zip(keypoints.values(), palette):
        point = np.array(point)
        draw.rectangle((*(point - 1), *(point + 1)), colour)
    img.save("temp-ocr.jpg")
    print("saved debug img")
def predict(img, detect_gauge_first):
    """Read every gauge in ``img`` and return one result dict per gauge.

    Args:
        img: Input image (the interface supplies a PIL image).
        detect_gauge_first: When true, gauges are located and rectified with
            the corner model first; otherwise ``img`` is treated as a single,
            already frontal gauge.

    Returns:
        List of dicts as produced by ``get_needle_value``, extended with
        ``angleToTip`` (``None`` when the tip key point was replaced by
        ``check_tip``), ``totalAngle`` and, when gauge detection ran, ``bbox``
        with the detected corner coordinates.

    Raises:
        gr.Error: If no image is given, no gauge is found, or a later stage
            cannot read the gauge.
    """
    if img is None:
        # An empty image input would otherwise fail on img.copy() / predict().
        raise gr.Error("no image provided")

    KPs = []
    if detect_gauge_first:
        results = get_load_PhModel().predict(img)
        phimgs, KPs = get_corners(results, img)
        if len(phimgs) == 0:
            raise gr.Error("no gauge found")
    else:
        phimgs = [img.copy()]

    kp_model = get_load_KpModel()  # fetched once; the getter caches the model
    payloads = []
    for i, phimg in enumerate(phimgs):
        phimg = preprocessImg(phimg)
        results = kp_model.predict(phimg)
        keypoints = get_keypoints(results)
        # Sweep angles use the model's tip, i.e. before check_tip may move it.
        angle2tip, totalAngle = calculate_sweep_angles(keypoints)
        angReplaced = check_tip(phimg, keypoints)
        phimg = phimg.filter(ImageFilter.UnsharpMask(radius=3))
        payload = get_needle_value(phimg, keypoints)
        # The angle computed above is stale once the tip has been replaced.
        payload["angleToTip"] = None if angReplaced else round(float(angle2tip), 2)
        payload["totalAngle"] = round(float(totalAngle), 2)
        for k, v in payload.items():
            print(k, type(v),v)
        if len(KPs) > i:
            payload["bbox"] = {k: v.astype(int).tolist() for k, v in KPs[i].items()}
        payloads.append(payload)
    return payloads
def test(img, detect_gauge_first):
    """Echo the image size and the checkbox value, without running any model."""
    size_text = str(img.size)
    return dict(msg=size_text, other=detect_gauge_first)
# HTML/Markdown blurb passed as `description` to the Gradio interface.
description = r"""
<b>Official 🤗 Gradio demo</b> for <a href='https://synanthropic.com/reading-analog-gauge' target='_blank'><b>Reading Analog Gauges: Automate Gauge Readings with AI in Days, Not Months
</b></a>.<br>
<br>
This model reads analog dial gauge by detecting, applying perspective correction, and gauge reading.
<br>
The model was built <i><strong>only</strong></i> with synthetic data (e.g. examples).<br>
Hence, it <i>probably</i> will not work on significantly different images - give it a try. Let us know, so we can keep improving.<br>
<br>
You can read more about it [here](https://synanthropic.com/reading-analog-gauge).
<br>
<br>
❗️Usage steps:<br>
1️⃣ Upload an image with analog dial gauge with readable values. The gauge face in the uploaded image should <b>occupy the majority of the image</b>.<br>
2️⃣ If the image has only one gauge and is a direct flat view, uncheck <strong>detect gauge first</strong>.<br>
3️⃣ Click the <b>Submit</b> button to start inference.<br>
<br>
"""
# Build the web UI around `predict` (image + checkbox in, JSON out) and start
# serving it as soon as this module is executed.
gr.Interface(title="Reading Analog Gauges",
             description=description,
             fn=predict,
             inputs=[
                 # Uploaded image is handed to `predict` as a PIL image.
                 gr.Image(type="pil", sources=["upload"],streaming=False, min_width=640),
                 # Maps to `detect_gauge_first`; checked by default.
                 gr.Checkbox(True, label="detect gauge first", info="if input image is zoomed in on only one gauge, uncheck box")
             ],
             outputs="json",
             examples=_examples,
             allow_flagging="never",
             cache_examples=True)\
    .launch()