# Page residue captured together with this file (not program code): "Spaces: Runtime error / Runtime error"
import gradio as gr | |
from uploader import save_data_to_dir, create_data_dir, save_data | |
from main import predict_task | |
from gradio_scripts.state_handler import reset_state | |
import numpy as np | |
from gradio_scripts.aws_handler import upload_file | |
from aris import create_metadata_table | |
from gradio_scripts.annotation_handler import init_frames | |
import json | |
from zipfile import ZipFile | |
import os | |
from gradio_scripts.upload_ui import Upload_Gradio, models | |
from gradio_scripts.result_ui import Result_Gradio, update_result, table_headers, info_headers, js_update_tab_labels | |
from dataloader import create_dataloader_aris | |
from aris import BEAM_WIDTH_DIR | |
from InferenceConfig import InferenceConfig | |
# Version tag that the upload handlers copy into state['version']
WEBAPP_VERSION = "1.0"

# Initialize State & Result
# Module-level state shared by every handler in this file.
state = dict(
    files=[],              # uploaded files queued for inference (set in on_aris_input)
    index=1,               # position of the next file to run inference on
    total=1,               # number of queued files; index >= total makes infer_next a no-op
    annotation_index=-1,   # result index currently opened in the annotation editor
    frame_index=0,         # next frame to load for the annotation editor
    outputs=[],            # requested output formats
    config=None,           # InferenceConfig built in on_aris_input
)

# Per-file results; starts empty and is handed to reset_state() before use
# (presumably reset_state creates the list entries the handlers append to -- confirm).
result = {}
# Called when an Aris file is uploaded for inference
def on_aris_input(file_list, model_id, conf_thresh, iou_thresh, min_hits, max_age, associative_tracker, boost_power, boost_decay, byte_low_conf, byte_high_conf, min_length, min_travel, output_formats):
    """Store the uploaded files and inference settings, then kick off inference.

    Resets the shared ``state``/``result``, records the uploaded files, the
    requested output formats and a freshly built ``InferenceConfig`` in
    ``state``, and enables the tracker selected by ``associative_tracker``
    ("Confidence Boost", "ByteTrack", anything else falls back to the SORT
    tracker). ``boost_*`` and ``byte_*`` are only used by their own tracker.

    Returns:
        dict mapping Gradio components to updates: ``inference_handler`` gets
        a new random value and becomes visible, the cancel/skip buttons are
        shown and the tab with id 1 is selected.
    """
    print(output_formats)

    # Reset Result
    reset_state(result, state)
    state['files'] = file_list
    state['total'] = len(file_list)
    state['version'] = WEBAPP_VERSION
    state['outputs'] = output_formats

    # Unknown model ids fall back to the 'master' weights
    if model_id in models:
        weights = models[model_id]
    else:
        weights = models['master']

    config = InferenceConfig(
        weights=weights,
        conf_thresh=conf_thresh,
        nms_iou=iou_thresh,
        min_hits=min_hits,
        max_age=max_age,
        min_length=min_length,
        min_travel=min_travel,
    )
    state['config'] = config

    # Enable tracker if specified
    if associative_tracker == "Confidence Boost":
        config.enable_conf_boost(boost_power, boost_decay)
    elif associative_tracker == "ByteTrack":
        config.enable_byte_track(byte_low_conf, byte_high_conf)
    else:
        config.enable_sort_track()

    print(" ")
    print("Running with:")
    print(config.to_dict())
    print(" ")

    # Update loading_space to start inference on first file
    # (a fresh random value guarantees the component's value actually changes)
    updates = {}
    updates[inference_handler] = gr.update(value=str(np.random.rand()), visible=True)
    updates[components['cancelBtn']] = gr.update(visible=True)
    updates[components['skipBtn']] = gr.update(visible=True)
    updates[master_tabs] = gr.update(selected=1)
    return updates
