Claude API: Streaming — Function Reference

Source files: websocket-server.js (800 LOC), sse-manager.js (657 LOC), multiplexer.js (680 LOC), protocol.js, index.js


Exported Class — WebSocketStreamServer (websocket-server.js)

extends EventEmitter

constructor(config)

Parameters:

  • port (number): Listen port (default: 3457)
  • host (string): Bind address (default: "127.0.0.1")
  • maxPayload (number): Max message payload in bytes (default: 50MB)
  • heartbeatInterval (number): Ping interval ms (default: 30000)
  • heartbeatTimeout (number): Client timeout ms (default: 60000)
  • maxClients (number): Max concurrent connections (default: 100)
  • maxRoomsPerClient (number): Room membership limit per client (default: 10)
  • maxClientsPerRoom (number): Room capacity (default: 50)
  • requireAuth (boolean): Enable authentication (default: false)
  • compression (boolean): Enable per-message deflate (default: true)
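
The documented defaults can be summarised as a plain object. The following is a minimal sketch, assuming the constructor shallow-merges the caller's config over its defaults; `DEFAULT_CONFIG` and `resolveConfig` are hypothetical names that do not come from websocket-server.js, and "50MB" is assumed to mean 50 * 1024 * 1024 bytes.

```javascript
// Hypothetical illustration of the documented defaults; not the actual source.
const DEFAULT_CONFIG = Object.freeze({
  port: 3457,
  host: '127.0.0.1',
  maxPayload: 50 * 1024 * 1024, // assumption: "50MB" means 50 MiB
  heartbeatInterval: 30000,
  heartbeatTimeout: 60000,
  maxClients: 100,
  maxRoomsPerClient: 10,
  maxClientsPerRoom: 50,
  requireAuth: false,
  compression: true,
});

// Shallow-merge caller-supplied values over the defaults.
function resolveConfig(config = {}) {
  return { ...DEFAULT_CONFIG, ...config };
}

const resolved = resolveConfig({ port: 8080, requireAuth: true });
console.log(resolved.port, resolved.host, resolved.requireAuth);
```

Only the overridden keys change; every other option keeps its documented default.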

start(): Promise<{httpServer, wss}>

Purpose: Create HTTP server and attach WebSocket server. Starts heartbeat timer.

Returns: { httpServer, wss } server instances.


stop(): Promise<void>

Purpose: Close all client connections, stop heartbeat, close WS and HTTP servers.


broadcast(message, options): SendResult

Purpose: Send message to all connected clients with optional room/auth filtering.

Parameters:

  • message (object): Message to broadcast
  • roomId (string|null): Limit to room members
  • excludeClientId (string|null): Exclude specific client
  • requireAuth (boolean): Only authenticated clients

Returns: { sent, failed, skipped }
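
The filtering and the three counters can be illustrated with a standalone function. This is a sketch under assumptions, not the code in websocket-server.js: the client shape (`id`, `authenticated`, `rooms`, `send`) is hypothetical, and it assumes that clients removed by a filter are counted as `skipped` while a throwing `send` counts as `failed`.

```javascript
// Hypothetical client shape: { id, authenticated, rooms: Set<string>, send(text) }.
function broadcast(clients, message, options = {}) {
  const { roomId = null, excludeClientId = null, requireAuth = false } = options;
  const result = { sent: 0, failed: 0, skipped: 0 };
  const text = JSON.stringify(message);

  for (const client of clients.values()) {
    const filteredOut =
      client.id === excludeClientId ||
      (roomId !== null && !client.rooms.has(roomId)) ||
      (requireAuth && !client.authenticated);
    if (filteredOut) {
      result.skipped += 1; // assumption: filtered clients count as skipped
      continue;
    }
    try {
      client.send(text);
      result.sent += 1;
    } catch (err) {
      result.failed += 1; // a send error does not abort the loop
    }
  }
  return result;
}

const demoClients = new Map([
  ['a', { id: 'a', authenticated: true, rooms: new Set(['lobby']), send() {} }],
  ['b', { id: 'b', authenticated: false, rooms: new Set(['lobby']), send() {} }],
  ['c', { id: 'c', authenticated: true, rooms: new Set(['lobby']),
          send() { throw new Error('socket closed'); } }],
]);

const broadcastResult = broadcast(
  demoClients,
  { type: 'notice' },
  { roomId: 'lobby', requireAuth: true },
);
console.log(broadcastResult);
```

Here client `a` receives the message, `b` is filtered out for being unauthenticated, and `c` fails on send.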


sendToClient(clientId, message): boolean

Purpose: Send message to a specific client by ID.


sendToRoom(roomId, message, options): SendResult

Purpose: Broadcast to all clients in a room.


joinRoom(clientId, roomId): JoinResult

Purpose: Add client to a named room. Enforces maxRoomsPerClient and maxClientsPerRoom limits.
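
The two limits can be checked before any state is changed. The sketch below is illustrative only: the `state` layout, the `reason` strings, and the shape of the returned object are assumptions, since the reference does not spell out `JoinResult`.

```javascript
// Hypothetical state: rooms maps roomId -> Set<clientId>,
// memberships maps clientId -> Set<roomId>.
function joinRoom(state, clientId, roomId, limits) {
  const members = state.rooms.get(roomId) ?? new Set();
  const memberships = state.memberships.get(clientId) ?? new Set();

  if (members.has(clientId)) {
    return { success: true, alreadyMember: true };
  }
  if (memberships.size >= limits.maxRoomsPerClient) {
    return { success: false, reason: 'max_rooms_per_client' };
  }
  if (members.size >= limits.maxClientsPerRoom) {
    return { success: false, reason: 'room_full' };
  }

  // Both checks passed: record the membership in both directions.
  members.add(clientId);
  memberships.add(roomId);
  state.rooms.set(roomId, members);
  state.memberships.set(clientId, memberships);
  return { success: true };
}

const roomState = { rooms: new Map(), memberships: new Map() };
const tightLimits = { maxRoomsPerClient: 1, maxClientsPerRoom: 1 };

const firstJoin = joinRoom(roomState, 'c1', 'r1', tightLimits);
const fullRoom = joinRoom(roomState, 'c2', 'r1', tightLimits);
const tooManyRooms = joinRoom(roomState, 'c1', 'r2', tightLimits);
console.log(firstJoin, fullRoom, tooManyRooms);
```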


leaveRoom(clientId, roomId): LeaveResult

Purpose: Remove client from room.


createRoom(roomId, options): RoomObject

Purpose: Explicitly create a named room with metadata.

Parameters:

  • roomId (string): Room identifier
  • options.maxClients (number): Room capacity override
  • options.metadata (object): Room metadata

deleteRoom(roomId): boolean

Purpose: Remove room and disconnect all members.


getClientInfo(clientId): ClientInfo|null

Purpose: Get connection info for a client.


getRoomInfo(roomId): RoomInfo|null

Purpose: Get room membership and metadata.


getMetrics(): ServerMetrics

Purpose: Return server statistics.

Returns: { totalConnections, totalDisconnections, messagesReceived, messagesSent, errors, uptime, clientCount, roomCount }


setAuthHandler(handler): void

Purpose: Register an authentication callback with the signature async (token, request) => { valid: boolean, userId? }.
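
A handler matching that signature might look like the following. This is a sketch only: the in-memory `KNOWN_TOKENS` table is hypothetical, and a real deployment would verify the token against its own identity store.

```javascript
// Hypothetical token table used purely for illustration.
const KNOWN_TOKENS = new Map([['secret-token', 'user-42']]);

// Resolves to { valid: true, userId } for a known token, { valid: false } otherwise.
// The request argument is unused here but is part of the documented signature.
async function authHandler(token, request) {
  const userId = KNOWN_TOKENS.get(token);
  return userId ? { valid: true, userId } : { valid: false };
}

// With a WebSocketStreamServer instance named `server` (not created here):
// server.setAuthHandler(authHandler);
```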


Exported Class — SSEManager (sse-manager.js)

constructor(options)

Parameters:

  • heartbeatInterval (number): ms between keepalive comments (default: 30000)
  • cleanupInterval (number): ms between stale client cleanup (default: 60000)
  • maxClients (number): Max SSE connections (default: 100)

start(): void

Purpose: Initialize SSE manager, start heartbeat and cleanup timers.


stop(): void

Purpose: Close all client connections and stop timers.


createEndpoint(path): RequestHandler

Purpose: Return an Express/Node HTTP handler for SSE connections. The handler sets the SSE response headers (Content-Type: text/event-stream), creates one SSEClient per request, and reads the Last-Event-ID header to replay missed events on reconnection.

Returns: (req, res) => SSEClient
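
The header handling can be sketched with plain Node request and response objects. This is an assumption-laden illustration, not the handler in sse-manager.js: `handleSseRequest` is a hypothetical name, and the `Cache-Control` and `Connection` headers are conventional SSE choices that the reference does not confirm.

```javascript
// Minimal sketch of an SSE request handler for Node's http module.
function handleSseRequest(req, res) {
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache', // assumption: typical for SSE
    Connection: 'keep-alive',    // assumption: typical for SSE
  });

  // Node lower-cases incoming header names, so Last-Event-ID is read like this.
  const lastEventId = req.headers['last-event-id'] ?? null;

  // An SSE comment line (leading colon) is ignored by clients but opens the stream.
  res.write(': connected\n\n');
  return { lastEventId };
}

// Wiring it up would look like this (not executed here):
// require('node:http').createServer(handleSseRequest).listen(3000);
```

A reconnecting browser `EventSource` sends `Last-Event-ID` automatically, so the handler can decide which buffered events to replay.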


broadcast(event, data, options): BroadcastResult

Purpose: Send event to all connected SSE clients.

Parameters:

  • event (string): Event type name
  • data (any): Event payload (will be JSON-serialized)
  • options.streamId (string|null): Only clients subscribed to this stream
  • options.excludeClientId (string|null): Exclude specific client

Returns: { sent, failed }


sendToClient(clientId, event, data, options): boolean

Purpose: Send event to specific SSE client.


createStream(streamId, options): StreamObject

Purpose: Create a named logical stream. Clients subscribe/unsubscribe to streams.


pushToStream(streamId, event, data): PushResult

Purpose: Push event to all clients subscribed to a stream.


closeStream(streamId): void

Purpose: Send close event to all stream subscribers and remove stream.


getClientInfo(clientId): ClientInfo|null


getMetrics(): SSEMetrics


SSEClient class (internal, created by manager)

Per-connection SSE client:

  • send(event, data, options): Write SSE-formatted event to response stream. Buffers for replay (maxBufferSize: 1000).
  • sendComment(text): Write SSE comment for heartbeat
  • sendMessage(type, payload, options): Send typed protocol message
  • sendError(error, code): Send error event
  • subscribeToStream(streamId): Subscribe client to stream
  • unsubscribeFromStream(streamId): Unsubscribe
  • getReplayBuffer(sinceId): Return buffered events since ID for reconnection
  • close(): End HTTP response
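
The wire format written by send() and the bounded buffer behind getReplayBuffer(sinceId) can be sketched as follows. `formatSseEvent` and `ReplayBuffer` are hypothetical names, and the sketch assumes numeric, monotonically increasing event IDs; the actual ID scheme in sse-manager.js is not documented here.

```javascript
// Format one event in SSE wire format: id, event, data fields, then a blank line.
function formatSseEvent(event, data, id) {
  const lines = [];
  if (id !== undefined) lines.push(`id: ${id}`);
  if (event) lines.push(`event: ${event}`);
  lines.push(`data: ${JSON.stringify(data)}`);
  return lines.join('\n') + '\n\n';
}

// Bounded buffer of recent events, oldest dropped first.
class ReplayBuffer {
  constructor(maxBufferSize = 1000) {
    this.maxBufferSize = maxBufferSize;
    this.events = [];
    this.nextId = 1; // assumption: numeric, increasing IDs
  }

  record(event, data) {
    const entry = { id: this.nextId++, event, data };
    this.events.push(entry);
    if (this.events.length > this.maxBufferSize) this.events.shift();
    return entry;
  }

  // Events with an ID strictly greater than the client's Last-Event-ID.
  since(sinceId) {
    const threshold = Number(sinceId);
    return this.events.filter((entry) => entry.id > threshold);
  }
}

const replay = new ReplayBuffer(2);
replay.record('delta', { text: 'a' });
replay.record('delta', { text: 'b' });
replay.record('delta', { text: 'c' }); // evicts the first event
console.log(replay.since('2').map((e) => formatSseEvent(e.event, e.data, e.id)));
```

Events evicted from the buffer cannot be replayed, so a client that reconnects too late still misses them.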

Exported Class — StreamMultiplexer (multiplexer.js)

constructor(options)

Parameters:

  • maxStreams (number): Max concurrent streams (default: 100)
  • schedulerInterval (number): ms between scheduler ticks (default: 100)
  • defaultPriority (StreamPriority): Default stream priority

addStream(streamId, options): BufferedStream

Purpose: Register a new stream with the multiplexer.

Parameters:

  • priority (StreamPriority): CRITICAL | HIGH | NORMAL | LOW | BACKGROUND
  • maxBufferSize (number): Per-stream buffer limit
  • highWaterMark (number): Buffer % that triggers backpressure
  • lowWaterMark (number): Buffer % that releases backpressure

Returns: BufferedStream object.


removeStream(streamId): boolean

Purpose: Remove stream and clean up buffer.


pushToStream(streamId, chunk): PushResult

Purpose: Add chunk to stream buffer. Returns backpressure signal if buffer is near full.

Returns: { accepted, backpressure?, bufferSize?, reason? }


subscribe(handler): UnsubscribeFunction

Purpose: Register callback for multiplexed output chunks.

Parameters:

  • handler (Function): (streamId, chunk) => void

Returns: Unsubscribe function.
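
The subscribe-returns-unsubscribe pattern can be shown in isolation. `OutputHub` below is a hypothetical stand-in for the multiplexer's output side, written only to illustrate the contract.

```javascript
// Hypothetical stand-in for the multiplexer's output side.
class OutputHub {
  constructor() {
    this.handlers = new Set();
  }

  // Register a (streamId, chunk) => void handler; returns a function that removes it.
  subscribe(handler) {
    this.handlers.add(handler);
    return () => this.handlers.delete(handler);
  }

  emit(streamId, chunk) {
    for (const handler of this.handlers) handler(streamId, chunk);
  }
}

const hub = new OutputHub();
const received = [];
const unsubscribe = hub.subscribe((streamId, chunk) => received.push([streamId, chunk]));

hub.emit('s1', 'first');  // delivered
unsubscribe();
hub.emit('s1', 'second'); // no longer delivered
console.log(received);
```

Returning a closure means the caller does not need to keep a reference to the handler in order to remove it later.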


pause(streamId): void

Purpose: Pause stream (drops incoming chunks while paused).


resume(streamId): void

Purpose: Resume paused stream.


getStats(): MultiplexerStats

Purpose: Return per-stream and aggregate statistics.


BufferedStream class (internal)

Per-stream buffer with backpressure:

  • addChunk(chunk): Buffer chunk; returns { accepted, backpressure?, reason? }
  • getNextChunk(): Dequeue next chunk; auto-resumes if below lowWaterMark
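
The high and low water marks form a hysteresis band: backpressure switches on when the buffer fills to highWaterMark percent and switches off only after it drains below lowWaterMark percent. The sketch below illustrates that behaviour; `SketchBufferedStream`, its default values, and the `'buffer_full'` reason string are assumptions and not taken from multiplexer.js.

```javascript
// Illustrative buffer with percentage-based water marks.
class SketchBufferedStream {
  constructor({ maxBufferSize = 10, highWaterMark = 80, lowWaterMark = 20 } = {}) {
    this.maxBufferSize = maxBufferSize;
    this.highWaterMark = highWaterMark; // percent of maxBufferSize
    this.lowWaterMark = lowWaterMark;   // percent of maxBufferSize
    this.buffer = [];
    this.backpressure = false;
  }

  fillPercent() {
    return (this.buffer.length / this.maxBufferSize) * 100;
  }

  addChunk(chunk) {
    if (this.buffer.length >= this.maxBufferSize) {
      return { accepted: false, reason: 'buffer_full' };
    }
    this.buffer.push(chunk);
    if (this.fillPercent() >= this.highWaterMark) this.backpressure = true;
    return { accepted: true, backpressure: this.backpressure };
  }

  getNextChunk() {
    const chunk = this.buffer.shift();
    // Release backpressure only once the buffer drains below the low mark.
    if (this.backpressure && this.fillPercent() < this.lowWaterMark) {
      this.backpressure = false;
    }
    return chunk;
  }
}

const demoStream = new SketchBufferedStream();
let lastAdd;
for (let i = 0; i < 8; i += 1) lastAdd = demoStream.addChunk(`chunk-${i}`);
console.log(lastAdd); // the eighth chunk reaches 80% and raises backpressure
```

Because the signal is advisory, a producer that ignores it keeps being accepted until the buffer is completely full.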

protocol.js — Exported Constants and Utilities

StreamEventType

CONNECTED, DISCONNECTED, STREAM_START, STREAM_DELTA, STREAM_END, STREAM_ERROR, HEARTBEAT, SUBSCRIBE, UNSUBSCRIBE, ROOM_JOIN, ROOM_LEAVE

StreamState

PENDING, ACTIVE, PAUSED, COMPLETED, ERROR, CANCELLED

StreamPriority

CRITICAL (1), HIGH (2), NORMAL (3), LOW (4), BACKGROUND (5)
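
The numeric values suggest that a lower number means higher urgency. The sketch below assumes a strict-priority scheduler that always serves the non-empty stream with the lowest value; the reference does not describe the real scheduling policy, and `pickNextStream` is a hypothetical helper.

```javascript
// Values as documented for StreamPriority.
const StreamPriority = Object.freeze({
  CRITICAL: 1,
  HIGH: 2,
  NORMAL: 3,
  LOW: 4,
  BACKGROUND: 5,
});

// Assumption: strict priority, lowest numeric value first, ties keep insertion order.
function pickNextStream(streams) {
  let best = null;
  for (const stream of streams) {
    if (stream.buffer.length === 0) continue; // nothing to send
    if (best === null || stream.priority < best.priority) best = stream;
  }
  return best;
}

const candidates = [
  { id: 'logs', priority: StreamPriority.BACKGROUND, buffer: ['l1'] },
  { id: 'chat', priority: StreamPriority.NORMAL, buffer: ['c1'] },
  { id: 'alerts', priority: StreamPriority.CRITICAL, buffer: [] },
];
console.log(pickNextStream(candidates).id);
```

The empty `alerts` stream is skipped despite its priority, so `chat` is served before `logs`.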

Message factory functions

  • createMessage(type, payload, options): Build protocol message with timestamp + ID
  • createDeltaMessage(streamId, delta, sequenceNum): Stream chunk message
  • createErrorMessage(error, code, options): Error message
  • createStatusMessage(streamId, status, options): Status change message
  • createPingMessage() / createPongMessage(): Heartbeat pair
  • createRoomJoinMessage(roomId) / createRoomLeaveMessage(roomId)
  • createBackpressureMessage(streamId, bufferLevel): Backpressure signal
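
A message envelope carrying a timestamp and an ID could be built as shown below. The field names (`id`, `type`, `payload`, `timestamp`), the use of the string 'STREAM_DELTA' as the type value, and the way `options` is merged are all assumptions; only the use of `randomUUID` from Node's crypto module is suggested by the dependency list.

```javascript
const { randomUUID } = require('node:crypto');

// Hypothetical envelope: caller options first, then the core fields on top.
function createMessage(type, payload, options = {}) {
  return {
    ...options,
    id: randomUUID(),
    type,
    payload,
    timestamp: Date.now(),
  };
}

// A stream chunk message built on the generic factory.
function createDeltaMessage(streamId, delta, sequenceNum) {
  return createMessage('STREAM_DELTA', { streamId, delta, sequenceNum });
}

const deltaMessage = createDeltaMessage('stream-1', 'Hel', 0);
console.log(deltaMessage.type, deltaMessage.payload);
```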

Serialization

  • serializeMessage(message): JSON.stringify
  • deserializeMessage(data): JSON.parse with error handling
  • validateMessage(message): Check required fields
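
A round trip through these three helpers can be sketched as follows. The return conventions are assumptions: this version of `deserializeMessage` returns null on malformed input, and `validateMessage` treats `id`, `type`, and `timestamp` as the required fields, neither of which is confirmed by the reference.

```javascript
function serializeMessage(message) {
  return JSON.stringify(message);
}

// Assumption: malformed input yields null instead of throwing.
function deserializeMessage(data) {
  try {
    return JSON.parse(data);
  } catch (err) {
    return null;
  }
}

// Assumption: these three fields are the required ones.
const REQUIRED_FIELDS = ['id', 'type', 'timestamp'];

function validateMessage(message) {
  if (message === null || typeof message !== 'object') {
    return { valid: false, missing: [...REQUIRED_FIELDS] };
  }
  const missing = REQUIRED_FIELDS.filter((field) => message[field] === undefined);
  return { valid: missing.length === 0, missing };
}

const wire = serializeMessage({ id: 'm1', type: 'HEARTBEAT', timestamp: 1700000000000 });
const parsed = deserializeMessage(wire);
console.log(validateMessage(parsed), deserializeMessage('{not json'));
```

Validating after parsing keeps malformed JSON and structurally incomplete messages as two separate failure cases.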

Key Data Structures

Structure      | Fields                                          | Purpose
ClientInfo     | id, connectedAt, rooms, authenticated, metadata | WS/SSE client
RoomObject     | id, clients, maxClients, metadata, createdAt    | WS room
StreamObject   | id, subscribers, metadata, createdAt            | SSE logical stream
BufferedStream | id, priority, buffer, state, stats, paused      | Mux stream buffer

External Dependencies

  • ws: WebSocketServer class
  • http (Node.js): createServer
  • crypto (Node.js): randomUUID
  • No Anthropic SDK calls — transport infrastructure only

Notes for Rewrite

  • WebSocketStreamServer uses ws library directly. In Colibri, replace with MCP-compatible transport if needed.
  • SSE endpoint handler returns the SSEClient object — caller is responsible for storing reference.
  • Backpressure in StreamMultiplexer is advisory (per-stream), not TCP-level.
  • getReplayBuffer(sinceId) enables reconnection without message loss, provided the client sends the Last-Event-ID header and the missed events are still within the replay buffer (maxBufferSize: 1000).
  • All stream servers log to console.error (MCP-safe, avoids stdout pollution).

Colibri — documentation-first MCP runtime. Apache 2.0 + Commons Clause.