/* Queues incoming prompts/responses and periodically flushes them to the
configured logging backend. */
import { logger } from "../logger";
import { PromptLogEntry } from ".";
import { sheets } from "./backends";

// Flush every 10 seconds, submitting at most 25 entries per batch.
const FLUSH_INTERVAL = 1000 * 10;
const MAX_BATCH_SIZE = 25;

const queue: PromptLogEntry[] = [];
const log = logger.child({ module: "log-queue" });

let started = false;
let timeoutId: NodeJS.Timeout | null = null;
let retrying = false;
let consecutiveFailedBatches = 0;

export const enqueue = (payload: PromptLogEntry) => {
  if (!started) {
    log.warn("Log queue not started, discarding incoming log entry.");
    return;
  }
  queue.push(payload);
};

export const flush = async () => {
  if (!started) {
    return;
  }

  if (queue.length > 0) {
    const batchSize = Math.min(MAX_BATCH_SIZE, queue.length);
    const nextBatch = queue.splice(0, batchSize);
    log.info({ size: nextBatch.length }, "Submitting new batch.");
    try {
      await sheets.appendBatch(nextBatch);
      retrying = false;
      consecutiveFailedBatches = 0;
    } catch (e: any) {
      if (retrying) {
        // Second consecutive failure for this batch; give up on it.
        log.error(
          { message: e.message, stack: e.stack },
          "Failed twice to flush batch, discarding."
        );
        retrying = false;
        consecutiveFailedBatches++;
      } else {
        // Put the batch back at the front of the queue and try again.
        log.warn(
          { message: e.message, stack: e.stack },
          "Failed to flush batch. Retrying."
        );
        queue.unshift(...nextBatch);
        retrying = true;
        setImmediate(() => flush());
        return;
      }
    }
  }

  // Flush more often if the queue is more than half a batch deep.
  const useHalfInterval = queue.length > MAX_BATCH_SIZE / 2;
  scheduleFlush(useHalfInterval);
};

export const start = async () => {
  try {
    await sheets.init(() => stop());
    log.info("Logging backend initialized.");
    started = true;
  } catch (e) {
    log.error(e, "Could not initialize logging backend.");
    return;
  }
  scheduleFlush();
};

export const stop = () => {
  if (timeoutId) {
    clearTimeout(timeoutId);
  }
  log.info("Stopping log queue.");
  started = false;
};

const scheduleFlush = (halfInterval = false) => {
  if (consecutiveFailedBatches > 3) {
    // TODO: may cause memory issues on busy servers, though if we crash that
    // may actually fix the problem with logs randomly not being flushed.
    const oneMinute = 60 * 1000;
    const maxBackoff = 10 * oneMinute;
    const backoff = Math.min(consecutiveFailedBatches * oneMinute, maxBackoff);
    timeoutId = setTimeout(() => {
      flush();
    }, backoff);
    log.warn(
      { consecutiveFailedBatches, backoffMs: backoff },
      "Failed to flush 3 batches in a row, pausing for a few minutes."
    );
    return;
  }

  if (halfInterval) {
    log.warn(
      { queueSize: queue.length },
      "Queue is falling behind, switching to faster flush interval."
    );
  }

  timeoutId = setTimeout(
    () => {
      flush();
    },
    halfInterval ? FLUSH_INTERVAL / 2 : FLUSH_INTERVAL
  );
};