Spaces:
Sleeping
Sleeping
import asyncio | |
import contextvars | |
from functools import partial | |
from typing import Any, Coroutine, Dict, Iterable, List, Literal, Optional, Union | |
import httpx | |
import litellm | |
from litellm.constants import request_timeout | |
from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj | |
from litellm.llms.base_llm.responses.transformation import BaseResponsesAPIConfig | |
from litellm.llms.custom_httpx.llm_http_handler import BaseLLMHTTPHandler | |
from litellm.responses.litellm_completion_transformation.handler import ( | |
LiteLLMCompletionTransformationHandler, | |
) | |
from litellm.responses.utils import ResponsesAPIRequestUtils | |
from litellm.types.llms.openai import ( | |
Reasoning, | |
ResponseIncludable, | |
ResponseInputParam, | |
ResponsesAPIOptionalRequestParams, | |
ResponsesAPIResponse, | |
ResponseTextConfigParam, | |
ToolChoice, | |
ToolParam, | |
) | |
from litellm.types.responses.main import * | |
from litellm.types.router import GenericLiteLLMParams | |
from litellm.utils import ProviderConfigManager, client | |
from .streaming_iterator import BaseResponsesAPIStreamingIterator | |
####### ENVIRONMENT VARIABLES ###################
# Initialize any necessary instances or variables here
# Module-level handler instances, created once at import time and shared by
# every entry point in this file.
# HTTP handler used by `responses`, `delete_responses` and `get_responses`
# once a provider-specific Responses API config has been resolved.
base_llm_http_handler = BaseLLMHTTPHandler()
# Handler that `responses` hands the request to when no provider-specific
# Responses API config is available for the resolved provider.
litellm_completion_transformation_handler = LiteLLMCompletionTransformationHandler()
#################################################
async def aresponses(
    input: Union[str, ResponseInputParam],
    model: str,
    include: Optional[List[ResponseIncludable]] = None,
    instructions: Optional[str] = None,
    max_output_tokens: Optional[int] = None,
    metadata: Optional[Dict[str, Any]] = None,
    parallel_tool_calls: Optional[bool] = None,
    previous_response_id: Optional[str] = None,
    reasoning: Optional[Reasoning] = None,
    store: Optional[bool] = None,
    stream: Optional[bool] = None,
    temperature: Optional[float] = None,
    text: Optional[ResponseTextConfigParam] = None,
    tool_choice: Optional[ToolChoice] = None,
    tools: Optional[Iterable[ToolParam]] = None,
    top_p: Optional[float] = None,
    truncation: Optional[Literal["auto", "disabled"]] = None,
    user: Optional[str] = None,
    # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
    # The extra values given here take precedence over values defined on the client or passed to this method.
    extra_headers: Optional[Dict[str, Any]] = None,
    extra_query: Optional[Dict[str, Any]] = None,
    extra_body: Optional[Dict[str, Any]] = None,
    timeout: Optional[Union[float, httpx.Timeout]] = None,
    # LiteLLM specific params,
    custom_llm_provider: Optional[str] = None,
    **kwargs,
) -> Union[ResponsesAPIResponse, BaseResponsesAPIStreamingIterator]:
    """
    Async: Handles responses API requests by reusing the synchronous function.

    The synchronous `responses` function is run in the event loop's default
    executor with a copy of the current `contextvars` context. It is called
    with `aresponses=True` in kwargs; if its return value is a coroutine, that
    coroutine is awaited here.

    Args:
        input: Request input, forwarded unchanged to `responses`.
        model: Model name; also used to resolve the provider when
            `custom_llm_provider` is not given.
        custom_llm_provider: Provider name. When None it is resolved with
            `litellm.get_llm_provider`, using `api_base` / `base_url` from
            kwargs if present.
        **kwargs: Forwarded to `responses`. `litellm_metadata` is also read
            here when updating the response id.
        All remaining named parameters are forwarded unchanged to `responses`.

    Returns:
        The result produced by `responses`. A `ResponsesAPIResponse` is first
        passed through
        `ResponsesAPIRequestUtils._update_responses_api_response_id_with_model_id`;
        any other result is returned as is.

    Raises:
        Any exception raised along the way is handed to
        `litellm.exception_type`, and the result of that call is raised.
    """
    local_vars = locals()
    try:
        # Inside a coroutine there is always a running loop; get_running_loop()
        # is the documented way to obtain it.
        loop = asyncio.get_running_loop()
        kwargs["aresponses"] = True

        # get custom llm provider so we can use this for mapping exceptions
        if custom_llm_provider is None:
            # `api_base` / `base_url` are not named parameters of this function,
            # so they can only arrive through **kwargs. Looking them up in the
            # locals() snapshot always produced None.
            _, custom_llm_provider, _, _ = litellm.get_llm_provider(
                model=model,
                api_base=kwargs.get("api_base") or kwargs.get("base_url"),
            )

        func = partial(
            responses,
            input=input,
            model=model,
            include=include,
            instructions=instructions,
            max_output_tokens=max_output_tokens,
            metadata=metadata,
            parallel_tool_calls=parallel_tool_calls,
            previous_response_id=previous_response_id,
            reasoning=reasoning,
            store=store,
            stream=stream,
            temperature=temperature,
            text=text,
            tool_choice=tool_choice,
            tools=tools,
            top_p=top_p,
            truncation=truncation,
            user=user,
            extra_headers=extra_headers,
            extra_query=extra_query,
            extra_body=extra_body,
            timeout=timeout,
            custom_llm_provider=custom_llm_provider,
            **kwargs,
        )

        # Run the sync function in the executor under a copy of the caller's
        # context so context variables set by the caller stay visible to it.
        ctx = contextvars.copy_context()
        func_with_context = partial(ctx.run, func)
        init_response = await loop.run_in_executor(None, func_with_context)

        if asyncio.iscoroutine(init_response):
            response = await init_response
        else:
            response = init_response

        # Update the responses_api_response_id with the model_id
        if isinstance(response, ResponsesAPIResponse):
            response = ResponsesAPIRequestUtils._update_responses_api_response_id_with_model_id(
                responses_api_response=response,
                litellm_metadata=kwargs.get("litellm_metadata", {}),
                custom_llm_provider=custom_llm_provider,
            )
        return response
    except Exception as e:
        raise litellm.exception_type(
            model=model,
            custom_llm_provider=custom_llm_provider,
            original_exception=e,
            completion_kwargs=local_vars,
            extra_kwargs=kwargs,
        )