# Called when a result zip file is uploaded for result review
def on_result_upload(zip_list, aris_list):
    """Load previously exported result zips (and optional ARIS files) for review.

    Args:
        zip_list: sequence of entries whose first element is the path of a
            result zip, or ``None`` to load the bundled example result.
        aris_list: optional sequence of ``(path, bytes)`` entries holding the
            ARIS input that belongs to the zip at the same position. May be
            empty/``None`` or shorter than ``zip_list``; zips without a
            matching ARIS entry get ``None`` as their ARIS input.

    Returns:
        dict mapping Gradio components to updates: the tab with id 1 is
        selected, ``tab_labeler`` receives the number of zips, and the updates
        produced by ``update_result`` for every zip are merged in (minus the
        ``inference_handler`` entry).
    """
    if zip_list is None:
        # Fall back to the example shipped with the app
        zip_list = [("static/example/example_result.zip", None)]
        aris_path = "static/example/input_file.aris"
        # Context manager so the example file handle is closed again
        with open(aris_path, 'rb') as aris_file:
            aris_list = [(aris_path, bytearray(aris_file.read()))]

    reset_state(result, state)
    state['version'] = WEBAPP_VERSION
    state['outputs'] = ["Annotated Video", "Manual Marking", "PDF"]

    component_updates = {
        master_tabs: gr.update(selected=1),
        tab_labeler: gr.update(value=len(zip_list)),
    }

    for i, zip_info in enumerate(zip_list):

        # Create dir to unzip files
        dir_name = create_data_dir(str(i))

        # Check aris input; tolerate fewer ARIS files than zips instead of raising IndexError
        if aris_list and i < len(aris_list):
            aris_info = aris_list[i]
            file_name = aris_info[0].split("/")[-1]
            aris_bytes = aris_info[1]
            _valid, input_path, dir_name = save_data_to_dir(aris_bytes, file_name, dir_name)
        else:
            input_path = None

        # Unzip result
        zip_name = zip_info[0]
        print(zip_name)
        with ZipFile(zip_name) as zip_file:
            zip_file.extractall(path=dir_name)

        unzipped = os.listdir(dir_name)
        print(unzipped)

        # Register the known result files by their suffix
        for entry in unzipped:
            if entry.endswith("_results.mp4"):
                result["path_video"].append(os.path.join(dir_name, entry))
            elif entry.endswith("_results.json"):
                result["path_json"].append(os.path.join(dir_name, entry))
            elif entry.endswith("_marking.txt"):
                result["path_marking"].append(os.path.join(dir_name, entry))

        result["aris_input"].append(input_path)

        # NOTE(review): this reads the most recently registered json; a zip
        # without a "*_results.json" entry would reuse the previous zip's file
        # (or fail on the first zip) -- confirm whether that needs handling.
        with open(result['path_json'][-1]) as f:
            json_result = json.load(f)
        result['json_result'].append(json_result)

        fish_table, fish_info = create_metadata_table(json_result, table_headers, info_headers)
        result["fish_table"].append(fish_table)
        result["fish_info"].append(fish_info)

        component_updates.update(update_result(i, state, result, inference_handler))

    # inference_handler is not among this listener's outputs; the default keeps
    # an empty zip_list (loop never ran, key never added) from raising KeyError.
    component_updates.pop(inference_handler, None)
    return component_updates
