Installation
Initialization
Create an III client and connect to the engine. Blocks until the WebSocket connection is established and ready.Methods
connect_async
Connect to the III Engine via WebSocket. Initializes OpenTelemetry (if configured), attaches the event loop, and establishes the WebSocket connection. This is called automatically during construction — use it only if you need to reconnect manually from an async context. Signaturecreate_channel
Create a streaming channel pair for worker-to-worker data transfer. The returnedChannel contains a local writer / reader
and their serializable refs (writer_ref, reader_ref) that
can be passed as fields in invocation data to other functions.
Signature
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
buffer_size | int | None | No | Buffer capacity for the channel. Defaults to 64. |
Example
create_channel_async
Create a streaming channel pair for worker-to-worker data transfer. The returnedChannel contains a local writer / reader
and their serializable refs (writer_ref, reader_ref) that
can be passed as fields in invocation data to other functions.
Signature
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
buffer_size | int | None | No | Buffer capacity for the channel. Defaults to 64. |
Example
create_stream
Register a custom stream implementation, overriding the engine default. Registers 5 of the 6IStream methods (get, set, delete,
list, list_groups). The update method is not registered
— atomic updates are handled by the engine’s built-in stream update logic.
Signature
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
stream_name | str | Yes | Unique name for the stream. |
stream | IStream[Any] | Yes | An object implementing the IStream interface. |
Example
get_connection_state
Return the current WebSocket connection state. Signaturelist_functions
List all functions registered with the engine across all workers. SignatureExample
list_functions_async
List all functions registered with the engine across all workers. SignatureExample
list_trigger_types
List all trigger types registered with the engine. SignatureParameters
| Name | Type | Required | Description |
|---|---|---|---|
include_internal | bool | No | If True, include engine-internal trigger types (e.g. engine::functions-available). Defaults to False. |
Example
list_trigger_types_async
List all trigger types registered with the engine. SignatureParameters
| Name | Type | Required | Description |
|---|---|---|---|
include_internal | bool | No | If True, include engine-internal trigger types (e.g. engine::functions-available). Defaults to False. |
Example
list_triggers
List all triggers registered with the engine. SignatureParameters
| Name | Type | Required | Description |
|---|---|---|---|
include_internal | bool | No | If True, include engine-internal triggers (e.g. functions-available). Defaults to False. |
Example
list_triggers_async
List all triggers registered with the engine. SignatureParameters
| Name | Type | Required | Description |
|---|---|---|---|
include_internal | bool | No | If True, include engine-internal triggers (e.g. functions-available). Defaults to False. |
Example
list_workers
List all workers currently connected to the engine. SignatureExample
list_workers_async
List all workers currently connected to the engine. SignatureExample
on_functions_available
Subscribe to function-availability events from the engine. The callback fires whenever the set of available functions changes (e.g. a new worker connects or a function is unregistered). SignatureParameters
| Name | Type | Required | Description |
|---|---|---|---|
callback | Callable[None] | Yes | - |
Example
register_function
Register a function with the engine. Pass a handler for local execution, or anHttpInvocationConfig
for HTTP-invoked functions (Lambda, Cloudflare Workers, etc.).
Handlers can be synchronous or asynchronous. Sync handlers are
automatically wrapped with run_in_executor so they do not
block the event loop. Each handler receives a single data
argument containing the trigger payload.
When func_or_id is a str, the simplified API is used:
request_format and response_format are auto-extracted
from the handler’s type hints when not explicitly provided.
Signature
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
func_or_id | RegisterFunctionInput | dict[str, Any] | str | Yes | A RegisterFunctionInput, dict with id, or a plain string function ID. When a string is passed, use keyword arguments for description, metadata, request_format, and response_format. |
handler_or_invocation | RemoteFunctionHandler | HttpInvocationConfig | Yes | A callable handler or HttpInvocationConfig. Callable handlers receive one positional argument (data — the trigger payload) and may return a value. |
description | str | None | No | Human-readable description (only with string ID). |
metadata | dict[str, Any] | None | No | Arbitrary metadata (only with string ID). |
request_format | RegisterFunctionFormat | dict[str, Any] | None | No | Schema describing expected input (only with string ID). Auto-extracted from handler type hints when omitted. |
response_format | RegisterFunctionFormat | dict[str, Any] | None | No | Schema describing expected output (only with string ID). Auto-extracted from handler type hints when omitted. |
Example
register_service
Register a logical service grouping with the engine. Services provide an organisational hierarchy for functions. A service can optionally reference aparent_service_id to form
a tree visible in the engine dashboard.
Signature
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
service | RegisterServiceInput | dict[str, Any] | Yes | A RegisterServiceInput or dict with id and optional name, description, parent_service_id. |
Example
register_trigger
Bind a trigger configuration to a registered function. SignatureParameters
| Name | Type | Required | Description |
|---|---|---|---|
trigger | RegisterTriggerInput | dict[str, Any] | Yes | A RegisterTriggerInput or dict with type, function_id, and optional config. |
Example
register_trigger_type
Register a custom trigger type with the engine. Returns a :class:TriggerTypeRef handle with register_trigger
and register_function methods.
Signature
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
trigger_type | RegisterTriggerTypeInput | dict[str, Any] | Yes | A RegisterTriggerTypeInput or dict with id, description, and optional trigger_request_format / call_request_format (Pydantic class or dict). |
handler | TriggerHandler[Any] | Yes | A TriggerHandler instance. |
Example
shutdown
Gracefully shut down the client, releasing all resources. Cancels any pending reconnection attempts, rejects all in-flight invocations with an error, closes the WebSocket connection, and stops the background event-loop thread. After this call the instance must not be reused. SignatureExample
shutdown_async
Gracefully shut down the client, releasing all resources. Cancels any pending reconnection attempts, rejects all in-flight invocations with an error, closes the WebSocket connection, and stops the background event-loop thread. After this call the instance must not be reused. SignatureExample
trigger
Invoke a remote function. The routing behavior and return type depend on theaction field:
- No action: synchronous — waits for the function to return.
TriggerAction.Enqueue(...): async via named queue — returnsEnqueueResult.TriggerAction.Void(): fire-and-forget — returnsNone.
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
request | dict[str, Any] | TriggerRequest | Yes | A TriggerRequest or dict with function_id, payload, and optional action / timeout_ms. |
Example
trigger_async
Invoke a remote function. The routing behavior and return type depend on theaction field:
- No action: synchronous — waits for the function to return.
TriggerAction.Enqueue(...): async via named queue — returnsEnqueueResult.TriggerAction.Void(): fire-and-forget — returnsNone.
Parameters
| Name | Type | Required | Description |
|---|---|---|---|
request | dict[str, Any] | TriggerRequest | Yes | A TriggerRequest or dict with function_id, payload, and optional action / timeout_ms. |
Example
unregister_trigger_type
Unregister a previously registered trigger type. SignatureParameters
| Name | Type | Required | Description |
|---|---|---|---|
trigger_type | RegisterTriggerTypeInput | dict[str, Any] | Yes | A RegisterTriggerTypeInput or dict with id and optional description. |
Example
Logger
Structured logger that emits logs as OpenTelemetry LogRecords. Every log call automatically captures the active trace and span context, correlating your logs with distributed traces without any manual wiring. When OTel is not initialized, Logger gracefully falls back to Pythonlogging.
Pass structured data as the second argument to any log method. Using a
dict of key-value pairs (instead of string interpolation) lets you
filter, aggregate, and build dashboards in your observability backend.
debug
Log a debug-level message. SignatureParameters
| Name | Type | Required | Description |
|---|---|---|---|
message | str | Yes | Human-readable log message. |
data | Any | No | Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |
Example
error
Log an error-level message. SignatureParameters
| Name | Type | Required | Description |
|---|---|---|---|
message | str | Yes | Human-readable log message. |
data | Any | No | Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |
Example
info
Log an info-level message. SignatureParameters
| Name | Type | Required | Description |
|---|---|---|---|
message | str | Yes | Human-readable log message. |
data | Any | No | Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |
Example
warn
Log a warning-level message. SignatureParameters
| Name | Type | Required | Description |
|---|---|---|---|
message | str | Yes | Human-readable log message. |
data | Any | No | Structured context attached as OTel log attributes. Use dicts of key-value pairs to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic). |
Example
Types
InitOptions · ReconnectionConfig · TelemetryOptions · HttpInvocationConfig · RegisterFunctionFormat · RegisterFunctionInput · RegisterServiceInput · RegisterTriggerInput · RegisterTriggerTypeInput · TriggerActionEnqueue · TriggerActionVoid · TriggerRequest · IStream · OtelConfig · TriggerHandler
InitOptions
Options for configuring the III SDK.| Name | Type | Required | Description |
|---|---|---|---|
enable_metrics_reporting | bool | No | Enable worker metrics via OpenTelemetry. Default True. |
headers | dict[str, str] | None | No | - |
invocation_timeout_ms | int | No | Default timeout for trigger() in milliseconds. Default 30000. |
otel | OtelConfig | dict[str, Any] | None | No | OpenTelemetry configuration. Enabled by default. Set \{'enabled': False\} or env OTEL_ENABLED=false to disable. |
reconnection_config | ReconnectionConfig | None | No | WebSocket reconnection behavior. |
telemetry | TelemetryOptions | None | No | Internal telemetry metadata. |
worker_name | str | None | No | Display name for this worker. Defaults to hostname:pid. |
ReconnectionConfig
Configuration for WebSocket reconnection behavior.| Name | Type | Required | Description |
|---|---|---|---|
backoff_multiplier | float | No | Exponential backoff multiplier. Default 2.0. |
initial_delay_ms | int | No | Starting delay in milliseconds. Default 1000. |
jitter_factor | float | No | Random jitter factor (0—1). Default 0.3. |
max_delay_ms | int | No | Maximum delay cap in milliseconds. Default 30000. |
max_retries | int | No | Maximum retry attempts. -1 for infinite. Default -1. |
TelemetryOptions
Telemetry metadata to be reported to the engine.| Name | Type | Required | Description |
|---|---|---|---|
amplitude_api_key | str | None | No | Amplitude API key for product analytics. |
framework | str | None | No | Framework name (e.g. motia) if applicable. |
language | str | None | No | Programming language of the worker (e.g. python). |
project_name | str | None | No | Name of the project this worker belongs to. |
HttpInvocationConfig
Config for HTTP external function invocation.| Name | Type | Required | Description |
|---|---|---|---|
auth | HttpAuthConfig | None | No | Authentication configuration (bearer, HMAC, or API key). |
headers | dict[str, str] | None | No | Additional HTTP headers to include in the request. |
method | Literal['GET', 'POST', 'PUT', 'PATCH', 'DELETE'] | No | HTTP method. Defaults to 'POST'. |
timeout_ms | int | None | No | Request timeout in milliseconds. |
url | str | No | Target URL for the HTTP invocation. |
RegisterFunctionFormat
Format definition for function parameters.| Name | Type | Required | Description |
|---|---|---|---|
body | list[RegisterFunctionFormat] | None | No | Nested fields for object types. |
description | str | None | No | Human-readable description of the parameter. |
items | RegisterFunctionFormat | None | No | Item schema for array types. |
name | str | Yes | Parameter name. |
required | bool | No | Whether the parameter is required. |
type | str | Yes | Type string (string, number, boolean, object, array, null, map). |
RegisterFunctionInput
Input for registering a function — matches Node.js RegisterFunctionInput.| Name | Type | Required | Description |
|---|---|---|---|
description | str | None | No | Human-readable description. |
id | str | No | Unique function identifier. |
invocation | HttpInvocationConfig | None | No | HTTP invocation config for externally hosted functions. |
metadata | dict[str, Any] | None | No | Arbitrary metadata attached to the function. |
request_format | RegisterFunctionFormat | dict[str, Any] | None | No | Schema describing expected input. |
response_format | RegisterFunctionFormat | dict[str, Any] | None | No | Schema describing expected output. |
RegisterServiceInput
Input for registering a service (matches Node SDK’s RegisterServiceInput).| Name | Type | Required | Description |
|---|---|---|---|
description | str | None | No | Description of the service. |
id | str | No | Unique service identifier. |
name | str | None | No | Human-readable service name. |
parent_service_id | str | None | No | ID of the parent service for hierarchical grouping. |
RegisterTriggerInput
Input for registering a trigger (matches Node SDK’s RegisterTriggerInput).| Name | Type | Required | Description |
|---|---|---|---|
config | Any | No | Trigger-type-specific configuration. |
function_id | str | No | ID of the function this trigger invokes. |
metadata | dict[str, Any] | None | No | Arbitrary metadata attached to the trigger. |
type | str | No | Trigger type identifier (e.g. http, queue, cron). |
RegisterTriggerTypeInput
Input for registering a trigger type.| Name | Type | Required | Description |
|---|---|---|---|
call_request_format | Any | None | No | JSON Schema describing the payload sent to functions. |
description | str | No | Human-readable description of the trigger type. |
id | str | No | Unique identifier for the trigger type. |
trigger_request_format | Any | None | No | JSON Schema describing the expected trigger config. |
TriggerActionEnqueue
Routes the invocation through a named queue for async processing.| Name | Type | Required | Description |
|---|---|---|---|
queue | str | Yes | Name of the target queue. |
type | Literal['enqueue'] | No | Always 'enqueue'. |
TriggerActionVoid
Fire-and-forget routing. No response is returned.| Name | Type | Required | Description |
|---|---|---|---|
type | Literal['void'] | No | Always 'void'. |
TriggerRequest
Request object fortrigger().
| Name | Type | Required | Description |
|---|---|---|---|
action | TriggerActionEnqueue | TriggerActionVoid | None | No | Routing action — None for sync, TriggerAction.Enqueue(...) for queue, TriggerAction.Void() for fire-and-forget. |
function_id | str | No | ID of the function to invoke. |
payload | Any | No | Data to pass to the function. |
timeout_ms | int | None | No | Override the default invocation timeout. |
IStream
Abstract interface for stream operations.OtelConfig
Configuration for OpenTelemetry initialization.| Name | Type | Required | Description |
|---|---|---|---|
enabled | bool | None | No | Enable OTel. Defaults to True. Set OTEL_ENABLED=false/0/no/off to disable. |
engine_ws_url | str | None | No | III Engine WebSocket URL. Defaults to env III_URL or ‘ws://localhost:49134’. |
fetch_instrumentation_enabled | bool | No | Auto-instrument urllib HTTP calls via URLLibInstrumentor. Defaults to True. |
logs_batch_size | int | None | No | Maximum number of log records exported per batch. Defaults to 1 when not set. |
logs_enabled | bool | None | No | Enable OTel log export via EngineLogExporter. Defaults to True when OTel is enabled. |
logs_flush_interval_ms | int | None | No | Log processor flush delay in milliseconds. Defaults to 100ms when not set. |
metrics_enabled | bool | No | Enable OTel metrics export via EngineMetricsExporter. Defaults to True. |
metrics_export_interval_ms | int | No | Metrics export interval in milliseconds. Defaults to 60000 (60 seconds). |
service_instance_id | str | None | No | Service instance ID. Defaults to a random UUID. |
service_name | str | None | No | Service name. Defaults to env OTEL_SERVICE_NAME or ‘iii-python-sdk’. |
service_namespace | str | None | No | Service namespace attribute. |
service_version | str | None | No | Service version. Defaults to env SERVICE_VERSION or ‘unknown’. |