def responses(
    input: Union[str, ResponseInputParam],
    model: str,
    include: Optional[List[ResponseIncludable]] = None,
    instructions: Optional[str] = None,
    max_output_tokens: Optional[int] = None,
    metadata: Optional[Dict[str, Any]] = None,
    parallel_tool_calls: Optional[bool] = None,
    previous_response_id: Optional[str] = None,
    reasoning: Optional[Reasoning] = None,
    store: Optional[bool] = None,
    stream: Optional[bool] = None,
    temperature: Optional[float] = None,
    text: Optional[ResponseTextConfigParam] = None,
    tool_choice: Optional[ToolChoice] = None,
    tools: Optional[Iterable[ToolParam]] = None,
    top_p: Optional[float] = None,
    truncation: Optional[Literal["auto", "disabled"]] = None,
    user: Optional[str] = None,
    # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
    # The extra values given here take precedence over values defined on the client or passed to this method.
    extra_headers: Optional[Dict[str, Any]] = None,
    extra_query: Optional[Dict[str, Any]] = None,
    extra_body: Optional[Dict[str, Any]] = None,
    timeout: Optional[Union[float, httpx.Timeout]] = None,
    # LiteLLM specific params,
    custom_llm_provider: Optional[str] = None,
    **kwargs,
):
    """
    Synchronous entry point of the Responses API.

    Resolves the provider for `model`, looks up that provider's Responses API
    config and dispatches the request:

    * no provider config -> `litellm_completion_transformation_handler.response_api_handler`
    * otherwise          -> `base_llm_http_handler.response_api_handler`

    Args:
        input: Request input, passed to the selected handler.
        model: Model name; replaced by the value `litellm.get_llm_provider`
            returns.
        timeout: Request timeout; a falsy value falls back to
            `request_timeout`.
        custom_llm_provider: Optional provider name handed to
            `litellm.get_llm_provider`.
        **kwargs: Used to build `GenericLiteLLMParams`. The keys read
            explicitly are `litellm_logging_obj`, `litellm_call_id`,
            `aresponses` (popped; selects async mode), `client` and
            `litellm_metadata`.
        The remaining named parameters are collected via `locals()` and
        filtered by
        `ResponsesAPIRequestUtils.get_requested_response_api_optional_param`.

    Returns:
        Whatever the selected handler returns. On the HTTP-handler path a
        `ResponsesAPIResponse` gets its id updated via
        `ResponsesAPIRequestUtils._update_responses_api_response_id_with_model_id`.

    Raises:
        Any exception is handed to `litellm.exception_type`, and the result of
        that call is raised.

    NOTE(review): `litellm_logging_obj` must already be present in kwargs --
    presumably injected by the `client` wrapper imported at the top of the
    file; confirm that wrapper is applied to this function.
    """
    local_vars = locals()
    try:
        logging_obj: LiteLLMLoggingObj = kwargs.get("litellm_logging_obj")  # type: ignore
        call_id: Optional[str] = kwargs.get("litellm_call_id", None)
        # The flag is removed from kwargs so it is not forwarded any further.
        run_async = kwargs.pop("aresponses", False) is True

        # get llm provider logic
        litellm_params = GenericLiteLLMParams(**kwargs)
        provider_info = litellm.get_llm_provider(
            model=model,
            custom_llm_provider=custom_llm_provider,
            api_base=litellm_params.api_base,
            api_key=litellm_params.api_key,
        )
        model, custom_llm_provider, _dynamic_api_key, _dynamic_api_base = provider_info

        # get provider config
        provider_config: Optional[BaseResponsesAPIConfig] = (
            ProviderConfigManager.get_provider_responses_api_config(
                model=model,
                provider=litellm.LlmProviders(custom_llm_provider),
            )
        )

        # Merge caller kwargs into the snapshot of the named arguments, then
        # keep only the entries that are valid optional request parameters.
        local_vars.update(kwargs)
        requested_optional_params: ResponsesAPIOptionalRequestParams = (
            ResponsesAPIRequestUtils.get_requested_response_api_optional_param(
                local_vars
            )
        )

        # No provider-specific config: hand over to the completion
        # transformation handler.
        if responses_api_config_missing := provider_config is None:
            return litellm_completion_transformation_handler.response_api_handler(
                model=model,
                input=input,
                responses_api_request=requested_optional_params,
                custom_llm_provider=custom_llm_provider,
                _is_async=run_async,
                stream=stream,
                **kwargs,
            )

        # Get optional parameters for the responses API
        request_params: Dict = ResponsesAPIRequestUtils.get_optional_params_responses_api(
            model=model,
            responses_api_provider_config=provider_config,
            response_api_optional_params=requested_optional_params,
        )

        # Pre Call logging
        logged_litellm_params = {"litellm_call_id": call_id}
        logged_litellm_params.update(request_params)
        logging_obj.update_environment_variables(
            model=model,
            user=user,
            optional_params=dict(request_params),
            litellm_params=logged_litellm_params,
            custom_llm_provider=custom_llm_provider,
        )

        effective_timeout = timeout if timeout else request_timeout
        http_client = kwargs.get("client")
        use_fake_stream = provider_config.should_fake_stream(
            model=model, stream=stream, custom_llm_provider=custom_llm_provider
        )

        # Sync and async share this handler; `_is_async` selects the mode.
        response = base_llm_http_handler.response_api_handler(
            model=model,
            input=input,
            responses_api_provider_config=provider_config,
            response_api_optional_request_params=request_params,
            custom_llm_provider=custom_llm_provider,
            litellm_params=litellm_params,
            logging_obj=logging_obj,
            extra_headers=extra_headers,
            extra_body=extra_body,
            timeout=effective_timeout,
            _is_async=run_async,
            client=http_client,
            fake_stream=use_fake_stream,
            litellm_metadata=kwargs.get("litellm_metadata", {}),
        )

        if not isinstance(response, ResponsesAPIResponse):
            return response

        # Update the responses_api_response_id with the model_id
        return ResponsesAPIRequestUtils._update_responses_api_response_id_with_model_id(
            responses_api_response=response,
            litellm_metadata=kwargs.get("litellm_metadata", {}),
            custom_llm_provider=custom_llm_provider,
        )
    except Exception as e:
        raise litellm.exception_type(
            model=model,
            custom_llm_provider=custom_llm_provider,
            original_exception=e,
            completion_kwargs=local_vars,
            extra_kwargs=kwargs,
        )
