| |
| |
| |
| |
| |
| |
|
|
|
|
| import { PassThrough } from 'stream';
|
| import { parseToolCalls, toOpenAIToolCalls, toAnthropicToolUse, detectToolCallStart } from './tool-prompt.js';
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
|
|
/**
 * Creates a detector for runaway repetition in a growing text stream.
 *
 * The detector keeps the full accumulated text and, once enough has been
 * seen, checks whether the trailing window of text already occurred
 * earlier. If so, the stream is considered to be looping and the caller
 * should stop forwarding output.
 *
 * @param {number} [minRepeatLen=150] - upper bound on the comparison window
 * @returns {{feed(chunk: string): {emit: string, repeated: boolean}, getText(): string}}
 */
function createRepeatDetector(minRepeatLen = 150) {
  // Everything accepted so far; repetition is searched over this text.
  let accumulated = '';

  return {
    /**
     * Append a chunk and decide how much of it is safe to emit.
     * @param {string} chunk - newly received text
     * @returns {{emit: string, repeated: boolean}} the portion of `chunk`
     *   to forward, and whether repetition was detected.
     */
    feed(chunk) {
      if (!chunk) return { emit: '', repeated: false };

      const lenBefore = accumulated.length;
      accumulated += chunk;

      // Not enough material to judge yet — pass the chunk straight through.
      if (accumulated.length < minRepeatLen * 2) {
        return { emit: chunk, repeated: false };
      }

      // Compare the tail window against everything before it. Windows
      // shorter than 80 chars are too noisy to be a trustworthy signal.
      const tailLen = Math.min(minRepeatLen, Math.floor(accumulated.length / 3));
      if (tailLen < 80) return { emit: chunk, repeated: false };

      const cutAt = accumulated.length - tailLen;
      const tail = accumulated.slice(cutAt);
      const matchIdx = accumulated.slice(0, cutAt).indexOf(tail);

      if (matchIdx < 0) return { emit: chunk, repeated: false };

      // The tail already appeared earlier: truncate it, emitting only the
      // part of this chunk that lies before the repeated region.
      const keepInChunk = cutAt - lenBefore;
      const emit = keepInChunk > 0 ? chunk.slice(0, keepInChunk) : '';
      accumulated = accumulated.slice(0, cutAt);

      console.log(`[RepeatDetect] 检测到重复 (${tailLen} chars),截断输出`);
      return { emit, repeated: true };
    },

    /** @returns {string} all text accepted so far (repeats truncated). */
    getText() {
      return accumulated;
    },
  };
}
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
/**
 * Probes the upstream until it can classify the response, then hands the
 * caller a replayable stream.
 *
 * Raw bytes are buffered while an incremental JSON parser looks for the
 * first meaningful object (`{type: ...}`):
 *  - 'streamingError'          → reject with statusCode 429
 *  - 'chunk'/'reasoningContent' → resolve with a PassThrough that replays
 *    everything buffered so far and then pipes the rest of the upstream
 *  - stream 'end' with neither  → resolve with a PassThrough containing
 *    only the buffered bytes, already ended
 *
 * @param {import('stream').Readable} upstreamStream - upstream byte stream of concatenated JSON objects
 * @param {number} [timeoutMs=30000] - max wait before rejecting with 'probeStream timeout'
 * @returns {Promise<import('stream').PassThrough>} replayable stream of the full upstream payload
 */
