# rag-app / file_preprocessing.py
# (Hugging Face upload metadata: bhavinmatariya, "Upload 13 files", commit 3506c42 verified)
import os
import re
import io
import html
import fitz
import ezdxf
import base64
import asyncio
import logging
import openpyxl
import tempfile
import requests
import pandas as pd
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from spire.presentation import *
from openai import AsyncOpenAI, OpenAI
from spire.presentation.common import *
from audio_extract import extract_audio
from ezdxf.addons.drawing import pymupdf
from urllib.parse import urljoin, urlparse
from youtube_transcript_api import YouTubeTranscriptApi
from ezdxf.addons.drawing import Frontend, RenderContext, layout, config
from system_prompt import SYSTEM_PROMPT, IMAGE_SUMMARY_PROMPT, TABLE_DETECTION_PROMPT, TABLE_TO_MARKDOWN_PROMPT, SIMPLE_IMAGE_SUMMARY_PROMPT, DXF_ANALYSIS_PROMPT
# Try to import Gemini, but don't fail if not available.
# GEMINI_AVAILABLE gates every Gemini code path in this module.
try:
    import google.generativeai as genai
    GEMINI_AVAILABLE = True
except ImportError:
    GEMINI_AVAILABLE = False
    # Logger not yet initialized, use print instead
    print("Warning: Google Generative AI (Gemini) not available. Install with: pip install google-generativeai")

# Cache for Gemini model to avoid reconfiguring on every page
_gemini_model_cache = None

# Load environment variables (OPENAI_API_KEY, GEMINI_API_KEY) from a .env file.
load_dotenv()

# Async client for per-page vision/OCR calls; sync client for blocking file uploads.
client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
sync_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# File extensions this preprocessing module accepts.
SUPPORTED_FILES = [".pdf", ".doc", ".docx", ".txt", ".pptx", ".xlsx", ".mp3", ".wav", ".mp4", ".dxf", ".jpg", ".jpeg", ".png"]

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Reduce OpenAI logging
logging.getLogger("openai").setLevel(logging.WARNING)
logging.getLogger("httpx").setLevel(logging.WARNING)
# Suppress ezdxf warnings (common and harmless)
logging.getLogger("ezdxf").setLevel(logging.ERROR)
def upload_file_to_openai(file_path: str) -> str:
    """Upload a local file to OpenAI with purpose="assistants" and return its file ID.

    Args:
        file_path: Path to the file on local disk.

    Returns:
        str: The OpenAI file ID of the uploaded file.

    Raises:
        Exception: If the upload fails for any reason (original error is chained).
    """
    try:
        with open(file_path, "rb") as file:
            file_response = sync_client.files.create(
                file=file,
                purpose="assistants"
            )
        file_id = file_response.id
        # Fix: the previous log line said "File ID" but never included the ID.
        logger.info("Successfully uploaded file. File ID: %s", file_id)
        return file_id
    except Exception as e:
        logger.error(f"Failed to upload file: {e}")
        # Chain the original exception so the root cause is preserved.
        raise Exception(f"Failed to upload file: {str(e)}") from e
async def extract_native_text_from_pdf_page(doc, page_num: int) -> str:
    """Extract native (embedded) text from one PDF page, trying several
    PyMuPDF extraction formats and keeping the first result that looks
    readable; returns "" when the page has no usable native text (which
    signals the caller to fall back to OCR).

    Args:
        doc: Open PyMuPDF document.
        page_num: 0-indexed page number.

    Returns:
        str: Cleaned page text, or "" if extraction failed / text is corrupted.
    """
    try:
        page = doc.load_page(page_num)
        # Try multiple text extraction methods with different encoding options.
        # Order matters: plain "text" first, structured "dict" next, then
        # markup formats whose tags are stripped below.
        text_methods = [
            lambda: page.get_text("text"),
            lambda: page.get_text("dict"),
            lambda: page.get_text("html"),
            lambda: page.get_text("xml"),
        ]
        best_text = ""
        for method in text_methods:
            try:
                result = method()
                # Handle different return types from get_text().
                if isinstance(result, str):
                    text = result
                elif isinstance(result, dict):
                    # "dict" format: walk blocks -> lines -> spans and
                    # concatenate span text.
                    text = ""
                    for block in result.get("blocks", []):
                        if "lines" in block:
                            for line in block["lines"]:
                                for span in line.get("spans", []):
                                    text += span.get("text", "")
                                # Newline after each line's spans.
                                # NOTE(review): nesting reconstructed from a
                                # whitespace-mangled source — confirm the "\n"
                                # belongs at line (not block) level.
                                text += "\n"
                else:
                    continue
                # Clean up the text.
                if text and text.strip():
                    # Decode HTML entities (for Japanese characters and other Unicode).
                    text = html.unescape(text)
                    # Remove HTML tags if present (from "html"/"xml" formats).
                    text = re.sub(r'<[^>]+>', '', text)
                    # Remove excessive whitespace and normalize.
                    text = re.sub(r'\s+', ' ', text.strip())
                    # Accept this result only if it is mostly readable
                    # (printable incl. CJK) characters.
                    readable_chars = sum(1 for c in text if c.isprintable() or c.isspace())
                    total_chars = len(text)
                    if total_chars > 0 and readable_chars / total_chars > 0.8:  # At least 80% readable
                        best_text = text
                        break
            except Exception as e:
                logger.debug(f"Text extraction method failed: {e}")
                continue
        # If we still have garbled text, signal OCR fallback to the caller.
        if best_text and len(best_text.strip()) > 10:
            # Use the universal corruption detection function.
            if detect_corrupted_text(best_text):
                logger.info(f"Page {page_num} has corrupted text, will use OCR")
                return ""  # Return empty to trigger OCR
        return best_text
    except Exception as e:
        logger.error(f"Error extracting native text from page {page_num}: {e}")
        return ""
def detect_corrupted_text(text: str) -> bool:
    """Heuristically decide whether *text* looks like garbled / mis-decoded
    extraction output. Language-agnostic: the "unusual character" check
    whitelists ASCII plus Japanese, CJK, Hangul, Cyrillic and Latin-extended
    ranges.

    Args:
        text: Text to analyze for corruption.

    Returns:
        bool: True if the text trips any corruption heuristic, False otherwise.
    """
    # Too little data to judge reliably.
    if not text or len(text) < 10:
        return False

    total = len(text)

    # Share of characters that are neither printable nor whitespace.
    non_printable_ratio = sum(
        1 for c in text if not (c.isprintable() or c.isspace())
    ) / total

    # Dominance of the single most frequent printable character — heavy
    # repetition of one glyph is a common corruption signature.
    char_counts = {}
    for char in text:
        if char.isprintable():
            char_counts[char] = char_counts.get(char, 0) + 1
    max_char_ratio = (max(char_counts.values()) / total) if char_counts else 0

    # Characters that indicate encoding problems: C0/C1 controls, and
    # anything outside the whitelisted script ranges.
    unusual_patterns = (
        r'[\u0000-\u001F\u007F-\u009F]',
        r'[^\x20-\x7E\u3040-\u309F\u30A0-\u30FF\u4E00-\u9FAF\uAC00-\uD7AF\u0400-\u04FF\u0100-\u017F\u0180-\u024F]',
    )
    unusual_count = sum(len(re.findall(pattern, text)) for pattern in unusual_patterns)
    unusual_ratio = unusual_count / total if total > 0 else 0

    # Excessive whitespace or punctuation/symbols also flag corruption.
    whitespace_ratio = sum(1 for c in text if c.isspace()) / total
    special_char_ratio = sum(1 for c in text if not (c.isalnum() or c.isspace())) / total

    is_corrupted = (
        non_printable_ratio > 0.05
        or max_char_ratio > 0.5
        or unusual_ratio > 0.1
        or whitespace_ratio > 0.6
        or special_char_ratio > 0.7
    )
    if is_corrupted:
        logger.info(f"Detected corrupted text: non-printable={non_printable_ratio:.2f}, max_char={max_char_ratio:.2f}, unusual={unusual_ratio:.2f}, whitespace={whitespace_ratio:.2f}, special={special_char_ratio:.2f}")
    return is_corrupted
async def validate_pdf_file(file_path: str) -> bool:
    """Return True when the PDF at *file_path* opens cleanly and has at least
    one loadable page; False (with a log entry) otherwise."""
    try:
        with fitz.open(file_path) as doc:
            if len(doc) == 0:
                logger.warning("PDF file has 0 pages")
                return False
            # Loading page 0 forces content parsing, catching deeper corruption
            # that merely opening the file would miss.
            doc.load_page(0)
            return True
    except fitz.FileDataError as e:
        logger.error(f"PDF file is corrupted or broken: {e}")
        return False
    except Exception as e:
        logger.error(f"Error validating PDF file: {e}")
        return False
async def render_page_to_base64(doc, page_num: int) -> str:
    """Rasterize one PDF page at 150 DPI and return it as a base64-encoded
    PNG string; re-raises on rendering failure after logging."""
    try:
        pixmap = doc.load_page(page_num).get_pixmap(dpi=150)
        png_bytes = pixmap.tobytes("png")
        return base64.b64encode(png_bytes).decode("utf-8")
    except Exception as e:
        logger.error(f"Error rendering page {page_num}: {e}")
        raise
async def extract_html_from_pdf_page(doc, page_num: int) -> str:
    """Extract one PDF page as HTML, preserving structure that plain-text
    extraction loses (tables, colors, bold/italic, layout).

    Args:
        doc: Open PyMuPDF document.
        page_num: 0-indexed page number.

    Returns:
        str: HTML for the page, prefixed with a page-marker comment;
        "" when extraction yields nothing or fails.
    """
    try:
        html_content = doc.load_page(page_num).get_text("html")
        if html_content:
            # Decode entities, then prepend a marker so downstream consumers
            # can tell which page the markup came from.
            html_content = html.unescape(html_content)
            html_content = f"<!-- PAGE {page_num + 1} -->\n{html_content}"
        return html_content or ""
    except Exception as e:
        logger.error(f"Error extracting HTML from page {page_num}: {e}")
        return ""
async def extract_html_from_pdf(pdf_path: str, progress_callback=None) -> dict:
    """Extract HTML for every page of a PDF.

    Args:
        pdf_path: Path to the PDF file.
        progress_callback: Optional async callback receiving progress strings.

    Returns:
        dict: {1-indexed page number: HTML string}, omitting empty pages;
        {} on any error.
    """
    try:
        doc = fitz.open(pdf_path)
        total = len(doc)
        pages = {}
        for index in range(total):
            if progress_callback:
                await progress_callback(f"📄 Extracting HTML from page {index+1} of {total}")
            content = await extract_html_from_pdf_page(doc, index)
            if content:
                pages[index + 1] = content
        doc.close()
        return pages
    except Exception as e:
        logger.error(f"Error extracting HTML from PDF: {e}")
        return {}
# Cache for Gemini 3 model specifically for video analysis
_gemini3_model_cache = None

def configure_gemini():
    """Configure the Gemini SDK and return a GenerativeModel, caching the
    result so the (expensive) model listing happens only once per process.

    Returns:
        genai.GenerativeModel: A usable Gemini model.

    Raises:
        Exception: If the SDK is not installed, GEMINI_API_KEY is missing,
            or no model can be configured at all.
    """
    global _gemini_model_cache
    # Return cached model if available.
    if _gemini_model_cache is not None:
        return _gemini_model_cache
    if not GEMINI_AVAILABLE:
        raise Exception("Gemini SDK not installed. Install with: pip install google-generativeai")
    gemini_api_key = os.getenv("GEMINI_API_KEY")
    if not gemini_api_key:
        raise Exception("GEMINI_API_KEY not found in environment variables")
    genai.configure(api_key=gemini_api_key)
    # Get the list of available models (only once — result is cached above).
    try:
        available_models = [m.name for m in genai.list_models() if 'generateContent' in m.supported_generation_methods]
    except Exception as e:
        logger.warning(f"Could not list available models: {e}")
        available_models = []
    # Try models in order of preference, but only ones that are actually available.
    preferred_models = [
        'gemini-1.5-flash',
        'gemini-1.5-pro',
        'gemini-pro',
        'models/gemini-1.5-flash',
        'models/gemini-1.5-pro',
        'models/gemini-pro'
    ]
    for model_name in preferred_models:
        # Check availability with and without the 'models/' prefix.
        model_variants = [model_name, f"models/{model_name}", model_name.replace("models/", "")]
        is_available = any(variant in available_models for variant in model_variants)
        if is_available or not available_models:  # Try anyway if we couldn't list models
            try:
                model = genai.GenerativeModel(model_name)
                logger.info(f"Successfully configured Gemini model: {model_name}")
                _gemini_model_cache = model  # Cache the model
                return model
            except Exception as e:
                logger.debug(f"Failed to configure model {model_name}: {e}")
                continue
    # If all preferred models fail, try the first available model.
    if available_models:
        try:
            first_model = available_models[0]
            model_name = first_model.replace('models/', '')
            model = genai.GenerativeModel(model_name)
            logger.info(f"Using first available Gemini model: {model_name}")
            _gemini_model_cache = model  # Cache the model
            return model
        except Exception as e:
            logger.error(f"Failed to use first available model {first_model}: {e}")
            raise Exception(f"Failed to configure any Gemini model. Available models: {available_models}")
    # Fix: previously control could fall off the end here (model listing failed
    # AND every preferred model failed), implicitly returning None and crashing
    # callers later. Raise explicitly instead.
    raise Exception("Failed to configure any Gemini model: model listing failed and no preferred model could be constructed")
