Spaces:
Runtime error
Runtime error
import gradio as gr | |
from transformers import AutoProcessor, Pix2StructForConditionalGeneration, T5Tokenizer, T5ForConditionalGeneration, Pix2StructProcessor | |
from PIL import Image | |
import torch | |
import warnings | |
import re | |
import json | |
import os | |
import numpy as np | |
import pandas as pd | |
from tqdm import tqdm | |
import argparse | |
from scipy import optimize | |
from typing import Optional | |
import dataclasses | |
import editdistance | |
import itertools | |
import sys | |
import time | |
import logging | |
# Root logger: timestamped records at INFO level and above.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger()
# Silence every Python warning for the lifetime of the process.
warnings.filterwarnings('ignore')

# Patch budget handed to the second processor in predict_model2.
MAX_PATCHES = 512

# Use the GPU when torch reports one, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Local checkpoint files whose weights overwrite the hub-initialised models.
# NOTE(review): torch.load unpickles these files; only load trusted checkpoints.
ko_deplot_model_path = './model_epoch_1_210000.bin'
aihub_deplot_model_path = './deplot_k.pt'
t5_model_path = './ke_t5.pt'

# First model: ko-deplot.
processor1 = Pix2StructProcessor.from_pretrained('nuua/ko-deplot')
model1 = Pix2StructForConditionalGeneration.from_pretrained('nuua/ko-deplot')
model1.load_state_dict(torch.load(ko_deplot_model_path, map_location=device))
model1.to(device)

# Second model: aihub-deplot (pix2struct-base architecture, local weights).
processor2 = AutoProcessor.from_pretrained("ybelkada/pix2struct-base")
model2 = Pix2StructForConditionalGeneration.from_pretrained("ybelkada/pix2struct-base")
model2.load_state_dict(torch.load(aihub_deplot_model_path, map_location=device))
model2.to(device)

# KE-T5 tokenizer and model, also restored from a local checkpoint.
tokenizer = T5Tokenizer.from_pretrained("KETI-AIR/ke-t5-base")
t5_model = T5ForConditionalGeneration.from_pretrained("KETI-AIR/ke-t5-base")
t5_model.load_state_dict(torch.load(t5_model_path, map_location=device))
t5_model.to(device)
# ko-deplot inference helpers
# Function to format output
def format_output(prediction):
    """Return ``prediction`` with every literal ``<0x0A>`` token replaced by a newline."""
    return '\n'.join(prediction.split('<0x0A>'))
# First model prediction ko-deplot
def predict_model1(image):
    """Run the first model (ko-deplot) on one image and return its formatted text.

    The image is wrapped in a one-element batch, encoded by ``processor1``
    together with a fixed text prompt, moved to ``device`` and passed to
    ``model1.generate``. The first decoded sequence is returned after
    ``format_output`` has been applied to it.
    """
    encoded = processor1(
        images=[image],
        text="What is the title of the chart",
        return_tensors="pt",
        padding=True,
    )
    # Move every tensor of the encoding onto the model's device.
    encoded = {name: tensor.to(device) for name, tensor in encoded.items()}
    model1.eval()
    with torch.no_grad():
        generated = model1.generate(**encoded, max_new_tokens=4096)
    decoded = [processor1.decode(sequence, skip_special_tokens=True) for sequence in generated]
    return format_output(decoded[0])
def replace_unk(text):
    """Rewrite ``<unk>`` markers in a generated table as newlines or nothing.

    The regex substitutions are applied in the listed order; whatever
    ``<unk>`` is still present afterwards is deleted.
    NOTE(review): the non-ASCII literals below look mis-encoded in this
    view; they are kept exactly as found.
    """
    ordered_rules = (
        # 1. <unk> directly before either of the two header prefixes -> newline
        (r'<unk>(?=์ ๋ชฉ:|์ ํ:)', '\n'),
        # 2. <unk> between an orientation word and the chart-type suffix -> removed
        (r'(?<=์ธ๋ก |๊ฐ๋ก )<unk>(?=๋ํ)', ''),
        # 3. <unk> between a digit and a non-digit -> newline
        (r'(\d)<unk>([^\d])', r'\1\n\2'),
        # 4. <unk> right after '%', one of the listed unit characters or ')' -> newline
        (r'(?<=[%์๊ฑด๋ช \)])<unk>', '\n'),
        # 5. <unk> between two digits -> newline
        (r'(\d)<unk>(\d)', r'\1\n\2'),
        # 6. <unk> between the suffix character and ' |' -> newline
        (r'ํ<unk>(?= \|)', 'ํ\n'),
    )
    for pattern, replacement in ordered_rules:
        text = re.sub(pattern, replacement, text)
    # 7. every remaining <unk> is dropped
    return text.replace('<unk>', '')
# Second model prediction aihub_deplot
def predict_model2(image):
    """Run the second model (aihub-deplot) on one image and return the cleaned table.

    The image is converted to RGB, encoded by ``processor2`` with
    ``MAX_PATCHES`` patches and passed to ``model2.generate``. The first
    decoded sequence keeps its special tokens; ``<pad>`` and ``</s>`` are
    rewritten to ``<unk>`` so that ``replace_unk`` handles all of them.
    """
    rgb_image = image.convert("RGB")
    encoded = processor2(images=rgb_image, return_tensors="pt", max_patches=MAX_PATCHES).to(device)
    patches = encoded.flattened_patches.to(device)
    mask = encoded.attention_mask.to(device)
    model2.eval()
    t5_model.eval()
    with torch.no_grad():
        generated_ids = model2.generate(
            flattened_patches=patches,
            attention_mask=mask,
            max_length=1000,
        )
    raw_table = processor2.batch_decode(generated_ids, skip_special_tokens=False)[0]
    for special_token in ("<pad>", "</s>"):
        raw_table = raw_table.replace(special_token, "<unk>")
    return replace_unk(raw_table)