export function probeStream(upstreamStream, timeoutMs = 30000) {
  return new Promise((resolve, reject) => {
    let resolved = false;
    let rawChunks = [];

    // Detach all probe listeners and cancel the timeout; safe to call once
    // per settle path (every path sets `resolved` first).
    function cleanup() {
      clearTimeout(timer);
      upstreamStream.removeListener('data', onData);
      upstreamStream.removeListener('end', onEnd);
      upstreamStream.removeListener('error', onError);
    }

    // Give up if the upstream produces nothing classifiable in time.
    const timer = setTimeout(() => {
      if (resolved) return;
      resolved = true;
      cleanup();
      upstreamStream.destroy();
      reject(new Error('probeStream timeout'));
    }, timeoutMs);

    const parser = createJsonStreamParser((obj) => {
      if (resolved) return;

      if (obj.type === 'streamingError') {
        resolved = true;
        cleanup();
        // NOTE(review): the stream is resumed (drained) rather than
        // destroyed here — presumably to let the connection wind down
        // cleanly; confirm this is intentional.
        upstreamStream.resume();
        // 429 so callers can treat upstream streaming errors as rate-limit
        // style failures.
        reject(Object.assign(
          new Error(obj.data || 'Streaming error'),
          { statusCode: 429 }
        ));
        return;
      }

      if (obj.type === 'chunk' || obj.type === 'reasoningContent') {
        resolved = true;
        cleanup();
        upstreamStream.pause();

        // Replay what was consumed during probing (the chunk that
        // triggered this callback is already in rawChunks — onData pushes
        // before feeding the parser), then pipe the remainder live.
        const wrapped = new PassThrough();
        for (const chunk of rawChunks) wrapped.write(chunk);
        upstreamStream.pipe(wrapped);
        upstreamStream.resume();

        resolve(wrapped);
      }
    });

    function onData(chunk) {
      rawChunks.push(chunk);
      parser.feed(chunk);
    }
    function onEnd() {
      if (resolved) return;
      // flush() may still surface a trailing object (e.g. a streamingError)
      // whose callback settles the promise — hence the re-check below.
      parser.flush();
      if (!resolved) {
        resolved = true;
        cleanup();
        // Upstream ended before any content chunk: hand back a finished
        // stream holding everything we buffered.
        const wrapped = new PassThrough();
        for (const chunk of rawChunks) wrapped.write(chunk);
        wrapped.end();
        resolve(wrapped);
      }
    }
    function onError(err) {
      if (resolved) return;
      resolved = true;
      cleanup();
      reject(err);
    }

    upstreamStream.on('data', onData);
    upstreamStream.on('end', onEnd);
    upstreamStream.on('error', onError);
  });
}
|
|
|
| |
| |
| |
|
|
/**
 * Incremental parser for a stream of concatenated top-level JSON objects.
 *
 * Accepts arbitrary chunk boundaries; whenever a complete `{...}` object
 * has been seen, it is JSON.parse'd and passed to `onObject`. Parse
 * failures are deliberately swallowed so one malformed object cannot kill
 * the stream. Braces/quotes inside string literals (including escaped
 * quotes) are ignored for nesting purposes.
 *
 * Fixes a chunk-boundary bug in the previous version: the retained
 * partial object was rescanned from index 0 on the next feed() while
 * `depth`/`inString` persisted, double-counting its braces so any object
 * split across chunks was never emitted. `scanPos` now tracks where
 * scanning should resume.
 *
 * @param {(obj: any) => void} onObject - invoked once per parsed object
 * @returns {{feed(chunk: string): void, flush(): void}}
 */
