import base64
import json
import requests
import datetime
import hashlib
import hmac
import logging
import ntplib
import time
import os
import tempfile
import io
from openai import OpenAI
from openpyxl import Workbook
import gradio as gr
import re
import fitz  # PyMuPDF
import pandas as pd
from gradio_pdf import PDF  # Import the new PDF component

# Configure logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# Get configuration from environment variables (credentials are read from the
# environment; do not hardcode fallback secret values)
SECRET_ID = os.getenv("SECRET_ID", "")
SECRET_KEY = os.getenv("SECRET_KEY", "")
REGION = os.getenv("REGION", "ap-guangzhou")
ENDPOINT = os.getenv("ENDPOINT", "lke.tencentcloudapi.com")
SERVICE = "lke"
ACTION = "ReconstructDocument"
VERSION = "2023-11-30"

# OpenAI API key
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
# Get NTP time
def get_ntp_time():
    ntp_client = ntplib.NTPClient()
    try:
        response = ntp_client.request('pool.ntp.org', version=3, timeout=5)
        return datetime.datetime.fromtimestamp(response.tx_time, datetime.timezone.utc)
    except Exception as e:
        logging.warning(f"Unable to get NTP time, using local time: {e}")
        return datetime.datetime.now(datetime.timezone.utc)
# Signing function
def sign(key, msg):
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()

# Get authentication information (TC3-HMAC-SHA256 signing)
def get_auth(secret_id, secret_key, host, method, params, headers):
    algorithm = "TC3-HMAC-SHA256"
    ntp_time = get_ntp_time()
    timestamp = int(ntp_time.timestamp())
    date = ntp_time.strftime('%Y-%m-%d')

    # Step 1: build the canonical request over the JSON payload and signed headers
    http_request_method = method.upper()
    canonical_uri = "/"
    canonical_querystring = ""
    ct = headers.get("content-type", "application/x-www-form-urlencoded")
    payload = json.dumps(params)
    canonical_headers = f"content-type:{ct}\nhost:{host}\n"
    signed_headers = "content-type;host"
    hashed_request_payload = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    canonical_request = (f"{http_request_method}\n{canonical_uri}\n{canonical_querystring}\n"
                         f"{canonical_headers}\n{signed_headers}\n{hashed_request_payload}")

    # Step 2: build the string to sign from the hashed canonical request
    credential_scope = f"{date}/{SERVICE}/tc3_request"
    hashed_canonical_request = hashlib.sha256(canonical_request.encode("utf-8")).hexdigest()
    string_to_sign = f"{algorithm}\n{timestamp}\n{credential_scope}\n{hashed_canonical_request}"

    # Step 3: derive the signing key (date -> service -> tc3_request) and sign
    secret_date = sign(f"TC3{secret_key}".encode("utf-8"), date)
    secret_service = sign(secret_date, SERVICE)
    secret_signing = sign(secret_service, "tc3_request")
    signature = hmac.new(secret_signing, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()

    # Step 4: assemble the Authorization header and the common request headers
    authorization = (f"{algorithm} Credential={secret_id}/{credential_scope}, "
                     f"SignedHeaders={signed_headers}, Signature={signature}")
    return {
        "Authorization": authorization,
        "Host": host,
        "Content-Type": ct,
        "X-TC-Timestamp": str(timestamp),
        "X-TC-Version": VERSION,
        "X-TC-Action": ACTION,
        "X-TC-Region": REGION
    }
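
# Minimal sketch (not called elsewhere in this script) of how the signed headers
# could be used to invoke the ReconstructDocument API. The parameter names below
# are illustrative assumptions, and the request body must be byte-for-byte
# identical to the JSON that get_auth signed (same dict, same json.dumps call):
#
#   params = {"FileBase64": "<base64-encoded PDF>", "FileType": "PDF"}  # hypothetical fields
#   auth_headers = get_auth(SECRET_ID, SECRET_KEY, ENDPOINT, "POST", params,
#                           {"content-type": "application/json; charset=utf-8"})
#   resp = requests.post(f"https://{ENDPOINT}", headers=auth_headers, data=json.dumps(params))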
# Extract information
def extract_information(content):
    client = OpenAI(api_key=OPENAI_API_KEY)
    prompt = (
        "There are some guides, respond in detailed content, respond without content in (), JSON begin with contracts value:\n"
        "1. Contract awarded date\n"
        "2. Construction location (This part of the content is in the title, not in the table; the address must be returned and should be detailed.)\n"
        "3. Tender reference\n"
        "4. Construction summary (in the 'particular' section)\n"
        "5. Contractor\n"
        "6. Contractor address(this is not company name, the address must be returned and should be detailed.)\n"
        "7. Amount\n"
        "8. Notice publish date (at the end of the content)"
    )
    for attempt in range(3):  # Try three times
        try:
            logging.info(f"Extracting information (Attempt {attempt + 1}/3)")
            response = client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system", "content": "You are a helpful assistant designed to output JSON"},
                    {"role": "user", "content": f"{prompt}\n\n{content}"}
                ],
                response_format={"type": "json_object"}
            )
            if response.choices[0].finish_reason == "stop":
                extracted_info = json.loads(response.choices[0].message.content)
                return json.dumps(extracted_info, ensure_ascii=False, indent=4)
            else:
                logging.warning(f"Warning: Unexpected completion reason - {response.choices[0].finish_reason}")
        except Exception as e:
            logging.error(f"Error: API call failed - {str(e)}")
        if attempt < 2:  # If not the last attempt, wait before retrying
            time.sleep(5)
    return None  # If all three attempts fail, return None.
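
# The extractor is prompted to return JSON whose top-level key is "contracts";
# json_to_excel below assumes roughly this shape (field spellings may vary, which
# is why keys are matched after normalization):
#
#   {"contracts": [{"contract_awarded_date": "...", "construction_location": "...",
#                   "tender_reference": "...", "construction_summary": "...",
#                   "contractor": "...", "contractor_address": "...",
#                   "amount": "...", "notice_publish_date": "..."}]}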
# JSON to Excel
def json_to_excel(json_data):
    data = json.loads(json_data)
    wb = Workbook()
    ws = wb.active
    headers = ['contract_awarded_date', 'construction_location', 'tender_reference',
               'construction_summary', 'contractor', 'contractor_address',
               'amount', 'notice_publish_date']
    ws.append(headers)

    # Create a helper function for exact matching
    def exact_match(key, target):
        key = ''.join(c.lower() for c in key if c.isalnum())
        target = ''.join(c.lower() for c in target if c.isalnum())
        return key == target

    for contract in data['contracts']:
        row = []
        for header in headers:
            # Use exact matching to find the corresponding value
            matched_value = next((v for k, v in contract.items() if exact_match(header, k)), '')
            row.append(matched_value)
        ws.append(row)

    with tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp:
        wb.save(tmp.name)
        return tmp.name
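
# Example (hypothetical values) of converting extractor output into a spreadsheet;
# keys are normalized before matching, so "Contract Awarded Date" still maps to the
# contract_awarded_date column:
#
#   sample = json.dumps({"contracts": [{"Contract Awarded Date": "2024-01-15",
#                                       "Amount": "HK$1,000,000"}]})
#   xlsx_path = json_to_excel(sample)  # returns the path of a temporary .xlsx file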
def clean_url(input_text):
    # Remove any leading or trailing quotes
    cleaned_url = input_text.strip().strip('"')
    return cleaned_url
# New function: Process uploaded PDF
def process_pdf(file):
    logging.info(f"Start processing PDF file: {type(file)}")
    try:
        # Gradio may hand over either a file object (with a .name path) or a plain path string
        pdf_path = file.name if hasattr(file, 'name') else file
        with fitz.open(pdf_path) as doc:
            text_content = ""
            for page in doc:
                text_content += page.get_text()
        logging.info("PDF processing successful")
        return text_content
    except Exception as e:
        logging.error(f"PDF processing error: {str(e)}")
        raise
def preview_excel(excel_path):
    try:
        df = pd.read_excel(excel_path, nrows=10)
        preview_df = df.iloc[:10, :8]
        return gr.Dataframe(value=preview_df)
    except Exception as e:
        logging.error(f"Excel preview error: {str(e)}")
        return gr.Dataframe()
def process_pdf_file(file):
    if file is None:
        logging.warning("No file uploaded")
        return "Please upload a PDF file.", None, gr.Dataframe()
    try:
        logging.info(f"Received file: {type(file)}, {file.name if hasattr(file, 'name') else 'No name'}")
        pdf_content = process_pdf(file)
    except Exception as e:
        logging.error(f"Error processing PDF file: {str(e)}", exc_info=True)
        return f"Error processing PDF file: {str(e)}", None, gr.Dataframe()
    try:
        json_data = extract_information(pdf_content)
        if json_data is None:
            logging.error("Failed to extract information")
            return "Error extracting information. Please try again later.", None, gr.Dataframe()
        excel_path = json_to_excel(json_data)
        excel_preview = preview_excel(excel_path)
        logging.info("File processing successful")
        return "Processing successful!", excel_path, excel_preview
    except Exception as e:
        logging.error(f"Error processing file: {str(e)}", exc_info=True)
        return f"Error processing file: {str(e)}", None, gr.Dataframe()
# Gradio interface
iface = gr.Interface(
    fn=process_pdf_file,
    inputs=[
        PDF(label="Upload PDF File")  # Only keep the label parameter
    ],
    outputs=[
        gr.Textbox(label="Processing Status"),
        gr.File(label="Download Excel File"),
        gr.Dataframe(label="Excel Preview (First 10 rows, 8 columns)")
    ],
    title="PDF Document Processing and Information Extraction",
    description="Upload a PDF file, and the system will process it and generate an Excel result."
)

# Run the Gradio app
if __name__ == "__main__":
    iface.launch()
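
# Optional, commented-out sketch for exercising the pipeline without the UI
# (the file path below is hypothetical and OPENAI_API_KEY must be set):
#
#   status, xlsx_path, _ = process_pdf_file("sample_notice.pdf")
#   print(status, xlsx_path)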