"""Wrapper around OpenAI APIs.""" from __future__ import annotations import logging import sys from typing import ( Any, Callable, Dict, Generator, List, Mapping, Optional, Set, Tuple, Union, ) from pydantic import BaseModel, Extra, Field, root_validator from tenacity import ( before_sleep_log, retry, retry_if_exception_type, stop_after_attempt, wait_exponential, ) from langchain.llms.base import BaseLLM from langchain.schema import Generation, LLMResult from langchain.utils import get_from_dict_or_env logger = logging.getLogger(__name__) def update_token_usage( keys: Set[str], response: Dict[str, Any], token_usage: Dict[str, Any] ) -> None: """Update token usage.""" _keys_to_use = keys.intersection(response["usage"]) for _key in _keys_to_use: if _key not in token_usage: token_usage[_key] = response["usage"][_key] else: token_usage[_key] += response["usage"][_key] def _update_response(response: Dict[str, Any], stream_response: Dict[str, Any]) -> None: """Update response from the stream response.""" response["choices"][0]["text"] += stream_response["choices"][0]["text"] response["choices"][0]["finish_reason"] = stream_response["choices"][0][ "finish_reason" ] response["choices"][0]["logprobs"] = stream_response["choices"][0]["logprobs"] def _streaming_response_template() -> Dict[str, Any]: return { "choices": [ { "text": "", "finish_reason": None, "logprobs": None, } ] } def _create_retry_decorator(llm: Union[BaseOpenAI, OpenAIChat]) -> Callable[[Any], Any]: import openai min_seconds = 4 max_seconds = 10 # Wait 2^x * 1 second between each retry starting with # 4 seconds, then up to 10 seconds, then 10 seconds afterwards return retry( reraise=True, stop=stop_after_attempt(llm.max_retries), wait=wait_exponential(multiplier=1, min=min_seconds, max=max_seconds), retry=( retry_if_exception_type(openai.error.Timeout) | retry_if_exception_type(openai.error.APIError) | retry_if_exception_type(openai.error.APIConnectionError) | retry_if_exception_type(openai.error.RateLimitError) | retry_if_exception_type(openai.error.ServiceUnavailableError) ), before_sleep=before_sleep_log(logger, logging.WARNING), ) def completion_with_retry(llm: Union[BaseOpenAI, OpenAIChat], **kwargs: Any) -> Any: """Use tenacity to retry the completion call.""" retry_decorator = _create_retry_decorator(llm) @retry_decorator def _completion_with_retry(**kwargs: Any) -> Any: return llm.client.create(**kwargs) return _completion_with_retry(**kwargs) async def acompletion_with_retry( llm: Union[BaseOpenAI, OpenAIChat], **kwargs: Any ) -> Any: """Use tenacity to retry the async completion call.""" retry_decorator = _create_retry_decorator(llm) @retry_decorator async def _completion_with_retry(**kwargs: Any) -> Any: # Use OpenAI's async api https://github.com/openai/openai-python#async-api return await llm.client.acreate(**kwargs) return await _completion_with_retry(**kwargs) class BaseOpenAI(BaseLLM, BaseModel): """Wrapper around OpenAI large language models. To use, you should have the ``openai`` python package installed, and the environment variable ``OPENAI_API_KEY`` set with your API key. Any parameters that are valid to be passed to the openai.create call can be passed in, even if not explicitly saved on this class. Example: .. code-block:: python from langchain.llms import OpenAI openai = OpenAI(model_name="text-davinci-003") """ client: Any #: :meta private: model_name: str = "text-davinci-003" """Model name to use.""" temperature: float = 0.7 """What sampling temperature to use.""" max_tokens: int = 256 """The maximum number of tokens to generate in the completion. -1 returns as many tokens as possible given the prompt and the models maximal context size.""" top_p: float = 1 """Total probability mass of tokens to consider at each step.""" frequency_penalty: float = 0 """Penalizes repeated tokens according to frequency.""" presence_penalty: float = 0 """Penalizes repeated tokens.""" n: int = 1 """How many completions to generate for each prompt.""" best_of: int = 1 """Generates best_of completions server-side and returns the "best".""" model_kwargs: Dict[str, Any] = Field(default_factory=dict) """Holds any model parameters valid for `create` call not explicitly specified.""" openai_api_key: Optional[str] = None batch_size: int = 20 """Batch size to use when passing multiple documents to generate.""" request_timeout: Optional[Union[float, Tuple[float, float]]] = None """Timeout for requests to OpenAI completion API. Default is 600 seconds.""" logit_bias: Optional[Dict[str, float]] = Field(default_factory=dict) """Adjust the probability of specific tokens being generated.""" max_retries: int = 6 """Maximum number of retries to make when generating.""" streaming: bool = False """Whether to stream the results or not.""" def __new__(cls, **data: Any) -> Union[OpenAIChat, BaseOpenAI]: # type: ignore """Initialize the OpenAI object.""" if data.get("model_name", "").startswith("gpt-3.5-turbo"): return OpenAIChat(**data) return super().__new__(cls) class Config: """Configuration for this pydantic object.""" extra = Extra.ignore @root_validator(pre=True, allow_reuse=True) def build_extra(cls, values: Dict[str, Any]) -> Dict[str, Any]: """Build extra kwargs from additional params that were passed in.""" all_required_field_names = {field.alias for field in cls.__fields__.values()} extra = values.get("model_kwargs", {}) for field_name in list(values): if field_name not in all_required_field_names: if field_name in extra: raise ValueError(f"Found {field_name} supplied twice.") logger.warning( f"""WARNING! {field_name} is not default parameter. {field_name} was transfered to model_kwargs. Please confirm that {field_name} is what you intended.""" ) extra[field_name] = values.pop(field_name) values["model_kwargs"] = extra return values @root_validator(allow_reuse=True) def validate_environment(cls, values: Dict) -> Dict: """Validate that api key and python package exists in environment.""" openai_api_key = get_from_dict_or_env( values, "openai_api_key", "OPENAI_API_KEY" ) try: import openai openai.api_key = openai_api_key values["client"] = openai.Completion except ImportError: raise ValueError( "Could not import openai python package. " "Please it install it with `pip install openai`." ) if values["streaming"] and values["n"] > 1: raise ValueError("Cannot stream results when n > 1.") if values["streaming"] and values.get("best_of") and values["best_of"] > 1: raise ValueError("Cannot stream results when best_of > 1.") return values @property def _default_params(self) -> Dict[str, Any]: """Get the default parameters for calling OpenAI API.""" normal_params = { "temperature": self.temperature, "max_tokens": self.max_tokens, "top_p": self.top_p, "frequency_penalty": self.frequency_penalty, "presence_penalty": self.presence_penalty, "n": self.n, # "best_of": self.best_of, "request_timeout": self.request_timeout, "logit_bias": self.logit_bias, } return {**normal_params, **self.model_kwargs} def _generate( self, prompts: List[str], stop: Optional[List[str]] = None ) -> LLMResult: """Call out to OpenAI's endpoint with k unique prompts. Args: prompts: The prompts to pass into the model. stop: Optional list of stop words to use when generating. Returns: The full LLM output. Example: .. code-block:: python response = openai.generate(["Tell me a joke."]) """ # TODO: write a unit test for this params = self._invocation_params sub_prompts = self.get_sub_prompts(params, prompts, stop) choices = [] token_usage: Dict[str, int] = {} # Get the token usage from the response. # Includes prompt, completion, and total tokens used. _keys = {"completion_tokens", "prompt_tokens", "total_tokens"} for _prompts in sub_prompts: if self.streaming: if len(_prompts) > 1: raise ValueError("Cannot stream results with multiple prompts.") params["stream"] = True response = _streaming_response_template() for stream_resp in completion_with_retry( self, prompt=_prompts, **params ): self.callback_manager.on_llm_new_token( stream_resp["choices"][0]["text"], verbose=self.verbose, logprobs=stream_resp["choices"][0]["logprobs"], ) _update_response(response, stream_resp) choices.extend(response["choices"]) else: response = completion_with_retry(self, prompt=_prompts, **params) choices.extend(response["choices"]) if not self.streaming: # Can't update token usage if streaming update_token_usage(_keys, response, token_usage) return self.create_llm_result(choices, prompts, token_usage) async def _agenerate( self, prompts: List[str], stop: Optional[List[str]] = None ) -> LLMResult: """Call out to OpenAI's endpoint async with k unique prompts.""" params = self._invocation_params sub_prompts = self.get_sub_prompts(params, prompts, stop) choices = [] token_usage: Dict[str, int] = {} # Get the token usage from the response. # Includes prompt, completion, and total tokens used. _keys = {"completion_tokens", "prompt_tokens", "total_tokens"} for _prompts in sub_prompts: if self.streaming: if len(_prompts) > 1: raise ValueError("Cannot stream results with multiple prompts.") params["stream"] = True response = _streaming_response_template() async for stream_resp in await acompletion_with_retry( self, prompt=_prompts, **params ): if self.callback_manager.is_async: await self.callback_manager.on_llm_new_token( stream_resp["choices"][0]["text"], verbose=self.verbose, logprobs=stream_resp["choices"][0]["logprobs"], ) else: self.callback_manager.on_llm_new_token( stream_resp["choices"][0]["text"], verbose=self.verbose, logprobs=stream_resp["choices"][0]["logprobs"], ) _update_response(response, stream_resp) choices.extend(response["choices"]) else: response = await acompletion_with_retry(self, prompt=_prompts, **params) choices.extend(response["choices"]) if not self.streaming: # Can't update token usage if streaming update_token_usage(_keys, response, token_usage) return self.create_llm_result(choices, prompts, token_usage) def get_sub_prompts( self, params: Dict[str, Any], prompts: List[str], stop: Optional[List[str]] = None, ) -> List[List[str]]: """Get the sub prompts for llm call.""" if stop is not None: if "stop" in params: raise ValueError("`stop` found in both the input and default params.") params["stop"] = stop if params["max_tokens"] == -1: if len(prompts) != 1: raise ValueError( "max_tokens set to -1 not supported for multiple inputs." ) params["max_tokens"] = self.max_tokens_for_prompt(prompts[0]) sub_prompts = [ prompts[i : i + self.batch_size] for i in range(0, len(prompts), self.batch_size) ] return sub_prompts def create_llm_result( self, choices: Any, prompts: List[str], token_usage: Dict[str, int] ) -> LLMResult: """Create the LLMResult from the choices and prompts.""" generations = [] for i, _ in enumerate(prompts): sub_choices = choices[i * self.n : (i + 1) * self.n] generations.append( [ Generation( text=choice["text"], generation_info=dict( finish_reason=choice.get("finish_reason"), logprobs=choice.get("logprobs"), ), ) for choice in sub_choices ] ) return LLMResult( generations=generations, llm_output={"token_usage": token_usage} ) def stream(self, prompt: str, stop: Optional[List[str]] = None) -> Generator: """Call OpenAI with streaming flag and return the resulting generator. BETA: this is a beta feature while we figure out the right abstraction. Once that happens, this interface could change. Args: prompt: The prompts to pass into the model. stop: Optional list of stop words to use when generating. Returns: A generator representing the stream of tokens from OpenAI. Example: .. code-block:: python generator = openai.stream("Tell me a joke.") for token in generator: yield token """ params = self.prep_streaming_params(stop) generator = self.client.create(prompt=prompt, **params) return generator def prep_streaming_params(self, stop: Optional[List[str]] = None) -> Dict[str, Any]: """Prepare the params for streaming.""" params = self._invocation_params if params.get('best_of') and params["best_of"] != 1: raise ValueError("OpenAI only supports best_of == 1 for streaming") if stop is not None: if "stop" in params: raise ValueError("`stop` found in both the input and default params.") params["stop"] = stop params["stream"] = True return params @property def _invocation_params(self) -> Dict[str, Any]: """Get the parameters used to invoke the model.""" return self._default_params @property def _identifying_params(self) -> Mapping[str, Any]: """Get the identifying parameters.""" return {**{"model_name": self.model_name}, **self._default_params} @property def _llm_type(self) -> str: """Return type of llm.""" return "openai" def get_num_tokens(self, text: str) -> int: """Calculate num tokens with tiktoken package.""" # tiktoken NOT supported for Python 3.8 or below if sys.version_info[1] <= 8: return super().get_num_tokens(text) try: import tiktoken except ImportError: raise ValueError( "Could not import tiktoken python package. " "This is needed in order to calculate get_num_tokens. " "Please it install it with `pip install tiktoken`." ) encoder = "gpt2" if self.model_name in ("text-davinci-003", "text-davinci-002"): encoder = "p50k_base" if self.model_name.startswith("code"): encoder = "p50k_base" # create a GPT-3 encoder instance enc = tiktoken.get_encoding(encoder) # encode the text using the GPT-3 encoder tokenized_text = enc.encode(text) # calculate the number of tokens in the encoded text return len(tokenized_text) def modelname_to_contextsize(self, modelname: str) -> int: """Calculate the maximum number of tokens possible to generate for a model. text-davinci-003: 4,097 tokens text-curie-001: 2,048 tokens text-babbage-001: 2,048 tokens text-ada-001: 2,048 tokens code-davinci-002: 8,000 tokens code-cushman-001: 2,048 tokens Args: modelname: The modelname we want to know the context size for. Returns: The maximum context size Example: .. code-block:: python max_tokens = openai.modelname_to_contextsize("text-davinci-003") """ if modelname == "text-davinci-003": return 4097 elif modelname == "text-curie-001": return 2048 elif modelname == "text-babbage-001": return 2048 elif modelname == "text-ada-001": return 2048 elif modelname == "code-davinci-002": return 8000 elif modelname == "code-cushman-001": return 2048 else: return 4097 def max_tokens_for_prompt(self, prompt: str) -> int: """Calculate the maximum number of tokens possible to generate for a prompt. Args: prompt: The prompt to pass into the model. Returns: The maximum number of tokens to generate for a prompt. Example: .. code-block:: python max_tokens = openai.max_token_for_prompt("Tell me a joke.") """ num_tokens = self.get_num_tokens(prompt) # get max context size for model by name max_size = self.modelname_to_contextsize(self.model_name) return max_size - num_tokens class OpenAI(BaseOpenAI): """Generic OpenAI class that uses model name.""" @property def _invocation_params(self) -> Dict[str, Any]: return {**{"model": self.model_name}, **super()._invocation_params} class AzureOpenAI(BaseOpenAI): """Azure specific OpenAI class that uses deployment name.""" deployment_name: str = "" """Deployment name to use.""" @property def _identifying_params(self) -> Mapping[str, Any]: return { **{"deployment_name": self.deployment_name}, **super()._identifying_params, } @property def _invocation_params(self) -> Dict[str, Any]: return {**{"engine": self.deployment_name}, **super()._invocation_params} class OpenAIChat(BaseLLM, BaseModel): """Wrapper around OpenAI Chat large language models. To use, you should have the ``openai`` python package installed, and the environment variable ``OPENAI_API_KEY`` set with your API key. Any parameters that are valid to be passed to the openai.create call can be passed in, even if not explicitly saved on this class. Example: .. code-block:: python from langchain.llms import OpenAIChat openaichat = OpenAIChat(model_name="gpt-3.5-turbo") """ client: Any #: :meta private: model_name: str = "gpt-3.5-turbo" """Model name to use.""" model_kwargs: Dict[str, Any] = Field(default_factory=dict) """Holds any model parameters valid for `create` call not explicitly specified.""" openai_api_key: Optional[str] = None max_retries: int = 6 """Maximum number of retries to make when generating.""" prefix_messages: List = Field(default_factory=list) """Series of messages for Chat input.""" streaming: bool = False """Whether to stream the results or not.""" class Config: """Configuration for this pydantic object.""" extra = Extra.ignore @root_validator(pre=True, allow_reuse=True) def build_extra(cls, values: Dict[str, Any]) -> Dict[str, Any]: """Build extra kwargs from additional params that were passed in.""" all_required_field_names = {field.alias for field in cls.__fields__.values()} extra = values.get("model_kwargs", {}) for field_name in list(values): if field_name not in all_required_field_names: if field_name in extra: raise ValueError(f"Found {field_name} supplied twice.") extra[field_name] = values.pop(field_name) values["model_kwargs"] = extra return values @root_validator(allow_reuse=True) def validate_environment(cls, values: Dict) -> Dict: """Validate that api key and python package exists in environment.""" openai_api_key = get_from_dict_or_env( values, "openai_api_key", "OPENAI_API_KEY" ) try: import openai openai.api_key = openai_api_key except ImportError: raise ValueError( "Could not import openai python package. " "Please it install it with `pip install openai`." ) try: values["client"] = openai.ChatCompletion except AttributeError: raise ValueError( "`openai` has no `ChatCompletion` attribute, this is likely " "due to an old version of the openai package. Try upgrading it " "with `pip install --upgrade openai`." ) return values @property def _default_params(self) -> Dict[str, Any]: """Get the default parameters for calling OpenAI API.""" return self.model_kwargs def _get_chat_params( self, prompts: List[str], stop: Optional[List[str]] = None ) -> Tuple: if len(prompts) > 1: raise ValueError( f"OpenAIChat currently only supports single prompt, got {prompts}" ) messages = self.prefix_messages + [{"role": "user", "content": prompts[0]}] params: Dict[str, Any] = {**{"model": self.model_name}, **self._default_params} if stop is not None: if "stop" in params: raise ValueError("`stop` found in both the input and default params.") params["stop"] = stop return messages, params def _generate( self, prompts: List[str], stop: Optional[List[str]] = None ) -> LLMResult: messages, params = self._get_chat_params(prompts, stop) if self.streaming: response = "" params["stream"] = True for stream_resp in completion_with_retry(self, messages=messages, **params): token = stream_resp["choices"][0]["delta"].get("content", "") response += token self.callback_manager.on_llm_new_token( token, verbose=self.verbose, ) return LLMResult( generations=[[Generation(text=response)]], ) else: full_response = completion_with_retry(self, messages=messages, **params) return LLMResult( generations=[ [Generation(text=full_response["choices"][0]["message"]["content"])] ], llm_output={"token_usage": full_response["usage"]}, ) async def _agenerate( self, prompts: List[str], stop: Optional[List[str]] = None ) -> LLMResult: messages, params = self._get_chat_params(prompts, stop) if self.streaming: response = "" params["stream"] = True async for stream_resp in await acompletion_with_retry( self, messages=messages, **params ): token = stream_resp["choices"][0]["delta"].get("content", "") response += token if self.callback_manager.is_async: await self.callback_manager.on_llm_new_token( token, verbose=self.verbose, ) else: self.callback_manager.on_llm_new_token( token, verbose=self.verbose, ) return LLMResult( generations=[[Generation(text=response)]], ) else: full_response = await acompletion_with_retry( self, messages=messages, **params ) return LLMResult( generations=[ [Generation(text=full_response["choices"][0]["message"]["content"])] ], llm_output={"token_usage": full_response["usage"]}, ) @property def _identifying_params(self) -> Mapping[str, Any]: """Get the identifying parameters.""" return {**{"model_name": self.model_name}, **self._default_params} @property def _llm_type(self) -> str: """Return type of llm.""" return "openai-chat" class AzureOpenAIChat(OpenAIChat): """Azure specific OpenAI class that uses deployment name.""" deployment_name: str = "" """Deployment name to use.""" @property def _identifying_params(self) -> Mapping[str, Any]: return { **{"deployment_name": self.deployment_name}, **super()._identifying_params, }