| import { GlobalConfig } from '@n8n/config'; |
| import type { WorkflowEntity } from '@n8n/db'; |
| import { ExecutionRepository } from '@n8n/db'; |
| import { WorkflowRepository } from '@n8n/db'; |
| import { Container } from '@n8n/di'; |
| import { mock } from 'jest-mock-extended'; |
| import { ExternalSecretsProxy } from 'n8n-core'; |
| import type { IWorkflowBase } from 'n8n-workflow'; |
| import type { |
| IExecuteWorkflowInfo, |
| IWorkflowExecuteAdditionalData, |
| ExecuteWorkflowOptions, |
| IRun, |
| INodeExecutionData, |
| } from 'n8n-workflow'; |
| import type PCancelable from 'p-cancelable'; |
|
|
| import { ActiveExecutions } from '@/active-executions'; |
| import { CredentialsHelper } from '@/credentials-helper'; |
| import { VariablesService } from '@/environments.ee/variables/variables.service.ee'; |
| import { EventService } from '@/events/event.service'; |
| import { |
| CredentialsPermissionChecker, |
| SubworkflowPolicyChecker, |
| } from '@/executions/pre-execution-checks'; |
| import { ExternalHooks } from '@/external-hooks'; |
| import { UrlService } from '@/services/url.service'; |
| import { WorkflowStatisticsService } from '@/services/workflow-statistics.service'; |
| import { Telemetry } from '@/telemetry'; |
| import { executeWorkflow, getBase, getRunData } from '@/workflow-execute-additional-data'; |
| import * as WorkflowHelpers from '@/workflow-helpers'; |
| import { mockInstance } from '@test/mocking'; |
|
|
| const EXECUTION_ID = '123'; |
| const LAST_NODE_EXECUTED = 'Last node executed'; |
|
|
/**
 * Builds a mocked `IRun` flagged as a finished manual run.
 *
 * Its run data contains a single task for `LAST_NODE_EXECUTED`, and that
 * task's `main` output is the `lastNodeOutput` supplied by the caller.
 */
const getMockRun = ({ lastNodeOutput }: { lastNodeOutput: Array<INodeExecutionData[] | null> }) => {
	// The only task recorded for the last executed node.
	const lastNodeTask = {
		startTime: 100,
		data: { main: lastNodeOutput },
	};

	return mock<IRun>({
		data: {
			resultData: {
				runData: { [LAST_NODE_EXECUTED]: [lastNodeTask] },
				lastNodeExecuted: LAST_NODE_EXECUTED,
			},
		},
		finished: true,
		mode: 'manual',
		startedAt: new Date(),
		status: 'new',
		waitTill: undefined,
	});
};
|
|
/**
 * Creates a mocked `PCancelable<IRun>` whose `then`, `catch` and `finally`
 * handlers each delegate to a promise already resolved with `run`.
 *
 * The mock itself is awaited before this async helper returns.
 * NOTE(review): because the mock exposes `then`, that await presumably settles
 * to `run` rather than to the mock object — confirm if a test ever needs the
 * cancelable itself.
 */
const getCancelablePromise = async (run: IRun) => {
	// Every handler starts from its own freshly resolved promise of `run`.
	const resolved = () => Promise.resolve(run);

	const cancelable = mock<PCancelable<IRun>>({
		then: jest.fn().mockImplementation(async (onFulfilled) => await resolved().then(onFulfilled)),
		catch: jest.fn().mockImplementation(async (onRejected) => await resolved().catch(onRejected)),
		finally: jest.fn().mockImplementation(async (onFinally) => await resolved().finally(onFinally)),
		[Symbol.toStringTag]: 'PCancelable',
	});

	return await cancelable;
};
|
|
// Stub handed out as `processRunExecutionData` by every mocked `WorkflowExecute`
// instance; individual tests configure what it returns.
const processRunExecutionData = jest.fn();

