Spaces:
Sleeping
Sleeping
# What is this? | |
## Handler file for OpenAI-like endpoints. | |
## Allows jina ai embedding calls - which don't allow 'encoding_format' in payload. | |
import json | |
from typing import Optional | |
import httpx | |
import litellm | |
from litellm.llms.custom_httpx.http_handler import ( | |
AsyncHTTPHandler, | |
HTTPHandler, | |
get_async_httpx_client, | |
) | |
from litellm.types.utils import EmbeddingResponse | |
from ..common_utils import OpenAILikeBase, OpenAILikeError | |
class OpenAILikeEmbeddingHandler(OpenAILikeBase): | |
def __init__(self, **kwargs): | |
pass | |
async def aembedding( | |
self, | |
input: list, | |
data: dict, | |
model_response: EmbeddingResponse, | |
timeout: float, | |
api_key: str, | |
api_base: str, | |
logging_obj, | |
headers: dict, | |
client=None, | |
) -> EmbeddingResponse: | |
response = None | |
try: | |
if client is None or not isinstance(client, AsyncHTTPHandler): | |
async_client = get_async_httpx_client( | |
llm_provider=litellm.LlmProviders.OPENAI, | |
params={"timeout": timeout}, | |
) | |
else: | |
async_client = client | |
try: | |
response = await async_client.post( | |
api_base, | |
headers=headers, | |
data=json.dumps(data), | |
) # type: ignore | |
response.raise_for_status() | |
response_json = response.json() | |
except httpx.HTTPStatusError as e: | |
raise OpenAILikeError( | |
status_code=e.response.status_code, | |
message=e.response.text if e.response else str(e), | |
) | |
except httpx.TimeoutException: | |
raise OpenAILikeError( | |
status_code=408, message="Timeout error occurred." | |
) | |
except Exception as e: | |
raise OpenAILikeError(status_code=500, message=str(e)) | |
## LOGGING | |
logging_obj.post_call( | |
input=input, | |
api_key=api_key, | |
additional_args={"complete_input_dict": data}, | |
original_response=response_json, | |
) | |
return EmbeddingResponse(**response_json) | |
except Exception as e: | |
## LOGGING | |
logging_obj.post_call( | |
input=input, | |
api_key=api_key, | |
original_response=str(e), | |
) | |
raise e | |
def embedding( | |
self, | |
model: str, | |
input: list, | |
timeout: float, | |
logging_obj, | |
api_key: Optional[str], | |
api_base: Optional[str], | |
optional_params: dict, | |
model_response: Optional[EmbeddingResponse] = None, | |
client=None, | |
aembedding=None, | |
custom_endpoint: Optional[bool] = None, | |
headers: Optional[dict] = None, | |
) -> EmbeddingResponse: | |
api_base, headers = self._validate_environment( | |
api_base=api_base, | |
api_key=api_key, | |
endpoint_type="embeddings", | |
headers=headers, | |
custom_endpoint=custom_endpoint, | |
) | |
model = model | |
data = {"model": model, "input": input, **optional_params} | |
## LOGGING | |
logging_obj.pre_call( | |
input=input, | |
api_key=api_key, | |
additional_args={"complete_input_dict": data, "api_base": api_base}, | |
) | |
if aembedding is True: | |
return self.aembedding(data=data, input=input, logging_obj=logging_obj, model_response=model_response, api_base=api_base, api_key=api_key, timeout=timeout, client=client, headers=headers) # type: ignore | |
if client is None or isinstance(client, AsyncHTTPHandler): | |
self.client = HTTPHandler(timeout=timeout) # type: ignore | |
else: | |
self.client = client | |
## EMBEDDING CALL | |
try: | |
response = self.client.post( | |
api_base, | |
headers=headers, | |
data=json.dumps(data), | |
) # type: ignore | |
response.raise_for_status() # type: ignore | |
response_json = response.json() # type: ignore | |
except httpx.HTTPStatusError as e: | |
raise OpenAILikeError( | |
status_code=e.response.status_code, | |
message=e.response.text, | |
) | |
except httpx.TimeoutException: | |
raise OpenAILikeError(status_code=408, message="Timeout error occurred.") | |
except Exception as e: | |
raise OpenAILikeError(status_code=500, message=str(e)) | |
## LOGGING | |
logging_obj.post_call( | |
input=input, | |
api_key=api_key, | |
additional_args={"complete_input_dict": data}, | |
original_response=response_json, | |
) | |
return litellm.EmbeddingResponse(**response_json) | |