import gradio as gr import os from PIL import Image import google.generativeai as genai import logging import re import pandas as pd import fitz # PyMuPDF import io import pdfplumber from datetime import datetime # Thiết lập logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[logging.FileHandler("app.log"), logging.StreamHandler()] ) logger = logging.getLogger(__name__) # Cấu hình API key từ biến môi trường genai.configure(api_key=os.getenv('GOOGLE_API_KEY')) # Định nghĩa headers cố định cho bảng Tab 1 TABLE_HEADERS_TAB1 = [ "Supplier Name", "Invoice Number", "Invoice Date", "Description - Name", "Description - Destination", "Description - Period/Date", "Project Code", "Currency", "Amount (Before GST)", "GST", "Amount (After GST)", "Converted Amount" ] # Định nghĩa headers cố định cho bảng Tab 2 TABLE_HEADERS_TAB2 = [ "Reference - Code", "GL Posting Date", "Date", "Settlement Date", "Security Name", "Currency", "Quantity", "Price", "Transaction Amount", "Commission" ] # Hàm chuẩn hóa số def standardize_number(value, currency=None): """ Chuẩn hóa giá trị số từ chuỗi, tránh nhầm lẫn giữa dấu phẩy hàng nghìn và thập phân. """ if not value or pd.isna(value): return None value = str(value).replace(" ", "") logger.debug(f"Chuẩn hóa số: Nguyên gốc = {value}") if "," in value and "." in value: if value.rindex(",") < value.rindex("."): value = value.replace(",", "") else: value = value.replace(".", "").replace(",", ".") elif "," in value: parts = value.split(",") if len(parts[1]) <= 2: value = value.replace(",", ".") else: value = value.replace(",", "") elif "." in value: pass value = re.sub(r"[^\d.-]", "", value) try: num = float(value) formatted_num = "{:.2f}".format(num) logger.debug(f"Chuẩn hóa số: Kết quả = {formatted_num}") return formatted_num except ValueError: logger.warning(f"Không thể chuyển đổi giá trị thành số: {value}") return None # Hàm chuẩn hóa ngày tháng def standardize_date(date_str): """ Chuẩn hóa ngày tháng thành định dạng mm/dd/yyyy. Hỗ trợ nhiều định dạng đầu vào. """ if not date_str or pd.isna(date_str): return None formats = [ "%d/%m/%Y", "%d-%m-%Y", "%d-%b-%y", "%Y-%m-%d", "%d/%m/%y", "%m/%d/%Y", "%m-%d-%Y", "%d-%b-%Y" ] for fmt in formats: try: return datetime.strptime(date_str, fmt).strftime("%m/%d/%Y") except ValueError: continue logger.warning(f"Không thể chuẩn hóa ngày: {date_str}") return None # Hàm gọi mô hình Gemini để trích xuất thông tin (Tab 1) def get_response(model, input_text, image_parts, prompt): logger.info("Bắt đầu gọi mô hình Gemini-1.5-flash") try: response = model.generate_content([input_text, image_parts[0], prompt]) logger.info(f"Phản hồi từ API: {response.text}") return response.text except Exception as e: logger.error(f"Lỗi khi gọi API: {str(e)}") return f"Error: {str(e)}" # Hàm kiểm tra và xử lý file (Tab 1) def check_and_process_file(file): if file is None: return False, "No file uploaded", None file_path = file.name file_ext = os.path.splitext(file_path)[1].lower() logger.info(f"File path: {file_path}, Extension: {file_ext}") allowed_types = ['.jpg', '.jpeg', '.png', '.pdf'] if file_ext not in allowed_types: return False, f"Invalid file extension: {file_ext}. Please upload a file with extension: {allowed_types}", None return True, "File type valid", file_path # Hàm xử lý PDF thành danh sách ảnh (Tab 1) def process_pdf(file_path): logger.info(f"Xử lý file PDF: {file_path}") try: doc = fitz.open(file_path) images = [] for page_num in range(len(doc)): page = doc.load_page(page_num) pix = page.get_pixmap() img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) images.append(img) doc.close() logger.info(f"Trích xuất {len(images)} trang từ PDF") return images except Exception as e: logger.error(f"Lỗi khi xử lý PDF: {str(e)}") return [] # Hàm chuyển đổi ảnh thành JPEG bytes (Tab 1) def image_to_jpeg_bytes(image): img_byte_arr = io.BytesIO() image.save(img_byte_arr, format="JPEG") return img_byte_arr.getvalue() # Hàm trích xuất văn bản thô từ ảnh hoặc PDF (Tab 1) def extract_raw_text(uploaded_file, input_prompt): is_valid, message, file_path = check_and_process_file(uploaded_file) if not is_valid: logger.error(message) return message try: if file_path.lower().endswith('.pdf'): images = process_pdf(file_path) if not images: return "Error: Could not process PDF" full_text = "" for i, img in enumerate(images): logger.info(f"Xử lý trang {i+1} từ PDF") img_data = image_to_jpeg_bytes(img) image_data = [{"mime_type": "image/jpeg", "data": img_data}] model = genai.GenerativeModel('gemini-1.5-flash') text = get_response(model, input_prompt, image_data, input_prompt) full_text += text + "\n" raw_text = full_text.strip() else: image = Image.open(file_path) img_data = image_to_jpeg_bytes(image) image_data = [{"mime_type": "image/jpeg", "data": img_data}] logger.info(f"Xử lý file ảnh: {file_path}") model = genai.GenerativeModel('gemini-1.5-flash') raw_text = get_response(model, input_prompt, image_data, input_prompt).strip() logger.info(f"Văn bản thô từ mô hình: {raw_text}") return raw_text except Exception as e: logger.error(f"Lỗi khi xử lý file: {str(e)}") return f"Error processing file: {str(e)}" # Hàm chuyển văn bản thô thành bảng và nối vào bảng hiện tại (Tab 1) def generate_table_tab1(raw_text, current_table): logger.info("Chuyển văn bản thô thành bảng") logger.info(f"Văn bản thô đầu vào: {raw_text}") if not raw_text or "Error" in raw_text: return current_table if current_table is not None else pd.DataFrame(columns=TABLE_HEADERS_TAB1), current_table fields = { "Supplier Name": r"Supplier Name: (.*?)(?=\n|$)", "Invoice Number": r"Invoice Number: (.*?)(?=\n|$)", "Invoice Date": r"Invoice Date: (.*?)(?=\n|$)", "Description - Name": r"Description - Name: (.*?)(?=\n|$)", "Description - Destination": r"Description - Destination: (.*?)(?=\n|$)", "Description - Period/Date": r"Description - Period/Date: (.*?)(?=\n|$)", "Project Code": r"Project Code: (.*?)(?=\n|$)", "Currency": r"Currency: (.*?)(?=\n|$)", "Amount (Before GST)": r"Amount \(Before GST\): (.*?)(?=\n|$)", "GST": r"GST: (.*?)(?=\n|$)", "Amount (After GST)": r"Amount \(After GST\): (.*?)(?=\n|$)", "Converted Amount": r"Converted Amount: (.*?)(?=\n|$)" } result = {header: "" for header in TABLE_HEADERS_TAB1} for field, pattern in fields.items(): match = re.search(pattern, raw_text, re.DOTALL) if match and match.group(1) != "Not found" and match.group(1).strip(): result[field] = match.group(1).strip() logger.info(f"Trích xuất {field}: {result[field]}") else: logger.info(f"Không tìm thấy {field}") new_row = pd.DataFrame([result], columns=TABLE_HEADERS_TAB1) if current_table is not None and not current_table.empty: updated_table = pd.concat([current_table, new_row], ignore_index=True) else: updated_table = new_row logger.info(f"Kết quả bảng: {updated_table.to_dict()}") return updated_table, updated_table # Hàm xuất CSV và reset bảng (Tab 1) def export_and_reset_tab1(current_table): if current_table is None or current_table.empty: return None, pd.DataFrame(columns=TABLE_HEADERS_TAB1), "No data to export", pd.DataFrame(columns=TABLE_HEADERS_TAB1) csv_file = "extracted_invoices.csv" current_table.to_csv(csv_file, index=False) logger.info(f"Đã xuất bảng ra file: {csv_file}") reset_table = pd.DataFrame(columns=TABLE_HEADERS_TAB1) return gr.File(value=csv_file), reset_table, "Export successful, table reset", reset_table # Hàm trích xuất thô từ PDF (Tab 2) def extract_raw_tables_from_pdf(file_path): tables = [] try: with pdfplumber.open(file_path) as pdf: for page_num in range(len(pdf.pages)): page = pdf.pages[page_num] table = page.extract_table() if table and len(table) > 1: df = pd.DataFrame(table[1:], columns=table[0]) df["Page"] = page_num + 1 tables.append(df) logger.info(f"Trích xuất thành công bảng từ trang {page_num + 1}") logger.debug(f"Các cột trích xuất: {df.columns.tolist()}") return tables except Exception as e: logger.error(f"Lỗi trích xuất PDF: {e}") return None # Hàm chuẩn hóa cấu trúc cột (Tab 2) def normalize_table_structure(tables, expected_columns): normalized_tables = [] for df in tables: if df is not None and not df.empty: df.columns = df.columns.str.strip() for col in expected_columns: if col not in df.columns: df[col] = pd.NA df = df[expected_columns] normalized_tables.append(df) logger.debug(f"Các cột sau khi chuẩn hóa: {df.columns.tolist()}") return normalized_tables # Hàm nối các bảng (Tab 2) def combine_tables(normalized_tables): if not normalized_tables: logger.warning("Không có bảng nào để nối") return None try: combined_df = pd.concat(normalized_tables, ignore_index=True) logger.info(f"Nối thành công {len(normalized_tables)} bảng") logger.debug(f"Dữ liệu sau khi nối:\n{combined_df.head()}") return combined_df except Exception as e: logger.error(f"Lỗi khi nối bảng: {e}") return None # Hàm trích xuất thông tin từ Description (Tab 2) def parse_description(description): """ Trích xuất thông tin từ cột Description. """ if not description or pd.isna(description): return None, None, None, None pattern = r"(Bought|Sold)\s+([\d,]+)\s+([\w\s&]+?)\s*@\s*([A-Z]{3})\s*([\d,]+\.?\d*)" match = re.search(pattern, description, re.IGNORECASE) if match: action, quantity, security, currency, price = match.groups() quantity = standardize_number(quantity) price = standardize_number(price) return security.strip(), currency, quantity, price logger.warning(f"Không thể phân tích Description: {description}") return None, None, None, None # Hàm tìm Settlement Date (Tab 2) def find_settlement_date(transactions, ref_code, trans_type, gl_date): """ Tìm ngày thanh toán (Settlement Date) dựa trên loại giao dịch và mã tham chiếu. Cải tiến: Nới lỏng điều kiện, thêm log chi tiết, và sử dụng GL Posting Date nếu không tìm thấy. """ logger.debug(f"Tìm Settlement Date cho ref_code: {ref_code}, trans_type: {trans_type}") if trans_type == "TSF": pattern = re.compile(rf"Amount\s*paid\s*TFR\s*to\s*TRUST\s*\(\s*{re.escape(ref_code)}\s*\)", re.IGNORECASE) candidates = [] for _, row in transactions.iterrows(): reference = str(row.get("Reference", "")) description = str(row.get("Description", "")) date = str(row.get("Date", "")) logger.debug(f"TSF - Kiểm tra dòng: Reference={reference}, Description={description}") if reference.startswith("PY") and pattern.search(description): settlement_date = standardize_date(date) if settlement_date: candidates.append(settlement_date) logger.debug(f"TSF - Tìm thấy ứng viên: {settlement_date}") if candidates: settlement_date = candidates[0] # Lấy ngày đầu tiên nếu có nhiều kết quả logger.info(f"Settlement Date tìm thấy cho TSF {ref_code}: {settlement_date}") return settlement_date logger.warning(f"Không tìm thấy Settlement Date cho TSF {ref_code}") elif trans_type == "TPF": rc_pattern = re.compile(rf"TRUSTTFR_TRTTFR\s*\(\s*([A-Z0-9]+)\s*\)", re.IGNORECASE) rc_code = None for _, row in transactions.iterrows(): reference = str(row.get("Reference", "")) description = str(row.get("Description", "")) logger.debug(f"TPF - Kiểm tra RC: Reference={reference}, Description={description}") if reference.startswith("RC") and rc_pattern.search(description): rc_code = rc_pattern.search(description).group(1) logger.debug(f"Tìm thấy RC Code cho TPF {ref_code}: {rc_code}") break if rc_code: wc_pattern = re.compile(rf"Withdrawal\s*from\s*TRUST.*\({re.escape(rc_code)}\)", re.IGNORECASE) candidates = [] for _, row in transactions.iterrows(): reference = str(row.get("Reference", "")) description = str(row.get("Description", "")) date = str(row.get("Date", "")) logger.debug(f"TPF - Kiểm tra WC: Reference={reference}, Description={description}") if reference.startswith("WC") and wc_pattern.search(description): settlement_date = standardize_date(date) if settlement_date: candidates.append(settlement_date) logger.debug(f"TPF - Tìm thấy ứng viên: {settlement_date}") if candidates: settlement_date = candidates[0] # Lấy ngày đầu tiên nếu có nhiều kết quả logger.info(f"Settlement Date tìm thấy cho TPF {ref_code}: {settlement_date}") return settlement_date logger.warning(f"Không tìm thấy Settlement Date cho RC {rc_code}") else: logger.warning(f"Không tìm thấy RC Code cho TPF {ref_code}") # Nếu không tìm thấy Settlement Date, sử dụng GL Posting Date làm mặc định logger.warning(f"Không tìm thấy Settlement Date cho ref_code: {ref_code}, trans_type: {trans_type}. Sử dụng GL Posting Date: {gl_date}") return gl_date # Hàm xử lý giao dịch (Tab 2) def process_transactions(transactions): """ Xử lý dữ liệu giao dịch và tạo bảng kết quả. """ result = [] for _, row in transactions.iterrows(): ref_code = str(row.get("Reference", "")) trans_type = "TPF" if ref_code.startswith("TPF") else "TSF" if ref_code.startswith("TSF") else None if not trans_type: logger.warning(f"Bỏ qua dòng do trans_type không hợp lệ: {ref_code}") continue # Trích xuất thông tin từ Description security, currency, quantity, price = parse_description(row.get("Description")) if not all([security, currency, quantity, price]): logger.warning(f"Bỏ qua dòng do thiếu dữ liệu từ Description: {ref_code}") continue # Chuẩn hóa ngày tháng gl_date = standardize_date(row.get("Date")) if not gl_date: logger.warning(f"Bỏ qua dòng do không chuẩn hóa được ngày: {row.get('Date')}") continue # Tìm Settlement Date settlement_date = find_settlement_date(transactions, ref_code, trans_type, gl_date) # Tính toán Transaction Amount qty = float(quantity) prc = float(price) transaction_amount = qty * prc # Tính toán Commission (tạm thời để None, sẽ giải quyết sau nếu cần) balance_in_trust = standardize_number(row.get("Balance in Trust"), currency) commission = None if balance_in_trust: bal = float(balance_in_trust) if trans_type == "TPF": # Mua: Tiền bị trừ commission = abs(bal + transaction_amount) elif trans_type == "TSF": # Bán: Tiền được cộng commission = abs(bal - transaction_amount) # Thêm vào kết quả result.append({ "Reference - Code": ref_code, "GL Posting Date": gl_date, "Date": gl_date, "Settlement Date": settlement_date, "Security Name": security, "Currency": currency, "Quantity": -qty if trans_type == "TSF" else qty, "Price": "{:.2f}".format(prc), "Transaction Amount": "{:.2f}".format(transaction_amount), "Commission": "{:.2f}".format(commission) if commission is not None else None }) processed_data = pd.DataFrame(result, columns=TABLE_HEADERS_TAB2) logger.debug(f"Dữ liệu đã xử lý:\n{processed_data}") return processed_data # Hàm xử lý file PDF và tạo bảng (Tab 2) def generate_table_tab2(file_input, current_table): """ Xử lý file PDF và nối dữ liệu vào bảng hiện tại. """ if file_input is None: return current_table, "Vui lòng tải lên file PDF", current_table file_path = file_input.name file_ext = os.path.splitext(file_path)[1].lower() if file_ext != '.pdf': return current_table, "Chỉ hỗ trợ file PDF", current_table try: # Trích xuất bảng từ PDF tables = extract_raw_tables_from_pdf(file_path) if tables is None: return current_table, "Không thể trích xuất bảng từ PDF", current_table # Chuẩn hóa cấu trúc bảng expected_columns = ["Date", "Reference", "Description", "Debit", "Credit", "Balance", "Balance in Trust"] normalized_tables = normalize_table_structure(tables, expected_columns) if not normalized_tables: return current_table, "Không có bảng nào sau khi chuẩn hóa", current_table # Nối các bảng transactions = combine_tables(normalized_tables) if transactions is None: return current_table, "Không thể nối bảng", current_table # Xử lý giao dịch processed_data = process_transactions(transactions) if processed_data.empty: return current_table, "Không có dữ liệu hợp lệ để xử lý", current_table # Nối dữ liệu mới vào bảng hiện tại if current_table is not None and not current_table.empty: updated_table = pd.concat([current_table, processed_data], ignore_index=True) else: updated_table = processed_data return updated_table, "Xử lý thành công", updated_table except Exception as e: logger.error(f"Lỗi khi xử lý file: {str(e)}") return current_table, f"Error processing file: {str(e)}", current_table # Hàm xuất CSV và reset (Tab 2) def export_and_reset_tab2(current_table): if current_table is None or current_table.empty: return None, pd.DataFrame(columns=TABLE_HEADERS_TAB2), "No data to export", pd.DataFrame(columns=TABLE_HEADERS_TAB2) output_dir = "output" if not os.path.exists(output_dir): os.makedirs(output_dir) logger.info(f"Thư mục '{output_dir}' đã được tạo.") csv_file = os.path.join(output_dir, "extracted_transactions.csv") current_table.to_csv(csv_file, index=False) logger.info(f"File CSV đã được lưu tại: {csv_file}") reset_table = pd.DataFrame(columns=TABLE_HEADERS_TAB2) return gr.File(value=csv_file), reset_table, "Export successful, table reset", reset_table # Giao diện Gradio với Tabs with gr.Blocks(title="Multifunctional Application") as app: with gr.Tabs(): # Tab 1: Invoice Extractor with gr.Tab("Invoice Extractor"): gr.Markdown("# Multilanguage Invoice Extractor") gr.Markdown("Demo #1") with gr.Column(): file_input_tab1 = gr.File(label="Upload Invoice (jpg, jpeg, png, pdf)") extract_btn_tab1 = gr.Button("Extract Contents") raw_text_output_tab1 = gr.Textbox(label="Contents for next step", lines=20) table_btn_tab1 = gr.Button("Next Step") output_table_tab1 = gr.Dataframe(headers=TABLE_HEADERS_TAB1, label="Extracted Information") export_btn_tab1 = gr.Button("Export to CSV & Reset") csv_output_tab1 = gr.File(label="Download CSV") export_status_tab1 = gr.Textbox(label="Export Status") table_state_tab1 = gr.State(value=pd.DataFrame(columns=TABLE_HEADERS_TAB1)) extract_btn_tab1.click( fn=extract_raw_text, inputs=[file_input_tab1, gr.State(""" You are an expert in extracting specific information from invoices in multiple languages and formats. Users will upload images or PDFs of invoices, and your task is to extract the following fields: Supplier Name, Invoice Number, Invoice Date, Description - Name, Description - Destination, Description - Period/Date, Project Code, Currency, Amount (Before GST), GST, Amount (After GST), and Converted Amount. Identify these fields based on common invoice patterns, even if labels or formats vary (e.g., "Total" might be "Amount (After GST)", "Tax" might be "GST"). If a field is not found or unclear, leave it blank. Return the extracted data strictly in this format, with no additional text or JSON: Supplier Name: [value] Invoice Number: [value] Invoice Date: [value] Description - Name: [value] Description - Destination: [value] Description - Period/Date: [value] Project Code: [value] Currency: [value] Amount (Before GST): [value] GST: [value] Amount (After GST): [value] Converted Amount: [value] """)], outputs=raw_text_output_tab1 ) table_btn_tab1.click( fn=generate_table_tab1, inputs=[raw_text_output_tab1, table_state_tab1], outputs=[output_table_tab1, table_state_tab1] ) export_btn_tab1.click( fn=export_and_reset_tab1, inputs=table_state_tab1, outputs=[csv_output_tab1, output_table_tab1, export_status_tab1, table_state_tab1] ) # Tab 2: Transaction Extractor with gr.Tab("Transaction Extractor"): gr.Markdown("# Transaction Extractor") with gr.Column(): file_input_tab2 = gr.File(label="Upload Bank Statement (PDF)") extract_btn_tab2 = gr.Button("Extract Transactions") status_output_tab2 = gr.Textbox(label="Status") table_output_tab2 = gr.Dataframe(headers=TABLE_HEADERS_TAB2, label="Extracted Transactions") export_btn_tab2 = gr.Button("Export to CSV & Reset") csv_output_tab2 = gr.File(label="Download CSV") export_status_tab2 = gr.Textbox(label="Export Status") table_state_tab2 = gr.State(value=pd.DataFrame(columns=TABLE_HEADERS_TAB2)) extract_btn_tab2.click( fn=generate_table_tab2, inputs=[file_input_tab2, table_state_tab2], outputs=[table_output_tab2, status_output_tab2, table_state_tab2] ) export_btn_tab2.click( fn=export_and_reset_tab2, inputs=table_state_tab2, outputs=[csv_output_tab2, table_output_tab2, export_status_tab2, table_state_tab2] ) app.launch(share=True)