async def adelete_responses(
    response_id: str,
    # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
    # The extra values given here take precedence over values defined on the client or passed to this method.
    extra_headers: Optional[Dict[str, Any]] = None,
    extra_query: Optional[Dict[str, Any]] = None,
    extra_body: Optional[Dict[str, Any]] = None,
    timeout: Optional[Union[float, httpx.Timeout]] = None,
    # LiteLLM specific params,
    custom_llm_provider: Optional[str] = None,
    **kwargs,
) -> DeleteResponseResult:
    """
    Async version of the DELETE Responses API

    DELETE /v1/responses/{response_id} endpoint in the responses API

    The synchronous `delete_responses` function is run in the event loop's
    default executor with a copy of the current `contextvars` context. It is
    called with `adelete_responses=True` in kwargs; if its return value is a
    coroutine, that coroutine is awaited here.

    Args:
        response_id: The ID of the response to delete. It is decoded with
            `ResponsesAPIRequestUtils._decode_responses_api_response_id`; a
            decoded `response_id` replaces the given one.
        extra_headers, extra_query, extra_body, timeout: Forwarded unchanged
            to `delete_responses`.
        custom_llm_provider: Optional provider name. A provider decoded from
            `response_id` takes precedence over this value.
        **kwargs: Forwarded to `delete_responses`.

    Returns:
        The result produced by `delete_responses`.

    Raises:
        Any exception is handed to `litellm.exception_type`, and the result of
        that call is raised.
    """
    local_vars = locals()
    try:
        # Inside a coroutine there is always a running loop; get_running_loop()
        # is the documented way to obtain it.
        loop = asyncio.get_running_loop()
        kwargs["adelete_responses"] = True

        # get custom llm provider from response_id
        decoded_response_id: DecodedResponseId = (
            ResponsesAPIRequestUtils._decode_responses_api_response_id(
                response_id=response_id,
            )
        )
        response_id = decoded_response_id.get("response_id") or response_id
        custom_llm_provider = (
            decoded_response_id.get("custom_llm_provider") or custom_llm_provider
        )

        func = partial(
            delete_responses,
            response_id=response_id,
            custom_llm_provider=custom_llm_provider,
            extra_headers=extra_headers,
            extra_query=extra_query,
            extra_body=extra_body,
            timeout=timeout,
            **kwargs,
        )

        # Run the sync function in the executor under a copy of the caller's
        # context so context variables set by the caller stay visible to it.
        ctx = contextvars.copy_context()
        func_with_context = partial(ctx.run, func)
        init_response = await loop.run_in_executor(None, func_with_context)

        if asyncio.iscoroutine(init_response):
            response = await init_response
        else:
            response = init_response
        return response
    except Exception as e:
        raise litellm.exception_type(
            model=None,
            custom_llm_provider=custom_llm_provider,
            original_exception=e,
            completion_kwargs=local_vars,
            extra_kwargs=kwargs,
        )
