From dabfb8e496d3f0eaf488e9abbd967aa79abd0675 Mon Sep 17 00:00:00 2001 From: Martijn Visser <2989614+MartijnVisser@users.noreply.github.com> Date: Mon, 11 May 2026 16:23:17 +0200 Subject: [PATCH 1/2] Add an example for ProcessTableFunctions --- CHANGELOG.md | 7 ++ .../Example_11_ProcessTableFunction.java | 106 ++++++++++++++++++ 2 files changed, 113 insertions(+) create mode 100644 src/main/java/io/confluent/flink/examples/table/Example_11_ProcessTableFunction.java diff --git a/CHANGELOG.md b/CHANGELOG.md index f222668..05e1ec0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## 2026-04-15 + +### Features and Fixes + +- Added Example_11: ProcessTableFunction with event-time timers, demonstrating user inactivity + detection on the `examples.marketplace.clicks` table. + ## 2024-09-13 ### Features and Fixes diff --git a/src/main/java/io/confluent/flink/examples/table/Example_11_ProcessTableFunction.java b/src/main/java/io/confluent/flink/examples/table/Example_11_ProcessTableFunction.java new file mode 100644 index 0000000..661a78e --- /dev/null +++ b/src/main/java/io/confluent/flink/examples/table/Example_11_ProcessTableFunction.java @@ -0,0 +1,106 @@ +package io.confluent.flink.examples.table; + +import io.confluent.flink.plugin.ConfluentSettings; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.StateHint; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.functions.ProcessTableFunction; +import org.apache.flink.types.Row; + +import java.time.Duration; +import java.time.Instant; + +import static org.apache.flink.table.annotation.ArgumentTrait.REQUIRE_ON_TIME; +import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE; +import static org.apache.flink.table.api.Expressions.$; +import static org.apache.flink.table.api.Expressions.descriptor; +import static org.apache.flink.table.api.Expressions.lit; + +/** + * A table program example illustrating how to use a {@link ProcessTableFunction} (PTF) in the Flink + * Table API. + * + *

This example detects inactive users on the {@code examples.marketplace.clicks} table. For each + * user, it counts clicks and registers an event-time timer. When no new click arrives within the + * configured timeout, the timer fires and emits an alert with the user's accumulated click count. + * + *

Unlike a windowed aggregation, the PTF emits exactly once per inactivity period, combining + * running state with the absence of events. + */ +public class Example_11_ProcessTableFunction { + + // Fill this with an environment you have write access to + static final String TARGET_CATALOG = ""; + + // Fill this with a Kafka cluster you have write access to + static final String TARGET_DATABASE = ""; + + // All logic is defined in a main() method. It can run both in an IDE or CI/CD system. + public static void main(String[] args) { + // Setup connection properties to Confluent Cloud + EnvironmentSettings settings = ConfluentSettings.fromResource("/cloud.properties"); + + // Initialize the session context to get started + TableEnvironment env = TableEnvironment.create(settings); + + // Set default catalog and database + env.useCatalog(TARGET_CATALOG); + env.useDatabase(TARGET_DATABASE); + + // Invoke the PTF inline against the clicks table, partitioned by user_id. + // The function's lifecycle is bound to this query. + System.out.println("Executing inline ProcessTableFunction..."); + env.from("`examples`.`marketplace`.`clicks`") + .partitionBy($("user_id")) + .process( + ClickInactivityMonitor.class, + lit(30).asArgument("timeoutSeconds"), + descriptor("$rowtime").asArgument("on_time")) + .execute() + .print(); + } + + /** + * A ProcessTableFunction that detects user inactivity based on click events. + * + *

For each user (partitioned by {@code user_id}), it counts incoming clicks and registers a + * named event-time timer. Each new click replaces the previous timer, resetting the inactivity + * clock. When the timer fires (no new clicks within the timeout), an alert is emitted. + */ + public static class ClickInactivityMonitor + extends ProcessTableFunction { + + /** Output POJO. The framework adds the user_id partition key and rowtime automatically. */ + public static class InactivityAlert { + public int clickCount; + } + + /** Per-user state. */ + public static class ClickState { + public int clickCount = 0; + } + + public void eval( + Context ctx, + @StateHint ClickState state, + @ArgumentHint({SET_SEMANTIC_TABLE, REQUIRE_ON_TIME}) Row input, + Integer timeoutSeconds) { + + state.clickCount++; + + // Each new click pushes the timeout forward; the timer fires only after true + // inactivity. + TimeContext timeCtx = ctx.timeContext(Instant.class); + timeCtx.registerOnTime( + "inactivity", timeCtx.time().plus(Duration.ofSeconds(timeoutSeconds))); + } + + public void onTimer(ClickState state) { + InactivityAlert alert = new InactivityAlert(); + alert.clickCount = state.clickCount; + collect(alert); + } + } +} From bc8f1e226c9b1a0c8a2b65b54dd7e08df1f5dedf Mon Sep 17 00:00:00 2001 From: Martijn Visser <2989614+MartijnVisser@users.noreply.github.com> Date: Fri, 15 May 2026 10:02:13 +0200 Subject: [PATCH 2/2] Clear partition state after the inactivity timer fires Per review feedback, the ClickInactivityMonitor PTF would otherwise retain ClickState for every user indefinitely. Calling ctx.clearAll() in onTimer() resets the partition so a returning user starts a fresh inactivity window. --- .../examples/table/Example_11_ProcessTableFunction.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/confluent/flink/examples/table/Example_11_ProcessTableFunction.java b/src/main/java/io/confluent/flink/examples/table/Example_11_ProcessTableFunction.java index 661a78e..f816d7c 100644 --- a/src/main/java/io/confluent/flink/examples/table/Example_11_ProcessTableFunction.java +++ b/src/main/java/io/confluent/flink/examples/table/Example_11_ProcessTableFunction.java @@ -67,7 +67,8 @@ public static void main(String[] args) { * *

For each user (partitioned by {@code user_id}), it counts incoming clicks and registers a * named event-time timer. Each new click replaces the previous timer, resetting the inactivity - * clock. When the timer fires (no new clicks within the timeout), an alert is emitted. + * clock. When the timer fires (no new clicks within the timeout), an alert is emitted and the + * partition's state is cleared so a returning user starts a fresh inactivity window. */ public static class ClickInactivityMonitor extends ProcessTableFunction { @@ -97,10 +98,14 @@ public void eval( "inactivity", timeCtx.time().plus(Duration.ofSeconds(timeoutSeconds))); } - public void onTimer(ClickState state) { + public void onTimer(OnTimerContext ctx, ClickState state) { InactivityAlert alert = new InactivityAlert(); alert.clickCount = state.clickCount; collect(alert); + + // Reset the partition: drop the click counter (and any leftover timers) so a returning + // user starts a new inactivity window instead of retaining state indefinitely. + ctx.clearAll(); } } }