Spaces:
Running
Running
| """ | |
| OpenAI-compatible API wrapping Perplexity Ask (free/anonymous). | |
| Hosted on Hugging Face Spaces (Docker). | |
| """ | |
| import json | |
| import uuid | |
| import time | |
| import threading | |
| from datetime import datetime | |
| from typing import Optional | |
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.responses import StreamingResponse, JSONResponse | |
| from pydantic import BaseModel, Field | |
| # ── Scraping libs ────────────────────────────────────────────── | |
# Optional scraping backends.  Each import is attempted once at module load
# and the outcome is recorded in a module-level availability flag.
try:
    from curl_cffi.requests import Session as CurlSession
except ImportError:
    HAS_CURL_CFFI = False
else:
    HAS_CURL_CFFI = True

try:
    import cloudscraper
except ImportError:
    HAS_CLOUDSCRAPER = False
else:
    HAS_CLOUDSCRAPER = True
| # ── Constants ────────────────────────────────────────────────── | |
# Upstream site and the SSE endpoint that queries are posted to.
BASE_URL = "https://www.perplexity.ai"
ASK_URL = BASE_URL + "/rest/sse/perplexity_ask"

# Retry policy: number of attempts and the pause (seconds) between them.
MAX_RETRIES = 3
RETRY_DELAY = 2

# Only response blocks whose "intended_usage" equals this value carry answer text.
TARGET_USAGE = "ask_text_0_markdown"

# Model identifier reported in API responses.
MODEL_NAME = "perplexity-ask"

