From 50d5c1fd4291b31a2585a43d8e3191fcc5ca718b Mon Sep 17 00:00:00 2001 From: Xiting Zhang Date: Mon, 4 May 2026 09:10:33 -0700 Subject: [PATCH 1/2] Harden async samples for GA: fix threading races and reactor stream issues - Mark cross-thread reference fields volatile to fix JMM visibility races between reactor event-handler thread and JVM shutdown-hook thread (session, audioProcessor, microphone/speaker/inputLine/outputLine, capture/playback threads) - Surface dropped audio packets when bounded playback queue is full (LinkedBlockingQueue.offer returns false) - Replace detached receiveEvents().subscribe() with chained thenMany() so a single reactor stream processes events (avoids events being silently dropped by competing subscribers) - Replace poll()+Thread.sleep busy-wait with blocking take() in audio playback loops - Convert fire-and-forget sendInputAudio/sendEvent .subscribe() to subscribe(onNext, onError) so errors surface - Replace doOnError(...).subscribe() with subscribe(onNext, onError) (doOnError is a side-effect, not a terminal handler) - Bound previously-unbounded LinkedBlockingQueue<>() to capacity 1000 to prevent OOM with slow consumers - GlobalTracingSample: move countDown() out of doFinally into subscribe(onComplete, onError) - AudioPlaybackSample: fix broken thenMany chain using take(Duration.ofSeconds(10)) - Switch all production samples to DefaultAzureCredential (KeyCredential shown as alternative comment) - MCPSample: fix runMCPSample signature mismatch left over from credential migration --- .../com/azure/ai/voicelive/AgentV2Sample.java | 61 +++++++++------ .../ai/voicelive/AudioPlaybackSample.java | 41 +++++----- .../BasicVoiceConversationSample.java | 17 +++-- .../ai/voicelive/FunctionCallingSample.java | 50 ++++++------- .../com/azure/ai/voicelive/MCPSample.java | 75 +++++++++---------- .../ai/voicelive/MicrophoneInputSample.java | 17 +++-- .../ai/voicelive/VoiceAssistantSample.java | 51 +++++++------ .../telemetry/GlobalTracingSample.java | 
13 ++-- 8 files changed, 176 insertions(+), 149 deletions(-) diff --git a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AgentV2Sample.java b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AgentV2Sample.java index 29713a6bdf4f..30763ca89d3b 100644 --- a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AgentV2Sample.java +++ b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AgentV2Sample.java @@ -235,8 +235,9 @@ static class AgentV2VoiceAssistant { private final AtomicBoolean running = new AtomicBoolean(false); private final CountDownLatch shutdownLatch = new CountDownLatch(1); - private VoiceLiveSessionAsyncClient session; - private AudioProcessor audioProcessor; + // volatile: written by reactor thread (doOnSuccess), read by shutdown-hook thread + private volatile VoiceLiveSessionAsyncClient session; + private volatile AudioProcessor audioProcessor; AgentV2VoiceAssistant(String endpoint, AgentSessionConfig agentConfig, String voice) { this.endpoint = endpoint; @@ -272,12 +273,14 @@ void start() { // Subscribe to events subscribeToEvents(); }) - .doOnError(e -> { - System.err.println("Failed to connect: " + e.getMessage()); - running.set(false); - shutdownLatch.countDown(); - }) - .subscribe(); + .subscribe( + v -> {}, + e -> { + System.err.println("Failed to connect: " + e.getMessage()); + running.set(false); + shutdownLatch.countDown(); + } + ); // Wait for shutdown try { @@ -330,23 +333,26 @@ private void configureSession() { // Send session update ClientEventSessionUpdate sessionUpdate = new ClientEventSessionUpdate(sessionOptions); session.sendEvent(sessionUpdate) - .doOnSuccess(v -> System.out.println("Session configuration sent")) - .doOnError(e -> System.err.println("Failed to configure session: " + e.getMessage())) - .subscribe(); + .subscribe( + v -> System.out.println("Session configuration sent"), + e -> System.err.println("Failed to configure session: " 
+ e.getMessage()) + ); } private void subscribeToEvents() { session.receiveEvents() .doOnNext(this::handleEvent) - .doOnError(e -> { - System.err.println("Error receiving events: " + e.getMessage()); - shutdown(); - }) .doOnComplete(() -> { System.out.println("Event stream completed"); shutdown(); }) - .subscribe(); + .subscribe( + v -> {}, + e -> { + System.err.println("Error receiving events: " + e.getMessage()); + shutdown(); + } + ); } private void handleEvent(SessionUpdate event) { @@ -483,15 +489,16 @@ private void writeConversationLog(String message) { static class AudioProcessor { private final VoiceLiveSessionAsyncClient session; private final AudioFormat format; - private final BlockingQueue playbackQueue = new LinkedBlockingQueue<>(); + private final BlockingQueue playbackQueue = new LinkedBlockingQueue<>(1000); private final AtomicInteger nextSeqNum = new AtomicInteger(0); private final AtomicInteger playbackBase = new AtomicInteger(0); private final AtomicBoolean running = new AtomicBoolean(false); - private TargetDataLine inputLine; - private SourceDataLine outputLine; - private Thread captureThread; - private Thread playbackThread; + // volatile: written by reactor thread (startCapture/Playback), read/closed by shutdown-hook thread + private volatile TargetDataLine inputLine; + private volatile SourceDataLine outputLine; + private volatile Thread captureThread; + private volatile Thread playbackThread; AudioProcessor(VoiceLiveSessionAsyncClient session) { this.session = session; @@ -532,8 +539,11 @@ private void captureLoop() { ? 
buffer.clone() : Arrays.copyOf(buffer, bytesRead); - // Send audio to service (sendInputAudio takes byte[]) - session.sendInputAudio(audioData).subscribe(); + session.sendInputAudio(audioData) + .subscribe( + v -> {}, + error -> System.err.println("Error sending audio: " + error.getMessage()) + ); } } } @@ -583,7 +593,10 @@ private void playbackLoop() { void queueAudio(byte[] audioData) { int seqNum = nextSeqNum.getAndIncrement(); - playbackQueue.offer(new AudioPacket(seqNum, audioData)); + // offer() returns false if the bounded queue is full; warn so a slow consumer is visible + if (!playbackQueue.offer(new AudioPacket(seqNum, audioData))) { + System.err.println("Warning: playback queue full, dropping audio packet seq=" + seqNum); + } } void skipPendingAudio() { diff --git a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AudioPlaybackSample.java b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AudioPlaybackSample.java index 956c4f73d26b..3b8f2cf28a0f 100644 --- a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AudioPlaybackSample.java +++ b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AudioPlaybackSample.java @@ -18,7 +18,7 @@ import com.azure.ai.voicelive.models.SessionUpdateResponseAudioDelta; import com.azure.ai.voicelive.models.UserMessageItem; import com.azure.ai.voicelive.models.VoiceLiveSessionOptions; -import com.azure.core.credential.KeyCredential; +import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.core.util.BinaryData; import reactor.core.publisher.Mono; @@ -59,7 +59,7 @@ *

Environment Variables Required:

*
    *
  • AZURE_VOICELIVE_ENDPOINT - The VoiceLive service endpoint URL
  • - *
  • AZURE_VOICELIVE_API_KEY - The API key for authentication
  • + *
  • AZURE_VOICELIVE_API_KEY - (Optional) The API key, if not using DefaultAzureCredential
  • *
* *

Audio Requirements:

@@ -87,12 +87,11 @@ public final class AudioPlaybackSample { * @param args Unused command line arguments */ public static void main(String[] args) { - // Get credentials from environment variables + // Get endpoint from environment variable String endpoint = System.getenv("AZURE_VOICELIVE_ENDPOINT"); - String apiKey = System.getenv("AZURE_VOICELIVE_API_KEY"); - if (endpoint == null || apiKey == null) { - System.err.println("Please set AZURE_VOICELIVE_ENDPOINT and AZURE_VOICELIVE_API_KEY environment variables"); + if (endpoint == null) { + System.err.println("Please set AZURE_VOICELIVE_ENDPOINT environment variable"); return; } @@ -102,10 +101,12 @@ public static void main(String[] args) { return; } - // Create the VoiceLive client + // Create the VoiceLive client using DefaultAzureCredential (recommended). + // To use an API key instead: + // .credential(new KeyCredential(System.getenv("AZURE_VOICELIVE_API_KEY"))) VoiceLiveAsyncClient client = new VoiceLiveClientBuilder() .endpoint(endpoint) - .credential(new KeyCredential(apiKey)) + .credential(new DefaultAzureCredentialBuilder().build()) .buildAsyncClient(); System.out.println("Starting audio playback sample..."); @@ -123,7 +124,7 @@ public static void main(String[] args) { .setInputAudioSamplingRate(SAMPLE_RATE); // Audio playback components - final BlockingQueue audioQueue = new LinkedBlockingQueue<>(); + final BlockingQueue audioQueue = new LinkedBlockingQueue<>(1000); final AtomicBoolean isPlaying = new AtomicBoolean(false); final SourceDataLine[] speakerRef = new SourceDataLine[1]; @@ -132,17 +133,16 @@ public static void main(String[] args) { .flatMap(session -> { System.out.println("āœ“ Session started"); - // Send session configuration, then listen for events. + // Send session configuration, send text message, trigger response, + // then listen for events. Events are buffered by the SDK's receiveSink, + // so none are lost between sending and subscribing. 
ClientEventSessionUpdate updateEvent = new ClientEventSessionUpdate(sessionOptions); return session.sendEvent(updateEvent) .doOnSuccess(v -> { - System.out.println("\u2713 Session configured"); + System.out.println("āœ“ Session configured"); // Start audio playback system startPlayback(audioQueue, isPlaying, speakerRef); }) - .thenMany(session.receiveEvents() - .doOnNext(event -> handleEvent(event, audioQueue)) - .doOnError(error -> System.err.println("Error: " + error.getMessage()))) .then(Mono.delay(Duration.ofMillis(500))) // Wait for session to be fully ready .flatMap(v -> { // Send a user message to trigger an audio response @@ -162,7 +162,11 @@ public static void main(String[] args) { ClientEventResponseCreate responseEvent = new ClientEventResponseCreate(); return session.sendEvent(responseEvent); }) - .then(Mono.delay(Duration.ofSeconds(10))) // Wait for audio response + .thenMany(session.receiveEvents() + .doOnNext(event -> handleEvent(event, audioQueue)) + .doOnError(error -> System.err.println("Error: " + error.getMessage())) + .take(Duration.ofSeconds(10))) // Listen for 10 seconds then complete + .then() .doFinally(signal -> System.out.println("\nāœ“ Sample completed - audio playback demonstrated")); }) .doFinally(signalType -> { @@ -284,8 +288,11 @@ private static void handleEvent(SessionUpdate event, BlockingQueue audio SessionUpdateResponseAudioDelta audioEvent = (SessionUpdateResponseAudioDelta) event; byte[] audioData = audioEvent.getDelta(); if (audioData != null && audioData.length > 0) { - audioQueue.offer(audioData); - System.out.println("šŸ”Š Received audio chunk: " + audioData.length + " bytes"); + if (!audioQueue.offer(audioData)) { + System.err.println("Warning: audio queue full, dropping chunk of " + audioData.length + " bytes"); + } else { + System.out.println("šŸ”Š Received audio chunk: " + audioData.length + " bytes"); + } } } } else if (eventType == ServerEventType.RESPONSE_AUDIO_DONE) { diff --git 
a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/BasicVoiceConversationSample.java b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/BasicVoiceConversationSample.java index ed1a9c8664be..043d73727426 100644 --- a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/BasicVoiceConversationSample.java +++ b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/BasicVoiceConversationSample.java @@ -12,7 +12,7 @@ import com.azure.ai.voicelive.models.ServerEventType; import com.azure.ai.voicelive.models.SessionUpdate; import com.azure.ai.voicelive.models.VoiceLiveSessionOptions; -import com.azure.core.credential.KeyCredential; +import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.core.util.BinaryData; import java.util.Arrays; @@ -40,7 +40,7 @@ *

Environment Variables Required:

*
    *
  • AZURE_VOICELIVE_ENDPOINT - The VoiceLive service endpoint URL
  • - *
  • AZURE_VOICELIVE_API_KEY - The API key for authentication
  • + *
  • AZURE_VOICELIVE_API_KEY - (Optional) The API key, if not using DefaultAzureCredential
  • *
* *

How to Run:

@@ -56,19 +56,20 @@ public final class BasicVoiceConversationSample { * @param args Unused command line arguments */ public static void main(String[] args) { - // Get credentials from environment variables + // Get endpoint from environment variable String endpoint = System.getenv("AZURE_VOICELIVE_ENDPOINT"); - String apiKey = System.getenv("AZURE_VOICELIVE_API_KEY"); - if (endpoint == null || apiKey == null) { - System.err.println("Please set AZURE_VOICELIVE_ENDPOINT and AZURE_VOICELIVE_API_KEY environment variables"); + if (endpoint == null) { + System.err.println("Please set AZURE_VOICELIVE_ENDPOINT environment variable"); return; } - // Create the VoiceLive client + // Create the VoiceLive client using DefaultAzureCredential (recommended). + // To use an API key instead: + // .credential(new KeyCredential(System.getenv("AZURE_VOICELIVE_API_KEY"))) VoiceLiveAsyncClient client = new VoiceLiveClientBuilder() .endpoint(endpoint) - .credential(new KeyCredential(apiKey)) + .credential(new DefaultAzureCredentialBuilder().build()) .buildAsyncClient(); System.out.println("Starting basic voice conversation..."); diff --git a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/FunctionCallingSample.java b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/FunctionCallingSample.java index 8c076c394638..5621c659a786 100644 --- a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/FunctionCallingSample.java +++ b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/FunctionCallingSample.java @@ -29,7 +29,7 @@ import com.azure.ai.voicelive.models.SessionUpdateSessionUpdated; import com.azure.ai.voicelive.models.VoiceLiveSessionOptions; import com.azure.ai.voicelive.models.VoiceLiveToolDefinition; -import com.azure.core.credential.KeyCredential; +import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.core.util.BinaryData; import com.fasterxml.jackson.databind.JsonNode; import 
com.fasterxml.jackson.databind.ObjectMapper; @@ -70,7 +70,7 @@ *

Environment Variables Required:

*
    *
  • AZURE_VOICELIVE_ENDPOINT - The VoiceLive service endpoint URL
  • - *
  • AZURE_VOICELIVE_API_KEY - The API key for authentication
  • + *
  • AZURE_VOICELIVE_API_KEY - (Optional) The API key, if not using DefaultAzureCredential
  • *
* *

How to Run:

@@ -90,7 +90,6 @@ public final class FunctionCallingSample { // Service configuration private static final String DEFAULT_MODEL = "gpt-realtime"; private static final String ENV_ENDPOINT = "AZURE_VOICELIVE_ENDPOINT"; - private static final String ENV_API_KEY = "AZURE_VOICELIVE_API_KEY"; // Audio format constants private static final int SAMPLE_RATE = 24000; @@ -111,29 +110,24 @@ private FunctionCallingSample() { public static void main(String[] args) { // Load configuration String endpoint = System.getenv(ENV_ENDPOINT); - String apiKey = System.getenv(ENV_API_KEY); if (endpoint == null || endpoint.isEmpty()) { System.err.println("Error: AZURE_VOICELIVE_ENDPOINT environment variable is not set."); System.exit(1); } - if (apiKey == null || apiKey.isEmpty()) { - System.err.println("Error: AZURE_VOICELIVE_API_KEY environment variable is not set."); - System.exit(1); - } - String separator = new String(new char[70]).replace("\0", "="); System.out.println(separator); System.out.println("šŸŽ¤ļø Voice Assistant with Function Calling - Azure VoiceLive SDK"); System.out.println(separator); try { - // Create client - KeyCredential credential = new KeyCredential(apiKey); + // Create client using DefaultAzureCredential (recommended). 
+ // To use an API key instead: + // .credential(new KeyCredential(System.getenv("AZURE_VOICELIVE_API_KEY"))) VoiceLiveAsyncClient client = new VoiceLiveClientBuilder() .endpoint(endpoint) - .credential(credential) + .credential(new DefaultAzureCredentialBuilder().build()) .buildAsyncClient(); runFunctionCallingSession(client); @@ -379,8 +373,10 @@ private static void handleServerEvent( session.sendEvent(createItem) .then(session.sendEvent(new ClientEventResponseCreate())) - .doOnSuccess(v -> System.out.println("šŸ¤– Function result sent")) - .subscribe(); + .subscribe( + v -> System.out.println("šŸ¤– Function result sent"), + error -> System.err.println("āŒ Failed to send function result: " + error.getMessage()) + ); } } @@ -468,11 +464,12 @@ private static class AudioProcessor { private final VoiceLiveSessionAsyncClient session; private final AudioFormat audioFormat; - private TargetDataLine microphone; + // volatile: written by reactor thread (startCapture/Playback), read/closed by shutdown-hook thread + private volatile TargetDataLine microphone; private final AtomicBoolean isCapturing = new AtomicBoolean(false); - private SourceDataLine speaker; - private final BlockingQueue playbackQueue = new LinkedBlockingQueue<>(); + private volatile SourceDataLine speaker; + private final BlockingQueue playbackQueue = new LinkedBlockingQueue<>(1000); private final AtomicBoolean isPlaying = new AtomicBoolean(false); AudioProcessor(VoiceLiveSessionAsyncClient session) { @@ -499,7 +496,11 @@ void startCapture() { int bytesRead = microphone.read(buffer, 0, buffer.length); if (bytesRead > 0) { byte[] audioData = Arrays.copyOf(buffer, bytesRead); - session.sendInputAudio(BinaryData.fromBytes(audioData)).subscribe(); + session.sendInputAudio(BinaryData.fromBytes(audioData)) + .subscribe( + v -> {}, + error -> System.err.println("Error sending audio: " + error.getMessage()) + ); } } }, "AudioCapture").start(); @@ -527,12 +528,8 @@ void startPlayback() { new Thread(() -> { while 
(isPlaying.get()) { try { - byte[] audioData = playbackQueue.poll(); - if (audioData != null) { - speaker.write(audioData, 0, audioData.length); - } else { - Thread.sleep(10); - } + byte[] audioData = playbackQueue.take(); + speaker.write(audioData, 0, audioData.length); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; @@ -553,7 +550,10 @@ void skipPendingAudio() { void queueAudio(byte[] audioData) { if (isPlaying.get()) { - playbackQueue.offer(audioData); + // offer() returns false if the bounded queue is full; warn so a slow consumer is visible + if (!playbackQueue.offer(audioData)) { + System.err.println("Warning: playback queue full, dropping audio chunk of " + audioData.length + " bytes"); + } } } diff --git a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MCPSample.java b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MCPSample.java index 82e7f6dba4fe..24938a79f405 100644 --- a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MCPSample.java +++ b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MCPSample.java @@ -32,7 +32,7 @@ import com.azure.ai.voicelive.models.InputAudioFormat; import com.azure.ai.voicelive.models.OutputAudioFormat; import com.azure.ai.voicelive.models.ServerVadTurnDetection; -import com.azure.core.credential.KeyCredential; +import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.core.util.BinaryData; import javax.sound.sampled.AudioFormat; @@ -64,7 +64,7 @@ *

Environment Variables Required:

*
    *
  • AZURE_VOICELIVE_ENDPOINT - The VoiceLive service endpoint URL
  • - *
  • AZURE_VOICELIVE_API_KEY - The API key for authentication
  • + *
  • AZURE_VOICELIVE_API_KEY - (Optional) The API key, if not using DefaultAzureCredential
  • *
* *

How to Run:

@@ -83,7 +83,6 @@ public final class MCPSample { // Service configuration private static final String DEFAULT_MODEL = "gpt-realtime"; private static final String ENV_ENDPOINT = "AZURE_VOICELIVE_ENDPOINT"; - private static final String ENV_API_KEY = "AZURE_VOICELIVE_API_KEY"; // Audio format constants private static final int SAMPLE_RATE = 24000; @@ -109,15 +108,8 @@ public static void main(String[] args) { System.exit(1); } - String apiKey = System.getenv(ENV_API_KEY); - if (apiKey == null || apiKey.trim().isEmpty()) { - System.err.println("āŒ Error: No API key provided"); - System.err.println("Please set the " + ENV_API_KEY + " environment variable."); - System.exit(1); - } - try { - runMCPSample(endpoint, apiKey); + runMCPSample(endpoint); } catch (Exception e) { System.err.println("āŒ Error: " + e.getMessage()); e.printStackTrace(); @@ -128,21 +120,22 @@ public static void main(String[] args) { /** * Run the MCP sample. */ - private static void runMCPSample(String endpoint, String apiKey) { + private static void runMCPSample(String endpoint) { System.out.println("šŸ”Œ Connecting to VoiceLive API with MCP support..."); System.out.println("šŸ“” Endpoint: " + endpoint); System.out.println("šŸ¤– Model: " + DEFAULT_MODEL); - KeyCredential credential = new KeyCredential(apiKey); AtomicReference activeMCPCallId = new AtomicReference<>(); AtomicBoolean running = new AtomicBoolean(true); AtomicReference audioProcessorRef = new AtomicReference<>(); AtomicReference sessionRef = new AtomicReference<>(); - // Create VoiceLive client + // Create VoiceLive client using DefaultAzureCredential (recommended). 
+ // To use an API key instead: + // .credential(new KeyCredential(System.getenv("AZURE_VOICELIVE_API_KEY"))) VoiceLiveAsyncClient client = new VoiceLiveClientBuilder() .endpoint(endpoint) - .credential(credential) + .credential(new DefaultAzureCredentialBuilder().build()) .serviceVersion(VoiceLiveServiceVersion.V2026_01_01_PREVIEW) .buildAsyncClient(); @@ -409,15 +402,16 @@ private static void handleMCPApprovalRequest( new MCPApprovalResponseRequestItem(approvalId, approved); session.sendEvent(new ClientEventConversationItemCreate().setItem(approvalResponse)) - .doOnSuccess(v -> { - if (approved) { - System.out.println("āœ… MCP call approved and response sent"); - } else { - System.out.println("āŒ MCP call denied and response sent"); - } - }) - .doOnError(error -> System.err.println("āŒ Error sending approval response: " + error.getMessage())) - .subscribe(); + .subscribe( + v -> { + if (approved) { + System.out.println("āœ… MCP call approved and response sent"); + } else { + System.out.println("āŒ MCP call denied and response sent"); + } + }, + error -> System.err.println("āŒ Error sending approval response: " + error.getMessage()) + ); } /** @@ -462,9 +456,10 @@ private static void handleMCPCallCompleted( // In a real implementation, you might want to collect the output // For this sample, we'll just trigger a new response session.sendEvent(new ClientEventResponseCreate()) - .doOnSuccess(v -> System.out.println("šŸ“¤ New response created to process MCP output")) - .doOnError(error -> System.err.println("āŒ Error creating response: " + error.getMessage())) - .subscribe(); + .subscribe( + v -> System.out.println("šŸ“¤ New response created to process MCP output"), + error -> System.err.println("āŒ Error creating response: " + error.getMessage()) + ); } /** @@ -485,11 +480,12 @@ private static class AudioProcessor { private final VoiceLiveSessionAsyncClient session; private final AudioFormat audioFormat; - private TargetDataLine microphone; + // volatile: 
written by reactor thread (startCapture/Playback), read/closed by shutdown-hook thread + private volatile TargetDataLine microphone; private final AtomicBoolean isCapturing = new AtomicBoolean(false); - private SourceDataLine speaker; - private final BlockingQueue playbackQueue = new LinkedBlockingQueue<>(); + private volatile SourceDataLine speaker; + private final BlockingQueue playbackQueue = new LinkedBlockingQueue<>(1000); private final AtomicBoolean isPlaying = new AtomicBoolean(false); AudioProcessor(VoiceLiveSessionAsyncClient session) { @@ -516,7 +512,11 @@ void startCapture() { int bytesRead = microphone.read(buffer, 0, buffer.length); if (bytesRead > 0) { byte[] audioData = Arrays.copyOf(buffer, bytesRead); - session.sendInputAudio(BinaryData.fromBytes(audioData)).subscribe(); + session.sendInputAudio(BinaryData.fromBytes(audioData)) + .subscribe( + v -> {}, + error -> System.err.println("Error sending audio: " + error.getMessage()) + ); } } }, "AudioCapture").start(); @@ -544,12 +544,8 @@ void startPlayback() { new Thread(() -> { while (isPlaying.get()) { try { - byte[] audioData = playbackQueue.poll(); - if (audioData != null) { - speaker.write(audioData, 0, audioData.length); - } else { - Thread.sleep(10); - } + byte[] audioData = playbackQueue.take(); + speaker.write(audioData, 0, audioData.length); } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; @@ -570,7 +566,10 @@ void skipPendingAudio() { void queueAudio(byte[] audioData) { if (isPlaying.get()) { - playbackQueue.offer(audioData); + // offer() returns false if the bounded queue is full; warn so a slow consumer is visible + if (!playbackQueue.offer(audioData)) { + System.err.println("Warning: playback queue full, dropping audio chunk of " + audioData.length + " bytes"); + } } } diff --git a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MicrophoneInputSample.java 
b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MicrophoneInputSample.java index 4fdfe3e70484..64ba9f845146 100644 --- a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MicrophoneInputSample.java +++ b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MicrophoneInputSample.java @@ -15,7 +15,7 @@ import com.azure.ai.voicelive.models.SessionUpdate; import com.azure.ai.voicelive.models.SessionUpdateResponseDone; import com.azure.ai.voicelive.models.VoiceLiveSessionOptions; -import com.azure.core.credential.KeyCredential; +import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.core.util.BinaryData; import javax.sound.sampled.AudioFormat; @@ -49,7 +49,7 @@ *

Environment Variables Required:

*
    *
  • AZURE_VOICELIVE_ENDPOINT - The VoiceLive service endpoint URL
  • - *
  • AZURE_VOICELIVE_API_KEY - The API key for authentication
  • + *
  • AZURE_VOICELIVE_API_KEY - (Optional) The API key, if not using DefaultAzureCredential
  • *
* *

Audio Requirements:

@@ -74,12 +74,11 @@ public final class MicrophoneInputSample { * @param args Unused command line arguments */ public static void main(String[] args) { - // Get credentials from environment variables + // Get endpoint from environment variable String endpoint = System.getenv("AZURE_VOICELIVE_ENDPOINT"); - String apiKey = System.getenv("AZURE_VOICELIVE_API_KEY"); - if (endpoint == null || apiKey == null) { - System.err.println("Please set AZURE_VOICELIVE_ENDPOINT and AZURE_VOICELIVE_API_KEY environment variables"); + if (endpoint == null) { + System.err.println("Please set AZURE_VOICELIVE_ENDPOINT environment variable"); return; } @@ -89,10 +88,12 @@ public static void main(String[] args) { return; } - // Create the VoiceLive client + // Create the VoiceLive client using DefaultAzureCredential (recommended). + // To use an API key instead: + // .credential(new KeyCredential(System.getenv("AZURE_VOICELIVE_API_KEY"))) VoiceLiveAsyncClient client = new VoiceLiveClientBuilder() .endpoint(endpoint) - .credential(new KeyCredential(apiKey)) + .credential(new DefaultAzureCredentialBuilder().build()) .buildAsyncClient(); System.out.println("Starting microphone input sample..."); diff --git a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/VoiceAssistantSample.java b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/VoiceAssistantSample.java index 9f49e4910292..019f671ae0e9 100644 --- a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/VoiceAssistantSample.java +++ b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/VoiceAssistantSample.java @@ -24,7 +24,7 @@ import com.azure.core.credential.KeyCredential; import com.azure.core.credential.TokenCredential; import com.azure.core.util.BinaryData; -import com.azure.identity.AzureCliCredentialBuilder; +import com.azure.identity.DefaultAzureCredentialBuilder; import javax.sound.sampled.AudioFormat; @@ -69,7 +69,7 @@ *

Environment Variables Required:

*
    *
  • AZURE_VOICELIVE_ENDPOINT - The VoiceLive service endpoint URL
  • - *
  • AZURE_VOICELIVE_API_KEY - The API key (required if not using --use-token-credential)
  • + *
  • AZURE_VOICELIVE_API_KEY - The API key (required only with --use-api-key flag)
  • *
* *

Audio Requirements:

@@ -82,7 +82,7 @@ * mvn exec:java -Dexec.mainClass="com.azure.ai.voicelive.VoiceAssistantSample" -Dexec.classpathScope=test * * # With Token Credential: - * mvn exec:java -Dexec.mainClass="com.azure.ai.voicelive.VoiceAssistantSample" -Dexec.classpathScope=test -Dexec.args="--use-token-credential" + * mvn exec:java -Dexec.mainClass="com.azure.ai.voicelive.VoiceAssistantSample" -Dexec.classpathScope=test -Dexec.args="--use-api-key" * } */ public final class VoiceAssistantSample { @@ -136,12 +136,14 @@ private static class AudioProcessor { private final AudioFormat audioFormat; // Audio capture components - private TargetDataLine microphone; + // volatile: written by reactor thread (startCapture), read/closed by shutdown-hook thread + private volatile TargetDataLine microphone; private final AtomicBoolean isCapturing = new AtomicBoolean(false); // Audio playback components - private SourceDataLine speaker; - private final BlockingQueue playbackQueue = new LinkedBlockingQueue<>(); + // volatile: written by reactor thread (startPlayback), read/closed by shutdown-hook thread + private volatile SourceDataLine speaker; + private final BlockingQueue playbackQueue = new LinkedBlockingQueue<>(1000); private final AtomicBoolean isPlaying = new AtomicBoolean(false); private final AtomicInteger nextSequenceNumber = new AtomicInteger(0); private final AtomicInteger playbackBase = new AtomicInteger(0); @@ -303,7 +305,10 @@ private void playbackAudioLoop() { void queueAudio(byte[] audioData) { if (audioData != null && audioData.length > 0) { int seqNum = nextSequenceNumber.getAndIncrement(); - playbackQueue.offer(new AudioPlaybackPacket(seqNum, audioData)); + // offer() returns false if the bounded queue is full; warn so a slow consumer is visible + if (!playbackQueue.offer(new AudioPlaybackPacket(seqNum, audioData))) { + System.err.println("Warning: playback queue full, dropping audio packet seq=" + seqNum); + } } } @@ -351,17 +356,17 @@ void shutdown() { *

Supports two authentication methods:

*
    *
  • DefaultAzureCredential: Default authentication (no flag or API key required)
  • - *
  • Token Credential: Use --use-token-credential flag
  • + *
  • API Key: Use --use-api-key flag
  • *
* - * @param args Command line arguments. Use --use-token-credential to use token-based authentication. + * @param args Command line arguments. Use --use-api-key to use API key authentication instead of DefaultAzureCredential. */ public static void main(String[] args) { // Parse command line arguments - boolean useTokenCredential = false; + boolean useApiKey = false; for (String arg : args) { - if ("--use-token-credential".equals(arg)) { - useTokenCredential = true; + if ("--use-api-key".equals(arg)) { + useApiKey = true; } } @@ -374,8 +379,8 @@ public static void main(String[] args) { return; } - if (!useTokenCredential && apiKey == null) { - System.err.println("āŒ AZURE_VOICELIVE_API_KEY environment variable is required when not using --use-token-credential"); + if (useApiKey && apiKey == null) { + System.err.println("āŒ AZURE_VOICELIVE_API_KEY environment variable is required when using --use-api-key"); printUsage(); return; } @@ -389,16 +394,16 @@ public static void main(String[] args) { System.out.println("šŸŽ™ļø Starting Voice Assistant..."); try { - if (useTokenCredential) { - // Use token credential authentication (Azure CLI) - System.out.println("šŸ”‘ Using Token Credential authentication (Azure CLI)"); - System.out.println(" Make sure you have run 'az login' before running this sample"); - TokenCredential credential = new AzureCliCredentialBuilder().build(); - runVoiceAssistant(endpoint, credential); - } else { + if (useApiKey) { // Use API Key authentication System.out.println("šŸ”‘ Using API Key authentication"); runVoiceAssistant(endpoint, new KeyCredential(apiKey)); + } else { + // Use token credential authentication (DefaultAzureCredential, recommended) + System.out.println("šŸ”‘ Using DefaultAzureCredential authentication"); + System.out.println(" Make sure you have run 'az login' before running this sample"); + TokenCredential credential = new DefaultAzureCredentialBuilder().build(); + runVoiceAssistant(endpoint, credential); } 
System.out.println("āœ“ Voice Assistant completed successfully"); } catch (Exception e) { @@ -443,9 +448,9 @@ private static boolean checkAudioSystem() { private static void printUsage() { System.err.println("\nRequired Environment Variables:"); System.err.println(" " + ENV_ENDPOINT + "="); - System.err.println(" " + ENV_API_KEY + "= (required if not using --use-token-credential)"); + System.err.println(" " + ENV_API_KEY + "= (required only with --use-api-key flag)"); System.err.println("\nOptional:"); - System.err.println(" Use --use-token-credential flag to authenticate with Azure CLI (requires 'az login')"); + System.err.println(" Use --use-api-key flag to authenticate with an API key instead of DefaultAzureCredential"); } /** diff --git a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/telemetry/GlobalTracingSample.java b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/telemetry/GlobalTracingSample.java index 74b5e4dfbf8e..29859306f045 100644 --- a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/telemetry/GlobalTracingSample.java +++ b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/telemetry/GlobalTracingSample.java @@ -9,7 +9,7 @@ import com.azure.ai.voicelive.models.InteractionModality; import com.azure.ai.voicelive.models.SessionUpdateResponseDone; import com.azure.ai.voicelive.models.VoiceLiveSessionOptions; -import com.azure.core.credential.KeyCredential; +import com.azure.identity.DefaultAzureCredentialBuilder; import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.SdkTracerProvider; @@ -36,7 +36,7 @@ *

Environment Variables Required:

*
    *
  • {@code AZURE_VOICELIVE_ENDPOINT} — The VoiceLive service endpoint URL
  • - *
  • {@code AZURE_VOICELIVE_API_KEY} — The API key for authentication
  • + *
  • {@code AZURE_VOICELIVE_API_KEY} — (Optional) The API key, if not using DefaultAzureCredential
  • *
* *

How to Run:

@@ -48,10 +48,9 @@ public final class GlobalTracingSample { public static void main(String[] args) throws InterruptedException { String endpoint = System.getenv("AZURE_VOICELIVE_ENDPOINT"); - String apiKey = System.getenv("AZURE_VOICELIVE_API_KEY"); - if (endpoint == null || apiKey == null) { - System.err.println("Please set AZURE_VOICELIVE_ENDPOINT and AZURE_VOICELIVE_API_KEY environment variables"); + if (endpoint == null) { + System.err.println("Please set AZURE_VOICELIVE_ENDPOINT environment variable"); return; } @@ -68,9 +67,11 @@ public static void main(String[] args) throws InterruptedException { System.out.println("GlobalOpenTelemetry registered (console exporter)"); // 2. Build client — it picks up GlobalOpenTelemetry automatically. + // Uses DefaultAzureCredential (recommended). To use an API key instead: + // .credential(new KeyCredential(System.getenv("AZURE_VOICELIVE_API_KEY"))) VoiceLiveAsyncClient client = new VoiceLiveClientBuilder() .endpoint(endpoint) - .credential(new KeyCredential(apiKey)) + .credential(new DefaultAzureCredentialBuilder().build()) .buildAsyncClient(); System.out.println("Starting voice session (automatic tracing)..."); From 2991ebea072b337831b5fe5b699eab86ac48601b Mon Sep 17 00:00:00 2001 From: Xiting Zhang Date: Mon, 4 May 2026 12:49:14 -0700 Subject: [PATCH 2/2] Fix threading and reactive lifecycle issues in VoiceLive samples - Add Sinks.One eventSubscribed receive-first barrier so sendEvent(sessionConfig) waits for the hot multicast receiveEvents() subscription, preventing dropped events. - Compose receive + send into a single Flux.merge(...).then() lifecycle (notably AgentV2Sample) instead of detached subscribe() calls. - Mark audio capture/playback worker threads as daemons; replace busy poll() with blocking take()/read(); interrupt threads during cleanup so JVM shutdown completes. - Use volatile / AtomicReference for cross-thread audio line and thread handles to fix JMM visibility races. 
- Replace unbounded queues with bounded LinkedBlockingQueue(1000); offer() overflow now logs a warning instead of silently dropping audio. - Replace doOnError().subscribe() and bare subscribe() patterns with subscribe(onNext, onError) so errors are not swallowed. - Fix AuthenticationMethodsSample unreachable completion message and MCPSample.runMCPSample signature. - Enhance Javadoc on all 9 runnable samples (when to use / what happens at runtime). --- .../com/azure/ai/voicelive/AgentV2Sample.java | 71 ++++---- .../ai/voicelive/AudioPlaybackSample.java | 95 +++++++---- .../AuthenticationMethodsSample.java | 29 +++- .../BasicVoiceConversationSample.java | 28 +++- .../ai/voicelive/FunctionCallingSample.java | 139 ++++++++++------ .../com/azure/ai/voicelive/MCPSample.java | 151 ++++++++++++------ .../ai/voicelive/MicrophoneInputSample.java | 90 ++++++++--- .../ai/voicelive/VoiceAssistantSample.java | 94 ++++++----- .../telemetry/GlobalTracingSample.java | 30 +++- 9 files changed, 484 insertions(+), 243 deletions(-) diff --git a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AgentV2Sample.java b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AgentV2Sample.java index 30763ca89d3b..e03762dbe151 100644 --- a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AgentV2Sample.java +++ b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AgentV2Sample.java @@ -28,6 +28,9 @@ import com.azure.ai.voicelive.models.VoiceLiveSessionOptions; import com.azure.core.util.BinaryData; import com.azure.identity.DefaultAzureCredentialBuilder; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; import javax.sound.sampled.AudioFormat; import javax.sound.sampled.AudioSystem; @@ -57,6 +60,15 @@ * using AgentSessionConfig, rather than as a tool in the session. This allows the agent to be * the primary responder for the voice session.

* + *

Use this sample when you already have an Azure AI Foundry agent and want VoiceLive to talk to + * that agent directly instead of registering local tools or writing response orchestration logic in + * the sample itself.

+ * + *

When you run it, the sample creates an {@code AgentSessionConfig}, opens a realtime session, + * sends the session configuration, waits for the service to report the session as ready, and then + * starts full-duplex microphone / speaker streaming while also writing a simple conversation log to + * the local {@code logs} directory.

+ * *

Features demonstrated:

*
    *
  • Using AgentSessionConfig to connect directly to an Azure AI Foundry agent
  • @@ -260,25 +272,39 @@ void start() { // Connect using AgentSessionConfig client.startSession(agentConfig) - .doOnSuccess(s -> { + .flatMap(s -> { this.session = s; System.out.println("Connected to VoiceLive service"); // Initialize audio processor this.audioProcessor = new AudioProcessor(s); - // Configure session - configureSession(); - - // Subscribe to events - subscribeToEvents(); + Sinks.One eventSubscribed = Sinks.one(); + Flux eventStream = s.receiveEvents() + .doOnSubscribe(subscription -> eventSubscribed.tryEmitEmpty()) + .doOnNext(this::handleEvent) + .doOnComplete(() -> { + System.out.println("Event stream completed"); + shutdown(); + }) + .doOnError(error -> { + System.err.println("Error receiving events: " + error.getMessage()); + shutdown(); + }); + + Mono configurePipeline = eventSubscribed.asMono() + .then(configureSession()); + + return Flux.merge( + eventStream, + configurePipeline.thenMany(Flux.empty())) + .then(); }) .subscribe( v -> {}, e -> { - System.err.println("Failed to connect: " + e.getMessage()); - running.set(false); - shutdownLatch.countDown(); + System.err.println("Agent V2 session failed: " + e.getMessage()); + shutdown(); } ); @@ -290,7 +316,7 @@ void start() { } } - private void configureSession() { + private Mono configureSession() { System.out.println("Setting up voice conversation session..."); System.out.println("Enabling Azure Deep Noise Suppression"); System.out.println("Enabling Echo Cancellation"); @@ -332,27 +358,10 @@ private void configureSession() { // Send session update ClientEventSessionUpdate sessionUpdate = new ClientEventSessionUpdate(sessionOptions); - session.sendEvent(sessionUpdate) - .subscribe( - v -> System.out.println("Session configuration sent"), - e -> System.err.println("Failed to configure session: " + e.getMessage()) - ); - } - - private void subscribeToEvents() { - session.receiveEvents() - .doOnNext(this::handleEvent) - .doOnComplete(() -> { - System.out.println("Event stream completed"); - 
shutdown(); - }) - .subscribe( - v -> {}, - e -> { - System.err.println("Error receiving events: " + e.getMessage()); - shutdown(); - } - ); + return session.sendEvent(sessionUpdate) + .doOnSuccess(v -> System.out.println("Session configuration sent")) + .doOnError(e -> System.err.println("Failed to configure session: " + e.getMessage())) + .then(); } private void handleEvent(SessionUpdate event) { diff --git a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AudioPlaybackSample.java b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AudioPlaybackSample.java index 3b8f2cf28a0f..58cd0627eccf 100644 --- a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AudioPlaybackSample.java +++ b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AudioPlaybackSample.java @@ -20,7 +20,9 @@ import com.azure.ai.voicelive.models.VoiceLiveSessionOptions; import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.core.util.BinaryData; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; import java.time.Duration; import java.util.Collections; @@ -34,10 +36,17 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; /** * Sample demonstrating how to receive and play audio responses from VoiceLive service. * + *

    Use this sample when you want to understand downstream audio playback only. It is a good next + * step after the basic sample because it avoids microphone capture and focuses on speaker output.

    + * + *

    When you run it, the sample sends a fixed text prompt, asks the model to generate an audio + * response, and plays the returned PCM audio through your default speaker or headphones.

    + * *

    This sample shows how to:

    *
      *
    • Send a text message to trigger an audio response
    • @@ -126,52 +135,55 @@ public static void main(String[] args) { // Audio playback components final BlockingQueue audioQueue = new LinkedBlockingQueue<>(1000); final AtomicBoolean isPlaying = new AtomicBoolean(false); - final SourceDataLine[] speakerRef = new SourceDataLine[1]; + final AtomicReference speakerRef = new AtomicReference<>(); + final AtomicReference playbackThreadRef = new AtomicReference<>(); // Start session client.startSession("gpt-realtime") .flatMap(session -> { System.out.println("āœ“ Session started"); - // Send session configuration, send text message, trigger response, - // then listen for events. Events are buffered by the SDK's receiveSink, - // so none are lost between sending and subscribing. + // Build the receive stream first, then gate the send pipeline on its subscription. + // This avoids missing early events from the SDK's hot shared event stream. ClientEventSessionUpdate updateEvent = new ClientEventSessionUpdate(sessionOptions); - return session.sendEvent(updateEvent) + InputTextContentPart textContent = new InputTextContentPart( + "Please say 'Hello! This is a test of the audio playback system.' 
in a friendly voice."); + UserMessageItem messageItem = new UserMessageItem(Collections.singletonList(textContent)); + ClientEventConversationItemCreate createEvent = new ClientEventConversationItemCreate() + .setItem(messageItem); + ClientEventResponseCreate responseEvent = new ClientEventResponseCreate(); + Sinks.One eventSubscribed = Sinks.one(); + + Flux eventStream = session.receiveEvents() + .doOnSubscribe(subscription -> eventSubscribed.tryEmitEmpty()) + .doOnNext(event -> handleEvent(event, audioQueue)) + .doOnError(error -> System.err.println("Error: " + error.getMessage())); + + Mono sendPipeline = eventSubscribed.asMono() + .then(session.sendEvent(updateEvent)) .doOnSuccess(v -> { System.out.println("āœ“ Session configured"); - // Start audio playback system - startPlayback(audioQueue, isPlaying, speakerRef); + startPlayback(audioQueue, isPlaying, speakerRef, playbackThreadRef); }) .then(Mono.delay(Duration.ofMillis(500))) // Wait for session to be fully ready - .flatMap(v -> { - // Send a user message to trigger an audio response - System.out.println("šŸ“¤ Sending text message to trigger audio response..."); - InputTextContentPart textContent = new InputTextContentPart( - "Please say 'Hello! This is a test of the audio playback system.' 
in a friendly voice."); - UserMessageItem messageItem = new UserMessageItem(Collections.singletonList(textContent)); - ClientEventConversationItemCreate createEvent = new ClientEventConversationItemCreate() - .setItem(messageItem); - - return session.sendEvent(createEvent); - }) + .then(Mono.fromRunnable(() -> + System.out.println("šŸ“¤ Sending text message to trigger audio response..."))) + .then(session.sendEvent(createEvent)) .then(Mono.delay(Duration.ofMillis(100))) - .flatMap(v -> { - // Trigger response generation - System.out.println("šŸŽÆ Triggering response generation..."); - ClientEventResponseCreate responseEvent = new ClientEventResponseCreate(); - return session.sendEvent(responseEvent); - }) - .thenMany(session.receiveEvents() - .doOnNext(event -> handleEvent(event, audioQueue)) - .doOnError(error -> System.err.println("Error: " + error.getMessage())) - .take(Duration.ofSeconds(10))) // Listen for 10 seconds then complete + .then(Mono.fromRunnable(() -> + System.out.println("šŸŽÆ Triggering response generation..."))) + .then(session.sendEvent(responseEvent)) + .then(); + + return Flux.merge( + eventStream.take(Duration.ofSeconds(10)), // Listen for 10 seconds then complete + sendPipeline.thenMany(Flux.empty())) .then() .doFinally(signal -> System.out.println("\nāœ“ Sample completed - audio playback demonstrated")); }) .doFinally(signalType -> { // Cleanup - stopPlayback(audioQueue, isPlaying, speakerRef[0]); + stopPlayback(audioQueue, isPlaying, speakerRef, playbackThreadRef); }) .block(); // Block for demo purposes } @@ -197,8 +209,10 @@ private static boolean checkSpeakerAvailable() { * @param audioQueue Queue containing audio data to play * @param isPlaying Flag to control playback loop * @param speakerRef Reference to store the speaker line + * @param playbackThreadRef Reference to store the playback thread */ - private static void startPlayback(BlockingQueue audioQueue, AtomicBoolean isPlaying, SourceDataLine[] speakerRef) { + private static void 
startPlayback(BlockingQueue audioQueue, AtomicBoolean isPlaying, + AtomicReference speakerRef, AtomicReference playbackThreadRef) { try { AudioFormat format = new AudioFormat( AudioFormat.Encoding.PCM_SIGNED, @@ -215,7 +229,7 @@ private static void startPlayback(BlockingQueue audioQueue, AtomicBoolea speaker.open(format, CHUNK_SIZE * 4); speaker.start(); - speakerRef[0] = speaker; + speakerRef.set(speaker); isPlaying.set(true); System.out.println("šŸ”Š Audio playback started"); @@ -245,6 +259,7 @@ private static void startPlayback(BlockingQueue audioQueue, AtomicBoolea } }, "AudioPlayback"); playbackThread.setDaemon(true); + playbackThreadRef.set(playbackThread); playbackThread.start(); } catch (LineUnavailableException e) { @@ -257,11 +272,25 @@ private static void startPlayback(BlockingQueue audioQueue, AtomicBoolea * * @param audioQueue Queue containing audio data * @param isPlaying Flag to control playback loop - * @param speaker The speaker line to close + * @param speakerRef Reference to the speaker line to close + * @param playbackThreadRef Reference to the playback thread */ - private static void stopPlayback(BlockingQueue audioQueue, AtomicBoolean isPlaying, SourceDataLine speaker) { + private static void stopPlayback(BlockingQueue audioQueue, AtomicBoolean isPlaying, + AtomicReference speakerRef, AtomicReference playbackThreadRef) { isPlaying.set(false); audioQueue.offer(new byte[0]); // Shutdown signal + + Thread playbackThread = playbackThreadRef.getAndSet(null); + if (playbackThread != null) { + playbackThread.interrupt(); + try { + playbackThread.join(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + SourceDataLine speaker = speakerRef.getAndSet(null); if (speaker != null) { speaker.stop(); speaker.close(); diff --git a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AuthenticationMethodsSample.java 
b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AuthenticationMethodsSample.java index c392e606e120..61daf188c1ad 100644 --- a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AuthenticationMethodsSample.java +++ b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/AuthenticationMethodsSample.java @@ -15,13 +15,23 @@ import com.azure.core.credential.TokenCredential; import com.azure.core.util.BinaryData; import com.azure.identity.DefaultAzureCredentialBuilder; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; import java.util.Arrays; /** * Sample demonstrating different authentication methods for VoiceLive service. * + *

      Use this sample first when you are not sure whether connection failures are caused by auth or + * by session logic. It gives you a small, low-noise way to verify that your chosen credential can + * open a VoiceLive session successfully.

      + * + *

      When you run it, the sample builds a client with either API key auth or token-based auth, + * opens a short-lived session, applies a minimal configuration, prints a few server events, and + * then exits after a brief validation window.

      + * *

      This sample shows two authentication approaches:

      *
        *
      • API Key authentication (simplest for development)
      • @@ -224,12 +234,19 @@ private static void runSampleSession(VoiceLiveAsyncClient client, String authMet // Send session configuration, then listen for events. ClientEventSessionUpdate updateEvent = new ClientEventSessionUpdate(sessionOptions); - return session.sendEvent(updateEvent) - .doOnSuccess(v -> System.out.println("āœ“ Session configured successfully")) - .thenMany(session.receiveEvents() - .doOnNext(event -> handleEvent(event)) - .doOnError(error -> System.err.println("Error: " + error.getMessage()))) - .then(Mono.delay(java.time.Duration.ofSeconds(2))) + Sinks.One eventSubscribed = Sinks.one(); + Flux eventStream = session.receiveEvents() + .doOnSubscribe(subscription -> eventSubscribed.tryEmitEmpty()) + .doOnNext(event -> handleEvent(event)) + .doOnError(error -> System.err.println("Error: " + error.getMessage())) + .take(java.time.Duration.ofSeconds(2)); + + return Flux.merge( + eventStream, + eventSubscribed.asMono() + .then(session.sendEvent(updateEvent)) + .doOnSuccess(v -> System.out.println("āœ“ Session configured successfully")) + .thenMany(Flux.empty())) .then(Mono.fromRunnable(() -> System.out.println("āœ“ Authentication test completed successfully\n"))); }) .doOnError(error -> { diff --git a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/BasicVoiceConversationSample.java b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/BasicVoiceConversationSample.java index 043d73727426..6828275a1d9f 100644 --- a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/BasicVoiceConversationSample.java +++ b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/BasicVoiceConversationSample.java @@ -14,6 +14,8 @@ import com.azure.ai.voicelive.models.VoiceLiveSessionOptions; import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.core.util.BinaryData; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; import java.util.Arrays; @@ 
-22,6 +24,13 @@ * *

        Start here if you're new to VoiceLive! This is the simplest sample showing core concepts.

        * + *

        Use this sample to verify that your endpoint, credential, and basic realtime session setup are + * working before you add microphone input, speaker output, tool calls, or tracing.

        + * + *

        When you run it, the sample opens a realtime session, sends a minimal {@code session.update} + * request, and prints the server events that come back so you can see the connection lifecycle end + * to end.

        + * *

        This sample shows the simplest way to:

        *
          *
        • Create a VoiceLive client
        • @@ -93,12 +102,19 @@ public static void main(String[] args) { // Send session configuration, then listen for events. ClientEventSessionUpdate updateEvent = new ClientEventSessionUpdate(sessionOptions); - return session.sendEvent(updateEvent) - .doOnSuccess(v -> System.out.println("āœ“ Session configured")) - .thenMany(session.receiveEvents() - .doOnNext(event -> handleEvent(event)) - .doOnError(error -> System.err.println("Error: " + error.getMessage())) - .doOnComplete(() -> System.out.println("Event stream completed"))) + Sinks.One eventSubscribed = Sinks.one(); + Flux eventStream = session.receiveEvents() + .doOnSubscribe(subscription -> eventSubscribed.tryEmitEmpty()) + .doOnNext(thisEvent -> handleEvent(thisEvent)) + .doOnError(error -> System.err.println("Error: " + error.getMessage())) + .doOnComplete(() -> System.out.println("Event stream completed")); + + return Flux.merge( + eventStream, + eventSubscribed.asMono() + .then(session.sendEvent(updateEvent)) + .doOnSuccess(v -> System.out.println("āœ“ Session configured")) + .thenMany(Flux.empty())) .then(); // receiveEvents() never completes, so this keeps session alive }) .block(); // Block for demo purposes diff --git a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/FunctionCallingSample.java b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/FunctionCallingSample.java index 5621c659a786..d8219d4d4ef0 100644 --- a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/FunctionCallingSample.java +++ b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/FunctionCallingSample.java @@ -34,6 +34,8 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; import javax.sound.sampled.AudioFormat; import javax.sound.sampled.AudioSystem; @@ -58,6 
+60,14 @@ /** * Function calling sample demonstrating how to use VoiceLive with custom function tools. * + *

          Use this sample when you want the model to call your local business logic instead of only + * producing text or audio. It is the best sample for understanding how tool schemas, tool calls, + * and tool outputs fit into a realtime conversation.

          + * + *

          When you run it, the sample registers a small set of demo functions, waits for the model to + * request them, executes the matching Java method locally, sends the result back to the session, + * and then continues the conversation with the tool output in context.

          + * *

          This sample shows how to:

          *
            *
          • Define function tools with parameters
          • @@ -164,43 +174,50 @@ private static void runFunctionCallingSession(VoiceLiveAsyncClient client) throw // Send session configuration with function tools, then listen for events. System.out.println("šŸ“¤ Sending session configuration with function tools..."); ClientEventSessionUpdate sessionConfig = createSessionConfigWithFunctions(); - return session.sendEvent(sessionConfig) - .doOnSuccess(v -> { - System.out.println("āœ“ Session configured with function tools"); - - // Start audio playback - audioProcessor.startPlayback(); - - String separator = new String(new char[70]).replace("\0", "="); - System.out.println("\n" + separator); - System.out.println("šŸŽ¤ VOICE ASSISTANT WITH FUNCTION CALLING READY"); - System.out.println("Try saying:"); - System.out.println(" • 'What's the current time?'"); - System.out.println(" • 'What's the weather in Seattle?'"); - System.out.println(" • 'What time is it in UTC?'"); - System.out.println("Press Ctrl+C to exit"); - System.out.println(separator + "\n"); - - // Add shutdown hook - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - System.out.println("\nšŸ‘‹ Shutting down voice assistant..."); - running.set(false); - audioProcessor.cleanup(); - try { - session.closeAsync().block(Duration.ofSeconds(5)); - } catch (Exception e) { - // Suppress errors during forced JVM shutdown - } - })); + Sinks.One eventSubscribed = Sinks.one(); + Flux eventStream = session.receiveEvents() + .doOnSubscribe(subscription -> eventSubscribed.tryEmitEmpty()) + .doOnNext(event -> handleServerEvent(session, event, audioProcessor, pendingFunctionCalls)) + .doOnError(error -> { + System.err.println("Error processing events: " + error.getMessage()); + running.set(false); }) - .doOnError(error -> System.err.println("āŒ Failed to send session.update: " + error.getMessage())) - .thenMany(session.receiveEvents() - .doOnNext(event -> handleServerEvent(session, event, audioProcessor, pendingFunctionCalls)) - .doOnError(error -> { - 
System.err.println("Error processing events: " + error.getMessage()); - running.set(false); + .doOnComplete(() -> System.out.println("āœ“ Event stream completed")); + + return Flux.merge( + eventStream, + eventSubscribed.asMono() + .then(session.sendEvent(sessionConfig)) + .doOnSuccess(v -> { + System.out.println("āœ“ Session configured with function tools"); + + // Start audio playback + audioProcessor.startPlayback(); + + String separator = new String(new char[70]).replace("\0", "="); + System.out.println("\n" + separator); + System.out.println("šŸŽ¤ VOICE ASSISTANT WITH FUNCTION CALLING READY"); + System.out.println("Try saying:"); + System.out.println(" • 'What's the current time?'"); + System.out.println(" • 'What's the weather in Seattle?'"); + System.out.println(" • 'What time is it in UTC?'"); + System.out.println("Press Ctrl+C to exit"); + System.out.println(separator + "\n"); + + // Add shutdown hook + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("\nšŸ‘‹ Shutting down voice assistant..."); + running.set(false); + audioProcessor.cleanup(); + try { + session.closeAsync().block(Duration.ofSeconds(5)); + } catch (Exception e) { + // Suppress errors during forced JVM shutdown + } + })); }) - .doOnComplete(() -> System.out.println("āœ“ Event stream completed"))) + .doOnError(error -> System.err.println("āŒ Failed to send session.update: " + error.getMessage())) + .thenMany(Flux.empty())) .then(); // receiveEvents() never completes, so this keeps session alive }) .doOnError(error -> System.err.println("āŒ Error: " + error.getMessage())) @@ -471,6 +488,8 @@ private static class AudioProcessor { private volatile SourceDataLine speaker; private final BlockingQueue playbackQueue = new LinkedBlockingQueue<>(1000); private final AtomicBoolean isPlaying = new AtomicBoolean(false); + private volatile Thread captureThread; + private volatile Thread playbackThread; AudioProcessor(VoiceLiveSessionAsyncClient session) { this.session = 
session; @@ -490,20 +509,29 @@ void startCapture() { isCapturing.set(true); // Start capture thread - new Thread(() -> { + captureThread = new Thread(() -> { byte[] buffer = new byte[CHUNK_SIZE]; while (isCapturing.get()) { - int bytesRead = microphone.read(buffer, 0, buffer.length); - if (bytesRead > 0) { - byte[] audioData = Arrays.copyOf(buffer, bytesRead); - session.sendInputAudio(BinaryData.fromBytes(audioData)) - .subscribe( - v -> {}, - error -> System.err.println("Error sending audio: " + error.getMessage()) - ); + try { + int bytesRead = microphone.read(buffer, 0, buffer.length); + if (bytesRead > 0) { + byte[] audioData = Arrays.copyOf(buffer, bytesRead); + session.sendInputAudio(BinaryData.fromBytes(audioData)) + .subscribe( + v -> {}, + error -> System.err.println("Error sending audio: " + error.getMessage()) + ); + } + } catch (Exception e) { + if (isCapturing.get()) { + System.err.println("Error capturing audio: " + e.getMessage()); + } + break; } } - }, "AudioCapture").start(); + }, "AudioCapture"); + captureThread.setDaemon(true); + captureThread.start(); System.out.println("āœ… Audio capture started"); } catch (LineUnavailableException e) { @@ -525,7 +553,7 @@ void startPlayback() { isPlaying.set(true); // Start playback thread - new Thread(() -> { + playbackThread = new Thread(() -> { while (isPlaying.get()) { try { byte[] audioData = playbackQueue.take(); @@ -533,9 +561,16 @@ void startPlayback() { } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; + } catch (Exception e) { + if (isPlaying.get()) { + System.err.println("Error playing audio: " + e.getMessage()); + } + break; } } - }, "AudioPlayback").start(); + }, "AudioPlayback"); + playbackThread.setDaemon(true); + playbackThread.start(); System.out.println("āœ… Audio playback started"); } catch (LineUnavailableException e) { @@ -564,12 +599,22 @@ void cleanup() { if (microphone != null) { microphone.stop(); microphone.close(); + microphone = null; + } + if 
(captureThread != null) { + captureThread.interrupt(); + captureThread = null; } if (speaker != null) { speaker.drain(); speaker.stop(); speaker.close(); + speaker = null; + } + if (playbackThread != null) { + playbackThread.interrupt(); + playbackThread = null; } playbackQueue.clear(); diff --git a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MCPSample.java b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MCPSample.java index 24938a79f405..af6f2fa74912 100644 --- a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MCPSample.java +++ b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MCPSample.java @@ -34,6 +34,8 @@ import com.azure.ai.voicelive.models.ServerVadTurnDetection; import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.core.util.BinaryData; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; import javax.sound.sampled.AudioFormat; import javax.sound.sampled.AudioSystem; @@ -53,6 +55,14 @@ /** * MCP (Model Context Protocol) sample demonstrating how to use VoiceLive with MCP servers. * + *

            Use this sample when your assistant needs tools that live outside the current process, such as + * documentation search or repo analysis exposed through an MCP server. It is the right sample for + * learning approval flows and external tool execution.

            + * + *

            When you run it, the sample configures one or more MCP-backed tools, starts a voice session, + * listens for MCP call and approval events, and forwards the user's choices and tool outputs back + * into the realtime conversation.

            + * *

            This sample shows how to:

            *
              *
            • Configure MCP servers for external tool integration
            • @@ -152,49 +162,56 @@ private static void runMCPSample(String endpoint) { // Send session configuration with MCP tools, then listen for events. System.out.println("šŸ“¤ Sending session configuration with MCP tools..."); ClientEventSessionUpdate sessionConfig = createSessionConfigWithMCPTools(); - return session.sendEvent(sessionConfig) - .doOnSuccess(v -> { - System.out.println("āœ“ Session configured with MCP tools"); - System.out.println(); - printSeparator(); - System.out.println("šŸŽ¤ MCP VOICE ASSISTANT READY"); - System.out.println("šŸŽÆ Available MCP Tools:"); - System.out.println(" • deepwiki: Can search and read wiki structure"); - System.out.println(" • azure_doc: Requires approval for tool calls"); - System.out.println(); - System.out.println("Try asking:"); - System.out.println(" • 'Can you summary github repo azure sdk for java?'"); - System.out.println(" • 'Can you summary azure docs about voice live?'"); - System.out.println("Press Ctrl+C to exit"); - printSeparator(); - System.out.println(); - - // Start audio playback - audioProcessor.startPlayback(); - - // Add shutdown hook - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - System.out.println("\nšŸ‘‹ Shutting down MCP voice assistant..."); - running.set(false); - AudioProcessor processor = audioProcessorRef.get(); - if (processor != null) { - processor.cleanup(); - } - try { - session.closeAsync().block(Duration.ofSeconds(5)); - } catch (Exception e) { - // Suppress errors during forced JVM shutdown - } - })); + Sinks.One eventSubscribed = Sinks.one(); + Flux eventStream = session.receiveEvents() + .doOnSubscribe(subscription -> eventSubscribed.tryEmitEmpty()) + .doOnNext(event -> handleServerEvent(session, event, activeMCPCallId, audioProcessor)) + .doOnError(error -> { + System.err.println("āŒ Error processing events: " + error.getMessage()); + running.set(false); }) - .doOnError(error -> System.err.println("āŒ Failed to send session.update: " + error.getMessage())) - 
.thenMany(session.receiveEvents() - .doOnNext(event -> handleServerEvent(session, event, activeMCPCallId, audioProcessor)) - .doOnError(error -> { - System.err.println("āŒ Error processing events: " + error.getMessage()); - running.set(false); + .doOnComplete(() -> System.out.println("āœ“ Event stream completed")); + + return Flux.merge( + eventStream, + eventSubscribed.asMono() + .then(session.sendEvent(sessionConfig)) + .doOnSuccess(v -> { + System.out.println("āœ“ Session configured with MCP tools"); + System.out.println(); + printSeparator(); + System.out.println("šŸŽ¤ MCP VOICE ASSISTANT READY"); + System.out.println("šŸŽÆ Available MCP Tools:"); + System.out.println(" • deepwiki: Can search and read wiki structure"); + System.out.println(" • azure_doc: Requires approval for tool calls"); + System.out.println(); + System.out.println("Try asking:"); + System.out.println(" • 'Can you summary github repo azure sdk for java?'"); + System.out.println(" • 'Can you summary azure docs about voice live?'"); + System.out.println("Press Ctrl+C to exit"); + printSeparator(); + System.out.println(); + + // Start audio playback + audioProcessor.startPlayback(); + + // Add shutdown hook + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + System.out.println("\nšŸ‘‹ Shutting down MCP voice assistant..."); + running.set(false); + AudioProcessor processor = audioProcessorRef.get(); + if (processor != null) { + processor.cleanup(); + } + try { + session.closeAsync().block(Duration.ofSeconds(5)); + } catch (Exception e) { + // Suppress errors during forced JVM shutdown + } + })); }) - .doOnComplete(() -> System.out.println("āœ“ Event stream completed"))) + .doOnError(error -> System.err.println("āŒ Failed to send session.update: " + error.getMessage())) + .thenMany(Flux.empty())) .then(); // receiveEvents() never completes, so this keeps session alive }) .doOnError(error -> System.err.println("āŒ Error: " + error.getMessage())) @@ -487,6 +504,8 @@ private static class 
AudioProcessor { private volatile SourceDataLine speaker; private final BlockingQueue playbackQueue = new LinkedBlockingQueue<>(1000); private final AtomicBoolean isPlaying = new AtomicBoolean(false); + private volatile Thread captureThread; + private volatile Thread playbackThread; AudioProcessor(VoiceLiveSessionAsyncClient session) { this.session = session; @@ -506,20 +525,29 @@ void startCapture() { isCapturing.set(true); // Start capture thread - new Thread(() -> { + captureThread = new Thread(() -> { byte[] buffer = new byte[CHUNK_SIZE]; while (isCapturing.get()) { - int bytesRead = microphone.read(buffer, 0, buffer.length); - if (bytesRead > 0) { - byte[] audioData = Arrays.copyOf(buffer, bytesRead); - session.sendInputAudio(BinaryData.fromBytes(audioData)) - .subscribe( - v -> {}, - error -> System.err.println("Error sending audio: " + error.getMessage()) - ); + try { + int bytesRead = microphone.read(buffer, 0, buffer.length); + if (bytesRead > 0) { + byte[] audioData = Arrays.copyOf(buffer, bytesRead); + session.sendInputAudio(BinaryData.fromBytes(audioData)) + .subscribe( + v -> {}, + error -> System.err.println("Error sending audio: " + error.getMessage()) + ); + } + } catch (Exception e) { + if (isCapturing.get()) { + System.err.println("Error capturing audio: " + e.getMessage()); + } + break; } } - }, "AudioCapture").start(); + }, "AudioCapture"); + captureThread.setDaemon(true); + captureThread.start(); System.out.println("āœ… Audio capture started"); } catch (LineUnavailableException e) { @@ -541,7 +569,7 @@ void startPlayback() { isPlaying.set(true); // Start playback thread - new Thread(() -> { + playbackThread = new Thread(() -> { while (isPlaying.get()) { try { byte[] audioData = playbackQueue.take(); @@ -549,9 +577,16 @@ void startPlayback() { } catch (InterruptedException e) { Thread.currentThread().interrupt(); break; + } catch (Exception e) { + if (isPlaying.get()) { + System.err.println("Error playing audio: " + e.getMessage()); + } + break; 
} } - }, "AudioPlayback").start(); + }, "AudioPlayback"); + playbackThread.setDaemon(true); + playbackThread.start(); System.out.println("āœ… Audio playback started"); } catch (LineUnavailableException e) { @@ -580,12 +615,22 @@ void cleanup() { if (microphone != null) { microphone.stop(); microphone.close(); + microphone = null; + } + if (captureThread != null) { + captureThread.interrupt(); + captureThread = null; } if (speaker != null) { speaker.drain(); speaker.stop(); speaker.close(); + speaker = null; + } + if (playbackThread != null) { + playbackThread.interrupt(); + playbackThread = null; } playbackQueue.clear(); diff --git a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MicrophoneInputSample.java b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MicrophoneInputSample.java index 64ba9f845146..ed41397fafa8 100644 --- a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MicrophoneInputSample.java +++ b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/MicrophoneInputSample.java @@ -17,6 +17,8 @@ import com.azure.ai.voicelive.models.VoiceLiveSessionOptions; import com.azure.identity.DefaultAzureCredentialBuilder; import com.azure.core.util.BinaryData; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; import javax.sound.sampled.AudioFormat; import javax.sound.sampled.AudioSystem; @@ -25,10 +27,18 @@ import javax.sound.sampled.TargetDataLine; import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; /** * Sample demonstrating how to capture audio from microphone and send it to VoiceLive service. * + *

              Use this sample when you want to validate microphone capture and upstream audio streaming + * without the extra moving parts of local speaker playback or function/tool integration.

              + * + *

              When you run it, the sample opens a realtime session, starts reading PCM audio from your + * default microphone, streams that audio to the service, and prints speech / response events so + * you can confirm the service is receiving your input.

              + * *

              This sample shows how to:

              *
                *
              • Initialize microphone for audio capture
              • @@ -109,7 +119,8 @@ public static void main(String[] args) { .setInputAudioSamplingRate(SAMPLE_RATE); final AtomicBoolean isCapturing = new AtomicBoolean(false); - final TargetDataLine[] microphoneRef = new TargetDataLine[1]; + final AtomicReference microphoneRef = new AtomicReference<>(); + final AtomicReference captureThreadRef = new AtomicReference<>(); // Start session client.startSession("gpt-realtime") @@ -118,20 +129,27 @@ public static void main(String[] args) { // Send session configuration, then listen for events. ClientEventSessionUpdate updateEvent = new ClientEventSessionUpdate(sessionOptions); - return session.sendEvent(updateEvent) - .doOnSuccess(v -> { - System.out.println("\u2713 Session configured"); - // Start microphone capture - startMicrophone(session, isCapturing, microphoneRef); - }) - .thenMany(session.receiveEvents() - .doOnNext(event -> handleEvent(event, isCapturing)) - .doOnError(error -> System.err.println("Error: " + error.getMessage()))) + Sinks.One eventSubscribed = Sinks.one(); + Flux eventStream = session.receiveEvents() + .doOnSubscribe(subscription -> eventSubscribed.tryEmitEmpty()) + .doOnNext(event -> handleEvent(event, isCapturing)) + .doOnError(error -> System.err.println("Error: " + error.getMessage())); + + return Flux.merge( + eventStream, + eventSubscribed.asMono() + .then(session.sendEvent(updateEvent)) + .doOnSuccess(v -> { + System.out.println("\u2713 Session configured"); + // Start microphone capture + startMicrophone(session, isCapturing, microphoneRef, captureThreadRef); + }) + .thenMany(Flux.empty())) .then(); // receiveEvents() never completes, so this keeps session alive }) .doFinally(signalType -> { // Cleanup - stopMicrophone(isCapturing, microphoneRef[0]); + stopMicrophone(isCapturing, microphoneRef, captureThreadRef); }) .block(); // Block for demo purposes } @@ -157,8 +175,10 @@ private static boolean checkMicrophoneAvailable() { * @param session The VoiceLive session * @param isCapturing 
Flag to control capture loop * @param microphoneRef Reference to store the microphone line + * @param captureThreadRef Reference to store the capture thread */ - private static void startMicrophone(VoiceLiveSessionAsyncClient session, AtomicBoolean isCapturing, TargetDataLine[] microphoneRef) { + private static void startMicrophone(VoiceLiveSessionAsyncClient session, AtomicBoolean isCapturing, + AtomicReference microphoneRef, AtomicReference captureThreadRef) { try { AudioFormat format = new AudioFormat( AudioFormat.Encoding.PCM_SIGNED, @@ -175,7 +195,7 @@ private static void startMicrophone(VoiceLiveSessionAsyncClient session, AtomicB microphone.open(format, CHUNK_SIZE * 4); microphone.start(); - microphoneRef[0] = microphone; + microphoneRef.set(microphone); isCapturing.set(true); System.out.println("šŸŽ¤ Microphone started - speak now"); @@ -186,19 +206,27 @@ private static void startMicrophone(VoiceLiveSessionAsyncClient session, AtomicB byte[] buffer = new byte[CHUNK_SIZE * 2]; // 16-bit samples while (isCapturing.get()) { - int bytesRead = microphone.read(buffer, 0, buffer.length); - if (bytesRead > 0) { - // Send audio to VoiceLive service - byte[] audioChunk = Arrays.copyOf(buffer, bytesRead); - session.sendInputAudio(BinaryData.fromBytes(audioChunk)) - .subscribe( - v -> {}, - error -> System.err.println("Error sending audio: " + error.getMessage()) - ); + try { + int bytesRead = microphone.read(buffer, 0, buffer.length); + if (bytesRead > 0) { + // Send audio to VoiceLive service + byte[] audioChunk = Arrays.copyOf(buffer, bytesRead); + session.sendInputAudio(BinaryData.fromBytes(audioChunk)) + .subscribe( + v -> {}, + error -> System.err.println("Error sending audio: " + error.getMessage()) + ); + } + } catch (Exception e) { + if (isCapturing.get()) { + System.err.println("Error capturing audio: " + e.getMessage()); + } + break; } } }, "MicrophoneCapture"); captureThread.setDaemon(true); + captureThreadRef.set(captureThread); captureThread.start(); } 
catch (LineUnavailableException e) { @@ -210,10 +238,24 @@ private static void startMicrophone(VoiceLiveSessionAsyncClient session, AtomicB * Stop microphone capture. * * @param isCapturing Flag to control capture loop - * @param microphone The microphone line to close + * @param microphoneRef Reference to the microphone line to close + * @param captureThreadRef Reference to the capture thread */ - private static void stopMicrophone(AtomicBoolean isCapturing, TargetDataLine microphone) { + private static void stopMicrophone(AtomicBoolean isCapturing, AtomicReference microphoneRef, + AtomicReference captureThreadRef) { isCapturing.set(false); + + Thread captureThread = captureThreadRef.getAndSet(null); + if (captureThread != null) { + captureThread.interrupt(); + try { + captureThread.join(500); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + TargetDataLine microphone = microphoneRef.getAndSet(null); if (microphone != null) { microphone.stop(); microphone.close(); diff --git a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/VoiceAssistantSample.java b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/VoiceAssistantSample.java index 019f671ae0e9..a2a89c616537 100644 --- a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/VoiceAssistantSample.java +++ b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/VoiceAssistantSample.java @@ -25,6 +25,8 @@ import com.azure.core.credential.TokenCredential; import com.azure.core.util.BinaryData; import com.azure.identity.DefaultAzureCredentialBuilder; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; import javax.sound.sampled.AudioFormat; @@ -47,6 +49,14 @@ * *

                NOTE: This is a comprehensive sample showing all features together. * For easier understanding, see these focused samples:

                + * + *

                Use this sample when you want the closest thing to an end-to-end assistant experience in this + * package. It combines session configuration, microphone capture, speaker playback, interruption + * handling, and authentication in one place.

                + * + *

                When you run it, the sample opens a realtime session, configures the assistant, starts local + * playback, waits for the session to be ready, and then begins streaming microphone audio while + * playing the model's responses through your speakers.

                *
                  *
                • {@link BasicVoiceConversationSample} - Minimal setup and session management
                • *
                • {@link MicrophoneInputSample} - Audio capture from microphone
                • @@ -78,10 +88,10 @@ * *

                  How to Run:

                  *
                  {@code
                  - * # With API Key (default):
                  + * # With DefaultAzureCredential (default):
                    * mvn exec:java -Dexec.mainClass="com.azure.ai.voicelive.VoiceAssistantSample" -Dexec.classpathScope=test
                    *
                  - * # With Token Credential:
                  + * # With API Key (requires AZURE_VOICELIVE_API_KEY):
                    * mvn exec:java -Dexec.mainClass="com.azure.ai.voicelive.VoiceAssistantSample" -Dexec.classpathScope=test -Dexec.args="--use-api-key"
                    * }
                  */ @@ -355,11 +365,14 @@ void shutdown() { * *

                  Supports two authentication methods:

                  *
                    - *
                  • API Key: Default authentication (requires AZURE_VOICELIVE_API_KEY env var)
                  • - *
                  • API Key: Use --use-api-key flag
                  • + *
                  • DefaultAzureCredential (default): uses the credential chain from azure-identity (managed identity, + * environment variables, Azure CLI, etc.). No flag required.
                  • + *
                  • API Key: pass {@code --use-api-key} on the command line; requires the {@code AZURE_VOICELIVE_API_KEY} + * environment variable to be set.
                  • *
                  * - * @param args Command line arguments. Use --use-api-key to use API key authentication instead of DefaultAzureCredential. + * @param args Command line arguments. Pass {@code --use-api-key} to use API key authentication instead + * of the default DefaultAzureCredential. */ public static void main(String[] args) { // Parse command line arguments @@ -517,38 +530,45 @@ private static void runVoiceAssistantWithClient(VoiceLiveAsyncClient client) { // Send session configuration, then listen for events. System.out.println("šŸ“¤ Sending session.update configuration..."); ClientEventSessionUpdate updateEvent = new ClientEventSessionUpdate(sessionOptions); - return session.sendEvent(updateEvent) - .doOnSuccess(v -> { - System.out.println("āœ“ Session configuration sent"); - - // Start audio systems - audioProcessor.startPlayback(); - - System.out.println("šŸŽ¤ VOICE ASSISTANT READY"); - System.out.println("Start speaking to begin conversation"); - System.out.println("Press Ctrl+C to exit"); - - // Install shutdown hook for graceful cleanup - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - System.out.println("\nšŸ›‘ Shutting down gracefully..."); - } catch (Exception ignored) { - // jansi may have torn down the ANSI output stream already - } - audioProcessor.shutdown(); - try { - session.closeAsync().block(Duration.ofSeconds(5)); - } catch (Exception e) { - // Suppress errors during forced JVM shutdown - - // the WebSocket connection may already be partially torn down - } - })); - }) - .doOnError(error -> System.err.println("āŒ Failed to send session.update: " + error.getMessage())) - .thenMany(session.receiveEvents() - .doOnNext(event -> handleServerEvent(event, audioProcessor)) - .doOnComplete(() -> System.out.println("āœ“ Event stream completed")) - .doOnError(error -> System.err.println("āŒ Error receiving events: " + error.getMessage()))) + Sinks.One eventSubscribed = Sinks.one(); + Flux eventStream = session.receiveEvents() + 
.doOnSubscribe(subscription -> eventSubscribed.tryEmitEmpty()) + .doOnNext(event -> handleServerEvent(event, audioProcessor)) + .doOnComplete(() -> System.out.println("āœ“ Event stream completed")) + .doOnError(error -> System.err.println("āŒ Error receiving events: " + error.getMessage())); + + return Flux.merge( + eventStream, + eventSubscribed.asMono() + .then(session.sendEvent(updateEvent)) + .doOnSuccess(v -> { + System.out.println("āœ“ Session configuration sent"); + + // Start audio systems + audioProcessor.startPlayback(); + + System.out.println("šŸŽ¤ VOICE ASSISTANT READY"); + System.out.println("Start speaking to begin conversation"); + System.out.println("Press Ctrl+C to exit"); + + // Install shutdown hook for graceful cleanup + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + System.out.println("\nšŸ›‘ Shutting down gracefully..."); + } catch (Exception ignored) { + // jansi may have torn down the ANSI output stream already + } + audioProcessor.shutdown(); + try { + session.closeAsync().block(Duration.ofSeconds(5)); + } catch (Exception e) { + // Suppress errors during forced JVM shutdown - + // the WebSocket connection may already be partially torn down + } + })); + }) + .doOnError(error -> System.err.println("āŒ Failed to send session.update: " + error.getMessage())) + .thenMany(Flux.empty())) .then(); // receiveEvents() never completes, so this keeps session alive }) .doOnError(error -> System.err.println("āŒ Error: " + error.getMessage())) diff --git a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/telemetry/GlobalTracingSample.java b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/telemetry/GlobalTracingSample.java index 29859306f045..c12b9b8cfdf3 100644 --- a/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/telemetry/GlobalTracingSample.java +++ b/sdk/voicelive/azure-ai-voicelive/src/samples/java/com/azure/ai/voicelive/telemetry/GlobalTracingSample.java 
@@ -17,6 +17,8 @@ import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import io.opentelemetry.sdk.trace.export.SpanExporter; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Sinks; import java.util.Arrays; import java.util.Collection; @@ -26,6 +28,13 @@ /** * Sample demonstrating automatic tracing via {@code GlobalOpenTelemetry}. * + *

                  Use this sample when you want to confirm that the VoiceLive client emits OpenTelemetry spans + * automatically without adding manual tracing calls around each SDK operation.

                  + * + *

                  When you run it, the sample registers a simple console span exporter, opens a short text-only + * VoiceLive session, waits for one model response to complete, closes the session, and then flushes + * the spans so you can inspect the emitted telemetry immediately.

                  + * *

                  This sample registers a global OpenTelemetry instance with * {@code OpenTelemetrySdk.builder().buildAndRegisterGlobal()}. The VoiceLive client picks it * up automatically via {@code GlobalOpenTelemetry.getOrNoop()}.

                  @@ -87,12 +96,21 @@ public static void main(String[] args) throws InterruptedException { // Configure the session, trigger a response, then wait for response.done. // Uses a single reactive chain: send config → start response → wait for done → close. - return session.sendEvent(new ClientEventSessionUpdate(options)) - .then(session.startResponse()) - .thenMany(session.receiveEvents() - .doOnNext(event -> System.out.println("Event: " + event.getType())) - .filter(event -> event instanceof SessionUpdateResponseDone) - .take(1)) + Sinks.One eventSubscribed = Sinks.one(); + Flux responseDoneEvents = session.receiveEvents() + .doOnSubscribe(subscription -> eventSubscribed.tryEmitEmpty()) + .doOnNext(event -> System.out.println("Event: " + event.getType())) + .filter(event -> event instanceof SessionUpdateResponseDone) + .map(event -> (SessionUpdateResponseDone) event) + .take(1); + + return Flux.merge( + responseDoneEvents, + eventSubscribed.asMono() + .then(session.sendEvent(new ClientEventSessionUpdate(options))) + .then(session.startResponse()) + .thenMany(Flux.empty())) + .next() .flatMap(event -> session.closeAsync()) .doOnError(error -> System.err.println("Error: " + error.getMessage())) .onErrorComplete()