def configure_gemini3():
    """Configure the Gemini 3 Pro model used for video-frame analysis,
    caching the configured model for the life of the process.

    Returns:
        genai.GenerativeModel: A smoke-tested Gemini 3 Pro model.

    Raises:
        Exception: If the SDK / API key is missing, or no candidate model
            responds to a test request.
    """
    global _gemini3_model_cache
    # Serve from cache when already configured.
    if _gemini3_model_cache is not None:
        return _gemini3_model_cache
    if not GEMINI_AVAILABLE:
        raise Exception("Gemini SDK not installed. Install with: pip install google-generativeai")
    api_key = os.getenv("GEMINI_API_KEY")
    if not api_key:
        raise Exception("GEMINI_API_KEY not found in environment variables")
    genai.configure(api_key=api_key)
    # Candidate model names, tried in order.
    for candidate in ('gemini-3-pro-preview',):
        try:
            candidate_model = genai.GenerativeModel(candidate)
            # Smoke-test with a trivial request before trusting the model.
            candidate_model.generate_content("Hello")
            logger.info(f"Successfully configured Gemini 3 Pro: {candidate}")
            _gemini3_model_cache = candidate_model
            return candidate_model
        except Exception as e:
            logger.debug(f"Failed to configure {candidate}: {e}")
    # No candidate worked — fail loudly with guidance.
    raise Exception(
        "Gemini 3 Pro model is not available. "
        "Please ensure you have access to Gemini 3 Pro in your API account. "
        "Check https://ai.google.dev/ for model availability."
    )
async def process_hybrid_with_gemini(html_content: str, page_image_base64: str, user_query: str = None, page_num: int = None) -> str:
    """
    Process hybrid format (HTML + PDF image) with Gemini.

    Combines two complementary views of the same page:
    - HTML content (table structure, colors, formatting)
    - PDF page image (visual information, graphs, layout)

    Args:
        html_content: HTML content from the PDF page (may be "").
        page_image_base64: Base64-encoded PNG of the PDF page (may be "").
        user_query: Optional user query for context.
        page_num: Optional 1-indexed page number for reference.

    Returns:
        str: Gemini's text analysis; on failure, a bracketed error string
        (with a truncated HTML fallback when HTML is available).
    """
    try:
        if not GEMINI_AVAILABLE:
            raise Exception("Gemini SDK not available")
        model = configure_gemini()
        # Prepare prompt (kept flush-left so the literal prompt text is unchanged).
        prompt = f"""Analyze this PDF page content. You are receiving:
1. HTML content (preserves table structure, colors, and formatting)
2. Page image (preserves visual information like graphs, images, and layout)
Please extract and combine information from both sources to provide a comprehensive understanding of the page content.
Focus on:
- Table data and structure from HTML
- Color information from HTML
- Visual elements (graphs, charts, images) from the page image
- Overall layout and structure
- Text content from both sources
"""
        if page_num:
            prompt += f"Page Number: {page_num}\n\n"
        if user_query:
            prompt += f"User query context: {user_query}\n\n"
        # Limit HTML length to avoid token limits.
        html_preview = html_content[:8000] if len(html_content) > 8000 else html_content
        if len(html_content) > 8000:
            html_preview += "\n\n[HTML content truncated for length...]"
        prompt += "\nHTML Content:\n" + html_preview
        # Build the multimodal request: text prompt first, then the image part.
        content_parts = [prompt]
        if page_image_base64:
            try:
                image_data = base64.b64decode(page_image_base64)
                content_parts.append({
                    "mime_type": "image/png",
                    "data": image_data
                })
            except Exception as e:
                logger.warning(f"Could not decode image for Gemini: {e}")
        # The Gemini SDK call is synchronous; run it in a thread so the event
        # loop is not blocked.
        def generate_sync():
            return model.generate_content(content_parts)
        response = await asyncio.to_thread(generate_sync)
        if hasattr(response, 'text'):
            return response.text
        else:
            return str(response)
    except Exception as e:
        logger.error(f"Error processing hybrid content with Gemini: {e}")
        # Fallback: return (truncated) HTML content if Gemini fails.
        if html_content:
            return f"[Hybrid processing with Gemini failed: {str(e)}]\n\nHTML Content:\n{html_content[:2000]}"
        return f"[Error processing with Gemini: {str(e)}]"
async def extract_page_with_llm(base64_img: str, page_num: int) -> str:
    """OCR a rendered PDF page via the GPT-4.1-mini vision endpoint.

    Args:
        base64_img: Base64-encoded PNG of the page.
        page_num: 1-indexed page number used in the output header.

    Returns:
        str: "--- PAGE n ---" header plus the model's text, or a bracketed
        error string if the request fails.
    """
    try:
        data_url = f"data:image/png;base64,{base64_img}"
        response = await client.chat.completions.create(
            model="gpt-4.1-mini",
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": [
                    {"type": "image_url", "image_url": {"url": data_url}}
                ]},
            ],
            temperature=0.0,
        )
        page_text = response.choices[0].message.content.strip()
        return f"--- PAGE {page_num} ---\n{page_text}\n"
    except Exception as e:
        logger.error(f"Error processing page {page_num} with LLM: {e}")
        return f"[ERROR on page {page_num}: {str(e)}]"
async def summarize_image_with_gpt(image_blob_url: str, image_id: str, storage_provider: str = "azure", markdown_kv_enabled: bool = False, document_text: str = "") -> str:
    """Generate an AI-powered summary of a stored image using GPT-4 Vision.

    Summaries are used for document search/retrieval, RAG, and (when
    markdown_kv_enabled=True) table-to-markdown conversion.

    Args:
        image_blob_url: URL/path of the image in storage (used to infer the
            storage folder and image ID; images are fetched from HF persistent
            storage, not the URL itself).
        image_id: Unique identifier for the image.
        storage_provider: Nominal provider name ("azure" or "aws").
            NOTE(review): not referenced in this body — retrieval always goes
            through hf_storage; confirm whether callers still rely on it.
        markdown_kv_enabled: If True, detect tables and convert them to
            markdown; otherwise produce a standard summary.
        document_text: Optional document text used to enhance simple summaries.

    Returns:
        str: AI-generated summary (or a bracketed placeholder on failure).
    """
    try:
        # Download image from self-hosted (HF) storage.
        from hf_storage import retrieve_image
        # Infer the storage folder from the blob URL; defaults to pdf_images.
        folder_prefix = "pdf_images"
        if "pdf_images" in image_blob_url:
            folder_prefix = "pdf_images"
        elif "pptx_images" in image_blob_url:
            folder_prefix = "pptx_images"
        elif "docx_images" in image_blob_url:
            folder_prefix = "docx_images"
        elif "web_images" in image_blob_url:
            folder_prefix = "web_images"
        # Prefer the ID embedded in the URL (first ".jpg" path segment) over
        # the supplied image_id.
        image_id_from_url = image_id
        if "/" in image_blob_url:
            parts = image_blob_url.split("/")
            for part in parts:
                if part.endswith(".jpg"):
                    image_id_from_url = part.replace(".jpg", "")
                    break
        logger.info(f"Downloading image from HF persistent storage: {image_id_from_url}")
        image_result = await retrieve_image(image_id_from_url, folder_prefix)
        if image_result and "base64_data" in image_result:
            image_data = base64.b64decode(image_result["base64_data"])
            logger.info(f"Successfully downloaded {len(image_data)} bytes from HF persistent storage")
        else:
            logger.warning(f"Image not found in HF persistent storage: {image_id_from_url}")
            return f"[Image Summary for {image_id}: Visual content extracted from document]"
        base64_img = base64.b64encode(image_data).decode("utf-8")
        # Branch 1: standard single-pass summary (markdown_kv_enabled False).
        if not markdown_kv_enabled:
            response = await client.chat.completions.create(
                model="gpt-4.1-mini",
                messages=[
                    {"role": "system", "content": IMAGE_SUMMARY_PROMPT},
                    {"role": "user", "content": [
                        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_img}"}}
                    ]}
                ],
                temperature=0.0,
            )
            # Check if response is valid and has choices.
            if not response or not response.choices or len(response.choices) == 0:
                logger.error(f"No valid response for image {image_id}")
                return f"[Image Summary for {image_id}: Visual content extracted from document - No response from GPT]"
            summary = response.choices[0].message.content.strip()
            logger.info(f"Generated detailed summary for image {image_id}")
            return summary
        # Branch 2: markdown-KV mode — first detect whether the image is a table.
        else:
            logger.info(f"Markdown-KV mode enabled. Detecting if image {image_id} contains a table...")
            # Step 1: Detect if the image contains a table.
            detection_response = await client.chat.completions.create(
                model="gpt-4.1-mini",
                messages=[
                    {"role": "system", "content": TABLE_DETECTION_PROMPT},
                    {"role": "user", "content": [
                        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_img}"}}
                    ]}
                ],
                temperature=0.0,
            )
            if not detection_response or not detection_response.choices or len(detection_response.choices) == 0:
                logger.error(f"No valid detection response for image {image_id}")
                return f"[Image Summary for {image_id}: Visual content extracted from document - No response from GPT]"
            detection_result = detection_response.choices[0].message.content.strip().upper()
            logger.info(f"Image {image_id} detection result: {detection_result}")
            # Step 2: Process based on detection result.
            if "TABLE" in detection_result:
                # Table found: convert it to markdown with a second vision call.
                logger.info(f"Converting table image {image_id} to markdown format...")
                markdown_response = await client.chat.completions.create(
                    model="gpt-4.1-mini",
                    messages=[
                        {"role": "system", "content": TABLE_TO_MARKDOWN_PROMPT},
                        {"role": "user", "content": [
                            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_img}"}}
                        ]}
                    ],
                    temperature=0.0,
                )
                if not markdown_response or not markdown_response.choices or len(markdown_response.choices) == 0:
                    logger.error(f"No valid markdown conversion response for image {image_id}")
                    return f"[Image Summary for {image_id}: Table detected but conversion failed]"
                markdown_table = markdown_response.choices[0].message.content.strip()
                logger.info(f"Successfully converted table image {image_id} to markdown format")
                return f"{markdown_table}"
            else:
                # Not a table: generate a simple summary instead.
                logger.info(f"Generating simple summary for normal image {image_id}...")
                summary_response = await client.chat.completions.create(
                    model="gpt-4.1-mini",
                    messages=[
                        {"role": "system", "content": IMAGE_SUMMARY_PROMPT},
                        {"role": "user", "content": [
                            {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_img}"}}
                        ]}
                    ],
                    temperature=0.0,
                )
                if not summary_response or not summary_response.choices or len(summary_response.choices) == 0:
                    logger.error(f"No valid summary response for image {image_id}")
                    return f"[Image Summary for {image_id}: Visual content extracted from document - No response from GPT]"
                simple_summary = summary_response.choices[0].message.content.strip()
                # Optionally enhance with document visual elements; falls back
                # to the plain AI summary on any failure.
                if document_text:
                    try:
                        from shared_utilities import merge_visual_elements_with_ai_summary
                        enhanced_summary, figure_metadata = await merge_visual_elements_with_ai_summary(image_id, simple_summary, document_text)
                        logger.info(f"Generated enhanced summary for image {image_id} using document text + AI")
                        # Figure metadata is currently discarded; handled by caller if needed.
                        return enhanced_summary
                    except Exception as e:
                        logger.warning(f"Failed to enhance summary for {image_id}: {e}")
                        logger.info(f"Falling back to AI summary only for {image_id}")
                logger.info(f"Generated simple summary for image {image_id}")
                return simple_summary
    except Exception as e:
        logger.error(f"Error summarizing image {image_id}: {e}")
        # Return a simple fallback summary instead of an error message.
        return f"Visual content extracted from document - contains images, diagrams, or other visual elements"