function createJsonStreamParser(onObject) {
  let buffer = '';      // unconsumed text (holds at most one partial object)
  let scanPos = 0;      // index in `buffer` of the first unscanned char
  let depth = 0;        // current {} nesting depth outside strings
  let inString = false; // inside a JSON string literal
  let escape = false;   // previous char was a backslash inside a string
  let objStart = -1;    // buffer index where the current top-level object began

  return {
    feed(chunk) {
      buffer += chunk;
      // Resume where the previous feed() stopped; state carries over.
      for (let i = scanPos; i < buffer.length; i++) {
        const ch = buffer[i];

        if (escape) { escape = false; continue; }
        if (ch === '\\' && inString) { escape = true; continue; }
        if (ch === '"') { inString = !inString; continue; }
        if (inString) continue;

        if (ch === '{') {
          if (depth === 0) objStart = i;
          depth++;
        } else if (ch === '}') {
          // Clamp at zero so a stray '}' between objects cannot push the
          // depth negative and desynchronize every later object.
          if (depth > 0) depth--;
          if (depth === 0 && objStart >= 0) {
            const jsonStr = buffer.substring(objStart, i + 1);
            try { onObject(JSON.parse(jsonStr)); } catch {}
            objStart = -1;
          }
        }
      }

      // Retain only the in-progress object (if any); drop finished text.
      if (objStart >= 0) {
        buffer = buffer.substring(objStart);
        objStart = 0;
      } else {
        buffer = '';
      }
      scanPos = buffer.length;
    },

    /**
     * Attempt to parse whatever is left (call at end of stream), then
     * reset all state so the parser can be reused cleanly.
     */
    flush() {
      if (buffer.trim()) {
        try { onObject(JSON.parse(buffer.trim())); } catch {}
      }
      buffer = '';
      scanPos = 0;
      depth = 0;
      inString = false;
      escape = false;
      objStart = -1;
    },
  };
}
|
|
|
| |
| |
| |
|
|
/**
 * Streams the upstream JSON-object stream to the client as OpenAI-style
 * `chat.completion.chunk` SSE frames, with repeat detection.
 *
 * @param {import('stream').Readable} upstreamStream - upstream stream of concatenated JSON objects
 * @param {import('http').ServerResponse} res - client response (written as SSE)
 * @param {string} model - model name echoed in every chunk
 * @param {string} requestId - id echoed in every chunk
 * @param {(err: any) => void} [onStreamError] - notified on upstream 'streamingError' objects
 */
export function transformToOpenAISSE(upstreamStream, res, model, requestId, onStreamError) {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': '*',
  });

  const createdAt = Math.floor(Date.now() / 1000);

  // Emit one chat.completion.chunk SSE frame.
  const sendDelta = (delta, finishReason = null) => {
    const payload = {
      id: requestId,
      object: 'chat.completion.chunk',
      created: createdAt,
      model,
      choices: [{ index: 0, delta, finish_reason: finishReason }],
    };
    res.write(`data: ${JSON.stringify(payload)}\n\n`);
  };

  // Announce the assistant role before any content arrives.
  sendDelta({ role: 'assistant', content: '' });

  const detector = createRepeatDetector();
  let done = false;

  // Terminal sequence: stop chunk, [DONE] sentinel, close. Idempotent.
  const endStream = () => {
    if (done || res.writableEnded) return;
    done = true;
    sendDelta({}, 'stop');
    res.write('data: [DONE]\n\n');
    res.end();
  };

  const parser = createJsonStreamParser((obj) => {
    if (done) return;
    switch (obj.type) {
      case 'chunk': {
        const { emit, repeated } = detector.feed(obj.data);
        if (emit) sendDelta({ content: emit });
        // Model is looping — cut the stream off early.
        if (repeated) endStream();
        break;
      }

      case 'reasoningContent':
        sendDelta({ reasoning_content: obj.data });
        break;

      case 'finalResult':
        endStream();
        break;

      case 'streamingError':
        if (onStreamError) onStreamError(obj.data);
        endStream();
        break;
    }
  });

  upstreamStream.on('data', (chunk) => parser.feed(chunk));
  upstreamStream.on('end', () => {
    parser.flush();
    endStream();
  });
  upstreamStream.on('error', () => {
    // Best-effort close on transport failure (no terminal stop chunk).
    if (!res.writableEnded) {
      res.write('data: [DONE]\n\n');
      res.end();
    }
  });
}
|
|
|
| |
| |
| |
|
|
/**
 * Streams the upstream JSON-object stream to the client as Anthropic
 * Messages-API SSE events (message_start / content_block_delta / ...),
 * with repeat detection. All text goes into a single text content block.
 *
 * @param {import('stream').Readable} upstreamStream - upstream stream of concatenated JSON objects
 * @param {import('http').ServerResponse} res - client response (written as SSE)
 * @param {string} model - model name echoed in message_start
 * @param {string} requestId - message id echoed in message_start
 * @param {(err: any) => void} [onStreamError] - notified on upstream 'streamingError' objects
 */
