Spaces:
Sleeping
Sleeping
File size: 6,530 Bytes
469eae6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 |
"""
- call /messages on Anthropic API
- Make streaming and non-streaming requests - pass them through directly to Anthropic; no special handling is needed here
- Ensure requests are logged in the DB - stream + non-stream
"""
import json
from typing import AsyncIterator, Dict, List, Optional, Union, cast
import httpx
import litellm
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
from litellm.llms.base_llm.anthropic_messages.transformation import (
BaseAnthropicMessagesConfig,
)
from litellm.llms.custom_httpx.http_handler import (
AsyncHTTPHandler,
get_async_httpx_client,
)
from litellm.types.llms.anthropic_messages.anthropic_response import (
AnthropicMessagesResponse,
)
from litellm.types.router import GenericLiteLLMParams
from litellm.types.utils import ProviderSpecificHeader
from litellm.utils import ProviderConfigManager, client
class AnthropicMessagesHandler:
    """Helpers used when calling the Anthropic `/v1/messages` route."""

    @staticmethod
    async def _handle_anthropic_streaming(
        response: httpx.Response,
        request_body: dict,
        litellm_logging_obj: LiteLLMLoggingObj,
    ) -> AsyncIterator:
        """
        Hand a streaming response over to the pass-through chunk processor.

        Args:
            response: The HTTP response of the streaming request.
            request_body: The body that was sent with the request.
            litellm_logging_obj: Logging object forwarded to the chunk processor.

        Returns:
            Whatever `PassThroughStreamingHandler.chunk_processor` returns for
            the Anthropic endpoint type and the `/v1/messages` route. It is
            returned as-is (not awaited here).
        """
        # Imported at call time rather than module level
        # (presumably to avoid an import cycle with the proxy package - TODO confirm).
        from datetime import datetime

        from litellm.proxy.pass_through_endpoints.streaming_handler import (
            PassThroughStreamingHandler,
        )
        from litellm.proxy.pass_through_endpoints.success_handler import (
            PassThroughEndpointLogging,
        )
        from litellm.types.passthrough_endpoints.pass_through_endpoints import (
            EndpointType,
        )

        # The success handler is built first, then the start time is stamped.
        success_handler = PassThroughEndpointLogging()
        started_at = datetime.now()

        processor_kwargs = {
            "response": response,
            "request_body": request_body,
            "litellm_logging_obj": litellm_logging_obj,
            "endpoint_type": EndpointType.ANTHROPIC,
            "start_time": started_at,
            "passthrough_success_handler_obj": success_handler,
            "url_route": "/v1/messages",
        }
        return PassThroughStreamingHandler.chunk_processor(**processor_kwargs)