#function for converting aihub dataset labeling json file to ko-deplot data table
def process_json_file(input_file):
    """Convert an AI-Hub labeling JSON file into a ko-deplot style table string.

    The file must contain ``metadata.chart_sub`` and an ``annotations`` list
    whose first entry provides ``title``, ``axis_label`` (``x_axis`` /
    ``y_axis``), ``legend``, ``data_label`` and ``is_legend``.

    The table starts with ``TITLE | <title>``; the remaining lines depend on
    ``chart_sub`` (horizontal, pie, mixed or any other chart). Lines are
    joined with `` <0x0A> `` and the result is passed through
    ``format_output``.

    Fix over the previous version: the pie-chart branch now converts the data
    labels to ``str`` before joining them, as every other branch already did;
    numeric labels used to raise ``TypeError``.

    Args:
        input_file: path of the UTF-8 encoded labeling JSON file.

    Returns:
        The table text as returned by ``format_output``.
    """
    with open(input_file, 'r', encoding='utf-8') as file:
        data = json.load(file)

    # Extract the required fields.
    first_annotation = data['annotations'][0]
    chart_type = data['metadata']['chart_sub']
    title = first_annotation['title']
    x_axis = first_annotation['axis_label']['x_axis']
    y_axis = first_annotation['axis_label']['y_axis']
    legend = first_annotation['legend']
    data_labels = first_annotation['data_label']
    is_legend = first_annotation['is_legend']

    separator = " <0x0A> "
    # One entry per table line; joined with the separator at the end.
    chunks = [f"TITLE | {title}"]

    def legend_rows(axis):
        """Build one 'axis label | value per legend entry' line per axis label."""
        rows = []
        for i, axis_label in enumerate(axis):
            row = [axis_label]
            for j in range(len(legend)):
                # A series that is too short contributes an empty cell.
                row.append(str(data_labels[j][i]) if i < len(data_labels[j]) else "")
            rows.append(" | ".join(row))
        return rows

    if '๊ฐ๋ก' in chart_type:
        # Horizontal charts: the categories are on the y axis.
        if is_legend:
            chunks.append(" | ".join(legend))
            chunks.extend(legend_rows(y_axis))
        else:
            for i, axis_label in enumerate(y_axis):
                chunks.append(" | ".join([axis_label, str(data_labels[0][i])]))
    elif chart_type == "์ํ":
        # Pie charts: one header line and one value line.
        used_labels = legend if legend else x_axis
        chunks.append(" | ".join(used_labels))
        chunks.append(" | ".join(str(data_labels[0][i]) for i in range(len(used_labels))))
    elif chart_type == "ํผํฉํ":
        # Mixed charts: one column per annotation.
        chunks.append(" | ".join(ann['legend'][0] for ann in data['annotations']))
        combined_data = []
        for i, axis_label in enumerate(x_axis):
            row = [axis_label]
            for ann in data['annotations']:
                series = ann['data_label'][0]
                row.append(str(series[i]) if i < len(series) else "")
            combined_data.append(" | ".join(row))
        # Kept as a single chunk so an empty x axis still yields the same
        # trailing separator as before.
        chunks.append(separator.join(combined_data))
    else:
        # Every other chart type: the categories are on the x axis.
        if is_legend:
            chunks.append(" | ".join(legend))
            chunks.extend(legend_rows(x_axis))
        else:
            for i, axis_label in enumerate(x_axis):
                if i < len(data_labels[0]):
                    chunks.append(f"{axis_label} | {str(data_labels[0][i])}")
                else:
                    # Missing value: the line ends right after the column bar.
                    chunks.append(f"{axis_label} |")

    return format_output(separator.join(chunks))
def chart_data(data):
    """Flatten annotation dicts into positional records.

    For each annotation the result holds
    ``[title, legend, datalabel, unit, base, x_axis_title, y_axis_title,
    x_axis, y_axis]``. Fields whose ``is_*`` flag is falsy fall back to
    ``''`` (title, legend, unit, base) or ``[0]`` (data labels, axis labels).
    When the legend has more than one entry the data labels are transposed.
    """
    records = []
    for annotation in data:
        title = annotation['title'] if annotation['is_title'] else ''
        legend = annotation['legend'] if annotation['is_legend'] else ''
        datalabel = annotation['data_label'] if annotation['is_datalabel'] else [0]
        unit = annotation['unit'] if annotation['is_unit'] else ''
        base = annotation['base'] if annotation['is_base'] else ''
        x_axis_title = annotation['axis_title']['x_axis']
        y_axis_title = annotation['axis_title']['y_axis']
        x_axis = annotation['axis_label']['x_axis'] if annotation['is_axis_label_x_axis'] else [0]
        y_axis = annotation['axis_label']['y_axis'] if annotation['is_axis_label_y_axis'] else [0]
        if len(legend) > 1:
            # Swap the two axes of the data labels.
            datalabel = np.array(datalabel).transpose().tolist()
        records.append(
            [title, legend, datalabel, unit, base, x_axis_title, y_axis_title, x_axis, y_axis]
        )
    return records
def datatable(data, chart_type):
    """Render ``chart_data`` records as an aihub-deplot style table string.

    Every table starts with three header lines (base, title, and the first
    two entries of ``chart_type`` joined by a space) followed by a
    column-header line and the data rows.

    * Exactly two records: both are merged into one table with a column each.
    * Otherwise each record is rendered in turn and the last rendering is
      returned (``''`` when ``data`` is empty).

    NOTE(review): the non-ASCII literals look mis-encoded in this view; they
    are kept exactly as found.
    """

    def header_lines(base, title):
        return [
            f"๋์: {base}",
            f"์ ๋ชฉ: {title}",
            f"์ ํ: {' '.join(chart_type[0:2])}",
        ]

    if len(data) == 2:
        first, second = data[0], data[1]
        lines = header_lines(first[4], first[0])
        lines.append(f"{first[5]} | {first[1][0]}({first[3]}) | {second[1][0]}({second[3]})")
        for position, label in enumerate(first[7]):
            lines.append(f"{label} | {first[2][0][position]} | {second[2][0][position]}")
        return '\n'.join(lines)

    rendered = ''
    for record in data:
        title, legend, datalabel, unit, base, x_axis_title, y_axis_title, x_axis, y_axis = record
        # Each legend entry carries the unit in parentheses.
        legend = [name + f"({unit})" for name in legend]
        lines = header_lines(base, title)
        if len(legend) > 1:
            lines.append(f"{x_axis_title} | {' | '.join(legend)}")
            if chart_type[2] == "์ํ":
                # All values are flattened onto one line.
                flattened = sum(datalabel, [])
                lines.append(' | '.join([str(value) for value in flattened]))
            else:
                axis = y_axis if chart_type[2] == "๊ฐ๋ก ๋ง๋ํ" else x_axis
                for label, values in zip(axis, datalabel):
                    cells = " | ".join([str(value) for value in values])
                    lines.append(f"{label} | {cells}")
        else:
            lines.append(f"{x_axis_title} | {unit}")
            axis = y_axis if chart_type[2] == "๊ฐ๋ก ๋ง๋ํ" else x_axis
            values = datalabel[0]
            for position, label in enumerate(axis):
                lines.append(f"{label} | {str(values[position])}")
        rendered = '\n'.join(lines)
    return rendered
