Goal
Push real-time updates (chat messages, notifications, live dashboards) to connected clients over WebSocket.
Steps
1. Enable the Stream module
iii-config.yaml
modules:
  # Stream module: serves the WebSocket endpoint that clients connect to.
  - class: modules::stream::StreamModule
    config:
      port: ${STREAM_PORT:3112}
      host: localhost
      adapter:
        class: modules::stream::adapters::KvStore
        config:
          store_method: file_based # Options: in_memory, file_based
          file_path: ./data/stream_store # required for file_based
        # Alternative adapter (presumably a drop-in replacement for the KvStore entry above):
        # class: modules::stream::adapters::RedisAdapter
        # config:
        #   redis_url: redis://localhost:6379
2. Write to a stream
- Node / TypeScript
- Python
- Rust
stream-writer.ts
import { registerWorker, Logger } from 'iii-sdk'
const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

/**
 * `chat::send`: writes one chat message into the `chat` stream.
 *
 * Reads `roomId`, `text` and `author` from the input, stores the message via
 * `stream::set` under a freshly generated UUID, and returns `{ messageId }`.
 */
iii.registerFunction({ id: 'chat::send' }, async (input) => {
  const logger = new Logger()
  const messageId = crypto.randomUUID()

  // The room id is used as the stream group; the UUID is the item key.
  const payload = {
    stream_name: 'chat',
    group_id: input.roomId,
    item_id: messageId,
    data: { text: input.text, author: input.author },
  }
  await iii.trigger({ function_id: 'stream::set', payload })

  logger.info('Message sent to stream', { messageId })
  return { messageId }
})
// Then call from another function or worker
// The handler's logger is local to the handler, so the caller creates its own.
const logger = new Logger()
const { messageId } = await iii.trigger({
  function_id: 'chat::send',
  payload: { roomId: 'room-123', text: 'Hello world', author: 'alice' },
})
logger.info('Sent message', { messageId })
stream_writer.py
import os
import uuid
from iii import Logger, register_worker
iii = register_worker(os.environ.get("III_URL", "ws://localhost:49134"))


def send_message(input):
    """Write one chat message into the ``chat`` stream.

    Reads ``roomId``, ``text`` and ``author`` from ``input``, stores the
    message via ``stream::set`` under a freshly generated UUID, and returns
    ``{"messageId": <uuid>}``.
    """
    logger = Logger()
    message_id = str(uuid.uuid4())

    # The room id is used as the stream group; the UUID is the item key.
    payload = {
        "stream_name": "chat",
        "group_id": input["roomId"],
        "item_id": message_id,
        "data": {"text": input["text"], "author": input["author"]},
    }
    iii.trigger({"function_id": "stream::set", "payload": payload})

    logger.info("Message sent to stream", {"messageId": message_id})
    return {"messageId": message_id}


iii.register_function("chat::send", send_message)

# Then call from another function or worker
request = {
    "function_id": "chat::send",
    "payload": {"roomId": "room-123", "text": "Hello world", "author": "alice"},
}
result = iii.trigger(request)
print("Sent message:", result["messageId"])
stream_writer.rs
use iii_sdk::{register_worker, InitOptions, Logger, RegisterFunction, TriggerRequest};
use serde_json::{json, Value};
use tokio::signal;
/// Connects a worker to the engine, registers `chat::send`, then waits for Ctrl-C.
///
/// The engine URL is read from `III_URL` and defaults to `ws://127.0.0.1:49134`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let engine_url = match std::env::var("III_URL") {
        Ok(value) => value,
        Err(_) => String::from("ws://127.0.0.1:49134"),
    };
    let iii = register_worker(&engine_url, InitOptions::default());

    // The handler gets its own handle so it can trigger `stream::set` itself.
    let handle = iii.clone();
    let send_message = RegisterFunction::new_async("chat::send", move |input: Value| {
        let worker = handle.clone();
        async move {
            let logger = Logger::new();
            let message_id = uuid::Uuid::new_v4().to_string();

            // Missing or non-string fields fall back to an empty string.
            let room_id = input["roomId"].as_str().unwrap_or("");
            let text = input["text"].as_str().unwrap_or("");
            let author = input["author"].as_str().unwrap_or("");

            // The room id is used as the stream group; the UUID is the item key.
            let request = TriggerRequest {
                function_id: "stream::set".into(),
                payload: json!({
                    "stream_name": "chat",
                    "group_id": room_id,
                    "item_id": message_id,
                    "data": { "text": text, "author": author },
                }),
                action: None,
                timeout_ms: None,
            };
            worker.trigger(request).await?;

            let receipt = json!({ "messageId": message_id });
            logger.info("Message sent to stream", Some(receipt.clone()));
            Ok(receipt)
        }
    });
    iii.register_function(send_message);

    // Keep the process running until it is interrupted.
    signal::ctrl_c().await?;
    Ok(())
}
// Then call from another function or worker
let request = TriggerRequest {
    function_id: "chat::send".into(),
    payload: json!({ "roomId": "room-123", "text": "Hello world", "author": "alice" }),
    action: None,
    timeout_ms: None,
};
// `chat::send` answers with `{ "messageId": ... }`.
let result = iii.trigger(request).await?;
println!("Sent message: {}", result["messageId"]);
3. Read from a stream
- Node / TypeScript
- Python
- Rust
stream-reader.ts
/**
 * `chat::list`: returns whatever `stream::list` yields for the `chat` stream,
 * using `input.roomId` as the group id.
 */
