Connect to Kadoa’s WebSocket server for instant event notifications. Kadoa broadcasts events to your connected clients.
WebSockets are Kadoa’s lowest-latency notification channel, but they should be treated as a best-effort realtime transport rather than an exactly-once delivery system. For critical workflows, make your event handling idempotent and use event.id as a dedupe key.
Setup
import { KadoaClient } from "@kadoa/node-sdk" ;
const client = new KadoaClient ({ apiKey: "YOUR_API_KEY" });
const realtime = await client . connectRealtime ();
realtime . onEvent (( event ) => {
console . log ( "Event:" , event . eventType , event . message );
});
realtime . onConnection (( connected ) => {
console . log ( "Connected:" , connected );
});
realtime . onError (( error ) => {
console . error ( "Error:" , error );
});
workflow_failed message fields: required workflowId, source, reason, action; optional workflowName, sourceUrl, url.
workflow_recovered message fields: required workflowId, workflowName, source; optional sourceUrl, url.
For API configuration, see the API reference .
Dashboard Setup
Go to Notifications in the sidebar
Click Add Channel → WebSocket
Select the Enable Websocket real-time streaming checkbox.
Reconnects and Redeploys
During an infrastructure drain or redeploy, Kadoa sends a control.draining message before closing the socket. This is an expected lifecycle event, not a workflow failure and not a permanent outage.
Clients should reconnect automatically when they receive control.draining:
Detect control.draining.
If it includes retryAfterMs, wait roughly that long. Otherwise use a short reconnect delay.
Open a new WebSocket connection.
Re-subscribe to your team channel after the new connection is open.
Kadoa also sends heartbeat messages. If your client stops receiving heartbeats for too long, close the stale socket and reconnect using the same flow.
After the drain notice, Kadoa will close the old socket with close code 1001 and reason Server shutting down. Treat that close as the final shutdown of the old connection, not as a workflow error.
Reliability Notes
WebSockets are optimized for low-latency notifications, not for guaranteed exactly-once delivery.
A reconnecting client can still observe a brief gap during deploys, drains, or network interruptions.
Custom clients can reduce that gap by persisting the latest event _cursor and re-subscribing with lastCursor.
Even with overlap-aware reconnects, clients should treat event.id as an idempotency key and dedupe repeated deliveries.
Advanced Custom Client Behavior
Kadoa may send an additive control-plane message before closing a socket:
{
"type" : "control.draining" ,
"connectionId" : "12345-1711111111111-7" ,
"retryAfterMs" : 500 ,
"deadlineAt" : "2026-03-22T14:05:30.000Z" ,
"resumeSupported" : true
}
Business events may include an optional _cursor:
{
"eventType" : "workflow_finished" ,
"id" : "event-uuid" ,
"timestamp" : "2025-01-15T10:30:00.000Z" ,
"_cursor" : "1742-0" ,
"message" : {}
}
If you run a custom client and want lower-disruption reconnects, store the latest _cursor and re-subscribe with:
{
"action" : "subscribe" ,
"channel" : "<team-id>" ,
"lastCursor" : "1742-0"
}
This improves continuity, but it is still not a promise of zero-loss or exactly-once delivery.
import json
import threading
import requests
import websocket
API_KEY = "YOUR_API_KEY"
PUBLIC_API_URI = "https://api.kadoa.com"
WSS_API_URI = "wss://realtime.kadoa.com"
RECONNECT_DELAY_SECONDS = 5
CONNECTION_DRAINING_CLOSE_CODE = 1001
team_id = None
last_cursor = None
def fetch_access_token ():
response = requests.post(
f " { PUBLIC_API_URI } /v4/oauth2/token" ,
headers = { "Content-Type" : "application/json" , "x-api-key" : API_KEY },
timeout = 10 ,
)
response.raise_for_status()
data = response.json()
return data[ "access_token" ], data[ "team_id" ]
def connect ():
access_token, current_team_id = fetch_access_token()
def on_open ( ws ):
global team_id
team_id = current_team_id
payload = { "action" : "subscribe" , "channel" : team_id}
if last_cursor:
payload[ "lastCursor" ] = last_cursor
ws.send(json.dumps(payload))
def on_message ( ws , message ):
global last_cursor
payload = json.loads(message)
if payload.get( "type" ) == "heartbeat" :
return
if payload.get( "type" ) == "control.draining" :
retry_after_seconds = payload.get( "retryAfterMs" , RECONNECT_DELAY_SECONDS * 1000 ) / 1000
threading.Timer(retry_after_seconds, connect).start()
return
if payload.get( "_cursor" ):
last_cursor = payload[ "_cursor" ]
print ( "event" , payload[ "eventType" ], payload)
def on_close ( ws , close_status_code , close_msg ):
print ( f "closed code= { close_status_code } reason= { close_msg } " )
if close_status_code == CONNECTION_DRAINING_CLOSE_CODE :
print ( "old draining socket closed" )
def on_error ( ws , error ):
print ( "websocket error" , error)
socket = websocket.WebSocketApp(
f " { WSS_API_URI } ?access_token= { access_token } " ,
on_open = on_open,
on_message = on_message,
on_close = on_close,
on_error = on_error,
)
socket.run_forever()
connect()
Event Handling
Filter events by type:
realtime . onEvent (( event ) => {
switch ( event . eventType ) {
case "workflow_finished" :
console . log ( "Workflow completed:" , event . message . id );
break ;
case "workflow_data_change" :
console . log ( "Data changed:" , event . message . differences );
break ;
case "workflow_failed" :
console . log ( "Workflow failed:" , event . message . reason );
break ;
case "workflow_recovered" :
console . log ( "Workflow recovered:" , event . message . workflowId );
break ;
}
});
All events follow this structure:
{
"eventType" : "event_name" ,
"id" : "event-uuid" ,
"timestamp" : "2025-01-15T10:30:00.000Z" ,
"_cursor" : "1742-0" ,
"message" : { /* event-specific data */ }
}
workflow_data_change
workflow_finished
workflow_failed
workflow_recovered
workflow_validation_anomaly_change
{
"eventType" : "workflow_data_change" ,
"id" : "2df91fbd-74c1-4d11-91aa-50030393574b" ,
"timestamp" : "2025-01-15T10:30:00.000Z" ,
"message" : {
"id" : "change_123" ,
"workflowId" : "wf_123" ,
"data" : [
{ "id" : "record-1" , "name" : "Product A" , "price" : 29.99 }
],
"differences" : [
{
"type" : "changed" ,
"fields" : [
{ "key" : "price" , "value" : 29.99 , "previousValue" : 24.99 }
]
}
],
"url" : "https://monitored-page.com" ,
"createdAt" : "2025-01-09T10:00:00Z"
}
}