# File size: 6,380 Bytes
# Revisions (from the file viewer's blame column): eab288c 8b81b1d 44013a5 f0e68b1 906da16
# (The file viewer's line-number gutter, 1-135, was extraction residue and has been dropped.)
import time
import threading
from typing import Dict, List, Tuple, Optional, Iterator, Union
from app.config import get_settings
from loguru import logger
class RequestLimitManager:
    """Pick an API key/model pair for a provider and track rate-limit blocks.

    The manager keeps a ``status`` table of the form
    ``status[key][model] = {"status": "active" | "blocked", "timestamp": float}``.
    For a blocked pair the timestamp is the moment the block expires; for an
    active pair it is the time the pair was last marked active.

    One manager exists per provider name: calling
    ``RequestLimitManager(provider)`` again with the same name returns the
    same, already initialised object.

    ``get_current_key_model`` and ``log_request`` read and update the table
    while holding the per-instance ``self.lock``.
    """

    # One manager per provider name, created on first use.
    _instances: Dict[str, "RequestLimitManager"] = {}
    # Guards instance creation and first-time initialisation. It is re-entrant
    # so initialisation code running under it cannot deadlock on itself.
    _lock = threading.RLock()
    # Seconds a pair stays blocked when no positive retry delay is supplied.
    DEFAULT_BLOCK_SECONDS = 60

    def __new__(cls, provider: str):
        """Return the manager registered for ``provider``, creating it if needed."""
        instance = cls._instances.get(provider)
        if instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have created the
                # instance between the first lookup and acquiring the lock.
                instance = cls._instances.get(provider)
                if instance is None:
                    instance = super().__new__(cls)
                    cls._instances[provider] = instance
        return instance

    def __init__(self, provider: str):
        """Initialise the manager the first time it is constructed.

        Python calls ``__init__`` on every ``RequestLimitManager(...)`` call,
        including those where ``__new__`` returned an existing object, so the
        ``initialized`` flag makes repeated calls a no-op. The check and the
        initialisation run under the class lock so that two threads cannot
        both initialise (and thereby reset) the same instance.
        """
        with self._lock:
            if hasattr(self, 'initialized'):
                return
            self.provider = provider
            self.lock = threading.Lock()
            self._init_keys_models()
            self.initialized = True

    @staticmethod
    def _mask_key(key: Optional[str]) -> str:
        """Return a log-safe rendering of ``key`` (first and last 5 characters)."""
        if not key:
            return "None"
        if len(key) <= 10:
            # The head and tail slices would cover the entire key, so hide it.
            return "***"
        return f"{key[:5]}...{key[-5:]}"

    @staticmethod
    def _current_first(items: List[str], current: Optional[str]) -> List[str]:
        """Return a copy of ``items`` with ``current`` moved to the front if present."""
        ordered = list(items)
        if current and current in ordered:
            ordered.remove(current)
            ordered.insert(0, current)
        return ordered

    def _is_available(self, key: str, model: str, now: float) -> bool:
        """Tell whether ``key``/``model`` may be used at time ``now``.

        A pair is usable when it is marked "active", or when it is marked
        "blocked" and its expiry timestamp is already in the past. Pairs with
        no entry in the status table are treated as active.
        """
        info = self.status.get(key, {}).get(model, {})
        state = info.get("status", "active")
        blocked_until = float(info.get("timestamp", 0.0))
        return state == "active" or (state == "blocked" and now > blocked_until)

    def _init_keys_models(self):
        """Load keys/models for the provider and mark every pair active."""
        settings = get_settings()
        # Start from empty lists so an unrecognised provider ends up with
        # "no key/model available" instead of an AttributeError below.
        self.api_keys: List[str] = []
        self.models: List[str] = []
        if self.provider == "gemini":
            # ``or []`` tolerates a setting that is present but None; ``list``
            # gives the manager its own mutable copy.
            self.api_keys = list(getattr(settings, 'gemini_api_keys_list', None) or [])
            self.models = list(getattr(settings, 'gemini_models_list', None) or [])
        # Other providers can be added here.

        now = time.time()
        self.status: Dict[str, Dict[str, Dict[str, Union[str, float]]]] = {
            key: {
                model: {"status": "active", "timestamp": now}
                for model in self.models
            }
            for key in self.api_keys
        }
        self.current_key: Optional[str] = self.api_keys[0] if self.api_keys else None
        self.current_model: Optional[str] = self.models[0] if self.models else None
        key_display = self._mask_key(self.current_key)
        logger.info(f"[LIMIT] Initialized with current key={key_display} model={self.current_model}")

    def get_current_key_model(self) -> Tuple[str, str]:
        """Return the key/model pair to use for the next request.

        The current pair is returned as long as it is usable; the table is
        only scanned for a replacement when the current pair is blocked.

        Returns:
            The ``(key, model)`` pair, which also becomes the current pair.

        Raises:
            RuntimeError: if no key/model pair is usable right now.
        """
        with self.lock:
            now = time.time()
            if (
                self.current_key
                and self.current_model
                and self._is_available(self.current_key, self.current_model, now)
            ):
                logger.info(f"[LIMIT] Using current key={self._mask_key(self.current_key)} model={self.current_model}")
                return self.current_key, self.current_model

            # Current pair is not usable: look for another one.
            logger.warning("[LIMIT] Current pair not available, scanning for new key/model...")
            new_key, new_model = self._find_available_key_model()
            if not (new_key and new_model):
                logger.error(f"[LIMIT] No available key/model found for provider {self.provider}")
                raise RuntimeError(f"No available key/model for provider {self.provider}")

            self.current_key = new_key
            self.current_model = new_model
            logger.info(f"[LIMIT] Switched to new key={self._mask_key(self.current_key)} model={self.current_model}")
            return self.current_key, self.current_model

    def _find_available_key_model(self) -> Tuple[Optional[str], Optional[str]]:
        """Return the first usable key/model pair, or ``(None, None)``.

        Keys are tried in configured order with the current key first; for
        each key, models are tried in configured order with the current model
        first. This method does not take ``self.lock`` itself.
        """
        now = time.time()
        keys = self._current_first(self.api_keys, self.current_key)
        models = self._current_first(self.models, self.current_model)
        for key in keys:
            for model in models:
                if self._is_available(key, model, now):
                    logger.info(f"[LIMIT] Found available key={self._mask_key(key)} model={model}")
                    return key, model
        return None, None

    def log_request(self, key: str, model: str, success: bool, retry_delay: Optional[int] = None):
        """Record the outcome of a request made with ``key``/``model``.

        Args:
            key: API key that was used.
            model: Model that was used.
            success: True marks the pair active; False marks it blocked.
            retry_delay: Seconds to keep the pair blocked after a failure.
                ``None``, zero or a negative value falls back to
                ``DEFAULT_BLOCK_SECONDS``.

        The current pair is not replaced here; ``get_current_key_model``
        notices the block on the next call and scans for another pair then.
        """
        with self.lock:
            now = time.time()
            entry = self.status.setdefault(key, {}).setdefault(
                model, {"status": "active", "timestamp": now}
            )
            masked = self._mask_key(key)

            if success:
                logger.info(f"[LIMIT] Mark key={masked} - model={model} as active at {now}")
                entry["status"] = "active"
                entry["timestamp"] = now
                return

            # A non-positive delay would create a block that has already
            # expired, so it is treated like a missing delay.
            delay = retry_delay if retry_delay and retry_delay > 0 else self.DEFAULT_BLOCK_SECONDS
            blocked_until = now + delay
            logger.warning(f"[LIMIT] Mark key={masked} - model={model} as blocked until {blocked_until} (retry_delay={retry_delay})")
            entry["status"] = "blocked"
            entry["timestamp"] = blocked_until
            # Deliberately keep current_key/current_model untouched: switching
            # is left to get_current_key_model() to avoid unnecessary scans.
            if key == self.current_key and model == self.current_model:
                logger.warning("[LIMIT] Current pair blocked, will scan for new pair on next request")

    def iterate_key_model(self) -> Iterator[Tuple[str, str]]:
        """Yield only the current key/model pair.

        Legacy method kept for compatibility with older callers.

        Raises:
            RuntimeError: propagated from ``get_current_key_model`` when no
                pair is usable.
        """
        yield self.get_current_key_model()