Spaces:
No application file
No application file
/* Queues incoming prompts/responses and periodically flushes them to configured | |
* logging backend. */ | |
import { logger } from "../logger"; | |
import { PromptLogEntry } from "."; | |
import { sheets } from "./backends"; | |
const FLUSH_INTERVAL = 1000 * 10; | |
const MAX_BATCH_SIZE = 25; | |
const queue: PromptLogEntry[] = []; | |
const log = logger.child({ module: "log-queue" }); | |
let started = false; | |
let timeoutId: NodeJS.Timeout | null = null; | |
let retrying = false; | |
let consecutiveFailedBatches = 0; | |
export const enqueue = (payload: PromptLogEntry) => { | |
if (!started) { | |
log.warn("Log queue not started, discarding incoming log entry."); | |
return; | |
} | |
queue.push(payload); | |
}; | |
export const flush = async () => { | |
if (!started) { | |
return; | |
} | |
if (queue.length > 0) { | |
const batchSize = Math.min(MAX_BATCH_SIZE, queue.length); | |
const nextBatch = queue.splice(0, batchSize); | |
log.info({ size: nextBatch.length }, "Submitting new batch."); | |
try { | |
await sheets.appendBatch(nextBatch); | |
retrying = false; | |
consecutiveFailedBatches = 0; | |
} catch (e: any) { | |
if (retrying) { | |
log.error( | |
{ message: e.message, stack: e.stack }, | |
"Failed twice to flush batch, discarding." | |
); | |
retrying = false; | |
consecutiveFailedBatches++; | |
} else { | |
// Put the batch back at the front of the queue and try again | |
log.warn( | |
{ message: e.message, stack: e.stack }, | |
"Failed to flush batch. Retrying." | |
); | |
queue.unshift(...nextBatch); | |
retrying = true; | |
setImmediate(() => flush()); | |
return; | |
} | |
} | |
} | |
const useHalfInterval = queue.length > MAX_BATCH_SIZE / 2; | |
scheduleFlush(useHalfInterval); | |
}; | |
export const start = async () => { | |
try { | |
await sheets.init(() => stop()); | |
log.info("Logging backend initialized."); | |
started = true; | |
} catch (e) { | |
log.error(e, "Could not initialize logging backend."); | |
return; | |
} | |
scheduleFlush(); | |
}; | |
export const stop = () => { | |
if (timeoutId) { | |
clearTimeout(timeoutId); | |
} | |
log.info("Stopping log queue."); | |
started = false; | |
}; | |
const scheduleFlush = (halfInterval = false) => { | |
if (consecutiveFailedBatches > 3) { | |
// TODO: may cause memory issues on busy servers, though if we crash that | |
// may actually fix the problem with logs randomly not being flushed. | |
const oneMinute = 60 * 1000; | |
const maxBackoff = 10 * oneMinute; | |
const backoff = Math.min(consecutiveFailedBatches * oneMinute, maxBackoff); | |
timeoutId = setTimeout(() => { | |
flush(); | |
}, backoff); | |
log.warn( | |
{ consecutiveFailedBatches, backoffMs: backoff }, | |
"Failed to flush 3 batches in a row, pausing for a few minutes." | |
); | |
return; | |
} | |
if (halfInterval) { | |
log.warn( | |
{ queueSize: queue.length }, | |
"Queue is falling behind, switching to faster flush interval." | |
); | |
} | |
timeoutId = setTimeout( | |
() => { | |
flush(); | |
}, | |
halfInterval ? FLUSH_INTERVAL / 2 : FLUSH_INTERVAL | |
); | |
}; | |