|
import asyncio |
|
from fastapi import HTTPException, status, Request |
|
from app.models import ChatCompletionRequest |
|
from app.services import GeminiClient |
|
from app.utils import cache_response, update_api_call_stats,handle_api_error |
|
from app.utils.logging import log |
|
from .client_disconnect import check_client_disconnect, handle_client_disconnect |
|
from .gemini_handlers import run_gemini_completion |
|
import app.config.settings as settings |
|
import random |
|
from typing import Literal |
|
|
|
|
|
async def process_nonstream_request(
    chat_request: ChatCompletionRequest,
    http_request: Request,
    request_type: str,
    contents,
    system_instruction,
    current_api_key: str,
    response_cache_manager,
    active_requests_manager,
    safety_settings,
    safety_settings_g2,
    api_call_stats,
    cache_key: str = None,
    client_ip: str = None
):
    """Handle one non-streaming completion attempt against a single API key.

    Races the upstream Gemini call against a client-disconnect watcher and
    returns a ``(result, status)`` tuple, where ``status`` is one of
    ``"success"``, ``"empty"`` or ``"error"``.

    Args:
        chat_request: Parsed chat-completion request body.
        http_request: Incoming FastAPI request, used to watch for disconnects.
        request_type: Label used in logs (e.g. "stream"/"non-stream").
        contents: Gemini-format message contents (from convert_messages).
        system_instruction: Gemini-format system instruction (may be None).
        current_api_key: The API key this attempt uses.
        response_cache_manager: One-shot response cache shared across attempts.
        active_requests_manager: Unused here; kept for interface compatibility.
        safety_settings: Safety config for standard models.
        safety_settings_g2: Safety config for the G2 model family.
        api_call_stats: Unused here; stats go to settings.api_call_stats instead.
        cache_key: Cache key for this request payload, or None.
        client_ip: Caller IP used for cache bookkeeping, or None.

    Raises:
        HTTPException: Propagated unchanged (408 means the client went away,
            and callers use it to stop retrying).
    """
    gemini_client = GeminiClient(current_api_key)

    # Function-local import preserved from the original (presumably avoids a
    # circular import at module load time); hoisted so it runs once per call
    # instead of being repeated on every success path.
    from app.utils.response import create_response

    # Launch the upstream call and the disconnect watcher concurrently.
    gemini_task = asyncio.create_task(
        run_gemini_completion(
            gemini_client,
            chat_request,
            contents,
            system_instruction,
            request_type,
            current_api_key,
            safety_settings,
            safety_settings_g2
        )
    )

    disconnect_task = asyncio.create_task(
        check_client_disconnect(
            http_request,
            current_api_key,
            request_type,
            chat_request.model
        )
    )

    try:
        # First finisher wins: either the API answered or the client left.
        done, pending = await asyncio.wait(
            [gemini_task, disconnect_task],
            return_when=asyncio.FIRST_COMPLETED
        )

        if disconnect_task in done:
            # Client disconnected first; delegate salvage/cleanup of the
            # still-running API task to the dedicated handler.
            result = await handle_client_disconnect(
                gemini_task,
                chat_request,
                request_type,
                current_api_key,
                response_cache_manager,
                cache_key,
                client_ip,
                chat_request.model,
                current_api_key
            )
            return (result, "success" if result else "empty")

        # API finished first — the watcher is no longer needed.
        disconnect_task.cancel()

        response_content = await gemini_task

        if not response_content or not response_content.text:
            log('warning', f"非流式请求: API密钥 {current_api_key[:8]}... 返回空响应",
                extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
            return (None, "empty")

        # A concurrent sibling attempt may have already cached a response;
        # prefer it. Entries are one-shot: deleted immediately after use.
        cached_response, cache_hit = response_cache_manager.get(cache_key)
        if cache_hit:
            log('info', f"缓存已存在,直接返回: {cache_key[:8]}...",
                extra={'cache_operation': 'use-existing', 'request_type': request_type})
            if cache_key in response_cache_manager.cache:
                del response_cache_manager.cache[cache_key]
                log('info', f"缓存使用后已删除: {cache_key[:8]}...",
                    extra={'cache_operation': 'used-and-removed', 'request_type': request_type})
            return (cached_response, "success")

        response = create_response(chat_request, response_content)

        update_api_call_stats(settings.api_call_stats, endpoint=current_api_key, model=chat_request.model)

        cache_response(response, cache_key, client_ip, response_cache_manager, endpoint=current_api_key, model=chat_request.model)

        # One-shot cache semantics: drop the entry right after storing it.
        if cache_key and cache_key in response_cache_manager.cache:
            del response_cache_manager.cache[cache_key]
            log('info', f"缓存创建后立即删除: {cache_key[:8]}...",
                extra={'request_type': request_type})

        return (response, "success")

    except asyncio.CancelledError:
        extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model, 'error_message': "请求被取消"}
        log('info', "请求取消", extra=extra_log)

        # A sibling attempt may have cached a usable response meanwhile.
        cached_response, cache_hit = response_cache_manager.get(cache_key)
        if cache_hit:
            log('info', f"请求取消但找到有效缓存,使用缓存响应: {cache_key[:8]}...",
                extra={'cache_operation': 'use-cache-on-cancel', 'request_type': request_type})
            if cache_key in response_cache_manager.cache:
                del response_cache_manager.cache[cache_key]
                log('info', f"缓存使用后已删除: {cache_key[:8]}...",
                    extra={'cache_operation': 'used-and-removed', 'request_type': request_type})
            return (cached_response, "success")

        if not gemini_task.done():
            log('info', "请求取消但API请求尚未完成,继续等待...",
                extra={'key': current_api_key[:8], 'request_type': request_type})

            # shield() keeps the in-flight API call alive even though this
            # coroutine was cancelled, so the result is not wasted.
            response_content = await asyncio.shield(gemini_task)

            if not response_content or not response_content.text:
                log('warning', f"非流式请求(取消后): API密钥 {current_api_key[:8]}... 返回空响应",
                    extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
                return (None, "empty")

            update_api_call_stats(settings.api_call_stats, endpoint=current_api_key, model=chat_request.model)

            response = create_response(chat_request, response_content)
            return (response, "success")
        else:
            # Task already finished; .result() re-raises any stored exception.
            response_content = gemini_task.result()

            if not response_content or not response_content.text:
                log('warning', f"非流式请求(已完成): API密钥 {current_api_key[:8]}... 返回空响应",
                    extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
                return (None, "empty")

            response = create_response(chat_request, response_content)
            return (response, "success")

    except HTTPException as e:
        if e.status_code == status.HTTP_408_REQUEST_TIMEOUT:
            extra_log = {'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model,
                         'status_code': 408, 'error_message': '客户端连接中断'}
            log('error', "客户端连接中断,终止后续重试", extra=extra_log)
        # Propagate unchanged in all cases so callers can decide on retries.
        raise
    except Exception as e:
        # Bug fix: the error message was truncated to 8 characters
        # (str(e)[:8], copied from the key[:8] pattern) — log it in full.
        log('error', f"非流式请求异常: {str(e)}",
            extra={'key': current_api_key[:8], 'request_type': request_type, 'model': chat_request.model})
        return (None, "error")
    finally:
        # Bug fix: on the exception paths above, the disconnect watcher used
        # to be left running forever. Ensure it never outlives this call;
        # cancel() is a no-op if the task already completed.
        if not disconnect_task.done():
            disconnect_task.cancel()
|
|
|
|
|
|
|
async def process_request(
    chat_request: ChatCompletionRequest,
    http_request: Request,
    request_type: Literal['stream', 'non-stream'],
    key_manager,
    response_cache_manager,
    active_requests_manager,
    safety_settings,
    safety_settings_g2,
    api_call_stats,
    cache_key: str = None,
    client_ip: str = None
):
    """Dispatch a chat-completion request across multiple API keys.

    Shuffles the configured API keys and tries them in concurrent batches
    via process_nonstream_request, returning the first successful response.
    After a fully failed batch the concurrency level is increased (bounded
    by settings.MAX_CONCURRENT_REQUESTS) before trying the next batch.

    NOTE(review): the original docstring said "non-streaming request", but
    request_type admits 'stream' as well — confirm against callers whether
    streaming also flows through this dispatcher.
    """
    # NOTE(review): declared global but never assigned or read in this
    # function — appears vestigial.
    global current_api_key

    # Convert OpenAI-style messages into Gemini contents + system instruction.
    # NOTE(review): convert_messages is invoked unbound with the class object
    # passed as `self` — works only if it never touches instance state; verify.
    contents, system_instruction = GeminiClient.convert_messages(
        GeminiClient, chat_request.messages,model=chat_request.model)

    # Start each request with a clean "keys already tried" set.
    key_manager.reset_tried_keys_for_request()

    # Initial batch size; may grow after failed batches (see below).
    current_concurrent = settings.CONCURRENT_REQUESTS

    # Randomize key order so load spreads across keys between requests.
    all_keys = key_manager.api_keys.copy()
    random.shuffle(all_keys)

    # Never ask for more concurrency than there are keys left.
    if len(all_keys) < current_concurrent:
        current_concurrent = len(all_keys)

    # Consume keys batch by batch until one succeeds or all are exhausted.
    while all_keys:
        current_batch = all_keys[:current_concurrent]
        all_keys = all_keys[current_concurrent:]

        tasks = []
        for api_key in current_batch:
            log('info', f"并发请求使用密钥: {api_key[:8]}...",
                extra={'key': api_key[:8], 'request_type': request_type, 'model': chat_request.model})

            # One attempt per key, all racing concurrently within the batch.
            task = asyncio.create_task(
                process_nonstream_request(
                    chat_request,
                    http_request,
                    request_type,
                    contents,
                    system_instruction,
                    api_key,
                    response_cache_manager,
                    active_requests_manager,
                    safety_settings,
                    safety_settings_g2,
                    api_call_stats,
                    cache_key,
                    client_ip
                )
            )
            tasks.append((api_key, task))

        # Wait for the entire batch; the first success found below wins.
        done, pending = await asyncio.wait(
            [task for _, task in tasks],
            return_when=asyncio.ALL_COMPLETED
        )

        success = False
        for api_key, task in tasks:
            if task in done:
                try:
                    # NOTE(review): `status` here shadows the fastapi.status
                    # import for the rest of this function.
                    result, status = task.result()
                    if status == "success" and result:
                        success = True
                        log('info', f"并发请求成功,使用密钥: {api_key[:8]}...",
                            extra={'key': api_key[:8], 'request_type': request_type, 'model': chat_request.model})
                        # Return the first successful result immediately.
                        return result
                except Exception as e:
                    # Record the failure against this key (handler may also
                    # rotate/disable the key; last arg 0 — meaning unverified here).
                    error_result = await handle_api_error(
                        e,
                        api_key,
                        key_manager,
                        request_type,
                        chat_request.model,
                        0
                    )

                    # Drop a cache entry invalidated by the failed attempt.
                    if error_result.get('remove_cache', False) and cache_key and cache_key in response_cache_manager.cache:
                        log('info', f"因API错误,删除缓存: {cache_key[:8]}...",
                            extra={'cache_operation': 'remove-on-error', 'request_type': request_type})
                        del response_cache_manager.cache[cache_key]

        # Whole batch failed: widen the next batch, bounded by the configured max.
        if not success and all_keys:
            current_concurrent = min(current_concurrent + settings.INCREASE_CONCURRENT_ON_FAILURE, settings.MAX_CONCURRENT_REQUESTS)
            log('info', f"所有并发请求失败或返回空响应,增加并发数至: {current_concurrent}",
                extra={'request_type': request_type, 'model': chat_request.model})

    # All keys exhausted without a success.
    # NOTE(review): `msg` is built but never used within this view — a raise
    # (e.g. HTTPException(msg)) likely follows beyond this chunk; confirm.
    msg = "所有API密钥均请求失败,请稍后重试"
    log('error', "API key 替换失败,所有API key都已尝试,请重新配置或稍后重试", extra={'request_type': 'switch_key'})