Claude API: Streaming — Function Reference
Source files: websocket-server.js (800 LOC), sse-manager.js (657 LOC), multiplexer.js (680 LOC), protocol.js, index.js
Exported Class — WebSocketStreamServer (websocket-server.js)
extends EventEmitter
constructor(config)
Parameters:
port(number): Listen port (default: 3457)host(string): Bind address (default: “127.0.0.1”)maxPayload(number): Max message payload in bytes (default: 50MB)heartbeatInterval(number): Ping interval ms (default: 30000)heartbeatTimeout(number): Client timeout ms (default: 60000)maxClients(number): Max concurrent connections (default: 100)maxRoomsPerClient(number): Room membership limit per client (default: 10)maxClientsPerRoom(number): Room capacity (default: 50)requireAuth(boolean): Enable authentication (default: false)compression(boolean): Enable per-message deflate (default: true)
start(): Promise<{httpServer, wss}>
Purpose: Create HTTP server and attach WebSocket server. Starts heartbeat timer.
Returns: { httpServer, wss } server instances.
stop(): Promise<void>
Purpose: Close all client connections, stop heartbeat, close WS and HTTP servers.
broadcast(message, options): SendResult
Purpose: Send message to all connected clients with optional room/auth filtering. Parameters:
message(object): Message to broadcast-
roomId(stringnull): Limit to room members -
excludeClientId(stringnull): Exclude specific client requireAuth(boolean): Only authenticated clients
Returns: { sent, failed, skipped }
sendToClient(clientId, message): boolean
Purpose: Send message to a specific client by ID.
sendToRoom(roomId, message, options): SendResult
Purpose: Broadcast to all clients in a room.
joinRoom(clientId, roomId): JoinResult
Purpose: Add client to a named room. Enforces maxRoomsPerClient and maxClientsPerRoom limits.
leaveRoom(clientId, roomId): LeaveResult
Purpose: Remove client from room.
createRoom(roomId, options): RoomObject
Purpose: Explicitly create a named room with metadata. Parameters:
roomId(string): Room identifieroptions.maxClients(number): Room capacity overrideoptions.metadata(object): Room metadata
deleteRoom(roomId): boolean
Purpose: Remove room and disconnect all members.
getClientInfo(clientId): ClientInfo|null
Purpose: Get connection info for a client.
getRoomInfo(roomId): RoomInfo|null
Purpose: Get room membership and metadata.
getMetrics(): ServerMetrics
Purpose: Return server statistics.
Returns: { totalConnections, totalDisconnections, messagesReceived, messagesSent, errors, uptime, clientCount, roomCount }
setAuthHandler(handler): void
Purpose: Register authentication callback async (token, request) => { valid: bool, userId? }.
Exported Class — SSEManager (sse-manager.js)
constructor(options)
Parameters:
heartbeatInterval(number): ms between keepalive comments (default: 30000)cleanupInterval(number): ms between stale client cleanup (default: 60000)maxClients(number): Max SSE connections (default: 100)
start(): void
Purpose: Initialize SSE manager, start heartbeat and cleanup timers.
stop(): void
Purpose: Close all client connections and stop timers.
createEndpoint(path): RequestHandler
Purpose: Returns Express/Node HTTP handler for SSE connections. Sets Content-Type: text/event-stream headers. Creates SSEClient per request. Handles Last-Event-ID header for reconnection replay.
Returns: (req, res) => SSEClient
broadcast(event, data, options): BroadcastResult
Purpose: Send event to all connected SSE clients. Parameters:
event(string): Event type namedata(any): Event payload (will be JSON-serialized)-
options.streamId(stringnull): Only clients subscribed to this stream -
options.excludeClientId(stringnull): Exclude specific client
Returns: { sent, failed }
sendToClient(clientId, event, data, options): boolean
Purpose: Send event to specific SSE client.
createStream(streamId, options): StreamObject
Purpose: Create a named logical stream. Clients subscribe/unsubscribe to streams.
pushToStream(streamId, event, data): PushResult
Purpose: Push event to all clients subscribed to a stream.
closeStream(streamId): void
Purpose: Send close event to all stream subscribers and remove stream.
getClientInfo(clientId): ClientInfo|null
getMetrics(): SSEMetrics
SSEClient class (internal, created by manager)
Per-connection SSE client:
send(event, data, options): Write SSE-formatted event to response stream. Buffers for replay (maxBufferSize: 1000).sendComment(text): Write SSE comment for heartbeatsendMessage(type, payload, options): Send typed protocol messagesendError(error, code): Send error eventsubscribeToStream(streamId): Subscribe client to streamunsubscribeFromStream(streamId): UnsubscribegetReplayBuffer(sinceId): Return buffered events since ID for reconnectionclose(): End HTTP response
Exported Class — StreamMultiplexer (multiplexer.js)
constructor(options)
Parameters:
maxStreams(number): Max concurrent streams (default: 100)schedulerInterval(number): ms between scheduler ticks (default: 100)defaultPriority(StreamPriority): Default stream priority
addStream(streamId, options): BufferedStream
Purpose: Register a new stream with the multiplexer. Parameters:
-
priority(StreamPriority): CRITICALHIGH NORMAL LOW BACKGROUND maxBufferSize(number): Per-stream buffer limithighWaterMark(number): Buffer % that triggers backpressurelowWaterMark(number): Buffer % that releases backpressure
Returns: BufferedStream object.
removeStream(streamId): boolean
Purpose: Remove stream and clean up buffer.
pushToStream(streamId, chunk): PushResult
Purpose: Add chunk to stream buffer. Returns backpressure signal if buffer is near full.
Returns: { accepted, backpressure?, bufferSize?, reason? }
subscribe(handler): UnsubscribeFunction
Purpose: Register callback for multiplexed output chunks. Parameters:
handler(Function):(streamId, chunk) => void
Returns: Unsubscribe function.
pause(streamId): void
Purpose: Pause stream (drops incoming chunks while paused).
resume(streamId): void
Purpose: Resume paused stream.
getStats(): MultiplexerStats
Purpose: Return per-stream and aggregate statistics.
BufferedStream class (internal)
Per-stream buffer with backpressure:
addChunk(chunk): Buffer chunk; returns{ accepted, backpressure?, reason? }getNextChunk(): Dequeue next chunk; auto-resumes if below lowWaterMark
protocol.js — Exported Constants and Utilities
StreamEventType
CONNECTED, DISCONNECTED, STREAM_START, STREAM_DELTA, STREAM_END, STREAM_ERROR, HEARTBEAT, SUBSCRIBE, UNSUBSCRIBE, ROOM_JOIN, ROOM_LEAVE
StreamState
PENDING, ACTIVE, PAUSED, COMPLETED, ERROR, CANCELLED
StreamPriority
CRITICAL (1), HIGH (2), NORMAL (3), LOW (4), BACKGROUND (5)
Message factory functions
createMessage(type, payload, options): Build protocol message with timestamp + IDcreateDeltaMessage(streamId, delta, sequenceNum): Stream chunk messagecreateErrorMessage(error, code, options): Error messagecreateStatusMessage(streamId, status, options): Status change messagecreatePingMessage()/createPongMessage(): Heartbeat paircreateRoomJoinMessage(roomId)/createRoomLeaveMessage(roomId)createBackpressureMessage(streamId, bufferLevel): Backpressure signal
Serialization
serializeMessage(message): JSON.stringifydeserializeMessage(data): JSON.parse with error handlingvalidateMessage(message): Check required fields
Key Data Structures
| Structure | Fields | Purpose |
|---|---|---|
| ClientInfo | id, connectedAt, rooms, authenticated, metadata | WS/SSE client |
| RoomObject | id, clients, maxClients, metadata, createdAt | WS room |
| StreamObject | id, subscribers, metadata, createdAt | SSE logical stream |
| BufferedStream | id, priority, buffer, state, stats, paused | Mux stream buffer |
External Dependencies
ws—WebSocketServerclasshttp(Node.js):createServercrypto(Node.js):randomUUID- No Anthropic SDK calls — transport infrastructure only
Notes for Rewrite
WebSocketStreamServeruseswslibrary directly. In Colibri, replace with MCP-compatible transport if needed.- SSE endpoint handler returns the
SSEClientobject — caller is responsible for storing reference. - Backpressure in
StreamMultiplexeris advisory (per-stream), not TCP-level. getReplayBuffer(sinceId)enables reconnection without message loss — requires client to sendLast-Event-IDheader.- All stream servers log to
console.error(MCP-safe, avoids stdout pollution).