From 12a03672f4475dc5bae89284638479ef5f43165f Mon Sep 17 00:00:00 2001 From: Aditya Singh Date: Fri, 8 May 2026 04:00:20 -0700 Subject: [PATCH 1/2] fix(voice): emit turn_ended when workflow yields no audio When a workflow yields only empty/whitespace deltas (e.g. an LLM streaming keepalives), `_add_text` triggers `_start_turn` but the splitter leaves `_text_buffer` empty, so `_turn_done` never schedules a TTS task. The dispatcher then emits `session_ended` without a matching `turn_ended`, breaking consumers that pair lifecycle events. Push a synthetic `turn_ended` lifecycle onto the ordered task queue when no synthesizable text remains but a turn was started. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/agents/voice/result.py | 6 ++++++ tests/voice/test_pipeline.py | 18 ++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/src/agents/voice/result.py b/src/agents/voice/result.py index 511c8e6e7d..cf2ac164b0 100644 --- a/src/agents/voice/result.py +++ b/src/agents/voice/result.py @@ -220,6 +220,12 @@ async def _turn_done(self): ) ) self._text_buffer = "" + elif self._started_processing_turn: + # Turn was started (turn_started emitted) but produced no synthesizable text. + # Emit turn_ended directly so consumers see balanced lifecycle events. 
+ local_queue = asyncio.Queue() + await local_queue.put(VoiceStreamEventLifecycle(event="turn_ended")) + self._ordered_tasks.append(local_queue) self._done_processing = True if self._dispatcher_task is None: self._dispatcher_task = asyncio.create_task(self._dispatch_audio()) diff --git a/tests/voice/test_pipeline.py b/tests/voice/test_pipeline.py index 7bc46279ad..e121c21974 100644 --- a/tests/voice/test_pipeline.py +++ b/tests/voice/test_pipeline.py @@ -314,3 +314,21 @@ async def test_voicepipeline_multi_turn_on_start_exception_does_not_abort() -> N assert events[-1] == "session_ended" assert "error" not in events + + +@pytest.mark.asyncio +async def test_voicepipeline_empty_workflow_yield_emits_turn_ended() -> None: + # Workflow yields only an empty string (e.g. an LLM keepalive delta). turn_started must + # be balanced by turn_ended even though no audio is synthesized. + fake_stt = FakeSTT(["first"]) + workflow = FakeWorkflow([[""]]) + fake_tts = FakeTTS() + config = VoicePipelineConfig(tts_settings=TTSModelSettings(buffer_size=1)) + pipeline = VoicePipeline( + workflow=workflow, stt_model=fake_stt, tts_model=fake_tts, config=config + ) + audio_input = AudioInput(buffer=np.zeros(2, dtype=np.int16)) + result = await pipeline.run(audio_input) + events, audio_chunks = await asyncio.wait_for(extract_events(result), timeout=2.0) + assert events == ["turn_started", "turn_ended", "session_ended"] + assert audio_chunks == [] From 8f1f19c646a074f236162298ce3e61cbe9bc59e8 Mon Sep 17 00:00:00 2001 From: Aditya Singh Date: Mon, 11 May 2026 16:35:59 -0700 Subject: [PATCH 2/2] fix(voice): reset turn state before next turn's add_text The synthesized turn_ended for an empty workflow yield was enqueued in the dispatcher, but _started_processing_turn / tracing span stayed live until the dispatcher drained it. A follow-up _add_text could then skip turn_started, and the late _finish_turn could finish the next turn's span. 
Finish turn state synchronously, count the pending synthetic event so the dispatcher skips its own _finish_turn call for it, and wait for the dispatcher to flush that event before _turn_done returns so ordering is preserved. --- src/agents/voice/result.py | 31 +++++++++++++++++++++++++++++-- tests/voice/test_pipeline.py | 19 +++++++++++++++++++ 2 files changed, 48 insertions(+), 2 deletions(-) diff --git a/src/agents/voice/result.py b/src/agents/voice/result.py index cf2ac164b0..e2e9e43230 100644 --- a/src/agents/voice/result.py +++ b/src/agents/voice/result.py @@ -68,6 +68,11 @@ def __init__( self._completed_session = False self._stored_exception: BaseException | None = None self._tracing_span: Span[SpeechGroupSpanData] | None = None + # Count of synthetic turn_ended events whose lifecycle was already + # finalized synchronously (see `_turn_done`). The dispatcher should not + # call `_finish_turn` again for these — doing so could prematurely + # finish a later turn's tracing span that started after them. + self._pending_synthetic_turn_ends: int = 0 async def _start_turn(self): if self._started_processing_turn: @@ -222,10 +227,26 @@ async def _turn_done(self): self._text_buffer = "" elif self._started_processing_turn: # Turn was started (turn_started emitted) but produced no synthesizable text. - # Emit turn_ended directly so consumers see balanced lifecycle events. + # Finish turn state synchronously so a follow-up `_add_text` for the next + # turn doesn't race the dispatcher and skip its `turn_started`. The + # synthesized `turn_ended` flows through the dispatcher to stay ordered + # with any in-flight audio segments; a counter tells the dispatcher this + # event's `_finish_turn` is already done so it doesn't clobber a span + # that belongs to a later turn started in the meantime. 
+ self._finish_turn() + self._pending_synthetic_turn_ends += 1 local_queue = asyncio.Queue() await local_queue.put(VoiceStreamEventLifecycle(event="turn_ended")) self._ordered_tasks.append(local_queue) + self._done_processing = True + if self._dispatcher_task is None: + self._dispatcher_task = asyncio.create_task(self._dispatch_audio()) + # Wait for the dispatcher to drain this synthesized `turn_ended` so the + # next turn's `turn_started` cannot precede it on `self._queue`. + await asyncio.gather(*self._tasks) + while self._pending_synthetic_turn_ends > 0: + await asyncio.sleep(0) + return self._done_processing = True if self._dispatcher_task is None: self._dispatcher_task = asyncio.create_task(self._dispatch_audio()) @@ -264,7 +285,13 @@ async def _dispatch_audio(self): if isinstance(chunk, VoiceStreamEventLifecycle): local_queue.task_done() if chunk.event == "turn_ended": - self._finish_turn() + if self._pending_synthetic_turn_ends > 0: + # `_turn_done` already ran `_finish_turn` synchronously + # for this event; skip to avoid finishing a span that + # belongs to a later turn started in the meantime. 
+ self._pending_synthetic_turn_ends -= 1 + else: + self._finish_turn() break await self._queue.put(VoiceStreamEventLifecycle(event="session_ended")) diff --git a/tests/voice/test_pipeline.py b/tests/voice/test_pipeline.py index e121c21974..97de96ac4e 100644 --- a/tests/voice/test_pipeline.py +++ b/tests/voice/test_pipeline.py @@ -332,3 +332,22 @@ async def test_voicepipeline_empty_workflow_yield_emits_turn_ended() -> None: events, audio_chunks = await asyncio.wait_for(extract_events(result), timeout=2.0) assert events == ["turn_started", "turn_ended", "session_ended"] assert audio_chunks == [] + + +@pytest.mark.asyncio +async def test_voicepipeline_empty_turn_followed_by_real_turn_balances_lifecycle() -> None: + # An empty workflow yield followed by a non-empty turn must still produce + # `turn_started` for the second turn — otherwise the dispatcher's stale + # `turn_ended` could finish a span that belongs to the next turn. + fake_stt = FakeSTT(["first", "second"]) + workflow = FakeWorkflow([[""], ["hello"]]) + fake_tts = FakeTTS() + config = VoicePipelineConfig(tts_settings=TTSModelSettings(buffer_size=1)) + pipeline = VoicePipeline( + workflow=workflow, stt_model=fake_stt, tts_model=fake_tts, config=config + ) + streamed_audio_input = await FakeStreamedAudioInput.get(count=2) + result = await pipeline.run(streamed_audio_input) + events, _ = await asyncio.wait_for(extract_events(result), timeout=2.0) + lifecycle = [e for e in events if e in {"turn_started", "turn_ended"}] + assert lifecycle == ["turn_started", "turn_ended", "turn_started", "turn_ended"]