export function transformToAnthropicSSE(upstreamStream, res, model, requestId, onStreamError) {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': '*',
  });

  const emitEvent = (event, data) => {
    res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
  };

  const blockIndex = 0; // the single text block used for the whole reply
  let headerSent = false;
  let done = false;

  // Lazily emit message_start + content_block_start before the first delta.
  const ensureHeader = () => {
    if (headerSent) return;
    headerSent = true;

    emitEvent('message_start', {
      type: 'message_start',
      message: {
        id: requestId,
        type: 'message',
        role: 'assistant',
        model,
        content: [],
        stop_reason: null,
        usage: { input_tokens: 0, output_tokens: 0 },
      },
    });

    emitEvent('content_block_start', {
      type: 'content_block_start',
      index: blockIndex,
      content_block: { type: 'text', text: '' },
    });
  };

  // Forward a piece of text as a text_delta on the single block.
  const emitText = (text) => {
    ensureHeader();
    emitEvent('content_block_delta', {
      type: 'content_block_delta',
      index: blockIndex,
      delta: { type: 'text_delta', text },
    });
  };

  const detector = createRepeatDetector();

  // Terminal sequence: block stop, message_delta, message_stop. Idempotent.
  const endStream = () => {
    if (done || res.writableEnded) return;
    done = true;
    ensureHeader(); // a reply with zero deltas still gets a full envelope
    emitEvent('content_block_stop', { type: 'content_block_stop', index: blockIndex });
    emitEvent('message_delta', {
      type: 'message_delta',
      delta: { stop_reason: 'end_turn' },
      usage: { output_tokens: 0 },
    });
    emitEvent('message_stop', { type: 'message_stop' });
    res.end();
  };

  const parser = createJsonStreamParser((obj) => {
    if (done) return;
    switch (obj.type) {
      case 'chunk': {
        const { emit, repeated } = detector.feed(obj.data);
        if (emit) emitText(emit);
        // Model is looping — cut the stream off early.
        if (repeated) endStream();
        break;
      }

      case 'reasoningContent':
        emitText(obj.data);
        break;

      case 'finalResult':
        endStream();
        break;

      case 'streamingError':
        if (onStreamError) onStreamError(obj.data);
        endStream();
        break;
    }
  });

  upstreamStream.on('data', (chunk) => parser.feed(chunk));
  upstreamStream.on('end', () => {
    parser.flush();
    endStream();
  });
  upstreamStream.on('error', () => {
    // Best-effort close on transport failure.
    if (!res.writableEnded) res.end();
  });
}
|
|
|
| |
| |
| |
| |
| |
| |
|
|
/**
 * OpenAI-style SSE transform with prompt-based tool-call support.
 *
 * Text is streamed through normally until a tool-call marker is spotted
 * in the accumulated text, after which output is withheld. At end of
 * stream the buffered text is parsed: tool calls are replayed as
 * `tool_calls` deltas with finish_reason 'tool_calls'; otherwise the
 * withheld text (if any) is flushed and the stream finishes with 'stop'.
 *
 * @param {import('stream').Readable} upstreamStream - upstream stream of concatenated JSON objects
 * @param {import('http').ServerResponse} res - client response (written as SSE)
 * @param {string} model - model name echoed in every chunk
 * @param {string} requestId - id echoed in every chunk
 * @param {(err: any) => void} [onStreamError] - notified on upstream 'streamingError' objects
 */
