Spaces:
Runtime error
Runtime error
import gradio as gr | |
from uploader import save_data, create_data_dir | |
from main import predict_task | |
from state_handler import load_example_result, reset_state | |
from file_reader import File | |
import numpy as np | |
from aws_handler import upload_file | |
from aris import create_metadata_table | |
import cv2 | |
import base64 | |
from bbox import draggable_js | |
from annotation_handler import load_frames | |
import json | |
from zipfile import ZipFile | |
import os | |
max_tabs = 10 | |
table_headers = ["TOTAL" , "FRAME_NUM", "DIR", "R", "THETA", "L", "TIME", "DATE", "SPECIES"] | |
info_headers = [ | |
["TOTAL_TIME", "DATE", "START", "END", "TOTAL_FRAMES", "FRAME_RATE"], | |
["TOTAL_FISH", "UPSTREAM_FISH", "DOWNSTREAM_FISH", "NONDIRECTIONAL_FISH"], | |
["UPSTREAM_MOTION", "INTENSITY", "THRESHOLD", "WATER_TEMP"] | |
] | |
css = """ | |
<style> | |
#result_json { | |
height: 500px; | |
overflow: scroll !important; | |
} | |
#marking_json thead { | |
display: none !important; | |
} | |
#canvas { | |
align-self: center; | |
} | |
</style> | |
""" | |
js_update_tabs = """ | |
async () => { | |
let el_list = document.getElementById("result_handler").getElementsByClassName("svelte-1kcgrqr") | |
let idx = (el_list[1].value === "LOADING") ? 1 : parseInt(el_list[1].value) | |
console.log(idx) | |
style_sheet = document.getElementById("tab_style") | |
style_sheet.innerHTML = "" | |
for (let i = 1; i <= idx; i++) { | |
style_sheet.innerHTML += "#result_tabs button.svelte-kqij2n:nth-child(" + i + "):before {content: 'Result " + i + "';}" | |
} | |
} | |
""" | |
#Initialize State & Result | |
state = { | |
'files': [], | |
'index': 1, | |
'total': 1 | |
} | |
result = {} | |
# Start function, called on file upload | |
def on_input(file_list): | |
# Reset Result | |
reset_state(result, state) | |
state['files'] = file_list | |
state['total'] = len(file_list) | |
# Update loading_space to start inference on first file | |
return { | |
inference_handler: gr.update(value = str(np.random.rand()), visible=True) | |
} | |
# Iterative function that performs inference on the next file in line | |
def handle_next(_, progress=gr.Progress()): | |
if state['index'] >= state['total']: | |
return { | |
result_handler: gr.update(), | |
inference_handler: gr.update() | |
} | |
# Correct progress function for batch file input | |
set_progress = lambda pct, msg : progress(pct, desc=msg) | |
if state['total'] > 1: | |
set_progress = lambda pct, msg : progress(pct, desc="File " + str(state['index']+1) + "/" + str(state['total']) + ": " + msg) | |
set_progress(0, "Starting...") | |
# Save file and create a new directory for result | |
file_info = state['files'][state['index']] | |
file_name = file_info[0].split("/")[-1] | |
bytes = file_info[1] | |
valid, file_path, dir_name = save_data(bytes, file_name) | |
print(dir_name) | |
print(file_path) | |
# Check that the file was valid | |
if not valid: | |
return { | |
result_handler: gr.update(), | |
inference_handler: gr.update() | |
} | |
# Send uploaded file to AWS | |
upload_file(file_path, "fishcounting", "webapp_uploads/" + file_name) | |
# Do inference | |
json_result, json_filepath, zip_filepath, video_filepath, marking_filepath = predict_task(file_path, gradio_progress=set_progress) | |
# Store result for that file | |
result['json_result'].append(json_result) | |
result['aris_input'].append(file_path) | |
result["path_video"].append(video_filepath) | |
result["path_zip"].append(zip_filepath) | |
result["path_json"].append(json_filepath) | |
result["path_marking"].append(marking_filepath) | |
fish_table, fish_info = create_metadata_table(json_result, table_headers, info_headers) | |
result["fish_table"].append(fish_table) | |
result["fish_info"].append(fish_info) | |
# Increase file index | |
state['index'] += 1 | |
# Send of update to result_handler to show new result | |
# Leave inference_handler update blank to avoid starting next inference until result is updated | |
return { | |
result_handler: gr.update(value = str(state["index"])), | |
inference_handler: gr.update() | |
} | |
# Show result UI based on example data | |
def show_example_data(): | |
load_example_result(result, table_headers, info_headers) | |
state["index"] = 1 | |
return gr.update(value=str(state["index"])) | |
def show_data(): | |
# Get last index | |
i = state["index"] - 1 | |
# If index is larger than max_tabs, only add file to zip list | |
if i >= max_tabs: | |
return { | |
zip_out: gr.update(value=result["path_zip"]) | |
} | |
# Check if inference is done | |
not_done = state['index'] < state['total'] | |
annotation_html = "" | |
if result["aris_input"][i]: | |
frame_info = load_frames(result["aris_input"][i], result['json_result'][i]) | |
annotation_html = "<div style='display:flex'>" | |
annotation_html += "<canvas id='canvas' style='width:50%' onmousedown='mouse_down(event)' onmousemove='mouse_move(event)' onmouseup='mouse_up()' onmouseleave='mouse_up()'></canvas>" | |
annotation_html += "<div id='annotation_display' style='width:50%'></div>" | |
annotation_html += "</div>" | |
annotation_html += "<p id='annotation_info' style='display:none'>" + json.dumps(frame_info) + "</p>" | |
annotation_html += "<img id='annotation_img' onload='draw()' style='display:none'></img>" | |
# Send update to UI, and to inference_handler to start next file inference | |
return { | |
zip_out: gr.update(value=result["path_zip"]), | |
tabs[i]['tab']: gr.update(), | |
tabs[i]['video']: gr.update(value=result["path_video"][i], visible=True), | |
tabs[i]['metadata']: gr.update(value=result["fish_info"][i], visible=True), | |
tabs[i]['table']: gr.update(value=result["fish_table"][i], visible=True), | |
tabs[i]['annotation']: gr.update(value=annotation_html, visible=True), | |
tab_parent: gr.update(selected=i), | |
inference_handler: gr.update(value = str(np.random.rand()), visible=not_done) | |
} | |
def testing(): | |
codec = cv2.VideoWriter_fourcc(*'avc1') | |
vr = cv2.VideoWriter("static/test.mp4", codec, 10, [ int(1.5*100), 100 ] ) | |
print(vr) | |
print("hi") | |
codec = cv2.VideoWriter_fourcc(*'mp4v') | |
vr = cv2.VideoWriter("static/test.mp4", codec, 10, [ int(1.5*100), 100 ] ) | |
print(vr) | |
print("hi") | |
vr = cv2.VideoWriter("static/test.mp4", -1, 10, [ int(1.5*100), 100 ] ) | |
testing() | |
def preview_result(zip_info, aris_info): | |
zip_name = zip_info[0] | |
print(zip_name) | |
if (aris_info): | |
print(aris_info[0]) | |
file_name = aris_info[0].split("/")[-1] | |
bytes = aris_info[1] | |
valid, file_path, dir_name = save_data(bytes, file_name) | |
else: | |
dir_name = create_data_dir() | |
file_path = None | |
with ZipFile(zip_name) as zip_file: | |
ZipFile.extractall(zip_file, path=dir_name) | |
unzipped = os.listdir(dir_name) | |
print(unzipped) | |
reset_state(result, state) | |
state["index"] = 1 | |
for file in unzipped: | |
if (file.endswith("_results.mp4")): | |
result["path_video"].append(os.path.join(dir_name, file)) | |
elif (file.endswith("_results.json")): | |
result["path_json"].append(os.path.join(dir_name, file)) | |
elif (file.endswith("_marking.txt")): | |
result["path_marking"].append(os.path.join(dir_name, file)) | |
result["aris_input"].append(file_path) | |
with open(result['path_json'][0]) as f: | |
json_result = json.load(f) | |
result['json_result'].append(json_result) | |
fish_table, fish_info = create_metadata_table(json_result, table_headers, info_headers) | |
result["fish_table"].append(fish_table) | |
result["fish_info"].append(fish_info) | |
return { | |
result_handler: gr.update(value = str(state["index"])), | |
inference_handler: gr.update() | |
} | |
demo = gr.Blocks() | |
with demo: | |
with gr.Blocks() as inner_body: | |
# Title of page | |
gr.HTML( | |
""" | |
<h1 align="center" style="font-size:xxx-large">Caltech Fisheye</h1> | |
""" + css + """ | |
<style id="tab_style"></style> | |
""" | |
) | |
with gr.Tabs(): | |
with gr.Tab("Infer ARIS"): | |
gr.HTML("<p align='center' style='font-size: large;font-style: italic;'>Submit an .aris file to analyze result.</p>") | |
#Input field for aris submission | |
input = File(file_types=[".aris", ".ddf"], type="binary", label="ARIS Input", file_count="multiple") | |
# Dummy element to call inference events, this also displays the inference progress | |
inference_handler = gr.Text(value=str(np.random.rand()), visible=False) | |
# Zip file output | |
zip_out = gr.File(label="ZIP Output", interactive=False) | |
with gr.Tab("Review Results"): | |
# Title of page | |
gr.HTML(""" | |
<p align='center' style='font-size: large;font-style: italic;'>Submit an old zip file of results to visualize.</p> | |
<p align='center' style='font-size: large;font-style: italic;'>If you want to edit annotations, also submit an aris file.</p> | |
""") | |
result_input = File(file_types=[".zip"], type="binary", label="Upload result file") | |
result_aris_input = File(file_types=[".aris", ".ddf"], type="binary", label="Upload aris file (optional)") | |
preview_result_btn = gr.Button("Preview Result") | |
# Dummy element to call UI events | |
result_handler = gr.Text(value="LOADING", visible=False, elem_id="result_handler") | |
# List of all UI components that will recieve outputs from the result_handler | |
UI_components = [] | |
UI_components.append(zip_out) | |
# Create result tabs | |
tabs = [] | |
with gr.Tabs(elem_id="result_tabs") as tab_parent: | |
UI_components.append(tab_parent) | |
for i in range(max_tabs): | |
with gr.Tab(label="", id=i, elem_id="result_tab"+str(i)) as tab: | |
metadata_out = gr.Matrix(label="Info", interactive=False, headers=[""]*6, datatype="markdown", visible=False, elem_id="marking_json") | |
table_out = gr.Matrix(label='Indentified Fish', headers=table_headers, interactive=False, visible=False) | |
video_out = gr.Video(label='Annotated Video', interactive=False, visible=False) | |
annotation_editor = gr.HTML("""""", visible=False) | |
with open('annotation_editor.js', 'r') as f: | |
js = f.read() | |
annotation_editor.change(lambda x: gr.update(), None, annotation_editor, _js=js) | |
tabs.append({ | |
'tab': tab, | |
'metadata': metadata_out, | |
'video': video_out, | |
'table': table_out, | |
'annotation': annotation_editor | |
}) | |
UI_components.extend([tab, metadata_out, video_out, table_out, annotation_editor]) | |
# Button to show example result | |
#gr.Button(value="Show Example Result").click(show_example_data, None, result_handler) | |
# Disclaimer at the bottom of page | |
gr.HTML( | |
""" | |
<p align="center"> | |
<b>Note</b>: The software is provided "as is", without warranty of any kind, express or implied, including but not limited to the warranties of merchantability, fitness for a particular purpose and noninfringement. | |
In no event shall the authors or copyright holders be liable for any claim, damages or other liability, whether in an action of contract, tort or otherwise, arising from, out of or in connection with the software or the use or other dealings in the software. | |
</p> | |
""" | |
) | |
# When a file is uploaded to the input, tell the inference_handler to start inference | |
input.upload(fn=on_input, inputs=input, outputs=[inference_handler]) | |
# When inference handler updates, tell result_handler to show the new result | |
# Also, add inference_handler as the output in order to have it display the progress | |
inference_handler.change(handle_next, None, [result_handler, inference_handler]) | |
# Send UI changes based on the new results to the UI_components, and tell the inference_handler to start next inference | |
result_handler.change(show_data, None, UI_components + [inference_handler], _js=js_update_tabs) | |
# Button to load a previous result and view visualization | |
preview_result_btn.click(preview_result, [result_input, result_aris_input], [result_handler, inference_handler, preview_result_btn]) | |
demo.queue().launch() | |
show_data() | |