wuyiqunLu commited on
Commit
7286745
1 Parent(s): e777dce

feat: update final_error to result and add cancel callback (#103)

Browse files

<img width="1288" alt="image"
src="https://github.com/landing-ai/vision-agent-ui/assets/132986242/743a66c9-65f9-4698-9d67-3ef6eb209ef3">

<img width="1271" alt="image"
src="https://github.com/landing-ai/vision-agent-ui/assets/132986242/4ac27c6f-85b9-496b-b436-1921c2ba052c">

.github/workflows/playwright.yml CHANGED
@@ -5,7 +5,7 @@ jobs:
5
  run-e2es:
6
  timeout-minutes: 5
7
  runs-on: ubuntu-latest
8
- if: github.event_name == 'deployment_status' && github.event.deployment_status.state == 'success'
9
  steps:
10
  - uses: actions/checkout@v4
11
  - name: Setup pnpm
 
5
  run-e2es:
6
  timeout-minutes: 5
7
  runs-on: ubuntu-latest
8
+ if: github.event_name == 'deployment_status' && github.event.deployment_status.state == 'success' && github.event.deployment_status.environment_url
9
  steps:
10
  - uses: actions/checkout@v4
11
  - name: Setup pnpm
app/api/vision-agent/route.ts CHANGED
@@ -1,8 +1,5 @@
1
  import { StreamingTextResponse } from 'ai';
2
 
3
- // import { auth } from '@/auth';
4
- import { MessageUI } from '@/lib/types';
5
-
6
  import { logger, withLogging } from '@/lib/logger';
7
  import { getPresignedUrl } from '@/lib/aws';
8
  import { dbPostUpdateMessageResponse } from '@/lib/db/functions';
@@ -179,6 +176,43 @@ export const POST = withLogging(
179
  let time = Date.now();
180
  const results: PrismaJson.MessageBody[] = [];
181
  const stream = new ReadableStream({
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
182
  async start(controller) {
183
  const parseLine = async (
184
  line: string,
@@ -289,9 +323,10 @@ export const POST = withLogging(
289
  await dbPostUpdateMessageResponse(messageId, {
290
  response: processMsgs.map(res => JSON.stringify(res)).join('\n'),
291
  result: results.find(
292
- res => res.type === 'final_code',
293
- ) as PrismaJson.FinalCodeBody,
294
  responseBody: processMsgs,
 
295
  });
296
  logger.info(
297
  session,
 
1
  import { StreamingTextResponse } from 'ai';
2
 
 
 
 
3
  import { logger, withLogging } from '@/lib/logger';
4
  import { getPresignedUrl } from '@/lib/aws';
5
  import { dbPostUpdateMessageResponse } from '@/lib/db/functions';
 
176
  let time = Date.now();
177
  const results: PrismaJson.MessageBody[] = [];
178
  const stream = new ReadableStream({
179
+ async cancel(reason) {
180
+ logger.info(
181
+ session,
182
+ {
183
+ message: 'Streaming cancelled',
184
+ maxChunkSize,
185
+ reason,
186
+ },
187
+ request,
188
+ '__AGENT_STREAM_CANCELLED',
189
+ );
190
+ const processMsgs = results.filter(
191
+ res => res.type !== 'final_code',
192
+ ) as PrismaJson.AgentResponseBodies;
193
+ await dbPostUpdateMessageResponse(
194
+ messageId,
195
+ {
196
+ response: processMsgs.map(res => JSON.stringify(res)).join('\n'),
197
+ result: {
198
+ type: 'final_error',
199
+ status: 'failed',
200
+ payload: {
201
+ name:
202
+ reason instanceof Error ? reason.name : 'Stream cancelled',
203
+ value:
204
+ reason instanceof Error
205
+ ? reason.stack ?? ''
206
+ : 'Unknown error',
207
+ traceback_raw: [],
208
+ },
209
+ },
210
+ responseBody: processMsgs,
211
+ streamDuration: (Date.now() - time) / 1000,
212
+ },
213
+ false, // shouldRevalidatePath
214
+ );
215
+ },
216
  async start(controller) {
217
  const parseLine = async (
218
  line: string,
 
323
  await dbPostUpdateMessageResponse(messageId, {
324
  response: processMsgs.map(res => JSON.stringify(res)).join('\n'),
325
  result: results.find(
326
+ res => res.type === 'final_code' || res.type === 'final_error',
327
+ ) as PrismaJson.FinalResultBody,
328
  responseBody: processMsgs,
329
+ streamDuration: (Date.now() - time) / 1000,
330
  });
331
  logger.info(
332
  session,
components/ChatInterface.tsx CHANGED
@@ -24,11 +24,12 @@ const ChatInterface: React.FC<ChatInterfaceProps> = ({ chat, userId }) => {
24
  data-state={messageCodeResult?.payload ? 'open' : 'closed'}
25
  className="pl-4 peer absolute right-0 inset-y-0 hidden translate-x-full data-[state=open]:translate-x-0 z-30 duration-300 ease-in-out xl:flex flex-col items-start xl:w-1/2 h-full dark:bg-zinc-950 overflow-auto"
26
  >
27
- {messageCodeResult?.payload && (
28
- <Card className="size-full overflow-auto">
29
- <CodeResultDisplay codeResult={messageCodeResult.payload} />
30
- </Card>
31
- )}
 
32
  </div>
33
  <div className="w-full flex justify-center pr-0 animate-in duration-300 ease-in-out peer-[[data-state=open]]:xl:pr-[50%]">
34
  <ChatList chat={chat} userId={userId} />
 
24
  data-state={messageCodeResult?.payload ? 'open' : 'closed'}
25
  className="pl-4 peer absolute right-0 inset-y-0 hidden translate-x-full data-[state=open]:translate-x-0 z-30 duration-300 ease-in-out xl:flex flex-col items-start xl:w-1/2 h-full dark:bg-zinc-950 overflow-auto"
26
  >
27
+ {messageCodeResult?.type === 'final_code' &&
28
+ messageCodeResult.payload && (
29
+ <Card className="size-full overflow-auto">
30
+ <CodeResultDisplay codeResult={messageCodeResult.payload} />
31
+ </Card>
32
+ )}
33
  </div>
34
  <div className="w-full flex justify-center pr-0 animate-in duration-300 ease-in-out peer-[[data-state=open]]:xl:pr-[50%]">
35
  <ChatList chat={chat} userId={userId} />
components/chat/ChatMessage.tsx CHANGED
@@ -75,17 +75,16 @@ export const ChatMessage: React.FC<ChatMessageProps> = ({
75
  }) => {
76
  const [messageId, setMessageId] = useAtom(selectedMessageId);
77
  const { id, mediaUrl, prompt, response, result, responseBody } = message;
78
- const [formattedSections, finalResult, finalError] = useMemo(
79
  () =>
80
  formatStreamLogs(
81
- responseBody ??
82
- wipAssistantMessage ??
83
  (response ? getParsedStreamLogs(response) : []),
 
84
  ),
85
  [response, wipAssistantMessage, responseBody],
86
  );
87
- // prioritize the result from the message over the WIP message
88
- const codeResult = result?.payload ?? finalResult;
89
  return (
90
  <div
91
  className={cn(
@@ -143,7 +142,7 @@ export const ChatMessage: React.FC<ChatMessageProps> = ({
143
  </TableCell>
144
  <TableCell className="font-medium">
145
  <ChunkTypeToText
146
- useTimer={!codeResult && !finalError}
147
  chunk={section}
148
  />
149
  </TableCell>
@@ -154,15 +153,15 @@ export const ChatMessage: React.FC<ChatMessageProps> = ({
154
  ))}
155
  </TableBody>
156
  </Table>
157
- {codeResult && (
158
  <>
159
  <div className="xl:hidden">
160
- <CodeResultDisplay codeResult={codeResult} />
161
  </div>
162
  <p>✨ Coding complete</p>
163
  </>
164
  )}
165
- {!codeResult && finalError && (
166
  <>
167
  <p>❌ {finalError.name}</p>
168
  <div>
 
75
  }) => {
76
  const [messageId, setMessageId] = useAtom(selectedMessageId);
77
  const { id, mediaUrl, prompt, response, result, responseBody } = message;
78
+ const { formattedSections, finalResult, finalError } = useMemo(
79
  () =>
80
  formatStreamLogs(
81
+ wipAssistantMessage ??
82
+ responseBody ??
83
  (response ? getParsedStreamLogs(response) : []),
84
+ result,
85
  ),
86
  [response, wipAssistantMessage, responseBody],
87
  );
 
 
88
  return (
89
  <div
90
  className={cn(
 
142
  </TableCell>
143
  <TableCell className="font-medium">
144
  <ChunkTypeToText
145
+ useTimer={!finalResult && !finalError}
146
  chunk={section}
147
  />
148
  </TableCell>
 
153
  ))}
154
  </TableBody>
155
  </Table>
156
+ {finalResult && (
157
  <>
158
  <div className="xl:hidden">
159
+ <CodeResultDisplay codeResult={finalResult} />
160
  </div>
161
  <p>✨ Coding complete</p>
162
  </>
163
  )}
164
+ {!finalResult && finalError && (
165
  <>
166
  <p>❌ {finalError.name}</p>
167
  <div>
lib/db/functions.ts CHANGED
@@ -183,19 +183,21 @@ export async function dbPostCreateMessage(
183
  export async function dbPostUpdateMessageResponse(
184
  messageId: string,
185
  messageResponse: MessageAssistantResponse,
 
186
  ) {
187
  await prisma.message.update({
188
  data: {
189
  response: messageResponse.response,
190
  result: messageResponse.result ?? undefined,
191
  responseBody: messageResponse.responseBody,
 
192
  },
193
  where: {
194
  id: messageId,
195
  },
196
  });
197
 
198
- revalidatePath('/chat');
199
  }
200
 
201
  export async function dbDeleteChat(chatId: string) {
 
183
  export async function dbPostUpdateMessageResponse(
184
  messageId: string,
185
  messageResponse: MessageAssistantResponse,
186
+ shouldRevalidatePath = true,
187
  ) {
188
  await prisma.message.update({
189
  data: {
190
  response: messageResponse.response,
191
  result: messageResponse.result ?? undefined,
192
  responseBody: messageResponse.responseBody,
193
+ streamDuration: messageResponse.streamDuration,
194
  },
195
  where: {
196
  id: messageId,
197
  },
198
  });
199
 
200
+ shouldRevalidatePath && revalidatePath('/chat');
201
  }
202
 
203
  export async function dbDeleteChat(chatId: string) {
lib/db/prisma.ts CHANGED
@@ -57,6 +57,8 @@ declare global {
57
  payload: StructuredError;
58
  }
59
 
 
 
60
  type MessageBody =
61
  | PlanAndToolsBody
62
  | CodeBody
 
57
  payload: StructuredError;
58
  }
59
 
60
+ type FinalResultBody = FinalCodeBody | FinalErrorBody;
61
+
62
  type MessageBody =
63
  | PlanAndToolsBody
64
  | CodeBody
lib/hooks/useVisionAgent.ts CHANGED
@@ -1,7 +1,7 @@
1
  import { useChat } from 'ai/react';
2
  import { toast } from 'react-hot-toast';
3
  import { useEffect, useRef } from 'react';
4
- import { ChatWithMessages, MessageUI } from '../types';
5
  import { convertDBMessageToAPIMessage } from '../utils/message';
6
  import { useSetAtom } from 'jotai';
7
  import { selectedMessageId } from '@/state/chat';
@@ -25,7 +25,7 @@ const useVisionAgent = (chat: ChatWithMessages) => {
25
  toast.error(response.statusText);
26
  }
27
  },
28
- onFinish: message => {
29
  router.refresh();
30
  setMessageId(currMessageId.current);
31
  },
@@ -47,7 +47,11 @@ const useVisionAgent = (chat: ChatWithMessages) => {
47
  */
48
  const once = useRef(true);
49
  useEffect(() => {
50
- if (!isLoading && !latestDbMessage.result && once.current) {
 
 
 
 
51
  once.current = false;
52
  reload();
53
  }
 
1
  import { useChat } from 'ai/react';
2
  import { toast } from 'react-hot-toast';
3
  import { useEffect, useRef } from 'react';
4
+ import { ChatWithMessages } from '../types';
5
  import { convertDBMessageToAPIMessage } from '../utils/message';
6
  import { useSetAtom } from 'jotai';
7
  import { selectedMessageId } from '@/state/chat';
 
25
  toast.error(response.statusText);
26
  }
27
  },
28
+ onFinish: () => {
29
  router.refresh();
30
  setMessageId(currMessageId.current);
31
  },
 
47
  */
48
  const once = useRef(true);
49
  useEffect(() => {
50
+ if (
51
+ !isLoading &&
52
+ !(latestDbMessage.response || latestDbMessage.responseBody) &&
53
+ once.current
54
+ ) {
55
  once.current = false;
56
  reload();
57
  }
lib/types.ts CHANGED
@@ -5,9 +5,10 @@ export type ChatWithMessages = Chat & { messages: Message[] };
5
 
6
  export type MessageUserInput = Pick<Message, 'prompt' | 'mediaUrl'>;
7
  export type MessageAssistantResponse = {
8
- result?: PrismaJson.FinalCodeBody;
9
  response: string;
10
  responseBody: PrismaJson.AgentResponseBodies;
 
11
  };
12
 
13
  export type MessageUI = Pick<MessageAI, 'role' | 'content' | 'id'>;
 
5
 
6
  export type MessageUserInput = Pick<Message, 'prompt' | 'mediaUrl'>;
7
  export type MessageAssistantResponse = {
8
+ result?: PrismaJson.FinalResultBody;
9
  response: string;
10
  responseBody: PrismaJson.AgentResponseBodies;
11
+ streamDuration: number;
12
  };
13
 
14
  export type MessageUI = Pick<MessageAI, 'role' | 'content' | 'id'>;
lib/utils/content.ts CHANGED
@@ -32,42 +32,51 @@ export type WIPChunkBodyGroup = PrismaJson.MessageBody & {
32
  */
33
  export const formatStreamLogs = (
34
  content: WIPChunkBodyGroup[] | null,
35
- ): [
36
- WIPChunkBodyGroup[],
37
- PrismaJson.FinalCodeBody['payload']?,
38
- PrismaJson.StructuredError?,
39
- ] => {
40
- if (!content) return [[], undefined, undefined];
 
 
 
 
41
 
42
  // Merge consecutive logs of the same type to the latest status
43
- const groupedSections = content.reduce((acc: WIPChunkBodyGroup[], curr) => {
44
- const lastGroup = acc[acc.length - 1];
45
- if (
46
- acc.length > 0 &&
47
- lastGroup.type === curr.type &&
48
- curr.status !== 'started'
49
- ) {
50
- acc[acc.length - 1] = {
51
- ...curr,
52
- // always use the timestamp of the first log
53
- timestamp: lastGroup?.timestamp,
54
- // duration is the difference between the last log and the first log
55
- duration:
56
- lastGroup?.timestamp && curr.timestamp
57
- ? Date.parse(curr.timestamp) - Date.parse(lastGroup.timestamp)
58
- : undefined,
59
- };
60
- } else {
61
- acc.push(curr);
62
- }
63
- return acc;
64
- }, []);
 
 
 
65
 
66
- return [
67
- groupedSections.filter(section => WIPLogTypes.includes(section.type)),
68
- groupedSections.find(section => section.type === 'final_code')
 
 
69
  ?.payload as PrismaJson.FinalCodeBody['payload'],
70
- groupedSections.find(section => section.type === 'final_error')
71
  ?.payload as PrismaJson.StructuredError,
72
- ];
73
  };
 
32
  */
33
  export const formatStreamLogs = (
34
  content: WIPChunkBodyGroup[] | null,
35
+ result: PrismaJson.FinalResultBody | null,
36
+ ): {
37
+ formattedSections: WIPChunkBodyGroup[];
38
+ finalResult?: PrismaJson.FinalCodeBody['payload'];
39
+ finalError?: PrismaJson.StructuredError;
40
+ } => {
41
+ if (!content)
42
+ return {
43
+ formattedSections: [],
44
+ };
45
 
46
  // Merge consecutive logs of the same type to the latest status
47
+ const groupedSections = [...content, ...(result ? [result] : [])].reduce(
48
+ (acc: WIPChunkBodyGroup[], curr: WIPChunkBodyGroup) => {
49
+ const lastGroup = acc[acc.length - 1];
50
+ if (
51
+ acc.length > 0 &&
52
+ lastGroup.type === curr.type &&
53
+ curr.status !== 'started'
54
+ ) {
55
+ acc[acc.length - 1] = {
56
+ ...curr,
57
+ // always use the timestamp of the first log
58
+ timestamp: lastGroup?.timestamp,
59
+ // duration is the difference between the last log and the first log
60
+ duration:
61
+ lastGroup?.timestamp && curr.timestamp
62
+ ? Date.parse(curr.timestamp) - Date.parse(lastGroup.timestamp)
63
+ : undefined,
64
+ };
65
+ } else {
66
+ acc.push(curr);
67
+ }
68
+ return acc;
69
+ },
70
+ [],
71
+ );
72
 
73
+ return {
74
+ formattedSections: groupedSections.filter(section =>
75
+ WIPLogTypes.includes(section.type),
76
+ ),
77
+ finalResult: groupedSections.find(section => section.type === 'final_code')
78
  ?.payload as PrismaJson.FinalCodeBody['payload'],
79
+ finalError: groupedSections.find(section => section.type === 'final_error')
80
  ?.payload as PrismaJson.StructuredError,
81
+ };
82
  };
lib/utils/message.ts CHANGED
@@ -1,5 +1,5 @@
1
  import { Message } from '@prisma/client';
2
- import { MessageAssistantResponse, MessageUI } from '../types';
3
 
4
  /**
5
  * The Message we saved to database consists of a prompt and a response
@@ -17,7 +17,7 @@ export const convertDBMessageToAPIMessage = (
17
  content: prompt,
18
  });
19
  }
20
- if (result) {
21
  acc.push({
22
  id: id + '-assistant',
23
  role: 'assistant',
 
1
  import { Message } from '@prisma/client';
2
+ import { MessageUI } from '../types';
3
 
4
  /**
5
  * The Message we saved to database consists of a prompt and a response
 
17
  content: prompt,
18
  });
19
  }
20
+ if (result && result.type === 'final_code') {
21
  acc.push({
22
  id: id + '-assistant',
23
  role: 'assistant',
prisma/migrations/20240617015509_add_stream_duration_to_message/data-migration.mjs ADDED
@@ -0,0 +1,59 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import { PrismaClient } from '@prisma/client';
2
+
3
+ const prisma = new PrismaClient();
4
+
5
+ async function main() {
6
+ await prisma.$transaction(
7
+ async tx => {
8
+ const messages = await tx.message.findMany();
9
+ for (const message of messages) {
10
+ const { responseBody, response, result } = message;
11
+ let newResponseBody = responseBody;
12
+ let newResult = result;
13
+ if (responseBody && result) {
14
+ continue;
15
+ }
16
+ if (!responseBody && response) {
17
+ console.log('update response body', message.id);
18
+ newResponseBody = response
19
+ .split('\n')
20
+ .filter(chunk => !!chunk.trim())
21
+ .map(chunk => JSON.parse(chunk))
22
+ .filter(
23
+ body => body.type !== 'final_error' && body.type !== 'final_code',
24
+ );
25
+ }
26
+ if (!result && response) {
27
+ console.log('update response result', message.id);
28
+ newResult = response
29
+ .split('\n')
30
+ .filter(chunk => !!chunk.trim())
31
+ .map(chunk => JSON.parse(chunk))
32
+ ?.find(
33
+ body => body.type === 'final_error' || body.type === 'final_code',
34
+ );
35
+ }
36
+
37
+ await tx.message.update({
38
+ where: { id: message.id },
39
+ data: {
40
+ ...(newResult ? { result: newResult } : {}),
41
+ ...(newResponseBody ? { responseBody: newResponseBody } : {}),
42
+ },
43
+ });
44
+ }
45
+ },
46
+ { timeout: 30000 },
47
+ );
48
+ }
49
+
50
+ main()
51
+ .catch(async e => {
52
+ console.error(e);
53
+ await prisma.$disconnect();
54
+ process.exit(1);
55
+ })
56
+ .finally(async () => {
57
+ console.log('finished');
58
+ await prisma.$disconnect();
59
+ });
prisma/migrations/20240617015509_add_stream_duration_to_message/migration.sql ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ -- AlterTable
2
+ ALTER TABLE "message" ADD COLUMN "streamDuration" INTEGER;
prisma/schema.prisma CHANGED
@@ -47,10 +47,11 @@ model Message {
47
  mediaUrl String
48
  prompt String
49
  response String?
50
- /// [FinalCodeBody]
51
  result Json?
52
  /// [AgentResponseBodies]
53
  responseBody Json?
 
54
  chat Chat @relation(fields: [chatId], references: [id], onDelete: Cascade)
55
  user User? @relation(fields: [userId], references: [id])
56
 
 
47
  mediaUrl String
48
  prompt String
49
  response String?
50
+ /// [FinalResultBody]
51
  result Json?
52
  /// [AgentResponseBodies]
53
  responseBody Json?
54
+ streamDuration Int?
55
  chat Chat @relation(fields: [chatId], references: [id], onDelete: Cascade)
56
  user User? @relation(fields: [userId], references: [id])
57
 
tests/e2e/index.spec.ts CHANGED
@@ -83,3 +83,10 @@ test.describe('Page with example 1 can be opened', () => {
83
  await checkMainElementVisible(page);
84
  });
85
  });
 
 
 
 
 
 
 
 
83
  await checkMainElementVisible(page);
84
  });
85
  });
86
+
87
+ const TEST_FINAL_ERROR_CHAT_ID_1 = '6bLzMKS';
88
+ test('Page with final error in response can be opened', async ({ page }) => {
89
+ await page.goto(`/chat/${TEST_FINAL_ERROR_CHAT_ID_1}`);
90
+
91
+ await expect(page.getByText('❌ StreamingError')).toBeVisible();
92
+ });