Spaces:
Sleeping
Sleeping
import asyncio | |
import os | |
import ssl | |
import time | |
from typing import TYPE_CHECKING, Any, Callable, List, Mapping, Optional, Union | |
import httpx | |
from httpx import USE_CLIENT_DEFAULT, AsyncHTTPTransport, HTTPTransport | |
import litellm | |
from litellm.constants import _DEFAULT_TTL_FOR_HTTPX_CLIENTS | |
from litellm.litellm_core_utils.logging_utils import track_llm_api_timing | |
from litellm.types.llms.custom_http import * | |
# These two names are imported only for static type checking; at runtime they
# are bound to ``Any`` so annotations that mention them still resolve.
if TYPE_CHECKING:
    from litellm import LlmProviders
    from litellm.litellm_core_utils.litellm_logging import (
        Logging as LiteLLMLoggingObject,
    )
else:
    LlmProviders = Any
    LiteLLMLoggingObject = Any

# Use a placeholder version string when the version module cannot be imported.
try:
    from litellm._version import version
except Exception:
    version = "0.0.0"

# Default headers handed to the httpx clients constructed in this module.
headers = {
    "User-Agent": f"litellm/{version}",
}

# https://www.python-httpx.org/advanced/timeouts
# Timeout used when a handler is created without an explicit timeout.
_DEFAULT_TIMEOUT = httpx.Timeout(timeout=5.0, connect=5.0)
def mask_sensitive_info(error_message):
    """Redact the value of the first ``key=`` parameter found in a string.

    The value is taken to run from just after ``key=`` up to the next ``&``
    (or to the end of the string when there is no ``&``) and is replaced by
    ``[REDACTED_API_KEY]``. Only the first ``key=`` occurrence is handled.

    Args:
        error_message: Text that may embed an API key. Non-string values
            (for example ``bytes`` or ``None``) are returned untouched.

    Returns:
        The redacted string, or the input unchanged when it is not a string
        or contains no ``key=``.
    """
    if not isinstance(error_message, str):
        return error_message

    marker = "key="
    marker_at = error_message.find(marker)
    if marker_at == -1:
        return error_message

    # Everything up to and including "key=" is kept verbatim.
    kept_prefix = error_message[: marker_at + len(marker)]
    # The remaining query parameters (starting at "&") are kept as well.
    ampersand_at = error_message.find("&", marker_at)
    kept_suffix = "" if ampersand_at == -1 else error_message[ampersand_at:]
    return kept_prefix + "[REDACTED_API_KEY]" + kept_suffix
class MaskedHTTPStatusError(httpx.HTTPStatusError):
    """Copy of an ``httpx.HTTPStatusError`` whose request URL has its ``key=`` value redacted.

    The request and response are rebuilt from the original error so that the
    new exception carries the same method, headers, body and status code, but
    exposes only the masked URL.
    """

    def __init__(
        self, original_error, message: Optional[str] = None, text: Optional[str] = None
    ):
        """Build the masked error.

        Args:
            original_error: The ``httpx.HTTPStatusError`` to copy.
            message: Value stored on ``self.message`` after construction.
            text: Value stored on ``self.text`` after construction.
        """
        # Create a new error with the masked URL
        masked_url = mask_sensitive_info(str(original_error.request.url))

        # httpx exceptions do not define ``.message`` themselves; it exists only
        # when a handler attached it. Fall back to the (masked) string form so a
        # plain HTTPStatusError can be wrapped without an AttributeError.
        base_message = getattr(original_error, "message", None)
        if base_message is None:
            base_message = mask_sensitive_info(str(original_error))

        # Create a new error that looks like the original, but with a masked URL
        super().__init__(
            message=base_message,
            request=httpx.Request(
                method=original_error.request.method,
                url=masked_url,
                headers=original_error.request.headers,
                content=original_error.request.content,
            ),
            response=httpx.Response(
                status_code=original_error.response.status_code,
                content=original_error.response.content,
                headers=original_error.response.headers,
            ),
        )
        self.message = message
        self.text = text
