victor HF Staff commited on
Commit
4326cd6
·
unverified ·
1 Parent(s): c62aca6

Add background generation polling for conversation updates (#1882)

Browse files

* Add background generation polling for conversations

Introduces a BackgroundGenerationPoller Svelte component and store to track and poll background conversation generations when users navigate away during loading. Updates conversation streaming server logic to persist messages if the client detaches, ensuring conversation state is saved. Integrates poller into layout and conversation page, improving reliability of background message generation and UI updates.

* Improve background generation polling and loading state

* Refactor backgroundGenerations store to use $state

* Update +server.ts

* Harden background generation polling

* Format updated polling flows

src/lib/components/BackgroundGenerationPoller.svelte ADDED
@@ -0,0 +1,154 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ <script lang="ts">
2
+ import { browser, dev } from "$app/environment";
3
+ import { invalidate } from "$app/navigation";
4
+
5
+ import {
6
+ type BackgroundGeneration,
7
+ backgroundGenerationEntries,
8
+ removeBackgroundGeneration,
9
+ } from "$lib/stores/backgroundGenerations";
10
+ import { handleResponse, useAPIClient } from "$lib/APIClient";
11
+ import { UrlDependency } from "$lib/types/UrlDependency";
12
+ import { MessageUpdateStatus, MessageUpdateType } from "$lib/types/MessageUpdate";
13
+ import type { Message } from "$lib/types/Message";
14
+
15
+ const POLL_INTERVAL_MS = 1000;
16
+
17
+ const client = useAPIClient();
18
+ const pollers = new Map<string, () => void>();
19
+ const inflight = new Set<string>();
20
+ const assistantSnapshots = new Map<string, string>();
21
+ const failureCounts = new Map<string, number>();
22
+
23
+ $effect.root(() => {
24
+ if (!browser) {
25
+ pollers.clear();
26
+ return;
27
+ }
28
+
29
+ let destroyed = false;
30
+
31
+ const stopPoller = (id: string) => {
32
+ const stop = pollers.get(id);
33
+ if (!stop) return;
34
+
35
+ stop();
36
+ pollers.delete(id);
37
+ inflight.delete(id);
38
+ assistantSnapshots.delete(id);
39
+ failureCounts.delete(id);
40
+ };
41
+
42
+ const pollOnce = async (id: string) => {
43
+ if (destroyed || inflight.has(id)) return;
44
+ inflight.add(id);
45
+ if (dev) {
46
+ console.log("background generation poll", id);
47
+ }
48
+
49
+ try {
50
+ const response = await client.conversations({ id }).get();
51
+ const conversation = handleResponse(response);
52
+ const messages = conversation?.messages ?? [];
53
+ const lastAssistant = [...messages]
54
+ .reverse()
55
+ .find((message: Message) => message.from === "assistant");
56
+
57
+ const hasFinalAnswer =
58
+ lastAssistant?.updates?.some((update) => update.type === MessageUpdateType.FinalAnswer) ??
59
+ false;
60
+ const hasError =
61
+ lastAssistant?.updates?.some(
62
+ (update) =>
63
+ update.type === MessageUpdateType.Status &&
64
+ update.status === MessageUpdateStatus.Error
65
+ ) ?? false;
66
+
67
+ const snapshot = lastAssistant
68
+ ? JSON.stringify({
69
+ id: lastAssistant.id,
70
+ updatedAt: lastAssistant.updatedAt,
71
+ contentLength: lastAssistant.content?.length ?? 0,
72
+ updatesLength: lastAssistant.updates?.length ?? 0,
73
+ })
74
+ : "__none__";
75
+ const previousSnapshot = assistantSnapshots.get(id);
76
+ let shouldInvalidateConversation = false;
77
+
78
+ if (lastAssistant) {
79
+ assistantSnapshots.set(id, snapshot);
80
+ if (snapshot !== previousSnapshot) {
81
+ shouldInvalidateConversation = true;
82
+ }
83
+ } else if (assistantSnapshots.has(id)) {
84
+ assistantSnapshots.delete(id);
85
+ shouldInvalidateConversation = true;
86
+ }
87
+
88
+ if (lastAssistant && (hasFinalAnswer || hasError)) {
89
+ removeBackgroundGeneration(id);
90
+ assistantSnapshots.delete(id);
91
+ failureCounts.delete(id);
92
+ shouldInvalidateConversation = true;
93
+ await invalidate(UrlDependency.ConversationList);
94
+ }
95
+
96
+ if (shouldInvalidateConversation) {
97
+ await invalidate(UrlDependency.Conversation);
98
+ }
99
+
100
+ failureCounts.delete(id);
101
+ } catch (err) {
102
+ console.error("Background generation poll failed", err);
103
+ const failures = (failureCounts.get(id) ?? 0) + 1;
104
+ failureCounts.set(id, failures);
105
+ if (failures >= 3) {
106
+ removeBackgroundGeneration(id);
107
+ assistantSnapshots.delete(id);
108
+ failureCounts.delete(id);
109
+ await invalidate(UrlDependency.ConversationList);
110
+ }
111
+ } finally {
112
+ inflight.delete(id);
113
+ }
114
+ };
115
+
116
+ const startPoller = (entry: BackgroundGeneration) => {
117
+ if (pollers.has(entry.id)) return;
118
+
119
+ const intervalId = setInterval(() => {
120
+ void pollOnce(entry.id);
121
+ }, POLL_INTERVAL_MS);
122
+
123
+ pollers.set(entry.id, () => clearInterval(intervalId));
124
+ void pollOnce(entry.id);
125
+ };
126
+
127
+ $effect(() => {
128
+ const entries = backgroundGenerationEntries;
129
+
130
+ if (destroyed) return;
131
+
132
+ const activeIds = new Set(entries.map((entry) => entry.id));
133
+
134
+ for (const id of pollers.keys()) {
135
+ if (!activeIds.has(id)) {
136
+ stopPoller(id);
137
+ }
138
+ }
139
+
140
+ for (const entry of entries) {
141
+ startPoller(entry);
142
+ }
143
+ });
144
+
145
+ return () => {
146
+ destroyed = true;
147
+ for (const stop of pollers.values()) stop();
148
+ pollers.clear();
149
+ inflight.clear();
150
+ assistantSnapshots.clear();
151
+ failureCounts.clear();
152
+ };
153
+ });
154
+ </script>
src/lib/stores/backgroundGenerations.svelte.ts ADDED
@@ -0,0 +1,32 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ export type BackgroundGeneration = {
2
+ id: string;
3
+ startedAt: number;
4
+ };
5
+
6
+ export const backgroundGenerationEntries = $state<BackgroundGeneration[]>([]);
7
+
8
+ export function addBackgroundGeneration(entry: BackgroundGeneration) {
9
+ const index = backgroundGenerationEntries.findIndex(({ id }) => id === entry.id);
10
+
11
+ if (index === -1) {
12
+ backgroundGenerationEntries.push(entry);
13
+ return;
14
+ }
15
+
16
+ backgroundGenerationEntries[index] = entry;
17
+ }
18
+
19
+ export function removeBackgroundGeneration(id: string) {
20
+ const index = backgroundGenerationEntries.findIndex((entry) => entry.id === id);
21
+ if (index === -1) return;
22
+
23
+ backgroundGenerationEntries.splice(index, 1);
24
+ }
25
+
26
+ export function clearBackgroundGenerations() {
27
+ backgroundGenerationEntries.length = 0;
28
+ }
29
+
30
+ export function hasBackgroundGeneration(id: string) {
31
+ return backgroundGenerationEntries.some((entry) => entry.id === id);
32
+ }
src/lib/stores/backgroundGenerations.ts ADDED
@@ -0,0 +1 @@
 
 
1
+ export * from "./backgroundGenerations.svelte";
src/routes/+layout.svelte CHANGED
@@ -23,6 +23,7 @@
23
  import { isAborted } from "$lib/stores/isAborted";
24
  import IconShare from "$lib/components/icons/IconShare.svelte";
25
  import { shareModal } from "$lib/stores/shareModal";
 
26
 
27
  let { data = $bindable(), children } = $props();
28
 
@@ -237,6 +238,8 @@
237
  <OverloadedModal onClose={() => (overloadedModalOpen = false)} />
238
  {/if}
239
 
 
 
240
  <div
241
  class="fixed grid h-full w-screen grid-cols-1 grid-rows-[auto,1fr] overflow-hidden text-smd {!isNavCollapsed
242
  ? 'md:grid-cols-[290px,1fr]'
 
23
  import { isAborted } from "$lib/stores/isAborted";
24
  import IconShare from "$lib/components/icons/IconShare.svelte";
25
  import { shareModal } from "$lib/stores/shareModal";
26
+ import BackgroundGenerationPoller from "$lib/components/BackgroundGenerationPoller.svelte";
27
 
28
  let { data = $bindable(), children } = $props();
29
 
 
238
  <OverloadedModal onClose={() => (overloadedModalOpen = false)} />
239
  {/if}
240
 
241
+ <BackgroundGenerationPoller />
242
+
243
  <div
244
  class="fixed grid h-full w-screen grid-cols-1 grid-rows-[auto,1fr] overflow-hidden text-smd {!isNavCollapsed
245
  ? 'md:grid-cols-[290px,1fr]'
src/routes/conversation/[id]/+page.svelte CHANGED
@@ -22,7 +22,11 @@
22
  import type { v4 } from "uuid";
23
  import { useSettingsStore } from "$lib/stores/settings.js";
24
  import { browser } from "$app/environment";
25
-
 
 
 
 
26
  import type { TreeNode, TreeId } from "$lib/utils/tree/tree";
27
  import "katex/dist/katex.min.css";
28
  import { updateDebouncer } from "$lib/utils/updates.js";
@@ -443,6 +447,33 @@
443
  messages = data.messages;
444
  });