async def extract_images_from_pdf(pdf_path: str, bbox_dict=None, progress_callback=None, storage_provider="azure") -> dict:
    """Extract images from a PDF and upload them to HF persistent storage
    (no permanent local storage; a local copy is kept only for testing).

    Args:
        pdf_path: Path to the PDF file.
        bbox_dict: Optional dict mapping image_id -> bounding box
            (x0, y0, x1, y1), e.g. {"page1_image1": (10, 10, 200, 200)},
            used to crop the extracted image.
        progress_callback: Optional async callback for progress strings.
        storage_provider: Kept for interface compatibility; storage always
            goes through hf_storage in this implementation.

    Returns:
        dict with keys: image_blob_urls, image_ids_by_page, image_bboxes,
        image_info, total_pages.

    Raises:
        Exception: On any unrecoverable extraction error (original chained).
    """
    try:
        import hashlib
        from PIL import Image
        doc = fitz.open(pdf_path)
        extracted_data = {
            "image_blob_urls": {},
            "image_ids_by_page": {},
            # Fix: these two dicts were written below but never initialized,
            # so every bbox/info record raised a KeyError that the inner
            # except silently swallowed — no bbox data was ever returned.
            "image_bboxes": {},
            "image_info": {},
            "total_pages": len(doc)
        }
        # Self-hosted: HF persistent storage (no client init needed).
        # Allowed image formats.
        ALLOWED_FORMATS = ['jpeg', 'jpg', 'png']
        # Fix: dedup set is now per-call. The old function-attribute set
        # persisted across calls, silently dropping identical images from
        # later documents processed in the same process.
        seen_hashes = set()
        total_images_found = 0
        total_images_processed = 0
        for page_num in range(len(doc)):
            page = doc.load_page(page_num)
            image_list = page.get_images(full=True)
            if image_list:
                page_images_found = len(image_list)
                total_images_found += page_images_found
                if progress_callback:
                    await progress_callback(f"📸 Page {page_num+1}: Found {page_images_found} images")
                else:
                    print(f"Page {page_num+1}: Found {page_images_found} images")
                page_image_ids = []
                for img_index, img_info in enumerate(image_list):
                    xref = img_info[0]
                    base_image = doc.extract_image(xref)
                    image_bytes = base_image["image"]
                    image_ext = base_image["ext"].lower()
                    # Skip formats we don't handle.
                    if image_ext not in ALLOWED_FORMATS:
                        if progress_callback:
                            await progress_callback(f"⏭️ Skipping image {img_index+1} on page {page_num+1} (format: {image_ext})")
                        else:
                            logger.info(f"Skipping image on page {page_num+1} with format: {image_ext} (only jpg, jpeg, png allowed)")
                        continue
                    # Convert image to RGB if necessary (JPEG output requires it).
                    image = Image.open(io.BytesIO(image_bytes))
                    if image.mode != 'RGB':
                        image = image.convert('RGB')
                    # Filter out small/empty images (< 50x50 px or < 5 KB).
                    if image.width < 50 or image.height < 50 or len(image_bytes) < 5000:
                        if progress_callback:
                            await progress_callback(f"⏭️ Skipping small/empty image {img_index+1} on page {page_num+1} ({image.width}x{image.height}, {len(image_bytes)} bytes)")
                        else:
                            print(f"Skipping small/empty image on page {page_num+1}: {image.width}x{image.height}, {len(image_bytes)} bytes")
                        continue
                    # Deduplicate by content hash within this document.
                    image_hash = hashlib.md5(image_bytes).hexdigest()
                    if image_hash in seen_hashes:
                        if progress_callback:
                            await progress_callback(f"⏭️ Skipping duplicate image {img_index+1} on page {page_num+1}")
                        else:
                            print(f"Skipping duplicate image on page {page_num+1}")
                        continue
                    seen_hashes.add(image_hash)
                    # Generate unique image ID.
                    image_id = f"page{page_num+1}_image{img_index+1}"
                    page_image_ids.append(image_id)
                    total_images_processed += 1
                    # Record the image's bounding box on the page (for caption linking).
                    image_rect = None
                    try:
                        image_rects = page.get_image_rects(xref)
                        if image_rects:
                            # Use the first (usually only) rectangle.
                            rect = image_rects[0]
                            image_rect = (rect.x0, rect.y0, rect.x1, rect.y1)
                            extracted_data["image_bboxes"][image_id] = image_rect
                            extracted_data["image_info"][image_id] = {
                                'bbox': image_rect,
                                'page': page_num + 1,
                                'index': img_index + 1,
                                'width': image.width,
                                'height': image.height
                            }
                    except Exception as e:
                        logger.debug(f"Could not get bounding box for image {image_id}: {e}")
                    if progress_callback:
                        await progress_callback(f"🖼️ Processing image {total_images_processed}: {image_id} ({image.width}x{image.height})")
                    else:
                        print(f"Processing image {total_images_processed}: {image_id} ({image.width}x{image.height})")
                    # Apply bounding box cropping if provided.
                    bbox = bbox_dict.get(image_id) if bbox_dict else None
                    if bbox:
                        image = await crop_image_with_bounding_box(image, bbox)
                    # Resize for UI display (max 800px on the longest side).
                    max_size = 800
                    if image.width > max_size or image.height > max_size:
                        image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
                    # Convert to optimized JPEG bytes for storage.
                    img_buffer = io.BytesIO()
                    image.save(img_buffer, format='JPEG', quality=85, optimize=True)
                    img_buffer.seek(0)
                    jpeg_bytes = img_buffer.getvalue()
                    # TESTING: Save image locally for inspection.
                    try:
                        pdf_name = os.path.splitext(os.path.basename(pdf_path))[0]
                        local_images_dir = os.path.join(os.path.dirname(pdf_path), f"{pdf_name}_extracted_images")
                        os.makedirs(local_images_dir, exist_ok=True)
                        local_image_path = os.path.join(local_images_dir, f"{image_id}.jpg")
                        with open(local_image_path, 'wb') as f:
                            f.write(jpeg_bytes)
                        logger.info(f"TESTING: Saved image locally: {local_image_path}")
                    except Exception as local_e:
                        logger.warning(f"Failed to save image locally: {local_e}")
                    # Store image in self-hosted (HF) storage.
                    try:
                        from hf_storage import store_image
                        success = await store_image(image_id, jpeg_bytes, folder_prefix="pdf_images")
                        if success:
                            blob_url = f"hf://pdf_images/{image_id}.jpg"
                            extracted_data["image_blob_urls"][image_id] = blob_url
                            logger.info(f"Uploaded image to HF persistent storage: pdf_images/{image_id}.jpg")
                        else:
                            logger.error(f"Failed to upload image {image_id} to HF persistent storage")
                    except Exception as e:
                        logger.error(f"Failed to upload image {image_id} to HF storage: {e}")
                        continue
                # Store image IDs for this page.
                if page_image_ids:
                    extracted_data["image_ids_by_page"][page_num+1] = page_image_ids
            else:
                if progress_callback:
                    await progress_callback(f"📄 Page {page_num+1}: No images found")
                else:
                    logger.info(f"Page {page_num+1}: No images found")
        # Final summary.
        if progress_callback:
            await progress_callback(f"✅ Image extraction complete: {total_images_processed}/{total_images_found} images processed")
        doc.close()
        return extracted_data
    except Exception as e:
        logger.error(f"Error extracting images from PDF: {e}")
        raise Exception(f"Error extracting images from PDF: {str(e)}") from e
async def process_pdf(file_path: str, progress_callback=None) -> str:
    """Process a PDF, preferring native text extraction and falling back to
    GPT-vision OCR per page.

    Pipeline:
      1. Validate the PDF and extract native text per page.
      2. OCR any page with insufficient native text (< 50 chars).
      3. Re-check native-text pages for corruption and re-OCR those.

    Args:
        file_path: Path to the PDF file.
        progress_callback: Optional async callback for progress strings.

    Returns:
        str: Concatenated "--- PAGE n ---" sections for all pages, or a
        notice string when nothing could be extracted.

    Raises:
        Exception: If the PDF is invalid, empty, or processing fails.
    """
    try:
        if not await validate_pdf_file(file_path):
            raise Exception("The uploaded PDF file is corrupted or cannot be opened. Please upload a valid PDF file.")
        doc = fitz.open(file_path)
        page_count = len(doc)
        if page_count == 0:
            doc.close()
            raise Exception("The PDF file contains no pages.")
        logger.info(f"Processing PDF with {page_count} pages")
        # Pass 1: try native text extraction on every page.
        # native_text_results holds a formatted string per page, or None for
        # pages that need OCR.
        native_text_results = []
        pages_needing_ocr = []
        all_extracted_text = []
        for i in range(page_count):
            try:
                if progress_callback:
                    await progress_callback(f"📄 Extracting native text from page {i+1} of {page_count}")
                print(f"Extracting native text from page {i+1} of {page_count}")
                native_text = await extract_native_text_from_pdf_page(doc, i)
                if native_text and len(native_text.strip()) > 50:
                    native_text_results.append(f"--- PAGE {i+1} ---\n{native_text}\n")
                    all_extracted_text.append(native_text)
                else:
                    # Page needs OCR processing.
                    print(f"Page {i+1} needs OCR processing")
                    pages_needing_ocr.append(i)
                    native_text_results.append(None)
            except Exception as e:
                logger.error(f"Error processing page {i+1}: {e}")
                pages_needing_ocr.append(i)
                native_text_results.append(None)
        # Pass 2: OCR pages that had insufficient native text.
        if pages_needing_ocr:
            if progress_callback:
                await progress_callback(f"🔍 Using OCR for {len(pages_needing_ocr)} pages with insufficient native text")
            print(f"🔍 Using OCR for {len(pages_needing_ocr)} pages with insufficient native text")
            # Process OCR pages sequentially with progress updates.
            ocr_results = []
            for idx, page_num in enumerate(pages_needing_ocr):
                try:
                    if progress_callback:
                        await progress_callback(f"🔄 OCR processing page {page_num+1} ({idx+1}/{len(pages_needing_ocr)})")
                    print(f"🔄 OCR processing page {page_num+1} ({idx+1}/{len(pages_needing_ocr)})")
                    base64_img = await render_page_to_base64(doc, page_num)
                    result = await extract_page_with_llm(base64_img, page_num + 1)
                    ocr_results.append(result)
                    if progress_callback:
                        await progress_callback(f"✅ Page {page_num+1} OCR completed ({idx+1}/{len(pages_needing_ocr)})")
                    print(f"✅ Page {page_num+1} OCR completed ({idx+1}/{len(pages_needing_ocr)})")
                except Exception as e:
                    logger.error(f"Error processing OCR for page {page_num+1}: {e}")
                    ocr_results.append(f"[ERROR on page {page_num+1}: {str(e)}]")
                    if progress_callback:
                        await progress_callback(f"❌ Page {page_num+1} OCR failed ({idx+1}/{len(pages_needing_ocr)})")
                    print(f"❌ Page {page_num+1} OCR failed ({idx+1}/{len(pages_needing_ocr)})")
            # Replace None placeholders with OCR results, in order.
            ocr_index = 0
            for i, result in enumerate(native_text_results):
                if result is None:
                    if ocr_index < len(ocr_results):
                        if isinstance(ocr_results[ocr_index], Exception):
                            native_text_results[i] = f"[ERROR on page {i+1}: {str(ocr_results[ocr_index])}]"
                        else:
                            native_text_results[i] = ocr_results[ocr_index]
                        ocr_index += 1
                    else:
                        native_text_results[i] = f"[ERROR on page {i+1}: OCR processing failed]"
        doc.close()
        # Pass 3: re-check native-text pages for corruption; pages that were
        # already OCR'd are kept as-is.
        if progress_callback:
            await progress_callback("🔍 Checking for corrupted text and applying OCR fixes...")
        print("Checking for corrupted text and applying OCR fixes...")
        final_processed_results = []
        pages_to_reprocess = []
        for i, result in enumerate(native_text_results):
            # Skip corruption check for pages that were already OCR'd.
            if i in pages_needing_ocr:
                final_processed_results.append(result)
            elif result and not result.startswith("[ERROR"):
                # Strip the page header before running corruption detection.
                page_content = result.replace(f"--- PAGE {i+1} ---\n", "").strip()
                if detect_corrupted_text(page_content):
                    logger.info(f"Page {i+1} has corrupted text, will reprocess with OCR")
                    pages_to_reprocess.append(i)
                    final_processed_results.append(None)  # Placeholder
                else:
                    final_processed_results.append(result)
            else:
                final_processed_results.append(result)
        # Re-process corrupted pages with OCR.
        if pages_to_reprocess:
            if progress_callback:
                await progress_callback(f"🔍 Re-processing {len(pages_to_reprocess)} corrupted pages with OCR...")
            print(f"Re-processing {len(pages_to_reprocess)} corrupted pages with OCR...")
            # Re-open document for OCR processing (it was closed above).
            doc = fitz.open(file_path)
            # Process corrupted pages sequentially with progress updates.
            ocr_results = []
            for idx, page_num in enumerate(pages_to_reprocess):
                try:
                    if progress_callback:
                        await progress_callback(f"🔄 OCR reprocessing corrupted page {page_num+1} ({idx+1}/{len(pages_to_reprocess)})")
                    print(f"🔄 OCR reprocessing corrupted page {page_num+1} ({idx+1}/{len(pages_to_reprocess)})")
                    base64_img = await render_page_to_base64(doc, page_num)
                    result = await extract_page_with_llm(base64_img, page_num + 1)
                    ocr_results.append(result)
                    if progress_callback:
                        await progress_callback(f"✅ Corrupted page {page_num+1} OCR completed ({idx+1}/{len(pages_to_reprocess)})")
                    print(f"✅ Corrupted page {page_num+1} OCR completed ({idx+1}/{len(pages_to_reprocess)})")
                except Exception as e:
                    logger.error(f"Error reprocessing OCR for corrupted page {page_num+1}: {e}")
                    ocr_results.append(f"[ERROR on page {page_num+1}: {str(e)}]")
                    if progress_callback:
                        await progress_callback(f"❌ Corrupted page {page_num+1} OCR failed ({idx+1}/{len(pages_to_reprocess)})")
                    print(f"❌ Corrupted page {page_num+1} OCR failed ({idx+1}/{len(pages_to_reprocess)})")
            # Replace None placeholders with the reprocessed OCR results.
            ocr_index = 0
            for i, result in enumerate(final_processed_results):
                if result is None:
                    if ocr_index < len(ocr_results):
                        if isinstance(ocr_results[ocr_index], Exception):
                            final_processed_results[i] = f"[ERROR on page {i+1}: {str(ocr_results[ocr_index])}]"
                        else:
                            final_processed_results[i] = ocr_results[ocr_index]
                        ocr_index += 1
                    else:
                        final_processed_results[i] = f"[ERROR on page {i+1}: OCR processing failed]"
            doc.close()
        # Filter out None results and join all page sections.
        final_results = [result for result in final_processed_results if result is not None]
        if not final_results:
            return "No text content could be extracted from the PDF file."
        return "\n".join(final_results)
    except Exception as e:
        logger.error(f"Error processing PDF: {e}")
        raise Exception(f"Error processing PDF file: {str(e)}")
