wuyiqunLu commited on
Commit
6fb9264
1 Parent(s): 04735a9

feat: add timestamp to msg body (#86)

Browse files

and refactor the agent endpoint code a bit

<img width="1443" alt="image"
src="https://github.com/landing-ai/vision-agent-ui/assets/132986242/bd79c37a-3fd4-47cb-a0ca-16dfe7c2c2ab">

Files changed (2) hide show
  1. app/api/vision-agent/route.ts +193 -188
  2. lib/utils/content.ts +1 -0
app/api/vision-agent/route.ts CHANGED
@@ -1,4 +1,4 @@
1
- import { StreamingTextResponse, experimental_StreamData } from 'ai';
2
 
3
  // import { auth } from '@/auth';
4
  import { MessageUI, ResultPayload, SignedPayload } from '@/lib/types';
@@ -12,6 +12,15 @@ import { getPresignedUrl } from '@/lib/aws';
12
  export const dynamic = 'force-dynamic';
13
  export const maxDuration = 300; // This function can run for a maximum of 5 minutes
14
  const TIMEOUT_MILI_SECONDS = 2 * 60 * 1000;
 
 
 
 
 
 
 
 
 
15
 
16
  const uploadBase64 = async (
17
  base64: string,
@@ -45,6 +54,54 @@ const uploadBase64 = async (
45
  }
46
  };
47
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48
  export const POST = withLogging(
49
  async (
50
  session,
@@ -90,220 +147,168 @@ export const POST = withLogging(
90
  );
91
  formData.append('image', mediaUrl);
92
 
93
- const agentHost = process.env.LND_TIER ? 'http://publicrestapi-app-lndsvc.publicrestapi.svc.cluster.local:5000' : 'https://api.dev.landing.ai';
 
 
94
 
95
  const fetchResponse = await fetch(
96
  `${agentHost}/v1/agent/chat?agent_class=vision_agent&self_reflection=false`,
97
- // `https://api.landing.ai/v1/agent/chat?agent_class=vision_agent&self_reflection=false`,
98
  // `http://localhost:5001/v1/agent/chat?agent_class=vision_agent&self_reflection=false`,
99
  {
100
  method: 'POST',
101
  headers: {
102
  // default to dev apikey
103
- apikey: process.env.LND_TIER === 'production' ? 'land_sk_nMnUf8xiJJUjyw1l5QaIJJ4ZyrvPthzVmPAIG7TtJY7F9CW6lu' : 'land_sk_DKeoYtaZZrYqJ9TMMiXe4BIQgJcZ0s3XAoB0JT3jv73FFqnr6k'
 
 
 
104
  },
105
  body: formData,
106
  },
107
  );
108
 
109
- if (!fetchResponse.ok) {
110
- if (fetchResponse.body) {
111
- const reader = fetchResponse.body.getReader();
112
- return new StreamingTextResponse(
113
- new ReadableStream({
114
- async start(controller) {
115
- try {
116
- const { done, value } = await reader.read();
117
- if (!done) {
118
- const errorText = new TextDecoder().decode(value);
119
- logger.error(session, { message: errorText }, request);
120
- controller.error(new Error(`Response error: ${errorText}`));
121
- }
122
- } catch (e) {
123
- logger.error(session, (e as Error).message, request);
124
  }
125
- },
126
- }),
127
- {
128
- status: 400,
129
- },
130
- );
131
- } else {
132
- return new StreamingTextResponse(
133
- new ReadableStream({
134
- start(controller) {
135
- logger.error(
136
- session,
137
- { message: 'Response error: No response body' },
138
- request,
139
- );
140
- controller.error(new Error('Response error: No response body'));
141
- },
142
- }),
143
- {
144
- status: 400,
145
  },
146
- );
147
- }
 
 
 
148
  }
149
  // const streamData = new experimental_StreamData();
150
 
151
- if (fetchResponse.body) {
152
- const encoder = new TextEncoder();
153
- const decoder = new TextDecoder('utf-8');
154
- let maxChunkSize = 0;
155
- let buffer = '';
156
- let time = Date.now();
157
- const stream = new ReadableStream({
158
- async start(controller) {
159
- // const parser = createParser(streamParser);
160
- for await (const chunk of fetchResponse.body as any) {
161
- const data = decoder.decode(chunk);
162
- buffer += data;
163
- maxChunkSize = Math.max(data.length, maxChunkSize);
164
- const lines = buffer
165
- .split('\n')
166
- .filter(line => line.trim().length > 0);
167
- if (lines.length === 0) {
168
- if (Date.now() - time > TIMEOUT_MILI_SECONDS) {
169
- logger.info(
170
- session,
171
- {
172
- message: 'Agent timed out',
173
- },
174
- request,
175
- '__Agent_timeout__',
176
- );
177
- controller.enqueue(
178
- encoder.encode(
179
- JSON.stringify({
180
- type: 'final_error',
181
- status: 'failed',
182
- payload: {
183
- name: 'AgentTimeout',
184
- value: `Haven't received any response in last ${TIMEOUT_MILI_SECONDS / 60000} minutes, agent timed out.`,
185
- traceback_raw: [],
186
- },
187
- }) + '\n',
188
- ),
189
- );
190
- controller.close();
191
- return;
192
- }
193
- } else {
194
- time = Date.now();
195
- }
196
- buffer = lines.pop() ?? ''; // Save the last incomplete line back to the buffer
197
- let done = false;
198
- const parseLine = async (
199
- line: string,
200
- errorCallback?: (e: Error) => void,
201
- ) => {
202
- try {
203
- const msg = JSON.parse(line);
204
- if (msg.type === 'final_code' || msg.type === 'final_error') {
205
- done = true;
206
- }
207
- if (
208
- msg.type !== 'final_code' &&
209
- (msg.type !== 'code' ||
210
- msg.status === 'started' ||
211
- msg.status === 'running')
212
- ) {
213
- return line;
214
- }
215
- if (msg.type === 'code') {
216
- const result = JSON.parse(
217
- msg.payload.result,
218
- ) as ResultPayload;
219
- if (result && result.results) {
220
- msg.payload.result = JSON.stringify({
221
- ...result,
222
- results: result.results.map((_result: any) => {
223
- return {
224
- ..._result,
225
- png: undefined,
226
- mp4: undefined,
227
- };
228
- }),
229
- });
230
- }
231
- return JSON.stringify(msg);
232
- }
233
- const result = JSON.parse(msg.payload.result) as ResultPayload;
234
- for (let index = 0; index < result.results.length; index++) {
235
- const png = result.results[index].png ?? '';
236
- const mp4 = result.results[index].mp4 ?? '';
237
- if (!png && !mp4) continue;
238
- const resp = await uploadBase64(
239
- png
240
- ? 'data:image/png;base64,' + png
241
- : 'data:video/mp4;base64,' + mp4,
242
- messages[messages.length - 1].id,
243
- json.id,
244
- index,
245
- user,
246
- );
247
- if (png) result.results[index].png = resp;
248
- if (mp4) result.results[index].mp4 = resp;
249
- }
250
- msg.payload.result = JSON.stringify(result);
251
- return JSON.stringify(msg);
252
- } catch (e) {
253
- errorCallback?.(e as Error);
254
- }
255
- };
256
- for (let line of lines) {
257
- if (!line.trim()) {
258
- continue;
259
- }
260
- const parsedLine = await parseLine(line, (e: Error) => {
261
- console.error(e);
262
- logger.error(
263
- session,
264
- {
265
- line,
266
- message: e.message,
267
- },
268
- request,
269
- );
270
- controller.error(e);
271
- });
272
  controller.enqueue(
273
- encoder.encode(
274
- parsedLine?.trim() ? parsedLine?.trim() + '\n' : '',
275
- ),
276
  );
 
 
277
  }
278
- if (buffer) {
279
- const parsedBuffer = await parseLine(buffer);
280
- if (parsedBuffer?.trim()) {
281
- buffer = '';
282
- controller.enqueue(encoder.encode(parsedBuffer.trim() + '\n'));
283
- }
284
- }
285
- if (done) {
286
- logger.info(
 
 
287
  session,
288
  {
289
- message: 'Streaming finished',
290
- maxChunkSize,
291
  },
292
  request,
293
- '__AGENT_DONE',
294
  );
295
- controller.close();
 
 
 
 
 
 
 
 
 
 
296
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
297
  }
298
- },
299
- });
300
- return new Response(stream, {
301
- headers: {
302
- 'Content-Type': 'application/x-ndjson',
303
- },
304
- });
305
- } else {
306
- return fetchResponse;
307
- }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
308
  },
309
  );
 
1
+ import { JSONValue, StreamingTextResponse, experimental_StreamData } from 'ai';
2
 
3
  // import { auth } from '@/auth';
4
  import { MessageUI, ResultPayload, SignedPayload } from '@/lib/types';
 
12
  export const dynamic = 'force-dynamic';
13
  export const maxDuration = 300; // This function can run for a maximum of 5 minutes
14
  const TIMEOUT_MILI_SECONDS = 2 * 60 * 1000;
15
+ const FINAL_TIMEOUT_ERROR = {
16
+ type: 'final_error',
17
+ status: 'failed',
18
+ payload: {
19
+ name: 'AgentTimeout',
20
+ value: `Haven't received any response in last ${TIMEOUT_MILI_SECONDS / 60000} minutes, agent timed out.`,
21
+ traceback_raw: [],
22
+ },
23
+ };
24
 
25
  const uploadBase64 = async (
26
  base64: string,
 
54
  }
55
  };
56
 
57
+ const modifyCodePayload = async (
58
+ msg: Record<string, any>,
59
+ messageId: string,
60
+ chatId: string,
61
+ user: string,
62
+ ) => {
63
+ if (
64
+ msg.type !== 'final_code' &&
65
+ (msg.type !== 'code' ||
66
+ msg.status === 'started' ||
67
+ msg.status === 'running')
68
+ ) {
69
+ return msg;
70
+ }
71
+ const result = JSON.parse(msg.payload.result) as ResultPayload;
72
+ if (msg.type === 'code') {
73
+ if (result && result.results) {
74
+ msg.payload.result = {
75
+ ...result,
76
+ results: result.results.map((_result: any) => {
77
+ return {
78
+ ..._result,
79
+ png: undefined,
80
+ mp4: undefined,
81
+ };
82
+ }),
83
+ };
84
+ }
85
+ return msg;
86
+ }
87
+ for (let index = 0; index < result.results.length; index++) {
88
+ const png = result.results[index].png ?? '';
89
+ const mp4 = result.results[index].mp4 ?? '';
90
+ if (!png && !mp4) continue;
91
+ const resp = await uploadBase64(
92
+ png ? 'data:image/png;base64,' + png : 'data:video/mp4;base64,' + mp4,
93
+ messageId,
94
+ chatId,
95
+ index,
96
+ user,
97
+ );
98
+ if (png) result.results[index].png = resp;
99
+ if (mp4) result.results[index].mp4 = resp;
100
+ }
101
+ msg.payload.result = result;
102
+ return msg;
103
+ };
104
+
105
  export const POST = withLogging(
106
  async (
107
  session,
 
147
  );
148
  formData.append('image', mediaUrl);
149
 
150
+ const agentHost = process.env.LND_TIER
151
+ ? 'http://publicrestapi-app-lndsvc.publicrestapi.svc.cluster.local:5000'
152
+ : 'https://api.dev.landing.ai';
153
 
154
  const fetchResponse = await fetch(
155
  `${agentHost}/v1/agent/chat?agent_class=vision_agent&self_reflection=false`,
156
+ // `https://api.dev.landing.ai/v1/agent/chat?agent_class=vision_agent&self_reflection=false`,
157
  // `http://localhost:5001/v1/agent/chat?agent_class=vision_agent&self_reflection=false`,
158
  {
159
  method: 'POST',
160
  headers: {
161
  // default to dev apikey
162
+ apikey:
163
+ process.env.LND_TIER === 'production'
164
+ ? 'land_sk_nMnUf8xiJJUjyw1l5QaIJJ4ZyrvPthzVmPAIG7TtJY7F9CW6lu'
165
+ : 'land_sk_DKeoYtaZZrYqJ9TMMiXe4BIQgJcZ0s3XAoB0JT3jv73FFqnr6k',
166
  },
167
  body: formData,
168
  },
169
  );
170
 
171
+ if (!fetchResponse.ok && fetchResponse.body) {
172
+ const reader = fetchResponse.body.getReader();
173
+ return new StreamingTextResponse(
174
+ new ReadableStream({
175
+ async start(controller) {
176
+ try {
177
+ const { done, value } = await reader?.read();
178
+ if (!done) {
179
+ const errorText = new TextDecoder().decode(value);
180
+ logger.error(session, { message: errorText }, request);
181
+ controller.error(new Error(`Response error: ${errorText}`));
 
 
 
 
182
  }
183
+ } catch (e) {
184
+ logger.error(session, (e as Error).message, request);
185
+ }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
186
  },
187
+ }),
188
+ {
189
+ status: 400,
190
+ },
191
+ );
192
  }
193
  // const streamData = new experimental_StreamData();
194
 
195
+ if (!fetchResponse.body) {
196
+ return fetchResponse;
197
+ }
198
+ const encoder = new TextEncoder();
199
+ const decoder = new TextDecoder('utf-8');
200
+ let maxChunkSize = 0;
201
+ let buffer = '';
202
+ let time = Date.now();
203
+ let done = false;
204
+ const stream = new ReadableStream({
205
+ async start(controller) {
206
+ // const parser = createParser(streamParser);
207
+ for await (const chunk of fetchResponse.body as any) {
208
+ const data = decoder.decode(chunk);
209
+ buffer += data;
210
+ maxChunkSize = Math.max(data.length, maxChunkSize);
211
+ const lines = buffer
212
+ .split('\n')
213
+ .filter(line => line.trim().length > 0);
214
+ if (lines.length === 0) {
215
+ if (Date.now() - time > TIMEOUT_MILI_SECONDS) {
216
+ logger.info(
217
+ session,
218
+ {
219
+ message: 'Agent timed out',
220
+ },
221
+ request,
222
+ '__Agent_timeout__',
223
+ );
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
224
  controller.enqueue(
225
+ encoder.encode(JSON.stringify(FINAL_TIMEOUT_ERROR) + '\n'),
 
 
226
  );
227
+ controller.close();
228
+ return;
229
  }
230
+ } else {
231
+ time = Date.now();
232
+ }
233
+ buffer = lines.pop() ?? ''; // Save the last incomplete line back to the buffer
234
+ const parseLine = async (
235
+ line: string,
236
+ ignoreParsingError = false,
237
+ ) => {
238
+ const handleError = (e: Error) => {
239
+ console.error(e);
240
+ logger.error(
241
  session,
242
  {
243
+ line,
244
+ message: e.message,
245
  },
246
  request,
 
247
  );
248
+ controller.error(e);
249
+ };
250
+ let msg = null;
251
+ try {
252
+ msg = JSON.parse(line);
253
+ if (msg.type === 'final_code' || msg.type === 'final_error') {
254
+ done = true;
255
+ }
256
+ } catch (e) {
257
+ if (ignoreParsingError) return;
258
+ handleError(e as Error);
259
  }
260
+ if (!msg) return;
261
+ try {
262
+ const modifiedMsg = await modifyCodePayload(
263
+ {
264
+ ...msg,
265
+ timestamp: new Date(),
266
+ },
267
+ messages[messages.length - 1].id,
268
+ json.id,
269
+ user,
270
+ );
271
+ return modifiedMsg;
272
+ } catch (e) {
273
+ handleError(e as Error);
274
+ }
275
+ };
276
+ for (let line of lines) {
277
+ const parsedMsg = await parseLine(line);
278
+ controller.enqueue(
279
+ parsedMsg ? encoder.encode(JSON.stringify(parsedMsg) + '\n') : '',
280
+ );
281
  }
282
+ if (buffer) {
283
+ const parsedBuffer = await parseLine(buffer, true);
284
+ if (parsedBuffer) {
285
+ buffer = '';
286
+ controller.enqueue(
287
+ encoder.encode(JSON.stringify(parsedBuffer) + '\n'),
288
+ );
289
+ } else {
290
+ controller.enqueue('');
291
+ }
292
+ }
293
+ if (done) {
294
+ logger.info(
295
+ session,
296
+ {
297
+ message: 'Streaming finished',
298
+ maxChunkSize,
299
+ },
300
+ request,
301
+ '__AGENT_DONE',
302
+ );
303
+ controller.close();
304
+ }
305
+ }
306
+ },
307
+ });
308
+ return new Response(stream, {
309
+ headers: {
310
+ 'Content-Type': 'application/x-ndjson',
311
+ },
312
+ });
313
  },
314
  );
lib/utils/content.ts CHANGED
@@ -83,6 +83,7 @@ export type ChunkBody =
83
  | {
84
  type: 'plans' | 'tools' | 'code' | 'final_code' | 'final_error';
85
  status: 'started' | 'completed' | 'failed' | 'running';
 
86
  payload:
87
  | Array<Record<string, string>> // PlansBody | ToolsBody
88
  | CodeResult; // CodeBody
 
83
  | {
84
  type: 'plans' | 'tools' | 'code' | 'final_code' | 'final_error';
85
  status: 'started' | 'completed' | 'failed' | 'running';
86
+ timestamp: string;
87
  payload:
88
  | Array<Record<string, string>> // PlansBody | ToolsBody
89
  | CodeResult; // CodeBody