def delete_responses(
    response_id: str,
    # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
    # The extra values given here take precedence over values defined on the client or passed to this method.
    extra_headers: Optional[Dict[str, Any]] = None,
    extra_query: Optional[Dict[str, Any]] = None,
    extra_body: Optional[Dict[str, Any]] = None,
    timeout: Optional[Union[float, httpx.Timeout]] = None,
    # LiteLLM specific params,
    custom_llm_provider: Optional[str] = None,
    **kwargs,
) -> Union[DeleteResponseResult, Coroutine[Any, Any, DeleteResponseResult]]:
    """
    Synchronous version of the DELETE Responses API

    DELETE /v1/responses/{response_id} endpoint in the responses API

    Args:
        response_id: The ID of the response to delete. It is decoded with
            `ResponsesAPIRequestUtils._decode_responses_api_response_id`; a
            decoded `response_id` replaces the given one.
        extra_headers: Passed to the HTTP handler.
        extra_query: Accepted, but not passed to the HTTP handler by this
            function.
        extra_body: Passed to the HTTP handler.
        timeout: Request timeout; a falsy value falls back to
            `request_timeout`.
        custom_llm_provider: Optional provider name. A provider decoded from
            `response_id` takes precedence over this value.
        **kwargs: Used to build `GenericLiteLLMParams`. The keys read
            explicitly are `litellm_logging_obj`, `litellm_call_id`,
            `adelete_responses` (popped; selects async mode) and `client`.

    Returns:
        Whatever `base_llm_http_handler.delete_response_api_handler` returns.

    Raises:
        Any exception -- including the `ValueError` raised when no provider
        can be determined or the provider has no Responses API config -- is
        handed to `litellm.exception_type`, and the result of that call is
        raised.

    NOTE(review): `litellm_logging_obj` must already be present in kwargs --
    presumably injected by the `client` wrapper imported at the top of the
    file; confirm that wrapper is applied to this function.
    """
    local_vars = locals()
    try:
        logging_obj: LiteLLMLoggingObj = kwargs.get("litellm_logging_obj")  # type: ignore
        call_id: Optional[str] = kwargs.get("litellm_call_id", None)
        # The flag is removed from kwargs so it is not forwarded any further.
        run_async = kwargs.pop("adelete_responses", False) is True

        # get llm provider logic
        litellm_params = GenericLiteLLMParams(**kwargs)

        # get custom llm provider from response_id
        decoded: DecodedResponseId = (
            ResponsesAPIRequestUtils._decode_responses_api_response_id(
                response_id=response_id,
            )
        )
        response_id = decoded.get("response_id") or response_id
        custom_llm_provider = decoded.get("custom_llm_provider") or custom_llm_provider
        if custom_llm_provider is None:
            raise ValueError("custom_llm_provider is required but passed as None")

        # get provider config
        provider_config: Optional[BaseResponsesAPIConfig] = (
            ProviderConfigManager.get_provider_responses_api_config(
                model=None,
                provider=litellm.LlmProviders(custom_llm_provider),
            )
        )
        if provider_config is None:
            raise ValueError(
                f"DELETE responses is not supported for {custom_llm_provider}"
            )

        local_vars.update(kwargs)

        # Pre Call logging
        logging_obj.update_environment_variables(
            model=None,
            optional_params={"response_id": response_id},
            litellm_params={"litellm_call_id": call_id},
            custom_llm_provider=custom_llm_provider,
        )

        effective_timeout = timeout if timeout else request_timeout

        # Sync and async share this handler; `_is_async` selects the mode.
        return base_llm_http_handler.delete_response_api_handler(
            response_id=response_id,
            custom_llm_provider=custom_llm_provider,
            responses_api_provider_config=provider_config,
            litellm_params=litellm_params,
            logging_obj=logging_obj,
            extra_headers=extra_headers,
            extra_body=extra_body,
            timeout=effective_timeout,
            _is_async=run_async,
            client=kwargs.get("client"),
        )
    except Exception as e:
        raise litellm.exception_type(
            model=None,
            custom_llm_provider=custom_llm_provider,
            original_exception=e,
            completion_kwargs=local_vars,
            extra_kwargs=kwargs,
        )