// Keep the real `n8n-core` exports and override only `WorkflowExecute`, whose
// mocked constructor yields objects exposing the stub above.
jest.mock('n8n-core', () => {
	const actualCore = jest.requireActual('n8n-core');
	const WorkflowExecute = jest.fn().mockImplementation(() => ({ processRunExecutionData }));

	return {
		__esModule: true,
		...actualCore,
		WorkflowExecute,
	};
});
|
|
| describe('WorkflowExecuteAdditionalData', () => { |
// --- Mocks whose handles are stubbed or asserted on by the tests below ---

const variablesService = mockInstance(VariablesService);
variablesService.getAllCached.mockResolvedValue([]);
Container.set(VariablesService, variablesService);

const credentialsHelper = mockInstance(CredentialsHelper);
Container.set(CredentialsHelper, credentialsHelper);

const externalSecretsProxy = mockInstance(ExternalSecretsProxy);
Container.set(ExternalSecretsProxy, externalSecretsProxy);

const urlService = mockInstance(UrlService);
Container.set(UrlService, urlService);

const eventService = mockInstance(EventService);
const executionRepository = mockInstance(ExecutionRepository);
const workflowRepository = mockInstance(WorkflowRepository);
const activeExecutions = mockInstance(ActiveExecutions);

// --- Mocks that are only instantiated; no test reads their handle ---

mockInstance(ExternalHooks);
mockInstance(Telemetry);
mockInstance(CredentialsPermissionChecker);
mockInstance(SubworkflowPolicyChecker);
mockInstance(WorkflowStatisticsService);
|
|
// Verifies that `logAiEvent` on the base additional data forwards the event
// name and payload, unchanged, to `eventService.emit`.
// NOTE(review): the title mentions MessageEventBus, but the assertions target
// the mocked EventService — consider renaming the test.
test('logAiEvent should call MessageEventBus', async () => {
	const eventName = 'ai-messages-retrieved-from-memory';
	const payload = {
		msg: 'test message',
		executionId: '123',
		nodeName: 'n8n-memory',
		workflowId: 'workflow-id',
		workflowName: 'workflow-name',
		nodeType: 'n8n-memory',
	};

	const { logAiEvent } = await getBase('user-id');
	logAiEvent(eventName, payload);

	expect(eventService.emit).toHaveBeenCalledTimes(1);
	expect(eventService.emit).toHaveBeenCalledWith(eventName, payload);
});
|
|
// Exercises `executeWorkflow` against mocked repositories and a mocked
// `WorkflowExecute` whose `processRunExecutionData` resolves to `runWithData`.
describe('executeWorkflow', () => {
	// Rebuilt before every test so that a mutation made by one test (such as
	// setting `waitTill`) cannot leak into another, whatever the run order.
	let runWithData: ReturnType<typeof getMockRun>;

	beforeEach(() => {
		runWithData = getMockRun({
			lastNodeOutput: [[{ json: { test: 1 } }]],
		});

		// Forget calls recorded by earlier tests so the `setRunning` assertion
		// only observes the call made by the test currently running.
		executionRepository.setRunning.mockClear();

		workflowRepository.get.mockResolvedValue(
			mock<WorkflowEntity>({ id: EXECUTION_ID, nodes: [] }),
		);
		activeExecutions.add.mockResolvedValue(EXECUTION_ID);
		processRunExecutionData.mockReturnValue(getCancelablePromise(runWithData));
	});

	it('should execute workflow, return data and execution id', async () => {
		const response = await executeWorkflow(
			mock<IExecuteWorkflowInfo>(),
			mock<IWorkflowExecuteAdditionalData>(),
			mock<ExecuteWorkflowOptions>({ loadedWorkflowData: undefined, doNotWaitToFinish: false }),
		);

		expect(response).toEqual({
			data: runWithData.data.resultData.runData[LAST_NODE_EXECUTED][0].data!.main,
			executionId: EXECUTION_ID,
		});
	});

	it('should execute workflow, skip waiting', async () => {
		const response = await executeWorkflow(
			mock<IExecuteWorkflowInfo>(),
			mock<IWorkflowExecuteAdditionalData>(),
			mock<ExecuteWorkflowOptions>({ loadedWorkflowData: undefined, doNotWaitToFinish: true }),
		);

		// With `doNotWaitToFinish` the response carries a placeholder output.
		expect(response).toEqual({
			data: [null],
			executionId: EXECUTION_ID,
		});
	});

	it('should set sub workflow execution as running', async () => {
		await executeWorkflow(
			mock<IExecuteWorkflowInfo>(),
			mock<IWorkflowExecuteAdditionalData>(),
			mock<ExecuteWorkflowOptions>({ loadedWorkflowData: undefined }),
		);

		expect(executionRepository.setRunning).toHaveBeenCalledWith(EXECUTION_ID);
	});

	it('should return waitTill property when workflow execution is waiting', async () => {
		const waitTill = new Date();
		// Only this test's fixture is touched; `beforeEach` builds a new one next time.
		runWithData.waitTill = waitTill;

		const response = await executeWorkflow(
			mock<IExecuteWorkflowInfo>(),
			mock<IWorkflowExecuteAdditionalData>(),
			mock<ExecuteWorkflowOptions>({ loadedWorkflowData: undefined, doNotWaitToFinish: false }),
		);

		expect(response).toEqual({
			data: runWithData.data.resultData.runData[LAST_NODE_EXECUTED][0].data!.main,
			executionId: EXECUTION_ID,
			waitTill,
		});
	});
});
|
|
// Covers `getRunData`: rejection when the workflow has no node to start from,
// and the run-data shape produced with and without input data.
describe('getRunData', () => {
	// Workflow containing a single execute-workflow trigger node.
	const workflow = mock<IWorkflowBase>({
		id: '1',
		name: 'test',
		nodes: [
			{
				type: 'n8n-nodes-base.executeWorkflowTrigger',
			},
		],
		active: false,
	});

	it('should throw error to add trigger node', async () => {
		const workflowWithoutNodes = mock<IWorkflowBase>({
			id: '1',
			name: 'test',
			nodes: [],
			active: false,
		});

		await expect(getRunData(workflowWithoutNodes)).rejects.toThrow(
			'Missing node to start execution',
		);
	});

	it('should return default data', async () => {
		expect(await getRunData(workflow)).toEqual({
			executionData: {
				executionData: {
					contextData: {},
					metadata: {},
					nodeExecutionStack: [
						{
							// Without input, the start node receives one empty item.
							data: { main: [[{ json: {} }]] },
							metadata: { parentExecution: undefined },
							node: workflow.nodes[0],
							source: null,
						},
					],
					waitingExecution: {},
					waitingExecutionSource: {},
				},
				resultData: { runData: {} },
				startData: {},
			},
			executionMode: 'integrated',
			workflowData: workflow,
		});
	});

	it('should return run data with input data and metadata', async () => {
		const data = [{ json: { test: 1 } }];
		const parentExecution = {
			executionId: '123',
			workflowId: '567',
		};

		expect(await getRunData(workflow, data, parentExecution)).toEqual({
			executionData: {
				executionData: {
					contextData: {},
					metadata: {},
					nodeExecutionStack: [
						{
							data: { main: [data] },
							metadata: { parentExecution },
							node: workflow.nodes[0],
							source: null,
						},
					],
					waitingExecution: {},
					waitingExecutionSource: {},
				},
				parentExecution: {
					executionId: '123',
					workflowId: '567',
				},
				resultData: { runData: {} },
				startData: {},
			},
			executionMode: 'integrated',
			workflowData: workflow,
		});
	});
});
|
|
// Covers `getBase`: the default shape of the additional data and the
// pass-through of each optional argument.
describe('getBase', () => {
	const mockWebhookBaseUrl = 'webhook-base-url.com';
	jest.spyOn(urlService, 'getWebhookBaseUrl').mockReturnValue(mockWebhookBaseUrl);

	// Endpoint paths configured on the mocked GlobalConfig; reused below to
	// build the URLs the assertions expect.
	const endpointPaths = {
		rest: '/rest/',
		formWaiting: '/form-waiting/',
		webhook: '/webhook/',
		webhookWaiting: '/webhook-waiting/',
		webhookTest: '/webhook-test/',
	};

	const globalConfig = mockInstance(GlobalConfig);
	Container.set(GlobalConfig, globalConfig);
	globalConfig.endpoints = mock<GlobalConfig['endpoints']>(endpointPaths);

	const mockVariables = { variable: 1 };
	jest.spyOn(WorkflowHelpers, 'getVariables').mockResolvedValue(mockVariables);

	// Prefixes an endpoint path with the mocked webhook base URL.
	const withBaseUrl = (path: string) => `${mockWebhookBaseUrl}${path}`;

	it('should return base additional data with default values', async () => {
		const base = await getBase();

		expect(base).toMatchObject({
			currentNodeExecutionIndex: 0,
			credentialsHelper,
			executeWorkflow: expect.any(Function),
			restApiUrl: withBaseUrl(endpointPaths.rest),
			instanceBaseUrl: mockWebhookBaseUrl,
			formWaitingBaseUrl: withBaseUrl(endpointPaths.formWaiting),
			webhookBaseUrl: withBaseUrl(endpointPaths.webhook),
			webhookWaitingBaseUrl: withBaseUrl(endpointPaths.webhookWaiting),
			webhookTestBaseUrl: withBaseUrl(endpointPaths.webhookTest),
			currentNodeParameters: undefined,
			executionTimeoutTimestamp: undefined,
			userId: undefined,
			setExecutionStatus: expect.any(Function),
			variables: mockVariables,
			externalSecretsProxy,
			startRunnerTask: expect.any(Function),
			logAiEvent: expect.any(Function),
		});
	});

	it('should include userId when provided', async () => {
		const userId = 'test-user-id';

		const base = await getBase(userId);

		expect(base.userId).toBe(userId);
	});

	it('should include currentNodeParameters when provided', async () => {
		const currentNodeParameters = { param1: 'value1' };

		const base = await getBase(undefined, currentNodeParameters);

		// `toBe` checks that the very same object reference is passed through.
		expect(base.currentNodeParameters).toBe(currentNodeParameters);
	});

	it('should include executionTimeoutTimestamp when provided', async () => {
		const executionTimeoutTimestamp = Date.now() + 1000;

		const base = await getBase(undefined, undefined, executionTimeoutTimestamp);

		expect(base.executionTimeoutTimestamp).toBe(executionTimeoutTimestamp);
	});
});
| }); |
|
|