File size: 7,647 Bytes
bbe4eea a8aec61 bbe4eea a8aec61 bbe4eea a8aec61 bbe4eea a8aec61 bbe4eea a8aec61 bbe4eea a8aec61 bbe4eea a8aec61 bbe4eea a8aec61 bbe4eea a8aec61 bbe4eea a8aec61 bbe4eea a8aec61 bbe4eea a8aec61 bbe4eea a8aec61 bbe4eea a8aec61 bbe4eea a8aec61 bbe4eea a8aec61 bbe4eea a8aec61 bbe4eea a8aec61 bbe4eea a8aec61 bbe4eea a8aec61 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 |
import { RunResponseContent } from '@/types/os'
import { useCallback } from 'react'
/**
 * Forwards one parsed stream chunk to the consumer-supplied handler.
 *
 * @param chunk - A parsed JSON object of type RunResponseContent.
 * @param onChunk - Handler that is invoked exactly once with `chunk`.
 */
function processChunk(
  chunk: RunResponseContent,
  onChunk: (chunk: RunResponseContent) => void
): void {
  onChunk(chunk)
}
// TODO: Make new format the default and phase out legacy format
/**
 * Tells whether a parsed message uses the legacy wire format, i.e. it is a
 * non-null object with a string `event` field and no `data` field.
 *
 * @param data - The parsed data object.
 * @returns true for the legacy format, false otherwise (treated as the new format).
 */
function isLegacyFormat(data: RunResponseContent): boolean {
  // Anything that is not a real object cannot be a legacy message.
  if (typeof data !== 'object' || data === null) {
    return false
  }
  // Legacy messages carry `event` directly and never wrap a `data` payload.
  if (!('event' in data) || 'data' in data) {
    return false
  }
  return typeof data.event === 'string'
}
/**
 * Shape of a "new format" stream message: an event name plus a separate payload.
 * convertNewFormatToLegacy flattens this into a LegacyEventFormat.
 */
interface NewFormatData {
// Name of the event.
event: string
// Payload; a string value is parsed as JSON by convertNewFormatToLegacy.
data: string | Record<string, unknown>
}
/** A RunResponseContent that also carries a string `event` field (return type of convertNewFormatToLegacy). */
type LegacyEventFormat = RunResponseContent & { event: string }
/**
 * Flattens a new-format message (`{ event, data }`) into the legacy shape
 * where the payload fields sit next to `event` on a single object.
 *
 * The payload may be an object or a JSON string. Any payload that does not
 * resolve to a plain object (unparseable string, `null`, missing, primitive,
 * array) is treated as empty, so the event itself is still delivered.
 *
 * @param newFormatData - Message with an `event` name and a `data` payload.
 * @returns An object containing `event` plus every key of the payload.
 */
function convertNewFormatToLegacy(
  newFormatData: NewFormatData
): LegacyEventFormat {
  const { event, data } = newFormatData

  // The payload arrives either as a JSON string or as an already-parsed value.
  let candidate: unknown = data
  if (typeof data === 'string') {
    try {
      candidate = JSON.parse(data)
    } catch {
      // Not valid JSON: fall through to the empty payload below.
      candidate = undefined
    }
  }

  // Only plain objects can be flattened. JSON.parse may legitimately yield
  // null, a primitive or an array, and the wire value itself may be null;
  // spreading those would throw or produce meaningless numeric keys.
  const payload: Record<string, unknown> =
    typeof candidate === 'object' &&
    candidate !== null &&
    !Array.isArray(candidate)
      ? (candidate as Record<string, unknown>)
      : {}

  // Payload keys are spread last, so a payload key named `event` takes
  // precedence over the outer `event`.
  return {
    event,
    ...payload
  } as LegacyEventFormat
}
/**
 * Finds the index of the '}' that closes the object opening at `start`.
 *
 * Braces inside string literals are ignored, and backslash escapes inside
 * strings are honoured so an escaped quote does not end the string.
 *
 * @param text - The text to scan.
 * @param start - Index of the opening '{'.
 * @returns Index of the matching '}', or -1 when `text` ends first.
 */
function findJsonObjectEnd(text: string, start: number): number {
  let depth = 0
  let inString = false
  let escapeNext = false

  for (let i = start; i < text.length; i++) {
    const char = text[i]

    if (inString) {
      if (escapeNext) {
        escapeNext = false
      } else if (char === '\\') {
        escapeNext = true
      } else if (char === '"') {
        inString = false
      }
      continue
    }

    if (char === '"') {
      inString = true
    } else if (char === '{') {
      depth++
    } else if (char === '}') {
      depth--
      if (depth === 0) {
        return i
      }
    }
  }

  return -1
}

/**
 * Extracts complete JSON objects from a buffer string **incrementally**.
 *
 * Scanning starts at the first '{'. Every complete object is parsed,
 * converted to the legacy shape when it is not already in it, and handed to
 * `onChunk`. Text up to and including a processed object is dropped. A
 * trailing partial object is returned untouched so it can be completed by
 * the next network chunk.
 *
 * @param buffer - The accumulated string buffer.
 * @param onChunk - Callback to process each parsed JSON object.
 * @returns Remaining string that did not form a complete JSON object.
 */
