"""Gradio app: upload a PDF, extract contract-award fields with GPT-4o,
and download the result as an Excel workbook.

Also contains Tencent Cloud TC3-HMAC-SHA256 signing helpers for the LKE
``ReconstructDocument`` API (currently unused by the UI flow but kept for
callers elsewhere).
"""

import base64
import json
import requests
import datetime
import hashlib
import hmac
import logging
import ntplib
import time
import os
import tempfile
import io

from openai import OpenAI
from openpyxl import Workbook
import gradio as gr
import re
import fitz  # PyMuPDF
import pandas as pd
from gradio_pdf import PDF  # PDF upload/preview component for Gradio

# Configure logging
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# Tencent Cloud LKE configuration.
# SECURITY FIX: the original file shipped live credentials as getenv()
# fallback defaults. Secrets must never be committed to source control;
# they are now read exclusively from the environment (empty when unset,
# which makes the signed request fail loudly instead of leaking keys).
SECRET_ID = os.getenv("SECRET_ID", "")
SECRET_KEY = os.getenv("SECRET_KEY", "")
REGION = os.getenv("REGION", "ap-guangzhou")
ENDPOINT = os.getenv("ENDPOINT", "lke.tencentcloudapi.com")
SERVICE = "lke"
ACTION = "ReconstructDocument"
VERSION = "2023-11-30"

# OpenAI API key — same rule: environment only, no hardcoded fallback.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")


def get_ntp_time():
    """Return the current UTC time from an NTP server.

    Falls back to the local clock (still timezone-aware UTC) when the NTP
    query fails, so callers always get an aware datetime.

    Returns:
        datetime.datetime: timezone-aware UTC timestamp.
    """
    ntp_client = ntplib.NTPClient()
    try:
        response = ntp_client.request('pool.ntp.org', version=3, timeout=5)
        return datetime.datetime.fromtimestamp(response.tx_time,
                                               datetime.timezone.utc)
    except Exception as e:
        logging.warning(f"Unable to get NTP time, using local time: {e}")
        return datetime.datetime.now(datetime.timezone.utc)


def sign(key, msg):
    """HMAC-SHA256 one step of the TC3 key-derivation chain.

    Args:
        key (bytes): signing key.
        msg (str): message to sign; encoded as UTF-8.

    Returns:
        bytes: raw HMAC digest (fed into the next derivation step).
    """
    return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest()


def get_auth(secret_id, secret_key, host, method, params, headers):
    """Build TC3-HMAC-SHA256 request headers for a Tencent Cloud API call.

    Implements the documented four steps: canonical request, string to
    sign, derived signing key, and the final Authorization header.

    Args:
        secret_id (str): Tencent Cloud SecretId.
        secret_key (str): Tencent Cloud SecretKey.
        host (str): API endpoint host (goes into the signed headers).
        method (str): HTTP method; upper-cased for the canonical request.
        params (dict): request body; JSON-serialized as the payload.
        headers (dict): caller headers; only "content-type" is consulted.

    Returns:
        dict: headers (Authorization, Host, Content-Type, X-TC-*) ready to
        attach to the HTTP request.
    """
    algorithm = "TC3-HMAC-SHA256"
    # NTP-backed timestamp: signatures are rejected if the local clock drifts.
    ntp_time = get_ntp_time()
    timestamp = int(ntp_time.timestamp())
    date = ntp_time.strftime('%Y-%m-%d')

    # Step 1: canonical request.
    http_request_method = method.upper()
    canonical_uri = "/"
    canonical_querystring = ""
    ct = headers.get("content-type", "application/x-www-form-urlencoded")
    payload = json.dumps(params)
    canonical_headers = f"content-type:{ct}\nhost:{host}\n"
    signed_headers = "content-type;host"
    hashed_request_payload = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    canonical_request = (f"{http_request_method}\n{canonical_uri}\n"
                         f"{canonical_querystring}\n{canonical_headers}\n"
                         f"{signed_headers}\n{hashed_request_payload}")

    # Step 2: string to sign, scoped to date/service/tc3_request.
    credential_scope = f"{date}/{SERVICE}/tc3_request"
    hashed_canonical_request = hashlib.sha256(
        canonical_request.encode("utf-8")).hexdigest()
    string_to_sign = (f"{algorithm}\n{timestamp}\n{credential_scope}\n"
                      f"{hashed_canonical_request}")

    # Step 3: derive the signing key (TC3<key> -> date -> service -> tc3_request).
    secret_date = sign(f"TC3{secret_key}".encode("utf-8"), date)
    secret_service = sign(secret_date, SERVICE)
    secret_signing = sign(secret_service, "tc3_request")
    signature = hmac.new(secret_signing, string_to_sign.encode("utf-8"),
                         hashlib.sha256).hexdigest()

    # Step 4: assemble the Authorization header.
    authorization = (f"{algorithm} Credential={secret_id}/{credential_scope}, "
                     f"SignedHeaders={signed_headers}, Signature={signature}")
    return {
        "Authorization": authorization,
        "Host": host,
        "Content-Type": ct,
        "X-TC-Timestamp": str(timestamp),
        "X-TC-Version": VERSION,
        "X-TC-Action": ACTION,
        "X-TC-Region": REGION
    }


