#!/usr/bin/env python3
"""
Streamlit app utilities
"""
from datetime import datetime
import os
import json
from pathlib import Path
import sys
import shutil
import time
import traceback

import pandas as pd
import streamlit as st
from PIL import Image, ImageDraw

CACHE_TTL = 60 * 60 * 24 * 14  # two weeks, in seconds

def load_json(basedir, name):
    if not os.path.exists(f"{basedir}/{name}.json"):
        return None
    with open(f"{basedir}/{name}.json", "r") as f:
        j = json.load(f)
    return j
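
# CACHE_TTL above suggests these loaders are meant to be wrapped in a Streamlit cache.
# A minimal sketch, assuming st.cache_data is available (Streamlit >= 1.18); the
# load_json_cached wrapper is hypothetical and not part of the original module:
#
#   @st.cache_data(ttl=CACHE_TTL)
#   def load_json_cached(basedir, name):
#       return load_json(basedir, name)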

def load_json_no_cache(basedir, name):
    if not os.path.exists(f"{basedir}/{name}.json"):
        return None
    try:
        with open(f"{basedir}/{name}.json", "r") as f:
            j = json.load(f)
    except Exception:
        # treat an unreadable or malformed JSON file the same as a missing one
        return None
    return j

def save_json(basedir, name, data):
    with open(f"{basedir}/{name}.json", "w") as f:
        json.dump(data, f, indent=4)

def load_image(image_file):
    img = Image.open(image_file)
    return img

def load_page(page_path):
    return open(page_path, "rb")

def shorten(s):
    # shorten to 100 characters
    if len(s) > 100:
        s = s[:100] + "..."
    return s

def parse_arguments(action):
    s = []
    event_type = action["intent"]
    args = action["arguments"]

    if event_type == "textInput":
        txt = args["text"]
        txt = txt.strip()
        # escape markdown characters
        txt = txt.replace("_", "\\_")
        txt = txt.replace("*", "\\*")
        txt = txt.replace("`", "\\`")
        txt = txt.replace("$", "\\$")
        txt = shorten(txt)
        s.append(f'"{txt}"')
    elif event_type == "change":
        s.append(f'{args["value"]}')
    elif event_type == "load":
        url = args["properties"].get("url") or args.get("url")
        short_url = shorten(url)
        s.append(f'"[{short_url}]({url})"')
        if args["properties"].get("transitionType"):
            s.append(f'*{args["properties"]["transitionType"]}*')
            s.append(f'*{" ".join(args["properties"]["transitionQualifiers"])}*')
    elif event_type == "scroll":
        s.append(f'{args["scrollX"]}, {args["scrollY"]}')
    elif event_type == "say":
        s.append(f'"{args["text"]}"')
    elif event_type == "copy":
        selected = shorten(args["selected"])
        s.append(f'"{selected}"')
    elif event_type == "paste":
        pasted = shorten(args["pasted"])
        s.append(f'"{pasted}"')
    elif event_type == "tabcreate":
        s.append(f'{args["properties"]["tabId"]}')
    elif event_type == "tabremove":
        s.append(f'{args["properties"]["tabId"]}')
    elif event_type == "tabswitch":
        s.append(
            f'{args["properties"]["tabIdOrigin"]} -> {args["properties"]["tabId"]}'
        )

    if args.get("element"):
        if event_type == "click":
            x = round(args["metadata"]["mouseX"], 1)
            y = round(args["metadata"]["mouseY"], 1)
            uid = args.get("element", {}).get("attributes", {}).get("data-webtasks-id")
            s.append(f"*x =* {x}, *y =* {y}, *uid =* {uid}")
        else:
            top = round(args["element"]["bbox"]["top"], 1)
            left = round(args["element"]["bbox"]["left"], 1)
            right = round(args["element"]["bbox"]["right"], 1)
            bottom = round(args["element"]["bbox"]["bottom"], 1)
            s.append(
                f"*top =* {top}, *left =* {left}, *right =* {right}, *bottom =* {bottom}"
            )

    return ", ".join(s)

def create_visualization(_img, event_type, bbox, x, y, screenshot_path):
    # screenshot_path is not used, but we need it for caching since we can't cache
    # PIL images (hence the leading underscore in the variable name to indicate
    # that it's not hashed)
    _img = _img.convert("RGBA")
    draw = ImageDraw.Draw(_img)
    # draw a bounding box around the element
    color = {
        "click": "red",
        "hover": "orange",
        "textInput": "blue",
        "change": "green",
    }[event_type]
    left = bbox["left"]
    top = bbox["top"]
    w = bbox["width"]
    h = bbox["height"]
    draw.rectangle((left, top, left + w, top + h), outline=color, width=2)
    if event_type in ["click", "hover"]:
        r = 15
        for i in range(1, 5):
            rx = r * i
            draw.ellipse((x - rx, y - rx, x + rx, y + rx), outline=color, width=3)
        draw.ellipse((x - r, y - r, x + r, y + r), fill=color)
    return _img

def get_screenshot_minimal(screenshot_path, event_type, bbox, x, y, new_width=None):
    img = load_image(screenshot_path)
    if event_type in ["click", "textInput", "change", "hover"]:
        img = create_visualization(img, event_type, bbox, x, y, screenshot_path)
    if new_width is not None:
        # resize to `new_width` pixels wide, preserving the aspect ratio
        w, h = img.size
        new_w = new_width
        new_h = int(new_w * h / w)
        img = img.resize((new_w, new_h))
        print(f"Resized '{screenshot_path}' to", new_w, new_h)
    return img
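
# A minimal usage sketch (the path and box values are hypothetical; in the app,
# get_screenshot() below builds these arguments from a replay turn):
#
#   img = get_screenshot_minimal(
#       "demo_dir/screenshots/screenshot-42.png",
#       event_type="click",
#       bbox={"left": 100, "top": 200, "width": 80, "height": 30},
#       x=140,
#       y=215,
#       new_width=800,
#   )
#   st.image(img)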

def get_event_info(d):
    event_type = d["action"]["intent"]
    try:
        bbox = d["action"]["arguments"]["element"]["bbox"]
    except KeyError:
        bbox = None
    try:
        x = d["action"]["arguments"]["properties"]["x"]
        y = d["action"]["arguments"]["properties"]["y"]
    except KeyError:
        x = None
        y = None
    return event_type, bbox, x, y

def get_screenshot(d, basedir, new_width=None):
    screenshot_filename = d["state"]["screenshot"]
    if not screenshot_filename:
        return None
    event_type, bbox, x, y = get_event_info(d)
    screenshot_path = f"{basedir}/screenshots/{screenshot_filename}"
    return get_screenshot_minimal(
        screenshot_path, event_type, bbox, x, y, new_width=new_width
    )

def text_bubble(text, color):
    text = text.replace("\n", "<br>").replace("\t", " " * 8)
    return f'<div style="background-color:{color}; padding: 8px; margin: 6px; border-radius:10px; display:inline-block;">{text}</div>'
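
# Sketch of rendering a bubble in Streamlit; st.markdown needs unsafe_allow_html=True
# to render the raw <div> (the color here mirrors the ones used in format_chat_message):
#
#   st.markdown(text_bubble("Hello!", "rgba(185,185,185,0.35)"), unsafe_allow_html=True)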

def gather_chat_history(data, example_index):
    chat = []
    for i, d in enumerate(data):
        if d["type"] == "chat":
            if i >= example_index:
                break
            chat.append(d)
    # # leave out just 5 last messages
    # if len(chat) > 5:
    #     chat = chat[-5:]
    return reversed(chat)
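
# Illustrative behaviour (turn dicts abbreviated to the keys that matter here):
#
#   data = [
#       {"type": "chat", "speaker": "instructor", "utterance": "hi"},    # index 0
#       {"type": "browser"},                                             # index 1
#       {"type": "chat", "speaker": "navigator", "utterance": "hello"},  # index 2
#   ]
#   list(gather_chat_history(data, example_index=2))
#   # -> only the instructor turn at index 0, most recent first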

def format_chat_message(d):
    if d["speaker"] == "instructor":
        return text_bubble("🧑 " + d["utterance"], "rgba(63, 111, 255, 0.35)")
    else:
        return text_bubble("🤖 " + d["utterance"], "rgba(185,185,185,0.35)")

def find_screenshot(data, example_index, basedir):
    # keep looking at previous screenshots until we find one
    # if there is none, return None
    for i in range(example_index, -1, -1):
        d = data[i]
        if d["type"] == "chat":
            continue
        screenshot = get_screenshot(d, basedir)
        if screenshot:
            return screenshot
    return None

def create_visualization_2(_img, bbox, color, width, x, y):
    _img = _img.convert("RGBA")
    draw = ImageDraw.Draw(_img)
    if bbox:
        left = bbox["left"]
        top = bbox["top"]
        w = bbox["width"]
        h = bbox["height"]
        draw.rectangle((left, top, left + w, top + h), outline=color, width=width)
    if x and y:
        r = 8
        for i in range(1, 4):
            rx = r * i
            draw.ellipse((x - rx, y - rx, x + rx, y + rx), outline=color, width=2)
        draw.ellipse((x - r, y - r, x + r, y + r), fill=color)
    return _img

def rescale_bbox(bbox, scaling_factor):
    return {
        k: bbox[k] * scaling_factor
        for k in ["top", "left", "width", "height", "right", "bottom"]
        if k in bbox
    }
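
# For example, a browser zoom level of 1.25 scales every coordinate by 1.25:
#
#   rescale_bbox({"top": 10, "left": 20, "width": 100, "height": 40}, 1.25)
#   # -> {"top": 12.5, "left": 25.0, "width": 125.0, "height": 50.0}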

def show_overlay(
    _img,
    pred,
    ref,
    turn_args,
    turn_metadata,
    scale_pred=True,
    show=("pred_coords", "ref", "pred_elem"),
):
    scaling_factor = turn_metadata.get("zoomLevel", 1.0)

    if "pred_elem" in show:
        # First, draw a red box around the predicted element
        if pred.get("element") and pred["element"].get("bbox"):
            # rescale the bbox by scaling_factor
            bbox = rescale_bbox(pred["element"]["bbox"], scaling_factor)
            _img = create_visualization_2(
                _img, bbox, color="red", width=9, x=None, y=None
            )

    if "ref" in show:
        # Then, draw a blue box around the reference element (if it exists)
        if ref.get("element") and ref["element"].get("bbox"):
            # rescale the bbox
            bbox = rescale_bbox(ref["element"]["bbox"], scaling_factor)
            x = turn_args.get("properties", {}).get("x")
            y = turn_args.get("properties", {}).get("y")
            _img = create_visualization_2(_img, bbox, color="blue", width=6, x=x, y=y)

    if "pred_coords" in show:
        # Finally, draw a green box and x/y marker based on the predicted coordinates.
        # The predicted coordinates are the raw output of the model, whereas the
        # predicted element is the element inferred from those coordinates.
        if pred["args"].get("x") and pred["args"].get("y"):
            x = pred["args"]["x"]
            y = pred["args"]["y"]
            if scale_pred:
                x = x * scaling_factor
                y = y * scaling_factor
        else:
            x = None
            y = None

        # If the prediction includes box coordinates, draw a green box around them
        if all(c in pred["args"] for c in ["top", "left", "right", "bottom"]):
            bbox = {
                "top": pred["args"]["top"],
                "left": pred["args"]["left"],
                "width": (pred["args"]["right"] - pred["args"]["left"]),
                "height": (pred["args"]["bottom"] - pred["args"]["top"]),
                "right": pred["args"]["right"],
                "bottom": pred["args"]["bottom"],
            }
            if scale_pred:
                bbox = rescale_bbox(bbox, scaling_factor)
        else:
            # Otherwise, only the x/y marker is drawn
            bbox = None

        _img = create_visualization_2(_img, bbox=bbox, color="green", width=3, x=x, y=y)

    return _img
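
# A rough usage sketch (every dict below is illustrative, inferred from the keys
# accessed above; real predictions/references come from the app's evaluation data):
#
#   img = load_image("demo_dir/screenshots/screenshot-42.png")  # hypothetical path
#   pred = {
#       "element": {"bbox": {"left": 100, "top": 200, "width": 80, "height": 30}},
#       "args": {"x": 140, "y": 215},
#   }
#   ref = {"element": {"bbox": {"left": 98, "top": 198, "width": 84, "height": 34}}}
#   turn_args = {"properties": {"x": 139, "y": 214}}
#   turn_metadata = {"zoomLevel": 1.0}
#   st.image(show_overlay(img, pred, ref, turn_args, turn_metadata))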