2api / src /utils.py
lin7zhi's picture
Upload folder using huggingface_hub
69fec20 verified
import secrets
from datetime import datetime, timezone
from typing import List, Optional

from fastapi import Depends, HTTPException, Header, Query, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from config import get_api_password, get_panel_password
from log import log
# HTTP Bearer security scheme; injected via Depends() in verify_panel_token
security = HTTPBearer()
# ====================== OAuth Configuration ======================
# NOTE(review): client IDs and secrets are embedded in source. Presumably
# these are public installed-application OAuth client values - confirm
# before treating them as confidential or rotating them.
GEMINICLI_USER_AGENT = "GeminiCLI/0.1.5 (Windows; AMD64)"
ANTIGRAVITY_USER_AGENT = "antigravity/1.11.3 windows/amd64"

# OAuth configuration - standard mode
CLIENT_ID = "681255809395-oo8ft2oprdrnp9e3aqf6av3hmdib135j.apps.googleusercontent.com"
CLIENT_SECRET = "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"
SCOPES = [
    "https://www.googleapis.com/auth/cloud-platform",
    "https://www.googleapis.com/auth/userinfo.email",
    "https://www.googleapis.com/auth/userinfo.profile",
]

# OAuth configuration - Antigravity mode
ANTIGRAVITY_CLIENT_ID = "1071006060591-tmhssin2h21lcre235vtolojh4g403ep.apps.googleusercontent.com"
ANTIGRAVITY_CLIENT_SECRET = "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"
ANTIGRAVITY_SCOPES = [
    "https://www.googleapis.com/auth/cloud-platform",
    "https://www.googleapis.com/auth/userinfo.email",
    "https://www.googleapis.com/auth/userinfo.profile",
    "https://www.googleapis.com/auth/cclog",
    "https://www.googleapis.com/auth/experimentsandconfigs",
]

# Shared token URL (identical for both modes)
TOKEN_URL = "https://oauth2.googleapis.com/token"

# Callback server configuration
CALLBACK_HOST = "localhost"
# ====================== Model Configuration ======================
# Default safety settings for the Google API: one entry per harm category,
# each with the "BLOCK_NONE" threshold.
DEFAULT_SAFETY_SETTINGS = [
    {"category": category, "threshold": "BLOCK_NONE"}
    for category in (
        "HARM_CATEGORY_HARASSMENT",
        "HARM_CATEGORY_HATE_SPEECH",
        "HARM_CATEGORY_SEXUALLY_EXPLICIT",
        "HARM_CATEGORY_DANGEROUS_CONTENT",
        "HARM_CATEGORY_CIVIC_INTEGRITY",
        "HARM_CATEGORY_IMAGE_HATE",
        "HARM_CATEGORY_IMAGE_DANGEROUS_CONTENT",
        "HARM_CATEGORY_IMAGE_HARASSMENT",
        "HARM_CATEGORY_IMAGE_SEXUALLY_EXPLICIT",
        "HARM_CATEGORY_JAILBREAK",
    )
]

# Base model names; feature-prefixed and suffixed variants are derived from these
BASE_MODELS = [
    "gemini-2.5-pro",
    "gemini-2.5-flash",
    "gemini-3-pro-preview",
    "gemini-3-flash-preview",
]
# ====================== Model Helper Functions ======================
def is_fake_streaming_model(model_name: str) -> bool:
    """Tell whether *model_name* requests fake streaming.

    A model name requests fake streaming when it begins with the
    "假流式/" feature prefix.
    """
    fake_streaming_prefix = "假流式/"
    has_prefix = model_name.startswith(fake_streaming_prefix)
    return has_prefix
def is_anti_truncation_model(model_name: str) -> bool:
    """Tell whether *model_name* requests streaming anti-truncation.

    A model name requests anti-truncation when it begins with the
    "流式抗截断/" feature prefix.
    """
    anti_truncation_prefix = "流式抗截断/"
    has_prefix = model_name.startswith(anti_truncation_prefix)
    return has_prefix
def get_base_model_from_feature_model(model_name: str) -> str:
    """Strip a leading feature prefix from *model_name*.

    Only the first matching prefix ("假流式/" or "流式抗截断/") is removed,
    and only once; a name without either prefix is returned unchanged.
    """
    feature_prefixes = ("假流式/", "流式抗截断/")
    matched_prefix = next(
        (candidate for candidate in feature_prefixes if model_name.startswith(candidate)),
        None,
    )
    if matched_prefix is None:
        return model_name
    return model_name[len(matched_prefix):]
