Communication Subsystem
The platform communication subsystem is a three-layer pipeline that delivers async events from bundles to connected clients. This is the same delivery path used by the default chat UI, bundle widgets, and custom bundle main views. It is transport-agnostic at the event level — the same bundle code can emit progress, deltas, and completion events without caring whether the connected client consumes them over SSE or Socket.IO.
Bundle REST operations and bundle-served MCP endpoints are sibling proc surfaces that sit next to this pipeline, not inside the event envelope itself. They can still emit communicator events during a request, but the underlying HTTP request/response or MCP exchange remains a separate surface.
Filter / Firewall
Each event can be intercepted and filtered before delivery. Bundles can define event filters to transform, block, or enrich events. The bundle firewall enforces per-tenant policies on what events flow to clients.
Broadcast / P2P Channels
The relay supports session-scoped pub/sub channels. Bundles can broadcast to all subscribers of a room, or send point-to-point to a specific target_sid. The same mechanism powers streaming from proc to multiple ingress instances.
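The difference between broadcast and point-to-point delivery can be illustrated with a small in-memory stand-in. This is a sketch only: `InMemoryRelay`, `subscribe`, and `publish` are hypothetical names, and the real relay is backed by Redis Pub/Sub rather than Python dictionaries.

```python
from collections import defaultdict
from typing import Optional


class InMemoryRelay:
    """Toy stand-in for the relay: rooms map to subscriber sids, each with an inbox."""

    def __init__(self) -> None:
        self.rooms: dict[str, set[str]] = defaultdict(set)
        self.inboxes: dict[str, list[dict]] = defaultdict(list)

    def subscribe(self, room: str, sid: str) -> None:
        self.rooms[room].add(sid)

    def publish(self, room: str, event: dict, target_sid: Optional[str] = None) -> int:
        """Deliver to one sid when target_sid is set, otherwise to the whole room."""
        if target_sid is not None:
            # P2P: only deliver if the target is actually subscribed to the room.
            recipients = {target_sid} & self.rooms[room]
        else:
            # Broadcast: every subscriber of the room receives the event.
            recipients = set(self.rooms[room])
        for sid in recipients:
            self.inboxes[sid].append(event)
        return len(recipients)


relay = InMemoryRelay()
relay.subscribe("session-1", "sid-a")
relay.subscribe("session-1", "sid-b")

delivered_all = relay.publish("session-1", {"type": "chat.service", "key": "status.update"})
delivered_one = relay.publish("session-1", {"type": "chat.service", "key": "private"}, target_sid="sid-a")
```

Here the first publish reaches both subscribers, while the second reaches only `sid-a`, mirroring the effect of passing or omitting target_sid.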
Recorder
Events flowing through the pipeline can be recorded — useful for audit trails, replay, and observability. The recorder captures the envelope payload including session_id, event, and timestamp.
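A recorder of this kind might look roughly like the following. This is an illustrative sketch, not the platform's recorder: `EnvelopeRecorder`, `record`, and `replay` are hypothetical names, and the log lives in memory instead of durable storage. Only the three envelope fields named above are captured.

```python
import json
import time
from typing import Any, Optional


class EnvelopeRecorder:
    """Hypothetical recorder: keeps one JSON line per event in an in-memory log."""

    def __init__(self) -> None:
        self.lines: list[str] = []

    def record(self, session_id: str, event: dict[str, Any], timestamp: Optional[float] = None) -> None:
        entry = {
            "session_id": session_id,
            "event": event,
            # Use the supplied timestamp when given, otherwise the current time.
            "timestamp": time.time() if timestamp is None else timestamp,
        }
        self.lines.append(json.dumps(entry, sort_keys=True))

    def replay(self, session_id: str) -> list[dict[str, Any]]:
        """Return the recorded events for one session in recording order."""
        entries = (json.loads(line) for line in self.lines)
        return [e["event"] for e in entries if e["session_id"] == session_id]


recorder = EnvelopeRecorder()
recorder.record("s1", {"type": "chat.step"}, timestamp=1.0)
recorder.record("s2", {"type": "chat.service"}, timestamp=2.0)
recorder.record("s1", {"type": "chat.service"}, timestamp=3.0)
replayed = recorder.replay("s1")
```

Storing each envelope as a self-describing JSON line keeps audit and replay independent of the transport the client used.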
ChatCommunicator — Bundle Producer API
Every bundle workflow receives a ChatCommunicator instance. It wraps Redis Pub/Sub with a typed API for all outbound async events. The same instance works whether the bundle runs in-process or inside an isolated execution runtime (Docker / Fargate sandbox) — the iso executor routes events to the supervisor over a Unix socket, which then publishes to Redis. Bundle-defined chat UIs, widgets, and custom frontends see the same stream surface in both cases.
# Standard streaming lifecycle
await communicator.start() # chat_start event
await communicator.step("Searching…") # chat_step — visible progress
await communicator.delta("answer", chunk) # chat_delta — streaming text
await communicator.event(
    event_type="chat.compaction",
    data={"status": "completed"},
)                                           # chat_compaction: context compaction status
await communicator.complete() # chat_complete
# Delta markers: answer · thinking · canvas · timeline_text · subsystem
await communicator.delta("thinking", reasoning_chunk)
await communicator.delta("canvas", json_payload)
# Custom service event — broadcast to all session subscribers
await communicator.event(
    event_type="chat.service",
    data={"key": "status.update", "value": "processing"},
)
# P2P — deliver only to a specific connected client (target_sid)
await communicator.event(
    event_type="chat.service",
    data={"key": "private", "value": result},
    target_sid=sid,  # omit for broadcast
)
# Error
await communicator.error("Something went wrong")
Outbound Firewall (per bundle)
Bundles can attach an IEventFilter to the workflow to suppress or gate events before they reach Redis. The filter sees the caller's user_type, user_id, and full event metadata. Filters are fail-open: an exception allows the event through.
from kdcube_ai_app.apps.chat.sdk.comm import IEventFilter
class MyBundleFilter(IEventFilter):
    def allow_event(self, user_type, user_id, event) -> bool:
        # hide internal step events from non-privileged users
        if user_type != "privileged":
            if event.get("type") == "chat.step" and event.get("broadcast"):
                return False
        return True
# Wire at entrypoint — passed to workflow factory via event_filter param
workflow = MyWorkflow(..., event_filter=MyBundleFilter())
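The fail-open behaviour described above can be sketched as a small wrapper around the filter call. This is an assumption about how such a guard could be written, not the SDK's actual code: `should_deliver`, `BlockSteps`, and `Broken` are hypothetical names, and the filters here are duck-typed so the example runs without the SDK installed.

```python
import logging

logger = logging.getLogger("event_filter_sketch")


def should_deliver(event_filter, user_type, user_id, event) -> bool:
    """Apply a filter fail-open: no filter, or a raising filter, lets the event through."""
    if event_filter is None:
        return True
    try:
        return bool(event_filter.allow_event(user_type, user_id, event))
    except Exception:
        # Fail-open: a buggy filter must not silently drop events.
        logger.exception("event filter raised; allowing event (fail-open)")
        return True


class BlockSteps:
    """Filter that suppresses chat.step events for everyone."""

    def allow_event(self, user_type, user_id, event) -> bool:
        return event.get("type") != "chat.step"


class Broken:
    """Filter that always raises, to exercise the fail-open path."""

    def allow_event(self, user_type, user_id, event) -> bool:
        raise RuntimeError("filter bug")


blocked = should_deliver(BlockSteps(), "registered", "u1", {"type": "chat.step"})
passed_on_error = should_deliver(Broken(), "registered", "u1", {"type": "chat.step"})
```

One consequence of fail-open semantics is that a filter should not be the only safeguard for sensitive data, because a filter bug results in delivery rather than suppression.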
ChatCommunicator is fully available inside sandboxed execution (Docker / Fargate). Events published from the sandbox travel: exec process → Unix socket → supervisor → Redis Pub/Sub → SSEHub → client. No code change is needed between in-process and iso modes. See comm-system.md and README-comm.md for full detail.
Streaming vs Request-Scoped Surfaces
The communication pipeline carries asynchronous bundle events. It does not mean every bundle interface is itself a chat stream. KDCube currently exposes several adjacent surfaces, each with a different contract.
| Surface | Route family | What it carries | How it relates to the communication pipeline |
|---|---|---|---|
| SSE | /sse/stream + /sse/chat | Realtime chat events and turn lifecycle | Native transport for the chat event envelope. |
| Socket.IO | Socket namespace / chat_message | Same event model as SSE with a bidirectional socket transport | Native transport for the same chat event envelope. |
| Bundle REST interfaces | /api/integrations/bundles/.../widgets, /operations, /public, /static | Widget HTML, main-view assets, request/response APIs, webhook-style endpoints | Not themselves chat streams, but bundle code can emit communicator events while handling the request. |
| Bundle-served MCP | /api/integrations/bundles/.../mcp/{alias} or /public/mcp/{alias} | MCP HTTP traffic into a bundle-provided FastMCP or ASGI app | Separate request surface. Proc resolves the route and dispatches the HTTP request, but does not wrap the exchange in chat_start / chat_delta / chat_complete. MCP auth, if any, is owned by the bundle MCP app. |
| Background jobs | Redis Stream namespace, no public HTTP route | Ready background work routed to a bundle @on_job handler | Separate processor-claimed work surface. A producer such as @cron or a widget operation enqueues work, proc claims it fairly, builds bundle runtime context, and invokes async @on_job. It is not delivered to clients unless the bundle emits communicator events or writes user-visible results. |
A bundle operation or MCP request can still produce live UI updates when the bundle code explicitly emits communicator events. In that case the streaming delivery reuses the same SSE or Socket.IO session identified by auth state and optional KDC-Stream-ID.
@mcp(...) returns a FastMCP application or an MCP-ready ASGI app. Proc resolves the alias and forwards the original headers and body into that subapp, but does not authenticate the MCP call itself. The bundle MCP app authenticates it if the endpoint is not intentionally public. The MCP HTTP exchange is therefore request-scoped, even if the bundle also emits separate chat events during the same operation.
Bundle-served MCP transport contract
For MCP, there are three separate responsibilities:
- Proc: resolve the bundle route, forward the original HTTP request, and dispatch into the returned FastMCP or ASGI subapp
- Bundle: decide whether the request is public or authenticated, read whatever headers/cookies it wants, and verify them itself
- Client: call the explicit MCP route and send the auth material that the bundle contract requires
That means the MCP request is not using the normal browser/user-flow API auth contract. AUTH.ID_TOKEN_HEADER_NAME is not the MCP auth header. The bundle may choose a completely different header such as X-Versatile-Preferences-MCP-Token, or a bearer token, cookie, HMAC signature, or custom JWT scheme.
Typical authenticated MCP flow:
- bundle props define a non-secret contract such as the header name
- bundle secrets store the verification material such as a shared token
- the MCP client calls /api/integrations/bundles/{tenant}/{project}/{bundle_id}/mcp/{alias}
- proc forwards headers and body unchanged into the bundle MCP app
- the bundle verifies the request and either raises 401 or returns the FastMCP app
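The verification step in the authenticated flow above could be sketched as follows, assuming a shared-token scheme. The names `verify_mcp_token` and `Unauthorized` are hypothetical, and how the header name and token are read from bundle props and secrets is left out; the example works on a plain header mapping so it runs without any web framework.

```python
import hmac
from typing import Mapping


class Unauthorized(Exception):
    """Stand-in for whatever the bundle raises to produce an HTTP 401."""


def verify_mcp_token(headers: Mapping[str, str], header_name: str, expected_token: str) -> None:
    """Raise Unauthorized unless the configured header carries the expected token."""
    # HTTP header names are case-insensitive, so normalise before lookup.
    lowered = {k.lower(): v for k, v in headers.items()}
    presented = lowered.get(header_name.lower(), "")
    # compare_digest avoids leaking the mismatch position through timing.
    # An empty expected token is rejected so a missing secret never means "open".
    if not expected_token or not hmac.compare_digest(presented.encode(), expected_token.encode()):
        raise Unauthorized("invalid or missing MCP token")


# header_name would come from bundle props, expected_token from bundle secrets.
verify_mcp_token(
    {"x-versatile-preferences-mcp-token": "s3cret"},
    header_name="X-Versatile-Preferences-MCP-Token",
    expected_token="s3cret",
)
```

Keeping the header name in props and the token in secrets means the contract can be published to clients without exposing the verification material.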
Typical public MCP flow:
- the client calls /api/integrations/bundles/{tenant}/{project}/{bundle_id}/public/mcp/{alias}
- proc forwards the request into the bundle MCP app
- the bundle either accepts it as public or still applies its own auth logic if that is the chosen contract
route="operations" vs route="public" selects the URL family. It does not select the auth mechanism. Proc does not verify MCP auth on either route family.
Bundle-authenticated public API hooks
Public bundle APIs now have two different auth ownership shapes:
- Proc-owned: public_auth="none" or public_auth={"mode":"header_secret", ...}
- Bundle-owned: public_auth="bundle"
For public_auth="bundle", there are three separate responsibilities:
- Proc: resolve the route, parse the request, and invoke the bundle method
- Bundle: accept request: Request, read the inbound headers/body, and decide whether the hook is valid
- Client / webhook provider: call the explicit public operations route and send the auth material the bundle contract requires
Typical bundle-authenticated public hook flow:
- bundle props define a non-secret contract such as the header name
- bundle secrets store the verification material such as a shared token
- the caller sends POST /api/integrations/bundles/{tenant}/{project}/{bundle_id}/public/{alias}
- proc forwards the request into the bundle method
- the bundle verifies the request and either raises 401/403 or returns normally
With @api(..., route="public", public_auth="bundle"), proc no longer acts as the token verifier. The bundle defines the header/token contract and enforces it inside the method itself.
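As one possible shape for bundle-owned verification, a webhook-style hook might check an HMAC signature over the raw request body. This is a sketch under assumptions: `sign_body` and `is_valid_hook` are hypothetical helpers, SHA-256 with a hex digest is an arbitrary choice, and the header that carries the signature is whatever the bundle contract defines.

```python
import hashlib
import hmac


def sign_body(secret: bytes, body: bytes) -> str:
    """Compute the hex HMAC-SHA256 signature a caller would send with the request."""
    return hmac.new(secret, body, hashlib.sha256).hexdigest()


def is_valid_hook(secret: bytes, body: bytes, presented_signature: str) -> bool:
    """Return True only if the presented signature matches the raw request body."""
    expected = sign_body(secret, body)
    # Compare as bytes in constant time; encoding also tolerates non-ASCII input.
    return hmac.compare_digest(expected.encode(), presented_signature.encode())


secret = b"shared-webhook-secret"          # would come from bundle secrets
body = b'{"event": "order.created"}'       # raw bytes exactly as received
signature = sign_body(secret, body)        # what the webhook provider would send

accepted = is_valid_hook(secret, body, signature)
rejected = is_valid_hook(secret, b'{"event": "order.deleted"}', signature)
```

A bundle method using this approach would read the raw body and signature header from the inbound request and raise 401 or 403 when `is_valid_hook` returns False.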