from typing import List, Optional, Union
import litellm
from litellm.main import stream_chunk_builder
from litellm.responses.litellm_completion_transformation.transformation import (
LiteLLMCompletionResponsesConfig,
)
from litellm.responses.streaming_iterator import ResponsesAPIStreamingIterator
from litellm.types.llms.openai import (
OutputTextDeltaEvent,
ResponseCompletedEvent,
ResponseInputParam,
ResponsesAPIOptionalRequestParams,
ResponsesAPIStreamEvents,
ResponsesAPIStreamingResponse,
)
from litellm.types.utils import Delta as ChatCompletionDelta
from litellm.types.utils import (
ModelResponse,
ModelResponseStream,
StreamingChoices,
TextCompletionResponse,
)


class LiteLLMCompletionStreamingIterator(ResponsesAPIStreamingIterator):
"""
Async iterator for processing streaming responses from the Responses API.
"""

    def __init__(
self,
litellm_custom_stream_wrapper: litellm.CustomStreamWrapper,
request_input: Union[str, ResponseInputParam],
responses_api_request: ResponsesAPIOptionalRequestParams,
):
self.litellm_custom_stream_wrapper: litellm.CustomStreamWrapper = (
litellm_custom_stream_wrapper
)
self.request_input: Union[str, ResponseInputParam] = request_input
self.responses_api_request: ResponsesAPIOptionalRequestParams = (
responses_api_request
)
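        # Buffer every raw chunk so the complete response can be rebuilt with
        # stream_chunk_builder once the stream is exhausted.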
self.collected_chat_completion_chunks: List[ModelResponseStream] = []
self.finished: bool = False

    async def __anext__(
self,
) -> Union[ResponsesAPIStreamingResponse, ResponseCompletedEvent]:
try:
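            # Loop: not every upstream chunk maps to a Responses API event,
            # so keep pulling until one does or the stream ends.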
while True:
if self.finished is True:
raise StopAsyncIteration
# Get the next chunk from the stream
try:
chunk = await self.litellm_custom_stream_wrapper.__anext__()
self.collected_chat_completion_chunks.append(chunk)
response_api_chunk = (
self._transform_chat_completion_chunk_to_response_api_chunk(
chunk
)
)
if response_api_chunk:
return response_api_chunk
except StopAsyncIteration:
self.finished = True
response_completed_event = self._emit_response_completed_event()
if response_completed_event:
return response_completed_event
else:
raise StopAsyncIteration
        except Exception:
            # Mark the stream as finished and re-raise (e.g. HTTP errors
            # surfaced by the underlying stream).
            self.finished = True
            raise

    def __iter__(self):
return self

    def __next__(
self,
) -> Union[ResponsesAPIStreamingResponse, ResponseCompletedEvent]:
try:
while True:
if self.finished is True:
raise StopIteration
# Get the next chunk from the stream
try:
chunk = self.litellm_custom_stream_wrapper.__next__()
self.collected_chat_completion_chunks.append(chunk)
response_api_chunk = (
self._transform_chat_completion_chunk_to_response_api_chunk(
chunk
)
)
if response_api_chunk:
return response_api_chunk
except StopIteration:
self.finished = True
response_completed_event = self._emit_response_completed_event()
if response_completed_event:
return response_completed_event
else:
raise StopIteration
        except Exception:
            # Mark the stream as finished and re-raise (e.g. HTTP errors
            # surfaced by the underlying stream).
            self.finished = True
            raise

    def _transform_chat_completion_chunk_to_response_api_chunk(
        self, chunk: ModelResponseStream
    ) -> Optional[ResponsesAPIStreamingResponse]:
        """
        Transform a chat completion chunk into a Responses API chunk.

        This currently only emits the OutputTextDeltaEvent, which is what
        most tools built on the Responses API consume.
        """
return OutputTextDeltaEvent(
type=ResponsesAPIStreamEvents.OUTPUT_TEXT_DELTA,
item_id=chunk.id,
output_index=0,
content_index=0,
delta=self._get_delta_string_from_streaming_choices(chunk.choices),
)

    def _get_delta_string_from_streaming_choices(
        self, choices: List[StreamingChoices]
    ) -> str:
        """
        Get the delta string from the streaming choices.

        For now this collects the first choice's delta string. It's unclear
        how users expect litellm to translate multiple choices per chunk to
        the Responses API output.
        """
        if not choices:
            # Defensive: some providers emit chunks without choices (e.g. a
            # final usage-only chunk); treat those as an empty delta.
            return ""
        choice = choices[0]
        chat_completion_delta: ChatCompletionDelta = choice.delta
        return chat_completion_delta.content or ""

    def _emit_response_completed_event(self) -> Optional[ResponseCompletedEvent]:
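        """
        Rebuild the complete chat completion from the collected chunks via
        stream_chunk_builder and wrap it in a ResponseCompletedEvent.
        Returns None when no complete ModelResponse could be assembled.
        """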
litellm_model_response: Optional[
Union[ModelResponse, TextCompletionResponse]
] = stream_chunk_builder(chunks=self.collected_chat_completion_chunks)
if litellm_model_response and isinstance(litellm_model_response, ModelResponse):
return ResponseCompletedEvent(
type=ResponsesAPIStreamEvents.RESPONSE_COMPLETED,
response=LiteLLMCompletionResponsesConfig.transform_chat_completion_response_to_responses_api_response(
request_input=self.request_input,
chat_completion_response=litellm_model_response,
responses_api_request=self.responses_api_request,
),
)
else:
return None
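

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only; commented out so nothing runs on import).
# Assumes `litellm.acompletion(..., stream=True)` returns a
# CustomStreamWrapper and that the base ResponsesAPIStreamingIterator
# provides `__aiter__`; the model name and prompt below are placeholders.
#
#     import asyncio
#
#     async def main() -> None:
#         wrapper = await litellm.acompletion(
#             model="gpt-4o",
#             messages=[{"role": "user", "content": "Hello!"}],
#             stream=True,
#         )
#         iterator = LiteLLMCompletionStreamingIterator(
#             litellm_custom_stream_wrapper=wrapper,
#             request_input="Hello!",
#             responses_api_request=ResponsesAPIOptionalRequestParams(),
#         )
#         async for event in iterator:
#             if isinstance(event, OutputTextDeltaEvent):
#                 print(event.delta, end="", flush=True)
#
#     asyncio.run(main())
# ---------------------------------------------------------------------------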