def extract_information(content):
    """Ask GPT-4o to pull contract-award fields out of raw PDF text.

    Sends the prompt plus *content* with ``response_format=json_object`` and
    retries up to three times (5 s back-off) on API failure or a truncated
    completion.

    Args:
        content (str): plain text extracted from the PDF.

    Returns:
        str | None: pretty-printed JSON string on success, None after three
        failed attempts.
    """
    client = OpenAI(api_key=OPENAI_API_KEY)
    prompt = (
        "There are some guides, respond in detailed content, respond without content in (), JSON begin with contracts value:\n"
        "1. Contract awarded date\n"
        "2. Construction location (This part of the content is in the title, not in the table; the address must be returned and should be detailed.)\n"
        "3. Tender reference\n"
        "4. Construction summary (in the 'particular' section)\n"
        "5. Contractor\n"
        "6. Contractor address(this is not company name, the address must be returned and should be detailed.)\n"
        "7. Amount\n"
        "8. Notice publish date (at the end of the content)"
    )
    for attempt in range(3):  # Try three times
        try:
            logging.info(f"Extracting information (Attempt {attempt + 1}/3)")
            response = client.chat.completions.create(
                model="gpt-4o",
                messages=[
                    {"role": "system",
                     "content": "You are a helpful assistant designed to output JSON"},
                    {"role": "user", "content": f"{prompt}\n\n{content}"}
                ],
                response_format={"type": "json_object"}
            )
            # "stop" means the model finished naturally; anything else
            # (length, content_filter) means the JSON may be truncated.
            if response.choices[0].finish_reason == "stop":
                extracted_info = json.loads(response.choices[0].message.content)
                return json.dumps(extracted_info, ensure_ascii=False, indent=4)
            else:
                logging.warning(
                    f"Warning: Unexpected completion reason - "
                    f"{response.choices[0].finish_reason}")
        except Exception as e:
            logging.error(f"Error: API call failed - {str(e)}")
        if attempt < 2:  # If not the last attempt, wait before retrying
            time.sleep(5)
    return None  # If all three attempts fail, return None.


def json_to_excel(json_data):
    """Convert the extracted-contracts JSON into a temporary .xlsx file.

    Model output key names vary ("Contract awarded date" vs
    "contract_awarded_date"), so matching strips non-alphanumerics and
    compares case-insensitively.

    Args:
        json_data (str): JSON string with a top-level "contracts" list.

    Returns:
        str: path of the saved temporary workbook (caller owns cleanup).
    """
    data = json.loads(json_data)
    wb = Workbook()
    ws = wb.active
    headers = ['contract_awarded_date', 'construction_location',
               'tender_reference', 'construction_summary', 'contractor',
               'contractor_address', 'amount', 'notice_publish_date']
    ws.append(headers)

    def exact_match(key, target):
        # Normalize both sides to lowercase alphanumerics so formatting
        # differences in the model's key names don't break matching.
        key = ''.join(c.lower() for c in key if c.isalnum())
        target = ''.join(c.lower() for c in target if c.isalnum())
        return key == target

    # ROBUSTNESS FIX: .get() instead of data['contracts'] so a malformed
    # model response yields an empty sheet rather than a raw KeyError.
    for contract in data.get('contracts', []):
        row = []
        for header in headers:
            # Use exact matching to find the corresponding value
            matched_value = next(
                (v for k, v in contract.items() if exact_match(header, k)), '')
            row.append(matched_value)
        ws.append(row)

    with tempfile.NamedTemporaryFile(delete=False, suffix='.xlsx') as tmp:
        wb.save(tmp.name)
        return tmp.name


