""" | |
- call /messages on Anthropic API | |
- Make streaming + non-streaming request - just pass it through direct to Anthropic. No need to do anything special here | |
- Ensure requests are logged in the DB - stream + non-stream | |
""" | |
import json
from typing import AsyncIterator, Dict, List, Optional, Union, cast

import httpx

import litellm
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
from litellm.llms.base_llm.anthropic_messages.transformation import (
    BaseAnthropicMessagesConfig,
)
from litellm.llms.custom_httpx.http_handler import (
    AsyncHTTPHandler,
    get_async_httpx_client,
)
from litellm.types.llms.anthropic_messages.anthropic_response import (
    AnthropicMessagesResponse,
)
from litellm.types.router import GenericLiteLLMParams
from litellm.types.utils import ProviderSpecificHeader
from litellm.utils import ProviderConfigManager, client


class AnthropicMessagesHandler:
    @staticmethod
    async def _handle_anthropic_streaming(
        response: httpx.Response,
        request_body: dict,
        litellm_logging_obj: LiteLLMLoggingObj,
    ) -> AsyncIterator:
        """Helper function to handle Anthropic streaming responses using the existing logging handlers"""
        from datetime import datetime

        from litellm.proxy.pass_through_endpoints.streaming_handler import (
            PassThroughStreamingHandler,
        )
        from litellm.proxy.pass_through_endpoints.success_handler import (
            PassThroughEndpointLogging,
        )
        from litellm.types.passthrough_endpoints.pass_through_endpoints import (
            EndpointType,
        )

        # Create success handler object
        passthrough_success_handler_obj = PassThroughEndpointLogging()

        # Use the existing streaming handler for Anthropic
        start_time = datetime.now()
        return PassThroughStreamingHandler.chunk_processor(
            response=response,
            request_body=request_body,
            litellm_logging_obj=litellm_logging_obj,
            endpoint_type=EndpointType.ANTHROPIC,
            start_time=start_time,
            passthrough_success_handler_obj=passthrough_success_handler_obj,
            url_route="/v1/messages",
        )
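
    # Consuming the iterator returned above (a minimal sketch; the chunks are
    # the raw streamed bytes passed through from Anthropic, so callers
    # typically forward them to the end user unchanged):
    #
    #     stream = await AnthropicMessagesHandler._handle_anthropic_streaming(
    #         response=response,
    #         request_body=request_body,
    #         litellm_logging_obj=litellm_logging_obj,
    #     )
    #     async for chunk in stream:
    #         ...  # e.g. yield chunk to the caller's HTTP response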


# `client` wraps the call with litellm's shared logging setup and injects
# `litellm_logging_obj` into kwargs, which this function reads below.
@client
async def anthropic_messages(
    max_tokens: int,
    messages: List[Dict],
    model: str,
    metadata: Optional[Dict] = None,
    stop_sequences: Optional[List[str]] = None,
    stream: Optional[bool] = False,
    system: Optional[str] = None,
    temperature: Optional[float] = None,
    thinking: Optional[Dict] = None,
    tool_choice: Optional[Dict] = None,
    tools: Optional[List[Dict]] = None,
    top_k: Optional[int] = None,
    top_p: Optional[float] = None,
    api_key: Optional[str] = None,
    api_base: Optional[str] = None,
    client: Optional[AsyncHTTPHandler] = None,
    custom_llm_provider: Optional[str] = None,
    **kwargs,
) -> Union[AnthropicMessagesResponse, AsyncIterator]:
    """
    Makes Anthropic `/v1/messages` API calls in the Anthropic API spec format.
    """
    optional_params = GenericLiteLLMParams(**kwargs)

    # Resolve the model name, provider, and any dynamic credentials
    (
        model,
        _custom_llm_provider,
        dynamic_api_key,
        dynamic_api_base,
    ) = litellm.get_llm_provider(
        model=model,
        custom_llm_provider=custom_llm_provider,
        api_base=optional_params.api_base,
        api_key=optional_params.api_key,
    )
    anthropic_messages_provider_config: Optional[BaseAnthropicMessagesConfig] = (
        ProviderConfigManager.get_provider_anthropic_messages_config(
            model=model,
            provider=litellm.LlmProviders(_custom_llm_provider),
        )
    )
    if anthropic_messages_provider_config is None:
        raise ValueError(
            f"Anthropic messages provider config not found for model: {model}"
        )

    # Use the provided client or create a new one
    if client is None or not isinstance(client, AsyncHTTPHandler):
        async_httpx_client = get_async_httpx_client(
            llm_provider=litellm.LlmProviders.ANTHROPIC
        )
    else:
        async_httpx_client = client

    litellm_logging_obj: LiteLLMLoggingObj = kwargs.get("litellm_logging_obj", None)

    # Prepare headers
    provider_specific_header = cast(
        Optional[ProviderSpecificHeader], kwargs.get("provider_specific_header", None)
    )
    extra_headers = (
        provider_specific_header.get("extra_headers", {})
        if provider_specific_header
        else {}
    )
    headers = anthropic_messages_provider_config.validate_environment(
        headers=extra_headers or {},
        model=model,
        api_key=api_key,
    )

    litellm_logging_obj.update_environment_variables(
        model=model,
        optional_params=dict(optional_params),
        litellm_params={
            "metadata": kwargs.get("metadata", {}),
            "preset_cache_key": None,
            "stream_response": {},
            **optional_params.model_dump(exclude_unset=True),
        },
        custom_llm_provider=_custom_llm_provider,
    )

    # Prepare request body: snapshot the function's locals and keep only the
    # params this provider supports, dropping anything unset
    request_body = locals().copy()
    request_body = {
        k: v
        for k, v in request_body.items()
        if k
        in anthropic_messages_provider_config.get_supported_anthropic_messages_params(
            model=model
        )
        and v is not None
    }
    request_body["stream"] = stream
    request_body["model"] = model
    litellm_logging_obj.stream = stream
    litellm_logging_obj.model_call_details.update(request_body)

    # Make the request
    request_url = anthropic_messages_provider_config.get_complete_url(
        api_base=api_base, model=model
    )
    litellm_logging_obj.pre_call(
        input=[{"role": "user", "content": json.dumps(request_body)}],
        api_key="",
        additional_args={
            "complete_input_dict": request_body,
            "api_base": str(request_url),
            "headers": headers,
        },
    )
    response = await async_httpx_client.post(
        url=request_url,
        headers=headers,
        data=json.dumps(request_body),
        stream=stream or False,
    )
    response.raise_for_status()

    # used for logging + cost tracking
    litellm_logging_obj.model_call_details["httpx_response"] = response

    if stream:
        return await AnthropicMessagesHandler._handle_anthropic_streaming(
            response=response,
            request_body=request_body,
            litellm_logging_obj=litellm_logging_obj,
        )
    else:
        return response.json()
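

# Streaming usage (a minimal sketch, not part of the module; assumes a valid
# Anthropic API key and that the caller is already inside an event loop):
#
#     stream = await anthropic_messages(
#         max_tokens=256,
#         messages=[{"role": "user", "content": "Hello"}],
#         model="claude-3-5-sonnet-20240620",
#         stream=True,
#     )
#     async for chunk in stream:
#         print(chunk)  # raw streamed bytes, passed through from Anthropic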