grok2api / app /services /grok /create.py
JXJBing's picture
Upload 45 files
1a9e2c2 verified
"""Post创建管理器 - 用于视频生成前的会话创建"""
import asyncio
import orjson
from typing import Dict, Any, Optional
from curl_cffi.requests import AsyncSession
from app.services.grok.statsig import get_dynamic_headers
from app.core.exception import GrokApiException
from app.core.config import setting
from app.core.logger import logger
# 常量
ENDPOINT = "https://grok.com/rest/media/post/create"
TIMEOUT = 30
BROWSER = "chrome133a"
class PostCreateManager:
"""会话创建管理器"""
@staticmethod
async def create(file_id: str, file_uri: str, auth_token: str) -> Optional[Dict[str, Any]]:
"""创建会话记录
Args:
file_id: 文件ID
file_uri: 文件URI
auth_token: 认证令牌
Returns:
会话信息字典,包含post_id等
"""
# 参数验证
if not file_id or not file_uri:
raise GrokApiException("文件ID或URI缺失", "INVALID_PARAMS")
if not auth_token:
raise GrokApiException("认证令牌缺失", "NO_AUTH_TOKEN")
try:
# 构建请求
data = {
"media_url": f"https://assets.grok.com/{file_uri}",
"media_type": "MEDIA_POST_TYPE_IMAGE"
}
cf = setting.grok_config.get("cf_clearance", "")
headers = {
**get_dynamic_headers("/rest/media/post/create"),
"Cookie": f"{auth_token};{cf}" if cf else auth_token
}
# 外层重试:可配置状态码(401/429等)
retry_codes = setting.grok_config.get("retry_status_codes", [401, 429])
MAX_OUTER_RETRY = 3
for outer_retry in range(MAX_OUTER_RETRY + 1): # +1 确保实际重试3次
# 内层重试:403代理池重试
max_403_retries = 5
retry_403_count = 0
while retry_403_count <= max_403_retries:
# 异步获取代理(支持代理池)
from app.core.proxy_pool import proxy_pool
# 如果是403重试且使用代理池,强制刷新代理
if retry_403_count > 0 and proxy_pool._enabled:
logger.info(f"[PostCreate] 403重试 {retry_403_count}/{max_403_retries},刷新代理...")
proxy = await proxy_pool.force_refresh()
else:
proxy = await setting.get_proxy_async("service")
proxies = {"http": proxy, "https": proxy} if proxy else None
# 发送请求
async with AsyncSession() as session:
response = await session.post(
ENDPOINT,
headers=headers,
json=data,
impersonate=BROWSER,
timeout=TIMEOUT,
proxies=proxies
)
# 内层403重试:仅当有代理池时触发
if response.status_code == 403 and proxy_pool._enabled:
retry_403_count += 1
if retry_403_count <= max_403_retries:
logger.warning(f"[PostCreate] 遇到403错误,正在重试 ({retry_403_count}/{max_403_retries})...")
await asyncio.sleep(0.5)
continue
# 内层重试全部失败
logger.error(f"[PostCreate] 403错误,已重试{retry_403_count-1}次,放弃")
# 检查可配置状态码错误 - 外层重试
if response.status_code in retry_codes:
if outer_retry < MAX_OUTER_RETRY:
delay = (outer_retry + 1) * 0.1 # 渐进延迟:0.1s, 0.2s, 0.3s
logger.warning(f"[PostCreate] 遇到{response.status_code}错误,外层重试 ({outer_retry+1}/{MAX_OUTER_RETRY}),等待{delay}s...")
await asyncio.sleep(delay)
break # 跳出内层循环,进入外层重试
else:
logger.error(f"[PostCreate] {response.status_code}错误,已重试{outer_retry}次,放弃")
raise GrokApiException(f"创建失败: {response.status_code}错误", "CREATE_ERROR")
if response.status_code == 200:
result = response.json()
post_id = result.get("post", {}).get("id", "")
if outer_retry > 0 or retry_403_count > 0:
logger.info(f"[PostCreate] 重试成功!")
logger.debug(f"[PostCreate] 成功,会话ID: {post_id}")
return {
"post_id": post_id,
"file_id": file_id,
"file_uri": file_uri,
"success": True,
"data": result
}
# 其他错误处理
try:
error = response.json()
msg = f"状态码: {response.status_code}, 详情: {error}"
except:
msg = f"状态码: {response.status_code}, 详情: {response.text[:200]}"
logger.error(f"[PostCreate] 失败: {msg}")
raise GrokApiException(f"创建失败: {msg}", "CREATE_ERROR")
except GrokApiException:
raise
except Exception as e:
logger.error(f"[PostCreate] 异常: {e}")
raise GrokApiException(f"创建异常: {e}", "CREATE_ERROR") from e