# Spaces: Sleeping -- page-header text captured together with this file; not part of the module.
import contextlib
import os
import sys
import tempfile
from concurrent.futures import ThreadPoolExecutor, as_completed
from io import BytesIO
from typing import List, Dict, Any

import requests
from PIL import Image
from pptx import Presentation
from pptx.enum.shapes import MSO_SHAPE_TYPE

# Extend sys.path two directories up before importing the local config module.
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..'))
from config import config
# OCR Space API configuration.
# API_KEY is None when config does not define OCR_SPACE_API_KEY.
API_KEY = getattr(config, 'OCR_SPACE_API_KEY', None)
# Endpoint that ocr_space_file() posts image files to.
API_URL = "https://api.ocr.space/parse/image"
def ocr_space_file(filename, api_key=API_KEY, overlay=False, language="eng"):
    """Extract text from an image file using the OCR Space API.

    Args:
        filename: Path of the image file to upload.
        api_key: OCR Space API key; defaults to the module-level ``API_KEY``.
        overlay: Value sent as the ``isOverlayRequired`` form field.
        language: Value sent as the ``language`` form field.

    Returns:
        A ``(filename, text)`` tuple. Failures are never raised; they are
        reported through ``text`` as ``"OCR API key not configured"`` or a
        message starting with ``"API Error:"``, ``"OCR Error:"`` or
        ``"Error:"``.
    """
    if not api_key:
        return filename, "OCR API key not configured"

    payload = {
        "isOverlayRequired": overlay,
        "apikey": api_key,
        "language": language,
        "detectOrientation": True,
        "scale": True,
        "isTable": False,
        "OCREngine": 2,
    }

    try:
        with open(filename, "rb") as f:
            response = requests.post(API_URL, files={filename: f}, data=payload, timeout=30)

        if response.status_code != 200:
            return filename, f"API Error: HTTP {response.status_code}"

        parsed = response.json()

        # A JSON body that is not an object (e.g. a bare string) has no
        # .get(); report it as an OCR error instead of an AttributeError.
        if not isinstance(parsed, dict):
            return filename, f"OCR Error: {parsed}"

        if parsed.get("OCRExitCode") == 1:
            # Only the first result is used; a missing, null or empty
            # result list yields empty text rather than an IndexError.
            results = parsed.get("ParsedResults") or [{}]
            return filename, results[0].get("ParsedText", "") or ""

        error = parsed.get("ErrorMessage")
        if not error:
            error_msg = "Unknown OCR error"
        elif isinstance(error, (list, tuple)):
            error_msg = error[0]
        else:
            # A plain-string message must be reported whole; indexing it
            # with [0] would keep only its first character.
            error_msg = error
        return filename, f"OCR Error: {error_msg}"

    except requests.exceptions.Timeout:
        return filename, "Error: Request timeout"
    except requests.exceptions.RequestException as e:
        return filename, f"Error: Network error - {str(e)}"
    except Exception as e:
        # Best-effort boundary: callers expect a message, never an exception.
        return filename, f"Error: {e}"
# Prefixes of the failure messages ocr_space_file() returns in place of text.
_OCR_FAILURE_PREFIXES = ("Error", "OCR Error", "API Error", "OCR API key not configured")


def _write_temp_image(image_bytes, temp_files):
    """Write image_bytes to a new temp .png, record its path, and return it."""
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".png")
    # Register the path before writing so cleanup still removes the file
    # if write() fails part-way.
    temp_files.append(temp_file.name)
    with temp_file:
        temp_file.write(image_bytes)
    return temp_file.name


def _ocr_images(image_paths):
    """Run OCR on image_paths and return the resulting content lines."""
    lines = []
    try:
        with ThreadPoolExecutor(max_workers=3) as executor:
            # map() yields results in input order, so the image text comes
            # out in the same order on every run.
            for _, ocr_result in executor.map(ocr_space_file, image_paths):
                if ocr_result and not ocr_result.startswith(_OCR_FAILURE_PREFIXES):
                    lines.append(f"[Image Text]: {ocr_result}")
    except Exception as e:
        lines.append(f"[OCR processing error: {str(e)}]")
    return lines


def extract_pptx(pptx_path: str) -> str:
    """Extract text and image text from a PowerPoint presentation.

    Each slide contributes a ``=== Slide N ===`` header followed by the
    stripped text of every shape directly on the slide that has non-blank
    text. Picture shapes are written to temporary files and, when
    ``API_KEY`` is set, sent through ``ocr_space_file``; otherwise a
    placeholder line reports how many images were found. All temporary
    files are removed before returning.

    Args:
        pptx_path: Path of the presentation to read.

    Returns:
        The content of all slides separated by blank lines, or a message
        starting with ``"Error loading PowerPoint file:"`` when the file
        cannot be opened.
    """
    try:
        prs = Presentation(pptx_path)
    except Exception as e:
        return f"Error loading PowerPoint file: {str(e)}"

    all_content = []
    temp_files = []

    try:
        for slide_idx, slide in enumerate(prs.slides):
            slide_content = [f"\n=== Slide {slide_idx + 1} ===\n"]
            slide_images = []

            for shape in slide.shapes:
                # Extract text
                if hasattr(shape, "text") and shape.text.strip():
                    slide_content.append(shape.text.strip())
                # Extract images
                elif shape.shape_type == MSO_SHAPE_TYPE.PICTURE:
                    try:
                        slide_images.append(
                            _write_temp_image(shape.image.blob, temp_files)
                        )
                    except Exception as e:
                        slide_content.append(f"[Image extraction error: {str(e)}]")

            # Process images with OCR if API key is available
            if slide_images and API_KEY:
                slide_content.extend(_ocr_images(slide_images))
            elif slide_images:
                slide_content.append(f"[{len(slide_images)} images found - OCR not available]")

            all_content.append("\n".join(slide_content))
    finally:
        # Best-effort cleanup: a file that is already gone or cannot be
        # removed must not mask the extraction result.
        for temp_file in temp_files:
            with contextlib.suppress(OSError):
                os.unlink(temp_file)

    return "\n\n".join(all_content)