#function for converting aihub dataset labeling json file to aihub-deplot data table
def process_json_file2(input_file):
    """Convert an AI-Hub labeling JSON file into an aihub-deplot table string.

    Reads ``chart_multi``, ``chart_main`` and ``chart_sub`` from ``metadata``,
    flattens ``annotations`` with ``chart_data`` and renders the result with
    ``datatable``.
    """
    with open(input_file, 'r', encoding='utf-8') as handle:
        payload = json.load(handle)

    # Extract the required fields.
    metadata = payload['metadata']
    chart_multi = metadata['chart_multi']
    chart_main = metadata['chart_main']
    chart_sub = metadata['chart_sub']
    # datatable() expects the order [multi, sub, main].
    chart_type = [chart_multi, chart_sub, chart_main]
    records = chart_data(payload['annotations'])
    return datatable(records, chart_type)
# RMS | |
def _to_float(text): # ๋จ์ ๋ผ๊ณ ์ซ์๋ง..? | |
try: | |
if text.endswith("%"): | |
# Convert percentages to floats. | |
return float(text.rstrip("%")) / 100.0 | |
else: | |
return float(text) | |
except ValueError: | |
return None | |
def _get_relative_distance( | |
target, prediction, theta = 1.0 | |
): | |
"""Returns min(1, |target-prediction|/|target|).""" | |
if not target: | |
return int(not prediction) | |
distance = min(abs((target - prediction) / target), 1) | |
return distance if distance < theta else 1 | |
def anls_metric(target: str, prediction: str, theta: float = 0.5):
    """Return the ANLS similarity between ``target`` and ``prediction``.

    The edit distance is normalised by the length of the longer string. The
    score is ``1 - normalised distance`` when that distance is strictly below
    ``theta`` and 0 otherwise.

    Two empty strings are identical, so their normalised distance is taken
    to be 0; the previous version divided by zero in that case.
    """
    longest = max(len(target), len(prediction))
    # Guard the division: with nothing to compare the distance is zero.
    normalize_ld = editdistance.eval(target, prediction) / longest if longest else 0.0
    return 1 - normalize_ld if normalize_ld < theta else 0
def _permute(values, indexes): | |
return tuple(values[i] if i < len(values) else "" for i in indexes) | |
@dataclasses.dataclass(frozen=True)
class Table:
    """Helper class for the content of a markdown table.

    Fix over the previous version: the class is now an actual dataclass.
    Without the decorator ``Table(base=..., ...)`` raised ``TypeError`` and
    ``headers`` / ``rows`` were ``dataclasses.Field`` objects, not tuples.
    """

    # Text after the first header prefix, if that line was present.
    base: Optional[str] = None
    # Text after the title prefix, if that line was present.
    title: Optional[str] = None
    # Text after the chart-type prefix, if that line was present.
    chartType: Optional[str] = None
    # Cells of the header row.
    headers: tuple[str, ...] = dataclasses.field(default_factory=tuple)
    # Cells of every data row.
    rows: tuple[tuple[str, ...], ...] = dataclasses.field(default_factory=tuple)

    def permuted(self, indexes):
        """Builds a version of the table changing the column order."""
        return Table(
            base=self.base,
            title=self.title,
            chartType=self.chartType,
            headers=_permute(self.headers, indexes),
            rows=tuple(_permute(row, indexes) for row in self.rows),
        )

    def aligned(self, headers, text_theta=0.5):
        """Builds a column permutation with headers in the most correct order.

        Args:
            headers: reference headers; must be as many as ``self.headers``.
            text_theta: threshold forwarded to ``anls_metric``.

        Returns:
            ``(permuted_table, score)`` where ``score`` is the product of the
            header similarities of all columns except the first.

        Raises:
            ValueError: if the two header sequences differ in length.
        """
        if len(headers) != len(self.headers):
            raise ValueError(f"Header length {headers} must match {self.headers}.")
        # distance[i][j] = cost of matching own header i with reference header j.
        distance = [
            [1 - anls_metric(h1, h2, text_theta) for h1 in headers]
            for h2 in self.headers
        ]
        cost_matrix = np.array(distance)
        row_ind, col_ind = optimize.linear_sum_assignment(cost_matrix)
        # Own column index for each reference column, in reference order.
        permutation = [idx for _, idx in sorted(zip(col_ind, row_ind))]
        score = (1 - cost_matrix)[permutation[1:], range(1, len(row_ind))].prod()
        return self.permuted(permutation), score
def _parse_table(text, transposed=False):
    """Builds a table from a markdown representation.

    The text is lower-cased and split into lines. Line 0, 1 and 2 are checked
    for the three header prefixes (base, title, chart type); the remaining
    lines are split on ``" | "`` into a header row and data rows.

    Fix over the previous version: lines 1 and 2 are only inspected when they
    exist, so inputs shorter than three lines no longer raise ``IndexError``.

    NOTE(review): as before, the title check always looks at line 1 and sets
    the data offset to at least 1, so line 0 is never part of the data even
    when it carries no base prefix. Scores depend on this, so it is preserved.

    Args:
        text: table text, one row per line.
        transposed: when true, rows and columns are swapped (short rows are
            padded with ``""``).

    Returns:
        A ``Table``; without data lines only the header fields are set.
    """
    lines = text.lower().splitlines()
    if not lines:
        return Table()

    def prefixed_value(index, prefix):
        """Return the text after ``prefix`` on line ``index``, or None."""
        if index < len(lines) and lines[index].startswith(prefix):
            return lines[index][len(prefix):].strip()
        return None

    base = prefixed_value(0, "๋์: ")
    title = prefixed_value(1, "์ ๋ชฉ: ")
    # The title check decides the offset regardless of the base line.
    offset = 2 if title is not None else 1
    chartType = prefixed_value(2, "์ ํ: ")
    if chartType is not None:
        offset = 3

    if len(lines) < offset + 1:
        return Table(base=base, title=title, chartType=chartType)

    rows = [tuple(cell.strip() for cell in line.split(" | ")) for line in lines[offset:]]
    if transposed:
        rows = [tuple(row) for row in itertools.zip_longest(*rows, fillvalue="")]
    return Table(base=base, title=title, chartType=chartType, headers=rows[0], rows=tuple(rows[1:]))
