/* Queues incoming prompts/responses and periodically flushes them to the
 * configured logging backend. */
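
/* Example usage (a hypothetical caller; the import path and the fields on the
 * PromptLogEntry object are illustrative, since the type is defined elsewhere):
 *
 *   import * as logQueue from "./log-queue";
 *
 *   await logQueue.start();
 *   logQueue.enqueue({ model: "gpt-4", promptText: "...", responseText: "..." });
 *   // ...on shutdown:
 *   logQueue.stop();
 */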

import { logger } from "../logger";
import { PromptLogEntry } from ".";
import { sheets } from "./backends";

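// Time between flushes under normal conditions (10 seconds).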
const FLUSH_INTERVAL = 1000 * 10;
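// Maximum number of entries submitted to the backend per flush.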
const MAX_BATCH_SIZE = 25;

const queue: PromptLogEntry[] = [];
const log = logger.child({ module: "log-queue" });

let started = false;
let timeoutId: NodeJS.Timeout | null = null;
let retrying = false;
let consecutiveFailedBatches = 0;

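/** Adds a log entry to the queue, or discards it if the queue isn't started. */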
export const enqueue = (payload: PromptLogEntry) => {
  if (!started) {
    log.warn("Log queue not started, discarding incoming log entry.");
    return;
  }
  queue.push(payload);
};

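/**
 * Submits the next batch of queued entries to the backend. A failed batch is
 * retried once immediately; if the retry also fails, the batch is discarded.
 * The next flush is then scheduled based on queue depth and recent failures.
 */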
export const flush = async () => {
  if (!started) {
    return;
  }

  if (queue.length > 0) {
    const batchSize = Math.min(MAX_BATCH_SIZE, queue.length);
    const nextBatch = queue.splice(0, batchSize);
    log.info({ size: nextBatch.length }, "Submitting new batch.");
    try {
      await sheets.appendBatch(nextBatch);
      retrying = false;
      consecutiveFailedBatches = 0;
    } catch (e: any) {
      if (retrying) {
        log.error(
          { message: e.message, stack: e.stack },
          "Failed twice to flush batch, discarding."
        );
        retrying = false;
        consecutiveFailedBatches++;
      } else {
        // Put the batch back at the front of the queue and try again
        log.warn(
          { message: e.message, stack: e.stack },
          "Failed to flush batch. Retrying."
        );
        queue.unshift(...nextBatch);
        retrying = true;
        setImmediate(() => flush());
        return;
      }
    }
  }

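  // Flush twice as often if the queue is more than half a full batch deep.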
  const useHalfInterval = queue.length > MAX_BATCH_SIZE / 2;
  scheduleFlush(useHalfInterval);
};

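/** Initializes the logging backend and starts the periodic flush loop. */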
export const start = async () => {
  try {
    await sheets.init(() => stop());
    log.info("Logging backend initialized.");
    started = true;
  } catch (e) {
    log.error(e, "Could not initialize logging backend.");
    return;
  }
  scheduleFlush();
};

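/** Stops the flush loop; new entries are discarded once stopped. */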
export const stop = () => {
  if (timeoutId) {
    clearTimeout(timeoutId);
    timeoutId = null;
  }
  log.info("Stopping log queue.");
  started = false;
};

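/**
 * Schedules the next flush. Uses a shortened interval when the queue is
 * falling behind, and an increasing backoff (up to ten minutes) after
 * repeated failed batches.
 */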
const scheduleFlush = (halfInterval = false) => {
  if (consecutiveFailedBatches > 3) {
    // TODO: may cause memory issues on busy servers, though if we crash that
    // may actually fix the problem with logs randomly not being flushed.
    const oneMinute = 60 * 1000;
    const maxBackoff = 10 * oneMinute;
    const backoff = Math.min(consecutiveFailedBatches * oneMinute, maxBackoff);
    timeoutId = setTimeout(() => {
      flush();
    }, backoff);
    log.warn(
      { consecutiveFailedBatches, backoffMs: backoff },
      "Failed to flush 3 batches in a row, pausing for a few minutes."
    );
    return;
  }

  if (halfInterval) {
    log.warn(
      { queueSize: queue.length },
      "Queue is falling behind, switching to faster flush interval."
    );
  }

  timeoutId = setTimeout(
    () => {
      flush();
    },
    halfInterval ? FLUSH_INTERVAL / 2 : FLUSH_INTERVAL
  );
};