| import { ChannelType, type RequestClient } from "@buape/carbon"; |
| import { resolveAckReaction, resolveHumanDelayConfig } from "../../agents/identity.js"; |
| import { EmbeddedBlockChunker } from "../../agents/pi-embedded-block-chunker.js"; |
| import { resolveChunkMode } from "../../auto-reply/chunk.js"; |
| import { dispatchInboundMessage } from "../../auto-reply/dispatch.js"; |
| import { formatInboundEnvelope, resolveEnvelopeFormatOptions } from "../../auto-reply/envelope.js"; |
| import { |
| buildPendingHistoryContextFromMap, |
| clearHistoryEntriesIfEnabled, |
| } from "../../auto-reply/reply/history.js"; |
| import { finalizeInboundContext } from "../../auto-reply/reply/inbound-context.js"; |
| import { createReplyDispatcherWithTyping } from "../../auto-reply/reply/reply-dispatcher.js"; |
| import type { ReplyPayload } from "../../auto-reply/types.js"; |
| import { shouldAckReaction as shouldAckReactionGate } from "../../channels/ack-reactions.js"; |
| import { logTypingFailure, logAckFailure } from "../../channels/logging.js"; |
| import { createReplyPrefixOptions } from "../../channels/reply-prefix.js"; |
| import { recordInboundSession } from "../../channels/session.js"; |
| import { |
| createStatusReactionController, |
| DEFAULT_TIMING, |
| type StatusReactionAdapter, |
| } from "../../channels/status-reactions.js"; |
| import { createTypingCallbacks } from "../../channels/typing.js"; |
| import { isDangerousNameMatchingEnabled } from "../../config/dangerous-name-matching.js"; |
| import { resolveDiscordPreviewStreamMode } from "../../config/discord-preview-streaming.js"; |
| import { resolveMarkdownTableMode } from "../../config/markdown-tables.js"; |
| import { readSessionUpdatedAt, resolveStorePath } from "../../config/sessions.js"; |
| import { danger, logVerbose, shouldLogVerbose } from "../../globals.js"; |
| import { convertMarkdownTables } from "../../markdown/tables.js"; |
| import { getAgentScopedMediaLocalRoots } from "../../media/local-roots.js"; |
| import { buildAgentSessionKey } from "../../routing/resolve-route.js"; |
| import { resolveThreadSessionKeys } from "../../routing/session-key.js"; |
| import { stripReasoningTagsFromText } from "../../shared/text/reasoning-tags.js"; |
| import { truncateUtf16Safe } from "../../utils.js"; |
| import { resolveDiscordMaxLinesPerMessage } from "../accounts.js"; |
| import { chunkDiscordTextWithMode } from "../chunk.js"; |
| import { resolveDiscordDraftStreamingChunking } from "../draft-chunking.js"; |
| import { createDiscordDraftStream } from "../draft-stream.js"; |
| import { reactMessageDiscord, removeReactionDiscord } from "../send.js"; |
| import { editMessageDiscord } from "../send.messages.js"; |
| import { normalizeDiscordSlug } from "./allow-list.js"; |
| import { resolveTimestampMs } from "./format.js"; |
| import { buildDiscordInboundAccessContext } from "./inbound-context.js"; |
| import type { DiscordMessagePreflightContext } from "./message-handler.preflight.js"; |
| import { |
| buildDiscordMediaPayload, |
| resolveDiscordMessageText, |
| resolveForwardedMediaList, |
| resolveMediaList, |
| } from "./message-utils.js"; |
| import { buildDirectLabel, buildGuildLabel, resolveReplyContext } from "./reply-context.js"; |
| import { deliverDiscordReply } from "./reply-delivery.js"; |
| import { resolveDiscordAutoThreadReplyPlan, resolveDiscordThreadStarter } from "./threading.js"; |
| import { sendTyping } from "./typing.js"; |
|
|
/**
 * Returns a promise that settles (with no value) once `ms` milliseconds have
 * elapsed on a `setTimeout` timer.
 *
 * @param ms - Delay in milliseconds handed to `setTimeout`.
 * @returns A promise that resolves after the timer fires; it never rejects.
 */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((wake) => {
    setTimeout(() => wake(), ms);
  });
}
|
|
/**
 * Upper bound (20 minutes, in milliseconds) passed as `maxDurationMs` to the
 * typing callbacks created for a Discord reply.
 */
