from __future__ import annotations import json import time import base64 from curl_cffi.requests import AsyncSession from .base_provider import AsyncProvider, format_prompt class PerplexityAi(AsyncProvider): url = "https://www.perplexity.ai" working = True supports_gpt_35_turbo = True _sources = [] @classmethod async def create_async( cls, model: str, messages: list[dict[str, str]], proxy: str = None, **kwargs ) -> str: url = cls.url + "/socket.io/?EIO=4&transport=polling" async with AsyncSession(proxies={"https": proxy}, impersonate="chrome107") as session: url_session = "https://www.perplexity.ai/api/auth/session" response = await session.get(url_session) response = await session.get(url, params={"t": timestamp()}) response.raise_for_status() sid = json.loads(response.text[1:])["sid"] data = '40{"jwt":"anonymous-ask-user"}' response = await session.post(url, params={"t": timestamp(), "sid": sid}, data=data) response.raise_for_status() data = "424" + json.dumps([ "perplexity_ask", format_prompt(messages), { "version":"2.1", "source":"default", "language":"en", "timezone": time.tzname[0], "search_focus":"internet", "mode":"concise" } ]) response = await session.post(url, params={"t": timestamp(), "sid": sid}, data=data) response.raise_for_status() while True: response = await session.get(url, params={"t": timestamp(), "sid": sid}) response.raise_for_status() for line in response.text.splitlines(): if line.startswith("434"): result = json.loads(json.loads(line[3:])[0]["text"]) cls._sources = [{ "name": source["name"], "url": source["url"], "snippet": source["snippet"] } for source in result["web_results"]] return result["answer"] @classmethod def get_sources(cls): return cls._sources @classmethod @property def params(cls): params = [ ("model", "str"), ("messages", "list[dict[str, str]]"), ("stream", "bool"), ("proxy", "str"), ] param = ", ".join([": ".join(p) for p in params]) return f"g4f.provider.{cls.__name__} supports: ({param})" def timestamp() -> str: return base64.urlsafe_b64encode(int(time.time()-1407782612).to_bytes(4, 'big')).decode()