import time
import hashlib
import json
from typing import Dict, Any, Optional
import logging

from app.utils.logging import log

logger = logging.getLogger("my_logger")


class ResponseCacheManager:
    """Manages the cache of API responses."""

    def __init__(self, expiry_time: int, max_entries: int, remove_after_use: bool = True,
                 cache_dict: Optional[Dict[str, Dict[str, Any]]] = None):
        self.cache = cache_dict if cache_dict is not None else {}
        self.expiry_time = expiry_time
        self.max_entries = max_entries
        self.remove_after_use = remove_after_use

    def get(self, cache_key: str):
        """Return (response, True) if the key exists and has not expired, else (None, False)."""
        now = time.time()
        if cache_key in self.cache and now < self.cache[cache_key].get('expiry_time', 0):
            cached_item = self.cache[cache_key]
            response = cached_item['response']

            # One-shot semantics: optionally evict the entry once it has been served.
            if self.remove_after_use:
                del self.cache[cache_key]

            return response, True

        return None, False

    def store(self, cache_key: str, response, client_ip: Optional[str] = None):
        """Store a response in the cache."""
        now = time.time()
        self.cache[cache_key] = {
            'response': response,
            'expiry_time': now + self.expiry_time,
            'created_at': now,
        }

        log('info', f"Response cached: {cache_key[:8]}...",
            extra={'request_type': 'non-stream'})

        # Enforce the max_entries limit after every insert.
        self.clean_if_needed()

    def clean_expired(self):
        """Remove all expired cache entries."""
        now = time.time()
        expired_keys = [k for k, v in self.cache.items() if now > v.get('expiry_time', 0)]

        for key in expired_keys:
            del self.cache[key]
            log('info', f"Removed expired cache entry: {key[:8]}...")

    def clean_if_needed(self):
        """Evict the oldest entries when the cache exceeds max_entries."""
        if len(self.cache) <= self.max_entries:
            return

        # Oldest entries (by creation time) are evicted first.
        sorted_keys = sorted(self.cache.keys(),
                             key=lambda k: self.cache[k].get('created_at', 0))

        to_remove = len(self.cache) - self.max_entries

        for key in sorted_keys[:to_remove]:
            del self.cache[key]
            log('info', f"Cache limit reached, removing old entry: {key[:8]}...", extra={'cache_operation': 'limit'})


def generate_cache_key(chat_request) -> str:
    """Generate a unique cache key for a chat request."""
    request_data = {
        'model': chat_request.model,
        'messages': []
    }

    for msg in chat_request.messages:
        if isinstance(msg.content, str):
            message_data = {'role': msg.role, 'content': msg.content}
            request_data['messages'].append(message_data)
        elif isinstance(msg.content, list):
            content_list = []
            for item in msg.content:
                if item.get('type') == 'text':
                    content_list.append({'type': 'text', 'text': item.get('text')})
                elif item.get('type') == 'image_url':
                    image_data = item.get('image_url', {}).get('url', '')
                    if image_data.startswith('data:image/'):
                        # Inline images are replaced by a hash of the data URI so the
                        # key stays short while still distinguishing different images.
                        content_list.append({'type': 'image_url', 'hash': hashlib.md5(image_data.encode()).hexdigest()})
                    else:
                        content_list.append({'type': 'image_url', 'url': image_data})
            request_data['messages'].append({'role': msg.role, 'content': content_list})

    json_data = json.dumps(request_data, sort_keys=True)
    return hashlib.md5(json_data.encode()).hexdigest()


def cache_response(response, cache_key, client_ip, response_cache_manager, endpoint=None, model=None):
    """
    Store a response in the cache.

    Parameters:
    - response: the response object
    - cache_key: the cache key
    - client_ip: client IP of the originating request
    - response_cache_manager: the cache manager instance
    - endpoint: optional endpoint name (not used here)
    - model: optional model name (not used here)
    """
    if not cache_key:
        return

    # Skip storing if an entry for this key already exists.
    existing_cache = cache_key in response_cache_manager.cache

    if existing_cache:
        log('info', f"Cache entry already exists, skipping store: {cache_key[:8]}...",
            extra={'request_type': 'non-stream'})
    else:
        response_cache_manager.store(cache_key, response)
        log('info', f"API response cached: {cache_key[:8]}...",
            extra={'request_type': 'non-stream'})
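

# ---------------------------------------------------------------------------
# Minimal usage sketch: wires ResponseCacheManager, generate_cache_key and
# cache_response together with a hypothetical stand-in request object; the
# real application passes its own chat request and response models.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from types import SimpleNamespace

    # Hypothetical request shaped like what generate_cache_key expects
    # (an object with .model and .messages, each message with .role/.content).
    chat_request = SimpleNamespace(
        model="example-model",
        messages=[SimpleNamespace(role="user", content="Hello, world")],
    )

    manager = ResponseCacheManager(expiry_time=300, max_entries=100)
    key = generate_cache_key(chat_request)

    cache_response({"text": "cached answer"}, key, client_ip="127.0.0.1",
                   response_cache_manager=manager)

    response, hit = manager.get(key)
    print(hit, response)

    # With remove_after_use=True (the default) the entry is evicted after one read.
    print(manager.get(key))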