import type { MessageUpdate, TextStreamUpdate } from "$lib/types/MessageUpdate";

type MessageUpdateRequestOptions = {
	base: string;
	inputs?: string;
	messageId?: string;
	isRetry: boolean;
	isContinue: boolean;
	webSearch: boolean;
	files?: string[];
};

/**
 * POSTs a message request to `${opts.base}/conversation/${conversationId}` and
 * returns an async generator over the message updates streamed back.
 *
 * The raw stream is parsed into updates, consecutive "stream" tokens are merged
 * into whole words, and the emission rate is smoothed before values reach the caller.
 *
 * @param conversationId - id interpolated into the request URL
 * @param opts - request options, serialized into the JSON request body
 * @param abortSignal - aborting this signal aborts the request and the stream reader
 * @returns an async generator yielding the processed message updates
 * @throws Error when the response is not ok (using the `message` field of the JSON
 * error body when it is a non-empty string, otherwise a status-based message) or
 * when the response has no body
 */
export async function fetchMessageUpdates(
	conversationId: string,
	opts: MessageUpdateRequestOptions,
	abortSignal: AbortSignal
): Promise<AsyncGenerator<MessageUpdate>> {
	const abortController = new AbortController();
	// An already-aborted signal never fires "abort" again, so mirror its state up front
	if (abortSignal.aborted) {
		abortController.abort();
	} else {
		abortSignal.addEventListener("abort", () => abortController.abort(), { once: true });
	}

	const response = await fetch(`${opts.base}/conversation/${conversationId}`, {
		method: "POST",
		headers: { "Content-Type": "application/json" },
		body: JSON.stringify({
			inputs: opts.inputs,
			id: opts.messageId,
			is_retry: opts.isRetry,
			is_continue: opts.isContinue,
			web_search: opts.webSearch,
			files: opts.files,
		}),
		signal: abortController.signal,
	});

	if (!response.ok) {
		let errorMessage = `Request failed with status code ${response.status}: ${response.statusText}`;
		try {
			const payload: unknown = await response.json();
			const message = (payload as { message?: unknown } | null)?.message;
			// Only trust a usable string; otherwise keep the status-based message
			if (typeof message === "string" && message !== "") errorMessage = message;
		} catch {
			// Error body was not valid JSON: keep the status-based message
		}
		throw Error(errorMessage);
	}
	if (!response.body) {
		throw Error("Body not defined");
	}

	return smoothAsyncIterator(
		streamMessageUpdatesToFullWords(endpointStreamToIterator(response, abortController))
	);
}

/**
 * Reads the response body as text and yields every message update found in it,
 * one JSON document per line.
 *
 * The controller is aborted once the reader closes or the stream ends, and the
 * reader is cancelled whenever the controller is aborted.
 *
 * @throws Error when the response has no body
 */
async function* endpointStreamToIterator(
	response: Response,
	abortController: AbortController
): AsyncGenerator<MessageUpdate> {
	const reader = response.body?.pipeThrough(new TextDecoderStream()).getReader();
	if (!reader) throw Error("Response for endpoint had no body");

	// Abort when the reader closes. `closed` rejects if the stream errors; handle that
	// branch too so it does not surface as an unhandled rejection (the pending
	// `read()` below still reports the error to the consumer).
	const abort = () => abortController.abort();
	reader.closed.then(abort, abort);

	// Cancel the reader on abort. Cancelling an errored stream rejects, which is
	// irrelevant at that point, so the rejection is deliberately ignored.
	abortController.signal.addEventListener("abort", () => {
		reader.cancel().catch(() => undefined);
	});

	// A chunk may end in the middle of a line, e.g. `{"type": "stream", "token":`.
	// The unfinished tail is kept here and prepended to the next chunk so that it
	// becomes `{"type": "stream", "token": "Hello"}` before being parsed.
	let prevChunk = "";
	while (!abortController.signal.aborted) {
		const { done, value } = await reader.read();
		if (done) {
			abortController.abort();
			break;
		}
		if (!value) continue;

		const { messageUpdates, remainingText } = parseMessageUpdates(prevChunk + value);
		prevChunk = remainingText;
		for (const messageUpdate of messageUpdates) yield messageUpdate;
	}
}

/**
 * Parses newline-delimited JSON into message updates.
 *
 * Only the text after the last newline can be an incomplete line: when it does
 * not parse, it is returned as `remainingText` so the caller can prepend it to the
 * next chunk. Blank lines are skipped. A newline-terminated line that is not valid
 * JSON can never be completed by later input, so it is skipped instead of
 * discarding the complete updates that follow it.
 *
 * @param value - text to parse, possibly ending with a partial line
 * @returns the parsed updates and the unparsed trailing text ("" when there is none)
 */
function parseMessageUpdates(value: string): {
	messageUpdates: MessageUpdate[];
	remainingText: string;
} {
	const lines = value.split("\n");
	// `split` always returns at least one element; the last one is the unterminated tail
	const tail = lines.pop() ?? "";
	const messageUpdates: MessageUpdate[] = [];

	for (const line of lines) {
		if (line.trim() === "") continue;
		try {
			messageUpdates.push(JSON.parse(line) as MessageUpdate);
		} catch {
			// Malformed complete line: nothing later can repair it, move on
		}
	}

	if (tail.trim() === "") return { messageUpdates, remainingText: "" };
	try {
		// The final line may be complete even without a trailing newline
		messageUpdates.push(JSON.parse(tail) as MessageUpdate);
		return { messageUpdates, remainingText: "" };
	} catch {
		return { messageUpdates, remainingText: tail };
	}
}

