| """Cloud processor for Nanonets API integration with API key pool rotation and local fallback.""" |
|
|
| import os |
| import requests |
| import json |
| import logging |
| import time |
| from typing import Dict, Any, Optional, List |
|
|
| from .base import BaseProcessor |
| from ..result import ConversionResult |
| from ..exceptions import ConversionError |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
| DEFAULT_RATE_LIMIT_RESET = 3600 |
|
|
|
|
class CloudConversionResult(ConversionResult):
    """Enhanced ConversionResult for cloud mode with lazy API calls, key rotation, and local fallback.

    Cloud API calls are deferred until an ``extract_*`` method is invoked.
    Each distinct output request (type + fields + schema) is cached, so both
    cloud calls and local-fallback conversions run at most once per request.
    """

    def __init__(self, file_path: str, cloud_processor: 'CloudProcessor', metadata: Optional[Dict[str, Any]] = None,
                 api_key_pool=None, local_fallback_processor=None):
        """Initialize a lazy cloud conversion result.

        Args:
            file_path: Path of the source document to convert.
            cloud_processor: Owning CloudProcessor (supplies API URL, key,
                model type, and response parsing helpers).
            metadata: Optional metadata dict forwarded to ConversionResult.
            api_key_pool: Optional key pool used to rotate API keys when a
                key gets rate limited.
            local_fallback_processor: Optional local (Docling/GPU) processor
                used when all keys are exhausted or the API fails.
        """
        super().__init__("", metadata)
        self.file_path = file_path
        self.cloud_processor = cloud_processor
        self.api_key_pool = api_key_pool
        self.local_fallback_processor = local_fallback_processor
        self._cached_outputs: Dict[str, str] = {}  # cache_key -> rendered output
        self._used_fallback = False  # once True, the API is never retried for this document

    def _get_cloud_output(self, output_type: str, specified_fields: Optional[list] = None, json_schema: Optional[dict] = None) -> str:
        """Get output from cloud API for specific type, with caching, key rotation, and local fallback.

        Args:
            output_type: One of "markdown", "flat-json", "html", "csv",
                "specified-fields", "specified-json". Invalid values are
                coerced to "markdown" with a warning.
            specified_fields: Field names for the "specified-fields" type.
            json_schema: Schema dict for the "specified-json" type.

        Returns:
            The converted content string (cloud result, or local fallback
            output when the API cannot be used).
        """
        valid_output_types = ["markdown", "flat-json", "html", "csv", "specified-fields", "specified-json"]
        if output_type not in valid_output_types:
            logger.warning(f"Invalid output type '{output_type}' for cloud API. Using 'markdown'.")
            output_type = "markdown"

        # One cache slot per distinct (type, fields, schema) combination.
        cache_key = output_type
        if specified_fields:
            cache_key += f"_fields_{','.join(specified_fields)}"
        if json_schema:
            cache_key += f"_schema_{hash(str(json_schema))}"

        if cache_key in self._cached_outputs:
            return self._cached_outputs[cache_key]

        # After a fallback we never retry the API; serve (and cache) locally.
        if self._used_fallback:
            return self._fallback_and_cache(output_type, cache_key)

        last_error = None
        keys_tried = set()

        while True:
            # Prefer a pool key; fall back to the processor's own key.
            current_key = self.api_key_pool.get_next_key() if self.api_key_pool else None
            if not current_key and self.cloud_processor.api_key:
                current_key = self.cloud_processor.api_key

            if not current_key:
                logger.info("No API keys available, falling back to local processing")
                return self._fallback_and_cache(output_type, cache_key)

            # Seeing the same key twice means the rotation is exhausted.
            if current_key in keys_tried:
                logger.info("All API keys rate limited, falling back to local processing")
                return self._fallback_and_cache(output_type, cache_key)

            keys_tried.add(current_key)

            try:
                # current_key is guaranteed truthy here (guards above).
                headers = {'Authorization': f'Bearer {current_key}'}

                with open(self.file_path, 'rb') as file:
                    files = {
                        'file': (os.path.basename(self.file_path), file, self.cloud_processor._get_content_type(self.file_path))
                    }

                    data = {
                        'output_type': output_type
                    }

                    if self.cloud_processor.model_type:
                        data['model_type'] = self.cloud_processor.model_type

                    # Extraction-specific parameters.
                    if output_type == "specified-fields" and specified_fields:
                        data['specified_fields'] = ','.join(specified_fields)
                    elif output_type == "specified-json" and json_schema:
                        data['json_schema'] = json.dumps(json_schema)

                    logger.info(f"Making cloud API call (API key {current_key[:8]}...) for {output_type} on {self.file_path}")

                    # POST happens inside the `with` so the file handle
                    # stays open while requests streams the upload.
                    response = requests.post(
                        self.cloud_processor.api_url,
                        headers=headers,
                        files=files,
                        data=data,
                        timeout=300
                    )

                if response.status_code == 429:
                    # Mark this key rate limited and rotate to the next one.
                    if self.api_key_pool:
                        self.api_key_pool.mark_key_rate_limited(current_key, DEFAULT_RATE_LIMIT_RESET)

                    if self.cloud_processor.api_key == current_key:
                        logger.warning(f"Processor API key rate limited, will try pool keys")

                    logger.warning(f"API key {current_key[:8]}... rate limited, trying next key...")
                    last_error = f"Rate limited (429)"
                    continue

                response.raise_for_status()
                result_data = response.json()

                content = self.cloud_processor._extract_content_from_response(result_data)

                self._cached_outputs[cache_key] = content
                return content

            except requests.exceptions.HTTPError as e:
                # Some servers surface 429 only via raise_for_status().
                if '429' in str(e):
                    if self.api_key_pool:
                        self.api_key_pool.mark_key_rate_limited(current_key, DEFAULT_RATE_LIMIT_RESET)
                    logger.warning(f"API key {current_key[:8]}... rate limited (HTTPError), trying next key...")
                    last_error = str(e)
                    continue
                else:
                    logger.error(f"Cloud API HTTP error: {e}")
                    last_error = str(e)
                    break
            except Exception as e:
                # Network errors, bad JSON, unreadable file, etc. — give up
                # on the API and use the local fallback.
                logger.error(f"Cloud API call failed: {e}")
                last_error = str(e)
                break

        logger.warning(f"All API keys rate limited or failed (last error: {last_error}). Falling back to local Docling processing.")
        self._used_fallback = True
        return self._fallback_and_cache(output_type, cache_key)

    def _fallback_and_cache(self, output_type: str, cache_key: str) -> str:
        """Run the local fallback once and cache its result under cache_key.

        Caching here prevents repeated (expensive) local re-conversions when
        multiple extract_* calls are made after the API has been abandoned.
        """
        content = self._convert_locally(output_type)
        self._cached_outputs[cache_key] = content
        return content

    def _convert_locally(self, output_type: str) -> str:
        """Fallback to local Docling/GPU conversion methods."""
        self._used_fallback = True

        # Preferred fallback: a real local processor, if one was provided.
        if self.local_fallback_processor:
            try:
                logger.info(f"Using local Docling processor for fallback on {self.file_path}")
                local_result = self.local_fallback_processor.process(self.file_path)

                if output_type == "html":
                    return local_result.extract_html()
                elif output_type == "flat-json":
                    return json.dumps(local_result.extract_data(), indent=2)
                elif output_type == "csv":
                    return local_result.extract_csv(include_all_tables=True)
                else:
                    return local_result.extract_markdown()
            except Exception as e:
                logger.error(f"Local Docling fallback also failed: {e}")

        # Last resort: base-class rendering of self.content (empty string).
        if output_type == "html":
            return super().extract_html()
        elif output_type == "flat-json":
            return json.dumps(super().extract_data(), indent=2)
        elif output_type == "csv":
            return super().extract_csv(include_all_tables=True)
        else:
            return self.content

    def extract_markdown(self) -> str:
        """Export as markdown."""
        return self._get_cloud_output("markdown")

    def extract_html(self) -> str:
        """Export as HTML."""
        return self._get_cloud_output("html")

    def extract_data(self, specified_fields: Optional[list] = None, json_schema: Optional[dict] = None) -> Dict[str, Any]:
        """Export as structured JSON with optional field extraction.

        Args:
            specified_fields: Optional list of specific fields to extract
            json_schema: Optional JSON schema defining fields and types to extract

        Returns:
            Structured JSON with extracted data
        """
        content = ""  # kept in scope so the except handler can report raw content
        try:
            if specified_fields:
                # Targeted field extraction.
                content = self._get_cloud_output("specified-fields", specified_fields=specified_fields)
                extracted_data = json.loads(content)
                return {
                    "extracted_fields": extracted_data,
                    "format": "specified_fields"
                }

            elif json_schema:
                # Schema-driven structured extraction.
                content = self._get_cloud_output("specified-json", json_schema=json_schema)
                extracted_data = json.loads(content)
                return {
                    "structured_data": extracted_data,
                    "format": "structured_json"
                }

            else:
                # Full-document flat JSON.
                json_content = self._get_cloud_output("flat-json")
                content = json_content
                parsed_content = json.loads(json_content)
                return {
                    "document": parsed_content,
                    "format": "cloud_flat_json"
                }

        except Exception as e:
            logger.error(f"Failed to parse JSON content: {e}")
            return {
                "document": {"raw_content": content},
                "format": "json_parse_error",
                "error": str(e)
            }

    def extract_text(self) -> str:
        """Export as plain text (markdown rendering; empty string on failure)."""
        try:
            return self._get_cloud_output("markdown")
        except Exception as e:
            logger.error(f"Failed to get text output: {e}")
            return ""

    def extract_csv(self, table_index: int = 0, include_all_tables: bool = False) -> str:
        """Export tables as CSV format.

        Args:
            table_index: Which table to export (0-based index). Default is 0 (first table).
            include_all_tables: If True, export all tables with separators. Default is False.

        Returns:
            CSV formatted string of the table(s)

        Raises:
            ValueError: If no tables are found or table_index is out of range

        Note:
            The cloud API returns its own CSV rendering; ``table_index`` and
            ``include_all_tables`` are currently not forwarded to it.
        """
        return self._get_cloud_output("csv")