function parseBuffer(
  buffer: string,
  onChunk: (chunk: RunResponseContent) => void
): string {
  let jsonStartIndex = buffer.indexOf('{')

  // Process as many complete JSON objects as possible.
  while (jsonStartIndex !== -1) {
    const jsonEndIndex = findJsonObjectEnd(buffer, jsonStartIndex)

    // No closing brace yet: keep the partial data and wait for more input.
    if (jsonEndIndex === -1) {
      break
    }

    const jsonString = buffer.slice(jsonStartIndex, jsonEndIndex + 1)
    try {
      const parsed = JSON.parse(jsonString)
      if (isLegacyFormat(parsed)) {
        // Legacy format - use as is.
        processChunk(parsed, onChunk)
      } else {
        // New format - convert to legacy format for compatibility.
        processChunk(convertNewFormatToLegacy(parsed), onChunk)
      }
    } catch {
      // Reached for invalid JSON, and also when conversion or `onChunk`
      // throws, because both run inside this try. Retry from the next '{'
      // so the same candidate is not parsed again in this call.
      jsonStartIndex = buffer.indexOf('{', jsonStartIndex + 1)
      continue
    }

    // Drop the consumed object and any whitespace separating it from the
    // next one. Only LEADING whitespace may be removed: the remainder can be
    // a partial object that ends inside a string literal, and trimming its
    // tail would silently delete characters from that string.
    buffer = buffer.slice(jsonEndIndex + 1).trimStart()
    jsonStartIndex = buffer.indexOf('{')
  }

  // Return any unprocessed (partial) data.
  return buffer
}
/**
 * Renders an arbitrary value as text for an error message.
 *
 * Primitives and Error instances use `String(value)`. Other objects are
 * serialised as JSON so they do not collapse to "[object Object]".
 */
function stringifyForMessage(value: unknown): string {
  if (typeof value !== 'object' || value === null || value instanceof Error) {
    return String(value)
  }
  try {
    return JSON.stringify(value)
  } catch {
    // JSON.stringify throws on e.g. circular structures.
    return String(value)
  }
}

/**
 * Normalises anything caught while streaming into an Error for `onError`.
 *
 * An object carrying a `detail` field (as thrown for non-OK responses) uses
 * that field as the message; every other value is stringified as a whole.
 */
function toStreamError(error: unknown): Error {
  if (typeof error === 'object' && error !== null && 'detail' in error) {
    return new Error(stringifyForMessage(error.detail))
  }
  return new Error(stringifyForMessage(error))
}

/**
 * Custom React hook to handle streaming API responses as JSON objects.
 *
 * `streamResponse` POSTs `requestBody` to `apiUrl` (FormData is sent as is,
 * anything else is JSON-encoded with a JSON content type), then reads the
 * response body as a stream of UTF-8 text.
 *
 * The hook:
 * - Accumulates partial JSON data from streaming responses.
 * - Delegates extraction of complete JSON objects to parseBuffer, which
 *   passes each object to onChunk (legacy-format objects as is, new
 *   `{ event, data }` objects converted to the legacy shape first).
 * - Reports every failure through onError and never rejects for them:
 *   network errors, a missing body, or a non-OK status. For a non-OK status
 *   the JSON error body's `detail` is used when present, otherwise the
 *   HTTP status.
 * - Signals completion with onComplete once the stream has ended.
 *
 * @returns An object containing the streamResponse function.
 */
export default function useAIResponseStream() {
  const streamResponse = useCallback(
    async (options: {
      apiUrl: string
      headers?: Record<string, string>
      requestBody: FormData | Record<string, unknown>
      onChunk: (chunk: RunResponseContent) => void
      onError: (error: Error) => void
      onComplete: () => void
    }): Promise<void> => {
      const {
        apiUrl,
        headers = {},
        requestBody,
        onChunk,
        onError,
        onComplete
      } = options

      // Buffer to accumulate partial JSON data across network chunks.
      let buffer = ''

      try {
        const response = await fetch(apiUrl, {
          method: 'POST',
          headers: {
            // Set content-type only for non-FormData requests; for FormData
            // the header is left unset here.
            ...(!(requestBody instanceof FormData) && {
              'Content-Type': 'application/json'
            }),
            ...headers
          },
          body:
            requestBody instanceof FormData
              ? requestBody
              : JSON.stringify(requestBody)
        })

        if (!response.ok) {
          let errorData: unknown
          try {
            errorData = await response.json()
          } catch {
            // The error body is not JSON (for example an HTML gateway page).
            // Report the HTTP status rather than a JSON syntax error.
            errorData = {
              detail: `Request failed with status ${response.status}`
            }
          }
          throw errorData
        }

        if (!response.body) {
          throw new Error('No response body')
        }

        const reader = response.body.getReader()
        const decoder = new TextDecoder()

        // Read in a loop rather than recursively, so a long stream does not
        // build up one pending promise per network chunk.
        let result = await reader.read()
        while (!result.done) {
          // `stream: true` keeps an incomplete multi-byte sequence inside the
          // decoder until the following bytes arrive.
          buffer += decoder.decode(result.value, { stream: true })
          // Parse any complete JSON objects available in the buffer.
          buffer = parseBuffer(buffer, onChunk)
          result = await reader.read()
        }

        // Flush whatever the decoder is still holding, then process any
        // final data left in the buffer.
        parseBuffer(buffer + decoder.decode(), onChunk)
        onComplete()
      } catch (error) {
        onError(toStreamError(error))
      }
    },
    []
  )

  return { streamResponse }
}
|