gpt / gpt4free /g4f /__init__.py
LoRDxdd's picture
Add gpt4free API for Hugging Face
a4b70d9
from __future__ import annotations
import os
import logging
from typing import Union, Optional, Coroutine
from . import debug, version
from .models import Model
from .client import Client, AsyncClient
from .typing import Messages, CreateResult, AsyncResult, ImageType
from .cookies import get_cookies, set_cookies
from .providers.types import ProviderType
from .providers.helper import concat_chunks, async_concat_chunks
from .client.service import get_model_and_provider
# Configure logger
logger = logging.getLogger("g4f")
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
logger.addHandler(handler)
logger.setLevel(logging.ERROR)
class ChatCompletion:
@staticmethod
def _prepare_request(model: Union[Model, str],
messages: Messages,
provider: Union[ProviderType, str, None],
stream: bool,
image: ImageType,
image_name: Optional[str],
ignore_working: bool,
ignore_stream: bool,
**kwargs):
"""Shared pre-processing for sync/async create methods."""
if image is not None:
kwargs["media"] = [(image, image_name)]
elif "images" in kwargs:
kwargs["media"] = kwargs.pop("images")
model, provider = get_model_and_provider(
model, provider, stream,
ignore_working,
ignore_stream,
has_images="media" in kwargs,
)
if "proxy" not in kwargs:
proxy = os.environ.get("G4F_PROXY")
if proxy:
kwargs["proxy"] = proxy
if ignore_stream:
kwargs["ignore_stream"] = True
return model, provider, kwargs
@staticmethod
def create(model: Union[Model, str],
messages: Messages,
provider: Union[ProviderType, str, None] = None,
stream: bool = False,
image: ImageType = None,
image_name: Optional[str] = None,
ignore_working: bool = False,
ignore_stream: bool = False,
**kwargs) -> Union[CreateResult, str]:
model, provider, kwargs = ChatCompletion._prepare_request(
model, messages, provider, stream, image, image_name,
ignore_working, ignore_stream, **kwargs
)
result = provider.create_function(model, messages, stream=stream, **kwargs)
return result if stream or ignore_stream else concat_chunks(result)
@staticmethod
def create_async(model: Union[Model, str],
messages: Messages,
provider: Union[ProviderType, str, None] = None,
stream: bool = False,
image: ImageType = None,
image_name: Optional[str] = None,
ignore_working: bool = False,
ignore_stream: bool = False,
**kwargs) -> Union[AsyncResult, Coroutine[str]]:
model, provider, kwargs = ChatCompletion._prepare_request(
model, messages, provider, stream, image, image_name,
ignore_working, ignore_stream, **kwargs
)
result = provider.async_create_function(model, messages, stream=stream, **kwargs)
if not stream and not ignore_stream and hasattr(result, "__aiter__"):
result = async_concat_chunks(result)
return result