fix: detect and recover from audio input stream stalls (#6075) #6255
C1-BA-B1-F3 wants to merge 1 commit into
When a microphone cable becomes loose (partial disconnection), the audio stream silently stops producing frames. The _forward_task loop blocked indefinitely on the async iterator with no timeout or recovery mechanism.

Add a watchdog-based stall detection to _ParticipantInputStream that:
- Monitors time since last received frame via a concurrent watchdog task
- Logs a warning when no frames arrive within stall_timeout (default 10s)
- Closes the stalled stream and reopens it from the same track
- Retries up to 3 times before giving up

The stall detection is inherited by _ParticipantAudioInputStream through the super()._forward_task() call chain.

Fixes livekit#6075

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
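The watchdog-and-retry behaviour described in this commit message can be sketched roughly as follows. This is a minimal, self-contained illustration and not the code from the PR: the function names read_with_stall_detection and forward_with_recovery, the open_stream factory, and the on_frame callback are all hypothetical. It also differs from the PR in two stated ways: it cancels the reader task instead of closing the stream, and it polls with asyncio.sleep rather than using asyncio.wait_for with asyncio.shield.

```python
import asyncio
import time
from typing import Any, AsyncIterator, Callable


async def read_with_stall_detection(
    stream: AsyncIterator[Any],
    on_frame: Callable[[Any], None],
    stall_timeout: float,
) -> bool:
    """Forward frames until the stream ends. Return True if it stalled."""
    last_frame_time = time.monotonic()
    stalled = False

    async def read() -> None:
        nonlocal last_frame_time
        async for frame in stream:
            last_frame_time = time.monotonic()
            on_frame(frame)

    async def watchdog() -> None:
        nonlocal stalled
        while True:
            # Poll a few times per timeout window (assumption of this sketch).
            await asyncio.sleep(stall_timeout / 4)
            if time.monotonic() - last_frame_time > stall_timeout:
                stalled = True
                reader.cancel()  # stand-in for closing the stalled stream
                return

    reader = asyncio.ensure_future(read())
    dog = asyncio.ensure_future(watchdog())
    try:
        await reader
    except asyncio.CancelledError:
        if not stalled:
            raise  # a real cancellation from the caller, not a stall
    finally:
        dog.cancel()
        reader.cancel()
    return stalled


async def forward_with_recovery(
    open_stream: Callable[[], AsyncIterator[Any]],
    on_frame: Callable[[Any], None],
    stall_timeout: float = 10.0,
    max_recoveries: int = 3,
) -> int:
    """Reopen the stream after each stall. Return the number of recoveries."""
    recoveries = 0
    while True:
        stalled = await read_with_stall_detection(
            open_stream(), on_frame, stall_timeout
        )
        if not stalled or recoveries >= max_recoveries:
            return recoveries
        recoveries += 1
```

With these defaults, a stream that never recovers is opened four times in total (the initial attempt plus three recoveries) before the loop gives up.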
🚩 Video streams get stall detection with default 10s timeout and no way to configure it
_ParticipantVideoInputStream.__init__ at livekit-agents/livekit/agents/voice/room_io/_input.py:476-485 does not accept or pass a stall_timeout parameter, so it inherits the default 10-second stall detection from the base class. This may be appropriate for audio but could cause false positives for video streams (e.g., screen share sources that go idle). Worth verifying whether stall detection is desired for video at all, or if it should be audio-only.
(Refers to lines 475-489)
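One way to make stall detection opt-out, sketched under assumptions: the StallTimer class below is hypothetical and does not exist in the PR or in livekit-agents. It only illustrates the idea of treating a stall_timeout of None as "never report a stall", which a video stream could pass while audio keeps the 10-second default.

```python
import time
from typing import Optional


class StallTimer:
    """Hypothetical helper that tracks frame arrivals for one input stream."""

    def __init__(self, stall_timeout: Optional[float]) -> None:
        # None disables stall detection, e.g. for screen shares that go idle.
        self._stall_timeout = stall_timeout
        self._last_frame_time = time.monotonic()

    def mark_frame(self) -> None:
        """Record that a frame has just arrived."""
        self._last_frame_time = time.monotonic()

    def is_stalled(self, now: Optional[float] = None) -> bool:
        """Return True if the timeout is enabled and has been exceeded."""
        if self._stall_timeout is None:
            return False
        current = time.monotonic() if now is None else now
        return current - self._last_frame_time > self._stall_timeout


audio_timer = StallTimer(stall_timeout=10.0)  # audio keeps the default
video_timer = StallTimer(stall_timeout=None)  # video opts out entirely
```

Whether video should opt out by default or simply use a longer timeout is a design decision for the PR author; the sketch only shows that the switch can live in one place.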
    async for event in stream:
        if not self._attached:
            # drop frames if the stream is detached
            continue
        last_frame_time = time.time()
        frame = cast(T, event.frame)
        self._process_frame(frame)
        await self._data_ch.send(frame)
🔴 Audio input falsely detected as stalled when listening is temporarily paused, permanently killing the audio stream
Received frames do not reset the stall timer when the stream is paused (continue at livekit-agents/livekit/agents/voice/room_io/_input.py:235 skips the timestamp update at line 236), so the watchdog closes and reopens the stream until all recovery attempts are exhausted and the forwarding loop exits permanently.
Impact: After audio input is disabled for longer than 10 seconds (e.g., during a warm transfer hold), re-enabling audio produces silence because no forwarding task is running.
Mechanism: detach flag blocks timestamp update, triggering false stall recovery
When set_audio_enabled(False) is called (e.g., warm_transfer.py:185), on_detached() sets self._attached = False. Inside _read_stream_with_stall_detection, the async for event in stream loop hits the if not self._attached: continue guard at lines 233-235, which correctly drops the frame but also skips last_frame_time = time.time() at line 236.
The watchdog (lines 206-227) compares time.time() - last_frame_time against self._stall_timeout. Since last_frame_time is never refreshed while detached, after stall_timeout seconds (default 10s) the watchdog calls await stream.aclose() at line 226.
The outer retry loop in _forward_task (lines 159-193) then tries to recover by creating a new stream. But the new stream also encounters the same detached state, so its watchdog also fires. After max_recoveries=3 failed attempts, the while loop exits and _forward_task returns.
When the warm transfer completes and _set_io_enabled(True) is called (warm_transfer.py:266), on_attached() sets self._attached = True, but there is no longer a running _forward_task to read frames from the track, so audio is permanently dead for this session.
    async for event in stream:
        if not self._attached:
            # drop frames if the stream is detached
            continue
        last_frame_time = time.time()
        frame = cast(T, event.frame)
        self._process_frame(frame)
        await self._data_ch.send(frame)

Suggested change:

    async for event in stream:
        last_frame_time = time.time()
        if not self._attached:
            # drop frames if the stream is detached
            continue
        frame = cast(T, event.frame)
        self._process_frame(frame)
        await self._data_ch.send(frame)
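The effect of moving the timestamp update ahead of the detached guard can be checked with a small deterministic simulation. This is a sketch under assumptions, not code from the PR: watchdog_fires and PAUSED_CALL are hypothetical names, arrival times are plain numbers instead of wall-clock readings, and the stall check runs at each frame arrival rather than in a concurrent task.

```python
from typing import List, Tuple

# (arrival time in seconds, whether the stream is attached at that moment)
Frame = Tuple[float, bool]


def watchdog_fires(
    frames: List[Frame], stall_timeout: float, refresh_before_guard: bool
) -> bool:
    """Return True if the stall check would trip while reading the frames."""
    last_frame_time = 0.0
    for arrival, attached in frames:
        if arrival - last_frame_time > stall_timeout:
            return True  # the watchdog would close the stream here
        if refresh_before_guard:
            last_frame_time = arrival  # suggested ordering
        if not attached:
            continue  # drop frames while detached
        if not refresh_before_guard:
            last_frame_time = arrival  # ordering in the PR as submitted
    return False


# One frame per second for 20s; input is detached from t=3 through t=17.
PAUSED_CALL: List[Frame] = [(float(t), t <= 2 or t >= 18) for t in range(1, 21)]
```

With a 10-second timeout, the original ordering trips during the pause even though frames keep arriving, while the suggested ordering does not.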
    except asyncio.CancelledError:
        raise
    except Exception:
        pass
🚩 Stream errors silently swallowed during stall detection
The except Exception: pass block at livekit-agents/livekit/agents/voice/room_io/_input.py:242-243 catches and silently discards all non-cancellation exceptions from the stream iteration. In the previous code, such exceptions would propagate up through _forward_task and be logged by the @log_exceptions decorator. Now they are invisible. This was likely added to handle exceptions raised when the watchdog closes the stream mid-iteration, but it also swallows legitimate stream errors (e.g., codec failures, protocol errors). Consider at minimum logging the exception at debug/warning level before suppressing it.
Summary
Fixes #6075. When a microphone cable becomes loose (partial disconnection), the audio stream silently stops producing frames and the agent never recovers.
Root Cause
_ParticipantInputStream._forward_task uses async for event in stream: which blocks indefinitely when the underlying audio source stops producing frames. No timeout, no error, no recovery.
Fix
Added a watchdog-based stall detection mechanism to _ParticipantInputStream:
- A concurrent watchdog task monitors the time since the last received frame
- If no frames arrive within stall_timeout (default 10s), the watchdog logs a warning and closes the stalled stream
- The _forward_task loop then attempts to reopen the stream from the same track (up to 3 retries)
The stall detection is inherited by _ParticipantAudioInputStream through the super()._forward_task() call chain. The stall_timeout is configurable via the constructor.
Changes
livekit-agents/livekit/agents/voice/room_io/_input.py:
- Adds a stall_timeout: float = 10.0 parameter to _ParticipantInputStream.__init__
- Splits _forward_task into a recovery loop + _read_stream_with_stall_detection helper
- Uses asyncio.wait_for with asyncio.shield on a cancellation event
- Reopens the stream via _create_stream when the track is still available
tests/test_room_io.py:
- test_stall_detection_and_recovery: verifies stall is detected and stream is reopened
- test_stall_detection_max_retries_exhausted: verifies graceful exit after 3 failed recovery attempts
- test_no_stall_normal_stream: verifies normal streams are unaffected
Testing
All 10 tests pass:
🤖 Generated with Claude Code