# Iterative function that performs inference on the next file in line
def infer_next(_, progress=gr.Progress()):
    """Run inference on ``state['files'][state['index']]`` and store its result.

    Does nothing (returns empty updates) when every queued file has been
    processed or when the uploaded file is reported invalid by ``save_data``.
    Otherwise the file is saved, uploaded to AWS, passed to ``predict_task``,
    its outputs are appended to ``result`` and ``state['index']`` is advanced.

    Args:
        _: ignored event payload.
        progress: Gradio progress tracker used to report status.

    Returns:
        dict of component updates; after a successful run ``result_handler``
        gets a new random value and ``tab_labeler`` the new file index.
    """
    if state['index'] >= state['total']:
        return {
            result_handler: gr.update(),
            inference_handler: gr.update(),
        }

    # Correct progress function for batch file input
    is_batch = state['total'] > 1

    def set_progress(pct, msg):
        # The file counter is read at call time, exactly when progress is reported
        if is_batch:
            prefix = "File " + str(state['index'] + 1) + "/" + str(state['total']) + ": "
            return progress(pct, desc=prefix + msg)
        return progress(pct, desc=msg)

    set_progress(0, "Starting...")

    # Save file and create a new directory for result
    upload = state['files'][state['index']]
    file_name = upload[0].split("/")[-1]
    file_bytes = upload[1]
    valid, file_path, dir_name = save_data(file_bytes, file_name)

    print("Directory: ", dir_name)
    print("Aris input: ", file_path)
    print(" ")

    # Check that the file was valid
    if not valid:
        return {
            result_handler: gr.update(),
            inference_handler: gr.update(),
        }

    # Send uploaded file to AWS
    upload_file(file_path, "fishcounting", "webapp_uploads/" + file_name)

    # Do inference
    prediction = predict_task(
        file_path,
        config=state['config'],
        output_formats=state['outputs'],
        gradio_progress=set_progress,
    )
    json_result, json_filepath, zip_filepath, video_filepath, marking_filepath = prediction

    # Store result for that file
    result['json_result'].append(json_result)
    result['aris_input'].append(file_path)
    result["path_video"].append(video_filepath)
    result["path_zip"].append(zip_filepath)
    result["path_json"].append(json_filepath)
    result["path_marking"].append(marking_filepath)

    fish_table, fish_info = create_metadata_table(json_result, table_headers, info_headers)
    result["fish_table"].append(fish_table)
    result["fish_info"].append(fish_info)

    # Increase file index
    state['index'] += 1

    # Send of update to result_handler to show new result
    # Leave inference_handler update blank to avoid starting next inference until result is updated
    updates = {}
    updates[result_handler] = gr.update(value=str(np.random.rand()))
    updates[tab_labeler] = gr.update(value=str(state['index']))
    updates[inference_handler] = gr.update()
    return updates
# Show result
def on_result_ready():
    """Return the UI updates for the most recently processed file.

    ``state['index']`` already points at the next file, so the latest result
    sits one position before it.
    """
    latest = state["index"] - 1
    return update_result(latest, state, result, inference_handler)
def cancel_inference():
    """Return updates that switch back to tab 0 and hide the inference controls."""
    updates = {master_tabs: gr.update(selected=0)}
    # Hide the inference handler together with the cancel/skip buttons
    for component in (inference_handler, components['cancelBtn'], components['skipBtn']):
        updates[component] = gr.update(visible=False)
    return updates
# Request loading of annotation editor
def prepare_annotation(index):
    """Select result ``index`` for annotation and request loading of its frames.

    Always records ``index`` in ``state['annotation_index']`` and rewinds
    ``state['frame_index']`` to 0. When the result has no ARIS input stored,
    empty updates are returned; otherwise ``annotation_progress`` is shown
    with an empty annotation payload and the tab with id 2 is selected.
    """
    state['annotation_index'] = index
    state['frame_index'] = 0

    # Without an ARIS input there is nothing to load
    if not result["aris_input"][index]:
        return {
            annotation_progress: gr.update(),
            master_tabs: gr.update(),
        }

    # The random HTML comment makes the value differ from any previous one
    empty_payload = "<p id='annotation_info' style='display:none'>[]</p><!--" + str(np.random.rand()) + "-->"
    return {
        annotation_progress: gr.update(value=empty_payload, visible=True),
        master_tabs: gr.update(selected=2),
    }
