wuyiqunLu commited on
Commit
f3db631
1 Parent(s): 154a1e7

fix: when api sending multiple chunks together with incomplete json (#52)

Browse files
Files changed (1) hide show
  1. app/api/vision-agent/route.ts +67 -60
app/api/vision-agent/route.ts CHANGED
@@ -10,6 +10,46 @@ import { cleanAnswerMessage, cleanInputMessage } from '@/lib/messageUtils';
10
  export const dynamic = 'force-dynamic';
11
  export const maxDuration = 300; // This function can run for a maximum of 5 minutes
12
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
13
  export const POST = withLogging(
14
  async (
15
  session,
@@ -110,74 +150,41 @@ export const POST = withLogging(
110
 
111
  if (fetchResponse.body) {
112
  const encoder = new TextEncoder();
113
- const decoder = new TextDecoder();
 
114
  const stream = fetchResponse.body.pipeThrough(
115
  new TransformStream({
116
  transform: async (chunk, controller) => {
117
- const data = decoder.decode(chunk);
118
- data.split('\n').forEach(line => {
 
 
 
119
  if (!line.trim()) {
120
- return;
121
  }
122
- try {
123
- const json = JSON.parse(line);
124
- let message = (json.log ?? '') + '\n';
125
- if (json.task || json.plan || json.reflection) {
126
- const arr = json.plan
127
- ? json.plan
128
- : json.task
129
- ? [json.task]
130
- : [json.reflection];
131
- const keys = Object.keys(arr[0]);
132
- message += '\n';
133
- message += '| ' + keys.join(' | ') + ' |' + '\n';
134
- message +=
135
- new Array(keys.length + 1).fill('|').join(' :- ') + '\n';
136
- arr.forEach((obj: any) => {
137
- message +=
138
- '| ' +
139
- keys.map(key => obj[key]).join(' | ') +
140
- ' |' +
141
- '\n';
142
- });
143
- message += '\n';
144
  }
145
- if (json.tools) {
146
- message += '\n';
147
- message += '| ' + 'Descriptions' + ' |' + '\n';
148
- message += '| ' + ':-' + ' |' + '\n';
149
- json.tools.forEach((tool: string) => {
150
- message += '| ' + tool + ' |' + '\n';
151
- });
152
- message += '\n';
153
- }
154
- if (json.code) {
155
- message += `\`\`\`python\n${json.code}\n\`\`\`\n`;
156
- }
157
- if (json.result) {
158
- message += `\`\`\`\n${json.result}\n\`\`\`\n`;
159
- }
160
-
161
- logger.info(
162
- session,
163
- {
164
- message,
165
- },
166
- request,
167
- '__AGENT_RESPONSE',
168
- );
169
  controller.enqueue(encoder.encode(message));
170
- } catch (e) {
171
- console.log(data);
172
- logger.error(
173
- session,
174
- { message: (e as Error).message, data },
175
- request,
176
- );
177
- controller.error(e);
178
- controller.terminate();
179
  }
180
- });
181
  },
182
  }),
183
  );
 
10
  export const dynamic = 'force-dynamic';
11
  export const maxDuration = 300; // This function can run for a maximum of 5 minutes
12
 
13
+ const parseLine = (line: string) => {
14
+ try {
15
+ const json = JSON.parse(line);
16
+ let message = (json.log ?? '') + '\n';
17
+ if (json.task || json.plan || json.reflection) {
18
+ const arr = json.plan
19
+ ? json.plan
20
+ : json.task
21
+ ? [json.task]
22
+ : [json.reflection];
23
+ const keys = Object.keys(arr[0]);
24
+ message += '\n';
25
+ message += '| ' + keys.join(' | ') + ' |' + '\n';
26
+ message += new Array(keys.length + 1).fill('|').join(' :- ') + '\n';
27
+ arr.forEach((obj: any) => {
28
+ message += '| ' + keys.map(key => obj[key]).join(' | ') + ' |' + '\n';
29
+ });
30
+ message += '\n';
31
+ }
32
+ if (json.tools) {
33
+ message += '\n';
34
+ message += '| ' + 'Descriptions' + ' |' + '\n';
35
+ message += '| ' + ':-' + ' |' + '\n';
36
+ json.tools.forEach((tool: string) => {
37
+ message += '| ' + tool + ' |' + '\n';
38
+ });
39
+ message += '\n';
40
+ }
41
+ if (json.code) {
42
+ message += `\`\`\`python\n${json.code}\n\`\`\`\n`;
43
+ }
44
+ if (json.result) {
45
+ message += `\`\`\`\n${json.result}\n\`\`\`\n`;
46
+ }
47
+ return { message };
48
+ } catch (e) {
49
+ return { error: e };
50
+ }
51
+ };
52
+
53
  export const POST = withLogging(
54
  async (
55
  session,
 
150
 
151
  if (fetchResponse.body) {
152
  const encoder = new TextEncoder();
153
+ const decoder = new TextDecoder('utf-8');
154
+ let buffer = '';
155
  const stream = fetchResponse.body.pipeThrough(
156
  new TransformStream({
157
  transform: async (chunk, controller) => {
158
+ const data = decoder.decode(chunk, { stream: true });
159
+ buffer += data;
160
+ let lines = buffer.split('\n');
161
+ buffer = lines.pop() ?? ''; // Save the last incomplete line back to the buffer
162
+ for (let line of lines) {
163
  if (!line.trim()) {
164
+ continue;
165
  }
166
+ if (line.trim()) {
167
+ const { message, error } = parseLine(line.trim());
168
+ if (message) {
169
+ controller.enqueue(encoder.encode(message));
170
+ } else if (error) {
171
+ logger.error(
172
+ session,
173
+ { message: (error as Error).message, data },
174
+ request,
175
+ );
176
+ controller.error(error);
177
+ controller.terminate();
 
 
 
 
 
 
 
 
 
 
178
  }
179
+ }
180
+ }
181
+ if (buffer.trim()) {
182
+ const { message } = parseLine(buffer.trim());
183
+ if (message) {
184
+ buffer = '';
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
185
  controller.enqueue(encoder.encode(message));
 
 
 
 
 
 
 
 
 
186
  }
187
+ }
188
  },
189
  }),
190
  );