async def process_pdf_hybrid(file_path: str, progress_callback=None) -> str:
    """
    Process PDF using hybrid method: HTML + PDF images with Gemini.

    Combines, per page:
    - HTML conversion (table structure, colors, formatting)
    - PDF image rendering (visual information, graphs, layout)
    Both representations are passed to Gemini for comprehensive analysis.
    Falls back to standard PDF processing when Gemini is unavailable or
    the hybrid path fails.

    Args:
        file_path: Path to PDF file
        progress_callback: Optional async callback for progress updates

    Returns:
        str: Processed text content combining HTML and image information

    Raises:
        Exception: If both the hybrid path and the standard fallback fail.
    """
    try:
        if not await validate_pdf_file(file_path):
            raise Exception("The uploaded PDF file is corrupted or cannot be opened.")
        doc = fitz.open(file_path)
        all_results = []
        try:
            page_count = len(doc)
            if page_count == 0:
                raise Exception("The PDF file contains no pages.")
            logger.info(f"Processing PDF with hybrid method: {page_count} pages")
            print(f"Processing PDF with hybrid method: {page_count} pages")
            # Check if Gemini is available
            if not GEMINI_AVAILABLE:
                logger.warning("Gemini not available, falling back to standard PDF processing")
                return await process_pdf(file_path, progress_callback)
            for i in range(page_count):
                try:
                    if progress_callback:
                        await progress_callback(f"🔄 Hybrid processing page {i+1} of {page_count} (HTML + Image)")
                    # Step 1: Extract HTML from page (tables/formatting)
                    html_content = await extract_html_from_pdf_page(doc, i)
                    # Step 2: Render page as image (visual layout)
                    page_image_base64 = await render_page_to_base64(doc, i)
                    # Step 3: Let Gemini combine both representations
                    if html_content or page_image_base64:
                        processed_content = await process_hybrid_with_gemini(
                            html_content or "",
                            page_image_base64,
                            None,  # No user query during processing
                            i + 1  # 1-based page number
                        )
                        all_results.append(f"--- PAGE {i+1} ---\n{processed_content}\n")
                    else:
                        all_results.append(f"--- PAGE {i+1} ---\n[No content extracted]\n")
                    if progress_callback:
                        await progress_callback(f"✅ Page {i+1} hybrid processing completed")
                except Exception as e:
                    logger.error(f"Error processing page {i+1} with hybrid method: {e}")
                    # Per-page fallback: plain native text extraction
                    try:
                        native_text = await extract_native_text_from_pdf_page(doc, i)
                        if native_text and len(native_text.strip()) > 50:
                            all_results.append(f"--- PAGE {i+1} ---\n{native_text}\n")
                        else:
                            all_results.append(f"--- PAGE {i+1} ---\n[Error: {str(e)}]\n")
                    except Exception:  # narrowed from bare except; still best-effort
                        all_results.append(f"--- PAGE {i+1} ---\n[Error: {str(e)}]\n")
        finally:
            # Always release the document handle, even on unexpected errors
            doc.close()
        if not all_results:
            return "No content could be extracted from the PDF file."
        return "\n".join(all_results)
    except Exception as e:
        logger.error(f"Error processing PDF with hybrid method: {e}")
        # Whole-file fallback to the standard processing pipeline
        try:
            logger.info("Falling back to standard PDF processing")
            return await process_pdf(file_path, progress_callback)
        except Exception:  # surface the original hybrid error, not the fallback's
            raise Exception(f"Error processing PDF file: {str(e)}")
async def capture_page_images_as_fallback(pdf_path: str, storage_provider: str = "azure", progress_callback=None) -> dict:
    """Capture PDF pages as images when no extractable images are found.

    Fallback for PDFs that contain visual content but no extractable image
    objects. Renders each page as a high-quality raster image and uploads it
    to HF persistent storage for AI analysis and retrieval.

    Common use cases:
    - Scanned documents where entire pages are images
    - PDFs with vector graphics that aren't stored as raster images
    - Documents with embedded content in non-standard formats

    Args:
        pdf_path: Path to the PDF file
        storage_provider: Kept for interface compatibility ("azure" or "aws");
            self-hosted HF storage is always used here.
        progress_callback: Optional async callback for progress updates

    Returns:
        dict: Contains image_blob_urls, image_ids_by_page, and total_pages

    Raises:
        Exception: If the PDF cannot be opened or page capture fails globally.
    """
    try:
        from PIL import Image
        doc = fitz.open(pdf_path)
        try:
            extracted_data = {
                "image_blob_urls": {},
                "image_ids_by_page": {},
                "total_pages": len(doc)
            }
            # Self-hosted: HF persistent storage (no client init needed)
            total_images_processed = 0
            for page_num in range(len(doc)):
                try:
                    page = doc.load_page(page_num)
                    # Render page as image (150 dpi balances quality and size)
                    pix = page.get_pixmap(dpi=150)
                    img_bytes = pix.tobytes("png")
                    # Convert to PIL Image
                    image = Image.open(io.BytesIO(img_bytes))
                    if image.mode != 'RGB':
                        image = image.convert('RGB')
                    # One synthetic image ID per page
                    image_id = f"page{page_num+1}_image1"
                    extracted_data["image_ids_by_page"][page_num+1] = [image_id]
                    total_images_processed += 1
                    if progress_callback:
                        await progress_callback(f"🖼️ Capturing page {page_num+1} as image: {image_id} ({image.width}x{image.height})")
                    else:
                        print(f"Capturing page {page_num+1} as image: {image_id} ({image.width}x{image.height})")
                    # Resize image to reasonable size for UI display (max 800px width/height)
                    max_size = 800
                    if image.width > max_size or image.height > max_size:
                        image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
                    # Convert to JPEG bytes for blob storage with quality optimization
                    img_buffer = io.BytesIO()
                    image.save(img_buffer, format='JPEG', quality=85, optimize=True)
                    img_buffer.seek(0)
                    jpeg_bytes = img_buffer.getvalue()
                    # Upload to self-hosted (HF) storage
                    try:
                        from hf_storage import store_image
                        success = await store_image(image_id, jpeg_bytes, folder_prefix="pdf_images")
                        if success:
                            blob_url = f"hf://pdf_images/{image_id}.jpg"
                            extracted_data["image_blob_urls"][image_id] = blob_url
                            logger.info(f"Uploaded page image to HF persistent storage: {image_id}")
                        else:
                            logger.error(f"Failed to upload page image {image_id} to HF storage")
                    except Exception as e:
                        logger.error(f"Failed to upload page image {image_id} to HF storage: {e}")
                        continue
                except Exception as e:
                    logger.error(f"Error capturing page {page_num+1} as image: {e}")
                    continue
            # Final summary
            if progress_callback:
                await progress_callback(f"✅ Page image capture complete: {total_images_processed} pages captured as images")
        finally:
            # Close the document even if an unexpected error escapes the loop
            doc.close()
        return extracted_data
    except Exception as e:
        logger.error(f"Error capturing page images as fallback: {e}")
        raise Exception(f"Error capturing page images as fallback: {str(e)}")
async def extract_images_from_docx(docx_path: str, bbox_dict=None, progress_callback=None, storage_provider="azure") -> dict:
    """Extract ONLY images from DOCX and upload them to HF storage (NO LOCAL STORAGE).

    Args:
        docx_path: Path to DOCX file
        bbox_dict: Optional dict mapping image_id to bounding box (x0, y0, x1, y1)
            Example: {"docx_image1": (10, 10, 200, 200)}
        progress_callback: Optional async callback for progress updates
        storage_provider: Kept for interface compatibility; HF storage is used.

    Returns:
        dict with image_blob_urls, image_ids, and total_paragraphs

    Raises:
        Exception: If the document cannot be opened or extraction fails globally.
    """
    try:
        from docx import Document
        from PIL import Image
        import imghdr  # NOTE(review): imghdr is removed in Python 3.13 — consider PIL-based sniffing
        import hashlib
        doc = Document(docx_path)
        extracted_data = {
            "image_blob_urls": {},
            "image_ids": [],
            "total_paragraphs": len(doc.paragraphs)
        }
        # Self-hosted: HF persistent storage (no client init needed)
        # Allowed image formats
        ALLOWED_FORMATS = ['jpeg', 'jpg', 'png']
        # Per-call duplicate tracker. Previously this was stored as a function
        # attribute, which leaked dedup state across documents and wrongly
        # skipped images that also appeared in earlier files.
        seen_hashes = set()
        image_counter = 0
        total_images_found = 0
        total_images_processed = 0
        # Count total images first (for progress reporting)
        for rel in doc.part.rels:
            if "image" in doc.part.rels[rel].target_ref:
                total_images_found += 1
        if progress_callback:
            await progress_callback(f"📸 DOCX: Found {total_images_found} images")
        else:
            print(f"DOCX: Found {total_images_found} images")
        for rel in doc.part.rels:
            if "image" in doc.part.rels[rel].target_ref:
                try:
                    image_part = doc.part.rels[rel].target_part
                    image_bytes = image_part.blob
                    # Detect image format from the raw bytes
                    image_format = imghdr.what(None, h=image_bytes)
                    # Check image format compatibility
                    if image_format not in ALLOWED_FORMATS:
                        if progress_callback:
                            await progress_callback(f"⏭️ Skipping DOCX image (format: {image_format})")
                        else:
                            logger.info(f"Skipping DOCX image with format: {image_format} (only jpg, jpeg, png allowed)")
                        continue
                    # Convert image to RGB if necessary
                    pil_image = Image.open(io.BytesIO(image_bytes))
                    if pil_image.mode != 'RGB':
                        pil_image = pil_image.convert('RGB')
                    # Filter out small/empty images (less than 50x50 pixels or very small file size)
                    if pil_image.width < 50 or pil_image.height < 50 or len(image_bytes) < 5000:
                        if progress_callback:
                            await progress_callback(f"⏭️ Skipping small/empty DOCX image ({pil_image.width}x{pil_image.height}, {len(image_bytes)} bytes)")
                        else:
                            print(f"Skipping small/empty DOCX image: {pil_image.width}x{pil_image.height}, {len(image_bytes)} bytes")
                        continue
                    # Skip duplicates within this document (hash of raw bytes)
                    image_hash = hashlib.md5(image_bytes).hexdigest()
                    if image_hash in seen_hashes:
                        if progress_callback:
                            await progress_callback(f"⏭️ Skipping duplicate DOCX image")
                        else:
                            print(f"Skipping duplicate DOCX image")
                        continue
                    seen_hashes.add(image_hash)
                    # Generate unique image ID
                    image_id = f"docx_image{image_counter+1}"
                    extracted_data["image_ids"].append(image_id)
                    image_counter += 1
                    total_images_processed += 1
                    if progress_callback:
                        await progress_callback(f"🖼️ Processing DOCX image {total_images_processed}: {image_id} ({pil_image.width}x{pil_image.height})")
                    else:
                        print(f"Processing DOCX image {total_images_processed}: {image_id} ({pil_image.width}x{pil_image.height})")
                    # Apply bounding box cropping if provided
                    bbox = bbox_dict.get(image_id) if bbox_dict else None
                    if bbox:
                        pil_image = await crop_image_with_bounding_box(pil_image, bbox)
                    # Resize image to reasonable size for UI display (max 800px width/height)
                    max_size = 800
                    if pil_image.width > max_size or pil_image.height > max_size:
                        pil_image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
                    # Convert to JPEG bytes for blob storage with quality optimization
                    img_buffer = io.BytesIO()
                    pil_image.save(img_buffer, format='JPEG', quality=85, optimize=True)
                    img_buffer.seek(0)
                    jpeg_bytes = img_buffer.getvalue()
                    # Store image in self-hosted (HF) storage
                    try:
                        from hf_storage import store_image
                        success = await store_image(image_id, jpeg_bytes, folder_prefix="docx_images")
                        if success:
                            blob_url = f"hf://docx_images/{image_id}.jpg"
                            extracted_data["image_blob_urls"][image_id] = blob_url
                            logger.info(f"Uploaded image to HF persistent storage: docx_images/{image_id}.jpg")
                        else:
                            logger.error(f"Failed to upload image {image_id} to HF persistent storage")
                    except Exception as e:
                        logger.error(f"Failed to upload image {image_id} to HF storage: {e}")
                        continue
                except Exception as e:
                    logger.error(f"Error processing image {rel}: {e}")
                    continue
        # Final summary
        if progress_callback:
            await progress_callback(f"✅ DOCX image extraction complete: {total_images_processed}/{total_images_found} images processed")
        if not extracted_data["image_ids"]:
            if progress_callback:
                await progress_callback("📄 No images found in DOCX document")
            else:
                logger.info("No images found in document")
        return extracted_data
    except Exception as e:
        logger.error(f"Error extracting images from DOCX: {e}")
        raise Exception(f"Error extracting images from DOCX: {str(e)}")
