const { v4 } = require('uuid');
const { sleep } = require('@librechat/agents');
const { logger } = require('@librechat/data-schemas');
const { sendEvent, getBalanceConfig, getModelMaxTokens, countTokens } = require('@librechat/api');
const {
  Time,
  Constants,
  RunStatus,
  CacheKeys,
  ContentTypes,
  EModelEndpoint,
  ViolationTypes,
  ImageVisionTool,
  checkOpenAIStorage,
  AssistantStreamEvents,
} = require('librechat-data-provider');
const {
  initThread,
  recordUsage,
  saveUserMessage,
  checkMessageGaps,
  addThreadMetadata,
  saveAssistantMessage,
} = require('~/server/services/Threads');
const { runAssistant, createOnTextProgress } = require('~/server/services/AssistantService');
const validateAuthor = require('~/server/middleware/assistants/validateAuthor');
const { formatMessage, createVisionPrompt } = require('~/app/clients/prompts');
const { createRun, StreamRunManager } = require('~/server/services/Runs');
const { addTitle } = require('~/server/services/Endpoints/assistants');
const { createRunBody } = require('~/server/services/createRunBody');
const { sendResponse } = require('~/server/middleware/error');
const { getTransactions } = require('~/models/Transaction');
const { checkBalance } = require('~/models/balanceMethods');
const { getConvo } = require('~/models/Conversation');
const getLogStores = require('~/cache/getLogStores');
const { getOpenAIClient } = require('./helpers');
|
/**
 * @route POST /assistants/chat
 * @desc Chat with an assistant via the OpenAI Assistants API (v1)
 * @param {Express.Request} req - The request object, containing the request data.
 * @param {Express.Response} res - The response object, used to stream events back to the client.
 * @returns {void}
 */
const chatV1 = async (req, res) => {
  const appConfig = req.config;
  logger.debug('[/assistants/chat/] req.body', req.body);
|
  const {
    text,
    model,
    endpoint,
    files = [],
    promptPrefix,
    assistant_id,
    instructions,
    endpointOption,
    thread_id: _thread_id,
    messageId: _messageId,
    conversationId: convoId,
    parentMessageId: _parentId = Constants.NO_PARENT,
    clientTimestamp,
  } = req.body;

  /** The OpenAI-compatible client, assigned once initialized */
  let openai;
  /** The current thread id; created during thread initialization when not provided */
  let thread_id = _thread_id;
  /** The id of the run created for this request */
  let run_id;
  /** The parent message id, updated if previous messages are recovered */
  let parentMessageId = _parentId;
  /** Messages preceding the current request */
  let previousMessages = [];
  /** The conversation fields saved alongside this exchange */
  let conversation = null;
  /** File ids attached to the user message */
  let file_ids = [];
  /** Unique file ids from the request plus any already on the conversation */
  let attachedFileIds = new Set();
  /** The user message as saved and echoed back to the client */
  let requestMessage = null;
  /** Pending side-channel vision request, consumed by the run handlers */
  let visionPromise;

  const userMessageId = v4();
  const responseMessageId = v4();

  /** The conversation UUID, generated here when starting a new conversation */
  const conversationId = convoId ?? v4();

  /** Abort cache entry, keyed per user and conversation, used to cancel an active run */
  const cache = getLogStores(CacheKeys.ABORT_KEYS);
  const cacheKey = `${req.user.id}:${conversationId}`;

  /** The completed run; stays undefined until the run finishes, which the close handler checks */
  let completedRun;
|
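  /**
   * Handles run failures and client aborts: cancels the active run when possible,
   * records any reported usage, patches gaps in the thread's messages,
   * and sends a final event (or an error response) to the client.
   */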
  const handleError = async (error) => {
    const defaultErrorMessage =
      'The Assistant run failed to initialize. Try sending a message in a new conversation.';
    const messageData = {
      thread_id,
      assistant_id,
      conversationId,
      parentMessageId,
      sender: 'System',
      user: req.user.id,
      shouldSaveMessage: false,
      messageId: responseMessageId,
      endpoint,
    };

    if (error.message === 'Run cancelled') {
      return res.end();
    } else if (error.message === 'Request closed' && completedRun) {
      return;
    } else if (error.message === 'Request closed') {
      logger.debug('[/assistants/chat/] Request aborted on close');
    } else if (/Files.*are invalid/.test(error.message)) {
      const errorMessage = `Files are invalid, or may not have uploaded yet.${
        endpoint === EModelEndpoint.azureAssistants
          ? " If using Azure OpenAI, files are only available in the region of the assistant's model at the time of upload."
          : ''
      }`;
      return sendResponse(req, res, messageData, errorMessage);
    } else if (error?.message?.includes('string too long')) {
      return sendResponse(
        req,
        res,
        messageData,
        'Message too long. The Assistants API has a limit of 32,768 characters per message. Please shorten it and try again.',
      );
    } else if (error?.message?.includes(ViolationTypes.TOKEN_BALANCE)) {
      return sendResponse(req, res, messageData, error.message);
    } else {
      logger.error('[/assistants/chat/]', error);
    }

    // If the run never initialized, there is nothing to cancel or reconcile
    if (!openai || !thread_id || !run_id) {
      return sendResponse(req, res, messageData, defaultErrorMessage);
    }
|
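    // Brief pauses give the API time to register the run's state around cancellation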
    await sleep(2000);

    try {
      const status = await cache.get(cacheKey);
      if (status === 'cancelled') {
        logger.debug('[/assistants/chat/] Run already cancelled');
        return res.end();
      }
      await cache.delete(cacheKey);
      const cancelledRun = await openai.beta.threads.runs.cancel(run_id, { thread_id });
      logger.debug('[/assistants/chat/] Cancelled run:', cancelledRun);
    } catch (error) {
      logger.error('[/assistants/chat/] Error cancelling run', error);
    }

    await sleep(2000);

    let run;
    try {
      run = await openai.beta.threads.runs.retrieve(run_id, { thread_id });
      await recordUsage({
        ...run.usage,
        model: run.model,
        user: req.user.id,
        conversationId,
      });
    } catch (error) {
      logger.error('[/assistants/chat/] Error fetching or processing run', error);
    }

    let finalEvent;
    try {
      const runMessages = await checkMessageGaps({
        openai,
        run_id,
        endpoint,
        thread_id,
        conversationId,
        latestMessageId: responseMessageId,
      });

      const errorContentPart = {
        text: {
          value:
            error?.message ?? 'There was an error processing your request. Please try again later.',
        },
        type: ContentTypes.ERROR,
      };

      if (!Array.isArray(runMessages[runMessages.length - 1]?.content)) {
        runMessages[runMessages.length - 1].content = [errorContentPart];
      } else {
        const contentParts = runMessages[runMessages.length - 1].content;
        for (let i = 0; i < contentParts.length; i++) {
          const currentPart = contentParts[i];
          // Mark tool calls that never produced output so the client doesn't render them as pending
          const toolCall = currentPart?.[ContentTypes.TOOL_CALL];
          if (toolCall && toolCall?.function && !toolCall.function.output) {
            contentParts[i] = {
              ...currentPart,
              [ContentTypes.TOOL_CALL]: {
                ...toolCall,
                function: {
                  ...toolCall.function,
                  output: 'error processing tool',
                },
              },
            };
          }
        }
        runMessages[runMessages.length - 1].content.push(errorContentPart);
      }

      finalEvent = {
        final: true,
        conversation: await getConvo(req.user.id, conversationId),
        runMessages,
      };
    } catch (error) {
      logger.error('[/assistants/chat/] Error finalizing error process', error);
      return sendResponse(req, res, messageData, 'The Assistant run failed');
    }

    return sendResponse(req, res, finalEvent);
  };
|
  try {
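    // If the client disconnects before the run completes, treat it as an aborted run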
    res.on('close', async () => {
      if (!completedRun) {
        await handleError(new Error('Request closed'));
      }
    });

    if (convoId && !_thread_id) {
      completedRun = true; // prevent the close listener from treating this as an abort
      throw new Error('Missing thread_id for existing conversation');
    }

    if (!assistant_id) {
      completedRun = true; // prevent the close listener from treating this as an abort
      throw new Error('Missing assistant_id');
    }
|
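    // Estimate prompt cost up front: tokens from prior transactions plus a rough count of the new prompt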
    const checkBalanceBeforeRun = async () => {
      const balanceConfig = getBalanceConfig(appConfig);
      if (!balanceConfig?.enabled) {
        return;
      }
      const transactions =
        (await getTransactions({
          user: req.user.id,
          context: 'message',
          conversationId,
        })) ?? [];

      const totalPreviousTokens = Math.abs(
        transactions.reduce((acc, curr) => acc + curr.rawAmount, 0),
      );

      // Pad new conversations to account for assistant instructions and run overhead
      const promptBuffer = parentMessageId === Constants.NO_PARENT && !_thread_id ? 200 : 0;
      // Rough token count of the new prompt, with a small padding for message formatting
      let promptTokens = (await countTokens(text + (promptPrefix ?? ''))) + 5;
      promptTokens += totalPreviousTokens + promptBuffer;
      // Cap the estimate at the model's context window
      promptTokens = Math.min(promptTokens, getModelMaxTokens(model));

      await checkBalance({
        req,
        res,
        txData: {
          model,
          user: req.user.id,
          tokenType: 'prompt',
          amount: promptTokens,
        },
      });
    };

    const { openai: _openai, client } = await getOpenAIClient({
      req,
      res,
      endpointOption,
      initAppClient: true,
    });

    openai = _openai;
    await validateAuthor({ req, openai });

    if (previousMessages.length) {
      parentMessageId = previousMessages[previousMessages.length - 1].messageId;
    }

    let userMessage = {
      role: 'user',
      content: text,
      metadata: {
        messageId: userMessageId,
      },
    };

    // Build the run request body from the model, instructions, and endpoint options
    const body = createRunBody({
      assistant_id,
      model,
      promptPrefix,
      instructions,
      endpointOption,
      clientTimestamp,
    });
|
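    // Merge file ids from the request with any already attached to the conversation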
    const getRequestFileIds = async () => {
      let thread_file_ids = [];
      if (convoId) {
        const convo = await getConvo(req.user.id, convoId);
        if (convo && convo.file_ids) {
          thread_file_ids = convo.file_ids;
        }
      }

      file_ids = files.map(({ file_id }) => file_id);
      if (file_ids.length || thread_file_ids.length) {
        attachedFileIds = new Set([...file_ids, ...thread_file_ids]);
        if (endpoint === EModelEndpoint.azureAssistants) {
          // Azure Assistants expects attachments paired with the `file_search` tool
          userMessage.attachments = Array.from(attachedFileIds).map((file_id) => ({
            file_id,
            tools: [{ type: 'file_search' }],
          }));
        } else {
          userMessage.file_ids = Array.from(attachedFileIds);
        }
      }
    };
|
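    // If the assistant has the image-vision tool and some images are not in OpenAI storage,
    // run a parallel chat-completions request to describe them, and instruct the run to use the tool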
    const addVisionPrompt = async () => {
      if (!endpointOption.attachments) {
        return;
      }

      // Skip if every attachment already lives in OpenAI (or Azure) storage
      const attachments = await endpointOption.attachments;
      if (attachments && attachments.every((attachment) => checkOpenAIStorage(attachment.source))) {
        return;
      }

      const assistant = await openai.beta.assistants.retrieve(assistant_id);
      const visionToolIndex = assistant.tools.findIndex(
        (tool) => tool?.function && tool?.function?.name === ImageVisionTool.function.name,
      );

      if (visionToolIndex === -1) {
        return;
      }

      let visionMessage = {
        role: 'user',
        content: '',
      };
      const files = await client.addImageURLs(visionMessage, attachments);
      if (!visionMessage.image_urls?.length) {
        return;
      }

      const imageCount = visionMessage.image_urls.length;
      const plural = imageCount > 1;
      visionMessage.content = createVisionPrompt(plural);
      visionMessage = formatMessage({ message: visionMessage, endpoint: EModelEndpoint.openAI });

      visionPromise = openai.chat.completions
        .create({
          // NOTE: assumed vision-capable model; the Chat Completions API requires an explicit model
          model: 'gpt-4-vision-preview',
          messages: [visionMessage],
          max_tokens: 4000,
        })
        .catch((error) => {
          logger.error('[/assistants/chat/] Error creating vision prompt', error);
        });

      const pluralized = plural ? 's' : '';
      body.additional_instructions = `${
        body.additional_instructions ? `${body.additional_instructions}\n` : ''
      }The user has uploaded ${imageCount} image${pluralized}.
Use the \`${ImageVisionTool.function.name}\` tool to retrieve ${
        plural ? '' : 'a '
      }detailed text description${pluralized} for ${plural ? 'each' : 'the'} image${pluralized}.`;

      return files;
    };
|
    /** Resolves once the user message is saved; awaited before the assistant response is saved */
    let userMessagePromise;
|
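    // Create (or reuse) the thread seeded with the user message, then wire up progress streaming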
    const initializeThread = async () => {
      // Run the vision prompt setup and file id collection concurrently
      const [processedFiles] = await Promise.all([addVisionPrompt(), getRequestFileIds()]);

      const initThreadBody = {
        messages: [userMessage],
        metadata: {
          user: req.user.id,
          conversationId,
        },
      };

      if (processedFiles) {
        for (const file of processedFiles) {
          // Files not hosted with OpenAI cannot be attached to the thread message
          if (!checkOpenAIStorage(file.source)) {
            attachedFileIds.delete(file.file_id);
            const index = file_ids.indexOf(file.file_id);
            if (index > -1) {
              file_ids.splice(index, 1);
            }
          }
        }

        userMessage.file_ids = file_ids;
      }

      const result = await initThread({ openai, body: initThreadBody, thread_id });
      thread_id = result.thread_id;

      createOnTextProgress({
        openai,
        conversationId,
        userMessageId,
        messageId: responseMessageId,
        thread_id,
      });

      requestMessage = {
        user: req.user.id,
        text,
        messageId: userMessageId,
        parentMessageId,
        // file metadata as provided by the client
        files,
        file_ids,
        conversationId,
        isCreatedByUser: true,
        assistant_id,
        thread_id,
        model: assistant_id,
        endpoint,
      };

      previousMessages.push(requestMessage);

      // Save the user message in the background; awaited before the response is saved
      userMessagePromise = saveUserMessage(req, { ...requestMessage, model });

      conversation = {
        conversationId,
        endpoint,
        promptPrefix,
        instructions,
        assistant_id,
      };

      if (file_ids.length) {
        conversation.file_ids = file_ids;
      }
    };
|
    // Thread initialization and the balance check are independent; run them concurrently
    const promises = [initializeThread(), checkBalanceBeforeRun()];
    await Promise.all(promises);
|
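    // Sync event: gives the client the ids it needs to track the streamed response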
    const sendInitialResponse = () => {
      sendEvent(res, {
        sync: true,
        conversationId,
        requestMessage,
        responseMessage: {
          user: req.user.id,
          messageId: openai.responseMessage.messageId,
          parentMessageId: userMessageId,
          conversationId,
          assistant_id,
          thread_id,
          model: assistant_id,
        },
      });
    };
|
    /** The run result: the polled response (Azure) or the StreamRunManager instance */
    let response;
|
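    // Azure Assistants uses the poll-based `runAssistant` service; other endpoints stream via StreamRunManager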
    const processRun = async (retry = false) => {
      if (endpoint === EModelEndpoint.azureAssistants) {
        body.model = openai._options.model;
        openai.attachedFileIds = attachedFileIds;
        openai.visionPromise = visionPromise;
        if (retry) {
          response = await runAssistant({
            openai,
            thread_id,
            run_id,
            in_progress: openai.in_progress,
          });
          return;
        }

        /* NOTE:
         * By default, a Run uses the model and tools configuration specified on the Assistant,
         * but most of these can be overridden at Run creation for added flexibility.
         */
        const run = await createRun({
          openai,
          thread_id,
          body,
        });

        run_id = run.id;
        await cache.set(cacheKey, `${thread_id}:${run_id}`, Time.TEN_MINUTES);
        sendInitialResponse();

        // Poll the run to completion, emitting text progress along the way
        response = await runAssistant({ openai, thread_id, run_id });
        return;
      }

      // Streaming path: cache the run id for aborts as soon as the run is created
      const handlers = {
        [AssistantStreamEvents.ThreadRunCreated]: async (event) => {
          await cache.set(cacheKey, `${thread_id}:${event.data.id}`, Time.TEN_MINUTES);
          run_id = event.data.id;
          sendInitialResponse();
        },
      };

      const streamRunManager = new StreamRunManager({
        req,
        res,
        openai,
        handlers,
        thread_id,
        visionPromise,
        attachedFileIds,
        responseMessage: openai.responseMessage,
      });

      await streamRunManager.runAssistant({
        thread_id,
        body,
      });

      response = streamRunManager;
    };
|
    await processRun();
    logger.debug('[/assistants/chat/] response', {
      run: response.run,
      steps: response.steps,
    });

    if (response.run.status === RunStatus.CANCELLED) {
      logger.debug('[/assistants/chat/] Run cancelled, handled by `abortRun`');
      return res.end();
    }
|
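    // An Azure run can resolve while still in progress (e.g., pending tool handling); retry to finish it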
    if (response.run.status === RunStatus.IN_PROGRESS) {
      // awaiting here is an assumption: it ensures the final event below reflects the finished run
      await processRun(true);
    }
|
    completedRun = response.run;

    /** The response message persisted to the database and attributed to the assistant */
    const responseMessage = {
      ...(response.responseMessage ?? response.finalMessage),
      parentMessageId: userMessageId,
      conversationId,
      user: req.user.id,
      assistant_id,
      thread_id,
      model: assistant_id,
      endpoint,
      spec: endpointOption.spec,
      iconURL: endpointOption.iconURL,
    };
|
    sendEvent(res, {
      final: true,
      conversation,
      requestMessage: {
        parentMessageId,
        thread_id,
      },
    });
    res.end();

    if (userMessagePromise) {
      await userMessagePromise;
    }
    await saveAssistantMessage(req, { ...responseMessage, model });
|
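    // Generate a conversation title in the background, but only for brand-new conversations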
    if (parentMessageId === Constants.NO_PARENT && !_thread_id) {
      addTitle(req, {
        text,
        responseText: response.text,
        conversationId,
        client,
      });
    }
|
    await addThreadMetadata({
      openai,
      thread_id,
      messageId: responseMessage.messageId,
      messages: response.messages,
    });
|
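    // Usage can lag behind run completion; wait briefly and re-fetch before recording token spend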
    if (!response.run.usage) {
      await sleep(3000);
      completedRun = await openai.beta.threads.runs.retrieve(response.run.id, { thread_id });
      if (completedRun.usage) {
        await recordUsage({
          ...completedRun.usage,
          user: req.user.id,
          model: completedRun.model ?? model,
          conversationId,
        });
      }
    } else {
      await recordUsage({
        ...response.run.usage,
        user: req.user.id,
        model: response.run.model ?? model,
        conversationId,
      });
    }
  } catch (error) {
    await handleError(error);
  }
};
|
module.exports = chatV1;
|