def _get_table_datapoints(table): | |
datapoints = {} | |
if table.base is not None: | |
datapoints["๋์"] = table.base | |
if table.title is not None: | |
datapoints["์ ๋ชฉ"] = table.title | |
if table.chartType is not None: | |
datapoints["์ ํ"] = table.chartType | |
if not table.rows or len(table.headers) <= 1: | |
return datapoints | |
for row in table.rows: | |
for header, cell in zip(table.headers[1:], row[1:]): | |
#print(f"{row[0]} {header} >> {cell}") | |
datapoints[f"{row[0]} {header}"] = cell # | |
return datapoints | |
def _get_datapoint_metric(
    target,
    prediction,
    text_theta=0.5,
    number_theta=0.1,
):
    """Computes a metric that scores how similar two datapoint pairs are.

    ``target`` and ``prediction`` are ``(key, value)`` pairs. The key
    similarity (ANLS) is multiplied by a value similarity:

    * relative numeric distance when the prediction parses as a number and
      the target parses to a non-zero number,
    * 1 when both values are equal,
    * ANLS of the two values otherwise.
    """
    key_metric = anls_metric(target[0], prediction[0], text_theta)
    # Check whether the values are numeric.
    pred_float = _to_float(prediction[1])
    target_float = _to_float(target[1])
    if pred_float is not None and target_float:
        # Numeric values are compared by relative distance.
        value_metric = 1 - _get_relative_distance(target_float, pred_float, number_theta)
    elif target[1] == prediction[1]:
        return key_metric
    else:
        value_metric = anls_metric(target[1], prediction[1], text_theta)
    return key_metric * value_metric
def _table_datapoints_precision_recall_f1(
    target_table,
    prediction_table,
    text_theta=0.5,
    number_theta=0.1,
):
    """Calculates matching similarity between two tables as dicts.

    Datapoints of both tables are matched one-to-one by minimising the ANLS
    distance between their keys; every matched pair is then scored with
    ``_get_datapoint_metric``.

    Returns:
        ``(precision, recall, f1)``.
    """
    target_datapoints = list(_get_table_datapoints(target_table).items())
    prediction_datapoints = list(_get_table_datapoints(prediction_table).items())

    # Degenerate cases in which one or both sides have no datapoints.
    if not target_datapoints:
        return (0, 1, 0) if prediction_datapoints else (1, 1, 1)
    if not prediction_datapoints:
        return 1, 0, 0

    # cost_matrix[i][j] = key distance of target i and prediction j.
    cost_matrix = np.array(
        [
            [1 - anls_metric(target_key, pred_key, text_theta) for pred_key, _ in prediction_datapoints]
            for target_key, _ in target_datapoints
        ]
    )
    row_ind, col_ind = optimize.linear_sum_assignment(cost_matrix)
    score = sum(
        _get_datapoint_metric(
            target_datapoints[r], prediction_datapoints[c], text_theta, number_theta
        )
        for r, c in zip(row_ind, col_ind)
    )
    if score == 0:
        return 0, 0, 0
    precision = score / len(prediction_datapoints)
    recall = score / len(target_datapoints)
    f1 = 2 * precision * recall / (precision + recall)
    return precision, recall, f1
def table_datapoints_precision_recall_per_point(
    targets,
    predictions,
    text_theta=0.5,
    number_theta=0.1,
):
    """Computes precision, recall and F1 for every target/prediction pair.

    Each pair is parsed twice, transposed and as given, and the variant with
    the higher F1 is kept (the transposed one wins ties). Keys are matched by
    relative Levenshtein distance; values are compared with ANLS when they
    are strings and by relative distance when they are numeric.

    Args:
      targets: list of table strings.
      predictions: list of table strings, aligned with ``targets``.
      text_theta: relative edit distance above this is set to the maximum of 1.
      number_theta: relative error rate above this is set to the maximum of 1.

    Returns:
      Dictionary with per-point precision, recall and F1 lists.
    """
    assert len(targets) == len(predictions)
    per_point_scores = {"precision": [], "recall": [], "f1": []}
    for pred, target in zip(predictions, targets):
        candidates = []
        for transposed in (True, False):
            pred_table = _parse_table(pred, transposed=transposed)
            target_table = _parse_table(target, transposed=transposed)
            candidates.append(
                _table_datapoints_precision_recall_f1(
                    target_table, pred_table, text_theta, number_theta
                )
            )
        best = max(candidates, key=lambda metrics: metrics[-1])
        for name, value in zip(("precision", "recall", "f1"), best):
            per_point_scores[name].append(value)
    return per_point_scores
def table_datapoints_precision_recall(
    targets,
    predictions,
    text_theta=0.5,
    number_theta=0.1,
):
    """Aggregated version of table_datapoints_precision_recall_per_point().

    Averages the per-point scores over ``len(targets)``.

    Args:
      targets: list of table strings.
      predictions: list of table strings, aligned with ``targets``.
      text_theta: relative edit distance above this is set to the maximum of 1.
      number_theta: relative error rate above this is set to the maximum of 1.

    Returns:
      Dictionary with the keys ``table_datapoints_precision``,
      ``table_datapoints_recall`` and ``table_datapoints_f1``.
    """
    per_point = table_datapoints_precision_recall_per_point(
        targets, predictions, text_theta, number_theta
    )
    count = len(targets)
    return {
        f"table_datapoints_{name}": sum(per_point[name]) / count
        for name in ("precision", "recall", "f1")
    }
def evaluate_rms(generated_table, label_table):
    """Score one generated table against its label table.

    Both strings are wrapped in one-element lists and handed to
    ``table_datapoints_precision_recall``, whose dictionary is returned.
    """
    return table_datapoints_precision_recall([label_table], [generated_table])
def is_float(s):
    """Return True when ``float(s)`` succeeds, False when it raises ValueError."""
    try:
        float(s)
    except ValueError:
        return False
    else:
        return True
def ko_deplot_convert_to_dataframe(table_str):
    """Turn a ko-deplot table string into a pandas DataFrame.

    Lines are split on ``"\\n"`` and cells on ``" | "``; line 0 is the title
    line and is not part of the frame.

    * Lines 1 and 2 have the same number of cells:
      - line 1 holds data (its second cell is numeric or its first cell is
        empty): all lines from line 1 on become rows with columns "0", "1";
      - otherwise line 1 and line 2 become the two columns of the frame.
    * Otherwise line 1 is a header row that lacks the leading cell: a blank
      cell is prepended and the columns are numbered after line 2.
    """
    lines = table_str.strip().split("\n")
    cells = [line.split(" | ") for line in lines]
    # Unused, but kept: a title line without " | " still raises IndexError.
    _title = cells[0][1]
    first_row, second_row = cells[1], cells[2]

    if len(first_row) != len(second_row):
        columns = [f"{i}" for i in range(len(second_row))]
        table_rows = [[" "] + first_row] + cells[2:]
        return pd.DataFrame(table_rows, columns=columns)

    if is_float(first_row[1]) or first_row[0] == "":
        return pd.DataFrame(cells[1:], columns=["0", "1"])
    return pd.DataFrame({"๋ฒ๋ก": first_row, "๊ฐ": second_row})
