Skip to content

Commit eceb497

Browse files
committed
Add an example for ProcessTableFunctions
1 parent 6967da8 commit eceb497

2 files changed

Lines changed: 43 additions & 88 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Changelog
22

3+
## 2026-04-15
4+
5+
### Features and Fixes
6+
7+
- Added Example_11: ProcessTableFunction with event-time timers, demonstrating user inactivity
8+
detection on the `examples.marketplace.clicks` table.
9+
310
## 2024-09-13
411

512
### Features and Fixes

src/main/java/io/confluent/flink/examples/table/Example_11_ProcessTableFunction.java

Lines changed: 36 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -22,26 +22,20 @@
2222
* A table program example illustrating how to use a {@link ProcessTableFunction} (PTF) in the Flink
2323
* Table API.
2424
*
25-
* <p>This example demonstrates a session tracking PTF invoked against the {@code
26-
* examples.marketplace.clicks} table. The PTF:
25+
* <p>This example detects inactive users on the {@code examples.marketplace.clicks} table. For each
26+
* user, it counts clicks and registers an event-time timer. When no new click arrives within the
27+
* configured timeout, the timer fires and emits an alert with the user's accumulated click count.
2728
*
28-
* <ul>
29-
* <li>Starts a session on the first click event per user
30-
* <li>Extends the session timeout on each subsequent click
31-
* <li>Automatically emits a session summary when the timeout expires
32-
* <li>Tracks click counts and URLs within each session
33-
* </ul>
34-
*
35-
* <p>Sessions are partitioned by {@code user_id} and end via timeout when no new clicks arrive
36-
* within the configured period.
29+
* <p>Unlike a windowed aggregation, the PTF emits exactly once per inactivity period, combining
30+
* running state with the absence of events.
3731
*/
3832
public class Example_11_ProcessTableFunction {
3933

4034
// Fill this with an environment you have write access to
41-
static final String TARGET_CATALOG = "mvisser";
35+
static final String TARGET_CATALOG = "";
4236

4337
// Fill this with a Kafka cluster you have write access to
44-
static final String TARGET_DATABASE = "standard_cluster";
38+
static final String TARGET_DATABASE = "";
4539

4640
// All logic is defined in a main() method. It can run both in an IDE or CI/CD system.
4741
public static void main(String[] args) {
@@ -55,104 +49,58 @@ public static void main(String[] args) {
5549
env.useCatalog(TARGET_CATALOG);
5650
env.useDatabase(TARGET_DATABASE);
5751

58-
// Invoke the PTF inline (without registration) against the clicks table.
59-
// The Table API creates a temporary JAR with all required dependencies,
60-
// uploads it to Confluent Cloud, and binds the function lifecycle to the query.
61-
// Sessions are partitioned by user_id and time out after 60 seconds of inactivity.
62-
System.out.println("Executing SessionTracker against clicks table...");
52+
// Invoke the PTF inline against the clicks table, partitioned by user_id.
53+
// The function's lifecycle is bound to this query.
54+
System.out.println("Executing inline ProcessTableFunction...");
6355
env.from("`examples`.`marketplace`.`clicks`")
6456
.partitionBy($("user_id"))
6557
.process(
66-
SessionTracker.class,
67-
lit(60).asArgument("sessionTimeoutSeconds"),
58+
ClickInactivityMonitor.class,
59+
lit(30).asArgument("timeoutSeconds"),
6860
descriptor("$rowtime").asArgument("on_time"))
6961
.execute()
7062
.print();
7163
}
7264

7365
/**
74-
* A ProcessTableFunction that tracks user sessions based on click events.
66+
* A ProcessTableFunction that detects user inactivity based on click events.
7567
*
76-
* <p>For each user (partitioned by {@code user_id}), it maintains session state and emits a
77-
* {@link SessionSummary} when the session times out due to inactivity.
68+
* <p>For each user (partitioned by {@code user_id}), it counts incoming clicks and registers a
69+
* named event-time timer. Each new click replaces the previous timer, resetting the inactivity
70+
* clock. When the timer fires (no new clicks within the timeout), an alert is emitted.
7871
*/
79-
public static class SessionTracker extends ProcessTableFunction<SessionTracker.SessionSummary> {
80-
81-
/** POJO output type for session summaries. */
82-
public static class SessionSummary {
83-
public int userId;
84-
public String status;
85-
public long sessionDuration;
86-
public int eventCount;
87-
public String firstUrl;
88-
public String lastUrl;
72+
public static class ClickInactivityMonitor
73+
extends ProcessTableFunction<ClickInactivityMonitor.InactivityAlert> {
74+
75+
/** Output POJO. The framework adds the user_id partition key and rowtime automatically. */
76+
public static class InactivityAlert {
77+
public int clickCount;
8978
}
9079

91-
/** State class tracking user session information. */
92-
public static class SessionState {
93-
public String firstUrl = null;
94-
public String lastUrl = null;
95-
public Long sessionStart = null;
96-
public int userId = 0;
97-
public int eventCount = 0;
80+
/** Per-user state. */
81+
public static class ClickState {
82+
public int clickCount = 0;
9883
}
9984

100-
/**
101-
* Evaluates incoming click events and manages session state.
102-
*
103-
* @param ctx Process context for timers and state management
104-
* @param state Session state with 2-hour TTL
105-
* @param input Click event row from the clicks table
106-
* @param sessionTimeoutSeconds Configurable session timeout in seconds
107-
*/
10885
public void eval(
10986
Context ctx,
110-
@StateHint(ttl = "2 hours") SessionState state,
87+
@StateHint ClickState state,
11188
@ArgumentHint({SET_SEMANTIC_TABLE, REQUIRE_ON_TIME}) Row input,
112-
Integer sessionTimeoutSeconds)
113-
throws Exception {
89+
Integer timeoutSeconds) {
90+
91+
state.clickCount++;
11492

93+
// Each new click pushes the timeout forward; the timer fires only after true
94+
// inactivity.
11595
TimeContext<Instant> timeCtx = ctx.timeContext(Instant.class);
116-
int userId = input.getFieldAs("user_id");
117-
String url = input.getFieldAs("url");
118-
119-
if (state.firstUrl == null) {
120-
// First click in session
121-
state.firstUrl = url;
122-
state.sessionStart = timeCtx.time().toEpochMilli();
123-
state.userId = userId;
124-
state.eventCount = 1;
125-
} else {
126-
// Subsequent click: increment count
127-
state.eventCount++;
128-
}
129-
130-
// Track URL and extend session timeout
131-
state.lastUrl = url;
13296
timeCtx.registerOnTime(
133-
"timeout", timeCtx.time().plus(Duration.ofSeconds(sessionTimeoutSeconds)));
97+
"inactivity", timeCtx.time().plus(Duration.ofSeconds(timeoutSeconds)));
13498
}
13599

136-
/**
137-
* Timer callback invoked when the session timeout is reached, indicating no new clicks
138-
* arrived within the configured period.
139-
*/
140-
public void onTimer(OnTimerContext onTimerCtx, SessionState state) throws Exception {
141-
142-
if (state.sessionStart != null) {
143-
Instant currentTime = onTimerCtx.timeContext(Instant.class).time();
144-
long duration = currentTime.toEpochMilli() - state.sessionStart;
145-
146-
SessionSummary summary = new SessionSummary();
147-
summary.userId = state.userId;
148-
summary.status = "TIMEOUT";
149-
summary.sessionDuration = duration;
150-
summary.eventCount = state.eventCount;
151-
summary.firstUrl = state.firstUrl;
152-
summary.lastUrl = state.lastUrl;
153-
154-
collect(summary);
155-
}
100+
public void onTimer(ClickState state) {
101+
InactivityAlert alert = new InactivityAlert();
102+
alert.clickCount = state.clickCount;
103+
collect(alert);
156104
}
157105
}
158106
}

0 commit comments

Comments
 (0)