|
import logging |
|
import json |
|
import time |
|
import asyncio |
|
import os |
|
import traceback |
|
import sys |
|
from contextlib import asynccontextmanager |
|
import random |
|
|
|
import uvicorn |
|
from fastapi import FastAPI, Request, HTTPException |
|
from fastapi.responses import StreamingResponse |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from fastapi.responses import JSONResponse |
|
from google import genai |
|
from google.genai import types |
|
from typing import Optional, List, Dict, Any |
|
|
|
|
|
# Root logging configuration: INFO and above, timestamped to the second.
logging.basicConfig(
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s [%(levelname)s]: %(message)s",
    level=logging.INFO,
)

# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
# Requested model name -> Gemini model name. Every entry currently maps to
# itself; names missing from this table are resolved by get_model_name().
GEMINI_MODELS = {
    model_name: model_name
    for model_name in (
        "gemini-2.0-flash-exp",
        "gemini-2.5-flash-preview-05-20",
        "gemini-2.5-flash",
        "gemini-2.5-flash-preview-04-17",
        "gemini-2.5-pro",
    )
}
|
|
|
|
|
# Model descriptors served by the model-listing endpoints. Each entry gets its
# own `permission` list and a `created` timestamp taken at import time.
SUPPORTED_MODELS = [
    {
        "id": model_id,
        "object": "model",
        "created": int(time.time()),
        "owned_by": "google",
        "permission": [],
        "root": model_id,
        "parent": None,
        "description": description,
    }
    for model_id, description in (
        ("gemini-2.5-flash-preview-05-20", "Gemini 2.5 Flash Preview - 最新实验性模型"),
        ("gemini-2.5-flash-preview-04-17", "gemini-2.5-flash-preview-04-17- 经典专业模型"),
        ("gemini-2.5-flash", "gemini-2.5-flash稳定经典专业模型"),
        ("gemini-2.5-pro", "gemini-2.5-pro稳定经典专业模型"),
    )
]
|
|
|
|
|
def get_model_name(requested_model: str) -> str:
    """Resolve a client-supplied model name to the Gemini model to call.

    Args:
        requested_model: Model name taken from the request body.

    Returns:
        The mapped name from ``GEMINI_MODELS``, or ``"gemini-2.5-flash"`` when
        the requested name is not in the table.
    """
    resolved = GEMINI_MODELS.get(requested_model, "gemini-2.5-flash")
    # Log the name that is actually used (the old message showed ``None`` for
    # unknown models) and go through the module logger instead of stdout.
    logger.info(f"实际模型名称:{resolved}")
    return resolved
|
|
|
|
|
|
|
|
|
def _content_to_text(content):
    """Collapse a message ``content`` value into a plain string.

    Strings are returned unchanged, ``None`` becomes ``""``, and a list of
    parts is reduced to its textual parts (bare strings or dicts carrying a
    string ``"text"`` value) joined by newlines; non-text parts are skipped.
    """
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        texts = []
        for part in content:
            if isinstance(part, str):
                texts.append(part)
            elif isinstance(part, dict) and isinstance(part.get("text"), str):
                texts.append(part["text"])
        return "\n".join(texts)
    return str(content)


def convert_messages(messages):
    """Split chat messages into Gemini ``contents`` and a system instruction.

    Args:
        messages: Iterable of dicts with ``role`` and ``content`` keys. A
            missing role is treated as ``"user"``. ``None`` is accepted and
            treated as an empty conversation.

    Returns:
        A ``(content_parts, system_instruction)`` tuple. ``content_parts`` is a
        list of ``{"role": ..., "parts": [{"text": ...}]}`` dicts in which
        ``assistant`` messages are emitted with role ``"model"`` and ``user``
        messages with role ``"user"``. ``system_instruction`` is the text of
        the system message(s), joined by blank lines when there are several,
        or ``None`` when there is none. Messages with any other role are
        ignored.
    """
    role_map = {"assistant": "model", "user": "user"}
    content_parts = []
    system_chunks = []

    for message in messages or []:
        role = message.get("role", "user")
        text = _content_to_text(message.get("content", ""))

        if role == "system":
            # Keep every system message instead of letting the last one win.
            system_chunks.append(text)
        elif role in role_map:
            content_parts.append({"role": role_map[role], "parts": [{"text": text}]})

    system_instruction = "\n\n".join(system_chunks) if system_chunks else None
    return content_parts, system_instruction
