Spaces:
Paused
Paused
| import time | |
| import hashlib | |
| import json | |
| from typing import Dict, Any, Optional | |
| import logging | |
| from app.utils.logging import log | |
| from app.config.settings import ( | |
| api_call_stats | |
| ) | |
| logger = logging.getLogger("my_logger") | |
| class ResponseCacheManager: | |
| """管理API响应缓存的类""" | |
| def __init__(self, expiry_time: int, max_entries: int, remove_after_use: bool = True, | |
| cache_dict: Dict[str, Dict[str, Any]] = None): | |
| self.cache = cache_dict if cache_dict is not None else {} # 使用传入的缓存字典或创建新字典 | |
| self.expiry_time = expiry_time | |
| self.max_entries = max_entries | |
| self.remove_after_use = remove_after_use | |
| def get(self, cache_key: str): | |
| """获取缓存项,如果存在且未过期""" | |
| now = time.time() | |
| if cache_key in self.cache and now < self.cache[cache_key].get('expiry_time', 0): | |
| cached_item = self.cache[cache_key] | |
| # 获取响应但先不删除 | |
| response = cached_item['response'] | |
| # 返回响应 | |
| return response, True | |
| return None, False | |
| def store(self, cache_key: str, response, client_ip: str = None): | |
| """存储响应到缓存""" | |
| now = time.time() | |
| self.cache[cache_key] = { | |
| 'response': response, | |
| 'expiry_time': now + self.expiry_time, | |
| 'created_at': now, | |
| 'client_ip': client_ip | |
| } | |
| log('info', f"响应已缓存: {cache_key[:8]}...", | |
| extra={'cache_operation': 'store', 'request_type': 'non-stream'}) | |
| # 如果缓存超过限制,清理最旧的 | |
| self.clean_if_needed() | |
| def clean_expired(self): | |
| """清理所有过期的缓存项""" | |
| now = time.time() | |
| expired_keys = [k for k, v in self.cache.items() if now > v.get('expiry_time', 0)] | |
| for key in expired_keys: | |
| del self.cache[key] | |
| log('info', f"清理过期缓存: {key[:8]}...", extra={'cache_operation': 'clean'}) | |
| def clean_if_needed(self): | |
| """如果缓存数量超过限制,清理最旧的项目""" | |
| if len(self.cache) <= self.max_entries: | |
| return | |
| # 按创建时间排序 | |
| sorted_keys = sorted(self.cache.keys(), | |
| key=lambda k: self.cache[k].get('created_at', 0)) | |
| # 计算需要删除的数量 | |
| to_remove = len(self.cache) - self.max_entries | |
| # 删除最旧的项 | |
| for key in sorted_keys[:to_remove]: | |
| del self.cache[key] | |
| log('info', f"缓存容量限制,删除旧缓存: {key[:8]}...", extra={'cache_operation': 'limit'}) | |
| def generate_cache_key(chat_request) -> str: | |
| """生成请求的唯一缓存键""" | |
| # 创建包含请求关键信息的字典 | |
| request_data = { | |
| 'model': chat_request.model, | |
| 'messages': [] | |
| } | |
| # 添加消息内容 | |
| for msg in chat_request.messages: | |
| if isinstance(msg.content, str): | |
| message_data = {'role': msg.role, 'content': msg.content} | |
| request_data['messages'].append(message_data) | |
| elif isinstance(msg.content, list): | |
| content_list = [] | |
| for item in msg.content: | |
| if item.get('type') == 'text': | |
| content_list.append({'type': 'text', 'text': item.get('text')}) | |
| # 对于图像数据,我们只使用标识符而不是全部数据 | |
| elif item.get('type') == 'image_url': | |
| image_data = item.get('image_url', {}).get('url', '') | |
| if image_data.startswith('data:image/'): | |
| # 对于base64图像,使用前32字符作为标识符 | |
| content_list.append({'type': 'image_url', 'hash': hashlib.md5(image_data[:32].encode()).hexdigest()}) | |
| else: | |
| content_list.append({'type': 'image_url', 'url': image_data}) | |
| request_data['messages'].append({'role': msg.role, 'content': content_list}) | |
| # 将字典转换为JSON字符串并计算哈希值 | |
| json_data = json.dumps(request_data, sort_keys=True) | |
| return hashlib.md5(json_data.encode()).hexdigest() | |
| def cache_response(response, cache_key, client_ip, response_cache_manager, update_api_call_stats, api_key=None): | |
| """ | |
| 将响应存入缓存 | |
| 参数: | |
| - response: 响应对象 | |
| - cache_key: 缓存键 | |
| - client_ip: 客户端IP | |
| - response_cache_manager: 缓存管理器 | |
| - update_api_call_stats: 更新统计的函数 | |
| - api_key: API密钥,用于更新API密钥使用统计 | |
| """ | |
| if not cache_key: | |
| return | |
| # 先检查缓存是否已存在 | |
| existing_cache = cache_key in response_cache_manager.cache | |
| if existing_cache: | |
| log('info', f"缓存已存在,跳过存储: {cache_key[:8]}...", | |
| extra={'cache_operation': 'skip-existing', 'request_type': 'non-stream'}) | |
| else: | |
| response_cache_manager.store(cache_key, response, client_ip) | |
| log('info', f"API响应已缓存: {cache_key[:8]}...", | |
| extra={'cache_operation': 'store-new', 'request_type': 'non-stream'}) | |
| # 更新API调用统计 | |
| update_api_call_stats(api_call_stats, api_key) |