Connect to Kadoa’s WebSocket server for instant event notifications. Kadoa broadcasts events to your connected clients.
WebSockets are Kadoa’s lowest-latency notification channel, but they should be treated as a best-effort realtime transport rather than an exactly-once delivery system. For critical workflows, make your event handling idempotent and use event.id as a dedupe key.

Setup

import { KadoaClient } from "@kadoa/node-sdk";

const client = new KadoaClient({ apiKey: "YOUR_API_KEY" });
const realtime = await client.connectRealtime();

realtime.onEvent((event) => {
  console.log("Event:", event.eventType, event.message);
});

realtime.onConnection((connected) => {
  console.log("Connected:", connected);
});

realtime.onError((error) => {
  console.error("Error:", error);
});
workflow_failed message fields:
  • Required: workflowId, source, reason, action
  • Optional: workflowName, sourceUrl, url
workflow_recovered message fields:
  • Required: workflowId, workflowName, source
  • Optional: sourceUrl, url
For API configuration, see the API reference.
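If you consume these events from a custom client, a small check for the required fields can catch malformed payloads early. The sketch below is illustrative only: `REQUIRED_FIELDS` and `missing_fields` are hypothetical names, and the field lists are copied from the description above.

```python
# Required message fields per event type, as listed in this section.
REQUIRED_FIELDS = {
    "workflow_failed": {"workflowId", "source", "reason", "action"},
    "workflow_recovered": {"workflowId", "workflowName", "source"},
}


def missing_fields(event):
    """Return the required message fields that are absent, sorted by name.

    Event types without a documented field list yield an empty result.
    """
    required = REQUIRED_FIELDS.get(event.get("eventType"), set())
    message = event.get("message") or {}
    return sorted(required - set(message))
```

A non-empty result is a signal to log the event and skip handlers that rely on those fields.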

Dashboard Setup

  1. Go to Notifications in the sidebar
  2. Click Add Channel → WebSocket
  3. Select the Enable Websocket real-time streaming checkbox.

Reconnects and Redeploys

During an infrastructure drain or redeploy, Kadoa sends a control.draining message before closing the socket. This is an expected lifecycle event, not a workflow failure and not a permanent outage. Clients should reconnect automatically when they receive control.draining:
  1. Detect control.draining.
  2. If it includes retryAfterMs, wait roughly that long. Otherwise use a short reconnect delay.
  3. Open a new WebSocket connection.
  4. Re-subscribe to your team channel after the new connection is open.
After the drain notice, Kadoa closes the old socket with close code 1001 and reason "Server shutting down". Treat that close as the final shutdown of the old connection, not as a workflow error.
Kadoa also sends heartbeat messages. If your client stops receiving heartbeats for too long, close the stale socket and reconnect using the same flow.
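One way to detect a stale socket is to record when the last message arrived and compare it against a timeout. The following is a minimal sketch, not part of any Kadoa SDK: `HeartbeatWatchdog` is a hypothetical helper, and the 60-second default is an assumption, since the heartbeat interval is not stated here.

```python
import time


class HeartbeatWatchdog:
    """Tracks when the last message arrived and reports a stale connection."""

    def __init__(self, timeout_seconds=60.0, clock=time.monotonic):
        # timeout_seconds is an assumed value; tune it to the heartbeat
        # interval you actually observe on your connection.
        self.timeout_seconds = timeout_seconds
        self._clock = clock
        self._last_seen = clock()

    def touch(self):
        """Call on every received message, heartbeats included."""
        self._last_seen = self._clock()

    def is_stale(self):
        """True when nothing has arrived for longer than the timeout."""
        return self._clock() - self._last_seen > self.timeout_seconds
```

Call `touch()` from your message handler and check `is_stale()` on a periodic timer. When it returns true, close the socket and run the reconnect steps. The injectable `clock` exists only to make the class easy to test.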

Reliability Notes

  • WebSockets are optimized for low-latency notifications, not for guaranteed exactly-once delivery.
  • A reconnecting client can still observe a brief gap during deploys, drains, or network interruptions.
  • Custom clients can reduce that gap by persisting the latest event _cursor and re-subscribing with lastCursor.
  • Even with overlap-aware reconnects, clients should treat event.id as an idempotency key and dedupe repeated deliveries.
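Deduplication on event.id can be as simple as a bounded set of recently seen ids. This is a sketch under stated assumptions: `EventDeduper` is a hypothetical helper, it keeps state in memory only, and the 10,000-entry limit is arbitrary.

```python
from collections import OrderedDict


class EventDeduper:
    """Remembers recently seen event ids, evicting the oldest first."""

    def __init__(self, max_size=10_000):
        self.max_size = max_size  # assumed bound; size it to your event rate
        self._seen = OrderedDict()

    def is_new(self, event):
        """Return True the first time an event id is seen, False on repeats."""
        event_id = event.get("id")
        if event_id is None:
            return True  # nothing to dedupe on, so let the event through
        if event_id in self._seen:
            self._seen.move_to_end(event_id)  # keep repeated ids fresh
            return False
        self._seen[event_id] = True
        if len(self._seen) > self.max_size:
            self._seen.popitem(last=False)  # drop the oldest id
        return True
```

Process an event only when `is_new(event)` returns true. Because the set is in memory, a process restart forgets it. Workflows that must survive restarts would need the ids in durable storage instead.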

Advanced Custom Client Behavior

Kadoa may send an additive control-plane message before closing a socket:
{
  "type": "control.draining",
  "connectionId": "12345-1711111111111-7",
  "retryAfterMs": 500,
  "deadlineAt": "2026-03-22T14:05:30.000Z",
  "resumeSupported": true
}
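A custom client might turn this message into a reconnect delay as sketched below. `reconnect_delay_seconds` is a hypothetical helper, the 1-second fallback is an assumed value for "a short reconnect delay", and the jitter is an added design choice that spreads reconnects from many clients, not something Kadoa requires.

```python
import random

DEFAULT_RECONNECT_DELAY_SECONDS = 1.0  # assumed fallback when retryAfterMs is absent


def reconnect_delay_seconds(control_message, jitter=0.2, rng=random.random):
    """Return how long to wait before reconnecting after control.draining."""
    retry_after_ms = control_message.get("retryAfterMs")
    is_number = isinstance(retry_after_ms, (int, float)) and not isinstance(
        retry_after_ms, bool
    )
    if is_number and retry_after_ms >= 0:
        base = retry_after_ms / 1000
    else:
        base = DEFAULT_RECONNECT_DELAY_SECONDS
    # Vary the wait by up to +/- `jitter` of the base delay.
    return base * (1 + jitter * (2 * rng() - 1))
```

With the sample message above and `jitter=0`, the function returns 0.5 seconds.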
Business events may include an optional _cursor:
{
  "eventType": "workflow_finished",
  "id": "event-uuid",
  "timestamp": "2025-01-15T10:30:00.000Z",
  "_cursor": "1742-0",
  "message": {}
}
If you run a custom client and want lower-disruption reconnects, store the latest _cursor and re-subscribe with:
{
  "action": "subscribe",
  "channel": "<team-id>",
  "lastCursor": "1742-0"
}
This improves continuity, but it is still not a promise of zero-loss or exactly-once delivery.
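To resume across process restarts as well as reconnects, the cursor has to be stored somewhere durable. The sketch below assumes a local JSON file is acceptable storage. `save_cursor`, `load_cursor`, and `build_subscribe_payload` are hypothetical helper names, and only the payload shape comes from this page.

```python
import json
import os
import tempfile


def save_cursor(path, cursor):
    """Write the cursor via a temp file so a crash cannot leave a partial file."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as handle:
        json.dump({"lastCursor": cursor}, handle)
    os.replace(tmp_path, path)  # atomic rename over the previous file


def load_cursor(path):
    """Return the stored cursor, or None when none has been saved yet."""
    try:
        with open(path) as handle:
            return json.load(handle).get("lastCursor")
    except (FileNotFoundError, json.JSONDecodeError):
        return None


def build_subscribe_payload(team_id, cursor=None):
    """Build the subscribe message, adding lastCursor only when one is known."""
    payload = {"action": "subscribe", "channel": team_id}
    if cursor:
        payload["lastCursor"] = cursor
    return payload
```

Call `save_cursor` after an event has been fully processed, not when it is received, so that a crash mid-processing leads to a replay rather than a skipped event.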
Python reconnect example
import json
import threading

import requests
import websocket

API_KEY = "YOUR_API_KEY"
PUBLIC_API_URI = "https://api.kadoa.com"
WSS_API_URI = "wss://realtime.kadoa.com"

RECONNECT_DELAY_SECONDS = 5
CONNECTION_DRAINING_CLOSE_CODE = 1001

team_id = None
last_cursor = None


def fetch_access_token():
    response = requests.post(
        f"{PUBLIC_API_URI}/v4/oauth2/token",
        headers={"Content-Type": "application/json", "x-api-key": API_KEY},
        timeout=10,
    )
    response.raise_for_status()
    data = response.json()
    return data["access_token"], data["team_id"]


def connect():
    access_token, current_team_id = fetch_access_token()

    def on_open(ws):
        global team_id
        team_id = current_team_id
        payload = {"action": "subscribe", "channel": team_id}
        if last_cursor:
            payload["lastCursor"] = last_cursor
        ws.send(json.dumps(payload))

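    reconnect_scheduled = False  # True once this connection has queued its successor

    def on_message(ws, message):
        global last_cursor
        nonlocal reconnect_scheduled
        payload = json.loads(message)
        if payload.get("type") == "heartbeat":
            return
        if payload.get("type") == "control.draining":
            # Expected lifecycle event: open the replacement after the suggested delay.
            retry_after_seconds = payload.get("retryAfterMs", RECONNECT_DELAY_SECONDS * 1000) / 1000
            reconnect_scheduled = True
            threading.Timer(retry_after_seconds, connect).start()
            return
        if payload.get("_cursor"):
            last_cursor = payload["_cursor"]
        # .get avoids a KeyError on messages that carry no eventType.
        print("event", payload.get("eventType"), payload)

    def on_close(ws, close_status_code, close_msg):
        nonlocal reconnect_scheduled
        print(f"closed code={close_status_code} reason={close_msg}")
        if close_status_code == CONNECTION_DRAINING_CLOSE_CODE:
            print("old draining socket closed")
        if not reconnect_scheduled:
            # Closed without a drain notice (for example a network drop): reconnect anyway.
            reconnect_scheduled = True
            threading.Timer(RECONNECT_DELAY_SECONDS, connect).start()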

    def on_error(ws, error):
        print("websocket error", error)

    socket = websocket.WebSocketApp(
        f"{WSS_API_URI}?access_token={access_token}",
        on_open=on_open,
        on_message=on_message,
        on_close=on_close,
        on_error=on_error,
    )
    socket.run_forever()


connect()

Event Handling

Filter events by type:
realtime.onEvent((event) => {
  switch (event.eventType) {
    case "workflow_finished":
      console.log("Workflow completed:", event.message.id);
      break;
    case "workflow_data_change":
      console.log("Data changed:", event.message.differences);
      break;
    case "workflow_failed":
      console.log("Workflow failed:", event.message.reason);
      break;
    case "workflow_recovered":
      console.log("Workflow recovered:", event.message.workflowId);
      break;
  }
});

Event Format

All events follow this structure:
{
  "eventType": "event_name",
  "id": "event-uuid",
  "timestamp": "2025-01-15T10:30:00.000Z",
  "_cursor": "1742-0",
  "message": { /* event-specific data */ }
}
Example workflow_data_change event:
{
  "eventType": "workflow_data_change",
  "id": "2df91fbd-74c1-4d11-91aa-50030393574b",
  "timestamp": "2025-01-15T10:30:00.000Z",
  "message": {
    "id": "change_123",
    "workflowId": "wf_123",
    "data": [
      { "id": "record-1", "name": "Product A", "price": 29.99 }
    ],
    "differences": [
      {
        "type": "changed",
        "fields": [
          { "key": "price", "value": 29.99, "previousValue": 24.99 }
        ]
      }
    ],
    "url": "https://monitored-page.com",
    "createdAt": "2025-01-09T10:00:00Z"
  }
}
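A handler for this event will often want a compact summary of what changed. The sketch below walks the differences array shown above. `summarize_changes` is a hypothetical helper, and it reads only entries whose type is "changed", because other type values are not documented on this page.

```python
def summarize_changes(event):
    """Return 'key: previous -> current' lines for each changed field."""
    lines = []
    message = event.get("message") or {}
    for difference in message.get("differences", []):
        if difference.get("type") != "changed":
            continue  # other difference types are skipped in this sketch
        for field in difference.get("fields", []):
            previous = field.get("previousValue")
            current = field.get("value")
            lines.append(f"{field.get('key')}: {previous} -> {current}")
    return lines
```

For the example event above, the function returns a single line: `price: 24.99 -> 29.99`.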