Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import com.azure.ai.voicelive.models.VoiceLiveSessionOptions;
import com.azure.core.util.BinaryData;
import com.azure.identity.DefaultAzureCredentialBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

import javax.sound.sampled.AudioFormat;
import javax.sound.sampled.AudioSystem;
Expand Down Expand Up @@ -57,6 +60,15 @@
* using AgentSessionConfig, rather than as a tool in the session. This allows the agent to be
* the primary responder for the voice session.</p>
*
* <p>Use this sample when you already have an Azure AI Foundry agent and want VoiceLive to talk to
* that agent directly instead of registering local tools or writing response orchestration logic in
* the sample itself.</p>
*
* <p>When you run it, the sample creates an {@code AgentSessionConfig}, opens a realtime session,
* sends the session configuration, waits for the service to report the session as ready, and then
* starts full-duplex microphone / speaker streaming while also writing a simple conversation log to
* the local {@code logs} directory.</p>
*
* <p>Features demonstrated:</p>
* <ul>
* <li>Using AgentSessionConfig to connect directly to an Azure AI Foundry agent</li>
Expand Down Expand Up @@ -235,8 +247,9 @@ static class AgentV2VoiceAssistant {
private final AtomicBoolean running = new AtomicBoolean(false);
private final CountDownLatch shutdownLatch = new CountDownLatch(1);

private VoiceLiveSessionAsyncClient session;
private AudioProcessor audioProcessor;
// volatile: written by reactor thread (doOnSuccess), read by shutdown-hook thread
private volatile VoiceLiveSessionAsyncClient session;
private volatile AudioProcessor audioProcessor;

AgentV2VoiceAssistant(String endpoint, AgentSessionConfig agentConfig, String voice) {
this.endpoint = endpoint;
Expand All @@ -259,25 +272,41 @@ void start() {

// Connect using AgentSessionConfig
client.startSession(agentConfig)
.doOnSuccess(s -> {
.flatMap(s -> {
this.session = s;
System.out.println("Connected to VoiceLive service");

// Initialize audio processor
this.audioProcessor = new AudioProcessor(s);

// Configure session
configureSession();

// Subscribe to events
subscribeToEvents();
Sinks.One<Void> eventSubscribed = Sinks.one();
Flux<SessionUpdate> eventStream = s.receiveEvents()
.doOnSubscribe(subscription -> eventSubscribed.tryEmitEmpty())
.doOnNext(this::handleEvent)
.doOnComplete(() -> {
System.out.println("Event stream completed");
shutdown();
})
.doOnError(error -> {
System.err.println("Error receiving events: " + error.getMessage());
shutdown();
});

Mono<Void> configurePipeline = eventSubscribed.asMono()
.then(configureSession());

return Flux.merge(
eventStream,
configurePipeline.thenMany(Flux.<SessionUpdate>empty()))
.then();
})
.doOnError(e -> {
System.err.println("Failed to connect: " + e.getMessage());
running.set(false);
shutdownLatch.countDown();
})
.subscribe();
.subscribe(
v -> {},
e -> {
System.err.println("Agent V2 session failed: " + e.getMessage());
shutdown();
}
);

// Wait for shutdown
try {
Expand All @@ -287,7 +316,7 @@ void start() {
}
}

private void configureSession() {
private Mono<Void> configureSession() {
System.out.println("Setting up voice conversation session...");
System.out.println("Enabling Azure Deep Noise Suppression");
System.out.println("Enabling Echo Cancellation");
Expand Down Expand Up @@ -329,24 +358,10 @@ private void configureSession() {

// Send session update
ClientEventSessionUpdate sessionUpdate = new ClientEventSessionUpdate(sessionOptions);
session.sendEvent(sessionUpdate)
return session.sendEvent(sessionUpdate)
.doOnSuccess(v -> System.out.println("Session configuration sent"))
.doOnError(e -> System.err.println("Failed to configure session: " + e.getMessage()))
.subscribe();
}

private void subscribeToEvents() {
session.receiveEvents()
.doOnNext(this::handleEvent)
.doOnError(e -> {
System.err.println("Error receiving events: " + e.getMessage());
shutdown();
})
.doOnComplete(() -> {
System.out.println("Event stream completed");
shutdown();
})
.subscribe();
.then();
}

private void handleEvent(SessionUpdate event) {
Expand Down Expand Up @@ -483,15 +498,16 @@ private void writeConversationLog(String message) {
static class AudioProcessor {
private final VoiceLiveSessionAsyncClient session;
private final AudioFormat format;
private final BlockingQueue<AudioPacket> playbackQueue = new LinkedBlockingQueue<>();
private final BlockingQueue<AudioPacket> playbackQueue = new LinkedBlockingQueue<>(1000);
private final AtomicInteger nextSeqNum = new AtomicInteger(0);
private final AtomicInteger playbackBase = new AtomicInteger(0);
private final AtomicBoolean running = new AtomicBoolean(false);

private TargetDataLine inputLine;
private SourceDataLine outputLine;
private Thread captureThread;
private Thread playbackThread;
// volatile: written by reactor thread (startCapture/Playback), read/closed by shutdown-hook thread
private volatile TargetDataLine inputLine;
private volatile SourceDataLine outputLine;
private volatile Thread captureThread;
private volatile Thread playbackThread;

AudioProcessor(VoiceLiveSessionAsyncClient session) {
this.session = session;
Expand Down Expand Up @@ -532,8 +548,11 @@ private void captureLoop() {
? buffer.clone()
: Arrays.copyOf(buffer, bytesRead);

// Send audio to service (sendInputAudio takes byte[])
session.sendInputAudio(audioData).subscribe();
session.sendInputAudio(audioData)
.subscribe(
v -> {},
error -> System.err.println("Error sending audio: " + error.getMessage())
);
}
}
}
Expand Down Expand Up @@ -583,7 +602,10 @@ private void playbackLoop() {

void queueAudio(byte[] audioData) {
int seqNum = nextSeqNum.getAndIncrement();
playbackQueue.offer(new AudioPacket(seqNum, audioData));
// offer() returns false if the bounded queue is full; warn so a slow consumer is visible
if (!playbackQueue.offer(new AudioPacket(seqNum, audioData))) {
System.err.println("Warning: playback queue full, dropping audio packet seq=" + seqNum);
}
}

void skipPendingAudio() {
Expand Down
Loading