import gradio as gr from uploader import save_data_to_dir, create_data_dir, save_data from main import predict_task from gradio_scripts.state_handler import reset_state import numpy as np from gradio_scripts.aws_handler import upload_file from aris import create_metadata_table from gradio_scripts.annotation_handler import init_frames import json from zipfile import ZipFile import os from gradio_scripts.upload_ui import Upload_Gradio, models from gradio_scripts.result_ui import Result_Gradio, update_result, table_headers, info_headers, js_update_tab_labels from dataloader import create_dataloader_aris from aris import BEAM_WIDTH_DIR from InferenceConfig import InferenceConfig WEBAPP_VERSION = "1.0" #Initialize State & Result state = { 'files': [], 'index': 1, 'total': 1, 'annotation_index': -1, 'frame_index': 0, 'outputs': [], 'config': None, } result = {} # Called when an Aris file is uploaded for inference def on_aris_input(file_list, model_id, conf_thresh, iou_thresh, min_hits, max_age, associative_tracker, boost_power, boost_decay, byte_low_conf, byte_high_conf, min_length, min_travel, output_formats): print(output_formats) # Reset Result reset_state(result, state) state['files'] = file_list state['total'] = len(file_list) state['version'] = WEBAPP_VERSION state['outputs'] = output_formats state['config'] = InferenceConfig( weights = models[model_id] if model_id in models else models['master'], conf_thresh = conf_thresh, nms_iou = iou_thresh, min_hits = min_hits, max_age = max_age, min_length = min_length, min_travel = min_travel, ) # Enable tracker if specified if (associative_tracker == "Confidence Boost"): state['config'].enable_conf_boost(boost_power, boost_decay) elif (associative_tracker == "ByteTrack"): state['config'].enable_byte_track(byte_low_conf, byte_high_conf) print(" ") print("Running with:") print(state['config'].to_dict()) print(" ") # Update loading_space to start inference on first file return { inference_handler: gr.update(value = str(np.random.rand()), visible=True), components['cancelBtn']: gr.update(visible=True), components['skipBtn']: gr.update(visible=True), master_tabs: gr.update(selected=1) } # Called when a result zip file is uploaded for result review def on_result_upload(zip_list, aris_list): if (zip_list == None): zip_list = [("static/example/example_result.zip", None)] aris_path = "static/example/input_file.aris" aris_list = [(aris_path, bytearray(open(aris_path, 'rb').read()))] reset_state(result, state) state['version'] = WEBAPP_VERSION component_updates = { master_tabs: gr.update(selected=1), tab_labeler: gr.update(value = len(zip_list)) } for i in range(len(zip_list)): # Create dir to unzip files dir_name = create_data_dir(str(i)) # Check aris input if (aris_list): aris_info = aris_list[i] file_name = aris_info[0].split("/")[-1] bytes = aris_info[1] valid, input_path, dir_name = save_data_to_dir(bytes, file_name, dir_name) else: input_path = None # Unzip result zip_info = zip_list[i] zip_name = zip_info[0] print(zip_name) with ZipFile(zip_name) as zip_file: ZipFile.extractall(zip_file, path=dir_name) unzipped = os.listdir(dir_name) print(unzipped) for file in unzipped: if (file.endswith("_results.mp4")): result["path_video"].append(os.path.join(dir_name, file)) elif (file.endswith("_results.json")): result["path_json"].append(os.path.join(dir_name, file)) elif (file.endswith("_marking.txt")): result["path_marking"].append(os.path.join(dir_name, file)) result["aris_input"].append(input_path) with open(result['path_json'][-1]) as f: json_result = json.load(f) result['json_result'].append(json_result) fish_table, fish_info = create_metadata_table(json_result, table_headers, info_headers) result["fish_table"].append(fish_table) result["fish_info"].append(fish_info) update = update_result(i, state, result, inference_handler) for key in update.keys(): component_updates[key] = update[key] component_updates.pop(inference_handler) return component_updates # Iterative function that performs inference on the next file in line def infer_next(_, progress=gr.Progress()): if state['index'] >= state['total']: return { result_handler: gr.update(), inference_handler: gr.update() } # Correct progress function for batch file input set_progress = lambda pct, msg : progress(pct, desc=msg) if state['total'] > 1: set_progress = lambda pct, msg : progress(pct, desc="File " + str(state['index']+1) + "/" + str(state['total']) + ": " + msg) set_progress(0, "Starting...") # Save file and create a new directory for result file_info = state['files'][state['index']] file_name = file_info[0].split("/")[-1] bytes = file_info[1] valid, file_path, dir_name = save_data(bytes, file_name) print("Directory: ", dir_name) print("Aris input: ", file_path) print(" ") # Check that the file was valid if not valid: return { result_handler: gr.update(), inference_handler: gr.update() } # Send uploaded file to AWS upload_file(file_path, "fishcounting", "webapp_uploads/" + file_name) # Do inference json_result, json_filepath, zip_filepath, video_filepath, marking_filepath = predict_task( file_path, config = state['config'], output_formats = state['outputs'], gradio_progress = set_progress ) # Store result for that file result['json_result'].append(json_result) result['aris_input'].append(file_path) result["path_video"].append(video_filepath) result["path_zip"].append(zip_filepath) result["path_json"].append(json_filepath) result["path_marking"].append(marking_filepath) fish_table, fish_info = create_metadata_table(json_result, table_headers, info_headers) result["fish_table"].append(fish_table) result["fish_info"].append(fish_info) # Increase file index state['index'] += 1 # Send of update to result_handler to show new result # Leave inference_handler update blank to avoid starting next inference until result is updated return { result_handler: gr.update(value = str(np.random.rand())), tab_labeler: gr.update(value = str(state['index'])), inference_handler: gr.update() } # Show result def on_result_ready(): # Update result tab for last file i = state["index"] - 1 return update_result(i, state, result, inference_handler) def cancel_inference(): return { master_tabs: gr.update(selected=0), inference_handler: gr.update(visible=False), components['cancelBtn']: gr.update(visible=False), components['skipBtn']: gr.update(visible=False) } # Request loading of animation editor def prepare_annotation(index): state['annotation_index'] = index state['frame_index'] = 0 if result["aris_input"][index]: return { annotation_progress: gr.update(value="", visible=True), master_tabs: gr.update(selected=2) } return { annotation_progress: gr.update(), master_tabs: gr.update() } annotation_info = None annotation_dataset = None # annotation_progress.change def load_annotation(_, progress=gr.Progress()): global annotation_info, annotation_dataset # Get result index result_index = state['annotation_index'] set_progress = lambda pct, msg: progress(pct, desc=msg) if state['frame_index'] == 0: if set_progress: set_progress(0, "Loading Frames") dataloader, annotation_dataset = create_dataloader_aris(result["aris_input"][result_index], BEAM_WIDTH_DIR, None) # Check that frames remain to be loaded if state['frame_index'] < len(result['json_result'][result_index]['frames']): # load frames and annotation annotation_info, state['frame_index'] = init_frames(annotation_dataset, result['json_result'][result_index], state['frame_index'], gp=set_progress) # save as html element annotation_storage = "" return { annotation_editor: gr.update(), annotation_progress: gr.update(value=annotation_storage) } # If complete, start annotation editor annotation_html = "" # Header annotation_html += "
" annotation_html += "

