Communication Subsystem
The platform communication subsystem is a three-layer pipeline that delivers async events from bundles to connected clients. It is transport-agnostic — the same bundle code streams to SSE, Socket.IO, or REST without changes.
Filter / Firewall
Each event can be intercepted and filtered before delivery. Bundles can define event filters to transform, block, or enrich events. The bundle firewall enforces per-tenant policies on which events flow to clients.
Broadcast / P2P Channels
The relay supports session-scoped pub/sub channels. Bundles can broadcast to all subscribers of a room, or send point-to-point to a specific target_sid. The same mechanism powers streaming from proc to multiple ingress instances.
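The difference between broadcast and point-to-point delivery can be illustrated with a small in-memory sketch. This is not the platform relay: the class InMemoryRelay, its subscribe and publish methods, and the room and sid values are all hypothetical names chosen for illustration. Only the idea of a room of subscribers and an optional target_sid comes from the description above.

```python
from collections import defaultdict
from typing import Callable, Dict, Optional

Handler = Callable[[dict], None]


class InMemoryRelay:
    """Hypothetical stand-in for the relay; not the platform implementation."""

    def __init__(self) -> None:
        # room name -> {subscriber sid -> delivery callback}
        self._rooms: Dict[str, Dict[str, Handler]] = defaultdict(dict)

    def subscribe(self, room: str, sid: str, handler: Handler) -> None:
        self._rooms[room][sid] = handler

    def publish(self, room: str, event: dict, target_sid: Optional[str] = None) -> int:
        """Deliver to one subscriber if target_sid is set, else to the whole room.

        Returns the number of subscribers that received the event.
        """
        subscribers = self._rooms.get(room, {})
        if target_sid is not None:
            handler = subscribers.get(target_sid)
            if handler is None:
                return 0  # target is not connected to this room
            handler(event)
            return 1
        for handler in list(subscribers.values()):
            handler(event)
        return len(subscribers)


relay = InMemoryRelay()
inbox_a, inbox_b = [], []
relay.subscribe("session-1", "sid-a", inbox_a.append)
relay.subscribe("session-1", "sid-b", inbox_b.append)

# Broadcast: both subscribers of the room receive the event.
relay.publish("session-1", {"type": "chat.service", "data": "to everyone"})
# P2P: only sid-b receives the event.
relay.publish("session-1", {"type": "chat.service", "data": "only b"}, target_sid="sid-b")
```

After these two calls, inbox_a holds one event and inbox_b holds two. In the real system the room lookup and delivery happen over Redis Pub/Sub rather than in a local dictionary.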
Recorder
Events flowing through the pipeline can be recorded — useful for audit trails, replay, and observability. The recorder captures the envelope payload including session_id, event, and timestamp.
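As a rough illustration of what a recorder might keep, the sketch below stores envelopes with the three fields named above (session_id, event, timestamp) and supports per-session replay. The names RecordedEnvelope, ListRecorder, record, replay, and dump_jsonl are assumptions made for this example; the platform's recorder interface and storage backend may look quite different.

```python
import json
import time
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, List


@dataclass
class RecordedEnvelope:
    """Hypothetical envelope shape: only the three fields named in the text."""
    session_id: str
    event: Dict[str, Any]
    timestamp: float = field(default_factory=time.time)


class ListRecorder:
    """Hypothetical recorder that keeps envelopes in memory, in arrival order."""

    def __init__(self) -> None:
        self.records: List[RecordedEnvelope] = []

    def record(self, session_id: str, event: Dict[str, Any]) -> RecordedEnvelope:
        # Copy the event so later mutation by the caller does not alter the record.
        envelope = RecordedEnvelope(session_id=session_id, event=dict(event))
        self.records.append(envelope)
        return envelope

    def replay(self, session_id: str) -> List[Dict[str, Any]]:
        """Return the events of one session in the order they were recorded."""
        return [r.event for r in self.records if r.session_id == session_id]

    def dump_jsonl(self) -> str:
        """Serialize all envelopes as JSON Lines, for example for an audit log."""
        return "\n".join(json.dumps(asdict(r)) for r in self.records)


recorder = ListRecorder()
recorder.record("s1", {"type": "chat.step", "text": "Searching"})
recorder.record("s2", {"type": "chat.service", "key": "status.update"})
recorder.record("s1", {"type": "chat.service", "key": "private"})
```

Calling recorder.replay("s1") returns the two s1 events in recording order, which is the property that replay and audit use cases depend on.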
ChatCommunicator — Bundle Producer API
Every bundle workflow receives a ChatCommunicator instance. It wraps Redis Pub/Sub with a typed API for all outbound async events. The same instance works whether the bundle runs in-process or inside an isolated execution runtime (Docker / Fargate sandbox): in isolated mode the executor routes events to the supervisor over a Unix socket, and the supervisor publishes them to Redis. The client's SSE stream looks the same in both cases.
# Standard streaming lifecycle
await communicator.start() # chat_start event
await communicator.step("Searching…") # chat_step — visible progress
await communicator.delta("answer", chunk) # chat_delta — streaming text
await communicator.complete() # chat_complete
# Delta markers: answer · thinking · canvas · timeline_text · subsystem
await communicator.delta("thinking", reasoning_chunk)
await communicator.delta("canvas", json_payload)
# Custom service event — broadcast to all session subscribers
await communicator.event(
    event_type="chat.service",
    data={"key": "status.update", "value": "processing"},
)
# P2P — deliver only to a specific connected client (target_sid)
await communicator.event(
    event_type="chat.service",
    data={"key": "private", "value": result},
    target_sid=sid,  # omit for broadcast
)
# Error
await communicator.error("Something went wrong")
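The calls above can be tied together into one turn with error handling. The sketch below uses a hypothetical FakeCommunicator test double that only records calls, so it runs without Redis or the SDK. The helper run_turn and the choice to skip complete() after an error are assumptions of this example, not documented platform behavior.

```python
import asyncio
from typing import Any, List, Tuple


class FakeCommunicator:
    """Hypothetical test double: records calls instead of publishing events."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, tuple]] = []

    async def start(self) -> None:
        self.calls.append(("start", ()))

    async def step(self, message: str) -> None:
        self.calls.append(("step", (message,)))

    async def delta(self, marker: str, chunk: Any) -> None:
        self.calls.append(("delta", (marker, chunk)))

    async def complete(self) -> None:
        self.calls.append(("complete", ()))

    async def error(self, message: str) -> None:
        self.calls.append(("error", (message,)))


async def run_turn(communicator, chunks) -> None:
    """Stream chunks as answer deltas; report failures via communicator.error()."""
    await communicator.start()
    try:
        await communicator.step("Searching…")
        for chunk in chunks:
            if not isinstance(chunk, str):
                raise TypeError("chunk must be str")
            await communicator.delta("answer", chunk)
    except Exception as exc:
        # Assumption: an errored turn ends with error() and no complete().
        await communicator.error(str(exc))
        return
    await communicator.complete()


ok = FakeCommunicator()
asyncio.run(run_turn(ok, ["Hel", "lo"]))

failed = FakeCommunicator()
asyncio.run(run_turn(failed, ["Hel", 42]))
```

A double like this is mainly useful in unit tests of bundle logic, where the order of emitted events matters more than the transport.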
Outbound Firewall (per bundle)
Bundles can attach an IEventFilter to the workflow to suppress or gate events before they reach Redis. The filter sees the caller's user_type, user_id, and full event metadata. Filters are fail-open: an exception allows the event through.
from kdcube_ai_app.apps.chat.sdk.comm import IEventFilter

class MyBundleFilter(IEventFilter):
    def allow_event(self, user_type, user_id, event) -> bool:
        # hide internal step events from non-privileged users
        if user_type != "privileged":
            if event.get("type") == "chat.step" and event.get("broadcast"):
                return False
        return True

# Wire at entrypoint — passed to workflow factory via event_filter param
workflow = MyWorkflow(..., event_filter=MyBundleFilter())
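The fail-open rule can be sketched as a small gate around allow_event. This is an illustration of the semantics described above, not the SDK's internal code: the helper is_allowed, the logger name, and the "registered" user type are hypothetical, and the filter classes here are plain classes so the example runs without importing IEventFilter.

```python
import logging
from typing import Any, Dict, Optional

logger = logging.getLogger("event_filter_sketch")


def is_allowed(event_filter: Optional[Any], user_type: str, user_id: str,
               event: Dict[str, Any]) -> bool:
    """Fail-open gate: only an explicit falsy result from the filter blocks an event."""
    if event_filter is None:
        return True  # no filter attached, nothing to enforce
    try:
        return bool(event_filter.allow_event(user_type, user_id, event))
    except Exception:
        # Fail-open: a buggy filter must not silence the stream.
        logger.exception("event filter raised; allowing event")
        return True


class HideBroadcastSteps:
    """Same rule as the filter shown above, without the SDK base class."""

    def allow_event(self, user_type, user_id, event) -> bool:
        if user_type != "privileged":
            if event.get("type") == "chat.step" and event.get("broadcast"):
                return False
        return True


class BrokenFilter:
    """Always raises, to demonstrate the fail-open path."""

    def allow_event(self, user_type, user_id, event) -> bool:
        raise RuntimeError("bug in filter")


step = {"type": "chat.step", "broadcast": True}
blocked = is_allowed(HideBroadcastSteps(), "registered", "u1", step)
passed_on_error = is_allowed(BrokenFilter(), "registered", "u1", step)
```

Here blocked is False and passed_on_error is True. Fail-open favors availability of the stream over strictness, so a filter that must never leak an event should catch its own exceptions and return False explicitly.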
ChatCommunicator is fully available inside sandboxed execution (Docker / Fargate). Events published from the sandbox travel: exec process → Unix socket → supervisor → Redis Pub/Sub → SSEHub → client. No code change is needed between in-process and isolated modes.
See comm-system.md and README-comm.md for full detail.