| """ | |
| - call /messages on Anthropic API | |
| - Make streaming + non-streaming request - just pass it through direct to Anthropic. No need to do anything special here | |
| - Ensure requests are logged in the DB - stream + non-stream | |
| """ | |
import json
from typing import AsyncIterator, Dict, List, Optional, Union, cast

import httpx

import litellm
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
from litellm.llms.base_llm.anthropic_messages.transformation import (
    BaseAnthropicMessagesConfig,
)
from litellm.llms.custom_httpx.http_handler import (
    AsyncHTTPHandler,
    get_async_httpx_client,
)
from litellm.types.llms.anthropic_messages.anthropic_response import (
    AnthropicMessagesResponse,
)
from litellm.types.router import GenericLiteLLMParams
from litellm.types.utils import ProviderSpecificHeader
from litellm.utils import ProviderConfigManager, client


class AnthropicMessagesHandler:
    @staticmethod
    async def _handle_anthropic_streaming(
        response: httpx.Response,
        request_body: dict,
        litellm_logging_obj: LiteLLMLoggingObj,
    ) -> AsyncIterator:
        """Helper function to handle Anthropic streaming responses using the existing logging handlers"""
        from datetime import datetime

        from litellm.proxy.pass_through_endpoints.streaming_handler import (
            PassThroughStreamingHandler,
        )
        from litellm.proxy.pass_through_endpoints.success_handler import (
            PassThroughEndpointLogging,
        )
        from litellm.types.passthrough_endpoints.pass_through_endpoints import (
            EndpointType,
        )

        # Create success handler object
        passthrough_success_handler_obj = PassThroughEndpointLogging()

        # Use the existing streaming handler for Anthropic
        start_time = datetime.now()
        return PassThroughStreamingHandler.chunk_processor(
            response=response,
            request_body=request_body,
            litellm_logging_obj=litellm_logging_obj,
            endpoint_type=EndpointType.ANTHROPIC,
            start_time=start_time,
            passthrough_success_handler_obj=passthrough_success_handler_obj,
            url_route="/v1/messages",
        )
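
# Consuming the returned iterator is left to the caller (typically the proxy
# layer). A minimal sketch, assuming an already-opened streaming
# `httpx.Response` and a LiteLLM logging object (names here are illustrative):
#
#   chunk_iter = await AnthropicMessagesHandler._handle_anthropic_streaming(
#       response=streaming_response,
#       request_body={"model": "claude-3-5-sonnet-20241022", "stream": True},
#       litellm_logging_obj=logging_obj,
#   )
#   async for chunk in chunk_iter:
#       ...  # forward each raw SSE chunk to the downstream client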


@client
async def anthropic_messages(
    max_tokens: int,
    messages: List[Dict],
    model: str,
    metadata: Optional[Dict] = None,
    stop_sequences: Optional[List[str]] = None,
    stream: Optional[bool] = False,
    system: Optional[str] = None,
    temperature: Optional[float] = None,
    thinking: Optional[Dict] = None,
    tool_choice: Optional[Dict] = None,
    tools: Optional[List[Dict]] = None,
    top_k: Optional[int] = None,
    top_p: Optional[float] = None,
    api_key: Optional[str] = None,
    api_base: Optional[str] = None,
    client: Optional[AsyncHTTPHandler] = None,
    custom_llm_provider: Optional[str] = None,
    **kwargs,
) -> Union[AnthropicMessagesResponse, AsyncIterator]:
    """
    Makes Anthropic `/v1/messages` API calls in the Anthropic API spec format.
    """
    # Use provided client or create a new one
    optional_params = GenericLiteLLMParams(**kwargs)
    (
        model,
        _custom_llm_provider,
        dynamic_api_key,
        dynamic_api_base,
    ) = litellm.get_llm_provider(
        model=model,
        custom_llm_provider=custom_llm_provider,
        api_base=optional_params.api_base,
        api_key=optional_params.api_key,
    )
    anthropic_messages_provider_config: Optional[BaseAnthropicMessagesConfig] = (
        ProviderConfigManager.get_provider_anthropic_messages_config(
            model=model,
            provider=litellm.LlmProviders(_custom_llm_provider),
        )
    )
    if anthropic_messages_provider_config is None:
        raise ValueError(
            f"Anthropic messages provider config not found for model: {model}"
        )
    if client is None or not isinstance(client, AsyncHTTPHandler):
        async_httpx_client = get_async_httpx_client(
            llm_provider=litellm.LlmProviders.ANTHROPIC
        )
    else:
        async_httpx_client = client
    litellm_logging_obj: LiteLLMLoggingObj = kwargs.get("litellm_logging_obj", None)

    # Prepare headers
    provider_specific_header = cast(
        Optional[ProviderSpecificHeader], kwargs.get("provider_specific_header", None)
    )
    extra_headers = (
        provider_specific_header.get("extra_headers", {})
        if provider_specific_header
        else {}
    )
    headers = anthropic_messages_provider_config.validate_environment(
        headers=extra_headers or {},
        model=model,
        api_key=api_key,
    )

    litellm_logging_obj.update_environment_variables(
        model=model,
        optional_params=dict(optional_params),
        litellm_params={
            "metadata": kwargs.get("metadata", {}),
            "preset_cache_key": None,
            "stream_response": {},
            **optional_params.model_dump(exclude_unset=True),
        },
        custom_llm_provider=_custom_llm_provider,
    )
    # Prepare request body: take the local call args and keep only the params
    # this provider supports for /v1/messages, dropping None values
    request_body = locals().copy()
    request_body = {
        k: v
        for k, v in request_body.items()
        if k
        in anthropic_messages_provider_config.get_supported_anthropic_messages_params(
            model=model
        )
        and v is not None
    }
    request_body["stream"] = stream
    request_body["model"] = model
    litellm_logging_obj.stream = stream
    litellm_logging_obj.model_call_details.update(request_body)
    # Make the request
    request_url = anthropic_messages_provider_config.get_complete_url(
        api_base=api_base, model=model
    )

    litellm_logging_obj.pre_call(
        input=[{"role": "user", "content": json.dumps(request_body)}],
        api_key="",
        additional_args={
            "complete_input_dict": request_body,
            "api_base": str(request_url),
            "headers": headers,
        },
    )

    response = await async_httpx_client.post(
        url=request_url,
        headers=headers,
        data=json.dumps(request_body),
        stream=stream or False,
    )
    response.raise_for_status()

    # used for logging + cost tracking
    litellm_logging_obj.model_call_details["httpx_response"] = response

    if stream:
        return await AnthropicMessagesHandler._handle_anthropic_streaming(
            response=response,
            request_body=request_body,
            litellm_logging_obj=litellm_logging_obj,
        )
    else:
        return response.json()
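

# Minimal usage sketch, not part of the module proper: assumes a valid
# ANTHROPIC_API_KEY in the environment and an illustrative model name.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        # Non-streaming call; the raw Anthropic response dict is returned
        result = await anthropic_messages(
            max_tokens=256,
            messages=[{"role": "user", "content": "Hello, Claude"}],
            model="claude-3-5-sonnet-20241022",
        )
        print(result)

    asyncio.run(_demo())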