optimize: extend internalConcat dispatch for value-presented sources#2978

Draft
He-Pin wants to merge 3 commits into apache:main from He-Pin:optimize-internal-concat-value-presented

He-Pin (Member) commented May 17, 2026

⚠️ Stacked on #2977 — do not merge until #2977 lands. The current diff includes #2977's commits; once #2977 merges, this PR's diff will narrow to its own commit. Drafted to make the dependency explicit.

Motivation

FlowOps#internalConcat previously had only one fast-path: a SingleSource on the right-hand side of concat / concatLazy was rerouted through the lightweight SingleConcat stage instead of the general two-port Concat[U](2, detached) fan-in graph (which materializes the whole substream plus a detacher buffer). All other value-presented sources (IterableSource, IteratorSource, RangeSource, RepeatSource, JavaStreamSource, FutureSource, FailedSource) still took the heavy concatGraph path even though their data is already in memory or trivially producible — the fan-in machinery and substream materialization were pure overhead. Heavy concat users (pekko-http and others) carry that cost on every materialization.

This mirrors the optimization just shipped for FlattenConcat / FlattenMerge in #2977, applied to the concat operator chain.
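
As a rough illustration of the call sites in question, the sketch below uses only the public scaladsl API (assuming `pekko-stream` is on the classpath) to build three concatenations whose right-hand side is a value-presented source. The object name `ConcatShapes` is hypothetical. The emitted elements are the same with or without the fast path; only the materialized machinery differs.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; it only exercises public operators.
object ConcatShapes {
  def run(): (Seq[Int], Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("concat-shapes")
    try {
      val left = Source(1 to 3)

      // Right-hand sides whose data is already in memory or trivially producible.
      val iterable = left.concat(Source(List(4, 5)))
      val range = left.concatLazy(Source(4 to 6))
      val repeat = left.concat(Source.repeat(0)).take(6) // bound the infinite tail

      def collect(s: Source[Int, _]): Seq[Int] =
        Await.result(s.runWith(Sink.seq), 5.seconds)

      (collect(iterable), collect(range), collect(repeat))
    } finally system.terminate()
  }
}
```

Calling `ConcatShapes.run()` yields the left-hand elements followed by the right-hand ones in each case.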

Modification

Add four small specialized GraphStage[FlowShape[E, E]] siblings of SingleConcat, each passing through elements while upstream is alive and draining its captured value-presented payload on onUpstreamFinish:

| New stage | Handles | Drain behavior |
|---|---|---|
| IterableConcat[E](createIterator) | IterableSource, IteratorSource, RangeSource, JavaStreamSource | emitMultiple(out, createIterator(), () => completeStage()) |
| RepeatConcat[E](elem) | RepeatSource | swap OutHandler so each onPull pushes elem |
| FailedConcat[E](failure) | FailedSource | failStage(failure) |
| FutureConcat[E](future) | FutureSource | If completed: emit/fail. If pending: swap OutHandler to a no-op (avoid pulling closed in), register async callback that emits/fails when resolved. |
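
To make the pass-through-then-drain shape concrete, here is a minimal sketch of two such stages written against the public `GraphStage` API. It is not the code from this PR: the class names `IterableConcatSketch` and `RepeatConcatSketch` and the demo object are hypothetical, and the real stages are internal and may differ in detail.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
import org.apache.pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-in for IterableConcat: forwards upstream, then drains an iterator.
final class IterableConcatSketch[E](createIterator: () => Iterator[E]) extends GraphStage[FlowShape[E, E]] {
  private val in = Inlet[E]("IterableConcatSketch.in")
  private val out = Outlet[E]("IterableConcatSketch.out")
  override val shape: FlowShape[E, E] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = push(out, grab(in))
      override def onPull(): Unit = pull(in)
      // Instead of completing, emit the captured payload and complete afterwards.
      override def onUpstreamFinish(): Unit =
        emitMultiple(out, createIterator(), () => completeStage())
      setHandlers(in, out, this)
    }
}

// Hypothetical stand-in for RepeatConcat: forwards upstream, then repeats elem forever.
final class RepeatConcatSketch[E](elem: E) extends GraphStage[FlowShape[E, E]] {
  private val in = Inlet[E]("RepeatConcatSketch.in")
  private val out = Outlet[E]("RepeatConcatSketch.out")
  override val shape: FlowShape[E, E] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = push(out, grab(in))
      override def onPull(): Unit = pull(in)
      override def onUpstreamFinish(): Unit = {
        // From now on every pull is answered with elem; `in` is never pulled again.
        setHandler(out, new OutHandler { override def onPull(): Unit = push(out, elem) })
        // Demand may already be outstanding when upstream finishes.
        if (isAvailable(out)) push(out, elem)
      }
      setHandlers(in, out, this)
    }
}

object ConcatStageDemo {
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("concat-stage-demo")
    try {
      val viaIterable = Source(1 to 3).via(new IterableConcatSketch[Int](() => Iterator(4, 5)))
      val viaRepeat = Source(1 to 3).via(new RepeatConcatSketch[Int](0)).take(6)
      (Await.result(viaIterable.runWith(Sink.seq), 5.seconds), Await.result(viaRepeat.runWith(Sink.seq), 5.seconds))
    } finally system.terminate()
  }
}
```

Both sketches are single-inlet flow stages, so nothing has to be materialized for the right-hand side; that is the property the table above relies on.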

internalConcat is extended to dispatch via TraversalBuilder.getValuePresentedSource and pattern-match the eight value-presented source types (existing SingleSource path is preserved). The detached flag is irrelevant for these stages — the right-hand data is already present, so the one-element pre-fetch buffer that detached=true provides has nothing to fetch (matching SingleConcat's precedent).
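
The dispatch can be pictured with a small plain-Scala model. `TraversalBuilder.getValuePresentedSource` is internal, so this sketch does not call it; the `SourceKind` ADT and `stageFor` are hypothetical names that only mirror the mapping described here, with stage names returned as strings.

```scala
// Hypothetical model of the dispatch; the real code matches on the actual source classes.
object DispatchSketch {
  sealed trait SourceKind
  object SourceKind {
    case object SingleSource extends SourceKind
    case object IterableSource extends SourceKind
    case object IteratorSource extends SourceKind
    case object RangeSource extends SourceKind
    case object JavaStreamSource extends SourceKind
    case object RepeatSource extends SourceKind
    case object FailedSource extends SourceKind
    case object FutureSource extends SourceKind
    case object Other extends SourceKind // anything that is not value-presented
  }
  import SourceKind._

  // `detached` only matters on the general path; the fast paths ignore it.
  def stageFor(kind: SourceKind, detached: Boolean): String = kind match {
    case SingleSource                                                    => "SingleConcat"
    case IterableSource | IteratorSource | RangeSource | JavaStreamSource => "IterableConcat"
    case RepeatSource                                                    => "RepeatConcat"
    case FailedSource                                                    => "FailedConcat"
    case FutureSource                                                    => "FutureConcat"
    case Other                                                           => s"Concat(2, detached = $detached)"
  }
}
```

In this model `stageFor(SourceKind.RangeSource, detached = true)` and `stageFor(SourceKind.RangeSource, detached = false)` give the same answer, which is the sense in which the flag is irrelevant for the fast paths.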

Note: Source.future(Future.successful(x)) is itself optimized upstream into SingleSource, so completed futures dispatch via SingleConcat. The same holds for failures: Source.future(Future.failed(ex)) → FailedSource → FailedConcat. Only genuinely-pending futures land on FutureConcat.
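
A possible shape for the pending-future case, again as a hedged sketch on the public `GraphStage` API rather than the PR's implementation: `FutureConcatSketch` and `FutureConcatDemo` are hypothetical names, and `ExecutionContext.parasitic` assumes Scala 2.13 or later.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
import org.apache.pekko.stream.scaladsl.{ Sink, Source }
import org.apache.pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
import scala.concurrent.duration._
import scala.util.{ Failure, Success, Try }

// Hypothetical stand-in for FutureConcat.
final class FutureConcatSketch[E](future: Future[E]) extends GraphStage[FlowShape[E, E]] {
  private val in = Inlet[E]("FutureConcatSketch.in")
  private val out = Outlet[E]("FutureConcatSketch.out")
  override val shape: FlowShape[E, E] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = push(out, grab(in))
      override def onPull(): Unit = pull(in)

      override def onUpstreamFinish(): Unit = future.value match {
        case Some(result) => handle(result) // already completed: emit or fail right away
        case None =>
          // `in` is closed now, so pulls must no longer be forwarded to it.
          setHandler(out, new OutHandler { override def onPull(): Unit = () })
          // Re-enter the stage safely from the future's completion thread.
          val callback = getAsyncCallback[Try[E]](handle)
          future.onComplete(callback.invoke)(ExecutionContext.parasitic)
      }

      private def handle(result: Try[E]): Unit = result match {
        case Success(elem) => emit(out, elem, () => completeStage())
        case Failure(ex)   => failStage(ex)
      }

      setHandlers(in, out, this)
    }
}

object FutureConcatDemo {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("future-concat-demo")
    try {
      val promise = Promise[Int]()
      val result = Source(1 to 3).via(new FutureConcatSketch[Int](promise.future)).runWith(Sink.seq)
      promise.success(4) // may land before or after upstream finishes; both paths are handled
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }
}
```

`emit` waits for downstream demand when none is outstanding, which is why the no-op `OutHandler` is enough while the future is still pending.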

Result

For the eight value-presented source types, concat and concatLazy no longer pay for substream materialization or the two-port fan-in graph. Observable behavior is unchanged for all other sources, which still take the existing concatGraph path.

Tests

Eleven directional tests added to AbstractFlowConcatSpec (cover both concat and concatLazy paths):

  • optimize iterable concat / range concat / iterator concat / java-stream concat — assert IterableConcat in the traversal builder, output correct.
  • optimize repeat concat — assert RepeatConcat(0), bounded with .take(6).
  • optimize failed concat — assert FailedConcat, error propagated.
  • optimize completed-future concat — confirms it routes through SingleConcat (upstream optimization).
  • optimize pending-future concat — uses an unresolved Promise, asserts FutureConcat and correct delivery after resolution.
  • optimize failed-future concat — confirms it routes through FailedConcat (upstream optimization).
  • avoid downstream substream materialization for value-presented sources — counter-based assertion of zero substream materialization.
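
The behavioral half of these checks can be reproduced with the public API alone. The sketch below is not taken from AbstractFlowConcatSpec (the stage-type assertions there rely on internal API); `ConcatBehaviorCheck` is a hypothetical name, and the example only shows the output and error propagation that the tests expect.

```scala
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.{ Await, Promise }
import scala.concurrent.duration._

// Hypothetical check object; uses only public operators.
object ConcatBehaviorCheck {
  def run(): (Seq[Int], String) = {
    implicit val system: ActorSystem = ActorSystem("concat-behavior-check")
    try {
      // Pending future on the right-hand side: resolved only after the stream has started.
      val promise = Promise[Int]()
      val pending = Source(1 to 3).concatLazy(Source.future(promise.future)).runWith(Sink.seq)
      promise.success(4)

      // Failed source on the right-hand side: the failure reaches the sink.
      val failing = Source(1 to 3).concat(Source.failed[Int](new RuntimeException("boom"))).runWith(Sink.seq)

      val delivered = Await.result(pending, 5.seconds)
      val error = Await.result(failing.failed, 5.seconds)
      (delivered, error.getMessage)
    } finally system.terminate()
  }
}
```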

Verification commands:

sbt "stream-tests/testOnly *FlowConcatSpec *FlowConcatLazySpec *FlowConcatAllSpec *FlowConcatAllLazySpec *GraphConcatSpec"
sbt "stream/mimaReportBinaryIssues"
sbt "stream-tests/test"
sbt scalafmtAll headerCreateAll

All *ConcatSpec (96 tests) pass. MiMa is clean (all new stages are private[pekko] / InternalApi). The full stream-tests/test run (3027 tests) had a single failure: an unrelated BoundedSourceQueueSpec flake that does not use concat and passes in isolation.

References

Stacked on #2977.

He-Pin added 3 commits May 17, 2026 18:09
…sented sources

Motivation:
FlattenMerge previously only fast-pathed `SingleSource`. For all other
inner sources -- `Source.empty`, `Source(List)`, `Source.fromJavaStream`,
`Source.future(Future.successful(...))`, range/iterator/repeat sources --
each one paid the cost of materializing a `SubSinkInlet` and
`subFusingMaterializer.materialize(...)`. FlattenConcat already does this
optimization via `TraversalBuilder.getValuePresentedSource`; FlattenMerge
should benefit too, especially because the single-arg `flatMapConcat(f)`
internally uses `FlattenMerge(1)` and so depends on FlattenMerge for its
hot path.

Modification:
- Generalize FlattenMerge to dispatch on `getValuePresentedSource`
  (instead of `getSingleSource`) and consume `SingleSource`,
  `IterableSource`, `IteratorSource`, `RangeSource`, `RepeatSource`,
  `JavaStreamSource`, `FutureSource`, `FailedSource`, and empty sources
  in-place without materialization, mirroring FlattenConcat.
- Add an `InflightSource[T]` family inside the new `FlattenMerge`
  companion to occupy a breadth slot for multi-element value-presented
  sources. Track them via a new `pendingInflightSources` counter so
  `activeSources` correctly bounds the breadth budget.
- Preserve merge semantics: when an inflight source still has more
  elements after a push, re-enqueue it so other concurrent sources keep
  interleaving (instead of draining one source first, which is the
  concat behaviour).
- Fold completed `Future`s and `FailedSource` directly: success pushes
  or queues a single element, failure calls `failStage`.
- Pending `Future`s register a callback via `getAsyncCallback` and
  occupy a breadth slot until completion.
- Empty inner sources are discarded in place (no slot consumed).
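
The re-enqueue rule can be modeled in plain Scala, without any stream machinery. This is only an illustration under simplifying assumptions: the real stage is demand-driven and its interleaving depends on timing, and `InterleaveSketch` is a hypothetical name.

```scala
import scala.collection.mutable

// Hypothetical model: each iterator stands for one in-flight value-presented source.
object InterleaveSketch {
  def interleave[T](sources: Seq[Iterator[T]]): Vector[T] = {
    val queue = mutable.Queue.empty[Iterator[T]]
    // Empty sources are discarded up front and never occupy a slot.
    sources.foreach(it => if (it.hasNext) queue.enqueue(it))

    val out = Vector.newBuilder[T]
    while (queue.nonEmpty) {
      val it = queue.dequeue()
      out += it.next() // push exactly one element
      // Go to the back of the queue if more remains, so other sources get a turn.
      if (it.hasNext) queue.enqueue(it)
    }
    out.result()
  }
}
```

Draining each iterator fully before moving to the next would give concat ordering instead, which is the behavior the commit avoids for merge.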

Result:
- `flatMapMerge(breadth, ...)` and the default `flatMapConcat(...)`
  (which routes through `FlattenMerge(1)`) skip substream materialization
  for value-presented inner sources, reducing per-source GC and stage
  overhead.
- All existing FlattenMerge / flatMapConcat tests pass; new tests cover
  empty / single / iterable / range / java stream / completed and
  delayed future / failed inner sources across breadth = 1..128.
- Internal API only (`@InternalApi private[pekko]`); MiMa is clean.

References:
- The optimization mirrors FlattenConcat's value-presented-source
  handling introduced in 1.2.0.
…enMerge

Motivation:
After the previous commit, FlattenMerge grew its own copy of the
`InflightSource[T]` hierarchy (Iterator/Range/Repeat/CompletedFuture/
PendingFuture) duplicating what FlattenConcat already had in its
companion object. Two near-identical families across two files is a
maintenance hazard: any future tweak to the value-presented optimization
(e.g. adding a new source type, fixing a Java-stream cleanup leak) would
have to be mirrored, and the families had already drifted in small ways
(e.g. `tryPull`/`cancel`/`materialize` declared abstract in concat with
no-op overrides on every subclass; concat used `isClosed = true` for the
completed-future variant while merge used `!_hasNext`).

Modification:
- Extract the common `InflightSource[T]` base and the five value-presented
  subclasses (Iterator/Range/Repeat/CompletedFuture/PendingFuture) into
  a new `pekko.stream.impl.fusing.InflightSources` package-private object.
- Promote `tryPull` / `cancel` / `materialize` from abstract to concrete
  no-op defaults, so the value-presented subclasses no longer carry empty
  overrides. Stages that wrap a real `SubSinkInlet` (only FlattenConcat's
  `attachAndMaterializeSource` does this) override what they need.
- Align `InflightCompletedFutureSource.isClosed` to FlattenConcat's
  `true` semantics — behaviorally equivalent in both stages, but more
  faithful to the source being a one-shot cached value.
- Drop the `sealed` modifier on `InflightSource` so FlattenConcat's
  attached-substream anonymous subclass can still extend it from another
  file in the same package.
- Remove the duplicate definitions from FlattenConcat's companion (now
  unused, drop the empty companion entirely) and from FlattenMerge's
  companion. Both stages import from the shared object instead.

Result:
- Net -176 lines of duplication; one canonical home for the
  optimization's data types.
- Future additions (e.g. extending the optimization to other stream-of-
  streams stages such as `MergeMany`-style operators) only need to
  reference `InflightSources`.
- All FlattenConcat / FlattenMerge / flatMapConcat parallelism tests
  remain green; MiMa is clean (`@InternalApi private[fusing]`).
Motivation:
`FlowOps#internalConcat` previously had only one fast-path: `SingleSource`
on the right-hand side was rerouted through the lightweight `SingleConcat`
stage instead of the general two-port `Concat[U](2, detached)` fan-in graph
(which materializes the whole substream plus a detacher buffer). All other
value-presented sources (`IterableSource`, `IteratorSource`, `RangeSource`,
`RepeatSource`, `JavaStreamSource`, `FutureSource`, `FailedSource`) still
took the heavy `concatGraph` path even though their data is already in
memory or trivially producible — the fan-in machinery and substream
materialization were pure overhead. Heavy `concat` users (pekko-http and
others) carry that cost on every materialization.

Modification:
Add four small specialized `GraphStage[FlowShape[E, E]]` siblings of
`SingleConcat`, each passing through elements while upstream is alive and
draining its captured value-presented payload on `onUpstreamFinish`:

  - `IterableConcat[E](createIterator)` — emits via `emitMultiple`, covers
    `IterableSource`, `IteratorSource`, `RangeSource`, `JavaStreamSource`.
  - `RepeatConcat[E](elem)` — swaps `OutHandler` so each `onPull` pushes
    `elem`, covers `RepeatSource`.
  - `FailedConcat[E](failure)` — calls `failStage(failure)`, covers
    `FailedSource`.
  - `FutureConcat[E](future)` — emits/fails for completed futures, otherwise
    swaps `OutHandler` (to avoid pulling the now-closed `in` port) and
    registers an async callback that resolves once the future completes.

`internalConcat` is extended to dispatch via
`TraversalBuilder.getValuePresentedSource` and pattern-match the eight
value-presented source types (existing `SingleSource` path is preserved).
The `detached` flag is irrelevant for these stages — the right-hand data is
already present, so the one-element pre-fetch buffer that `detached=true`
provides has nothing to fetch (matching `SingleConcat`'s precedent).

Result:
For the eight value-presented source types, `concat` and `concatLazy` no
longer pay for substream materialization or the two-port fan-in graph.
Observable behavior is unchanged for all other sources, which still take the
existing `concatGraph` path. Eleven directional tests added to
`AbstractFlowConcatSpec` cover each new dispatch and assert (a) values
delivered correctly and (b) zero substream materialization for value-
presented sources. All `*FlowConcatSpec`, `*FlowConcatLazySpec`,
`*FlowConcatAllSpec`, `*FlowConcatAllLazySpec`, and `*GraphConcatSpec` pass.
MiMa is clean (all new stages are `private[pekko]` / `InternalApi`).
@He-Pin He-Pin force-pushed the optimize-internal-concat-value-presented branch 2 times, most recently from f01178f to c5a4e58 Compare May 18, 2026 03:54