Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions contrib/temporal-payload-storage-s3/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
description = '''Temporal Java SDK External Storage Driver for AWS S3'''

ext {
awsSdkVersion = '2.31.0'
}

dependencies {
compileOnly project(':temporal-serviceclient')
compileOnly project(':temporal-sdk')

api platform("software.amazon.awssdk:bom:$awsSdkVersion")
api "software.amazon.awssdk:s3"

testImplementation project(':temporal-serviceclient')
testImplementation project(':temporal-sdk')
testImplementation "junit:junit:${junitVersion}"
testRuntimeOnly group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.temporal.payload.storage.s3;

import io.temporal.api.common.v1.Payload;
import io.temporal.common.Experimental;
import io.temporal.payload.storage.StorageDriverStoreContext;
import javax.annotation.Nonnull;

/**
* Resolves the target S3 bucket for a payload. Use {@link
* S3StorageDriver.Builder#setBucket(String)} for a fixed bucket, or supply a resolver via {@link
* S3StorageDriver.Builder#setBucketResolver(BucketResolver)} to choose a bucket per payload.
*/
@Experimental
@FunctionalInterface
public interface BucketResolver {
@Nonnull
String resolveBucket(@Nonnull StorageDriverStoreContext context, @Nonnull Payload payload);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.temporal.payload.storage.s3;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class CompletableFutures {
private CompletableFutures() {}

/**
* Returns a future that completes when all of the given futures complete, yielding a list of
* their results. If any future completes exceptionally, the returned future also completes
* exceptionally with the same exception. If the input list is empty, the returned future
* completes immediately with an empty list.
*
* @param <T>
* @param futures
* @return
*/
static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
.thenApply(
ignored -> {
List<T> results = new ArrayList<>(futures.size());
for (CompletableFuture<T> future : futures) {
results.add(future.join());
}
return results;
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# AWS S3 Driver

Temporal's S3 Driver for External Storage. Uses the official [AWS S3 Java SDK](https://github.com/aws/aws-sdk-java-v2).

## Usage

Construct the S3 storage driver:

```java
import io.temporal.payload.storage.s3.S3AsyncClientAdapter;
import io.temporal.payload.storage.s3.S3StorageDriver;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;

S3AsyncClient s3Client =
S3AsyncClient.builder().region(Region.US_EAST_1).build();

S3StorageDriver driver =
S3StorageDriver.newBuilder()
.setClient(new S3AsyncClientAdapter(s3Client))
.setBucket("temporal-payloads")
.build();
```

Register the driver in external storage config:

```java
import io.temporal.payload.storage.ExternalStorage;

ExternalStorage externalStorage =
ExternalStorage.newBuilder()
.setDriver(driver)
.build();
```

Use `setBucketResolver(...)` instead of `setBucket(...)` when bucket selection must vary per
payload.

## S3 Storage Key Specification

All Temporal S3 drivers generate S3 keys in a consistent manner.

### Key format

Workflow key:
```text
v0/ns/{namespace}/wt/{workflow-type}/wi/{workflow-id}/ri/{run-id}/d/{hash-algorithm}/{hex-digest}
```

Activity key:
```text
v0/ns/{namespace}/at/{activity-type}/ai/{activity-id}/ri/{run-id}/d/{hash-algorithm}/{hex-digest}
```

Fallback key (unknown target):
```text
v0/d/{hash-algorithm}/{hex-digest}
```

- If no namespace, workflow, or activity information is available, the fallback is used.
- Dynamic path segments are percent-encoded (rules below).
- Missing values (including a missing `run-id`) are encoded as `null`.
- `hex-digest` is lower-case SHA-256 hex (64 characters).

### Percent-encoding rules

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are encoding rules that all temporal s3 drivers should conform to. If we have a better place to put information like this I will move it, otherwise I'll put something similar in the READMEs of the other SDKs.


1. Treat each key path component as UTF-8 bytes.
2. Leave ASCII letters and digits unescaped.
3. Leave the following ASCII characters unescaped: `- _ . ~ $ & + : = @`
4. Encode all other bytes as % followed by two uppercase hexadecimal digits.
5. Empty or null values are encoded as the literal string `null`.
6. This is path-segment escaping, not form encoding (`+` stays `+`).

### Examples

Workflow key example:

```text
input:
namespace=payments prod
workflow-type=ChargeWorkflow
workflow-id=order+123=abc
run-id=3f1d6c7a-8b2e-4f7a-9d0a-87a6f95e4d31
hash-algorithm=sha256
hex-digest=9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08

output:
v0/ns/payments%20prod/wt/ChargeWorkflow/wi/order+123=abc/ri/3f1d6c7a-8b2e-4f7a-9d0a-87a6f95e4d31/d/sha256/9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08
```

Activity key example:

```text
input:
namespace=payments prod
activity-type=Capture/Charge
activity-id=activity id+42
run-id=9e1d1fd9-2f8a-4c40-93e2-731f31b9268b
hash-algorithm=sha256
hex-digest=2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824

output:
v0/ns/payments%20prod/at/Capture%2FCharge/ai/activity%20id+42/ri/9e1d1fd9-2f8a-4c40-93e2-731f31b9268b/d/sha256/2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824
```

Fallback key example:

```text
input:
hash-algorithm=sha256
hex-digest=486ea46224d1bb4fb680f34f7c9ad96a8f24ec88be73ea8e5a6c65260e9cb8a7

output:
v0/d/sha256/486ea46224d1bb4fb680f34f7c9ad96a8f24ec88be73ea8e5a6c65260e9cb8a7
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package io.temporal.payload.storage.s3;

import io.temporal.common.Experimental;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import javax.annotation.Nonnull;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;

/**
* {@link S3Client} backed by the AWS SDK for Java v2 {@link S3AsyncClient}. The wrapped client must
* be configured with credentials and a region by the caller.
*/
@Experimental
public final class S3AsyncClientAdapter implements S3Client {
private final S3AsyncClient client;

public S3AsyncClientAdapter(@Nonnull S3AsyncClient client) {
this.client = Objects.requireNonNull(client, "client");
}

@Nonnull
@Override
public CompletableFuture<Void> putObject(
@Nonnull String bucket, @Nonnull String key, @Nonnull byte[] data) {
// fromBytesUnsafe avoids a defensive copy of data; the driver never mutates it after this call.
return client
.putObject(
PutObjectRequest.builder().bucket(bucket).key(key).build(),
AsyncRequestBody.fromBytesUnsafe(data))
.thenApply(response -> (Void) null);
}

@Nonnull
@Override
public CompletableFuture<Boolean> objectExists(@Nonnull String bucket, @Nonnull String key) {
return client
.headObject(HeadObjectRequest.builder().bucket(bucket).key(key).build())
.handle(
(response, ex) -> {
if (ex == null) {
return true;
}
Throwable cause =
(ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
if (cause instanceof NoSuchKeyException) {
return false;
}
if (cause instanceof S3Exception && ((S3Exception) cause).statusCode() == 404) {
return false;
}
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
throw new RuntimeException(cause);
});
}

@Nonnull
@Override
public CompletableFuture<byte[]> getObject(@Nonnull String bucket, @Nonnull String key) {
return client
.getObject(
GetObjectRequest.builder().bucket(bucket).key(key).build(),
AsyncResponseTransformer.toBytes())
// asByteArrayUnsafe avoids a copy; the driver only reads the bytes (hash + parse).
.thenApply(response -> response.asByteArrayUnsafe());
}

@Nonnull
@Override
public Map<String, String> describe() {
Region region = client.serviceClientConfiguration().region();
if (region == null) {
return Collections.emptyMap();
}
return Collections.singletonMap("client_region", region.id());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.temporal.payload.storage.s3;

import io.temporal.common.Experimental;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nonnull;

/** Interface for S3 {@link S3StorageDriver} operations: upload, existence check, and download. */
@Experimental
public interface S3Client {
/**
* Uploads {@code data} to the given {@code bucket} and {@code key}, overwriting any existing
* object at that key. Implementations must be safe to call concurrently for different keys.
*/
@Nonnull
CompletableFuture<Void> putObject(
@Nonnull String bucket, @Nonnull String key, @Nonnull byte[] data);

/**
* Reports whether an object exists at the given {@code bucket} and {@code key}. The future
* completes with {@code false} when the object is absent, and completes exceptionally when
* existence cannot be determined (e.g. a network or permission failure).
*/
@Nonnull
CompletableFuture<Boolean> objectExists(@Nonnull String bucket, @Nonnull String key);

/**
* Downloads the bytes stored at the given {@code bucket} and {@code key}. The future completes
* exceptionally if the object does not exist.
*/
@Nonnull
CompletableFuture<byte[]> getObject(@Nonnull String bucket, @Nonnull String key);

/**
* Diagnostic metadata about the client configuration, such as {@code {"client_region":
* "us-west-2"}}, that the driver appends to error messages. Returns an empty map by default.
*/
@Nonnull
default Map<String, String> describe() {
return Collections.emptyMap();
}
}
Loading
Loading