| """ |
| API Key Pool Manager for DocStrange. |
| |
| Manages a pool of Nanonets API keys with automatic rotation on rate limit (429). |
| """ |
|
|
| import os |
| import json |
| import time |
| import threading |
| from pathlib import Path |
| from typing import Optional, List, Dict, Any |
| import logging |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class KeyStatus:
    """Plain string constants naming the states an API key entry can be in."""

    # Key may be handed out to callers.
    ACTIVE = "active"
    # Key is parked until its reset time passes.
    RATE_LIMITED = "rate_limited"
    # Declared for completeness; nothing in this module assigns it.
    EXPIRED = "expired"
|
|
|
|
class ApiKeyEntry:
    """Represents a single API key in the pool with its state.

    Attributes:
        key: The raw API key string.
        source: Free-form label describing where the key came from.
        status: One of the ``KeyStatus`` constants.
        rate_limited_at: Epoch seconds when the key was last rate-limited,
            or ``None`` when it is not rate-limited.
        reset_at: Epoch seconds after which a rate-limited key becomes
            usable again, or ``None`` when it is not rate-limited.
        requests_made: Number of times ``record_use`` has been called.
        last_used: Epoch seconds of the latest ``record_use`` call, or
            ``None`` if the key has never been used.
    """

    def __init__(self, key: str, source: str = "manual"):
        self.key = key
        self.source = source
        self.status = KeyStatus.ACTIVE
        self.rate_limited_at: Optional[float] = None
        self.reset_at: Optional[float] = None
        self.requests_made = 0
        self.last_used: Optional[float] = None

    def mark_rate_limited(self, reset_after_seconds: int = 3600):
        """Mark this key as rate-limited.

        Args:
            reset_after_seconds: How long, from now, the key stays
                unavailable.
        """
        # Read the clock once so both timestamps describe the same instant.
        now = time.time()
        self.status = KeyStatus.RATE_LIMITED
        self.rate_limited_at = now
        self.reset_at = now + reset_after_seconds
        # Only a short prefix of the key is logged, never the full secret.
        logger.warning(
            "API key %s... rate limited, resets at %s",
            self.key[:8],
            self.reset_at,
        )

    def is_available(self) -> bool:
        """Check if this key is available for use.

        A rate-limited key whose reset time has passed is switched back to
        ``KeyStatus.ACTIVE`` as a side effect of this call.

        Returns:
            ``True`` if the key is active (or has just been reactivated),
            ``False`` otherwise.
        """
        if self.status == KeyStatus.ACTIVE:
            return True
        # Compare against None explicitly: a reset time of 0.0 is a valid
        # timestamp and must not be mistaken for "no reset time".
        if (
            self.status == KeyStatus.RATE_LIMITED
            and self.reset_at is not None
            and time.time() >= self.reset_at
        ):
            self.status = KeyStatus.ACTIVE
            self.rate_limited_at = None
            self.reset_at = None
            return True
        return False

    def record_use(self):
        """Record that this key was used."""
        self.requests_made += 1
        self.last_used = time.time()