async def aget_responses(
    response_id: str,
    # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
    # The extra values given here take precedence over values defined on the client or passed to this method.
    extra_headers: Optional[Dict[str, Any]] = None,
    extra_query: Optional[Dict[str, Any]] = None,
    extra_body: Optional[Dict[str, Any]] = None,
    timeout: Optional[Union[float, httpx.Timeout]] = None,
    # LiteLLM specific params,
    custom_llm_provider: Optional[str] = None,
    **kwargs,
) -> ResponsesAPIResponse:
    """
    Async: Fetch a response by its ID.

    GET /v1/responses/{response_id} endpoint in the responses API

    The synchronous `get_responses` function is run in the event loop's
    default executor with a copy of the current `contextvars` context. It is
    called with `aget_responses=True` in kwargs; if its return value is a
    coroutine, that coroutine is awaited here.

    Args:
        response_id: The ID of the response to fetch.
        extra_headers, extra_query, extra_body, timeout: Forwarded unchanged
            to `get_responses`.
        custom_llm_provider: Optional provider name. If not specified, will be
            decoded from response_id; a decoded provider takes precedence over
            this value.
        **kwargs: Forwarded to `get_responses`. `litellm_metadata` is also
            read here when updating the response id.

    Returns:
        The response object with complete information about the stored
        response. A `ResponsesAPIResponse` is first passed through
        `ResponsesAPIRequestUtils._update_responses_api_response_id_with_model_id`.

    Raises:
        Any exception is handed to `litellm.exception_type`, and the result of
        that call is raised.
    """
    local_vars = locals()
    try:
        # Inside a coroutine there is always a running loop; get_running_loop()
        # is the documented way to obtain it.
        loop = asyncio.get_running_loop()
        kwargs["aget_responses"] = True

        # get custom llm provider from response_id
        decoded_response_id: DecodedResponseId = (
            ResponsesAPIRequestUtils._decode_responses_api_response_id(
                response_id=response_id,
            )
        )
        response_id = decoded_response_id.get("response_id") or response_id
        custom_llm_provider = (
            decoded_response_id.get("custom_llm_provider") or custom_llm_provider
        )

        func = partial(
            get_responses,
            response_id=response_id,
            custom_llm_provider=custom_llm_provider,
            extra_headers=extra_headers,
            extra_query=extra_query,
            extra_body=extra_body,
            timeout=timeout,
            **kwargs,
        )

        # Run the sync function in the executor under a copy of the caller's
        # context so context variables set by the caller stay visible to it.
        ctx = contextvars.copy_context()
        func_with_context = partial(ctx.run, func)
        init_response = await loop.run_in_executor(None, func_with_context)

        if asyncio.iscoroutine(init_response):
            response = await init_response
        else:
            response = init_response

        # Update the responses_api_response_id with the model_id
        if isinstance(response, ResponsesAPIResponse):
            response = ResponsesAPIRequestUtils._update_responses_api_response_id_with_model_id(
                responses_api_response=response,
                litellm_metadata=kwargs.get("litellm_metadata", {}),
                custom_llm_provider=custom_llm_provider,
            )
        return response
    except Exception as e:
        raise litellm.exception_type(
            model=None,
            custom_llm_provider=custom_llm_provider,
            original_exception=e,
            completion_kwargs=local_vars,
            extra_kwargs=kwargs,
        )