# Module-level cache used by load_annotation: the most recently loaded batch of
# annotation data and the dataset the frames are read from.
annotation_info = annotation_dataset = None
# annotation_progress.change
def load_annotation(_, progress=gr.Progress()):
    """Load the next batch of annotation frames, or show the editor when done.

    On the first call for a result (``state['frame_index'] == 0``) the ARIS
    dataset is created. While frames remain, ``init_frames`` loads a batch and
    the batch is written as JSON into ``annotation_progress``; once
    ``state['frame_index']`` reaches the number of frames in the result, the
    annotation editor markup is returned instead and the progress element is
    hidden.

    Args:
        _: ignored event payload.
        progress: Gradio progress tracker used to report status.

    Returns:
        dict of updates for ``annotation_editor`` and ``annotation_progress``.
    """
    global annotation_info, annotation_dataset

    # Get result index
    result_index = state['annotation_index']

    def set_progress(pct, msg):
        return progress(pct, desc=msg)

    if state['frame_index'] == 0:
        set_progress(0, "Loading Frames")
        # Only the dataset is kept; the dataloader itself is not used here
        _unused_loader, annotation_dataset = create_dataloader_aris(result["aris_input"][result_index], BEAM_WIDTH_DIR, None)

    # Check that frames remain to be loaded
    if state['frame_index'] < len(result['json_result'][result_index]['frames']):

        # load frames and annotation
        annotation_info, state['frame_index'] = init_frames(annotation_dataset, result['json_result'][result_index], state['frame_index'], gp=set_progress)

        # save as html element
        payload = "<p id='annotation_info' style='display:none'>" + json.dumps(annotation_info) + "</p>"
        return {
            annotation_editor: gr.update(),
            annotation_progress: gr.update(value=payload),
        }

    # If complete, start annotation editor
    editor_html = "".join((
        # Header
        "<div id='annotation_header'>",
        "   <h1 id='annotation_frame_nbr'>Frame 0/100</h1>",
        "   <p id='annotation_edited'>(edited)</p>",
        "</div>",
        # Annotation Body
        "<div style='display:flex'>",
        "   <canvas id='canvas' style='width:50%' onmousedown='mouse_down(event)' onmousemove='mouse_move(event)' onmouseup='mouse_up()' onmouseleave='mouse_up()'></canvas>",
        "   <div id='annotation_display' style='width:50%'></div>",
        "</div>",
        # Dummy objects
        "<img id='annotation_img' onload='draw()' style='display:none'></img>",
        "<!--" + str(np.random.rand()) + "-->",
    ))
    return {
        annotation_editor: gr.update(value=editor_html, visible=True),
        annotation_progress: gr.update(visible=False),
    }
# Registry of named Gradio components. It is handed to Upload_Gradio and
# Result_Gradio (which presumably register their components in it) and read
# back below when the event listeners are wired up.
components = {}

# Title and page-wide styles shown at the top of the page
_PAGE_HEADER_HTML = """
    <h1 align="center" style="font-size:xxx-large">Caltech Fisheye</h1>
    <style>
        #marking_json thead {
            display: none !important;
        }
        .selected.svelte-kqij2n {
            background: linear-gradient(180deg, #66eecb47, transparent);
        }
        #annotation_frame_nbr {
            left: calc(50% - 100px);
            position: absolute;
            width: 200px;
            text-align: center;
            font-size: x-large;
        }
        #annotation_header {
            height: 40px;
        }
        #annotation_frame_nbr {
            left: calc(50% - 100px);
            position: absolute;
            width: 200px;
            text-align: center;
            font-size: x-large;
        }
        #annotation_edited {
            right: 0px;
            position: absolute;
            margin-top: 5px;
        }
    </style>
    <style id="tab_style"></style>
    """

# Disclaimer shown at the bottom of the page
_DISCLAIMER_HTML = """
    <p align="center">
    <b>Note</b>: The software is provided "as is", without warranty of any kind, express or implied, including but not limited to the warranties of merchantability, fitness for a particular purpose and noninfringement.
    In no event shall the authors or copyright holders be liable for any claim, damages or other liability, whether in an action of contract, tort or otherwise, arising from, out of or in connection with the software or the use or other dealings in the software.
    </p>
    """