async def process_docx(file_path: str, progress_callback=None) -> str:
    """Extract paragraph and table text from a DOCX file via python-docx."""
    try:
        if progress_callback:
            await progress_callback("📄 Processing DOCX file...")
        else:
            print("Processing DOCX file...")
        from docx import Document
        document = Document(file_path)
        collected = []
        # Paragraph text first, in document order
        for para in document.paragraphs:
            text = para.text.strip()
            if text:
                collected.append(text)
        # Then table contents, one pipe-delimited line per non-empty row
        for table in document.tables:
            for row in table.rows:
                cells = [cell.text.strip() for cell in row.cells if cell.text.strip()]
                if cells:
                    collected.append(" | ".join(cells))
        combined = "\n".join(collected)
        if not combined.strip():
            return "No text content found in the DOCX document."
        return combined
    except Exception as e:
        logger.error(f"Error processing DOCX file: {e}")
        raise Exception(f"Error processing DOCX file: {str(e)}")
async def process_txt(file_path: str) -> str:
    """Read a UTF-8 text file and return its full contents as a string."""
    with open(file_path, "r", encoding="utf-8") as handle:
        return handle.read()
async def extract_images_from_pptx(pptx_path: str, bbox_dict=None, progress_callback=None, storage_provider="azure") -> dict:
    """Extract ONLY images from PPTX and upload them to HF storage (NO LOCAL STORAGE).

    Args:
        pptx_path: Path to PPTX file
        bbox_dict: Optional dict mapping image_id to bounding box (x0, y0, x1, y1)
            Example: {"slide1_image1": (10, 10, 200, 200)}
        progress_callback: Optional async callback for progress updates
        storage_provider: Kept for interface compatibility; HF storage is used.

    Returns:
        dict with image_blob_urls, image_ids_by_slide, and total_slides

    Raises:
        Exception: If the presentation cannot be opened or extraction fails globally.
    """
    try:
        from pptx import Presentation
        from PIL import Image
        import hashlib
        prs = Presentation(pptx_path)
        extracted_data = {
            "image_blob_urls": {},
            "image_ids_by_slide": {},
            "total_slides": len(prs.slides)
        }
        # Self-hosted: HF persistent storage (no client init needed)
        # Per-call duplicate tracker. Previously this was a function attribute,
        # which leaked dedup state across presentations and wrongly skipped
        # images that also appeared in earlier files.
        seen_hashes = set()
        total_images_found = 0
        total_images_processed = 0
        # Count total images first (for progress reporting)
        for slide_num, slide in enumerate(prs.slides):
            for shape in slide.shapes:
                if hasattr(shape, "image"):
                    if shape.image.ext.lower() in ['jpg', 'jpeg', 'png']:
                        total_images_found += 1
        if progress_callback:
            await progress_callback(f"📸 PPTX: Found {total_images_found} images across {len(prs.slides)} slides")
        else:
            print(f"PPTX: Found {total_images_found} images across {len(prs.slides)} slides")
        for slide_num, slide in enumerate(prs.slides):
            # Extract images from slide
            slide_image_ids = []
            image_counter = 0
            for shape in slide.shapes:
                if hasattr(shape, "image"):
                    try:
                        image = shape.image
                        # Only extract jpg, jpeg, and png images
                        if image.ext.lower() in ['jpg', 'jpeg', 'png']:
                            image_bytes = image.blob
                            # Convert image to RGB if necessary
                            pil_image = Image.open(io.BytesIO(image_bytes))
                            if pil_image.mode != 'RGB':
                                pil_image = pil_image.convert('RGB')
                            # Filter BEFORE registering an ID, so skipped images do
                            # not leave phantom entries in image_ids_by_slide
                            # (small/empty: less than 50x50 pixels or tiny file size)
                            if pil_image.width < 50 or pil_image.height < 50 or len(image_bytes) < 5000:
                                if progress_callback:
                                    await progress_callback(f"⏭️ Skipping small/empty PPTX image on slide {slide_num+1} ({pil_image.width}x{pil_image.height}, {len(image_bytes)} bytes)")
                                else:
                                    print(f"Skipping small/empty PPTX image on slide {slide_num+1}: {pil_image.width}x{pil_image.height}, {len(image_bytes)} bytes")
                                continue
                            # Skip duplicates within this presentation (hash of raw bytes)
                            image_hash = hashlib.md5(image_bytes).hexdigest()
                            if image_hash in seen_hashes:
                                if progress_callback:
                                    await progress_callback(f"⏭️ Skipping duplicate PPTX image on slide {slide_num+1}")
                                else:
                                    print(f"Skipping duplicate PPTX image on slide {slide_num+1}")
                                continue
                            seen_hashes.add(image_hash)
                            # Generate unique image ID (only for images that passed the filters)
                            image_id = f"slide{slide_num+1}_image{image_counter+1}"
                            slide_image_ids.append(image_id)
                            image_counter += 1
                            total_images_processed += 1
                            if progress_callback:
                                await progress_callback(f"🖼️ Processing PPTX image {total_images_processed}: {image_id}")
                            else:
                                print(f"Processing PPTX image {total_images_processed}: {image_id}")
                            # Apply bounding box cropping if provided
                            bbox = bbox_dict.get(image_id) if bbox_dict else None
                            if bbox:
                                pil_image = await crop_image_with_bounding_box(pil_image, bbox)
                            # Resize image to reasonable size for UI display (max 800px width/height)
                            max_size = 800
                            if pil_image.width > max_size or pil_image.height > max_size:
                                pil_image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
                            # Convert to JPEG bytes for blob storage with quality optimization
                            img_buffer = io.BytesIO()
                            pil_image.save(img_buffer, format='JPEG', quality=85, optimize=True)
                            img_buffer.seek(0)
                            jpeg_bytes = img_buffer.getvalue()
                            # Store image in cloud storage
                            try:
                                from hf_storage import store_image
                                success = await store_image(image_id, jpeg_bytes, folder_prefix="pptx_images")
                                if success:
                                    blob_url = f"hf://pptx_images/{image_id}.jpg"
                                    extracted_data["image_blob_urls"][image_id] = blob_url
                                    logger.info(f"Uploaded image to HF persistent storage: pptx_images/{image_id}.jpg")
                                else:
                                    logger.error(f"Failed to upload image {image_id} to HF persistent storage")
                            except Exception as e:
                                logger.error(f"Failed to upload image {image_id} to HF storage: {e}")
                                continue
                    except Exception as e:
                        logger.error(f"Error processing image on slide {slide_num+1}: {e}")
                        continue
            # Store image IDs for this slide
            if slide_image_ids:
                extracted_data["image_ids_by_slide"][slide_num+1] = slide_image_ids
            else:
                if progress_callback:
                    await progress_callback(f"📄 Slide {slide_num+1}: No images found")
                else:
                    logger.info(f"Slide {slide_num+1}: No images found")
        # Final summary
        if progress_callback:
            await progress_callback(f"✅ PPTX image extraction complete: {total_images_processed}/{total_images_found} images processed")
        return extracted_data
    except Exception as e:
        logger.error(f"Error extracting images from PPTX: {e}")
        raise Exception(f"Error extracting images from PPTX: {str(e)}")
async def process_pptx(file_path: str, progress_callback=None) -> str:
    """Extract slide text, table rows, and speaker notes from a PPTX file."""
    try:
        from pptx import Presentation
        presentation = Presentation(file_path)
        total_slides = len(presentation.slides)
        logger.info(f"Processing PPTX with {total_slides} slides")
        slides_output = []
        for index, slide in enumerate(presentation.slides):
            if progress_callback:
                await progress_callback(f"📊 Processing slide {index+1} of {total_slides}")
            else:
                print(f"Processing slide {index+1} of {total_slides}")
            pieces = []
            for shape in slide.shapes:
                # Free-form text on the shape
                if hasattr(shape, "text") and shape.text.strip():
                    pieces.append(shape.text.strip())
                # Tables: one pipe-delimited line per non-empty row
                if shape.has_table:
                    for row in shape.table.rows:
                        row_line = " | ".join(cell.text for cell in row.cells)
                        if row_line.strip():
                            pieces.append(row_line)
            # Speaker notes, when present
            if slide.has_notes_slide:
                notes_frame = slide.notes_slide.notes_text_frame
                if notes_frame:
                    pieces.append("--- Notes ---")
                    for paragraph in notes_frame.paragraphs:
                        if paragraph.text.strip():
                            pieces.append(paragraph.text.strip())
            header = f"=== SLIDE {index + 1} ===\n"
            body = "\n".join(pieces) if pieces else "[No text content]"
            slides_output.append(header + body)
        return "\n\n".join(slides_output)
    except Exception as e:
        logger.error(f"Error processing PPTX file: {e}")
        raise Exception(f"Error processing PPTX file: {str(e)}")
def convert_dataframe_to_markdown_kv(data: pd.DataFrame, sheet_name: str) -> str:
    """Convert a pandas DataFrame to markdown key-value format.

    Each row becomes a "## Record N" section containing one `column: value`
    line per column inside a fenced block. Column names are whitespace-stripped
    in the output; NaN/None/empty cells render as an empty value.

    Args:
        data: pandas DataFrame (not modified — a renamed copy is used internally)
        sheet_name: Name of the sheet, used as the top-level heading

    Returns:
        str: Markdown formatted text with key-value pairs for each record
    """
    if data.empty:
        return f"# {sheet_name}\n\n*No data found in this sheet.*\n"
    # Strip whitespace from column names on a COPY so the caller's
    # DataFrame is not mutated (the old in-place assignment was a side effect).
    data = data.rename(columns=lambda c: str(c).strip())
    # Build markdown output
    output = [f"# {sheet_name}\n"]
    # Convert each row to a record
    for record_num, (_, row) in enumerate(data.iterrows(), start=1):
        output.append(f"## Record {record_num}\n")
        output.append("```\n")
        # Add each column as a key-value pair
        for col in data.columns:
            value = row[col]
            # Render NaN, None, and empty strings as blank values
            if pd.isna(value) or value == "":
                value = ""
            else:
                value = str(value).strip()
            output.append(f"{col}: {value}\n")
        output.append("```\n")
    return "".join(output)