445
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
446
  // create a linear list of `messagesPath` from `messages` that is a tree of threaded messages
447
  let messagesPath = $derived(createMessagesPath(messages));
448
  let messagesAlternatives = $derived(createMessagesAlternatives(messages));
@@ -453,10 +484,26 @@
453
  }
454
  });
455
 
456
- beforeNavigate(() => {
457
- if (page.params.id) {
458
- $isAborted = true;
459
- loading = false;
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
460
  }
461
  });
462
 
 
22
  import type { v4 } from "uuid";
23
  import { useSettingsStore } from "$lib/stores/settings.js";
24
  import { browser } from "$app/environment";
25
+ import {
26
+ addBackgroundGeneration,
27
+ hasBackgroundGeneration,
28
+ removeBackgroundGeneration,
29
+ } from "$lib/stores/backgroundGenerations";
30
  import type { TreeNode, TreeId } from "$lib/utils/tree/tree";
31
  import "katex/dist/katex.min.css";
32
  import { updateDebouncer } from "$lib/utils/updates.js";
 
447
  messages = data.messages;
448
  });
449
 
450
+ function isConversationStreaming(msgs: Message[]): boolean {
451
+ const lastAssistant = [...msgs].reverse().find((msg) => msg.from === "assistant");
452
+ if (!lastAssistant) return false;
453
+ const hasFinalAnswer =
454
+ lastAssistant.updates?.some((update) => update.type === MessageUpdateType.FinalAnswer) ??
455
+ false;
456
+ const hasError =
457
+ lastAssistant.updates?.some(
458
+ (update) =>
459
+ update.type === MessageUpdateType.Status && update.status === MessageUpdateStatus.Error
460
+ ) ?? false;
461
+ return !hasFinalAnswer && !hasError;
462
+ }
463
+
464
+ $effect(() => {
465
+ const streaming = isConversationStreaming(messages);
466
+ if (streaming) {
467
+ loading = true;
468
+ } else if (!pending) {
469
+ loading = false;
470
+ }
471
+
472
+ if (!streaming && browser) {
473
+ removeBackgroundGeneration(page.params.id);
474
+ }
475
+ });
476
+
477
  // create a linear list of `messagesPath` from `messages` that is a tree of threaded messages
