/**
 * Very scuffed request queue. OpenAI's GPT-4 keys have a very strict rate limit
 * of 40000 generated tokens per minute. We don't actually know how many tokens
 * a given key has generated, so our queue will simply retry requests that fail
 * with a non-billing related 429 over and over again until they succeed.
 *
 * Dequeueing can operate in one of two modes:
 * - 'fair': requests are dequeued in the order they were enqueued.
 * - 'random': requests are dequeued randomly, not really a queue at all.
 *
 * When a request to a proxied endpoint is received, we create a closure around
 * the call to http-proxy-middleware and attach it to the request. This allows
 * us to pause the request until we have a key available. Further, if the
 * proxied request encounters a retryable error, we can simply put the request
 * back in the queue and it will be retried later using the same closure.
 */
import type { Handler, Request } from "express";
import { config, DequeueMode } from "../config";
import { keyPool, SupportedModel } from "../key-management";
import { logger } from "../logger";
import { AGNAI_DOT_CHAT_IP } from "./rate-limit";
import { buildFakeSseMessage } from "./middleware/common";
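
// This module reads and writes several custom fields on Express's Request
// object (e.g. `user`, `startTime`, `retryCount`, `queueOutTime`,
// `heartbeatInterval`, `onAborted`, `proceed`, `outboundApi`). Those fields
// are assumed to be declared elsewhere in the project via declaration merging;
// a minimal sketch of such an augmentation (field types are assumptions, not
// taken from this file) might look like:
//
//   declare global {
//     namespace Express {
//       interface Request {
//         proceed?: () => void;
//         retryCount?: number;
//         queueOutTime?: number;
//         heartbeatInterval?: NodeJS.Timeout;
//         onAborted?: () => void;
//       }
//     }
//   }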

export type QueuePartition = "claude" | "turbo" | "gpt-4";

const queue: Request[] = [];
const log = logger.child({ module: "request-queue" });

let dequeueMode: DequeueMode = "fair";

/** Maximum number of queue slots for Agnai.chat requests. */
const AGNAI_CONCURRENCY_LIMIT = 15;
/** Maximum number of queue slots for individual users. */
const USER_CONCURRENCY_LIMIT = 1;

const sameIpPredicate = (incoming: Request) => (queued: Request) =>
  queued.ip === incoming.ip;

const sameUserPredicate = (incoming: Request) => (queued: Request) => {
  const incomingUser = incoming.user ?? { token: incoming.ip };
  const queuedUser = queued.user ?? { token: queued.ip };
  return queuedUser.token === incomingUser.token;
};

export function enqueue(req: Request) {
  let enqueuedRequestCount = 0;
  let isGuest = req.user?.token === undefined;
  if (isGuest) {
    enqueuedRequestCount = queue.filter(sameIpPredicate(req)).length;
  } else {
    enqueuedRequestCount = queue.filter(sameUserPredicate(req)).length;
  }

  // All Agnai.chat requests come from the same IP, so we allow them to have
  // more spots in the queue. Can't make it unlimited because people will
  // intentionally abuse it.
  // Authenticated users always get a single spot in the queue.
  const maxConcurrentQueuedRequests =
    isGuest && req.ip === AGNAI_DOT_CHAT_IP
      ? AGNAI_CONCURRENCY_LIMIT
      : USER_CONCURRENCY_LIMIT;
  if (enqueuedRequestCount >= maxConcurrentQueuedRequests) {
    if (req.ip === AGNAI_DOT_CHAT_IP) {
      // Re-enqueued requests are not counted towards the limit since they
      // already made it through the queue once.
      if (req.retryCount === 0) {
        throw new Error("Too many agnai.chat requests are already queued");
      }
    } else {
      throw new Error("Your IP or token already has a request in the queue");
    }
  }

  queue.push(req);
  req.queueOutTime = 0;

  // shitty hack to remove hpm's event listeners on retried requests
  removeProxyMiddlewareEventListeners(req);

  // If the request opted into streaming, we need to register a heartbeat
  // handler to keep the connection alive while it waits in the queue. We
  // deregister the handler when the request is dequeued.
  if (req.body.stream === "true" || req.body.stream === true) {
    const res = req.res!;
    if (!res.headersSent) {
      initStreaming(req);
    }
    req.heartbeatInterval = setInterval(() => {
      if (process.env.NODE_ENV === "production") {
        req.res!.write(": queue heartbeat\n\n");
      } else {
        req.log.info(`Sending heartbeat to request in queue.`);
        const partition = getPartitionForRequest(req);
        const avgWait = Math.round(getEstimatedWaitTime(partition) / 1000);
        const currentDuration = Math.round((Date.now() - req.startTime) / 1000);
        const debugMsg = `queue length: ${queue.length}; elapsed time: ${currentDuration}s; avg wait: ${avgWait}s`;
        req.res!.write(buildFakeSseMessage("heartbeat", debugMsg, req));
      }
    }, 10000);
  }

  // Register a handler to remove the request from the queue if the connection
  // is aborted or closed before it is dequeued.
  const removeFromQueue = () => {
    req.log.info(`Removing aborted request from queue.`);
    const index = queue.indexOf(req);
    if (index !== -1) {
      queue.splice(index, 1);
    }
    if (req.heartbeatInterval) {
      clearInterval(req.heartbeatInterval);
    }
  };
  req.onAborted = removeFromQueue;
  req.res!.once("close", removeFromQueue);

  if ((req.retryCount ?? 0) > 0) {
    req.log.info({ retries: req.retryCount }, `Enqueued request for retry.`);
  } else {
    req.log.info(`Enqueued new request.`);
  }
}

function getPartitionForRequest(req: Request): QueuePartition {
  // There is a single request queue, but it is partitioned by model and API
  // provider.
  // - claude: requests for the Anthropic API, regardless of model
  // - gpt-4: requests for the OpenAI API, specifically for GPT-4 models
  // - turbo: effectively, all other requests
  const provider = req.outboundApi;
  const model = (req.body.model as SupportedModel) ?? "gpt-3.5-turbo";
  if (provider === "anthropic") {
    return "claude";
  }
  if (provider === "openai" && model.startsWith("gpt-4")) {
    return "gpt-4";
  }
  return "turbo";
}

function getQueueForPartition(partition: QueuePartition): Request[] {
  return queue.filter((req) => getPartitionForRequest(req) === partition);
}

export function dequeue(partition: QueuePartition): Request | undefined {
  const modelQueue = getQueueForPartition(partition);

  if (modelQueue.length === 0) {
    return undefined;
  }

  let req: Request;

  if (dequeueMode === "fair") {
    // Dequeue the request that has been waiting the longest
    req = modelQueue.reduce((prev, curr) =>
      prev.startTime < curr.startTime ? prev : curr
    );
  } else {
    // Dequeue a random request
    const index = Math.floor(Math.random() * modelQueue.length);
    req = modelQueue[index];
  }
  queue.splice(queue.indexOf(req), 1);

  if (req.onAborted) {
    req.res!.off("close", req.onAborted);
    req.onAborted = undefined;
  }

  if (req.heartbeatInterval) {
    clearInterval(req.heartbeatInterval);
  }

  // Track the time leaving the queue now, but don't add it to the wait times
  // yet because we don't know if the request will succeed or fail. We track
  // the time now and not after the request succeeds because we don't want to
  // include the model processing time.
  req.queueOutTime = Date.now();
  return req;
}

/**
 * Naive way to keep the queue moving by continuously dequeuing requests. Not
 * ideal because it limits throughput but we probably won't have enough traffic
 * or keys for this to be a problem. If it does we can dequeue multiple
 * per tick.
 **/
function processQueue() {
  // This isn't completely correct, because a key can service multiple models.
  // Currently if a key is locked out on one model it will also stop servicing
  // the others, because we only track one rate limit per key.
  const gpt4Lockout = keyPool.getLockoutPeriod("gpt-4");
  const turboLockout = keyPool.getLockoutPeriod("gpt-3.5-turbo");
  const claudeLockout = keyPool.getLockoutPeriod("claude-v1");

  const reqs: (Request | undefined)[] = [];
  if (gpt4Lockout === 0) {
    reqs.push(dequeue("gpt-4"));
  }
  if (turboLockout === 0) {
    reqs.push(dequeue("turbo"));
  }
  if (claudeLockout === 0) {
    reqs.push(dequeue("claude"));
  }

  reqs.filter(Boolean).forEach((req) => {
    if (req?.proceed) {
      req.log.info({ retries: req.retryCount }, `Dequeuing request.`);
      req.proceed();
    }
  });
  setTimeout(processQueue, 50);
}
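
// If throughput ever does become a problem, one possible (untested) tweak
// would be to dequeue a small batch per tick instead of a single request,
// e.g. inside processQueue:
//
//   const BATCH_SIZE = 3; // assumed value, not from this module
//   for (let i = 0; i < BATCH_SIZE && keyPool.getLockoutPeriod("gpt-4") === 0; i++) {
//     const next = dequeue("gpt-4");
//     if (!next) break;
//     reqs.push(next);
//   }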

/**
 * Kills requests that have been waiting in the queue for more than 5 minutes
 * and prunes tracked wait times older than 5 minutes.
 **/
function cleanQueue() {
  const now = Date.now();
  const oldRequests = queue.filter(
    (req) => now - (req.startTime ?? now) > 5 * 60 * 1000
  );
  oldRequests.forEach((req) => {
    req.log.info(`Removing request from queue after 5 minutes.`);
    killQueuedRequest(req);
  });

  const index = waitTimes.findIndex(
    (waitTime) => now - waitTime.end > 300 * 1000
  );
  const removed = waitTimes.splice(0, index + 1);
  log.trace(
    { stalledRequests: oldRequests.length, prunedWaitTimes: removed.length },
    `Cleaning up request queue.`
  );
  setTimeout(cleanQueue, 20 * 1000);
}

export function start() {
  processQueue();
  cleanQueue();
  log.info(`Started request queue.`);
}

let waitTimes: { partition: QueuePartition; start: number; end: number }[] = [];

/** Adds a successful request to the list of wait times. */
export function trackWaitTime(req: Request) {
  waitTimes.push({
    partition: getPartitionForRequest(req),
    start: req.startTime!,
    end: req.queueOutTime ?? Date.now(),
  });
}

/** Returns average wait time in milliseconds. */
export function getEstimatedWaitTime(partition: QueuePartition) {
  const now = Date.now();
  const recentWaits = waitTimes.filter(
    (wt) => wt.partition === partition && now - wt.end < 300 * 1000
  );
  if (recentWaits.length === 0) {
    return 0;
  }

  return (
    recentWaits.reduce((sum, wt) => sum + wt.end - wt.start, 0) /
    recentWaits.length
  );
}

export function getQueueLength(partition: QueuePartition | "all" = "all") {
  if (partition === "all") {
    return queue.length;
  }
  const modelQueue = getQueueForPartition(partition);
  return modelQueue.length;
}
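
/**
 * Wraps the provided proxy handler so that requests pass through the queue
 * before being proxied. Example wiring (illustrative only; the route path and
 * proxy options are assumptions, not taken from this module):
 *
 *   import { createProxyMiddleware } from "http-proxy-middleware";
 *   const proxy = createProxyMiddleware({ target: "https://api.openai.com", changeOrigin: true });
 *   app.use("/proxy/openai", createQueueMiddleware(proxy));
 */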
export function createQueueMiddleware(proxyMiddleware: Handler): Handler {
  return (req, res, next) => {
    if (config.queueMode === "none") {
      return proxyMiddleware(req, res, next);
    }

    req.proceed = () => {
      proxyMiddleware(req, res, next);
    };

    try {
      enqueue(req);
    } catch (err: any) {
      req.res!.status(429).json({
        type: "proxy_error",
        message: err.message,
        stack: err.stack,
        proxy_note: `Only one request can be queued at a time. If you don't have another request queued, your IP or user token might be in use by another request.`,
      });
    }
  };
}

function killQueuedRequest(req: Request) {
  if (!req.res || req.res.writableEnded) {
    req.log.warn(`Attempted to terminate request that has already ended.`);
    return;
  }
  const res = req.res;
  try {
    const message = `Your request has been terminated by the proxy because it has been in the queue for more than 5 minutes. The queue is currently ${queue.length} requests long.`;
    if (res.headersSent) {
      const fakeErrorEvent = buildFakeSseMessage(
        "proxy queue error",
        message,
        req
      );
      res.write(fakeErrorEvent);
      res.end();
    } else {
      res.status(500).json({ error: message });
    }
  } catch (e) {
    req.log.error(e, `Error killing stalled request.`);
  }
}

function initStreaming(req: Request) {
  req.log.info(`Initiating streaming for new queued request.`);
  const res = req.res!;
  res.statusCode = 200;
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  res.setHeader("Connection", "keep-alive");
  res.setHeader("X-Accel-Buffering", "no"); // nginx-specific fix
  res.flushHeaders();

  res.write("\n");
  res.write(": joining queue\n\n");
}
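
// For reference, a queued streaming client in production sees output like:
//
//   : joining queue
//
//   : queue heartbeat
//
//   : queue heartbeat
//
// Lines starting with ":" are SSE comments, so they keep the connection alive
// without producing events the client would try to parse.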

/**
 * http-proxy-middleware attaches a bunch of event listeners to the req and
 * res objects which causes problems with our approach to re-enqueuing failed
 * proxied requests. This function removes those event listeners.
 * We don't have references to the original event listeners, so we have to
 * look through the list and remove HPM's listeners by looking for particular
 * strings in the listener functions. This is an astoundingly shitty way to do
 * this, but it's the best I can come up with.
 */
function removeProxyMiddlewareEventListeners(req: Request) {
  // node_modules/http-proxy-middleware/dist/plugins/default/debug-proxy-errors-plugin.js:29
  // res.listeners('close')
  const RES_ONCLOSE = `Destroying proxyRes in proxyRes close event`;
  // node_modules/http-proxy-middleware/dist/plugins/default/debug-proxy-errors-plugin.js:19
  // res.listeners('error')
  const RES_ONERROR = `Socket error in proxyReq event`;
  // node_modules/http-proxy/lib/http-proxy/passes/web-incoming.js:146
  // req.listeners('aborted')
  const REQ_ONABORTED = `proxyReq.abort()`;
  // node_modules/http-proxy/lib/http-proxy/passes/web-incoming.js:156
  // req.listeners('error')
  const REQ_ONERROR = `if (req.socket.destroyed`;

  const res = req.res!;

  const resOnClose = res
    .listeners("close")
    .find((listener) => listener.toString().includes(RES_ONCLOSE));
  if (resOnClose) {
    res.removeListener("close", resOnClose as any);
  }

  const resOnError = res
    .listeners("error")
    .find((listener) => listener.toString().includes(RES_ONERROR));
  if (resOnError) {
    res.removeListener("error", resOnError as any);
  }

  const reqOnAborted = req
    .listeners("aborted")
    .find((listener) => listener.toString().includes(REQ_ONABORTED));
  if (reqOnAborted) {
    req.removeListener("aborted", reqOnAborted as any);
  }

  const reqOnError = req
    .listeners("error")
    .find((listener) => listener.toString().includes(REQ_ONERROR));
  if (reqOnError) {
    req.removeListener("error", reqOnError as any);
  }
}