""" | |
Unified /v1/messages endpoint - (Anthropic Spec) | |
""" | |

import asyncio
import json
import traceback

from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
from fastapi.responses import StreamingResponse

import litellm
from litellm._logging import verbose_proxy_logger
from litellm.proxy._types import *
from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
from litellm.proxy.common_request_processing import ProxyBaseLLMRequestProcessing
from litellm.proxy.common_utils.http_parsing_utils import _read_request_body
from litellm.proxy.litellm_pre_call_utils import add_litellm_data_to_request
from litellm.proxy.utils import ProxyLogging

router = APIRouter()
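

# async_data_generator_anthropic streams the Anthropic-format response back to the
# client as it arrives. Each chunk is run through the proxy's post-call streaming
# hook so guardrails/loggers can modify it; on failure the error is reported via
# post_call_failure_hook and emitted to the client as a final SSE `data:` error event.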
async def async_data_generator_anthropic(
    response,
    user_api_key_dict: UserAPIKeyAuth,
    request_data: dict,
    proxy_logging_obj: ProxyLogging,
):
    verbose_proxy_logger.debug("inside generator")
    try:
        async for chunk in response:
            verbose_proxy_logger.debug(
                "async_data_generator: received streaming chunk - {}".format(chunk)
            )
            ### CALL HOOKS ### - modify outgoing data
            chunk = await proxy_logging_obj.async_post_call_streaming_hook(
                user_api_key_dict=user_api_key_dict, response=chunk
            )
            yield chunk
    except Exception as e:
        verbose_proxy_logger.exception(
            "litellm.proxy.proxy_server.async_data_generator(): Exception occurred - {}".format(
                str(e)
            )
        )
        await proxy_logging_obj.post_call_failure_hook(
            user_api_key_dict=user_api_key_dict,
            original_exception=e,
            request_data=request_data,
        )
        verbose_proxy_logger.debug(
            f"\033[1;31mAn error occurred: {e}\n\n Debug this by setting `--debug`, e.g. `litellm --model gpt-3.5-turbo --debug`"
        )
        if isinstance(e, HTTPException):
            raise e
        else:
            error_traceback = traceback.format_exc()
            error_msg = f"{str(e)}\n\n{error_traceback}"
            proxy_exception = ProxyException(
                message=getattr(e, "message", error_msg),
                type=getattr(e, "type", "None"),
                param=getattr(e, "param", "None"),
                code=getattr(e, "status_code", 500),
            )
            error_returned = json.dumps({"error": proxy_exception.to_dict()})
            yield f"data: {error_returned}\n\n"


# Register the handler on the proxy's FastAPI router; the `/v1/messages` path follows the module docstring.
@router.post("/v1/messages")
async def anthropic_response(  # noqa: PLR0915
    fastapi_response: Response,
    request: Request,
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
    """
    Use `{PROXY_BASE_URL}/anthropic/v1/messages` instead - [Docs](https://docs.litellm.ai/docs/anthropic_completion).

    This is a BETA endpoint that calls 100+ LLMs in the Anthropic format.
    """
    from litellm.proxy.proxy_server import (
        general_settings,
        llm_router,
        proxy_config,
        proxy_logging_obj,
        user_api_base,
        user_max_tokens,
        user_model,
        user_request_timeout,
        user_temperature,
        version,
    )

    request_data = await _read_request_body(request=request)
    data: dict = {**request_data}
    try:
        data["model"] = (
            general_settings.get("completion_model", None)  # server default
            or user_model  # model name passed via cli args
            or data.get("model", None)  # model name passed in the http request
        )
        if user_model:
            data["model"] = user_model
        data = await add_litellm_data_to_request(
            data=data,  # type: ignore
            request=request,
            general_settings=general_settings,
            user_api_key_dict=user_api_key_dict,
            version=version,
            proxy_config=proxy_config,
        )

        # override with user settings, these are params passed via cli
        if user_temperature:
            data["temperature"] = user_temperature
        if user_request_timeout:
            data["request_timeout"] = user_request_timeout
        if user_max_tokens:
            data["max_tokens"] = user_max_tokens
        if user_api_base:
            data["api_base"] = user_api_base

        ### MODEL ALIAS MAPPING ###
        # check if model name in model alias map
        # get the actual model name
        if data["model"] in litellm.model_alias_map:
            data["model"] = litellm.model_alias_map[data["model"]]

        ### CALL HOOKS ### - modify incoming data before calling the model
        data = await proxy_logging_obj.pre_call_hook(  # type: ignore
            user_api_key_dict=user_api_key_dict, data=data, call_type="text_completion"
        )

        ### ROUTE THE REQUEST ###
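        # Routing priority (first match wins):
        #   1. model is in the router's configured model list
        #   2. model matches a router model_group_alias
        #   3. model is a specific deployment name (call that deployment directly)
        #   4. model matches a deployment id known to the router
        #   5. router has a default deployment or wildcard/pattern routes
        #   6. model was set via `litellm --model <model>` -> call the litellm SDK directly
        # otherwise the request is rejected with a 400.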
        router_model_names = llm_router.model_names if llm_router is not None else []
        if (
            llm_router is not None and data["model"] in router_model_names
        ):  # model in router model list
            llm_response = asyncio.create_task(llm_router.aanthropic_messages(**data))
        elif (
            llm_router is not None
            and llm_router.model_group_alias is not None
            and data["model"] in llm_router.model_group_alias
        ):  # model set in model_group_alias
            llm_response = asyncio.create_task(llm_router.aanthropic_messages(**data))
        elif (
            llm_router is not None and data["model"] in llm_router.deployment_names
        ):  # model in router deployments, calling a specific deployment on the router
            llm_response = asyncio.create_task(
                llm_router.aanthropic_messages(**data, specific_deployment=True)
            )
        elif (
            llm_router is not None and data["model"] in llm_router.get_model_ids()
        ):  # model matches a deployment id on the router
            llm_response = asyncio.create_task(llm_router.aanthropic_messages(**data))
        elif (
            llm_router is not None
            and data["model"] not in router_model_names
            and (
                llm_router.default_deployment is not None
                or len(llm_router.pattern_router.patterns) > 0
            )
        ):  # no exact match, but the router has a default deployment or pattern routes
            llm_response = asyncio.create_task(llm_router.aanthropic_messages(**data))
        elif user_model is not None:  # `litellm --model <your-model-name>`
            llm_response = asyncio.create_task(litellm.anthropic_messages(**data))
        else:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail={
                    "error": "completion: Invalid model name passed in model="
                    + data.get("model", "")
                },
            )

        # Await the llm_response task
        response = await llm_response

        hidden_params = getattr(response, "_hidden_params", {}) or {}
        model_id = hidden_params.get("model_id", None) or ""
        cache_key = hidden_params.get("cache_key", None) or ""
        api_base = hidden_params.get("api_base", None) or ""
        response_cost = hidden_params.get("response_cost", None) or ""

        ### ALERTING ###
        asyncio.create_task(
            proxy_logging_obj.update_request_status(
                litellm_call_id=data.get("litellm_call_id", ""), status="success"
            )
        )

        verbose_proxy_logger.debug("final response: %s", response)

        fastapi_response.headers.update(
            ProxyBaseLLMRequestProcessing.get_custom_headers(
                user_api_key_dict=user_api_key_dict,
                model_id=model_id,
                cache_key=cache_key,
                api_base=api_base,
                version=version,
                response_cost=response_cost,
                request_data=data,
                hidden_params=hidden_params,
            )
        )

        if (
            "stream" in data and data["stream"] is True
        ):  # use generate_responses to stream responses
            selected_data_generator = async_data_generator_anthropic(
                response=response,
                user_api_key_dict=user_api_key_dict,
                request_data=data,
                proxy_logging_obj=proxy_logging_obj,
            )
            return StreamingResponse(
                selected_data_generator,  # type: ignore
                media_type="text/event-stream",
            )

        verbose_proxy_logger.info("\nResponse from Litellm:\n{}".format(response))
        return response
    except Exception as e:
        await proxy_logging_obj.post_call_failure_hook(
            user_api_key_dict=user_api_key_dict, original_exception=e, request_data=data
        )
        verbose_proxy_logger.exception(
            "litellm.proxy.proxy_server.anthropic_response(): Exception occurred - {}".format(
                str(e)
            )
        )
        error_msg = f"{str(e)}"
        raise ProxyException(
            message=getattr(e, "message", error_msg),
            type=getattr(e, "type", "None"),
            param=getattr(e, "param", "None"),
            code=getattr(e, "status_code", 500),
        )
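

# Minimal sketch of how this router is typically mounted (assumption; the actual
# wiring lives in the proxy server, not in this module):
#
#     from fastapi import FastAPI
#
#     app = FastAPI()
#     app.include_router(router)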