async def process_excel(file_path: str, markdown_kv: bool = False, storage_provider: str = "azure") -> tuple:
    """Process Excel file and extract all data and embedded images from all sheets.

    Args:
        file_path: Path to Excel file
        markdown_kv: If True, converts cell data to markdown key-value format and
            summarizes images via GPT (uploading them to HF storage first)
        storage_provider: Cloud storage provider ("azure", "aws", or "selfhosted");
            passed through to the image summarizer

    Returns:
        tuple: (extracted_text: str, image_summaries: dict)

    Raises:
        Exception: If the workbook cannot be read or processed.
    """
    try:
        from PIL import Image
        extracted_text = []
        image_summaries = {}
        pxl_doc = openpyxl.load_workbook(file_path)
        try:
            # Pass 1: cell data, one section per sheet
            for sheet_name in pxl_doc.sheetnames:
                print(f"Processing sheet: {sheet_name}")
                try:
                    data = pd.read_excel(file_path, sheet_name=sheet_name)
                    if not data.empty:
                        if markdown_kv:
                            # Markdown key-value record format
                            extracted_text.append(convert_dataframe_to_markdown_kv(data, sheet_name))
                            print(f"Extracted data from sheet: {sheet_name} (markdown-kv format)")
                        else:
                            # Plain tabular dump
                            extracted_text.append(f"=== SHEET: {sheet_name} ===\n{data.to_string()}\n")
                            print(f"Extracted data from sheet: {sheet_name}")
                except Exception as e:
                    logger.error(f"Error extracting data from sheet {sheet_name}: {e}")
                    extracted_text.append(f"[ERROR extracting data from sheet {sheet_name}: {str(e)}]")
            # Self-hosted: HF storage used when markdown_kv is enabled (no client init needed)
            # Pass 2: embedded images (openpyxl private API `_images`)
            for sheet_name in pxl_doc.sheetnames:
                sheet = pxl_doc[sheet_name]
                if sheet._images:
                    print(f"Found {len(sheet._images)} images in sheet: {sheet_name}")
                    for idx, image in enumerate(sheet._images):
                        try:
                            print(f"Processing image {idx + 1} from sheet {sheet_name}")
                            img_data = image._data()
                            image_id = f"{sheet_name}_image_{idx + 1}"
                            if markdown_kv:
                                # Upload to HF storage, then summarize with GPT
                                pil_image = Image.open(io.BytesIO(img_data))
                                if pil_image.mode != 'RGB':
                                    pil_image = pil_image.convert('RGB')
                                # Resize if too large (max 800px width/height)
                                max_size = 800
                                if pil_image.width > max_size or pil_image.height > max_size:
                                    pil_image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
                                # Convert to JPEG bytes with quality optimization
                                img_buffer = io.BytesIO()
                                pil_image.save(img_buffer, format='JPEG', quality=85, optimize=True)
                                img_buffer.seek(0)
                                jpeg_bytes = img_buffer.getvalue()
                                # Upload to self-hosted (HF) storage
                                blob_url = None
                                try:
                                    from hf_storage import store_image
                                    success = await store_image(image_id, jpeg_bytes, folder_prefix="xlsx_images")
                                    if success:
                                        blob_url = f"hf://xlsx_images/{image_id}.jpg"
                                        logger.info(f"Uploaded image to HF persistent storage: xlsx_images/{image_id}.jpg")
                                    else:
                                        logger.error(f"Failed to upload image {image_id} to HF persistent storage")
                                except Exception as e:
                                    logger.error(f"Failed to upload image {image_id} to HF storage: {e}")
                                    blob_url = None
                                if blob_url:
                                    try:
                                        summary = await summarize_image_with_gpt(blob_url, image_id, storage_provider, markdown_kv_enabled=True)
                                        image_summaries[image_id] = summary
                                        extracted_text.append(f"=== IMAGE {idx + 1} FROM SHEET: {sheet_name} ===\n{summary}\n")
                                    except Exception as e:
                                        logger.error(f"Error summarizing image {image_id} with GPT: {e}")
                                        extracted_text.append(f"[ERROR processing image {idx + 1} from sheet {sheet_name}: {str(e)}]")
                                else:
                                    # Upload failed — fall back to direct LLM extraction
                                    base64_img = base64.b64encode(img_data).decode("utf-8")
                                    result = await extract_page_with_llm(base64_img, image_id)
                                    extracted_text.append(f"=== IMAGE {idx + 1} FROM SHEET: {sheet_name} ===\n{result}\n")
                            else:
                                # Original behavior: direct LLM extraction
                                base64_img = base64.b64encode(img_data).decode("utf-8")
                                result = await extract_page_with_llm(base64_img, image_id)
                                extracted_text.append(f"=== IMAGE {idx + 1} FROM SHEET: {sheet_name} ===\n{result}\n")
                        except Exception as e:
                            logger.error(f"Error processing image {idx + 1} from sheet {sheet_name}: {e}")
                            extracted_text.append(f"[ERROR processing image {idx + 1} from sheet {sheet_name}: {str(e)}]")
        finally:
            # Close the workbook even if processing fails partway
            pxl_doc.close()
        if not extracted_text:
            return "No data or images found in the Excel file.", {}
        return "\n".join(extracted_text), image_summaries
    except Exception as e:
        logger.error(f"Error processing Excel file: {e}")
        raise Exception(f"Error processing Excel file: {str(e)}")
async def process_csv(file_path: str) -> str:
    """Process CSV file and extract all data.

    Tries pandas with several encodings first; falls back to the stdlib csv
    module (ignoring undecodable bytes) when pandas cannot read the file.

    Args:
        file_path: Path to the CSV file

    Returns:
        str: The full tabular contents, or a message when the file is empty.

    Raises:
        Exception: If the file cannot be read by any strategy.
    """
    def _read_with_csv_module() -> pd.DataFrame:
        # Last-resort reader: tolerate undecodable bytes and parse row-by-row.
        # (Was duplicated inline twice; extracted so both fallbacks share it.)
        import csv
        rows = []
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as csvfile:
            for row in csv.reader(csvfile):
                rows.append(row)
        return pd.DataFrame(rows)

    try:
        extracted_text = []
        # Try to read CSV with pandas first (handles encoding better)
        try:
            encodings = ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']
            data = None
            for encoding in encodings:
                try:
                    data = pd.read_csv(file_path, encoding=encoding)
                    logger.info(f"Successfully read CSV with encoding: {encoding}")
                    break
                except UnicodeDecodeError:
                    continue
            if data is None:
                data = _read_with_csv_module()
        except Exception as e:
            logger.warning(f"Error reading CSV with pandas: {e}, trying csv module")
            try:
                data = _read_with_csv_module()
            except Exception as e2:
                logger.error(f"Error reading CSV with csv module: {e2}")
                raise Exception(f"Error reading CSV file: {str(e2)}")
        if data is not None and not data.empty:
            # Dump the whole table as text
            csv_data = data.to_string()
            extracted_text.append(f"=== CSV DATA ===\n{csv_data}\n")
            logger.info(f"Extracted {len(data)} rows from CSV file")
        else:
            return "No data found in the CSV file."
        return "\n".join(extracted_text)
    except Exception as e:
        logger.error(f"Error processing CSV file: {e}")
        raise Exception(f"Error processing CSV file: {str(e)}")
async def process_audio(file_path: str, progress_callback=None) -> str:
    """
    Transcribe audio file using Google Gemini.

    Uploads the file via the Gemini Files API, polls until server-side
    processing completes, transcribes, and always deletes the uploaded file.

    Args:
        file_path: Path to the audio file to transcribe
        progress_callback: Optional async callback for progress updates

    Returns:
        str: Transcribed text

    Raises:
        Exception: If Gemini is unavailable, unconfigured, or transcription fails.
    """
    try:
        if not GEMINI_AVAILABLE:
            raise Exception("Gemini SDK not available. Install with: pip install google-generativeai")
        gemini_api_key = os.getenv("GEMINI_API_KEY")
        if not gemini_api_key:
            raise Exception("GEMINI_API_KEY not found in environment variables")
        genai.configure(api_key=gemini_api_key)
        if progress_callback:
            await progress_callback("📤 Uploading audio to Gemini...")
        # Determine MIME type based on file extension (default to audio/mpeg)
        file_ext = os.path.splitext(file_path)[1].lower()
        mime_types = {
            '.mp3': 'audio/mp3',
            '.wav': 'audio/wav',
            '.flac': 'audio/flac',
            '.aac': 'audio/aac',
            '.ogg': 'audio/ogg',
            '.m4a': 'audio/mp4',
            '.wma': 'audio/x-ms-wma',
            '.webm': 'audio/webm',
            '.amr': 'audio/amr'
        }
        mime_type = mime_types.get(file_ext, 'audio/mpeg')
        # Upload audio using Files API
        uploaded_file = genai.upload_file(
            path=file_path,
            mime_type=mime_type
        )
        if uploaded_file:
            logger.info(f"✅ Successfully uploaded audio to Gemini Files API: {uploaded_file.name}")
        if progress_callback:
            await progress_callback("🔄 Transcribing audio with Gemini...")
        try:
            # Poll until the uploaded file finishes server-side processing.
            # asyncio.sleep (not time.sleep) so the event loop is not blocked.
            while uploaded_file.state.name == "PROCESSING":
                if progress_callback:
                    await progress_callback("⏳ Waiting for audio processing...")
                await asyncio.sleep(2)
                uploaded_file = genai.get_file(uploaded_file.name)
            if uploaded_file.state.name == "FAILED":
                raise Exception(f"Audio processing failed: {uploaded_file.state.name}")
            # Use Gemini to transcribe the audio
            model = configure_gemini()
            prompt = """Transcribe this audio file accurately.
            Follow these guidelines:
            - Transcribe all spoken words exactly as heard
            - Preserve the original language of the speech
            - Include speaker labels if multiple speakers are detected (e.g., Speaker 1:, Speaker 2:)
            - Include timestamps in format [MM:SS] at natural breaks or speaker changes
            - Preserve punctuation and sentence structure
            - Note any significant non-speech sounds in brackets (e.g., [music], [applause])
            - If the audio is in Japanese, transcribe in Japanese
            - Maintain the natural flow and structure of the conversation
            """
            # Run the blocking SDK call off the event loop
            def generate_sync():
                return model.generate_content([prompt, uploaded_file])
            response = await asyncio.to_thread(generate_sync)
        finally:
            # Always delete the uploaded file, even if transcription failed
            try:
                genai.delete_file(uploaded_file.name)
            except Exception:
                pass
        if progress_callback:
            await progress_callback("✅ Audio transcription completed!")
        if hasattr(response, 'text'):
            return response.text
        else:
            return str(response)
    except Exception as e:
        logger.error(f"Error processing audio file with Gemini: {e}")
        raise Exception(f"Error processing audio file: {str(e)}")
async def extract_video_frames(file_path: str, interval_seconds: float = 4.0, progress_callback=None) -> list:
    """
    Extract frames from video at specified intervals.

    Args:
        file_path: Path to video file
        interval_seconds: Interval in seconds between frames (default: 4.0 for ~3-5 sec range)
        progress_callback: Optional async callback for progress updates

    Returns:
        list: List of PIL Image objects representing extracted frames

    Raises:
        Exception: If OpenCV is missing or the video cannot be opened/read.
    """
    try:
        # Try to import cv2
        try:
            import cv2
        except ImportError:
            raise Exception("OpenCV (cv2) is required for video frame extraction. Install with: pip install opencv-python")
        from PIL import Image
        frames = []
        cap = cv2.VideoCapture(file_path)
        if not cap.isOpened():
            raise Exception(f"Could not open video file: {file_path}")
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            duration = total_frames / fps if fps > 0 else 0
            if progress_callback:
                await progress_callback(f"📹 Extracting frames from video (duration: {duration:.1f}s, interval: {interval_seconds}s)...")
            # Assume 30 fps when the container does not report a frame rate
            frame_interval = int(fps * interval_seconds) if fps > 0 else int(30 * interval_seconds)
            frame_count = 0
            extracted_count = 0
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # Extract frame at intervals
                if frame_count % frame_interval == 0:
                    # OpenCV yields BGR; PIL expects RGB
                    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    pil_image = Image.fromarray(frame_rgb)
                    frames.append(pil_image)
                    extracted_count += 1
                    if progress_callback:
                        timestamp = frame_count / fps if fps > 0 else frame_count / 30
                        await progress_callback(f"📸 Extracted frame {extracted_count} at {timestamp:.1f}s")
                frame_count += 1
        finally:
            # Release the capture handle even if reading/conversion fails
            cap.release()
        if progress_callback:
            await progress_callback(f"✅ Extracted {len(frames)} frames from video")
        return frames
    except Exception as e:
        logger.error(f"Error extracting video frames: {e}")
        raise Exception(f"Error extracting video frames: {str(e)}")
