
Goal

Subscribe multiple functions to a topic so that every published message fans out to all subscribers, with each function processing its copy independently. For help deciding between topic-based and named queues, see When to use which.

Enable the Queue module

iii-config.yaml
workers:
  - name: iii-queue
    config:
      queue_configs:
        default:
          max_retries: 5
          concurrency: 10
          type: standard
      adapter:
        name: builtin
        config:
          store_method: file_based
          file_path: ./data/queue_store
For the complete set of configuration options, see the Queue module reference.

Steps

1. Register consumers for a topic

Subscribe one or more functions to the same topic. Each function gets its own internal queue.
import { registerWorker } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

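// sendEmail and writeAuditLog stand in for your own application code; they are not part of iii-sdk.
iii.registerFunction('notify::email', async (data) => {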
  await sendEmail(data.userId, `Order ${data.orderId} created`)
  return {}
})

iii.registerFunction('audit::log', async (data) => {
  await writeAuditLog('order.created', data)
  return {}
})

iii.registerTrigger({
  type: 'queue',
  function_id: 'notify::email',
  config: { topic: 'order.created' },
})

iii.registerTrigger({
  type: 'queue',
  function_id: 'audit::log',
  config: { topic: 'order.created' },
})
Both notify::email and audit::log are now subscribed to order.created. Every message published to that topic reaches both functions.

2. Publish events to the topic

From any function, publish a message using the built-in enqueue function. The engine fans it out to every subscribed function.
await iii.trigger({
  function_id: 'enqueue',
  payload: {
    topic: 'order.created',
    data: { orderId: 'ord_789', userId: 'usr_42', total: 149.99 },
  },
})
The producer does not need to know which functions are subscribed — it only knows the topic name.
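To illustrate how little the producer needs to know, here is a minimal sketch of a publisher bound to a single topic. The `makePublisher` helper, the `TriggerRequest` and `TriggerFn` types, and the `fakeTrigger` stand-in are hypothetical names introduced for this example and are not part of iii-sdk; only the `enqueue` function id and the `{ topic, data }` payload shape come from the step above.

```typescript
// Shape of the call used in this guide. Assumption: the real types come from iii-sdk.
type TriggerRequest = {
  function_id: string
  payload: { topic: string; data: unknown }
}
type TriggerFn = (request: TriggerRequest) => Promise<unknown>

// Hypothetical helper: returns a publisher that only knows the topic name.
function makePublisher<T>(trigger: TriggerFn, topic: string) {
  return (data: T) => trigger({ function_id: 'enqueue', payload: { topic, data } })
}

// Stand-in for iii.trigger so the sketch runs without an engine.
const sent: TriggerRequest[] = []
const fakeTrigger: TriggerFn = async (request) => {
  sent.push(request)
  return {}
}

type OrderCreated = { orderId: string; userId: string; total: number }
const publishOrderCreated = makePublisher<OrderCreated>(fakeTrigger, 'order.created')

// The caller passes only the event data; no subscriber ids appear anywhere.
publishOrderCreated({ orderId: 'ord_789', userId: 'usr_42', total: 149.99 })
```

In a real worker you would presumably pass `(request) => iii.trigger(request)` instead of `fakeTrigger`.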

3. Understand fan-out delivery

Topic-based queues use fan-out per function:
  • Each distinct function subscribed to a topic receives a copy of every message.
  • If a function has multiple replicas running, they compete on a shared per-function queue — only one replica processes each message.
This gives you pub/sub-style event distribution with the durability and retry guarantees of a queue.
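The delivery rules above can be pictured with a small in-memory model. This is only a sketch of the semantics described in this step, not how the engine is implemented; the `FanOutModel` class and its methods are hypothetical names.

```typescript
// In-memory model of fan-out per function (an illustration, not the iii engine).
type Message = { topic: string; data: unknown }

class FanOutModel {
  // topic -> function id -> that function's own queue
  private queues = new Map<string, Map<string, Message[]>>()

  subscribe(topic: string, functionId: string): void {
    const perFunction = this.queues.get(topic) ?? new Map<string, Message[]>()
    if (!perFunction.has(functionId)) perFunction.set(functionId, [])
    this.queues.set(topic, perFunction)
  }

  // Publishing copies the message into every subscribed function's queue.
  // Returns the number of copies created.
  publish(topic: string, data: unknown): number {
    const perFunction = this.queues.get(topic)
    if (!perFunction) return 0
    perFunction.forEach((queue) => queue.push({ topic, data }))
    return perFunction.size
  }

  // A replica takes the next message from its function's queue.
  // Replicas of one function share that queue, so each copy is taken once.
  take(topic: string, functionId: string): Message | undefined {
    return this.queues.get(topic)?.get(functionId)?.shift()
  }
}

const model = new FanOutModel()
model.subscribe('order.created', 'notify::email')
model.subscribe('order.created', 'audit::log')
const copies = model.publish('order.created', { orderId: 'ord_789' })

// Two replicas of notify::email compete for the single copy in that queue.
const replicaA = model.take('order.created', 'notify::email')
const replicaB = model.take('order.created', 'notify::email')
// audit::log has its own copy, unaffected by the notify::email replicas.
const auditCopy = model.take('order.created', 'audit::log')
```

Here the first `notify::email` replica gets the message and the second finds the queue empty, while `audit::log` still receives its own copy.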

4. Filter messages with conditions (optional)

Attach a condition function to a queue trigger to filter which messages reach the handler. The condition receives the message data and returns true or false. If false, the handler is not called — no error is surfaced.
iii.registerFunction('conditions::is_high_value', async (data) => data.total > 1000)

// notify::vip-team is assumed to be registered elsewhere with iii.registerFunction.
iii.registerTrigger({
  type: 'queue',
  function_id: 'notify::vip-team',
  config: {
    topic: 'order.created',
    condition_function_id: 'conditions::is_high_value',
  },
})
See Conditions for the full pattern including HTTP and state trigger conditions.
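As a rough mental model of the gating described in this step, the sketch below wraps a handler so that it runs only when a condition returns true. It does not show how the engine evaluates conditions internally; `withCondition`, `isHighValue`, `notifyVipTeam`, and `demo` are hypothetical names used only for illustration.

```typescript
type Handler<T> = (data: T) => Promise<unknown>
type Condition<T> = (data: T) => Promise<boolean>

// Hypothetical helper: runs the handler only when the condition passes.
function withCondition<T>(condition: Condition<T>, handler: Handler<T>) {
  return async (data: T): Promise<{ handled: boolean }> => {
    // A false condition skips the handler quietly; nothing is thrown.
    if (!(await condition(data))) return { handled: false }
    await handler(data)
    return { handled: true }
  }
}

type Order = { orderId: string; total: number }
const notified: string[] = []

const isHighValue: Condition<Order> = async (order) => order.total > 1000
const notifyVipTeam: Handler<Order> = async (order) => {
  notified.push(order.orderId)
}

const deliver = withCondition(isHighValue, notifyVipTeam)

// Delivers one low-value and one high-value order, then reports who was notified.
async function demo(): Promise<string[]> {
  await deliver({ orderId: 'ord_1', total: 149.99 })
  await deliver({ orderId: 'ord_2', total: 2500 })
  return notified
}
```

Only the second order passes the `total > 1000` check, so only it reaches the handler.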

Result

Every function subscribed to a topic receives a copy of each published message. If a function has multiple replicas, they compete on a shared per-function queue — only one replica processes each message. The producer only knows the topic name; it does not need to know which functions are subscribed.

Real-World Scenario

Event Fan-Out with Topic Queues

An order system publishes order.created events. Multiple independent services — email notifications, inventory updates, and analytics — each need to process every order. Topic-based queues fan out each message to all subscribers with independent retries per function.
import { registerWorker } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

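// sendEmail, reserveStock, and trackEvent stand in for your own application code; they are not part of iii-sdk.
iii.registerFunction('notify::email', async (data) => {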
  await sendEmail(data.email, `Your order ${data.orderId} is confirmed!`)
  return {}
})

iii.registerFunction('inventory::reserve', async (data) => {
  for (const item of data.items) {
    await reserveStock(item.sku, item.quantity)
  }
  return {}
})

iii.registerFunction('analytics::track', async (data) => {
  await trackEvent('order_created', { orderId: data.orderId, total: data.total })
  return {}
})

iii.registerTrigger({
  type: 'queue',
  function_id: 'notify::email',
  config: { topic: 'order.created' },
})

iii.registerTrigger({
  type: 'queue',
  function_id: 'inventory::reserve',
  config: { topic: 'order.created' },
})

iii.registerTrigger({
  type: 'queue',
  function_id: 'analytics::track',
  config: { topic: 'order.created' },
})

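iii.registerFunction('orders::create', async (req) => {
  // Subscribers read data.orderId, so the published payload uses that field name.
  // The body is spread first so a client-supplied orderId cannot override the generated one.
  const order = { ...req.body, orderId: crypto.randomUUID() }

  await iii.trigger({
    function_id: 'enqueue',
    payload: { topic: 'order.created', data: order },
  })

  return { status_code: 201, body: { orderId: order.orderId } }
})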
All three functions receive every order.created event independently. If inventory::reserve fails and retries, it does not affect notify::email or analytics::track.
For adapter options (builtin, RabbitMQ, Redis), scenario-based recommendations, and the full queue configuration reference, see the Queue module reference.
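The independent-retry behavior in this scenario can be sketched with a small simulation. It is a simplified, synchronous model and not the engine's retry logic: `deliverWithRetries` is a hypothetical helper, and it assumes that `max_retries` means one initial attempt plus up to that many retries, which the Queue module reference should confirm.

```typescript
// In-memory model of per-function retries (an illustration, not the engine's code).
type Handler = (data: unknown) => void

// Delivers one message copy to one function.
// Assumption: one initial attempt plus up to maxRetries retries.
// Returns the number of attempts made, or -1 if every attempt failed.
function deliverWithRetries(handler: Handler, data: unknown, maxRetries: number): number {
  for (let attempt = 1; attempt <= maxRetries + 1; attempt++) {
    try {
      handler(data)
      return attempt
    } catch {
      // The failure stays inside this function's own delivery loop.
    }
  }
  return -1
}

let reserveCalls = 0
const subscribers: Record<string, Handler> = {
  'notify::email': () => {},
  // Fails twice, then succeeds, to simulate a flaky downstream dependency.
  'inventory::reserve': () => {
    reserveCalls++
    if (reserveCalls <= 2) throw new Error('stock service unavailable')
  },
  'analytics::track': () => {},
}

// Fan out: each subscriber gets its own delivery with its own retry budget.
const attempts: Record<string, number> = {}
for (const functionId of Object.keys(subscribers)) {
  attempts[functionId] = deliverWithRetries(subscribers[functionId], { orderId: 'ord_789' }, 5)
}
```

In this run `inventory::reserve` needs three attempts, while `notify::email` and `analytics::track` each succeed on their first attempt, unaffected by the failures next to them.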

Remember

Producers publish to a topic and return immediately. The engine fans out each message to every subscribed function, with independent retries per function. If a function has multiple replicas, they compete on a shared per-function queue — only one replica processes each message.

Next Steps

Named Queues

Enqueue jobs to specific functions with retries, FIFO ordering, and concurrency control

Dead Letter Queues

Handle and redrive failed queue messages

Queue Module Reference

Full configuration reference for queues and adapters

Conditions

Filter queue messages with condition functions