"""Extract slide text, and OCR'd text from slide pictures, from .pptx files."""

import os
import sys
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from io import BytesIO
from typing import Any, Dict, List, Tuple

import requests
from PIL import Image
from pptx import Presentation
from pptx.enum.shapes import MSO_SHAPE_TYPE

# Put the directory two levels above this file on the import path; the
# `config` import below relies on it (must run before that import).
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
from config import config

# OCR Space API configuration
API_KEY = getattr(config, 'OCR_SPACE_API_KEY', None)
API_URL = "https://api.ocr.space/parse/image"


def _format_ocr_error(error_message: Any) -> str:
    """Turn the API's ``ErrorMessage`` field (list, string or missing) into text."""
    if isinstance(error_message, (list, tuple)):
        error_message = error_message[0] if error_message else None
    return str(error_message) if error_message else "Unknown OCR error"


def _ocr_image(filename: str, api_key: Any, overlay: bool, language: str) -> Tuple[bool, str]:
    """Send one image file to the OCR Space API.

    Returns:
        ``(ok, message)``. When ``ok`` is True, ``message`` is the recognised
        text (possibly empty). When ``ok`` is False, ``message`` is a
        human-readable error description. Never raises.
    """
    if not api_key:
        return False, "OCR API key not configured"

    payload = {
        "isOverlayRequired": overlay,
        "apikey": api_key,
        "language": language,
        "detectOrientation": True,
        "scale": True,
        "isTable": False,
        "OCREngine": 2,
    }

    try:
        with open(filename, "rb") as f:
            response = requests.post(
                API_URL, files={filename: f}, data=payload, timeout=30
            )

        if response.status_code != 200:
            return False, f"API Error: HTTP {response.status_code}"

        parsed = response.json()
        # Guard against a JSON body that is not an object (e.g. a bare string).
        if not isinstance(parsed, dict):
            return False, f"OCR Error: {parsed}"

        # Exit code 1 is the only code treated as success here.
        if parsed.get("OCRExitCode") == 1:
            results = parsed.get("ParsedResults") or [{}]
            return True, results[0].get("ParsedText") or ""

        error_msg = _format_ocr_error(parsed.get("ErrorMessage"))
        return False, f"OCR Error: {error_msg}"
    except requests.exceptions.Timeout:
        return False, "Error: Request timeout"
    except requests.exceptions.RequestException as e:
        return False, f"Error: Network error - {str(e)}"
    except Exception as e:
        # Boundary handler: this runs in worker threads and must not raise.
        return False, f"Error: {e}"


def ocr_space_file(filename, api_key=API_KEY, overlay=False, language="eng"):
    """Extract text from image file using OCR Space API.

    Args:
        filename: Path of the image file to upload.
        api_key: OCR Space API key; defaults to the configured ``API_KEY``.
        overlay: Sent as the ``isOverlayRequired`` request field.
        language: Sent as the ``language`` request field.

    Returns:
        ``(filename, text)``: ``text`` is the recognised text on success, or
        an error description (missing key, HTTP error, OCR error, timeout,
        network error) on failure. Never raises.
    """
    _, message = _ocr_image(filename, api_key, overlay, language)
    return filename, message


def _save_picture_to_temp(shape, temp_files: List[str]) -> str:
    """Write a picture shape's image bytes to a temp file and return its path.

    The path is appended to ``temp_files`` *before* writing so the caller's
    cleanup also removes the file if the write fails part-way.
    """
    image = shape.image
    try:
        # Use the image's real extension rather than assuming PNG.
        suffix = f".{image.ext}"
    except (ValueError, OSError):
        # Format could not be determined; fall back to the previous default.
        suffix = ".png"

    with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
        temp_files.append(tmp.name)
        tmp.write(image.blob)
    return tmp.name


def _ocr_slide_images(image_paths: List[str]) -> List[str]:
    """OCR the given images concurrently; return non-empty texts in input order."""
    texts = []
    with ThreadPoolExecutor(max_workers=3) as executor:
        # map() yields results in submission order, keeping output deterministic.
        outcomes = executor.map(
            lambda path: _ocr_image(path, API_KEY, False, "eng"), image_paths
        )
        for ok, text in outcomes:
            if ok and text:
                texts.append(text)
    return texts


def extract_pptx(pptx_path: str) -> str:
    """Extract text and images from PowerPoint presentations.

    For each slide, collects the text of every shape that has non-blank text
    and, when ``API_KEY`` is set, the OCR'd text of picture shapes. Failed or
    empty OCR results are skipped. Without an API key, a note with the number
    of images found is added instead.

    Args:
        pptx_path: Path to the .pptx file.

    Returns:
        The slides' content, each introduced by a ``=== Slide N ===`` header
        and separated by blank lines, or an ``Error loading PowerPoint
        file: ...`` message if the file cannot be opened.
    """
    try:
        prs = Presentation(pptx_path)
    except Exception as e:
        return f"Error loading PowerPoint file: {str(e)}"

    all_content = []
    temp_files: List[str] = []

    try:
        for slide_number, slide in enumerate(prs.slides, start=1):
            slide_content = [f"\n=== Slide {slide_number} ===\n"]
            slide_images = []

            for shape in slide.shapes:
                # Extract text
                if hasattr(shape, "text") and shape.text.strip():
                    slide_content.append(shape.text.strip())
                # Extract images
                elif shape.shape_type == MSO_SHAPE_TYPE.PICTURE:
                    try:
                        slide_images.append(
                            _save_picture_to_temp(shape, temp_files)
                        )
                    except Exception as e:
                        slide_content.append(f"[Image extraction error: {str(e)}]")

            # Process images with OCR if API key is available
            if slide_images and API_KEY:
                try:
                    for text in _ocr_slide_images(slide_images):
                        slide_content.append(f"[Image Text]: {text}")
                except Exception as e:
                    slide_content.append(f"[OCR processing error: {str(e)}]")
            elif slide_images:
                slide_content.append(
                    f"[{len(slide_images)} images found - OCR not available]"
                )

            all_content.append("\n".join(slide_content))
    finally:
        # Clean up temp files
        for temp_file in temp_files:
            try:
                os.unlink(temp_file)
            except OSError:
                # Best effort: a file that is already gone or locked is not fatal.
                pass

    return "\n\n".join(all_content)