iii.registerFunction({ id: 'chat::list' }, async (input) => {
  const logger = new Logger()

  const payload = { stream_name: 'chat', group_id: input.roomId }
  const messages = await iii.trigger({ function_id: 'stream::list', payload })

  logger.info('Messages retrieved', { count: messages.length })
  return messages
})
// Then call from another function or worker
// The handler's logger is local to the handler, so the caller creates its own.
const logger = new Logger()
const messages = await iii.trigger({
  function_id: 'chat::list',
  payload: { roomId: 'room-123' },
})
logger.info('Messages', { messages })
stream_reader.py
def list_messages(input):
    """Return whatever ``stream::list`` yields for the ``chat`` stream.

    ``input["roomId"]`` is used as the group id. The number of returned
    items is logged before the result is handed back unchanged.
    """
    logger = Logger()

    request = {
        "function_id": "stream::list",
        "payload": {"stream_name": "chat", "group_id": input["roomId"]},
    }
    messages = iii.trigger(request)

    logger.info("Messages retrieved", {"count": len(messages)})
    return messages


iii.register_function("chat::list", list_messages)

# Then call from another function or worker
messages = iii.trigger({"function_id": "chat::list", "payload": {"roomId": "room-123"}})
print("Messages:", messages)
stream_reader.rs
use iii_sdk::{RegisterFunction, TriggerRequest};
use serde_json::{json, Value};
// `chat::list`: returns whatever `stream::list` yields for the `chat` stream,
// using the input's `roomId` as the group id.
let handle = iii.clone();
let list_messages = RegisterFunction::new_async("chat::list", move |input: Value| {
    let worker = handle.clone();
    async move {
        let logger = Logger::new();
        // A missing or non-string `roomId` falls back to an empty string.
        let room_id = input["roomId"].as_str().unwrap_or("");

        let request = TriggerRequest {
            function_id: "stream::list".into(),
            payload: json!({ "stream_name": "chat", "group_id": room_id }),
            action: None,
            timeout_ms: None,
        };
        let messages = worker.trigger(request).await?;

        // A non-array result is logged with a count of 0.
        let count = messages.as_array().map_or(0, |items| items.len());
        logger.info("Messages retrieved", Some(json!({ "count": count })));
        Ok(messages)
    }
});
iii.register_function(list_messages);
// Then call from another function or worker
// Goes through the registered `chat::list` function (which reads `roomId`),
// matching the TypeScript and Python examples, instead of calling `stream::list` directly.
let messages = iii
    .trigger(TriggerRequest {
        function_id: "chat::list".into(),
        payload: json!({ "roomId": "room-123" }),
        action: None,
        timeout_ms: None,
    })
    .await?;
println!("Messages: {:?}", messages);
4. Connect a client
Clients connect to the stream WebSocket endpoint to receive live updates:
client.js
// Port 3112 is the default STREAM_PORT from the module config; the path
// segments appear to be the stream name (`chat`) and group id (`room-123`).
const ws = new WebSocket('ws://localhost:3112/stream/chat/room-123')

// Each incoming frame is parsed as JSON and printed.
ws.onmessage = (event) => {
  console.log('New message:', JSON.parse(event.data))
}
5. React to stream events
Register triggers to run server-side logic when clients join or when stream data changes.
On client join
- Node / TypeScript
- Python
- Rust
stream-triggers.ts
import { StreamJoinLeaveEvent } from 'iii-sdk/stream'
/**
 * `chat::onJoin`: logs the stream name, group id and context carried by a
 * join event, then returns an empty object.
 */
const onJoin = iii.registerFunction({ id: 'chat::onJoin' }, async (input: StreamJoinLeaveEvent) => {
  const logger = new Logger()
  const details = {
    stream: input.stream_name,
    group: input.group_id,
    context: input.context,
  }
  logger.info('Client joined chat room', details)
  return {}
})

