36 changes: 26 additions & 10 deletions README.md
@@ -56,6 +56,7 @@ informal introduction to the features and their implementation.
- [Custom Type Data Conversion](#custom-type-data-conversion)
- [External Storage](#external-storage)
- [Driver Selection](#driver-selection)
- [Built-in Drivers](#built-in-drivers)
- [Custom Drivers](#custom-drivers)
- [Workers](#workers)
- [Workflows](#workflows)
@@ -467,25 +468,36 @@ External storage allows large payloads to be offloaded to an external storage service.

External storage is configured via the `external_storage` parameter on `DataConverter`. It should be configured on the `Client` both for clients of your workflow as well as on the worker -- anywhere large payloads may be uploaded or downloaded.

A `StorageDriver` handles uploading and downloading payloads. Temporal provides [built-in drivers](#built-in-drivers) for common storage solutions, or you may implement a [custom driver](#custom-drivers). Here's an example using the built-in `S3StorageDriver` with the SDK's `aioboto3` client:

```python
import aioboto3
import dataclasses
from temporalio.client import Client, ClientConfig
from temporalio.contrib.aws.s3driver import S3StorageDriver
from temporalio.contrib.aws.s3driver.aioboto3 import new_aioboto3_client
from temporalio.converter import DataConverter, ExternalStorage

client_config = ClientConfig.load_client_connect_config()

session = aioboto3.Session()
async with session.client("s3") as s3_client:
    driver = S3StorageDriver(
        client=new_aioboto3_client(s3_client),
        bucket="my-bucket",
    )
    client = await Client.connect(
        **client_config,
        data_converter=dataclasses.replace(
            DataConverter.default,
            external_storage=ExternalStorage(drivers=[driver]),
        ),
    )
```

See the [S3 driver README](temporalio/contrib/aws/s3driver/) for further details.

Some things to note about external storage:

* Only payloads that meet or exceed `ExternalStorage.payload_size_threshold` (default 256 KiB) are offloaded. Smaller payloads are stored inline as normal.
@@ -540,6 +552,10 @@ Some things to note about driver selection:
* Returning `None` from a selector leaves the payload stored inline in workflow history rather than offloading it.
* The driver instance returned by the selector must be one of the instances registered in `ExternalStorage.drivers`. If it is not, an error is raised.

###### Built-in Drivers

- **[S3 Storage Driver](temporalio/contrib/aws/s3driver/)**: ⚠️ **Experimental** ⚠️ Amazon S3 driver. Ships with an aioboto3 client, or bring your own by subclassing `S3StorageDriverClient`.

###### Custom Drivers

Implement `temporalio.converter.StorageDriver` to integrate with an external storage system:
5 changes: 5 additions & 0 deletions pyproject.toml
@@ -30,6 +30,10 @@ opentelemetry = ["opentelemetry-api>=1.11.1,<2", "opentelemetry-sdk>=1.11.1,<2"]
pydantic = ["pydantic>=2.0.0,<3"]
openai-agents = ["openai-agents>=0.3,<0.7", "mcp>=1.9.4, <2"]
google-adk = ["google-adk>=1.27.0,<2"]
aioboto3 = [
    "aioboto3>=10.4.0",
    "types-aioboto3[s3]>=10.4.0",
]

[project.urls]
Homepage = "https://github.com/temporalio/sdk-python"
@@ -64,6 +68,7 @@ dev = [
    "openinference-instrumentation-google-adk>=0.1.8",
    "googleapis-common-protos==1.70.0",
    "pytest-rerunfailures>=16.1",
    "moto[s3,server]>=5",
]

[tool.poe.tasks]
104 changes: 104 additions & 0 deletions temporalio/contrib/aws/s3driver/README.md
@@ -0,0 +1,104 @@
# AWS Integration for Temporal Python SDK

> ⚠️ **This package is currently at an experimental release stage.** ⚠️

This package provides AWS integrations for the Temporal Python SDK, including an Amazon S3 driver for [external storage](../../../README.md#external-storage).

## S3 Driver

`S3StorageDriver` stores and retrieves Temporal payloads in Amazon S3. It accepts any `S3StorageDriverClient` implementation and a `bucket` — either a static name or a callable for dynamic per-payload selection.

### Using the built-in aioboto3 client

The SDK ships with an [`aioboto3`](https://github.com/terrycain/aioboto3)-based client. Install the extra to pull in its dependencies:

```shell
python -m pip install "temporalio[aioboto3]"
```

```python
import aioboto3
import dataclasses
from temporalio.client import Client
from temporalio.contrib.aws.s3driver import S3StorageDriver
from temporalio.contrib.aws.s3driver.aioboto3 import new_aioboto3_client
from temporalio.converter import DataConverter, ExternalStorage

session = aioboto3.Session()
# Credentials and region are resolved automatically from the standard AWS credential
# chain, e.g. environment variables, ~/.aws/config, IAM instance profile, and so on.
async with session.client("s3") as s3_client:
    driver = S3StorageDriver(
        client=new_aioboto3_client(s3_client),
        bucket="my-temporal-payloads",
    )

    client = await Client.connect(
        "localhost:7233",
        data_converter=dataclasses.replace(
            DataConverter.default,
            external_storage=ExternalStorage(drivers=[driver]),
        ),
    )
```

### Custom S3 client implementations

To use a different S3 library, subclass `S3StorageDriverClient` and implement `put_object`, `get_object`, and `object_exists`. The ABC has no external dependencies, so no AWS packages are required to import it.

```python
from temporalio.contrib.aws.s3driver import S3StorageDriverClient

class MyS3Client(S3StorageDriverClient):
    async def put_object(self, *, bucket: str, key: str, data: bytes) -> None: ...
    async def object_exists(self, *, bucket: str, key: str) -> bool: ...
    async def get_object(self, *, bucket: str, key: str) -> bytes: ...

driver = S3StorageDriver(client=MyS3Client(), bucket="my-temporal-payloads")
```

### Key structure

Payloads are stored under content-addressable keys derived from a SHA-256 hash of the serialized payload bytes, segmented by namespace and workflow/activity identifiers when serialization context is available, e.g.:

```
v0/ns/my-namespace/wfi/my-workflow-id/d/sha256/<hash>
```
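
The derivation above can be illustrated with a short standard-library sketch. The helper name and argument layout here are illustrative only, not the driver's actual internals; only the key shape comes from the documented format:

```python
import hashlib


def sketch_payload_key(namespace: str, workflow_id: str, payload_bytes: bytes) -> str:
    """Illustrative only: build a content-addressable key in the documented shape."""
    # Identical bytes within the same namespace/workflow scope hash to the same key.
    digest = hashlib.sha256(payload_bytes).hexdigest()
    return f"v0/ns/{namespace}/wfi/{workflow_id}/d/sha256/{digest}"


key = sketch_payload_key("my-namespace", "my-workflow-id", b"\x00" * 1024)
```

Because the key is a pure function of the scope identifiers and the payload bytes, re-storing the same bytes in the same scope targets the same object rather than creating a new one.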

### Notes

* Any driver used to store payloads must also be configured on the component that retrieves them. If the client stores workflow inputs using this driver, the worker must include it in its `ExternalStorage.drivers` list to retrieve them.
* The target S3 bucket must already exist; the driver will not create it.
* Identical serialized bytes within the same namespace and workflow (or activity) share the same S3 object — the key is content-addressable within that scope. The same bytes used across different workflows or namespaces produce distinct S3 objects because the key includes the namespace and workflow/activity identifiers.
* Only payloads at or above `ExternalStorage.payload_size_threshold` (default: 256 KiB) are offloaded; smaller payloads are stored inline. Set `ExternalStorage.payload_size_threshold` to `None` to offload every payload regardless of size.
* `S3StorageDriver.max_payload_size` (default: 50 MiB) sets a hard upper limit on the serialized size of any single payload. A `ValueError` is raised at store time if a payload exceeds this limit. Increase it if your workflows produce payloads larger than 50 MiB.
* Override `S3StorageDriver.driver_name` only when registering multiple `S3StorageDriver` instances with distinct configurations under the same `ExternalStorage.drivers` list.

### Dynamic Bucket Selection

To select the S3 bucket per payload, pass a callable as `bucket`:

```python
from temporalio.contrib.aws.s3driver import S3StorageDriver
from temporalio.contrib.aws.s3driver.aioboto3 import new_aioboto3_client

driver = S3StorageDriver(
    client=new_aioboto3_client(s3_client),
    bucket=lambda context, payload: (
        "large-payloads" if payload.ByteSize() > 10 * 1024 * 1024 else "small-payloads"
    ),
)
```

### Required IAM permissions

The AWS credentials used by your S3 client must have the following S3 permissions on the target bucket and its objects:

```json
{
    "Effect": "Allow",
    "Action": [
        "s3:PutObject",
        "s3:GetObject"
    ],
    "Resource": "arn:aws:s3:::my-temporal-payloads/*"
}
```

`s3:PutObject` is required by components that store payloads (typically the Temporal client and worker sending workflow/activity inputs), and `s3:GetObject` is required by components that retrieve them (typically workers and clients reading results). Components that only retrieve payloads do not need `s3:PutObject`, and vice versa.
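
For example, a component that only reads offloaded payloads could be scoped down to a statement like this (bucket name as in the example above; adjust to your own):

```json
{
    "Effect": "Allow",
    "Action": ["s3:GetObject"],
    "Resource": "arn:aws:s3:::my-temporal-payloads/*"
}
```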
13 changes: 13 additions & 0 deletions temporalio/contrib/aws/s3driver/__init__.py
@@ -0,0 +1,13 @@
"""Amazon S3 storage driver for Temporal external storage.

.. warning::
This API is experimental.
"""

from temporalio.contrib.aws.s3driver._client import S3StorageDriverClient
from temporalio.contrib.aws.s3driver._driver import S3StorageDriver

__all__ = [
    "S3StorageDriverClient",
    "S3StorageDriver",
]
32 changes: 32 additions & 0 deletions temporalio/contrib/aws/s3driver/_client.py
@@ -0,0 +1,32 @@
"""S3 storage driver client abstraction for the S3 storage driver.

.. warning::
This API is experimental.
"""

from __future__ import annotations

from abc import ABC, abstractmethod


class S3StorageDriverClient(ABC):
    """Abstract base class for S3 object operations.

    Implementations must support ``put_object``, ``get_object``, and
    ``object_exists``. Multipart upload handling (if needed) is an internal
    concern of each implementation.

    .. warning::
        This API is experimental.
    """

    @abstractmethod
    async def put_object(self, *, bucket: str, key: str, data: bytes) -> None:
        """Upload *data* to the given S3 *bucket* and *key*."""

    @abstractmethod
    async def object_exists(self, *, bucket: str, key: str) -> bool:
        """Return ``True`` if an object exists at the given *bucket* and *key*."""

    @abstractmethod
    async def get_object(self, *, bucket: str, key: str) -> bytes:
        """Download and return the bytes stored at the given S3 *bucket* and *key*."""