| const { v4 } = require('uuid'); |
| const { sleep } = require('@librechat/agents'); |
| const { logger } = require('@librechat/data-schemas'); |
| const { sendEvent, getBalanceConfig, getModelMaxTokens, countTokens } = require('@librechat/api'); |
| const { |
| Time, |
| Constants, |
| RunStatus, |
| CacheKeys, |
| ContentTypes, |
| ToolCallTypes, |
| EModelEndpoint, |
| retrievalMimeTypes, |
| AssistantStreamEvents, |
| } = require('librechat-data-provider'); |
| const { |
| initThread, |
| recordUsage, |
| saveUserMessage, |
| addThreadMetadata, |
| saveAssistantMessage, |
| } = require('~/server/services/Threads'); |
| const { runAssistant, createOnTextProgress } = require('~/server/services/AssistantService'); |
| const { createErrorHandler } = require('~/server/controllers/assistants/errors'); |
| const validateAuthor = require('~/server/middleware/assistants/validateAuthor'); |
| const { createRun, StreamRunManager } = require('~/server/services/Runs'); |
| const { addTitle } = require('~/server/services/Endpoints/assistants'); |
| const { createRunBody } = require('~/server/services/createRunBody'); |
| const { getTransactions } = require('~/models/Transaction'); |
| const { checkBalance } = require('~/models/balanceMethods'); |
| const { getConvo } = require('~/models/Conversation'); |
| const getLogStores = require('~/cache/getLogStores'); |
| const { getOpenAIClient } = require('./helpers'); |
|
|
| |
| |
| |
| |
| |
| |
| |
| |
/**
 * Express handler for the Assistants (v2) chat endpoint.
 *
 * Orchestrates one full chat turn against the OpenAI/Azure Assistants API:
 *  - validates the request (thread/assistant ids) and, when balance tracking
 *    is enabled, pre-checks the user's token balance,
 *  - creates/reuses an OpenAI thread, attaches uploaded files, saves the
 *    user message,
 *  - starts an assistant run (streaming via `StreamRunManager`, or the
 *    polling `runAssistant` path for Azure), relaying progress to the client
 *    as server-sent events via `sendEvent`,
 *  - on completion: emits the final event, persists the assistant message,
 *    optionally titles new conversations, stores thread metadata, and
 *    records token usage.
 *
 * Errors anywhere in the flow (including the client closing the connection
 * mid-run) are routed through the handler built by `createErrorHandler`,
 * which receives a snapshot of the in-flight state via `getContext`.
 *
 * @param {object} req - Express request; reads `req.body` (chat payload),
 *   `req.user.id`, and `req.config` (app config).
 * @param {object} res - Express response; used as an SSE stream and ended
 *   with `res.end()` when the run finishes or is cancelled.
 */
