|
import type { MessageUpdate, TextStreamUpdate } from "$lib/types/MessageUpdate"; |
|
|
|
/**
 * Options for one message-update request, as consumed by `fetchMessageUpdates`.
 *
 * Required fields are listed first, optional ones last.
 */
type MessageUpdateRequestOptions = {
  /** URL prefix; the request is posted to `${base}/conversation/${conversationId}`. */
  base: string;
  /** Sent as `is_retry` in the JSON request body. */
  isRetry: boolean;
  /** Sent as `is_continue` in the JSON request body. */
  isContinue: boolean;
  /** Sent as `web_search` in the JSON request body. */
  webSearch: boolean;

  /** Sent as `inputs` in the JSON request body. */
  inputs?: string;
  /** Sent as `id` in the JSON request body. */
  messageId?: string;
  /** Sent as `files` in the JSON request body. */
  files?: string[];
};
|
/**
 * Posts a message request for a conversation and returns its streamed updates.
 *
 * The response stream is parsed by `endpointStreamToIterator`, regrouped by
 * `streamMessageUpdatesToFullWords` and paced by `smoothAsyncIterator`.
 *
 * @param conversationId - Conversation the request is posted to.
 * @param opts - Request parameters; serialized into the JSON body.
 * @param abortSignal - Aborting this signal aborts the request and the stream.
 * @returns An async generator over the updates of the response.
 * @throws Error with the server-provided `message` (or a status-code fallback)
 * when the response is not OK, and `Error("Body not defined")` when the
 * response has no body. Rejections from `fetch` itself propagate unchanged.
 */
export async function fetchMessageUpdates(
  conversationId: string,
  opts: MessageUpdateRequestOptions,
  abortSignal: AbortSignal
): Promise<AsyncGenerator<MessageUpdate>> {
  // A dedicated controller lets the stream reader abort the request too.
  const abortController = new AbortController();
  if (abortSignal.aborted) {
    // The "abort" event has already fired and will not fire again, so a
    // listener would never run; mirror the aborted state right away.
    abortController.abort();
  } else {
    abortSignal.addEventListener("abort", () => abortController.abort(), { once: true });
  }

  const response = await fetch(`${opts.base}/conversation/${conversationId}`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      inputs: opts.inputs,
      id: opts.messageId,
      is_retry: opts.isRetry,
      is_continue: opts.isContinue,
      web_search: opts.webSearch,
      files: opts.files,
    }),
    signal: abortController.signal,
  });

  if (!response.ok) {
    // Used when the body is not JSON or carries no usable `message`.
    let errorMessage = `Request failed with status code ${response.status}: ${response.statusText}`;
    try {
      const payload = (await response.json()) as { message?: unknown } | null;
      const message = payload?.message;
      if (typeof message === "string" && message.length > 0) errorMessage = message;
    } catch {
      // Body was not valid JSON: keep the status-code fallback.
    }
    throw new Error(errorMessage);
  }
  if (!response.body) {
    throw new Error("Body not defined");
  }

  return smoothAsyncIterator(
    streamMessageUpdatesToFullWords(endpointStreamToIterator(response, abortController))
  );
}
|
|
|
/**
 * Decodes the response body as text and yields every update parsed from it.
 *
 * Text that does not yet form a complete update is carried over and prepended
 * to the next chunk before parsing (see `parseMessageUpdates`).
 *
 * @param response - Response whose body is read.
 * @param abortController - Aborted when the stream ends; aborting it cancels
 * the reader and stops iteration.
 * @throws Error when the response has no body; read errors propagate as-is.
 */