478
  let messagesPath = $derived(createMessagesPath(messages));
479
  let messagesAlternatives = $derived(createMessagesAlternatives(messages));
 
484
  }
485
  });
486
 
487
+ beforeNavigate((navigation) => {
488
+ if (!page.params.id) return;
489
+
490
+ const navigatingAway =
491
+ navigation.to?.route.id !== page.route.id || navigation.to?.params?.id !== page.params.id;
492
+
493
+ if (loading && navigatingAway) {
494
+ addBackgroundGeneration({ id: page.params.id, startedAt: Date.now() });
495
+ }
496
+
497
+ $isAborted = true;
498
+ loading = false;
499
+ });
500
+
501
+ onMount(() => {
502
+ const hasBackgroundEntry = hasBackgroundGeneration(page.params.id);
503
+ const streaming = isConversationStreaming(messages);
504
+ if (hasBackgroundEntry && streaming) {
505
+ addBackgroundGeneration({ id: page.params.id, startedAt: Date.now() });
506
+ loading = true;
507
  }
508
  });
509
 
src/routes/conversation/[id]/+server.ts CHANGED
@@ -320,9 +320,17 @@ export async function POST({ request, locals, params, getClientAddress }) {
320
  );
321
 
322
  let doneStreaming = false;
 
323
 
