FTA / app.py
TDN-M's picture
Update app.py
0f4e065 verified
import gradio as gr
import os
from PIL import Image
import google.generativeai as genai
import logging
import re
import pandas as pd
import fitz # PyMuPDF
import io
import pdfplumber
from datetime import datetime
# Thiết lập logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[logging.FileHandler("app.log"), logging.StreamHandler()]
)
logger = logging.getLogger(__name__)
# Cấu hình API key từ biến môi trường
genai.configure(api_key=os.getenv('GOOGLE_API_KEY'))
# Định nghĩa headers cố định cho bảng Tab 1
TABLE_HEADERS_TAB1 = [
"Supplier Name", "Invoice Number", "Invoice Date",
"Description - Name", "Description - Destination", "Description - Period/Date",
"Project Code", "Currency", "Amount (Before GST)", "GST",
"Amount (After GST)", "Converted Amount"
]
# Định nghĩa headers cố định cho bảng Tab 2
TABLE_HEADERS_TAB2 = [
"Reference - Code", "GL Posting Date", "Date", "Settlement Date",
"Security Name", "Currency", "Quantity", "Price",
"Transaction Amount", "Commission"
]
# Hàm chuẩn hóa số
def standardize_number(value):
if not value or pd.isna(value):
return None
value = str(value).replace(" ", "").replace(",", ".")
value = re.sub(r"[^\d.-]", "", value)
try:
return float(value)
except ValueError:
logger.warning(f"Không thể chuyển đổi giá trị thành số: {value}")
return None
# Hàm chuẩn hóa ngày tháng
def standardize_date(date_str):
if not date_str or pd.isna(date_str):
return None
formats = ["%d/%m/%Y", "%d-%b-%y"]
for fmt in formats:
try:
return datetime.strptime(date_str, fmt).strftime("%m/%d/%Y")
except ValueError:
continue
logger.warning(f"Không thể chuẩn hóa ngày: {date_str}")
return None
# Hàm gọi mô hình Gemini để trích xuất thông tin (Tab 1)
def get_response(model, input_text, image_parts, prompt):
logger.info("Bắt đầu gọi mô hình Gemini-1.5-flash")
try:
response = model.generate_content([input_text, image_parts[0], prompt])
logger.info(f"Phản hồi từ API: {response.text}")
return response.text
except Exception as e:
logger.error(f"Lỗi khi gọi API: {str(e)}")
return f"Error: {str(e)}"
# Hàm kiểm tra và xử lý file (Tab 1)
def check_and_process_file(file):
if file is None:
return False, "No file uploaded", None
file_path = file.name
file_ext = os.path.splitext(file_path)[1].lower()
logger.info(f"File path: {file_path}, Extension: {file_ext}")
allowed_types = ['.jpg', '.jpeg', '.png', '.pdf']
if file_ext not in allowed_types:
return False, f"Invalid file extension: {file_ext}. Please upload a file with extension: {allowed_types}", None
return True, "File type valid", file_path
# Hàm xử lý PDF thành danh sách ảnh (Tab 1)
def process_pdf(file_path):
logger.info(f"Xử lý file PDF: {file_path}")
try:
doc = fitz.open(file_path)
images = []
for page_num in range(len(doc)):
page = doc.load_page(page_num)
pix = page.get_pixmap()
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
images.append(img)
doc.close()
logger.info(f"Trích xuất {len(images)} trang từ PDF")
return images
except Exception as e:
logger.error(f"Lỗi khi xử lý PDF: {str(e)}")
return []
# Hàm chuyển đổi ảnh thành JPEG bytes (Tab 1)
def image_to_jpeg_bytes(image):
img_byte_arr = io.BytesIO()
image.save(img_byte_arr, format="JPEG")
return img_byte_arr.getvalue()
# Hàm trích xuất văn bản thô từ ảnh hoặc PDF (Tab 1)
def extract_raw_text(uploaded_file, input_prompt):
is_valid, message, file_path = check_and_process_file(uploaded_file)
if not is_valid:
logger.error(message)
return message
try:
if file_path.lower().endswith('.pdf'):
images = process_pdf(file_path)
if not images:
return "Error: Could not process PDF"
full_text = ""
for i, img in enumerate(images):
logger.info(f"Xử lý trang {i+1} từ PDF")
img_data = image_to_jpeg_bytes(img)
image_data = [{"mime_type": "image/jpeg", "data": img_data}]
model = genai.GenerativeModel('gemini-1.5-flash')
text = get_response(model, input_prompt, image_data, input_prompt)
full_text += text + "\n"
raw_text = full_text.strip()
else:
image = Image.open(file_path)
img_data = image_to_jpeg_bytes(image)
image_data = [{"mime_type": "image/jpeg", "data": img_data}]
logger.info(f"Xử lý file ảnh: {file_path}")
model = genai.GenerativeModel('gemini-1.5-flash')
raw_text = get_response(model, input_prompt, image_data, input_prompt).strip()
logger.info(f"Văn bản thô từ mô hình: {raw_text}")
return raw_text
except Exception as e:
logger.error(f"Lỗi khi xử lý file: {str(e)}")
return f"Error processing file: {str(e)}"
# Hàm chuyển văn bản thô thành bảng và nối vào bảng hiện tại (Tab 1)
def generate_table_tab1(raw_text, current_table):
logger.info("Chuyển văn bản thô thành bảng")
logger.info(f"Văn bản thô đầu vào: {raw_text}")
if not raw_text or "Error" in raw_text:
return current_table if current_table is not None else pd.DataFrame(columns=TABLE_HEADERS_TAB1), current_table
fields = {
"Supplier Name": r"Supplier Name: (.*?)(?=\n|$)",
"Invoice Number": r"Invoice Number: (.*?)(?=\n|$)",
"Invoice Date": r"Invoice Date: (.*?)(?=\n|$)",
"Description - Name": r"Description - Name: (.*?)(?=\n|$)",
"Description - Destination": r"Description - Destination: (.*?)(?=\n|$)",
"Description - Period/Date": r"Description - Period/Date: (.*?)(?=\n|$)",
"Project Code": r"Project Code: (.*?)(?=\n|$)",
"Currency": r"Currency: (.*?)(?=\n|$)",
"Amount (Before GST)": r"Amount \(Before GST\): (.*?)(?=\n|$)",
"GST": r"GST: (.*?)(?=\n|$)",
"Amount (After GST)": r"Amount \(After GST\): (.*?)(?=\n|$)",
"Converted Amount": r"Converted Amount: (.*?)(?=\n|$)"
}
result = {header: "" for header in TABLE_HEADERS_TAB1}
for field, pattern in fields.items():
match = re.search(pattern, raw_text, re.DOTALL)
if match and match.group(1) != "Not found" and match.group(1).strip():
result[field] = match.group(1).strip()
logger.info(f"Trích xuất {field}: {result[field]}")
else:
logger.info(f"Không tìm thấy {field}")
new_row = pd.DataFrame([result], columns=TABLE_HEADERS_TAB1)
if current_table is not None and not current_table.empty:
updated_table = pd.concat([current_table, new_row], ignore_index=True)
else:
updated_table = new_row
logger.info(f"Kết quả bảng: {updated_table.to_dict()}")
return updated_table, updated_table
# Hàm xuất CSV và reset bảng (Tab 1)
def export_and_reset_tab1(current_table):
if current_table is None or current_table.empty:
return None, pd.DataFrame(columns=TABLE_HEADERS_TAB1), "No data to export", pd.DataFrame(columns=TABLE_HEADERS_TAB1)
csv_file = "extracted_invoices.csv"
current_table.to_csv(csv_file, index=False)
logger.info(f"Đã xuất bảng ra file: {csv_file}")
reset_table = pd.DataFrame(columns=TABLE_HEADERS_TAB1)
return gr.File(value=csv_file), reset_table, "Export successful, table reset", reset_table
# Hàm trích xuất thô từ PDF (Tab 2)
def extract_raw_tables_from_pdf(file_path):
tables = []
try:
with pdfplumber.open(file_path) as pdf:
for page_num in range(len(pdf.pages)):
page = pdf.pages[page_num]
table = page.extract_table()
if table and len(table) > 1:
df = pd.DataFrame(table[1:], columns=table[0])
df["Page"] = page_num + 1
tables.append(df)
logger.info(f"Trích xuất thành công bảng từ trang {page_num + 1}")
return tables
except Exception as e:
logger.error(f"Lỗi trích xuất PDF: {e}")
return None
# Hàm chuẩn hóa cấu trúc cột
def normalize_table_structure(tables, expected_columns):
normalized_tables = []
for df in tables:
if df is not None and not df.empty:
df.columns = df.columns.str.strip()
for col in expected_columns:
if col not in df.columns:
df[col] = pd.NA
df = df[expected_columns]
normalized_tables.append(df)
return normalized_tables
# Hàm nối các bảng
def combine_tables(normalized_tables):
if not normalized_tables:
logger.warning("Không có bảng nào để nối")
return None
try:
combined_df = pd.concat(normalized_tables, ignore_index=True)
logger.info(f"Nối thành công {len(normalized_tables)} bảng")
return combined_df
except Exception as e:
logger.error(f"Lỗi khi nối bảng: {e}")
return None
# Hàm trích xuất thông tin từ Description
def parse_description(description):
pattern = r"(Bought|Sold)\s+([\d,]+)\s+([\w\s&]+?)\s*@\s*([A-Z]{3})\s*([\d,]+\.?\d*)"
match = re.search(pattern, description, re.IGNORECASE)
if match:
action, quantity, security, currency, price = match.groups()
quantity = standardize_number(quantity)
price = standardize_number(price)
return security.strip(), currency, quantity, price
logger.warning(f"Không thể phân tích Description: {description}")
return None, None, None, None
def find_settlement_date(transactions, ref_code, trans_type):
"""
Tìm ngày thanh toán (Settlement Date) dựa trên loại giao dịch và mã tham chiếu.
Args:
transactions (pd.DataFrame): DataFrame chứa dữ liệu giao dịch.
ref_code (str): Mã tham chiếu của giao dịch cần tìm Settlement Date.
trans_type (str): Loại giao dịch ("TSF" hoặc "TPF").
Returns:
str or None: Ngày thanh toán (định dạng mm/dd/yyyy) nếu tìm thấy, ngược lại trả về None.
"""
if trans_type == "TSF":
# Rule 1: Tìm dòng có Reference bắt đầu bằng "PY" và Description chứa "Amount paid TFR to TRUST (ref_code)"
pattern = re.compile(rf"Amount\s+paid\s+TFR\s+to\s+TRUST\s*\({re.escape(ref_code)}\)", re.IGNORECASE)
for _, row in transactions.iterrows():
if (
row.get("Reference", "").startswith("PY") and
pattern.search(str(row.get("Description", ""))) and
pd.notna(row.get("Balance in Trust"))
):
return standardize_date(row.get("Date"))
elif trans_type == "TPF":
# Rule 2: Tìm mã RC trung gian từ Description chứa "TRUSTTFR_TRTTFR (ref_code)"
rc_pattern = re.compile(rf"TRUSTTFR_TRTTFR\s*\(\s*([A-Z0-9]+)\s*\)", re.IGNORECASE)
rc_code = None
for _, row in transactions.iterrows():
if (
row.get("Reference", "").startswith("RC") and
rc_pattern.search(str(row.get("Description", "")))
):
rc_code = rc_pattern.search(str(row.get("Description", ""))).group(1)
break
if rc_code:
# Rule 3: Tìm dòng có Reference bắt đầu bằng "WC" và Description chứa "Withdrawal from TRUST (rc_code)"
wc_pattern = re.compile(rf"Withdrawal\s+from\s+TRUST.*\({re.escape(rc_code)}\)", re.IGNORECASE)
for _, wc_row in transactions.iterrows():
if (
wc_row.get("Reference", "").startswith("WC") and
wc_pattern.search(str(wc_row.get("Description", ""))) and
pd.notna(wc_row.get("Balance in Trust"))
):
return standardize_date(wc_row.get("Date"))
# Nếu không tìm thấy Settlement Date, trả về None
return None
# Hàm nối các bảng
def combine_tables(normalized_tables):
if not normalized_tables:
logger.warning("Không có bảng nào để nối")
return None
try:
combined_df = pd.concat(normalized_tables, ignore_index=True)
logger.info(f"Nối thành công {len(normalized_tables)} bảng")
return combined_df
except Exception as e:
logger.error(f"Lỗi khi nối bảng: {e}")
return None
# Hàm trích xuất thông tin từ Description (Tab 2)
def parse_description(description):
pattern = r"(Bought|Sold)\s+([\d,]+)\s+([\w\s&]+?)\s*@\s*([A-Z]{3})\s*([\d,]+\.?\d*)"
match = re.search(pattern, description, re.IGNORECASE)
if match:
action, quantity, security, currency, price = match.groups()
quantity = standardize_number(quantity)
price = standardize_number(price)
return security.strip(), currency, quantity, price
logger.warning(f"Không thể phân tích Description: {description}")
return None, None, None, None
# Hàm tìm Settlement Date (Tab 2)
def find_settlement_date(transactions, ref_code, trans_type):
if trans_type == "TSF":
pattern = re.compile(rf"Amount\s+paid\s+TFR\s+to\s+TRUST\s*\({re.escape(ref_code)}\)", re.IGNORECASE)
for _, row in transactions.iterrows():
reference = str(row.get("Reference", ""))
if reference.startswith("PY") and \
pattern.search(str(row.get("Description", ""))) and \
pd.notna(row.get("Balance in Trust")):
return standardize_date(row.get("Date"))
elif trans_type == "TPF":
rc_pattern = re.compile(rf"TRUSTTFR_TRTTFR\s*\(\s*([A-Z0-9]+)\s*\)", re.IGNORECASE)
for _, row in transactions.iterrows():
reference = str(row.get("Reference", ""))
if reference.startswith("RC") and \
rc_pattern.search(str(row.get("Description", ""))):
rc_code = rc_pattern.search(str(row.get("Description", ""))).group(1)
wc_pattern = re.compile(rf"Withdrawal\s+from\s+TRUST.*\({re.escape(rc_code)}\)", re.IGNORECASE)
for _, wc_row in transactions.iterrows():
wc_reference = str(wc_row.get("Reference", ""))
if wc_reference.startswith("WC") and \
wc_pattern.search(str(wc_row.get("Description", ""))) and \
pd.notna(wc_row.get("Balance in Trust")):
return standardize_date(wc_row.get("Date"))
return None
# Hàm xử lý giao dịch (Tab 2)
def process_transactions(transactions):
result = []
for _, row in transactions.iterrows():
ref_code = str(row.get("Reference", ""))
trans_type = "TPF" if ref_code.startswith("TPF") else "TSF" if ref_code.startswith("TSF") else None
if not trans_type:
continue
# Trích xuất thông tin từ Description
security, currency, quantity, price = parse_description(row.get("Description"))
if not all([security, currency, quantity, price]):
continue
# Xác định GL Posting Date và Settlement Date
gl_date = standardize_date(row.get("Date"))
settlement_date = find_settlement_date(transactions, ref_code, trans_type)
# Tính toán Transaction Amount
transaction_amount = quantity * price if quantity is not None and price is not None else None
# Chuẩn hóa Balance in Trust
balance_in_trust = standardize_number(row.get("Balance in Trust"))
# Tính toán Commission với kiểm tra điều kiện
if balance_in_trust is not None and transaction_amount is not None:
commission = (
abs(balance_in_trust + transaction_amount) if trans_type == "TPF"
else abs(balance_in_trust - transaction_amount)
)
else:
commission = None
# Thêm dữ liệu vào kết quả
result.append({
"Reference - Code": ref_code,
"GL Posting Date": gl_date,
"Date": gl_date,
"Settlement Date": settlement_date,
"Security Name": security,
"Currency": currency,
"Quantity": -quantity if trans_type == "TSF" else quantity,
"Price": price,
"Transaction Amount": transaction_amount,
"Commission": commission
})
return pd.DataFrame(result, columns=TABLE_HEADERS_TAB2)
# Hàm xử lý file tải lên (Tab 2)
def process_file(file):
if file is None:
return None, "No file uploaded"
file_path = file.name
file_ext = os.path.splitext(file_path)[1].lower()
if file_ext != '.pdf':
return None, "Only PDF files are supported"
# Bước 1: Trích xuất thô từ PDF
raw_tables = extract_raw_tables_from_pdf(file_path)
if raw_tables is None:
return None, "Could not extract data from PDF"
# Bước 2: Chuẩn hóa cấu trúc cột
expected_columns = ["Date", "Reference", "Description", "Debit", "Credit", "Balance", "Balance in Trust"]
normalized_tables = normalize_table_structure(raw_tables, expected_columns)
if not normalized_tables:
return None, "No valid tables after normalization"
# Bước 3: Nối các bảng
transactions = combine_tables(normalized_tables)
if transactions is None:
return None, "Failed to combine tables"
# Bước 4: Xử lý giao dịch
processed_data = process_transactions(transactions)
return processed_data, "Processing completed"
# Hàm tạo bảng
def generate_table(file, current_table):
if not file:
return current_table, "Vui lòng tải lên file PDF", current_table
try:
table, status = process_file(file)
if table is None or table.empty:
return current_table, status, current_table
# Kiểm tra và loại bỏ cột rỗng hoặc toàn NaN trước khi nối
if current_table is not None and not current_table.empty:
current_table = current_table.dropna(axis=1, how='all')
if not table.empty:
table = table.dropna(axis=1, how='all')
# Nối bảng
updated_table = pd.concat([current_table, table], ignore_index=True) if current_table is not None else table
return updated_table, status, updated_table
except Exception as e:
logger.error(f"Lỗi xử lý file: {e}")
return current_table, f"Lỗi: {str(e)}", current_table
# Hàm xuất CSV và reset
def export_and_reset(current_table):
if current_table is None or current_table.empty:
return None, pd.DataFrame(columns=TABLE_HEADERS_TAB2), "No data to export"
# Tạo thư mục output nếu chưa tồn tại
output_dir = "output"
if not os.path.exists(output_dir):
try:
os.makedirs(output_dir)
logger.info(f"Thư mục '{output_dir}' đã được tạo thành công.")
except Exception as e:
logger.error(f"Lỗi khi tạo thư mục '{output_dir}': {e}")
raise OSError(f"Không thể tạo thư mục '{output_dir}'. Vui lòng kiểm tra quyền truy cập.")
# Lưu file CSV vào thư mục output
csv_file = os.path.join(output_dir, "extracted_transactions.csv")
current_table.to_csv(csv_file, index=False)
logger.info(f"File CSV đã được lưu tại: {csv_file}")
# Reset bảng
reset_table = pd.DataFrame(columns=TABLE_HEADERS_TAB2)
return gr.File(value=csv_file), reset_table, "Export successful", reset_table
# Giao diện Gradio với Tabs
with gr.Blocks(title="Multifunctional Application") as app:
with gr.Tabs():
# Tab 1: Invoice Extractor
with gr.Tab("Invoice Extractor"):
gr.Markdown("# Multilanguage Invoice Extractor")
gr.Markdown("Demo #1")
with gr.Column():
file_input_tab1 = gr.File(label="Upload Invoice (jpg, jpeg, png, pdf)")
extract_btn_tab1 = gr.Button("Extract Contents")
raw_text_output_tab1 = gr.Textbox(label="Contents for next step", lines=20)
table_btn_tab1 = gr.Button("Next Step")
output_table_tab1 = gr.Dataframe(headers=TABLE_HEADERS_TAB1, label="Extracted Information")
export_btn_tab1 = gr.Button("Export to CSV & Reset")
csv_output_tab1 = gr.File(label="Download CSV")
export_status_tab1 = gr.Textbox(label="Export Status")
table_state_tab1 = gr.State(value=pd.DataFrame(columns=TABLE_HEADERS_TAB1))
extract_btn_tab1.click(
fn=extract_raw_text,
inputs=[file_input_tab1, gr.State("""
You are an expert in extracting specific information from invoices in multiple languages and formats. Users will upload images or PDFs of invoices, and your task is to extract the following fields: Supplier Name, Invoice Number, Invoice Date, Description - Name, Description - Destination, Description - Period/Date, Project Code, Currency, Amount (Before GST), GST, Amount (After GST), and Converted Amount. Identify these fields based on common invoice patterns, even if labels or formats vary (e.g., "Total" might be "Amount (After GST)", "Tax" might be "GST"). If a field is not found or unclear, leave it blank. Return the extracted data strictly in this format, with no additional text or JSON:
Supplier Name: [value]
Invoice Number: [value]
Invoice Date: [value]
Description - Name: [value]
Description - Destination: [value]
Description - Period/Date: [value]
Project Code: [value]
Currency: [value]
Amount (Before GST): [value]
GST: [value]
Amount (After GST): [value]
Converted Amount: [value]
""")],
outputs=raw_text_output_tab1
)
table_btn_tab1.click(
fn=generate_table_tab1,
inputs=[raw_text_output_tab1, table_state_tab1],
outputs=[output_table_tab1, table_state_tab1]
)
export_btn_tab1.click(
fn=export_and_reset_tab1,
inputs=table_state_tab1,
outputs=[csv_output_tab1, output_table_tab1, export_status_tab1, table_state_tab1]
)
# Tab 2: Transaction Extractor
with gr.Tab("Transaction Extractor"):
gr.Markdown("# Transaction Extractor")
with gr.Column():
file_input_tab2 = gr.File(label="Upload Bank Statement (PDF)")
extract_btn_tab2 = gr.Button("Extract Transactions")
status_output_tab2 = gr.Textbox(label="Status")
table_output_tab2 = gr.Dataframe(headers=TABLE_HEADERS_TAB2, label="Extracted Transactions")
export_btn_tab2 = gr.Button("Export to CSV & Reset")
csv_output_tab2 = gr.File(label="Download CSV")
export_status_tab2 = gr.Textbox(label="Export Status")
table_state_tab2 = gr.State(value=pd.DataFrame(columns=TABLE_HEADERS_TAB2))
extract_btn_tab2.click(
fn=generate_table,
inputs=[file_input_tab2, table_state_tab2],
outputs=[table_output_tab2, status_output_tab2, table_state_tab2]
)
export_btn_tab2.click(
fn=export_and_reset,
inputs=table_state_tab2,
outputs=[csv_output_tab2, table_output_tab2, export_status_tab2, table_state_tab2]
)
app.launch(share=True)