|
|
|
|
class ApiKeyPool:
    """
    Manages a pool of API keys with automatic rotation.

    When a key hits rate limit (429), it's marked as unavailable and the next
    key in the pool is tried. When all keys are exhausted, signals fallback.

    The pool keeps a cursor on the key it last handed out and keeps returning
    that key until it stops being available. All access to the key list goes
    through ``_lock_pool``.
    """

    _instance = None
    _lock = threading.Lock()

    def __init__(self):
        self._keys: List[ApiKeyEntry] = []
        self._current_index = 0
        self._lock_pool = threading.Lock()
        self._config_path = Path.home() / ".docstrange" / "api_keys.json"
        self._load_config()

    @classmethod
    def get_instance(cls) -> "ApiKeyPool":
        """Get singleton instance, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock so only one thread constructs it.
                if cls._instance is None:
                    cls._instance = cls()
        return cls._instance

    def _load_config(self):
        """Load API keys from the config file, then from the environment.

        The config file is JSON with an ``"api_keys"`` list whose items are
        either plain key strings or objects with a ``"key"`` (and optional
        ``"source"``) field. Failures are logged and otherwise ignored. Keys
        listed comma-separated in ``NANONETS_API_KEYS`` are added afterwards.
        """
        try:
            if self._config_path.exists():
                with open(self._config_path, 'r', encoding="utf-8") as f:
                    config = json.load(f)

                keys = config.get("api_keys", [])
                for key_entry in keys:
                    if isinstance(key_entry, str):
                        self.add_key(key_entry, source="config")
                    elif isinstance(key_entry, dict) and "key" in key_entry:
                        self.add_key(key_entry["key"], source=key_entry.get("source", "config"))

                logger.info("Loaded %d API keys from config", len(self._keys))
        except Exception as e:
            # Best effort: a broken config must not prevent start-up.
            logger.warning("Failed to load API key config: %s", e)

        env_keys = os.environ.get('NANONETS_API_KEYS', '')
        if env_keys:
            for key in env_keys.split(','):
                key = key.strip()
                if key:
                    self.add_key(key, source="env")

    def save_config(self):
        """Save API keys to config file.

        Writes every key currently in the pool, with its source label, as
        JSON. Failures are logged and not raised.
        """
        try:
            # Snapshot under the lock so a concurrent add/remove cannot
            # change the list while it is being serialised.
            with self._lock_pool:
                keys_data = [
                    {"key": entry.key, "source": entry.source}
                    for entry in self._keys
                ]

            config_dir = self._config_path.parent
            config_dir.mkdir(parents=True, exist_ok=True)

            # Create the file already restricted to the owner: the keys are
            # secrets and must not be readable between creation and chmod.
            fd = os.open(
                self._config_path,
                os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                0o600,
            )
            with os.fdopen(fd, 'w', encoding="utf-8") as f:
                json.dump({"api_keys": keys_data}, f, indent=2)

            # The mode given to os.open only applies to newly created files;
            # tighten a pre-existing file as well.
            os.chmod(self._config_path, 0o600)
            logger.info("Saved %d API keys to config", len(keys_data))
        except Exception as e:
            logger.error("Failed to save API key config: %s", e)

    def add_key(self, key: str, source: str = "manual") -> bool:
        """Add an API key to the pool.

        Args:
            key: The API key string.
            source: Label recording where the key came from.

        Returns:
            ``True`` if the key was added; ``False`` if it is not a
            non-blank string or is already in the pool.
        """
        # Reject values that could never work as a key and would break the
        # string slicing used for masking and logging.
        if not isinstance(key, str) or not key.strip():
            return False

        with self._lock_pool:
            if any(entry.key == key for entry in self._keys):
                return False

            self._keys.append(ApiKeyEntry(key, source))
            logger.info("Added API key from %s to pool (total: %d)", source, len(self._keys))
            return True

    def remove_key(self, key: str) -> bool:
        """Remove an API key from the pool.

        Returns:
            ``True`` if a matching key was found and removed, else ``False``.
        """
        with self._lock_pool:
            for i, entry in enumerate(self._keys):
                if entry.key == key:
                    del self._keys[i]
                    if i < self._current_index:
                        # Entries after i shifted left by one; follow the
                        # key the cursor was pointing at.
                        self._current_index -= 1
                    elif self._current_index >= len(self._keys):
                        # The cursor's own key was last in the list.
                        self._current_index = 0
                    return True
            return False

    def get_next_key(self) -> Optional[str]:
        """
        Get the next available API key.

        Scanning starts at the key handed out last time, so the same key is
        returned until it becomes unavailable. The chosen key's usage is
        recorded.

        Returns None if the pool is empty or no key is available.
        """
        with self._lock_pool:
            if not self._keys:
                return None

            total_keys = len(self._keys)
            for i in range(total_keys):
                idx = (self._current_index + i) % total_keys
                if self._keys[idx].is_available():
                    self._current_index = idx
                    self._keys[idx].record_use()
                    return self._keys[idx].key

            return None

    def mark_key_rate_limited(self, key: str, reset_after_seconds: int = 3600):
        """Mark a specific key as rate-limited.

        Does nothing if the key is not in the pool.
        """
        with self._lock_pool:
            for entry in self._keys:
                if entry.key == key:
                    entry.mark_rate_limited(reset_after_seconds)
                    break

    def has_available_keys(self) -> bool:
        """Check if any API keys are available."""
        with self._lock_pool:
            return any(k.is_available() for k in self._keys)

    def get_pool_stats(self) -> Dict[str, Any]:
        """Get statistics about the key pool.

        Returns:
            A dict with ``total_keys``, ``available``, ``rate_limited`` and
            ``total_requests``. ``rate_limited`` counts every key that is
            not currently available.
        """
        with self._lock_pool:
            stats = {
                "total_keys": len(self._keys),
                "available": 0,
                "rate_limited": 0,
                "total_requests": 0
            }
            for key in self._keys:
                if key.is_available():
                    stats["available"] += 1
                else:
                    stats["rate_limited"] += 1
                stats["total_requests"] += key.requests_made
            return stats

    def get_all_keys(self) -> List[str]:
        """Get all API keys (masked for display).

        Keys longer than 12 characters are shown as their first 8 and last 4
        characters; shorter keys are replaced entirely by ``"***"``.
        """
        with self._lock_pool:
            return [f"{k.key[:8]}...{k.key[-4:]}" if len(k.key) > 12 else "***" for k in self._keys]
|
|
|
|
| |
def get_pool() -> ApiKeyPool:
    """Return the shared pool obtained from ``ApiKeyPool.get_instance()``."""
    shared_pool = ApiKeyPool.get_instance()
    return shared_pool
|
|
|
|
def add_api_key(key: str):
    """Register ``key`` with the shared pool, then write the pool to its config file."""
    key_pool = get_pool()
    key_pool.add_key(key)
    # The config is saved unconditionally, whether or not the key was new.
    key_pool.save_config()
|
|
|
|
def remove_api_key(key: str):
    """Drop ``key`` from the shared pool, then write the pool to its config file."""
    key_pool = get_pool()
    key_pool.remove_key(key)
    # The config is saved unconditionally, whether or not a key was removed.
    key_pool.save_config()
|
|
|
|
def list_api_keys() -> List[str]:
    """Return the masked form of every key held by the shared pool."""
    return get_pool().get_all_keys()
|
|
|
|
def get_available_key() -> Optional[str]:
    """Ask the shared pool for its next usable key (``None`` when it has none)."""
    key_pool = get_pool()
    return key_pool.get_next_key()
|
|