-
Notifications
You must be signed in to change notification settings - Fork 216
Add initial extstore types #2900
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
cconstable
wants to merge
12
commits into
master
Choose a base branch
from
extstore-foundation
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
d087573
Add initial extstore types.
cconstable 3d3a5b0
Synthesize extstore driver if only one driver is given with no select…
cconstable 0328e77
Remove nullable payloadSizeThreshold. 0 means store everything.
cconstable 660ac6b
Update extstore builder to use checkState instead of checkArguments.
cconstable 2109261
Require extstore drivers to give an explicit type. Relying on default…
cconstable 7eae7a1
Add equals and hash conformance for extstore types.
cconstable 96ec5a6
Add a convenience overload for setDrivers for varargs.
cconstable 1ee8e92
Add comments to storage driver info classes.
cconstable b4e7cc6
Make extstore threshold package level rather than private.
cconstable 594450a
Add extstore test for last-setDriver-wins scenarion and update doc co…
cconstable f434a51
Refactor storage driver context to be interface.
cconstable 407d9d0
rename ExternalStorage to ExternalStorageOptions.
cconstable File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
108 changes: 108 additions & 0 deletions
108
temporal-sdk/src/main/java/io/temporal/payload/storage/ExternalStorage.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,108 @@ | ||
| package io.temporal.payload.storage; | ||
|
|
||
| import com.google.common.base.Preconditions; | ||
| import io.temporal.common.Experimental; | ||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Objects; | ||
| import java.util.Set; | ||
| import javax.annotation.Nonnull; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| /** Configuration for offloading large payloads to external storage. */ | ||
| @Experimental | ||
| public final class ExternalStorageOptions { | ||
| static final int DEFAULT_PAYLOAD_SIZE_THRESHOLD = 256 * 1024; | ||
|
|
||
| public static Builder newBuilder() { | ||
| return new Builder(); | ||
| } | ||
|
|
||
| private final @Nonnull List<StorageDriver> drivers; | ||
| private final @Nonnull StorageDriverSelector driverSelector; | ||
| private final int payloadSizeThreshold; | ||
|
|
||
| private ExternalStorageOptions( | ||
| @Nonnull List<StorageDriver> drivers, | ||
| @Nonnull StorageDriverSelector driverSelector, | ||
| int payloadSizeThreshold) { | ||
| this.drivers = Collections.unmodifiableList(new ArrayList<>(drivers)); | ||
| this.driverSelector = driverSelector; | ||
| this.payloadSizeThreshold = payloadSizeThreshold; | ||
| } | ||
|
|
||
| @Nonnull | ||
| public List<StorageDriver> getDrivers() { | ||
| return drivers; | ||
| } | ||
|
|
||
| @Nonnull | ||
| public StorageDriverSelector getDriverSelector() { | ||
| return driverSelector; | ||
| } | ||
|
|
||
| /** | ||
| * Minimum payload size in bytes before external storage is considered. {@code 0} stores all | ||
| * payloads. Defaults to 256 KiB. | ||
| */ | ||
| public int getPayloadSizeThreshold() { | ||
| return payloadSizeThreshold; | ||
| } | ||
|
|
||
| public static final class Builder { | ||
| private List<StorageDriver> drivers = Collections.emptyList(); | ||
| private StorageDriverSelector driverSelector; | ||
| private int payloadSizeThreshold = ExternalStorageOptions.DEFAULT_PAYLOAD_SIZE_THRESHOLD; | ||
|
|
||
| private Builder() {} | ||
|
|
||
| /** | ||
| * At least one driver is required. When more than one is set, a selector is also required. If | ||
| * this is called multiple times, the last one wins and previous drivers are overwritten. | ||
| */ | ||
| public Builder setDrivers(@Nonnull List<StorageDriver> drivers) { | ||
|
cconstable marked this conversation as resolved.
|
||
| this.drivers = Objects.requireNonNull(drivers, "drivers"); | ||
| return this; | ||
| } | ||
|
|
||
| /** Convenience for registering a single driver; no selector is needed in this case. */ | ||
| public Builder setDriver(@Nonnull StorageDriver driver) { | ||
| return setDrivers(Collections.singletonList(Objects.requireNonNull(driver, "driver"))); | ||
| } | ||
|
|
||
| /** Required when more than one driver is registered; with a single driver it is optional. */ | ||
| public Builder setDriverSelector(@Nullable StorageDriverSelector driverSelector) { | ||
| this.driverSelector = driverSelector; | ||
| return this; | ||
| } | ||
|
|
||
| /** Set to {@code 0} to store all payloads. Defaults to 256 KiB. */ | ||
| public Builder setPayloadSizeThreshold(int payloadSizeThreshold) { | ||
| this.payloadSizeThreshold = payloadSizeThreshold; | ||
| return this; | ||
| } | ||
|
|
||
| public ExternalStorageOptions build() { | ||
| Preconditions.checkState(!drivers.isEmpty(), "At least one driver must be provided"); | ||
| Preconditions.checkState( | ||
| payloadSizeThreshold >= 0, "payloadSizeThreshold must be greater than or equal to zero"); | ||
| Set<String> names = new HashSet<>(); | ||
| for (StorageDriver driver : drivers) { | ||
| String name = driver.getName(); | ||
| Preconditions.checkState( | ||
| names.add(name), "Multiple drivers registered with name '%s'", name); | ||
| } | ||
| Preconditions.checkState( | ||
| drivers.size() == 1 || driverSelector != null, | ||
| "driverSelector must be specified when more than one driver is registered"); | ||
| StorageDriverSelector selector = driverSelector; | ||
| if (selector == null) { | ||
| StorageDriver driver = drivers.get(0); | ||
| selector = (context, payload) -> driver; | ||
| } | ||
| return new ExternalStorageOptions(drivers, selector, payloadSizeThreshold); | ||
| } | ||
| } | ||
| } | ||
108 changes: 108 additions & 0 deletions
108
temporal-sdk/src/main/java/io/temporal/payload/storage/ExternalStorageOptions.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,108 @@ | ||
| package io.temporal.payload.storage; | ||
|
|
||
| import com.google.common.base.Preconditions; | ||
| import io.temporal.common.Experimental; | ||
| import java.util.ArrayList; | ||
| import java.util.Collections; | ||
| import java.util.HashSet; | ||
| import java.util.List; | ||
| import java.util.Objects; | ||
| import java.util.Set; | ||
| import javax.annotation.Nonnull; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| /** Configuration for offloading large payloads to external storage. */ | ||
| @Experimental | ||
| public final class ExternalStorageOptions { | ||
| static final int DEFAULT_PAYLOAD_SIZE_THRESHOLD = 256 * 1024; | ||
|
|
||
| public static Builder newBuilder() { | ||
| return new Builder(); | ||
| } | ||
|
|
||
| private final @Nonnull List<StorageDriver> drivers; | ||
| private final @Nonnull StorageDriverSelector driverSelector; | ||
| private final int payloadSizeThreshold; | ||
|
|
||
| private ExternalStorageOptions( | ||
| @Nonnull List<StorageDriver> drivers, | ||
| @Nonnull StorageDriverSelector driverSelector, | ||
| int payloadSizeThreshold) { | ||
| this.drivers = Collections.unmodifiableList(new ArrayList<>(drivers)); | ||
| this.driverSelector = driverSelector; | ||
| this.payloadSizeThreshold = payloadSizeThreshold; | ||
| } | ||
|
|
||
| @Nonnull | ||
| public List<StorageDriver> getDrivers() { | ||
| return drivers; | ||
| } | ||
|
|
||
| @Nonnull | ||
| public StorageDriverSelector getDriverSelector() { | ||
| return driverSelector; | ||
| } | ||
|
|
||
| /** | ||
| * Minimum payload size in bytes before external storage is considered. {@code 0} stores all | ||
| * payloads. Defaults to 256 KiB. | ||
| */ | ||
| public int getPayloadSizeThreshold() { | ||
| return payloadSizeThreshold; | ||
| } | ||
|
|
||
| public static final class Builder { | ||
| private List<StorageDriver> drivers = Collections.emptyList(); | ||
| private StorageDriverSelector driverSelector; | ||
| private int payloadSizeThreshold = ExternalStorageOptions.DEFAULT_PAYLOAD_SIZE_THRESHOLD; | ||
|
|
||
| private Builder() {} | ||
|
|
||
| /** | ||
| * At least one driver is required. When more than one is set, a selector is also required. If | ||
| * this is called multiple times, the last one wins and previous drivers are overwritten. | ||
| */ | ||
| public Builder setDrivers(@Nonnull List<StorageDriver> drivers) { | ||
| this.drivers = Objects.requireNonNull(drivers, "drivers"); | ||
| return this; | ||
| } | ||
|
|
||
| /** Convenience for registering a single driver; no selector is needed in this case. */ | ||
| public Builder setDriver(@Nonnull StorageDriver driver) { | ||
| return setDrivers(Collections.singletonList(Objects.requireNonNull(driver, "driver"))); | ||
| } | ||
|
|
||
| /** Required when more than one driver is registered; with a single driver it is optional. */ | ||
| public Builder setDriverSelector(@Nullable StorageDriverSelector driverSelector) { | ||
| this.driverSelector = driverSelector; | ||
| return this; | ||
| } | ||
|
|
||
| /** Set to {@code 0} to store all payloads. Defaults to 256 KiB. */ | ||
| public Builder setPayloadSizeThreshold(int payloadSizeThreshold) { | ||
| this.payloadSizeThreshold = payloadSizeThreshold; | ||
| return this; | ||
| } | ||
|
|
||
| public ExternalStorageOptions build() { | ||
| Preconditions.checkState(!drivers.isEmpty(), "At least one driver must be provided"); | ||
| Preconditions.checkState( | ||
| payloadSizeThreshold >= 0, "payloadSizeThreshold must be greater than or equal to zero"); | ||
| Set<String> names = new HashSet<>(); | ||
| for (StorageDriver driver : drivers) { | ||
| String name = driver.getName(); | ||
| Preconditions.checkState( | ||
| names.add(name), "Multiple drivers registered with name '%s'", name); | ||
| } | ||
| Preconditions.checkState( | ||
| drivers.size() == 1 || driverSelector != null, | ||
| "driverSelector must be specified when more than one driver is registered"); | ||
| StorageDriverSelector selector = driverSelector; | ||
| if (selector == null) { | ||
| StorageDriver driver = drivers.get(0); | ||
| selector = (context, payload) -> driver; | ||
| } | ||
| return new ExternalStorageOptions(drivers, selector, payloadSizeThreshold); | ||
| } | ||
| } | ||
| } |
44 changes: 44 additions & 0 deletions
44
temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriver.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| package io.temporal.payload.storage; | ||
|
|
||
| import io.temporal.api.common.v1.Payload; | ||
| import io.temporal.common.Experimental; | ||
| import java.util.List; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import javax.annotation.Nonnull; | ||
|
|
||
| /** Stores and retrieves payloads in an external storage system. */ | ||
| @Experimental | ||
| public interface StorageDriver { | ||
| /** | ||
| * Name of this driver instance, unique among the drivers registered in a single {@link | ||
| * ExternalStorageOptions}. Used as the routing key recorded in a stored payload's reference and resolved | ||
| * back to this driver on retrieval. | ||
| */ | ||
| @Nonnull | ||
| String getName(); | ||
|
|
||
| /** | ||
| * Stable, implementation-level identifier for this driver, the same across all instances of the | ||
| * driver class and ideally across SDKs (e.g. {@code "aws.s3driver"}). Used for metrics and worker | ||
| * heartbeat reporting, so it must not be derived from anything that changes between versions or | ||
| * refactors. | ||
| */ | ||
| @Nonnull | ||
| String getType(); | ||
|
|
||
| /** | ||
| * Stores {@code payloads} and returns one {@link StorageDriverClaim} per payload, in the same | ||
| * order. The returned list must be the same length as {@code payloads}. | ||
| */ | ||
| @Nonnull | ||
| CompletableFuture<List<StorageDriverClaim>> store( | ||
| @Nonnull StorageDriverStoreContext context, @Nonnull List<Payload> payloads); | ||
|
|
||
| /** | ||
| * Retrieves the payloads identified by {@code claims} and returns one {@link Payload} per claim, | ||
| * in the same order. The returned list must be the same length as {@code claims}. | ||
| */ | ||
| @Nonnull | ||
| CompletableFuture<List<Payload>> retrieve( | ||
| @Nonnull StorageDriverRetrieveContext context, @Nonnull List<StorageDriverClaim> claims); | ||
| } |
76 changes: 76 additions & 0 deletions
76
temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverActivityInfo.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| package io.temporal.payload.storage; | ||
|
|
||
| import io.temporal.common.Experimental; | ||
| import java.util.Objects; | ||
| import javax.annotation.Nonnull; | ||
| import javax.annotation.Nullable; | ||
|
|
||
| /** | ||
| * Identity of the activity a payload is being stored on behalf of. Provided to a {@link | ||
| * StorageDriver} via {@link StorageDriverStoreContext#getTarget()}. All fields except {@code | ||
| * namespace} are best-effort and may be {@code null} when not available at store time. | ||
| */ | ||
| @Experimental | ||
| public final class StorageDriverActivityInfo implements StorageDriverTargetInfo { | ||
| private final @Nonnull String namespace; | ||
| private final @Nullable String id; | ||
| private final @Nullable String runId; | ||
| private final @Nullable String type; | ||
|
|
||
| /** | ||
| * @param namespace the activity's namespace; must not be {@code null} | ||
| * @param id the activity ID, or {@code null} if not available | ||
| * @param runId the activity run ID (standalone activities), or {@code null} if not available | ||
| * @param type the activity type name, or {@code null} if not available | ||
| */ | ||
| public StorageDriverActivityInfo( | ||
| @Nonnull String namespace, | ||
| @Nullable String id, | ||
| @Nullable String runId, | ||
| @Nullable String type) { | ||
| this.namespace = Objects.requireNonNull(namespace, "namespace"); | ||
| this.id = id; | ||
| this.runId = runId; | ||
| this.type = type; | ||
| } | ||
|
|
||
| @Nonnull | ||
| public String getNamespace() { | ||
| return namespace; | ||
| } | ||
|
|
||
| @Nullable | ||
| public String getId() { | ||
| return id; | ||
| } | ||
|
|
||
| @Nullable | ||
| public String getRunId() { | ||
| return runId; | ||
| } | ||
|
|
||
| @Nullable | ||
| public String getType() { | ||
| return type; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object o) { | ||
| if (this == o) { | ||
| return true; | ||
| } | ||
| if (!(o instanceof StorageDriverActivityInfo)) { | ||
| return false; | ||
| } | ||
| StorageDriverActivityInfo that = (StorageDriverActivityInfo) o; | ||
| return namespace.equals(that.namespace) | ||
| && Objects.equals(id, that.id) | ||
| && Objects.equals(runId, that.runId) | ||
| && Objects.equals(type, that.type); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return Objects.hash(namespace, id, runId, type); | ||
| } | ||
| } |
44 changes: 44 additions & 0 deletions
44
temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverClaim.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| package io.temporal.payload.storage; | ||
|
|
||
| import io.temporal.common.Experimental; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import java.util.Objects; | ||
| import javax.annotation.Nonnull; | ||
|
|
||
| /** | ||
| * Driver-defined reference to an externally stored payload, used to retrieve it later. | ||
| * | ||
| * @see StorageDriver | ||
| */ | ||
| @Experimental | ||
| public final class StorageDriverClaim { | ||
| private final @Nonnull Map<String, String> claimData; | ||
|
|
||
| public StorageDriverClaim(@Nonnull Map<String, String> claimData) { | ||
| this.claimData = | ||
| Collections.unmodifiableMap(new HashMap<>(Objects.requireNonNull(claimData, "claimData"))); | ||
| } | ||
|
|
||
| @Nonnull | ||
| public Map<String, String> getClaimData() { | ||
| return claimData; | ||
| } | ||
|
|
||
| @Override | ||
| public boolean equals(Object o) { | ||
| if (this == o) { | ||
| return true; | ||
| } | ||
| if (!(o instanceof StorageDriverClaim)) { | ||
| return false; | ||
| } | ||
| return claimData.equals(((StorageDriverClaim) o).claimData); | ||
| } | ||
|
|
||
| @Override | ||
| public int hashCode() { | ||
| return claimData.hashCode(); | ||
| } | ||
| } |
12 changes: 12 additions & 0 deletions
12
temporal-sdk/src/main/java/io/temporal/payload/storage/StorageDriverRetrieveContext.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| package io.temporal.payload.storage; | ||
|
|
||
| import io.temporal.common.Experimental; | ||
|
|
||
| /** | ||
| * Context passed to {@link StorageDriver#retrieve}. | ||
| * | ||
| * <p>Implemented by the SDK and passed to the driver. Driver authors do not implement this in | ||
| * production code, only when constructing instances for their own tests. | ||
| */ | ||
| @Experimental | ||
| public interface StorageDriverRetrieveContext {} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should the package be located here? or should we consider something like
temporal.common.extstore? Here intemporal.payloadit's next to the codec which seems like the right place.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current
io.temporal.payload.storagemakes sense.