
Installation

pip install iii-sdk

Initialization

Create an III client and connect to the engine. Blocks until the WebSocket connection is established and ready.
from iii import register_worker, InitOptions
iii = register_worker('ws://localhost:49134', InitOptions(worker_name='my-worker'))

Methods

connect_async

Connect to the III Engine via WebSocket. Initializes OpenTelemetry (if configured), attaches the event loop, and establishes the WebSocket connection. This is called automatically during construction; use it only if you need to reconnect manually from an async context.

Signature

async connect_async()

create_channel

Create a streaming channel pair for worker-to-worker data transfer. The returned Channel contains a local writer and reader plus their serializable refs (writer_ref, reader_ref), which can be passed as fields in invocation data to other functions.

Signature

create_channel(buffer_size: int | None = None)

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| buffer_size | int \| None | No | Buffer capacity for the channel. Defaults to 64. |

Example

ch = iii.create_channel()
fn = iii.register_function({"id": "producer"}, producer_handler)
iii.trigger({"function_id": "producer", "payload": {"output": ch.writer_ref}})

create_channel_async

Create a streaming channel pair for worker-to-worker data transfer. The returned Channel contains a local writer and reader plus their serializable refs (writer_ref, reader_ref), which can be passed as fields in invocation data to other functions.

Signature

async create_channel_async(buffer_size: int | None = None)

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| buffer_size | int \| None | No | Buffer capacity for the channel. Defaults to 64. |

Example

ch = await iii.create_channel_async()
fn = iii.register_function({"id": "producer"}, producer_handler)
await iii.trigger_async({"function_id": "producer", "payload": {"output": ch.writer_ref}})

create_stream

Register a custom stream implementation, overriding the engine default. Registers 5 of the 6 IStream methods (get, set, delete, list, list_groups). The update method is not registered; atomic updates are handled by the engine’s built-in stream update logic.

Signature

create_stream(stream_name: str, stream: IStream[Any])

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| stream_name | str | Yes | Unique name for the stream. |
| stream | IStream[Any] | Yes | An object implementing the IStream interface. |

Example

from iii.stream import IStream

class MyStream(IStream):
    async def get(self, input): ...
    async def set(self, input): ...
    async def delete(self, input): ...
    async def list(self, input): ...
    async def list_groups(self, input): ...
    # Part of the IStream interface, but not registered by create_stream.
    async def update(self, input): ...

iii.create_stream("my-stream", MyStream())

get_connection_state

Return the current WebSocket connection state.

Signature

get_connection_state()

list_functions

List all functions registered with the engine across all workers.

Signature

list_functions()

Example

for fn in iii.list_functions():
    print(fn.function_id, fn.description)

list_functions_async

List all functions registered with the engine across all workers.

Signature

async list_functions_async()

Example

for fn in await iii.list_functions_async():
    print(fn.function_id, fn.description)

list_trigger_types

List all trigger types registered with the engine.

Signature

list_trigger_types(include_internal: bool = False)

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| include_internal | bool | No | If True, include engine-internal trigger types (e.g. engine::functions-available). Defaults to False. |

Example

trigger_types = iii.list_trigger_types()
for tt in trigger_types:
    print(tt.id, tt.trigger_request_format)

list_trigger_types_async

List all trigger types registered with the engine.

Signature

async list_trigger_types_async(include_internal: bool = False)

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| include_internal | bool | No | If True, include engine-internal trigger types (e.g. engine::functions-available). Defaults to False. |

Example

trigger_types = await iii.list_trigger_types_async()

list_triggers

List all triggers registered with the engine.

Signature

list_triggers(include_internal: bool = False)

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| include_internal | bool | No | If True, include engine-internal triggers (e.g. functions-available). Defaults to False. |

Example

triggers = iii.list_triggers()
internal = iii.list_triggers(include_internal=True)

list_triggers_async

List all triggers registered with the engine.

Signature

async list_triggers_async(include_internal: bool = False)

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| include_internal | bool | No | If True, include engine-internal triggers (e.g. functions-available). Defaults to False. |

Example

triggers = await iii.list_triggers_async()
internal = await iii.list_triggers_async(include_internal=True)

list_workers

List all workers currently connected to the engine.

Signature

list_workers()

Example

for w in iii.list_workers():
    print(w.name, w.worker_id)

list_workers_async

List all workers currently connected to the engine.

Signature

async list_workers_async()

Example

for w in await iii.list_workers_async():
    print(w.name, w.worker_id)

on_functions_available

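Subscribe to function-availability events from the engine. The callback fires whenever the set of available functions changes (e.g. a new worker connects or a function is unregistered). The call returns a function that cancels the subscription when invoked.

Signature

on_functions_available(callback: Callable[None])

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| callback | Callable[None] | Yes | Called with the current list of available functions each time the set changes. |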

Example

def on_change(functions):
    print("Available:", [f.function_id for f in functions])
unsub = iii.on_functions_available(on_change)
# later ...
unsub()
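
The subscribe-then-unsubscribe shape used above can be illustrated without an engine. The sketch below is a stand-in only: `FunctionsAvailableHub`, `subscribe`, and `publish` are hypothetical names, and the SDK's internals are not shown in this reference. It demonstrates the general pattern of returning a closure that removes the callback.

```python
from types import SimpleNamespace


class FunctionsAvailableHub:
    """Hypothetical event source; not part of the SDK."""

    def __init__(self):
        self._callbacks = []

    def subscribe(self, callback):
        self._callbacks.append(callback)

        def unsubscribe():
            # Safe to call more than once.
            if callback in self._callbacks:
                self._callbacks.remove(callback)

        return unsubscribe

    def publish(self, functions):
        # Iterate over a copy so callbacks may unsubscribe while being notified.
        for callback in list(self._callbacks):
            callback(functions)


seen = []
hub = FunctionsAvailableHub()
unsub = hub.subscribe(lambda fns: seen.append([f.function_id for f in fns]))

hub.publish([SimpleNamespace(function_id="greet")])
unsub()
# This second change is not observed because the subscription was cancelled.
hub.publish([SimpleNamespace(function_id="greet"), SimpleNamespace(function_id="notify")])

print(seen)
```

Keeping the returned `unsub` handle around is what makes cleanup possible; dropping it leaves the callback registered for the lifetime of the source.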

register_function

Register a function with the engine. Pass a handler for local execution, or an HttpInvocationConfig for HTTP-invoked functions (Lambda, Cloudflare Workers, etc.).

Handlers can be synchronous or asynchronous. Sync handlers are automatically wrapped with run_in_executor so they do not block the event loop. Each handler receives a single data argument containing the trigger payload.

When func_or_id is a str, the simplified API is used: request_format and response_format are auto-extracted from the handler’s type hints when not explicitly provided.

Signature

register_function(func_or_id: RegisterFunctionInput | dict[str, Any] | str, handler_or_invocation: RemoteFunctionHandler | HttpInvocationConfig, description: str | None = None, metadata: dict[str, Any] | None = None, request_format: RegisterFunctionFormat | dict[str, Any] | None = None, response_format: RegisterFunctionFormat | dict[str, Any] | None = None)

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| func_or_id | RegisterFunctionInput \| dict[str, Any] \| str | Yes | A RegisterFunctionInput, dict with id, or a plain string function ID. When a string is passed, use keyword arguments for description, metadata, request_format, and response_format. |
| handler_or_invocation | RemoteFunctionHandler \| HttpInvocationConfig | Yes | A callable handler or HttpInvocationConfig. Callable handlers receive one positional argument (data, the trigger payload) and may return a value. |
| description | str \| None | No | Human-readable description (only with string ID). |
| metadata | dict[str, Any] \| None | No | Arbitrary metadata (only with string ID). |
| request_format | RegisterFunctionFormat \| dict[str, Any] \| None | No | Schema describing expected input (only with string ID). Auto-extracted from handler type hints when omitted. |
| response_format | RegisterFunctionFormat \| dict[str, Any] \| None | No | Schema describing expected output (only with string ID). Auto-extracted from handler type hints when omitted. |

Example

# Dict-based registration with a synchronous handler
def greet(data):
    return {'message': f"Hello, {data['name']}!"}

fn = iii.register_function({"id": "greet", "description": "Greets a user"}, greet)
fn.unregister()

# String ID with an async handler; request and response formats are
# extracted from the Pydantic type hints
from pydantic import BaseModel

class GreetInput(BaseModel):
    name: str

class GreetOutput(BaseModel):
    message: str

async def greet(data: GreetInput) -> GreetOutput:
    return GreetOutput(message=f"Hello, {data.name}!")

fn = iii.register_function("greet", greet, description="Greets a user")
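
To see why wrapping sync handlers with run_in_executor keeps the event loop responsive, here is a minimal standard-library sketch. It does not reproduce the SDK's dispatch code, which this reference does not show; `call_handler` is a hypothetical helper, and the use of the default thread pool is an assumption.

```python
import asyncio
import inspect
import time


def sync_greet(data):
    # A blocking handler: time.sleep stands in for slow synchronous work.
    time.sleep(0.05)
    return {"message": f"Hello, {data['name']}!"}


async def async_greet(data):
    return {"message": f"Hello, {data['name']}!"}


async def call_handler(handler, data):
    """Hypothetical dispatcher: await coroutine handlers directly and run
    plain functions in the default thread pool so the loop stays free."""
    if inspect.iscoroutinefunction(handler):
        return await handler(data)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, handler, data)


async def main():
    # Both handlers are driven from the same event loop; gather keeps order.
    return await asyncio.gather(
        call_handler(sync_greet, {"name": "Ada"}),
        call_handler(async_greet, {"name": "Lin"}),
    )


results = asyncio.run(main())
print(results)
```

Because the blocking call runs on a worker thread, other coroutines on the loop can make progress while it sleeps.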

register_service

Register a logical service grouping with the engine. Services provide an organisational hierarchy for functions. A service can optionally reference a parent_service_id to form a tree visible in the engine dashboard.

Signature

register_service(service: RegisterServiceInput | dict[str, Any])

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| service | RegisterServiceInput \| dict[str, Any] | Yes | A RegisterServiceInput or dict with id and optional name, description, parent_service_id. |

Example

iii.register_service({"id": "payments", "description": "Payment processing"})
iii.register_service({
    "id": "payments::refunds",
    "description": "Refund sub-service",
    "parent_service_id": "payments",
})
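
The way parent_service_id links services into a tree can be sketched with plain dicts. This is an illustration only: `render_tree` is a hypothetical helper, and how the engine dashboard actually builds or displays the hierarchy is not described here.

```python
from collections import defaultdict

# The same two service definitions as in the example above.
services = [
    {"id": "payments", "description": "Payment processing"},
    {
        "id": "payments::refunds",
        "description": "Refund sub-service",
        "parent_service_id": "payments",
    },
]


def render_tree(services):
    """Return indented service IDs, children listed under their parent."""
    children = defaultdict(list)
    for service in services:
        # Services without a parent are grouped under the key None (roots).
        children[service.get("parent_service_id")].append(service["id"])

    def walk(parent, depth):
        lines = []
        for service_id in children.get(parent, []):
            lines.append("  " * depth + service_id)
            lines.extend(walk(service_id, depth + 1))
        return lines

    return walk(None, 0)


tree = render_tree(services)
print("\n".join(tree))
```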

register_trigger

Bind a trigger configuration to a registered function.

Signature

register_trigger(trigger: RegisterTriggerInput | dict[str, Any])

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| trigger | RegisterTriggerInput \| dict[str, Any] | Yes | A RegisterTriggerInput or dict with type, function_id, and optional config. |

Example

# Using a dict
trigger = iii.register_trigger({
    'type': 'http',
    'function_id': 'greet',
    'config': {'api_path': '/greet', 'http_method': 'GET'}
})

# Using RegisterTriggerInput
trigger = iii.register_trigger(RegisterTriggerInput(
    type="http", function_id="greet",
    config={'api_path': '/greet', 'http_method': 'GET'}
))
trigger.unregister()

register_trigger_type

Register a custom trigger type with the engine. Returns a TriggerTypeRef handle with register_trigger and register_function methods.

Signature

register_trigger_type(trigger_type: RegisterTriggerTypeInput | dict[str, Any], handler: TriggerHandler[Any])

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| trigger_type | RegisterTriggerTypeInput \| dict[str, Any] | Yes | A RegisterTriggerTypeInput or dict with id, description, and optional trigger_request_format / call_request_format (Pydantic class or dict). |
| handler | TriggerHandler[Any] | Yes | A TriggerHandler instance. |

Example

webhook = iii.register_trigger_type(
    RegisterTriggerTypeInput(
        id="webhook",
        description="Webhook trigger",
        trigger_request_format=WebhookConfig,
        call_request_format=WebhookCallRequest,
    ),
    WebhookHandler(),
)
webhook.register_function("handler", handle_webhook)
webhook.register_trigger("handler", WebhookConfig(url="/hook"))

shutdown

Gracefully shut down the client, releasing all resources. Cancels any pending reconnection attempts, rejects all in-flight invocations with an error, closes the WebSocket connection, and stops the background event-loop thread. After this call the instance must not be reused.

Signature

shutdown()

Example

iii = register_worker('ws://localhost:49134')
# ... do work ...
iii.shutdown()

shutdown_async

Gracefully shut down the client, releasing all resources. Cancels any pending reconnection attempts, rejects all in-flight invocations with an error, closes the WebSocket connection, and stops the background event-loop thread. After this call the instance must not be reused.

Signature

async shutdown_async()

Example

iii = register_worker('ws://localhost:49134')
# ... do work ...
await iii.shutdown_async()

trigger

Invoke a remote function. The routing behavior and return type depend on the action field:
  • No action: synchronous; waits for the function to return.
  • TriggerAction.Enqueue(...): async via a named queue; returns EnqueueResult.
  • TriggerAction.Void(): fire-and-forget; returns None.

Signature

trigger(request: dict[str, Any] | TriggerRequest)

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| request | dict[str, Any] \| TriggerRequest | Yes | A TriggerRequest or dict with function_id, payload, and optional action / timeout_ms. |

Example

result = iii.trigger({'function_id': 'greet', 'payload': {'name': 'World'}})
iii.trigger({'function_id': 'notify', 'payload': {}, 'action': TriggerAction.Void()})
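
The difference between the three routing modes can be pictured with a small in-process stand-in. This is only a sketch: `route`, the `functions` registry, the `queues` dict, and the receipt dict it returns are all hypothetical, plain dicts stand in for the action objects (using the `type` and `queue` fields listed under TriggerActionEnqueue and TriggerActionVoid), and nothing here calls the SDK or the engine. The real EnqueueResult fields are not documented on this page.

```python
def route(request, functions, queues):
    """Hypothetical in-process model of the three routing modes."""
    action = request.get("action")
    handler = functions[request["function_id"]]
    payload = request.get("payload")

    if action is None:
        # No action: call the function and hand back its return value.
        return handler(payload)
    if action["type"] == "enqueue":
        # Enqueue: park the work on a named queue; the caller gets a receipt,
        # not the function's result.
        queues.setdefault(action["queue"], []).append((request["function_id"], payload))
        return {"queue": action["queue"], "position": len(queues[action["queue"]])}
    if action["type"] == "void":
        # Void: run for side effects only and discard the result.
        handler(payload)
        return None
    raise ValueError(f"unknown action type: {action['type']!r}")


functions = {"greet": lambda data: {"message": f"Hello, {data['name']}!"}}
queues = {}

sync_result = route({"function_id": "greet", "payload": {"name": "World"}}, functions, queues)
receipt = route(
    {"function_id": "greet", "payload": {"name": "Ada"},
     "action": {"type": "enqueue", "queue": "emails"}},
    functions, queues,
)
void_result = route(
    {"function_id": "greet", "payload": {"name": "Lin"}, "action": {"type": "void"}},
    functions, queues,
)

print(sync_result, receipt, void_result)
```

The practical consequence for callers is that only the first form yields the function's own return value; code that needs a result should not use the enqueue or void actions.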

trigger_async

Invoke a remote function. The routing behavior and return type depend on the action field:
  • No action: synchronous; waits for the function to return.
  • TriggerAction.Enqueue(...): async via a named queue; returns EnqueueResult.
  • TriggerAction.Void(): fire-and-forget; returns None.

Signature

async trigger_async(request: dict[str, Any] | TriggerRequest)

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| request | dict[str, Any] \| TriggerRequest | Yes | A TriggerRequest or dict with function_id, payload, and optional action / timeout_ms. |

Example

result = await iii.trigger_async({'function_id': 'greet', 'payload': {'name': 'World'}})
await iii.trigger_async({'function_id': 'notify', 'payload': {}, 'action': TriggerAction.Void()})

unregister_trigger_type

Unregister a previously registered trigger type.

Signature

unregister_trigger_type(trigger_type: RegisterTriggerTypeInput | dict[str, Any])

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| trigger_type | RegisterTriggerTypeInput \| dict[str, Any] | Yes | A RegisterTriggerTypeInput or dict with id and optional description. |

Example

iii.unregister_trigger_type({"id": "webhook", "description": "Webhook trigger"})
iii.unregister_trigger_type(RegisterTriggerTypeInput(id="webhook", description="Webhook trigger"))

Logger

Structured logger that emits logs as OpenTelemetry LogRecords. Every log call automatically captures the active trace and span context, correlating your logs with distributed traces without any manual wiring. When OTel is not initialized, Logger gracefully falls back to Python logging.

Pass structured data as the second argument to any log method. Using a dict of key-value pairs (instead of string interpolation) lets you filter, aggregate, and build dashboards in your observability backend.
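
The value of passing a dict rather than interpolating into the message can be shown with the standard library alone. The sketch below is not the SDK's Logger: `FallbackLogger` and `ListHandler` are hypothetical names, and attaching the data through `extra` is an assumption about how a fallback onto Python logging might keep the fields structured.

```python
import logging


class FallbackLogger:
    """Hypothetical stand-in with the same four methods; not the SDK's Logger."""

    def __init__(self, name="iii-sketch"):
        self._log = logging.getLogger(name)

    def _emit(self, level, message, data=None):
        # Attach the data to the record via `extra` instead of formatting it
        # into the message, so handlers can still read individual fields.
        self._log.log(level, message, extra={"data": data})

    def debug(self, message, data=None):
        self._emit(logging.DEBUG, message, data)

    def info(self, message, data=None):
        self._emit(logging.INFO, message, data)

    def warn(self, message, data=None):
        self._emit(logging.WARNING, message, data)

    def error(self, message, data=None):
        self._emit(logging.ERROR, message, data)


class ListHandler(logging.Handler):
    """Collects records in memory so the attached data can be inspected."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


handler = ListHandler()
stdlib_logger = logging.getLogger("iii-sketch")
stdlib_logger.setLevel(logging.DEBUG)
stdlib_logger.addHandler(handler)

logger = FallbackLogger()
logger.info("Order processed", {"order_id": "ord_123", "status": "completed"})
logger.warn("Retry attempt", {"attempt": 3, "max_retries": 5})

for record in handler.records:
    print(record.levelname, record.getMessage(), record.data)
```

Because `order_id` survives as a field rather than as part of a sentence, a backend can filter on it directly.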

debug

Log a debug-level message.

Signature

debug(message: str, data: Any = None)

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| message | str | Yes | Human-readable log message. |
| data | Any | No | Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |

Example

logger.debug('Cache lookup', {'key': 'user:42', 'hit': False})

error

Log an error-level message.

Signature

error(message: str, data: Any = None)

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| message | str | Yes | Human-readable log message. |
| data | Any | No | Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |

Example

logger.error('Payment failed', {
    'order_id': 'ord_123',
    'gateway': 'stripe',
    'error_code': 'card_declined',
})

info

Log an info-level message.

Signature

info(message: str, data: Any = None)

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| message | str | Yes | Human-readable log message. |
| data | Any | No | Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |

Example

logger.info('Order processed', {'order_id': 'ord_123', 'status': 'completed'})

warn

Log a warning-level message.

Signature

warn(message: str, data: Any = None)

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| message | str | Yes | Human-readable log message. |
| data | Any | No | Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |

Example

logger.warn('Retry attempt', {'attempt': 3, 'max_retries': 5, 'endpoint': '/api/charge'})

Types

InitOptions · ReconnectionConfig · TelemetryOptions · HttpInvocationConfig · RegisterFunctionFormat · RegisterFunctionInput · RegisterServiceInput · RegisterTriggerInput · RegisterTriggerTypeInput · TriggerActionEnqueue · TriggerActionVoid · TriggerRequest · IStream · OtelConfig · TriggerHandler

InitOptions

Options for configuring the III SDK.
| Name | Type | Required | Description |
| --- | --- | --- | --- |
| enable_metrics_reporting | bool | No | Enable worker metrics via OpenTelemetry. Default True. |
| headers | dict[str, str] \| None | No | - |
| invocation_timeout_ms | int | No | Default timeout for trigger() in milliseconds. Default 30000. |
| otel | OtelConfig \| dict[str, Any] \| None | No | OpenTelemetry configuration. Enabled by default. Set {'enabled': False} or env OTEL_ENABLED=false to disable. |
| reconnection_config | ReconnectionConfig \| None | No | WebSocket reconnection behavior. |
| telemetry | TelemetryOptions \| None | No | Internal telemetry metadata. |
| worker_name | str \| None | No | Display name for this worker. Defaults to hostname:pid. |

ReconnectionConfig

Configuration for WebSocket reconnection behavior.
| Name | Type | Required | Description |
| --- | --- | --- | --- |
| backoff_multiplier | float | No | Exponential backoff multiplier. Default 2.0. |
| initial_delay_ms | int | No | Starting delay in milliseconds. Default 1000. |
| jitter_factor | float | No | Random jitter factor (0 to 1). Default 0.3. |
| max_delay_ms | int | No | Maximum delay cap in milliseconds. Default 30000. |
| max_retries | int | No | Maximum retry attempts. -1 for infinite. Default -1. |
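
To get a feel for how these settings interact, the sketch below computes a delay schedule from the documented defaults. The exact formula the SDK uses is not stated in this reference, so this is an assumption: the delay grows as initial_delay_ms times backoff_multiplier to the power of the attempt number, is capped at max_delay_ms, and then receives symmetric jitter of up to plus or minus jitter_factor of the capped value. `reconnect_delay_ms` and its `rng` parameter are hypothetical, and max_retries is not modeled.

```python
import random


def reconnect_delay_ms(attempt, initial_delay_ms=1000, backoff_multiplier=2.0,
                       max_delay_ms=30000, jitter_factor=0.3, rng=random.random):
    """Delay before retry number `attempt` (0-based), under the stated assumptions."""
    try:
        raw = initial_delay_ms * backoff_multiplier ** attempt
    except OverflowError:
        # Very large attempt numbers overflow float pow; treat as "past the cap".
        raw = float("inf")
    base = min(raw, max_delay_ms)
    # Symmetric jitter: rng() in [0, 1) maps to [-jitter_factor, +jitter_factor).
    jitter = base * jitter_factor * (2 * rng() - 1)
    return max(0.0, base + jitter)


# Fixing rng at 0.5 removes the jitter so the underlying schedule is visible.
schedule = [reconnect_delay_ms(n, rng=lambda: 0.5) for n in range(6)]
print(schedule)  # [1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 30000.0]
```

Under these assumptions the cap is reached on the sixth attempt, after which every retry waits roughly 30 seconds, give or take the jitter.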

TelemetryOptions

Telemetry metadata to be reported to the engine.
| Name | Type | Required | Description |
| --- | --- | --- | --- |
| amplitude_api_key | str \| None | No | Amplitude API key for product analytics. |
| framework | str \| None | No | Framework name (e.g. motia) if applicable. |
| language | str \| None | No | Programming language of the worker (e.g. python). |
| project_name | str \| None | No | Name of the project this worker belongs to. |

HttpInvocationConfig

Config for HTTP external function invocation.
| Name | Type | Required | Description |
| --- | --- | --- | --- |
| auth | HttpAuthConfig \| None | No | Authentication configuration (bearer, HMAC, or API key). |
| headers | dict[str, str] \| None | No | Additional HTTP headers to include in the request. |
| method | Literal['GET', 'POST', 'PUT', 'PATCH', 'DELETE'] | No | HTTP method. Defaults to 'POST'. |
| timeout_ms | int \| None | No | Request timeout in milliseconds. |
| url | str | No | Target URL for the HTTP invocation. |

RegisterFunctionFormat

Format definition for function parameters.
| Name | Type | Required | Description |
| --- | --- | --- | --- |
| body | list[RegisterFunctionFormat] \| None | No | Nested fields for object types. |
| description | str \| None | No | Human-readable description of the parameter. |
| items | RegisterFunctionFormat \| None | No | Item schema for array types. |
| name | str | Yes | Parameter name. |
| required | bool | No | Whether the parameter is required. |
| type | str | Yes | Type string (string, number, boolean, object, array, null, map). |
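
As an illustration of the shape these fields describe, the sketch below derives a format-shaped dict from a dataclass's annotations. It is not the SDK's extraction logic, which this reference does not show: `describe`, the `TYPE_STRINGS` mapping from Python types to the documented type strings, and the rule that a field with a default is not required are all assumptions.

```python
from dataclasses import MISSING, dataclass, fields

# Assumed mapping from Python annotations to the documented type strings.
TYPE_STRINGS = {str: "string", int: "number", float: "number", bool: "boolean"}


@dataclass
class GreetInput:
    name: str
    excited: bool = False


def describe(cls):
    """Build a dict with name, type, required, and body keys for a dataclass."""
    body = []
    for field in fields(cls):
        has_default = (
            field.default is not MISSING or field.default_factory is not MISSING
        )
        body.append({
            "name": field.name,
            "type": TYPE_STRINGS[field.type],
            # Assumption: a field with a default value is treated as optional.
            "required": not has_default,
        })
    # The nested fields of an object type go under "body".
    return {"name": cls.__name__, "type": "object", "body": body}


greet_format = describe(GreetInput)
print(greet_format)
```

A dict of this shape is the kind of value the request_format and response_format parameters accept alongside RegisterFunctionFormat instances, according to their documented types.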

RegisterFunctionInput

Input for registering a function (matches Node SDK’s RegisterFunctionInput).

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| description | str \| None | No | Human-readable description. |
| id | str | No | Unique function identifier. |
| invocation | HttpInvocationConfig \| None | No | HTTP invocation config for externally hosted functions. |
| metadata | dict[str, Any] \| None | No | Arbitrary metadata attached to the function. |
| request_format | RegisterFunctionFormat \| dict[str, Any] \| None | No | Schema describing expected input. |
| response_format | RegisterFunctionFormat \| dict[str, Any] \| None | No | Schema describing expected output. |

RegisterServiceInput

Input for registering a service (matches Node SDK’s RegisterServiceInput).
| Name | Type | Required | Description |
| --- | --- | --- | --- |
| description | str \| None | No | Description of the service. |
| id | str | No | Unique service identifier. |
| name | str \| None | No | Human-readable service name. |
| parent_service_id | str \| None | No | ID of the parent service for hierarchical grouping. |

RegisterTriggerInput

Input for registering a trigger (matches Node SDK’s RegisterTriggerInput).
| Name | Type | Required | Description |
| --- | --- | --- | --- |
| config | Any | No | Trigger-type-specific configuration. |
| function_id | str | No | ID of the function this trigger invokes. |
| metadata | dict[str, Any] \| None | No | Arbitrary metadata attached to the trigger. |
| type | str | No | Trigger type identifier (e.g. http, queue, cron). |

RegisterTriggerTypeInput

Input for registering a trigger type.
| Name | Type | Required | Description |
| --- | --- | --- | --- |
| call_request_format | Any \| None | No | JSON Schema describing the payload sent to functions. |
| description | str | No | Human-readable description of the trigger type. |
| id | str | No | Unique identifier for the trigger type. |
| trigger_request_format | Any \| None | No | JSON Schema describing the expected trigger config. |

TriggerActionEnqueue

Routes the invocation through a named queue for async processing.
| Name | Type | Required | Description |
| --- | --- | --- | --- |
| queue | str | Yes | Name of the target queue. |
| type | Literal['enqueue'] | No | Always 'enqueue'. |

TriggerActionVoid

Fire-and-forget routing. No response is returned.
| Name | Type | Required | Description |
| --- | --- | --- | --- |
| type | Literal['void'] | No | Always 'void'. |

TriggerRequest

Request object for trigger().
| Name | Type | Required | Description |
| --- | --- | --- | --- |
| action | TriggerActionEnqueue \| TriggerActionVoid \| None | No | Routing action: None for sync, TriggerAction.Enqueue(...) for queue, TriggerAction.Void() for fire-and-forget. |
| function_id | str | No | ID of the function to invoke. |
| payload | Any | No | Data to pass to the function. |
| timeout_ms | int \| None | No | Override the default invocation timeout. |

IStream

Abstract interface for stream operations.

OtelConfig

Configuration for OpenTelemetry initialization.
| Name | Type | Required | Description |
| --- | --- | --- | --- |
| enabled | bool \| None | No | Enable OTel. Defaults to True. Set OTEL_ENABLED=false/0/no/off to disable. |
| engine_ws_url | str \| None | No | III Engine WebSocket URL. Defaults to env III_URL or 'ws://localhost:49134'. |
| fetch_instrumentation_enabled | bool | No | Auto-instrument urllib HTTP calls via URLLibInstrumentor. Defaults to True. |
| logs_batch_size | int \| None | No | Maximum number of log records exported per batch. Defaults to 1 when not set. |
| logs_enabled | bool \| None | No | Enable OTel log export via EngineLogExporter. Defaults to True when OTel is enabled. |
| logs_flush_interval_ms | int \| None | No | Log processor flush delay in milliseconds. Defaults to 100 when not set. |
| metrics_enabled | bool | No | Enable OTel metrics export via EngineMetricsExporter. Defaults to True. |
| metrics_export_interval_ms | int | No | Metrics export interval in milliseconds. Defaults to 60000 (60 seconds). |
| service_instance_id | str \| None | No | Service instance ID. Defaults to a random UUID. |
| service_name | str \| None | No | Service name. Defaults to env OTEL_SERVICE_NAME or 'iii-python-sdk'. |
| service_namespace | str \| None | No | Service namespace attribute. |
| service_version | str \| None | No | Service version. Defaults to env SERVICE_VERSION or 'unknown'. |
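
The OTEL_ENABLED values listed for the enabled field (false, 0, no, off) suggest a case-insensitive falsy-string check. The sketch below shows one way such a switch could be resolved. It is an assumption, not the SDK's code: `otel_enabled` is a hypothetical helper, and it treats either an explicit enabled=False or a falsy environment value as sufficient to disable, since the precedence between the two is not stated here.

```python
import os

# The four values the table lists as disabling OTel.
FALSY_VALUES = {"false", "0", "no", "off"}


def otel_enabled(config_enabled=None, env=None):
    """Resolve the on/off switch from a config value and the environment."""
    env = os.environ if env is None else env
    if config_enabled is False:
        return False
    raw = env.get("OTEL_ENABLED", "")
    # An unset or unrecognised value leaves OTel on, matching the default of True.
    return raw.strip().lower() not in FALSY_VALUES


print(otel_enabled(env={}))                        # True
print(otel_enabled(env={"OTEL_ENABLED": "off"}))   # False
print(otel_enabled(False, env={}))                 # False
```

Passing `env` explicitly keeps the helper easy to test without mutating the process environment.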

TriggerHandler

Abstract base class for trigger handlers.