|
import gradio as gr |
|
import os |
|
from PIL import Image |
|
import google.generativeai as genai |
|
import logging |
|
import re |
|
import pandas as pd |
|
import fitz |
|
import io |
|
import pdfplumber |
|
from datetime import datetime |
|
|
|
|
|
logging.basicConfig( |
|
level=logging.INFO, |
|
format="%(asctime)s - %(levelname)s - %(message)s", |
|
handlers=[logging.FileHandler("app.log"), logging.StreamHandler()] |
|
) |
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
genai.configure(api_key=os.getenv('GOOGLE_API_KEY')) |
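# NOTE: assumes the GOOGLE_API_KEY environment variable is set before the app starts.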
|
|
|
|
|
TABLE_HEADERS_TAB1 = [ |
|
"Supplier Name", "Invoice Number", "Invoice Date", |
|
"Description - Name", "Description - Destination", "Description - Period/Date", |
|
"Project Code", "Currency", "Amount (Before GST)", "GST", |
|
"Amount (After GST)", "Converted Amount" |
|
] |
|
|
|
|
|
TABLE_HEADERS_TAB2 = [ |
|
"Reference - Code", "GL Posting Date", "Date", "Settlement Date", |
|
"Security Name", "Currency", "Quantity", "Price", |
|
"Transaction Amount", "Commission" |
|
] |
|
|
|
|
|
def standardize_number(value):
    """Convert a formatted numeric string (e.g. '1,234.56') to a float, or None if it cannot be parsed."""
    if value is None or pd.isna(value):
        return None
    # Remove spaces and thousands separators, then any remaining non-numeric characters.
    value = str(value).replace(" ", "").replace(",", "")
    value = re.sub(r"[^\d.-]", "", value)
    if not value:
        return None
    try:
        return float(value)
    except ValueError:
        logger.warning(f"Could not convert value to a number: {value}")
        return None
|
|
|
|
|
def standardize_date(date_str):
    """Normalize a date string to mm/dd/yyyy; return None if it cannot be parsed."""
    if not date_str or pd.isna(date_str):
        return None
    date_str = str(date_str).strip()
    formats = ["%d/%m/%Y", "%d-%b-%y"]
    for fmt in formats:
        try:
            return datetime.strptime(date_str, fmt).strftime("%m/%d/%Y")
        except ValueError:
            continue
    logger.warning(f"Could not standardize date: {date_str}")
    return None
|
|
|
|
|
def get_response(model, input_text, image_parts, prompt): |
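    """Send the prompt and a single image part to the Gemini model and return the response text."""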
|
logger.info("Bắt đầu gọi mô hình Gemini-1.5-flash") |
|
try: |
|
response = model.generate_content([input_text, image_parts[0], prompt]) |
|
logger.info(f"Phản hồi từ API: {response.text}") |
|
return response.text |
|
except Exception as e: |
|
logger.error(f"Lỗi khi gọi API: {str(e)}") |
|
return f"Error: {str(e)}" |
|
|
|
|
|
def check_and_process_file(file): |
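    """Validate the uploaded file's extension and return (is_valid, message, file_path)."""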
|
if file is None: |
|
return False, "No file uploaded", None |
|
file_path = file.name |
|
file_ext = os.path.splitext(file_path)[1].lower() |
|
logger.info(f"File path: {file_path}, Extension: {file_ext}") |
|
allowed_types = ['.jpg', '.jpeg', '.png', '.pdf'] |
|
if file_ext not in allowed_types: |
|
return False, f"Invalid file extension: {file_ext}. Please upload a file with extension: {allowed_types}", None |
|
return True, "File type valid", file_path |
|
|
|
|
|
def process_pdf(file_path): |
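    """Render each page of a PDF to a PIL image using PyMuPDF (fitz)."""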
|
logger.info(f"Xử lý file PDF: {file_path}") |
|
try: |
|
doc = fitz.open(file_path) |
|
images = [] |
|
for page_num in range(len(doc)): |
|
page = doc.load_page(page_num) |
|
pix = page.get_pixmap() |
|
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) |
|
images.append(img) |
|
doc.close() |
|
logger.info(f"Trích xuất {len(images)} trang từ PDF") |
|
return images |
|
except Exception as e: |
|
logger.error(f"Lỗi khi xử lý PDF: {str(e)}") |
|
return [] |
|
|
|
|
|
def image_to_jpeg_bytes(image): |
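    """Serialize a PIL image to JPEG bytes for the Gemini image payload."""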
|
img_byte_arr = io.BytesIO() |
|
image.save(img_byte_arr, format="JPEG") |
|
return img_byte_arr.getvalue() |
|
|
|
|
|
def extract_raw_text(uploaded_file, input_prompt): |
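    """Run the Gemini prompt over the uploaded image or PDF and return the raw extracted text."""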
|
is_valid, message, file_path = check_and_process_file(uploaded_file) |
|
if not is_valid: |
|
logger.error(message) |
|
return message |
|
try: |
|
if file_path.lower().endswith('.pdf'): |
|
images = process_pdf(file_path) |
|
if not images: |
|
return "Error: Could not process PDF" |
|
full_text = "" |
|
for i, img in enumerate(images): |
|
logger.info(f"Xử lý trang {i+1} từ PDF") |
|
img_data = image_to_jpeg_bytes(img) |
|
image_data = [{"mime_type": "image/jpeg", "data": img_data}] |
|
model = genai.GenerativeModel('gemini-1.5-flash') |
|
text = get_response(model, input_prompt, image_data, input_prompt) |
|
full_text += text + "\n" |
|
raw_text = full_text.strip() |
|
else: |
|
image = Image.open(file_path) |
|
img_data = image_to_jpeg_bytes(image) |
|
image_data = [{"mime_type": "image/jpeg", "data": img_data}] |
|
logger.info(f"Xử lý file ảnh: {file_path}") |
|
model = genai.GenerativeModel('gemini-1.5-flash') |
|
raw_text = get_response(model, input_prompt, image_data, input_prompt).strip() |
|
logger.info(f"Văn bản thô từ mô hình: {raw_text}") |
|
return raw_text |
|
except Exception as e: |
|
logger.error(f"Lỗi khi xử lý file: {str(e)}") |
|
return f"Error processing file: {str(e)}" |
|
|
|
|
|
def generate_table_tab1(raw_text, current_table): |
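    """Parse the 'Field: value' lines returned by the model and append them as a new row to the invoice table."""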
|
logger.info("Chuyển văn bản thô thành bảng") |
|
logger.info(f"Văn bản thô đầu vào: {raw_text}") |
|
if not raw_text or "Error" in raw_text: |
|
return current_table if current_table is not None else pd.DataFrame(columns=TABLE_HEADERS_TAB1), current_table |
|
fields = { |
|
"Supplier Name": r"Supplier Name: (.*?)(?=\n|$)", |
|
"Invoice Number": r"Invoice Number: (.*?)(?=\n|$)", |
|
"Invoice Date": r"Invoice Date: (.*?)(?=\n|$)", |
|
"Description - Name": r"Description - Name: (.*?)(?=\n|$)", |
|
"Description - Destination": r"Description - Destination: (.*?)(?=\n|$)", |
|
"Description - Period/Date": r"Description - Period/Date: (.*?)(?=\n|$)", |
|
"Project Code": r"Project Code: (.*?)(?=\n|$)", |
|
"Currency": r"Currency: (.*?)(?=\n|$)", |
|
"Amount (Before GST)": r"Amount \(Before GST\): (.*?)(?=\n|$)", |
|
"GST": r"GST: (.*?)(?=\n|$)", |
|
"Amount (After GST)": r"Amount \(After GST\): (.*?)(?=\n|$)", |
|
"Converted Amount": r"Converted Amount: (.*?)(?=\n|$)" |
|
} |
|
result = {header: "" for header in TABLE_HEADERS_TAB1} |
|
for field, pattern in fields.items(): |
|
match = re.search(pattern, raw_text, re.DOTALL) |
|
if match and match.group(1) != "Not found" and match.group(1).strip(): |
|
result[field] = match.group(1).strip() |
|
logger.info(f"Trích xuất {field}: {result[field]}") |
|
else: |
|
logger.info(f"Không tìm thấy {field}") |
|
new_row = pd.DataFrame([result], columns=TABLE_HEADERS_TAB1) |
|
if current_table is not None and not current_table.empty: |
|
updated_table = pd.concat([current_table, new_row], ignore_index=True) |
|
else: |
|
updated_table = new_row |
|
logger.info(f"Kết quả bảng: {updated_table.to_dict()}") |
|
return updated_table, updated_table |
|
|
|
|
|
def export_and_reset_tab1(current_table): |
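    """Export the accumulated invoice table to CSV, then reset the displayed table and its state."""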
|
if current_table is None or current_table.empty: |
|
return None, pd.DataFrame(columns=TABLE_HEADERS_TAB1), "No data to export", pd.DataFrame(columns=TABLE_HEADERS_TAB1) |
|
csv_file = "extracted_invoices.csv" |
|
current_table.to_csv(csv_file, index=False) |
|
logger.info(f"Đã xuất bảng ra file: {csv_file}") |
|
reset_table = pd.DataFrame(columns=TABLE_HEADERS_TAB1) |
|
return gr.File(value=csv_file), reset_table, "Export successful, table reset", reset_table |
|
|
|
|
|
def extract_raw_tables_from_pdf(file_path): |
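    """Extract one table per page from the PDF with pdfplumber, tagging each with its page number."""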
|
tables = [] |
|
try: |
|
with pdfplumber.open(file_path) as pdf: |
|
for page_num in range(len(pdf.pages)): |
|
page = pdf.pages[page_num] |
|
table = page.extract_table() |
|
if table and len(table) > 1: |
|
df = pd.DataFrame(table[1:], columns=table[0]) |
|
df["Page"] = page_num + 1 |
|
tables.append(df) |
|
logger.info(f"Trích xuất thành công bảng từ trang {page_num + 1}") |
|
return tables |
|
except Exception as e: |
|
logger.error(f"Lỗi trích xuất PDF: {e}") |
|
return None |
|
|
|
|
|
def normalize_table_structure(tables, expected_columns): |
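    """Strip header whitespace, add any missing expected columns, and reorder each table to the expected layout."""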
|
normalized_tables = [] |
|
for df in tables: |
|
if df is not None and not df.empty: |
|
df.columns = df.columns.str.strip() |
|
for col in expected_columns: |
|
if col not in df.columns: |
|
df[col] = pd.NA |
|
df = df[expected_columns] |
|
normalized_tables.append(df) |
|
return normalized_tables |
|
|
|
|
|
def combine_tables(normalized_tables): |
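    """Concatenate the normalized per-page tables into a single DataFrame."""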
|
if not normalized_tables: |
|
logger.warning("Không có bảng nào để nối") |
|
return None |
|
try: |
|
combined_df = pd.concat(normalized_tables, ignore_index=True) |
|
logger.info(f"Nối thành công {len(normalized_tables)} bảng") |
|
return combined_df |
|
except Exception as e: |
|
logger.error(f"Lỗi khi nối bảng: {e}") |
|
return None |
|
|
|
|
|
def parse_description(description):
    """Parse a 'Bought/Sold <qty> <security> @ <CCY> <price>' description into its components."""
    if description is None or pd.isna(description):
        return None, None, None, None
    pattern = r"(Bought|Sold)\s+([\d,]+)\s+([\w\s&]+?)\s*@\s*([A-Z]{3})\s*([\d,]+\.?\d*)"
    match = re.search(pattern, str(description), re.IGNORECASE)
    if match:
        action, quantity, security, currency, price = match.groups()
        quantity = standardize_number(quantity)
        price = standardize_number(price)
        return security.strip(), currency, quantity, price
    logger.warning(f"Could not parse Description: {description}")
    return None, None, None, None
|
|
|
|
def find_settlement_date(transactions, ref_code, trans_type):
    """
    Find the settlement date for a transaction based on its type and reference code.

    Args:
        transactions (pd.DataFrame): DataFrame containing the transaction rows.
        ref_code (str): Reference code of the transaction to look up.
        trans_type (str): Transaction type ("TSF" or "TPF").

    Returns:
        str or None: Settlement date in mm/dd/yyyy format if found, otherwise None.
    """
|
if trans_type == "TSF": |
|
pattern = re.compile(rf"Amount\s+paid\s+TFR\s+to\s+TRUST\s*\({re.escape(ref_code)}\)", re.IGNORECASE) |
|
for _, row in transactions.iterrows(): |
|
reference = str(row.get("Reference", "")) |
|
if reference.startswith("PY") and \ |
|
pattern.search(str(row.get("Description", ""))) and \ |
|
pd.notna(row.get("Balance in Trust")): |
|
return standardize_date(row.get("Date")) |
|
elif trans_type == "TPF": |
|
rc_pattern = re.compile(rf"TRUSTTFR_TRTTFR\s*\(\s*([A-Z0-9]+)\s*\)", re.IGNORECASE) |
|
for _, row in transactions.iterrows(): |
|
reference = str(row.get("Reference", "")) |
|
if reference.startswith("RC") and \ |
|
rc_pattern.search(str(row.get("Description", ""))): |
|
rc_code = rc_pattern.search(str(row.get("Description", ""))).group(1) |
|
wc_pattern = re.compile(rf"Withdrawal\s+from\s+TRUST.*\({re.escape(rc_code)}\)", re.IGNORECASE) |
|
for _, wc_row in transactions.iterrows(): |
|
wc_reference = str(wc_row.get("Reference", "")) |
|
if wc_reference.startswith("WC") and \ |
|
wc_pattern.search(str(wc_row.get("Description", ""))) and \ |
|
pd.notna(wc_row.get("Balance in Trust")): |
|
return standardize_date(wc_row.get("Date")) |
|
return None |
|
|
|
|
|
def process_transactions(transactions): |
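    """Build Tab 2 rows from TSF/TPF transactions: parse the description, look up the settlement date, and derive the transaction amount and commission."""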
|
result = [] |
|
for _, row in transactions.iterrows(): |
|
ref_code = str(row.get("Reference", "")) |
|
trans_type = "TPF" if ref_code.startswith("TPF") else "TSF" if ref_code.startswith("TSF") else None |
|
if not trans_type: |
|
continue |
|
|
|
|
|
security, currency, quantity, price = parse_description(row.get("Description")) |
|
if not all([security, currency, quantity, price]): |
|
continue |
|
|
|
|
|
gl_date = standardize_date(row.get("Date")) |
|
settlement_date = find_settlement_date(transactions, ref_code, trans_type) |
|
|
|
|
|
transaction_amount = quantity * price if quantity is not None and price is not None else None |
|
|
|
|
|
balance_in_trust = standardize_number(row.get("Balance in Trust")) |
|
|
|
|
|
if balance_in_trust is not None and transaction_amount is not None: |
|
commission = ( |
|
abs(balance_in_trust + transaction_amount) if trans_type == "TPF" |
|
else abs(balance_in_trust - transaction_amount) |
|
) |
|
else: |
|
commission = None |
|
|
|
|
|
result.append({ |
|
"Reference - Code": ref_code, |
|
"GL Posting Date": gl_date, |
|
"Date": gl_date, |
|
"Settlement Date": settlement_date, |
|
"Security Name": security, |
|
"Currency": currency, |
|
"Quantity": -quantity if trans_type == "TSF" else quantity, |
|
"Price": price, |
|
"Transaction Amount": transaction_amount, |
|
"Commission": commission |
|
}) |
|
return pd.DataFrame(result, columns=TABLE_HEADERS_TAB2) |
|
|
|
|
|
def process_file(file): |
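    """End-to-end pipeline for a bank-statement PDF: extract, normalize, combine, and process its tables."""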
|
if file is None: |
|
return None, "No file uploaded" |
|
file_path = file.name |
|
file_ext = os.path.splitext(file_path)[1].lower() |
|
if file_ext != '.pdf': |
|
return None, "Only PDF files are supported" |
|
|
|
|
|
raw_tables = extract_raw_tables_from_pdf(file_path) |
|
if raw_tables is None: |
|
return None, "Could not extract data from PDF" |
|
|
|
|
|
expected_columns = ["Date", "Reference", "Description", "Debit", "Credit", "Balance", "Balance in Trust"] |
|
normalized_tables = normalize_table_structure(raw_tables, expected_columns) |
|
if not normalized_tables: |
|
return None, "No valid tables after normalization" |
|
|
|
|
|
transactions = combine_tables(normalized_tables) |
|
if transactions is None: |
|
return None, "Failed to combine tables" |
|
|
|
|
|
processed_data = process_transactions(transactions) |
|
return processed_data, "Processing completed" |
|
|
|
|
|
def generate_table(file, current_table): |
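    """Process the uploaded PDF and append its transactions to the accumulated table state."""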
|
if not file: |
|
return current_table, "Vui lòng tải lên file PDF", current_table |
|
try: |
|
table, status = process_file(file) |
|
if table is None or table.empty: |
|
return current_table, status, current_table |
|
|
|
|
|
if current_table is not None and not current_table.empty: |
|
current_table = current_table.dropna(axis=1, how='all') |
|
if not table.empty: |
|
table = table.dropna(axis=1, how='all') |
|
|
|
|
|
updated_table = pd.concat([current_table, table], ignore_index=True) if current_table is not None else table |
|
return updated_table, status, updated_table |
|
except Exception as e: |
|
logger.error(f"Lỗi xử lý file: {e}") |
|
return current_table, f"Lỗi: {str(e)}", current_table |
|
|
|
|
|
def export_and_reset(current_table): |
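    """Export the accumulated transaction table to output/extracted_transactions.csv, then reset the table and its state."""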
|
    if current_table is None or current_table.empty:
        reset_table = pd.DataFrame(columns=TABLE_HEADERS_TAB2)
        return None, reset_table, "No data to export", reset_table
|
|
|
|
|
output_dir = "output" |
|
if not os.path.exists(output_dir): |
|
try: |
|
os.makedirs(output_dir) |
|
logger.info(f"Thư mục '{output_dir}' đã được tạo thành công.") |
|
except Exception as e: |
|
logger.error(f"Lỗi khi tạo thư mục '{output_dir}': {e}") |
|
raise OSError(f"Không thể tạo thư mục '{output_dir}'. Vui lòng kiểm tra quyền truy cập.") |
|
|
|
|
|
csv_file = os.path.join(output_dir, "extracted_transactions.csv") |
|
current_table.to_csv(csv_file, index=False) |
|
logger.info(f"File CSV đã được lưu tại: {csv_file}") |
|
|
|
|
|
reset_table = pd.DataFrame(columns=TABLE_HEADERS_TAB2) |
|
return gr.File(value=csv_file), reset_table, "Export successful", reset_table |
|
|
|
|
|
with gr.Blocks(title="Multifunctional Application") as app: |
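    # Two tabs: Gemini-based invoice extraction and pdfplumber-based bank-statement transaction extraction.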
|
with gr.Tabs(): |
|
|
|
with gr.Tab("Invoice Extractor"): |
|
gr.Markdown("# Multilanguage Invoice Extractor") |
|
gr.Markdown("Demo #1") |
|
with gr.Column(): |
|
file_input_tab1 = gr.File(label="Upload Invoice (jpg, jpeg, png, pdf)") |
|
extract_btn_tab1 = gr.Button("Extract Contents") |
|
raw_text_output_tab1 = gr.Textbox(label="Contents for next step", lines=20) |
|
table_btn_tab1 = gr.Button("Next Step") |
|
output_table_tab1 = gr.Dataframe(headers=TABLE_HEADERS_TAB1, label="Extracted Information") |
|
export_btn_tab1 = gr.Button("Export to CSV & Reset") |
|
csv_output_tab1 = gr.File(label="Download CSV") |
|
export_status_tab1 = gr.Textbox(label="Export Status") |
|
table_state_tab1 = gr.State(value=pd.DataFrame(columns=TABLE_HEADERS_TAB1)) |
|
extract_btn_tab1.click( |
|
fn=extract_raw_text, |
|
inputs=[file_input_tab1, gr.State(""" |
|
You are an expert in extracting specific information from invoices in multiple languages and formats. Users will upload images or PDFs of invoices, and your task is to extract the following fields: Supplier Name, Invoice Number, Invoice Date, Description - Name, Description - Destination, Description - Period/Date, Project Code, Currency, Amount (Before GST), GST, Amount (After GST), and Converted Amount. Identify these fields based on common invoice patterns, even if labels or formats vary (e.g., "Total" might be "Amount (After GST)", "Tax" might be "GST"). If a field is not found or unclear, leave it blank. Return the extracted data strictly in this format, with no additional text or JSON: |
|
Supplier Name: [value] |
|
Invoice Number: [value] |
|
Invoice Date: [value] |
|
Description - Name: [value] |
|
Description - Destination: [value] |
|
Description - Period/Date: [value] |
|
Project Code: [value] |
|
Currency: [value] |
|
Amount (Before GST): [value] |
|
GST: [value] |
|
Amount (After GST): [value] |
|
Converted Amount: [value] |
|
""")], |
|
outputs=raw_text_output_tab1 |
|
) |
|
table_btn_tab1.click( |
|
fn=generate_table_tab1, |
|
inputs=[raw_text_output_tab1, table_state_tab1], |
|
outputs=[output_table_tab1, table_state_tab1] |
|
) |
|
export_btn_tab1.click( |
|
fn=export_and_reset_tab1, |
|
inputs=table_state_tab1, |
|
outputs=[csv_output_tab1, output_table_tab1, export_status_tab1, table_state_tab1] |
|
) |
|
|
|
|
|
with gr.Tab("Transaction Extractor"): |
|
gr.Markdown("# Transaction Extractor") |
|
with gr.Column(): |
|
file_input_tab2 = gr.File(label="Upload Bank Statement (PDF)") |
|
extract_btn_tab2 = gr.Button("Extract Transactions") |
|
status_output_tab2 = gr.Textbox(label="Status") |
|
table_output_tab2 = gr.Dataframe(headers=TABLE_HEADERS_TAB2, label="Extracted Transactions") |
|
export_btn_tab2 = gr.Button("Export to CSV & Reset") |
|
csv_output_tab2 = gr.File(label="Download CSV") |
|
export_status_tab2 = gr.Textbox(label="Export Status") |
|
table_state_tab2 = gr.State(value=pd.DataFrame(columns=TABLE_HEADERS_TAB2)) |
|
extract_btn_tab2.click( |
|
fn=generate_table, |
|
inputs=[file_input_tab2, table_state_tab2], |
|
outputs=[table_output_tab2, status_output_tab2, table_state_tab2] |
|
) |
|
export_btn_tab2.click( |
|
fn=export_and_reset, |
|
inputs=table_state_tab2, |
|
outputs=[csv_output_tab2, table_output_tab2, export_status_tab2, table_state_tab2] |
|
) |
|
|
|
if __name__ == "__main__":
    app.launch(share=True)