oskarastrom's picture
Bug fixes
d9c7dce
raw
history blame
No virus
16.4 kB
import gradio as gr
from uploader import save_data_to_dir, create_data_dir, save_data
from main import predict_task
from gradio_scripts.state_handler import reset_state
import numpy as np
from gradio_scripts.aws_handler import upload_file
from aris import create_metadata_table
from gradio_scripts.annotation_handler import init_frames
import json
from zipfile import ZipFile
import os
from gradio_scripts.upload_ui import Upload_Gradio, models
from gradio_scripts.result_ui import Result_Gradio, update_result, table_headers, info_headers, js_update_tab_labels
from dataloader import create_dataloader_aris
from aris import BEAM_WIDTH_DIR
from InferenceConfig import InferenceConfig
# Version tag that the upload handlers copy into state['version']
WEBAPP_VERSION = "1.0"

# Feature flag: the "Annotation Editor" tab is only built when this is True
enable_annotation_editor = False

# Initialize State & Result
# `state` tracks the current batch and annotation session; `result` starts
# empty and is filled in by the handlers below.
state = dict(
    files=[],
    index=1,
    total=1,
    annotation_index=-1,
    frame_index=0,
    outputs=[],
    config=None,
)
result = dict()
# Called when an Aris file is uploaded for inference
def on_aris_input(
    file_list,
    model_id,
    conf_thresh, iou_thresh,
    min_hits, max_age,
    associative_tracker, boost_power, boost_decay, byte_low_conf, byte_high_conf,
    min_length, max_length, min_travel,
    output_formats
):
    """Record the uploaded batch and its inference settings, then kick off inference.

    The shared `result`/`state` are reset, the uploaded files, requested
    output formats and a freshly built InferenceConfig are stored in `state`,
    and the tracker named by `associative_tracker` is enabled on the config
    ("Confidence Boost", "ByteTrack", anything else falls back to SORT).

    Returns a dict of component updates: the inference handler receives a new
    random value (its change listener runs `infer_next`), the cancel button is
    shown and the master tabs switch to tab id 1.
    """
    print(output_formats)

    # Start from a clean slate before registering the new batch
    reset_state(result, state)
    state.update(
        files=file_list,
        total=len(file_list),
        version=WEBAPP_VERSION,
        outputs=output_formats,
    )

    # Unknown model ids fall back to the 'master' weights
    weights_key = model_id if model_id in models else 'master'
    config = InferenceConfig(
        weights=models[weights_key],
        conf_thresh=conf_thresh,
        nms_iou=iou_thresh,
        min_hits=min_hits,
        max_age=max_age,
        min_length=min_length,
        max_length=max_length,
        min_travel=min_travel,
    )
    state['config'] = config

    # Enable tracker if specified
    if associative_tracker == "Confidence Boost":
        config.enable_conf_boost(boost_power, boost_decay)
    elif associative_tracker == "ByteTrack":
        config.enable_byte_track(byte_low_conf, byte_high_conf)
    else:
        config.enable_sort_track()

    print(" ")
    print("Inference with:")
    print(config.to_dict())
    print(" ")

    # A new random value makes the inference handler fire its change event,
    # which starts inference on the first file
    trigger_value = str(np.random.rand())
    return {
        inference_handler: gr.update(value=trigger_value, visible=True),
        components['cancel_btn']: gr.update(visible=True),
        master_tabs: gr.update(selected=1),
    }
# Called when a result zip file is uploaded for result review
def on_result_upload():
    """Switch to the result tab and hand the hidden result uploader a new random value."""
    updates = {master_tabs: gr.update(selected=1)}
    # The value itself is meaningless; it only needs to differ from the last one
    updates[result_uploader] = gr.update(value=str(np.random.rand()))
    return updates
