[CELEBORN-2065] Worker should support wait partition sort asynchronously #3652
leixm wants to merge 4 commits into apache:main
@SteNicholas @RexXiong Can you help review?
Pretty similar to what I was trying to do in #3593. +1 on getting this in. I have one concern, though: it looks like the sorting thread now writes and flushes the response. That means threads that are supposed to handle only sorting tasks also take on response serialization and flushing. Should the response be sent by a different set of threads instead (preferably the event loop thread)? Perhaps we could create a dedicated set of threads for responding to clients?
The sort thread only needs to call
Inside the if (executor.inEventLoop()) {
What changes were proposed in this pull request?
This PR makes partition file sorting asynchronous in the worker's FetchHandler, so that open stream requests no longer block RPC threads while waiting for sort completion.
Key changes:
PartitionFilesSorter: Replaced the blocking Thread.sleep polling loop in getSortedFileInfo with a callback-based async model using FileResolvedCallback and FileSortedCallback. When a sort is initiated, the caller's callback is invoked upon completion instead of blocking. Also introduced pendingSortCallbacks to correctly handle concurrent readers — when multiple open stream requests arrive for the same file being sorted, all their callbacks are notified upon sort completion.
FetchHandler: Refactored the open stream handling into three clear responsibilities:
openReduceStreamAsync — decides whether sorting is needed and dispatches to PartitionFilesSorter asynchronously or invokes the callback directly.
registerAndHandleStream — pure stream registration logic (no sorting), called from the async callback with the resolved FileInfo.
Both single PbOpenStream and batch PbOpenStreamList handlers now use the async model. The batch handler uses AtomicInteger completion counting and an ordered Array[PbStreamHandlerOpt] to aggregate results and send a single response when all files are resolved.
FileResolvedCallback (new interface): Defines onSuccess(FileInfo) / onFailure(Throwable) for async file resolution notification.
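The callback model described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: only the names FileResolvedCallback, onSuccess, onFailure, getSortedFileInfo and pendingSortCallbacks come from the description, while AsyncSortSketch, recorder, the Executor parameter, the ".sorted" suffix and the use of String in place of FileInfo are assumptions made to keep the example self-contained.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;

class AsyncSortSketch {
    /** Same method shape as the PR's FileResolvedCallback; String stands in for FileInfo. */
    interface FileResolvedCallback {
        void onSuccess(String sortedFile);
        void onFailure(Throwable cause);
    }

    private final Object lock = new Object();
    private final Set<String> sortedFiles = new HashSet<>();
    // fileId -> callbacks of every reader waiting on the in-flight sort
    private final Map<String, List<FileResolvedCallback>> pendingSortCallbacks = new HashMap<>();
    private final Executor sortExecutor; // hypothetical stand-in for the sort thread pool

    AsyncSortSketch(Executor sortExecutor) {
        this.sortExecutor = sortExecutor;
    }

    /** Never blocks: resolves inline if already sorted, otherwise registers the callback. */
    void getSortedFileInfo(String fileId, FileResolvedCallback callback) {
        boolean alreadySorted;
        boolean firstWaiter = false;
        synchronized (lock) {
            alreadySorted = sortedFiles.contains(fileId);
            if (!alreadySorted) {
                List<FileResolvedCallback> waiters = pendingSortCallbacks.get(fileId);
                if (waiters == null) {
                    waiters = new ArrayList<>();
                    pendingSortCallbacks.put(fileId, waiters);
                    firstWaiter = true; // only the first request schedules a sort
                }
                waiters.add(callback);
            }
        }
        if (alreadySorted) {
            callback.onSuccess(fileId + ".sorted");
        } else if (firstWaiter) {
            sortExecutor.execute(() -> sortAndNotify(fileId));
        }
    }

    /** Runs on the sort executor; notifies every reader that queued up for this file. */
    private void sortAndNotify(String fileId) {
        Throwable failure = null;
        try {
            sort(fileId);
        } catch (RuntimeException e) {
            failure = e;
        }
        List<FileResolvedCallback> waiters;
        synchronized (lock) {
            if (failure == null) sortedFiles.add(fileId);
            waiters = pendingSortCallbacks.remove(fileId);
        }
        for (FileResolvedCallback cb : waiters) {
            if (failure == null) cb.onSuccess(fileId + ".sorted");
            else cb.onFailure(failure);
        }
    }

    /** Placeholder for the real sort work. */
    private void sort(String fileId) { }

    /** Demo helper: records "reader:result" strings. */
    static FileResolvedCallback recorder(List<String> out, String reader) {
        return new FileResolvedCallback() {
            public void onSuccess(String sortedFile) { out.add(reader + ":" + sortedFile); }
            public void onFailure(Throwable cause) { out.add(reader + ":failed"); }
        };
    }

    public static void main(String[] args) {
        Deque<Runnable> sortQueue = new ArrayDeque<>(); // queued work instead of real threads
        AsyncSortSketch sorter = new AsyncSortSketch(sortQueue::add);
        List<String> results = new ArrayList<>();
        sorter.getSortedFileInfo("shuffle-0", recorder(results, "reader-1"));
        sorter.getSortedFileInfo("shuffle-0", recorder(results, "reader-2"));
        System.out.println("queued sorts: " + sortQueue.size() + ", results so far: " + results);
        sortQueue.poll().run(); // the "sort thread" finishes and notifies both readers
        System.out.println("results: " + results);
    }
}
```

In this sketch the second request for the same file does not start a second sort; it only appends its callback to the waiting list, which is the concurrent-reader case the description calls out.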
Why are the changes needed?
Currently, getSortedFileInfo blocks the calling RPC thread with a Thread.sleep(50) polling loop (up to 220s timeout) while waiting for partition file sorting to complete. This ties up Netty RPC threads, which are a shared and limited resource. Under high concurrency — especially when many reduce tasks open streams simultaneously and trigger sorting — this can exhaust the RPC thread pool and cause timeouts or stalls for unrelated requests.
By making the sort wait asynchronous via callbacks, RPC threads are released immediately after dispatching the sort request, significantly improving worker responsiveness and throughput under concurrent open stream workloads.
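To illustrate how a dispatching thread can return right away while a batch of files resolves in the background, here is a rough sketch of completion counting with an AtomicInteger and an ordered result array, as the description mentions for the batch handler. BatchOpenSketch, AsyncResolver and openStreams are hypothetical names, String stands in for PbStreamHandlerOpt, and failure handling is left out for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class BatchOpenSketch {
    /** Hypothetical async resolver: invokes onResolved later, possibly on another thread. */
    interface AsyncResolver {
        void resolve(String fileId, Consumer<String> onResolved);
    }

    /** Dispatches every file and returns; the response is sent by whichever callback finishes last. */
    static void openStreams(List<String> fileIds, AsyncResolver resolver,
                            Consumer<List<String>> sendResponse) {
        if (fileIds.isEmpty()) {
            sendResponse.accept(new ArrayList<>());
            return;
        }
        String[] handlers = new String[fileIds.size()]; // slot i belongs to request i
        AtomicInteger remaining = new AtomicInteger(fileIds.size());
        for (int i = 0; i < fileIds.size(); i++) {
            final int slot = i;
            resolver.resolve(fileIds.get(i), handler -> {
                handlers[slot] = handler; // written before the atomic decrement below
                if (remaining.decrementAndGet() == 0) {
                    sendResponse.accept(Arrays.asList(handlers)); // exactly one response
                }
            });
        }
    }

    public static void main(String[] args) {
        List<Runnable> deferred = new ArrayList<>(); // completions held back to mimic slow sorts
        AsyncResolver resolver = (id, cb) -> deferred.add(() -> cb.accept("stream-for-" + id));
        openStreams(Arrays.asList("a", "b", "c"), resolver,
                response -> System.out.println("response: " + response));
        System.out.println("dispatch returned, pending completions: " + deferred.size());
        // Complete in reverse order; the response still lists handlers in request order.
        for (int i = deferred.size() - 1; i >= 0; i--) {
            deferred.get(i).run();
        }
    }
}
```

Because each callback writes to its own array slot, the aggregated response keeps request order regardless of which sort finishes first.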
Does this PR resolve a correctness bug?
No.
Does this PR introduce any user-facing change?
No. This is an internal implementation change. The RPC protocol and behavior remain unchanged.
How was this patch tested?
Existing unit tests.