@client
async def anthropic_messages(
    max_tokens: int,
    messages: List[Dict],
    model: str,
    metadata: Optional[Dict] = None,
    stop_sequences: Optional[List[str]] = None,
    stream: Optional[bool] = False,
    system: Optional[str] = None,
    temperature: Optional[float] = None,
    thinking: Optional[Dict] = None,
    tool_choice: Optional[Dict] = None,
    tools: Optional[List[Dict]] = None,
    top_k: Optional[int] = None,
    top_p: Optional[float] = None,
    api_key: Optional[str] = None,
    api_base: Optional[str] = None,
    client: Optional[AsyncHTTPHandler] = None,
    custom_llm_provider: Optional[str] = None,
    **kwargs,
) -> Union[AnthropicMessagesResponse, AsyncIterator]:
    """
    Make an Anthropic `/v1/messages` API call, following the Anthropic API spec.

    Args:
        max_tokens, messages, model, metadata, stop_sequences, system,
        temperature, thinking, tool_choice, tools, top_k, top_p:
            Request fields. Each one is forwarded in the request body only
            when it is not None and the provider config lists it as a
            supported param for the model.
        stream: When truthy, the request is made in streaming mode and the
            response is handed to the streaming handler.
        api_key: Passed to the provider config's `validate_environment`.
        api_base: Passed to the provider config's `get_complete_url`.
        client: HTTP client to use. Anything that is not an
            `AsyncHTTPHandler` is ignored and a shared Anthropic client is
            fetched instead.
        custom_llm_provider: Optional provider hint for `get_llm_provider`.
        **kwargs: Extra litellm params. `litellm_logging_obj` and
            `provider_specific_header` are read from here.

    Returns:
        For streaming requests, the result of
        `AnthropicMessagesHandler._handle_anthropic_streaming`; otherwise the
        JSON-decoded response body.

    Raises:
        ValueError: If no Anthropic messages provider config exists for the
            resolved model/provider.
        httpx.HTTPStatusError: If the response has an error status
            (raised by `response.raise_for_status()`).
    """
    optional_params = GenericLiteLLMParams(**kwargs)
    # NOTE(review): `api_key` / `api_base` are named parameters, so they never
    # appear in `kwargs`; `optional_params.api_key/api_base` therefore only
    # carry model defaults here. The dynamic key/base returned below are not
    # used by this function. Confirm whether they should be.
    (
        model,
        _custom_llm_provider,
        _dynamic_api_key,
        _dynamic_api_base,
    ) = litellm.get_llm_provider(
        model=model,
        custom_llm_provider=custom_llm_provider,
        api_base=optional_params.api_base,
        api_key=optional_params.api_key,
    )
    anthropic_messages_provider_config: Optional[BaseAnthropicMessagesConfig] = (
        ProviderConfigManager.get_provider_anthropic_messages_config(
            model=model,
            provider=litellm.LlmProviders(_custom_llm_provider),
        )
    )
    if anthropic_messages_provider_config is None:
        raise ValueError(
            f"Anthropic messages provider config not found for model: {model}"
        )

    # Use the provided client or fetch a shared one
    if client is None or not isinstance(client, AsyncHTTPHandler):
        async_httpx_client = get_async_httpx_client(
            llm_provider=litellm.LlmProviders.ANTHROPIC
        )
    else:
        async_httpx_client = client

    # presumably injected by the `@client` decorator - TODO confirm; it is
    # used below without a None check.
    litellm_logging_obj: LiteLLMLoggingObj = kwargs.get("litellm_logging_obj", None)

    # Prepare headers
    provider_specific_header = cast(
        Optional[ProviderSpecificHeader], kwargs.get("provider_specific_header", None)
    )
    extra_headers = (
        provider_specific_header.get("extra_headers", {})
        if provider_specific_header
        else {}
    )
    headers = anthropic_messages_provider_config.validate_environment(
        # Pass a copy: the dict belongs to the caller, and the provider config
        # may add entries to it in place (not visible from here).
        headers=dict(extra_headers or {}),
        model=model,
        api_key=api_key,
    )

    litellm_logging_obj.update_environment_variables(
        model=model,
        optional_params=dict(optional_params),
        litellm_params={
            # NOTE(review): `metadata` is a named parameter, so this lookup in
            # `kwargs` always falls back to {} - confirm this is intended.
            "metadata": kwargs.get("metadata", {}),
            "preset_cache_key": None,
            "stream_response": {},
            **optional_params.model_dump(exclude_unset=True),
        },
        custom_llm_provider=_custom_llm_provider,
    )

    # Prepare request body.
    # Only the request-level parameters are candidates. Using an explicit map
    # (instead of a snapshot of every local variable) guarantees credentials,
    # headers, or client objects can never be selected as body fields.
    # Key order matches the function signature.
    candidate_params = {
        "max_tokens": max_tokens,
        "messages": messages,
        "model": model,
        "metadata": metadata,
        "stop_sequences": stop_sequences,
        "stream": stream,
        "system": system,
        "temperature": temperature,
        "thinking": thinking,
        "tool_choice": tool_choice,
        "tools": tools,
        "top_k": top_k,
        "top_p": top_p,
    }
    # Looked up once rather than once per candidate.
    supported_params = (
        anthropic_messages_provider_config.get_supported_anthropic_messages_params(
            model=model
        )
    )
    request_body = {
        k: v
        for k, v in candidate_params.items()
        if k in supported_params and v is not None
    }
    # Same normalisation as the HTTP call below, so `stream=None` is sent as
    # `false` rather than JSON `null`.
    request_body["stream"] = stream or False
    request_body["model"] = model
    litellm_logging_obj.stream = stream
    litellm_logging_obj.model_call_details.update(request_body)

    # Make the request
    request_url = anthropic_messages_provider_config.get_complete_url(
        api_base=api_base, model=model
    )

    litellm_logging_obj.pre_call(
        input=[{"role": "user", "content": json.dumps(request_body)}],
        api_key="",
        additional_args={
            "complete_input_dict": request_body,
            "api_base": str(request_url),
            "headers": headers,
        },
    )

    response = await async_httpx_client.post(
        url=request_url,
        headers=headers,
        data=json.dumps(request_body),
        stream=stream or False,
    )
    response.raise_for_status()

    # used for logging + cost tracking
    litellm_logging_obj.model_call_details["httpx_response"] = response

    if stream:
        return await AnthropicMessagesHandler._handle_anthropic_streaming(
            response=response,
            request_body=request_body,
            litellm_logging_obj=litellm_logging_obj,
        )
    return response.json()
|