[NO REVIEW]fix: optimize ChangeFeed partition planning from O(P*T log T) to O(T log T + P log T)#49084
Closed
xinlian12 wants to merge 9 commits intoAzure:mainfrom
Closed
Conversation
… O(T log T + P log T) Cache sorted continuation tokens once per ChangeFeedState instance and use binary search to find overlapping ranges, eliminating redundant copy+sort per partition in Spark's planInputPartitions hot path. Changes: - Add SortedTokensSnapshot cache with identity-based invalidation - Replace linear scan with binary search for first overlapping token - Add fallback to full scan for non-contiguous/legacy ranges - Add comprehensive tests: correctness, caching, large-scale (10k tokens), edge cases (null continuation, single token, boundary crossover, full range) For 10,000 tokens and partitions, total extraction time drops from minutes to ~400ms. Implements Azure#49023 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…x partial-miss fallback, add test coverage - F1: Fallback now scans indices before startIndex even when primary found results, catching partial misses from overlapping/non-contiguous ranges - F2: Extracted shared collectOverlapping() method, eliminating DRY violation - F3: Added fallback tests (complete miss + partial miss) with overlapping token ranges - F4: Renamed misleading _noOverlap test to _lastTokenExactMatch, added real no-overlap test - F5: Increased perf test threshold to 30s with CI-fragility comment - F6: Wrapped cached sorted list with Collections.unmodifiableList() - F7: Added comment explaining intentional reference equality for cache invalidation - F8: Replaced size() > 0 with !isEmpty() via the new helper method - F10: Added test with unsorted input to exercise the sort path Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
… thread-safety and early-break, mark cache transient - F1: Add test verifying setContinuation() invalidates cached snapshot - F2: Document benign race and thread-safety intent on volatile field - F3: Document early-break contiguity assumption in collectOverlapping - F4: Mark cachedSortedTokensSnapshot field transient Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…inuation throw test, document setContinuation contract, tighten fallback comment, replace String[] with MinMaxAccumulator, include elapsed time in assertion Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Contributor
There was a problem hiding this comment.
Pull request overview
This PR optimizes Cosmos change feed partition planning by avoiding repeated sorting of continuation tokens during ChangeFeedState.extractForEffectiveRange, reducing planning complexity for large numbers of feed ranges/tokens.
Changes:
- Added a lazily-initialized, cached sorted snapshot of continuation tokens and a binary-search-based overlap scan in
ChangeFeedState. - Added/extended unit tests to validate correctness across repeated calls, edge cases, and fallback behavior.
- Documented the performance fix in the Cosmos SDK changelog.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/changefeed/common/ChangeFeedState.java | Introduces cached sorted-token snapshot, binary search start index, and overlap-collection logic with fallback. |
| sdk/cosmos/azure-cosmos/CHANGELOG.md | Adds a bug-fix entry describing the planning performance improvement. |
| sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/ChangeFeedStateTest.java | Adds extensive tests for correctness, caching behavior, and fallback scenarios (includes a large-scale timing assertion). |
…tability comment, reduce perf test scale Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…ectiveRanges - Add extractForEffectiveRanges(List<Range>) that sorts tokens once and uses binary search per range, following InMemoryCollectionRoutingMap pattern - Single-range extractForEffectiveRange delegates to batch method - Remove volatile cachedSortedTokensSnapshot, SortedTokensSnapshot, MinMaxAccumulator, collectOverlapping, findFirstPotentialOverlapIndex - Update Spark ChangeFeedBatch hot loop to use batch API - Add batch bridge method extractChangeFeedStateForRanges in SparkBridgeImplementationInternal - Add parity test between single and batch extraction - Complexity: O(T log T + P log T) with zero internal mutable state Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Simulates realistic Spark planning with 30K feed ranges. Compares: - Batch API (extractForEffectiveRanges): sort once + binary search per range - Single-call loop (extractForEffectiveRange per range): sort per call Logs timing for manual inspection and asserts batch completes < 30s. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
6ead43b to
36468c3
Compare
…tedTokensAndRanges - Replace two binary searches (minIndex + maxIndex) with single binary search for start position + forward scan with early break on non-overlap - Remove SortedTokensAndRanges inner class; use List<CompositeContinuationToken> directly - Use CompositeContinuationToken(null, range) as binary search key (null token is valid) - Equally efficient for non-overlapping contiguous ranges (Cosmos DB contract) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Member
Author
|
Closing to recreate with cleaner approach batch API with simplified implementation. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Closes #49023
Summary
Eliminate the quadratic O(P T log T) complexity in
ChangeFeedState.extractForEffectiveRangeby introducing a batch APIextractForEffectiveRanges(List<Range>)that sorts continuation tokens once and uses binary search for overlap detection, reducing the total complexity to O(T log T + P log T).Problem
The Cosmos Spark change feed connector spent excessive driver time planning microbatches when reading containers with many feed ranges / continuation tokens (>30k). The hot path
ChangeFeedMicroBatchStream.planInputPartitionsrepeatedly copied and sorted the full continuation-token list for each planned partition, resulting in O(P T log T) behavior that could stall planning for >15 minutes.Solution
extractForEffectiveRanges(List<Range<String>>)method sorts tokens once (O(T log T)), then uses binary search per range (O(log T)) following the establishedInMemoryCollectionRoutingMap.getOverlappingRangespattern.cachedSortedTokensSnapshotfield and all cache invalidation machinery. The sorted token list is local to each batch call no cache, no invalidation risks, no race conditions.extractForEffectiveRange(range)now delegates toextractForEffectiveRanges(singletonList(range)), maintaining backward compatibility with a single code path.ChangeFeedBatch.planInputPartitionsnow calls the batch API once instead of callingextractForEffectiveRangeper partition.Files Changed
ChangeFeedState.javaAddedextractForEffectiveRanges, removed cache machinery (SortedTokensSnapshot,MinMaxAccumulator,collectOverlapping,findFirstPotentialOverlapIndex)SparkBridgeImplementationInternal.scalaAddedextractChangeFeedStateForRangesbatch bridge methodChangeFeedBatch.scalaHot loop uses batch APIChangeFeedStateTest.javaUpdated tests for batch API, added single/batch parity testCHANGELOG.mdUpdated entryGenerated by coding-agent-harness