Goal

Use queues to run work asynchronously with built-in retries, dead-letter support, and concurrency control. iii offers two queue modes — topic-based (pub/sub fan-out) and named queues (direct function targeting) — so you can pick the delivery model that fits your use case.

What Are Queues

A queue sits between the code that produces work and the code that processes it. Instead of calling a function and waiting for it to finish, you hand the work to a queue. The queue stores the message, delivers it to a consumer, retries on failure, and routes permanently failed messages to a dead-letter queue (DLQ) for later inspection. This separation solves three problems:
  1. Speed — the producer responds immediately instead of blocking on slow downstream work.
  2. Reliability — transient failures (network blips, service restarts) are retried automatically instead of being lost.
  3. Load control — concurrency limits prevent consumers from overwhelming downstream systems.

When to Use Queues

| Scenario | Use a queue? | Why |
| --- | --- | --- |
| HTTP handler must respond fast, but downstream work is slow | Yes | Enqueue the work and return 202 Accepted immediately |
| Multiple functions must react to the same event | Yes | Topic-based queues fan out to every subscriber |
| Work must survive process restarts | Yes | Queues persist messages and retry on failure |
| External API has rate limits | Yes | Concurrency control throttles parallel requests |
| Transactions for the same entity must be ordered | Yes | FIFO queues guarantee per-group ordering |
| You need the function’s return value right now | No | Use a synchronous trigger instead |
| The work is non-critical and losing it is acceptable | Maybe | TriggerAction.Void() is simpler if you don’t need retries |

Two Queue Modes

iii supports two ways to use queues. Both share the same adapter, retry engine, and DLQ infrastructure — they differ in how producers address consumers.
| | Topic-based | Named queues |
| --- | --- | --- |
| Producer | trigger({ function_id: 'enqueue', payload: { topic, data } }) | trigger({ function_id, payload, action: TriggerAction.Enqueue({ queue }) }) |
| Consumer | Registers registerTrigger({ type: 'queue', config: { topic } }) | No registration — function is the target |
| Delivery | Fan-out: each subscribed function gets every message; replicas compete | Single target function per enqueue call |
| Config | Optional queue_config on trigger | queue_configs in iii-config.yaml |
| Best for | Durable pub/sub with retries and fan-out | Direct function invocation with retries, FIFO, DLQ |
Named queues use the Enqueue trigger action. If you are new to trigger actions, read Trigger Actions first.

Topic-Based Queues

Topic-based queues work like durable pub/sub: you publish a message to a topic, and every function subscribed to that topic receives a copy. If a function has multiple replicas, they compete on a shared per-function queue — only one replica processes each message.
1. Register consumers for a topic

Subscribe one or more functions to the same topic. Each function gets its own internal queue.
import { registerWorker } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

iii.registerFunction(
  { id: 'notify::email' },
  async (data) => {
    await sendEmail(data.userId, `Order ${data.orderId} created`)
    return {}
  },
)

iii.registerFunction(
  { id: 'audit::log' },
  async (data) => {
    await writeAuditLog('order.created', data)
    return {}
  },
)

iii.registerTrigger({
  type: 'queue',
  function_id: 'notify::email',
  config: { topic: 'order.created' },
})

iii.registerTrigger({
  type: 'queue',
  function_id: 'audit::log',
  config: { topic: 'order.created' },
})
Both notify::email and audit::log are now subscribed to order.created. Every message published to that topic reaches both functions.
2. Publish events to the topic

From any function, publish a message using the built-in enqueue function. The engine fans it out to every subscribed function.
import { TriggerAction } from 'iii-sdk'

await iii.trigger({
  function_id: 'enqueue',
  payload: {
    topic: 'order.created',
    data: { orderId: 'ord_789', userId: 'usr_42', total: 149.99 },
  },
  action: TriggerAction.Void(),
})
The producer does not need to know which functions are subscribed — it only knows the topic name.
3. Understand fan-out delivery

Topic-based queues use fan-out per function:
  • Each distinct function subscribed to a topic receives a copy of every message.
  • If a function has multiple replicas running, they compete on a shared per-function queue — only one replica processes each message.
This gives you pub/sub-style event distribution with the durability and retry guarantees of a queue.
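As an illustration of these delivery rules, here is a toy model. It is not the iii engine; the function and its round-robin replica assignment are invented purely to make the semantics concrete.

```typescript
// Toy model of fan-out delivery. Every subscribed function receives a copy of
// each message; the replicas of a single function share one queue, so each
// copy is handed to exactly one replica (round-robin here, for illustration).
type Assignment = { message: number; replica: number }

function deliver(
  messages: number[],
  replicaCounts: Record<string, number>, // function id -> replica count
): Record<string, Assignment[]> {
  const assignments: Record<string, Assignment[]> = {}
  for (const [fn, replicas] of Object.entries(replicaCounts)) {
    // Each function's shared queue sees every message...
    assignments[fn] = messages.map((message, i) => ({
      message,
      // ...but each message is processed by only one of its replicas.
      replica: i % replicas,
    }))
  }
  return assignments
}
```

Running `deliver` with two subscribed functions shows both receive all messages, while replicas of the same function split the work between them.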
4. Filter messages with conditions (optional)

Attach a condition function to a queue trigger to filter which messages reach the handler. The condition receives the message data and returns true or false. If false, the handler is not called — no error is surfaced.
iii.registerFunction(
  { id: 'conditions::is_high_value' },
  async (data) => data.total > 1000,
)

iii.registerTrigger({
  type: 'queue',
  function_id: 'notify::vip-team',
  config: {
    topic: 'order.created',
    condition_function_id: 'conditions::is_high_value',
  },
})
See Conditions for the full pattern including HTTP and state trigger conditions.

Named Queues

Named queues target a specific function directly. You define queue settings in iii-config.yaml and reference the queue name when enqueuing work.
1. Define named queues in config

Declare one or more named queues under queue_configs. Each queue has independent retry, concurrency, and ordering settings.
iii-config.yaml
modules:
  - class: modules::queue::QueueModule
    config:
      queue_configs:
        default:
          max_retries: 5
          concurrency: 10
          type: standard
        payment:
          max_retries: 10
          concurrency: 2
          type: fifo
          message_group_field: transaction_id
        email:
          max_retries: 8
          concurrency: 5
          type: standard
          backoff_ms: 2000
      adapter:
        class: modules::queue::BuiltinQueueAdapter
        config:
          store_method: file_based
          file_path: ./data/queue_store
See the Queue module reference for every field, type, and default value.
2. Enqueue work via trigger action

From any function, enqueue a job by calling trigger() with TriggerAction.Enqueue and the target queue name. The caller receives an acknowledgement (messageReceiptId) once the engine accepts the job — it does not wait for processing.
import { registerWorker, TriggerAction } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

const receipt = await iii.trigger({
  function_id: 'orders::process-payment',
  payload: { orderId: 'ord_789', amount: 149.99, currency: 'USD' },
  action: TriggerAction.Enqueue({ queue: 'payment' }),
})

console.log(receipt.messageReceiptId)
The target function receives the payload as its input — it does not need to know it was invoked via a queue.
3. Handle the enqueue result

The enqueue call can fail synchronously if the queue name is unknown or FIFO validation fails. Always handle the result.
try {
  const receipt = await iii.trigger({
    function_id: 'orders::process-payment',
    payload: { orderId: 'ord_789', amount: 149.99 },
    action: TriggerAction.Enqueue({ queue: 'payment' }),
  })
  console.log('Enqueued:', receipt.messageReceiptId)
} catch (err) {
  if (err.enqueue_error) {
    console.error('Queue rejected job:', err.enqueue_error)
  }
}
Common rejection reasons:
  • The queue name does not exist in queue_configs
  • A FIFO queue’s message_group_field is missing or null in the payload
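A producer-side pre-flight check can catch the second case before enqueueing. This helper is hypothetical (it is not part of the iii SDK); it simply mirrors the documented validation rule.

```typescript
// Hypothetical producer-side check mirroring the documented FIFO rule:
// the payload must carry a non-null value for the queue's message_group_field.
function validateFifoPayload(
  payload: Record<string, unknown>,
  messageGroupField: string,
): string | null {
  const value = payload[messageGroupField]
  if (value === undefined || value === null) {
    return `FIFO queue requires a non-null "${messageGroupField}" field in the payload`
  }
  return null // payload is acceptable
}
```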
4. Use FIFO queues for ordered processing

When processing order matters — for example, financial transactions for the same account — set type: fifo and specify message_group_field. Jobs sharing the same group value are processed strictly in order.
iii-config.yaml (excerpt)
queue_configs:
  payment:
    max_retries: 10
    concurrency: 2
    type: fifo
    message_group_field: transaction_id
The payload must contain the field named by message_group_field, and its value must be non-null.
await iii.trigger({
  function_id: 'payments::process',
  payload: { transaction_id: 'txn-abc-123', amount: 49.99, currency: 'USD' },
  action: TriggerAction.Enqueue({ queue: 'payment' }),
})
5. Configure retries and backoff

Every named queue retries failed jobs automatically. Backoff is exponential:
delay = backoff_ms × 2^(attempt - 1)
| Attempt | backoff_ms: 1000 | backoff_ms: 2000 |
| --- | --- | --- |
| 1 | 1,000 ms | 2,000 ms |
| 2 | 2,000 ms | 4,000 ms |
| 3 | 4,000 ms | 8,000 ms |
| 4 | 8,000 ms | 16,000 ms |
| 5 | 16,000 ms | 32,000 ms |
iii-config.yaml (excerpt)
queue_configs:
  email:
    max_retries: 8
    backoff_ms: 2000
    concurrency: 5
    type: standard
After all retries are exhausted, the job moves to a dead-letter queue (DLQ).
See Manage Failed Triggers for DLQ inspection and redrive.
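The delay schedule follows directly from the formula. This small helper is not part of the SDK; it just reproduces the documented calculation.

```typescript
// Retry delay per the documented formula: delay = backoff_ms * 2^(attempt - 1).
// attempt is 1-based: attempt 1 waits backoff_ms, attempt 2 twice that, etc.
function backoffDelayMs(backoffMs: number, attempt: number): number {
  return backoffMs * 2 ** (attempt - 1)
}
```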
6. Control concurrency

The concurrency field sets the maximum number of jobs the engine processes simultaneously from a single queue (per engine instance).
iii-config.yaml (excerpt)
queue_configs:
  default:
    concurrency: 10
    type: standard
  payment:
    concurrency: 2
    type: fifo
    message_group_field: transaction_id
  • Standard queues: the engine pulls up to concurrency jobs simultaneously.
  • FIFO queues: the engine processes one job at a time (prefetch=1) to preserve ordering, regardless of the concurrency value.
Use low concurrency to protect rate-limited APIs. Use high concurrency for embarrassingly parallel work like image resizing.
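The cap behaves like a worker pool of size concurrency. The engine owns this logic in iii; the standalone sketch below only models the semantics for a standard queue.

```typescript
// Toy worker pool illustrating `concurrency: N`: at most `limit` jobs run at
// once, each job runs exactly once, and results keep their input order.
async function runWithConcurrency<T>(
  jobs: Array<() => Promise<T>>,
  limit: number,
): Promise<T[]> {
  const results: T[] = new Array(jobs.length)
  let next = 0
  async function worker(): Promise<void> {
    while (next < jobs.length) {
      const i = next++ // claim the next job index (single-threaded, no race)
      results[i] = await jobs[i]()
    }
  }
  const workers = Array.from({ length: Math.min(limit, jobs.length) }, worker)
  await Promise.all(workers)
  return results
}
```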

Standard vs FIFO Queues

| Dimension | Standard | FIFO |
| --- | --- | --- |
| Processing model | Up to concurrency jobs in parallel | One job at a time (prefetch=1) |
| Ordering | No guarantees — jobs may complete in any order | Strictly ordered within a message group |
| message_group_field | Not required | Required — must be present and non-null in every payload |
| Throughput | High — scales with concurrency | Lower — trades throughput for ordering |
| Use cases | Email sends, image processing, notifications | Payments, ledger entries, state machines |
| Retries | Retried independently, other jobs continue | Retried inline — blocks the queue until success or DLQ |
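Per-group ordering can be pictured as partitioning the message stream by the group field: order is preserved within a partition, not across partitions. A toy model (the function name is invented for this sketch):

```typescript
// Toy model of FIFO message grouping: messages sharing a group value form one
// ordered partition; different partitions may be processed independently.
function partitionByGroup<T extends Record<string, unknown>>(
  messages: T[],
  groupField: string,
): Map<unknown, T[]> {
  const groups = new Map<unknown, T[]>()
  for (const message of messages) {
    const key = message[groupField]
    const partition = groups.get(key) ?? []
    partition.push(message) // arrival order is preserved within the group
    groups.set(key, partition)
  }
  return groups
}
```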

Standard queue flow

Jobs are dequeued and processed concurrently. Each job is independent.

FIFO queue flow

Jobs within the same message group are processed one at a time, strictly in order.

Retry and dead-letter flow

When a job fails, the engine retries it with exponential backoff. After all retries exhaust, the job moves to the DLQ.

Real-World Scenarios

Scenario 1: HTTP API to Queue Pipeline

The most common pattern — an HTTP endpoint accepts a request, responds immediately, and offloads the actual work to a queue. This keeps API response times fast regardless of how long downstream processing takes.
iii-config.yaml
modules:
  - class: modules::queue::QueueModule
    config:
      queue_configs:
        payment:
          max_retries: 10
          concurrency: 2
          type: fifo
          message_group_field: orderId
        email:
          max_retries: 5
          concurrency: 10
          type: standard
          backoff_ms: 2000
      adapter:
        class: modules::queue::BuiltinQueueAdapter
        config:
          store_method: file_based
          file_path: ./data/queue_store
import { registerWorker, TriggerAction, Logger } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

iii.registerFunction({ id: 'orders::create' }, async (req) => {
  const logger = new Logger()
  const order = { id: crypto.randomUUID(), ...req.body }

  await iii.trigger({
    function_id: 'orders::process-payment',
    payload: { orderId: order.id, amount: order.total, currency: 'USD' },
    action: TriggerAction.Enqueue({ queue: 'payment' }),
  })

  await iii.trigger({
    function_id: 'emails::confirmation',
    payload: { email: order.email, orderId: order.id },
    action: TriggerAction.Enqueue({ queue: 'email' }),
  })

  await iii.trigger({
    function_id: 'analytics::track',
    payload: { event: 'order_created', orderId: order.id },
    action: TriggerAction.Void(),
  })

  logger.info('Order created', { orderId: order.id })
  return { status_code: 201, body: { orderId: order.id } }
})

iii.registerTrigger({
  type: 'http',
  function_id: 'orders::create',
  config: { api_path: '/orders', http_method: 'POST' },
})
This example uses two trigger actions: Enqueue for payment (reliable, ordered) and email (reliable, parallel), and Void for analytics (best-effort).

Scenario 2: Event Fan-Out with Topic Queues

An order system publishes order.created events. Multiple independent services — email notifications, inventory updates, and analytics — each need to process every order. Topic-based queues fan out each message to all subscribers with independent retries per function.
import { registerWorker, TriggerAction } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

iii.registerFunction({ id: 'notify::email' }, async (data) => {
  await sendEmail(data.email, `Your order ${data.orderId} is confirmed!`)
  return {}
})

iii.registerFunction({ id: 'inventory::reserve' }, async (data) => {
  for (const item of data.items) {
    await reserveStock(item.sku, item.quantity)
  }
  return {}
})

iii.registerFunction({ id: 'analytics::track' }, async (data) => {
  await trackEvent('order_created', { orderId: data.orderId, total: data.total })
  return {}
})

iii.registerTrigger({
  type: 'queue',
  function_id: 'notify::email',
  config: { topic: 'order.created' },
})

iii.registerTrigger({
  type: 'queue',
  function_id: 'inventory::reserve',
  config: { topic: 'order.created' },
})

iii.registerTrigger({
  type: 'queue',
  function_id: 'analytics::track',
  config: { topic: 'order.created' },
})

iii.registerFunction({ id: 'orders::create' }, async (req) => {
  const order = { id: crypto.randomUUID(), ...req.body }

  await iii.trigger({
    function_id: 'enqueue',
    payload: { topic: 'order.created', data: order },
    action: TriggerAction.Void(),
  })

  return { status_code: 201, body: { orderId: order.id } }
})
All three functions receive every order.created event independently. If inventory::reserve fails and retries, it does not affect notify::email or analytics::track.

Scenario 3: Financial Transaction Ledger (FIFO)

Transactions for the same account must be applied in order to prevent balance inconsistencies. Different accounts can process in parallel.
iii-config.yaml (excerpt)
queue_configs:
  ledger:
    max_retries: 15
    concurrency: 1
    type: fifo
    message_group_field: account_id
    backoff_ms: 500
import { registerWorker, TriggerAction } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

iii.registerFunction({ id: 'transactions::submit' }, async (req) => {
  const { account_id, type, amount } = req.body

  const receipt = await iii.trigger({
    function_id: 'ledger::apply',
    payload: { account_id, type, amount },
    action: TriggerAction.Enqueue({ queue: 'ledger' }),
  })

  return { status_code: 202, body: { receiptId: receipt.messageReceiptId } }
})

iii.registerFunction({ id: 'ledger::apply' }, async (txn) => {
  const { account_id, type, amount } = txn
  if (type === 'deposit') {
    await db.query('UPDATE accounts SET balance = balance + $1 WHERE id = $2', [amount, account_id])
  } else if (type === 'withdraw') {
    const { rows } = await db.query('SELECT balance FROM accounts WHERE id = $1', [account_id])
    if (rows[0].balance < amount) {
      throw new Error('Insufficient funds')
    }
    await db.query('UPDATE accounts SET balance = balance - $1 WHERE id = $2', [amount, account_id])
  }
  return { applied: true }
})
Because the ledger queue is FIFO with message_group_field: account_id, transactions for the same account are applied strictly in submission order. Without FIFO ordering, a withdrawal could execute before an earlier deposit for the same account and fail with “Insufficient funds” even though the deposit was submitted first.

Scenario 4: Bulk Email with Rate Limiting

A marketing system sends thousands of emails. The SMTP provider has a rate limit. A standard queue with low concurrency prevents overloading the provider while retrying transient failures.
iii-config.yaml (excerpt)
queue_configs:
  bulk-email:
    max_retries: 5
    concurrency: 3
    type: standard
    backoff_ms: 5000
import { registerWorker, TriggerAction } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134')

iii.registerFunction({ id: 'campaigns::launch' }, async (campaign) => {
  for (const recipient of campaign.recipients) {
    await iii.trigger({
      function_id: 'emails::send',
      payload: {
        to: recipient.email,
        subject: campaign.subject,
        body: campaign.body,
      },
      action: TriggerAction.Enqueue({ queue: 'bulk-email' }),
    })
  }

  return { enqueued: campaign.recipients.length }
})

iii.registerFunction({ id: 'emails::send' }, async (email) => {
  const response = await fetch('https://smtp-provider.example/send', {
    method: 'POST',
    body: JSON.stringify(email),
    headers: { 'Content-Type': 'application/json' },
  })

  if (!response.ok) {
    throw new Error(`SMTP error: ${response.status}`)
  }

  return { sent: true }
})
With concurrency: 3, at most three emails are in-flight at any time. Failed sends retry with exponential backoff (5s, 10s, 20s, 40s, 80s), protecting the SMTP provider from overload.

Choosing an Adapter

The queue adapter determines where messages are stored and how they are distributed.
| Scenario | Recommended Adapter | Why |
| --- | --- | --- |
| Local development | BuiltinQueueAdapter (in_memory) | Zero dependencies, fast iteration |
| Single-instance production | BuiltinQueueAdapter (file_based) | Durable across restarts, no external infra |
| Multi-instance production | RabbitMQAdapter | Distributes messages across engine instances |
Regardless of which adapter you choose, retry semantics, concurrency enforcement, and FIFO ordering behave identically — the engine owns these behaviors, not the adapter.
See the Queue module reference for adapter configuration and the adapter comparison table for a feature matrix.
When using the RabbitMQ adapter, iii creates exchanges and queues using a predictable naming convention. For a queue named payment, the main queue is iii.__fn_queue::payment, the retry queue is iii.__fn_queue::payment::retry.queue, and the DLQ is iii.__fn_queue::payment::dlq.queue. See Dead Letter Queues for the full resource map.
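The convention can be captured in a small helper. This function is hypothetical, shown only for illustration; iii derives these names internally.

```typescript
// Sketch of the documented RabbitMQ naming convention for a named queue.
// iii creates these resources itself; this helper just spells out the pattern.
function rabbitResourceNames(queue: string) {
  const main = `iii.__fn_queue::${queue}`
  return {
    main,                          // main queue
    retry: `${main}::retry.queue`, // holds messages awaiting retry backoff
    dlq: `${main}::dlq.queue`,     // dead-letter queue after retries exhaust
  }
}
```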

Queue Config Reference

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| max_retries | u32 | 3 | Maximum delivery attempts before routing to DLQ |
| concurrency | u32 | 10 | Maximum concurrent workers for this queue (standard only) |
| type | string | "standard" | "standard" for concurrent processing; "fifo" for ordered processing |
| message_group_field | string | (none) | Required for FIFO — the JSON field in the payload used for ordering groups (must be non-null) |
| backoff_ms | u64 | 1000 | Base retry backoff in milliseconds. Applied exponentially: backoff_ms × 2^(attempt - 1) |
| poll_interval_ms | u64 | 100 | Worker poll interval in milliseconds |
For the full module configuration including adapter settings, see the Queue module reference.

Next Steps

  • Trigger Actions: understand synchronous, Void, and Enqueue invocation modes
  • Dead Letter Queues: handle and redrive failed queue messages
  • Queue Module Reference: full configuration reference for queues and adapters
  • Conditions: filter queue messages with condition functions