def aihub_deplot_convert_to_dataframe(table_str):
    """Turn an aihub-deplot table string into a pandas DataFrame.

    The first three lines are skipped. When line 3 has more cells than
    line 4, line 3 without its first cell and line 4 become the two columns
    of the frame. Otherwise every line from line 3 on becomes a row and the
    columns are numbered after line 3.
    """
    lines = table_str.strip().split("\n")
    header_cells = lines[3].split(" | ")
    first_value_cells = lines[4].split(" | ")
    if len(header_cells) > len(first_value_cells):
        return pd.DataFrame({"๋ฒ๋ก": header_cells[1:], "๊ฐ": first_value_cells})
    columns = [str(i) for i in range(len(header_cells))]
    table_rows = [line.split(" | ") for line in lines[3:]]
    return pd.DataFrame(table_rows, columns=columns)
class Highlighter:
    """Stateful cell comparator for ``Styler.applymap``.

    It keeps its own (row, col) cursor and walks down one column before
    moving to the next, so it must be called once per cell in column-major
    order.
    """

    def __init__(self):
        self.row, self.col = 0, 0

    def compare_and_highlight(self, pred_table_elem, target_table, pred_table_row, props=''):
        """Return ``props`` when the cell differs from the target cell, else None.

        Args:
            pred_table_elem: value of the predicted cell being styled.
            target_table: DataFrame holding the expected values.
            pred_table_row: number of rows of the predicted table; reaching it
                moves the cursor to the top of the next column.
            props: CSS returned for a mismatching cell.
        """
        if self.row >= pred_table_row:
            # Column finished: continue at the top of the next one.
            self.col += 1
            self.row = 0
        expected = target_table.iloc[self.row, self.col]
        self.row += 1
        return props if pred_table_elem != expected else None
# 1. Load the data
# Path of the ko-deplot results; the JSON is read by ko_deplot_display_results.
ko_deplot_result = './ko_deplot_result.json'
# aihub-deplot results, loaded once at import time.
aihub_deplot_result_df = pd.read_csv('./aihub_deplot_result.csv')
# 2. Load the list of image files to check
def load_image_checklist(file):
    """Return the lines of the text file ``file`` without their line endings."""
    with open(file, 'r') as handle:
        return handle.read().splitlines()
# 3. Variables that track the current position
# Names of the images being reviewed; replaced when a checklist is loaded.
image_names = []
# Index into image_names of the image currently shown.
current_index = 0
def show_image(current_idx):
    """Open ``./images/<name>.jpg`` for the image name at ``current_idx``.

    Raises:
        FileNotFoundError: if that file does not exist.
    """
    image_path = f"./images/{image_names[current_idx]}.jpg"
    if os.path.exists(image_path):
        return Image.open(image_path)
    raise FileNotFoundError(f"Image file not found: {image_path}")
# 4. Button click event handler
def non_real_time_check(file):
    """Build the result widgets for the currently selected checklist image.

    Uses the precomputed results of both models for
    ``image_names[current_index]``, their label tables and scores, and marks
    generated cells that differ from the label in red.

    Changes over the previous version:
    * the label entry is fetched by direct key lookup, and a missing key now
      raises ``KeyError`` immediately instead of failing later on an unbound
      local variable;
    * file handles no longer rebind the ``file`` parameter;
    * the score file is read line by line and the 0/1 flag is gone.

    Args:
        file: unused; kept because the caller passes the uploaded checklist.

    Returns:
        Six values: styled ko-deplot table, styled aihub-deplot table,
        ko-deplot label table, aihub-deplot label table (all ``gr.DataFrame``)
        and the two score DataFrames.

    Raises:
        KeyError: if the labeling JSON has no entry for the current image.
    """
    highlighter1 = Highlighter()
    highlighter2 = Highlighter()

    image_name = image_names[current_index]
    file_name = image_name.replace("Source", "Label")

    # ko-deplot label table for this image.
    json_path = "./ko_deplot_labeling_data.json"
    with open(json_path, 'r', encoding='utf-8') as label_file:
        json_data = json.load(label_file)
    if file_name not in json_data:
        raise KeyError(f"No ko-deplot label entry for {file_name!r} in {json_path}")
    ko_deplot_labeling_str = json_data[file_name].get("txt").replace("<0x0A>", "\n")
    ko_deplot_label_title = ko_deplot_labeling_str.split(" \n ")[0].replace("TITLE | ", "์ ๋ชฉ:")

    # Precomputed ko-deplot score: lines look like "<image name>, <score>".
    ko_deplot_rms_path = "./ko_deplot_rms.txt"
    ko_deplot_rms = None
    with open(ko_deplot_rms_path, 'r', encoding='utf-8') as rms_file:
        for line in rms_file:
            parts = line.strip().split(", ")
            if len(parts) == 2 and parts[0] == image_name:
                ko_deplot_rms = parts[1]
                break

    ko_deplot_generated_title, ko_deplot_generated_table = ko_deplot_display_results(current_index)
    (
        aihub_deplot_generated_table,
        aihub_deplot_label_table,
        aihub_deplot_generated_title,
        aihub_deplot_label_title,
    ) = aihub_deplot_display_results(current_index)
    aihub_deplot_RMS = evaluate_rms(aihub_deplot_generated_table, aihub_deplot_label_table)

    # An image without a recorded score is shown with 0.
    value = [round(float(ko_deplot_rms), 1)] if ko_deplot_rms is not None else [0]
    ko_deplot_score_table = pd.DataFrame({
        'category': ['f1'],
        'value': value,
    })
    aihub_deplot_score_table = pd.DataFrame({
        'category': ['precision', 'recall', 'f1'],
        'value': [
            round(aihub_deplot_RMS['table_datapoints_precision'], 1),
            round(aihub_deplot_RMS['table_datapoints_recall'], 1),
            round(aihub_deplot_RMS['table_datapoints_f1'], 1),
        ],
    })

    ko_deplot_generated_df = ko_deplot_convert_to_dataframe(ko_deplot_generated_table)
    aihub_deplot_generated_df = aihub_deplot_convert_to_dataframe(aihub_deplot_generated_table)
    ko_deplot_labeling_df = ko_deplot_convert_to_dataframe(ko_deplot_labeling_str)
    aihub_deplot_labeling_df = aihub_deplot_convert_to_dataframe(aihub_deplot_label_table)

    # NOTE(review): Styler.applymap is deprecated in newer pandas in favour
    # of Styler.map; kept so that the installed version is not an issue.
    styled_ko_deplot_table = ko_deplot_generated_df.style.applymap(
        highlighter1.compare_and_highlight,
        target_table=ko_deplot_labeling_df,
        pred_table_row=ko_deplot_generated_df.shape[0],
        props='color:red',
    )
    styled_aihub_deplot_table = aihub_deplot_generated_df.style.applymap(
        highlighter2.compare_and_highlight,
        target_table=aihub_deplot_labeling_df,
        pred_table_row=aihub_deplot_generated_df.shape[0],
        props='color:red',
    )
    return (
        gr.DataFrame(styled_ko_deplot_table, label=ko_deplot_generated_title + "(ko deplot ์ถ๋ก ๊ฒฐ๊ณผ)"),
        gr.DataFrame(styled_aihub_deplot_table, label=aihub_deplot_generated_title + "(aihub deplot ์ถ๋ก ๊ฒฐ๊ณผ)"),
        gr.DataFrame(ko_deplot_labeling_df, label=ko_deplot_label_title + "(ko deplot ์ ๋ต ํ ์ด๋ธ)"),
        gr.DataFrame(aihub_deplot_labeling_df, label=aihub_deplot_label_title + "(aihub deplot ์ ๋ต ํ ์ด๋ธ)"),
        ko_deplot_score_table,
        aihub_deplot_score_table,
    )
