-
Notifications
You must be signed in to change notification settings - Fork 96
Patch for Github issue #524 #526
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
6c5dd25
Patch for Github issue #524
neelkanth-kaushik 5a18f2f
Patch for Github issue #524
neelkanth-kaushik e325921
Patch for Github issue #524
neelkanth-kaushik ee88f45
Fixed formatting issues identified by spotless
neelkanth-kaushik 9b75259
Fixing github issue 524
neelkanth-kaushik bc4e6da
Fixed Github Issue #524
neelkanth-kaushik 58ddfcb
Fixed Github Issue #524
neelkanth-kaushik 8add4dc
Fixed spotless issues
neelkanth-kaushik 90c2539
Fixed spotless issues
neelkanth-kaushik 09f292f
Reverted changes
neelkanth-kaushik d9753a6
Reverted changes with spotless fixes
neelkanth-kaushik ecd53c0
Changes with spotless fixes
neelkanth-kaushik 9c87d7b
Changed MILLISECONDS to SECONDS
neelkanth-kaushik 7c29fef
minor cleanup for copilot suggestions
MichaelGHSeg b6cf766
Shoulda checked the history, reverting to seconds
MichaelGHSeg b1af68c
fixing formatting
MichaelGHSeg File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,7 +25,9 @@ | |
| import java.util.concurrent.BlockingQueue; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.LinkedBlockingQueue; | ||
| import java.util.concurrent.RejectedExecutionException; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.ThreadFactory; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
@@ -42,6 +44,8 @@ public class AnalyticsClient { | |
| private static final Charset ENCODING = StandardCharsets.UTF_8; | ||
| private Gson gsonInstance; | ||
| private static final String instanceId = UUID.randomUUID().toString(); | ||
| private static final int WAIT_FOR_THREAD_COMPLETE_S = 5; | ||
| private static final int TERMINATION_TIMEOUT_S = 1; | ||
|
|
||
| static { | ||
| Map<String, String> library = new LinkedHashMap<>(); | ||
|
|
@@ -67,6 +71,7 @@ public class AnalyticsClient { | |
| private final ScheduledExecutorService flushScheduler; | ||
| private final AtomicBoolean isShutDown; | ||
| private final String writeKey; | ||
| private volatile Future<?> looperFuture; | ||
|
|
||
| public static AnalyticsClient create( | ||
| HttpUrl uploadUrl, | ||
|
|
@@ -130,7 +135,9 @@ public AnalyticsClient( | |
|
|
||
| this.currentQueueSizeInBytes = 0; | ||
|
|
||
| if (!isShutDown.get()) looperExecutor.submit(new Looper()); | ||
| if (!isShutDown.get()) { | ||
| this.looperFuture = looperExecutor.submit(new Looper()); | ||
| } | ||
|
|
||
| flushScheduler = Executors.newScheduledThreadPool(1, threadFactory); | ||
| flushScheduler.scheduleAtFixedRate( | ||
|
|
@@ -218,6 +225,8 @@ public void shutdown() { | |
| // we can shutdown the flush scheduler without worrying | ||
| flushScheduler.shutdownNow(); | ||
|
|
||
| // Wait for the looper to complete processing before shutting down executors | ||
| waitForLooperCompletion(); | ||
| shutdownAndWait(looperExecutor, "looper"); | ||
| shutdownAndWait(networkExecutor, "network"); | ||
|
|
||
|
|
@@ -226,19 +235,81 @@ public void shutdown() { | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Wait for the looper to complete processing all messages before proceeding with shutdown. This | ||
| * prevents the race condition where the network executor is shut down before the looper finishes | ||
| * submitting all batches. | ||
| */ | ||
| private void waitForLooperCompletion() { | ||
| if (looperFuture != null) { | ||
| try { | ||
| // Wait for the looper to complete processing the STOP message and finish | ||
| // Use a reasonable timeout to avoid hanging indefinitely | ||
| looperFuture.get(WAIT_FOR_THREAD_COMPLETE_S, TimeUnit.SECONDS); | ||
| log.print(VERBOSE, "Looper completed successfully."); | ||
| } catch (Exception e) { | ||
| log.print(ERROR, e, "Error waiting for looper to complete."); | ||
| // Cancel the looper if it's taking too long or if there's an error | ||
| if (!looperFuture.isDone()) { | ||
| looperFuture.cancel(true); | ||
| log.print(VERBOSE, "Looper was cancelled due to timeout or error."); | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| public void shutdownAndWait(ExecutorService executor, String name) { | ||
| boolean isLooperExecutor = name != null && name.equalsIgnoreCase("looper"); | ||
| try { | ||
| executor.shutdown(); | ||
| final boolean executorTerminated = executor.awaitTermination(1, TimeUnit.SECONDS); | ||
|
|
||
| log.print( | ||
| VERBOSE, | ||
| "%s executor %s.", | ||
| name, | ||
| executorTerminated ? "terminated normally" : "timed out"); | ||
| boolean terminated = executor.awaitTermination(TERMINATION_TIMEOUT_S, TimeUnit.SECONDS); | ||
| if (terminated) { | ||
| log.print(VERBOSE, "%s executor terminated normally.", name); | ||
| return; | ||
| } | ||
| if (isLooperExecutor) { // Handle looper - network should finish on its own | ||
| // not terminated within timeout -> force shutdown | ||
| log.print( | ||
| VERBOSE, | ||
| "%s did not terminate in %d seconds; requesting shutdownNow().", | ||
| name, | ||
| TERMINATION_TIMEOUT_S); | ||
| List<Runnable> dropped = executor.shutdownNow(); // interrupts running tasks | ||
| log.print( | ||
| VERBOSE, | ||
| "%s shutdownNow returned %d queued tasks that never started.", | ||
| name, | ||
| dropped.size()); | ||
|
|
||
| // optional short wait to give interrupted tasks a chance to exit | ||
| boolean terminatedAfterForce = | ||
| executor.awaitTermination(TERMINATION_TIMEOUT_S, TimeUnit.SECONDS); | ||
| log.print( | ||
| VERBOSE, | ||
| "%s executor %s after shutdownNow().", | ||
| name, | ||
| terminatedAfterForce ? "terminated" : "still running (did not terminate)"); | ||
|
|
||
| if (!terminatedAfterForce) { | ||
| // final warning — investigate tasks that ignore interrupts | ||
| log.print( | ||
| ERROR, | ||
| "%s executor still did not terminate; tasks may be ignoring interrupts.", | ||
| name); | ||
| } | ||
| } | ||
| } catch (InterruptedException e) { | ||
| // Preserve interrupt status and attempt forceful shutdown | ||
| log.print(ERROR, e, "Interrupted while stopping %s executor.", name); | ||
| Thread.currentThread().interrupt(); | ||
| if (isLooperExecutor) { | ||
| List<Runnable> dropped = executor.shutdownNow(); | ||
| log.print( | ||
| VERBOSE, | ||
| "%s shutdownNow invoked after interrupt; %d tasks returned.", | ||
| name, | ||
| dropped.size()); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -299,8 +370,22 @@ public void run() { | |
| "Batching %s message(s) into batch %s.", | ||
| batch.batch().size(), | ||
| batch.sequence()); | ||
| networkExecutor.submit( | ||
| BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); | ||
| try { | ||
| networkExecutor.submit( | ||
| BatchUploadTask.create(AnalyticsClient.this, batch, maximumRetries)); | ||
| } catch (RejectedExecutionException e) { | ||
| log.print( | ||
| ERROR, | ||
| e, | ||
| "Failed to submit batch %s to network executor during shutdown. Batch will be lost.", | ||
| batch.sequence()); | ||
| // Notify callbacks about the failure | ||
| for (Message msg : batch.batch()) { | ||
| for (Callback callback : callbacks) { | ||
| callback.failure(msg, e); | ||
| } | ||
| } | ||
| } | ||
|
Comment on lines
+373
to
+388
|
||
|
|
||
| currentBatchSize.set(0); | ||
| messages.clear(); | ||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new
waitForLooperCompletion()method lacks test coverage. Consider adding tests to verify:looperFutureis nullThis is important for ensuring the shutdown race condition fix works as intended.