Spaces:
Running
Running
import { MESSAGES_BEFORE_LOGIN, ENABLE_ASSISTANTS_RAG } from "$env/static/private"; | |
import { startOfHour } from "date-fns"; | |
import { authCondition, requiresUser } from "$lib/server/auth"; | |
import { collections } from "$lib/server/database"; | |
import { models } from "$lib/server/models"; | |
import { ERROR_MESSAGES } from "$lib/stores/errors"; | |
import type { Message } from "$lib/types/Message"; | |
import { error } from "@sveltejs/kit"; | |
import { ObjectId } from "mongodb"; | |
import { z } from "zod"; | |
import type { MessageUpdate } from "$lib/types/MessageUpdate"; | |
import { runWebSearch } from "$lib/server/websearch/runWebSearch"; | |
import { abortedGenerations } from "$lib/server/abortedGenerations"; | |
import { summarize } from "$lib/server/summarize"; | |
import { uploadFile } from "$lib/server/files/uploadFile"; | |
import sizeof from "image-size"; | |
import type { Assistant } from "$lib/types/Assistant"; | |
import { convertLegacyConversation } from "$lib/utils/tree/convertLegacyConversation"; | |
import { isMessageId } from "$lib/utils/tree/isMessageId"; | |
import { buildSubtree } from "$lib/utils/tree/buildSubtree.js"; | |
import { addChildren } from "$lib/utils/tree/addChildren.js"; | |
import { addSibling } from "$lib/utils/tree/addSibling.js"; | |
import { preprocessMessages } from "$lib/server/preprocessMessages.js"; | |
import { usageLimits } from "$lib/server/usageLimits"; | |
import { isURLLocal } from "$lib/server/isURLLocal.js"; | |
/**
 * Expands every `{{ url=... }}` placeholder of a preprompt with the body fetched from that URL.
 *
 * Placeholders are collected from the untouched template first and substituted in a single
 * pass afterwards. This avoids two problems of replacing while scanning with a `/g` regex:
 * a stale `lastIndex` after the string changes length (placeholders skipped, or fetched text
 * re-scanned for placeholders), and `$`-sequences of the fetched text being interpreted as
 * replacement patterns.
 *
 * @param preprompt - the preprompt template, possibly containing `{{ url=... }}` placeholders
 * @returns the preprompt where each placeholder is replaced by the fetched text, or by the
 *   error message when the URL is invalid, is reported as local by `isURLLocal`, or the
 *   request fails
 */
async function resolvePrepromptUrls(preprompt: string): Promise<string> {
	const urlRegex = /{{\s?url=(.*?)\s?}}/g;
	const replacements = new Map<string, string>();

	for (const [placeholder, rawUrl] of preprompt.matchAll(urlRegex)) {
		// identical placeholders are fetched only once
		if (replacements.has(placeholder)) {
			continue;
		}

		try {
			const url = new URL(rawUrl);
			if (await isURLLocal(url)) {
				throw new Error("URL couldn't be fetched, it resolved to a local address.");
			}

			const res = await fetch(url.href);
			if (!res.ok) {
				throw new Error("URL couldn't be fetched, error " + res.status);
			}

			replacements.set(placeholder, await res.text());
		} catch (e) {
			// best effort: the error message takes the place of the placeholder
			replacements.set(placeholder, (e as Error).message);
		}
	}

	// a replacer function inserts the text verbatim (no `$&` / `$1` expansion)
	return preprompt.replace(urlRegex, (placeholder) => replacements.get(placeholder) ?? placeholder);
}

/**
 * Handles a new prompt, a retry or a "continue" on the conversation `params.id` and streams
 * the model answer back as newline-delimited JSON `MessageUpdate` objects.
 *
 * Steps: authenticate, load the conversation (converting a legacy one that has no
 * `rootMessageId`), register the message event and enforce the usage limits, validate the
 * JSON body and the uploaded files, insert the new message(s) in the conversation tree,
 * then stream the generation while persisting the conversation.
 *
 * @throws 400 when the prompt is too long or "continue" targets a message that has children
 * @throws 401 when neither a user nor a session id is present
 * @throws 404 when the conversation (or the message to retry) cannot be found
 * @throws 410 when the conversation's model is not in `models` anymore
 * @throws 413 when an uploaded file exceeds the size / dimension limits
 * @throws 429 when a guest or rate limit is exceeded
 * @throws 500 when the legacy conversion, message creation or prompt creation fails
 */