def ko_deplot_display_results(index):
    """Look up the stored ko-deplot result for the image at ``index``.

    The JSON file named by ``ko_deplot_result`` is read on every call. The
    first entry whose ``filename`` ends with ``<image name>.jpg`` is used.

    Returns:
        ``(title, table)`` where ``title`` is the part of the table before
        the first ``" \\n "`` with the ``TITLE | `` prefix replaced, or
        ``None`` when no entry matches.
    """
    wanted_suffix = image_names[index] + ".jpg"
    with open(ko_deplot_result, 'r', encoding='utf-8') as handle:
        entries = json.load(handle)
    match = next(
        (entry for entry in entries if entry['filename'].endswith(wanted_suffix)),
        None,
    )
    if match is None:
        return None
    title_part = match['table'].split(" \n ", 1)[0]
    return title_part.replace("TITLE | ", "์ ๋ชฉ:"), match['table']
def aihub_deplot_display_results(index):
    """Look up the stored aihub-deplot result for the image at ``index``.

    Returns:
        ``(generated_table, label_table, generated_title, label_title)``;
        each title is the second line of its table. On failure the first
        element is an error message and the other three are ``None``.

    Fix over the previous version: the failure paths returned three values
    although the success path, and the caller that unpacks the result, use
    four. Every path now returns a 4-tuple.
    """
    if index < 0 or index >= len(image_names):
        return "Index out of range", None, None, None
    image_name = image_names[index]
    image_row = aihub_deplot_result_df[aihub_deplot_result_df['data_id'] == image_name]
    if image_row.empty:
        return "No results found for the image", None, None, None
    generated_table = image_row['generated_table'].values[0]
    label_table = image_row['label_table'].values[0]
    # Line 1 (the second line) of each table is its title line.
    generated_title = generated_table.split("\n")[1]
    label_title = label_table.split("\n")[1]
    return generated_table, label_table, generated_title, label_title
def previous_image():
    """Step back one image, staying put when already at the first one.

    Returns:
        The image, its name, and two ``gr.update`` objects whose
        ``interactive`` flags say whether an earlier / a later image exists.
    """
    global current_index
    if current_index > 0:
        current_index -= 1
    picture = show_image(current_index)
    name = image_names[current_index]
    has_previous = gr.update(interactive=current_index > 0)
    has_next = gr.update(interactive=current_index < len(image_names) - 1)
    return picture, name, has_previous, has_next
def next_image():
    """Advance one image, staying put when already at the last one.

    Returns:
        The image, its name, and two ``gr.update`` objects whose
        ``interactive`` flags say whether an earlier / a later image exists.
    """
    global current_index
    last_index = len(image_names) - 1
    if current_index < last_index:
        current_index += 1
    picture = show_image(current_index)
    name = image_names[current_index]
    has_previous = gr.update(interactive=current_index > 0)
    has_next = gr.update(interactive=current_index < len(image_names) - 1)
    return picture, name, has_previous, has_next
def real_time_check(image_file):
    """Run both models on an uploaded image and build the result widgets.

    The label tables come from ``./labeling_data/<name>.json``, where
    ``<name>`` is the base name of the upload with "Source" replaced by
    "Label" and the extension removed. Generated cells that differ from the
    label are marked in red.

    Args:
        image_file: uploaded file; it is opened with PIL and its ``name``
            attribute is read.

    Returns:
        Six values: styled ko-deplot table, styled aihub-deplot table,
        ko-deplot label table, aihub-deplot label table (all ``gr.DataFrame``)
        and the two score DataFrames.
    """

    def score_table(rms):
        # Precision / recall / F1 rounded to one decimal place.
        return pd.DataFrame({
            'category': ['precision', 'recall', 'f1'],
            'value': [
                round(rms['table_datapoints_precision'], 1),
                round(rms['table_datapoints_recall'], 1),
                round(rms['table_datapoints_f1'], 1),
            ],
        })

    def highlight(generated_df, label_df, highlighter):
        return generated_df.style.applymap(
            highlighter.compare_and_highlight,
            target_table=label_df,
            pred_table_row=generated_df.shape[0],
            props='color:red',
        )

    highlighter1 = Highlighter()
    highlighter2 = Highlighter()
    image = Image.open(image_file)

    # Model outputs.
    ko_text = predict_model1(image)
    ko_generated_title = ko_text.split("\n")[0].split(" | ")[1]
    ko_generated_df = ko_deplot_convert_to_dataframe(ko_text)
    aihub_text = predict_model2(image)
    aihub_generated_title = aihub_text.split("\n")[1].split(":")[1]
    aihub_generated_df = aihub_deplot_convert_to_dataframe(aihub_text)

    # Label file that belongs to the upload.
    label_base_name = os.path.basename(image_file.name).replace("Source", "Label")
    label_stem, _ = os.path.splitext(label_base_name)
    label_json_path = "./labeling_data/" + label_stem + ".json"

    # Label tables in both formats.
    ko_label_text = process_json_file(label_json_path)
    ko_label_title = ko_label_text.split("\n")[0].split(" | ")[1]
    ko_label_df = ko_deplot_convert_to_dataframe(ko_label_text)
    aihub_label_text = process_json_file2(label_json_path)
    aihub_label_title = aihub_label_text.split("\n")[1].split(":")[1]
    aihub_label_df = aihub_deplot_convert_to_dataframe(aihub_label_text)

    # Scores.
    ko_rms = evaluate_rms(ko_text, ko_label_text)
    aihub_rms = evaluate_rms(aihub_text, aihub_label_text)
    ko_deplot_score_table = score_table(ko_rms)
    aihub_deplot_score_table = score_table(aihub_rms)

    styled_ko = highlight(ko_generated_df, ko_label_df, highlighter1)
    styled_aihub = highlight(aihub_generated_df, aihub_label_df, highlighter2)
    return (
        gr.DataFrame(styled_ko, label=ko_generated_title + "(kodeplot ์ถ๋ก ๊ฒฐ๊ณผ)"),
        gr.DataFrame(styled_aihub, label=aihub_generated_title + "(aihub deplot ์ถ๋ก ๊ฒฐ๊ณผ)"),
        gr.DataFrame(ko_label_df, label=ko_label_title + "(kodeplot ์ ๋ต ํ ์ด๋ธ)"),
        gr.DataFrame(aihub_label_df, label=aihub_label_title + "(aihub deplot ์ ๋ต ํ ์ด๋ธ)"),
        ko_deplot_score_table,
        aihub_deplot_score_table,
    )