// Bind the function to the `stream:join` trigger with an empty config.
iii.registerTrigger({ type: 'stream:join', function_id: onJoin.id, config: {} })
stream_triggers.py
from iii import StreamJoinLeaveEvent
def on_join(input: StreamJoinLeaveEvent):
    """Log the stream name, group id and context of a join event.

    Always returns an empty dict.
    """
    logger = Logger()
    details = {
        "stream": input.stream_name,
        "group": input.group_id,
        "context": input.context,
    }
    logger.info("Client joined chat room", details)
    return {}


iii.register_function({"id": "chat::onJoin"}, on_join)

# Bind the function to the `stream:join` trigger with an empty config.
iii.register_trigger({
    "type": "stream:join",
    "function_id": "chat::onJoin",
    "config": {},
})
stream_triggers.rs
use iii_sdk::{IIITrigger, StreamJoinLeaveCallRequest, StreamJoinLeaveTriggerConfig};
// `chat::onJoin`: decodes the join event and logs its stream name, group id
// and context, then returns an empty JSON object.
let on_join = RegisterFunction::new_async("chat::onJoin", move |input: Value| async move {
    let logger = Logger::new();
    let event = serde_json::from_value::<StreamJoinLeaveCallRequest>(input)?;

    let details = json!({
        "stream": event.stream_name,
        "group": event.group_id,
        "context": event.context,
    });
    logger.info("Client joined chat room", Some(details));
    Ok(json!({}))
});
iii.register_function(on_join);

// Bind the function to the stream-join trigger using the default trigger config.
let join_trigger =
    IIITrigger::StreamJoin(StreamJoinLeaveTriggerConfig::new()).for_function("chat::onJoin");
iii.register_trigger(join_trigger)?;
On message created
- Node / TypeScript
- Python
- Rust
stream-triggers.ts
import { StreamChangeEvent } from 'iii-sdk/stream'
/**
 * `chat::onMessage`: logs `create` events from the stream and ignores every
 * other event type. Always returns an empty object.
 */
const onMessage = iii.registerFunction({ id: 'chat::onMessage' }, async (input: StreamChangeEvent) => {
  const logger = new Logger()

  // Only newly created items are of interest here.
  if (input.event.type !== 'create') {
    return {}
  }

  logger.info('New message in room', {
    room: input.groupId,
    messageId: input.id,
    data: input.event.data,
  })
  return {}
})

// Bind the function to changes on the `chat` stream.
iii.registerTrigger({ type: 'stream', function_id: onMessage.id, config: { stream_name: 'chat' } })
stream_triggers.py
from iii import StreamChangeEvent
def on_message(input: StreamChangeEvent):
    """Log ``create`` events from the stream and ignore every other type.

    Always returns an empty dict.
    """
    logger = Logger()

    # Only newly created items are of interest here.
    if input.event.type != "create":
        return {}

    details = {
        "room": input.groupId,
        "messageId": input.id,
        "data": input.event.data,
    }
    logger.info("New message in room", details)
    return {}


iii.register_function({"id": "chat::onMessage"}, on_message)

# Bind the function to changes on the `chat` stream.
iii.register_trigger({"type": "stream", "function_id": "chat::onMessage", "config": {"stream_name": "chat"}})
stream_triggers.rs
use iii_sdk::{IIITrigger, StreamCallRequest, StreamEventType, StreamTriggerConfig};
// `chat::onMessage`: decodes the stream event and logs it when it is a
// `Create`; every other event type is ignored. Always returns `{}`.
let on_message = RegisterFunction::new_async("chat::onMessage", move |input: Value| async move {
    let logger = Logger::new();
    let event = serde_json::from_value::<StreamCallRequest>(input)?;

    // Only newly created items are of interest here.
    if event.event.event_type != StreamEventType::Create {
        return Ok(json!({}));
    }

    let details = json!({
        "room": event.group_id,
        "messageId": event.id,
        "data": event.event.data,
    });
    logger.info("New message in room", Some(details));
    Ok(json!({}))
});
iii.register_function(on_message);

// Bind the function to changes on the `chat` stream.
let message_trigger =
    IIITrigger::Stream(StreamTriggerConfig::new().stream_name("chat")).for_function("chat::onMessage");
iii.register_trigger(message_trigger)?;
Result
Any data written to the stream via stream::set is immediately pushed to all connected WebSocket clients subscribed to that stream and group. Server-side triggers let you react to joins, leaves, and data changes without polling.
For all stream operations, trigger payload shapes, and authentication, see the Stream module reference.