|
|
|
|
|
def handle_error(error):
    """Translate an error into a user-facing message and a finish reason.

    The decision is made purely by substring checks on the lower-cased
    ``str(error)``.

    Args:
        error: Any object; only its string form is inspected.

    Returns:
        A ``(message, finish_reason)`` tuple where ``finish_reason`` is one of
        ``"length"``, ``"content_filter"`` or ``"stop"``.
    """
    text = str(error).lower()
    from_prompt_feedback = "prompt_feedback" in text

    # Prompt feedback flagged as "other" takes precedence over everything else.
    if from_prompt_feedback and "other" in text:
        return "您的输入内容可能过长或触发了安全策略。请尝试缩短您的问题。", "length"

    if "safety" in text:
        if from_prompt_feedback:
            return "您的请求被安全策略阻止。请尝试修改您的问题。", "content_filter"
        return "您的请求被安全策略过滤。请尝试修改您的问题。", "content_filter"

    return "生成内容时遇到错误。请稍后重试。", "stop"
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: check Gemini setup on startup, log shutdown.

    Startup calls ``setup_gemini()`` once; a failure there is logged and
    re-raised. Only that call is guarded by the "startup failed" handler, so
    an exception raised while the application is running is no longer
    mis-reported as a startup failure. The shutdown message is logged in every
    case.

    Args:
        app: The FastAPI application (not used by the hook itself).

    Raises:
        Exception: Whatever ``setup_gemini()`` raises.
    """
    try:
        try:
            setup_gemini()
        except Exception as e:
            logger.error(f"应用启动失败: {str(e)}")
            raise
        logger.info("应用启动完成")
        yield
    finally:
        logger.info("应用关闭")
|
|
|
|
|
|
|
# ASGI application object; startup and shutdown are handled by `lifespan`.
app = FastAPI(
    title="Gemini Official API (ap2)",
    version="1.3.0",
    lifespan=lifespan,
)
|
|
|
|
|
# CORS: any origin may call the API with any method and any header.
# Credentialed requests are disabled because wildcard origins must not be
# combined with allow_credentials=True; nothing in this file relies on
# cookies or other browser credentials.
# NOTE(review): if a browser client needs credentials, replace the wildcard
# with an explicit origin list before turning allow_credentials back on.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
|
|
|
|
|
|
|
# Pool of Gemini API keys used for random selection and rotation on retry.
# Keys are read from the GEMINI_API_KEYS environment variable (comma-separated,
# surrounding whitespace ignored).
# SECURITY: the literal keys below are kept only as a backward-compatible
# fallback. Secrets committed to source control must be treated as leaked:
# rotate them and supply keys through the environment instead.
API_KEYS = [
    key.strip()
    for key in os.environ.get("GEMINI_API_KEYS", "").split(",")
    if key.strip()
] or [
    'AIzaSyA4cFMyM6Mry9hp7Aser3ASyEEzUFfLvNM',
    'AIzaSyCcu6mKtAv1gQlDXnelvViezBQSf3ntJqc',
]
|
|
|
|
|
def get_random_api_key():
    """Return one key chosen at random from ``API_KEYS``."""
    chosen_key = random.choice(API_KEYS)
    return chosen_key
|
|
|
|
|
def setup_gemini(api_key=None):
    """Create a Gemini client for the given key, or for a random pooled key.

    Args:
        api_key: Key to use. When falsy, one is drawn with
            ``get_random_api_key()``.

    Returns:
        A ``(client, api_key)`` tuple: the ``genai.Client`` and the key it was
        built with.

    Raises:
        ValueError: If ``API_KEYS`` is empty.
    """
    # Validate the pool before drawing from it: with an empty pool the random
    # draw used to fail first, so this ValueError was never reached.
    if not API_KEYS:
        logger.error("请设置有效的API密钥列表")
        raise ValueError("API_KEYS未设置")

    if not api_key:
        api_key = get_random_api_key()

    client = genai.Client(api_key=api_key)
    return client, api_key
|
|
|
|
|
|
|
# One safety setting per harm category, each with the BLOCK_NONE threshold.
SAFETY_SETTINGS = [
    types.SafetySetting(
        category=harm_category,
        threshold=types.HarmBlockThreshold.BLOCK_NONE,
    )
    for harm_category in (
        types.HarmCategory.HARM_CATEGORY_HARASSMENT,
        types.HarmCategory.HARM_CATEGORY_HATE_SPEECH,
        types.HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT,
        types.HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT,
        types.HarmCategory.HARM_CATEGORY_CIVIC_INTEGRITY,
    )
]
|
|
|
|
|
async def try_generate_content(model_name, content_parts, config, max_retries=3):
    """Generate content, retrying with a different API key on listed errors.

    Each attempt picks a random key that has not been used yet in this call
    (the used set is reset once every key has been tried) and builds a fresh
    client for it. The blocking SDK call runs in a worker thread so the event
    loop keeps serving other requests while waiting.

    Args:
        model_name: Gemini model to call.
        content_parts: Conversation contents passed to the SDK.
        config: Generation config passed to the SDK.
        max_retries: Maximum number of attempts.

    Returns:
        A ``(response, api_key)`` tuple for the first successful attempt.

    Raises:
        ValueError: If ``max_retries`` is less than 1 (no attempt was made).
        Exception: Immediately for an error whose text contains none of the
            listed status codes, or the last error once attempts run out.
    """
    retryable_codes = ('400', '401', '403', '429', '500', '502', '503', '504')
    last_error = None
    used_keys = set()

    for attempt in range(max_retries):
        try:
            available_keys = [key for key in API_KEYS if key not in used_keys]
            if not available_keys:
                # Every key has been tried once; start a new rotation.
                used_keys.clear()
                available_keys = API_KEYS

            api_key = random.choice(available_keys)
            used_keys.add(api_key)

            client, current_key = setup_gemini(api_key)
            logger.info(f"尝试第 {attempt + 1} 次,使用密钥: {current_key[:20]}...")

            # The SDK call is synchronous; run it off the event loop.
            response = await asyncio.to_thread(
                client.models.generate_content,
                model=model_name,
                contents=content_parts,
                config=config,
            )
            return response, current_key

        except Exception as e:
            last_error = e
            error_str = str(e).lower()

            if not any(code in error_str for code in retryable_codes):
                # Not in the retry list: surface the original traceback.
                raise

            logger.warning(f"第 {attempt + 1} 次尝试失败: {str(e)}")
            if attempt < max_retries - 1:
                await asyncio.sleep(1)

    if last_error is None:
        # max_retries <= 0: nothing was attempted, so there is no error to re-raise.
        raise ValueError("max_retries must be at least 1")
    raise last_error
|
|
|
|
|
@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    """Chat completion endpoint (streaming and non-streaming).

    Reads ``messages``, ``stream``, ``max_tokens``, ``temperature``, ``top_p``
    and ``model`` from the JSON body, converts the messages and either returns
    a ``text/event-stream`` response or a single ``chat.completion`` object.

    Args:
        request: Incoming request whose body is a JSON object.

    Returns:
        A ``StreamingResponse`` when ``stream`` is truthy, otherwise a dict
        with ``id``, ``object``, ``created``, ``model``, ``choices`` and
        ``usage``.

    Raises:
        HTTPException: 400 for a malformed body or a conversation without any
            user/assistant message; 500 for any other failure.
    """
    try:
        try:
            body = await request.json()
        except ValueError as exc:
            # Malformed JSON is a client error, not a server fault.
            raise HTTPException(status_code=400, detail="请求体不是有效的JSON") from exc
        if not isinstance(body, dict):
            raise HTTPException(status_code=400, detail="请求体必须是JSON对象")

        messages = body.get('messages', [])
        stream = body.get('stream', False)
        max_tokens = body.get('max_tokens', 65536)
        temperature = body.get('temperature', 1.2)
        # NOTE(review): a default top_p of 0.0 is unusual next to a temperature
        # of 1.2 -- confirm this is intended. Left unchanged here.
        top_p = body.get('top_p', 0.0)
        requested_model = body.get('model', 'gemini-2.5-flash')

        model_name = get_model_name(requested_model)
        content_parts, system_instruction = convert_messages(messages)
        if not content_parts:
            raise HTTPException(status_code=400, detail="messages 中至少需要一条 user 或 assistant 消息")

        config = types.GenerateContentConfig(
            max_output_tokens=max_tokens,
            temperature=temperature,
            top_p=top_p,
            system_instruction=system_instruction,
            safety_settings=SAFETY_SETTINGS,
        )

        if stream:
            # Building a client here surfaces key-pool problems as an HTTP
            # error before the event stream starts.
            client, _ = setup_gemini()
            return StreamingResponse(
                stream_response_with_retry(client, model_name, content_parts, config),
                media_type='text/event-stream'
            )

        response, used_key = await try_generate_content(model_name, content_parts, config)
        response_text = response.text if response else ""
        finish_reason = "stop"
        if not response_text:
            response_text = "无法生成回复。请尝试修改您的问题。"

        # Prefer the token counts reported on the response; fall back to the
        # previous rough estimates when they are missing.
        usage_metadata = getattr(response, 'usage_metadata', None)
        prompt_tokens = getattr(usage_metadata, 'prompt_token_count', None)
        completion_tokens = getattr(usage_metadata, 'candidates_token_count', None)
        if prompt_tokens is None:
            prompt_tokens = len(content_parts)
        if completion_tokens is None:
            completion_tokens = len(response_text.split())

        logger.info(f"成功生成回复,使用密钥: {used_key[:20]}...")
        return {
            'id': f'chatcmpl-{int(time.time())}-{random.randint(1000, 9999)}',
            'object': 'chat.completion',
            'created': int(time.time()),
            'model': requested_model,
            'choices': [{'index': 0, 'message': {'role': 'assistant', 'content': response_text}, 'finish_reason': finish_reason}],
            'usage': {'prompt_tokens': prompt_tokens, 'completion_tokens': completion_tokens, 'total_tokens': prompt_tokens + completion_tokens}
        }

    except HTTPException:
        # Deliberate HTTP errors (the 400s above) must not be rewritten as 500.
        raise
    except Exception as e:
        logger.error(f"处理聊天请求出错: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
async def stream_response_with_retry(client, model_name, content_parts, config, max_retries=3):
    """Yield server-sent-event lines for a streamed completion, with retries.

    Each attempt picks a random not-yet-used API key and builds its own
    client. The blocking SDK stream is advanced in a worker thread so the
    event loop is not stalled between chunks. A failed attempt is retried only
    while nothing has been sent to the caller; once text has been emitted a
    retry would replay the answer from the start and duplicate output, so the
    stream is closed with an error chunk instead. All chunks of one response
    share the same ``id``.

    Args:
        client: Unused; kept so existing callers keep working. Every attempt
            creates its own client for the selected key.
        model_name: Gemini model to call; also echoed in each chunk.
        content_parts: Conversation contents passed to the SDK.
        config: Generation config passed to the SDK.
        max_retries: Maximum number of attempts.

    Yields:
        ``data: ...`` lines: one per text chunk, then either a final ``stop``
        chunk or an error chunk, followed by ``data: [DONE]``.
    """
    retryable_codes = ('400', '401', '403', '429', '500', '502', '503', '504')
    stream_end = object()  # sentinel: StopIteration cannot cross the thread boundary
    completion_id = f'chatcmpl-{int(time.time())}-{random.randint(1000, 9999)}'
    last_error = None
    used_keys = set()
    emitted_any = False

    for attempt in range(max_retries):
        try:
            available_keys = [key for key in API_KEYS if key not in used_keys]
            if not available_keys:
                # Every key has been tried once; start a new rotation.
                used_keys.clear()
                available_keys = API_KEYS
            api_key = random.choice(available_keys)
            used_keys.add(api_key)

            current_client, current_key = setup_gemini(api_key)
            logger.info(f"流式响应尝试第 {attempt + 1} 次,使用密钥: {current_key[:20]}...")

            chunk_iter = iter(await asyncio.to_thread(
                current_client.models.generate_content_stream,
                model=model_name,
                contents=content_parts,
                config=config,
            ))
            while True:
                chunk = await asyncio.to_thread(next, chunk_iter, stream_end)
                if chunk is stream_end:
                    break
                text = getattr(chunk, 'text', None) if chunk else None
                if text:
                    data = {'id': completion_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': model_name, 'choices': [{'index': 0, 'delta': {'role': 'assistant', 'content': text}, 'finish_reason': None}]}
                    emitted_any = True
                    yield f'data: {json.dumps(data, ensure_ascii=False)}\n\n'

            final_data = {'id': completion_id, 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': model_name, 'choices': [{'index': 0, 'delta': {}, 'finish_reason': 'stop'}]}
            yield f'data: {json.dumps(final_data, ensure_ascii=False)}\n\n'
            yield 'data: [DONE]\n\n'

            logger.info(f"流式响应成功,使用密钥: {current_key[:20]}...")
            return

        except Exception as e:
            last_error = e
            if emitted_any:
                # Partial output already reached the caller; do not replay it.
                break
            error_str = str(e).lower()
            if not any(code in error_str for code in retryable_codes):
                break
            logger.warning(f"流式响应第 {attempt + 1} 次尝试失败: {str(e)}")
            if attempt < max_retries - 1:
                await asyncio.sleep(1)

    logger.error(f"流式响应所有重试失败: {str(last_error)}")
    error_message, finish_reason = handle_error(last_error)
    error_data = {'id': f'chatcmpl-{int(time.time())}-error', 'object': 'chat.completion.chunk', 'created': int(time.time()), 'model': model_name, 'choices': [{'index': 0, 'delta': {'role': 'assistant', 'content': error_message}, 'finish_reason': finish_reason}]}
    yield f'data: {json.dumps(error_data, ensure_ascii=False)}\n\n'
    yield 'data: [DONE]\n\n'
|
|
|
@app.get("/v1/models")
async def list_models():
    """Return all supported models wrapped in a ``list`` object.

    Raises:
        HTTPException: 500 if building the payload fails.
    """
    try:
        payload = dict(object="list", data=SUPPORTED_MODELS)
        return payload
    except Exception as e:
        logger.error(f"获取模型列表出错: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
@app.get("/v1/models/{model_id}")
async def get_model_info(model_id: str):
    """Return the descriptor of a single supported model.

    Args:
        model_id: Identifier taken from the URL path.

    Returns:
        The first ``SUPPORTED_MODELS`` entry whose ``id`` equals ``model_id``.

    Raises:
        HTTPException: 404 when no entry matches; 500 on any other failure.
    """
    try:
        found = next(
            (entry for entry in SUPPORTED_MODELS if entry["id"] == model_id),
            None,
        )
        if found is None:
            raise HTTPException(status_code=404, detail=f"模型 {model_id} 未找到")
        return found
    except HTTPException:
        # Let the 404 above through untouched.
        raise
    except Exception as e:
        logger.error(f"获取模型信息出错: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
@app.get("/v1/chat/completions/v1/models")
async def list_models_alternative():
    """Return the supported-model list on an alternative, compatibility path.

    Raises:
        HTTPException: 500 if building the payload fails.
    """
    try:
        listing = {"object": "list", "data": SUPPORTED_MODELS}
        return listing
    except Exception as e:
        logger.error(f"获取模型列表出错: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
@app.get("/health")
async def health_check():
    """Health-check endpoint.

    Returns:
        A dict with status ``"healthy"``, the current timestamp, the API name,
        the supported model ids and the version; or, if assembling that fails,
        a dict with status ``"unhealthy"``, the timestamp and the error text.
    """
    try:
        report = {
            "status": "healthy",
            "timestamp": int(time.time()),
            "api": "gemini-official",
            "available_models": [entry["id"] for entry in SUPPORTED_MODELS],
            "version": "1.3.0",
        }
        return report
    except Exception as e:
        logger.error(f"健康检查失败: {str(e)}")
        failure = {"status": "unhealthy", "timestamp": int(time.time()), "error": str(e)}
        return failure
|
|
|
@app.get("/")
async def root():
    """Return the service name, version, description and endpoint paths."""
    endpoints = {
        "models": "/v1/models",
        "models_alt": "/v1/chat/completions/v1/models",
        "chat": "/v1/chat/completions",
        "health": "/health",
    }
    return {
        "name": "Gemini Official API (ap2)",
        "version": "1.3.0",
        "description": "Google Gemini官方API接口服务",
        "endpoints": endpoints,
    }
|
|
|
@app.exception_handler(404)
async def not_found_handler(request: Request, exc: HTTPException):
    """Return a JSON body for 404 errors.

    Exception handlers must return a response object, so the payload is
    wrapped in ``JSONResponse`` with status 404 rather than returned as a bare
    dict. The exception's own ``detail`` is included so that a 404 raised by a
    route (for example an unknown model id) keeps its specific message.

    Args:
        request: The request that produced the 404.
        exc: The exception being handled.

    Returns:
        A ``JSONResponse`` with status code 404.
    """
    return JSONResponse(
        status_code=404,
        content={
            "error": "未找到",
            "requested_path": str(request.url.path),
            "message": "请求的路径不存在",
            "detail": getattr(exc, "detail", None),
            "available_endpoints": {
                "models": "/v1/models",
                "models_alt": "/v1/chat/completions/v1/models",
                "chat": "/v1/chat/completions",
                "health": "/health",
                "info": "/",
            },
        },
    )
|
|
|
if __name__ == "__main__":
    # Script entry point: print a short startup banner, then serve on all
    # interfaces. The port comes from the PORT environment variable
    # (default 7861).
    port = int(os.environ.get("PORT", 7861))
    model_ids = [model['id'] for model in SUPPORTED_MODELS]
    for banner_line in (
        f"🚀 启动Gemini官方API服务器于端口 {port}",
        f"📊 支持的模型: {model_ids}",
        f"🔑 已配置 {len(API_KEYS)} 个API密钥",
        "🔄 支持自动重试和密钥轮换",
    ):
        print(banner_line)
    uvicorn.run(app, host="0.0.0.0", port=port)