machineuser
Sync widgets demo
94753b6
raw
history blame
No virus
2.31 kB
import type { InferenceTask, Options, RequestArgs } from "../../types";
import { makeRequestOptions } from "../../lib/makeRequestOptions";
import type { EventSourceMessage } from "../../vendor/fetch-event-source/parse";
import { getLines, getMessages } from "../../vendor/fetch-event-source/parse";
/**
 * Primitive to make custom inference calls that expect server-sent events, and returns the response through a generator.
 *
 * Every event whose `data` field is non-empty is parsed as JSON and yielded to the caller.
 *
 * @param args - Request arguments; `stream: true` is added before the request options are built.
 * @param options - Request options. If the server answers 503, `retry_on_error` is not `false` and
 *   `wait_for_model` is not set, the request is retried once with `wait_for_model: true` and the
 *   retried stream's events are yielded.
 * @returns An async generator yielding each parsed event payload as `T`.
 * @throws Error when the server answers with a non-OK status, when the response is not
 *   `text/event-stream`, or when an event payload contains an `error` field.
 */
export async function* streamingRequest<T>(
	args: RequestArgs,
	options?: Options & {
		/** When a model can be used for multiple tasks, and we want to run a non-default task */
		task?: string | InferenceTask;
		/** To load default model if needed */
		taskHint?: InferenceTask;
	}
): AsyncGenerator<T> {
	const { url, info } = await makeRequestOptions({ ...args, stream: true }, options);
	const response = await (options?.fetch ?? fetch)(url, info);

	if (options?.retry_on_error !== false && response.status === 503 && !options?.wait_for_model) {
		// The 503 body is never read; cancel it so the underlying connection can be released.
		await response.body?.cancel().catch(() => {});
		// Delegate with `yield*`: a plain `return` of the inner generator would end this generator
		// without ever handing the retried stream's events to the caller.
		yield* streamingRequest<T>(args, {
			...options,
			wait_for_model: true,
		});
		return;
	}

	if (!response.ok) {
		if (response.headers.get("content-type")?.startsWith("application/json")) {
			const output = await response.json();
			if (output.error) {
				throw new Error(stringifyError(output.error));
			}
		}
		throw new Error(`Server response contains error: ${response.status}`);
	}

	const contentType = response.headers.get("content-type");
	if (!contentType?.startsWith("text/event-stream")) {
		throw new Error(`Server does not support event stream content type, it returned ` + contentType);
	}

	if (!response.body) {
		return;
	}

	const reader = response.body.getReader();
	let events: EventSourceMessage[] = [];
	const onChunk = getLines(
		getMessages(
			() => {},
			() => {},
			(event: EventSourceMessage) => {
				// Accumulate parsed events; they are drained after each chunk is processed.
				events.push(event);
			}
		)
	);

	let reachedEnd = false;
	try {
		while (true) {
			const { done, value } = await reader.read();
			if (done) {
				reachedEnd = true;
				return;
			}
			onChunk(value);
			for (const event of events) {
				if (event.data.length > 0) {
					const data: unknown = JSON.parse(event.data);
					if (typeof data === "object" && data !== null && "error" in data) {
						throw new Error(stringifyError((data as { error: unknown }).error));
					}
					yield data as T;
				}
			}
			events = [];
		}
	} finally {
		if (!reachedEnd) {
			// The consumer stopped early or an error was thrown: cancel so the server stops sending.
			await reader.cancel().catch(() => {});
		}
		reader.releaseLock();
	}
}

/** Turns a server-provided `error` field into a message; non-strings are JSON-encoded instead of "[object Object]". */
function stringifyError(error: unknown): string {
	return typeof error === "string" ? error : JSON.stringify(error);
}