Experimental: AWS S3 storage driver#1388
Conversation
658d2e1 to
5d6cd7e
Compare
5d6cd7e to
66a3fea
Compare
| @@ -0,0 +1,13 @@ | |||
| """Amazon S3 storage driver for Temporal external payload storage.""" | |||
There was a problem hiding this comment.
Add experimental
| self, | ||
| client: S3StorageDriverClient, | ||
| bucket: str | Callable[[StorageDriverStoreContext, Payload], str], | ||
| driver_name: str | None = None, |
There was a problem hiding this comment.
Set default here and validate non-empty in init
| a bucket name. A callable allows dynamic per-payload bucket | ||
| selection. | ||
| driver_name: Name of this driver instance. Defaults to | ||
| ``"aws.s3driver"``. Override only when registering |
There was a problem hiding this comment.
Only need to override when registering multiple of the same type.
|
|
||
| digest_segments = f"/d/sha256/{hash_digest}" | ||
|
|
||
| key = f"v0{namespace_segments}{context_segments}{digest_segments}" |
There was a problem hiding this comment.
Validate that missing segments or invalid characters are sanitized.
There was a problem hiding this comment.
Especially for user provided information such as workflow ID and activity ID.
| This API is experimental. | ||
| """ | ||
|
|
||
| def __init__( |
There was a problem hiding this comment.
Extra config knobs:
There was a problem hiding this comment.
Disable hash check
There was a problem hiding this comment.
Disable object existence check
| def new_aioboto3_client(client: S3Client) -> Aioboto3StorageDriverClient: | ||
| """Create an :class:`S3StorageDriverClient` from an aioboto3 S3 client. | ||
|
|
||
| This is a convenience factory. Equivalent to ``Aioboto3StorageDriverClient(client)``. |
| async def object_exists(self, *, bucket: str, key: str) -> bool: | ||
| """Check existence via aioboto3's ``head_object``.""" | ||
| try: | ||
| await self._client.head_object(Bucket=bucket, Key=key) |
There was a problem hiding this comment.
Is there an exists method that doesn't raise?
| async with session.client("s3") as s3_client: | ||
| driver = S3StorageDriver(client=s3_client, bucket="my-temporal-payloads") |
There was a problem hiding this comment.
Fix to use new_aioboto3_client method
| ), | ||
| ) | ||
| session = aioboto3.Session() | ||
| async with session.client("s3") as s3_client: |
There was a problem hiding this comment.
use new_aioboto3_client method
| @@ -0,0 +1,454 @@ | |||
| """Worker integration tests for S3StorageDriver key structure. | |||
There was a problem hiding this comment.
Move to tests/contrib/aws/s3driver
What was changed
S3StorageDriverClientimplementation.aioboto3library.aws-s3extra to the temporalio package to assist in including the dependencies for theaioboto3client implementation.motoserver to simulate an S3 server.Why?
Provide a default driver for externally storing large payloads in AWS S3.
Checklist
Closes S3 External Storage Driver - Python #1390
How was this tested:
motoAWS S3 HTTP service emulator