def on_result_upload_finish(zip_list, aris_list):
    """Load previously exported result archives into the result view.

    Args:
        zip_list: Sequence of entries whose first element is the path of a
            result zip file. ``None`` selects the bundled example result
            together with its example ARIS input.
        aris_list: Optional sequence of ``(path, bytes)`` entries, indexed in
            step with ``zip_list``. When falsy, no ARIS input is stored for
            the results.

    Returns:
        A dict mapping components to their updates: the tab labeler plus
        everything `update_result` produced for each archive, with the
        inference handler entry removed.
    """
    if zip_list is None:
        zip_list = [("static/example/example_result.zip", None)]
        aris_path = "static/example/input_file.aris"
        # Read through a context manager so the file handle is closed promptly
        with open(aris_path, 'rb') as aris_file:
            aris_list = [(aris_path, bytearray(aris_file.read()))]

    reset_state(result, state)
    state['version'] = WEBAPP_VERSION
    state['outputs'] = ["Annotated Video", "Manual Marking", "PDF"]

    component_updates = {
        tab_labeler: gr.update(value = len(zip_list))
    }

    for i, zip_info in enumerate(zip_list):

        # Create dir to unzip files
        dir_name = create_data_dir(str(i))

        # Check aris input
        if aris_list:
            aris_info = aris_list[i]
            file_name = aris_info[0].split("/")[-1]
            aris_bytes = aris_info[1]
            # The validity flag is not acted on here, as in the original
            _valid, input_path, dir_name = save_data_to_dir(aris_bytes, file_name, dir_name)
        else:
            input_path = None

        # Unzip result
        zip_name = zip_info[0]
        print(zip_name)
        with ZipFile(zip_name) as zip_file:
            zip_file.extractall(path=dir_name)
        unzipped = os.listdir(dir_name)
        print(unzipped)

        # Register the known artifacts by file-name suffix
        for file in unzipped:
            if file.endswith("_results.mp4"):
                result["path_video"].append(os.path.join(dir_name, file))
            elif file.endswith("_results.json"):
                result["path_json"].append(os.path.join(dir_name, file))
            elif file.endswith("_marking.txt"):
                result["path_marking"].append(os.path.join(dir_name, file))

        result["aris_input"].append(input_path)

        # NOTE(review): this reads the most recently registered JSON path; an
        # archive without a "_results.json" member would reuse the previous
        # archive's JSON (or raise IndexError for the first one) - confirm
        # whether such archives can occur.
        with open(result['path_json'][-1]) as f:
            json_result = json.load(f)
        result['json_result'].append(json_result)
        fish_table, fish_info = create_metadata_table(json_result, table_headers, info_headers)
        result["fish_table"].append(fish_table)
        result["fish_info"].append(fish_info)

        # Later archives overwrite earlier updates for the same component
        component_updates.update(update_result(i, state, result, inference_handler))

    # The inference handler is not an output of this event. Use a default so
    # an empty zip_list (loop never ran) does not raise KeyError.
    component_updates.pop(inference_handler, None)
    return component_updates
# Iterative function that performs inference on the next file in line
def infer_next(_, progress=gr.Progress()):
    """Run inference on the file at state['index'] and store its outputs in `result`.

    Returns blank updates (nothing changes in the UI) when every file has
    already been processed or when the uploaded file is reported invalid.
    Otherwise the result handler receives a new random value, the tab labeler
    receives the new file index, and the inference handler is left untouched.
    """
    def no_change():
        return {
            result_handler: gr.update(),
            inference_handler: gr.update()
        }

    # Nothing left to process
    if state['index'] >= state['total']:
        return no_change()

    # Progress reporter; batch uploads get a "File i/n: " prefix
    is_batch = state['total'] > 1

    def set_progress(pct, msg):
        if is_batch:
            msg = f"File {state['index'] + 1}/{state['total']}: " + msg
        return progress(pct, desc=msg)

    set_progress(0, "Starting...")

    # Save file and create a new directory for result
    upload_path, payload = state['files'][state['index']][:2]
    file_name = upload_path.split("/")[-1]
    valid, file_path, dir_name = save_data(payload, file_name)

    print("Directory: ", dir_name)
    print("Aris input: ", file_path)
    print(" ")

    # Stop here if the file was rejected
    if not valid:
        return no_change()

    # Send uploaded file to AWS
    upload_file(file_path, "fishcounting", "webapp_uploads/" + file_name)

    # Do inference
    outputs = predict_task(
        file_path,
        config = state['config'],
        output_formats = state['outputs'],
        gradio_progress = set_progress
    )
    json_result, json_filepath, zip_filepath, video_filepath, marking_filepath = outputs

    # Store result for that file
    result['json_result'].append(json_result)
    result['aris_input'].append(file_path)
    result["path_video"].append(video_filepath)
    result["path_zip"].append(zip_filepath)
    result["path_json"].append(json_filepath)
    result["path_marking"].append(marking_filepath)
    fish_table, fish_info = create_metadata_table(json_result, table_headers, info_headers)
    result["fish_table"].append(fish_table)
    result["fish_info"].append(fish_info)

    # Move on to the next file
    state['index'] += 1

    # Notify the result handler that a new result is available.
    # The inference handler update stays blank so the next inference does not
    # start until the result has been shown.
    return {
        result_handler: gr.update(value = str(np.random.rand())),
        tab_labeler: gr.update(value = str(state['index'])),
        inference_handler: gr.update()
    }