const DISCORD_TYPING_MAX_DURATION_MS = 20 * 60 * 1000;
|
|
/**
 * Reports whether processing should stop because the caller's abort signal
 * has fired.
 *
 * @param abortSignal - Optional signal; when omitted, processing is never
 *   considered aborted.
 * @returns `true` only when a signal is present and its `aborted` flag is set.
 */
function isProcessAborted(abortSignal?: AbortSignal): boolean {
  if (!abortSignal) {
    return false;
  }
  return !!abortSignal.aborted;
}
|
|
/**
 * Processes one preflighted inbound Discord message.
 *
 * Steps, in order:
 * 1. Resolve attachments (including those of forwarded messages).
 * 2. Set up the optional ack/status reaction controller.
 * 3. Build the inbound envelope, optionally with pending channel history,
 *    forum context and thread metadata.
 * 4. Resolve the reply plan (deliver target, reply reference, auto-thread
 *    context) and record the inbound session.
 * 5. Dispatch to the auto-reply pipeline, delivering replies either through
 *    an editable draft "preview" message or through the standard send path.
 * 6. Clean up the draft stream, typing state and status reactions, then clear
 *    pending guild history.
 *
 * The function returns early, without dispatching, when the abort signal is
 * set, when the message text is empty, or when no reply target can be
 * resolved. It checks the abort signal after most awaited steps.
 *
 * @param ctx - Preflight context carrying config, the Discord client, the
 *   message and the routing/thread information computed before this call.
 * @returns A promise that resolves once dispatch and cleanup have finished.
 * @throws Re-throws any error raised by the dispatch, unless the abort signal
 *   was set at the time the error was caught.
 */
