|
import os |
|
from pathlib import Path |
|
import fitz |
|
from PIL import Image |
|
from transformers import Qwen2VLForConditionalGeneration, AutoProcessor |
|
import torch |
|
import gradio as gr |
|
|
|
|
|
OUTPUT_DIR = Path("outputs") |
|
OUTPUT_DIR.mkdir(exist_ok=True) |
|
|
|
def generate_page_image(pdf_path, page_num): |
|
""" |
|
Generate an image from a specific PDF page for analysis |
|
""" |
|
try: |
|
|
|
pdf_document = fitz.open(pdf_path) |
|
page = pdf_document[page_num] |
|
|
|
|
|
rect = page.rect |
|
width = rect.width |
|
height = rect.height |
|
|
|
|
|
|
|
zoom = 1000 / max(width, height) |
|
|
|
|
|
mat = fitz.Matrix(zoom, zoom) |
|
|
|
|
|
pix = page.get_pixmap(matrix=mat) |
|
|
|
|
|
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) |
|
|
|
|
|
image_path = OUTPUT_DIR / f"page_{page_num + 1}.png" |
|
img.save(image_path, "PNG") |
|
|
|
pdf_document.close() |
|
return image_path |
|
except Exception as e: |
|
print(f"Error generating image for page {page_num + 1}: {str(e)}") |
|
return None |
|
|
|
def extract_text_from_pdf(pdf_path, page_num): |
|
""" |
|
Extract text directly from a specific PDF page |
|
""" |
|
try: |
|
|
|
pdf_document = fitz.open(pdf_path) |
|
page = pdf_document[page_num] |
|
|
|
|
|
text = page.get_text("text") |
|
|
|
pdf_document.close() |
|
return text.strip() |
|
except Exception as e: |
|
print(f"Error extracting text from page {page_num + 1}: {str(e)}") |
|
return "" |
|
|
|
def analyze_image(image_path): |
|
""" |
|
Analyze image content using Qwen2.5 VL model for detailed description |
|
""" |
|
try: |
|
|
|
model = Qwen2VLForConditionalGeneration.from_pretrained( |
|
"Qwen/Qwen2-VL-72B-Instruct", |
|
torch_dtype=torch.float16, |
|
device_map="auto" |
|
) |
|
processor = AutoProcessor.from_pretrained("Qwen/Qwen2-VL-72B-Instruct") |
|
|
|
|
|
image = Image.open(image_path).convert('RGB') |
|
|
|
|
|
messages = [ |
|
{ |
|
"role": "user", |
|
"content": [ |
|
{"type": "image", "image": image}, |
|
{"type": "text", "text": "Provide a detailed description of the content in this image, focusing on text, layout, and any diagrams or figures."} |
|
] |
|
} |
|
] |
|
|
|
|
|
text_prompt = processor.apply_chat_template(messages, add_generation_prompt=True) |
|
inputs = processor( |
|
text=text_prompt, |
|
images=[image], |
|
padding=True, |
|
return_tensors="pt" |
|
) |
|
|
|
|
|
inputs = inputs.to("cuda" if torch.cuda.is_available() else "cpu") |
|
|
|
|
|
with torch.no_grad(): |
|
output_ids = model.generate(**inputs, max_new_tokens=512) |
|
generated_text = processor.decode(output_ids[0], skip_special_tokens=True) |
|
|
|
|
|
response = generated_text.split("Assistant: ")[1] if "Assistant: " in generated_text else generated_text |
|
|
|
return response |
|
except Exception as e: |
|
print(f"Error during image analysis: {str(e)}") |
|
return "Image content could not be analyzed." |
|
|
|
def process_pdf(pdf_path, output_txt_path): |
|
""" |
|
Main function to process the PDF and generate output |
|
""" |
|
try: |
|
|
|
pdf_document = fitz.open(pdf_path) |
|
num_pages = len(pdf_document) |
|
pdf_document.close() |
|
|
|
if num_pages == 0: |
|
print("The PDF is empty.") |
|
return |
|
|
|
|
|
with open(output_txt_path, 'w', encoding='utf-8') as f: |
|
f.write(f"Analysis of {os.path.basename(pdf_path)}\n") |
|
f.write("=" * 50 + "\n\n") |
|
|
|
|
|
for page_num in range(num_pages): |
|
print(f"Processing page {page_num + 1}...") |
|
|
|
|
|
f.write(f"Page {page_num + 1}\n") |
|
f.write("-" * 30 + "\n\n") |
|
|
|
|
|
text = extract_text_from_pdf(pdf_path, page_num) |
|
if text: |
|
f.write("Extracted Text:\n") |
|
f.write(text) |
|
f.write("\n\n") |
|
else: |
|
f.write("No text could be extracted from this page.\n\n") |
|
|
|
|
|
image_path = generate_page_image(pdf_path, page_num) |
|
if image_path: |
|
description = analyze_image(image_path) |
|
f.write("Image Description:\n") |
|
f.write(f"{description}\n") |
|
f.write("\n" + "=" * 50 + "\n\n") |
|
else: |
|
f.write("Image Description:\n") |
|
f.write("Could not generate image for analysis.\n") |
|
f.write("\n" + "=" * 50 + "\n\n") |
|
|
|
print(f"Processing complete. Results saved to {output_txt_path}") |
|
except Exception as e: |
|
print(f"Error processing PDF: {str(e)}") |
|
|
|
def process_uploaded_pdf(pdf_file): |
|
if pdf_file is None: |
|
return "Please upload a PDF file." |
|
|
|
output_txt = OUTPUT_DIR / "analysis_results.txt" |
|
process_pdf(pdf_file.name, output_txt) |
|
|
|
|
|
with open(output_txt, 'r', encoding='utf-8') as f: |
|
results = f.read() |
|
|
|
return results |
|
|
|
|
|
interface = gr.Interface( |
|
fn=process_uploaded_pdf, |
|
inputs=gr.File(label="Upload PDF"), |
|
outputs=gr.Textbox(label="Analysis Results"), |
|
title="PDF Analyzer", |
|
description="Upload a PDF file to extract text directly and analyze images using Qwen2.5 VL." |
|
) |
|
|
|
interface.launch() |