# Show result
def on_result_ready():
    """Return the UI updates for the most recently finished file."""
    # state['index'] already points at the next file, so step back by one
    latest = state["index"] - 1
    return update_result(latest, state, result, inference_handler)
def cancel_inference():
    """Return to the upload tab and hide the inference handler and cancel button."""
    updates = {master_tabs: gr.update(selected=0)}
    for hidden_component in (inference_handler, components['cancel_btn']):
        updates[hidden_component] = gr.update(visible=False)
    return updates
# Request loading of the annotation editor
def prepare_annotation(index):
    """Point the annotation state at result `index` and start frame loading.

    When no ARIS input is stored for that result, blank updates are returned
    and nothing changes in the UI.
    """
    state['annotation_index'] = index
    state['frame_index'] = 0

    # Without an ARIS input there is nothing to annotate
    if not result["aris_input"][index]:
        return {
            annotation_progress: gr.update(),
            master_tabs: gr.update()
        }

    # Empty annotation payload; the random comment makes the value unique
    placeholder = "<p id='annotation_info' style='display:none'>[]</p><!--" + str(np.random.rand()) + "-->"
    return {
        annotation_progress: gr.update(value=placeholder, visible=True),
        master_tabs: gr.update(selected=2)
    }
# Module-level values kept between successive load_annotation calls
annotation_info = None
annotation_dataset = None

# annotation_progress.change
def load_annotation(_, progress=gr.Progress()):
    """Load annotation frames for the selected result, then show the editor.

    While frames remain, the next batch is loaded through `init_frames` and
    returned as a hidden JSON payload in the annotation progress component.
    Once no frames remain, the editor markup is returned and the progress
    component is hidden.
    """
    global annotation_info, annotation_dataset

    # Get result index
    result_index = state['annotation_index']

    def set_progress(pct, msg):
        return progress(pct, desc=msg)

    # First call for this result: build the dataset the frames are read from
    if state['frame_index'] == 0:
        set_progress(0, "Loading Frames")
        _, annotation_dataset = create_dataloader_aris(result["aris_input"][result_index], BEAM_WIDTH_DIR, None)

    # Check that frames remain to be loaded
    json_result = result['json_result'][result_index]
    if state['frame_index'] < len(json_result['frames']):

        # load frames and annotation
        annotation_info, state['frame_index'] = init_frames(annotation_dataset, json_result, state['frame_index'], gp=set_progress)

        # save as html element
        payload = "<p id='annotation_info' style='display:none'>" + json.dumps(annotation_info) + "</p>"
        return {
            annotation_editor: gr.update(),
            annotation_progress: gr.update(value=payload)
        }

    # If complete, start annotation editor
    editor_markup = "".join([
        # Header
        "<div id='annotation_header'>",
        " <h1 id='annotation_frame_nbr'>Frame 0/100</h1>",
        " <p id='annotation_edited'>(edited)</p>",
        "</div>",
        # Annotation Body
        "<div style='display:flex'>",
        " <canvas id='canvas' style='width:50%' onmousedown='mouse_down(event)' onmousemove='mouse_move(event)' onmouseup='mouse_up()' onmouseleave='mouse_up()'></canvas>",
        " <div id='annotation_display' style='width:50%'></div>",
        "</div>",
        # Dummy objects
        "<img id='annotation_img' onload='draw()' style='display:none'></img>",
        # Random comment so the markup differs on every call
        "<!--" + str(np.random.rand()) + "-->",
    ])

    return {
        annotation_editor: gr.update(value=editor_markup, visible=True),
        annotation_progress: gr.update(visible=False)
    }
# Registry of named Gradio components. Upload_Gradio / Result_Gradio receive
# this dict, and the keys read further down ('input', 'inference_handler',
# 'result_handler', 'tab_labeler', 'cancel_btn', 'hyperparams', ...) are
# presumably registered by them - confirm in gradio_scripts.
components = {}

