sparrow-ui-itn / views /data_annotation.py
ITNovaML's picture
Update views/data_annotation.py
0d8e290
raw
history blame contribute delete
No virus
29.3 kB
import streamlit as st
from PIL import Image
import streamlit_nested_layout
from streamlit_sparrow_labeling import st_sparrow_labeling
from streamlit_sparrow_labeling import DataProcessor
import json
import math
import os
from natsort import natsorted
from tools import agstyler
from tools.agstyler import PINLEFT
import pandas as pd
from toolbar_main import component_toolbar_main
class DataAnnotation:
class Model:
pageTitle = "Data Annotation"
img_file = None
rects_file = None
labels_file = "docs/labels.json"
groups_file = "docs/groups.json"
assign_labels_text = "Assign Labels"
text_caption_1 = "Check 'Assign Labels' to enable editing of labels and values, move and resize the boxes to annotate the document."
text_caption_2 = "Add annotations by clicking and dragging on the document, when 'Assign Labels' is unchecked."
labels = ["", "invoice_no", "invoice_date", "seller", "client", "seller_tax_id", "client_tax_id", "iban", "item_desc",
"item_qty", "item_net_price", "item_net_worth", "item_vat", "item_gross_worth", "total_net_worth", "total_vat",
"total_gross_worth"]
groups = ["", "items_row1", "items_row2", "items_row3", "items_row4", "items_row5", "items_row6", "items_row7",
"items_row8", "items_row9", "items_row10", "summary"]
selected_field = "Selected Field: "
save_text = "Save"
saved_text = "Saved!"
subheader_1 = "Select"
subheader_2 = "Upload"
annotation_text = "Annotation"
no_annotation_file = "No annotation file selected"
no_annotation_mapping = "Please annotate the document. Uncheck 'Assign Labels' and draw new annotations"
download_text = "Download"
download_hint = "Download the annotated structure in JSON format"
annotation_selection_help = "Select an annotation file to load"
upload_help = "Upload a file to annotate"
upload_button_text = "Upload"
upload_button_text_desc = "Choose a file"
assign_labels_text = "Assign Labels"
assign_labels_help = "Check to enable editing of labels and values"
export_labels_text = "Export Labels"
export_labels_help = "Create key-value pairs for the labels in JSON format"
done_text = "Done"
grouping_id = "ID"
grouping_value = "Value"
completed_text = "Completed"
completed_help = "Check to mark the annotation as completed"
error_text = "Value is too long. Please shorten it."
selection_must_be_continuous = "Please select continuous rows"
def view(self, model, ui_width, device_type, device_width):
with open(model.labels_file, "r") as f:
labels_json = json.load(f)
labels_list = labels_json["labels"]
labels = ['']
for label in labels_list:
labels.append(label['name'])
model.labels = labels
with open(model.groups_file, "r") as f:
groups_json = json.load(f)
groups_list = groups_json["groups"]
groups = ['']
for group in groups_list:
groups.append(group['name'])
model.groups = groups
with st.sidebar:
st.markdown("---")
st.subheader(model.subheader_1)
placeholder_upload = st.empty()
file_names = self.get_existing_file_names('docs/images/')
if 'annotation_index' not in st.session_state:
st.session_state['annotation_index'] = 0
annotation_index = 0
else:
annotation_index = st.session_state['annotation_index']
annotation_selection = placeholder_upload.selectbox(model.annotation_text, file_names,
index=annotation_index,
help=model.annotation_selection_help)
annotation_index = self.get_annotation_index(annotation_selection, file_names)
file_extension = self.get_file_extension(annotation_selection, 'docs/images/')
model.img_file = f"docs/images/{annotation_selection}" + file_extension
model.rects_file = f"docs/json/{annotation_selection}.json"
completed_check = st.empty()
btn = st.button(model.export_labels_text, disabled=True)
if btn:
self.export_labels(model)
st.write(model.done_text)
st.subheader(model.subheader_2)
with st.form("upload-form", clear_on_submit=True):
uploaded_file = st.file_uploader(model.upload_button_text_desc, accept_multiple_files=False,
type=['png', 'jpg', 'jpeg'],
help=model.upload_help, disabled=True)
submitted = st.form_submit_button(model.upload_button_text, disabled=True)
if submitted and uploaded_file is not None:
ret = self.upload_file(uploaded_file)
if ret is not False:
file_names = self.get_existing_file_names('docs/images/')
annotation_index = self.get_annotation_index(annotation_selection, file_names)
annotation_selection = placeholder_upload.selectbox(model.annotation_text, file_names,
index=annotation_index,
help=model.annotation_selection_help)
st.session_state['annotation_index'] = annotation_index
# st.title(model.pageTitle + " - " + annotation_selection)
if model.img_file is None:
st.caption(model.no_annotation_file)
return
saved_state = self.fetch_annotations(model.rects_file)
# annotation file has been changed
if annotation_index != st.session_state['annotation_index']:
annotation_v = saved_state['meta']['version']
if annotation_v == "v0.1":
st.session_state["annotation_done"] = False
else:
st.session_state["annotation_done"] = True
# store the annotation file index
st.session_state['annotation_index'] = annotation_index
# first load
if "annotation_done" not in st.session_state:
annotation_v = saved_state['meta']['version']
if annotation_v == "v0.1":
st.session_state["annotation_done"] = False
else:
st.session_state["annotation_done"] = True
with completed_check:
annotation_done = st.checkbox(model.completed_text, help=model.completed_help, key="annotation_done", disabled=True)
if annotation_done:
saved_state['meta']['version'] = "v1.0"
else:
saved_state['meta']['version'] = "v0.1"
with open(model.rects_file, "w") as f:
json.dump(saved_state, f, indent=2)
st.session_state[model.rects_file] = saved_state
assign_labels = st.checkbox(model.assign_labels_text, True, help=model.assign_labels_help)
mode = "transform" if assign_labels else "rect"
docImg = Image.open(model.img_file)
data_processor = DataProcessor()
with st.container():
doc_height = saved_state['meta']['image_size']['height']
doc_width = saved_state['meta']['image_size']['width']
canvas_width, number_of_columns = self.canvas_available_width(ui_width, doc_width, device_type,
device_width)
if number_of_columns > 1:
col1, col2 = st.columns([number_of_columns, 10 - number_of_columns])
with col1:
result_rects = self.render_doc(model, docImg, saved_state, mode, canvas_width, doc_height, doc_width)
with col2:
tab = st.radio("Select", ["Mapping", "Grouping", "Ordering"], horizontal=True,
label_visibility="collapsed")
if tab == "Mapping":
self.render_form(model, result_rects, data_processor, annotation_selection)
elif tab == "Grouping":
self.group_annotations(model, result_rects)
elif tab == "Ordering":
self.order_annotations(model, model.labels, model.groups, result_rects)
else:
result_rects = self.render_doc(model, docImg, saved_state, mode, canvas_width, doc_height, doc_width)
tab = st.radio("Select", ["Mapping", "Grouping"], horizontal=True, label_visibility="collapsed")
if tab == "Mapping":
self.render_form(model, result_rects, data_processor, annotation_selection)
else:
self.group_annotations(model, result_rects)
def render_doc(self, model, docImg, saved_state, mode, canvas_width, doc_height, doc_width):
with st.container():
height = 1296
width = 864
result_rects = st_sparrow_labeling(
fill_color="rgba(0, 151, 255, 0.3)",
stroke_width=2,
stroke_color="rgba(0, 50, 255, 0.7)",
background_image=docImg,
initial_rects=saved_state,
height=height,
width=width,
drawing_mode=mode,
display_toolbar=True,
update_streamlit=True,
canvas_width=canvas_width,
doc_height=doc_height,
doc_width=doc_width,
image_rescale=True,
key="doc_annotation" + model.img_file
)
st.caption(model.text_caption_1)
st.caption(model.text_caption_2)
return result_rects
def render_form(self, model, result_rects, data_processor, annotation_selection):
with st.container():
if result_rects is not None:
with st.form(key="fields_form"):
toolbar = st.empty()
self.render_form_view(result_rects.rects_data['words'], model.labels, result_rects,
data_processor)
with toolbar:
submit = st.form_submit_button(model.save_text, type="primary", disabled=True)
if submit:
for word in result_rects.rects_data['words']:
if len(word['value']) > 1000:
st.error(model.error_text)
return
with open(model.rects_file, "w") as f:
json.dump(result_rects.rects_data, f, indent=2)
st.session_state[model.rects_file] = result_rects.rects_data
# st.write(model.saved_text)
st.experimental_rerun()
if len(result_rects.rects_data['words']) == 0:
st.caption(model.no_annotation_mapping)
return
else:
with open(model.rects_file, 'rb') as file:
st.download_button(label=model.download_text,
data=file,
file_name=annotation_selection + ".json",
mime='application/json',
help=model.download_hint)
def render_form_view(self, words, labels, result_rects, data_processor):
data = []
for i, rect in enumerate(words):
group, label = rect['label'].split(":", 1) if ":" in rect['label'] else (None, rect['label'])
data.append({'id': i, 'value': rect['value'], 'label': label})
df = pd.DataFrame(data)
formatter = {
'id': ('ID', {**PINLEFT, 'hide': True}),
'value': ('Value', {**PINLEFT, 'editable': True}),
'label': ('Label', {**PINLEFT,
'width': 80,
'editable': True,
'cellEditor': 'agSelectCellEditor',
'cellEditorParams': {
'values': labels
}})
}
go = {
'rowClassRules': {
'row-selected': 'data.id === ' + str(result_rects.current_rect_index)
}
}
green_light = "#abf7b1"
css = {
'.row-selected': {
'background-color': f'{green_light} !important'
}
}
response = agstyler.draw_grid(
df,
formatter=formatter,
fit_columns=True,
grid_options=go,
css=css
)
data = response['data'].values.tolist()
for i, rect in enumerate(words):
value = data[i][1]
label = data[i][2]
data_processor.update_rect_data(result_rects.rects_data, i, value, label)
def canvas_available_width(self, ui_width, doc_width, device_type, device_width):
doc_width_pct = (doc_width * 100) / ui_width
if doc_width_pct < 45:
canvas_width_pct = 37
elif doc_width_pct < 55:
canvas_width_pct = 49
else:
canvas_width_pct = 60
if ui_width > 700 and canvas_width_pct == 37 and device_type == "desktop":
return math.floor(canvas_width_pct * ui_width / 100), 4
elif ui_width > 700 and canvas_width_pct == 49 and device_type == "desktop":
return math.floor(canvas_width_pct * ui_width / 100), 5
elif ui_width > 700 and canvas_width_pct == 60 and device_type == "desktop":
return math.floor(canvas_width_pct * ui_width / 100), 6
else:
if device_type == "desktop":
ui_width = device_width - math.floor((device_width * 22) / 100)
elif device_type == "mobile":
ui_width = device_width - math.floor((device_width * 13) / 100)
return ui_width, 1
def fetch_annotations(self, rects_file):
for key in st.session_state:
if key.startswith("docs/json/") and key != rects_file:
del st.session_state[key]
if rects_file not in st.session_state:
with open(rects_file, "r") as f:
saved_state = json.load(f)
st.session_state[rects_file] = saved_state
else:
saved_state = st.session_state[rects_file]
return saved_state
def upload_file(self, uploaded_file):
if uploaded_file is not None:
if os.path.exists(os.path.join("docs/images/", uploaded_file.name)):
st.write("File already exists")
return False
if len(uploaded_file.name) > 100:
st.write("File name too long")
return False
with open(os.path.join("docs/images/", uploaded_file.name), "wb") as f:
f.write(uploaded_file.getbuffer())
img_file = Image.open(os.path.join("docs/images/", uploaded_file.name))
annotations_json = {
"meta": {
"version": "v0.1",
"split": "train",
"image_id": len(self.get_existing_file_names("docs/images/")),
"image_size": {
"width": img_file.width,
"height": img_file.height
}
},
"words": []
}
file_name = uploaded_file.name.split(".")[0]
with open(os.path.join("docs/json/", file_name + ".json"), "w") as f:
json.dump(annotations_json, f, indent=2)
st.success("File uploaded successfully")
def get_existing_file_names(self, dir_name):
# get ordered list of files without file extension, excluding hidden files
return natsorted([os.path.splitext(f)[0] for f in os.listdir(dir_name) if not f.startswith('.')])
def get_file_extension(self, file_name, dir_name):
# get list of files, excluding hidden files
files = [f for f in os.listdir(dir_name) if not f.startswith('.')]
for f in files:
if file_name is not None and os.path.splitext(f)[0] == file_name:
return os.path.splitext(f)[1]
def get_annotation_index(self, file, files_list):
return files_list.index(file)
def group_annotations(self, model, result_rects):
with st.form(key="grouping_form"):
if result_rects is not None:
words = result_rects.rects_data['words']
data = []
for i, rect in enumerate(words):
data.append({'id': i, 'value': rect['value']})
df = pd.DataFrame(data)
formatter = {
'id': ('ID', {**PINLEFT, 'width': 50}),
'value': ('Value', PINLEFT)
}
toolbar = st.empty()
response = agstyler.draw_grid(
df,
formatter=formatter,
fit_columns=True,
selection='multiple',
use_checkbox='True',
pagination_size=40
)
rows = response['selected_rows']
with toolbar:
submit = st.form_submit_button(model.save_text, type="primary", disabled=True)
if submit and len(rows) > 0:
# check if there are gaps in the selected rows
if len(rows) > 1:
for i in range(len(rows) - 1):
if rows[i]['id'] + 1 != rows[i + 1]['id']:
st.error(model.selection_must_be_continuous)
return
words = result_rects.rects_data['words']
new_words_list = []
coords = []
for row in rows:
word_value = words[row['id']]['value']
rect = words[row['id']]['rect']
coords.append(rect)
new_words_list.append(word_value)
# convert array to string
new_word = " ".join(new_words_list)
# Get min x1 value from coords array
x1_min = min([coord['x1'] for coord in coords])
y1_min = min([coord['y1'] for coord in coords])
x2_max = max([coord['x2'] for coord in coords])
y2_max = max([coord['y2'] for coord in coords])
words[rows[0]['id']]['value'] = new_word
words[rows[0]['id']]['rect'] = {
"x1": x1_min,
"y1": y1_min,
"x2": x2_max,
"y2": y2_max
}
# loop array in reverse order and remove selected entries
i = 0
for row in rows[::-1]:
if i == len(rows) - 1:
break
del words[row['id']]
i += 1
result_rects.rects_data['words'] = words
with open(model.rects_file, "w") as f:
json.dump(result_rects.rects_data, f, indent=2)
st.session_state[model.rects_file] = result_rects.rects_data
st.experimental_rerun()
def order_annotations(self, model, labels, groups, result_rects):
if result_rects is not None:
self.action_event = None
data = []
idx_list = [""]
words = result_rects.rects_data['words']
for i, rect in enumerate(words):
if rect['label'] != "":
# split string into two variables, assign None to first variable if no split is found
group, label = rect['label'].split(":", 1) if ":" in rect['label'] else (None, rect['label'])
data.append({'id': i, 'value': rect['value'], 'label': label, 'group': group})
idx_list.append(i)
df = pd.DataFrame(data)
formatter = {
'id': ('ID', {**PINLEFT, 'width': 50}),
'value': ('Value', {**PINLEFT}),
'label': ('Label', {**PINLEFT,
'width': 80,
'editable': False,
'cellEditor': 'agSelectCellEditor',
'cellEditorParams': {
'values': labels
}}),
'group': ('Group', {**PINLEFT,
'width': 80,
'editable': True,
'cellEditor': 'agSelectCellEditor',
'cellEditorParams': {
'values': groups
}})
}
go = {
'rowClassRules': {
'row-selected': 'data.id === ' + str(result_rects.current_rect_index)
}
}
green_light = "#abf7b1"
css = {
'.row-selected': {
'background-color': f'{green_light} !important'
}
}
idx_option = st.selectbox('Select row to move into', idx_list)
def run_component(props):
value = component_toolbar_main(key='toolbar_main', **props)
return value
def handle_event(value):
if value is not None:
if 'action_timestamp' not in st.session_state:
self.action_event = value['action']
st.session_state['action_timestamp'] = value['timestamp']
else:
if st.session_state['action_timestamp'] != value['timestamp']:
self.action_event = value['action']
st.session_state['action_timestamp'] = value['timestamp']
else:
self.action_event = None
props = {
'buttons': {
'up': {
'disabled': True,
'rendered': ''
},
'down': {
'disabled': True,
'rendered': ''
},
'save': {
'disabled': True,
'rendered': ''
# 'rendered': 'none',
}
}
}
handle_event(run_component(props))
response = agstyler.draw_grid(
df,
formatter=formatter,
fit_columns=True,
grid_options=go,
css=css
)
rows = response['selected_rows']
if len(rows) == 0 and result_rects.current_rect_index > -1:
for i, row in enumerate(data):
if row['id'] == result_rects.current_rect_index:
rows = [
{
'_selectedRowNodeInfo': {
'nodeRowIndex': i
},
'id': row['id']
}
]
break
if str(self.action_event) == 'up':
if len(rows) > 0:
idx = rows[0]['_selectedRowNodeInfo']['nodeRowIndex']
if idx > 0:
row_id = rows[0]['id']
if row_id == idx_option:
return
# swap row upwards in the array
if idx_option == "":
words[row_id], words[row_id - 1] = words[row_id - 1], words[row_id]
else:
for i in range(1000):
words[row_id], words[row_id - 1] = words[row_id - 1], words[row_id]
row_id -= 1
if row_id == idx_option:
break
result_rects.rects_data['words'] = words
with open(model.rects_file, "w") as f:
json.dump(result_rects.rects_data, f, indent=2)
st.session_state[model.rects_file] = result_rects.rects_data
st.experimental_rerun()
elif str(self.action_event) == 'down':
if len(rows) > 0:
idx = rows[0]['_selectedRowNodeInfo']['nodeRowIndex']
if idx < len(df) - 1:
row_id = rows[0]['id']
if row_id == idx_option:
return
# swap row downwards in the array
if idx_option == "":
words[row_id], words[row_id + 1] = words[row_id + 1], words[row_id]
else:
for i in range(1000):
words[row_id], words[row_id + 1] = words[row_id + 1], words[row_id]
row_id += 1
if row_id == idx_option:
break
result_rects.rects_data['words'] = words
with open(model.rects_file, "w") as f:
json.dump(result_rects.rects_data, f, indent=2)
st.session_state[model.rects_file] = result_rects.rects_data
st.experimental_rerun()
elif str(self.action_event) == 'save':
data = response['data'].values.tolist()
for elem in data:
if elem[3] != "None":
idx = elem[0]
group = elem[3]
words[idx]['label'] = f"{group}:{elem[2]}"
result_rects.rects_data['words'] = words
with open(model.rects_file, "w") as f:
json.dump(result_rects.rects_data, f, indent=2)
st.session_state[model.rects_file] = result_rects.rects_data
st.experimental_rerun()
def export_labels(self, model):
path_from = os.path.join("docs/json/")
path_to = os.path.join("docs/json/key/")
files = [f for f in os.listdir(path_from) if not f.startswith('.')]
for file in files:
path = os.path.join(path_from, file)
if os.path.isfile(path):
with open(path, "r") as f:
data = json.load(f)
words = data['words']
keys = {}
row_keys = {}
for word in words:
if word['label'] != '':
if ':' in word['label']:
group, label = word['label'].split(':', 1)
if 'row' not in group:
if group not in keys:
keys[group] = {}
keys[group][label] = word['value']
else:
if "items" not in keys:
keys["items"] = []
if group not in row_keys:
row_keys[group] = {}
row_keys[group][label] = word['value']
else:
keys[word['label']] = word['value']
if row_keys != {}:
for key in row_keys:
keys["items"].append(row_keys[key])
if keys != {}:
path = os.path.join(path_to, file)
with open(path, "w") as f:
json.dump(keys, f, indent=2)