async def analyze_video_frames_with_gemini(frames: list, progress_callback=None) -> str:
    """
    Analyze video frames using Gemini 3 Pro and generate a visual summary.

    Frames are described in batches of 10; the per-batch descriptions are
    then condensed into one summary with a final model call.

    Args:
        frames: List of PIL.Image frames extracted from the video.
        progress_callback: Optional async callable receiving progress strings.

    Returns:
        The generated visual summary, or the concatenated per-batch
        descriptions if the final summarization call fails.

    Raises:
        Exception: If Gemini is unavailable or the whole analysis fails.
    """
    try:
        if not GEMINI_AVAILABLE:
            raise Exception("Gemini SDK not available. Install with: pip install google-generativeai")
        if not frames:
            return "No frames extracted from video."
        from PIL import Image
        # Use Gemini 3 Pro specifically (configure_gemini3 is defined elsewhere
        # in this module; presumably it returns a configured model — confirm)
        model = configure_gemini3()
        if progress_callback:
            await progress_callback(f"🤖 Analyzing {len(frames)} frames with Gemini 3 Pro...")
        import io
        frame_descriptions = []
        batch_size = 10  # Process 10 frames at a time
        for i in range(0, len(frames), batch_size):
            batch = frames[i:i+batch_size]
            content_parts = []
            # Add text prompt describing which frames this batch covers
            frame_nums = f"frames {i+1}-{min(i+len(batch), len(frames))}"
            prompt = f"""Analyze these video frames ({frame_nums} of {len(frames)} total frames).
Describe what you see in detail, including:
- Main subjects, objects, or people
- Actions or activities happening
- Setting or environment
- Any text, diagrams, or important visual elements
- Overall scene context
Provide a comprehensive description for this batch of frames."""
            content_parts.append(prompt)
            # Add images to content
            for frame in batch:
                # Resize if too large (keeps the request payload bounded)
                max_size = 1536
                if frame.width > max_size or frame.height > max_size:
                    # copy first so the caller's frame object is not mutated
                    frame = frame.copy()
                    frame.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
                # Convert to bytes for Gemini
                img_buffer = io.BytesIO()
                frame.save(img_buffer, format='JPEG', quality=90)
                img_buffer.seek(0)
                # Add as PIL Image
                content_parts.append(Image.open(img_buffer))
            try:
                # Generate content using Gemini 3 Pro; the SDK call is
                # synchronous, so it is pushed off the event loop below
                def generate_batch():
                    return model.generate_content(
                        content_parts,
                        generation_config={
                            'temperature': 0.4,
                            'max_output_tokens': 2048,
                        }
                    )
                response = await asyncio.to_thread(generate_batch)
                if response and hasattr(response, 'text') and response.text:
                    frame_descriptions.append(f"Frames {frame_nums}:\n{response.text}\n")
                    if progress_callback:
                        await progress_callback(f"✅ Analyzed batch {i//batch_size + 1}/{(len(frames)-1)//batch_size + 1}")
                else:
                    logger.warning(f"No text in response for batch {i//batch_size + 1}")
                    frame_descriptions.append(f"Frames {frame_nums}: [No response generated]\n")
            except Exception as e:
                # A failed batch is recorded as a placeholder so later
                # batches and the final summary still run
                logger.warning(f"Error analyzing batch {i//batch_size + 1}: {e}")
                frame_descriptions.append(f"Frames {frame_nums}: [Analysis error: {str(e)}]\n")
        # Generate final summary from all frame descriptions
        if progress_callback:
            await progress_callback("📝 Generating final visual summary with Gemini 3 Pro...")
        if not frame_descriptions:
            return "Could not analyze any video frames."
        all_descriptions = "\n\n".join(frame_descriptions)
        summary_prompt = f"""Based on the following frame-by-frame analysis of a video, create a comprehensive visual summary:
{all_descriptions}
Please provide:
1. A concise overall summary of what the video shows
2. Key visual elements, scenes, or moments
3. Main subjects or topics visible in the video
4. Any important details that appear across multiple frames
Format the summary in a clear, organized manner."""
        try:
            def generate_summary():
                return model.generate_content(
                    summary_prompt,
                    generation_config={
                        'temperature': 0.4,
                        'max_output_tokens': 2048,
                    }
                )
            summary_response = await asyncio.to_thread(generate_summary)
            if summary_response and hasattr(summary_response, 'text') and summary_response.text:
                visual_summary = summary_response.text
            else:
                visual_summary = "\n\n".join(frame_descriptions)
        except Exception as e:
            # Fall back to the raw per-batch descriptions on failure
            logger.warning(f"Error generating final summary: {e}")
            visual_summary = "\n\n".join(frame_descriptions)
        if progress_callback:
            await progress_callback("✅ Visual summary generated successfully with Gemini 3 Pro")
        return visual_summary
    except Exception as e:
        logger.error(f"Error analyzing video frames with Gemini 3 Pro: {e}")
        raise Exception(f"Error analyzing video frames with Gemini 3 Pro: {str(e)}")
async def process_video(file_path: str, progress_callback=None) -> dict:
    """
    Process a video: transcribe its audio and visually analyze its frames.

    Args:
        file_path: Path to the video file.
        progress_callback: Optional async callable receiving progress strings.

    Returns:
        dict with keys:
            "transcript": audio transcription text
            "visual_summary": Gemini frame analysis, or an explanatory
                message when frames/Gemini are unavailable
            "frame_count": number of frames extracted

    Raises:
        Exception: If audio extraction or transcription fails. Frame
            analysis failures are reported inside "visual_summary" instead.
    """
    temp_dir = tempfile.gettempdir()
    # Unique-ish temp name so concurrent uploads don't clobber each other
    audio_path = os.path.join(temp_dir, f"temp_audio_{os.getpid()}_{hash(file_path) % 10000}.mp3")
    try:
        # Step 1: Extract and transcribe audio.
        if progress_callback:
            await progress_callback("🎵 Extracting audio from video...")
        # Fix: extract_audio's return value was previously bound to an
        # unused variable; the call matters only for its side effect.
        extract_audio(input_path=file_path, output_path=audio_path)
        transcript = await process_audio(audio_path)
        # Step 2: Extract video frames at 3-5 second intervals.
        frames = []
        visual_summary = ""
        try:
            # 4-second interval sits in the middle of the 3-5s target range
            frames = await extract_video_frames(file_path, interval_seconds=4.0, progress_callback=progress_callback)
            # Step 3: Analyze frames with Gemini when available.
            if frames and GEMINI_AVAILABLE:
                visual_summary = await analyze_video_frames_with_gemini(frames, progress_callback=progress_callback)
            elif frames:
                visual_summary = f"Extracted {len(frames)} frames from video, but Gemini is not available for analysis."
            else:
                visual_summary = "No frames could be extracted from the video."
        except Exception as e:
            # Visual analysis is best-effort; the transcript is still returned
            logger.warning(f"Error processing video frames: {e}")
            visual_summary = f"Visual analysis failed: {str(e)}"
        return {
            "transcript": transcript,
            "visual_summary": visual_summary,
            "frame_count": len(frames)
        }
    except Exception as e:
        logger.error(f"Error processing video file: {e}")
        raise Exception(f"Error processing video file: {str(e)}")
    finally:
        # Fix: cleanup now runs in `finally`, so the temporary audio file no
        # longer leaks when extraction or transcription raises.
        try:
            if os.path.exists(audio_path):
                os.unlink(audio_path)
        except Exception as e:
            logger.warning(f"Failed to delete temporary audio file {audio_path}: {e}")
async def extract_text_from_dxf(file_path: str) -> str:
    """Pull every text-bearing element out of a DXF drawing.

    Scans modelspace TEXT/MTEXT entities, block references (including
    their attributes and the text inside the referenced block
    definitions), dimensions, and leaders. Returns one labelled line per
    element, or an error/empty message string on failure.
    """
    try:
        logger.info(f"Extracting text from DXF file: {file_path}")
        drawing = ezdxf.readfile(file_path)
        modelspace = drawing.modelspace()
        collected = []

        def plain_text(entity):
            # TEXT-like entities keep their string on entity.dxf.text
            if hasattr(entity.dxf, 'text') and entity.dxf.text:
                return entity.dxf.text
            return None

        def multi_text(entity):
            # MTEXT entities expose their string as entity.text
            if hasattr(entity, 'text') and entity.text:
                return entity.text
            return None

        # Single-line text entities
        for entity in modelspace.query("TEXT"):
            if (value := plain_text(entity)) is not None:
                collected.append(f"TEXT: {value}")
        # Multi-line text entities
        for entity in modelspace.query("MTEXT"):
            if (value := multi_text(entity)) is not None:
                collected.append(f"MTEXT: {value}")
        # Block references: record the block name, attached attributes,
        # and any text inside the block definition itself
        for insert in modelspace.query("INSERT"):
            block_name = insert.dxf.name if hasattr(insert.dxf, 'name') else "Unknown Block"
            collected.append(f"BLOCK: {block_name}")
            for attrib in insert.attribs:
                if (value := plain_text(attrib)) is not None:
                    collected.append(f"  ATTRIB: {value}")
            block = insert.block()
            if block is not None:
                for entity in block.query("TEXT"):
                    if (value := plain_text(entity)) is not None:
                        collected.append(f"  BLOCK_TEXT: {value}")
                for entity in block.query("MTEXT"):
                    if (value := multi_text(entity)) is not None:
                        collected.append(f"  BLOCK_MTEXT: {value}")
        # Dimensions may carry override/measurement text
        for dim in modelspace.query("DIMENSION"):
            if (value := plain_text(dim)) is not None:
                collected.append(f"DIMENSION: {value}")
        # Leaders may carry annotation references
        for leader in modelspace.query("LEADER"):
            if hasattr(leader.dxf, 'annotation') and leader.dxf.annotation:
                collected.append(f"LEADER: {leader.dxf.annotation}")

        if collected:
            logger.info(f"Successfully extracted {len(collected)} text elements from DXF")
            return "\n".join(collected)
        logger.warning("No text found in DXF file")
        return "No text content found in DXF file"
    except Exception as e:
        # Errors are reported as a string so downstream processing continues
        logger.error(f"Error extracting text from DXF: {e}")
        return f"Error extracting text from DXF: {str(e)}"
async def process_dxf(file_path: str, progress_callback=None, custom_prompt: str = None) -> str:
    """
    Process a DXF file with the complete analysis pipeline:
    1. Extract text using ezdxf
    2. Render the drawing to a PDF and upload it to OpenAI
    3. Generate a detailed report with the LLM (text + visual PDF)
    4. Combine extracted text + report into one string for embeddings

    Args:
        file_path: Path to the DXF file.
        progress_callback: Optional async callable receiving progress strings.
        custom_prompt: Optional custom system prompt (falls back to
            DXF_ANALYSIS_PROMPT when empty or not provided).

    Returns:
        Combined extracted text and LLM report (separated by a blank line).

    Raises:
        Exception: If any pipeline step fails. The uploaded OpenAI file and
            the temporary PDF are cleaned up in all cases.
    """
    temp_pdf_path = None
    file_id = None

    async def _update(msg: str):
        # Forward progress messages only when a callback was supplied
        if progress_callback:
            await progress_callback(msg)

    try:
        await _update("🔄 Processing DXF file with LLM analysis pipeline...")
        logger.info("Processing DXF file with complete pipeline")

        # Step 1: Extract text from DXF using ezdxf
        await _update("📐 Step 1: Extracting text from DXF...")
        logger.info("Step 1: Extracting text from DXF")
        extracted_text = await extract_text_from_dxf(file_path)

        # Step 2: Render the DXF to a temporary PDF
        await _update("📄 Step 2: Converting DXF to PDF...")
        logger.info("Step 2: Converting DXF to PDF")
        temp_dir = tempfile.gettempdir()
        temp_pdf_path = os.path.join(temp_dir, f"temp_dxf_{os.getpid()}_{hash(file_path) % 10000}.pdf")
        doc = ezdxf.readfile(file_path)
        msp = doc.modelspace()
        ctx = RenderContext(doc)
        backend = pymupdf.PyMuPdfBackend()
        # Force black-on-white output regardless of the drawing's own colors
        cfg = config.Configuration(
            background_policy=config.BackgroundPolicy.WHITE,
            color_policy=config.ColorPolicy.BLACK
        )
        frontend = Frontend(ctx, backend, config=cfg)
        # Draw the modelspace into the backend, then page it as A4
        # portrait with 20mm margins
        frontend.draw_layout(msp)
        page = layout.Page(210, 297, layout.Units.mm, margins=layout.Margins.all(20))
        pdf_bytes = backend.get_pdf_bytes(page)
        with open(temp_pdf_path, "wb") as f:
            f.write(pdf_bytes)
        await _update("✅ DXF to PDF conversion completed")
        # Fix: removed a leftover debug print() that duplicated this log line
        logger.info("DXF to PDF conversion completed")

        # Step 3: Upload the PDF file to OpenAI
        await _update("📤 Step 3: Uploading PDF to OpenAI...")
        logger.info("Step 3: Uploading PDF to OpenAI")
        file_id = upload_file_to_openai(temp_pdf_path)

        # Step 4: Generate the detailed report using extracted text + file_id
        await _update("🤖 Step 4: Generating detailed report with LLM...")
        logger.info("Step 4: Generating detailed report with LLM")
        # Use custom prompt if provided, otherwise fall back to the default
        system_prompt = custom_prompt if (custom_prompt and custom_prompt.strip()) else DXF_ANALYSIS_PROMPT
        enhanced_prompt = f"""You are analyzing a DXF CAD file. First, here is the extracted text content from the file: {extracted_text} Now, using both the extracted text above AND the visual PDF representation of the file, follow the system prompt instructions to generate the JSON object and detailed narrative explanation."""
        messages = [
            {
                "role": "system",
                "content": system_prompt
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": enhanced_prompt
                    },
                    {
                        "type": "file",
                        "file": {
                            "file_id": file_id
                        }
                    }
                ]
            }
        ]
        # The sync client call is pushed off the event loop
        response = await asyncio.to_thread(
            sync_client.chat.completions.create,
            model="gpt-4.1-mini",
            messages=messages,
            temperature=0.0
        )
        llm_report = response.choices[0].message.content.strip()

        # Step 5: Combine extracted text + LLM report for embeddings.
        await _update("✅ Step 5: Combining text and report for embeddings...")
        logger.info("Step 5: Combining text and report for embeddings")
        # Fix: previously the two strings were concatenated with no
        # separator, fusing the last word of the text to the first word of
        # the report in the embedding input.
        combined_content = f"{extracted_text}\n\n{llm_report}"
        await _update("✅ DXF processing completed successfully!")
        logger.info(f"Successfully processed DXF file. Combined content length: {len(combined_content)} characters")
        return combined_content
    except Exception as e:
        logger.error(f"Error processing DXF file: {e}")
        raise Exception(f"Error processing DXF file: {str(e)}")
    finally:
        # Clean up - delete the uploaded file from OpenAI
        if file_id:
            try:
                await asyncio.to_thread(sync_client.files.delete, file_id)
                logger.info(f"Successfully deleted uploaded file: {file_id}")
            except Exception as e:
                logger.warning(f"Failed to delete uploaded file {file_id}: {e}")
        # Clean up the temporary PDF file
        if temp_pdf_path and os.path.exists(temp_pdf_path):
            try:
                os.unlink(temp_pdf_path)
                logger.info(f"Successfully deleted temporary PDF: {temp_pdf_path}")
            except Exception as e:
                logger.warning(f"Failed to delete temporary PDF file {temp_pdf_path}: {e}")