# Build the page layout and wire up the event listeners
demo = gr.Blocks()
with demo:
    with gr.Blocks() as inner_body:
        # Title of page + style
        gr.HTML(
            """
<h1 align="center" style="font-size:xxx-large">Caltech Fisheye</h1>
<style>
#marking_json thead {
display: none !important;
}
.selected.svelte-kqij2n {
background: linear-gradient(180deg, #66eecb47, transparent);
}
#annotation_frame_nbr {
left: calc(50% - 100px);
position: absolute;
width: 200px;
text-align: center;
font-size: x-large;
}
#annotation_header {
height: 40px;
}
#annotation_frame_nbr {
left: calc(50% - 100px);
position: absolute;
width: 200px;
text-align: center;
font-size: x-large;
}
#annotation_edited {
right: 0px;
position: absolute;
margin-top: 5px;
}
</style>
<style id="tab_style"></style>
"""
        )
        with gr.Tabs() as master_tabs:
            components['master_tabs'] = master_tabs
            # Master Tab for uploading aris or result files
            with gr.Tab("Upload", id=0):
                # Draw Gradio components related to the upload ui
                Upload_Gradio(components)
            # Master Tab for result visualization
            with gr.Tab("Result", id=1):
                # Hidden component whose change event runs on_result_upload_finish
                result_uploader = gr.HTML("", visible=False)
                components['result_uploader'] = result_uploader
                # Define annotation progress bar for event listeners, but unrender since it will be displayed later on
                annotation_progress = gr.HTML("", visible=False).unrender()
                components['annotation_progress'] = annotation_progress
                # Draw the gradio components related to visualizing result
                vis_components = Result_Gradio(prepare_annotation, components)
            # Master Tab for annotation editing (only built when the feature flag is on)
            if enable_annotation_editor:
                with gr.Tab("Annotation Editor", id=2):
                    # Draw the annotation loading bar here
                    annotation_progress.render()
                    # Add annotation editor component
                    annotation_editor = gr.HTML("", visible=False)
                    # Event listener for opening annotation.
                    # The JS snippet parses the hidden 'annotation_info' element and
                    # accumulates its content in window.annotation_info.
                    annotation_progress.change(load_annotation, annotation_progress, [annotation_editor, annotation_progress], _js="""
() => {
info_string = document.getElementById("annotation_info").innerHTML;
info = JSON.parse(info_string);
console.log(info)
if (info.length == 0) {
window.annotation_info = [];
return false;
}
window.annotation_info = window.annotation_info.concat(info)
console.log(window.annotation_info)
return true;
}
""")
                    # Event listener for running javascript defined in 'annotation_editor.js'
                    # show_annotation
                    with open('gradio_scripts/annotation_editor.js', 'r') as f:
                        annotation_editor.change(lambda x: gr.update(), None, annotation_editor, _js=f.read())
        # Disclaimer at the bottom of page
        gr.HTML(
            """
<p align="center">
<b>Note</b>: The software is provided "as is", without warranty of any kind, express or implied, including but not limited to the warranties of merchantability, fitness for a particular purpose and noninfringement.
In no event shall the authors or copyright holders be liable for any claim, damages or other liability, whether in an action of contract, tort or otherwise, arising from, out of or in connection with the software or the use or other dealings in the software.
</p>
"""
        )
    # Extract important components for ease of code
    # NOTE(review): the name `input` shadows the builtin at module level
    input = components['input']
    inference_handler = components['inference_handler']
    result_handler = components['result_handler']
    tab_labeler = components['tab_labeler']
    # Outputs shared by the upload and cancel events
    inference_comps = [inference_handler, master_tabs, components['cancel_btn']]
    # When a file is uploaded to the input, tell the inference_handler to start inference
    input.upload(on_aris_input, [input] + components['hyperparams'], inference_comps)
    # When inference handler updates, tell result_handler to show the new result
    # Also, add inference_handler as the output in order to have it display the progress
    inference_event = inference_handler.change(infer_next, None, [inference_handler, result_handler, tab_labeler])
    # Send UI changes based on the new results to the UI_components, and tell the inference_handler to start next inference
    result_handler.change(on_result_ready, None, vis_components + [inference_handler])
    # Cancel button: restores the upload view and cancels the running inference event
    components['cancel_btn'].click(cancel_inference, None, inference_comps, cancels=[inference_event])
    # Button to load a previous result and view visualization
    components['open_result_btn'].click(on_result_upload, None, [result_uploader, master_tabs])
    # Once the hidden uploader changes, load the uploaded result archives
    components['result_uploader'].change(
        on_result_upload_finish,
        [components['result_input'], components['result_aris_input']],
        vis_components + [tab_labeler]
    )
# Enable the event queue and start the web server
demo.queue().launch()
# NOTE(review): this call runs only once launch() returns and its return value
# is discarded - presumably a leftover; confirm before relying on it.
on_result_ready()