quantumbit's picture
Upload 41 files
5ff6b14 verified
raw
history blame
4.71 kB
from pptx import Presentation
from pptx.enum.shapes import MSO_SHAPE_TYPE
from typing import List, Dict, Any
from PIL import Image
from io import BytesIO
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
import tempfile
import os
import sys
# Make the directory two levels above this file's directory importable.
# NOTE(review): presumably this is where the `config` module imported below
# lives - confirm against the project layout.
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
from config import config
# OCR Space API configuration
# API_KEY falls back to None when `config` has no OCR_SPACE_API_KEY attribute.
API_KEY = getattr(config, 'OCR_SPACE_API_KEY', None)
# Endpoint that ocr_space_file() posts image files to.
API_URL = "https://api.ocr.space/parse/image"
def ocr_space_file(filename, api_key=API_KEY, overlay=False, language="eng"):
    """Extract text from an image file using the OCR Space API.

    The function never raises: every failure is reported through the returned
    message, which makes it safe to run as a thread-pool task.

    Args:
        filename: Path of the image file to upload.
        api_key: OCR Space API key; defaults to the module-level ``API_KEY``.
        overlay: Sent to the API as ``isOverlayRequired``.
        language: Sent to the API as ``language``.

    Returns:
        A ``(filename, text)`` tuple. On success ``text`` is the ``ParsedText``
        of the first entry in ``ParsedResults`` (an empty string when there is
        none). On failure ``text`` is a message that starts with
        ``"OCR API key not configured"``, ``"API Error"``, ``"OCR Error"`` or
        ``"Error"``.
    """
    if not api_key:
        return filename, "OCR API key not configured"

    payload = {
        "isOverlayRequired": overlay,
        "apikey": api_key,
        "language": language,
        "detectOrientation": True,
        "scale": True,
        "isTable": False,
        "OCREngine": 2,
    }

    try:
        with open(filename, "rb") as f:
            response = requests.post(
                API_URL, files={filename: f}, data=payload, timeout=30
            )

        if response.status_code != 200:
            return filename, f"API Error: HTTP {response.status_code}"

        # Decode separately: a malformed body is not a network problem, and
        # newer `requests` versions raise a RequestException subclass here.
        try:
            parsed = response.json()
        except ValueError:
            return filename, "Error: Invalid JSON response from OCR API"

        # The body is not guaranteed to be a JSON object (it can be a bare
        # string, for instance); report it instead of failing on `.get`.
        if not isinstance(parsed, dict):
            return filename, f"OCR Error: {parsed}"

        if parsed.get("OCRExitCode") == 1:
            # `ParsedResults` may be missing, null or empty.
            results = parsed.get("ParsedResults") or [{}]
            first = results[0] if isinstance(results[0], dict) else {}
            return filename, first.get("ParsedText") or ""

        # `ErrorMessage` may be a list of strings or a single string; indexing
        # a string with [0] would keep only its first character.
        error_msg = parsed.get("ErrorMessage")
        if isinstance(error_msg, (list, tuple)):
            error_msg = error_msg[0] if error_msg else None
        if not error_msg:
            error_msg = "Unknown OCR error"
        return filename, f"OCR Error: {error_msg}"

    except requests.exceptions.Timeout:
        return filename, "Error: Request timeout"
    except requests.exceptions.RequestException as e:
        return filename, f"Error: Network error - {str(e)}"
    except Exception as e:
        # Deliberate catch-all (e.g. unreadable file): callers rely on this
        # function reporting failures through its return value.
        return filename, f"Error: {e}"
def extract_pptx(pptx_path: str) -> str:
    """Extract text and image text from a PowerPoint presentation.

    For every slide, the text of each shape that has non-blank text is
    collected. Picture shapes are written to temporary files and, when the
    module-level ``API_KEY`` is set, sent through ``ocr_space_file``; the
    recognised text is appended as ``[Image Text]: ...`` in the order the
    pictures appear on the slide. Temporary files are removed before returning.

    Args:
        pptx_path: Path of the presentation to read.

    Returns:
        The per-slide content joined by blank lines, each slide introduced by a
        ``=== Slide N ===`` header. If the presentation cannot be loaded, a
        message starting with ``"Error loading PowerPoint file:"`` is returned
        instead.
    """
    try:
        prs = Presentation(pptx_path)
    except Exception as e:
        return f"Error loading PowerPoint file: {str(e)}"

    # Prefixes of the failure messages ocr_space_file() can return; results
    # starting with any of them are diagnostics, not recognised text.
    ocr_failure_prefixes = (
        "Error",
        "OCR Error",
        "API Error",
        "OCR API key not configured",
    )

    all_content = []
    temp_files = []
    try:
        for slide_idx, slide in enumerate(prs.slides):
            slide_content = [f"\n=== Slide {slide_idx + 1} ===\n"]
            slide_images = []

            for shape in slide.shapes:
                # Extract text
                if hasattr(shape, "text") and shape.text.strip():
                    slide_content.append(shape.text.strip())
                # Extract images
                elif shape.shape_type == MSO_SHAPE_TYPE.PICTURE:
                    try:
                        image_bytes = shape.image.blob
                        temp_file = tempfile.NamedTemporaryFile(
                            delete=False, suffix=".png"
                        )
                        # Register for cleanup first so the file is removed
                        # even if writing it fails.
                        temp_files.append(temp_file.name)
                        with temp_file:
                            temp_file.write(image_bytes)
                        slide_images.append(temp_file.name)
                    except Exception as e:
                        slide_content.append(f"[Image extraction error: {str(e)}]")

            # Process images with OCR if API key is available
            if slide_images and API_KEY:
                try:
                    with ThreadPoolExecutor(max_workers=3) as executor:
                        # map() yields results in submission order, keeping the
                        # output deterministic while requests run concurrently.
                        for _, ocr_result in executor.map(ocr_space_file, slide_images):
                            if not ocr_result or not ocr_result.strip():
                                continue
                            if ocr_result.startswith(ocr_failure_prefixes):
                                continue
                            slide_content.append(f"[Image Text]: {ocr_result}")
                except Exception as e:
                    slide_content.append(f"[OCR processing error: {str(e)}]")
            elif slide_images:
                slide_content.append(
                    f"[{len(slide_images)} images found - OCR not available]"
                )

            all_content.append("\n".join(slide_content))
    finally:
        # Clean up temp files (best effort: a file that is already gone or
        # cannot be removed must not mask the extraction result).
        for temp_file in temp_files:
            try:
                os.unlink(temp_file)
            except OSError:
                pass

    return "\n\n".join(all_content)