class AsyncHTTPHandler:
    """Async HTTP helper built around a pooled ``httpx.AsyncClient``.

    The handler owns one client (``self.client``) produced by
    ``create_client``. ``post``/``put``/``patch``/``delete`` retry once on a
    throwaway single-connection client when the first attempt fails with
    ``httpx.RemoteProtocolError`` or ``httpx.ConnectError``.
    """

    def __init__(
        self,
        timeout: Optional[Union[float, httpx.Timeout]] = None,
        event_hooks: Optional[Mapping[str, List[Callable[..., Any]]]] = None,
        concurrent_limit=1000,
        client_alias: Optional[str] = None,  # name for client in logs
        ssl_verify: Optional[VerifyTypes] = None,
    ):
        """Store the settings and build the pooled client.

        Args:
            timeout: Per-request timeout; ``None`` makes the client use
                ``_DEFAULT_TIMEOUT``.
            event_hooks: httpx event hooks forwarded to the client.
            concurrent_limit: Used for both ``max_connections`` and
                ``max_keepalive_connections``.
            client_alias: Name for the client in logs.
            ssl_verify: Verification setting; ``None`` defers to the
                ``SSL_VERIFY`` environment variable / ``litellm.ssl_verify``.
        """
        self.timeout = timeout
        self.event_hooks = event_hooks
        # Kept so that retry clients use the same verification settings.
        self.ssl_verify = ssl_verify
        self.client = self.create_client(
            timeout=timeout,
            concurrent_limit=concurrent_limit,
            event_hooks=event_hooks,
            ssl_verify=ssl_verify,
        )
        self.client_alias = client_alias

    def create_client(
        self,
        timeout: Optional[Union[float, httpx.Timeout]],
        concurrent_limit: int,
        event_hooks: Optional[Mapping[str, List[Callable[..., Any]]]],
        ssl_verify: Optional[VerifyTypes] = None,
    ) -> httpx.AsyncClient:
        """Create an ``httpx.AsyncClient`` with a connection pool.

        Args:
            timeout: Client timeout; ``None`` selects ``_DEFAULT_TIMEOUT``.
            concurrent_limit: Connection and keep-alive connection limit.
            event_hooks: httpx event hooks.
            ssl_verify: Verification setting; ``None`` defers to the
                ``SSL_VERIFY`` environment variable, then ``litellm.ssl_verify``.

        Returns:
            The configured async client.
        """
        # SSL certificates (a.k.a CA bundle) used to verify the identity of requested hosts.
        # /path/to/certificate.pem
        if ssl_verify is None:
            ssl_verify = os.getenv("SSL_VERIFY", litellm.ssl_verify)
        ssl_security_level = os.getenv("SSL_SECURITY_LEVEL")

        # If verification has not been switched off and a lower security level
        # is requested, build a custom context. The previous ``not ssl_verify``
        # test was inverted: it only fired when verification was disabled and
        # could never reach the CA-bundle branch below. A caller-supplied
        # SSLContext is left untouched rather than being replaced.
        if (
            ssl_verify is not False
            and not isinstance(ssl_verify, ssl.SSLContext)
            and ssl_security_level
            and isinstance(ssl_security_level, str)
        ):
            # Create a custom SSL context with reduced security level
            custom_ssl_context = ssl.create_default_context()
            custom_ssl_context.set_ciphers(ssl_security_level)
            # If ssl_verify is a path to a CA bundle, load it into our custom context
            if isinstance(ssl_verify, str) and os.path.exists(ssl_verify):
                custom_ssl_context.load_verify_locations(cafile=ssl_verify)
            # Use our custom SSL context instead of the original ssl_verify value
            ssl_verify = custom_ssl_context

        # An SSL certificate used by the requested host to authenticate the client.
        # /path/to/client.pem
        cert = os.getenv("SSL_CERTIFICATE", litellm.ssl_certificate)

        if timeout is None:
            timeout = _DEFAULT_TIMEOUT

        # Create a client with a connection pool
        transport = self._create_async_transport()
        return httpx.AsyncClient(
            transport=transport,
            event_hooks=event_hooks,
            timeout=timeout,
            limits=httpx.Limits(
                max_connections=concurrent_limit,
                max_keepalive_connections=concurrent_limit,
            ),
            verify=ssl_verify,
            cert=cert,
            headers=headers,
        )

    async def close(self):
        """Close the underlying client."""
        # Close the client when you're done with it
        await self.client.aclose()

    async def __aenter__(self):
        """Return the underlying ``httpx.AsyncClient`` (not the handler itself)."""
        return self.client

    async def __aexit__(self, exc_type=None, exc_value=None, traceback=None):
        """Close the client when leaving an ``async with`` block.

        ``async with`` always passes the exception triple, so it has to be
        accepted here; the defaults keep a bare ``__aexit__()`` call working.
        Nothing is returned, so exceptions propagate.
        """
        await self.client.aclose()

    async def get(
        self,
        url: str,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
        follow_redirects: Optional[bool] = None,
    ):
        """Send a GET request through the pooled client and return the response."""
        # Set follow_redirects to UseClientDefault if None
        _follow_redirects = (
            follow_redirects if follow_redirects is not None else USE_CLIENT_DEFAULT
        )
        response = await self.client.get(
            url, params=params, headers=headers, follow_redirects=_follow_redirects  # type: ignore
        )
        return response

    async def post(
        self,
        url: str,
        data: Optional[Union[dict, str, bytes]] = None,  # type: ignore
        json: Optional[dict] = None,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
        timeout: Optional[Union[float, httpx.Timeout]] = None,
        stream: bool = False,
        logging_obj: Optional[LiteLLMLoggingObject] = None,
    ):
        """Send a POST request and return the response.

        Args:
            timeout: Overrides ``self.timeout`` for this request.
            stream: Forwarded to ``client.send``; also selects how the error
                body is read when the status check fails.
            logging_obj: Accepted but not used in this method.

        Raises:
            litellm.Timeout: On ``httpx.TimeoutException``.
            httpx.HTTPStatusError: On an error status, annotated with
                ``message``, ``text`` and ``status_code``.
        """
        start_time = time.time()
        try:
            if timeout is None:
                # NOTE: if self.timeout is None as well, httpx treats the
                # explicit ``timeout=None`` below as "no timeout".
                timeout = self.timeout
            req = self.client.build_request(
                "POST", url, data=data, json=json, params=params, headers=headers, timeout=timeout  # type: ignore
            )
            response = await self.client.send(req, stream=stream)
            response.raise_for_status()
            return response
        except (httpx.RemoteProtocolError, httpx.ConnectError):
            # Retry the request with a new session if there is a connection error
            return await self._retry_on_new_client(
                "POST",
                url,
                timeout=timeout,
                data=data,
                json=json,
                params=params,
                headers=headers,
                stream=stream,
            )
        except httpx.TimeoutException as e:
            time_delta = round(time.time() - start_time, 3)
            raise litellm.Timeout(
                message=f"Connection timed out. Timeout passed={timeout}, time taken={time_delta} seconds",
                model="default-model-name",
                llm_provider="litellm-httpx-handler",
                headers=self._prefixed_response_headers(e),
            )
        except httpx.HTTPStatusError as e:
            if stream is True:
                setattr(e, "message", await e.response.aread())
                setattr(e, "text", await e.response.aread())
            else:
                setattr(e, "message", mask_sensitive_info(e.response.text))
                setattr(e, "text", mask_sensitive_info(e.response.text))
            setattr(e, "status_code", e.response.status_code)
            raise

    async def put(
        self,
        url: str,
        data: Optional[Union[dict, str]] = None,  # type: ignore
        json: Optional[dict] = None,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
        timeout: Optional[Union[float, httpx.Timeout]] = None,
        stream: bool = False,
    ):
        """Send a PUT request and return the response.

        ``stream`` is not passed to the first ``client.send``; it is forwarded
        to the retry request and selects how the error body is read.

        Raises:
            litellm.Timeout: On ``httpx.TimeoutException``.
            httpx.HTTPStatusError: On an error status, annotated with
                ``status_code`` and ``message``.
        """
        try:
            if timeout is None:
                timeout = self.timeout
            req = self.client.build_request(
                "PUT", url, data=data, json=json, params=params, headers=headers, timeout=timeout  # type: ignore
            )
            response = await self.client.send(req)
            response.raise_for_status()
            return response
        except (httpx.RemoteProtocolError, httpx.ConnectError):
            # Retry the request with a new session if there is a connection error
            return await self._retry_on_new_client(
                "PUT",
                url,
                timeout=timeout,
                data=data,
                json=json,
                params=params,
                headers=headers,
                stream=stream,
            )
        except httpx.TimeoutException as e:
            raise litellm.Timeout(
                message=f"Connection timed out after {timeout} seconds.",
                model="default-model-name",
                llm_provider="litellm-httpx-handler",
                headers=self._prefixed_response_headers(e),
            )
        except httpx.HTTPStatusError as e:
            setattr(e, "status_code", e.response.status_code)
            if stream is True:
                setattr(e, "message", await e.response.aread())
            else:
                setattr(e, "message", e.response.text)
            raise

    async def patch(
        self,
        url: str,
        data: Optional[Union[dict, str]] = None,  # type: ignore
        json: Optional[dict] = None,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
        timeout: Optional[Union[float, httpx.Timeout]] = None,
        stream: bool = False,
    ):
        """Send a PATCH request and return the response.

        ``stream`` is not passed to the first ``client.send``; it is forwarded
        to the retry request and selects how the error body is read.

        Raises:
            litellm.Timeout: On ``httpx.TimeoutException``.
            httpx.HTTPStatusError: On an error status, annotated with
                ``status_code`` and ``message``.
        """
        try:
            if timeout is None:
                timeout = self.timeout
            req = self.client.build_request(
                "PATCH", url, data=data, json=json, params=params, headers=headers, timeout=timeout  # type: ignore
            )
            response = await self.client.send(req)
            response.raise_for_status()
            return response
        except (httpx.RemoteProtocolError, httpx.ConnectError):
            # Retry the request with a new session if there is a connection error
            return await self._retry_on_new_client(
                "PATCH",
                url,
                timeout=timeout,
                data=data,
                json=json,
                params=params,
                headers=headers,
                stream=stream,
            )
        except httpx.TimeoutException as e:
            raise litellm.Timeout(
                message=f"Connection timed out after {timeout} seconds.",
                model="default-model-name",
                llm_provider="litellm-httpx-handler",
                headers=self._prefixed_response_headers(e),
            )
        except httpx.HTTPStatusError as e:
            setattr(e, "status_code", e.response.status_code)
            if stream is True:
                setattr(e, "message", await e.response.aread())
            else:
                setattr(e, "message", e.response.text)
            raise

    async def delete(
        self,
        url: str,
        data: Optional[Union[dict, str]] = None,  # type: ignore
        json: Optional[dict] = None,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
        timeout: Optional[Union[float, httpx.Timeout]] = None,
        stream: bool = False,
    ):
        """Send a DELETE request and return the response.

        Unlike the other verbs, ``httpx.TimeoutException`` is not translated
        here and propagates as raised by httpx.

        Raises:
            httpx.HTTPStatusError: On an error status, annotated with
                ``status_code`` and ``message``.
        """
        try:
            if timeout is None:
                timeout = self.timeout
            req = self.client.build_request(
                "DELETE", url, data=data, json=json, params=params, headers=headers, timeout=timeout  # type: ignore
            )
            response = await self.client.send(req, stream=stream)
            response.raise_for_status()
            return response
        except (httpx.RemoteProtocolError, httpx.ConnectError):
            # Retry the request with a new session if there is a connection error
            return await self._retry_on_new_client(
                "DELETE",
                url,
                timeout=timeout,
                data=data,
                json=json,
                params=params,
                headers=headers,
                stream=stream,
            )
        except httpx.HTTPStatusError as e:
            setattr(e, "status_code", e.response.status_code)
            if stream is True:
                setattr(e, "message", await e.response.aread())
            else:
                setattr(e, "message", e.response.text)
            raise

    async def single_connection_post_request(
        self,
        url: str,
        client: httpx.AsyncClient,
        data: Optional[Union[dict, str, bytes]] = None,  # type: ignore
        json: Optional[dict] = None,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
        stream: bool = False,
        method: str = "POST",
    ):
        """
        Make a request (POST unless ``method`` says otherwise) on a single connection client.
        Used for retrying connection client errors.

        ``method`` lets the PUT/PATCH/DELETE retries repeat their own verb
        instead of being re-sent as a POST.
        """
        req = client.build_request(
            method, url, data=data, json=json, params=params, headers=headers  # type: ignore
        )
        response = await client.send(req, stream=stream)
        response.raise_for_status()
        return response

    async def _retry_on_new_client(
        self,
        method: str,
        url: str,
        timeout: Optional[Union[float, httpx.Timeout]],
        data: Optional[Union[dict, str, bytes]] = None,
        json: Optional[dict] = None,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
        stream: bool = False,
    ):
        """Repeat a failed request once on a fresh single-connection client."""
        new_client = self.create_client(
            timeout=timeout,
            concurrent_limit=1,
            event_hooks=self.event_hooks,
            ssl_verify=self.ssl_verify,
        )
        try:
            return await self.single_connection_post_request(
                url=url,
                client=new_client,
                data=data,
                json=json,
                params=params,
                headers=headers,
                stream=stream,
                method=method,
            )
        finally:
            # NOTE(review): with stream=True the response is handed back after
            # this client is closed - confirm callers can still consume it.
            await new_client.aclose()

    @staticmethod
    def _prefixed_response_headers(error: Exception) -> dict:
        """Copy the headers of ``error.response`` (when present) under ``response_headers-<name>`` keys."""
        prefixed: dict = {}
        error_response = getattr(error, "response", None)
        if error_response is not None:
            for key, value in error_response.headers.items():
                prefixed["response_headers-{}".format(key)] = value
        return prefixed

    def __del__(self) -> None:
        # Best effort: schedule the close on the running loop; when there is no
        # running loop (or anything else fails) the error is ignored.
        try:
            asyncio.get_running_loop().create_task(self.close())
        except Exception:
            pass

    def _create_async_transport(self) -> Optional[AsyncHTTPTransport]:
        """
        Create an async transport with IPv4 only if litellm.force_ipv4 is True.
        Otherwise, return None.
        Some users have seen httpx ConnectionError when using ipv6 - forcing ipv4 resolves the issue for them
        """
        if litellm.force_ipv4:
            return AsyncHTTPTransport(local_address="0.0.0.0")
        else:
            return None
