Communication Subsystem
The platform communication subsystem is a three-layer pipeline that delivers async events from apps to connected clients. This is the same delivery path used by the default chat UI, app widgets, and custom app main views. It is transport-agnostic at the event level — the same app code can emit progress, deltas, tool lifecycle steps, search result events, and completion events without caring whether the connected client consumes them over SSE or Socket.IO.
App REST operations and app-served MCP endpoints are sibling proc surfaces that sit next to this pipeline, not inside the event envelope itself. They can still emit communicator events during a request, but the underlying HTTP request/response or MCP exchange remains a separate surface.
The same communicator can record selected post-firewall envelopes into scoped buffers and hand batches to event sinks. Recording is separate from client delivery: an event can be client-visible, recorded for a sink, both, or neither depending on firewall and selector policy.
The same infrastructure now supports an event bus and data bus: event envelopes represent time-ordered happenings, while data-bus payloads carry typed subsystem state such as named-service search results, context pins, scene commands, and widget coordination data. See Platform Architecture for the bus diagram and Object Ecosystem & Ontologic Contracts for the provider/surface contract.
Filter / Firewall
Each event can be intercepted and filtered before delivery. Apps can define event filters to transform, block, or enrich events. The app event firewall enforces per-tenant policies on what event-bus messages flow to clients and recorders.
Broadcast / P2P Channels
The relay supports session-scoped pub/sub channels. Apps can broadcast to all subscribers of a room, or send point-to-point to a specific target_sid. The same mechanism powers streaming from proc to multiple ingress instances.
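The routing rule above can be illustrated with a small in-memory sketch. It is not the relay's implementation: `SessionRelay`, `subscribe`, and `publish` are hypothetical names, and the real relay runs over Redis Pub/Sub rather than Python dicts. Only `target_sid` is taken from the text.

```python
from collections import defaultdict
from typing import Any, Optional


class SessionRelay:
    """Toy in-memory stand-in for a session-scoped pub/sub relay (hypothetical)."""

    def __init__(self) -> None:
        # room (session id) -> {sid: inbox of delivered events}
        self._rooms: dict[str, dict[str, list[dict[str, Any]]]] = defaultdict(dict)

    def subscribe(self, room: str, sid: str) -> list[dict[str, Any]]:
        """Register a client connection and return its inbox."""
        inbox: list[dict[str, Any]] = []
        self._rooms[room][sid] = inbox
        return inbox

    def publish(self, room: str, event: dict[str, Any],
                target_sid: Optional[str] = None) -> int:
        """Deliver to one sid when target_sid is set, otherwise to the whole room."""
        subscribers = self._rooms.get(room, {})
        if target_sid is not None:
            recipients = [target_sid] if target_sid in subscribers else []
        else:
            recipients = list(subscribers)
        for sid in recipients:
            subscribers[sid].append(event)
        return len(recipients)


relay = SessionRelay()
alice = relay.subscribe("session-1", "sid-a")
bob = relay.subscribe("session-1", "sid-b")

# Broadcast: every subscriber of the room receives the event.
broadcast_count = relay.publish("session-1", {"type": "chat.service", "key": "status.update"})
# P2P: only the connection identified by target_sid receives it.
p2p_count = relay.publish("session-1", {"type": "chat.service", "key": "private"},
                          target_sid="sid-b")
```

In this sketch an unknown `target_sid` simply reaches nobody; how the real relay treats a disconnected target is not stated in this section.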
Scoped Recorder / Event Sinks
Selected post-firewall envelopes can be recorded into bounded, scoped buffers. Apps open JSON-serializable scopes with comm.record(...) or async with comm.recording(...), then send batches through send_recorded_events(...). Sinks are batch callbacks, not per-event waits on the hot path.
ChatCommunicator — App Producer API
Every app entrypoint receives a ChatCommunicator instance. It wraps Redis Pub/Sub with a typed API for all outbound async events. The same instance works whether the app runs in-process or inside an isolated execution runtime (Docker / Fargate sandbox) — the iso executor routes events to the supervisor over a Unix socket, which then publishes to Redis. App-defined chat UIs, widgets, and custom frontends see the same stream surface in both cases.
External Events and Named-Service Events
External events are how user-interface widgets, integrations, and services enter the conversation/event lane without pretending to be plain chat text. They can carry attachments, object refs, source metadata, and routing information. Named-service providers can then define how their objects are materialized and projected into ReAct blocks through block.produce and block.render policies.
When a ReAct tool call, tool result, or protocol rejection is emitted to the UI, it travels as a lifecycle event on the same communication substrate. This makes the Steps view a truthful trace of proposed calls, executed calls, results, and errors.
# Standard streaming lifecycle
await communicator.start()                    # chat_start event
await communicator.step("Searching…")         # chat_step: visible progress
await communicator.delta("answer", chunk)     # chat_delta: streaming text
await communicator.event(
    event_type="chat.compaction",
    data={"status": "completed"},
)                                             # chat_compaction: context compaction status
await communicator.complete()                 # chat_complete

# Delta markers: answer · thinking · canvas · timeline_text · subsystem
await communicator.delta("thinking", reasoning_chunk)
await communicator.delta("canvas", json_payload)

# Custom service event: broadcast to all session subscribers
await communicator.event(
    event_type="chat.service",
    data={"key": "status.update", "value": "processing"},
)

# P2P: deliver only to a specific connected client (target_sid)
await communicator.event(
    event_type="chat.service",
    data={"key": "private", "value": result},
    target_sid=sid,  # omit for broadcast
)

# Error
await communicator.error("Something went wrong")
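For local experiments it can help to run the lifecycle above against a stand-in. The sketch below is a hypothetical recording stub, not the SDK class: it only mirrors the method names used in the example (`start`, `step`, `delta`, `event`, `error`, `complete`), and the envelope shape and the event type strings it records are assumptions.

```python
import asyncio
from typing import Any, Optional


class RecordingCommunicator:
    """Hypothetical test double that mirrors the calls used above."""

    def __init__(self) -> None:
        self.sent: list[dict[str, Any]] = []

    async def _emit(self, event_type: str, data: dict[str, Any],
                    target_sid: Optional[str] = None) -> None:
        # Assumed envelope shape; the real envelope carries more metadata.
        self.sent.append({"type": event_type, "data": data, "target_sid": target_sid})

    async def start(self) -> None:
        await self._emit("chat.start", {})

    async def step(self, title: str) -> None:
        await self._emit("chat.step", {"title": title})

    async def delta(self, marker: str, text: str) -> None:
        await self._emit("chat.delta", {"marker": marker, "text": text})

    async def event(self, event_type: str, data: dict[str, Any],
                    target_sid: Optional[str] = None) -> None:
        await self._emit(event_type, data, target_sid)

    async def error(self, message: str) -> None:
        await self._emit("chat.error", {"message": message})

    async def complete(self) -> None:
        await self._emit("chat.complete", {})


async def run_turn(communicator: RecordingCommunicator) -> None:
    """One turn: start, a visible step, streamed answer chunks, completion."""
    await communicator.start()
    await communicator.step("Searching…")
    for chunk in ("Hello", ", ", "world"):
        await communicator.delta("answer", chunk)
    await communicator.complete()


communicator = RecordingCommunicator()
asyncio.run(run_turn(communicator))
event_types = [e["type"] for e in communicator.sent]
answer = "".join(e["data"]["text"] for e in communicator.sent if e["type"] == "chat.delta")
```

A stub like this lets app logic be unit-tested by asserting on the recorded sequence, without Redis or a connected client.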
Scoped Recording and Event Sinks
Recording reuses the same event vocabulary as the outbound firewall but at a different boundary. The firewall decides whether an event reaches clients; recording decides whether an already-allowed envelope is copied into an in-memory buffer for later sink delivery.
async with communicator.recording(
    selector,
    scope={"owner": "workflow"},
    sink=event_sink,
    send_on_exit=True,
):
    await run_work()
Multiple scopes are additive. If a workflow scope and a tool scope match the same envelope, the recorded item carries both scopes. Platform child tool runtimes receive portable scopes through COMM_SPEC; child-added records return through comm_recorded_events.json and are sent by the host after merge.
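The additive-scope rule can be sketched with a toy recorder. Everything here is illustrative: `ScopedRecorder`, `open_scope`, `observe`, and `flush` are hypothetical names, the selector is assumed to be a plain predicate, and the real communicator's selector syntax and buffer limits are not shown in this section.

```python
from collections import deque
from typing import Any, Callable

Envelope = dict[str, Any]
Selector = Callable[[Envelope], bool]


class ScopedRecorder:
    """Toy bounded recorder: one buffer, many additive scopes (hypothetical)."""

    def __init__(self, max_items: int = 100) -> None:
        # deque(maxlen=...) drops the oldest item once the buffer is full.
        self._buffer: deque = deque(maxlen=max_items)
        self._scopes: list[tuple[Selector, dict[str, Any]]] = []

    def open_scope(self, selector: Selector, scope: dict[str, Any]) -> None:
        self._scopes.append((selector, scope))

    def observe(self, envelope: Envelope) -> None:
        """Record a post-firewall envelope once, tagged with every matching scope."""
        matched = [scope for selector, scope in self._scopes if selector(envelope)]
        if matched:
            self._buffer.append({"envelope": envelope, "scopes": matched})

    def flush(self, sink: Callable[[list], None]) -> int:
        """Hand the whole batch to the sink in a single call."""
        batch = list(self._buffer)
        self._buffer.clear()
        if batch:
            sink(batch)
        return len(batch)


recorder = ScopedRecorder(max_items=2)
recorder.open_scope(lambda e: e["type"].startswith("chat."), {"owner": "workflow"})
recorder.open_scope(lambda e: e["type"] == "chat.step", {"owner": "tool"})

recorder.observe({"type": "chat.step", "data": {"title": "Searching"}})  # both scopes
recorder.observe({"type": "chat.delta", "data": {"text": "hi"}})         # workflow only
recorder.observe({"type": "internal.debug", "data": {}})                 # no scope, skipped

batches: list = []
sent = recorder.flush(batches.append)
```

The sink is invoked once per batch rather than once per event, which matches the description of sinks as batch callbacks that stay off the hot path.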
Outbound Event Firewall (per app)
Apps can attach an IEventFilter to the workflow to suppress or gate event-bus messages before they reach Redis, clients, or recorders. The filter sees the caller's user_type, user_id, and full event metadata. Filters are fail-open: an exception allows the event through.
from kdcube_ai_app.apps.chat.sdk.comm import IEventFilter

class MyAppEventFilter(IEventFilter):
    def allow_event(self, user_type, user_id, event) -> bool:
        # Hide broadcast step events from non-privileged users
        if user_type != "privileged":
            if event.get("type") == "chat.step" and event.get("broadcast"):
                return False
        return True

# Wire at entrypoint: passed to the entrypoint factory via the event_filter param
workflow = MyWorkflow(..., event_filter=MyAppEventFilter())
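Fail-open means a buggy filter degrades to "deliver everything" rather than silencing the stream. A minimal sketch of how a caller might apply that rule follows. `apply_filter`, `EventFilter`, and the `"registered"` user type are hypothetical; only the `allow_event(user_type, user_id, event)` signature is taken from the example above.

```python
import logging
from typing import Any, Protocol

logger = logging.getLogger("event_firewall")


class EventFilter(Protocol):
    def allow_event(self, user_type: str, user_id: str, event: dict[str, Any]) -> bool: ...


def apply_filter(event_filter: EventFilter, user_type: str, user_id: str,
                 event: dict[str, Any]) -> bool:
    """Return True when the event may pass; a filter exception allows it (fail-open)."""
    try:
        return bool(event_filter.allow_event(user_type, user_id, event))
    except Exception:
        logger.exception("event filter raised; allowing event")
        return True


class HideBroadcastSteps:
    """Same policy as the example filter, written without the SDK base class."""

    def allow_event(self, user_type, user_id, event):
        if user_type != "privileged" and event.get("type") == "chat.step" and event.get("broadcast"):
            return False
        return True


class BrokenFilter:
    def allow_event(self, user_type, user_id, event):
        raise RuntimeError("bug in filter")


step = {"type": "chat.step", "broadcast": True}
blocked = apply_filter(HideBroadcastSteps(), "registered", "u1", step)
allowed_for_privileged = apply_filter(HideBroadcastSteps(), "privileged", "u2", step)
allowed_on_error = apply_filter(BrokenFilter(), "registered", "u1", step)
```

Because an exception lets the event through, a filter that guards sensitive data is safest when it stays simple and well tested.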
ChatCommunicator is available inside platform isolated execution. Client-visible events travel from child runtime to relay, and recorded events use side-file handoff: child writes comm_recorded_events.json, host merges, host sends through the configured sink. Sink callbacks are not serialized into the child unless the child configures its own sink. See comm-system.md and README-comm.md for full detail.
Streaming vs Request-Scoped Surfaces
The communication pipeline carries asynchronous app events. It does not mean every app interface is itself a chat stream. KDCube currently exposes several adjacent surfaces, each with a different contract.
| Surface | Route family | What it carries | How it relates to the communication pipeline |
|---|---|---|---|
| SSE | /sse/stream + /sse/chat | Realtime chat events and turn lifecycle | Native transport for the chat event envelope. |
| Socket.IO | Socket namespace / chat_message | Same event model as SSE with a bidirectional socket transport | Native transport for the same chat event envelope. |
| App REST interfaces | /api/integrations/bundles/.../widgets, /operations, /public, /static | Widget HTML, main-view assets, request/response APIs, webhook-style endpoints | Not themselves chat streams, but app code can emit communicator events while handling the request. |
| App-served MCP | /api/integrations/bundles/.../mcp/{alias} or /public/mcp/{alias} | MCP HTTP traffic into an app-provided FastMCP or ASGI app | Separate request surface. Proc resolves the route and dispatches the HTTP request, but does not wrap the exchange in chat_start / chat_delta / chat_complete. MCP auth, if any, is owned by the app MCP endpoint. |
| Background jobs | Redis Stream namespace, no public HTTP route | Ready background work routed to an app @on_job handler | Separate processor-claimed work surface. A producer such as @cron or a widget operation enqueues work, proc claims it fairly, builds app runtime context, and invokes async @on_job. It is not delivered to clients unless the app emits communicator events or writes user-visible results. |
An app operation or MCP request can still produce live UI updates when the app code explicitly emits communicator events. In that case the streaming delivery reuses the same SSE or Socket.IO session identified by auth state and optional KDC-Stream-ID.
@mcp(...) returns a FastMCP application or an MCP-ready ASGI app. Proc resolves the alias and forwards the original headers and body into that subapp, but does not authenticate the MCP call itself. The app MCP endpoint authenticates it if the endpoint is not intentionally public. The MCP HTTP exchange is therefore request-scoped, even if the app also emits separate chat events during the same operation.
App-served MCP transport contract
For MCP, there are three separate responsibilities:
- Proc: resolve the app route, forward the original HTTP request, and dispatch into the returned FastMCP or ASGI subapp
- App: decide whether the request is public or authenticated, read whatever headers/cookies it wants, and verify them itself
- Client: call the explicit MCP route and send the auth material that the app contract requires
That means the MCP request is not using the normal browser/user-flow API auth contract. AUTH.ID_TOKEN_HEADER_NAME is not the MCP auth header. The app may choose a completely different header, bearer token, cookie, HMAC signature, or custom JWT scheme.
Typical authenticated MCP flow:
- app props define a non-secret contract such as the header name
- app secrets store the verification material such as a shared token
- the MCP client calls /api/integrations/bundles/{tenant}/{project}/{bundle_id}/mcp/{alias}
- proc forwards headers and body unchanged into the app MCP endpoint
- the app verifies the request and either raises 401 or returns the FastMCP app
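The app-side check in the authenticated flow can be as small as a constant-time token comparison. The sketch below is framework-agnostic and rests on several assumptions: the header name `X-MCP-Token`, the `McpAuthError` exception, and the `verify_mcp_request` helper are all hypothetical. A real app would read the header name from its props and the token from its secrets before returning its FastMCP app.

```python
import hmac
from typing import Mapping


class McpAuthError(Exception):
    """Hypothetical error that the app would translate into an HTTP 401."""

    status_code = 401


def verify_mcp_request(headers: Mapping[str, str], header_name: str,
                       expected_token: str) -> None:
    """Raise McpAuthError unless the configured header carries the shared token."""
    # HTTP header names are case-insensitive, so normalize before lookup.
    normalized = {name.lower(): value for name, value in headers.items()}
    presented = normalized.get(header_name.lower(), "")
    # compare_digest avoids leaking the mismatch position through timing.
    if not presented or not hmac.compare_digest(presented.encode(), expected_token.encode()):
        raise McpAuthError("invalid or missing MCP token")


HEADER_NAME = "X-MCP-Token"     # assumed: would come from app props
SHARED_TOKEN = "example-token"  # assumed: would come from app secrets


def is_authorized(headers: Mapping[str, str]) -> bool:
    """Convenience wrapper that turns the exception into a boolean."""
    try:
        verify_mcp_request(headers, HEADER_NAME, SHARED_TOKEN)
        return True
    except McpAuthError:
        return False
```

Since proc forwards headers unchanged and performs no MCP auth of its own, a check of this kind is the only gate on a non-public endpoint.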
Typical public MCP flow:
- the client calls /api/integrations/bundles/{tenant}/{project}/{bundle_id}/public/mcp/{alias}
- proc forwards the request into the app MCP endpoint
- the app either accepts it as public or still applies its own auth logic if that is the chosen contract
route="operations" vs route="public" selects the URL family. It does not select the auth mechanism. Proc does not verify MCP auth on either route family.
App-authenticated public API hooks
Public app APIs now have two different auth ownership shapes:
- Proc-owned: public_auth="none" or public_auth={"mode":"header_secret", ...}
- App-owned: public_auth="bundle"
For public_auth="bundle", there are three separate responsibilities:
- Proc: resolve the route, parse the request, and invoke the app method
- App: accept request: Request, read the inbound headers/body, and decide whether the hook is valid
- Client / webhook provider: call the explicit public operations route and send the auth material the app contract requires
Typical app-authenticated public hook flow:
- app props define a non-secret contract such as the header name
- app secrets store the verification material such as a shared token
- the caller sends POST /api/integrations/bundles/{tenant}/{project}/{bundle_id}/public/{alias}
- proc forwards the request into the app method
- the app verifies the request and either raises 401/403 or returns normally
With @api(..., route="public", public_auth="bundle"), proc no longer acts as the token verifier. The app defines the header/token contract and enforces it inside the method itself.
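One common shape for an app-owned contract is an HMAC signature over the raw request body, as many webhook providers use. The following is a sketch under assumptions, not the platform's API: the `X-Signature` header, the hex SHA-256 encoding, and the `sign_body` and `verify_webhook` helpers are hypothetical. The app method would call something like it with the headers and body read from its request argument.

```python
import hashlib
import hmac
from typing import Mapping


def sign_body(secret: bytes, body: bytes) -> str:
    """Hex HMAC-SHA256 of the raw body, as the sender would compute it."""
    return hmac.new(secret, body, hashlib.sha256).hexdigest()


def verify_webhook(headers: Mapping[str, str], body: bytes, secret: bytes,
                   header_name: str = "X-Signature") -> int:
    """Return the HTTP status the app method should produce: 200, 401, or 403."""
    normalized = {name.lower(): value for name, value in headers.items()}
    presented = normalized.get(header_name.lower())
    if presented is None:
        return 401  # no credentials supplied
    expected = sign_body(secret, body)
    # Constant-time comparison of the presented and expected signatures.
    if not hmac.compare_digest(presented.encode(), expected.encode()):
        return 403  # credentials supplied but they do not match this body
    return 200


secret = b"example-secret"  # assumed: would come from app secrets
body = b'{"event": "ping"}'
good_headers = {"X-Signature": sign_body(secret, body)}

ok_status = verify_webhook(good_headers, body, secret)
missing_status = verify_webhook({}, body, secret)
tampered_status = verify_webhook(good_headers, b'{"event": "pong"}', secret)
```

Signing the body rather than sending a bare token means a captured header cannot be replayed with a different payload. Whether that extra protection is needed is a decision for the app contract.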