WebSocket Trigger

WebSocket triggers allow your workflow to maintain persistent connections with clients, enabling real-time, two-way data exchange. This is particularly useful for scenarios requiring instant updates and interactions, such as chat applications, live collaboration tools, or real-time gaming.

It differs from SSE trigger that it duplex, meaning it can send and receive data wheres SSE is one-way from the server to the client.

Here's a simple example of a WebSocket trigger workflow:

workflow('StreamSSE', {
  tag: 'realtime',
  trigger: trigger.sse({
    path: '/',
  }),
  execute: async ({ trigger }) => {
    const stream = new PassThrough();
    setInterval(() => {
      stream.push(`data: ${trigger.query.channel}\n\n`);
    }, 1000);
    return stream;
  },
});

This workflow will be triggered by an SSE request to the path /{featureName}/realtime and using http method post.

How it works

The WebSocket trigger utilizes the project's routing extension to establish WebSocket connections. The endpoint for a WebSocket trigger workflow follows this pattern:

How to use it?

Echo

workflow('EchoWebsocket', {
  tag: 'realtime',
  trigger: trigger.websocket({
    topic: 'chat',
  }),
  execute: async ({ trigger }) => {
    return trigger.channel;
  },
});

Broadcast

workflow('BroadcastWebsocket', {
  tag: 'realtime',
  trigger: trigger.websocket({
    topic: 'chat',
  }),
  execute: async ({ trigger }) => {
    return trigger.channel;
  },
});

One to One

workflow('OneToOneWebsocket', {
  tag: 'realtime',
  trigger: trigger.websocket({
    topic: 'chat',
  }),
  execute: async ({ trigger }) => {
    return trigger.channel;
  },
});

Chat

import { on } from 'node:stream';
import { merge } from 'rxjs';

workflow('StreamChatWebsocket', {
  tag: 'realtime',
  trigger: trigger.websocket({
    topic: 'chat',
  }),
  execute: async ({ trigger }) => {
    return merge(
      on(process, 'uncaughtException'),
      on(process, 'unhandledRejection')
    );
  },
});
import { on } from 'node:stream';
import { merge } from 'rxjs';
import { tap } from 'rxjs/operators';

workflow('StreamErrorsLogging', {
  tag: 'realtime',
  trigger: trigger.sse({
    path: '/errors',
  }),
  execute: async ({ trigger }) => {
    return merge(
      on(process, 'uncaughtException').pipe(
        tap(error => console.error('Uncaught Exception:', error))
      ),
      on(process, 'unhandledRejection').pipe(
        tap(error => console.error('Unhandled Rejection:', error))
      )
    );
  },
});