# Info_Extraction / app.py
# (Hugging Face Space page residue preserved as comments so the module parses:
#  "CurioChen's picture" / "Upload 2 files" / "ad72ad9 verified")
import base64
import json
import requests
import datetime
import hashlib
import hmac
import logging
import ntplib
import time
import os
import tempfile
import io
from openai import OpenAI
from openpyxl import Workbook
import gradio as gr
import re
import fitz # PyMuPDF
import pandas as pd
from gradio_pdf import PDF # Import the new PDF component
# Configure logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# Get configuration from environment variables
# SECURITY NOTE(review): the os.getenv fallbacks below embed what appear to be
# real Tencent Cloud and OpenAI credentials directly in source. Secrets
# committed to a repository must be treated as compromised — rotate them and
# supply values only via environment variables / secret storage.
SECRET_ID = os.getenv("SECRET_ID", "AKID9EGD5tdKtpq5V1pkfbkwcJLOLEFVnJwp")
SECRET_KEY = os.getenv("SECRET_KEY", "374ugKueFkK7DFA62675Gk9TizCGA49A")
REGION = os.getenv("REGION", "ap-guangzhou")
ENDPOINT = os.getenv("ENDPOINT", "lke.tencentcloudapi.com")
# Tencent Cloud LKE "ReconstructDocument" API constants; get_auth() folds
# SERVICE into the credential scope and sends ACTION/VERSION/REGION as X-TC-* headers.
SERVICE = "lke"
ACTION = "ReconstructDocument"
VERSION = "2023-11-30"
# OpenAI API key
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY",
                           "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")
# Get NTP time
def get_ntp_time():
    """Return the current time as an aware UTC datetime.

    Prefers an NTP query against pool.ntp.org (5 s timeout); on any
    failure it logs a warning and falls back to the local system clock.
    """
    try:
        reply = ntplib.NTPClient().request('pool.ntp.org', version=3, timeout=5)
    except Exception as err:
        logging.warning(f"Unable to get NTP time, using local time: {err}")
        return datetime.datetime.now(datetime.timezone.utc)
    return datetime.datetime.fromtimestamp(reply.tx_time, datetime.timezone.utc)
# Signing function
def sign(key, msg):
    """Return the raw HMAC-SHA256 digest of the UTF-8 encoding of *msg* keyed by *key* (bytes)."""
    mac = hmac.new(key, msg.encode("utf-8"), digestmod=hashlib.sha256)
    return mac.digest()