# Frontend hook run before load_annotation: appends the batch stored in the
# hidden #annotation_info element to window.annotation_info (an empty batch
# resets the list).
_ANNOTATION_PROGRESS_JS = """
    () => {
        info_string = document.getElementById("annotation_info").innerHTML;
        info = JSON.parse(info_string);
        console.log(info)
        if (info.length == 0) {
            window.annotation_info = [];
            return false;
        }
        window.annotation_info = window.annotation_info.concat(info)
        console.log(window.annotation_info)
        return true;
    }
    """

# NOTE(review): the nesting below was reconstructed from an unindented copy of
# this file -- confirm against the deployed layout.
demo = gr.Blocks()
with demo:
    with gr.Blocks() as inner_body:

        # Title of page
        gr.HTML(_PAGE_HEADER_HTML)

        with gr.Tabs() as master_tabs:
            components['master_tabs'] = master_tabs

            # Master Tab for uploading aris or result files
            with gr.Tab("Upload", id=0):

                # Draw Gradio components related to the upload ui
                Upload_Gradio(components)

            # Master Tab for result visualization
            with gr.Tab("Result", id=1):

                # Define annotation progress bar for event listeners, but unrender since it will be displayed later on
                annotation_progress = gr.HTML("", visible=False).unrender()
                components['annotation_progress'] = annotation_progress

                # Draw the gradio components related to visualizing result
                vis_components = Result_Gradio(prepare_annotation, components)

            # Master Tab for annotation editing
            with gr.Tab("Annotation Editor", id=2):

                # Draw the annotation loading bar here
                annotation_progress.render()

                # Add annotation editor component
                annotation_editor = gr.HTML("", visible=False)
                annotation_storage = gr.HTML("", visible=False)

                # Event listener for opening annotation
                annotation_progress.change(
                    load_annotation,
                    annotation_progress,
                    [annotation_editor, annotation_progress],
                    _js=_ANNOTATION_PROGRESS_JS,
                )

                # Event listener for running javascript defined in 'annotation_editor.js'
                with open('gradio_scripts/annotation_editor.js', 'r') as f:
                    annotation_editor_js = f.read()
                annotation_editor.change(lambda x: gr.update(), None, annotation_editor, _js=annotation_editor_js)

        # Disclaimer at the bottom of page
        gr.HTML(_DISCLAIMER_HTML)

        # Extract important components for ease of code
        # (the name `input` shadows the builtin; kept unchanged for compatibility)
        input = components['input']
        inference_handler = components['inference_handler']
        result_handler = components['result_handler']
        tab_labeler = components['tab_labeler']

        inference_comps = [inference_handler, master_tabs, components['cancelBtn'], components['skipBtn']]

        # When a file is uploaded to the input, tell the inference_handler to start inference
        input.upload(on_aris_input, [input] + components['hyperparams'], inference_comps)

        # When inference handler updates, tell result_handler to show the new result
        # Also, add inference_handler as the output in order to have it display the progress
        inference_event = inference_handler.change(infer_next, None, [inference_handler, result_handler, tab_labeler])

        # Send UI changes based on the new results to the UI_components, and tell the inference_handler to start next inference
        result_handler.change(on_result_ready, None, vis_components + [inference_handler])

        # Cancel and skip buttons
        for button_key in ('cancelBtn', 'skipBtn'):
            components[button_key].click(cancel_inference, None, inference_comps, cancels=[inference_event])

        # Button to load a previous result and view visualization
        components['preview_result_btn'].click(
            on_result_upload,
            [components['result_input'], components['result_aris_input']],
            vis_components + [master_tabs, tab_labeler],
        )

demo.queue().launch()
# NOTE(review): this call only runs once launch() returns and its return value
# is discarded -- confirm whether it is still needed.
on_result_ready()