/**
 * Emits all the message updates immediately that aren't "stream" type
 * Emits a concatenated "stream" type message update once it detects a full word
 * Example: "what" " don" "'t" => "what" " don't"
 * Only supports latin languages, ignores others
 *
 * At most 5 tokens are merged into one update. Tokens still buffered when the
 * source ends are emitted one by one, unmerged.
 */
async function* streamMessageUpdatesToFullWords(
	iterator: AsyncGenerator<MessageUpdate>
): AsyncGenerator<MessageUpdate> {
	let bufferedStreamUpdates: TextStreamUpdate[] = [];

	const endAlphanumeric = /[a-zA-Z0-9À-ž'`]+$/;
	const beginningAlphanumeric = /^[a-zA-Z0-9À-ž'`]+/;

	for await (const messageUpdate of iterator) {
		if (messageUpdate.type !== "stream") {
			yield messageUpdate;
			continue;
		}
		bufferedStreamUpdates.push(messageUpdate);

		let lastIndexEmitted = 0;
		for (let i = 1; i < bufferedStreamUpdates.length; i++) {
			// Two neighbours belong to the same word when the first ends and the
			// second begins with a word character
			const prevEndsAlphanumeric = endAlphanumeric.test(bufferedStreamUpdates[i - 1].token);
			const currBeginsAlphanumeric = beginningAlphanumeric.test(bufferedStreamUpdates[i].token);
			const shouldCombine = prevEndsAlphanumeric && currBeginsAlphanumeric;
			const combinedTooMany = i - lastIndexEmitted >= 5;
			if (shouldCombine && !combinedTooMany) continue;

			// Combine tokens together and emit
			yield {
				type: "stream",
				token: bufferedStreamUpdates
					.slice(lastIndexEmitted, i)
					.map((update) => update.token)
					.join(""),
			};
			lastIndexEmitted = i;
		}
		// Keep only the tokens that may still grow into a longer word
		bufferedStreamUpdates = bufferedStreamUpdates.slice(lastIndexEmitted);
	}
	for (const messageUpdate of bufferedStreamUpdates) yield messageUpdate;
}

/**
 * Attempts to smooth out the time between values emitted by an async iterator
 * by waiting for the average time between values to emit the next value
 *
 * The source is drained eagerly in the background into a buffer. If the source
 * throws, the values already buffered are still emitted and the error is then
 * rethrown to the consumer.
 */
async function* smoothAsyncIterator<T>(iterator: AsyncGenerator<T>): AsyncGenerator<T> {
	const eventTarget = new EventTarget();
	let done = false;
	let failed = false;
	let failure: unknown;
	const valuesBuffer: T[] = [];
	const valueTimesMS: number[] = [];

	// Pulls one value from the source, re-arming itself until the source ends or fails.
	// Always announces the state change through a "next" event.
	const next = async (): Promise<void> => {
		try {
			const obj = await iterator.next();
			if (obj.done) {
				done = true;
			} else {
				valuesBuffer.push(obj.value);
				valueTimesMS.push(performance.now());
				void next();
			}
		} catch (error) {
			// Remember the failure and stop; otherwise the loop below would never end
			failed = true;
			failure = error;
			done = true;
		}
		eventTarget.dispatchEvent(new Event("next"));
	};
	void next();

	let timeOfLastEmitMS = performance.now();
	while (!done || valuesBuffer.length > 0) {
		// Nothing to pace yet: wait for the producer instead of polling on a timer
		if (valuesBuffer.length === 0) {
			await waitForEvent(eventTarget, "next");
			continue;
		}

		// Only consider the last X times between tokens
		const sampledTimesMS = valueTimesMS.slice(-30);

		// Get the total time spent in abnormal periods
		const anomalyThresholdMS = 2000;
		const anomalyDurationMS = sampledTimesMS
			.map((time, i, times) => time - times[i - 1])
			.slice(1)
			.filter((time) => time > anomalyThresholdMS)
			.reduce((a, b) => a + b, 0);

		// With a single sample there is no interval to average, so do not delay
		const intervalCount = sampledTimesMS.length - 1;
		const averageTimeMSBetweenValues =
			intervalCount > 0
				? Math.min(
						200,
						(sampledTimesMS[intervalCount] - sampledTimesMS[0] - anomalyDurationMS) / intervalCount
				  )
				: 0;
		const timeSinceLastEmitMS = performance.now() - timeOfLastEmitMS;

		// Emit after waiting duration or cancel if "next" event is emitted
		const gotNext = await Promise.race([
			sleep(Math.max(5, averageTimeMSBetweenValues - timeSinceLastEmitMS)).then(() => false),
			waitForEvent(eventTarget, "next"),
		]);

		// Go to next iteration so we can re-calculate when to emit
		if (gotNext) continue;

		// Emit. The buffer is non-empty here: only this loop removes values from it
		timeOfLastEmitMS = performance.now();
		yield valuesBuffer.shift() as T;
	}

	if (failed) throw failure;
}

/** Resolves after `ms` milliseconds. */
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

/** Resolves with `true` the next time `eventName` is dispatched on `eventTarget`. */
const waitForEvent = (eventTarget: EventTarget, eventName: string) =>
	new Promise<boolean>((resolve) =>
		eventTarget.addEventListener(eventName, () => resolve(true), { once: true })
	);