diff --git a/CHANGELOG.md b/CHANGELOG.md index f222668..05e1ec0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## 2026-04-15 + +### Features and Fixes + +- Added Example_11: ProcessTableFunction with event-time timers, demonstrating user inactivity + detection on the `examples.marketplace.clicks` table. + ## 2024-09-13 ### Features and Fixes diff --git a/src/main/java/io/confluent/flink/examples/table/Example_11_ProcessTableFunction.java b/src/main/java/io/confluent/flink/examples/table/Example_11_ProcessTableFunction.java new file mode 100644 index 0000000..f816d7c --- /dev/null +++ b/src/main/java/io/confluent/flink/examples/table/Example_11_ProcessTableFunction.java @@ -0,0 +1,111 @@ +package io.confluent.flink.examples.table; + +import io.confluent.flink.plugin.ConfluentSettings; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.StateHint; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.functions.ProcessTableFunction; +import org.apache.flink.types.Row; + +import java.time.Duration; +import java.time.Instant; + +import static org.apache.flink.table.annotation.ArgumentTrait.REQUIRE_ON_TIME; +import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE; +import static org.apache.flink.table.api.Expressions.$; +import static org.apache.flink.table.api.Expressions.descriptor; +import static org.apache.flink.table.api.Expressions.lit; + +/** + * A table program example illustrating how to use a {@link ProcessTableFunction} (PTF) in the Flink + * Table API. + * + *

This example detects inactive users on the {@code examples.marketplace.clicks} table. For each + * user, it counts clicks and registers an event-time timer. When no new click arrives within the + * configured timeout, the timer fires and emits an alert with the user's accumulated click count. + * + *

Unlike a windowed aggregation, the PTF emits exactly once per inactivity period, combining + * running state with the absence of events. + */ +public class Example_11_ProcessTableFunction { + + // Fill this with an environment you have write access to + static final String TARGET_CATALOG = ""; + + // Fill this with a Kafka cluster you have write access to + static final String TARGET_DATABASE = ""; + + // All logic is defined in a main() method. It can run both in an IDE or CI/CD system. + public static void main(String[] args) { + // Setup connection properties to Confluent Cloud + EnvironmentSettings settings = ConfluentSettings.fromResource("/cloud.properties"); + + // Initialize the session context to get started + TableEnvironment env = TableEnvironment.create(settings); + + // Set default catalog and database + env.useCatalog(TARGET_CATALOG); + env.useDatabase(TARGET_DATABASE); + + // Invoke the PTF inline against the clicks table, partitioned by user_id. + // The function's lifecycle is bound to this query. + System.out.println("Executing inline ProcessTableFunction..."); + env.from("`examples`.`marketplace`.`clicks`") + .partitionBy($("user_id")) + .process( + ClickInactivityMonitor.class, + lit(30).asArgument("timeoutSeconds"), + descriptor("$rowtime").asArgument("on_time")) + .execute() + .print(); + } + + /** + * A ProcessTableFunction that detects user inactivity based on click events. + * + *

For each user (partitioned by {@code user_id}), it counts incoming clicks and registers a + * named event-time timer. Each new click replaces the previous timer, resetting the inactivity + * clock. When the timer fires (no new clicks within the timeout), an alert is emitted and the + * partition's state is cleared so a returning user starts a fresh inactivity window. + */ + public static class ClickInactivityMonitor + extends ProcessTableFunction { + + /** Output POJO. The framework adds the user_id partition key and rowtime automatically. */ + public static class InactivityAlert { + public int clickCount; + } + + /** Per-user state. */ + public static class ClickState { + public int clickCount = 0; + } + + public void eval( + Context ctx, + @StateHint ClickState state, + @ArgumentHint({SET_SEMANTIC_TABLE, REQUIRE_ON_TIME}) Row input, + Integer timeoutSeconds) { + + state.clickCount++; + + // Each new click pushes the timeout forward; the timer fires only after true + // inactivity. + TimeContext timeCtx = ctx.timeContext(Instant.class); + timeCtx.registerOnTime( + "inactivity", timeCtx.time().plus(Duration.ofSeconds(timeoutSeconds))); + } + + public void onTimer(OnTimerContext ctx, ClickState state) { + InactivityAlert alert = new InactivityAlert(); + alert.clickCount = state.clickCount; + collect(alert); + + // Reset the partition: drop the click counter (and any leftover timers) so a returning + // user starts a new inactivity window instead of retaining state indefinitely. + ctx.clearAll(); + } + } +}