def get_responses(
    response_id: str,
    # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
    # The extra values given here take precedence over values defined on the client or passed to this method.
    extra_headers: Optional[Dict[str, Any]] = None,
    extra_query: Optional[Dict[str, Any]] = None,
    extra_body: Optional[Dict[str, Any]] = None,
    timeout: Optional[Union[float, httpx.Timeout]] = None,
    # LiteLLM specific params,
    custom_llm_provider: Optional[str] = None,
    **kwargs,
) -> Union[ResponsesAPIResponse, Coroutine[Any, Any, ResponsesAPIResponse]]:
    """
    Fetch a response by its ID.

    GET /v1/responses/{response_id} endpoint in the responses API

    Args:
        response_id: The ID of the response to fetch. It is decoded with
            `ResponsesAPIRequestUtils._decode_responses_api_response_id`; a
            decoded `response_id` replaces the given one.
        extra_headers: Passed to the HTTP handler.
        extra_query: Accepted, but not passed to the HTTP handler by this
            function.
        extra_body: Passed to the HTTP handler.
        timeout: Request timeout; a falsy value falls back to
            `request_timeout`.
        custom_llm_provider: Optional provider name. If not specified, will be
            decoded from response_id; a decoded provider takes precedence over
            this value.
        **kwargs: Used to build `GenericLiteLLMParams`. The keys read
            explicitly are `litellm_logging_obj`, `litellm_call_id`,
            `aget_responses` (popped; selects async mode), `client` and
            `litellm_metadata`.

    Returns:
        Whatever `base_llm_http_handler.get_responses` returns. A
        `ResponsesAPIResponse` gets its id updated via
        `ResponsesAPIRequestUtils._update_responses_api_response_id_with_model_id`.

    Raises:
        Any exception -- including the `ValueError` raised when no provider
        can be determined or the provider has no Responses API config -- is
        handed to `litellm.exception_type`, and the result of that call is
        raised.

    NOTE(review): `litellm_logging_obj` must already be present in kwargs --
    presumably injected by the `client` wrapper imported at the top of the
    file; confirm that wrapper is applied to this function.
    """
    local_vars = locals()
    try:
        logging_obj: LiteLLMLoggingObj = kwargs.get("litellm_logging_obj")  # type: ignore
        call_id: Optional[str] = kwargs.get("litellm_call_id", None)
        # The flag is removed from kwargs so it is not forwarded any further.
        run_async = kwargs.pop("aget_responses", False) is True

        # get llm provider logic
        litellm_params = GenericLiteLLMParams(**kwargs)

        # get custom llm provider from response_id
        decoded: DecodedResponseId = (
            ResponsesAPIRequestUtils._decode_responses_api_response_id(
                response_id=response_id,
            )
        )
        response_id = decoded.get("response_id") or response_id
        custom_llm_provider = decoded.get("custom_llm_provider") or custom_llm_provider
        if custom_llm_provider is None:
            raise ValueError("custom_llm_provider is required but passed as None")

        # get provider config
        provider_config: Optional[BaseResponsesAPIConfig] = (
            ProviderConfigManager.get_provider_responses_api_config(
                model=None,
                provider=litellm.LlmProviders(custom_llm_provider),
            )
        )
        if provider_config is None:
            raise ValueError(
                f"GET responses is not supported for {custom_llm_provider}"
            )

        local_vars.update(kwargs)

        # Pre Call logging
        logging_obj.update_environment_variables(
            model=None,
            optional_params={"response_id": response_id},
            litellm_params={"litellm_call_id": call_id},
            custom_llm_provider=custom_llm_provider,
        )

        effective_timeout = timeout if timeout else request_timeout

        # Sync and async share this handler; `_is_async` selects the mode.
        response = base_llm_http_handler.get_responses(
            response_id=response_id,
            custom_llm_provider=custom_llm_provider,
            responses_api_provider_config=provider_config,
            litellm_params=litellm_params,
            logging_obj=logging_obj,
            extra_headers=extra_headers,
            extra_body=extra_body,
            timeout=effective_timeout,
            _is_async=run_async,
            client=kwargs.get("client"),
        )

        if not isinstance(response, ResponsesAPIResponse):
            return response

        # Update the responses_api_response_id with the model_id
        return ResponsesAPIRequestUtils._update_responses_api_response_id_with_model_id(
            responses_api_response=response,
            litellm_metadata=kwargs.get("litellm_metadata", {}),
            custom_llm_provider=custom_llm_provider,
        )
    except Exception as e:
        raise litellm.exception_type(
            model=None,
            custom_llm_provider=custom_llm_provider,
            original_exception=e,
            completion_kwargs=local_vars,
            extra_kwargs=kwargs,
        )