class HTTPHandler:
    """Synchronous HTTP helper built around an ``httpx.Client``.

    Either wraps a caller-supplied client or creates a pooled one. ``post``,
    ``patch`` and ``delete`` check the response status and annotate
    ``httpx.HTTPStatusError`` with ``message``, ``text`` and ``status_code``;
    ``put`` returns the response without a status check.
    """

    def __init__(
        self,
        timeout: Optional[Union[float, httpx.Timeout]] = None,
        concurrent_limit=1000,
        client: Optional[httpx.Client] = None,
        ssl_verify: Optional[Union[bool, str]] = None,
    ):
        """Adopt ``client`` when given, otherwise build a pooled client.

        Args:
            timeout: Client timeout; ``None`` selects ``_DEFAULT_TIMEOUT``.
            concurrent_limit: Used for both ``max_connections`` and
                ``max_keepalive_connections``.
            client: Existing client to use instead of creating one.
            ssl_verify: Verification setting; ``None`` defers to the
                ``SSL_VERIFY`` environment variable, then ``litellm.ssl_verify``.
        """
        client_timeout = _DEFAULT_TIMEOUT if timeout is None else timeout

        # SSL certificates (a.k.a CA bundle) used to verify the identity of requested hosts.
        # /path/to/certificate.pem
        verify = ssl_verify
        if verify is None:
            verify = os.getenv("SSL_VERIFY", litellm.ssl_verify)

        # An SSL certificate used by the requested host to authenticate the client.
        # /path/to/client.pem
        client_cert = os.getenv("SSL_CERTIFICATE", litellm.ssl_certificate)

        if client is not None:
            self.client = client
            return

        # Create a client with a connection pool
        self.client = httpx.Client(
            transport=self._create_sync_transport(),
            timeout=client_timeout,
            limits=httpx.Limits(
                max_connections=concurrent_limit,
                max_keepalive_connections=concurrent_limit,
            ),
            verify=verify,
            cert=client_cert,
            headers=headers,
        )

    def close(self):
        """Close the underlying client."""
        self.client.close()

    def get(
        self,
        url: str,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
        follow_redirects: Optional[bool] = None,
    ):
        """Send a GET request and return the response."""
        # An unspecified follow_redirects defers to the client's own default.
        if follow_redirects is None:
            redirect_setting = USE_CLIENT_DEFAULT
        else:
            redirect_setting = follow_redirects
        return self.client.get(
            url, params=params, headers=headers, follow_redirects=redirect_setting  # type: ignore
        )

    def post(
        self,
        url: str,
        data: Optional[Union[dict, str, bytes]] = None,
        json: Optional[Union[dict, str, List]] = None,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
        stream: bool = False,
        timeout: Optional[Union[float, httpx.Timeout]] = None,
        files: Optional[dict] = None,
        content: Any = None,
        logging_obj: Optional[LiteLLMLoggingObject] = None,
    ):
        """Send a POST request and return the response.

        ``logging_obj`` is accepted but not used in this method.

        Raises:
            litellm.Timeout: On ``httpx.TimeoutException``.
            httpx.HTTPStatusError: On an error status, annotated with
                ``message``, ``text`` and ``status_code``.
        """
        try:
            req = self.client.build_request(
                "POST",
                url,
                data=data,  # type: ignore
                json=json,
                params=params,
                headers=headers,
                files=files,
                content=content,  # type: ignore
                **self._timeout_kwargs(timeout),
            )
            response = self.client.send(req, stream=stream)
            response.raise_for_status()
            return response
        except httpx.TimeoutException:
            raise self._timeout_error(timeout)
        except httpx.HTTPStatusError as e:
            self._annotate_status_error(e, stream)
            raise

    def patch(
        self,
        url: str,
        data: Optional[Union[dict, str]] = None,
        json: Optional[Union[dict, str]] = None,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
        stream: bool = False,
        timeout: Optional[Union[float, httpx.Timeout]] = None,
    ):
        """Send a PATCH request and return the response.

        Raises:
            litellm.Timeout: On ``httpx.TimeoutException``.
            httpx.HTTPStatusError: On an error status, annotated with
                ``message``, ``text`` and ``status_code``.
        """
        try:
            req = self.client.build_request(
                "PATCH", url, data=data, json=json, params=params, headers=headers, **self._timeout_kwargs(timeout)  # type: ignore
            )
            response = self.client.send(req, stream=stream)
            response.raise_for_status()
            return response
        except httpx.TimeoutException:
            raise self._timeout_error(timeout)
        except httpx.HTTPStatusError as e:
            self._annotate_status_error(e, stream)
            raise

    def put(
        self,
        url: str,
        data: Optional[Union[dict, str]] = None,
        json: Optional[Union[dict, str]] = None,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
        stream: bool = False,
        timeout: Optional[Union[float, httpx.Timeout]] = None,
    ):
        """Send a PUT request and return the response without checking its status.

        Raises:
            litellm.Timeout: On ``httpx.TimeoutException``.
        """
        try:
            req = self.client.build_request(
                "PUT", url, data=data, json=json, params=params, headers=headers, **self._timeout_kwargs(timeout)  # type: ignore
            )
            return self.client.send(req, stream=stream)
        except httpx.TimeoutException:
            raise self._timeout_error(timeout)

    def delete(
        self,
        url: str,
        data: Optional[Union[dict, str]] = None,  # type: ignore
        json: Optional[dict] = None,
        params: Optional[dict] = None,
        headers: Optional[dict] = None,
        timeout: Optional[Union[float, httpx.Timeout]] = None,
        stream: bool = False,
    ):
        """Send a DELETE request and return the response.

        Raises:
            litellm.Timeout: On ``httpx.TimeoutException``.
            httpx.HTTPStatusError: On an error status, annotated with
                ``message``, ``text`` and ``status_code``.
        """
        try:
            req = self.client.build_request(
                "DELETE", url, data=data, json=json, params=params, headers=headers, **self._timeout_kwargs(timeout)  # type: ignore
            )
            response = self.client.send(req, stream=stream)
            response.raise_for_status()
            return response
        except httpx.TimeoutException:
            raise self._timeout_error(timeout)
        except httpx.HTTPStatusError as e:
            self._annotate_status_error(e, stream)
            raise

    @staticmethod
    def _timeout_kwargs(timeout: Optional[Union[float, httpx.Timeout]]) -> dict:
        """Return ``{"timeout": timeout}`` only when a timeout was given.

        Omitting the keyword leaves the client's own timeout in effect.
        """
        return {} if timeout is None else {"timeout": timeout}

    @staticmethod
    def _timeout_error(timeout: Optional[Union[float, httpx.Timeout]]):
        """Build the ``litellm.Timeout`` raised when httpx reports a timeout."""
        return litellm.Timeout(
            message=f"Connection timed out after {timeout} seconds.",
            model="default-model-name",
            llm_provider="litellm-httpx-handler",
        )

    @staticmethod
    def _annotate_status_error(error: httpx.HTTPStatusError, stream: bool) -> None:
        """Attach ``message``, ``text`` and ``status_code`` to a status error.

        For streamed responses the body is read from the stream; otherwise the
        decoded text is used. The body goes through ``mask_sensitive_info``.
        """
        if stream is True:
            body = mask_sensitive_info(error.response.read())
        else:
            body = mask_sensitive_info(error.response.text)
        setattr(error, "message", body)
        setattr(error, "text", body)
        setattr(error, "status_code", error.response.status_code)

    def __del__(self) -> None:
        # Best effort: any failure while closing is ignored.
        try:
            self.close()
        except Exception:
            pass

    def _create_sync_transport(self) -> Optional[HTTPTransport]:
        """
        Create an HTTP transport with IPv4 only if litellm.force_ipv4 is True.
        Otherwise, return None.
        Some users have seen httpx ConnectionError when using ipv6 - forcing ipv4 resolves the issue for them
        """
        if not litellm.force_ipv4:
            return None
        return HTTPTransport(local_address="0.0.0.0")