def inference(mode, image_uploader, file_uploader):
    """Dispatch to the real-time or the precomputed check, depending on ``mode``.

    The image-upload mode runs ``real_time_check`` on ``image_uploader``;
    every other mode runs ``non_real_time_check`` on ``file_uploader``.
    The six values of the chosen check are returned unchanged.
    """
    if mode == "์ด๋ฏธ์ง ์ ๋ก๋":
        handler, payload = real_time_check, image_uploader
    else:
        handler, payload = non_real_time_check, file_uploader
    (
        ko_table,
        aihub_table,
        ko_label_table,
        aihub_label_table,
        ko_score_table,
        aihub_score_table,
    ) = handler(payload)
    return ko_table, aihub_table, ko_label_table, aihub_label_table, ko_score_table, aihub_score_table
def interface_selector(selector):
    """Return the updates that switch the UI between its two input modes.

    Yields five values: visibility of the first component, visibility of the
    second, a ``gr.State`` with the mode name, and visibility of two further
    components that are shown only in file mode. Any other ``selector``
    returns ``None``.
    """
    if selector == "์ด๋ฏธ์ง ์ ๋ก๋":
        file_mode, mode_name = False, "image_upload"
    elif selector == "ํ์ผ ์ ๋ก๋":
        file_mode, mode_name = True, "file_upload"
    else:
        return None
    return (
        gr.update(visible=not file_mode),
        gr.update(visible=file_mode),
        gr.State(mode_name),
        gr.update(visible=file_mode),
        gr.update(visible=file_mode),
    )
def file_selector(selector):
    """Return a ``gr.File`` for the checklist that matches ``selector``.

    The "low score" choice maps to the bottom-20-percent list and the
    "high score" choice to the top-20-percent list; any other value returns
    ``None``.
    """
    checklist_by_choice = {
        "low score ์ฐจํธ": "./bottom_20_percent_images.txt",
        "high score ์ฐจํธ": "./top_20_percent_images.txt",
    }
    checklist_path = checklist_by_choice.get(selector)
    if checklist_path is None:
        return None
    return gr.File(checklist_path)
def update_results(model_type):
    """Return six visibility updates for the result widgets.

    ``"ko_deplot"`` shows positions 0, 1 and 4; ``"aihub_deplot"`` shows
    positions 2, 3 and 5; any other value shows all six.
    """
    if "ko_deplot" == model_type:
        visibility = (True, True, False, False, True, False)
    elif "aihub_deplot" == model_type:
        visibility = (False, False, True, True, False, True)
    else:
        visibility = (True,) * 6
    return tuple(gr.update(visible=flag) for flag in visibility)
def display_image(image_file):
    """Open ``image_file`` with PIL and return it with the file's base name."""
    picture = Image.open(image_file)
    base_name = os.path.basename(image_file)
    return picture, base_name
def display_image_in_file(image_checklist):
    """Load a checklist into ``image_names`` and show the current image.

    ``current_index`` is left as it is. Returns the image and its name.
    """
    global image_names, current_index
    image_names = load_image_checklist(image_checklist)
    picture = show_image(current_index)
    return picture, image_names[current_index]
def update_file_based_on_chart_type(chart_type, all_file_path):
    """Write the lines of ``all_file_path`` that match ``chart_type`` to a new file.

    Every line of the input file is kept when ``chart_type`` is the "all"
    entry; for a known chart type only lines containing that type's marker
    substring are kept; for any other value nothing is kept.

    Returns:
        The path of the written file, ``"./filtered_chart_images.txt"``.
    """
    # Chart-type label -> substring that identifies it in a checklist line.
    marker_by_chart_type = {
        "์ผ๋ฐ ๊ฐ๋ก ๋ง๋ํ": "_horizontal bar_standard",
        "๋์ ๊ฐ๋ก ๋ง๋ํ": "_horizontal bar_accumulation",
        "100% ๊ธฐ์ค ๋์ ๊ฐ๋ก ๋ง๋ํ": "_horizontal bar_100per accumulation",
        "์ผ๋ฐ ์ธ๋ก ๋ง๋ํ": "_vertical bar_standard",
        "๋์ ์ธ๋ก ๋ง๋ํ": "_vertical bar_accumulation",
        "100% ๊ธฐ์ค ๋์ ์ธ๋ก ๋ง๋ํ": "_vertical bar_100per accumulation",
        "์ ํ": "_line_standard",
        "์ํ": "_pie_standard",
        "๊ธฐํ ๋ฐฉ์ฌํ": "_etc_radial",
        "๊ธฐํ ํผํฉํ": "_etc_mix",
    }

    with open(all_file_path, 'r', encoding='utf-8') as source:
        all_lines = source.readlines()

    if chart_type == "์ ์ฒด":
        kept_lines = all_lines
    elif chart_type in marker_by_chart_type:
        marker = marker_by_chart_type[chart_type]
        kept_lines = [entry for entry in all_lines if marker in entry]
    else:
        kept_lines = []

    # Write the selection to a new file.
    new_file_path = "./filtered_chart_images.txt"
    with open(new_file_path, 'w', encoding='utf-8') as target:
        target.writelines(kept_lines)
    return new_file_path