export async function POST({ request, locals, params, getClientAddress }) {
	const id = z.string().parse(params.id);
	const promptedAt = new Date();
	const userId = locals.user?._id ?? locals.sessionId;

	// check user
	if (!userId) {
		throw error(401, "Unauthorized");
	}

	// a malformed id cannot match any conversation; answer 404 instead of letting
	// the ObjectId constructor throw an unhandled error
	if (!ObjectId.isValid(id)) {
		throw error(404, "Conversation not found");
	}
	const convId = new ObjectId(id);

	// check if the user has access to the conversation
	const convBeforeCheck = await collections.conversations.findOne({
		_id: convId,
		...authCondition(locals),
	});

	// conversations without a root message id are in the legacy format: convert them in place
	if (convBeforeCheck && !convBeforeCheck.rootMessageId) {
		const res = await collections.conversations.updateOne(
			{
				_id: convId,
			},
			{
				$set: {
					...convBeforeCheck,
					...convertLegacyConversation(convBeforeCheck),
				},
			}
		);

		if (!res.acknowledged) {
			throw error(500, "Failed to convert conversation");
		}
	}

	// read the conversation again so that we work on the converted version
	const conv = await collections.conversations.findOne({
		_id: convId,
		...authCondition(locals),
	});

	if (!conv) {
		throw error(404, "Conversation not found");
	}

	// register the event for ratelimiting
	await collections.messageEvents.insertOne({
		userId,
		createdAt: new Date(),
		ip: getClientAddress(),
	});

	const messagesBeforeLogin = MESSAGES_BEFORE_LOGIN
		? Number.parseInt(MESSAGES_BEFORE_LOGIN, 10)
		: 0;

	// guest mode check
	if (!locals.user?._id && requiresUser && messagesBeforeLogin) {
		const totalMessages =
			(
				await collections.conversations
					.aggregate([
						{ $match: { ...authCondition(locals), "messages.from": "assistant" } },
						{ $project: { messages: 1 } },
						{ $limit: messagesBeforeLogin + 1 },
						{ $unwind: "$messages" },
						{ $match: { "messages.from": "assistant" } },
						{ $count: "messages" },
					])
					.toArray()
			)[0]?.messages ?? 0;

		if (totalMessages > messagesBeforeLogin) {
			throw error(429, "Exceeded number of messages before login");
		}
	}

	if (usageLimits?.messagesPerMinute) {
		// check if the user is rate limited, either by its id or by its IP
		// NOTE(review): the counts cover every stored message event; presumably old events
		// expire from the collection (e.g. TTL index) - confirm
		const [userEvents, ipEvents] = await Promise.all([
			collections.messageEvents.countDocuments({ userId }),
			collections.messageEvents.countDocuments({ ip: getClientAddress() }),
		]);

		if (Math.max(userEvents, ipEvents) > usageLimits.messagesPerMinute) {
			throw error(429, ERROR_MESSAGES.rateLimited);
		}
	}

	if (usageLimits?.messages && conv.messages.length > usageLimits.messages) {
		throw error(
			429,
			`This conversation has more than ${usageLimits.messages} messages. Start a new one to continue`
		);
	}

	// fetch the model
	const model = models.find((m) => m.id === conv.model);

	if (!model) {
		throw error(410, "Model not available anymore");
	}

	// finally parse the content of the request
	const json = await request.json();

	const {
		inputs: newPrompt,
		id: messageId,
		is_retry: isRetry,
		is_continue: isContinue,
		web_search: webSearch,
		files: b64files,
	} = z
		.object({
			id: z.string().uuid().refine(isMessageId).optional(), // parent message id to append to for a normal message, or the message id for a retry/continue
			inputs: z.optional(
				z
					.string()
					.trim()
					.min(1)
					.transform((s) => s.replace(/\r\n/g, "\n"))
			),
			is_retry: z.optional(z.boolean()),
			is_continue: z.optional(z.boolean()),
			web_search: z.optional(z.boolean()),
			files: z.optional(z.array(z.string())),
		})
		.parse(json);

	if (usageLimits?.messageLength && (newPrompt?.length ?? 0) > usageLimits.messageLength) {
		throw error(400, "Message too long.");
	}

	// files is an array of base64 strings encoding Blob objects
	// we need to convert this array to an array of File objects
	const files = b64files?.map((file) => {
		const blob = Buffer.from(file, "base64");
		return new File([blob], "image.png");
	});

	// check sizes
	if (files) {
		const filechecks = await Promise.all(
			files.map(async (file) => {
				const dimensions = sizeof(Buffer.from(await file.arrayBuffer()));
				return (
					file.size > 2 * 1024 * 1024 ||
					(dimensions.width ?? 0) > 224 ||
					(dimensions.height ?? 0) > 224
				);
			})
		);

		if (filechecks.some((check) => check)) {
			throw error(413, "File too large, should be <2MB and 224x224 max.");
		}
	}

	let hashes: undefined | string[];

	if (files) {
		hashes = await Promise.all(files.map(async (file) => await uploadFile(file, conv)));
	}

	// we will append tokens to the content of this message
	let messageToWriteToId: Message["id"] | undefined = undefined;
	// used for building the prompt, subtree of the conversation that goes from the latest message to the root
	let messagesForPrompt: Message[] = [];

	if (isContinue && messageId) {
		// if it's the last message and we continue then we build the prompt up to the last message
		// we will strip the end tokens afterwards when the prompt is built
		if ((conv.messages.find((msg) => msg.id === messageId)?.children?.length ?? 0) > 0) {
			throw error(400, "Can only continue the last message");
		}
		messageToWriteToId = messageId;
		messagesForPrompt = buildSubtree(conv, messageId);
	} else if (isRetry && messageId) {
		// two cases, if we're retrying a user message with a newPrompt set,
		// it means we're editing a user message
		// if we're retrying on an assistant message, newPrompt cannot be set
		// it means we're retrying the last assistant message for a new answer
		const messageToRetry = conv.messages.find((message) => message.id === messageId);

		if (!messageToRetry) {
			throw error(404, "Message not found");
		}

		if (messageToRetry.from === "user" && newPrompt) {
			// add a sibling to this message from the user, with the alternative prompt
			// add a children to that sibling, where we can write to
			// the uploaded files belong to the user message (as in the normal flow below):
			// the prompt is built from that message, not from the empty assistant placeholder
			const newUserMessageId = addSibling(
				conv,
				{
					from: "user",
					content: newPrompt,
					files: hashes,
					createdAt: new Date(),
					updatedAt: new Date(),
				},
				messageId
			);
			messageToWriteToId = addChildren(
				conv,
				{
					from: "assistant",
					content: "",
					createdAt: new Date(),
					updatedAt: new Date(),
				},
				newUserMessageId
			);
			messagesForPrompt = buildSubtree(conv, newUserMessageId);
		} else if (messageToRetry.from === "assistant") {
			// we're retrying an assistant message, to generate a new answer
			// just add a sibling to the assistant answer where we can write to
			messageToWriteToId = addSibling(
				conv,
				{ from: "assistant", content: "", createdAt: new Date(), updatedAt: new Date() },
				messageId
			);
			messagesForPrompt = buildSubtree(conv, messageId);
			messagesForPrompt.pop(); // don't need the latest assistant message in the prompt since we're retrying it
		}
		// any other combination leaves messageToWriteToId unset and fails with a 500 below
	} else {
		// just a normal linear conversation, so we add the user message
		// and the blank assistant message back to back
		const newUserMessageId = addChildren(
			conv,
			{
				from: "user",
				content: newPrompt ?? "",
				files: hashes,
				createdAt: new Date(),
				updatedAt: new Date(),
			},
			messageId
		);

		messageToWriteToId = addChildren(
			conv,
			{
				from: "assistant",
				content: "",
				createdAt: new Date(),
				updatedAt: new Date(),
			},
			newUserMessageId
		);
		// build the prompt from the user message
		messagesForPrompt = buildSubtree(conv, newUserMessageId);
	}

	const messageToWriteTo = conv.messages.find((message) => message.id === messageToWriteToId);
	if (!messageToWriteTo) {
		throw error(500, "Failed to create message");
	}
	if (messagesForPrompt.length === 0) {
		throw error(500, "Failed to create prompt");
	}

	// update the conversation with the new messages
	await collections.conversations.updateOne(
		{
			_id: convId,
		},
		{
			$set: {
				messages: conv.messages,
				title: conv.title,
				updatedAt: new Date(),
			},
		}
	);

	let doneStreaming = false;

	// we now build the stream
	const stream = new ReadableStream({
		async start(controller) {
			messageToWriteTo.updates ??= [];

			// sends an update to the client; everything but token updates is also stored on the message
			function update(newUpdate: MessageUpdate) {
				if (newUpdate.type !== "stream") {
					messageToWriteTo?.updates?.push(newUpdate);
				}

				if (newUpdate.type === "stream" && newUpdate.token === "") {
					return;
				}
				controller.enqueue(JSON.stringify(newUpdate) + "\n");

				if (newUpdate.type === "finalAnswer") {
					// pad with 4096 spaces so that the browser does not keep the end of the response in a buffer
					controller.enqueue(" ".repeat(4096));
				}
			}

			update({ type: "status", status: "started" });

			// runs concurrently with the generation, awaited right before the stream is closed
			const summarizeIfNeeded = (async () => {
				if (conv.title === "New Chat" && conv.messages.length === 3) {
					try {
						conv.title = (await summarize(conv.messages[1].content)) ?? conv.title;
						update({ type: "status", status: "title", message: conv.title });
						await collections.conversations.updateOne(
							{
								_id: convId,
							},
							{
								$set: {
									title: conv.title,
									updatedAt: new Date(),
								},
							}
						);
					} catch (e) {
						// a failed summary must not fail the generation
						console.error(e);
					}
				}
			})();

			await collections.conversations.updateOne(
				{
					_id: convId,
				},
				{
					$set: {
						title: conv.title,
						updatedAt: new Date(),
					},
				}
			);

			// check if assistant has a rag
			const assistant = await collections.assistants.findOne<
				Pick<Assistant, "rag" | "dynamicPrompt" | "generateSettings">
			>(
				{ _id: conv.assistantId },
				{ projection: { rag: 1, dynamicPrompt: 1, generateSettings: 1 } }
			);

			const assistantHasDynamicPrompt =
				ENABLE_ASSISTANTS_RAG === "true" && !!assistant && !!assistant?.dynamicPrompt;

			const assistantHasWebSearch =
				ENABLE_ASSISTANTS_RAG === "true" &&
				!!assistant &&
				!!assistant.rag &&
				(assistant.rag.allowedLinks.length > 0 ||
					assistant.rag.allowedDomains.length > 0 ||
					assistant.rag.allowAllDomains);

			// perform websearch if needed
			if (!isContinue && (webSearch || assistantHasWebSearch)) {
				messageToWriteTo.webSearch = await runWebSearch(
					conv,
					messagesForPrompt,
					update,
					assistant?.rag
				);
			}

			let preprompt = conv.preprompt;

			if (assistantHasDynamicPrompt && preprompt) {
				// process the preprompt
				preprompt = await resolvePrepromptUrls(preprompt);

				if (messagesForPrompt[0].from === "system") {
					messagesForPrompt[0].content = preprompt;
				}
			}

			// inject websearch result & optionally images into the messages
			const processedMessages = await preprocessMessages(
				messagesForPrompt,
				messageToWriteTo.webSearch,
				model.multimodal,
				convId
			);

			// non-empty when continuing a message: the new text is appended to it
			const previousText = messageToWriteTo.content;

			let hasError = false;

			// tokens waiting to be sent, they are streamed by chunks of 5 characters
			let buffer = "";

			messageToWriteTo.updatedAt = new Date();

			try {
				const endpoint = await model.getEndpoint();
				for await (const output of await endpoint({
					messages: processedMessages,
					preprompt,
					continueMessage: isContinue,
					generateSettings: assistant?.generateSettings,
				})) {
					// if not generated_text is here it means the generation is not done
					if (!output.generated_text) {
						if (!output.token.special) {
							buffer += output.token.text;

							// send the first 5 chars
							// and leave the rest in the buffer
							if (buffer.length >= 5) {
								update({
									type: "stream",
									token: buffer.slice(0, 5),
								});
								buffer = buffer.slice(5);
							}

							// abort check
							const date = abortedGenerations.get(convId.toString());
							if (date && date > promptedAt) {
								break;
							}

							// otherwise we just concatenate tokens
							messageToWriteTo.content += output.token.text;
						}
					} else {
						messageToWriteTo.interrupted =
							!output.token.special && !model.parameters.stop?.includes(output.token.text);
						// add output.generated text to the last message
						// strip end tokens from the output.generated_text
						const text = (model.parameters.stop ?? []).reduce((acc: string, curr: string) => {
							if (acc.endsWith(curr)) {
								messageToWriteTo.interrupted = false;
								return acc.slice(0, acc.length - curr.length);
							}
							return acc;
						}, output.generated_text.trimEnd());

						messageToWriteTo.content = previousText + text;
					}
				}
			} catch (e) {
				hasError = true;
				update({ type: "status", status: "error", message: (e as Error).message });
			} finally {
				// check if no output was generated
				if (!hasError && messageToWriteTo.content === previousText) {
					update({
						type: "status",
						status: "error",
						message: "No output was generated. Something went wrong.",
					});
				}

				// flush what is left in the buffer
				if (buffer) {
					update({
						type: "stream",
						token: buffer,
					});
				}
			}

			await collections.conversations.updateOne(
				{
					_id: convId,
				},
				{
					$set: {
						messages: conv.messages,
						title: conv.title,
						updatedAt: new Date(),
					},
				}
			);

			// used to detect if cancel() is called bc of interrupt or just because the connection closes
			doneStreaming = true;

			update({
				type: "finalAnswer",
				text: messageToWriteTo.content,
			});

			await summarizeIfNeeded;
			controller.close();
			return;
		},
		async cancel() {
			// the stream was cancelled before the end of the generation: save what we have so far
			if (!doneStreaming) {
				await collections.conversations.updateOne(
					{
						_id: convId,
					},
					{
						$set: {
							messages: conv.messages,
							title: conv.title,
							updatedAt: new Date(),
						},
					}
				);
			}
		},
	});

	// count this message in the hourly stats of the assistant
	if (conv.assistantId) {
		await collections.assistantStats.updateOne(
			{ assistantId: conv.assistantId, "date.at": startOfHour(new Date()), "date.span": "hour" },
			{ $inc: { count: 1 } },
			{ upsert: true }
		);
	}

	// Todo: maybe we should wait for the message to be saved before ending the response - in case of errors
	return new Response(stream, {
		headers: {
			"Content-Type": "text/event-stream",
		},
	});
}
/**
 * Deletes the conversation `params.id` when it matches the auth condition of the caller.
 *
 * The ownership check and the deletion are a single `deleteOne`, so the conversation is
 * never deleted by `_id` alone and nothing can change between the check and the deletion.
 *
 * @returns an empty response once the conversation is deleted
 * @throws 404 when the id is malformed or no conversation of the caller has that id
 */
