"""Document text extraction for PDF and image files.

Provides pluggable per-format processors, parallel batch extraction, language
detection via the Cohere API, and helpers for saving extracted text and
adapting results to LangChain Document objects.
"""
|
|
import concurrent.futures
import logging
import os
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

import cohere
from langchain.docstore.document import Document

from ..config.settings import CHUNK_SIZE, LLM_MODEL

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())
|
|
class DocumentProcessor:
    """Base class for document processors."""

    def __init__(self):
        self.supported_extensions: List[str] = []

    def can_process(self, file_path: str) -> bool:
        """Check whether this processor can handle the file type."""
        ext = Path(file_path).suffix.lower()
        return ext in self.supported_extensions

    def process(self, file_path: str, **kwargs) -> str:
        """Process the document and extract its text."""
        raise NotImplementedError("Subclasses must implement this method")
|
|
class PdfProcessor(DocumentProcessor):
    """Processor for PDF documents."""

    def __init__(self):
        super().__init__()
        self.supported_extensions = ['.pdf']

    def process(self, file_path: str, **kwargs) -> str:
        """Extract text from a PDF file."""
        try:
            # Imported lazily so the module loads even when pypdf is absent.
            from pypdf import PdfReader

            logger.debug(f"Processing PDF: {file_path}")
            reader = PdfReader(file_path)
            text = ""
            for page in reader.pages:
                # extract_text() can yield nothing for image-only pages.
                text += (page.extract_text() or "") + "\n"
            return text.strip()
        except Exception as e:
            logger.error(f"Error processing PDF {file_path}: {e}")
            raise
|
|
class ImageProcessor(DocumentProcessor):
    """Processor for image files (OCR via Tesseract)."""

    def __init__(self):
        super().__init__()
        self.supported_extensions = ['.png', '.jpg', '.jpeg', '.tiff', '.bmp', '.gif']
        # Default Tesseract language packs; note that Tesseract codes use
        # underscores (e.g. 'chi_sim'), not hyphens.
        self.default_languages = "eng+fra+hin+spa+chi_sim"

    def process(self, file_path: str, **kwargs) -> str:
        """Extract text from an image file using OCR."""
        try:
            # Imported lazily so the module loads even when the OCR
            # dependencies are absent.
            import pytesseract
            from PIL import Image

            lang = kwargs.get('lang', self.default_languages)
            logger.debug(f"Processing image: {file_path} with languages: {lang}")
            image = Image.open(file_path)
            text = pytesseract.image_to_string(image, lang=lang)
            return text.strip()
        except Exception as e:
            logger.error(f"Error processing image {file_path}: {e}")
            raise
|
|
class DocumentExtractor:
    """Main class for document text extraction."""

    def __init__(self):
        """Initialize with the default processors."""
        self.processors: List[DocumentProcessor] = [
            PdfProcessor(),
            ImageProcessor(),
        ]
        self.cohere_client = None

    def add_processor(self, processor: DocumentProcessor) -> None:
        """Register a custom document processor."""
        self.processors.append(processor)

    def get_processor(self, file_path: str) -> Optional[DocumentProcessor]:
        """Return the first processor that can handle the file, or None."""
        for processor in self.processors:
            if processor.can_process(file_path):
                return processor
        return None
|
    def get_language(self, text: str) -> str:
        """
        Detect the language of the provided text using the Cohere API.

        Args:
            text: Text sample to analyze

        Returns:
            The detected language name, or "unknown" on failure
        """
        try:
            start = time.time()
            if not self.cohere_client:
                self.cohere_client = cohere.Client()

            prompt = (
                "What language is this sentence written in?\n\n"
                f"{text}\n\n"
                "Respond only with the language name."
            )
            response = self.cohere_client.chat(
                model=LLM_MODEL,
                message=prompt,
                max_tokens=100,
                temperature=0.2,
            )
            logger.debug(f"Language detection took {time.time() - start:.2f}s")
            return response.text.strip()
        except Exception as e:
            logger.error(f"Error detecting language: {e}")
            return "unknown"
|
    def process_file(self, file_path: str, **kwargs) -> Dict[str, Any]:
        """
        Process a single file based on its extension.

        Args:
            file_path: Path to the file
            **kwargs: Additional processing options

        Returns:
            Dictionary containing processing results and metadata
        """
        result = {
            "file_path": file_path,
            "filename": Path(file_path).name,
            "text": "",
            "error": None,
            "type": None,
            "language": None,
            "chunk_size": 0,
        }

        try:
            processor = self.get_processor(file_path)

            if processor:
                text = processor.process(file_path, **kwargs)
                result["text"] = text
                # Language detection only needs a sample, capped at CHUNK_SIZE.
                result["language"] = self.get_language(text[:CHUNK_SIZE]) if text else None
                result["type"] = processor.__class__.__name__.lower().replace('processor', '')
            else:
                ext = Path(file_path).suffix.lower()
                result["error"] = f"Unsupported file type: {ext}"
        except Exception as e:
            result["error"] = str(e)

        return result
|
    def process_files(self, file_paths: List[str], **kwargs) -> List[Dict[str, Any]]:
        """
        Process multiple files in parallel.

        Args:
            file_paths: List of file paths to process
            **kwargs: Additional processing options
                (max_workers: maximum number of worker processes)

        Returns:
            List of dictionaries with processing results
        """
        max_workers = kwargs.pop('max_workers', os.cpu_count() or 1)
        logger.info(f"Processing {len(file_paths)} files with {max_workers} workers")

        results = []
        with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(self.process_file, file_path, **kwargs): file_path
                for file_path in file_paths
            }

            for future in concurrent.futures.as_completed(futures):
                file_path = futures[future]
                try:
                    results.append(future.result())
                except Exception as e:
                    logger.error(f"Exception processing {file_path}: {e}")
                    results.append({
                        "file_path": file_path,
                        "filename": Path(file_path).name,
                        "text": "",
                        "error": str(e),
                        "type": None,
                        "language": None,
                        "chunk_size": 0,
                    })

        return results
|
    def find_supported_files(self, folder_path: str, recursive: bool = True) -> List[str]:
        """
        Get all supported files in a folder.

        Args:
            folder_path: Path to the folder
            recursive: Whether to include subfolders

        Returns:
            List of file paths
        """
        supported_extensions = set()
        for processor in self.processors:
            supported_extensions.update(processor.supported_extensions)

        file_paths = []

        if recursive:
            for root, _, files in os.walk(folder_path):
                for file in files:
                    if Path(file).suffix.lower() in supported_extensions:
                        file_paths.append(os.path.join(root, file))
        else:
            for file in os.listdir(folder_path):
                file_path = os.path.join(folder_path, file)
                if os.path.isfile(file_path) and Path(file).suffix.lower() in supported_extensions:
                    file_paths.append(file_path)

        return file_paths
|
    def process_folder(self, folder_path: str, recursive: bool = True, **kwargs) -> List[Dict[str, Any]]:
        """
        Process all supported files in a folder.

        Args:
            folder_path: Path to the folder containing documents
            recursive: Whether to process subfolders recursively
            **kwargs: Additional processing options

        Returns:
            List of dictionaries with processing results
        """
        file_paths = self.find_supported_files(folder_path, recursive)
        logger.info(f"Found {len(file_paths)} supported files in {folder_path}")

        return self.process_files(file_paths, **kwargs)
|
|
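# Example: extending the extractor through add_processor. A minimal sketch of
# a plain-text processor; TextProcessor is hypothetical and not part of this
# module, shown only to illustrate the extension point.
#
#     class TextProcessor(DocumentProcessor):
#         """Processor for plain-text files."""
#
#         def __init__(self):
#             super().__init__()
#             self.supported_extensions = ['.txt', '.md']
#
#         def process(self, file_path: str, **kwargs) -> str:
#             with open(file_path, encoding=kwargs.get('encoding', 'utf-8')) as f:
#                 return f.read().strip()
#
#     extractor = DocumentExtractor()
#     extractor.add_processor(TextProcessor())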
|
|
class FileOutputManager:
    """Class for managing output of extracted text."""

    def __init__(self, output_dir: Optional[str] = None):
        """Initialize with an output directory (defaults to /tmp/extracted_texts)."""
        if output_dir is None:
            output_dir = os.path.join("/tmp", "extracted_texts")

        self.output_dir = output_dir
        os.makedirs(self.output_dir, exist_ok=True)

    def save_results(self, results: List[Dict[str, Any]]) -> Dict[str, int]:
        """
        Save extracted text to files.

        Args:
            results: List of processing results

        Returns:
            Dictionary with counts of successful, skipped, and failed saves
        """
        stats = {"success": 0, "skipped": 0, "failed": 0}

        for result in results:
            if not result["text"]:
                stats["skipped"] += 1
                continue

            try:
                base_name = Path(result['filename']).stem
                file_type = result.get('type', 'unknown')
                output_filename = f"{base_name}_{file_type}.txt"

                output_path = os.path.join(self.output_dir, output_filename)
                with open(output_path, "w", encoding="utf-8") as f:
                    f.write(result["text"])
                stats["success"] += 1
            except Exception as e:
                logger.error(f"Error saving text from {result['file_path']}: {e}")
                stats["failed"] += 1

        return stats
|
|
class DocumentProcessorAdapter:
    """
    Adapter to process documents and convert them to LangChain Document objects.
    """

    def __init__(self):
        """Initialize the adapter with a DocumentExtractor."""
        self.extractor = DocumentExtractor()

    def process_folder(self, folder_path):
        """
        Process all documents in a folder.

        Args:
            folder_path (str): Path to the folder containing documents

        Returns:
            tuple: (list of LangChain Document objects, original extraction results)
        """
        if not os.path.exists(folder_path):
            raise FileNotFoundError(f"Folder not found: {folder_path}")

        extraction_results = self.extractor.process_folder(folder_path)
        logger.info(f"Processed {len(extraction_results)} documents")

        # Build LangChain Documents from successful extractions only; the
        # metadata fields mirror the extraction result dictionary.
        documents = [
            Document(
                page_content=result["text"],
                metadata={
                    "source": result["file_path"],
                    "filename": result["filename"],
                    "type": result["type"],
                    "language": result["language"],
                },
            )
            for result in extraction_results
            if result["text"]
        ]
        return documents, extraction_results
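

# Usage sketch: a minimal end-to-end example, assuming the package is
# importable (this module uses relative imports) and a Cohere API key is
# configured in the environment for language detection. The "./docs" path is
# illustrative only.
#
#     extractor = DocumentExtractor()
#     # Extra keyword arguments flow through to the processors (e.g. the
#     # Tesseract language string used by ImageProcessor), and max_workers
#     # caps the process pool used by process_files.
#     results = extractor.process_folder("./docs", max_workers=4, lang="eng")
#
#     manager = FileOutputManager()
#     stats = manager.save_results(results)
#
#     adapter = DocumentProcessorAdapter()
#     documents, raw_results = adapter.process_folder("./docs")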