export function transformToOpenAISSEWithTools(upstreamStream, res, model, requestId, onStreamError) {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': '*',
  });

  const ts = Math.floor(Date.now() / 1000);
  let fullText = '';            // everything received; parsed for tool calls at stream end
  let toolCallDetected = false; // once true, text is withheld from the client
  let streamError = false;

  // Emit one chat.completion.chunk SSE frame.
  function writeChunk(delta, finishReason = null) {
    const obj = {
      id: requestId,
      object: 'chat.completion.chunk',
      created: ts,
      model,
      choices: [{ index: 0, delta, finish_reason: finishReason }],
    };
    res.write(`data: ${JSON.stringify(obj)}\n\n`);
  }

  // Announce the assistant role before any content arrives.
  writeChunk({ role: 'assistant', content: '' });

  const detector = createRepeatDetector();

  const parser = createJsonStreamParser((obj) => {
    switch (obj.type) {
      case 'chunk':
        fullText += obj.data;
        if (!toolCallDetected) {
          if (detectToolCallStart(fullText)) {
            toolCallDetected = true;
          } else {
            const { emit, repeated } = detector.feed(obj.data);
            if (emit) writeChunk({ content: emit });
            if (repeated) {
              // Repetition detected: intentionally no early stop here —
              // fullText keeps accumulating so that tool calls appearing
              // at the end of the reply can still be parsed.
            }
          }
        }
        break;

      case 'reasoningContent':
        if (!toolCallDetected) {
          writeChunk({ reasoning_content: obj.data });
        }
        break;

      case 'finalResult':
        // Upstream's authoritative final text replaces the accumulation.
        if (obj.data?.mainText) fullText = obj.data.mainText;
        break;

      case 'streamingError':
        streamError = true;
        if (onStreamError) onStreamError(obj.data);
        break;
    }
  });

  upstreamStream.on('data', (chunk) => parser.feed(chunk));
  upstreamStream.on('end', () => {
    parser.flush();
    if (res.writableEnded) return;

    // On upstream error, close out without attempting tool-call parsing.
    if (streamError) {
      writeChunk({}, 'stop');
      res.write('data: [DONE]\n\n');
      res.end();
      return;
    }

    // Only now, with the complete text, decide whether the reply
    // contained tool calls.
    const { hasToolCalls, toolCalls, textContent } = parseToolCalls(fullText);

    if (hasToolCalls) {
      // Replay each parsed call as a single complete tool_calls delta.
      const openaiToolCalls = toOpenAIToolCalls(toolCalls);
      for (let i = 0; i < openaiToolCalls.length; i++) {
        const tc = openaiToolCalls[i];
        writeChunk({
          tool_calls: [{
            index: i,
            id: tc.id,
            type: 'function',
            function: { name: tc.function.name, arguments: tc.function.arguments },
          }],
        });
      }
      writeChunk({}, 'tool_calls');
    } else {
      // False alarm: a marker was detected but never parsed into calls.
      // Flush the withheld text (from the marker onwards) as plain content.
      if (toolCallDetected) {
        const markerIdx = fullText.indexOf('```tool_calls');
        if (markerIdx >= 0) {
          writeChunk({ content: fullText.substring(markerIdx) });
        }
      }
      writeChunk({}, 'stop');
    }

    res.write('data: [DONE]\n\n');
    res.end();
  });

  upstreamStream.on('error', () => {
    // Best-effort close on transport failure.
    if (!res.writableEnded) {
      res.write('data: [DONE]\n\n');
      res.end();
    }
  });
}
|
|
|
| |
| |
| |
| |
| |
|
|
/**
 * Anthropic-style SSE transform with prompt-based tool-call support.
 *
 * Text streams into one text content block until a tool-call marker is
 * spotted, after which output is withheld. At end of stream the buffered
 * text is parsed: tool calls become tool_use content blocks (each opened,
 * given one input_json_delta, and closed) with stop_reason 'tool_use';
 * otherwise the withheld text is flushed and the message ends 'end_turn'.
 *
 * @param {import('stream').Readable} upstreamStream - upstream stream of concatenated JSON objects
 * @param {import('http').ServerResponse} res - client response (written as SSE)
 * @param {string} model - model name echoed in message_start
 * @param {string} requestId - message id echoed in message_start
 * @param {(err: any) => void} [onStreamError] - notified on upstream 'streamingError' objects
 */
