# TST / main.py
# Source: AIMaster7, "Update main.py" (commit 3607724, verified)
import asyncio
import codecs
import json
import logging
import random
import string
import time
import uuid
from typing import AsyncGenerator, Dict, List, Any

import aiohttp
import brotli
from fastapi import FastAPI, Request, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type, RetryError
# ─── Logging ───
# Root logging setup: INFO and above, each line stamped with the clock time only.
logging.basicConfig(
    datefmt="%H:%M:%S",
    format="%(asctime)s %(levelname)s %(message)s",
    level=logging.INFO,
)
# Single named logger used by every function in this module.
logger = logging.getLogger("proxy")
# ─── Config ───
# Upstream chat endpoint that every completion request is forwarded to.
BLACKBOX_URL = "https://www.blackbox.ai/api/chat"

# Total time budget for one upstream request (five minutes), handed to aiohttp.
REQUEST_TIMEOUT = 5 * 60

# Headers sent with every upstream POST. Key order is kept as-is.
HEADERS = {
    # NOTE(review): these four look like HTTP/2 pseudo-headers copied without
    # their leading colon, so they go out as ordinary headers; confirm the
    # upstream actually needs them.
    "authority": "www.blackbox.ai",
    "method": "POST",
    "path": "/api/chat",
    "scheme": "https",
    # Content negotiation.
    "accept": "*/*",
    # NOTE(review): advertises zstd; verify the installed aiohttp can decode it.
    "accept-encoding": "gzip, deflate, br, zstd",
    "accept-language": "en-US,en;q=0.9",
    # Same-origin browser context.
    "origin": "https://www.blackbox.ai",
    "priority": "u=1, i",
    "referer": "https://www.blackbox.ai/",
    # Client hints and fetch metadata describing Chrome 136 on Windows.
    "sec-ch-ua": '"Chromium";v="136", "Google Chrome";v="136", "Not.A/Brand";v="99"',
    "sec-ch-ua-mobile": "?0",
    "sec-ch-ua-platform": '"Windows"',
    "sec-fetch-dest": "empty",
    "sec-fetch-mode": "cors",
    "sec-fetch-site": "same-origin",
    "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36",
    "content-type": "application/json",
}
# ─── FastAPI ───
app = FastAPI()

# Fully permissive CORS: any origin, method and header, credentials allowed.
# NOTE(review): a wildcard origin combined with allow_credentials=True is
# usually discouraged; confirm this is intentional for this proxy.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],
)

# Shared aiohttp session; stays None until the first upstream call creates it.
HTTP_SESSION: aiohttp.ClientSession = None

# Upstream status codes that are reported as RetryableStatusError.
RETRYABLE_STATUSES = {400, 429, 500, 502, 503, 504}
# Default alphabet for generated identifiers: ASCII letters followed by digits.
_ascii = "".join((string.ascii_letters, string.digits))


def _rand(n, pool=_ascii):
    """Return a string of ``n`` characters, each drawn at random from ``pool``."""
    picks = [random.choice(pool) for _ in range(n)]
    return "".join(picks)


def random_email():
    """Return a random 12-character local part followed by ``@gmail.com``."""
    local_part = _rand(12)
    return f"{local_part}@gmail.com"


def random_id():
    """Return a string of 21 random decimal digits."""
    return _rand(21, pool=string.digits)


def random_customer_id():
    """Return ``cus_`` followed by 12 random alphanumeric characters."""
    suffix = _rand(12)
    return "".join(("cus_", suffix))


def generate_7char_id():
    """Return a random 7-character alphanumeric identifier."""
    return _rand(n=7)
