from __future__ import annotations import os, re, json, uuid, random, string, logging, asyncio from datetime import datetime, timedelta from typing import List, Callable, Any, Optional import httpx from fastapi import FastAPI, HTTPException from fastapi.responses import StreamingResponse, JSONResponse from pydantic import BaseModel, Field # ────────────────────────── logging ────────────────────────────────────── logging.basicConfig( level=os.getenv("LOG_LEVEL", "INFO"), format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s", ) log = logging.getLogger("snapzion-service") log.info("snapzion service starting …") # ────────────────────────── ENV & constants ───────────────────────────── SYSTEM_PROMPT = os.getenv( "SYSTEM_PROMPT", "You are a prompt-safety model. Decide if the prompt is safe. " "Respond with 'safe' or 'not safe'.", ) SAFETY_API_KEY = os.getenv("SAFETY_API_KEY", "sk-F8l9ALDrJSpVCWJ3G1XbqP09oE3UD09Jf0t4WSlnrSJFdTtX") SAFETY_MODEL_URL = os.getenv( "SAFETY_MODEL_URL", "https://api.typegpt.net/v1/chat/completions", ) MAX_RETRIES = int(os.getenv("MAX_RETRIES", "5")) INITIAL_DELAY = float(os.getenv("INITIAL_DELAY", "0.5")) MAX_DELAY = float(os.getenv("MAX_DELAY", "2.5")) # ────────────────────────── FastAPI / HTTPX ──────────────────────────── app = FastAPI(title="Snapzion Image-Gen API | NAI", version="2.4.1") _http: Optional[httpx.AsyncClient] = None @app.on_event("startup") async def _startup(): global _http _http = httpx.AsyncClient( timeout=30, limits=httpx.Limits(max_connections=100, max_keepalive_connections=40), ) log.info("HTTPX pool ready ✓") # ────────────────────────── Pydantic models ──────────────────────────── class ChatMessage(BaseModel): role: str content: str class ChatRequest(BaseModel): model: str messages: List[ChatMessage] stream: bool = Field(default=False) # ────────────────────────── Helpers ──────────────────────────────────── def _fake_user() -> tuple[str, str, str]: first = random.choice("Alice Bob Carol David Evelyn Frank 
Grace Hector Ivy Jackie".split()) last = random.choice("Smith Johnson Davis Miller Thompson Garcia Brown Wilson Martin Clark".split()) email = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8)) + "@example.com" cust = "cus_" + ''.join(random.choices(string.ascii_letters + string.digits, k=14)) return f"{first} {last}", email, cust async def _retry(fn: Callable, *a, **kw) -> Any: max_tries = kw.pop("max_retries", MAX_RETRIES) delay = INITIAL_DELAY for n in range(1, max_tries + 1): try: return await fn(*a, **kw) except httpx.HTTPStatusError as exc: if exc.response.status_code == 400: log.warning("%s try %d/%d: HTTP 400 error: %s", fn.__name__, n, max_tries, exc) if n == max_tries: log.error("%s failed after %d tries: HTTP 400 error: %s", fn.__name__, n, exc) raise else: log.error("%s failed with status %d: %s", fn.__name__, exc.response.status_code, exc) raise except Exception as exc: if n == max_tries: log.error("%s failed after %d tries: %s", fn.__name__, n, exc) raise log.warning("%s try %d/%d: %s", fn.__name__, n, max_tries, exc) await asyncio.sleep(delay + random.uniform(0, 0.4)) delay = min(delay * 2, MAX_DELAY) # ────────────────────────── Safety check ─────────────────────────────── async def _raw_safety(prompt: str) -> bool: assert _http hdrs = {"Authorization": f"Bearer {SAFETY_API_KEY}", "Content-Type": "application/json"} payload = { "model": "meta-llama/Meta-Llama-3-8B-Instruct-Lite", "messages": [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": prompt}, ], } r = await _http.post(SAFETY_MODEL_URL, json=payload, headers=hdrs) r.raise_for_status() raw = r.json()["choices"][0]["message"]["content"].strip().lower() log.debug("Safety raw reply: %r", raw) if re.search(r"\b(not\s+safe|unsafe)\b", raw): log.warning("Prompt-safety verdict: NOT SAFE") return False if re.search(r"\bsafe\b", raw): log.info("Prompt-safety verdict: SAFE") return True log.warning("Prompt-safety unknown reply %r → NOT SAFE", raw) return False 
async def is_safe(prompt: str) -> bool:
    """Run the prompt-safety check with retries; see ``_raw_safety``."""
    return await _retry(_raw_safety, prompt)


