nnnn / litellm /llms /openai.py
nonhuman's picture
Upload 165 files
395201c
from typing import Optional, Union, Any
import types, time, json
import httpx
from .base import BaseLLM
from litellm.utils import ModelResponse, Choices, Message, CustomStreamWrapper, convert_to_model_response_object, Usage
from typing import Callable, Optional
import aiohttp, requests
import litellm
from .prompt_templates.factory import prompt_factory, custom_prompt
from openai import OpenAI, AsyncOpenAI
class OpenAIError(Exception):
def __init__(self, status_code, message, request: Optional[httpx.Request]=None, response: Optional[httpx.Response]=None):
self.status_code = status_code
self.message = message
if request:
self.request = request
else:
self.request = httpx.Request(method="POST", url="https://api.openai.com/v1")
if response:
self.response = response
else:
self.response = httpx.Response(status_code=status_code, request=self.request)
super().__init__(
self.message
) # Call the base class constructor with the parameters it needs
class OpenAIConfig():
"""
Reference: https://platform.openai.com/docs/api-reference/chat/create
The class `OpenAIConfig` provides configuration for the OpenAI's Chat API interface. Below are the parameters:
- `frequency_penalty` (number or null): Defaults to 0. Allows a value between -2.0 and 2.0. Positive values penalize new tokens based on their existing frequency in the text so far, thereby minimizing repetition.
- `function_call` (string or object): This optional parameter controls how the model calls functions.
- `functions` (array): An optional parameter. It is a list of functions for which the model may generate JSON inputs.
- `logit_bias` (map): This optional parameter modifies the likelihood of specified tokens appearing in the completion.
- `max_tokens` (integer or null): This optional parameter helps to set the maximum number of tokens to generate in the chat completion.
- `n` (integer or null): This optional parameter helps to set how many chat completion choices to generate for each input message.
- `presence_penalty` (number or null): Defaults to 0. It penalizes new tokens based on if they appear in the text so far, hence increasing the model's likelihood to talk about new topics.
- `stop` (string / array / null): Specifies up to 4 sequences where the API will stop generating further tokens.
- `temperature` (number or null): Defines the sampling temperature to use, varying between 0 and 2.
- `top_p` (number or null): An alternative to sampling with temperature, used for nucleus sampling.
"""
frequency_penalty: Optional[int]=None
function_call: Optional[Union[str, dict]]=None
functions: Optional[list]=None
logit_bias: Optional[dict]=None
max_tokens: Optional[int]=None
n: Optional[int]=None
presence_penalty: Optional[int]=None
stop: Optional[Union[str, list]]=None
temperature: Optional[int]=None
top_p: Optional[int]=None
def __init__(self,
frequency_penalty: Optional[int]=None,
function_call: Optional[Union[str, dict]]=None,
functions: Optional[list]=None,
logit_bias: Optional[dict]=None,
max_tokens: Optional[int]=None,
n: Optional[int]=None,
presence_penalty: Optional[int]=None,
stop: Optional[Union[str, list]]=None,
temperature: Optional[int]=None,
top_p: Optional[int]=None,) -> None:
locals_ = locals()
for key, value in locals_.items():
if key != 'self' and value is not None:
setattr(self.__class__, key, value)
@classmethod
def get_config(cls):
return {k: v for k, v in cls.__dict__.items()
if not k.startswith('__')
and not isinstance(v, (types.FunctionType, types.BuiltinFunctionType, classmethod, staticmethod))
and v is not None}
class OpenAITextCompletionConfig():
"""
Reference: https://platform.openai.com/docs/api-reference/completions/create
The class `OpenAITextCompletionConfig` provides configuration for the OpenAI's text completion API interface. Below are the parameters:
- `best_of` (integer or null): This optional parameter generates server-side completions and returns the one with the highest log probability per token.
- `echo` (boolean or null): This optional parameter will echo back the prompt in addition to the completion.
- `frequency_penalty` (number or null): Defaults to 0. It is a numbers from -2.0 to 2.0, where positive values decrease the model's likelihood to repeat the same line.
- `logit_bias` (map): This optional parameter modifies the likelihood of specified tokens appearing in the completion.
- `logprobs` (integer or null): This optional parameter includes the log probabilities on the most likely tokens as well as the chosen tokens.
- `max_tokens` (integer or null): This optional parameter sets the maximum number of tokens to generate in the completion.
- `n` (integer or null): This optional parameter sets how many completions to generate for each prompt.
- `presence_penalty` (number or null): Defaults to 0 and can be between -2.0 and 2.0. Positive values increase the model's likelihood to talk about new topics.
- `stop` (string / array / null): Specifies up to 4 sequences where the API will stop generating further tokens.
- `suffix` (string or null): Defines the suffix that comes after a completion of inserted text.
- `temperature` (number or null): This optional parameter defines the sampling temperature to use.
- `top_p` (number or null): An alternative to sampling with temperature, used for nucleus sampling.
"""
best_of: Optional[int]=None
echo: Optional[bool]=None
frequency_penalty: Optional[int]=None
logit_bias: Optional[dict]=None
logprobs: Optional[int]=None
max_tokens: Optional[int]=None
n: Optional[int]=None
presence_penalty: Optional[int]=None
stop: Optional[Union[str, list]]=None
suffix: Optional[str]=None
temperature: Optional[float]=None
top_p: Optional[float]=None
def __init__(self,
best_of: Optional[int]=None,
echo: Optional[bool]=None,
frequency_penalty: Optional[int]=None,
logit_bias: Optional[dict]=None,
logprobs: Optional[int]=None,
max_tokens: Optional[int]=None,
n: Optional[int]=None,
presence_penalty: Optional[int]=None,
stop: Optional[Union[str, list]]=None,
suffix: Optional[str]=None,
temperature: Optional[float]=None,
top_p: Optional[float]=None) -> None:
locals_ = locals()
for key, value in locals_.items():
if key != 'self' and value is not None:
setattr(self.__class__, key, value)
@classmethod
def get_config(cls):
return {k: v for k, v in cls.__dict__.items()
if not k.startswith('__')
and not isinstance(v, (types.FunctionType, types.BuiltinFunctionType, classmethod, staticmethod))
and v is not None}
class OpenAIChatCompletion(BaseLLM):
def __init__(self) -> None:
super().__init__()
def completion(self,
model_response: ModelResponse,
timeout: float,
model: Optional[str]=None,
messages: Optional[list]=None,
print_verbose: Optional[Callable]=None,
api_key: Optional[str]=None,
api_base: Optional[str]=None,
acompletion: bool = False,
logging_obj=None,
optional_params=None,
litellm_params=None,
logger_fn=None,
headers: Optional[dict]=None,
custom_prompt_dict: dict={},
client=None
):
super().completion()
exception_mapping_worked = False
try:
if headers:
optional_params["extra_headers"] = headers
if model is None or messages is None:
raise OpenAIError(status_code=422, message=f"Missing model or messages")
if not isinstance(timeout, float):
raise OpenAIError(status_code=422, message=f"Timeout needs to be a float")
for _ in range(2): # if call fails due to alternating messages, retry with reformatted message
data = {
"model": model,
"messages": messages,
**optional_params
}
## LOGGING
logging_obj.pre_call(
input=messages,
api_key=api_key,
additional_args={"headers": headers, "api_base": api_base, "acompletion": acompletion, "complete_input_dict": data},
)
try:
max_retries = data.pop("max_retries", 2)
if acompletion is True:
if optional_params.get("stream", False):
return self.async_streaming(logging_obj=logging_obj, data=data, model=model, api_base=api_base, api_key=api_key, timeout=timeout, client=client, max_retries=max_retries)
else:
return self.acompletion(data=data, model_response=model_response, api_base=api_base, api_key=api_key, timeout=timeout, client=client, max_retries=max_retries)
elif optional_params.get("stream", False):
return self.streaming(logging_obj=logging_obj, data=data, model=model, api_base=api_base, api_key=api_key, timeout=timeout, client=client, max_retries=max_retries)
else:
if not isinstance(max_retries, int):
raise OpenAIError(status_code=422, message="max retries must be an int")
if client is None:
openai_client = OpenAI(api_key=api_key, base_url=api_base, http_client=litellm.client_session, timeout=timeout, max_retries=max_retries)
else:
openai_client = client
response = openai_client.chat.completions.create(**data) # type: ignore
logging_obj.post_call(
input=None,
api_key=api_key,
original_response=response,
additional_args={"complete_input_dict": data},
)
return convert_to_model_response_object(response_object=json.loads(response.model_dump_json()), model_response_object=model_response)
except Exception as e:
if "Conversation roles must alternate user/assistant" in str(e) or "user and assistant roles should be alternating" in str(e):
# reformat messages to ensure user/assistant are alternating, if there's either 2 consecutive 'user' messages or 2 consecutive 'assistant' message, add a blank 'user' or 'assistant' message to ensure compatibility
new_messages = []
for i in range(len(messages)-1):
new_messages.append(messages[i])
if messages[i]["role"] == messages[i+1]["role"]:
if messages[i]["role"] == "user":
new_messages.append({"role": "assistant", "content": ""})
else:
new_messages.append({"role": "user", "content": ""})
new_messages.append(messages[-1])
messages = new_messages
elif "Last message must have role `user`" in str(e):
new_messages = messages
new_messages.append({"role": "user", "content": ""})
messages = new_messages
else:
raise e
except OpenAIError as e:
exception_mapping_worked = True
raise e
except Exception as e:
raise e
async def acompletion(self,
data: dict,
model_response: ModelResponse,
timeout: float,
api_key: Optional[str]=None,
api_base: Optional[str]=None,
client=None,
max_retries=None,
):
response = None
try:
if client is None:
openai_aclient = AsyncOpenAI(api_key=api_key, base_url=api_base, http_client=litellm.aclient_session, timeout=timeout, max_retries=max_retries)
else:
openai_aclient = client
response = await openai_aclient.chat.completions.create(**data)
return convert_to_model_response_object(response_object=json.loads(response.model_dump_json()), model_response_object=model_response)
except Exception as e:
if response and hasattr(response, "text"):
raise OpenAIError(status_code=500, message=f"{str(e)}\n\nOriginal Response: {response.text}")
else:
if type(e).__name__ == "ReadTimeout":
raise OpenAIError(status_code=408, message=f"{type(e).__name__}")
else:
raise OpenAIError(status_code=500, message=f"{str(e)}")
def streaming(self,
logging_obj,
timeout: float,
data: dict,
model: str,
api_key: Optional[str]=None,
api_base: Optional[str]=None,
client = None,
max_retries=None
):
if client is None:
openai_client = OpenAI(api_key=api_key, base_url=api_base, http_client=litellm.client_session, timeout=timeout, max_retries=max_retries)
else:
openai_client = client
response = openai_client.chat.completions.create(**data)
streamwrapper = CustomStreamWrapper(completion_stream=response, model=model, custom_llm_provider="openai",logging_obj=logging_obj)
return streamwrapper
async def async_streaming(self,
logging_obj,
timeout: float,
data: dict,
model: str,
api_key: Optional[str]=None,
api_base: Optional[str]=None,
client=None,
max_retries=None,
):
response = None
try:
if client is None:
openai_aclient = AsyncOpenAI(api_key=api_key, base_url=api_base, http_client=litellm.aclient_session, timeout=timeout, max_retries=max_retries)
else:
openai_aclient = client
response = await openai_aclient.chat.completions.create(**data)
streamwrapper = CustomStreamWrapper(completion_stream=response, model=model, custom_llm_provider="openai",logging_obj=logging_obj)
async for transformed_chunk in streamwrapper:
yield transformed_chunk
except Exception as e: # need to exception handle here. async exceptions don't get caught in sync functions.
if response is not None and hasattr(response, "text"):
raise OpenAIError(status_code=500, message=f"{str(e)}\n\nOriginal Response: {response.text}")
else:
if type(e).__name__ == "ReadTimeout":
raise OpenAIError(status_code=408, message=f"{type(e).__name__}")
else:
raise OpenAIError(status_code=500, message=f"{str(e)}")
async def aembedding(
self,
data: dict,
model_response: ModelResponse,
timeout: float,
api_key: Optional[str]=None,
api_base: Optional[str]=None,
client=None,
max_retries=None,
):
response = None
try:
if client is None:
openai_aclient = AsyncOpenAI(api_key=api_key, base_url=api_base, http_client=litellm.aclient_session, timeout=timeout, max_retries=max_retries)
else:
openai_aclient = client
response = await openai_aclient.embeddings.create(**data) # type: ignore
return response
except Exception as e:
raise e
def embedding(self,
model: str,
input: list,
timeout: float,
api_key: Optional[str] = None,
api_base: Optional[str] = None,
model_response: Optional[litellm.utils.EmbeddingResponse] = None,
logging_obj=None,
optional_params=None,
client=None,
aembedding=None,
):
super().embedding()
exception_mapping_worked = False
try:
model = model
data = {
"model": model,
"input": input,
**optional_params
}
max_retries = data.pop("max_retries", 2)
if not isinstance(max_retries, int):
raise OpenAIError(status_code=422, message="max retries must be an int")
if aembedding == True:
response = self.aembedding(data=data, model_response=model_response, api_base=api_base, api_key=api_key, timeout=timeout, client=client, max_retries=max_retries) # type: ignore
return response
if client is None:
openai_client = OpenAI(api_key=api_key, base_url=api_base, http_client=litellm.client_session, timeout=timeout, max_retries=max_retries)
else:
openai_client = client
## LOGGING
logging_obj.pre_call(
input=input,
api_key=api_key,
additional_args={"complete_input_dict": data, "api_base": api_base},
)
## COMPLETION CALL
response = openai_client.embeddings.create(**data) # type: ignore
## LOGGING
logging_obj.post_call(
input=input,
api_key=api_key,
additional_args={"complete_input_dict": data},
original_response=response,
)
return convert_to_model_response_object(response_object=json.loads(response.model_dump_json()), model_response_object=model_response, response_type="embedding") # type: ignore
except OpenAIError as e:
exception_mapping_worked = True
raise e
except Exception as e:
if exception_mapping_worked:
raise e
else:
import traceback
raise OpenAIError(status_code=500, message=traceback.format_exc())
class OpenAITextCompletion(BaseLLM):
_client_session: httpx.Client
def __init__(self) -> None:
super().__init__()
self._client_session = self.create_client_session()
def validate_environment(self, api_key):
headers = {
"content-type": "application/json",
}
if api_key:
headers["Authorization"] = f"Bearer {api_key}"
return headers
def convert_to_model_response_object(self, response_object: Optional[dict]=None, model_response_object: Optional[ModelResponse]=None):
try:
## RESPONSE OBJECT
if response_object is None or model_response_object is None:
raise ValueError("Error in response object format")
choice_list=[]
for idx, choice in enumerate(response_object["choices"]):
message = Message(content=choice["text"], role="assistant")
choice = Choices(finish_reason=choice["finish_reason"], index=idx, message=message)
choice_list.append(choice)
model_response_object.choices = choice_list
if "usage" in response_object:
model_response_object.usage = response_object["usage"]
if "id" in response_object:
model_response_object.id = response_object["id"]
if "model" in response_object:
model_response_object.model = response_object["model"]
model_response_object._hidden_params["original_response"] = response_object # track original response, if users make a litellm.text_completion() request, we can return the original response
return model_response_object
except Exception as e:
raise e
def completion(self,
model_response: ModelResponse,
api_key: str,
model: str,
messages: list,
print_verbose: Optional[Callable]=None,
api_base: Optional[str]=None,
logging_obj=None,
acompletion: bool = False,
optional_params=None,
litellm_params=None,
logger_fn=None,
headers: Optional[dict]=None):
super().completion()
exception_mapping_worked = False
try:
if headers is None:
headers = self.validate_environment(api_key=api_key)
if model is None or messages is None:
raise OpenAIError(status_code=422, message=f"Missing model or messages")
api_base = f"{api_base}/completions"
if len(messages)>0 and "content" in messages[0] and type(messages[0]["content"]) == list:
prompt = messages[0]["content"]
else:
prompt = " ".join([message["content"] for message in messages]) # type: ignore
data = {
"model": model,
"prompt": prompt,
**optional_params
}
## LOGGING
logging_obj.pre_call(
input=messages,
api_key=api_key,
additional_args={"headers": headers, "api_base": api_base, "complete_input_dict": data},
)
if acompletion == True:
if optional_params.get("stream", False):
return self.async_streaming(logging_obj=logging_obj, api_base=api_base, data=data, headers=headers, model_response=model_response, model=model)
else:
return self.acompletion(api_base=api_base, data=data, headers=headers, model_response=model_response, prompt=prompt, api_key=api_key, logging_obj=logging_obj, model=model) # type: ignore
elif optional_params.get("stream", False):
return self.streaming(logging_obj=logging_obj, api_base=api_base, data=data, headers=headers, model_response=model_response, model=model)
else:
response = httpx.post(
url=f"{api_base}",
json=data,
headers=headers,
)
if response.status_code != 200:
raise OpenAIError(status_code=response.status_code, message=response.text)
## LOGGING
logging_obj.post_call(
input=prompt,
api_key=api_key,
original_response=response,
additional_args={
"headers": headers,
"api_base": api_base,
},
)
## RESPONSE OBJECT
return self.convert_to_model_response_object(response_object=response.json(), model_response_object=model_response)
except Exception as e:
raise e
async def acompletion(self,
logging_obj,
api_base: str,
data: dict,
headers: dict,
model_response: ModelResponse,
prompt: str,
api_key: str,
model: str):
async with httpx.AsyncClient() as client:
response = await client.post(api_base, json=data, headers=headers, timeout=litellm.request_timeout)
response_json = response.json()
if response.status_code != 200:
raise OpenAIError(status_code=response.status_code, message=response.text)
## LOGGING
logging_obj.post_call(
input=prompt,
api_key=api_key,
original_response=response,
additional_args={
"headers": headers,
"api_base": api_base,
},
)
## RESPONSE OBJECT
return self.convert_to_model_response_object(response_object=response_json, model_response_object=model_response)
def streaming(self,
logging_obj,
api_base: str,
data: dict,
headers: dict,
model_response: ModelResponse,
model: str
):
with httpx.stream(
url=f"{api_base}",
json=data,
headers=headers,
method="POST",
timeout=litellm.request_timeout
) as response:
if response.status_code != 200:
raise OpenAIError(status_code=response.status_code, message=response.text)
streamwrapper = CustomStreamWrapper(completion_stream=response.iter_lines(), model=model, custom_llm_provider="text-completion-openai",logging_obj=logging_obj)
for transformed_chunk in streamwrapper:
yield transformed_chunk
async def async_streaming(self,
logging_obj,
api_base: str,
data: dict,
headers: dict,
model_response: ModelResponse,
model: str):
client = httpx.AsyncClient()
async with client.stream(
url=f"{api_base}",
json=data,
headers=headers,
method="POST",
timeout=litellm.request_timeout
) as response:
if response.status_code != 200:
raise OpenAIError(status_code=response.status_code, message=response.text)
streamwrapper = CustomStreamWrapper(completion_stream=response.aiter_lines(), model=model, custom_llm_provider="text-completion-openai",logging_obj=logging_obj)
async for transformed_chunk in streamwrapper:
yield transformed_chunk