|
|
|
|
class CloudProcessor(BaseProcessor):
    """Processor for cloud-based document conversion using Nanonets API with API key pool rotation."""

    def __init__(self, api_key: Optional[str] = None, output_type: Optional[str] = None, model_type: Optional[str] = None,
                 specified_fields: Optional[list] = None, json_schema: Optional[dict] = None,
                 api_key_pool=None, local_fallback_processor=None, **kwargs):
        """Initialize the cloud processor.

        Args:
            api_key: API key for cloud processing (optional - uses rate-limited free tier without key)
            output_type: Output type for cloud processing (markdown, flat-json, html, csv, specified-fields, specified-json)
            model_type: Model type for cloud processing (gemini, openapi, nanonets)
            specified_fields: List of fields to extract (for specified-fields output type)
            json_schema: JSON schema defining fields and types to extract (for specified-json output type)
            api_key_pool: ApiKeyPool instance for key rotation
            local_fallback_processor: Local processor (GPU/Docling) for fallback when all keys exhausted
        """
        super().__init__(**kwargs)
        self.api_key = api_key
        self.output_type = output_type
        self.model_type = model_type
        self.specified_fields = specified_fields
        self.json_schema = json_schema
        self.api_key_pool = api_key_pool
        self.local_fallback_processor = local_fallback_processor
        # Single extraction endpoint used for every request.
        self.api_url = "https://extraction-api.nanonets.com/extract"

    def can_process(self, file_path: str) -> bool:
        """Check if the processor can handle the file.

        The decision is based purely on the file extension (case-insensitive).
        """
        supported_extensions = {
            '.pdf', '.docx', '.doc', '.xlsx', '.xls', '.pptx', '.ppt',
            '.txt', '.html', '.htm', '.png', '.jpg', '.jpeg', '.gif',
            '.bmp', '.tiff', '.tif'
        }

        _, ext = os.path.splitext(file_path.lower())
        return ext in supported_extensions

    def process(self, file_path: str) -> CloudConversionResult:
        """Create a lazy CloudConversionResult that will make API calls on demand with key rotation.

        Args:
            file_path: Path to the file to process

        Returns:
            CloudConversionResult that makes API calls when output methods are called

        Raises:
            ConversionError: If file doesn't exist
        """
        if not os.path.exists(file_path):
            raise ConversionError(f"File not found: {file_path}")

        # Metadata describes how this result will be produced; no API call
        # happens until an extract_* method is invoked on the result.
        metadata = {
            'source_file': file_path,
            'processing_mode': 'cloud',
            'api_provider': 'nanonets',
            'file_size': os.path.getsize(file_path),
            'model_type': self.model_type,
            'has_api_key': bool(self.api_key),
            'key_rotation': True,
            'local_fallback': self.local_fallback_processor is not None
        }

        if self.api_key:
            logger.info(f"Created cloud extractor for {file_path} with API key pool rotation")
        else:
            logger.info(f"Created cloud extractor for {file_path} without API key - will use pool + local fallback")

        return CloudConversionResult(
            file_path=file_path,
            cloud_processor=self,
            metadata=metadata,
            api_key_pool=self.api_key_pool,
            local_fallback_processor=self.local_fallback_processor
        )

    def _extract_content_from_response(self, response_data: Dict[str, Any]) -> str:
        """Extract content from API response.

        Returns the 'content' field when present; otherwise the full
        response serialized as pretty-printed JSON (best-effort).
        """
        try:
            if 'content' in response_data:
                return response_data['content']

            logger.warning("No 'content' field found in API response, returning full response")
            return json.dumps(response_data, indent=2)

        except Exception as e:
            # Defensive: response_data may not behave like a dict.
            logger.error(f"Failed to extract content from API response: {e}")
            return json.dumps(response_data, indent=2)

    def _get_content_type(self, file_path: str) -> str:
        """Get content type for file upload.

        Unknown extensions map to 'application/octet-stream'.
        """
        _, ext = os.path.splitext(file_path.lower())

        content_types = {
            '.pdf': 'application/pdf',
            '.docx': 'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
            '.doc': 'application/msword',
            '.xlsx': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
            '.xls': 'application/vnd.ms-excel',
            '.pptx': 'application/vnd.openxmlformats-officedocument.presentationml.presentation',
            '.ppt': 'application/vnd.ms-powerpoint',
            '.txt': 'text/plain',
            '.html': 'text/html',
            '.htm': 'text/html',
            '.png': 'image/png',
            '.jpg': 'image/jpeg',
            '.jpeg': 'image/jpeg',
            '.gif': 'image/gif',
            '.bmp': 'image/bmp',
            '.tiff': 'image/tiff',
            '.tif': 'image/tiff'
        }

        return content_types.get(ext, 'application/octet-stream')