Frame 0/100

" annotation_html += "

(edited)

" annotation_html += "
" # Annotation Body annotation_html += "
" annotation_html += " " annotation_html += "
" annotation_html += "
" # Dummy objects annotation_html += "" annotation_html += "" return { annotation_editor: gr.update(value=annotation_html, visible=True), annotation_progress: gr.update(visible=False) } components = {} demo = gr.Blocks() with demo: with gr.Blocks() as inner_body: # Title of page gr.HTML( """

Caltech Fisheye

""" ) with gr.Tabs() as master_tabs: components['master_tabs'] = master_tabs # Master Tab for uploading aris or result files with gr.Tab("Upload", id=0): # Draw Gradio components related to the upload ui Upload_Gradio(components) # Master Tab for result visualization with gr.Tab("Result", id=1): # Define annotation progress bar for event listeres, but unrender since it will be displayed later on annotation_progress = gr.HTML("", visible=False).unrender() components['annotation_progress'] = annotation_progress # Draw the gradio components related to visualzing result vis_components = Result_Gradio(prepare_annotation, components) # Master Tab for annotation editing with gr.Tab("Annotation Editor", id=2): # Draw the annotation loading bar here annotation_progress.render() # Add annotation editor component annotation_editor = gr.HTML("", visible=False) annotation_storage = gr.HTML("", visible=False) # Event listener for opening annotation annotation_progress.change(load_annotation, annotation_progress, [annotation_editor, annotation_progress], _js=""" () => { info_string = document.getElementById("annotation_info").innerHTML; info = JSON.parse(info_string); console.log(info) if (info.length == 0) { window.annotation_info = []; return false; } window.annotation_info = window.annotation_info.concat(info) console.log(window.annotation_info) return true; } """) # Event listener for running javascript defined in 'annotation_editor.js' with open('gradio_scripts/annotation_editor.js', 'r') as f: annotation_editor.change(lambda x: gr.update(), None, annotation_editor, _js=f.read()) # Disclaimer at the bottom of page gr.HTML( """

Note: The software is provided "as is", without warranty of any kind, express or implied, including but not limited to the warranties of merchantability, fitness for a particular purpose and noninfringement. In no event shall the authors or copyright holders be liable for any claim, damages or other liability, whether in an action of contract, tort or otherwise, arising from, out of or in connection with the software or the use or other dealings in the software.

""" ) # Extract important components for ease of code input = components['input'] inference_handler = components['inference_handler'] result_handler = components['result_handler'] tab_labeler = components['tab_labeler'] inference_comps = [inference_handler, master_tabs, components['cancelBtn'], components['skipBtn']] # When a file is uploaded to the input, tell the inference_handler to start inference input.upload(on_aris_input, [input] + components['hyperparams'], inference_comps) # When inference handler updates, tell result_handler to show the new result # Also, add inference_handler as the output in order to have it display the progress inference_event = inference_handler.change(infer_next, None, [inference_handler, result_handler, tab_labeler]) # Send UI changes based on the new results to the UI_components, and tell the inference_handler to start next inference result_handler.change(on_result_ready, None, vis_components + [inference_handler]) # Cancel and skip buttons components['cancelBtn'].click(cancel_inference, None, inference_comps, cancels=[inference_event]) components['skipBtn'].click(cancel_inference, None, inference_comps, cancels=[inference_event]) # Button to load a previous result and view visualization components['preview_result_btn'].click( on_result_upload, [components['result_input'], components['result_aris_input']], vis_components + [master_tabs, tab_labeler] ) demo.queue().launch() on_result_ready()