import { EventSourceParserStream } from 'eventsource-parser/stream';
import type { ParsedEvent } from 'eventsource-parser';

type TextStreamUpdate = {
	done: boolean;
	value: string;
	// eslint-disable-next-line @typescript-eslint/no-explicit-any
	citations?: any;
	// eslint-disable-next-line @typescript-eslint/no-explicit-any
	error?: any;
	usage?: ResponseUsage;
};

type ResponseUsage = {
	/** Including images and tools if any */
	prompt_tokens: number;
	/** The tokens generated */
	completion_tokens: number;
	/** Sum of the above two fields */
	total_tokens: number;
};

// createOpenAITextStream takes a responseBody containing an SSE response,
// and returns an async generator that emits delta updates, with large deltas
// split into random-sized chunks.
export async function createOpenAITextStream(
	responseBody: ReadableStream<Uint8Array>,
	splitLargeDeltas: boolean
): Promise<AsyncGenerator<TextStreamUpdate>> {
	const eventStream = responseBody
		.pipeThrough(new TextDecoderStream())
		.pipeThrough(new EventSourceParserStream())
		.getReader();
	let iterator = openAIStreamToIterator(eventStream);
	if (splitLargeDeltas) {
		iterator = streamLargeDeltasAsRandomChunks(iterator);
	}
	return iterator;
}
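
// A minimal usage sketch (not part of the original module): wiring the stream
// into a fetch response. The '/api/chat/completions' URL and the request body
// below are hypothetical placeholders; any OpenAI-compatible SSE endpoint
// should behave the same way.
export async function exampleStreamChatCompletion(messages: unknown[]): Promise<string> {
	const res = await fetch('/api/chat/completions', {
		method: 'POST',
		headers: { 'Content-Type': 'application/json' },
		body: JSON.stringify({ messages, stream: true })
	});
	if (!res.body) {
		throw new Error('Response has no body to stream');
	}
	let output = '';
	// Pass `true` to smooth out providers that send large deltas at once.
	const textStream = await createOpenAITextStream(res.body, true);
	for await (const update of textStream) {
		if (update.error) {
			throw update.error;
		}
		if (update.done) {
			break;
		}
		output += update.value;
	}
	return output;
}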

async function* openAIStreamToIterator(
	reader: ReadableStreamDefaultReader<ParsedEvent>
): AsyncGenerator<TextStreamUpdate> {
	while (true) {
		const { value, done } = await reader.read();
		if (done) {
			yield { done: true, value: '' };
			break;
		}
		if (!value) {
			continue;
		}
		const data = value.data;
		if (data.startsWith('[DONE]')) {
			yield { done: true, value: '' };
			break;
		}
		try {
			const parsedData = JSON.parse(data);
			if (parsedData.error) {
				yield { done: true, value: '', error: parsedData.error };
				break;
			}
			if (parsedData.citations) {
				yield { done: false, value: '', citations: parsedData.citations };
				continue;
			}
			yield {
				done: false,
				value: parsedData.choices?.[0]?.delta?.content ?? '',
				usage: parsedData.usage
			};
		} catch (e) {
			console.error('Error extracting delta from SSE event:', e);
		}
	}
}
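
// For reference, the events handled above typically look like this on the wire
// (an illustrative sketch of the OpenAI-style chat completion chunk format,
// not taken verbatim from any one provider):
//
//   data: {"choices":[{"delta":{"content":"Hel"}}]}
//   data: {"choices":[{"delta":{"content":"lo"}}],"usage":{"prompt_tokens":9,"completion_tokens":2,"total_tokens":11}}
//   data: [DONE]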

// streamLargeDeltasAsRandomChunks splits large deltas (5 characters or more) into
// random-sized chunks of 1-3 characters. This simulates more fluid streaming, even
// though some providers may send large chunks of text at once.
async function* streamLargeDeltasAsRandomChunks(
	iterator: AsyncGenerator<TextStreamUpdate>
): AsyncGenerator<TextStreamUpdate> {
	for await (const textStreamUpdate of iterator) {
		if (textStreamUpdate.done) {
			yield textStreamUpdate;
			return;
		}
		if (textStreamUpdate.citations) {
			yield textStreamUpdate;
			continue;
		}
		let content = textStreamUpdate.value;
		if (content.length < 5) {
			yield { done: false, value: content };
			continue;
		}
		while (content !== '') {
			const chunkSize = Math.min(Math.floor(Math.random() * 3) + 1, content.length);
			const chunk = content.slice(0, chunkSize);
			yield { done: false, value: chunk };
			// Do not sleep if the tab is hidden
			// Timers are throttled to 1s in hidden tabs
			if (document?.visibilityState !== 'hidden') {
				await sleep(5);
			}
			content = content.slice(chunkSize);
		}
	}
}

const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
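
// A minimal demo sketch (hypothetical, for illustration only) of the chunking
// behavior: a single large delta is re-emitted as a series of 1-3 character
// updates, while the final done update passes through unchanged. Note that the
// chunker sleeps via `document.visibilityState`, so this assumes a browser context.
async function* exampleSingleDelta(): AsyncGenerator<TextStreamUpdate> {
	yield { done: false, value: 'Hello, streaming world!' };
	yield { done: true, value: '' };
}

export async function exampleChunkingDemo(): Promise<void> {
	for await (const update of streamLargeDeltasAsRandomChunks(exampleSingleDelta())) {
		// Logs e.g. { done: false, value: 'Hel' }, { done: false, value: 'lo' }, ...
		console.log(update);
	}
}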