async function* endpointStreamToIterator(
  response: Response,
  abortController: AbortController
): AsyncGenerator<MessageUpdate> {
  const reader = response.body?.pipeThrough(new TextDecoderStream()).getReader();
  if (!reader) throw Error("Response for endpoint had no body");

  const { signal } = abortController;

  // Abort once the stream has closed. `closed` rejects when the stream errors;
  // that error already reaches the caller through `reader.read()`, so it is
  // only swallowed here to avoid an unhandled rejection.
  reader.closed.then(
    () => abortController.abort(),
    () => undefined
  );

  // `cancel()` rejects on an errored stream; nothing useful can be done then.
  const cancelReader = (): void => {
    void reader.cancel().catch(() => undefined);
  };
  signal.addEventListener("abort", cancelReader, { once: true });

  let prevChunk = "";
  try {
    while (!signal.aborted) {
      const { done, value } = await reader.read();
      if (done) {
        abortController.abort();
        break;
      }
      if (!value) continue;

      const { messageUpdates, remainingText } = parseMessageUpdates(prevChunk + value);
      prevChunk = remainingText;
      for (const messageUpdate of messageUpdates) yield messageUpdate;
    }
  } finally {
    // Runs on completion, on a thrown read error and when the consumer stops
    // early, so the underlying stream is always released.
    signal.removeEventListener("abort", cancelReader);
    cancelReader();
  }
}
|
|
|
/**
 * Parses newline-delimited JSON into message updates.
 *
 * Every newline-terminated line is complete: it is parsed on its own, and a
 * blank or malformed line is skipped without affecting the lines after it.
 * Only the final, unterminated segment can be a partial update; when it does
 * not parse it is returned as `remainingText` so the caller can prepend it to
 * the next chunk.
 *
 * @param value - Text received so far, possibly ending mid-update.
 * @returns The parsed updates in input order, plus the unparsed tail
 * (empty when everything was consumed).
 */