# Headers sent with every upstream request (an X-Request-ID is added per call).
HEADERS = {
    "Accept": "text/event-stream",
    "Accept-Language": "fr,fr-FR;q=0.9,en-US;q=0.8,en;q=0.7",
    "Referer": BASE_URL + "/",
    "Origin": BASE_URL,
    "content-type": "application/json",
    "X-Perplexity-Request-Reason": "perplexity-query-state-provider",
    "DNT": "1",
    "Sec-GPC": "1",
    "Sec-Fetch-Dest": "empty",
    "Sec-Fetch-Mode": "cors",
    "Sec-Fetch-Site": "same-origin",
    "Cache-Control": "no-cache",
    "Pragma": "no-cache",
}
| # ── Session Pool (thread-safe) ──────────────────────────────── | |
class SessionManager:
    """Manage one reusable scraping session, rebuilt when missing or stale.

    The session is created lazily by :meth:`get`, reused for at most
    ``_max_age`` seconds and dropped by :meth:`invalidate`.  The cached
    state is read and replaced only while holding ``_lock``; note that the
    build itself also runs under the lock, so concurrent callers wait while
    a new session is being warmed up.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._session = None
        self._backend: Optional[str] = None
        # time.monotonic() stamp of the last successful build.
        self._created_at: float = 0
        self._max_age: float = 300  # refresh every 5 min

    def _check_cloudflare(self, status_code: int, body: str = ""):
        """Raise RuntimeError if a 403/503 response body mentions Cloudflare."""
        if status_code not in (403, 503):
            return
        lowered = body.lower()
        if "cloudflare" in lowered or "cf-ray" in lowered:
            raise RuntimeError(f"Blocked by Cloudflare (HTTP {status_code})")

    def _warm_up(self, session, backend: str):
        """Fetch the home page with *session* and return ``(session, backend)``.

        Raises whatever the request, the Cloudflare check or
        ``raise_for_status`` raises; in that case the session is closed
        first, since it was never handed out to anyone.
        """
        try:
            r = session.get(BASE_URL, timeout=20)
            self._check_cloudflare(r.status_code, r.text)
            r.raise_for_status()
        except Exception:
            session.close()
            raise
        # NOTE(review): the "β" below looks like a mis-decoded dash — confirm.
        print(f"[session] {backend} OK β cookies: {list(session.cookies.keys())}")
        return session, backend

    def _build_session(self):
        """Try curl_cffi then cloudscraper; return ``(session, backend_name)``.

        Raises:
            RuntimeError: if no backend is installed or every installed
                backend failed its warm-up request.  Failure reasons, when
                there are any, are appended to the message.
        """
        failures = []
        if HAS_CURL_CFFI:
            try:
                return self._warm_up(CurlSession(impersonate="chrome120"), "curl_cffi")
            except Exception as e:
                print(f"[session] curl_cffi failed: {e}")
                failures.append(f"curl_cffi: {e}")
        if HAS_CLOUDSCRAPER:
            try:
                scraper = cloudscraper.create_scraper(
                    browser={
                        "browser": "chrome",
                        "platform": "windows",
                        "mobile": False,
                    }
                )
                return self._warm_up(scraper, "cloudscraper")
            except Exception as e:
                print(f"[session] cloudscraper failed: {e}")
                failures.append(f"cloudscraper: {e}")
        if failures:
            raise RuntimeError(
                "No scraping backend available (" + "; ".join(failures) + ")"
            )
        raise RuntimeError("No scraping backend available")

    def get(self):
        """Return the cached session, building a new one if absent or expired."""
        with self._lock:
            # Monotonic clock: immune to wall-clock jumps (NTP, manual changes).
            expired = (time.monotonic() - self._created_at) > self._max_age
            if self._session is None or expired:
                self._session, self._backend = self._build_session()
                # Stamp after the build so warm-up time is not counted as age.
                self._created_at = time.monotonic()
            return self._session

    def invalidate(self):
        """Forget the cached session so the next :meth:`get` builds a new one.

        The old session is deliberately not closed here: a streaming
        response obtained from it may still be in use by another caller.
        """
        with self._lock:
            self._session = None


# Module-wide manager shared by all requests.
sessions = SessionManager()
| # ── Perplexity core ─────────────────────────────────────────── | |
| def _build_payload(query: str) -> dict: | |
| return { | |
| "params": { | |
| "attachments": [], | |
| "language": "en-US", | |
| "timezone": "Europe/Paris", | |
| "search_focus": "internet", | |
| "sources": ["web"], | |
| "frontend_uuid": str(uuid.uuid4()), | |
| "mode": "copilot", | |
| "model_preference": "turbo", | |
| "is_related_query": False, | |
| "is_sponsored": False, | |
| "frontend_context_uuid": str(uuid.uuid4()), | |
| "prompt_source": "user", | |
| "query_source": "home", | |
| "is_incognito": False, | |
| "use_schematized_api": True, | |
| "send_back_text_in_streaming_api": False, | |
| "supported_block_use_cases": [ | |
| "answer_modes", "media_items", "knowledge_cards", | |
| "inline_entity_cards", "place_widgets", "finance_widgets", | |
| "news_widgets", "search_result_widgets", "inline_images", | |
| "diff_blocks", "answer_tabs", "in_context_suggestions", | |
| ], | |
| "skip_search_enabled": True, | |
| "source": "default", | |
| "version": "2.18", | |
| }, | |
| "query_str": query, | |
| } | |
| def _extract_chunks(patch: dict) -> list[str]: | |
| op = patch.get("op") | |
| path = patch.get("path", "") | |
| if op == "replace" and path == "": | |
| return patch.get("value", {}).get("chunks", []) | |
| if op == "add" and "/chunks/" in path: | |
| v = patch.get("value", "") | |
| return [v] if v else [] | |
| return [] | |
def _iter_web_results(block: dict):
    """Yield every raw web-result dict referenced by one response block.

    Order: ``web_result_block``, then ``sources_mode_block``, then each
    step of ``plan_block``.  Missing or null containers are treated as empty.
    """
    for key in ("web_result_block", "sources_mode_block"):
        yield from (block.get(key) or {}).get("web_results") or []
    for step in (block.get("plan_block") or {}).get("steps") or []:
        yield from (step.get("web_results_content") or {}).get("web_results") or []


def _parse_stream_full(resp) -> tuple[str, list[dict]]:
    """Parse an entire SSE stream and return ``(answer, sources)``.

    Args:
        resp: response object exposing ``iter_lines()`` (bytes or str lines).

    Returns:
        The answer text and a list of ``{"name", "url", "snippet"}`` dicts,
        de-duplicated by URL in first-seen order.  Answer text is the
        concatenation of all streamed chunks; when the final event carries a
        complete ``markdown_block["answer"]``, that replaces what was
        accumulated so far.
    """
    parts: list[str] = []
    sources = []
    seen_urls = set()

    for raw_line in resp.iter_lines():
        if isinstance(raw_line, bytes):
            raw_line = raw_line.decode("utf-8", errors="replace")
        if not raw_line or not raw_line.startswith("data:"):
            continue
        json_str = raw_line[len("data:"):].strip()
        if not json_str or json_str == "{}":
            continue
        try:
            event = json.loads(json_str)
        except json.JSONDecodeError:
            continue
        if not isinstance(event, dict):
            # Valid JSON but not an event object (e.g. a bare list or number).
            continue

        is_final = event.get("final_sse_message") or event.get("final")

        for block in event.get("blocks") or []:
            # Sources are collected from every block, not only the answer block.
            for wr in _iter_web_results(block):
                url = wr.get("url", "")
                if url and url not in seen_urls:
                    seen_urls.add(url)
                    sources.append({
                        "name": wr.get("name", ""),
                        "url": url,
                        "snippet": wr.get("snippet", ""),
                    })

            if block.get("intended_usage", "") != TARGET_USAGE:
                continue

            diff = block.get("diff_block") or {}
            if diff.get("field") == "markdown_block":
                for patch in diff.get("patches") or []:
                    parts.extend(c for c in _extract_chunks(patch) if c)

            if is_final:
                md = block.get("markdown_block") or {}
                if md.get("answer"):
                    # The final event's full answer supersedes the chunks.
                    parts = [md["answer"]]

    return "".join(parts), sources
def _iter_stream_chunks(resp):
    """Yield answer text chunks as they arrive (for SSE streaming).

    Args:
        resp: response object exposing ``iter_lines()`` (bytes or str lines).

    Yields:
        Non-empty text fragments whose concatenation is the answer.

    The final event may carry the complete answer in
    ``markdown_block["answer"]``.  Because the incremental chunks have
    already been yielded, only the part of that answer not yet sent is
    emitted; re-yielding the whole answer would make the client see the
    text twice.  If the final answer does not start with what was already
    sent, nothing more is emitted, since sent text cannot be retracted.
    """
    sent: list[str] = []

    for raw_line in resp.iter_lines():
        if isinstance(raw_line, bytes):
            raw_line = raw_line.decode("utf-8", errors="replace")
        if not raw_line or not raw_line.startswith("data:"):
            continue
        json_str = raw_line[len("data:"):].strip()
        if not json_str or json_str == "{}":
            continue
        try:
            event = json.loads(json_str)
        except json.JSONDecodeError:
            continue

        is_final = event.get("final_sse_message") or event.get("final")

        for block in event.get("blocks", []):
            if block.get("intended_usage", "") != TARGET_USAGE:
                continue

            diff = block.get("diff_block", {})
            if diff.get("field") == "markdown_block":
                for patch in diff.get("patches", []):
                    for chunk in _extract_chunks(patch):
                        if chunk:
                            sent.append(chunk)
                            yield chunk

            if not is_final:
                continue
            answer = block.get("markdown_block", {}).get("answer")
            if not answer or not isinstance(answer, str):
                continue

            already = "".join(sent)
            if not already:
                tail = answer
            elif answer.startswith(already):
                tail = answer[len(already):]
            else:
                tail = ""
            if tail:
                sent.append(tail)
                yield tail
def _close_response(resp) -> None:
    """Best-effort close of an upstream response; ``None`` is ignored."""
    close = getattr(resp, "close", None)
    if not callable(close):
        return
    try:
        close()
    except Exception as e:
        # Closing is cleanup only; a failure here must not mask the real error.
        print(f"[ask] closing response failed: {e}")


def _do_request(query: str, stream: bool = False):
    """Send *query* to Perplexity, retrying up to ``MAX_RETRIES`` times.

    Args:
        query: the text to ask.
        stream: when true, return the open response for the caller to
            iterate (the caller then owns it); otherwise parse the whole
            stream here.

    Returns:
        The response object if ``stream`` is true, else an
        ``(answer, sources)`` tuple.

    Raises:
        RuntimeError: when every attempt failed; the last underlying error
            is chained as ``__cause__``.
    """
    payload = _build_payload(query)
    headers = {**HEADERS, "X-Request-ID": str(uuid.uuid4())}
    last_err = None

    for attempt in range(1, MAX_RETRIES + 1):
        resp = None
        try:
            session = sessions.get()
            resp = session.post(
                ASK_URL, headers=headers, json=payload, stream=True, timeout=60
            )
            if resp.status_code in (403, 503):
                raise RuntimeError(f"Blocked (HTTP {resp.status_code})")
            resp.raise_for_status()
            if stream:
                return resp  # caller will iterate
            result = _parse_stream_full(resp)
            _close_response(resp)
            return result
        except Exception as e:
            last_err = e
            print(f"[ask] attempt {attempt}/{MAX_RETRIES} failed: {e}")
            # Release the failed response, then force a fresh session.
            _close_response(resp)
            sessions.invalidate()
            if attempt < MAX_RETRIES:
                time.sleep(RETRY_DELAY)

    raise RuntimeError(f"All retries failed: {last_err}") from last_err
| # ── Pydantic models (OpenAI-compatible) ─────────────────────── | |
class ChatMessage(BaseModel):
    """A single chat turn: who is speaking (``role``) and the text (``content``)."""

    role: str = Field(default="user")
    content: str = Field(default="")
class ChatCompletionRequest(BaseModel):
    """Request body accepted by the chat-completions endpoints.

    Only ``messages`` is required.  ``model``, ``temperature`` and
    ``max_tokens`` are accepted but are not read by the handlers visible
    in this file.
    """

    model: str = Field(default=MODEL_NAME)
    messages: list[ChatMessage]
    stream: bool = Field(default=False)
    temperature: Optional[float] = Field(default=None)
    max_tokens: Optional[int] = Field(default=None)
| # ── FastAPI app ─────────────────────────────────────────────── | |
# FastAPI application instance for this service.
# NOTE(review): the "β" in the title looks like a mis-decoded dash — confirm.
app = FastAPI(title="Perplexity Ask β OpenAI Compatible API", version="1.0.0")
| def _messages_to_query(messages: list[ChatMessage]) -> str: | |
| """ | |
| Collapse the chat messages into a single query string. | |
| Uses the last user message; prepends system prompt if present. | |
| """ | |
| system_parts = [] | |
| user_query = "" | |
| for m in messages: | |
| if m.role == "system": | |
| system_parts.append(m.content) | |
| elif m.role == "user": | |
| user_query = m.content # take last user msg | |
| if system_parts: | |
| return "\n".join(system_parts) + "\n\n" + user_query | |
| return user_query | |
def _make_chat_completion(answer: str, sources: list[dict], req_id: str) -> dict:
    """Build an OpenAI-style ``chat.completion`` response body.

    Args:
        answer: the answer text.
        sources: source dicts with ``name`` and ``url`` keys; when
            non-empty they are appended to the answer as a numbered
            Markdown list.
        req_id: value used for the response ``id``.

    Returns:
        A dict with one ``stop``-finished assistant choice.  Token usage is
        reported as zeros because no token counts are computed here.
    """
    if sources:
        # `or "Link"` rather than a .get() default: a source whose name is
        # present but empty would otherwise render as an invisible "[](url)".
        footnotes = [
            f"{i}. [{s.get('name') or 'Link'}]({s.get('url', '')})\n"
            for i, s in enumerate(sources, 1)
        ]
        answer = answer + "\n\n---\n**Sources:**\n" + "".join(footnotes)

    return {
        "id": req_id,
        "object": "chat.completion",
        "created": int(time.time()),
        "model": MODEL_NAME,
        "choices": [
            {
                "index": 0,
                "message": {"role": "assistant", "content": answer},
                "finish_reason": "stop",
            }
        ],
        "usage": {
            "prompt_tokens": 0,
            "completion_tokens": 0,
            "total_tokens": 0,
        },
    }
def _stream_openai_chunks(query: str, req_id: str):
    """Generator yielding SSE lines in OpenAI streaming format.

    Emits an initial role delta, one ``chat.completion.chunk`` per text
    fragment, then a ``stop`` chunk and the ``[DONE]`` sentinel.  If the
    upstream request or parsing fails, the error is sent to the client as a
    final content chunk (``[ERROR] ...``) before ``[DONE]``, because HTTP
    headers have already been sent by then.

    Args:
        query: text to send upstream.
        req_id: id repeated in every chunk.
    """
    # One timestamp for the whole completion, shared by every chunk.
    created = int(time.time())

    def sse(delta: dict, finish_reason=None) -> str:
        """Serialise one chunk envelope as an SSE ``data:`` line."""
        payload = {
            "id": req_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": MODEL_NAME,
            "choices": [
                {
                    "index": 0,
                    "delta": delta,
                    "finish_reason": finish_reason,
                }
            ],
        }
        return f"data: {json.dumps(payload)}\n\n"

    resp = None
    try:
        resp = _do_request(query, stream=True)
        # The first delta announces the assistant role, as OpenAI streams do.
        yield sse({"role": "assistant", "content": ""})
        for chunk_text in _iter_stream_chunks(resp):
            yield sse({"content": chunk_text})
        # Final chunk
        yield sse({}, "stop")
    except Exception as e:
        yield sse({"content": f"\n\n[ERROR] {e}"}, "stop")
    finally:
        # Also runs when the generator is closed early (client disconnect),
        # so the upstream connection is not left open.
        close = getattr(resp, "close", None)
        if callable(close):
            try:
                close()
            except Exception as e:
                print(f"[ask] closing response failed: {e}")

    yield "data: [DONE]\n\n"
| # ── Endpoints ───────────────────────────────────────────────── | |
@app.get("/")
async def root():
    """Return a short service banner listing the main endpoints."""
    # NOTE(review): the "β" in the message looks like a mis-decoded dash — confirm.
    return {
        "message": "Perplexity Ask β OpenAI compatible",
        "endpoints": [
            "/v1/models",
            "/v1/chat/completions",
            "/health",
        ],
    }
@app.get("/health")
async def health():
    """Liveness probe: always reports ``ok`` without contacting upstream."""
    return {"status": "ok"}
@app.get("/v1/models")
async def list_models():
    """Return an OpenAI-style model list containing the single ``MODEL_NAME``."""
    return {
        "object": "list",
        "data": [
            {
                "id": MODEL_NAME,
                "object": "model",
                "created": 1700000000,
                "owned_by": "perplexity-community",
            }
        ],
    }
@app.post("/v1/chat/completions")
async def chat_completions(req: ChatCompletionRequest):
    """Handle an OpenAI-style chat completion request.

    Returns a ``StreamingResponse`` of SSE chunks when ``req.stream`` is
    true, otherwise a JSON ``chat.completion`` body.

    Raises:
        HTTPException: 400 when the collapsed query is blank; 502 when the
            upstream request fails or returns no answer.
    """
    # Local import keeps this block self-contained (asyncio is stdlib).
    import asyncio

    query = _messages_to_query(req.messages)
    if not query.strip():
        raise HTTPException(status_code=400, detail="Empty query")
    req_id = f"chatcmpl-{uuid.uuid4().hex[:12]}"

    # ── Streaming ──
    if req.stream:
        return StreamingResponse(
            _stream_openai_chunks(query, req_id),
            media_type="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "X-Accel-Buffering": "no",
            },
        )

    # ── Non-streaming ──
    try:
        # _do_request does blocking HTTP and time.sleep(); run it in a worker
        # thread so the event loop keeps serving other requests meanwhile.
        answer, sources = await asyncio.to_thread(_do_request, query, stream=False)
    except RuntimeError as e:
        raise HTTPException(status_code=502, detail=str(e)) from e
    if not answer:
        raise HTTPException(status_code=502, detail="No answer received from Perplexity")
    return JSONResponse(_make_chat_completion(answer, sources, req_id))
| # ── Catch-all for /chat/completions without /v1 prefix ──────── | |
@app.post("/chat/completions")
async def chat_completions_no_prefix(req: ChatCompletionRequest):
    """Alias of :func:`chat_completions` for clients that omit the ``/v1`` prefix."""
    return await chat_completions(req)