34 changes: 24 additions & 10 deletions README.md
@@ -56,6 +56,7 @@ informal introduction to the features and their implementation.
- [Custom Type Data Conversion](#custom-type-data-conversion)
- [External Storage](#external-storage)
- [Driver Selection](#driver-selection)
- [Built-in Drivers](#built-in-drivers)
- [Custom Drivers](#custom-drivers)
- [Workers](#workers)
- [Workflows](#workflows)
@@ -467,23 +468,32 @@ External storage allows large payloads to be offloaded to an external storage service.

External storage is configured via the `external_storage` parameter on `DataConverter`. It should be configured on the `Client` both for clients of your workflow as well as on the worker -- anywhere large payloads may be uploaded or downloaded.

A `StorageDriver` handles uploading and downloading payloads. Temporal provides [built-in drivers](#built-in-drivers) for common storage solutions, or you may implement a [custom driver](#custom-drivers). Here's an example using the built-in `S3StorageDriver`.

```python
import aioboto3
import dataclasses
from temporalio.client import Client, ClientConfig
from temporalio.contrib.aws.s3driver import S3StorageDriver
from temporalio.converter import DataConverter, ExternalStorage
from types_aiobotocore_s3.client import S3Client

client_config = ClientConfig.load_client_connect_config()

session = aioboto3.Session()
async with session.client("s3") as s3_client:
    driver = S3StorageDriver(
        client=s3_client,
        bucket="my-bucket",
    )
    client = await Client.connect(
        **client_config,
        data_converter=dataclasses.replace(
            DataConverter.default,
            external_storage=ExternalStorage(drivers=[driver]),
        ),
    )
```

Some things to note about external storage:
@@ -540,6 +550,10 @@ Some things to note about driver selection:
* Returning `None` from a selector leaves the payload stored inline in workflow history rather than offloading it.
* The driver instance returned by the selector must be one of the instances registered in `ExternalStorage.drivers`. If it is not, an error is raised.
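The selector contract described in these notes can be sketched in isolation. Everything below is hypothetical illustration: `FakeDriver`, `select_driver`, and the size cutoffs are not part of the SDK API; the real selector signature is defined by `ExternalStorage`.

```python
from typing import Optional

# Hypothetical illustration of the selector contract described above.
# FakeDriver, select_driver, and the size cutoffs are illustrative only.
class FakeDriver:
    def __init__(self, name: str) -> None:
        self.name = name

small_driver = FakeDriver("small")
large_driver = FakeDriver("large")
registered_drivers = [small_driver, large_driver]

def select_driver(payload_bytes: bytes) -> Optional[FakeDriver]:
    """Return a registered driver instance, or None to keep the payload inline."""
    if len(payload_bytes) < 256 * 1024:
        return None  # below threshold: stays inline in workflow history
    chosen = large_driver if len(payload_bytes) > 10 * 1024 * 1024 else small_driver
    # The returned instance must be one registered in ExternalStorage.drivers,
    # otherwise an error is raised.
    assert chosen in registered_drivers
    return chosen
```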

###### Built-in Drivers

- **[S3 Storage Driver](temporalio/contrib/aws/s3driver/)**: ⚠️ **Experimental** ⚠️ Amazon S3 driver. Install dependencies with `pip install "temporalio[aws-s3]"`.

###### Custom Drivers

Implement `temporalio.converter.StorageDriver` to integrate with an external storage system:
5 changes: 5 additions & 0 deletions pyproject.toml
@@ -30,6 +30,10 @@ opentelemetry = ["opentelemetry-api>=1.11.1,<2", "opentelemetry-sdk>=1.11.1,<2"]
pydantic = ["pydantic>=2.0.0,<3"]
openai-agents = ["openai-agents>=0.3,<0.7", "mcp>=1.9.4, <2"]
google-adk = ["google-adk>=1.27.0,<2"]
aws-s3 = [
    "aioboto3>=10.4.0",
    "types-aioboto3[s3]>=10.4.0",
]

[project.urls]
Homepage = "https://github.com/temporalio/sdk-python"
@@ -64,6 +68,7 @@ dev = [
    "openinference-instrumentation-google-adk>=0.1.8",
    "googleapis-common-protos==1.70.0",
    "pytest-rerunfailures>=16.1",
    "moto[s3,server]>=5",
]

[tool.poe.tasks]
7 changes: 7 additions & 0 deletions temporalio/contrib/aws/README.md
@@ -0,0 +1,7 @@
# AWS Integrations for Temporal Python SDK

This directory contains AWS service integrations for the Temporal Python SDK.

## Integrations

- **[S3 Storage Driver](s3driver/)**: ⚠️ **Experimental** ⚠️ Amazon S3 driver for [external storage](../../../README.md#external-storage). Install dependencies with `pip install "temporalio[aws-s3]"`.
81 changes: 81 additions & 0 deletions temporalio/contrib/aws/s3driver/README.md
@@ -0,0 +1,81 @@
# AWS Integration for Temporal Python SDK

> ⚠️ **This package is currently at an experimental release stage.** ⚠️

This package provides AWS integrations for the Temporal Python SDK, including an Amazon S3 driver for [external storage](../../../README.md#external-storage).

## Install Dependencies

```shell
python -m pip install "temporalio[aws-s3]"
```

## S3 Driver

`temporalio.contrib.aws.s3driver.S3StorageDriver` stores and retrieves Temporal payloads in Amazon S3. It requires an [`aioboto3`](https://github.com/terrycain/aioboto3) S3 client and a `bucket` — either a static name or a callable for dynamic per-payload selection.

```python
import aioboto3
import dataclasses
from temporalio.client import Client
from temporalio.contrib.aws.s3driver import S3StorageDriver
from temporalio.converter import DataConverter, ExternalStorage

session = aioboto3.Session()
# Credentials and region are resolved automatically from the standard AWS credential
# chain, e.g. environment variables, ~/.aws/config, IAM instance profile, and so on.
async with session.client("s3") as s3_client:
    driver = S3StorageDriver(client=s3_client, bucket="my-temporal-payloads")

    client = await Client.connect(
        "localhost:7233",
        data_converter=dataclasses.replace(
            DataConverter.default,
            external_storage=ExternalStorage(drivers=[driver]),
        ),
    )
```

Payloads are stored under content-addressable keys derived from a SHA-256 hash of the serialized payload bytes, segmented by namespace and workflow/activity identifiers when serialization context is available, e.g.:

```
v0/ns/my-namespace/wfi/my-workflow-id/d/sha256/<hash>
```
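A minimal sketch of how such a content-addressable key could be derived (`make_key` is a hypothetical helper mirroring the documented layout; the SDK's internal key construction may differ in detail):

```python
import hashlib

def make_key(namespace: str, workflow_id: str, data: bytes) -> str:
    # Hypothetical helper mirroring the documented key layout:
    #   v0/ns/<namespace>/wfi/<workflow-id>/d/sha256/<hash>
    digest = hashlib.sha256(data).hexdigest()
    return f"v0/ns/{namespace}/wfi/{workflow_id}/d/sha256/{digest}"
```

Identical bytes within the same namespace and workflow hash to the same key, which is what makes storage content-addressable within that scope.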

Some things to note about the S3 driver:

* Any driver used to store payloads must also be configured on the component that retrieves them. If the client stores workflow inputs using this driver, the worker must include it in its `ExternalStorage.drivers` list to retrieve them.
* Credentials, region, endpoint, and other AWS settings are configured on the `aioboto3` client directly.
* The target S3 bucket must already exist; the driver will not create it.
* Identical serialized bytes within the same namespace and workflow (or activity) share the same S3 object — the key is content-addressable within that scope. The same bytes used across different workflows or namespaces produce distinct S3 objects because the key includes the namespace and workflow/activity identifiers.
* Only payloads at or above `ExternalStorage.payload_size_threshold` (default: 256 KiB) are offloaded; smaller payloads are stored inline. Set `payload_size_threshold=None` to offload every payload regardless of size.
* `max_payload_size` (default: 50 MiB) sets a hard upper limit on the serialized size of any single payload. A `ValueError` is raised at store time if a payload exceeds this limit. Increase it if your workflows produce payloads larger than 50 MiB.
* Override `driver_name` only when registering multiple `S3StorageDriver` instances with distinct configurations under the same `ExternalStorage.drivers` list.
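The two size settings above interact roughly as follows (a toy `should_offload` sketch using the default values quoted above; not the SDK's actual implementation):

```python
from typing import Optional

def should_offload(
    serialized_size: int,
    payload_size_threshold: Optional[int] = 256 * 1024,
    max_payload_size: int = 50 * 1024 * 1024,
) -> bool:
    """Toy illustration of the documented size rules; not the SDK's code."""
    if serialized_size > max_payload_size:
        raise ValueError(
            f"payload of {serialized_size} bytes exceeds max_payload_size"
        )
    if payload_size_threshold is None:
        return True  # offload every payload regardless of size
    return serialized_size >= payload_size_threshold
```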

### Dynamic Bucket Selection

To select the S3 bucket per payload, pass a callable as `bucket`:

```python
from temporalio.contrib.aws.s3driver import S3StorageDriver

driver = S3StorageDriver(
    client=s3_client,
    bucket=lambda context, payload: (
        "large-payloads" if payload.ByteSize() > 10 * 1024 * 1024 else "small-payloads"
    ),
)
```

### Required IAM Permissions

The AWS credentials used by the `aioboto3` client must have the following S3 permissions on the target bucket and its objects:

```json
{
    "Effect": "Allow",
    "Action": [
        "s3:PutObject",
        "s3:GetObject"
    ],
    "Resource": "arn:aws:s3:::my-temporal-payloads/*"
}
```

`s3:PutObject` is required by components that store payloads (typically the Temporal client and worker sending workflow/activity inputs), and `s3:GetObject` is required by components that retrieve them (typically workers and clients reading results). Components that only retrieve payloads do not need `s3:PutObject`, and vice versa.
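For reference, a statement like the one above sits inside a complete IAM policy document (`Version` and `Statement` wrapper shown with the example bucket from this README):

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject"
            ],
            "Resource": "arn:aws:s3:::my-temporal-payloads/*"
        }
    ]
}
```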
13 changes: 13 additions & 0 deletions temporalio/contrib/aws/s3driver/__init__.py
@@ -0,0 +1,13 @@
"""Amazon S3 storage driver for Temporal external payload storage."""

from temporalio.contrib.aws.s3driver._client import (
    S3StorageDriverClient,
    new_aioboto3_client,
)
from temporalio.contrib.aws.s3driver._driver import S3StorageDriver

__all__ = [
    "S3StorageDriverClient",
    "S3StorageDriver",
    "new_aioboto3_client",
]
92 changes: 92 additions & 0 deletions temporalio/contrib/aws/s3driver/_client.py
@@ -0,0 +1,92 @@
"""S3 storage driver client abstraction for the S3 storage driver.

.. warning::
This API is experimental.
"""

from __future__ import annotations

import io
from abc import ABC, abstractmethod

from botocore.exceptions import ClientError
from types_aiobotocore_s3.client import S3Client


class S3StorageDriverClient(ABC):
Comment thread
jmaeagle99 marked this conversation as resolved.
"""Abstract base class for S3 object operations.

Implementations must support ``put_object`` and ``get_object``. Multipart
upload handling (if needed) is an internal concern of each implementation.

.. warning::
This API is experimental.
"""

@abstractmethod
async def put_object(self, *, bucket: str, key: str, data: bytes) -> None:
"""Upload *data* to the given S3 *bucket* and *key*."""

@abstractmethod
async def object_exists(self, *, bucket: str, key: str) -> bool:
"""Return ``True`` if an object exists at the given *bucket* and *key*."""

@abstractmethod
async def get_object(self, *, bucket: str, key: str) -> bytes:
"""Download and return the bytes stored at the given S3 *bucket* and *key*."""


class Aioboto3StorageDriverClient(S3StorageDriverClient):
"""Adapter that wraps an aioboto3 S3 client as an :class:`S3StorageDriverClient`.

Internally delegates to ``upload_fileobj`` for uploads (which handles
multipart automatically for objects above the multipart threshold) and
``get_object`` for downloads.

.. warning::
This API is experimental.
"""

def __init__(self, client: S3Client) -> None:
"""Wrap an aioboto3 S3 client.

Args:
client: An aioboto3 S3 client, typically obtained from
``aioboto3.Session().client("s3")``.
"""
self._client = client

async def object_exists(self, *, bucket: str, key: str) -> bool:
"""Check existence via aioboto3's ``head_object``."""
try:
await self._client.head_object(Bucket=bucket, Key=key)
Comment thread
jmaeagle99 marked this conversation as resolved.
Outdated
return True
except ClientError as e:
# head_object returns 404 as a ClientError when the key doesn't exist.
if e.response.get("Error", {}).get("Code") == "404":
return False
raise

async def put_object(self, *, bucket: str, key: str, data: bytes) -> None:
"""Upload *data* via aioboto3's ``upload_fileobj``."""
# upload_fileobj is an aioboto3-specific method not in the
# types_aiobotocore_s3 stubs; it handles multipart automatically.
await self._client.upload_fileobj(io.BytesIO(data), bucket, key) # type: ignore[arg-type]

async def get_object(self, *, bucket: str, key: str) -> bytes:
"""Download bytes via aioboto3's ``get_object``."""
response = await self._client.get_object(Bucket=bucket, Key=key)
# StreamingBody.read() is untyped in aiobotocore, returns bytes at runtime.
return await response["Body"].read() # type: ignore[no-any-return]


def new_aioboto3_client(client: S3Client) -> Aioboto3StorageDriverClient:
Comment thread
jmaeagle99 marked this conversation as resolved.
Outdated
"""Create an :class:`S3StorageDriverClient` from an aioboto3 S3 client.

This is a convenience factory. Equivalent to ``Aioboto3StorageDriverClient(client)``.
Comment thread
jmaeagle99 marked this conversation as resolved.
Outdated

Args:
client: An aioboto3 S3 client, typically obtained from
``aioboto3.Session().client("s3")``.
"""
return Aioboto3StorageDriverClient(client)