[VoiceLive] Harden async samples for GA: fix threading races and reactor stream issues#49036
Open
[VoiceLive] Harden async samples for GA: fix threading races and reactor stream issues#49036
Conversation
…ssues - Mark cross-thread reference fields volatile to fix JMM visibility races between reactor event-handler thread and JVM shutdown-hook thread (session, audioProcessor, microphone/speaker/inputLine/outputLine, capture/playback threads) - Surface dropped audio packets when bounded playback queue is full (LinkedBlockingQueue.offer returns false) - Replace detached receiveEvents().subscribe() with chained thenMany() so a single reactor stream processes events (avoids events being silently dropped by competing subscribers) - Replace poll()+Thread.sleep busy-wait with blocking take() in audio playback loops - Convert fire-and-forget sendInputAudio/sendEvent .subscribe() to subscribe(onNext, onError) so errors surface - Replace doOnError(...).subscribe() with subscribe(onNext, onError) (doOnError is a side-effect, not a terminal handler) - Bound previously-unbounded LinkedBlockingQueue<>() to capacity 1000 to prevent OOM with slow consumers - GlobalTracingSample: move countDown() out of doFinally into subscribe(onComplete, onError) - AudioPlaybackSample: fix broken thenMany chain using take(Duration.ofSeconds(10)) - Switch all production samples to DefaultAzureCredential (KeyCredential shown as alternative comment) - MCPSample: fix runMCPSample signature mismatch left over from credential migration
Contributor
There was a problem hiding this comment.
Pull request overview
This PR hardens the VoiceLive Java async samples ahead of GA by addressing common race conditions in audio processing and improving reactive subscriptions/error handling, while also shifting sample authentication guidance toward DefaultAzureCredential (with API key as an option).
Changes:
- Update samples to use
DefaultAzureCredentialBuilderby default (API key becomes optional / flag-driven where applicable). - Reduce concurrency hazards in audio capture/playback by adding
volatilefields and bounding playback queues with drop warnings. - Improve reactive
subscribe()usage by providing explicit error consumers and restructuring some event pipelines.
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/VoiceAssistantSample.java | Switches auth default to DefaultAzureCredential; adds volatile audio lines and bounded playback queue; updates CLI/env var guidance (needs doc fixes). |
| sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/telemetry/GlobalTracingSample.java | Uses DefaultAzureCredential by default and updates env var requirements accordingly. |
| sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MicrophoneInputSample.java | Uses DefaultAzureCredential by default and updates env var requirements accordingly. |
| sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MCPSample.java | Uses DefaultAzureCredential; bounds playback queue; adds subscribe error handlers (but introduces a blocking playback-thread shutdown issue). |
| sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/FunctionCallingSample.java | Uses DefaultAzureCredential; bounds playback queue; adds subscribe error handlers (but introduces a blocking playback-thread shutdown issue). |
| sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/BasicVoiceConversationSample.java | Uses DefaultAzureCredential by default and updates env var requirements accordingly. |
| sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AudioPlaybackSample.java | Uses DefaultAzureCredential; bounds audio queue; changes receiveEvents timing and adds a receiveSink-related comment (needs adjustment to avoid relying on internals). |
| sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AgentV2Sample.java | Makes session/audio processor references volatile; improves subscribe error handling; bounds playback queue and adds drop warnings. |
Comments suppressed due to low confidence (2)
sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MCPSample.java:554
- startPlayback() now blocks on playbackQueue.take(), but cleanup() only sets isPlaying=false and closes the speaker. If the queue is empty, the playback thread can block indefinitely and won’t observe isPlaying=false, so playback may not stop cleanly. Consider sending a shutdown marker into the queue and/or keeping a thread reference and interrupting it (and/or mark the thread daemon).
// Start playback thread
new Thread(() -> {
while (isPlaying.get()) {
try {
byte[] audioData = playbackQueue.take();
speaker.write(audioData, 0, audioData.length);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "AudioPlayback").start();
sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/FunctionCallingSample.java:539
- startPlayback() now blocks on playbackQueue.take(), but cleanup() only flips isPlaying=false and clears the queue. If the queue is empty at shutdown, the playback thread can block indefinitely and won’t exit. Consider enqueuing a sentinel/shutdown item during cleanup and/or keeping a thread reference and interrupting it (and/or mark the thread daemon).
// Start playback thread
new Thread(() -> {
while (isPlaying.get()) {
try {
byte[] audioData = playbackQueue.take();
speaker.write(audioData, 0, audioData.length);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "AudioPlayback").start();
- Add Sinks.One eventSubscribed receive-first barrier so sendEvent(sessionConfig) waits for the hot multicast receiveEvents() subscription, preventing dropped events. - Compose receive + send into a single Flux.merge(...).then() lifecycle (notably AgentV2Sample) instead of detached subscribe() calls. - Mark audio capture/playback worker threads as daemons; replace busy poll() with blocking take()/read(); interrupt threads during cleanup so JVM shutdown completes. - Use volatile / AtomicReference for cross-thread audio line and thread handles to fix JMM visibility races. - Replace unbounded queues with bounded LinkedBlockingQueue(1000); offer() overflow now logs a warning instead of silently dropping audio. - Replace doOnError().subscribe() and bare subscribe() patterns with subscribe(onNext, onError) so errors are not swallowed. - Fix AuthenticationMethodsSample unreachable completion message and MCPSample.runMCPSample signature. - Enhance Javadoc on all 9 runnable samples (when to use / what happens at runtime).
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
Addresses reviewer feedback before GA:
"The async samples have some issues that should be addressed before GA. Customers using these samples in production can run into various threading issues and can also lead to certain events not getting processed."
All SDK Contribution checklist:
General Guidelines and Best Practices
Testing Guidelines