export async function DELETE({ locals, params }) {
	// a malformed id cannot match any conversation; answer 404 instead of letting
	// the ObjectId constructor throw an unhandled error
	if (!ObjectId.isValid(params.id)) {
		throw error(404, "Conversation not found");
	}

	const { deletedCount } = await collections.conversations.deleteOne({
		_id: new ObjectId(params.id),
		...authCondition(locals),
	});

	if (deletedCount === 0) {
		throw error(404, "Conversation not found");
	}

	return new Response();
}
/**
 * Renames the conversation `params.id` when it matches the auth condition of the caller.
 *
 * The request body must be a JSON object with a `title` of 1 to 100 characters (after
 * trimming). The ownership check and the update are a single `updateOne`, so the
 * conversation is never updated by `_id` alone.
 *
 * @returns an empty response once the title is saved
 * @throws a zod validation error when the body does not contain a valid `title`
 * @throws 404 when the id is malformed or no conversation of the caller has that id
 */
export async function PATCH({ request, locals, params }) {
	const { title } = z
		.object({ title: z.string().trim().min(1).max(100) })
		.parse(await request.json());

	// a malformed id cannot match any conversation; answer 404 instead of letting
	// the ObjectId constructor throw an unhandled error
	if (!ObjectId.isValid(params.id)) {
		throw error(404, "Conversation not found");
	}

	const { matchedCount } = await collections.conversations.updateOne(
		{
			_id: new ObjectId(params.id),
			...authCondition(locals),
		},
		{
			$set: {
				title,
			},
		}
	);

	if (matchedCount === 0) {
		throw error(404, "Conversation not found");
	}

	return new Response();
}