# Get authentication information
def get_auth(secret_id, secret_key, host, method, params, headers):
    """Build request headers carrying a Tencent Cloud TC3-HMAC-SHA256 signature.

    secret_id / secret_key: Tencent Cloud API credentials.
    host: API endpoint host; it is one of the two signed headers.
    method: HTTP method, upper-cased into the canonical request.
    params: request body object. json.dumps(params) is what gets hashed and
        signed, so the caller must send exactly that serialization as the
        request body or the server-side signature check will fail
        (NOTE(review): the caller is not visible in this file — confirm).
    headers: only "content-type" is read, defaulting to
        "application/x-www-form-urlencoded".
    Returns a dict of HTTP headers including Authorization and X-TC-* fields.
    """
    algorithm = "TC3-HMAC-SHA256"
    ntp_time = get_ntp_time()
    timestamp = int(ntp_time.timestamp())
    date = ntp_time.strftime('%Y-%m-%d')  # UTC date string; part of the credential scope
    # --- Step 1: canonical request ---
    http_request_method = method.upper()
    canonical_uri = "/"
    canonical_querystring = ""
    ct = headers.get("content-type", "application/x-www-form-urlencoded")
    payload = json.dumps(params)
    # canonical_headers already ends with '\n'; the additional '\n' inserted in
    # canonical_request below is required by the TC3 canonical-request layout.
    canonical_headers = f"content-type:{ct}\nhost:{host}\n"
    signed_headers = "content-type;host"
    hashed_request_payload = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    canonical_request = (f"{http_request_method}\n{canonical_uri}\n{canonical_querystring}\n"
                         f"{canonical_headers}\n{signed_headers}\n{hashed_request_payload}")
    # --- Step 2: string to sign ---
    credential_scope = f"{date}/{SERVICE}/tc3_request"
    hashed_canonical_request = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()
    string_to_sign = (f"{algorithm}\n{timestamp}\n{credential_scope}\n{hashed_canonical_request}")
    # --- Step 3: derive the signing key by chained HMACs:
    #     "TC3"+secret_key -> date -> service -> "tc3_request" ---
    secret_date = sign(f"TC3{secret_key}".encode("utf-8"), date)
    secret_service = sign(secret_date, SERVICE)
    secret_signing = sign(secret_service, "tc3_request")
    signature = hmac.new(secret_signing, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
    # --- Step 4: assemble the Authorization header and X-TC-* metadata ---
    authorization = (f"{algorithm} Credential={secret_id}/{credential_scope}, "
                     f"SignedHeaders={signed_headers}, Signature={signature}")
    return {
        "Authorization": authorization,
        "Host": host,
        "Content-Type": ct,
        "X-TC-Timestamp": str(timestamp),
        "X-TC-Version": VERSION,
        "X-TC-Action": ACTION,
        "X-TC-Region": REGION
    }
# Extract information
def extract_information(content):
    """Ask GPT-4o to extract contract fields from *content* as JSON.

    content: plain text of a contract-award notice (from process_pdf).
    Returns a pretty-printed JSON string (the prompt asks for a top-level
    "contracts" value) on success, or None after three failed attempts.
    """
    client = OpenAI(api_key=OPENAI_API_KEY)
    prompt = (
        "There are some guides, respond in detailed content, respond without content in (), JSON begin with contracts value:\n"
        "1. Contract awarded date\n"
        "2. Construction location (This part of the content is in the title, not in the table; the address must be returned and should be detailed.)\n"
        "3. Tender reference\n"
        "4. Construction summary (in the 'particular' section)\n"
        "5. Contractor\n"
        "6. Contractor address(this is not company name, the address must be returned and should be detailed.)\n"
        "7. Amount\n"
        "8. Notice publish date (at the end of the content)"
    )
    max_attempts = 3
    for attempt in range(max_attempts):
        try:
            logging.info(f"Extracting information (Attempt {attempt + 1}/3)")
            response = client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "You are a helpful assistant designed to output JSON"},
                    {"role": "user", "content": f"{prompt}\n\n{content}"}
                ],
                response_format={"type": "json_object"}
            )
            choice = response.choices[0]
            if choice.finish_reason == "stop":
                extracted_info = json.loads(choice.message.content)
                return json.dumps(extracted_info, ensure_ascii=False, indent=4)
            # Truncated/filtered output: log it and fall through to the retry path.
            logging.warning(f"Warning: Unexpected completion reason - {choice.finish_reason}")
        except Exception as e:
            logging.error(f"Error: API call failed - {str(e)}")
        if attempt < max_attempts - 1:
            # Bug fix: back off before EVERY retry. The original slept only after
            # exceptions, so an unexpected finish_reason retried with no delay.
            time.sleep(5)
    return None  # All attempts failed.
# JSON to Excel
def json_to_excel(json_data):
    """Write the extracted contract records to a temporary .xlsx file.

    json_data: JSON string with a top-level 'contracts' list of objects.
    Column values are matched to headers case-insensitively, ignoring any
    non-alphanumeric characters in the keys; missing fields become ''.
    Returns the path of the workbook (a NamedTemporaryFile kept on disk).
    """
    records = json.loads(json_data)['contracts']
    column_names = ['contract_awarded_date', 'construction_location', 'tender_reference',
                    'construction_summary', 'contractor', 'contractor_address',
                    'amount', 'notice_publish_date']

    def _canon(text):
        # Collapse to lowercase alphanumerics so e.g. "Contract Awarded Date"
        # matches the 'contract_awarded_date' column header.
        return ''.join(ch.lower() for ch in text if ch.isalnum())

    workbook = Workbook()
    sheet = workbook.active
    sheet.append(column_names)
    for record in records:
        # setdefault keeps the FIRST value for a canonical key, matching the
        # original first-match lookup semantics.
        lookup = {}
        for key, value in record.items():
            lookup.setdefault(_canon(key), value)
        sheet.append([lookup.get(_canon(name), '') for name in column_names])
    with tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp:
        workbook.save(tmp.name)
    return tmp.name
