ManimCat / src /studio-agent /runtime /run-processor.ts
Bin29's picture
Sync from main: c1ef036 chore: document docker persistence volumes
94e1b2f
import { createLogger } from '../../utils/logger'
import { createStudioToolPart } from '../domain/factories'
import type {
StudioAssistantMessage,
StudioEventBus,
StudioMessageStore,
StudioPartStore,
StudioProcessorStreamEvent,
StudioRun,
StudioSession,
StudioToolPart,
StudioToolResult
} from '../domain/types'
import { isDoomLoop } from './doom-loop'
import { StudioPartSynchronizer } from './part-synchronizer'
import { StudioTextStreamAccumulator } from './text-stream-accumulator'
import {
getToolInput,
getToolTimeStart,
mergeToolMetadata,
mergeToolStateMetadata
} from './tool-state'
const logger = createLogger('StudioRunProcessor')
export type StudioProcessorOutcome = 'continue' | 'stop' | 'compact'
interface StudioRunProcessorOptions {
messageStore: StudioMessageStore
partStore: StudioPartStore
}
export class StudioRunProcessor {
private readonly partStore: StudioPartStore
private readonly sync: StudioPartSynchronizer
private readonly textStream: StudioTextStreamAccumulator
constructor(options: StudioRunProcessorOptions) {
this.partStore = options.partStore
this.sync = new StudioPartSynchronizer(options.messageStore, options.partStore)
this.textStream = new StudioTextStreamAccumulator(options.partStore, this.sync)
}
async processStream(input: {
session: StudioSession
run: StudioRun
assistantMessage: StudioAssistantMessage
events: AsyncIterable<StudioProcessorStreamEvent>
eventBus?: StudioEventBus
shouldCompact?: (usage?: { tokens?: number }, assistantMessage?: StudioAssistantMessage) => Promise<boolean>
onDoomLoop?: (toolName: string, toolInput: Record<string, unknown>) => Promise<boolean>
}): Promise<StudioProcessorOutcome> {
const toolCalls = new Map<string, StudioToolPart>()
let currentAssistantMessage = input.assistantMessage
let activeTextPartId: string | null = null
let activeTextValue = ''
let activeReasoningPartId: string | null = null
let blocked = false
let needsCompaction = false
for await (const event of input.events) {
switch (event.type) {
case 'assistant-message-start': {
currentAssistantMessage = event.message
activeTextPartId = null
activeTextValue = ''
activeReasoningPartId = null
break
}
case 'tool-input-start': {
input.eventBus?.publish({
type: 'tool_input_start',
sessionId: input.session.id,
runId: input.run.id,
toolName: event.toolName,
callId: event.id,
raw: event.raw
})
const part = createStudioToolPart({
messageId: currentAssistantMessage.id,
sessionId: currentAssistantMessage.sessionId,
tool: event.toolName,
callId: event.id,
raw: event.raw
})
await this.sync.appendPart(currentAssistantMessage, part)
toolCalls.set(event.id, part)
break
}
case 'tool-call': {
input.eventBus?.publish({
type: 'tool_call',
sessionId: input.session.id,
runId: input.run.id,
toolName: event.toolName,
callId: event.toolCallId,
input: event.input
})
const match = toolCalls.get(event.toolCallId)
if (!match) {
break
}
const allowed = await this.allowToolCall({
assistantMessage: currentAssistantMessage,
toolName: event.toolName,
toolInput: event.input,
onDoomLoop: input.onDoomLoop
})
if (!allowed) {
blocked = true
await this.updateToolState(match.id, {
status: 'error',
input: event.input,
error: `Doom loop rejected for tool "${event.toolName}"`,
time: { start: Date.now(), end: Date.now() }
})
toolCalls.delete(event.toolCallId)
break
}
await this.updateToolState(match.id, {
status: 'running',
input: event.input,
title: undefined,
metadata: undefined,
time: { start: Date.now() }
})
break
}
case 'tool-result': {
input.eventBus?.publish({
type: 'tool_result',
sessionId: input.session.id,
runId: input.run.id,
toolName: toolCalls.get(event.toolCallId)?.tool ?? 'unknown',
callId: event.toolCallId,
status: 'completed',
title: event.title,
output: event.output,
metadata: event.metadata,
attachments: event.attachments
})
await this.completeToolCall(toolCalls, event)
break
}
case 'tool-error': {
input.eventBus?.publish({
type: 'tool_result',
sessionId: input.session.id,
runId: input.run.id,
toolName: toolCalls.get(event.toolCallId)?.tool ?? 'unknown',
callId: event.toolCallId,
status: 'failed',
error: event.error,
metadata: event.metadata
})
blocked = await this.failToolCall(toolCalls, event)
break
}
case 'text-start': {
activeTextValue = ''
activeTextPartId = await this.textStream.startPart(currentAssistantMessage, 'text')
break
}
case 'text-delta': {
activeTextValue += event.text
await this.textStream.appendDelta(activeTextPartId, event.text, 'text')
break
}
case 'text-end': {
const text = activeTextValue.trim()
if (text) {
input.eventBus?.publish({
type: 'assistant_text',
sessionId: input.session.id,
runId: input.run.id,
text
})
}
activeTextPartId = null
activeTextValue = ''
break
}
case 'reasoning-start': {
activeReasoningPartId = await this.textStream.startPart(currentAssistantMessage, 'reasoning')
break
}
case 'reasoning-delta': {
await this.textStream.appendDelta(activeReasoningPartId, event.text, 'reasoning')
break
}
case 'reasoning-end': {
activeReasoningPartId = null
break
}
case 'finish-step': {
if (!currentAssistantMessage.summary && input.shouldCompact) {
needsCompaction = await input.shouldCompact(event.usage, currentAssistantMessage)
}
break
}
}
}
if (blocked) {
return 'stop'
}
if (needsCompaction) {
return 'compact'
}
return 'continue'
}
async applyToolMetadata(input: {
assistantMessage: StudioAssistantMessage
callId: string
title?: string
metadata?: Record<string, unknown>
}): Promise<void> {
const part = await this.findToolPart(input.assistantMessage, input.callId)
if (!part) {
return
}
await this.sync.updatePart(part.id, {
...part,
metadata: {
...(part.metadata ?? {}),
...(input.metadata ?? {})
},
state: mergeToolStateMetadata(part.state, input.title, input.metadata)
})
}
async materializeToolResult(input: {
session: StudioSession
run: StudioRun
assistantMessage: StudioAssistantMessage
callId: string
toolName: string
toolInput: Record<string, unknown>
result: StudioToolResult
}): Promise<void> {
const part = createStudioToolPart({
messageId: input.assistantMessage.id,
sessionId: input.assistantMessage.sessionId,
tool: input.toolName,
callId: input.callId
})
const created = await this.sync.appendPart(input.assistantMessage, part)
await this.updateToolState(created.id, {
status: 'completed',
input: input.toolInput,
output: input.result.output,
title: input.result.title,
metadata: input.result.metadata,
attachments: input.result.attachments,
time: { start: Date.now(), end: Date.now() }
})
logger.info('Materialized direct tool result into assistant message', {
sessionId: input.session.id,
runId: input.run.id,
toolName: input.toolName,
callId: input.callId
})
}
private async allowToolCall(input: {
assistantMessage: StudioAssistantMessage
toolName: string
toolInput: Record<string, unknown>
onDoomLoop?: (toolName: string, toolInput: Record<string, unknown>) => Promise<boolean>
}): Promise<boolean> {
if (!input.onDoomLoop) {
return true
}
const doomLoop = await isDoomLoop({
assistantMessage: input.assistantMessage,
partStore: this.partStore,
toolName: input.toolName,
toolInput: input.toolInput
})
return doomLoop ? input.onDoomLoop(input.toolName, input.toolInput) : true
}
private async completeToolCall(
toolCalls: Map<string, StudioToolPart>,
event: Extract<StudioProcessorStreamEvent, { type: 'tool-result' }>
): Promise<void> {
const match = toolCalls.get(event.toolCallId)
if (!match) {
return
}
const runningState = await this.partStore.getById(match.id)
await this.updateToolState(match.id, {
status: 'completed',
input: getToolInput(runningState),
output: event.output,
title: event.title ?? `Completed ${match.tool}`,
metadata: mergeToolMetadata(runningState, event.metadata),
attachments: event.attachments,
time: {
start: getToolTimeStart(runningState),
end: Date.now()
}
})
toolCalls.delete(event.toolCallId)
}
private async failToolCall(
toolCalls: Map<string, StudioToolPart>,
event: Extract<StudioProcessorStreamEvent, { type: 'tool-error' }>
): Promise<boolean> {
const match = toolCalls.get(event.toolCallId)
if (!match) {
return false
}
const runningState = await this.partStore.getById(match.id)
await this.updateToolState(match.id, {
status: 'error',
input: getToolInput(runningState),
error: event.error,
metadata: mergeToolMetadata(runningState, event.metadata),
time: {
start: getToolTimeStart(runningState),
end: Date.now()
}
})
toolCalls.delete(event.toolCallId)
return event.metadata?.recoverable !== true
}
private async findToolPart(
assistantMessage: StudioAssistantMessage,
callId: string
): Promise<StudioToolPart | null> {
const parts = await this.partStore.listByMessageId(assistantMessage.id)
const part = [...parts]
.reverse()
.find((candidate) => candidate.type === 'tool' && candidate.callId === callId)
return part?.type === 'tool' ? part : null
}
private async updateToolState(partId: string, state: StudioToolPart['state']): Promise<void> {
const current = await this.partStore.getById(partId)
if (!current || current.type !== 'tool') {
return
}
await this.sync.updatePart(partId, {
...current,
state,
metadata: mergeToolMetadata(current, 'metadata' in state ? state.metadata : undefined)
})
}
}