| | const { sleep } = require('@librechat/agents'); |
| | const { logger } = require('@librechat/data-schemas'); |
| | const { RunStatus, defaultOrderQuery, CacheKeys } = require('librechat-data-provider'); |
| | const getLogStores = require('~/cache/getLogStores'); |
| | const { retrieveRun } = require('./methods'); |
| | const RunManager = require('./RunManager'); |
| |
|
| | async function withTimeout(promise, timeoutMs, timeoutMessage) { |
| | let timeoutHandle; |
| |
|
| | const timeoutPromise = new Promise((_, reject) => { |
| | timeoutHandle = setTimeout(() => { |
| | logger.debug(timeoutMessage); |
| | reject(new Error('Operation timed out')); |
| | }, timeoutMs); |
| | }); |
| |
|
| | try { |
| | return await Promise.race([promise, timeoutPromise]); |
| | } finally { |
| | clearTimeout(timeoutHandle); |
| | } |
| | } |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | async function createRun({ openai, thread_id, body }) { |
| | return await openai.beta.threads.runs.create(thread_id, body); |
| | } |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | async function waitForRun({ |
| | openai, |
| | run_id, |
| | thread_id, |
| | runManager, |
| | pollIntervalMs = 2000, |
| | timeout = 60000 * 3, |
| | }) { |
| | let timeElapsed = 0; |
| | let run; |
| |
|
| | const cache = getLogStores(CacheKeys.ABORT_KEYS); |
| | const cacheKey = `${openai.req.user.id}:${openai.responseMessage.conversationId}`; |
| |
|
| | let i = 0; |
| | let lastSeenStatus = null; |
| | const runIdLog = `run_id: ${run_id}`; |
| | const runInfo = `user: ${openai.req.user.id} | thread_id: ${thread_id} | ${runIdLog}`; |
| | const raceTimeoutMs = 3000; |
| | let maxRetries = 5; |
| | while (timeElapsed < timeout) { |
| | i++; |
| | logger.debug(`[heartbeat ${i}] ${runIdLog} | Retrieving run status...`); |
| | let updatedRun; |
| |
|
| | let attempt = 0; |
| | let startTime = Date.now(); |
| | while (!updatedRun && attempt < maxRetries) { |
| | try { |
| | updatedRun = await withTimeout( |
| | retrieveRun({ thread_id, run_id, timeout: raceTimeoutMs, openai }), |
| | raceTimeoutMs, |
| | `[heartbeat ${i}] ${runIdLog} | Run retrieval timed out after ${raceTimeoutMs} ms. Trying again (attempt ${ |
| | attempt + 1 |
| | } of ${maxRetries})...`, |
| | ); |
| | const endTime = Date.now(); |
| | logger.debug( |
| | `[heartbeat ${i}] ${runIdLog} | Elapsed run retrieval time: ${endTime - startTime}`, |
| | ); |
| | } catch (error) { |
| | attempt++; |
| | startTime = Date.now(); |
| | logger.warn(`${runIdLog} | Error retrieving run status`, error); |
| | } |
| | } |
| |
|
| | if (!updatedRun) { |
| | const errorMessage = `[waitForRun] ${runIdLog} | Run retrieval failed after ${maxRetries} attempts`; |
| | throw new Error(errorMessage); |
| | } |
| |
|
| | run = updatedRun; |
| | attempt = 0; |
| | const runStatus = `${runInfo} | status: ${run.status}`; |
| |
|
| | if (run.status !== lastSeenStatus) { |
| | logger.debug(`[${run.status}] ${runInfo}`); |
| | lastSeenStatus = run.status; |
| | } |
| |
|
| | logger.debug(`[heartbeat ${i}] ${runStatus}`); |
| |
|
| | let cancelStatus; |
| | try { |
| | const timeoutMessage = `[heartbeat ${i}] ${runIdLog} | Cancel Status check operation timed out.`; |
| | cancelStatus = await withTimeout(cache.get(cacheKey), raceTimeoutMs, timeoutMessage); |
| | } catch (error) { |
| | logger.warn(`Error retrieving cancel status: ${error}`); |
| | } |
| |
|
| | if (cancelStatus === 'cancelled') { |
| | logger.warn(`[waitForRun] ${runStatus} | RUN CANCELLED`); |
| | throw new Error('Run cancelled'); |
| | } |
| |
|
| | if (![RunStatus.IN_PROGRESS, RunStatus.QUEUED].includes(run.status)) { |
| | logger.debug(`[FINAL] ${runInfo} | status: ${run.status}`); |
| | await runManager.fetchRunSteps({ |
| | openai, |
| | thread_id: thread_id, |
| | run_id: run_id, |
| | runStatus: run.status, |
| | final: true, |
| | }); |
| | break; |
| | } |
| |
|
| | |
| | await runManager.fetchRunSteps({ |
| | openai, |
| | thread_id: thread_id, |
| | run_id: run_id, |
| | runStatus: run.status, |
| | }); |
| |
|
| | await sleep(pollIntervalMs); |
| | timeElapsed += pollIntervalMs; |
| | } |
| |
|
| | if (timeElapsed >= timeout) { |
| | const timeoutMessage = `[waitForRun] ${runInfo} | status: ${run.status} | timed out after ${timeout} ms`; |
| | logger.warn(timeoutMessage); |
| | throw new Error(timeoutMessage); |
| | } |
| |
|
| | return run; |
| | } |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | async function _retrieveRunSteps({ openai, thread_id, run_id }) { |
| | const runSteps = await openai.beta.threads.runs.steps.list(run_id, { thread_id }); |
| | return runSteps; |
| | } |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | async function _handleRun({ openai, run_id, thread_id }) { |
| | let steps = []; |
| | let messages = []; |
| | const runManager = new RunManager({ |
| | |
| | |
| | |
| | |
| | |
| | |
| | final: async ({ step, runStatus, stepsByStatus }) => { |
| | console.log(`Final step for ${run_id} with status ${runStatus}`); |
| | console.dir(step, { depth: null }); |
| |
|
| | const promises = []; |
| | promises.push(openai.beta.threads.messages.list(thread_id, defaultOrderQuery)); |
| |
|
| | |
| | |
| | |
| | |
| |
|
| | |
| | for (const [_status, stepsPromises] of Object.entries(stepsByStatus)) { |
| | promises.push(...stepsPromises); |
| | } |
| |
|
| | const resolved = await Promise.all(promises); |
| | const res = resolved.shift(); |
| | messages = res.data.filter((msg) => msg.run_id === run_id); |
| | resolved.push(step); |
| | steps = resolved; |
| | }, |
| | }); |
| |
|
| | const run = await waitForRun({ |
| | openai, |
| | run_id, |
| | thread_id, |
| | runManager, |
| | pollIntervalMs: 2000, |
| | timeout: 60000, |
| | }); |
| | const actions = []; |
| | if (run.required_action) { |
| | const { submit_tool_outputs } = run.required_action; |
| | submit_tool_outputs.tool_calls.forEach((item) => { |
| | const functionCall = item.function; |
| | const args = JSON.parse(functionCall.arguments); |
| | actions.push({ |
| | tool: functionCall.name, |
| | toolInput: args, |
| | toolCallId: item.id, |
| | run_id, |
| | thread_id, |
| | }); |
| | }); |
| | } |
| |
|
| | return { run, steps, messages, actions }; |
| | } |
| |
|
| | module.exports = { |
| | sleep, |
| | createRun, |
| | waitForRun, |
| | |
| | |
| | }; |
| |
|