def clean_url(input_text):
    """Strip surrounding whitespace, then any surrounding double quotes, from a pasted URL."""
    return input_text.strip().strip('"')
# New function: Process uploaded PDF
def process_pdf(file):
    """Extract the plain text of every page from an uploaded PDF.

    file: either an uploaded-file object exposing .name, or a path string
        (Gradio may hand over either form).
    Returns the concatenated text of all pages.
    Raises whatever fitz raises for unreadable input (logged first).
    """
    logging.info(f"Start processing PDF file: {type(file)}")
    # Fix: the original duplicated the whole extraction loop in both branches;
    # normalize to a path first so the logic exists exactly once.
    path = file.name if hasattr(file, 'name') else file
    try:
        with fitz.open(path) as doc:
            # join() instead of repeated '+=' avoids quadratic string building
            # on long documents.
            text_content = "".join(page.get_text() for page in doc)
        logging.info("PDF processing successful")
        return text_content
    except Exception as e:
        logging.error(f"PDF processing error: {str(e)}")
        raise
def preview_excel(excel_path):
    """Load the first 10 rows and 8 columns of a workbook for on-page preview.

    Returns a gr.Dataframe with the data, or an empty gr.Dataframe (after
    logging the error) if the file cannot be read.
    """
    try:
        frame = pd.read_excel(excel_path, nrows=10)
        return gr.Dataframe(value=frame.iloc[:10, :8])
    except Exception as err:
        logging.error(f"Excel preview error: {str(err)}")
        return gr.Dataframe()
def process_pdf_file(file):
    """Run the full pipeline behind the Gradio UI: PDF -> text -> JSON -> Excel.

    file: the uploaded PDF (file object or path), or None.
    Returns a 3-tuple for the three outputs:
        (status message, Excel file path or None, preview Dataframe).
    Errors are reported per stage so the status message names which step failed.
    """
    if file is None:
        logging.warning("No file uploaded")
        return "Please upload a PDF file.", None, gr.Dataframe()
    # Stage 1: text extraction.
    try:
        logging.info(f"Received file: {type(file)}, {file.name if hasattr(file, 'name') else 'No name'}")
        text = process_pdf(file)
    except Exception as err:
        logging.error(f"Error processing PDF file: {str(err)}", exc_info=True)
        return f"Error processing PDF file: {str(err)}", None, gr.Dataframe()
    # Stage 2: LLM extraction and Excel generation.
    try:
        extracted = extract_information(text)
        if extracted is None:
            logging.error("Failed to extract information")
            return "Error extracting information. Please try again later.", None, gr.Dataframe()
        workbook_path = json_to_excel(extracted)
        preview = preview_excel(workbook_path)
        logging.info("File processing successful")
        return "Processing successful!", workbook_path, preview
    except Exception as err:
        logging.error(f"Error processing file: {str(err)}", exc_info=True)
        return f"Error processing file: {str(err)}", None, gr.Dataframe()
# Gradio interface
# The three outputs mirror process_pdf_file's 3-tuple return:
# status text, downloadable Excel file, and a preview table.
iface = gr.Interface(
    fn=process_pdf_file,
    inputs=[
        PDF(label="Upload PDF File")  # Only keep the label parameter
    ],
    outputs=[
        gr.Textbox(label="Processing Status"),
        gr.File(label="Download Excel File"),
        gr.Dataframe(label="Excel Preview (First 10 rows, 8 columns)")
    ],
    title="PDF Document Processing and Information Extraction",
    description="Upload a PDF file, and the system will process it and generate an Excel result."
)
# Run the Gradio app
if __name__ == "__main__":
    iface.launch()