def get_available_models(router_type: str = "openai") -> List[str]:
    """
    Build the list of model names exposed to clients.

    Every entry of BASE_MODELS is expanded with each supported suffix, in
    this order: no suffix, each thinking suffix, the search suffix, then each
    thinking + search combination. For every base/suffix pair three names are
    emitted: the plain name, the fake-streaming ("假流式/") variant, and the
    streaming anti-truncation ("流式抗截断/") variant.

    Args:
        router_type: "openai" or "gemini". Accepted for callers but not read
            by this function.

    Returns:
        List of model names, with and without feature prefixes.
    """
    # Order matters: plain name first, then the two feature-prefixed variants.
    feature_prefixes = ("", "假流式/", "流式抗截断/")
    thinking_suffixes = ("-maxthinking", "-nothinking")
    search_suffix = "-search"

    # Suffix order mirrors the listing order described in the docstring.
    suffixes = (
        "",
        *thinking_suffixes,
        search_suffix,
        *(f"{thinking_suffix}{search_suffix}" for thinking_suffix in thinking_suffixes),
    )

    return [
        f"{prefix}{base_model}{suffix}"
        for base_model in BASE_MODELS
        for suffix in suffixes
        for prefix in feature_prefixes
    ]
# ====================== Authentication Functions ======================
async def authenticate_flexible(
    request: Request,
    authorization: Optional[str] = Header(None),
    x_api_key: Optional[str] = Header(None, alias="x-api-key"),
    access_token: Optional[str] = Header(None, alias="access_token"),
    x_goog_api_key: Optional[str] = Header(None, alias="x-goog-api-key"),
    key: Optional[str] = Query(None),
) -> str:
    """
    Unified, flexible authentication dependency supporting several credential sources.

    Can be used directly as a FastAPI ``Depends`` dependency. Credential
    sources are checked in the following order and only the first non-empty
    one is validated:

    1. URL parameter ``key``
    2. ``x-goog-api-key`` header
    3. ``x-api-key`` header
    4. ``access_token`` header
    5. ``Authorization`` header (``Bearer <token>``; the scheme name is
       matched case-insensitively)

    Args:
        request: FastAPI Request object (not read by this function).
        authorization: Value of the Authorization header (auto-injected).
        x_api_key: Value of the x-api-key header (auto-injected).
        access_token: Value of the access_token header (auto-injected).
        x_goog_api_key: Value of the x-goog-api-key header (auto-injected).
        key: Value of the ``key`` URL parameter (auto-injected).

    Returns:
        The token that passed validation.

    Raises:
        HTTPException: 401 when no credential is supplied or the
            Authorization header does not use the Bearer scheme; 403 when
            the supplied token does not match the configured API password.

    Example:
        @router.post("/endpoint")
        async def endpoint(token: str = Depends(authenticate_flexible)):
            # token has already been validated
            pass
    """
    password = await get_api_password()

    token = None
    auth_method = None

    if key:
        # 1. URL parameter "key"
        token = key
        auth_method = "URL parameter 'key'"
    elif x_goog_api_key:
        # 2. x-goog-api-key header
        token = x_goog_api_key
        auth_method = "x-goog-api-key header"
    elif x_api_key:
        # 3. x-api-key header
        token = x_api_key
        auth_method = "x-api-key header"
    elif access_token:
        # 4. access_token header
        token = access_token
        auth_method = "access_token header"
    elif authorization:
        # 5. Authorization header. HTTP auth scheme names are
        # case-insensitive, so "bearer" / "BEARER" are accepted as well.
        scheme, separator, credential = authorization.partition(" ")
        if not separator or scheme.lower() != "bearer":
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid authentication scheme. Use 'Bearer <token>'",
                headers={"WWW-Authenticate": "Bearer"},
            )
        token = credential
        auth_method = "Authorization Bearer header"

    # No credential was supplied through any of the supported channels.
    if not token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Missing authentication credentials. Use 'key' URL parameter, 'x-goog-api-key', 'x-api-key', 'access_token' header, or 'Authorization: Bearer <token>'",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Compare in constant time so response timing does not reveal how much of
    # the password a guess got right. Both sides are encoded to bytes because
    # compare_digest rejects non-ASCII str arguments. A non-string password
    # can never match, which mirrors the behavior of a plain `!=` check.
    token_matches = isinstance(password, str) and secrets.compare_digest(
        token.encode("utf-8"), password.encode("utf-8")
    )
    if not token_matches:
        log.error(f"Authentication failed using {auth_method}")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="密码错误"
        )

    log.debug(f"Authentication successful using {auth_method}")
    return token
# Old function names kept as aliases for backward compatibility
authenticate_bearer = authenticate_flexible
authenticate_gemini_flexible = authenticate_flexible
# ====================== Panel Authentication Functions ======================
async def verify_panel_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
    """
    Validate the control-panel password supplied as a Bearer token.

    The Bearer token extracted by the ``security`` scheme must equal the
    configured control-panel password.

    Args:
        credentials: HTTPAuthorizationCredentials, auto-injected by FastAPI.

    Returns:
        The token that passed validation.

    Raises:
        HTTPException: 401 when the token does not match the panel password.
    """
    password = await get_panel_password()
    supplied_token = credentials.credentials

    # Constant-time comparison avoids leaking, through response timing, how
    # much of the password a guess got right. Both sides are encoded to bytes
    # because compare_digest rejects non-ASCII str arguments. A non-string
    # password can never match, as with a plain `!=` check.
    token_matches = isinstance(password, str) and secrets.compare_digest(
        supplied_token.encode("utf-8"), password.encode("utf-8")
    )
    if not token_matches:
        raise HTTPException(status_code=401, detail="密码错误")

    return supplied_token