324
  let lastTokenTimestamp: undefined | Date = undefined;
325
 
 
 
 
 
 
 
 
326
  // we now build the stream
327
  const stream = new ReadableStream({
328
  async start(controller) {
@@ -406,21 +414,31 @@ export async function POST({ request, locals, params, getClientAddress }) {
406
  event = { ...event, token: event.token.padEnd(16, "\0") };
407
  }
408
 
409
- // Send the update to the client
410
- controller.enqueue(JSON.stringify(event) + "\n");
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
411
 
412
- // Send 4096 of spaces to make sure the browser doesn't blocking buffer that holding the response
413
- if (event.type === MessageUpdateType.FinalAnswer) {
414
- controller.enqueue(" ".repeat(4096));
415
  }
416
  }
417
 
418
- await collections.conversations.updateOne(
419
- { _id: convId },
420
- { $set: { title: conv.title, updatedAt: new Date() } }
421
- );
422
- messageToWriteTo.updatedAt = new Date();
423
-
424
  let hasError = false;
425
  const initialMessageContent = messageToWriteTo.content;
426
 
@@ -463,22 +481,18 @@ export async function POST({ request, locals, params, getClientAddress }) {
463
  }
464
  }
465
 
466
- await collections.conversations.updateOne(
467
- { _id: convId },
468
- { $set: { messages: conv.messages, title: conv?.title, updatedAt: new Date() } }
469
- );
470
 
471
  // used to detect if cancel() is called bc of interrupt or just because the connection closes
472
  doneStreaming = true;
473
-
474
- controller.close();
 
475
  },
476
  async cancel() {
477
  if (doneStreaming) return;
478
- await collections.conversations.updateOne(
479
- { _id: convId },
480
- { $set: { messages: conv.messages, title: conv.title, updatedAt: new Date() } }
481
- );
482
  },
483
  });
484
 
 
320
  );
321
 
322
  let doneStreaming = false;
323
+ let clientDetached = false;
324
 
325
  let lastTokenTimestamp: undefined | Date = undefined;
326
 
327
+ const persistConversation = async () => {
328
+ await collections.conversations.updateOne(
329
+ { _id: convId },
330
+ { $set: { messages: conv.messages, title: conv.title, updatedAt: new Date() } }
331
+ );
332
+ };
333
+
334
  // we now build the stream
335
  const stream = new ReadableStream({
336
  async start(controller) {
 
414
  event = { ...event, token: event.token.padEnd(16, "\0") };
415
  }
416
 
417
+ messageToWriteTo.updatedAt = new Date();
418
+
419
+ const enqueueUpdate = async () => {
420
+ if (clientDetached) return;
421
+ try {
422
+ controller.enqueue(JSON.stringify(event) + "\n");
423
+ if (event.type === MessageUpdateType.FinalAnswer) {
424
+ controller.enqueue(" ".repeat(4096));
425
+ }
426
+ } catch (err) {
427
+ clientDetached = true;
428
+ logger.info(
429
+ { conversationId: convId.toString() },
430
+ "Client detached during message streaming"
431
+ );
432
+ }
433
+ };
434
+
435
+ await enqueueUpdate();
436
 
437
+ if (clientDetached) {
438
+ await persistConversation();
 
439
  }
440
  }
441
 
 
 
 
 
 
 
442
  let hasError = false;
443
  const initialMessageContent = messageToWriteTo.content;
444
 
 
481
  }
482
  }
483
 
484
+ await persistConversation();
 
 
 
485
 
486
  // used to detect if cancel() is called bc of interrupt or just because the connection closes
487
  doneStreaming = true;
488
+ if (!clientDetached) {
489
+ controller.close();
490
+ }
491
  },
492
  async cancel() {
493
  if (doneStreaming) return;
494
+ clientDetached = true;
495
+ await persistConversation();
 
 
496
  },
497
  });
498