function parseMessageUpdates(value: string): {
  messageUpdates: MessageUpdate[];
  remainingText: string;
} {
  // Returns undefined for blank or syntactically invalid JSON.
  const parseLine = (line: string): MessageUpdate | undefined => {
    if (line.trim() === "") return undefined;
    try {
      return JSON.parse(line) as MessageUpdate;
    } catch (error) {
      if (error instanceof SyntaxError) return undefined;
      throw error;
    }
  };

  const lines = value.split("\n");
  // `split` always yields at least one element; the last one has no
  // terminating newline and may therefore be incomplete.
  const tail = lines.pop() ?? "";

  const messageUpdates: MessageUpdate[] = [];
  for (const line of lines) {
    const update = parseLine(line);
    if (update !== undefined) messageUpdates.push(update);
  }

  if (tail.trim() === "") return { messageUpdates, remainingText: "" };

  const tailUpdate = parseLine(tail);
  if (tailUpdate === undefined) return { messageUpdates, remainingText: tail };

  messageUpdates.push(tailUpdate);
  return { messageUpdates, remainingText: "" };
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
 * Merges consecutive stream tokens so that words are emitted whole.
 *
 * Two adjacent tokens are merged when the first ends and the second begins
 * with a word character (letters, digits, `'` or a backtick); at most five
 * tokens are merged into one emitted update. Updates of any other type are
 * passed through, after the stream tokens buffered so far have been emitted,
 * so the relative order of the input is preserved.
 *
 * @param iterator - Source of message updates.
 * @returns The same updates, with stream tokens regrouped.
 */
async function* streamMessageUpdatesToFullWords(
  iterator: AsyncGenerator<MessageUpdate>
): AsyncGenerator<MessageUpdate> {
  let bufferedStreamUpdates: TextStreamUpdate[] = [];

  const endAlphanumeric = /[a-zA-Z0-9À-ž'`]+$/;
  const beginningAlphanumeric = /^[a-zA-Z0-9À-ž'`]+/;

  for await (const messageUpdate of iterator) {
    if (messageUpdate.type !== "stream") {
      // Emit pending tokens first: otherwise this update would overtake text
      // that was received before it.
      for (const buffered of bufferedStreamUpdates) yield buffered;
      bufferedStreamUpdates = [];
      yield messageUpdate;
      continue;
    }
    bufferedStreamUpdates.push(messageUpdate);

    let lastIndexEmitted = 0;
    for (let i = 1; i < bufferedStreamUpdates.length; i++) {
      const prevEndsAlphanumeric = endAlphanumeric.test(bufferedStreamUpdates[i - 1].token);
      const currBeginsAlphanumeric = beginningAlphanumeric.test(bufferedStreamUpdates[i].token);
      const shouldCombine = prevEndsAlphanumeric && currBeginsAlphanumeric;
      const combinedTooMany = i - lastIndexEmitted >= 5;
      if (shouldCombine && !combinedTooMany) continue;

      // Word boundary (or merge limit) reached: emit everything before `i`
      // as a single token.
      yield {
        type: "stream",
        token: bufferedStreamUpdates
          .slice(lastIndexEmitted, i)
          .map((update) => update.token)
          .join(""),
      };
      lastIndexEmitted = i;
    }
    bufferedStreamUpdates = bufferedStreamUpdates.slice(lastIndexEmitted);
  }

  // The source ended: nothing more can be merged into the remaining tokens.
  for (const messageUpdate of bufferedStreamUpdates) yield messageUpdate;
}
|
|
|
|
|
|
|
|
|
|
|
/**
 * Re-emits the values of `iterator` at an even pace.
 *
 * Values are pulled from the source as fast as it produces them and buffered.
 * They are then emitted one at a time, spaced by the average interval between
 * the arrival times of the most recent 30 values. Gaps above 2000 ms are left
 * out of that average, the interval is capped at 200 ms, and each wait lasts
 * at least 5 ms.
 *
 * If the source throws, the values buffered so far are still emitted and the
 * error is then rethrown to the consumer.
 *
 * @param iterator - Source generator; it is consumed eagerly.
 * @returns A generator yielding the same values in the same order.
 */
async function* smoothAsyncIterator<T>(iterator: AsyncGenerator<T>): AsyncGenerator<T> {
  const sampleSize = 30;
  const anomalyThresholdMS = 2000;
  const maxIntervalMS = 200;
  const minSleepMS = 5;

  const eventTarget = new EventTarget();
  let done = false;
  let failed = false;
  let failure: unknown;
  const valuesBuffer: T[] = [];
  const valueTimesMS: number[] = [];

  // Pulls values from the source in the background, one after another.
  const next = async (): Promise<void> => {
    try {
      const obj = await iterator.next();
      if (obj.done) {
        done = true;
      } else {
        valuesBuffer.push(obj.value);
        valueTimesMS.push(performance.now());
        // Only the newest samples are ever read; keep the array bounded.
        if (valueTimesMS.length > sampleSize) valueTimesMS.shift();
        void next();
      }
    } catch (error) {
      // Remember the failure so the consumer sees it instead of waiting forever.
      failed = true;
      failure = error;
      done = true;
    }
    eventTarget.dispatchEvent(new Event("next"));
  };
  void next();

  let timeOfLastEmitMS = performance.now();
  while (!done || valuesBuffer.length > 0) {
    // Sum the gaps between consecutive arrivals, ignoring anomalous pauses.
    let regularGapsMS = 0;
    for (let i = 1; i < valueTimesMS.length; i++) {
      const gapMS = valueTimesMS[i] - valueTimesMS[i - 1];
      if (gapMS <= anomalyThresholdMS) regularGapsMS += gapMS;
    }

    // With fewer than two samples there is no gap to average; using 0 avoids
    // a NaN delay, which would bypass the minimum sleep below.
    const gapCount = valueTimesMS.length - 1;
    const averageTimeMSBetweenValues =
      gapCount > 0 ? Math.min(maxIntervalMS, regularGapsMS / gapCount) : 0;
    const timeSinceLastEmitMS = performance.now() - timeOfLastEmitMS;

    // Wait for the emit slot, or wake up early when the source produced
    // something so the pace can be recomputed.
    const waitAbort = new AbortController();
    const gotNext = await Promise.race([
      sleep(Math.max(minSleepMS, averageTimeMSBetweenValues - timeSinceLastEmitMS)),
      waitForEvent(eventTarget, "next", waitAbort.signal),
    ]);
    // Drop the listener if the timer won, so listeners do not pile up.
    waitAbort.abort();

    if (gotNext) continue;
    if (valuesBuffer.length === 0) continue;

    timeOfLastEmitMS = performance.now();
    yield valuesBuffer.shift() as T;
  }

  if (failed) throw failure;
}

/** Resolves (with no value) after roughly `ms` milliseconds. */
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

/**
 * Resolves with `true` the next time `eventName` is dispatched on `eventTarget`.
 *
 * @param signal - Optional; aborting it removes the listener, in which case
 * the returned promise never settles.
 */
const waitForEvent = (eventTarget: EventTarget, eventName: string, signal?: AbortSignal) =>
  new Promise<boolean>((resolve) =>
    eventTarget.addEventListener(
      eventName,
      () => resolve(true),
      signal ? { once: true, signal } : { once: true }
    )
  );
|
|