Skip to content

Fix check-then-act race condition in parallel subscriber#6956

Merged
jencymaryjoseph merged 2 commits into
feature/master/pre-signed-url-getobjectfrom
jencyjos/presigned-url/fix-inflight-race-condition
May 13, 2026
Merged

Fix check-then-act race condition in parallel subscriber#6956
jencymaryjoseph merged 2 commits into
feature/master/pre-signed-url-getobjectfrom
jencyjos/presigned-url/fix-inflight-race-condition

Conversation

@jencymaryjoseph
Copy link
Copy Markdown
Contributor

@jencymaryjoseph jencymaryjoseph commented May 11, 2026

Motivation and Context

The ParallelPresignedUrlMultipartDownloaderSubscriber has a check-then-act race condition when limiting concurrent in-flight requests. Two threads can simultaneously read inFlightRequestsNum as below the limit, both pass the check, and both send a request — exceeding maxInFlightParts.

The race occurs between:

  • Thread A (onNext → processRequest): checks inFlightRequestsNum.get() < max, passes, then calls sendPartRequest which increments
  • Thread B (whenComplete → processPendingTransformers): checks inFlightRequestsNum.get() < max, also passes (sees same value), then calls sendPartRequest which increments
  • Both threads read the counter before either increments it — a classic check-then-act race. The overshoot is bounded to 1 (max 11 instead of 10) because only one onNext thread exists (Reactive Streams spec) and only one thread can hold the processingPending CAS at a time.

Modifications

Replaced AtomicInteger inFlightRequestsNum with java.util.concurrent.Semaphore. Semaphore.tryAcquire() atomically checks availability and reserves a permit in a single operation — eliminating the gap between check and act. No two threads can both "pass the check" because the check IS the reservation.

  • processRequest: tryAcquire() — atomic check+reserve, enqueue if unavailable
  • sendPartRequest: release() on completion, error, or validation failure
  • processPendingTransformers: tryAcquire() in the drain loop, release() if poll returns null
  • sendFirstRequest: tryAcquire()/release() for the initial request

ParallelPresignedUrlMultipartDownloaderSubscriber: Replaced AtomicInteger inFlightRequestsNum with Semaphore inFlightPermits. All increment/decrement/check operations replaced with tryAcquire()/release(). Added permit release on all error paths to prevent permit leaks.

Testing

  • ParallelPresignedUrlMultipartDownloaderSubscriberTest: Added multiPartDownload_manyParts_shouldCompleteSuccessfully — downloads a 13-part object (exceeding maxInFlightParts=10) to verify the Semaphore correctly batches requests without deadlock or permit leaks.
  • All existing tests pass (single-part, multi-part, error propagation, validation failures).

Screenshots (if appropriate)

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)

Checklist

  • I have read the CONTRIBUTING document
  • Local run of mvn install succeeds
  • My code follows the code style of this project
  • My change requires a change to the Javadoc documentation
  • I have updated the Javadoc documentation accordingly
  • I have added tests to cover my changes
  • All new and existing tests passed
  • I have added a changelog entry. Adding a new entry must be accomplished by running the scripts/new-change script and following the instructions. Commit the new file created by the script in .changes/next-release with your changes.
  • My change is to implement 1.11 parity feature and I have updated LaunchChangelog

License

  • I confirm that this pull request can be released under the Apache 2 license

@jencymaryjoseph jencymaryjoseph requested a review from a team as a code owner May 11, 2026 22:34
Comment on lines +136 to +137
inFlightRequests.put(0, response);
inFlightRequestsNum.incrementAndGet();
inFlightPermits.tryAcquire();
Copy link
Copy Markdown
Contributor

@alextwoods alextwoods May 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm probably missing something here - but is there a reason that tryAcquire is before doing inFlightRequests.put and the presignedUrlExtension().getObject?

Additionally - is tryAcquire what we want to use here? It can return false which we're not handling, should we use acquire here instead? I know this is the first request so it should in theory always return true, but probably better to either handle the false or use acquire?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved tryAcquire before getObject and inFlightRequests.put.
Added a defensive check, throws IllegalStateException if it tryAcquire fails.


response.whenComplete((res, error) -> {
if (error != null || isCompletedExceptionally.get()) {
inFlightPermits.release();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we don't also do inFlightRequests.remove(0); in the error case?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't do inFlightRequests.remove(0) because handlePartError cancels all in-flight requests.
But's its more cleaner to remove explicitly - added inFlightRequests.remove(0); and moved remove and release to top of whenComplete


response.whenComplete((res, error) -> {
if (error != null || isCompletedExceptionally.get()) {
inFlightPermits.release();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're doing inFlightPermits.release(); in all of the paths through whenComplete right? If so, I think it would make more sense to move up top and do only once. (ie, put it at line 220 before the first if)

@jencymaryjoseph jencymaryjoseph merged commit 7e1c3a0 into feature/master/pre-signed-url-getobject May 13, 2026
2 of 3 checks passed
@github-actions
Copy link
Copy Markdown

This pull request has been closed and the conversation has been locked. Comments on closed PRs are hard for our team to see. If you need more assistance, please open a new issue that references this one.

@github-actions github-actions Bot locked as resolved and limited conversation to collaborators May 13, 2026
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants