[FLINK-36953][table] Early fire support for Flink SQL interval join#28353

Open
weiqingy wants to merge 6 commits into apache:master from weiqingy:FLINK-36953-impl

@weiqingy weiqingy commented Jun 7, 2026

What is the purpose of the change

This pull request implements FLIP-497: Early Fire Support for Flink SQL Interval Join.

Today an outer interval join only emits an unmatched row, null-padded, once that row's time window has fully closed. For long windows this delays the null-padded result by the full window span even when a match will never arrive. FLIP-497 adds an EARLY_FIRE SQL join hint that lets an outer interval join emit the null-padded row speculatively after a configurable delay, then retract and correct it if a real match arrives later within the window. This turns the append-only interval-join result into an updating one: the lower latency of the speculative row is paid for with a later correction when a match does arrive, not with a wrong final answer.

The hint is opt-in and scoped: it affects only outer joins (LEFT/RIGHT/FULL) with a non-negative window span. Inner joins and negative-window joins remain append-only and ignore the hint. Each unmatched outer row fires at most once — the hint is single-fire, not periodic.

Example:

SELECT /*+ EARLY_FIRE('delay'='5s') */ o.id, s.ship_time
FROM Orders o
LEFT OUTER JOIN Shipments s
ON o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '10' SECOND AND s.ship_time + INTERVAL '1' HOUR

When an order has no shipment yet, the join emits +I[id, NULL] once the delay elapses. If a matching shipment later arrives inside the window, the speculative row is corrected with -U[id, NULL] followed by +U[id, ship_time].
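The emit-then-correct sequence above can be traced with a toy model. This is a plain-Python sketch, not Flink code: `early_fire_changelog`, `match_at`, and the integer-second timeline are hypothetical, the match (if any) is assumed to fall inside the join window, and a match that arrives before the delay elapses is assumed to produce an ordinary insert.

```python
from typing import List, Optional, Tuple

# (row kind, order id, ship_time or None for the null-padded row)
Change = Tuple[str, int, Optional[int]]


def early_fire_changelog(order_id: int, order_time: int, delay: int,
                         match_at: Optional[int] = None,
                         ship_time: Optional[int] = None) -> List[Change]:
    """Changelog for one left row of a LEFT OUTER join with EARLY_FIRE (toy model)."""
    fire_at = order_time + delay
    if match_at is not None and match_at < fire_at:
        # Assumption: a match before the early-fire timer is a plain insert.
        return [("+I", order_id, ship_time)]
    # The delay elapsed with no match: emit the speculative null-padded row.
    changes: List[Change] = [("+I", order_id, None)]
    if match_at is not None:
        # A later match retracts the pad and emits the corrected row.
        changes.append(("-U", order_id, None))
        changes.append(("+U", order_id, ship_time))
    return changes
```

Calling `early_fire_changelog(1, order_time=0, delay=5, match_at=8, ship_time=7)` yields the pad, its retraction, and the corrected row, in that order.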

Brief change log

The PR is organized as five sequential commits:

  • Hint surface + planner plumbing (inert): add EarlyFireJoinHintOptions (required delay duration, optional time_mode of rowtime/proctime), register the EARLY_FIRE hint in FlinkHintStrategies with a key-value option checker, and thread it through JoinStrategy, CapitalizeQueryHintsShuttle, QueryHintsResolver, the physical rule/node, and StreamExecIntervalJoin. The resolved delay and time mode are serialized as two additive @JsonInclude(NON_NULL) fields; the ExecNode metadata version is unchanged, so existing compiled plans restore as before. The operator ignores the fields at this stage.
  • Changelog-mode inference: split StreamPhysicalIntervalJoin into its own ModifyKindSet arm so it advertises UPDATE when the hint makes it update-producing (gated on hint set + outer join + non-negative window). An early-fire join feeding an insert-only downstream now fails planning with a tailored error that names the hint.
  • Runtime early fire + retraction: register an early-fire timer at rowTime + delay for each cached unmatched outer row; on the timer, emit the padded row as +I and record the fire in a new per-side MapState<Long, List<Boolean>> kept positionally aligned with the existing row cache (so the cache serializer is unchanged and old savepoints restore the new state empty). On a later match, retract with -U and emit the match as +U. Covers the natural pairings: row-time join fires on event time, processing-time join fires on processing time.
  • Cross-domain processing-time fire on a row-time join: support EARLY_FIRE('time_mode'='proctime') on an event-time interval join — speculative pads fire on the wall clock while event-time cleanup is retained. onTimer discriminates the two timer kinds via OnTimerContext.timeDomain(), and a per-side MapState<Long, List<Long>> maps each firing processing-time to the event-time bucket keys due then (allocated only in the cross-domain case). The planner's temporary "not yet supported" rejection is removed; time_mode=rowtime on a processing-time join stays rejected.
  • Restore coverage + documentation: add an INTERVAL_JOIN_EARLY_FIRE restore test program (event-time LEFT OUTER join against a changelog sink whose correction is only producible if the fired-bit state survives the savepoint), plus EN/ZH docs for the hint in the SQL joins reference.

Verifying this change

This change added tests and can be verified as follows:

  • Runtime harness tests in RowTimeIntervalJoinTest and ProcTimeIntervalJoinTest cover speculative emit, the -U/+U correction on a later match, single-fire when the delay is at or beyond the window span, the cross-domain wall-clock trigger without watermark advance, and snapshot/restore both before and after the pad is emitted.
  • Planner tests in IntervalJoinTest (Scala + .xml) cover hint parsing/validation, the resolved plan fields, the default time_mode resolution, and the time_mode error cases.
  • IntervalJoinSpecJsonSerdeTest confirms the additive JSON fields round-trip and that plans without them restore unchanged.
  • IntervalJoinRestoreTest runs the new INTERVAL_JOIN_EARLY_FIRE program end-to-end against a generated plan + savepoint fixture, exercising fired-bit state survival across restore.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes — EarlyFireJoinHintOptions is @PublicEvolving (the new user-facing hint surface from FLIP-497).
  • The serializers: no — the early-fired marker is held in a new separate MapState rather than widening the existing row-cache tuple, so the cache serializer is unchanged and old savepoints restore the new state empty. The two new ExecNode JSON fields are additive and @JsonInclude(NON_NULL), leaving the metadata version unchanged.
  • The runtime per-record code paths (performance sensitive): yes — the interval-join operator's hot path is touched, but all early-fire work is gated on the hint being set, an outer join, and a non-negative window, so a plain interval join allocates nothing new and behaves as before.
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs (the SQL joins reference, EN + ZH) and JavaDocs on the new option class.

Was generative AI tooling used to co-author this PR?
  • Yes (Claude Code, Anthropic Claude Opus 4.8)

weiqingy added 5 commits June 7, 2026 15:26
…mbing

Introduce the EARLY_FIRE('delay'=..., 'time_mode'=...) SQL join hint for
interval joins and thread it through the planner as inert metadata.

EarlyFireJoinHintOptions defines the typed option keys: a required 'delay'
duration and an optional 'time_mode' (rowtime|proctime). The hint is
registered in FlinkHintStrategies with a key-value option checker that
requires a positive delay, validates time_mode, and rejects unknown option
keys. JoinStrategy, CapitalizeQueryHintsShuttle, and QueryHintsResolver are
extended so the hint propagates intact to StreamPhysicalIntervalJoinRule,
which resolves the default time_mode from the window bounds and rejects
time_mode=rowtime on a proctime join (and, for now, time_mode=proctime on an
event-time join).

The resolved delay and time_mode are carried on StreamPhysicalIntervalJoin
and serialized as two additive @JsonInclude(NON_NULL) fields on
StreamExecIntervalJoin, leaving the ExecNode metadata version unchanged so
existing compiled plans restore as before. No runtime behavior changes yet:
the operator ignores the new fields.
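
The option checks described in this commit can be sketched in plain Python. Everything here is illustrative and not the planner's code: the function names are hypothetical, the tiny duration grammar (`ms`, `s`, `min`, `h`) is a simplification and not Flink's duration parser, and the default `time_mode` is assumed to follow the join's own time attribute. The sketch models the final state of the PR, where `time_mode=proctime` on an event-time join is accepted.

```python
import re
from typing import Dict

ALLOWED_KEYS = {"delay", "time_mode"}
TIME_MODES = {"rowtime", "proctime"}
_UNIT_MS = {"ms": 1, "s": 1_000, "min": 60_000, "h": 3_600_000}


def parse_delay_ms(text: str) -> int:
    """Parse '<int><unit>' into milliseconds (simplified, hypothetical grammar)."""
    m = re.fullmatch(r"\s*(\d+)\s*(ms|s|min|h)\s*", text)
    if m is None:
        raise ValueError(f"unparseable delay: {text!r}")
    return int(m.group(1)) * _UNIT_MS[m.group(2)]


def check_options(options: Dict[str, str]) -> int:
    """Validate the key-value options and return the delay in milliseconds."""
    unknown = set(options) - ALLOWED_KEYS
    if unknown:
        raise ValueError(f"unknown EARLY_FIRE option(s): {sorted(unknown)}")
    if "delay" not in options:
        raise ValueError("EARLY_FIRE requires a 'delay' option")
    delay_ms = parse_delay_ms(options["delay"])
    if delay_ms <= 0:
        raise ValueError("'delay' must be positive")
    mode = options.get("time_mode")
    if mode is not None and mode not in TIME_MODES:
        raise ValueError(f"invalid time_mode: {mode!r}")
    return delay_ms


def resolve_time_mode(options: Dict[str, str], join_is_rowtime: bool) -> str:
    """Pick the effective time mode; reject rowtime on a processing-time join."""
    mode = options.get("time_mode")
    if mode is None:
        # Assumption: the default follows the join's own time attribute.
        return "rowtime" if join_is_rowtime else "proctime"
    if mode == "rowtime" and not join_is_rowtime:
        raise ValueError("time_mode=rowtime is not supported on a processing-time join")
    return mode
```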

…fire interval join

With the EARLY_FIRE hint, an outer interval join speculatively emits a padded
unmatched row after the delay and corrects it when a match later arrives, so it
no longer produces insert-only changes. Teach FlinkChangelogModeInferenceProgram
to reflect this.

Split StreamPhysicalIntervalJoin into its own ModifyKindSet arm: its children
still consume insert-only, but the node provides INSERT and, when the hint makes
it update-producing, UPDATE. A new produceEarlyFireUpdates accessor gates that on
the hint being set, the join being outer, and a non-negative window span, so the
hint stays inert for inner joins and negative-window joins (which only ever emit
inserts). The interval join keeps its place in the UpdateKind and DeleteKind arms.

When such a join feeds an insert-only downstream, planning fails with a tailored
error that names the hint, rather than the generic "doesn't support consuming
update changes" message. Runtime behavior is unchanged; the operator still
ignores the hint.
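
The gating rule and its effect on the advertised changelog mode can be sketched as follows. This is a toy model with hypothetical names (`IntervalJoinNode`, `check_sink`, the bound fields); the "non-negative window span" condition is assumed to mean upper bound minus lower bound is at least zero, and the error text is invented for illustration.

```python
from dataclasses import dataclass
from typing import FrozenSet, Optional

OUTER_JOIN_TYPES = {"LEFT", "RIGHT", "FULL"}


@dataclass(frozen=True)
class IntervalJoinNode:
    join_type: str
    lower_bound_ms: int  # window bounds relative to the other side's time
    upper_bound_ms: int
    early_fire_delay_ms: Optional[int] = None  # None means no EARLY_FIRE hint

    def produces_early_fire_updates(self) -> bool:
        # All three conditions must hold, otherwise the hint stays inert.
        return (self.early_fire_delay_ms is not None
                and self.join_type in OUTER_JOIN_TYPES
                and self.upper_bound_ms - self.lower_bound_ms >= 0)

    def provided_modify_kinds(self) -> FrozenSet[str]:
        kinds = {"INSERT"}
        if self.produces_early_fire_updates():
            kinds.add("UPDATE")
        return frozenset(kinds)


def check_sink(node: IntervalJoinNode, sink_accepts_updates: bool) -> None:
    """Fail planning when an update-producing join feeds an insert-only sink."""
    if "UPDATE" in node.provided_modify_kinds() and not sink_accepts_updates:
        raise ValueError(
            "The EARLY_FIRE hint makes this interval join produce updates, "
            "but the downstream only accepts inserts.")
```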

…val join operator

Wire the EARLY_FIRE delay into the interval join operator so an outer join
speculatively emits its padded unmatched row after the delay and corrects it
when a real match arrives. Covers the natural timer pairings: a row-time join
fires on event time, a processing-time join fires on processing time.
Processing-time triggering on a row-time join stays rejected at planning.

When an unmatched outer row is cached, the operator registers an early-fire
timer at rowTime + delay. On that timer it emits the padded row as an INSERT
and records that it fired. When the row later matches, it retracts the padded
row as UPDATE_BEFORE and emits the matched row as UPDATE_AFTER, matching the
update-producing changelog mode inferred for the node. The retraction is tied
to the one-time matched-and-emitted flip, so a row that matches several times
emits a single correction followed by ordinary inserts.

The already-fired marker is a new per-side MapState<Long, List<Boolean>> kept
positionally aligned with the existing row cache, rather than widening the
cache tuple, so the cache serializer is unchanged and old savepoints restore
the new state empty. The marker is the single gate that keeps a row padded
exactly once when the delay is at or beyond the window span. All early-fire
work is gated on the hint being set, an outer join, and a non-negative window,
so a plain interval join is unchanged and allocates nothing new.

EmitAwareCollector carries the changelog stamping so IntervalJoinFunction stays
changelog-agnostic, and every padded or matched emit stamps its RowKind
explicitly to avoid leaking a kind onto a reused row.
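
A toy model of the state layout described above, for one outer side in a natural (same-domain) pairing. It is a sketch under stated assumptions, not the operator: `EarlyFireSide` and its methods are hypothetical, plain dicts stand in for `MapState`, a match before the timer is assumed to emit an ordinary insert, and window-close cleanup and restore from old savepoints are not modeled.

```python
from typing import Dict, List, Optional, Set, Tuple

Change = Tuple[str, str, Optional[str]]  # (row kind, outer row, other row or None)


class EarlyFireSide:
    def __init__(self, delay: int) -> None:
        self.delay = delay
        # Row cache: row time -> [[row, matched_and_emitted], ...]
        self.cache: Dict[int, List[list]] = {}
        # Fired bits: same keys, positionally aligned with the cache lists.
        self.fired: Dict[int, List[bool]] = {}
        self.timers: Set[int] = set()
        self.out: List[Change] = []

    def add_row(self, row_time: int, row: str) -> int:
        """Cache an unmatched outer row and register its early-fire timer."""
        self.cache.setdefault(row_time, []).append([row, False])
        self.fired.setdefault(row_time, []).append(False)
        self.timers.add(row_time + self.delay)
        return len(self.cache[row_time]) - 1

    def on_timer(self, timestamp: int) -> None:
        """Emit pads for rows in the bucket that are still unmatched and unfired."""
        self.timers.discard(timestamp)
        bucket = timestamp - self.delay  # recover the cache key from the timer
        for i, (row, matched) in enumerate(self.cache.get(bucket, [])):
            if not matched and not self.fired[bucket][i]:
                self.out.append(("+I", row, None))
                self.fired[bucket][i] = True

    def on_match(self, row_time: int, index: int, other_row: str) -> None:
        entry = self.cache[row_time][index]
        row, matched = entry
        if not matched and self.fired[row_time][index]:
            # First match after the pad fired: one retract-and-correct pair.
            self.out.append(("-U", row, None))
            self.out.append(("+U", row, other_row))
        else:
            # Either no pad was emitted or the correction already happened.
            self.out.append(("+I", row, other_row))
        entry[1] = True
```

In this model a row that fires and then matches twice produces `+I` (pad), `-U`, `+U`, and then a plain `+I` for the second match, which mirrors the single-correction behavior the commit message describes.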

… interval join

Add the cross-domain timer combination the previous commit left out: an
event-time interval join with EARLY_FIRE('time_mode'='proctime') now fires its
speculative pads on the wall clock while keeping its event-time cleanup. The
temporary "not yet supported" rejection in the planner rule is removed; the
row-time-on-processing-time rejection is retained.

onTimer distinguishes the two timer kinds by OnTimerContext.timeDomain(): in
the cross-domain case early-fire timers are processing-time and cleanup timers
are event-time, so a processing-time firing runs early fire and returns while
an event-time firing runs cleanup only. The discrimination is gated on a new
cross-domain flag, so the natural pairings keep the previous timestamp - delay
recovery where early fire and cleanup share a domain.

A processing-time firing timestamp cannot be mapped back to an event-time cache
bucket arithmetically, so a per-side MapState<Long, List<Long>> keyed by firing
processing-time records the event-time bucket keys due to fire then. It is
allocated only in the cross-domain case and reuses the existing per-bucket emit
and positional fired bit, so the retract-and-correct path is shared. Every
scheduled firing time fires and removes its own entry, and a bucket already
cleaned by event-time expiry makes the firing a no-op, so nothing accumulates.

The schedule is value-typed and order-preserving and processing-time timers are
checkpointed, so a timer pending at snapshot fires after restore against the
restored schedule and fired bits and emits at most the not-yet-emitted pad.
Harness tests cover the wall-clock trigger without watermark advance, a snapshot
before the timer fires, and a snapshot after the pad is emitted.
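
The cross-domain schedule can be sketched as a map from firing processing-time to the event-time bucket keys due then. This is a toy model with hypothetical names, not the operator: matching is left out, the event-time cleanup timestamp is taken to be the bucket key itself for brevity, and cleanup is assumed to emit the pad for rows that never fired, as a plain outer interval join does at window close.

```python
from typing import Dict, List, Optional, Tuple

Change = Tuple[str, str, Optional[str]]


class CrossDomainSchedule:
    def __init__(self, delay: int) -> None:
        self.delay = delay
        self.schedule: Dict[int, List[int]] = {}  # firing proc time -> bucket keys
        self.buckets: Dict[int, List[str]] = {}   # event-time bucket -> unmatched rows
        self.fired: Dict[int, List[bool]] = {}    # positional fired bits per bucket
        self.out: List[Change] = []

    def add_row(self, event_time: int, row: str, now: int) -> None:
        self.buckets.setdefault(event_time, []).append(row)
        self.fired.setdefault(event_time, []).append(False)
        due = self.schedule.setdefault(now + self.delay, [])
        if event_time not in due:
            due.append(event_time)  # keep insertion order, no duplicates

    def _emit_pads(self, bucket: int) -> None:
        for i, row in enumerate(self.buckets.get(bucket, [])):
            if not self.fired[bucket][i]:
                self.out.append(("+I", row, None))
                self.fired[bucket][i] = True

    def on_timer(self, timestamp: int, time_domain: str) -> None:
        if time_domain == "PROCESSING_TIME":
            # Early fire: consume this firing time's entry, then return.
            for bucket in self.schedule.pop(timestamp, []):
                self._emit_pads(bucket)  # no-op if the bucket was already cleaned
            return
        # EVENT_TIME: cleanup only. Pad anything never fired, then drop state.
        self._emit_pads(timestamp)
        self.buckets.pop(timestamp, None)
        self.fired.pop(timestamp, None)
```

Because each firing pops its own schedule entry and a cleaned bucket makes the firing a no-op, the schedule is empty once every registered timer has fired.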

…-fire interval join

Round out the feature with end-to-end restore coverage and user docs; no
operator or planner changes.

Add an INTERVAL_JOIN_EARLY_FIRE table test program: an event-time LEFT OUTER
interval join carrying EARLY_FIRE('delay'='2s') against a changelog sink. An
unmatched left row is null-padded (+I) before the savepoint and corrected
(-U then +U) when its matching right row arrives after restore. That correction
is only producible if the early-fired bit MapState round-trips through the
savepoint, so the program exercises state survival rather than a single fire in
isolation. The generated plan carries the additive earlyFireDelay and
earlyFireTimeMode fields; the three existing fixtures, which omit them, restore
unchanged and are left untouched.
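
Why additive, omit-when-null fields leave old plans untouched can be shown with a small round-trip sketch. Only the field names `earlyFireDelay` and `earlyFireTimeMode` come from the text above; the `joinType` key, the function names, and the string encoding of the values are assumptions made for illustration.

```python
import json
from typing import Optional


def serialize_spec(join_type: str,
                   early_fire_delay: Optional[str] = None,
                   early_fire_time_mode: Optional[str] = None) -> str:
    """Serialize a toy join spec, dropping null fields like NON_NULL inclusion does."""
    fields = {
        "joinType": join_type,  # hypothetical key
        "earlyFireDelay": early_fire_delay,
        "earlyFireTimeMode": early_fire_time_mode,
    }
    return json.dumps({k: v for k, v in fields.items() if v is not None})


def deserialize_spec(text: str) -> dict:
    """Read a spec; absent early-fire fields come back as None."""
    raw = json.loads(text)
    return {
        "joinType": raw["joinType"],
        "earlyFireDelay": raw.get("earlyFireDelay"),
        "earlyFireTimeMode": raw.get("earlyFireTimeMode"),
    }
```

A spec without the hint serializes to the same text an older writer would have produced, and an older plan deserializes with both early-fire fields unset.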

Document the EARLY_FIRE hint in the SQL joins reference (English and Chinese):
its purpose, the delay and time_mode options with their defaults and error
cases, the single-fire and outer-only semantics, and a note distinguishing it
from the unrelated table.exec.emit.early-fire.* window-aggregation config.

flinkbot commented Jun 7, 2026

CI report:

Bot commands
The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build


weiqingy commented Jun 7, 2026

Hi @wuchong @xuyangzhong @xccui Could you please help review this PR? Thanks!

…overage and config docs

The EARLY_FIRE join hint only accepts key-value options, so it must be
treated like the LOOKUP hint in two places that assume list-style hints:

- JoinHintTestBase#testMultiJoinHints builds every join hint with
  list-option syntax; EARLY_FIRE has to be filtered out alongside LOOKUP.
- The config-docs completeness check scans *Options classes under
  org.apache.flink.table.api.config; EarlyFireJoinHintOptions is
  documented in the SQL join-hints page, not the generated config
  tables, so it joins LookupJoinHintOptions in the exclusion set.