| import { Logger } from '@n8n/backend-common'; |
| import type { IExecutionResponse } from '@n8n/db'; |
| import { ExecutionRepository } from '@n8n/db'; |
| import { Service } from '@n8n/di'; |
| import type { DateTime } from 'luxon'; |
| import { InstanceSettings } from 'n8n-core'; |
| import { sleep } from 'n8n-workflow'; |
| import type { IRun, ITaskData } from 'n8n-workflow'; |
|
|
| import { ARTIFICIAL_TASK_DATA } from '@/constants'; |
| import { NodeCrashedError } from '@/errors/node-crashed.error'; |
| import { WorkflowCrashedError } from '@/errors/workflow-crashed.error'; |
| import { getLifecycleHooksForRegularMain } from '@/execution-lifecycle/execution-lifecycle-hooks'; |
| import { Push } from '@/push'; |
|
|
| import type { EventMessageTypes } from '../eventbus/event-message-classes'; |
|
|
| |
| |
| |
| @Service() |
| export class ExecutionRecoveryService { |
| constructor( |
| private readonly logger: Logger, |
| private readonly instanceSettings: InstanceSettings, |
| private readonly push: Push, |
| private readonly executionRepository: ExecutionRepository, |
| ) {} |
|
|
| |
| |
| |
| async recoverFromLogs(executionId: string, messages: EventMessageTypes[]) { |
| if (this.instanceSettings.isFollower) return; |
|
|
| const amendedExecution = await this.amend(executionId, messages); |
|
|
| if (!amendedExecution) return null; |
|
|
| this.logger.info('[Recovery] Logs available, amended execution', { |
| executionId: amendedExecution.id, |
| }); |
|
|
| await this.executionRepository.updateExistingExecution(executionId, amendedExecution); |
|
|
| await this.runHooks(amendedExecution); |
|
|
| this.push.once('editorUiConnected', async () => { |
| await sleep(1000); |
| this.push.broadcast({ type: 'executionRecovered', data: { executionId } }); |
| }); |
|
|
| return amendedExecution; |
| } |
|
|
| |
| |
| |
|
|
| |
| |
| |
| private async amend(executionId: string, messages: EventMessageTypes[]) { |
| if (messages.length === 0) return await this.amendWithoutLogs(executionId); |
|
|
| const { nodeMessages, workflowMessages } = this.toRelevantMessages(messages); |
|
|
| if (nodeMessages.length === 0) return null; |
|
|
| const execution = await this.executionRepository.findSingleExecution(executionId, { |
| includeData: true, |
| unflattenData: true, |
| }); |
|
|
| |
| |
| |
| |
| |
| if (!execution || (['success', 'error'].includes(execution.status) && execution.data)) { |
| return null; |
| } |
|
|
| const runExecutionData = execution.data ?? { resultData: { runData: {} } }; |
|
|
| let lastNodeRunTimestamp: DateTime | undefined; |
|
|
| for (const node of execution.workflowData.nodes) { |
| const nodeStartedMessage = nodeMessages.find( |
| (m) => m.payload.nodeName === node.name && m.eventName === 'n8n.node.started', |
| ); |
|
|
| if (!nodeStartedMessage) continue; |
|
|
| const nodeHasRunData = runExecutionData.resultData.runData[node.name] !== undefined; |
|
|
| if (nodeHasRunData) continue; |
|
|
| const nodeFinishedMessage = nodeMessages.find( |
| (m) => m.payload.nodeName === node.name && m.eventName === 'n8n.node.finished', |
| ); |
|
|
| const taskData: ITaskData = { |
| startTime: nodeStartedMessage.ts.toUnixInteger(), |
| executionIndex: 0, |
| executionTime: -1, |
| source: [null], |
| }; |
|
|
| if (nodeFinishedMessage) { |
| taskData.executionStatus = 'success'; |
| taskData.data ??= ARTIFICIAL_TASK_DATA; |
| taskData.executionTime = nodeFinishedMessage.ts.diff(nodeStartedMessage.ts).toMillis(); |
| lastNodeRunTimestamp = nodeFinishedMessage.ts; |
| } else { |
| taskData.executionStatus = 'crashed'; |
| taskData.error = new NodeCrashedError(node); |
| taskData.executionTime = 0; |
| runExecutionData.resultData.error = new WorkflowCrashedError(); |
| lastNodeRunTimestamp = nodeStartedMessage.ts; |
| } |
|
|
| runExecutionData.resultData.lastNodeExecuted = node.name; |
| runExecutionData.resultData.runData[node.name] = [taskData]; |
| } |
|
|
| return { |
| ...execution, |
| status: execution.status === 'error' ? 'error' : 'crashed', |
| stoppedAt: this.toStoppedAt(lastNodeRunTimestamp, workflowMessages), |
| data: runExecutionData, |
| } as IExecutionResponse; |
| } |
|
|
| private async amendWithoutLogs(executionId: string) { |
| const exists = await this.executionRepository.exists({ where: { id: executionId } }); |
|
|
| if (!exists) return null; |
|
|
| await this.executionRepository.markAsCrashed(executionId); |
|
|
| const execution = await this.executionRepository.findSingleExecution(executionId, { |
| includeData: true, |
| unflattenData: true, |
| }); |
|
|
| return execution ?? null; |
| } |
|
|
| private toRelevantMessages(messages: EventMessageTypes[]) { |
| return messages.reduce<{ |
| nodeMessages: EventMessageTypes[]; |
| workflowMessages: EventMessageTypes[]; |
| }>( |
| (acc, cur) => { |
| if (cur.eventName.startsWith('n8n.node.')) { |
| acc.nodeMessages.push(cur); |
| } else if (cur.eventName.startsWith('n8n.workflow.')) { |
| acc.workflowMessages.push(cur); |
| } |
|
|
| return acc; |
| }, |
| { nodeMessages: [], workflowMessages: [] }, |
| ); |
| } |
|
|
| private toStoppedAt(timestamp: DateTime | undefined, messages: EventMessageTypes[]) { |
| if (timestamp) return timestamp.toJSDate(); |
|
|
| const WORKFLOW_END_EVENTS = new Set([ |
| 'n8n.workflow.success', |
| 'n8n.workflow.crashed', |
| 'n8n.workflow.failed', |
| ]); |
|
|
| return ( |
| messages.find((m) => WORKFLOW_END_EVENTS.has(m.eventName)) ?? |
| messages.find((m) => m.eventName === 'n8n.workflow.started') |
| )?.ts.toJSDate(); |
| } |
|
|
| private async runHooks(execution: IExecutionResponse) { |
| execution.data ??= { resultData: { runData: {} } }; |
|
|
| const lifecycleHooks = getLifecycleHooksForRegularMain( |
| { |
| userId: '', |
| workflowData: execution.workflowData, |
| executionMode: execution.mode, |
| executionData: execution.data, |
| runData: execution.data.resultData.runData, |
| retryOf: execution.retryOf ?? undefined, |
| }, |
| execution.id, |
| ); |
|
|
| const run: IRun = { |
| data: execution.data, |
| finished: false, |
| mode: execution.mode, |
| waitTill: execution.waitTill ?? undefined, |
| startedAt: execution.startedAt, |
| stoppedAt: execution.stoppedAt, |
| status: execution.status, |
| }; |
|
|
| await lifecycleHooks.runHook('workflowExecuteAfter', [run]); |
| } |
| } |
|
|