| import { heartbeatMessageSchema } from '@n8n/api-types'; |
| import type { User } from '@n8n/db'; |
| import { Service } from '@n8n/di'; |
| import { UnexpectedError } from 'n8n-workflow'; |
| import type WebSocket from 'ws'; |
|
|
| import { AbstractPush } from './abstract.push'; |
|
|
/**
 * Listener meant to be bound to a socket (registered for its `pong` event):
 * flags the socket it is invoked on as alive.
 */
function heartbeat(this: WebSocket): void {
	this.isAlive = true;
}
|
|
/**
 * Push transport built on `ws` WebSocket connections.
 *
 * Liveness is tracked with an `isAlive` flag on each socket: `ping` clears the
 * flag before sending a ping, and the `pong` listener (`heartbeat`) sets it again.
 */
@Service()
export class WebSocketPush extends AbstractPush<WebSocket> {
	/**
	 * Registers a connection with the base class and attaches its `pong`,
	 * `close` and `message` listeners.
	 *
	 * @param pushRef - Push reference for this connection; forwarded to the base
	 *   class, to `onMessageReceived`, and to `remove` when the socket closes.
	 * @param userId - User id forwarded to the base class and attached to error reports.
	 * @param connection - The socket to register.
	 */
	add(pushRef: string, userId: User['id'], connection: WebSocket) {
		// A freshly added socket starts out alive; each `pong` re-flags it.
		connection.isAlive = true;
		connection.on('pong', heartbeat);

		super.add(pushRef, userId, connection);

		const handleMessage = async (data: WebSocket.RawData) => {
			try {
				// Raw data may arrive as a list of chunks; normalise to a single Buffer.
				const payload = Array.isArray(data) ? Buffer.concat(data) : Buffer.from(data);
				const parsed: unknown = JSON.parse(payload.toString('utf8'));

				// Messages matching the heartbeat schema are consumed here and not forwarded.
				const isHeartbeat = await this.isClientHeartbeat(parsed);
				if (!isHeartbeat) {
					this.onMessageReceived(pushRef, parsed);
				}
			} catch (err) {
				// Anything thrown inside the try block (decoding, JSON parsing, schema
				// check, or the handler call) is reported and logged, never rethrown.
				const parseFailure = new UnexpectedError('Error parsing push message', {
					extra: { userId, data },
					cause: err,
				});
				this.errorReporter.error(parseFailure);
				this.logger.error("Couldn't parse message from editor-UI", {
					error: err as unknown,
					pushRef,
					data,
				});
			}
		};

		// On close: detach the listeners added above and drop the connection.
		const handleClose = () => {
			connection.off('pong', heartbeat);
			connection.off('message', handleMessage);
			this.remove(pushRef);
		};

		connection.once('close', handleClose);
		connection.on('message', handleMessage);
	}

	/** Closes the given connection. */
	protected close(connection: WebSocket): void {
		connection.close();
	}

	/** Sends `data` over the given connection. */
	protected sendToOneConnection(connection: WebSocket, data: string): void {
		connection.send(data);
	}

	/**
	 * Pings a connection, or terminates it if its `isAlive` flag is not set,
	 * i.e. it has not been re-flagged since the previous ping cleared it.
	 */
	protected ping(connection: WebSocket): void {
		if (connection.isAlive) {
			// Clear the flag; the `pong` listener is expected to set it again.
			connection.isAlive = false;
			connection.ping();
			return;
		}

		connection.terminate();
	}

	/**
	 * Checks whether a parsed message validates against `heartbeatMessageSchema`.
	 *
	 * @param msg - Parsed message of unknown shape.
	 * @returns `true` when schema validation succeeds.
	 */
	private async isClientHeartbeat(msg: unknown): Promise<boolean> {
		const { success } = await heartbeatMessageSchema.safeParseAsync(msg);

		return success;
	}
}
|
|