# ────────────────────────── Blackbox Image API ─────────────────────────
async def _raw_blackbox(prompt: str) -> str:
    """POST *prompt* to the Blackbox image generator, return its markdown.

    Fabricates a throw-away user/session so the request resembles a normal
    browser call.  Returns the reply's ``markdown`` field (or the raw body
    when the reply is not JSON).  Raises ``httpx.HTTPStatusError`` on 4xx/5xx.
    """
    from datetime import timezone  # local: module import line is shared file-wide

    assert _http  # the startup hook must have run
    name, email, _ = _fake_user()
    user_id = ''.join(random.choices(string.digits, k=21))
    # Naive-UTC ISO timestamp 30 days ahead with a manual "Z" suffix.
    # FIX: replaced deprecated datetime.utcnow() with an output-identical
    # timezone-aware computation.
    expiry = (
        datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None) + timedelta(days=30)
    ).isoformat() + "Z"
    payload = {
        "query": prompt,
        "session": {
            "user": {
                "name": name,
                "email": email,
                "image": "https://lh3.googleusercontent.com/a/ACg8ocI-ze5Qe42S-j8xaCL6X7KSVwfiOae4fONqpTxzt0d2_a2FIld1=s96-c",
                "id": user_id
            },
            "expires": expiry
        }
    }
    headers = {
        "accept": "*/*",
        "accept-language": "en-US,en;q=0.9,ru;q=0.8",
        "content-type": "text/plain;charset=UTF-8",
        "origin": "https://www.blackbox.ai",
        "priority": "u=1, i",
        "referer": "https://www.blackbox.ai/",
        "sec-ch-ua": '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Windows"',
        "sec-fetch-dest": "empty",
        "sec-fetch-mode": "cors",
        "sec-fetch-site": "same-origin",
        "user-agent": (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
            "AppleWebKit/537.36 (KHTML, like Gecko) "
            "Chrome/135.0.0.0 Safari/537.36"
        ),
    }
    resp = await _http.post("https://www.blackbox.ai/api/image-generator", json=payload, headers=headers)
    resp.raise_for_status()
    try:
        return resp.json().get("markdown", "").strip()
    except json.JSONDecodeError:
        # Some replies come back as plain text rather than JSON.
        return resp.text.strip()


async def blackbox(prompt: str) -> str:
    """Image generation with retries; see ``_raw_blackbox``."""
    return await _retry(_raw_blackbox, prompt)


# ────────────────────────── Main route ─────────────────────────────────
@app.post("/v1/chat/completions")
async def chat(req: ChatRequest):
    """OpenAI-compatible chat endpoint that answers with image markdown.

    Pipeline: pick the latest user message → prompt-safety gate →
    Blackbox image generation → rewrite image URLs onto the CDN →
    reply as a plain completion or as a two-chunk SSE stream.
    """
    if _http is None:
        raise HTTPException(503, "HTTP client not ready")
    user_prompt = next((m.content for m in reversed(req.messages) if m.role == "user"), "")
    if not user_prompt:
        raise HTTPException(400, "User prompt missing")

    try:
        if not await is_safe(user_prompt):
            return JSONResponse({"error": "Your prompt is considered unsafe."},
                                status_code=400)
    except httpx.HTTPStatusError as exc:
        return JSONResponse({"error": f"Safety check failed: HTTP {exc.response.status_code}",
                             "reason": str(exc)},
                            status_code=503)

    try:
        md = await blackbox(user_prompt)
    except httpx.HTTPStatusError as exc:
        return JSONResponse({"error": f"Image generation failed: HTTP {exc.response.status_code}",
                             "reason": str(exc)},
                            status_code=503)
    except Exception as exc:
        return JSONResponse({"error": "Image generation failed after retries.",
                             "reason": str(exc)},
                            status_code=503)

    # Point image links at the CDN.  BUGFIX: the prompt used to be embedded
    # in the re.sub replacement *template*, so a prompt containing "\" or
    # "\1" raised re.error or corrupted the output.  A callable replacement
    # treats the prompt as literal text.
    md = re.sub(
        r"!\[[^\]]*\]\(https://storage\.googleapis\.com([^\)]*)\)",
        lambda m: f"![{user_prompt}](https://cdn.snapzion.com{m.group(1)})",
        md,
    )

    uid, ts = str(uuid.uuid4()), int(datetime.now().timestamp())
    if not req.stream:
        return {
            "id": uid,
            "object": "chat.completion",
            "created": ts,
            "model": "Image-Generator",
            "choices": [{
                "index": 0,
                "message": {"role": "assistant", "content": md},
                "finish_reason": "stop",
            }],
            "usage": None,
        }

    async def sse():
        # One content chunk, one empty "stop" chunk, then the [DONE] marker.
        chunk1 = {"id": uid, "object": "chat.completion.chunk", "created": ts,
                  "model": "Image-Generator",
                  "choices": [{"index": 0,
                               "delta": {"role": "assistant", "content": md},
                               "finish_reason": None}],
                  "usage": None}
        yield f"data: {json.dumps(chunk1)}\n\n"
        chunk2 = {"id": uid, "object": "chat.completion.chunk", "created": ts,
                  "model": "Image-Generator",
                  "choices": [{"index": 0,
                               "delta": {"role": "assistant", "content": ""},
                               "finish_reason": "stop"}],
                  "usage": None}
        yield f"data: {json.dumps(chunk2)}\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(sse(), media_type="text/event-stream")