const chatV2 = async (req, res) => {
  logger.debug('[/assistants/chat/] req.body', req.body);
  const appConfig = req.config;

  const {
    text,
    model,
    endpoint,
    files = [],
    promptPrefix,
    assistant_id,
    instructions,
    endpointOption,
    thread_id: _thread_id,
    messageId: _messageId,
    conversationId: convoId,
    parentMessageId: _parentId = Constants.NO_PARENT,
    clientTimestamp,
  } = req.body;

  // Mutable state shared by the nested closures below; also surfaced to the
  // error handler through `getContext()`.
  let openai;

  let thread_id = _thread_id;

  let run_id;

  let parentMessageId = _parentId;

  let previousMessages = [];

  let conversation = null;

  let file_ids = [];

  let attachedFileIds = new Set();

  let requestMessage = null;

  // Fresh ids for this turn's user message and the assistant's response.
  const userMessageId = v4();
  const responseMessageId = v4();

  // New conversations get a generated id.
  const conversationId = convoId ?? v4();

  // Abort bookkeeping: `abortRun` looks up `${thread_id}:${run_id}` under
  // this key to cancel an in-flight run for this user/conversation.
  const cache = getLogStores(CacheKeys.ABORT_KEYS);
  const cacheKey = `${req.user.id}:${conversationId}`;

  // Set once the run finishes (or a validation error pre-empts it); used to
  // distinguish a client disconnect mid-run from a normal close.
  let completedRun;

  /** Snapshot of in-flight state handed to the error handler. */
  const getContext = () => ({
    openai,
    run_id,
    endpoint,
    cacheKey,
    thread_id,
    completedRun,
    assistant_id,
    conversationId,
    parentMessageId,
    responseMessageId,
  });

  const handleError = createErrorHandler({ req, res, getContext });

  try {
    // If the client disconnects before the run completes, treat it as an
    // abort and route through the shared error handler.
    res.on('close', async () => {
      if (!completedRun) {
        await handleError(new Error('Request closed'));
      }
    });

    // An existing conversation must always carry its OpenAI thread id.
    // `completedRun = true` prevents the 'close' listener above from firing
    // a second error when the response ends.
    if (convoId && !_thread_id) {
      completedRun = true;
      throw new Error('Missing thread_id for existing conversation');
    }

    if (!assistant_id) {
      completedRun = true;
      throw new Error('Missing assistant_id');
    }

    /**
     * Pre-run balance check (no-op unless balance tracking is enabled).
     * Estimates prompt tokens as: tokens of the new text (+ promptPrefix)
     * plus the absolute sum of this conversation's prior 'message'
     * transactions, plus a 200-token buffer for brand-new conversations;
     * the estimate is capped at the model's max context.
     */
    const checkBalanceBeforeRun = async () => {
      const balanceConfig = getBalanceConfig(appConfig);
      if (!balanceConfig?.enabled) {
        return;
      }
      const transactions =
        (await getTransactions({
          user: req.user.id,
          context: 'message',
          conversationId,
        })) ?? [];

      const totalPreviousTokens = Math.abs(
        transactions.reduce((acc, curr) => acc + curr.rawAmount, 0),
      );

      // Extra headroom only for the very first turn of a new thread.
      const promptBuffer = parentMessageId === Constants.NO_PARENT && !_thread_id ? 200 : 0;

      let promptTokens = (await countTokens(text + (promptPrefix ?? ''))) + 5;
      promptTokens += totalPreviousTokens + promptBuffer;

      promptTokens = Math.min(promptTokens, getModelMaxTokens(model));

      await checkBalance({
        req,
        res,
        txData: {
          model,
          user: req.user.id,
          tokenType: 'prompt',
          amount: promptTokens,
        },
      });
    };

    const { openai: _openai, client } = await getOpenAIClient({
      req,
      res,
      endpointOption,
      initAppClient: true,
    });

    openai = _openai;
    await validateAuthor({ req, openai });

    // NOTE(review): `previousMessages` is still the empty array here — nothing
    // populates it before this check, so this branch never runs. Possibly a
    // leftover from a version that fetched conversation history; verify.
    if (previousMessages.length) {
      parentMessageId = previousMessages[previousMessages.length - 1].messageId;
    }

    // The OpenAI-format user message for the thread; our internal messageId
    // travels in `metadata` so it can be correlated later.
    let userMessage = {
      role: 'user',
      content: [
        {
          type: ContentTypes.TEXT,
          text,
        },
      ],
      metadata: {
        messageId: userMessageId,
      },
    };

    // Run payload (model, instructions, etc.) shared by both run paths.
    const body = createRunBody({
      assistant_id,
      model,
      promptPrefix,
      instructions,
      endpointOption,
      clientTimestamp,
    });

    /**
     * Collects file ids for this request: files already attached to the
     * conversation plus newly uploaded ones. Images are added to the message
     * content as `image_file` parts; every file becomes an attachment with
     * the code_interpreter tool, and non-image files whose MIME type matches
     * `retrievalMimeTypes` additionally get the file_search tool.
     */
    const getRequestFileIds = async () => {
      let thread_file_ids = [];
      if (convoId) {
        const convo = await getConvo(req.user.id, convoId);
        if (convo && convo.file_ids) {
          thread_file_ids = convo.file_ids;
        }
      }

      if (files.length || thread_file_ids.length) {
        // NOTE(review): `file_ids` is still empty at this point (it is only
        // pushed to inside the loop below), so `attachedFileIds` ends up
        // containing only the conversation's pre-existing file ids — confirm
        // newly uploaded ids are intentionally excluded from this Set.
        attachedFileIds = new Set([...file_ids, ...thread_file_ids]);

        let attachmentIndex = 0;
        for (const file of files) {
          file_ids.push(file.file_id);
          if (file.type.startsWith('image')) {
            userMessage.content.push({
              type: ContentTypes.IMAGE_FILE,
              [ContentTypes.IMAGE_FILE]: { file_id: file.file_id },
            });
          }

          if (!userMessage.attachments) {
            userMessage.attachments = [];
          }

          userMessage.attachments.push({
            file_id: file.file_id,
            tools: [{ type: ToolCallTypes.CODE_INTERPRETER }],
          });

          // Images are never routed to file_search.
          if (file.type.startsWith('image')) {
            continue;
          }

          const mimeType = file.type;
          const isSupportedByRetrieval = retrievalMimeTypes.some((regex) => regex.test(mimeType));
          if (isSupportedByRetrieval) {
            userMessage.attachments[attachmentIndex].tools.push({
              type: ToolCallTypes.FILE_SEARCH,
            });
          }

          attachmentIndex++;
        }
      }
    };

    // Resolved later; awaited before saving the assistant message so the two
    // DB writes happen in order.
    let userMessagePromise;

    /**
     * Creates (or reuses) the OpenAI thread with the user message, wires up
     * text-progress streaming, builds our internal request-message record,
     * kicks off the user-message save, and seeds the conversation record.
     */
    const initializeThread = async () => {
      await getRequestFileIds();

      const initThreadBody = {
        messages: [userMessage],
        metadata: {
          user: req.user.id,
          conversationId,
        },
      };

      const result = await initThread({ openai, body: initThreadBody, thread_id });
      thread_id = result.thread_id;

      createOnTextProgress({
        openai,
        conversationId,
        userMessageId,
        messageId: responseMessageId,
        thread_id,
      });

      requestMessage = {
        user: req.user.id,
        text,
        messageId: userMessageId,
        parentMessageId,

        files,
        file_ids,
        conversationId,
        isCreatedByUser: true,
        assistant_id,
        thread_id,
        // `model` is set to the assistant id for assistants-endpoint records.
        model: assistant_id,
        endpoint,
      };

      previousMessages.push(requestMessage);

      // Fire the save now; awaited after the run completes.
      userMessagePromise = saveUserMessage(req, { ...requestMessage, model });

      conversation = {
        conversationId,
        endpoint,
        promptPrefix: promptPrefix,
        instructions: instructions,
        assistant_id,
      };

      if (file_ids.length) {
        conversation.file_ids = file_ids;
      }
    };

    // Thread setup and balance check are independent — run them in parallel.
    const promises = [initializeThread(), checkBalanceBeforeRun()];
    await Promise.all(promises);

    /**
     * First SSE after the run is created: echoes the request message and the
     * shell of the response message so the client can render placeholders.
     */
    const sendInitialResponse = () => {
      sendEvent(res, {
        sync: true,
        conversationId,

        requestMessage,
        responseMessage: {
          user: req.user.id,
          messageId: openai.responseMessage.messageId,
          parentMessageId: userMessageId,
          conversationId,
          assistant_id,
          thread_id,
          model: assistant_id,
        },
      });
    };

    // Set by `processRun`: either the `runAssistant` result (Azure path) or
    // the StreamRunManager instance itself (streaming path).
    let response;

    /**
     * Executes the assistant run.
     * - Azure assistants: non-streaming create-then-poll via `createRun` /
     *   `runAssistant`; `retry === true` resumes polling an existing run.
     * - Otherwise: streaming via `StreamRunManager`, registering the run id
     *   for abort support as soon as the ThreadRunCreated event arrives.
     */
    const processRun = async (retry = false) => {
      if (endpoint === EModelEndpoint.azureAssistants) {
        body.model = openai._options.model;
        openai.attachedFileIds = attachedFileIds;
        if (retry) {
          response = await runAssistant({
            openai,
            thread_id,
            run_id,
            in_progress: openai.in_progress,
          });
          return;
        }

        const run = await createRun({
          openai,
          thread_id,
          body,
        });

        run_id = run.id;
        // Register for abort lookup before streaming any output.
        await cache.set(cacheKey, `${thread_id}:${run_id}`, Time.TEN_MINUTES);
        sendInitialResponse();

        response = await runAssistant({ openai, thread_id, run_id });
        return;
      }

      // Streaming path: capture the run id for abort support as soon as the
      // run is created, then announce the turn to the client.
      const handlers = {
        [AssistantStreamEvents.ThreadRunCreated]: async (event) => {
          await cache.set(cacheKey, `${thread_id}:${event.data.id}`, Time.TEN_MINUTES);
          run_id = event.data.id;
          sendInitialResponse();
        },
      };

      // Per-endpoint config; `all` overrides take precedence for streamRate.
      const config = appConfig.endpoints?.[endpoint] ?? {};

      const allConfig = appConfig.endpoints?.all;

      const streamRunManager = new StreamRunManager({
        req,
        res,
        openai,
        handlers,
        thread_id,
        attachedFileIds,
        parentMessageId: userMessageId,
        responseMessage: openai.responseMessage,
        streamRate: allConfig?.streamRate ?? config.streamRate,
      });

      await streamRunManager.runAssistant({
        thread_id,
        body,
      });

      response = streamRunManager;
      response.text = streamRunManager.intermediateText;
    };

    await processRun();
    logger.debug('[/assistants/chat/] response', {
      run: response.run,
      steps: response.steps,
    });

    // A cancelled run was already finalized by `abortRun`; just close the stream.
    if (response.run.status === RunStatus.CANCELLED) {
      logger.debug('[/assistants/chat/] Run cancelled, handled by `abortRun`');
      return res.end();
    }

    if (response.run.status === RunStatus.IN_PROGRESS) {
      // NOTE(review): this retry is not awaited, so the code below reads the
      // pre-retry `response` while the retry mutates it in the background —
      // confirm this floating promise is intentional (vs `await processRun(true)`).
      processRun(true);
    }

    completedRun = response.run;

    // Our persisted record of the assistant's reply; the client already has
    // the content from the stream, so this is not re-sent in the final event.
    const responseMessage = {
      ...(response.responseMessage ?? response.finalMessage),
      text: response.text,
      parentMessageId: userMessageId,
      conversationId,
      user: req.user.id,
      assistant_id,
      thread_id,
      model: assistant_id,
      endpoint,
      spec: endpointOption.spec,
      iconURL: endpointOption.iconURL,
    };

    sendEvent(res, {
      final: true,
      conversation,
      requestMessage: {
        parentMessageId,
        thread_id,
      },
    });
    res.end();

    // Ensure the user message is stored before the assistant message.
    if (userMessagePromise) {
      await userMessagePromise;
    }
    await saveAssistantMessage(req, { ...responseMessage, model });

    // First turn of a brand-new thread: generate a title in the background
    // (intentionally not awaited — best effort after the response has ended).
    if (parentMessageId === Constants.NO_PARENT && !_thread_id) {
      addTitle(req, {
        text,
        responseText: response.text,
        conversationId,
        client,
      });
    }

    await addThreadMetadata({
      openai,
      thread_id,
      messageId: responseMessage.messageId,
      messages: response.messages,
    });

    // Usage may lag the run's completion; if absent, wait briefly and
    // re-fetch the run before recording token spend.
    if (!response.run.usage) {
      await sleep(3000);
      completedRun = await openai.beta.threads.runs.retrieve(response.run.id, { thread_id });
      if (completedRun.usage) {
        await recordUsage({
          ...completedRun.usage,
          user: req.user.id,
          model: completedRun.model ?? model,
          conversationId,
        });
      }
    } else {
      await recordUsage({
        ...response.run.usage,
        user: req.user.id,
        model: response.run.model ?? model,
        conversationId,
      });
    }
  } catch (error) {
    await handleError(error);
  }
};
|
|
// Sole export: the v2 Assistants chat route handler.
module.exports = chatV2;
|
|