async def process_image(file_path: str) -> str:
    """Summarize a standalone image file (JPG/PNG) via the LLM vision path.

    Reads the file, base64-encodes it, and delegates the analysis to
    extract_page_with_llm with a synthetic page name derived from the
    file extension.
    """
    try:
        # Base64-encode the raw image bytes for the vision API
        with open(file_path, "rb") as fh:
            encoded = base64.b64encode(fh.read()).decode("utf-8")
        extension = os.path.splitext(file_path)[1].lower()
        label = "image" if extension in (".jpg", ".jpeg") else "PNG image"
        return await extract_page_with_llm(encoded, f"{label}_file")
    except Exception as exc:
        logger.error(f"Error processing image file: {exc}")
        raise Exception(f"Error processing image file: {str(exc)}")
async def youtube_video(video_id: str) -> str:
    """Fetch and flatten the transcript text for a YouTube video id."""
    try:
        fetched = YouTubeTranscriptApi().fetch(video_id)
        # Newer API versions return an object with .snippets; fall back to
        # stringifying whatever the library handed back
        if hasattr(fetched, 'snippets'):
            return ' '.join(snippet.text for snippet in fetched.snippets)
        return str(fetched)
    except Exception as exc:
        logger.error(f"Error processing YouTube video: {exc}")
        raise Exception(f"Error processing YouTube video: {str(exc)}")
async def process_web_url(url: str, storage_provider: str = "azure") -> dict:
    """
    Scrape a web page: extract its visible text and harvest JPG/PNG images.

    Images found in the HTML (and in linked CSS) are downloaded, stored in
    HF persistent storage, and summarized with GPT so they are searchable.

    Args:
        url: The page URL to fetch.
        storage_provider: Storage backend name forwarded to the image
            summarizer (default "azure").

    Returns:
        dict with keys:
            "text": page text plus image descriptions/summaries (for RAG)
            "display_text": clean page text (for UI display)
            "image_summaries": mapping of image_id -> GPT summary
            "image_blob_urls": mapping of image_id -> storage URL

    Raises:
        Exception: If the page itself cannot be fetched or processed.
            Per-image failures are logged and skipped.
    """
    try:
        # Fetch the webpage with browser-like headers so sites don't
        # reject the scraper outright
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        }
        resp = requests.get(url, timeout=10, headers=headers)
        resp.raise_for_status()
        soup = BeautifulSoup(resp.content, "html.parser")
        # Extract text content using BeautifulSoup
        text_content = soup.get_text(separator=' ', strip=True)
        # Self-hosted: HF persistent storage (no client init needed)
        # Extract image URLs using images_url.py logic

        def is_valid_url(url):
            # A URL is usable only when it has both a scheme and a host
            parsed = urlparse(url)
            return bool(parsed.scheme) and bool(parsed.netloc)

        def extract_from_html(soup, base_url):
            urls = set()
            # 1. <img> and <source> tags (lazy-load attributes checked first)
            for tag in soup.find_all(("img", "source")):
                for attr in ("data-srcset", "data-src", "srcset", "src"):
                    v = tag.get(attr)
                    if v:
                        # if srcset has comma list, take largest (last entry)
                        if attr in ("srcset", "data-srcset"):
                            parts = [p.strip().split(" ")[0] for p in v.split(",")]
                            v2 = parts[-1]
                        else:
                            v2 = v
                        full = urljoin(base_url, v2)
                        if is_valid_url(full):
                            urls.add(full)
                        # stop at the first attribute that yielded a value
                        break
            # 2. inline styles – background-image
            bg_re = re.compile(r"background(?:-image)?\s*:\s*url\((['\"]?)(.*?)\1\)")
            for tag in soup.find_all(style=True):
                style = tag["style"]
                m = bg_re.search(style)
                if m:
                    u = m.group(2)
                    full = urljoin(base_url, u)
                    if is_valid_url(full):
                        urls.add(full)
            return urls

        def extract_from_css(css_text, base_url):
            # Pull url(...) references out of stylesheet text
            urls = set()
            # search url(...) patterns
            for match in re.finditer(r"url\((['\"]?)(.*?)\1\)", css_text):
                u = match.group(2)
                full = urljoin(base_url, u)
                if is_valid_url(full):
                    urls.add(full)
            return urls

        def fetch_css_links(soup, base_url):
            # Absolute URLs of all linked stylesheets
            links = []
            for link in soup.find_all("link", rel="stylesheet"):
                href = link.get("href")
                if href:
                    full = urljoin(base_url, href)
                    links.append(full)
            return links

        # Extract image URLs from HTML
        image_urls = extract_from_html(soup, url)
        # Fetch CSS files and parse them for additional image URLs
        css_links = fetch_css_links(soup, url)
        for css_url in css_links:
            try:
                cr = requests.get(css_url, timeout=10, headers=headers)
                cr.raise_for_status()
                more_urls = extract_from_css(cr.text, css_url)
                image_urls.update(more_urls)
            except Exception as e:
                # A broken stylesheet is not fatal to the page scrape
                logger.debug(f"CSS fetch failed: {css_url}, {e}")
        logger.info(f"Found {len(image_urls)} candidate image URLs")
        # Process images
        image_descriptions = []
        image_counter = 0
        image_blob_urls = {}
        for img_url in image_urls:
            try:
                # Download image header first to check type and size
                head_response = requests.head(img_url, timeout=10, headers=headers)
                head_response.raise_for_status()
                # Check content type - only JPG and PNG
                content_type = head_response.headers.get('content-type', '').lower()
                if not (content_type.startswith("image/jpeg") or content_type.startswith("image/png")):
                    logger.debug(f"Skipping unsupported format: {content_type}")
                    continue
                # Check image size (skip very small images - likely icons)
                content_length = head_response.headers.get('content-length')
                if content_length and int(content_length) < 15000:  # Less than 15KB
                    logger.debug(f"Skipping small image (likely icon): {content_length} bytes")
                    continue
                # Download the full image
                img_response = requests.get(img_url, timeout=10, stream=True, headers=headers)
                img_response.raise_for_status()
                # Double-check content type from actual response
                actual_content_type = img_response.headers.get('content-type', '').lower()
                if not (actual_content_type.startswith("image/jpeg") or actual_content_type.startswith("image/png")):
                    logger.debug(f"Skipping unsupported format in response: {actual_content_type}")
                    continue
                # Create unique image ID
                image_counter += 1
                image_id = f"web_image_{image_counter:03d}"
                # Get image data
                image_data = img_response.content
                # Upload to self-hosted (HF) storage
                try:
                    from hf_storage import store_image
                    success = await store_image(image_id, image_data, folder_prefix="web_images")
                    if success:
                        # NOTE(review): the stored name always uses a .jpg
                        # suffix, even for PNG content — confirm hf_storage
                        # expects this
                        blob_url = f"hf://web_images/{image_id}.jpg"
                        image_blob_urls[image_id] = blob_url
                        logger.info(f"Uploaded web image to HF persistent storage: web_images/{image_id}.jpg")
                    else:
                        logger.error(f"Failed to upload web image {image_id} to HF persistent storage")
                except Exception as upload_error:
                    logger.error(f"Failed to upload image {image_id}: {upload_error}")
                    continue
                # Create image description with image ID
                img_desc = f"[IMAGE: Web image from {url}] (Image ID: {image_id})"
                image_descriptions.append(img_desc)
                logger.info(f"Processed web image: {image_id}")
            except Exception as img_error:
                logger.warning(f"Could not download image {img_url}: {img_error}")
                continue
        # Generate image summaries using GPT if images were found
        image_summaries = {}
        if image_blob_urls:
            logger.info("Generating image summaries with GPT...")
            for image_id, blob_url in image_blob_urls.items():
                try:
                    # Import markdown_kv_enabled from app module (deferred
                    # import — presumably to avoid a circular import at
                    # module load time)
                    from app import markdown_kv_enabled
                    image_summaries[image_id] = await summarize_image_with_gpt(blob_url, image_id, storage_provider, markdown_kv_enabled)
                except Exception as e:
                    logger.error(f"Error summarizing image {image_id}: {e}")
                    image_summaries[image_id] = f"[Error summarizing image {image_id}: {str(e)}]"
        # Prepare content for RAG (includes image information and summaries for search)
        rag_content = text_content
        if image_descriptions:
            # Create image IDs list for embedding
            image_ids = list(image_blob_urls.keys())
            images_section = f"[Images: {', '.join(image_ids)}]"
            # Add image references to the content for RAG search
            rag_content = text_content + "\n\n--- IMAGES FOUND ---\n" + "\n".join(image_descriptions)
            rag_content += f"\n\n{images_section}"
            # Add image summaries for better search capabilities
            if image_summaries:
                rag_content += "\n\nImage Summaries:"
                for img_id, summary in image_summaries.items():
                    rag_content += f"\n- {img_id}: {summary}"
        # Return both versions: clean text for display, full content for RAG
        return {
            "text": rag_content,  # Full content with image info for RAG
            "display_text": text_content,  # Clean text for UI display
            "image_summaries": image_summaries,
            "image_blob_urls": image_blob_urls
        }
    except Exception as e:
        logger.error(f"Error processing URL: {e}")
        raise Exception(f"Error processing URL: {str(e)}")
async def _extract_text_async(file, video_id, web_url, storage_provider="azure"):
    """Route the request to the right extractor (web URL, YouTube, or file)."""
    # Web URL takes precedence when supplied.
    if web_url and web_url.strip():
        try:
            outcome = await process_web_url(web_url.strip(), storage_provider)
            text = outcome["text"] if isinstance(outcome, dict) else outcome
            return text.strip()
        except Exception as e:
            return f"Error processing URL: {str(e)}"

    # YouTube: accept either a bare video id or a full URL.
    if video_id and video_id.strip():
        try:
            candidate = video_id.strip()
            if "youtube.com" in candidate or "youtu.be" in candidate:
                id_patterns = (
                    r'(?:youtube\.com\/watch\?v=|youtube\.com\/shorts\/|youtu\.be\/)([a-zA-Z0-9_-]+)',
                    r'youtube\.com\/embed\/([a-zA-Z0-9_-]+)',
                )
                hit = None
                for id_pattern in id_patterns:
                    hit = re.search(id_pattern, candidate)
                    if hit:
                        break
                if hit is None:
                    return "Error: Invalid YouTube URL format"
                candidate = hit.group(1)
            return await youtube_video(candidate)
        except Exception as e:
            return f"Error processing YouTube video: {str(e)}"

    if file is None:
        return "Error: Please provide either a file, YouTube video ID, or web URL"

    # Best-effort 20MB cap; never block processing on a failed size probe.
    try:
        if hasattr(file, 'size'):
            size_mb = file.size / (1024 * 1024)
        elif hasattr(file, 'name'):
            size_mb = os.path.getsize(file.name) / (1024 * 1024)
        else:
            size_mb = 0
        if size_mb > 20:
            return f"Error: File size ({size_mb:.2f} MB) exceeds the maximum allowed size of 20 MB. Please upload a smaller file."
    except Exception as e:
        logger.warning(f"Could not check file size: {e}")

    ext = os.path.splitext(file.name)[1].lower()
    if ext not in SUPPORTED_FILES:
        return f"Error: Unsupported file type {ext}. Supported types: {', '.join(SUPPORTED_FILES)}"

    try:
        # Dispatch on the file extension.
        if ext == ".pdf":
            return await process_pdf(file.name)
        if ext in (".doc", ".docx"):
            return await process_docx(file.name)
        if ext == ".txt":
            return await process_txt(file.name)
        if ext == ".pptx":
            return await process_pptx(file.name)
        if ext == ".xlsx":
            workbook_text, _ignored = await process_excel(file.name)
            return workbook_text
        if ext in (".mp3", ".wav"):
            return await process_audio(file.name)
        if ext == ".mp4":
            video_result = await process_video(file.name)
            if isinstance(video_result, dict):
                transcript = video_result.get("transcript", "")
                visual_summary = video_result.get("visual_summary", "")
                if visual_summary:
                    return f"=== VIDEO TRANSCRIPT ===\n\n{transcript}\n\n=== VISUAL SUMMARY (Based on Video Frames) ===\n\n{visual_summary}"
                return transcript
            # Fallback for the old string-returning format
            return video_result
        if ext == ".dxf":
            return await process_dxf(file.name)
        if ext in (".jpg", ".jpeg", ".png"):
            return await process_image(file.name)
        # Defensive: unreachable because ext was validated above
        return f"Error: Unsupported file type {ext}"
    except Exception as e:
        return f"Error processing file: {str(e)}"
def extract_text_from_file(file, video_id, web_url, storage_provider="azure"):
    """Gradio-facing entry point.

    Bridges Gradio's synchronous callback model to the async extraction
    pipeline by running the coroutine to completion on a fresh event loop.
    Any failure is reported as an "Error: ..." string rather than raised.
    """
    try:
        pipeline = _extract_text_async(file, video_id, web_url, storage_provider)
        return asyncio.run(pipeline)
    except Exception as exc:
        return f"Error: {str(exc)}"