from __future__ import annotations

from asyncio import AbstractEventLoop
from concurrent.futures import ThreadPoolExecutor
from abc import ABC, abstractmethod

from .helper import get_event_loop, get_cookies, format_prompt
from ..typing import AsyncGenerator, CreateResult


class _ClassProperty:
    """Read-only descriptor whose value is computed from the owning class.

    Replaces the ``@classmethod`` + ``@property`` stacking, which only worked
    on Python 3.9-3.12 and was removed in Python 3.13.  Access through the
    class or through an instance both call the getter with the class.
    """

    def __init__(self, getter):
        self._getter = getter
        self.__doc__ = getter.__doc__

    def __get__(self, instance, owner=None):
        # ``owner`` may be omitted when the descriptor protocol is invoked
        # manually with only an instance.
        if owner is None:
            owner = type(instance)
        return self._getter(owner)


class BaseProvider(ABC):
    """Abstract base for providers exposing a synchronous completion API.

    Subclasses implement :meth:`create_completion`; :meth:`create_async` is
    derived from it by running the synchronous call in an executor.
    """

    url: str
    # Capability flags; all default to False and are overridden by subclasses.
    working = False
    needs_auth = False
    supports_stream = False
    supports_gpt_35_turbo = False
    supports_gpt_4 = False

    @staticmethod
    @abstractmethod
    def create_completion(
        model: str,
        messages: list[dict[str, str]],
        stream: bool,
        **kwargs
    ) -> CreateResult:
        """Produce the completion for ``messages`` as an iterable of strings.

        Must be implemented by subclasses; the base version always raises
        ``NotImplementedError``.
        """
        raise NotImplementedError()

    @classmethod
    async def create_async(
        cls,
        model: str,
        messages: list[dict[str, str]],
        *,
        loop: AbstractEventLoop = None,
        executor: ThreadPoolExecutor = None,
        **kwargs
    ) -> str:
        """Run :meth:`create_completion` in an executor and join its output.

        Args:
            model: Forwarded to ``create_completion``.
            messages: Forwarded to ``create_completion``.
            loop: Event loop used to schedule the call; when falsy, the loop
                returned by ``get_event_loop()`` is used.
            executor: Passed to ``loop.run_in_executor``; ``None`` selects the
                loop's default executor.
            **kwargs: Forwarded to ``create_completion``.

        Returns:
            All chunks yielded by ``create_completion`` concatenated into one
            string.  Streaming is always requested as ``False``.
        """
        loop = loop or get_event_loop()

        def create_func() -> str:
            # Executed on the executor's worker, so the synchronous provider
            # call does not block the event loop.
            return "".join(cls.create_completion(
                model,
                messages,
                False,
                **kwargs
            ))

        return await loop.run_in_executor(executor, create_func)

    @_ClassProperty
    def params(cls) -> str:
        """Human-readable summary of the parameters the provider accepts."""
        fields = [
            ("model", "str"),
            ("messages", "list[dict[str, str]]"),
            ("stream", "bool"),
        ]
        param = ", ".join(": ".join(field) for field in fields)
        return f"g4f.provider.{cls.__name__} supports: ({param})"


class AsyncProvider(BaseProvider):
    """Provider whose native API is a coroutine returning the full response.

    Subclasses implement :meth:`create_async`; the synchronous
    :meth:`create_completion` is derived from it.
    """

    @classmethod
    def create_completion(
        cls,
        model: str,
        messages: list[dict[str, str]],
        stream: bool = False,
        **kwargs
    ) -> CreateResult:
        """Yield the result of :meth:`create_async` as a single chunk.

        ``stream`` is accepted for interface compatibility but is not
        forwarded to ``create_async``.
        """
        # NOTE(review): run_until_complete() cannot be used on a loop that is
        # already running - confirm this is only called from synchronous code.
        loop = get_event_loop()
        coro = cls.create_async(model, messages, **kwargs)
        yield loop.run_until_complete(coro)

    @staticmethod
    @abstractmethod
    async def create_async(
        model: str,
        messages: list[dict[str, str]],
        **kwargs
    ) -> str:
        """Return the complete response text.

        Must be implemented by subclasses; the base version always raises
        ``NotImplementedError``.
        """
        raise NotImplementedError()


class AsyncGeneratorProvider(AsyncProvider):
    """Provider whose native API is an async generator of response chunks.

    Subclasses implement :meth:`create_async_generator`; both the synchronous
    and the coroutine entry points are derived from it.
    """

    supports_stream = True

    @classmethod
    def create_completion(
        cls,
        model: str,
        messages: list[dict[str, str]],
        stream: bool = True,
        **kwargs
    ) -> CreateResult:
        """Yield chunks from :meth:`create_async_generator` synchronously.

        Each chunk is obtained by running one ``__anext__()`` step on the
        event loop, so chunks are handed to the caller as they are produced.
        """
        loop = get_event_loop()
        generator = cls.create_async_generator(
            model,
            messages,
            stream=stream,
            **kwargs
        )
        gen = generator.__aiter__()
        while True:
            try:
                yield loop.run_until_complete(gen.__anext__())
            except StopAsyncIteration:
                # The async generator is exhausted: end the sync generator.
                break

    @classmethod
    async def create_async(
        cls,
        model: str,
        messages: list[dict[str, str]],
        **kwargs
    ) -> str:
        """Collect every chunk of a non-streaming run into one string."""
        return "".join([
            chunk async for chunk in cls.create_async_generator(
                model,
                messages,
                stream=False,
                **kwargs
            )
        ])

    @staticmethod
    @abstractmethod
    def create_async_generator(
        model: str,
        messages: list[dict[str, str]],
        **kwargs
    ) -> AsyncGenerator:
        """Return an async generator yielding response chunks.

        Must be implemented by subclasses; the base version always raises
        ``NotImplementedError``.
        """
        raise NotImplementedError()