jason9693 and nsarrazin committed
Commit 6f3588a
Parent: 118aa57

[#547] fix stream generation bug (#561)


* fix stream generation bug

* Update src/routes/conversation/[id]/+server.ts

Co-authored-by: Nathan Sarrazin <sarrazin.nathan@gmail.com>

* fix types

* lint

* modified empty token return

* fix lint

* move empty token control flow

* add type check

---------

Co-authored-by: Nathan Sarrazin <sarrazin.nathan@gmail.com>

src/routes/conversation/[id]/+page.svelte CHANGED
@@ -140,11 +140,14 @@
 	const reader = response?.body?.pipeThrough(encoder).getReader();
 	let finalAnswer = "";
 
+	// set str queue
+	// ex) if the last response is => {"type": "stream", "token":
+	// It should be => {"type": "stream", "token": "Hello"} = prev_input_chunk + "Hello"}
+	let prev_input_chunk = [""];
+
 	// this is a bit ugly
 	// we read the stream until we get the final answer
 	while (finalAnswer === "") {
-		await new Promise((r) => setTimeout(r, 25));
-
 		// check for abort
 		if (isAborted) {
 			reader?.cancel();
@@ -163,6 +166,8 @@
 			return;
 		}
 
+		value = prev_input_chunk.pop() + value;
+
 		// if it's not done we parse the value, which contains all messages
 		const inputs = value.split("\n");
 		inputs.forEach(async (el: string) => {
@@ -210,6 +215,10 @@
 				}
 			} catch (parseError) {
 				// in case of parsing error we wait for the next message
+
+				if (el === inputs[inputs.length - 1]) {
+					prev_input_chunk.push(el);
+				}
 				return;
 			}
 		});
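The client-side change handles chunks that end in the middle of a newline-delimited JSON message: the last, unparsable line of a chunk (e.g. {"type": "stream", "token":) is pushed onto prev_input_chunk and prepended to the next chunk before splitting. A minimal standalone sketch of that reassembly idea, not the component's actual code (consumeNdjson, readChunks and handleUpdate are placeholder names):

// Sketch: buffer a trailing partial line of newline-delimited JSON across reads.
async function consumeNdjson(
	readChunks: () => AsyncIterable<string>,
	handleUpdate: (update: unknown) => void
): Promise<void> {
	let carry = ""; // trailing partial line left over from the previous chunk, if any

	for await (const chunk of readChunks()) {
		const lines = (carry + chunk).split("\n");
		carry = "";

		for (const line of lines) {
			if (line === "") continue;
			try {
				handleUpdate(JSON.parse(line));
			} catch {
				// assume the last line was cut off mid-message; keep it for the next chunk
				if (line === lines[lines.length - 1]) {
					carry = line;
				}
			}
		}
	}
}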
src/routes/conversation/[id]/+server.ts CHANGED
@@ -194,7 +194,16 @@ export async function POST({ request, locals, params, getClientAddress }) {
 			if (newUpdate.type !== "stream") {
 				updates.push(newUpdate);
 			}
+
+			if (newUpdate.type === "stream" && newUpdate.token === "") {
+				return;
+			}
 			controller.enqueue(JSON.stringify(newUpdate) + "\n");
+
+			if (newUpdate.type === "finalAnswer") {
+				// 4096 of spaces to make sure the browser doesn't blocking buffer that holding the response
+				controller.enqueue(" ".repeat(4096));
+			}
 		}
 
 		update({ type: "status", status: "started" });
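On the server, two guards are added around the enqueue: stream updates with an empty token are dropped before serialization, and the finalAnswer update is followed by 4096 spaces of padding so buffering in the browser or an intermediary does not hold back the last message. A rough sketch of the same pattern in an isolated ReadableStream; the MessageUpdate union is simplified and makeUpdateStream is not part of the repository:

// Sketch: enqueue guards for a newline-delimited JSON update stream (simplified types).
type MessageUpdate =
	| { type: "status"; status: string }
	| { type: "stream"; token: string }
	| { type: "finalAnswer"; text: string };

function makeUpdateStream(
	produce: (update: (u: MessageUpdate) => void) => Promise<void>
): ReadableStream<string> {
	return new ReadableStream<string>({
		async start(controller) {
			function update(newUpdate: MessageUpdate) {
				// skip empty stream tokens so the client never receives a `"token": ""` line
				if (newUpdate.type === "stream" && newUpdate.token === "") {
					return;
				}
				controller.enqueue(JSON.stringify(newUpdate) + "\n");

				// pad after the final answer so buffered transports flush it promptly
				if (newUpdate.type === "finalAnswer") {
					controller.enqueue(" ".repeat(4096));
				}
			}

			await produce(update);
			controller.close();
		},
	});
}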