LibreChat / client /src /hooks /useServerStream.ts
N.Achyuth Reddy
Upload 683 files
9705b6c
import { useEffect } from 'react';
import { useResetRecoilState, useSetRecoilState } from 'recoil';
import {
/* @ts-ignore */
SSE,
createPayload,
useGetUserBalance,
tMessageSchema,
tConversationSchema,
useGetStartupConfig,
} from 'librechat-data-provider';
import type { TResPlugin, TMessage, TConversation, TSubmission } from 'librechat-data-provider';
import useConversations from './useConversations';
import { useAuthContext } from './AuthContext';
import store from '~/store';
type TResData = {
plugin: TResPlugin;
final?: boolean;
initial?: boolean;
requestMessage: TMessage;
responseMessage: TMessage;
conversation: TConversation;
};
export default function useServerStream(submission: TSubmission | null) {
const setMessages = useSetRecoilState(store.messages);
const setIsSubmitting = useSetRecoilState(store.isSubmitting);
const setConversation = useSetRecoilState(store.conversation);
const resetLatestMessage = useResetRecoilState(store.latestMessage);
const { token, isAuthenticated } = useAuthContext();
const { data: startupConfig } = useGetStartupConfig();
const { refreshConversations } = useConversations();
const balanceQuery = useGetUserBalance({
enabled: !!isAuthenticated && startupConfig?.checkBalance,
});
const messageHandler = (data: string, submission: TSubmission) => {
const {
messages,
message,
plugin,
plugins,
initialResponse,
isRegenerate = false,
} = submission;
if (isRegenerate) {
setMessages([
...messages,
{
...initialResponse,
text: data,
parentMessageId: message?.overrideParentMessageId ?? null,
messageId: message?.overrideParentMessageId + '_',
plugin: plugin ?? null,
plugins: plugins ?? [],
submitting: true,
// unfinished: true
},
]);
} else {
setMessages([
...messages,
message,
{
...initialResponse,
text: data,
parentMessageId: message?.messageId,
messageId: message?.messageId + '_',
plugin: plugin ?? null,
plugins: plugins ?? [],
submitting: true,
// unfinished: true
},
]);
}
};
const cancelHandler = (data: TResData, submission: TSubmission) => {
const { requestMessage, responseMessage, conversation } = data;
const { messages, isRegenerate = false } = submission;
// update the messages
if (isRegenerate) {
setMessages([...messages, responseMessage]);
} else {
setMessages([...messages, requestMessage, responseMessage]);
}
setIsSubmitting(false);
// refresh title
if (requestMessage.parentMessageId == '00000000-0000-0000-0000-000000000000') {
setTimeout(() => {
refreshConversations();
}, 2000);
// in case it takes too long.
setTimeout(() => {
refreshConversations();
}, 5000);
}
setConversation((prevState) => ({
...prevState,
...conversation,
}));
};
const createdHandler = (data: TResData, submission: TSubmission) => {
const { messages, message, initialResponse, isRegenerate = false } = submission;
if (isRegenerate) {
setMessages([
...messages,
{
...initialResponse,
parentMessageId: message?.overrideParentMessageId ?? null,
messageId: message?.overrideParentMessageId + '_',
submitting: true,
},
]);
} else {
setMessages([
...messages,
message,
{
...initialResponse,
parentMessageId: message?.messageId,
messageId: message?.messageId + '_',
submitting: true,
},
]);
}
const { conversationId } = message;
setConversation((prevState) =>
tConversationSchema.parse({
...prevState,
conversationId,
}),
);
resetLatestMessage();
};
const finalHandler = (data: TResData, submission: TSubmission) => {
const { requestMessage, responseMessage, conversation } = data;
const { messages, isRegenerate = false } = submission;
// update the messages
if (isRegenerate) {
setMessages([...messages, responseMessage]);
} else {
setMessages([...messages, requestMessage, responseMessage]);
}
setIsSubmitting(false);
// refresh title
if (requestMessage.parentMessageId == '00000000-0000-0000-0000-000000000000') {
setTimeout(() => {
refreshConversations();
}, 2000);
// in case it takes too long.
setTimeout(() => {
refreshConversations();
}, 5000);
}
setConversation((prevState) => ({
...prevState,
...conversation,
}));
};
const errorHandler = (data: TResData, submission: TSubmission) => {
const { messages, message } = submission;
console.log('Error:', data);
const errorResponse = tMessageSchema.parse({
...data,
error: true,
parentMessageId: message?.messageId,
});
setIsSubmitting(false);
setMessages([...messages, message, errorResponse]);
return;
};
const abortConversation = (conversationId = '', submission: TSubmission) => {
console.log(submission);
const { endpoint } = submission?.conversation || {};
fetch(`/api/ask/${endpoint}/abort`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${token}`,
},
body: JSON.stringify({
abortKey: conversationId,
}),
})
.then((response) => response.json())
.then((data) => {
console.log('aborted', data);
cancelHandler(data, submission);
})
.catch((error) => {
console.error('Error aborting request');
console.error(error);
// errorHandler({ text: 'Error aborting request' }, { ...submission, message });
});
return;
};
useEffect(() => {
if (submission === null) {
return;
}
if (Object.keys(submission).length === 0) {
return;
}
let { message } = submission;
const { server, payload } = createPayload(submission);
const events = new SSE(server, {
payload: JSON.stringify(payload),
headers: { 'Content-Type': 'application/json', Authorization: `Bearer ${token}` },
});
events.onmessage = (e: MessageEvent) => {
const data = JSON.parse(e.data);
if (data.final) {
const { plugins } = data;
finalHandler(data, { ...submission, plugins, message });
startupConfig?.checkBalance && balanceQuery.refetch();
console.log('final', data);
}
if (data.created) {
message = {
...data.message,
overrideParentMessageId: message?.overrideParentMessageId,
};
createdHandler(data, { ...submission, message });
} else {
const text = data.text || data.response;
const { plugin, plugins } = data;
if (data.message) {
messageHandler(text, { ...submission, plugin, plugins, message });
}
}
};
events.onopen = () => console.log('connection is opened');
events.oncancel = () =>
abortConversation(message?.conversationId ?? submission?.conversationId, submission);
events.onerror = function (e: MessageEvent) {
console.log('error in opening conn.');
startupConfig?.checkBalance && balanceQuery.refetch();
events.close();
const data = JSON.parse(e.data);
errorHandler(data, { ...submission, message });
};
setIsSubmitting(true);
events.stream();
return () => {
const isCancelled = events.readyState <= 1;
events.close();
// setSource(null);
if (isCancelled) {
const e = new Event('cancel');
events.dispatchEvent(e);
}
setIsSubmitting(false);
};
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [submission]);
}