export function transformToAnthropicSSEWithTools(upstreamStream, res, model, requestId, onStreamError) {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': '*',
  });

  function writeEvent(event, data) {
    res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
  }

  let headerSent = false;
  let textBlockIndex = 0;       // tool_use blocks are indexed after this one
  let fullText = '';            // everything received; parsed for tool calls at stream end
  let toolCallDetected = false; // once true, text is withheld from the client
  let streamError = false;

  // Lazily emit message_start + content_block_start before the first delta.
  function ensureHeader() {
    if (headerSent) return;
    headerSent = true;
    writeEvent('message_start', {
      type: 'message_start',
      message: {
        id: requestId,
        type: 'message',
        role: 'assistant',
        model,
        content: [],
        stop_reason: null,
        usage: { input_tokens: 0, output_tokens: 0 },
      },
    });
    writeEvent('content_block_start', {
      type: 'content_block_start',
      index: textBlockIndex,
      content_block: { type: 'text', text: '' },
    });
  }

  const detector = createRepeatDetector();

  const parser = createJsonStreamParser((obj) => {
    switch (obj.type) {
      case 'chunk':
        fullText += obj.data;
        if (!toolCallDetected) {
          if (detectToolCallStart(fullText)) {
            toolCallDetected = true;
          } else {
            const { emit, repeated } = detector.feed(obj.data);
            if (emit) {
              ensureHeader();
              writeEvent('content_block_delta', {
                type: 'content_block_delta',
                index: textBlockIndex,
                delta: { type: 'text_delta', text: emit },
              });
            }
            // NOTE: `repeated` is deliberately not acted on here —
            // fullText keeps accumulating so trailing tool calls can
            // still be parsed at end of stream.
          }
        }
        break;

      case 'reasoningContent':
        if (!toolCallDetected) {
          ensureHeader();
          writeEvent('content_block_delta', {
            type: 'content_block_delta',
            index: textBlockIndex,
            delta: { type: 'text_delta', text: obj.data },
          });
        }
        break;

      case 'finalResult':
        // Upstream's authoritative final text replaces the accumulation.
        if (obj.data?.mainText) fullText = obj.data.mainText;
        break;

      case 'streamingError':
        streamError = true;
        if (onStreamError) onStreamError(obj.data);
        break;
    }
  });

  upstreamStream.on('data', (chunk) => parser.feed(chunk));
  upstreamStream.on('end', () => {
    parser.flush();
    if (res.writableEnded) return;
    // Even an empty reply gets a complete event envelope.
    ensureHeader();

    // On upstream error, close out without attempting tool-call parsing.
    if (streamError) {
      writeEvent('content_block_stop', { type: 'content_block_stop', index: textBlockIndex });
      writeEvent('message_delta', {
        type: 'message_delta',
        delta: { stop_reason: 'end_turn' },
        usage: { output_tokens: 0 },
      });
      writeEvent('message_stop', { type: 'message_stop' });
      res.end();
      return;
    }

    // Only now, with the complete text, decide whether the reply
    // contained tool calls.
    const { hasToolCalls, toolCalls, textContent } = parseToolCalls(fullText);

    if (hasToolCalls) {
      // Close the text block before opening tool_use blocks.
      writeEvent('content_block_stop', { type: 'content_block_stop', index: textBlockIndex });

      // Each tool call becomes its own block: start, one complete
      // input_json_delta, stop.
      const toolUseBlocks = toAnthropicToolUse(toolCalls);
      for (let i = 0; i < toolUseBlocks.length; i++) {
        const blockIdx = textBlockIndex + 1 + i;
        const tu = toolUseBlocks[i];
        writeEvent('content_block_start', {
          type: 'content_block_start',
          index: blockIdx,
          content_block: { type: 'tool_use', id: tu.id, name: tu.name, input: {} },
        });
        writeEvent('content_block_delta', {
          type: 'content_block_delta',
          index: blockIdx,
          delta: { type: 'input_json_delta', partial_json: JSON.stringify(tu.input) },
        });
        writeEvent('content_block_stop', { type: 'content_block_stop', index: blockIdx });
      }

      writeEvent('message_delta', {
        type: 'message_delta',
        delta: { stop_reason: 'tool_use' },
        usage: { output_tokens: 0 },
      });
    } else {
      // False alarm: a marker was detected but never parsed into calls.
      // Flush the withheld text (from the marker onwards) as plain text.
      if (toolCallDetected) {
        const markerIdx = fullText.indexOf('```tool_calls');
        if (markerIdx >= 0) {
          writeEvent('content_block_delta', {
            type: 'content_block_delta',
            index: textBlockIndex,
            delta: { type: 'text_delta', text: fullText.substring(markerIdx) },
          });
        }
      }
      writeEvent('content_block_stop', { type: 'content_block_stop', index: textBlockIndex });
      writeEvent('message_delta', {
        type: 'message_delta',
        delta: { stop_reason: 'end_turn' },
        usage: { output_tokens: 0 },
      });
    }

    writeEvent('message_stop', { type: 'message_stop' });
    res.end();
  });

  upstreamStream.on('error', () => {
    // Best-effort close on transport failure.
    if (!res.writableEnded) res.end();
  });
}
|
|
|
| |
| |
|
|
/**
 * Buffers the entire upstream response (non-streaming mode).
 *
 * Resolves with `{ text, reasoning, model }` once the upstream ends; the
 * collected text is passed through deduplicateText() first. Rejects on an
 * upstream 'error' event or a 'streamingError' object in the payload.
 *
 * @param {import('stream').Readable} upstreamStream - upstream stream of concatenated JSON objects
 * @returns {Promise<{text: string, reasoning: string, model: string}>}
 */