def clean_url(input_text):
    """Strip surrounding whitespace and double quotes from a pasted URL."""
    return input_text.strip().strip('"')


def process_pdf(file):
    """Extract the plain text of every page of a PDF with PyMuPDF.

    Args:
        file: either a file object exposing ``.name`` (as Gradio provides)
            or a filesystem path string.

    Returns:
        str: concatenated text of all pages.

    Raises:
        Exception: re-raises any PyMuPDF error after logging it.
    """
    logging.info(f"Start processing PDF file: {type(file)}")
    try:
        # DUPLICATION FIX: the two original branches were identical except
        # for the path expression — resolve the path once, open once.
        pdf_path = file.name if hasattr(file, 'name') else file
        with fitz.open(pdf_path) as doc:
            # join() avoids quadratic += concatenation on long documents.
            text_content = "".join(page.get_text() for page in doc)
        logging.info("PDF processing successful")
        return text_content
    except Exception as e:
        logging.error(f"PDF processing error: {str(e)}")
        raise


def preview_excel(excel_path):
    """Load the first 10 rows / 8 columns of a workbook for the UI preview.

    Returns an empty Dataframe component on any read error so the UI
    degrades gracefully.
    """
    try:
        # nrows=10 already limits rows; only the column slice is needed.
        preview_df = pd.read_excel(excel_path, nrows=10).iloc[:, :8]
        return gr.Dataframe(value=preview_df)
    except Exception as e:
        logging.error(f"Excel preview error: {str(e)}")
        return gr.Dataframe()


def process_pdf_file(file):
    """End-to-end pipeline backing the Gradio interface.

    PDF text extraction -> GPT extraction -> Excel conversion -> preview.

    Args:
        file: uploaded PDF (file object or path), or None.

    Returns:
        tuple: (status message, Excel path or None, Dataframe preview).
    """
    if file is None:
        logging.warning("No file uploaded")
        return "Please upload a PDF file.", None, gr.Dataframe()

    try:
        logging.info(f"Received file: {type(file)}, "
                     f"{file.name if hasattr(file, 'name') else 'No name'}")
        pdf_content = process_pdf(file)
    except Exception as e:
        logging.error(f"Error processing PDF file: {str(e)}", exc_info=True)
        return f"Error processing PDF file: {str(e)}", None, gr.Dataframe()

    try:
        json_data = extract_information(pdf_content)
        if json_data is None:
            logging.error("Failed to extract information")
            return ("Error extracting information. Please try again later.",
                    None, gr.Dataframe())
        excel_path = json_to_excel(json_data)
        excel_preview = preview_excel(excel_path)
        logging.info("File processing successful")
        return "Processing successful!", excel_path, excel_preview
    except Exception as e:
        logging.error(f"Error processing file: {str(e)}", exc_info=True)
        return f"Error processing file: {str(e)}", None, gr.Dataframe()


# Gradio interface
iface = gr.Interface(
    fn=process_pdf_file,
    inputs=[
        PDF(label="Upload PDF File")  # Only keep the label parameter
    ],
    outputs=[
        gr.Textbox(label="Processing Status"),
        gr.File(label="Download Excel File"),
        gr.Dataframe(label="Excel Preview (First 10 rows, 8 columns)")
    ],
    title="PDF Document Processing and Information Extraction",
    description="Upload a PDF file, and the system will process it and generate an Excel result."
)

# Run the Gradio app
if __name__ == "__main__":
    iface.launch()