export async function processDiscordMessage(ctx: DiscordMessagePreflightContext): Promise<void> {
  const {
    cfg,
    discordConfig,
    accountId,
    token,
    runtime,
    guildHistories,
    historyLimit,
    mediaMaxBytes,
    textLimit,
    replyToMode,
    ackReactionScope,
    message,
    author,
    sender,
    data,
    client,
    channelInfo,
    channelName,
    messageChannelId,
    isGuildMessage,
    isDirectMessage,
    isGroupDm,
    baseText,
    messageText,
    shouldRequireMention,
    canDetectMention,
    effectiveWasMentioned,
    shouldBypassMention,
    threadChannel,
    threadParentId,
    threadParentName,
    threadParentType,
    threadName,
    displayChannelSlug,
    guildInfo,
    guildSlug,
    channelConfig,
    baseSessionKey,
    boundSessionKey,
    threadBindings,
    route,
    commandAuthorized,
    discordRestFetch,
    abortSignal,
  } = ctx;
  if (isProcessAborted(abortSignal)) {
    return;
  }

  // Resolve attachments; the abort signal is re-checked after each await.
  const ssrfPolicy = cfg.browser?.ssrfPolicy;
  const mediaList = await resolveMediaList(message, mediaMaxBytes, discordRestFetch, ssrfPolicy);
  if (isProcessAborted(abortSignal)) {
    return;
  }
  const forwardedMediaList = await resolveForwardedMediaList(
    message,
    mediaMaxBytes,
    discordRestFetch,
    ssrfPolicy,
  );
  if (isProcessAborted(abortSignal)) {
    return;
  }
  // Forwarded-message attachments are appended to the same list.
  mediaList.push(...forwardedMediaList);
  const text = messageText;
  if (!text) {
    logVerbose(`discord: drop message ${message.id} (empty content)`);
    return;
  }

  // Touch the bound thread when this message came in through a thread binding
  // and the binding manager exposes touchThread.
  const boundThreadId = ctx.threadBinding?.conversation?.conversationId?.trim();
  if (boundThreadId && typeof threadBindings.touchThread === "function") {
    threadBindings.touchThread({ threadId: boundThreadId });
  }
  const ackReaction = resolveAckReaction(cfg, route.agentId, {
    channel: "discord",
    accountId,
  });
  const removeAckAfterReply = cfg.messages?.removeAckAfterReply ?? false;
  const mediaLocalRoots = getAgentScopedMediaLocalRoots(cfg, route.agentId);
  const shouldAckReaction = () =>
    Boolean(
      ackReaction &&
        shouldAckReactionGate({
          scope: ackReactionScope,
          isDirect: isDirectMessage,
          isGroup: isGuildMessage || isGroupDm,
          isMentionableGroup: isGuildMessage,
          requireMention: Boolean(shouldRequireMention),
          canDetectMention,
          effectiveWasMentioned,
          shouldBypassMention,
        }),
    );
  // Status reactions are enabled exactly when an ack reaction would be sent.
  const statusReactionsEnabled = shouldAckReaction();

  const discordRest = client.rest as unknown as RequestClient;
  const discordAdapter: StatusReactionAdapter = {
    setReaction: async (emoji) => {
      await reactMessageDiscord(messageChannelId, message.id, emoji, {
        rest: discordRest,
      });
    },
    removeReaction: async (emoji) => {
      await removeReactionDiscord(messageChannelId, message.id, emoji, {
        rest: discordRest,
      });
    },
  };
  const statusReactions = createStatusReactionController({
    enabled: statusReactionsEnabled,
    adapter: discordAdapter,
    initialEmoji: ackReaction,
    emojis: cfg.messages?.statusReactions?.emojis,
    timing: cfg.messages?.statusReactions?.timing,
    onError: (err) => {
      logAckFailure({
        log: logVerbose,
        channel: "discord",
        target: `${messageChannelId}/${message.id}`,
        error: err,
      });
    },
  });
  if (statusReactionsEnabled) {
    // Fire-and-forget: the queued reaction must not delay message processing.
    void statusReactions.setQueued();
  }

  const fromLabel = isDirectMessage
    ? buildDirectLabel(author)
    : buildGuildLabel({
        guild: data.guild ?? undefined,
        channelName: channelName ?? messageChannelId,
        channelId: messageChannelId,
      });
  const senderLabel = sender.label;
  const isForumParent =
    threadParentType === ChannelType.GuildForum || threadParentType === ChannelType.GuildMedia;
  const forumParentSlug =
    isForumParent && threadParentName ? normalizeDiscordSlug(threadParentName) : "";
  const threadChannelId = threadChannel?.id;
  // A forum starter is the message whose id equals the id of its forum thread.
  const isForumStarter =
    Boolean(threadChannelId && isForumParent && forumParentSlug) && message.id === threadChannelId;
  const forumContextLine = isForumStarter ? `[Forum parent: #${forumParentSlug}]` : null;
  const groupChannel = isGuildMessage && displayChannelSlug ? `#${displayChannelSlug}` : undefined;
  const groupSubject = isDirectMessage ? undefined : groupChannel;
  const senderName = sender.isPluralKit
    ? (sender.name ?? author.username)
    : (data.member?.nickname ?? author.globalName ?? author.username);
  const senderUsername = sender.isPluralKit
    ? (sender.tag ?? sender.name ?? author.username)
    : author.username;
  const senderTag = sender.tag;
  const { groupSystemPrompt, ownerAllowFrom, untrustedContext } = buildDiscordInboundAccessContext({
    channelConfig,
    guildInfo,
    sender: { id: sender.id, name: sender.name, tag: sender.tag },
    allowNameMatching: isDangerousNameMatchingEnabled(discordConfig),
    isGuild: isGuildMessage,
    channelTopic: channelInfo?.topic,
  });
  const storePath = resolveStorePath(cfg.session?.store, {
    agentId: route.agentId,
  });
  const envelopeOptions = resolveEnvelopeFormatOptions(cfg);
  const previousTimestamp = readSessionUpdatedAt({
    storePath,
    sessionKey: route.sessionKey,
  });
  let combinedBody = formatInboundEnvelope({
    channel: "Discord",
    from: fromLabel,
    timestamp: resolveTimestampMs(message.timestamp),
    body: text,
    chatType: isDirectMessage ? "direct" : "channel",
    senderLabel,
    previousTimestamp,
    envelope: envelopeOptions,
  });
  // Channel history is skipped for DMs, and for guild messages outside a
  // thread when the channel has autoThread enabled.
  const shouldIncludeChannelHistory =
    !isDirectMessage && !(isGuildMessage && channelConfig?.autoThread && !threadChannel);
  if (shouldIncludeChannelHistory) {
    combinedBody = buildPendingHistoryContextFromMap({
      historyMap: guildHistories,
      historyKey: messageChannelId,
      limit: historyLimit,
      currentMessage: combinedBody,
      formatEntry: (entry) =>
        formatInboundEnvelope({
          channel: "Discord",
          from: fromLabel,
          timestamp: entry.timestamp,
          body: `${entry.body} [id:${entry.messageId ?? "unknown"} channel:${messageChannelId}]`,
          chatType: "channel",
          senderLabel: entry.sender,
          envelope: envelopeOptions,
        }),
    });
  }
  const replyContext = resolveReplyContext(message, resolveDiscordMessageText);
  if (forumContextLine) {
    combinedBody = `${combinedBody}\n${forumContextLine}`;
  }

  // Thread metadata: starter text (unless disabled per channel), a label for
  // the thread, and a session key for the parent channel.
  let threadStarterBody: string | undefined;
  let threadLabel: string | undefined;
  let parentSessionKey: string | undefined;
  if (threadChannel) {
    const includeThreadStarter = channelConfig?.includeThreadStarter !== false;
    if (includeThreadStarter) {
      const starter = await resolveDiscordThreadStarter({
        channel: threadChannel,
        client,
        parentId: threadParentId,
        parentType: threadParentType,
        resolveTimestampMs,
      });
      if (starter?.text) {
        threadStarterBody = starter.text;
      }
    }
    const parentName = threadParentName ?? "parent";
    threadLabel = threadName
      ? `Discord thread #${normalizeDiscordSlug(parentName)} › ${threadName}`
      : `Discord thread #${normalizeDiscordSlug(parentName)}`;
    if (threadParentId) {
      parentSessionKey = buildAgentSessionKey({
        agentId: route.agentId,
        channel: route.channel,
        peer: { kind: "channel", id: threadParentId },
      });
    }
  }
  const mediaPayload = buildDiscordMediaPayload(mediaList);
  const threadKeys = resolveThreadSessionKeys({
    baseSessionKey,
    threadId: threadChannel ? messageChannelId : undefined,
    parentSessionKey,
    useSuffix: false,
  });
  const replyPlan = await resolveDiscordAutoThreadReplyPlan({
    client,
    message,
    messageChannelId,
    isGuildMessage,
    channelConfig,
    threadChannel,
    channelType: channelInfo?.type,
    baseText: baseText ?? "",
    combinedBody,
    replyToMode,
    agentId: route.agentId,
    channel: route.channel,
  });
  const deliverTarget = replyPlan.deliverTarget;
  const replyTarget = replyPlan.replyTarget;
  const replyReference = replyPlan.replyReference;
  const autoThreadContext = replyPlan.autoThreadContext;

  const effectiveFrom = isDirectMessage
    ? `discord:${author.id}`
    : (autoThreadContext?.From ?? `discord:channel:${messageChannelId}`);
  const effectiveTo = autoThreadContext?.To ?? replyTarget;
  if (!effectiveTo) {
    runtime.error?.(danger("discord: missing reply target"));
    return;
  }
  // DMs record the user as the last route; everything else records the
  // effective reply target.
  const lastRouteTo = isDirectMessage ? `user:${author.id}` : effectiveTo;

  const inboundHistory =
    shouldIncludeChannelHistory && historyLimit > 0
      ? (guildHistories.get(messageChannelId) ?? []).map((entry) => ({
          sender: entry.sender,
          body: entry.body,
          timestamp: entry.timestamp,
        }))
      : undefined;

  const ctxPayload = finalizeInboundContext({
    Body: combinedBody,
    BodyForAgent: baseText ?? text,
    InboundHistory: inboundHistory,
    RawBody: baseText,
    CommandBody: baseText,
    From: effectiveFrom,
    To: effectiveTo,
    SessionKey: boundSessionKey ?? autoThreadContext?.SessionKey ?? threadKeys.sessionKey,
    AccountId: route.accountId,
    ChatType: isDirectMessage ? "direct" : "channel",
    ConversationLabel: fromLabel,
    SenderName: senderName,
    SenderId: sender.id,
    SenderUsername: senderUsername,
    SenderTag: senderTag,
    GroupSubject: groupSubject,
    GroupChannel: groupChannel,
    UntrustedContext: untrustedContext,
    GroupSystemPrompt: isGuildMessage ? groupSystemPrompt : undefined,
    GroupSpace: isGuildMessage ? (guildInfo?.id ?? guildSlug) || undefined : undefined,
    OwnerAllowFrom: ownerAllowFrom,
    Provider: "discord" as const,
    Surface: "discord" as const,
    WasMentioned: effectiveWasMentioned,
    MessageSid: message.id,
    ReplyToId: replyContext?.id,
    ReplyToBody: replyContext?.body,
    ReplyToSender: replyContext?.sender,
    ParentSessionKey: autoThreadContext?.ParentSessionKey ?? threadKeys.parentSessionKey,
    MessageThreadId: threadChannel?.id ?? autoThreadContext?.createdThreadId ?? undefined,
    ThreadStarterBody: threadStarterBody,
    ThreadLabel: threadLabel,
    Timestamp: resolveTimestampMs(message.timestamp),
    ...mediaPayload,
    CommandAuthorized: commandAuthorized,
    CommandSource: "text" as const,
    OriginatingChannel: "discord" as const,
    OriginatingTo: autoThreadContext?.OriginatingTo ?? replyTarget,
  });
  const persistedSessionKey = ctxPayload.SessionKey ?? route.sessionKey;

  await recordInboundSession({
    storePath,
    sessionKey: persistedSessionKey,
    ctx: ctxPayload,
    updateLastRoute: {
      sessionKey: persistedSessionKey,
      channel: "discord",
      to: lastRouteTo,
      accountId: route.accountId,
    },
    onRecordError: (err) => {
      logVerbose(`discord: failed updating session meta: ${String(err)}`);
    },
  });

  if (shouldLogVerbose()) {
    const preview = truncateUtf16Safe(combinedBody, 200).replace(/\n/g, "\\n");
    logVerbose(
      `discord inbound: channel=${messageChannelId} deliver=${deliverTarget} from=${ctxPayload.From} preview="${preview}"`,
    );
  }

  // Channel id that replies are delivered to. Typing indicators and the draft
  // stream both use it, so it is derived from the deliver target only once.
  const deliverChannelId = deliverTarget.startsWith("channel:")
    ? deliverTarget.slice("channel:".length)
    : messageChannelId;
  const typingChannelId = deliverChannelId;

  const { onModelSelected, ...prefixOptions } = createReplyPrefixOptions({
    cfg,
    agentId: route.agentId,
    channel: "discord",
    accountId: route.accountId,
  });
  const tableMode = resolveMarkdownTableMode({
    cfg,
    channel: "discord",
    accountId,
  });
  const maxLinesPerMessage = resolveDiscordMaxLinesPerMessage({
    cfg,
    discordConfig,
    accountId,
  });
  const chunkMode = resolveChunkMode(cfg, "discord", accountId);

  const typingCallbacks = createTypingCallbacks({
    start: () => sendTyping({ client, channelId: typingChannelId }),
    onStartError: (err) => {
      logTypingFailure({
        log: logVerbose,
        channel: "discord",
        target: typingChannelId,
        error: err,
      });
    },
    maxDurationMs: DISCORD_TYPING_MAX_DURATION_MS,
  });

  // Draft preview streaming is used only when the preview stream mode is not
  // "off" and block streaming is not enabled for this account.
  const discordStreamMode = resolveDiscordPreviewStreamMode(discordConfig);
  // A draft preview never exceeds 2000 characters, whatever textLimit says.
  const draftMaxChars = Math.min(textLimit, 2000);
  const accountBlockStreamingEnabled =
    typeof discordConfig?.blockStreaming === "boolean"
      ? discordConfig.blockStreaming
      : cfg.agents?.defaults?.blockStreamingDefault === "on";
  const canStreamDraft = discordStreamMode !== "off" && !accountBlockStreamingEnabled;
  const draftReplyToMessageId = () => replyReference.use();
  const draftStream = canStreamDraft
    ? createDiscordDraftStream({
        rest: client.rest,
        channelId: deliverChannelId,
        maxChars: draftMaxChars,
        replyToMessageId: draftReplyToMessageId,
        minInitialChars: 30,
        throttleMs: 1200,
        log: logVerbose,
        warn: logVerbose,
      })
    : undefined;
  const draftChunking =
    draftStream && discordStreamMode === "block"
      ? resolveDiscordDraftStreamingChunking(cfg, accountId)
      : undefined;
  const shouldSplitPreviewMessages = discordStreamMode === "block";
  const draftChunker = draftChunking ? new EmbeddedBlockChunker(draftChunking) : undefined;
  // Mutable preview state shared by the streaming callbacks below.
  let lastPartialText = "";
  let draftText = "";
  let hasStreamedMessage = false;
  let finalizedViaPreviewMessage = false;

  /**
   * Returns the text to write into the preview message as the final reply, or
   * `undefined` when the final reply cannot be expressed as one preview edit:
   * non-string input, more than one chunk, empty after trimming, or a strict
   * prefix of what the preview already shows.
   */
  const resolvePreviewFinalText = (text?: string) => {
    if (typeof text !== "string") {
      return undefined;
    }
    const formatted = convertMarkdownTables(text, tableMode);
    const chunks = chunkDiscordTextWithMode(formatted, {
      maxChars: draftMaxChars,
      maxLines: maxLinesPerMessage,
      chunkMode,
    });
    if (!chunks.length && formatted) {
      chunks.push(formatted);
    }
    if (chunks.length !== 1) {
      return undefined;
    }
    const trimmed = chunks[0].trim();
    if (!trimmed) {
      return undefined;
    }
    const currentPreviewText = discordStreamMode === "block" ? draftText : lastPartialText;
    // Do not replace the preview with a shorter prefix of what it shows.
    if (
      currentPreviewText &&
      currentPreviewText.startsWith(trimmed) &&
      trimmed.length < currentPreviewText.length
    ) {
      return undefined;
    }
    return trimmed;
  };

  /** Feeds one partial-reply snapshot into the draft preview stream. */
  const updateDraftFromPartial = (text?: string) => {
    if (!draftStream || !text) {
      return;
    }
    const cleaned = stripReasoningTagsFromText(text, { mode: "strict", trim: "both" });
    // Skip partials that are empty after stripping or that start with a
    // "Reasoning:" header line.
    if (!cleaned || cleaned.startsWith("Reasoning:\n")) {
      return;
    }
    if (cleaned === lastPartialText) {
      return;
    }
    hasStreamedMessage = true;
    if (discordStreamMode === "partial") {
      // Ignore a snapshot that is a strict prefix of the previous one, so the
      // preview never shrinks.
      if (
        lastPartialText &&
        lastPartialText.startsWith(cleaned) &&
        cleaned.length < lastPartialText.length
      ) {
        return;
      }
      lastPartialText = cleaned;
      draftStream.update(cleaned);
      return;
    }

    // Other modes: pass only the newly appended text to the chunker. If the
    // snapshot does not extend the previous one, start over.
    let delta = cleaned;
    if (cleaned.startsWith(lastPartialText)) {
      delta = cleaned.slice(lastPartialText.length);
    } else {
      draftChunker?.reset();
      draftText = "";
    }
    lastPartialText = cleaned;
    if (!delta) {
      return;
    }
    if (!draftChunker) {
      draftText = cleaned;
      draftStream.update(draftText);
      return;
    }
    draftChunker.append(delta);
    draftChunker.drain({
      force: false,
      emit: (chunk) => {
        draftText += chunk;
        draftStream.update(draftText);
      },
    });
  };

  /** Force-drains any text buffered in the chunker, then flushes the stream. */
  const flushDraft = async () => {
    if (!draftStream) {
      return;
    }
    if (draftChunker?.hasBuffered()) {
      draftChunker.drain({
        force: true,
        emit: (chunk) => {
          draftText += chunk;
        },
      });
      draftChunker.reset();
      if (draftText) {
        draftStream.update(draftText);
      }
    }
    await draftStream.flush();
  };

  /**
   * Shared handler for `onAssistantMessageStart` and `onReasoningEnd`. When
   * previews are split per message and something has already been streamed,
   * it asks the draft stream for a new message; it always resets the preview
   * text state. It is `undefined` when no draft stream exists, so the
   * callbacks stay unset in that case.
   */
  const resetDraftForNewMessage = draftStream
    ? () => {
        if (shouldSplitPreviewMessages && hasStreamedMessage) {
          logVerbose("discord: calling forceNewMessage() for draft stream");
          draftStream.forceNewMessage();
        }
        lastPartialText = "";
        draftText = "";
        draftChunker?.reset();
      }
    : undefined;

  // With a draft stream active, block streaming is always disabled for the
  // dispatch; otherwise the account-level setting decides (see replyOptions).
  const disableBlockStreamingForDraft = draftStream ? true : undefined;

  const { dispatcher, replyOptions, markDispatchIdle, markRunComplete } =
    createReplyDispatcherWithTyping({
      ...prefixOptions,
      humanDelay: resolveHumanDelayConfig(cfg, route.agentId),
      typingCallbacks,
      deliver: async (payload: ReplyPayload, info) => {
        if (isProcessAborted(abortSignal)) {
          return;
        }
        const isFinal = info.kind === "final";
        if (payload.isReasoning) {
          // Reasoning payloads are never delivered to Discord.
          return;
        }
        if (draftStream && isFinal) {
          await flushDraft();
          const hasMedia = Boolean(payload.mediaUrl) || (payload.mediaUrls?.length ?? 0) > 0;
          const finalText = payload.text;
          const previewFinalText = resolvePreviewFinalText(finalText);
          const previewMessageId = draftStream.messageId();

          // First attempt: edit the existing preview message in place. This is
          // done at most once, and only for text-only, non-error replies.
          const canFinalizeViaPreviewEdit =
            !finalizedViaPreviewMessage &&
            !hasMedia &&
            typeof previewFinalText === "string" &&
            typeof previewMessageId === "string" &&
            !payload.isError;

          if (canFinalizeViaPreviewEdit) {
            await draftStream.stop();
            if (isProcessAborted(abortSignal)) {
              return;
            }
            try {
              await editMessageDiscord(
                deliverChannelId,
                previewMessageId,
                { content: previewFinalText },
                { rest: client.rest },
              );
              finalizedViaPreviewMessage = true;
              replyReference.markSent();
              return;
            } catch (err) {
              logVerbose(
                `discord: preview final edit failed; falling back to standard send (${String(err)})`,
              );
            }
          }

          // Second attempt: stop the stream and re-read the message id, in
          // case stopping produced a preview message that can be edited.
          if (!finalizedViaPreviewMessage) {
            await draftStream.stop();
            if (isProcessAborted(abortSignal)) {
              return;
            }
            const messageIdAfterStop = draftStream.messageId();
            if (
              typeof messageIdAfterStop === "string" &&
              typeof previewFinalText === "string" &&
              !hasMedia &&
              !payload.isError
            ) {
              try {
                await editMessageDiscord(
                  deliverChannelId,
                  messageIdAfterStop,
                  { content: previewFinalText },
                  { rest: client.rest },
                );
                finalizedViaPreviewMessage = true;
                replyReference.markSent();
                return;
              } catch (err) {
                logVerbose(
                  `discord: post-stop preview edit failed; falling back to standard send (${String(err)})`,
                );
              }
            }
          }

          // Neither edit succeeded: clear the draft before the standard send.
          if (!finalizedViaPreviewMessage) {
            await draftStream.clear();
          }
        }
        if (isProcessAborted(abortSignal)) {
          return;
        }

        const replyToId = replyReference.use();
        await deliverDiscordReply({
          cfg,
          replies: [payload],
          target: deliverTarget,
          token,
          accountId,
          rest: client.rest,
          runtime,
          replyToId,
          replyToMode,
          textLimit,
          maxLinesPerMessage,
          tableMode,
          chunkMode,
          sessionKey: ctxPayload.SessionKey,
          threadBindings,
          mediaLocalRoots,
        });
        replyReference.markSent();
      },
      onError: (err, info) => {
        runtime.error?.(danger(`discord ${info.kind} reply failed: ${String(err)}`));
      },
      onReplyStart: async () => {
        if (isProcessAborted(abortSignal)) {
          return;
        }
        await typingCallbacks.onReplyStart();
        await statusReactions.setThinking();
      },
    });

  let dispatchResult: Awaited<ReturnType<typeof dispatchInboundMessage>> | null = null;
  let dispatchError = false;
  let dispatchAborted = false;
  try {
    if (isProcessAborted(abortSignal)) {
      dispatchAborted = true;
      return;
    }
    dispatchResult = await dispatchInboundMessage({
      ctx: ctxPayload,
      cfg,
      dispatcher,
      replyOptions: {
        ...replyOptions,
        abortSignal,
        skillFilter: channelConfig?.skills,
        disableBlockStreaming:
          disableBlockStreamingForDraft ??
          (typeof discordConfig?.blockStreaming === "boolean"
            ? !discordConfig.blockStreaming
            : undefined),
        onPartialReply: draftStream ? (payload) => updateDraftFromPartial(payload.text) : undefined,
        onAssistantMessageStart: resetDraftForNewMessage,
        onReasoningEnd: resetDraftForNewMessage,
        onModelSelected,
        onReasoningStream: async () => {
          // Same abort guard as the other status callbacks, so a late
          // reasoning event cannot overwrite the reaction set during cleanup.
          if (isProcessAborted(abortSignal)) {
            return;
          }
          await statusReactions.setThinking();
        },
        onToolStart: async (payload) => {
          if (isProcessAborted(abortSignal)) {
            return;
          }
          await statusReactions.setTool(payload.name);
        },
        onCompactionStart: async () => {
          if (isProcessAborted(abortSignal)) {
            return;
          }
          await statusReactions.setCompacting();
        },
        onCompactionEnd: async () => {
          if (isProcessAborted(abortSignal)) {
            return;
          }
          statusReactions.cancelPending();
          await statusReactions.setThinking();
        },
      },
    });
    if (isProcessAborted(abortSignal)) {
      dispatchAborted = true;
      return;
    }
  } catch (err) {
    // An error caught while the abort signal is set counts as an abort and is
    // not re-thrown.
    if (isProcessAborted(abortSignal)) {
      dispatchAborted = true;
      return;
    }
    dispatchError = true;
    throw err;
  } finally {
    try {
      // Always stop the draft stream; clear it unless the preview message
      // became the final reply.
      await draftStream?.stop();
      if (!finalizedViaPreviewMessage) {
        await draftStream?.clear();
      }
    } catch (err) {
      // Draft cleanup is best-effort and must not mask the dispatch outcome.
      logVerbose(`discord: draft cleanup failed: ${String(err)}`);
    } finally {
      markRunComplete();
      markDispatchIdle();
    }
    if (statusReactionsEnabled) {
      if (dispatchAborted) {
        // Aborted runs show no done/error state: clear or restore immediately.
        if (removeAckAfterReply) {
          void statusReactions.clear();
        } else {
          void statusReactions.restoreInitial();
        }
      } else {
        if (dispatchError) {
          await statusReactions.setError();
        } else {
          await statusReactions.setDone();
        }
        if (removeAckAfterReply) {
          // Keep the terminal reaction visible for its hold time, then clear
          // it in the background.
          void (async () => {
            await sleep(dispatchError ? DEFAULT_TIMING.errorHoldMs : DEFAULT_TIMING.doneHoldMs);
            await statusReactions.clear();
          })();
        } else {
          void statusReactions.restoreInitial();
        }
      }
    }
  }
  if (dispatchAborted) {
    return;
  }

  if (dispatchResult?.queuedFinal && shouldLogVerbose()) {
    const finalCount = dispatchResult.counts.final;
    logVerbose(
      `discord: delivered ${finalCount} ${finalCount === 1 ? "reply" : "replies"} to ${replyTarget}`,
    );
  }
  // Pending guild history is cleared whether or not a final reply was queued.
  if (isGuildMessage) {
    clearHistoryEntriesIfEnabled({
      historyMap: guildHistories,
      historyKey: messageChannelId,
      limit: historyLimit,
    });
  }
}
|
|