def handle_chart_type_change(chart_type, all_file_path):
    """Filter the checklist by chart type and show the first matching image.

    Replaces the module-level ``image_names`` with the filtered list and
    resets ``current_index`` to 0.

    Args:
        chart_type: Chart-type label passed to
            ``update_file_based_on_chart_type``.
        all_file_path: Path of the full checklist file, or a falsy value when
            no file is selected.

    Returns:
        ``(image, image_name)`` for the first filtered entry, or
        ``(None, "")`` when there is no checklist file or no entry matches.
    """
    global image_names, current_index

    # Without a checklist file there is nothing to filter; opening a missing
    # path would raise instead of leaving the display empty.
    if not all_file_path:
        return None, ""

    new_file_path = update_file_based_on_chart_type(chart_type, all_file_path)
    image_names = load_image_checklist(new_file_path)
    current_index = 0

    # NOTE(review): assumes load_image_checklist returns a sized, indexable
    # sequence (it is indexed below) -- confirm.
    if len(image_names) == 0:
        # The filter matched nothing; indexing the empty list would raise.
        return None, ""

    image = show_image(current_index)
    return image, image_names[current_index]
# Gradio UI: input controls on the left, result tables in three columns, and
# the event wiring for all of them.
# NOTE(review): the original indentation was lost; the nesting below is a
# reconstruction -- confirm the intended layout.
with gr.Blocks() as iface:
    mode = gr.State("image_upload")
    with gr.Row():
        with gr.Column():
            upload_option = gr.Radio(choices=["์ด๋ฏธ์ง ์ ๋ก๋", "ํ์ผ ์ ๋ก๋"], value="์ด๋ฏธ์ง ์ ๋ก๋", label="์ ๋ก๋ ์ต์ ")
            # Image and file upload components; only the image uploader is
            # visible initially.
            image_uploader = gr.File(file_count="single", file_types=["image"], visible=True)
            file_uploader = gr.File(file_count="single", file_types=[".txt"], visible=False)
            file_upload_option = gr.Radio(choices=["low score ์ฐจํธ", "high score ์ฐจํธ"], label="ํ์ผ ์ ๋ก๋ ์ต์ ", visible=False)
            # The default must be one of the choices; "์ ์ฒด" is the entry
            # that update_file_based_on_chart_type treats as "keep all lines".
            chart_type = gr.Dropdown(
                [
                    "์ผ๋ฐ ๊ฐ๋ก ๋ง๋ํ",
                    "๋์ ๊ฐ๋ก ๋ง๋ํ",
                    "100% ๊ธฐ์ค ๋์ ๊ฐ๋ก ๋ง๋ํ",
                    "์ผ๋ฐ ์ธ๋ก ๋ง๋ํ",
                    "๋์ ์ธ๋ก ๋ง๋ํ",
                    "100% ๊ธฐ์ค ๋์ ์ธ๋ก ๋ง๋ํ",
                    "์ ํ",
                    "์ํ",
                    "๊ธฐํ ๋ฐฉ์ฌํ",
                    "๊ธฐํ ํผํฉํ",
                    "์ ์ฒด",
                ],
                label="Chart Type",
                value="์ ์ฒด",
            )
            model_type = gr.Dropdown(["ko_deplot", "aihub_deplot", "all"], label="model")
            image_displayer = gr.Image(visible=True)
            with gr.Row():
                # interactive must be the boolean False; the string "False"
                # is truthy and would leave the button enabled.
                pre_button = gr.Button("์ด์ ", interactive=False)
                next_button = gr.Button("๋ค์")
            image_name = gr.Text("์ด๋ฏธ์ง ์ด๋ฆ", visible=False)
            inference_button = gr.Button("์ถ๋ก ")
        with gr.Column():
            ko_deplot_generated_table = gr.DataFrame(visible=False, label="ko-deplot ์ถ๋ก ๊ฒฐ๊ณผ")
            aihub_deplot_generated_table = gr.DataFrame(visible=False, label="aihub-deplot ์ถ๋ก ๊ฒฐ๊ณผ")
        with gr.Column():
            ko_deplot_label_table = gr.DataFrame(visible=False, label="ko-deplot ์ ๋ตํ ์ด๋ธ")
            aihub_deplot_label_table = gr.DataFrame(visible=False, label="aihub-deplot ์ ๋ตํ ์ด๋ธ")
        with gr.Column():
            ko_deplot_score_table = gr.DataFrame(visible=False, label="ko_deplot ์ ์")
            aihub_deplot_score_table = gr.DataFrame(visible=False, label="aihub_deplot ์ ์")

    # Show or hide the per-model tables when the model selection changes.
    model_type.change(
        update_results,
        inputs=[model_type],
        outputs=[
            ko_deplot_generated_table,
            ko_deplot_score_table,
            aihub_deplot_generated_table,
            aihub_deplot_score_table,
            ko_deplot_label_table,
            aihub_deplot_label_table,
        ],
    )
    # Switch between the image-upload and file-upload controls.
    upload_option.change(
        interface_selector,
        inputs=[upload_option],
        outputs=[image_uploader, file_uploader, mode, image_name, file_upload_option],
    )
    # Load one of the preset checklist files into the file uploader.
    file_upload_option.change(
        file_selector,
        inputs=[file_upload_option],
        outputs=[file_uploader],
    )
    chart_type.change(
        handle_chart_type_change,
        inputs=[chart_type, file_uploader],
        outputs=[image_displayer, image_name],
    )
    image_uploader.upload(
        display_image,
        inputs=[image_uploader],
        outputs=[image_displayer, image_name],
    )
    file_uploader.change(
        display_image_in_file,
        inputs=[file_uploader],
        outputs=[image_displayer, image_name],
    )
    pre_button.click(
        previous_image,
        outputs=[image_displayer, image_name, pre_button, next_button],
    )
    next_button.click(
        next_image,
        outputs=[image_displayer, image_name, pre_button, next_button],
    )
    inference_button.click(
        inference,
        inputs=[upload_option, image_uploader, file_uploader],
        outputs=[
            ko_deplot_generated_table,
            aihub_deplot_generated_table,
            ko_deplot_label_table,
            aihub_deplot_label_table,
            ko_deplot_score_table,
            aihub_deplot_score_table,
        ],
    )
if __name__ == "__main__":
    print("Launching Gradio interface...")
    sys.stdout.flush()  # Flush the stdout buffer.
    # prevent_thread_lock=True makes launch() return once the server is up
    # instead of blocking the main thread, so the URLs can be recorded while
    # the app is still running rather than only after it shuts down.
    iface.launch(share=True, prevent_thread_lock=True)
    sys.stdout.flush()  # Flush again so the printed URLs appear promptly.
    # Record the URLs provided by Gradio in a file.
    with open("gradio_url.log", "w", encoding="utf-8") as f:
        print(iface.local_url, file=f)
        print(iface.share_url, file=f)
    # Keep the process (and therefore the server) alive until interrupted.
    iface.block_thread()