Ragora-Server / app /services /document_processor_adapter.py
Peterase's picture
feat: async document processing with BackgroundTasks + thread pool executors
8b37702
"""Document processor adapter — text extraction runs in a thread pool."""
from app.ports.document_processor import DocumentProcessorPort
from typing import BinaryIO
from pathlib import Path
import asyncio
import logging
logger = logging.getLogger(__name__)

# Shared pool for document text extraction; created lazily by _get_executor().
# The extraction functions parse bytes that are already in memory, so this is
# mostly CPU work rather than I/O. Running it off the event loop keeps the loop
# responsive, but pure-Python parsers may still contend for the GIL, so
# extractions are offloaded rather than truly parallelized.
_executor = None
def _get_executor():
    """Return the shared extraction thread pool, building it on first use.

    The pool allows up to 4 worker threads whose names start with
    ``doc_processor``. It is constructed lazily, so nothing is created until
    an extraction is first requested.
    """
    global _executor
    if _executor is not None:
        return _executor

    from concurrent.futures import ThreadPoolExecutor

    _executor = ThreadPoolExecutor(
        max_workers=4,
        thread_name_prefix="doc_processor",
    )
    return _executor
def _extract_pdf_sync(data: bytes) -> str:
    """Extract the text of every page of a PDF held in memory.

    Synchronous and blocking; the adapter runs it in the shared thread pool.

    Args:
        data: Raw bytes of the PDF file.

    Returns:
        The pages' text joined by newlines, with surrounding whitespace
        stripped. A page whose ``extract_text()`` returns ``None`` contributes
        an empty string.

    Raises:
        ValueError: If no non-whitespace text could be extracted.
        Errors raised by PyPDF2 for unreadable input propagate unchanged.
    """
    from io import BytesIO
    from PyPDF2 import PdfReader

    reader = PdfReader(BytesIO(data))
    # Build with str.join: repeated ``+=`` on a str is quadratic in general.
    text = "\n".join(page.extract_text() or "" for page in reader.pages).strip()
    if not text:
        raise ValueError("No text could be extracted from PDF")
    # Log the length of the text actually returned.
    logger.info("Extracted %d characters from PDF", len(text))
    return text
def _extract_docx_sync(data: bytes) -> str:
    """Extract the paragraph text of a DOCX document held in memory.

    Synchronous and blocking; the adapter runs it in the shared thread pool.

    Args:
        data: Raw bytes of the .docx file.

    Returns:
        The document's paragraphs joined by newlines, with surrounding
        whitespace stripped.

    Raises:
        ValueError: If no non-whitespace text could be extracted.
        Errors raised by python-docx for unreadable input propagate unchanged.
    """
    from io import BytesIO
    from docx import Document

    doc = Document(BytesIO(data))
    # NOTE: ``Document.paragraphs`` lists body paragraphs only; text inside
    # tables, headers and footers is not included in the result.
    text = "\n".join(p.text for p in doc.paragraphs).strip()
    if not text:
        raise ValueError("No text could be extracted from DOCX")
    # Log the length of the text actually returned.
    logger.info("Extracted %d characters from DOCX", len(text))
    return text
class DocumentProcessorAdapter(DocumentProcessorPort):
    """Text extractor that keeps blocking work off the event loop.

    Both reading the uploaded stream and parsing it run in the module's shared
    thread pool.
    """

    # Lower-cased file extension -> synchronous extractor run in the pool.
    # Single source of truth for both extract_text() and supports_file().
    # NOTE(review): ".doc" is routed to the python-docx extractor, which
    # targets the OOXML .docx format; legacy binary .doc files will presumably
    # fail to parse there — confirm whether ".doc" should remain listed.
    _EXTRACTORS = {
        ".pdf": _extract_pdf_sync,
        ".docx": _extract_docx_sync,
        ".doc": _extract_docx_sync,
    }

    async def extract_text(self, file: BinaryIO, filename: str) -> str:
        """Extract text from *file* without blocking the event loop.

        Args:
            file: Binary stream holding the document; it is read to EOF from
                its current position.
            filename: Used only to choose a parser by its extension
                (case-insensitive).

        Returns:
            The extracted, whitespace-stripped text.

        Raises:
            ValueError: If the extension is unsupported (checked before the
                file is read) or no text could be extracted.
        """
        ext = Path(filename).suffix.lower()
        extractor = self._EXTRACTORS.get(ext)
        if extractor is None:
            raise ValueError(f"Unsupported file type: {ext}")

        loop = asyncio.get_running_loop()
        executor = _get_executor()
        # file.read() may block (e.g. a disk-backed upload), so it is offloaded
        # as well; the bytes are read once and handed to the parser.
        data = await loop.run_in_executor(executor, file.read)
        return await loop.run_in_executor(executor, extractor, data)

    def supports_file(self, filename: str) -> bool:
        """Return True if *filename*'s extension has a registered extractor."""
        return Path(filename).suffix.lower() in self._EXTRACTORS