diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index e958766a3..14580efb4 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -12,4 +12,5 @@ # For each one, we add the owning team, as well as # @temporalio/sdk, so the SDK team can continue to # manage repo-wide concerns -/contrib/temporal-spring-ai/ @temporalio/ai-sdk @temporalio/sdk +/contrib/temporal-spring-ai/ @temporalio/sdk @temporalio/ai-sdk +/contrib/temporal-workflowstreams/ @temporalio/sdk @temporalio/ai-sdk diff --git a/contrib/temporal-workflowstreams/README.md b/contrib/temporal-workflowstreams/README.md new file mode 100644 index 000000000..8af237d2f --- /dev/null +++ b/contrib/temporal-workflowstreams/README.md @@ -0,0 +1,168 @@ +# Workflow Streams + +A durable publish/subscribe log hosted inside a Temporal Workflow. + +External code (activities, starters, other processes) publishes messages to +named topics via **signals**; subscribers long-poll for new items via +**updates**; a **query** exposes the current offset. The stream is backed by +Temporal's durable execution, giving ordered, durable, exactly-once delivery +with client-side batching, publisher dedup, continue-as-new survival, +truncation, and ~1 MB response paging. + +It is well suited to durable event streams whose cost scales with durable +batches rather than message count. Each poll round-trip costs ~100 ms of +latency, so it is not intended for ultra-low-latency streaming. + +All APIs in this module are experimental and may change. + +## Workflow side + +Construct a `WorkflowStream` once in a `@WorkflowInit` constructor. The factory +registers the publish signal, poll update, and offset query handlers, and a +`@WorkflowInit` constructor runs before any handler dispatch, so polls and +offset queries arriving with the first workflow task (e.g. from +update-with-start) are accepted rather than rejected. 
+ +```java +public class MyInput { + public int itemsProcessed; // your own workflow state + public WorkflowStreamState streamState; +} + +public class MyWorkflowImpl implements MyWorkflow { + private final WorkflowStream stream; + + @WorkflowInit + public MyWorkflowImpl(MyInput input) { + stream = WorkflowStream.newInstance(input.streamState); + } + + @Override + public void execute(MyInput input) { + // Optionally publish from workflow code: + stream.topic("events").publish("hello from the workflow"); + + // Run your workflow; the stream serves external publishers and subscribers + // for as long as the workflow is running. Block until your workflow's exit + // condition is met (here, a `done` flag set elsewhere, e.g. by a signal). + Workflow.await(() -> done); + } +} +``` + +Constructing the stream at the top of the workflow method also works — signals +received earlier are buffered by the SDK — but polls and offset queries are +rejected until the stream exists, so prefer `@WorkflowInit`. + +For workflows that use continue-as-new, the stream's log and offsets must be +carried across each boundary, since continue-as-new starts a fresh run with an +empty history. This is a round-trip with two halves: + +- **Capture** the state when rolling over. Instead of calling + `Workflow.continueAsNew` directly, call `stream.continueAsNew`. It drains + pollers, waits for in-flight handlers, snapshots the current stream state, and + hands it to your callback, which builds the argument list for the next run. + The callback is where you assemble the full input — carry forward your own + workflow state alongside the captured `state`: + + ```java + stream.continueAsNew(state -> { + MyInput next = new MyInput(); + next.itemsProcessed = itemsProcessed; // your own state, carried across the boundary + next.streamState = state; // the captured stream state + return new Object[] {next}; + }); + ``` + +- **Restore** it on the next run. 
That `MyInput` arrives as the next run's + input, and its `streamState` field is the value already passed to + `WorkflowStream.newInstance` in the example above. It is `null` on a fresh + start and non-null after a roll-over, so the stream rehydrates the log + automatically. + +The `WorkflowStreamState` field is what gives the captured stream state +somewhere to live between runs; the other fields on `MyInput` are your own and +are threaded through the same way. + +## Publishing (client side) + +From an activity, use `fromActivity` to target the parent workflow: + +```java +public void publishActivity() { + try (WorkflowStreamClient client = WorkflowStreamClient.fromActivity()) { + TopicHandle topic = client.topic("events"); + for (int i = 0; i < 100; i++) { + topic.publish("item " + i); + } + } // client.close() is called on completion, which flushes the remaining buffer +} +``` + +From a starter or any code with a `WorkflowClient`, use `newInstance` with an +explicit workflow ID: + +```java +try (WorkflowStreamClient client = WorkflowStreamClient.newInstance(workflowClient, workflowId)) { + client.topic("events").publish("from outside", /* forceFlush */ true); +} +``` + +Items are buffered and flushed automatically every batch interval (default 2s), +when the buffer reaches the max batch size, on `forceFlush`, on an explicit +`flush()`, or on `close()`. 
+ +## Subscribing + +`subscribe` returns a blocking, single-use subscription driven on the consuming +thread: + +```java +SubscribeOptions options = SubscribeOptions.newBuilder() + .setTopics("events") // unset = all topics + .build(); +try (WorkflowStreamSubscription subscription = client.subscribe(options)) { + for (WorkflowStreamItem item : subscription) { + String value = + DefaultDataConverter.STANDARD_INSTANCE.fromPayload( + item.getPayload(), String.class, String.class); + System.out.printf("offset=%d topic=%s value=%s%n", item.getOffset(), item.getTopic(), value); + } +} +``` + +The subscription ends cleanly when the workflow reaches a terminal state, +automatically follows continue-as-new chains, and recovers from truncation by +restarting from the current base offset. `close()` stops it before the next +poll. + +Items carry the raw `io.temporal.api.common.v1.Payload`; decode at the call +site with your data converter. Offsets are **global** (across all topics), not +per-topic. + +## Options + +| Option | Default | Meaning | +| --- | --- | --- | +| `batchInterval` | 2s | Automatic flush interval | +| `maxBatchSize` | unset | Flush once the buffer reaches this size | +| `maxRetryDuration` | 10m | Max time to retry a failed flush before `FlushTimeoutException`. Must be < the workflow's publisher TTL (15m) to preserve exactly-once delivery | +| `payloadConverters` | standard set | Per-item serialization. 
Payload conversion only — the client's codec chain runs once on the envelope, never per item | +| `SubscribeOptions.pollCooldown` | 100ms | Min interval between polls | + +## Cross-language protocol + +The handler names (`WorkflowStreamConstants.PUBLISH_SIGNAL_NAME`, +`POLL_UPDATE_NAME`, `OFFSET_QUERY_NAME`), the JSON envelope field names, and +the per-item payload encoding (base64 of the serialized +`temporal.api.common.v1.Payload`) match other languages' packages +exactly, so a Java publisher or subscriber interoperates with a workflow +written in any of them and vice versa. The data converter codec chain +(encryption, compression) runs once on the signal/update envelope — never per +item — so payloads are not double-encoded. + +One Java-specific caveat: the protocol envelope types are serialized by the +workflow's and client's *configured* data converter. The default Jackson JSON +converter produces the wire-compatible snake_case field names (the types are +annotated with `@JsonProperty`); if you configure a non-Jackson JSON converter, +it must produce the same field names for cross-language interop. 
diff --git a/contrib/temporal-workflowstreams/build.gradle b/contrib/temporal-workflowstreams/build.gradle new file mode 100644 index 000000000..46a8ec9e6 --- /dev/null +++ b/contrib/temporal-workflowstreams/build.gradle @@ -0,0 +1,14 @@ +description = '''Temporal Workflow Streams: a durable, multi-topic pub/sub log hosted inside a workflow''' + +dependencies { + // this module shouldn't carry temporal-sdk with it, especially for situations when users may be using a shaded artifact + compileOnly project(':temporal-sdk') + // Jackson annotations lock the cross-SDK wire field names; provided at runtime by temporal-sdk + compileOnly "com.fasterxml.jackson.core:jackson-databind:${jacksonVersion}" + + testImplementation project(':temporal-sdk') + testImplementation project(':temporal-testing') + testImplementation "junit:junit:${junitVersion}" + + testRuntimeOnly group: 'ch.qos.logback', name: 'logback-classic', version: "${logbackVersion}" +} diff --git a/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/FlushTimeoutException.java b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/FlushTimeoutException.java new file mode 100644 index 000000000..4d1b36a24 --- /dev/null +++ b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/FlushTimeoutException.java @@ -0,0 +1,14 @@ +package io.temporal.workflowstreams; + +import io.temporal.common.Experimental; + +/** + * Thrown when a flush retry exceeds the client's max retry duration. The pending batch is dropped; + * if the signal had already been delivered the items are in the log, otherwise they are lost. 
+ */ +@Experimental +public final class FlushTimeoutException extends RuntimeException { + public FlushTimeoutException(String message) { + super(message); + } +} diff --git a/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/PollInput.java b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/PollInput.java new file mode 100644 index 000000000..373133047 --- /dev/null +++ b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/PollInput.java @@ -0,0 +1,30 @@ +package io.temporal.workflowstreams; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.temporal.common.Experimental; +import java.util.ArrayList; +import java.util.List; + +/** + * The poll update payload: a request to long-poll for new items. + * + *

Field names are part of the cross-language wire protocol; this type must serialize to JSON + * with exactly these names. + */ +@Experimental +public final class PollInput { + /** Topics to filter on. Empty means all topics. */ + @JsonProperty("topics") + public List topics = new ArrayList<>(); + + /** Global offset to start from. Zero means the beginning. */ + @JsonProperty("from_offset") + public long fromOffset; + + public PollInput() {} + + public PollInput(List topics, long fromOffset) { + this.topics = topics; + this.fromOffset = fromOffset; + } +} diff --git a/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/PollResult.java b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/PollResult.java new file mode 100644 index 000000000..5a1c0f0e5 --- /dev/null +++ b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/PollResult.java @@ -0,0 +1,34 @@ +package io.temporal.workflowstreams; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.temporal.common.Experimental; +import java.util.ArrayList; +import java.util.List; + +/** + * The poll update response: items matching the poll request. When {@code more_ready} is true the + * response was truncated to stay within size limits and the subscriber should poll again + * immediately rather than applying a cooldown. + * + *

Field names are part of the cross-language wire protocol; this type must serialize to JSON + * with exactly these names. + */ +@Experimental +public final class PollResult { + @JsonProperty("items") + public List items = new ArrayList<>(); + + @JsonProperty("next_offset") + public long nextOffset; + + @JsonProperty("more_ready") + public boolean moreReady; + + public PollResult() {} + + public PollResult(List items, long nextOffset, boolean moreReady) { + this.items = items; + this.nextOffset = nextOffset; + this.moreReady = moreReady; + } +} diff --git a/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/PublishEntry.java b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/PublishEntry.java new file mode 100644 index 000000000..f11b2c989 --- /dev/null +++ b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/PublishEntry.java @@ -0,0 +1,27 @@ +package io.temporal.workflowstreams; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.temporal.common.Experimental; + +/** + * A single entry within a publish batch on the wire. {@code data} is a base64-encoded, serialized + * {@link io.temporal.api.common.v1.Payload}. + * + *

/**
 * A single entry within a publish batch on the wire. {@code data} is a base64-encoded, serialized
 * {@link io.temporal.api.common.v1.Payload}.
 *
 * <p>Field names are part of the cross-language wire protocol; this type must serialize to JSON
 * with exactly these names.
 */
@Experimental
public final class PublishEntry {
  /** Name of the topic this entry is published to. */
  @JsonProperty("topic")
  public String topic;

  /** The item itself: a base64-encoded, serialized {@code Payload}. */
  @JsonProperty("data")
  public String data;

  /** Creates an entry with both fields unset (null). */
  public PublishEntry() {}

  /**
   * Creates an entry.
   *
   * @param topic name of the topic to publish to
   * @param data base64-encoded, serialized payload
   */
  public PublishEntry(String topic, String data) {
    this.topic = topic;
    this.data = data;
  }
}

Field names are part of the cross-language wire protocol; this type must serialize to JSON + * with exactly these names. + */ +@Experimental +public final class PublishInput { + @JsonProperty("items") + public List items = new ArrayList<>(); + + @JsonProperty("publisher_id") + public String publisherId = ""; + + @JsonProperty("sequence") + public long sequence; + + public PublishInput() {} + + public PublishInput(List items, String publisherId, long sequence) { + this.items = items; + this.publisherId = publisherId; + this.sequence = sequence; + } +} diff --git a/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/SubscribeOptions.java b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/SubscribeOptions.java new file mode 100644 index 000000000..3c1bbc19d --- /dev/null +++ b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/SubscribeOptions.java @@ -0,0 +1,76 @@ +package io.temporal.workflowstreams; + +import io.temporal.common.Experimental; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** Options for {@link WorkflowStreamClient#subscribe}. 
*/ +@Experimental +public final class SubscribeOptions { + public static Builder newBuilder() { + return new Builder(); + } + + public static SubscribeOptions getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + private static final SubscribeOptions DEFAULT_INSTANCE = newBuilder().build(); + + private final List topics; + private final long fromOffset; + private final Duration pollCooldown; + + private SubscribeOptions(List topics, long fromOffset, Duration pollCooldown) { + this.topics = topics; + this.fromOffset = fromOffset; + this.pollCooldown = pollCooldown; + } + + public List getTopics() { + return topics; + } + + public long getFromOffset() { + return fromOffset; + } + + public Duration getPollCooldown() { + return pollCooldown; + } + + public static final class Builder { + private List topics = new ArrayList<>(); + private long fromOffset; + private Duration pollCooldown = WorkflowStreamConstants.DEFAULT_POLL_COOLDOWN; + + private Builder() {} + + /** Filters the subscription. Empty (the default) means all topics. */ + public Builder setTopics(String... topics) { + this.topics = new ArrayList<>(Arrays.asList(topics)); + return this; + } + + /** Global offset to start from. Zero (the default) means the beginning. */ + public Builder setFromOffset(long fromOffset) { + this.fromOffset = fromOffset; + return this; + } + + /** + * Minimum interval between polls when no more items are immediately ready. Default: 100 + * milliseconds. 
+ */ + public Builder setPollCooldown(Duration pollCooldown) { + this.pollCooldown = pollCooldown; + return this; + } + + public SubscribeOptions build() { + return new SubscribeOptions(topics, fromOffset, pollCooldown); + } + } +} diff --git a/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/TopicHandle.java b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/TopicHandle.java new file mode 100644 index 000000000..4c7df2993 --- /dev/null +++ b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/TopicHandle.java @@ -0,0 +1,45 @@ +package io.temporal.workflowstreams; + +import io.temporal.common.Experimental; + +/** + * Publishes to and subscribes from a single topic. Obtained via {@link WorkflowStreamClient#topic}. + */ +@Experimental +public final class TopicHandle { + private final String name; + private final WorkflowStreamClient client; + + TopicHandle(String name, WorkflowStreamClient client) { + this.name = name; + this.client = client; + } + + /** Returns the topic name. */ + public String getName() { + return name; + } + + /** Buffers {@code value} for publishing on this topic. See {@link #publish(Object, boolean)}. */ + public void publish(Object value) { + publish(value, false); + } + + /** + * Buffers {@code value} for publishing on this topic. {@code value} goes through the client's + * payload converters at flush time; a pre-built {@link io.temporal.api.common.v1.Payload} + * bypasses conversion. Pass {@code forceFlush} to wake the publisher and send immediately. + */ + public void publish(Object value, boolean forceFlush) { + client.publishToTopic(name, value, forceFlush); + } + + /** + * Returns a subscription over items on this topic, starting at {@code fromOffset}. See {@link + * WorkflowStreamClient#subscribe}. 
+ */ + public WorkflowStreamSubscription subscribe(long fromOffset) { + return client.subscribe( + SubscribeOptions.newBuilder().setTopics(name).setFromOffset(fromOffset).build()); + } +} diff --git a/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WireItem.java b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WireItem.java new file mode 100644 index 000000000..b88dd90f8 --- /dev/null +++ b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WireItem.java @@ -0,0 +1,31 @@ +package io.temporal.workflowstreams; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.temporal.common.Experimental; + +/** + * The wire representation of a stream item. {@code data} is a base64-encoded, serialized {@link + * io.temporal.api.common.v1.Payload}. + * + *

/**
 * The wire representation of a stream item. {@code data} is a base64-encoded, serialized {@link
 * io.temporal.api.common.v1.Payload}.
 *
 * <p>Field names are part of the cross-language wire protocol; this type must serialize to JSON
 * with exactly these names.
 */
@Experimental
public final class WireItem {
  /** Name of the topic the item belongs to. */
  @JsonProperty("topic")
  public String topic;

  /** The item itself: a base64-encoded, serialized {@code Payload}. */
  @JsonProperty("data")
  public String data;

  /** Offset of the item in the stream. NOTE(review): presumably the global offset; confirm. */
  @JsonProperty("offset")
  public long offset;

  /** Creates an item with all fields at their defaults (null topic and data, offset zero). */
  public WireItem() {}

  /**
   * Creates an item.
   *
   * @param topic name of the topic the item belongs to
   * @param data base64-encoded, serialized payload
   * @param offset offset of the item in the stream
   */
  public WireItem(String topic, String data, long offset) {
    this.topic = topic;
    this.data = data;
    this.offset = offset;
  }
}

Construct it once with {@link #newInstance}, preferably in a {@link + * io.temporal.workflow.WorkflowInit} constructor: the factory registers all three handlers on the + * current workflow, and a {@code @WorkflowInit} constructor runs before any handler dispatch, so + * polls arriving with the first workflow task are accepted. Constructing it at the start of the + * workflow method also works — signals received earlier are buffered by the SDK — but polls and + * offset queries are rejected until the stream exists. + */ +@Experimental +public final class WorkflowStream { + + /** A single decoded log entry held in workflow memory. */ + private static final class InternalEntry { + final String topic; + final Payload payload; + + InternalEntry(String topic, Payload payload) { + this.topic = topic; + this.payload = payload; + } + } + + private final DataConverter dataConverter; + + private final List log = new ArrayList<>(); + private long baseOffset; + private final Map publisherSequences = new HashMap<>(); + private final Map publisherLastSeen = new HashMap<>(); + private boolean draining; + + private final Map topicHandles = new HashMap<>(); + + /** Constructs a stream with no prior state and default options. */ + public static WorkflowStream newInstance() { + return newInstance(null, WorkflowStreamOptions.getDefaultInstance()); + } + + /** + * Constructs a stream, restoring state carried across a continue-as-new boundary. {@code + * priorState} may be null. + */ + public static WorkflowStream newInstance(@Nullable WorkflowStreamState priorState) { + return newInstance(priorState, WorkflowStreamOptions.getDefaultInstance()); + } + + /** + * Constructs a {@code WorkflowStream} and registers its signal, update, and query handlers on the + * current workflow. Pass {@code priorState} (which may be null) to restore state carried across a + * continue-as-new boundary. 
/**
 * Builds the stream: selects the payload converter, reloads any state carried over from a
 * previous run, and finally registers the stream's listener on the current workflow.
 */
private WorkflowStream(@Nullable WorkflowStreamState priorState, WorkflowStreamOptions options) {
  // A converter built only from PayloadConverters is codec-free, so workflow-published
  // items are never double-encoded against the worker's response codec.
  this.dataConverter =
      options.getPayloadConverters().length == 0
          ? DefaultDataConverter.STANDARD_INSTANCE
          : new DefaultDataConverter(options.getPayloadConverters());

  restorePriorState(priorState);

  Workflow.registerListener(new ListenerImpl());
}

/** Copies the base offset, log, and publisher dedup tables out of {@code priorState}, if any. */
private void restorePriorState(@Nullable WorkflowStreamState priorState) {
  if (priorState == null) {
    return;
  }
  baseOffset = priorState.baseOffset;
  if (priorState.log != null) {
    for (WireItem carried : priorState.log) {
      Payload decoded = PayloadWire.decode(carried.data);
      log.add(new InternalEntry(carried.topic, decoded));
    }
  }
  if (priorState.publisherSequences != null) {
    publisherSequences.putAll(priorState.publisherSequences);
  }
  if (priorState.publisherLastSeen != null) {
    publisherLastSeen.putAll(priorState.publisherLastSeen);
  }
}
+ */ + public WorkflowStreamState getState(Duration publisherTtl) { + double now = Workflow.currentTimeMillis() / 1000.0; + double ttlSeconds = publisherTtl.toMillis() / 1000.0; + + WorkflowStreamState state = new WorkflowStreamState(); + state.baseOffset = baseOffset; + for (Map.Entry e : publisherSequences.entrySet()) { + Double ts = publisherLastSeen.get(e.getKey()); + double lastSeen = ts == null ? 0 : ts; + if (now - lastSeen < ttlSeconds) { + state.publisherSequences.put(e.getKey(), e.getValue()); + state.publisherLastSeen.put(e.getKey(), lastSeen); + } + } + + for (InternalEntry entry : log) { + // Per-item offset is re-derivable from baseOffset + index on reload. + state.log.add(new WireItem(entry.topic, PayloadWire.encode(entry.payload), 0)); + } + return state; + } + + /** + * Drains pollers, waits for in-flight handlers to finish, captures stream state, and + * continues-as-new with the arguments built by {@code buildArgs}, so it can take a moment before + * the current run ends. {@code buildArgs} receives the post-detach stream state and returns the + * positional arguments for the new run; thread the {@link WorkflowStreamState} into your workflow + * input so the stream survives the rollover. + * + *

State is captured with the default 15-minute publisher TTL. For a custom TTL, use the manual + * recipe: {@link #detachPollers}, {@code Workflow.await(() -> + * Workflow.isEveryHandlerFinished())}, {@link #getState}, then {@link Workflow#continueAsNew}. + * + *

This method never returns.
+   */
+  public void continueAsNew(Function<WorkflowStreamState, Object[]> buildArgs) {
+    continueAsNew(null, buildArgs);
+  }
+
+  /**
+   * Same as {@link #continueAsNew(Function)}, with explicit continue-as-new options.
+   *
+   * @param options options forwarded to {@code Workflow.continueAsNew}; may be {@code null}
+   * @param buildArgs receives the captured stream state and returns the argument list for the
+   *     next run
+   */
+  public void continueAsNew(
+      @Nullable ContinueAsNewOptions options, Function<WorkflowStreamState, Object[]> buildArgs) {
+    // Order matters: release pollers first, let in-flight handlers finish, and only then snapshot,
+    // so the captured state reflects everything the handlers appended.
+    detachPollers();
+    Workflow.await(() -> Workflow.isEveryHandlerFinished());
+    WorkflowStreamState state = getState(WorkflowStreamConstants.DEFAULT_PUBLISHER_TTL);
+    // buildArgs is typed to return Object[] so the array is spread as the varargs argument list
+    // rather than being passed as one single argument.
+    Workflow.continueAsNew(options, buildArgs.apply(state));
+  }
+
+  /**
+   * Discards log entries before {@code upToOffset} and advances the base offset. After truncation,
+   * polls requesting an offset before the new base receive a {@code TruncatedOffset} error. An
+   * {@code upToOffset} at or before the current base offset is a no-op.
+   *
+   * @param upToOffset global offset of the first entry to keep
+   * @throws io.temporal.failure.ApplicationFailure a non-retryable {@code TruncateOutOfRange}
+   *     failure if {@code upToOffset} is past the end of the log
+   */
+  public void truncate(long upToOffset) {
+    long logIndex = upToOffset - baseOffset;
+    if (logIndex <= 0) {
+      return;
+    }
+    if (logIndex > log.size()) {
+      throw ApplicationFailure.newNonRetryableFailure(
+          String.format(
+              "cannot truncate to offset %d: only %d items exist",
+              upToOffset, baseOffset + log.size()),
+          WorkflowStreamConstants.ERROR_TYPE_TRUNCATE_OUT_OF_RANGE);
+    }
+    // logIndex is bounded by log.size() at this point, so the narrowing cast cannot overflow.
+    log.subList(0, (int) logIndex).clear();
+    baseOffset = upToOffset;
+  }
+
+  /**
+   * Appends {@code value} to the log under {@code topic}. A {@link Payload} is stored as-is; any
+   * other value is converted with the stream's data converter.
+   *
+   * @throws IllegalArgumentException if no payload converter accepts {@code value}
+   */
+  void publishToTopic(String topic, Object value) {
+    Payload payload;
+    if (value instanceof Payload) {
+      payload = (Payload) value;
+    } else {
+      payload =
+          dataConverter
+              .toPayload(value)
+              .orElseThrow(
+                  () ->
+                      new IllegalArgumentException(
+                          "workflowstreams: no payload converter accepted the published value"));
+    }
+    log.add(new InternalEntry(topic, payload));
+  }
+
+  /** Signal, update, and query handlers backing the stream's wire protocol. */
+  private class ListenerImpl implements WorkflowStreamListener {
+    /**
+     * Appends a published batch to the log. A batch carrying a publisher ID whose sequence is not
+     * greater than the last one recorded for that publisher is dropped as a duplicate.
+     */
+    @Override
+    public void publish(PublishInput input) {
+      if (input.publisherId != null && !input.publisherId.isEmpty()) {
+        Long lastSeq = publisherSequences.get(input.publisherId);
+        if (lastSeq != null && input.sequence <= lastSeq) {
+          return; // duplicate — skip
+        }
+        publisherSequences.put(input.publisherId, input.sequence);
+        // Recorded in (fractional) seconds, derived from the workflow clock.
+        publisherLastSeen.put(input.publisherId, Workflow.currentTimeMillis() / 1000.0);
+      }
+      if (input.items == null) {
+        return;
+      }
+      for (PublishEntry entry : input.items) {
+        Payload payload;
+        try {
+          payload = PayloadWire.decode(entry.data);
+        } catch (RuntimeException e) {
+          // A malformed entry would be a protocol violation; skip it rather than
+          // corrupting the log.
+          continue;
+        }
+        log.add(new InternalEntry(entry.topic, payload));
+      }
+    }
+
+    /** Rejects new polls with a {@code StreamDraining} failure while the stream is draining. */
+    @Override
+    public void validatePoll(PollInput input) {
+      if (draining) {
+        throw ApplicationFailure.newNonRetryableFailure(
+            "workflow is draining for continue-as-new",
+            WorkflowStreamConstants.ERROR_TYPE_STREAM_DRAINING);
+      }
+    }
+
+    /**
+     * Long-polls for items at or after {@code input.fromOffset}, optionally filtered by topic, and
+     * returns at most roughly {@code MAX_POLL_RESPONSE_BYTES} of them per response.
+     *
+     * @throws ApplicationFailure a non-retryable {@code TruncatedOffset} failure if the requested
+     *     offset has been truncated away
+     */
+    @Override
+    public PollResult poll(PollInput input) {
+      // Wait until items at or after the requested offset are available, the requested
+      // offset has been truncated away, or the stream is draining. baseOffset can advance
+      // via truncate while we wait, so re-evaluate the requested position against the
+      // current baseOffset on every check rather than capturing it once up front —
+      // otherwise a truncation that passes the waiting offset leaves the condition
+      // permanently unsatisfiable.
+      boolean[] truncated = new boolean[1];
+      Workflow.await(
+          () -> {
+            if (draining) {
+              return true;
+            }
+            if (input.fromOffset != 0 && input.fromOffset < baseOffset) {
+              // The subscriber's position was truncated, possibly while waiting.
+              truncated[0] = true;
+              return true;
+            }
+            // max clamps "from the beginning" to whatever is available.
+            long logOffset = Math.max(input.fromOffset - baseOffset, 0);
+            return log.size() > logOffset;
+          });
+      if (truncated[0]) {
+        throw ApplicationFailure.newNonRetryableFailure(
+            String.format(
+                "requested offset %d has been truncated; current base offset is %d",
+                input.fromOffset, baseOffset),
+            WorkflowStreamConstants.ERROR_TYPE_TRUNCATED_OFFSET);
+      }
+
+      long logOffset = Math.max(input.fromOffset - baseOffset, 0);
+
+      // A null set means "no topic filter".
+      Set<String> topicSet =
+          input.topics == null || input.topics.isEmpty() ? null : new HashSet<>(input.topics);
+
+      List<WireItem> wireItems = new ArrayList<>();
+      int size = 0;
+      boolean moreReady = false;
+      // Default resume point: the end of the log, unless paging cuts the response short below.
+      long nextOffset = baseOffset + log.size();
+
+      for (long i = logOffset; i < log.size(); i++) {
+        InternalEntry entry = log.get((int) i);
+        if (topicSet != null && !topicSet.contains(entry.topic)) {
+          continue;
+        }
+        long globalOffset = baseOffset + i;
+        String encoded = PayloadWire.encode(entry.payload);
+        int itemSize = PayloadWire.wireSize(encoded, entry.topic);
+        // Always admit at least one item so a single oversized entry cannot stall the subscriber.
+        if (size + itemSize > WorkflowStreamConstants.MAX_POLL_RESPONSE_BYTES
+            && !wireItems.isEmpty()) {
+          // Resume from this item on the next poll.
+          nextOffset = globalOffset;
+          moreReady = true;
+          break;
+        }
+        size += itemSize;
+        wireItems.add(new WireItem(entry.topic, encoded, globalOffset));
+      }
+
+      return new PollResult(wireItems, nextOffset, moreReady);
+    }
+
+    /** Returns the global offset one past the last entry currently in the log. */
+    @Override
+    public long offset() {
+      return baseOffset + log.size();
+    }
+  }
+}
diff --git a/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamClient.java b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamClient.java
new file mode 100644
index 000000000..f709c758e
--- /dev/null
+++ b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamClient.java
@@ -0,0 +1,148 @@
+package io.temporal.workflowstreams;
+
+import io.temporal.activity.Activity;
+import io.temporal.activity.ActivityExecutionContext;
+import io.temporal.client.WorkflowClient;
+import io.temporal.client.WorkflowStub;
+import io.temporal.common.Experimental;
+import io.temporal.common.converter.DataConverter;
+import io.temporal.common.converter.DefaultDataConverter;
+import io.temporal.workflowstreams.internal.StreamPublisher;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Publishes to and subscribes from a workflow stream from external code (activities, starters,
+ * other processes). The publish path is owned by an internal publisher that batches buffered items
+ * and signals them to the workflow; the client itself holds the target workflow and the read
+ * (subscribe/query) surface.
+ *
+ *

Close the client (e.g. via try-with-resources) to guarantee a final flush of buffered items.
+ */
+@Experimental
+public final class WorkflowStreamClient implements AutoCloseable {
+  private final WorkflowClient client;
+  private final String workflowId;
+  private final StreamPublisher publisher;
+
+  // One handle per topic name; only accessed from the synchronized topic() method.
+  private final Map<String, TopicHandle> topicHandles = new HashMap<>();
+
+  /** Creates a client targeting {@code workflowId} through the given Temporal client. */
+  public static WorkflowStreamClient newInstance(WorkflowClient client, String workflowId) {
+    return newInstance(client, workflowId, WorkflowStreamClientOptions.getDefaultInstance());
+  }
+
+  /**
+   * Creates a client targeting {@code workflowId} through the given Temporal client. The returned
+   * client follows continue-as-new chains in {@link #subscribe}.
+   */
+  public static WorkflowStreamClient newInstance(
+      WorkflowClient client, String workflowId, WorkflowStreamClientOptions options) {
+    return new WorkflowStreamClient(client, workflowId, options);
+  }
+
+  /** See {@link #fromActivity(WorkflowStreamClientOptions)}. */
+  public static WorkflowStreamClient fromActivity() {
+    return fromActivity(WorkflowStreamClientOptions.getDefaultInstance());
+  }
+
+  /**
+   * Creates a client targeting the current activity's parent workflow, using the activity's
+   * Temporal client. Must be called from an activity thread.
+   *
+   * @throws IllegalStateException if not called from an activity, or if the activity has no parent
+   *     workflow; in the latter case use {@link #newInstance} with an explicit workflow ID
+   */
+  public static WorkflowStreamClient fromActivity(WorkflowStreamClientOptions options) {
+    ActivityExecutionContext context = Activity.getExecutionContext();
+    String workflowId = context.getInfo().getWorkflowId();
+    if (workflowId == null || workflowId.isEmpty()) {
+      throw new IllegalStateException(
+          "workflowstreams: fromActivity requires an activity scheduled by a workflow; otherwise"
+              + " use newInstance with an explicit workflow ID");
+    }
+    return newInstance(context.getWorkflowClient(), workflowId, options);
+  }
+
+  private WorkflowStreamClient(
+      WorkflowClient client, String workflowId, WorkflowStreamClientOptions options) {
+    this.client = client;
+    this.workflowId = workflowId;
+
+    // A converter built only from PayloadConverters is codec-free, so items are never
+    // double-encoded against the codec on the client's signal/update envelope.
+    // A null or empty converter array both mean "use the standard converter set".
+    DataConverter dataConverter;
+    if (options.getPayloadConverters() != null && options.getPayloadConverters().length > 0) {
+      dataConverter = new DefaultDataConverter(options.getPayloadConverters());
+    } else {
+      dataConverter = DefaultDataConverter.STANDARD_INSTANCE;
+    }
+
+    // The stub is created without a run ID and is captured by the publisher's signal function.
+    WorkflowStub stub = client.newUntypedWorkflowStub(workflowId);
+    this.publisher =
+        new StreamPublisher(
+            input -> stub.signal(WorkflowStreamConstants.PUBLISH_SIGNAL_NAME, input),
+            dataConverter,
+            options.getBatchInterval(),
+            options.getMaxBatchSize(),
+            options.getMaxRetryDuration());
+  }
+
+  /**
+   * Returns a handle for publishing to and subscribing from {@code name}. Repeated calls with the
+   * same name return the same handle.
+   */
+  public synchronized TopicHandle topic(String name) {
+    return topicHandles.computeIfAbsent(name, n -> new TopicHandle(n, this));
+  }
+
+  /**
+   * Sends buffered (and pending) items and waits for server confirmation. Returns once the items
+   * buffered at call time have been signaled to the workflow and acknowledged.
+   *
+   * @throws FlushTimeoutException if a pending batch cannot be sent within the max retry duration
+   */
+  public void flush() {
+    publisher.flush();
+  }
+
+  /** Queries the current global offset of the stream. */
+  public long getOffset() {
+    return client
+        .newUntypedWorkflowStub(workflowId)
+        .query(WorkflowStreamConstants.OFFSET_QUERY_NAME, Long.class);
+  }
+
+  /**
+   * Returns a subscription that long-polls for new items. Iterate with:
+   *
+   * <pre>{@code
+   * try (WorkflowStreamSubscription subscription = streamClient.subscribe(options)) {
+   *   for (WorkflowStreamItem item : subscription) {
+   *     // use item
+   *   }
+   * }
+   * }</pre>
+   *
+   * <p>The iteration runs on the caller's thread. Each item carries the raw {@link
+   * io.temporal.api.common.v1.Payload}; decode it with your data converter. The subscription ends
+   * cleanly when the workflow reaches a terminal state, and automatically follows continue-as-new
+   * chains.
+   */
+  public WorkflowStreamSubscription subscribe(SubscribeOptions options) {
+    return new WorkflowStreamSubscription(client, workflowId, options);
+  }
+
+  /**
+   * Stops the background publisher and drains any remaining items, guaranteeing a final flush. It
+   * surfaces any deferred {@link FlushTimeoutException} from a prior background flush failure.
+   */
+  @Override
+  public void close() {
+    publisher.close();
+  }
+
+  /** Hands {@code value} to the publisher for {@code topic}; used by {@link TopicHandle}. */
+  void publishToTopic(String topic, Object value, boolean forceFlush) {
+    publisher.publish(topic, value, forceFlush);
+  }
+}
diff --git a/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamClientOptions.java b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamClientOptions.java
new file mode 100644
index 000000000..96e5e570a
--- /dev/null
+++ b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamClientOptions.java
@@ -0,0 +1,108 @@
+package io.temporal.workflowstreams;
+
+import io.temporal.common.Experimental;
+import io.temporal.common.converter.PayloadConverter;
+import java.time.Duration;
+
+/** Options for constructing a {@link WorkflowStreamClient}.
*/
+@Experimental
+public final class WorkflowStreamClientOptions {
+  /** Returns a builder preloaded with the default option values. */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /** Returns the shared options instance built from the defaults. */
+  public static WorkflowStreamClientOptions getDefaultInstance() {
+    return DEFAULT_INSTANCE;
+  }
+
+  private static final WorkflowStreamClientOptions DEFAULT_INSTANCE = newBuilder().build();
+
+  private final Duration batchInterval;
+  private final int maxBatchSize;
+  private final Duration maxRetryDuration;
+  private final PayloadConverter[] payloadConverters;
+
+  private WorkflowStreamClientOptions(
+      Duration batchInterval,
+      int maxBatchSize,
+      Duration maxRetryDuration,
+      PayloadConverter[] payloadConverters) {
+    this.batchInterval = batchInterval;
+    this.maxBatchSize = maxBatchSize;
+    this.maxRetryDuration = maxRetryDuration;
+    // The builder hands over its own private copy, so no further copy is needed here.
+    this.payloadConverters = payloadConverters;
+  }
+
+  /** Interval between automatic flushes. */
+  public Duration getBatchInterval() {
+    return batchInterval;
+  }
+
+  /** Buffer size that triggers a flush; zero disables size-based flushing. */
+  public int getMaxBatchSize() {
+    return maxBatchSize;
+  }
+
+  /** Maximum time to retry a failed flush. */
+  public Duration getMaxRetryDuration() {
+    return maxRetryDuration;
+  }
+
+  /**
+   * Returns the configured payload converters; empty means "use the standard converter set". The
+   * returned array is a copy, so callers cannot alter these options through it.
+   */
+  public PayloadConverter[] getPayloadConverters() {
+    return payloadConverters.clone();
+  }
+
+  public static final class Builder {
+    private Duration batchInterval = WorkflowStreamConstants.DEFAULT_BATCH_INTERVAL;
+    private int maxBatchSize;
+    private Duration maxRetryDuration = WorkflowStreamConstants.DEFAULT_MAX_RETRY_DURATION;
+    private PayloadConverter[] payloadConverters = new PayloadConverter[0];
+
+    private Builder() {}
+
+    /** Interval between automatic flushes. Default: 2 seconds. */
+    public Builder setBatchInterval(Duration batchInterval) {
+      this.batchInterval = batchInterval;
+      return this;
+    }
+
+    /**
+     * Triggers a flush once the buffer reaches this many items. Zero (the default) disables
+     * size-based flushing.
+     */
+    public Builder setMaxBatchSize(int maxBatchSize) {
+      this.maxBatchSize = maxBatchSize;
+      return this;
+    }
+
+    /**
+     * Maximum time to retry a failed flush before surfacing a {@link FlushTimeoutException}. Must
+     * be less than the workflow's publisher TTL (default 15 minutes) to preserve exactly-once
+     * delivery. Default: 10 minutes.
+     */
+    public Builder setMaxRetryDuration(Duration maxRetryDuration) {
+      this.maxRetryDuration = maxRetryDuration;
+      return this;
+    }
+
+    /**
+     * Customizes how published values are serialized into the per-item Payloads carried inside each
+     * batch. They are combined into a {@link io.temporal.common.converter.DefaultDataConverter} in
+     * the order given, so the last one should be a catch-all such as a JSON converter.
+     *
+     * <p>Only payload conversion happens here — never a payload codec (encryption, compression).
+     * The codec chain configured on the Temporal client runs once on the signal/update envelope
+     * that carries each batch, so encoding items here too would double-encode them; the {@code
+     * PayloadConverter[]} type makes that mistake impossible. To decode subscribed items, use a
+     * converter built from the same payload converters.
+     *
+     * <p>Default: the standard converter set. Passing {@code null} or no converters restores that
+     * default. The array is copied, so later changes to the caller's array have no effect.
+     */
+    public Builder setPayloadConverters(PayloadConverter... payloadConverters) {
+      this.payloadConverters =
+          payloadConverters == null ? new PayloadConverter[0] : payloadConverters.clone();
+      return this;
+    }
+
+    /** Builds an options instance from the values set so far. */
+    public WorkflowStreamClientOptions build() {
+      return new WorkflowStreamClientOptions(
+          batchInterval, maxBatchSize, maxRetryDuration, payloadConverters);
+    }
+  }
+}
diff --git a/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamConstants.java b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamConstants.java
new file mode 100644
index 000000000..4245b9de6
--- /dev/null
+++ b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamConstants.java
@@ -0,0 +1,55 @@
+package io.temporal.workflowstreams;
+
+import io.temporal.common.Experimental;
+import java.time.Duration;
+
+/**
+ * Fixed handler names and error types of the workflow streams wire protocol. These are part of the
+ * cross-language contract and match the Go, Python, and TypeScript packages exactly. The Java SDK
+ * normally reserves the {@code __temporal_} prefix, but explicitly permits the {@code
+ * __temporal_workflow_stream_} sub-namespace for this package.
+ */
+@Experimental
+public final class WorkflowStreamConstants {
+  /** Signal external publishers send to append a batch of items to the stream. */
+  public static final String PUBLISH_SIGNAL_NAME = "__temporal_workflow_stream_publish";
+
+  /** Update subscribers send to long-poll for new items. */
+  public static final String POLL_UPDATE_NAME = "__temporal_workflow_stream_poll";
+
+  /** Query that returns the current global offset. */
+  public static final String OFFSET_QUERY_NAME = "__temporal_workflow_stream_offset";
+
+  /**
+   * ApplicationFailure type returned by the poll update when the requested offset has already been
+   * truncated.
+   */
+  public static final String ERROR_TYPE_TRUNCATED_OFFSET = "TruncatedOffset";
+
+  /**
+   * ApplicationFailure type thrown by {@link WorkflowStream#truncate} when the requested offset is
+   * past the end of the log.
+   */
+  public static final String ERROR_TYPE_TRUNCATE_OUT_OF_RANGE = "TruncateOutOfRange";
+
+  /**
+   * ApplicationFailure type the poll update's validator returns while the stream is detaching for
+   * continue-as-new. It tells a subscriber the rollover is in progress so it retries (rather than
+   * surfacing an error) until the poll lands on the successor run.
+   */
+  public static final String ERROR_TYPE_STREAM_DRAINING = "StreamDraining";
+
+  /**
+   * Caps the estimated wire size of a single poll response. Responses that would exceed this are
+   * truncated and signal {@code more_ready} so the subscriber pages through the remainder.
+   */
+  static final int MAX_POLL_RESPONSE_BYTES = 1_000_000;
+
+  // Default option values, matching the Go, Python, and TypeScript packages.
+  // Package-private: they seed the option builders rather than forming part of the public API.
+  static final Duration DEFAULT_BATCH_INTERVAL = Duration.ofSeconds(2);
+  static final Duration DEFAULT_POLL_COOLDOWN = Duration.ofMillis(100);
+  static final Duration DEFAULT_PUBLISHER_TTL = Duration.ofMinutes(15);
+  static final Duration DEFAULT_MAX_RETRY_DURATION = Duration.ofMinutes(10);
+
+  // Constants holder; not instantiable.
+  private WorkflowStreamConstants() {}
+}
diff --git a/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamItem.java b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamItem.java
new file mode 100644
index 000000000..cf611fe64
--- /dev/null
+++ b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamItem.java
@@ -0,0 +1,35 @@
+package io.temporal.workflowstreams;
+
+import io.temporal.api.common.v1.Payload;
+import io.temporal.common.Experimental;
+
+/**
+ * A single item yielded by a subscription: its topic, its payload, and its offset. {@code payload}
+ * is the raw {@link Payload}; decode it at the call site with a payload converter, e.g. {@code
+ * DefaultDataConverter.STANDARD_INSTANCE.fromPayload(item.getPayload(), String.class,
+ * String.class)}.
+ */
+@Experimental
+public final class WorkflowStreamItem {
+  private final String topic;
+  private final Payload payload;
+  private final long offset;
+
+  /** Creates an item from its topic, raw payload, and offset. */
+  public WorkflowStreamItem(String topic, Payload payload, long offset) {
+    this.topic = topic;
+    this.payload = payload;
+    this.offset = offset;
+  }
+
+  /** Returns the topic the item was published to. */
+  public String getTopic() {
+    return topic;
+  }
+
+  /** Returns the raw, still-encoded payload. */
+  public Payload getPayload() {
+    return payload;
+  }
+
+  /** Returns the item's offset as reported by the poll response. */
+  public long getOffset() {
+    return offset;
+  }
+}
diff --git a/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamListener.java b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamListener.java
new file mode 100644
index 000000000..e52f29036
--- /dev/null
+++ b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamListener.java
@@ -0,0 +1,30 @@
+package io.temporal.workflowstreams;
+
+import io.temporal.common.Experimental;
+import io.temporal.workflow.QueryMethod;
+import io.temporal.workflow.SignalMethod;
+import io.temporal.workflow.UpdateMethod;
+import io.temporal.workflow.UpdateValidatorMethod;
+
+/**
+ * The signal, update, and query handlers a {@link WorkflowStream} registers on its workflow via
+ * {@link io.temporal.workflow.Workflow#registerListener(Object)}. The handler names are part of the
+ * cross-language wire protocol.
+ *
+ * <p>This interface is public only because listener methods are invoked reflectively; user code
+ * should not implement it. Construct a {@link WorkflowStream} instead.
+ */
+@Experimental
+public interface WorkflowStreamListener {
+  /** Signal handler that receives a batch of published items. */
+  @SignalMethod(name = WorkflowStreamConstants.PUBLISH_SIGNAL_NAME)
+  void publish(PublishInput input);
+
+  /** Update handler that subscribers call to long-poll for new items. */
+  @UpdateMethod(name = WorkflowStreamConstants.POLL_UPDATE_NAME)
+  PollResult poll(PollInput input);
+
+  /** Validator for the poll update; an exception thrown here rejects the poll. */
+  @UpdateValidatorMethod(updateName = WorkflowStreamConstants.POLL_UPDATE_NAME)
+  void validatePoll(PollInput input);
+
+  /** Query handler that returns the current global offset. */
+  @QueryMethod(name = WorkflowStreamConstants.OFFSET_QUERY_NAME)
+  long offset();
+}
diff --git a/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamOptions.java b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamOptions.java
new file mode 100644
index 000000000..de0a0c879
--- /dev/null
+++ b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamOptions.java
@@ -0,0 +1,59 @@
+package io.temporal.workflowstreams;
+
+import io.temporal.common.Experimental;
+import io.temporal.common.converter.PayloadConverter;
+
+/** Options for constructing a {@link WorkflowStream}.
*/
+@Experimental
+public final class WorkflowStreamOptions {
+  /** Returns a builder preloaded with the default option values. */
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /** Returns the shared options instance built from the defaults. */
+  public static WorkflowStreamOptions getDefaultInstance() {
+    return DEFAULT_INSTANCE;
+  }
+
+  private static final WorkflowStreamOptions DEFAULT_INSTANCE = newBuilder().build();
+
+  private final PayloadConverter[] payloadConverters;
+
+  private WorkflowStreamOptions(PayloadConverter[] payloadConverters) {
+    // The builder hands over its own private copy, so no further copy is needed here.
+    this.payloadConverters = payloadConverters;
+  }
+
+  /**
+   * Returns the configured payload converters; empty means "use the standard converter set". The
+   * returned array is a copy, so callers cannot alter these options through it.
+   */
+  public PayloadConverter[] getPayloadConverters() {
+    return payloadConverters.clone();
+  }
+
+  public static final class Builder {
+    private PayloadConverter[] payloadConverters = new PayloadConverter[0];
+
+    private Builder() {}
+
+    /**
+     * Customizes how values published from workflow code (via {@link WorkflowTopicHandle#publish})
+     * are serialized into per-item Payloads. They are combined into a {@link
+     * io.temporal.common.converter.DefaultDataConverter} in the order given, so the last one should
+     * be a catch-all such as a JSON converter.
+     *
+     * <p>As on the client side, only payload conversion happens here — never a payload codec. The
+     * worker's codec chain runs once on the poll-update response that carries each batch to
+     * subscribers, so encoding items here too would double-encode them; the {@code
+     * PayloadConverter[]} type makes that impossible.
+     *
+     * <p>There is no public accessor for the worker's configured data converter inside workflow
+     * code, so it cannot be picked up automatically; pass the matching payload converters here to
+     * keep workflow-side publishes consistent with the rest of your workflow. Default: the standard
+     * converter set. Passing {@code null} or no converters restores that default. The array is
+     * copied, so later changes to the caller's array have no effect.
+     */
+    public Builder setPayloadConverters(PayloadConverter... payloadConverters) {
+      this.payloadConverters =
+          payloadConverters == null ? new PayloadConverter[0] : payloadConverters.clone();
+      return this;
+    }
+
+    /** Builds an options instance from the values set so far. */
+    public WorkflowStreamOptions build() {
+      return new WorkflowStreamOptions(payloadConverters);
+    }
+  }
+}
diff --git a/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamState.java b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamState.java
new file mode 100644
index 000000000..da8039454
--- /dev/null
+++ b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamState.java
@@ -0,0 +1,34 @@
+package io.temporal.workflowstreams;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.temporal.common.Experimental;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A serializable snapshot of stream state for continue-as-new. Thread a {@code WorkflowStreamState}
+ * field through your workflow input and pass it to {@link
+ * WorkflowStream#newInstance(WorkflowStreamState)}.
+ *
+ *

Field names are part of the cross-language wire protocol; this type must serialize to JSON
+ * with exactly these names.
+ */
+@Experimental
+public final class WorkflowStreamState {
+  // NOTE(review): the collection fields below show no type arguments in this copy of the file;
+  // they appear to have been lost in extraction — confirm the element/key/value types against the
+  // original source before relying on them.
+
+  /** The stream's log entries carried across the continue-as-new boundary. */
+  @JsonProperty("log")
+  public List log = new ArrayList<>();
+
+  /** Presumably the global offset of the first entry in {@link #log} — TODO confirm. */
+  @JsonProperty("base_offset")
+  public long baseOffset;
+
+  /** Presumably the last accepted sequence number per publisher ID, for dedup — TODO confirm. */
+  @JsonProperty("publisher_sequences")
+  public Map publisherSequences = new HashMap<>();
+
+  /** Unix seconds of the last batch accepted from each publisher. */
+  @JsonProperty("publisher_last_seen")
+  public Map publisherLastSeen = new HashMap<>();
+
+  /** Creates an empty state: empty log and maps, base offset zero. */
+  public WorkflowStreamState() {}
+}
diff --git a/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamSubscription.java b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamSubscription.java
new file mode 100644
index 000000000..9e349e0bb
--- /dev/null
+++ b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowStreamSubscription.java
@@ -0,0 +1,217 @@
+package io.temporal.workflowstreams;
+
+import io.temporal.api.enums.v1.WorkflowExecutionStatus;
+import io.temporal.client.UpdateOptions;
+import io.temporal.client.WorkflowClient;
+import io.temporal.client.WorkflowStub;
+import io.temporal.client.WorkflowTargetOptions;
+import io.temporal.client.WorkflowUpdateHandle;
+import io.temporal.client.WorkflowUpdateStage;
+import io.temporal.common.Experimental;
+import io.temporal.failure.ApplicationFailure;
+import io.temporal.workflowstreams.internal.PayloadWire;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * A blocking, single-use subscription over a workflow stream, driven on the consuming thread.
Each
+ * poll long-polls the workflow's poll update; the subscription ends cleanly ({@code hasNext() ==
+ * false}) when the workflow reaches a terminal state, and automatically follows continue-as-new
+ * chains.
+ *
+ * <p>{@link #close} stops the subscription before the next poll; a poll already blocked on the
+ * server is not interrupted.
+ */
+@Experimental
+public final class WorkflowStreamSubscription
+    implements Iterator<WorkflowStreamItem>, Iterable<WorkflowStreamItem>, AutoCloseable {
+  private final WorkflowClient client;
+  private final String workflowId;
+  private final WorkflowStub latestRunStub;
+  private final List<String> topics;
+  private final long pollCooldownMs;
+
+  // Items fetched by the last poll and not yet handed to the consumer.
+  private final Deque<WorkflowStreamItem> queue = new ArrayDeque<>();
+  private long offset;
+  private boolean done;
+  // volatile: close() may be called from a thread other than the consuming one.
+  private volatile boolean closed;
+  private boolean cooldownBeforeNextPoll;
+  // The run the most recent poll's update was admitted to. Captured before waiting for the
+  // update's outcome so that, if that run continues-as-new mid-poll (failing the outcome), we
+  // still know which run to inspect to tell a rollover apart from a terminal end.
+  private String polledRunId = "";
+
+  WorkflowStreamSubscription(WorkflowClient client, String workflowId, SubscribeOptions options) {
+    this.client = client;
+    this.workflowId = workflowId;
+    this.latestRunStub = client.newUntypedWorkflowStub(workflowId);
+    this.topics = options.getTopics();
+    this.offset = options.getFromOffset();
+    this.pollCooldownMs = options.getPollCooldown().toMillis();
+  }
+
+  /**
+   * Returns this subscription; it is single-use, so iterate it at most once (typically with a
+   * for-each loop).
+   */
+  @Override
+  public Iterator<WorkflowStreamItem> iterator() {
+    return this;
+  }
+
+  /**
+   * Returns whether another item is available, long-polling the workflow until one is (or the
+   * stream ends). Blocks on the consuming thread.
+   */
+  @Override
+  public boolean hasNext() {
+    while (queue.isEmpty() && !done) {
+      if (closed) {
+        done = true;
+        break;
+      }
+      pollOnce();
+    }
+    return !queue.isEmpty();
+  }
+
+  /**
+   * Returns the next item, blocking like {@link #hasNext}.
+   *
+   * @throws NoSuchElementException if the subscription has ended
+   */
+  @Override
+  public WorkflowStreamItem next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    return queue.poll();
+  }
+
+  /** Stops the subscription before the next poll. */
+  @Override
+  public void close() {
+    closed = true;
+  }
+
+  /** Runs one poll round-trip, queueing any returned items and advancing the offset. */
+  private void pollOnce() {
+    if (cooldownBeforeNextPoll) {
+      cooldownBeforeNextPoll = false;
+      if (!sleepCooldown()) {
+        return;
+      }
+    }
+
+    try {
+      // Wait only for ACCEPTED so startUpdate returns the handle (and its run id) as soon
+      // as the update is admitted; getResult then waits for the outcome. With a COMPLETED
+      // wait stage a mid-poll continue-as-new would fail startUpdate without a handle,
+      // losing the run id.
+      UpdateOptions<PollResult> updateOptions =
+          UpdateOptions.newBuilder(PollResult.class)
+              .setUpdateName(WorkflowStreamConstants.POLL_UPDATE_NAME)
+              .setWaitForStage(WorkflowUpdateStage.ACCEPTED)
+              .build();
+      WorkflowUpdateHandle<PollResult> handle =
+          latestRunStub.startUpdate(updateOptions, new PollInput(topics, offset));
+      polledRunId = handle.getExecution().getRunId();
+      PollResult result = handle.getResult();
+
+      // Guard against a null list: presumably a peer implementation may serialize an empty item
+      // list as JSON null — TODO confirm. Treat it as "no items" instead of failing with an NPE.
+      if (result.items != null) {
+        for (WireItem item : result.items) {
+          queue.add(new WorkflowStreamItem(item.topic, PayloadWire.decode(item.data), item.offset));
+        }
+      }
+      offset = result.nextOffset;
+      // Skip the cooldown when the workflow says more items are already waiting to be paged.
+      cooldownBeforeNextPoll = !result.moreReady;
+    } catch (RuntimeException e) {
+      handlePollError(e);
+    }
+  }
+
+  /**
+   * Decides how a failed poll affects the subscription: retry (truncation, draining, rollover),
+   * end cleanly (terminal workflow state), or rethrow {@code e}.
+   */
+  private void handlePollError(RuntimeException e) {
+    ApplicationFailure failure = findApplicationFailure(e);
+    if (failure != null) {
+      if (WorkflowStreamConstants.ERROR_TYPE_TRUNCATED_OFFSET.equals(failure.getType())) {
+        // Fell behind truncation; restart from the beginning of whatever still exists.
+        offset = 0;
+        return;
+      }
+      if (WorkflowStreamConstants.ERROR_TYPE_STREAM_DRAINING.equals(failure.getType())) {
+        // The workflow is detaching for continue-as-new. Back off and retry; the poll
+        // lands on the successor run once the rollover completes (or the chain/terminal
+        // checks below fire on a genuine end).
+        sleepCooldown();
+        return;
+      }
+    }
+    // The workflow may have continued-as-new or completed between polls. Follow the chain,
+    // exit cleanly on a terminal state, otherwise surface the error.
+    WorkflowExecutionStatus status = describePolledRun();
+    if (status == WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW) {
+      // Subsequent polls use the latest-run stub, addressing the successor automatically.
+      return;
+    }
+    if (isTerminal(status)) {
+      done = true;
+      return;
+    }
+    throw e;
+  }
+
+  /**
+   * Describes the run the most recent poll was admitted to: a rolled-over run is closed with status
+   * CONTINUED_AS_NEW, whereas the latest run would report RUNNING, so describing by run id is what
+   * makes the rollover check fire. The successor run id is not needed — subsequent polls address
+   * the latest run automatically. A blank run id (no poll has been admitted yet) falls back to
+   * describing the latest run.
+   *
+   * @return the run's status, or {@code WORKFLOW_EXECUTION_STATUS_UNSPECIFIED} if the describe call
+   *     itself fails
+   */
+  private WorkflowExecutionStatus describePolledRun() {
+    try {
+      WorkflowStub stub;
+      if (polledRunId == null || polledRunId.isEmpty()) {
+        stub = latestRunStub;
+      } else {
+        stub =
+            client.newUntypedWorkflowStub(
+                WorkflowTargetOptions.newBuilder()
+                    .setWorkflowId(workflowId)
+                    .setRunId(polledRunId)
+                    .build());
+      }
+      return stub.describe().getStatus();
+    } catch (RuntimeException e) {
+      // Deliberately best-effort: UNSPECIFIED is neither a rollover nor terminal, so the caller
+      // rethrows the original poll error instead of this one.
+      return WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED;
+    }
+  }
+
+  /** Returns whether {@code status} is a closed state other than continued-as-new. */
+  private static boolean isTerminal(WorkflowExecutionStatus status) {
+    switch (status) {
+      case WORKFLOW_EXECUTION_STATUS_COMPLETED:
+      case WORKFLOW_EXECUTION_STATUS_FAILED:
+      case WORKFLOW_EXECUTION_STATUS_CANCELED:
+      case WORKFLOW_EXECUTION_STATUS_TERMINATED:
+      case WORKFLOW_EXECUTION_STATUS_TIMED_OUT:
+        return true;
+      default:
+        return false;
+    }
+  }
+
+  /** Returns the first {@link ApplicationFailure} in the cause chain of {@code e}, or null. */
+  private static ApplicationFailure findApplicationFailure(Throwable e) {
+    for (Throwable t = e; t != null; t = t.getCause()) {
+      if (t instanceof ApplicationFailure) {
+        return (ApplicationFailure) t;
+      }
+    }
+    return null;
+  }
+
+  /** Returns false if interrupted, after marking the subscription done. */
+  private boolean sleepCooldown() {
+    try {
+      Thread.sleep(pollCooldownMs);
+      return true;
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag for the caller and end the subscription.
+      Thread.currentThread().interrupt();
+      done = true;
+      return false;
+    }
+  }
+}
diff --git a/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowTopicHandle.java b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowTopicHandle.java
new file mode 100644
index 000000000..b447b6ce3
--- /dev/null
+++ b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/WorkflowTopicHandle.java
@@ -0,0 +1,29 @@
+package io.temporal.workflowstreams;
+
+import io.temporal.common.Experimental;
+
+/** Publishes to a single topic from workflow code. Obtained via {@link WorkflowStream#topic}. */
+@Experimental
+public final class WorkflowTopicHandle {
+  private final String name;
+  private final WorkflowStream stream;
+
+  WorkflowTopicHandle(String name, WorkflowStream stream) {
+    this.name = name;
+    this.stream = stream;
+  }
+
+  /** Returns the topic name. */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Appends {@code value} to the stream on this topic. {@code value} is serialized by the stream's
+   * payload converters (see {@link WorkflowStreamOptions.Builder#setPayloadConverters}), defaulting
+   * to the standard set; a pre-built {@link io.temporal.api.common.v1.Payload} bypasses conversion.
+   */
+  public void publish(Object value) {
+    stream.publishToTopic(name, value);
+  }
+}
diff --git a/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/internal/PayloadWire.java b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/internal/PayloadWire.java
new file mode 100644
index 000000000..ae6202c13
--- /dev/null
+++ b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/internal/PayloadWire.java
@@ -0,0 +1,40 @@
+package io.temporal.workflowstreams.internal;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import io.temporal.api.common.v1.Payload;
+import java.util.Base64;
+
+/**
+ * Encodes and decodes the base64-of-proto per-item wire format shared across the Go, Python, and
+ * TypeScript workflow streams packages. Internal to the workflow streams module.
+ */
+public final class PayloadWire {
+  /** Encodes a Payload to the base64-of-proto wire format. */
+  public static String encode(Payload payload) {
+    return Base64.getEncoder().encodeToString(payload.toByteArray());
+  }
+
+  /**
+   * Decodes the base64-of-proto wire format back to a Payload.
+   *
+   * @throws IllegalArgumentException if the input is not valid base64 or not a valid Payload
+   */
+  public static Payload decode(String wire) {
+    byte[] bytes = Base64.getDecoder().decode(wire);
+    try {
+      return Payload.parseFrom(bytes);
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalArgumentException("workflowstreams: unmarshal payload", e);
+    }
+  }
+
+  /**
+   * Estimates the contribution of a single encoded item to a poll response. {@code encoded} is
+   * already base64 (its on-wire representation). The estimate is the character count of the
+   * encoded payload plus that of the topic; a {@code null} topic counts as zero, so an entry
+   * stored without a topic cannot make the caller fail with a NullPointerException.
+   */
+  public static int wireSize(String encoded, String topic) {
+    int topicLength = topic == null ? 0 : topic.length();
+    return encoded.length() + topicLength;
+  }
+
+  // Static utility holder; not instantiable.
+  private PayloadWire() {}
+}
diff --git a/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/internal/StreamPublisher.java b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/internal/StreamPublisher.java
new file mode 100644
index 000000000..eef174c0d
--- /dev/null
+++ b/contrib/temporal-workflowstreams/src/main/java/io/temporal/workflowstreams/internal/StreamPublisher.java
@@ -0,0 +1,280 @@
+package io.temporal.workflowstreams.internal;
+
+import io.temporal.api.common.v1.Payload;
+import io.temporal.common.converter.DataConverter;
+import io.temporal.workflowstreams.FlushTimeoutException;
+import io.temporal.workflowstreams.PublishEntry;
+import io.temporal.workflowstreams.PublishInput;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Owns the client-side publish path: it buffers published values, batches them, and sends each
+ * batch to the workflow via the injected signal function. It assigns the per-publisher dedup key (a
+ * stable publisher ID plus a monotonic sequence advanced only on a confirmed send) so the workflow
+ * can drop duplicates, and it retries a failed batch until the max retry duration elapses.
+ *
+ *

The signal function is injected (rather than holding a client) so the publish path can be + * exercised in isolation. Internal to the workflow streams module. + */ +public final class StreamPublisher { + + /** Sends a publish signal to the target workflow. Throws on delivery failure. */ + @FunctionalInterface + public interface SignalFunction { + void send(PublishInput input); + } + + private static final class BufItem { + + final String topic; + final Object value; + + BufItem(String topic, Object value) { + this.topic = topic; + this.value = value; + } + } + + private final SignalFunction signal; + private final DataConverter dataConverter; + private final String publisherId; + private final long batchIntervalMs; + private final int maxBatchSize; + private final long maxRetryDurationMs; + + private final Object stateLock = new Object(); + private List buffer = new ArrayList<>(); + private List pending; + private long pendingSeq; + private long sequence; + private long pendingStartNanos; + private boolean started; + private boolean closed; + private FlushTimeoutException deferredError; + private ScheduledExecutorService scheduler; + + /** Serializes doFlush so concurrent callers send sequentially. */ + private final Object flushLock = new Object(); + + public StreamPublisher( + SignalFunction signal, + DataConverter dataConverter, + Duration batchInterval, + int maxBatchSize, + Duration maxRetryDuration) { + this.signal = signal; + this.dataConverter = dataConverter; + this.publisherId = UUID.randomUUID().toString().replace("-", "").substring(0, 16); + this.batchIntervalMs = batchInterval.toMillis(); + this.maxBatchSize = maxBatchSize; + this.maxRetryDurationMs = maxRetryDuration.toMillis(); + } + + /** + * Buffers a value and lazily starts the background flush loop. Triggers an immediate flush on + * {@code forceFlush} or once the buffer reaches the max batch size. 
+ */ + public void publish(String topic, Object value, boolean forceFlush) { + boolean trigger; + ScheduledExecutorService toTrigger = null; + synchronized (stateLock) { + buffer.add(new BufItem(topic, value)); + trigger = forceFlush || (maxBatchSize > 0 && buffer.size() >= maxBatchSize); + if (!closed) { + ensureStartedLocked(); + toTrigger = scheduler; + } + } + if (trigger && toTrigger != null) { + toTrigger.execute(this::backgroundFlush); + } + } + + private void ensureStartedLocked() { + if (started || closed) { + return; + } + started = true; + scheduler = + Executors.newSingleThreadScheduledExecutor( + r -> { + Thread t = new Thread(r, "temporal-workflow-stream-publisher"); + t.setDaemon(true); + return t; + }); + scheduler.scheduleWithFixedDelay( + this::backgroundFlush, batchIntervalMs, batchIntervalMs, TimeUnit.MILLISECONDS); + } + + private void backgroundFlush() { + try { + doFlush(); + } catch (FlushTimeoutException e) { + // The pending batch was dropped and can't be recovered. Stash the error so + // flush/close surface it and stop the loop. + ScheduledExecutorService toStop; + synchronized (stateLock) { + deferredError = e; + toStop = scheduler; + } + if (toStop != null) { + toStop.shutdown(); + } + } catch (RuntimeException e) { + // Transient failure: pending stays set for retry on the next tick. + } + } + + /** + * Sends the pending batch (retry) or encodes and sends the buffer (new batch). Serialized so + * concurrent callers send sequentially. + */ + private void doFlush() { + synchronized (flushLock) { + List batch; + long seq; + + synchronized (stateLock) { + if (pending != null) { + if (System.nanoTime() - pendingStartNanos + > TimeUnit.MILLISECONDS.toNanos(maxRetryDurationMs)) { + // Advance the confirmed sequence so the next batch gets a fresh sequence + // number. Without this the next batch reuses pendingSeq, which the + // workflow may have already accepted — causing silent dedup (data loss). 
+ sequence = pendingSeq; + pending = null; + pendingSeq = 0; + pendingStartNanos = 0; + throw new FlushTimeoutException( + String.format( + "workflowstreams: flush retry exceeded the max retry duration (%dms); pending" + + " batch dropped", + maxRetryDurationMs)); + } + batch = pending; + seq = pendingSeq; + } else if (!buffer.isEmpty()) { + // encodeBuffer may throw; the buffer is left intact for a later flush. + batch = encodeBuffer(buffer); + buffer = new ArrayList<>(); + seq = sequence + 1; + pending = batch; + pendingSeq = seq; + pendingStartNanos = System.nanoTime(); + } else { + return; + } + } + + // On failure the signal throws and pending stays set for retry. + signal.send(new PublishInput(batch, publisherId, seq)); + + synchronized (stateLock) { + sequence = seq; + pending = null; + pendingSeq = 0; + pendingStartNanos = 0; + } + } + } + + private List encodeBuffer(List items) { + List out = new ArrayList<>(items.size()); + for (BufItem item : items) { + Payload payload; + if (item.value instanceof Payload) { + payload = (Payload) item.value; + } else { + payload = + dataConverter + .toPayload(item.value) + .orElseThrow( + () -> + new IllegalArgumentException( + "workflowstreams: no payload converter accepted the published value")); + } + out.add(new PublishEntry(item.topic, PayloadWire.encode(payload))); + } + return out; + } + + /** + * Sends buffered (and pending) items and waits for confirmation. Returns once the items buffered + * at call time have been signaled and acknowledged. + * + * @throws FlushTimeoutException if a pending batch cannot be sent within the max retry duration + */ + public void flush() { + throwDeferred(); + + long targetSeq; + synchronized (stateLock) { + if (pending == null && buffer.isEmpty()) { + return; + } + long baseSeq = pending != null ? pendingSeq : sequence; + targetSeq = buffer.isEmpty() ? 
baseSeq : baseSeq + 1; + } + + while (true) { + synchronized (stateLock) { + if (sequence >= targetSeq) { + break; + } + } + doFlush(); + } + throwDeferred(); + } + + /** + * Stops the background flush loop and drains any remaining items, surfacing a deferred {@link + * FlushTimeoutException} from a prior background failure. + */ + public void close() { + ScheduledExecutorService toStop; + synchronized (stateLock) { + if (closed) { + return; + } + closed = true; + toStop = scheduler; + } + + if (toStop != null) { + toStop.shutdownNow(); + try { + toStop.awaitTermination(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + // Final drain: a single doFlush processes either pending OR the buffer. + while (true) { + synchronized (stateLock) { + if (pending == null && buffer.isEmpty()) { + break; + } + } + doFlush(); + } + throwDeferred(); + } + + private void throwDeferred() { + synchronized (stateLock) { + if (deferredError != null) { + FlushTimeoutException e = deferredError; + deferredError = null; + throw e; + } + } + } +} diff --git a/contrib/temporal-workflowstreams/src/test/java/io/temporal/workflowstreams/PayloadWireTest.java b/contrib/temporal-workflowstreams/src/test/java/io/temporal/workflowstreams/PayloadWireTest.java new file mode 100644 index 000000000..8115a21bc --- /dev/null +++ b/contrib/temporal-workflowstreams/src/test/java/io/temporal/workflowstreams/PayloadWireTest.java @@ -0,0 +1,75 @@ +package io.temporal.workflowstreams; + +import com.google.protobuf.ByteString; +import io.temporal.api.common.v1.Payload; +import io.temporal.common.converter.DataConverter; +import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.workflowstreams.internal.PayloadWire; +import java.nio.charset.StandardCharsets; +import java.util.Base64; +import org.junit.Assert; +import org.junit.Test; + +public class PayloadWireTest { + private static final DataConverter DC = 
DefaultDataConverter.STANDARD_INSTANCE; + + @Test + public void testPayloadWireRoundTrip() { + Payload payload = DC.toPayload("hello").get(); + + String wire = PayloadWire.encode(payload); + Payload got = PayloadWire.decode(wire); + Assert.assertEquals(payload, got); + + // The decoded payload still carries its encoding metadata so a consumer can + // decode it back to the original value. + String s = DC.fromPayload(got, String.class, String.class); + Assert.assertEquals("hello", s); + } + + @Test + public void testPayloadWireFormatIsBase64OfProto() throws Exception { + Payload payload = + Payload.newBuilder() + .putMetadata("encoding", ByteString.copyFromUtf8("json/plain")) + .setData(ByteString.copyFromUtf8("\"hi\"")) + .build(); + String wire = PayloadWire.encode(payload); + + // Wire format is base64-of-marshaled-proto; decoding base64 then proto must + // reproduce the payload. This is the contract shared with the Go, Python, and + // TypeScript packages. + byte[] raw = Base64.getDecoder().decode(wire); + Payload decoded = Payload.parseFrom(raw); + Assert.assertEquals(payload, decoded); + } + + @Test + public void testDecodePayloadWireRejectsBadInput() { + try { + PayloadWire.decode("not valid base64!!!"); + Assert.fail("unreachable"); + } catch (IllegalArgumentException expected) { + } + } + + @Test + public void testBinaryPayloadRoundTrip() { + byte[] original = new byte[] {0x00, 0x01, (byte) 0xff}; + Payload payload = DC.toPayload(original).get(); + + String wire = PayloadWire.encode(payload); + Payload got = PayloadWire.decode(wire); + + byte[] b = DC.fromPayload(got, byte[].class, byte[].class); + Assert.assertArrayEquals(original, b); + } + + @Test + public void testWireSize() { + Payload payload = + Payload.newBuilder().setData(ByteString.copyFrom("x", StandardCharsets.UTF_8)).build(); + String wire = PayloadWire.encode(payload); + Assert.assertEquals(wire.length() + "topic".length(), PayloadWire.wireSize(wire, "topic")); + } +} diff --git 
a/contrib/temporal-workflowstreams/src/test/java/io/temporal/workflowstreams/StreamPublisherTest.java b/contrib/temporal-workflowstreams/src/test/java/io/temporal/workflowstreams/StreamPublisherTest.java new file mode 100644 index 000000000..b1d6d1945 --- /dev/null +++ b/contrib/temporal-workflowstreams/src/test/java/io/temporal/workflowstreams/StreamPublisherTest.java @@ -0,0 +1,233 @@ +package io.temporal.workflowstreams; + +import io.temporal.api.common.v1.Payload; +import io.temporal.common.converter.ByteArrayPayloadConverter; +import io.temporal.common.converter.DataConverter; +import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.common.converter.EncodingKeys; +import io.temporal.workflowstreams.internal.PayloadWire; +import io.temporal.workflowstreams.internal.StreamPublisher; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import org.junit.Assert; +import org.junit.Test; + +public class StreamPublisherTest { + private static final DataConverter DC = DefaultDataConverter.STANDARD_INSTANCE; + + /** Records sent batches; when {@code failure} is set, sending throws it instead. 
*/ + private static class RecordingSignal implements StreamPublisher.SignalFunction { + final List signals = new ArrayList<>(); + volatile RuntimeException failure; + + @Override + public synchronized void send(PublishInput input) { + if (failure != null) { + throw failure; + } + signals.add(input); + } + + synchronized List recorded() { + return new ArrayList<>(signals); + } + } + + private static StreamPublisher newPublisher( + RecordingSignal signal, Duration batchInterval, int maxBatchSize, Duration maxRetry) { + return new StreamPublisher(signal, DC, batchInterval, maxBatchSize, maxRetry); + } + + private static StreamPublisher newPublisher(RecordingSignal signal) { + return newPublisher( + signal, Duration.ofSeconds(2), 0, WorkflowStreamConstants.DEFAULT_MAX_RETRY_DURATION); + } + + private static String decodeItem(PublishInput input, int index) { + Payload payload = PayloadWire.decode(input.items.get(index).data); + return DC.fromPayload(payload, String.class, String.class); + } + + private static void eventually(Duration timeout, Runnable assertion) throws InterruptedException { + long deadline = System.nanoTime() + timeout.toNanos(); + while (true) { + try { + assertion.run(); + return; + } catch (AssertionError e) { + if (System.nanoTime() > deadline) { + throw e; + } + Thread.sleep(5); + } + } + } + + @Test + public void testFlushSendsBufferedItems() { + RecordingSignal signal = new RecordingSignal(); + StreamPublisher publisher = newPublisher(signal); + publisher.publish("events", "a", false); + publisher.publish("events", "b", false); + + publisher.flush(); + + List signals = signal.recorded(); + Assert.assertEquals(1, signals.size()); + Assert.assertEquals(2, signals.get(0).items.size()); + Assert.assertEquals(1, signals.get(0).sequence); + Assert.assertFalse(signals.get(0).publisherId.isEmpty()); + Assert.assertEquals("a", decodeItem(signals.get(0), 0)); + Assert.assertEquals("b", decodeItem(signals.get(0), 1)); + + publisher.close(); + } + + /** + * 
Proves that the configured payload converters (not the default set) serialize each item. With + * only the byte-array converter, a byte[] round-trips but a string has no converter and fails — + * whereas the default set's JSON fallback would have accepted it. + */ + @Test + public void testPayloadConvertersDriveItemConversion() { + RecordingSignal signal = new RecordingSignal(); + StreamPublisher publisher = + new StreamPublisher( + signal, + new DefaultDataConverter(new ByteArrayPayloadConverter()), + Duration.ofSeconds(2), + 0, + WorkflowStreamConstants.DEFAULT_MAX_RETRY_DURATION); + + publisher.publish("events", "hi".getBytes(StandardCharsets.UTF_8), false); + publisher.flush(); + List signals = signal.recorded(); + Assert.assertEquals(1, signals.size()); + Payload payload = PayloadWire.decode(signals.get(0).items.get(0).data); + Assert.assertEquals( + "item must be serialized by the configured byte-array converter", + "binary/plain", + payload.getMetadataOrThrow(EncodingKeys.METADATA_ENCODING_KEY).toStringUtf8()); + publisher.close(); + + // A string is unconvertible under the byte-array-only set, so the flush fails — the + // default set's JSON fallback would have accepted it. Use a fresh publisher since the + // unconvertible item stays buffered after the error. 
+ StreamPublisher publisher2 = + new StreamPublisher( + new RecordingSignal(), + new DefaultDataConverter(new ByteArrayPayloadConverter()), + Duration.ofSeconds(2), + 0, + WorkflowStreamConstants.DEFAULT_MAX_RETRY_DURATION); + publisher2.publish("events", "not-bytes", false); + try { + publisher2.flush(); + Assert.fail("unreachable"); + } catch (RuntimeException expected) { + } + } + + @Test + public void testFlushNoopWhenEmpty() { + RecordingSignal signal = new RecordingSignal(); + StreamPublisher publisher = newPublisher(signal); + publisher.flush(); + Assert.assertTrue(signal.recorded().isEmpty()); + } + + @Test + public void testSequenceAdvancesAcrossFlushes() { + RecordingSignal signal = new RecordingSignal(); + StreamPublisher publisher = newPublisher(signal); + + publisher.publish("t", "x", false); + publisher.flush(); + publisher.publish("t", "y", false); + publisher.flush(); + + List signals = signal.recorded(); + Assert.assertEquals(2, signals.size()); + Assert.assertEquals(1, signals.get(0).sequence); + Assert.assertEquals(2, signals.get(1).sequence); + Assert.assertEquals(signals.get(0).publisherId, signals.get(1).publisherId); + + publisher.close(); + } + + @Test + public void testMaxBatchSizeTriggersFlush() throws InterruptedException { + RecordingSignal signal = new RecordingSignal(); + // Long interval so only the size threshold can trigger a flush. 
+ StreamPublisher publisher = + newPublisher( + signal, Duration.ofHours(1), 2, WorkflowStreamConstants.DEFAULT_MAX_RETRY_DURATION); + + publisher.publish("t", "a", false); + publisher.publish("t", "b", false); // reaches maxBatchSize -> flush + + eventually(Duration.ofSeconds(5), () -> Assert.assertEquals(1, signal.recorded().size())); + + publisher.close(); + } + + @Test + public void testCloseDrainsBuffer() { + RecordingSignal signal = new RecordingSignal(); + StreamPublisher publisher = + newPublisher( + signal, Duration.ofHours(1), 0, WorkflowStreamConstants.DEFAULT_MAX_RETRY_DURATION); + + publisher.publish("t", "a", false); + publisher.close(); + + List signals = signal.recorded(); + Assert.assertEquals(1, signals.size()); + Assert.assertEquals(1, signals.get(0).items.size()); + } + + @Test + public void testForceFlush() throws InterruptedException { + RecordingSignal signal = new RecordingSignal(); + StreamPublisher publisher = + newPublisher( + signal, Duration.ofHours(1), 0, WorkflowStreamConstants.DEFAULT_MAX_RETRY_DURATION); + + publisher.publish("t", "a", true); // forceFlush + + eventually(Duration.ofSeconds(5), () -> Assert.assertEquals(1, signal.recorded().size())); + + publisher.close(); + } + + @Test + public void testFlushTimeoutAfterMaxRetryDuration() throws InterruptedException { + RecordingSignal signal = new RecordingSignal(); + signal.failure = new RuntimeException("boom"); + StreamPublisher publisher = newPublisher(signal, Duration.ofHours(1), 0, Duration.ofMillis(1)); + + publisher.publish("t", "a", false); + + // The first flush sets pending and fails to send (transient "boom"). + try { + publisher.flush(); + Assert.fail("unreachable"); + } catch (RuntimeException e) { + Assert.assertEquals("boom", e.getMessage()); + } + + // Wait past the retry window with ample margin for coarse OS timer granularity. The + // next flush sees the window exceeded and throws FlushTimeoutException. 
+ Thread.sleep(50); + + try { + publisher.flush(); + Assert.fail("unreachable"); + } catch (FlushTimeoutException expected) { + } + + publisher.close(); + } +} diff --git a/contrib/temporal-workflowstreams/src/test/java/io/temporal/workflowstreams/SubscribeTest.java b/contrib/temporal-workflowstreams/src/test/java/io/temporal/workflowstreams/SubscribeTest.java new file mode 100644 index 000000000..0e3e9fa9f --- /dev/null +++ b/contrib/temporal-workflowstreams/src/test/java/io/temporal/workflowstreams/SubscribeTest.java @@ -0,0 +1,244 @@ +package io.temporal.workflowstreams; + +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowStub; +import io.temporal.common.converter.DataConverter; +import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.SignalMethod; +import io.temporal.workflow.UpdateMethod; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class SubscribeTest { + private static final DataConverter DC = DefaultDataConverter.STANDARD_INSTANCE; + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder().setWorkflowTypes(SubscribeHostWorkflowImpl.class).build(); + + private static final SubscribeOptions FAST_POLL = + SubscribeOptions.newBuilder().setPollCooldown(Duration.ofMillis(50)).build(); + + private WorkflowStub startHostWorkflow() { + SubscribeHostWorkflow workflow = + testWorkflowRule.newWorkflowStubTimeoutOptions(SubscribeHostWorkflow.class); + WorkflowExecution execution = WorkflowClient.start(workflow::execute, null); + return testWorkflowRule.getWorkflowClient().newUntypedWorkflowStub(execution.getWorkflowId()); + } + + private WorkflowStreamClient 
newStreamClient(WorkflowStub stub) { + return WorkflowStreamClient.newInstance( + testWorkflowRule.getWorkflowClient(), + stub.getExecution().getWorkflowId(), + WorkflowStreamClientOptions.newBuilder().setBatchInterval(Duration.ofMillis(100)).build()); + } + + private static String decode(WorkflowStreamItem item) { + return DC.fromPayload(item.getPayload(), String.class, String.class); + } + + @Test + public void testSubscribeDeliversItemsAndAdvancesOffset() { + WorkflowStub stub = startHostWorkflow(); + try (WorkflowStreamClient streamClient = newStreamClient(stub)) { + streamClient.topic("evt").publish("a", true); + streamClient.flush(); + // A workflow-side publish lands on the same log. + stub.signal("publishLocal", "evt", "b"); + + try (WorkflowStreamSubscription subscription = streamClient.subscribe(FAST_POLL)) { + Assert.assertTrue(subscription.hasNext()); + WorkflowStreamItem first = subscription.next(); + Assert.assertEquals("evt", first.getTopic()); + Assert.assertEquals("a", decode(first)); + Assert.assertEquals(0, first.getOffset()); + + Assert.assertTrue(subscription.hasNext()); + WorkflowStreamItem second = subscription.next(); + Assert.assertEquals("b", decode(second)); + Assert.assertEquals(1, second.getOffset()); + } + + Assert.assertEquals(2, streamClient.getOffset()); + } + stub.signal("finish"); + stub.getResult(Void.class); + } + + @Test + public void testTopicHandleSubscribeFilters() { + WorkflowStub stub = startHostWorkflow(); + try (WorkflowStreamClient streamClient = newStreamClient(stub)) { + streamClient.topic("a").publish("1"); + streamClient.topic("b").publish("2"); + streamClient.topic("a").publish("3"); + streamClient.flush(); + + try (WorkflowStreamSubscription subscription = streamClient.topic("a").subscribe(0)) { + Assert.assertEquals("1", decode(subscription.next())); + WorkflowStreamItem second = subscription.next(); + Assert.assertEquals("3", decode(second)); + Assert.assertEquals(2, second.getOffset()); + } + } + 
stub.signal("finish"); + stub.getResult(Void.class); + } + + @Test + public void testSubscribeEndsCleanlyOnTerminal() { + WorkflowStub stub = startHostWorkflow(); + try (WorkflowStreamClient streamClient = newStreamClient(stub)) { + streamClient.topic("evt").publish("a", true); + streamClient.flush(); + + try (WorkflowStreamSubscription subscription = streamClient.subscribe(FAST_POLL)) { + Assert.assertEquals("a", decode(subscription.next())); + + // Complete the workflow, then keep polling: the subscription must end cleanly + // rather than surface an error. + stub.signal("finish"); + stub.getResult(Void.class); + Assert.assertFalse( + "terminal workflow should end the stream without surfacing an error", + subscription.hasNext()); + } + } + } + + @Test + public void testSubscribeFollowsContinueAsNew() { + WorkflowStub stub = startHostWorkflow(); + try (WorkflowStreamClient streamClient = newStreamClient(stub)) { + streamClient.topic("evt").publish("a", true); + streamClient.flush(); + + try (WorkflowStreamSubscription subscription = streamClient.subscribe(FAST_POLL)) { + WorkflowStreamItem first = subscription.next(); + Assert.assertEquals("a", decode(first)); + Assert.assertEquals(0, first.getOffset()); + + // Roll the workflow over to a new run. The stream state (including item "a") + // is carried across the continue-as-new boundary. + stub.signal("rollover"); + streamClient.topic("evt").publish("b", true); + streamClient.flush(); + + // The subscription retries through the rollover (draining rejections, polls + // lost to the closing run) and picks up on the successor run where the prior + // log — and so the subscriber's offset — is preserved. 
+ WorkflowStreamItem second = subscription.next(); + Assert.assertEquals("b", decode(second)); + Assert.assertEquals(1, second.getOffset()); + } + } + stub.signal("finish"); + stub.getResult(Void.class); + } + + @Test + public void testSubscribeTruncationResetsOffset() { + WorkflowStub stub = startHostWorkflow(); + try (WorkflowStreamClient streamClient = newStreamClient(stub)) { + streamClient.topic("evt").publish("a"); + streamClient.topic("evt").publish("b"); + streamClient.topic("evt").publish("c"); + streamClient.flush(); + // Confirm the batch has been applied before truncating. + try (WorkflowStreamSubscription warmup = streamClient.subscribe(FAST_POLL)) { + warmup.next(); + } + + stub.update("truncate", Void.class, 2L); + + // A subscription positioned before the new base offset restarts from the + // beginning of whatever still exists instead of failing. + SubscribeOptions options = + SubscribeOptions.newBuilder() + .setFromOffset(1) + .setPollCooldown(Duration.ofMillis(50)) + .build(); + try (WorkflowStreamSubscription subscription = streamClient.subscribe(options)) { + WorkflowStreamItem item = subscription.next(); + Assert.assertEquals("c", decode(item)); + Assert.assertEquals(2, item.getOffset()); + } + + Assert.assertEquals(3, streamClient.getOffset()); + } + stub.signal("finish"); + stub.getResult(Void.class); + } + + @Test + public void testCloseStopsIteration() { + WorkflowStub stub = startHostWorkflow(); + try (WorkflowStreamClient streamClient = newStreamClient(stub)) { + WorkflowStreamSubscription subscription = streamClient.subscribe(FAST_POLL); + subscription.close(); + Assert.assertFalse(subscription.hasNext()); + } + stub.signal("finish"); + stub.getResult(Void.class); + } + + @WorkflowInterface + public interface SubscribeHostWorkflow { + @WorkflowMethod + void execute(WorkflowStreamState priorState); + + @SignalMethod + void finish(); + + @SignalMethod + void rollover(); + + @SignalMethod + void publishLocal(String topic, String value); + 
+ @UpdateMethod + void truncate(long upToOffset); + } + + public static class SubscribeHostWorkflowImpl implements SubscribeHostWorkflow { + private WorkflowStream stream; + private boolean finished; + private boolean rollover; + + @Override + public void execute(WorkflowStreamState priorState) { + stream = WorkflowStream.newInstance(priorState); + Workflow.await(() -> finished || rollover); + if (rollover) { + stream.continueAsNew(state -> new Object[] {state}); + } + } + + @Override + public void finish() { + finished = true; + } + + @Override + public void rollover() { + rollover = true; + } + + @Override + public void publishLocal(String topic, String value) { + stream.topic(topic).publish(value); + } + + @Override + public void truncate(long upToOffset) { + stream.truncate(upToOffset); + } + } +} diff --git a/contrib/temporal-workflowstreams/src/test/java/io/temporal/workflowstreams/WorkflowStreamTest.java b/contrib/temporal-workflowstreams/src/test/java/io/temporal/workflowstreams/WorkflowStreamTest.java new file mode 100644 index 000000000..e5f9589e8 --- /dev/null +++ b/contrib/temporal-workflowstreams/src/test/java/io/temporal/workflowstreams/WorkflowStreamTest.java @@ -0,0 +1,315 @@ +package io.temporal.workflowstreams; + +import io.temporal.api.common.v1.Payload; +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowStub; +import io.temporal.client.WorkflowUpdateException; +import io.temporal.common.converter.ByteArrayPayloadConverter; +import io.temporal.common.converter.DataConverter; +import io.temporal.common.converter.DataConverterException; +import io.temporal.common.converter.DefaultDataConverter; +import io.temporal.failure.ApplicationFailure; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.workflow.SignalMethod; +import io.temporal.workflow.UpdateMethod; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInit; 
+import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import io.temporal.workflowstreams.internal.PayloadWire; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class WorkflowStreamTest { + private static final DataConverter DC = DefaultDataConverter.STANDARD_INSTANCE; + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes( + StreamHostWorkflowImpl.class, + ByteOnlyPublishWorkflowImpl.class, + InitHostWorkflowImpl.class) + .build(); + + private static PublishInput publishInput(String publisherId, long seq, String... topicValues) { + List items = new ArrayList<>(); + for (int i = 0; i < topicValues.length; i += 2) { + Payload payload = DC.toPayload(topicValues[i + 1]).get(); + items.add(new PublishEntry(topicValues[i], PayloadWire.encode(payload))); + } + return new PublishInput(items, publisherId, seq); + } + + private WorkflowStub startHostWorkflow() { + StreamHostWorkflow workflow = + testWorkflowRule.newWorkflowStubTimeoutOptions(StreamHostWorkflow.class); + WorkflowExecution execution = WorkflowClient.start(workflow::execute, null); + return testWorkflowRule.getWorkflowClient().newUntypedWorkflowStub(execution.getWorkflowId()); + } + + @Test + public void testExternalPublishAndOffsetQuery() { + WorkflowStub stub = startHostWorkflow(); + + stub.signal( + WorkflowStreamConstants.PUBLISH_SIGNAL_NAME, + publishInput("pub1", 1, "events", "a", "events", "b")); + // Poll first so the offset query observes the published items. 
+ stub.update( + WorkflowStreamConstants.POLL_UPDATE_NAME, + PollResult.class, + new PollInput(Collections.emptyList(), 0)); + + long offset = stub.query(WorkflowStreamConstants.OFFSET_QUERY_NAME, Long.class); + Assert.assertEquals(2, offset); + + stub.signal("finish"); + stub.getResult(Void.class); + } + + @Test + public void testPublisherDedup() { + WorkflowStub stub = startHostWorkflow(); + + stub.signal( + WorkflowStreamConstants.PUBLISH_SIGNAL_NAME, publishInput("pub1", 1, "events", "a")); + // Same publisher + sequence: must be dropped. + stub.signal( + WorkflowStreamConstants.PUBLISH_SIGNAL_NAME, publishInput("pub1", 1, "events", "dup")); + stub.signal( + WorkflowStreamConstants.PUBLISH_SIGNAL_NAME, publishInput("pub1", 2, "events", "c")); + + PollResult result = + stub.update( + WorkflowStreamConstants.POLL_UPDATE_NAME, + PollResult.class, + new PollInput(Collections.emptyList(), 0)); + Assert.assertEquals("duplicate batch should be dropped", 2, result.items.size()); + Assert.assertEquals(2, result.nextOffset); + Assert.assertEquals( + "a", + DC.fromPayload(PayloadWire.decode(result.items.get(0).data), String.class, String.class)); + Assert.assertEquals( + "c", + DC.fromPayload(PayloadWire.decode(result.items.get(1).data), String.class, String.class)); + + stub.signal("finish"); + stub.getResult(Void.class); + } + + @Test + public void testPollReturnsItemsWithTopicFilter() { + WorkflowStub stub = startHostWorkflow(); + + stub.signal( + WorkflowStreamConstants.PUBLISH_SIGNAL_NAME, + publishInput("pub1", 1, "a", "1", "b", "2", "a", "3")); + + PollResult result = + stub.update( + WorkflowStreamConstants.POLL_UPDATE_NAME, + PollResult.class, + new PollInput(Collections.singletonList("a"), 0)); + + // Only topic "a" items, with global offsets 0 and 2. 
+ Assert.assertEquals(2, result.items.size()); + Assert.assertEquals("a", result.items.get(0).topic); + Assert.assertEquals(0, result.items.get(0).offset); + Assert.assertEquals("a", result.items.get(1).topic); + Assert.assertEquals(2, result.items.get(1).offset); + Assert.assertEquals(3, result.nextOffset); + Assert.assertFalse(result.moreReady); + + Payload payload = PayloadWire.decode(result.items.get(1).data); + Assert.assertEquals("3", DC.fromPayload(payload, String.class, String.class)); + + stub.signal("finish"); + stub.getResult(Void.class); + } + + @Test + public void testTruncate() { + WorkflowStub stub = startHostWorkflow(); + + stub.signal( + WorkflowStreamConstants.PUBLISH_SIGNAL_NAME, + publishInput("pub1", 1, "events", "a", "events", "b", "events", "c")); + // Ensure the batch has been applied before truncating. + stub.update( + WorkflowStreamConstants.POLL_UPDATE_NAME, + PollResult.class, + new PollInput(Collections.emptyList(), 0)); + + stub.update("truncate", Void.class, 2L); + + // Offset 0 means "from the beginning of whatever still exists". + PollResult fromStart = + stub.update( + WorkflowStreamConstants.POLL_UPDATE_NAME, + PollResult.class, + new PollInput(Collections.emptyList(), 0)); + Assert.assertEquals(1, fromStart.items.size()); + Assert.assertEquals(2, fromStart.items.get(0).offset); + + // A poll positioned before the new base offset fails with TruncatedOffset. + try { + stub.update( + WorkflowStreamConstants.POLL_UPDATE_NAME, + PollResult.class, + new PollInput(Collections.emptyList(), 1)); + Assert.fail("unreachable"); + } catch (WorkflowUpdateException e) { + ApplicationFailure failure = (ApplicationFailure) e.getCause(); + Assert.assertEquals(WorkflowStreamConstants.ERROR_TYPE_TRUNCATED_OFFSET, failure.getType()); + } + + // Truncating past the end of the log fails with TruncateOutOfRange. 
+ try { + stub.update("truncate", Void.class, 10L); + Assert.fail("unreachable"); + } catch (WorkflowUpdateException e) { + ApplicationFailure failure = (ApplicationFailure) e.getCause(); + Assert.assertEquals( + WorkflowStreamConstants.ERROR_TYPE_TRUNCATE_OUT_OF_RANGE, failure.getType()); + } + + stub.signal("finish"); + stub.getResult(Void.class); + } + + @Test + public void testStreamConstructedInWorkflowInit() { + InitHostWorkflow workflow = + testWorkflowRule.newWorkflowStubTimeoutOptions(InitHostWorkflow.class); + WorkflowExecution execution = WorkflowClient.start(workflow::execute, null); + WorkflowStub stub = + testWorkflowRule.getWorkflowClient().newUntypedWorkflowStub(execution.getWorkflowId()); + + stub.signal( + WorkflowStreamConstants.PUBLISH_SIGNAL_NAME, publishInput("pub1", 1, "events", "a")); + PollResult result = + stub.update( + WorkflowStreamConstants.POLL_UPDATE_NAME, + PollResult.class, + new PollInput(Collections.emptyList(), 0)); + Assert.assertEquals(1, result.items.size()); + Assert.assertEquals( + "a", + DC.fromPayload(PayloadWire.decode(result.items.get(0).data), String.class, String.class)); + Assert.assertEquals( + 1L, (long) stub.query(WorkflowStreamConstants.OFFSET_QUERY_NAME, Long.class)); + + stub.signal("finish"); + stub.getResult(Void.class); + } + + @Test + public void testWorkflowPublishUsesConfiguredConverters() { + ByteOnlyPublishWorkflow workflow = + testWorkflowRule.newWorkflowStubTimeoutOptions(ByteOnlyPublishWorkflow.class); + Assert.assertTrue( + "a string is unconvertible under the byte-array-only set, proving setPayloadConverters" + + " drives conversion", + workflow.execute()); + } + + @WorkflowInterface + public interface StreamHostWorkflow { + @WorkflowMethod + void execute(WorkflowStreamState priorState); + + @SignalMethod + void finish(); + + @UpdateMethod + void truncate(long upToOffset); + } + + public static class StreamHostWorkflowImpl implements StreamHostWorkflow { + private WorkflowStream stream; + private 
boolean finished; + + @Override + public void execute(WorkflowStreamState priorState) { + stream = WorkflowStream.newInstance(priorState); + Workflow.await(() -> finished); + } + + @Override + public void finish() { + finished = true; + } + + @Override + public void truncate(long upToOffset) { + stream.truncate(upToOffset); + } + } + + @WorkflowInterface + public interface InitHostWorkflow { + @WorkflowMethod + void execute(WorkflowStreamState priorState); + + @SignalMethod + void finish(); + } + + /** Hosts the stream from a {@code @WorkflowInit} constructor, the recommended pattern. */ + public static class InitHostWorkflowImpl implements InitHostWorkflow { + private final WorkflowStream stream; + private boolean finished; + + @WorkflowInit + public InitHostWorkflowImpl(WorkflowStreamState priorState) { + stream = WorkflowStream.newInstance(priorState); + } + + @Override + public void execute(WorkflowStreamState priorState) { + Workflow.await(() -> finished); + } + + @Override + public void finish() { + finished = true; + } + } + + @WorkflowInterface + public interface ByteOnlyPublishWorkflow { + @WorkflowMethod + boolean execute(); + } + + /** + * Restricts the stream to the byte-array converter and returns whether publishing a string failed + * — it should, since that set has no converter for strings, whereas the default set's JSON + * fallback would accept it. A byte[] must still publish cleanly. 
+ */ + public static class ByteOnlyPublishWorkflowImpl implements ByteOnlyPublishWorkflow { + @Override + public boolean execute() { + WorkflowStream stream = + WorkflowStream.newInstance( + null, + WorkflowStreamOptions.newBuilder() + .setPayloadConverters(new ByteArrayPayloadConverter()) + .build()); + stream.topic("events").publish("hi".getBytes(StandardCharsets.UTF_8)); + try { + stream.topic("events").publish("not-bytes"); + return false; + } catch (DataConverterException e) { + return true; + } + } + } +} diff --git a/settings.gradle b/settings.gradle index fe80370b0..bdc4602c9 100644 --- a/settings.gradle +++ b/settings.gradle @@ -9,6 +9,8 @@ project(':temporal-opentracing').projectDir = file('contrib/temporal-opentracing include 'temporal-kotlin' include 'temporal-spring-ai' project(':temporal-spring-ai').projectDir = file('contrib/temporal-spring-ai') +include 'temporal-workflowstreams' +project(':temporal-workflowstreams').projectDir = file('contrib/temporal-workflowstreams') include 'temporal-spring-boot-autoconfigure' include 'temporal-spring-boot-starter' include 'temporal-remote-data-encoder' diff --git a/temporal-bom/build.gradle b/temporal-bom/build.gradle index e73d0d300..56ba39615 100644 --- a/temporal-bom/build.gradle +++ b/temporal-bom/build.gradle @@ -18,5 +18,6 @@ dependencies { api project(':temporal-test-server') api project(':temporal-testing') api project(':temporal-envconfig') + api project(':temporal-workflowstreams') } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java index 4c5ec49b1..e0b427896 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/common/InternalUtils.java @@ -28,6 +28,7 @@ /** Utility functions shared by the implementation code. 
*/ public final class InternalUtils { public static String TEMPORAL_RESERVED_PREFIX = "__temporal_"; + public static final String WORKFLOW_STREAM_RESERVED_PREFIX = "__temporal_workflow_stream_"; private static final Logger log = LoggerFactory.getLogger(InternalUtils.class); private static String QUERY_TYPE_STACK_TRACE = "__stack_trace"; @@ -159,9 +160,21 @@ public static NexusWorkflowStarter createNexusBoundStub( return new NexusWorkflowStarter(stub.newInstance(nexusWorkflowOptions.build()), operationToken); } + /** + * Returns true if the given name is in the {@code __temporal_workflow_stream_} sub-namespace, + * which is reserved for the workflow streams contrib module and permitted for signal, update, and + * query handler registration. + */ + public static boolean isWorkflowStreamReservedName(String name) { + return name.startsWith(WORKFLOW_STREAM_RESERVED_PREFIX); + } + /** Check the method name for reserved prefixes or names. */ public static void checkMethodName(POJOWorkflowMethodMetadata methodMetadata) { - if (methodMetadata.getName().startsWith(TEMPORAL_RESERVED_PREFIX)) { + boolean workflowStreamExempt = + !methodMetadata.getType().equals(WorkflowMethodType.WORKFLOW) + && isWorkflowStreamReservedName(methodMetadata.getName()); + if (methodMetadata.getName().startsWith(TEMPORAL_RESERVED_PREFIX) && !workflowStreamExempt) { throw new IllegalArgumentException( methodMetadata.getType().toString().toLowerCase() + " name \"" diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/QueryDispatcher.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/QueryDispatcher.java index b32fe08ff..b92ac3b28 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/QueryDispatcher.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/QueryDispatcher.java @@ -1,6 +1,7 @@ package io.temporal.internal.sync; import static io.temporal.internal.common.InternalUtils.TEMPORAL_RESERVED_PREFIX; +import static 
io.temporal.internal.common.InternalUtils.isWorkflowStreamReservedName; import io.temporal.api.common.v1.Payloads; import io.temporal.api.sdk.v1.WorkflowInteractionDefinition; @@ -80,7 +81,8 @@ public Optional<Payloads> handleQuery( Optional<Payloads> input) { WorkflowOutboundCallsInterceptor.RegisterQueryInput handler = queryCallbacks.get(queryName); Object[] args; - if (queryName.startsWith(TEMPORAL_RESERVED_PREFIX)) { + if (queryName.startsWith(TEMPORAL_RESERVED_PREFIX) + && !isWorkflowStreamReservedName(queryName)) { throw new IllegalArgumentException( "Unknown query type: " + queryName + ", knownTypes=" + queryCallbacks.keySet()); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java index d85b86815..9bc6bdfa7 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SignalDispatcher.java @@ -1,6 +1,7 @@ package io.temporal.internal.sync; import static io.temporal.internal.common.InternalUtils.TEMPORAL_RESERVED_PREFIX; +import static io.temporal.internal.common.InternalUtils.isWorkflowStreamReservedName; import io.temporal.api.common.v1.Payloads; import io.temporal.api.sdk.v1.WorkflowInteractionDefinition; @@ -68,7 +69,8 @@ public void handleSignal( signalCallbacks.get(signalName); Object[] args; HandlerUnfinishedPolicy policy; - if (signalName.startsWith(TEMPORAL_RESERVED_PREFIX)) { + if (signalName.startsWith(TEMPORAL_RESERVED_PREFIX) + && !isWorkflowStreamReservedName(signalName)) { // Ignore internal signals return; } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/UpdateDispatcher.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/UpdateDispatcher.java index 1122989c8..8ce53e87a 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/UpdateDispatcher.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/UpdateDispatcher.java @@
-1,6 +1,7 @@ package io.temporal.internal.sync; import static io.temporal.internal.common.InternalUtils.TEMPORAL_RESERVED_PREFIX; +import static io.temporal.internal.common.InternalUtils.isWorkflowStreamReservedName; import io.temporal.api.common.v1.Payloads; import io.temporal.api.sdk.v1.WorkflowInteractionDefinition; @@ -43,7 +44,8 @@ public void handleValidateUpdate( updateCallbacks.get(updateName); Object[] args; HandlerUnfinishedPolicy policy; - if (updateName.startsWith(TEMPORAL_RESERVED_PREFIX)) { + if (updateName.startsWith(TEMPORAL_RESERVED_PREFIX) + && !isWorkflowStreamReservedName(updateName)) { throw new IllegalArgumentException( "Unknown update name: " + updateName + ", knownTypes=" + updateCallbacks.keySet()); } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/WorkflowStreamReservedNameTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/WorkflowStreamReservedNameTest.java new file mode 100644 index 000000000..373defd6d --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/WorkflowStreamReservedNameTest.java @@ -0,0 +1,123 @@ +package io.temporal.workflow; + +import io.temporal.api.common.v1.WorkflowExecution; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowQueryException; +import io.temporal.client.WorkflowStub; +import io.temporal.internal.common.InternalUtils; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import java.util.ArrayList; +import java.util.List; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +/** + * Verifies that the {@code __temporal_workflow_stream_} sub-namespace is permitted for signal, + * update, and query handlers (used by the workflow streams contrib module) while other {@code + * __temporal_} names remain reserved. 
+ */ +public class WorkflowStreamReservedNameTest { + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestStreamReservedNameWorkflowImpl.class) + .build(); + + @Test + public void testIsWorkflowStreamReservedName() { + Assert.assertTrue( + InternalUtils.isWorkflowStreamReservedName("__temporal_workflow_stream_publish")); + Assert.assertTrue( + InternalUtils.isWorkflowStreamReservedName("__temporal_workflow_stream_poll")); + Assert.assertTrue( + InternalUtils.isWorkflowStreamReservedName("__temporal_workflow_stream_offset")); + Assert.assertFalse(InternalUtils.isWorkflowStreamReservedName("__temporal_")); + Assert.assertFalse(InternalUtils.isWorkflowStreamReservedName("__temporal_foo")); + Assert.assertFalse(InternalUtils.isWorkflowStreamReservedName("__internal")); + Assert.assertFalse(InternalUtils.isWorkflowStreamReservedName("events")); + } + + @Test + public void testWorkflowStreamReservedNamesAreHandled() { + TestStreamReservedNameWorkflow workflow = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestStreamReservedNameWorkflow.class); + WorkflowExecution execution = WorkflowClient.start(workflow::execute); + + WorkflowStub stub = + testWorkflowRule.getWorkflowClient().newUntypedWorkflowStub(execution.getWorkflowId()); + + stub.signal("__temporal_workflow_stream_publish", "a"); + stub.signal("__temporal_workflow_stream_publish", "b"); + String updateResult = stub.update("__temporal_workflow_stream_poll", String.class, "c"); + Assert.assertEquals("polled:c", updateResult); + Long offset = stub.query("__temporal_workflow_stream_offset", Long.class); + Assert.assertEquals(Long.valueOf(3), offset); + + // Other __temporal_ names remain reserved. 
+ try { + stub.query("__temporal_other", Long.class); + Assert.fail("unreachable"); + } catch (WorkflowQueryException e) { + Assert.assertTrue(e.getCause().getMessage().contains("Unknown query type")); + } + + stub.signal("finish"); + stub.getResult(String.class); + } + + public interface StreamHandlersListener { + @SignalMethod(name = "__temporal_workflow_stream_publish") + void publish(String value); + + @UpdateMethod(name = "__temporal_workflow_stream_poll") + String poll(String value); + + @QueryMethod(name = "__temporal_workflow_stream_offset") + long offset(); + } + + @WorkflowInterface + public interface TestStreamReservedNameWorkflow { + @WorkflowMethod + String execute(); + + @SignalMethod + void finish(); + } + + public static class TestStreamReservedNameWorkflowImpl implements TestStreamReservedNameWorkflow { + private final List<String> values = new ArrayList<>(); + private boolean finished; + + @Override + public String execute() { + Workflow.registerListener( + new StreamHandlersListener() { + @Override + public void publish(String value) { + values.add(value); + } + + @Override + public String poll(String value) { + values.add(value); + return "polled:" + value; + } + + @Override + public long offset() { + return values.size(); + } + }); + Workflow.await(() -> finished); + return String.join(",", values); + } + + @Override + public void finish() { + finished = true; + } + } +}