binery's picture
Update app.py
02acfd5
raw
history blame
14.8 kB
import streamlit as st
from predict import PaddleOCR
from pdf2image import convert_from_bytes
import cv2
import PIL
import numpy as np
import os
import tempfile
import random
import string
from ultralyticsplus import YOLO
import streamlit as st
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import io
import re
from dateutil.parser import parse
import datetime
from file_utils import (
get_img,
save_excel_file,
concat_csv,
convert_pdf_to_image,
filter_color,
plot,
delete_file,
)
from process import (
filter_columns,
extract_text_of_col,
prepare_cols,
process_cols,
finalize_data,
)
table_model = YOLO("table.pt")
column_model = YOLO("columns.pt")
def remove_dots(string):
# Remove dots from the first and last position of the string
string = string.strip('.')
# Remove the first dot from left to right if there are still more than one dots
if string.count('.') > 1:
string = string.replace(".", "", 1)
return string
def convert_df(df):
return df.to_csv(index=False).encode('utf-8')
def PIL_to_cv(pil_img):
return cv2.cvtColor(np.array(pil_img), cv2.COLOR_RGB2BGR)
def cv_to_PIL(cv_img):
return PIL.Image.fromarray(cv2.cvtColor(cv_img, cv2.COLOR_BGR2RGB))
def visualize_ocr(pil_img, ocr_result):
plt.imshow(pil_img, interpolation='lanczos')
plt.gcf().set_size_inches(20, 20)
ax = plt.gca()
for idx, result in enumerate(ocr_result):
bbox = result['bbox']
text = result['text']
rect = patches.Rectangle(bbox[:2], bbox[2]-bbox[0], bbox[3]-bbox[1], linewidth=2, edgecolor='red', facecolor='none', linestyle='-')
ax.add_patch(rect)
ax.text(bbox[0], bbox[1], text, horizontalalignment='left', verticalalignment='bottom', color='blue', fontsize=7)
plt.xticks([], [])
plt.yticks([], [])
plt.gcf().set_size_inches(10, 10)
plt.axis('off')
img_buf = io.BytesIO()
plt.savefig(img_buf, bbox_inches='tight', dpi=150)
plt.close()
return PIL.Image.open(img_buf)
def filter_columns(columns: np.ndarray):
for idx, col in enumerate(columns):
if idx >= len(columns) - 1:
break
nxt = columns[idx + 1]
threshold = ((col[2] - col[0]) + (nxt[2] - nxt[0])) / 2
if (col[2] - columns[idx + 1][0]) > threshold * 0.5:
col[1], col[2], col[3] = min(col[1], nxt[1]), nxt[2], max(col[3], nxt[3])
columns = np.delete(columns, idx + 1, 0)
idx -= 1
return columns
st.title("Extract Data from Bank Statements")
model = PaddleOCR()
uploaded = st.file_uploader(
"Upload a bank statement pdf file",
type=["png", "jpg", "jpeg", "PNG", "JPG", "JPEG", "pdf", "PDF"],
)
number = st.number_input('Select Year',value=2023, step=1)
filter = st.checkbox("filter color")
if st.button('Analyze image'):
final_csv = pd.DataFrame()
first_flag_dataframe=0
if uploaded is None:
st.write('Please upload an image')
else:
tabs = st.tabs(
['Pages','Table Detection', 'Table Structure Recognition', 'Extracted Table(s)']
)
print(uploaded.type)
if uploaded.type == "application/pdf":
foldername = tempfile.TemporaryDirectory(dir=os.getcwd())
filename = uploaded.name.split(".")[0]
pdf_pages=convert_from_bytes(uploaded.read(),500)
for page_enumeration, page in enumerate(pdf_pages, start=1):
with tabs[0]:
st.header('Pages : '+str(page_enumeration))
st.image(page)
page_img=np.asarray(page)
tables = PaddleOCR.table_model(page_img, conf=0.75)
tabel_datas=tables[0].boxes.data.cpu().numpy()
tables = tables[0].boxes.xyxy.cpu().numpy()
with tabs[1]:
st.header('Table Detection Page :'+str(page_enumeration))
str_cols = st.columns(4)
str_cols[0].subheader('Table image')
str_cols[1].subheader('Columns')
str_cols[2].subheader('Structure result')
str_cols[3].subheader('Cells result')
results = []
for table in tables:
try:
tabel_data = np.array(
sorted(tabel_datas, key=lambda x: x[0]), dtype=np.ndarray
)
tabel_data = filter_columns(tabel_data)
str_cols[0].image(plot(page_img, tabel_data), channels="RGB")
# * crop the table as an image from the original image
sub_img = page_img[
int(table[1].item()): int(table[3].item()),
int(table[0].item()): int(table[2].item()),
]
columns_detect = PaddleOCR.column_model(sub_img, conf=0.75)
cols_data = columns_detect[0].boxes.data.cpu().numpy()
# * Sort columns according to the x coordinate
cols_data = np.array(
sorted(cols_data, key=lambda x: x[0]), dtype=np.ndarray
)
# * merge the duplicated columns
cols_data = filter_columns(cols_data)
str_cols[1].image(plot(sub_img, cols_data), channels="RGB")
except Exception as e:
print(e)
st.warning("No Detection")
try:
####################################################################
# # columns = cols_data[:, 0:4]
# # #sub_imgs = []
# # thr = 0
# # column = columns[0]
# # maxcol1=int(column[1])
# # maxcol3=int(column[3])
# # cols = []
# # for column in columns:
# # if maxcol1 < int(column[1]) :
# # maxcol1=int(column[1])
# # if maxcol3 < int(column[3]) :
# # maxcol3=int(column[3])
# # sub_imgs = (sub_img[ maxcol1: maxcol3, : ])
# # str_cols[2].image(sub_imgs)
# # image = filter_color(sub_imgs)
# # res, threshold,ocr_res = extract_text_of_col(image)
# # vis_ocr_img = visualize_ocr(image, ocr_res)
# # str_cols[3].image(vis_ocr_img)
# # thr += threshold
# # cols.append(prepare_cols(res, threshold * 0.6))
# # print("cols : ",cols)
# # thr = thr / len(columns)
# # data = process_cols(cols, thr * 0.6)
# # print("data : ",data)
######################################################################
columns = cols_data[:, 0:4]
sub_imgs = []
column = columns[0]
maxcol1=int(column[1])
maxcol3=int(column[3])
#for column in columns:
# if maxcol1 < int(column[1]) :
# maxcol1=int(column[1])
# if maxcol3 < int(column[3]) :
# maxcol3=int(column[3])
for column in columns:
# * Create list of cropped images for each column
sub_imgs.append(sub_img[maxcol1:maxcol3, int(column[0]): int(column[2])])
cols = []
thr = 0
for image in sub_imgs:
if filter:
# * keep only black color in the image
image = filter_color(image)
# * extract text of each column and get the length threshold
res, threshold, ocr_res = extract_text_of_col(image)
thr += threshold
# * arrange the rows of each column with respect to row length threshold
cols.append(prepare_cols(res, threshold * 0.6))
thr = thr / len(sub_imgs)
# * append each element in each column to its right place in the dataframe
data = process_cols(cols, thr * 0.6)
# * merge the related rows together
data: pd.DataFrame = finalize_data(data, page_enumeration)
results.append(data)
with tabs[2]:
st.header('Extracted Table(s)')
st.dataframe(data)
print("data : ",data)
print("results : ", results)
if first_flag_dataframe == 0 :
first_flag_dataframe=1
final_csv=data
else:
final_csv = pd.concat([final_csv,data],ignore_index=True)
csv = convert_df(data)
print(csv)
except:
st.warning("Text Extraction Failed")
continue
with tabs[3]:
st.dataframe(final_csv)
rough_csv= convert_df(final_csv)
st.download_button(
"rough-csv",
rough_csv,
"file.csv",
"text/csv",
key='rough-csv'
)
final_csv.columns = ['page','Date', 'Transaction_Details', 'Three', 'Deposit','Withdrawal','Balance']
#final_csv = final_csv.rename(columns={1: 'Date', 2: 'Transaction_Details', 3: 'Three', 4: 'Deposit',5 : 'Withdrawal',6:'Balance'})
final_csv['Date'] = final_csv['Date'].astype(str)
st.dataframe(final_csv)
final_csv = final_csv[~final_csv['Date'].str.contains('Date')]
final_csv = final_csv[~final_csv['Date'].str.contains('日期')]
final_csv = final_csv[~final_csv['Date'].str.contains('口期')]
final_csv['Date'] = final_csv['Date'].apply(lambda x: re.sub(r'[^a-zA-Z0-9 ]', '', x))
final_csv['Date'] = final_csv['Date'].apply(lambda x: x + str(number))
final_csv['Date'] = final_csv['Date'].apply(lambda x:parse(x, fuzzy=True))
#final_csv['Date']=final_csv['Date'].str.replace(' ', '')
final_csv['*Date'] = pd.to_datetime(final_csv['Date']).dt.strftime('%d-%m-%Y')
final_csv['Withdrawal'] = final_csv['Withdrawal'].astype(str)
final_csv['Withdrawal'] = final_csv['Withdrawal'].str.replace('i', '').str.replace('E', '').str.replace(':', '').str.replace('M', '').str.replace('?', '').str.replace('t', '').str.replace('+', '').str.replace(';', '').str.replace('g', '').str.replace('^', '').str.replace('m', '').str.replace('/', '').str.replace('#', '').str.replace("'", '').str.replace('w', '').str.replace('"', '').str.replace('%', '').str.replace('r', '').str.replace('-', '').str.replace('v', '').str.replace(',', '').str.replace('·', '').str.replace(':', '').str.replace(' ', '').str.replace('*', '').str.replace('~', '').str.replace('V', '')
final_csv['Withdrawal'] = final_csv['Withdrawal'].apply(remove_dots)
final_csv['Withdrawal'] = final_csv['Withdrawal'].astype(float)*-1
final_csv['Deposit'] = final_csv['Deposit'].astype(str)
final_csv['Deposit'] = final_csv['Deposit'].str.replace('i', '').str.replace('E', '').str.replace(':', '').str.replace('M', '').str.replace('?', '').str.replace('t', '').str.replace('+', '').str.replace(';', '').str.replace('g', '').str.replace('^', '').str.replace('m', '').str.replace('/', '').str.replace('#', '').str.replace("'", '').str.replace('w', '').str.replace('"', '').str.replace('%', '').str.replace('r', '').str.replace('-', '').str.replace('v', '').str.replace(',', '').str.replace('·', '').str.replace(':', '').str.replace(' ', '').str.replace('*', '').str.replace('~', '').str.replace('V', '')
final_csv['Deposit'] = final_csv['Deposit'].apply(remove_dots)
final_csv['Deposit'] = final_csv['Deposit'].astype(float)
final_csv['*Amount'] = final_csv['Withdrawal'].fillna(0) + final_csv['Deposit'].fillna(0)
final_csv = final_csv.drop(['Withdrawal','Deposit'], axis=1)
final_csv['Payee'] = ''
final_csv['Description'] = final_csv['Transaction_Details']
final_csv.loc[final_csv['Three'].notnull(), 'Description'] += " "+final_csv['Three']
final_csv = final_csv.drop(['Transaction_Details','Three'], axis=1)
final_csv['Reference'] = ''
final_csv['Check Number'] = ''
df = final_csv[['*Date', '*Amount', 'Payee', 'Description','Reference','Check Number']]
df = df[df['*Amount'] != 0]
csv = convert_df(df)
st.dataframe(df)
st.download_button(
"Press to Download",
csv,
"file.csv",
"text/csv",
key='download-csv'
)
#success = st.button("Extract", on_click=model, args=[uploaded, filter])