Spaces:
Sleeping
Sleeping
import os | |
import re | |
import json | |
import gradio as gr | |
import pandas as pd | |
import pdfplumber | |
import pytesseract | |
from pdf2image import convert_from_path | |
from huggingface_hub import InferenceClient | |
from fpdf import FPDF # Added for PDF generation | |
import tempfile # Added for temporary file handling | |
# Initialize with reliable free model | |
hf_token = os.getenv("HF_TOKEN") | |
client = InferenceClient(model="mistralai/Mistral-7B-Instruct-v0.2", token=hf_token) | |
def extract_excel_data(file_path): | |
"""Extract text from Excel file""" | |
df = pd.read_excel(file_path, engine='openpyxl') | |
return df.to_string(index=False) | |
def extract_text_from_pdf(pdf_path, is_scanned=False): | |
"""Extract text from PDF with fallback OCR""" | |
try: | |
# Try native PDF extraction first | |
with pdfplumber.open(pdf_path) as pdf: | |
text = "" | |
for page in pdf.pages: | |
# Extract tables first for structured data | |
tables = page.extract_tables() | |
for table in tables: | |
for row in table: | |
text += " | ".join(str(cell) for cell in row) + "\n" | |
text += "\n" | |
# Extract text for unstructured data | |
page_text = page.extract_text() | |
if page_text: | |
text += page_text + "\n\n" | |
return text | |
except Exception as e: | |
print(f"Native PDF extraction failed: {str(e)}") | |
# Fallback to OCR for scanned PDFs | |
images = convert_from_path(pdf_path, dpi=200) | |
text = "" | |
for image in images: | |
text += pytesseract.image_to_string(image) + "\n" | |
return text | |
def parse_bank_statement(text, file_type): | |
"""Parse bank statement using LLM with fallback to rule-based parser""" | |
# Clean text differently based on file type | |
cleaned_text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', text) | |
if file_type == 'pdf': | |
# PDF-specific cleaning | |
cleaned_text = re.sub(r'Page \d+ of \d+', '', cleaned_text, flags=re.IGNORECASE) | |
cleaned_text = re.sub(r'CropBox.*?MediaBox', '', cleaned_text, flags=re.IGNORECASE) | |
# Keep only lines that look like transactions | |
transaction_lines = [] | |
for line in cleaned_text.split('\n'): | |
if re.match(r'^\d{4}-\d{2}-\d{2}', line): # Date pattern | |
transaction_lines.append(line) | |
elif '|' in line and any(x in line for x in ['Date', 'Amount', 'Balance']): | |
transaction_lines.append(line) | |
cleaned_text = "\n".join(transaction_lines) | |
print(f"Cleaned text sample: {cleaned_text[:200]}...") | |
# Try rule-based parsing first for structured data | |
rule_based_data = rule_based_parser(cleaned_text) | |
if rule_based_data["transactions"]: | |
print("Using rule-based parser results") | |
return rule_based_data | |
# Fallback to LLM for unstructured data | |
print("Falling back to LLM parsing") | |
return llm_parser(cleaned_text) | |
def llm_parser(text): | |
"""LLM parser for unstructured text""" | |
# Craft precise prompt with strict JSON formatting instructions | |
prompt = f""" | |
<|system|> | |
You are a financial data parser. Extract transactions from bank statements and return ONLY valid JSON. | |
</s> | |
<|user|> | |
Extract all transactions from this bank statement with these exact fields: | |
- date (format: YYYY-MM-DD) | |
- description | |
- amount (format: 0.00) | |
- debit (format: 0.00) | |
- credit (format: 0.00) | |
- closing_balance (format: 0.00 or -0.00 for negative) | |
- category | |
Statement text: | |
{text[:3000]} [truncated if too long] | |
Return JSON with this exact structure: | |
{{ | |
"transactions": [ | |
{{ | |
"date": "2025-05-08", | |
"description": "Company XYZ Payroll", | |
"amount": "8315.40", | |
"debit": "0.00", | |
"credit": "8315.40", | |
"closing_balance": "38315.40", | |
"category": "Salary" | |
}} | |
] | |
}} | |
RULES: | |
1. Output ONLY the JSON object with no additional text | |
2. Keep amounts as strings with 2 decimal places | |
3. For missing values, use empty strings | |
4. Convert negative amounts to format "-123.45" | |
5. Map categories to: Salary, Groceries, Medical, Utilities, Entertainment, Dining, Misc | |
</s> | |
<|assistant|> | |
""" | |
try: | |
# Call LLM via Hugging Face Inference API | |
response = client.text_generation( | |
prompt, | |
max_new_tokens=2000, | |
temperature=0.01, | |
stop=["</s>"] # Updated to 'stop' parameter | |
) | |
print(f"LLM Response: {response}") | |
# Validate and clean JSON response | |
response = response.strip() | |
if not response.startswith('{'): | |
# Find the first { and last } to extract JSON | |
start_idx = response.find('{') | |
end_idx = response.rfind('}') | |
if start_idx != -1 and end_idx != -1: | |
response = response[start_idx:end_idx+1] | |
# Parse JSON and validate structure | |
data = json.loads(response) | |
if "transactions" not in data: | |
raise ValueError("Missing 'transactions' key in JSON") | |
return data | |
except Exception as e: | |
print(f"LLM Error: {str(e)}") | |
return {"transactions": []} | |
def rule_based_parser(text): | |
"""Enhanced fallback parser for structured tables""" | |
lines = [line.strip() for line in text.split('\n') if line.strip()] | |
# Find header line - more flexible detection | |
header_index = None | |
header_patterns = [ | |
r'Date\b', r'Description\b', r'Amount\b', | |
r'Debit\b', r'Credit\b', r'Closing\s*Balance\b', r'Category\b' | |
] | |
# First try: Look for a full header line | |
for i, line in enumerate(lines): | |
if all(re.search(pattern, line, re.IGNORECASE) for pattern in header_patterns[:3]): | |
header_index = i | |
break | |
# Second try: Look for any header indicators | |
if header_index is None: | |
for i, line in enumerate(lines): | |
if any(re.search(pattern, line, re.IGNORECASE) for pattern in header_patterns): | |
header_index = i | |
break | |
# Third try: Look for pipe-delimited headers | |
if header_index is None: | |
for i, line in enumerate(lines): | |
if '|' in line and any(p in line for p in ['Date', 'Amount', 'Balance']): | |
header_index = i | |
break | |
if header_index is None: | |
return {"transactions": []} | |
data_lines = lines[header_index + 1:] | |
transactions = [] | |
for line in data_lines: | |
# Handle both pipe-delimited and space-delimited formats | |
if '|' in line: | |
parts = [p.strip() for p in line.split('|') if p.strip()] | |
else: | |
# Space-delimited format - split by 2+ spaces | |
parts = re.split(r'\s{2,}', line) | |
# Skip lines that don't have enough parts | |
if len(parts) < 7: | |
continue | |
try: | |
# Handle transaction date validation | |
if not re.match(r'\d{4}-\d{2}-\d{2}', parts[0]): | |
continue | |
transactions.append({ | |
"date": parts[0], | |
"description": parts[1], | |
"amount": format_number(parts[2]), | |
"debit": format_number(parts[3]), | |
"credit": format_number(parts[4]), | |
"closing_balance": format_number(parts[5]), | |
"category": parts[6] | |
}) | |
except Exception as e: | |
print(f"Error parsing line: {str(e)}") | |
return {"transactions": transactions} | |
def format_number(value): | |
"""Format numeric values consistently""" | |
if not value or str(value).lower() in ['nan', 'nat']: | |
return "0.00" | |
# If it's already a number, format directly | |
if isinstance(value, (int, float)): | |
return f"{value:.2f}" | |
# Clean string values | |
value = str(value).replace(',', '').replace('$', '').strip() | |
# Handle negative numbers in parentheses | |
if '(' in value and ')' in value: | |
value = '-' + value.replace('(', '').replace(')', '') | |
# Handle empty values | |
if not value: | |
return "0.00" | |
# Standardize decimal format | |
if '.' not in value: | |
value += '.00' | |
# Ensure two decimal places | |
try: | |
num_value = float(value) | |
return f"{num_value:.2f}" | |
except ValueError: | |
# If we can't convert to float, return original but clean it | |
return value.split('.')[0] + '.' + value.split('.')[1][:2].ljust(2, '0') | |
def process_file(file, is_scanned=False): | |
"""Main processing function""" | |
if not file: | |
return empty_df() | |
file_path = file.name | |
file_ext = os.path.splitext(file_path)[1].lower() | |
try: | |
if file_ext == '.xlsx': | |
# Directly process Excel files without text conversion | |
df = pd.read_excel(file_path, engine='openpyxl') | |
# Normalize column names | |
df.columns = df.columns.str.strip().str.lower() | |
# Create mapping to expected columns | |
col_mapping = { | |
'date': 'date', | |
'description': 'description', | |
'amount': 'amount', | |
'debit': 'debit', | |
'credit': 'credit', | |
'closing balance': 'closing_balance', | |
'closing': 'closing_balance', | |
'balance': 'closing_balance', | |
'category': 'category' | |
} | |
# Create output DataFrame with required columns | |
output_df = pd.DataFrame() | |
for col in ['date', 'description', 'amount', 'debit', 'credit', 'closing_balance', 'category']: | |
if col in df.columns: | |
output_df[col] = df[col] | |
elif any(alias in col_mapping and col_mapping[alias] == col for alias in df.columns): | |
# Find alias | |
for alias in df.columns: | |
if alias in col_mapping and col_mapping[alias] == col: | |
output_df[col] = df[alias] | |
break | |
else: | |
output_df[col] = "" | |
# Format numeric columns | |
for col in ['amount', 'debit', 'credit', 'closing_balance']: | |
output_df[col] = output_df[col].apply(format_number) | |
# Rename columns for display | |
output_df.columns = ["Date", "Description", "Amount", "Debit", | |
"Credit", "Closing Balance", "Category"] | |
return output_df | |
elif file_ext == '.pdf': | |
text = extract_text_from_pdf(file_path, is_scanned=is_scanned) | |
parsed_data = parse_bank_statement(text, 'pdf') | |
df = pd.DataFrame(parsed_data["transactions"]) | |
# Ensure all required columns exist | |
required_cols = ["date", "description", "amount", "debit", | |
"credit", "closing_balance", "category"] | |
for col in required_cols: | |
if col not in df.columns: | |
df[col] = "" | |
# Format columns properly | |
df.columns = ["Date", "Description", "Amount", "Debit", | |
"Credit", "Closing Balance", "Category"] | |
return df | |
else: | |
return empty_df() | |
except Exception as e: | |
print(f"Processing error: {str(e)}") | |
return empty_df() | |
def empty_df(): | |
"""Return empty DataFrame with correct columns""" | |
return pd.DataFrame(columns=["Date", "Description", "Amount", "Debit", | |
"Credit", "Closing Balance", "Category"]) | |
# New function to generate PDF from DataFrame | |
def generate_pdf(df): | |
"""Generate PDF from DataFrame and return file path""" | |
if df.empty: | |
return None | |
# Create a PDF | |
pdf = FPDF() | |
pdf.add_page() | |
pdf.set_font("Arial", size=8) # Smaller font to fit more data | |
# Set column widths | |
col_widths = [22, 65, 20, 15, 15, 25, 20] # Adjusted to fit all columns | |
# Headers | |
headers = df.columns.tolist() | |
for i, header in enumerate(headers): | |
pdf.cell(col_widths[i], 10, header, border=1) | |
pdf.ln() | |
# Data | |
for _, row in df.iterrows(): | |
for i, col in enumerate(headers): | |
# Truncate long descriptions | |
value = str(row[col]) | |
if headers[i] == "Description" and len(value) > 30: | |
value = value[:27] + "..." | |
pdf.cell(col_widths[i], 10, value, border=1) | |
pdf.ln() | |
# Save to temporary file | |
temp_file = tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) | |
temp_file.close() | |
pdf.output(temp_file.name) | |
return temp_file.name | |
# Modified Gradio Interface | |
with gr.Blocks() as interface: # Changed to Blocks for more control | |
gr.Markdown("## AI Bank Statement Parser") | |
gr.Markdown("Extract structured transaction data from PDF/Excel bank statements") | |
# File input | |
file_input = gr.File(label="Upload Bank Statement (PDF/Excel)") | |
# Output dataframe | |
output_df = gr.Dataframe( | |
label="Parsed Transactions", | |
headers=["Date", "Description", "Amount", "Debit", "Credit", "Closing Balance", "Category"], | |
datatype=["date", "str", "number", "number", "number", "number", "str"] | |
) | |
# State to store the processed DataFrame | |
state_df = gr.State(value=pd.DataFrame()) | |
# Download button (initially hidden) | |
download_btn = gr.DownloadButton( | |
"Download as PDF", | |
visible=False, | |
elem_classes="download-btn" | |
) | |
# Process file and update state | |
def process_and_store(file): | |
df = process_file(file) | |
return df, df, gr.DownloadButton(visible=not df.empty) | |
# Connect components | |
file_input.change( | |
process_and_store, | |
inputs=[file_input], | |
outputs=[output_df, state_df, download_btn] | |
) | |
# Generate PDF when download button is clicked | |
def on_download_click(df): | |
return generate_pdf(df) | |
download_btn.click( | |
on_download_click, | |
inputs=[state_df], | |
outputs=[download_btn] | |
) | |
# Add custom CSS for the download button position | |
interface.css = """ | |
.download-btn { | |
margin-top: 20px !important; | |
margin-bottom: 30px !important; | |
} | |
""" | |
if __name__ == "__main__": | |
interface.launch() |