Skip to main content

Goal

Push real-time updates (chat messages, notifications, live dashboards) to connected clients over WebSocket.

Steps

1. Enable the Stream module

iii-config.yaml
modules:
  - class: modules::stream::StreamModule
    config:
      port: ${STREAM_PORT:3112}
      host: localhost
      adapter:
        class: modules::stream::adapters::KvStore
        config:
          store_method: file_based  # Options: in_memory, file_based
          file_path: ./data/stream_store  # required for file_based
        # class: modules::stream::adapters::RedisAdapter
        # config:
        #   redis_url: redis://localhost:6379

2. Write to a stream

stream-writer.ts
import { registerWorker, Logger } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

iii.registerFunction({ id: 'chat::send' }, async (input) => {
  const logger = new Logger()
  const messageId = crypto.randomUUID()

  await iii.trigger({
    function_id: 'stream::set',
    payload: {
      stream_name: 'chat',
      group_id: input.roomId,
      item_id: messageId,
      data: { text: input.text, author: input.author },
    },
  })

  logger.info('Message sent to stream', { messageId })
  return { messageId }
})

// Then call from another function or worker
const { messageId } = await iii.trigger({
  function_id: 'chat::send',
  payload: { roomId: 'room-123', text: 'Hello world', author: 'alice' },
})
logger.info('Sent message', { messageId })

3. Read from a stream

stream-reader.ts
iii.registerFunction({ id: 'chat::list' }, async (input) => {
  const logger = new Logger()

  const messages = await iii.trigger({
    function_id: 'stream::list',
    payload: { stream_name: 'chat', group_id: input.roomId },
  })

  logger.info('Messages retrieved', { count: messages.length })
  return messages
})

// Then call from another function or worker
const messages = await iii.trigger({
  function_id: 'chat::list',
  payload: { roomId: 'room-123' },
})
logger.info('Messages', { messages })

4. Connect a client

Clients connect to the stream WebSocket endpoint to receive live updates:
client.js
const ws = new WebSocket('ws://localhost:3112/stream/chat/room-123')

ws.onmessage = (event) => {
  const update = JSON.parse(event.data)
  console.log('New message:', update)
}

5. React to stream events

Register triggers to run server-side logic when clients join or when stream data changes.

On client join

stream-triggers.ts
import { StreamJoinLeaveEvent } from 'iii-sdk/stream'

const onJoin = iii.registerFunction({ id: 'chat::onJoin' }, async (input: StreamJoinLeaveEvent) => {
  const logger = new Logger()
  logger.info('Client joined chat room', {
    stream: input.stream_name,
    group: input.group_id,
    context: input.context,
  })
  return {}
})

iii.registerTrigger({
  type: 'stream:join',
  function_id: onJoin.id,
  config: {},
})

On message created

stream-triggers.ts
import { StreamChangeEvent } from 'iii-sdk/stream'

const onMessage = iii.registerFunction({ id: 'chat::onMessage' }, async (input: StreamChangeEvent) => {
  const logger = new Logger()

  if (input.event.type === 'create') {
    logger.info('New message in room', { room: input.groupId, messageId: input.id, data: input.event.data })
  }

  return {}
})

iii.registerTrigger({
  type: 'stream',
  function_id: onMessage.id,
  config: { stream_name: 'chat' },
})

Result

Any data written to the stream via stream::set is immediately pushed to all connected WebSocket clients subscribed to that stream and group. Server-side triggers let you react to joins, leaves, and data changes without polling.
For all stream operations, trigger payload shapes, and authentication, see the Stream module reference.