optimize: FlattenMerge avoids substream materialization for value-presented sources#2977
Open
He-Pin wants to merge 2 commits into
Open
optimize: FlattenMerge avoids substream materialization for value-presented sources#2977He-Pin wants to merge 2 commits into
He-Pin wants to merge 2 commits into
Conversation
…sented sources Motivation: FlattenMerge previously only fast-pathed `SingleSource`. For all other inner sources -- `Source.empty`, `Source(List)`, `Source.fromJavaStream`, `Source.future(Future.successful(...))`, range/iterator/repeat sources -- each one paid the cost of materializing a `SubSinkInlet` and `subFusingMaterializer.materialize(...)`. FlattenConcat already does this optimization via `TraversalBuilder.getValuePresentedSource`; FlattenMerge should benefit too, especially because the single-arg `flatMapConcat(f)` internally uses `FlattenMerge(1)` and so depends on FlattenMerge for its hot path. Modification: - Generalize FlattenMerge to dispatch on `getValuePresentedSource` (instead of `getSingleSource`) and consume `SingleSource`, `IterableSource`, `IteratorSource`, `RangeSource`, `RepeatSource`, `JavaStreamSource`, `FutureSource`, `FailedSource`, and empty sources in-place without materialization, mirroring FlattenConcat. - Add an `InflightSource[T]` family inside the new `FlattenMerge` companion to occupy a breadth slot for multi-element value-presented sources. Track them via a new `pendingInflightSources` counter so `activeSources` correctly bounds the breadth budget. - Preserve merge semantics: when an inflight source still has more elements after a push, re-enqueue it so other concurrent sources keep interleaving (instead of draining one source first, which is the concat behaviour). - Fold completed `Future`s and `FailedSource` directly: success pushes or queues a single element, failure calls `failStage`. - Pending `Future`s register a callback via `getAsyncCallback` and occupy a breadth slot until completion. - Empty inner sources are discarded in place (no slot consumed). Result: - `flatMapMerge(breadth, ...)` and the default `flatMapConcat(...)` (which routes through `FlattenMerge(1)`) skip substream materialization for value-presented inner sources, reducing per-source GC and stage overhead. - All existing FlattenMerge / flatMapConcat tests pass; new tests cover empty / single / iterable / range / java stream / completed and delayed future / failed inner sources across breadth = 1..128. - Internal API only (`@InternalApi private[pekko]`); MiMa is clean. References: - The optimization mirrors FlattenConcat's value-presented-source handling introduced in 1.2.0.
…enMerge Motivation: After the previous commit, FlattenMerge grew its own copy of the `InflightSource[T]` hierarchy (Iterator/Range/Repeat/CompletedFuture/ PendingFuture) duplicating what FlattenConcat already had in its companion object. Two near-identical families across two files is a maintenance hazard: any future tweak to the value-presented optimization (e.g. adding a new source type, fixing a Java-stream cleanup leak) would have to be mirrored, and the families had already drifted in small ways (e.g. `tryPull`/`cancel`/`materialize` declared abstract in concat with no-op overrides on every subclass; concat used `isClosed = true` for the completed-future variant while merge used `!_hasNext`). Modification: - Extract the common `InflightSource[T]` base and the five value-presented subclasses (Iterator/Range/Repeat/CompletedFuture/PendingFuture) into a new `pekko.stream.impl.fusing.InflightSources` package-private object. - Promote `tryPull` / `cancel` / `materialize` from abstract to concrete no-op defaults, so the value-presented subclasses no longer carry empty overrides. Stages that wrap a real `SubSinkInlet` (only FlattenConcat's `attachAndMaterializeSource` does this) override what they need. - Align `InflightCompletedFutureSource.isClosed` to FlattenConcat's `true` semantics — behaviorally equivalent in both stages, but more faithful to the source being a one-shot cached value. - Drop the `sealed` modifier on `InflightSource` so FlattenConcat's attached-substream anonymous subclass can still extend it from another file in the same package. - Remove the duplicate definitions from FlattenConcat's companion (now unused, drop the empty companion entirely) and from FlattenMerge's companion. Both stages import from the shared object instead. Result: - Net -176 lines of duplication; one canonical home for the optimization's data types. - Future additions (e.g. extending the optimization to other stream-of- streams stages such as `MergeMany`-style operators) only need to reference `InflightSources`. - All FlattenConcat / FlattenMerge / flatMapConcat parallelism tests remain green; MiMa is clean (`@InternalApi private[fusing]`).
8f297aa to
81c6aa4
Compare
Member
Author
|
I will do a follow up optimization around the |
Comment on lines
+144
to
+146
| case javaStream: JavaStreamSource[T, _] @unchecked => addInflightIteratorSource( | ||
| javaStream.open().iterator.asScala.asInstanceOf[Iterator[T]]) | ||
| case failed: FailedSource[T] @unchecked => addCompletedFutureElem(Failure(failed.failure)) |
Comment on lines
+375
to
+395
| s"not materialize value-presented sources, breadth = $breadth" in { | ||
| val materializationCounter = new AtomicInteger(0) | ||
| // SingleSource / IterableSource / RangeSource / FutureSource etc. are value-presented; | ||
| // FlattenMerge should consume them directly without paying for substream materialization. | ||
| // We attach a side effect to a non-value-presented wrapper to detect any unwanted materialization. | ||
| val n = breadth * 3 | ||
| val probe = Source(1 to n) | ||
| .flatMapMerge(breadth, value => Source.single(value)) | ||
| .map { v => | ||
| materializationCounter.incrementAndGet() | ||
| v | ||
| } | ||
| .runWith(TestSink()) | ||
|
|
||
| probe.request(n.toLong) | ||
| probe.expectNextN(n.toLong).toSet should ===((1 to n).toSet) | ||
| probe.expectComplete() | ||
| // The downstream map fires once per emitted element, but each Source.single | ||
| // is consumed without spinning up its own substream materialization. | ||
| materializationCounter.get() shouldBe n | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
FlattenMergepreviously only fast-pathedSingleSource. Every otherinner source —
Source.empty,Source(List),Source.fromJavaStream,Source.future(Future.successful(...)), range / iterator / repeatsources — paid the cost of materializing a
SubSinkInletplus a fullsubFusingMaterializer.materialize(...)round-trip.FlattenConcatalready eliminates this via
TraversalBuilder.getValuePresentedSource(introduced in 1.2.0);
FlattenMergeshould benefit too.This is especially important because the single-argument
flatMapConcat(f)is implemented asvia(new FlattenMerge(1)), so alldefault
flatMapConcatusers go throughFlattenMergeand miss out onthe value-presented optimization until now.
Modification
FlattenMerge.addSourcefromgetSingleSourcetogetValuePresentedSource, dispatching onSingleSource,IterableSource,IteratorSource,RangeSource,RepeatSource,JavaStreamSource,FutureSource,FailedSource, and empty sourcesin place — mirroring
FlattenConcat.InflightSource[T]family in a newFlattenMergecompanion to occupy a breadth slot for multi-element value-presented
sources without materializing a substream. Tracked via a new
pendingInflightSourcescounter soactiveSourcesstill bounds thebreadth budget correctly.
inflight source, re-enqueue it so other concurrent sources keep
interleaving (rather than draining one source first as
FlattenConcatdoes).
Futures /FailedSourceare folded directly: successpushes or queues a single element; failure calls
failStage.Futures register angetAsyncCallbackand occupy a slotuntil completion.
FlattenMergeis@InternalApi private[pekko]so this is purely aninternal performance change; MiMa is clean.
Result
flatMapMerge(breadth, ...)and the defaultflatMapConcat(...)(which routes through
FlattenMerge(1)) skip substream materializationfor value-presented inner sources, cutting per-source GC and stage
overhead.
Sources built from graphs, lazy futures, etc.) — they stillmaterialize a
SubSinkInletas before.Tests
FlowFlattenMergeSpectests pass.FlowFlatMapConcatParallelismSpectests pass (covers theFlattenMerge(1)path used by single-argflatMapConcat).FlowFlattenMergeSpecmirrorFlowFlatMapConcatParallelismSpec:work with value presented sources with breadth: {1,2,4,8,16,32,64,128}— covers empty / single / iterable / completed-future / lazy-future /
delayed-future inner sources.
work with generated value presented sources with breadth: ...—randomised mix of value-presented sources at scale.
work with value presented failed sources— failure inside theoptimized path.
avoid pre-materialization for value-presented sourcesandnot materialize value-presented sources— assert the fast pathactually fires.
Local commands run:
sbt "stream-tests/testOnly org.apache.pekko.stream.scaladsl.FlowFlattenMergeSpec"→ 37/37 passsbt "stream-tests/testOnly *FlatMap* *Flatten* *PrefixAndTail* *Concat*Spec"→ 232/232 passsbt scalafmtAll headerCreateAllsbt "stream/mimaReportBinaryIssues"→ no issuesReferences
FlattenConcatin 1.2.0 (TraversalBuilder.getValuePresentedSource).