def get_async_httpx_client(
    llm_provider: Union[LlmProviders, httpxSpecialProvider],
    params: Optional[dict] = None,
) -> AsyncHTTPHandler:
    """
    Return the cached async HTTP handler for this provider/params combination.

    The cache key is ``"async_httpx_client"`` followed by every
    ``<key>_<value>`` pair of ``params`` and then ``llm_provider``. On a cache
    miss a new ``AsyncHTTPHandler`` is created (from ``params`` when given,
    otherwise with a 600s timeout / 5s connect timeout), stored with
    ``_DEFAULT_TTL_FOR_HTTPX_CLIENTS`` and returned.
    """
    key_fragments: List[str] = []
    if params is not None:
        for name, setting in params.items():
            try:
                key_fragments.append(f"{name}_{setting}")
            except Exception:
                # Entries that cannot be formatted are left out of the key.
                continue
    cache_key = "async_httpx_client" + "".join(key_fragments) + llm_provider

    cached_handler = litellm.in_memory_llm_clients_cache.get_cache(cache_key)
    if cached_handler:
        return cached_handler

    if params is None:
        handler = AsyncHTTPHandler(timeout=httpx.Timeout(timeout=600.0, connect=5.0))
    else:
        handler = AsyncHTTPHandler(**params)

    litellm.in_memory_llm_clients_cache.set_cache(
        key=cache_key,
        value=handler,
        ttl=_DEFAULT_TTL_FOR_HTTPX_CLIENTS,
    )
    return handler
