Spaces:
Running
Running
from datetime import datetime | |
import json | |
import os | |
import time | |
from pathlib import Path | |
import streamlit as st | |
from utils import ( | |
load_json, | |
load_json_no_cache, | |
parse_arguments, | |
format_chat_message, | |
find_screenshot, | |
gather_chat_history, | |
get_screenshot, | |
load_page, | |
) | |
def show_selectbox(demonstration_dir): | |
# find all the subdirectories in the current directory | |
dirs = [ | |
d | |
for d in os.listdir(demonstration_dir) | |
if os.path.isdir(f"{demonstration_dir}/{d}") | |
] | |
if not dirs: | |
st.title("No recordings found.") | |
return None | |
# sort by date | |
dirs.sort(key=lambda x: os.path.getmtime(f"{demonstration_dir}/{x}"), reverse=True) | |
# offer the user a dropdown to select which recording to visualize, set a default | |
recording_name = st.sidebar.selectbox("Recording", dirs, index=0) | |
return recording_name | |
def show_overview(data, recording_name, basedir): | |
st.title('[WebLINX](https://mcgill-nlp.github.io/weblinx) Explorer') | |
st.header(f"Recording: `{recording_name}`") | |
screenshot_size = st.session_state.get("screenshot_size_view_mode", "regular") | |
show_advanced_info = st.session_state.get("show_advanced_information", False) | |
if screenshot_size == "regular": | |
col_layout = [1.5, 1.5, 7, 3.5] | |
elif screenshot_size == "small": | |
col_layout = [1.5, 1.5, 7, 2] | |
else: # screenshot_size == 'large' | |
col_layout = [1.5, 1.5, 11] | |
# col_i, col_time, col_act, col_actvis = st.columns(col_layout) | |
# screenshots = load_screenshots(data, basedir) | |
for i, d in enumerate(data): | |
if i > 0 and show_advanced_info: | |
# Use html to add a horizontal line with minimal gap | |
st.markdown( | |
"<hr style='margin-top: 0.1rem; margin-bottom: 0.1rem;'/>", | |
unsafe_allow_html=True, | |
) | |
if screenshot_size == "large": | |
col_time, col_i, col_act = st.columns(col_layout) | |
col_actvis = col_act | |
else: | |
col_time, col_i, col_act, col_actvis = st.columns(col_layout) | |
secs_from_start = d["timestamp"] - data[0]["timestamp"] | |
# `secs_from_start` is a float including ms, display in MM:SS.mm format | |
col_time.markdown( | |
f"**{datetime.utcfromtimestamp(secs_from_start).strftime('%M:%S')}**" | |
) | |
if not st.session_state.get("enable_html_download", True): | |
col_i.markdown(f"**#{i}**") | |
elif d["type"] == "browser" and (page_filename := d["state"]["page"]): | |
page_path = f"{basedir}/pages/{page_filename}" | |
col_i.download_button( | |
label="#" + str(i), | |
data=load_page(page_path), | |
file_name=recording_name + "-" + page_filename, | |
mime="multipart/related", | |
key=f"page{i}", | |
) | |
else: | |
col_i.button(f"#{i}", type='secondary') | |
if d["type"] == "chat": | |
col_act.markdown(format_chat_message(d), unsafe_allow_html=True) | |
continue | |
# screenshot_filename = d["state"]["screenshot"] | |
img = get_screenshot(d, basedir) | |
arguments = parse_arguments(d["action"]) | |
event_type = d["action"]["intent"] | |
action_str = f"**{event_type}**({arguments})" | |
if img: | |
col_actvis.image(img) | |
col_act.markdown(action_str) | |
if show_advanced_info: | |
status = d["state"].get("screenshot_status", "unknown") | |
text = "" | |
if status == "good": | |
text += f'**:green[Used in demo]**\n\n' | |
text += f'Screenshot: `{d["state"]["screenshot"]}`\\\n' | |
text += f'Page: `{d["state"]["page"]}`\n' | |
col_act.markdown(text) | |
def load_recording(basedir): | |
# Before loading replay, we need a dropdown that allows us to select replay.json or replay_orig.json | |
# Find all files in basedir starting with "replay" and ending with ".json" | |
replay_files = sorted( | |
[ | |
f | |
for f in os.listdir(basedir) | |
if f.startswith("replay") and f.endswith(".json") | |
] | |
) | |
replay_file = st.sidebar.selectbox("Select replay", replay_files, index=0) | |
st.sidebar.checkbox( | |
"Advanced Screenshot Info", False, key="show_advanced_information" | |
) | |
st.sidebar.checkbox( | |
"Enable HTML download", False, key="enable_html_download" | |
) | |
replay_file = replay_file.replace(".json", "") | |
if not Path(basedir).joinpath('metadata.json').exists(): | |
st.error(f"Metadata file not found at {basedir}/metadata.json. This is likely an issue with Huggingface Spaces. Try cloning this repo and running locally.") | |
st.stop() | |
metadata = load_json_no_cache(basedir, "metadata") | |
# convert timestamp to readable date string | |
recording_start_timestamp = metadata["recordingStart"] | |
recording_start_date = datetime.fromtimestamp( | |
int(recording_start_timestamp) / 1000 | |
).strftime("%Y-%m-%d %H:%M:%S") | |
st.sidebar.markdown(f"**started**: {recording_start_date}") | |
# recording_end_timestamp = k["recordingEnd"] | |
# calculate duration | |
# duration = int(recording_end_timestamp) - int(recording_start_timestamp) | |
# duration = time.strftime("%M:%S", time.gmtime(duration / 1000)) | |
# Read in the JSON data | |
replay_dict = load_json_no_cache(basedir, replay_file) | |
form = load_json_no_cache(basedir, "form") | |
if replay_dict is None: | |
st.error(f"Replay file not found at {basedir}/{replay_file}. This is likely an issue with Huggingface Spaces. Try cloning this repo and running locally.") | |
st.stop() | |
if form is None: | |
st.error(f"Form file not found at {basedir}/form.json. This is likely an issue with Huggingface Spaces. Try cloning this repo and running locally.") | |
st.stop() | |
duration = replay_dict["data"][-1]["timestamp"] - replay_dict["data"][0]["timestamp"] | |
duration = time.strftime("%M:%S", time.gmtime(duration)) | |
st.sidebar.markdown(f"**duration**: {duration}") | |
if not replay_dict: | |
return None | |
for key in [ | |
"annotator", | |
"description", | |
"tasks", | |
"upload_date", | |
"instructor_sees_screen", | |
"uses_ai_generated_output", | |
]: | |
if form and key in form: | |
# Normalize the key to be more human-readable | |
key_name = key.replace("_", " ").title() | |
if type(form[key]) == list: | |
st.sidebar.markdown(f"**{key_name}**: {', '.join(form[key])}") | |
else: | |
st.sidebar.markdown(f"**{key_name}**: {form[key]}") | |
st.sidebar.markdown("---") | |
if replay_dict and "status" in replay_dict: | |
st.sidebar.markdown(f"**Validation status**: {replay_dict['status']}") | |
processed_meta_path = Path(basedir).joinpath('processed_metadata.json') | |
start_frame = 'file not found' | |
if processed_meta_path.exists(): | |
with open(processed_meta_path) as f: | |
processed_meta = json.load(f) | |
start_frame = processed_meta.get('start_frame', 'info not in file') | |
st.sidebar.markdown(f"**Recording start frame**: {start_frame}") | |
# st.sidebar.button("Delete recording", type="primary", on_click=delete_recording, args=[basedir]) | |
data = replay_dict["data"] | |
return data | |
def run(): | |
# mode = st.sidebar.radio("Mode", ["Overview"]) | |
demonstration_dir = "./demonstrations" | |
# # params = st.experimental_get_query_params() | |
# params = st.query_params | |
# print(params) | |
# # list demonstrations/ | |
# demo_names = os.listdir(demonstration_dir) | |
# if params.get("recording"): | |
# if isinstance(params["recording"], list): | |
# recording_name = params["recording"][0] | |
# else: | |
# recording_name = params["recording"] | |
# else: | |
# recording_name = demo_names[0] | |
# recording_name = st.sidebar.selectbox( | |
# "Recordings", | |
# demo_names, | |
# index=demo_names.index(recording_name), | |
# ) | |
# if recording_name != params.get("recording", [None])[0]: | |
# # st.experimental_set_query_params(recording=recording_name) | |
# # use st.query_params as a dict instead | |
# st.query_params['recording'] = recording_name | |
demo_names = os.listdir(demonstration_dir) | |
def update_recording_name(): | |
st.query_params["recording"] = st.session_state.get("recording_name", demo_names[0]) | |
# For initial run, set the query parameter to the selected recording | |
if not st.query_params.get("recording"): | |
update_recording_name() | |
recording_name = st.query_params.get("recording") | |
if recording_name not in demo_names: | |
st.error(f"Recording `{recording_name}` not found. Please select another recording.") | |
st.stop() | |
recording_idx = demo_names.index(recording_name) | |
st.sidebar.selectbox( | |
"Recordings", demo_names, on_change=update_recording_name, key="recording_name", index=recording_idx | |
) | |
with st.sidebar: | |
# Want a dropdown | |
st.selectbox( | |
"Screenshot size", | |
["small", "regular", "large"], | |
index=1, | |
key="screenshot_size_view_mode", | |
) | |
if recording_name is not None: | |
basedir = f"{demonstration_dir}/{recording_name}" | |
data = load_recording(basedir=basedir) | |
if not data: | |
st.stop() | |
show_overview(data, recording_name=recording_name, basedir=basedir) | |
if __name__ == "__main__": | |
st.set_page_config(layout="wide") | |
run() | |