export function collectFullResponse(upstreamStream) {
  return new Promise((resolve, reject) => {
    let body = '';
    let thinking = '';
    let reportedModel = '';

    const parser = createJsonStreamParser((obj) => {
      if (obj.type === 'botType') {
        reportedModel = obj.data;
      } else if (obj.type === 'chunk') {
        body += obj.data;
      } else if (obj.type === 'reasoningContent') {
        thinking += obj.data;
      } else if (obj.type === 'finalResult') {
        // Authoritative final text supersedes the accumulated chunks.
        if (obj.data?.mainText) body = obj.data.mainText;
      } else if (obj.type === 'streamingError') {
        reject(new Error(obj.data || 'Streaming error'));
      }
    });

    upstreamStream.on('data', (chunk) => parser.feed(chunk));
    upstreamStream.on('end', () => {
      parser.flush();
      // Strip trailing repetition before handing the text back.
      resolve({
        text: deduplicateText(body),
        reasoning: thinking,
        model: reportedModel,
      });
    });
    upstreamStream.on('error', reject);
  });
}
|
|
|
| |
| |
| |
|
|
/**
 * One-shot repetition trim for fully-buffered (non-streaming) responses.
 *
 * Takes a probe window starting at one third of the text; if that same
 * window recurs later in the text, everything from the first recurrence
 * onward is dropped. Larger windows are tried first to minimize false
 * positives.
 *
 * @param {string} text - complete response text
 * @returns {string} text with any detected repetition removed
 */
function deduplicateText(text) {
  // Short texts are left alone — not enough material to judge repeats.
  if (!text || text.length < 300) return text;

  const PROBE_SIZES = [500, 300, 200];
  const probeStart = Math.floor(text.length / 3);

  for (const size of PROBE_SIZES) {
    if (text.length < size * 2) continue;

    const probe = text.substring(probeStart, probeStart + size);
    // Look for the probe again strictly after its own occurrence.
    const recurrence = text.indexOf(probe, probeStart + size);

    if (recurrence >= 0) {
      console.log(`[Dedup] 非流式去重: ${text.length} → ${recurrence} (window=${size})`);
      return text.substring(0, recurrence).trimEnd();
    }
  }

  return text;
}
|
|
|