Skip to content

Commit dabfb8e

Browse files
committed
Add an example for ProcessTableFunctions
1 parent 4b5aaa4 commit dabfb8e

2 files changed

Lines changed: 113 additions & 0 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Changelog
22

3+
## 2026-04-15
4+
5+
### Features and Fixes
6+
7+
- Added Example_11: ProcessTableFunction with event-time timers, demonstrating user inactivity
8+
detection on the `examples.marketplace.clicks` table.
9+
310
## 2024-09-13
411

512
### Features and Fixes
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package io.confluent.flink.examples.table;
2+
3+
import io.confluent.flink.plugin.ConfluentSettings;
4+
5+
import org.apache.flink.table.annotation.ArgumentHint;
6+
import org.apache.flink.table.annotation.StateHint;
7+
import org.apache.flink.table.api.EnvironmentSettings;
8+
import org.apache.flink.table.api.TableEnvironment;
9+
import org.apache.flink.table.functions.ProcessTableFunction;
10+
import org.apache.flink.types.Row;
11+
12+
import java.time.Duration;
13+
import java.time.Instant;
14+
15+
import static org.apache.flink.table.annotation.ArgumentTrait.REQUIRE_ON_TIME;
16+
import static org.apache.flink.table.annotation.ArgumentTrait.SET_SEMANTIC_TABLE;
17+
import static org.apache.flink.table.api.Expressions.$;
18+
import static org.apache.flink.table.api.Expressions.descriptor;
19+
import static org.apache.flink.table.api.Expressions.lit;
20+
21+
/**
22+
* A table program example illustrating how to use a {@link ProcessTableFunction} (PTF) in the Flink
23+
* Table API.
24+
*
25+
* <p>This example detects inactive users on the {@code examples.marketplace.clicks} table. For each
26+
* user, it counts clicks and registers an event-time timer. When no new click arrives within the
27+
* configured timeout, the timer fires and emits an alert with the user's accumulated click count.
28+
*
29+
* <p>Unlike a windowed aggregation, the PTF emits exactly once per inactivity period, combining
30+
* running state with the absence of events.
31+
*/
32+
public class Example_11_ProcessTableFunction {
33+
34+
// Fill this with an environment you have write access to
35+
static final String TARGET_CATALOG = "";
36+
37+
// Fill this with a Kafka cluster you have write access to
38+
static final String TARGET_DATABASE = "";
39+
40+
// All logic is defined in a main() method. It can run both in an IDE or CI/CD system.
41+
public static void main(String[] args) {
42+
// Setup connection properties to Confluent Cloud
43+
EnvironmentSettings settings = ConfluentSettings.fromResource("/cloud.properties");
44+
45+
// Initialize the session context to get started
46+
TableEnvironment env = TableEnvironment.create(settings);
47+
48+
// Set default catalog and database
49+
env.useCatalog(TARGET_CATALOG);
50+
env.useDatabase(TARGET_DATABASE);
51+
52+
// Invoke the PTF inline against the clicks table, partitioned by user_id.
53+
// The function's lifecycle is bound to this query.
54+
System.out.println("Executing inline ProcessTableFunction...");
55+
env.from("`examples`.`marketplace`.`clicks`")
56+
.partitionBy($("user_id"))
57+
.process(
58+
ClickInactivityMonitor.class,
59+
lit(30).asArgument("timeoutSeconds"),
60+
descriptor("$rowtime").asArgument("on_time"))
61+
.execute()
62+
.print();
63+
}
64+
65+
/**
66+
* A ProcessTableFunction that detects user inactivity based on click events.
67+
*
68+
* <p>For each user (partitioned by {@code user_id}), it counts incoming clicks and registers a
69+
* named event-time timer. Each new click replaces the previous timer, resetting the inactivity
70+
* clock. When the timer fires (no new clicks within the timeout), an alert is emitted.
71+
*/
72+
public static class ClickInactivityMonitor
73+
extends ProcessTableFunction<ClickInactivityMonitor.InactivityAlert> {
74+
75+
/** Output POJO. The framework adds the user_id partition key and rowtime automatically. */
76+
public static class InactivityAlert {
77+
public int clickCount;
78+
}
79+
80+
/** Per-user state. */
81+
public static class ClickState {
82+
public int clickCount = 0;
83+
}
84+
85+
public void eval(
86+
Context ctx,
87+
@StateHint ClickState state,
88+
@ArgumentHint({SET_SEMANTIC_TABLE, REQUIRE_ON_TIME}) Row input,
89+
Integer timeoutSeconds) {
90+
91+
state.clickCount++;
92+
93+
// Each new click pushes the timeout forward; the timer fires only after true
94+
// inactivity.
95+
TimeContext<Instant> timeCtx = ctx.timeContext(Instant.class);
96+
timeCtx.registerOnTime(
97+
"inactivity", timeCtx.time().plus(Duration.ofSeconds(timeoutSeconds)));
98+
}
99+
100+
public void onTimer(ClickState state) {
101+
InactivityAlert alert = new InactivityAlert();
102+
alert.clickCount = state.clickCount;
103+
collect(alert);
104+
}
105+
}
106+
}

0 commit comments

Comments
 (0)