def build_payload(messages: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Build the JSON body that is posted to the upstream chat endpoint.

    A fresh 7-character id is generated and used both as the conversation
    ``id`` and as the ``id`` of the last message. The session and subscription
    sections are filled with randomly generated user details.

    Args:
        messages: Chat messages in request order. The list and its items are
            not modified; the last message is copied before the id is added.

    Returns:
        The payload dictionary, ready to be serialised as JSON.
    """
    msg_id = generate_7char_id()

    # Work on a copy so the caller's list and last message dict are not
    # mutated as a side effect of building the payload.
    outgoing = list(messages)
    if outgoing:
        outgoing[-1] = {**outgoing[-1], "id": msg_id}

    # Read the clock once so both timestamps describe the same instant.
    timestamp = time.time()
    now = int(timestamp)

    return {
        "messages": outgoing,
        "agentMode": {},
        "id": msg_id,
        "previewToken": None,
        "userId": None,
        "codeModelMode": True,
        "trendingAgentMode": {},
        "isMicMode": False,
        "userSystemPrompt": None,
        "maxTokens": 1024,
        "playgroundTopP": None,
        "playgroundTemperature": None,
        "isChromeExt": False,
        "githubToken": "",
        "clickedAnswer2": False,
        "clickedAnswer3": False,
        "clickedForceWebSearch": False,
        "visitFromDelta": False,
        "isMemoryEnabled": False,
        "mobileClient": False,
        "userSelectedModel": None,
        "validated": "00f37b34-a166-4efb-bce5-1312d87f2f94",
        "imageGenerationMode": False,
        "webSearchModePrompt": False,
        "deepSearchMode": True,
        "domains": None,
        "vscodeClient": False,
        "codeInterpreterMode": False,
        "customProfile": {
            "name": "",
            "occupation": "",
            "traits": [],
            "additionalInfo": "",
            "enableNewChats": False,
        },
        "session": {
            "user": {
                "name": _rand(10),
                "email": random_email(),
                "image": "https://lh3.googleusercontent.com/a/default",
                "id": random_id(),
            },
            # NOTE(review): fixed expiry timestamp; confirm the upstream does
            # not reject sessions once this date has passed.
            "expires": "2025-06-09T19:36:08.220Z",
            "isNewUser": False,
        },
        "isPremium": True,
        "subscriptionCache": {
            "status": "PREMIUM",
            "customerId": random_customer_id(),
            # Sixty days from now, in seconds.
            "expiryTimestamp": now + 60 * 86400,
            # Milliseconds since the epoch.
            "lastChecked": int(timestamp * 1000),
            "isTrialSubscription": False,
        },
        "beastMode": False,
        "reasoningMode": False,
        "designerMode": False,
    }
class RetryableStatusError(Exception):
    """Upstream replied with a status code that is worth retrying.

    The message keeps the original ``status=<code> body=<first 100 chars>...``
    format. The status and the full body are also kept as attributes so
    handlers can inspect them instead of parsing the message.

    Attributes:
        status: HTTP status code returned by the upstream.
        text: Response body; an empty string when no body was supplied.
    """

    def __init__(self, status: int, text: str):
        # A missing body must not turn error reporting into a TypeError.
        body = text or ""
        super().__init__(f"status={status} body={body[:100]}...")
        self.status = status
        self.text = body
def log_retry(retry_state):
    """Emit a warning describing the attempt that just failed.

    Installed as the ``before_sleep`` hook of the retry policy. The request id
    is taken from the ``request_id`` keyword argument of the retried call and
    falls back to ``"unknown"`` when that argument is absent.
    """
    request_id = retry_state.kwargs.get("request_id", "unknown")
    attempt = retry_state.attempt_number
    failure = retry_state.outcome.exception()
    logger.warning("[%s] retry %s/3 due to %s", request_id, attempt, failure)
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(min=1, max=10),
    retry=retry_if_exception_type(
        (aiohttp.ClientConnectionError, aiohttp.ClientResponseError, asyncio.TimeoutError, RetryableStatusError)
    ),
    before_sleep=log_retry,
)
async def _open_blackbox_response(*, data, request_id: str) -> aiohttp.ClientResponse:
    """POST ``data`` upstream and return the open response once it is a 200.

    The retry policy is attached to this coroutine rather than to the async
    generator below. Decorating an async generator function only wraps the
    call that creates the generator object, so errors raised while iterating
    would never be retried.

    Raises:
        RetryableStatusError: Status listed in ``RETRYABLE_STATUSES``; this
            triggers another attempt.
        HTTPException: 502 for any other non-200 status; not retried.
        RetryError: Raised by tenacity once all three attempts have failed.
    """
    global HTTP_SESSION
    # Create the shared session lazily, and replace it if it was closed.
    if HTTP_SESSION is None or HTTP_SESSION.closed:
        HTTP_SESSION = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT))

    # Pass a ClientTimeout object: aiohttp deprecates bare numbers here.
    resp = await HTTP_SESSION.post(
        BLACKBOX_URL,
        json=data,
        headers=HEADERS,
        timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT),
    )
    if resp.status == 200:
        return resp

    # Error path: read the body, then release the connection before raising.
    async with resp:
        body = await resp.text()
    logger.error("[%s] Upstream %s error: %s", request_id, BLACKBOX_URL, resp.status)
    if resp.status in RETRYABLE_STATUSES:
        raise RetryableStatusError(resp.status, body)
    raise HTTPException(status_code=502, detail=f"Upstream error {resp.status}")


async def get_blackbox_response(*, data, stream: bool, request_id: str) -> AsyncGenerator[str, None]:
    """Yield the upstream reply to ``data`` as text.

    Args:
        data: JSON-serialisable payload posted to ``BLACKBOX_URL``.
        stream: When true, yield text as it arrives; otherwise yield the whole
            body as a single string.
        request_id: Identifier used in log lines.

    Yields:
        Decoded text from the upstream response.

    Raises:
        RetryError: Connecting or receiving a 200 failed on every attempt.
        HTTPException: 502 when the upstream answers with a non-retryable
            status.

    Only establishing the response is retried. Once text has been yielded, a
    failure propagates to the consumer, because already-delivered output
    cannot be replayed.
    """
    resp = await _open_blackbox_response(data=data, request_id=request_id)
    async with resp:
        if not stream:
            yield await resp.text()
            return

        # Decode incrementally so a multi-byte character split across two
        # network chunks is reassembled instead of being dropped.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="ignore")
        async for chunk in resp.content.iter_any():
            text = decoder.decode(chunk)
            if text:
                yield text
        tail = decoder.decode(b"", final=True)
        if tail:
            yield tail
@app.middleware("http")
async def add_request_id(request: Request, call_next):
    """Tag each request with a fresh UUID and log its start and duration.

    The id is stored on ``request.state.request_id`` so handlers can include
    it in their own log lines.
    """
    rid = str(uuid.uuid4())
    request.state.request_id = rid
    logger.info("[%s] %s %s", rid, request.method, request.url.path)

    began = time.perf_counter()
    response = await call_next(request)
    elapsed = time.perf_counter() - began

    logger.info("[%s] finished in %.2fs", rid, elapsed)
    return response
@app.get("/")
async def root():
    """Answer the index route with a fixed "API is running" message."""
    banner = {"message": "API is running"}
    return banner
@app.get("/health")
async def health():
    """Health-check route; always reports status ``ok``."""
    report = {"status": "ok"}
    return report
def _deepsearch_test_payload() -> Dict[str, Any]:
    """Return the fixed upstream payload used when ``test_payload`` is ``"deepsearch"``."""
    # NOTE(review): the session and subscription values below look like they
    # were captured from a real account; consider moving them out of source.
    return {
        "messages": [{"id": "s2eB86t", "content": "google", "role": "user"}],
        "agentMode": {},
        "id": "s2eB86t",
        "previewToken": None,
        "userId": None,
        "codeModelMode": True,
        "trendingAgentMode": {},
        "isMicMode": False,
        "userSystemPrompt": None,
        "maxTokens": 1024,
        "playgroundTopP": None,
        "playgroundTemperature": None,
        "isChromeExt": False,
        "githubToken": "",
        "clickedAnswer2": False,
        "clickedAnswer3": False,
        "clickedForceWebSearch": False,
        "visitFromDelta": False,
        "isMemoryEnabled": False,
        "mobileClient": False,
        "userSelectedModel": None,
        "validated": "00f37b34-a166-4efb-bce5-1312d87f2f94",
        "imageGenerationMode": False,
        "webSearchModePrompt": False,
        "deepSearchMode": True,
        "domains": None,
        "vscodeClient": False,
        "codeInterpreterMode": False,
        "customProfile": {
            "name": "",
            "occupation": "",
            "traits": [],
            "additionalInfo": "",
            "enableNewChats": False,
        },
        "session": {
            "user": {
                "name": "S.C gaming",
                "email": "simarmanbir@gmail.com",
                "image": "https://lh3.googleusercontent.com/a/ACg8ocI-ze5Qe42S-j8xaCL6X7KSVwfiOae4fONqpTxzt0d2_a2FIld1=s96-c",
                "id": "100846841133312010974",
            },
            "expires": "2025-06-09T19:36:08.220Z",
            "isNewUser": False,
        },
        "isPremium": True,
        "subscriptionCache": {
            "status": "PREMIUM",
            "customerId": "cus_Rtiok4sPQNoo1c",
            "expiryTimestamp": 1749108685,
            "lastChecked": 1746822333827,
            "isTrialSubscription": True,
        },
        "beastMode": False,
        "reasoningMode": False,
        "designerMode": False,
    }


async def _read_json_object(request: Request) -> Dict[str, Any]:
    """Parse the request body as a JSON object, or raise a 400 HTTPException."""
    try:
        body = await request.json()
    except ValueError as err:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        raise HTTPException(status_code=400, detail="Invalid JSON") from err
    if not isinstance(body, dict):
        raise HTTPException(status_code=400, detail="Request body must be a JSON object")
    return body


def _sse_chunk(completion_id: str, created: int, content: str) -> str:
    """Format one piece of streamed text as a ``chat.completion.chunk`` SSE event."""
    msg = {
        "id": completion_id,
        "object": "chat.completion.chunk",
        "created": created,
        "model": "DeepResearch",
        "choices": [{"index": 0, "delta": {"content": content}}],
    }
    return f"data: {json.dumps(msg)}\n\n"


@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    """Chat-completions endpoint that proxies to the upstream chat API.

    Request body (JSON object):
        messages: Non-empty list of message objects (required).
        stream: Truthy to receive server-sent events; defaults to false.
        test_payload: When equal to ``"deepsearch"``, a fixed upstream payload
            is sent instead of one built from ``messages``.

    Returns:
        A ``chat.completion`` dictionary, or a ``StreamingResponse`` of
        ``chat.completion.chunk`` events terminated by ``data: [DONE]``.

    Raises:
        HTTPException: 400 for a malformed body, 502 when the upstream fails,
            500 for any other unexpected error.
    """
    rid = request.state.request_id
    try:
        body = await _read_json_object(request)

        messages = body.get("messages", [])
        if not isinstance(messages, list) or not messages:
            raise HTTPException(status_code=400, detail="Missing 'messages'")
        if not all(isinstance(message, dict) for message in messages):
            raise HTTPException(status_code=400, detail="Each message must be a JSON object")

        stream = body.get("stream", False)

        # Use exact deepresearch payload if flag is passed
        if body.get("test_payload") == "deepsearch":
            payload = _deepsearch_test_payload()
        else:
            payload = build_payload(messages)

        if not stream:
            chunks = []
            async for part in get_blackbox_response(data=payload, stream=False, request_id=rid):
                if part.startswith("Error:"):
                    raise HTTPException(status_code=502, detail=part)
                chunks.append(part)
            return {
                "id": str(uuid.uuid4()),
                "object": "chat.completion",
                "created": int(time.time()),
                "model": "DeepResearch",
                "choices": [{
                    "index": 0,
                    "message": {"role": "assistant", "content": "".join(chunks)},
                    "finish_reason": "stop",
                }],
            }

        # Pull the first chunk before the response starts, so a failure to
        # reach the upstream becomes a proper HTTP error and not a broken
        # stream.
        upstream = get_blackbox_response(data=payload, stream=True, request_id=rid)
        try:
            first_chunk = await upstream.__anext__()
        except StopAsyncIteration:
            first_chunk = None
    except HTTPException:
        # Deliberate HTTP errors raised above must keep their status code
        # and not be rewritten as a 500 by the catch-all below.
        raise
    except (RetryError, RetryableStatusError, aiohttp.ClientError, asyncio.TimeoutError) as err:
        logger.error("[%s] retries failed: %s", rid, err)
        raise HTTPException(status_code=502, detail="Blackbox upstream failed") from err
    except Exception as err:
        logger.exception("[%s] error", rid)
        raise HTTPException(status_code=500, detail="Internal proxy error") from err

    # All chunks of one completion share the same id and creation time.
    completion_id = str(uuid.uuid4())
    created = int(time.time())

    async def event_stream():
        try:
            if first_chunk is not None:
                yield _sse_chunk(completion_id, created, first_chunk)
            async for chunk in upstream:
                yield _sse_chunk(completion_id, created, chunk)
            yield "data: [DONE]\n\n"
        finally:
            # Close the upstream generator even if the client disconnects
            # mid-stream.
            await upstream.aclose()

    return StreamingResponse(event_stream(), media_type="text/event-stream")