def _get_httpx_client(params: Optional[dict] = None) -> HTTPHandler:
    """
    Return the cached sync HTTP handler for this params combination.

    The cache key is ``"httpx_client"`` followed by every ``<key>_<value>``
    pair of ``params``. On a cache miss a new ``HTTPHandler`` is created (from
    ``params`` when given, otherwise with a 600s timeout / 5s connect
    timeout), stored with ``_DEFAULT_TTL_FOR_HTTPX_CLIENTS`` and returned.
    """
    key_fragments: List[str] = []
    if params is not None:
        for name, setting in params.items():
            try:
                key_fragments.append(f"{name}_{setting}")
            except Exception:
                # Entries that cannot be formatted are left out of the key.
                continue
    cache_key = "httpx_client" + "".join(key_fragments)

    cached_handler = litellm.in_memory_llm_clients_cache.get_cache(cache_key)
    if cached_handler:
        return cached_handler

    if params is None:
        handler = HTTPHandler(timeout=httpx.Timeout(timeout=600.0, connect=5.0))
    else:
        handler = HTTPHandler(**params)

    litellm.in_memory_llm_clients_cache.set_cache(
        key=cache_key,
        value=handler,
        ttl=_DEFAULT_TTL_FOR_HTTPX_CLIENTS,
    )
    return handler