[AURON #1865] Merge native operators: fuse Flink source and Calc into one native plan#2327
[AURON #1865] Merge native operators: fuse Flink source and Calc into one native plan#2327weiqingy wants to merge 8 commits into
Conversation
…t a watermark AuronKafkaSourceFunction.open() set isRunning=true only inside the watermarkStrategy != null branch, but run() collects records only while isRunning is true on both the watermark and no-watermark paths. A source configured without an event-time watermark therefore emitted nothing. Set isRunning=true unconditionally once open() completes so no-watermark sources emit records (and snapshot offsets and discover partitions) like watermarked ones, while a partial-init failure still leaves the source not-running. Add a no-watermark mock table and an end-to-end test asserting the source emits its records.
…AuronKafkaSourceFunction Enable AuronKafkaSourceFunction to run a fused Project[Filter?[KafkaScan]] native plan when given a logical Calc sub-plan via setMergedCalcPlan, splicing the KafkaScan into the FFIReader-placeholder leaf and prepending the three metadata passthrough columns so the fused output stays metadata-prefixed. The path is dormant until the planner wires it: with no merged plan set, open(), run(), and getOutputType() behave exactly as before.
Apply strict gating in the shadowed StreamExecCalc: a Calc converts to native only when it can fuse into a native upstream chain -- its input is the Auron native Kafka source, the source has no event-time watermark, and every Rex converts. When fusible, the Calc's logical Project[Filter?] sub-plan is handed to the source via setMergedCalcPlan and the source transformation is returned re-typed to the Calc's projected output, eliminating the Calc operator so the source -> Calc chain runs as one native pipeline. Every other case (non-native upstream, watermarked source, any unconvertible Rex) runs as a stock Flink Calc; the standalone FFIReader native-Calc emission is removed. A Calc over a non-native source (e.g. the values connector) now runs as a Flink codegen Calc, so testCastBooleanToString asserts Flink's uppercase boolean-to-string rendering.
…c fusion Add AuronKafkaSourceMergeITCase exercising the fused path end-to-end over the no-watermark auron-kafka table T5: filter + projection fusion, projection-only fusion, and a native boolean-to-string cast whose lowercase rendering proves the fused Project[Filter?[KafkaScan]] ran natively (Flink codegen renders uppercase). A control case over the watermarked source T2 confirms the non-fused path stays correct. Operator elimination is covered by the StreamExecCalc unit tests; the integration tests assert row-set correctness and native execution.
|
Hi @Tartarus0zm, could you please help review this PR when you get a chance? Thanks! |
SteNicholas
left a comment
There was a problem hiding this comment.
Auron StreamExecCalc → Kafka source fusion review
Nice direction. I verified the core mechanism is sound: native resolves projection/filter columns by name (auron-planner/src/planner.rs:876, proto index ignored), so prepending the 3 meta columns and the resulting ordinal shift are harmless; meta columns stay first so offset-commit bookkeeping is unchanged; meta names/types/nullability match the KafkaScan schema; and the isRunning move is a genuine no-watermark emit fix. The new non-transient fields serialize fine.
The findings cluster around one root cause: the fused Calc returns the upstream source Transformation directly and mutates the shared AuronKafkaSourceFunction, with no guard against the same source being reached more than once (ExecNodeBase.translateToPlan memoizes it). The top three correctness items — shared-source plan overwrite, double setOutputType, and stacked-Calc double-fuse — are all closed by only fusing when the source has a single consumer and isn't already merged. Details inline.
Cleared (not issues): meta column order/types/reader offsets, run() projected-type sizing, field serialization, and the boolean-cast lowercase/uppercase test flips.
| // failure, so it goes to Flink's Calc regardless of the fallback flag. | ||
| final AuronKafkaSourceFunction source = asNativeKafkaSource(upstream); | ||
| if (source != null && !source.hasWatermark()) { | ||
| source.setMergedCalcPlan(plan.get(), outputRowType); |
There was a problem hiding this comment.
Correctness (high): a source reached by two Calcs gets its merged plan overwritten.
setMergedCalcPlan mutates the shared AuronKafkaSourceFunction, but ExecNodeBase.translateToPlan memoizes the source transformation, so one source instance can be handed to more than one Calc:
- Source reuse (default
table.optimizer.reuse-source-enabled=true): this source declares no projection/filter push-down, so two scans of the same table collapse into one shared source node. Two fusible Calcs above it both callsetMergedCalcPlan— last write wins, and since both return the same re-typedupstream, the branches collapse to one stream. e.g.SELECT age FROM T5 WHERE age>20 UNION ALL SELECT age+1 FROM T5 WHERE age>10→ silently wrong results. - Stacked Calcs (
source→Calc1→Calc2left unmerged byCalcMergeRule): Calc1 returns the source transformation, so Calc2 re-matches the same source, overwrites Calc1's plan, and builds exprs against Calc1's projected columns theKafkaScannever emits → native name-resolution failure.
Suggest fusing only when the source has exactly one consumer/output edge, and refusing to re-fuse an already-merged source (guard on mergedCalcPlan != null).
There was a problem hiding this comment.
Confirmed, and it's reachable under default config. Instrumented repro: SELECT age FROM T5 WHERE age>20 UNION ALL SELECT age+1 FROM T5 WHERE age>10 → both Calcs get the same source instance, both fuse, result is 6 rows instead of 5.
Fixed in 03b757df: fusion now proceeds only when the source's sole-consumership is provable — source/sub-plan reuse disabled (so each scan gets its own source), plus an already-merged guard for the stacked-Calc case. A translating Calc can't see how many operators read its source, and one physical source can't serve two projections, so turning fusion on by default needs a graph-level pass that counts each source's consumers — filed as follow-up #2329 and recorded as Rev 5 in the design doc. Added a regression test that asserts the UNION ALL case is correct under default reuse.
| // the source and this Calc has read getOutputType() yet — Flink's Transformation locks | ||
| // the type on first read. An ExecNode inserted between source and Calc that reads the | ||
| // type before this point would lock the original RowType and make this re-type a no-op. | ||
| upstream.setOutputType(InternalTypeInfo.of(outputRowType)); |
There was a problem hiding this comment.
Correctness: this is a second setOutputType on the source transformation and can throw.
CommonExecTableSourceScan already typed this transformation. Transformation.setOutputType throws IllegalStateException once typeUsed is set (any prior getOutputType() read). The single-consumer happy path works because nothing reads the type in between — but in the shared/stacked-source case (see the setMergedCalcPlan comment), a downstream of the first consumer locks the type and this re-type aborts job translation instead of falling back. A single-consumer guard also resolves this.
There was a problem hiding this comment.
Same root cause — agreed. The sole-consumer gate removes this: with no shared, already-typed source to re-type, the second setOutputType is never reached. Closed by 03b757df.
| // Set at plan time, so these must survive operator serialization to the TaskManager | ||
| // (non-transient, like watermarkStrategy below). | ||
| private PhysicalPlanNode mergedCalcPlan; | ||
| private RowType mergedProjectedOutputType; |
There was a problem hiding this comment.
Simplification: mergedProjectedOutputType is derivable from mergedCalcPlan's outer ProjectionExecNode (field names + arrow types) and drives a duplicated mergedProjectedOutputType != null ? … : (RowType) outputType ternary in both getOutputType() (L435) and run() (L329). Deriving it on demand removes the field and collapses both ternaries to one source of truth.
There was a problem hiding this comment.
The field is redundant, but I'd push back on deriving it: there's no reverse Arrow→LogicalType converter in the repo (only the forward convertToAuronArrowType), so reconstructing the RowType from the projection's arrow types would be lossy on nullability and add fragile new code — whereas the field is handed straight from the planner with the correct type already in hand.
Counter in 93aaae62 that addresses the real concern (the duplicated ternary in run() and getOutputType()): keep the field but collapse both into one effectiveLogicalOutputType() accessor, so there's a single source of truth. Happy to revisit a proper reverse converter later if you'd prefer the field gone.
| * {@code Project[FFIReader]} and {@code Project[Filter[FFIReader]]} (the Calc always wraps its | ||
| * output in a projection). | ||
| */ | ||
| private static PhysicalPlanNode spliceScanIntoLeaf(PhysicalPlanNode node, PhysicalPlanNode kafkaScan) { |
There was a problem hiding this comment.
Cleanup (reuse + dead code): spliceScanIntoLeaf is a near-verbatim duplicate of FlinkAuronCalcOperator.injectFfiReaderLeaf (same recursive Project/Filter/FFIReader walk and error). After this PR, FlinkAuronCalcOperator and its (large) unit test are orphaned — nothing emits the standalone FFIReader native-Calc path anymore. Either share one parameterized tree-rewriter or remove the now-dead operator + test, so the two walkers can't drift apart.
There was a problem hiding this comment.
Confirmed — after this PR FlinkAuronCalcOperator is only referenced by its own test, and spliceScanIntoLeaf duplicates its injectFfiReaderLeaf. Filed as #2328: remove the operator and its test, collapse the two walkers into one, and fold in the bare-marker-leaf cleanup. It lands as a focused PR off master right after this merges — the operator is dead only once this is in, so removing it from a clean master keeps the diff honest and this PR's diff on the fusion + correctness work.
| * so it runs as a stock Flink Calc above the native source. The row set must still be correct: | ||
| * only age 21 and 22 survive the filter. */ | ||
| @Test | ||
| public void testCalcOverWatermarkedSourceDoesNotFuseButIsCorrect() { |
There was a problem hiding this comment.
Test validity: this claims to cover the hasWatermark() gate, but T2's watermark is on a computed column, so Flink inserts a watermark-assigner/projection between the source and the Calc — the Calc's upstream is no longer a LegacySourceTransformation, so asNativeKafkaSource() returns null and fusion is blocked by the non-native-upstream branch, not the watermark branch. If hasWatermark() were deleted this test would still pass (false confidence — and it's the only guard against the copy() watermark-drop risk noted on hasWatermark). A directly-watermarked native source (no computed column) would actually exercise the gate.
There was a problem hiding this comment.
I'd push back here with evidence. I instrumented the T2 path: the Calc's upstream is a LegacySourceTransformation (native source detected) with hasWatermark true, and it falls back via the hasWatermark() guard — not the non-native-upstream branch. Because the source implements SupportsWatermarkPushDown, Flink folds the watermark into the source rather than inserting an assigner operator, so the Calc's immediate upstream is still the native source.
So deleting hasWatermark() flips T2 to fuse — the test does exercise the guard, and a directly-watermarked source would be redundant. I'll add a short comment documenting the push-down mechanism so the intent is clear.
A Calc fused into the native Kafka source by mutating the shared AuronKafkaSourceFunction and returning its Transformation re-typed to the projected output. When Flink source reuse hands the same source to more than one Calc, the source's translated Transformation is memoized, so both Calcs fused it last-write-wins, and a single physical source cannot emit two different projections. A UNION ALL over one no-watermark table therefore produced wrong results. Fusion now proceeds only when the source is provably a sole consumer: source and sub-plan reuse are both disabled (Flink then builds a distinct source per scan) and the source has not already been merged (guarding stacked Calcs). Every other case runs the stock Flink Calc, unchanged in correctness. The existing fusion integration tests now disable reuse so they still exercise the native fused path; a new test asserts a shared-source UNION ALL is correct under default reuse.
…ableSource.copy() copy() rebuilt the source via its constructor and dropped the applied watermarkStrategy, resetting it to null. The gate that blocks fusing a Calc into a watermarked source reads the strategy off the source function, so it silently relied on Flink re-applying the watermark after every copy(). Carry the strategy through copy() so the gate is self-contained regardless of when the optimizer copies the source.
Collapse the duplicated metadata-or-projected output-type ternary in run() and getOutputType() into one accessor. Define the three Kafka metadata columns (partition, offset, timestamp) once in KafkaConstants and derive the source row-type assembly, the projection passthrough, and the proto schema conversion from it, so adding or renaming a metadata column happens in one place. Fail fast when the merged sub-plan root is not a projection, and when a logical projected column name collides with a reserved metadata column name, instead of silently producing a wrong plan.
|
@SteNicholas Thanks for the thorough review — the single-root-cause framing was right, and it led to a sharper finding than I'd assumed. I reproduced the shared-source case end-to-end: One constraint shaped the fix: a translating Calc cannot see how many operators read its source (Flink's exec-node graph only points from consumer to producer), and a single physical source can only run one plan — so a "first-Calc-claims, others-fall-back" guard isn't enough, because the first fuse already re-points the shared source. Checking "is this source read by exactly one operator" needs a pass that sees the whole plan graph, which a single Calc node doesn't have. So for this PR I gate fusion to the cases where sole-consumership is provable (source/sub-plan reuse disabled, plus an already-merged guard for stacked Calcs), which is correct under all configs. Turning fusion on by default — by counting each source's consumers in a graph-level pass — is follow-up work I've filed as #2329 (recorded as Rev 5 in the design doc). If you'd rather bring that into this PR despite the added Fixes pushed as separate commits ( |
|
|
||
| @Override | ||
| @SuppressWarnings("unchecked") | ||
| protected Transformation<RowData> translateToPlanInternal(PlannerBase planner, ExecNodeConfig config) { |
There was a problem hiding this comment.
From my gut feeling, doing the merge here isn't a good choice:
-
Each StreamExec should only be responsible for translating its own operator — it shouldn't reach across operators.
-
The way we're checking whether to merge operators is too hacky. I'd suggest getting the upstream operator and checking operator instanceof FlinkAuronOperator or function instanceof FlinkAuronFunction instead — that's much more reasonable.
Also, the ideal approach for multi-operator fusion would be to use a Rule/optimizer, but that's significantly more complex, and we'd also need to figure out how to inject that optimization rule into Flink.
So a relatively pragmatic middle ground is to handle this during JobGraph generation, similar to how Gluten does it — we'd need to rewrite Flink's StreamGraphTranslator.
What do you think? @weiqingy @SteNicholas
There was a problem hiding this comment.
Thanks — this is the right question to put on the table, and I agree with where you're pointing.
On altitude: you're right that a StreamExec translating its own node shouldn't reach across to mutate its upstream. The end state I want is the one you describe — a graph-level pass (an ExecNodeGraphProcessor, or rewriting StreamGraphTranslator as Gluten does) that sees the whole chain and fuses there, so no node reaches outside itself. I've tracked that as #2329; it's also what multi-consumer / source-reuse correctness needs, so the two converge.
On detection: agreed. I've switched it to key on the FlinkAuronFunction marker and lifted the fusion hand-off onto that interface, so it no longer depends on the concrete Kafka source — any native Auron source function is now a fusion target. The one Kafka-specific bit still remaining is the LegacySourceTransformation unwrap, which the #2329 graph-level pass removes entirely.
Where I'd value your call is sequencing. This PR gates fusion to the sole-consumer-provable case (source / sub-plan reuse off), so it's correct under every config, covered end-to-end, and mirrors the plan-merge already used on the Spark side — so I see it as a correct stepping stone that delivers the single-conversion win now, with the graph-level rewrite (#2329) as the proper home the later #1264 operator phases build on. The alternative is to hold this and go straight to the StreamGraphTranslator approach.
My preference is to land this as the interim and do the graph-level pass in #2329, but I'm happy to pivot now if you'd rather not carry the in-StreamExecCalc step.
There was a problem hiding this comment.
@weiqingy This issue was originally designed for fusing adjacent Auron native operators, but it indeed only considered the single-operator (single source) scenario, without taking into account the multi-operator reuse case.
I haven't encountered the multi-operator reuse scenario very often, so I didn't consider it at the time.
I still recommend going with the graph-level approach from the start.
- First the logic is clearer; second, it covers a wider range of scenarios and is more general-purpose; and third, subsequent operator fusion work can continue to reuse it.
- If we implement this based on StreamExecCalc now, once the graph-level implementation is done in the future, how would we handle the code in StreamExecCalc?
I think this issue can be treated as covering single-operator fusion, with #2329 handling multi-operator reuse fusion. If #2329 can completely cover the single-operator fusion scenario as well, then I think this issue can be abandoned.
What do you think? @weiqingy @SteNicholas
There was a problem hiding this comment.
@Tartarus0zm @weiqingy I re-reviewed the current revision with this question in mind, and I come down on the graph-level approach too. Three things from the re-review tipped me:
1. The interim delivers almost nothing under default config. Fusion is gated to isSourceFusionSafe — both table.optimizer.reuse-source-enabled and reuse-sub-plan-enabled off — and both default to true. So out of the box nothing fuses; the IT cases have to disable reuse in @BeforeEach to exercise the path at all. And since this PR also removes the standalone FlinkAuronCalcOperator emission, under default config every convertible Calc now goes to Flink codegen rather than native. The "single-conversion win now" only materializes when a user opts into a non-default reuse config (which itself can regress other queries by un-sharing sources), so the default-config value is close to nil — which is most of the stepping-stone argument.
2. The correctness gate infers a whole-graph property from session config, which is exactly what a node can't see. isSourceFusionSafe uses "reuse disabled ⇒ one consumer per source" as a proxy for sole-consumership. That's sound for a freshly-planned SQL job but it isn't topology — a compiled plan (@JsonCreator/COMPILE PLAN is wired here) freezes source sharing at compile time, and restoring it under a different reuse config can re-permit fusion into a genuinely shared source, reopening the last-write-wins corruption the gate closed. Counting a source's real consumers needs the graph, so it belongs in #2329.
3. Your maintenance question is the deciding one. Once #2329 lands graph-level fusion, the in-StreamExecCalc reach-across (the LegacySourceTransformation unwrap, the reuse-config gate, the upstream re-typing) is all throwaway — and it lives in the shared Calc shadow every query flows through, carrying a behavior change (strict mode FAIL_BACK=false now silently falls a convertible-but-unfusible Calc back to Flink codegen instead of throwing). I'd rather not carry that.
So +1 to building #2329 graph-level from the start and treating #1865 as the single-operator case it subsumes; if #2329 covers single-operator fusion, folding #1865 into it as you suggest makes sense.
@weiqingy — to be clear, none of the implementation is wasted: the Rex→native converter wiring, the metadata-splice + by-name column resolution, and the no-watermark emit fix all carry straight over to the graph-level pass; it's really the placement and the config-inference gate that move. And both your pushbacks hold on re-check — there's genuinely no reverse Arrow→LogicalType converter so keeping mergedProjectedOutputType is right, and T2 does exercise the hasWatermark() gate (watermark pushed into the source, Calc's upstream stays a LegacySourceTransformation). One small follow-up either way: that test still passes with hasWatermark() deleted since [22,23] is the result fused or not, so a watermark-dependent assertion would actually lock the gate.
If the team would still rather land this as an interim, I'm not blocking — I'd just want it to (a) state explicitly that fusion is off under default reuse, (b) guard the compiled-plan restore case, and (c) preserve strict mode's native-or-throw contract.
…n marker Lift the source-side fusion hand-off (setMergedCalcPlan / isMergedCalcPlanSet / hasWatermark) onto the FlinkAuronFunction interface, and detect the upstream fusion target by that marker instead of the concrete AuronKafkaSourceFunction. Any native Auron source function is now a fusion target, not only the Kafka source; the source-unwrap of the legacy transformation is unchanged.
SteNicholas
left a comment
There was a problem hiding this comment.
Re-review follow-ups after the fix commits. The shared-source correctness items are addressed and both pushbacks hold up on re-check (no reverse Arrow→LogicalType converter, and T2 does exercise the hasWatermark() gate). The items below are new from re-reading the current revision — a couple of correctness/robustness edges plus test/maintainability points.
The overarching point — under default reuse config nothing fuses, and standalone native Calc is removed, so by default every convertible Calc now runs Flink codegen — is the design discussion in the thread above; these inline notes are the concrete follow-ups regardless of which direction the PR takes.
| } | ||
|
|
||
| LOG.debug("Calc node {} converts but cannot fuse into a native source; using Flink Calc.", getId()); | ||
| return translateToFlinkCalc(planner, config); |
There was a problem hiding this comment.
Correctness (behavior change + doc mismatch): strict mode no longer guarantees native-or-throw for a Calc. When plan.isPresent() but the Calc can't fuse, this always returns translateToFlinkCalc(...) regardless of FAIL_BACK_FLINK_ENGINE_ENABLED. Two issues:
- The class javadoc claims the opposite — "except when
FAIL_BACK_FLINK_ENGINE_ENABLEDisfalseand the Calc is fully convertible yet cannot fuse, where it throwsIllegalStateException." The code never throws on that branch; the only throw is in the!plan.isPresent()branch above. Doc and code disagree. - Pre-PR a convertible Calc unconditionally emitted
FlinkAuronCalcOperator, so strict mode meant "convert natively or throw." Now a convertible-but-unfusible Calc (which under default reuse is every Calc) silently degrades to Flink codegen atdebuglevel — strict mode no longer implies native execution. Either re-check the fallback flag and throw here, or fix the javadoc to match.
| * @return {@code true} only when both source reuse and sub-plan reuse are disabled | ||
| */ | ||
| private static boolean isSourceFusionSafe(ReadableConfig config) { | ||
| return !config.get(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED) |
There was a problem hiding this comment.
Correctness (latent): inferring sole-consumership from the session reuse config breaks for compiled plans. isSourceFusionSafe reads reuse-source-enabled/reuse-sub-plan-enabled at translate time as a proxy for "this source has exactly one consumer." That holds for a freshly-planned SQL job, but StreamExecCalc is plan-serializable (@JsonCreator + @ExecNodeMetadata) and a compiled plan freezes source sharing at compile time. Compile a UNION ALL over one table with reuse enabled (one shared source → two Calcs), then EXECUTE PLAN in a session with reuse disabled: this returns true, the first Calc fuses and re-types the shared upstream, and the second consumer reads corrupted rows — exactly the last-write-wins case the gate is meant to block. Session config no longer reflects the compiled topology; real consumer-counting needs the graph-level pass (#2329).
| // merged Calc plan is active and the original output otherwise. | ||
| List<RowType.RowField> fieldList = new LinkedList<>(KAFKA_AURON_META_FIELDS); | ||
| fieldList.addAll(effectiveLogicalOutputType().getFields()); | ||
| RowType auronOutputRowType = new RowType(fieldList); |
There was a problem hiding this comment.
Cleanup (correctness footgun): the metadata count is still hardcoded 3 despite KAFKA_AURON_META_FIELDS being declared the single source of truth. This line correctly derives the row type from KAFKA_AURON_META_FIELDS, but both run() loops below still use literal FlinkArrowReader.create(batch, auronOutputRowType, 3) and getInt(-3) / getLong(-2) / getLong(-1). KafkaConstants' javadoc promises "adding or renaming a metadata column happens in one place," yet adding a 4th meta column would grow auronOutputRowType to 4 meta cols while the reader still assumes 3 → every row reads the wrong Arrow vector. Derive the offset from KAFKA_AURON_META_FIELDS.size() so the contract actually holds.
| List<ArrowType> originalTypes = new ArrayList<>(projection.getDataTypeList()); | ||
|
|
||
| for (RowType.RowField metaField : KAFKA_AURON_META_FIELDS) { | ||
| if (originalNames.contains(metaField.getName())) { |
There was a problem hiding this comment.
Robustness: reserved-meta-name collisions are caught late and incompletely.
- (output names) This check runs at TaskManager
open()and throws. But the planner fusion gate inStreamExecCalchas no equivalent check, soSELECT age AS serialized_kafka_records_offset FROM T5passes planning, fuses, then crash-loops every task instead of degrading to a stock Flink Calc at plan time. Checking the projected output names at the gate would let an unfusable name fall back rather than fail the job. - (input names) Only the projection's output names are checked. A logical input column literally named e.g.
serialized_kafka_records_timestampis unguarded: the splicedKafkaScanschema becomes[meta3, ...logical]with a duplicate name, and since native resolves columns by name (planner.rs:876) the inner expr binds to the meta column (index 2), silently returning Kafka timestamps in place of user data. Low-likelihood but silent — worth rejecting on input names too.
| .executeSql("SELECT `age` + 1 FROM T2 WHERE `age` > 20") | ||
| .collect()); | ||
| rows.sort(Comparator.comparingInt(o -> (int) o.getField(0))); | ||
| assertThat(rows).isEqualTo(Arrays.asList(Row.of(22), Row.of(23))); |
There was a problem hiding this comment.
Test validity: this asserts only the row set [22, 23], which is identical whether or not the Calc fuses, so it can't catch a regression in the hasWatermark() gate it's meant to cover. To close the loop on our thread: you're right that SupportsWatermarkPushDown folds the watermark into the source, so the Calc's upstream stays a LegacySourceTransformation and hasWatermark() (not the non-native branch) is what fires — my earlier guess was wrong. But the assertion here is insensitive to that: delete hasWatermark() and T2 would wrongly fuse into a watermarked source (stripping per-record event-time), yet this test still passes because filter+projection yields [22, 23] either way. A watermark-dependent assertion (a windowed aggregation over ts) or an operator-topology assertion would actually lock the gate. Same insensitivity applies to the positive fusion cases — only the boolean-cast lowercase/uppercase case truly distinguishes native from Flink codegen.
Which issue does this PR close?
Closes #1865
Rationale for this change
A
SELECT proj … FROM kafka WHERE predplan runs today as two independent native operators: the native Kafka source converts its columnar output toRowData, then the shadowedStreamExecCalcoperator converts it back to Arrow, runsProject[Filter]natively, and converts back toRowData— paying multiple row/column conversions where one would do. This PR fuses the source and a convertible Calc into a single native planProject[Filter?[KafkaScan]]that runs inside the source, so the data stays columnar end-to-end and converts toRowDataonce, at the chain tail. This is the whole-stage native operator merging direction for the Flink integration, mirroring the plan-merge already used on the Spark side.What changes are included in this PR?
Strict gating in the shadowed
StreamExecCalc: a Calc converts to native only when it can fuse into a native upstream chain — its input is the native Auron Kafka source, the source has no event-time watermark, and every expression converts. When fusible, the Calc hands its logicalProject[Filter?]sub-plan to the source and returns the source transformation re-typed to the projected output, eliminating the Calc operator. Every other case (non-native upstream, watermarked source, any unconvertible expression) runs as a stock Flink Calc.The source splices its
KafkaScaninto the handed-off sub-plan and prepends the partition/offset/timestamp metadata columns, so its per-record offset-commit and timestamp bookkeeping are unchanged; native column resolution is by name, so no column-index rewriting is needed.The standalone FFI-reader native-Calc path is no longer emitted from the Calc decision. Detection and hand-off go through the source function, so no source-operator type is introduced; operator-level native identity is left to the future ExecNode-graph merge that would consume it. Event-time-watermark fusion is out of scope and tracked as a follow-up.
This PR is stacked on #2325 (no-watermark source emit fix), which it requires; that commit drops out of the diff once #2325 merges and this branch is rebased.
Are there any user-facing changes?
Performance: a
source → Calcchain over the native Kafka source with no event-time watermark now runs as a single native pipeline with one columnar-to-row conversion. A Calc that cannot fuse (non-native source, watermarked source, or unsupported expression) runs as Flink's codegen Calc.How was this patch tested?
Unit tests in
StreamExecCalcTestassert the fusion decision (native source + no watermark + convertible → merged source transformation with no separate Calc operator; each negative case → stock Flink Calc). An end-to-endAuronKafkaSourceMergeITCaseruns fused SQL over a no-watermarkauron-kafkatable and asserts row-set correctness; a native boolean-to-string cast renders lowercase, proving the fused path executes natively (Flink codegen renders uppercase). The fullauron-flink-planner(130) andauron-flink-runtime(137) module test suites pass.