Skip to content

Add batch streaming mode: assembly stream --from-stdin#181

Merged
alexkroman merged 1 commit into
mainfrom
claude/compassionate-cori-emokef
Jun 16, 2026
Merged

Add batch streaming mode: assembly stream --from-stdin#181
alexkroman merged 1 commit into
mainfrom
claude/compassionate-cori-emokef

Conversation

@alexkroman

Copy link
Copy Markdown
Collaborator

Implements batch streaming for the assembly stream command, allowing users to pipe a list of audio file paths or URLs (one per line) to stdin and stream each as its own realtime session in turn.

Summary

This adds the --from-stdin flag to assembly stream, which reads a newline-delimited list of audio sources from stdin and streams each sequentially with its own transcript and LLM chain state. This complements the existing - (raw PCM) stdin mode and enables workflows like ls *.wav | assembly stream --from-stdin.

Key Changes

Core batch streaming logic:

  • Added stream_batch_sources() function in aai_cli/streaming/session.py that orchestrates sequential streaming of multiple sources with per-source error handling
  • Batch continues on CLIError (file not found, decode failure) but aborts on NotAuthenticated (rejected API key applies to all sources)
  • Ctrl-C and broken pipe stop the batch cleanly (exit 0); other failures raise at the end so scripts can trust the exit code

Command integration:

  • Added --from-stdin flag to assembly stream command
  • Implemented _run_batch() and _collect_batch_sources() in aai_cli/commands/stream/_exec.py to validate flag combinations and collect sources
  • Rejects incompatible flags: positional source, --sample, --system-audio, --device, --sample-rate, --show-code
  • Deduplicates sources while preserving order (mirrors transcribe --from-stdin behavior)

Streaming session updates:

  • Extended StreamSession.run() and _guarded() with handle_interrupt parameter so batch driver owns Ctrl-C/pipe signals across the whole sequence (one Ctrl-C stops the batch, not just the current source)
  • Added StreamRenderer.source() method to announce each source in the batch with position (e.g., [2/3] file.wav)

Event and output handling:

  • Added Source event type to aai_cli/streaming/events.py for JSON mode segmentation
  • JSON mode emits {"type": "source", "source": "...", "index": 1, "total": 2} before each source's events
  • Text mode writes source headers to stderr (stdout stays pure transcript lines for piping)
  • Human mode prints muted source headers above turns

Validation and error handling:

  • Empty stdin raises UsageError with helpful suggestion
  • Per-source failures logged as warnings; batch summary at end if any failed
  • NotAuthenticated re-raises immediately to abort (one rejected key fails every source)

Testing

Added comprehensive test suite (tests/test_stream_batch.py) covering:

  • Sequential streaming in stdin order
  • Per-source resolution with sample=False (never coerced to hosted sample)
  • Failed source handling (batch continues, exit 1 at end)
  • Authentication failure (aborts immediately)
  • Ctrl-C and broken pipe lifecycle
  • Empty stdin validation

Updated existing tests to include from_stdin=False in defaults and added validation tests for flag conflicts.

Notable Implementation Details

  • Batch sources are deduplicated using dict.fromkeys() to preserve order while removing duplicates
  • Each source gets a fresh StreamSession from make_session() so transcripts and LLM state don't bleed between sources
  • The open_source callback resolves sources with sample=False to ensure real files/URLs are used, never the hosted sample
  • Renderer is shared across all sources so JSON output is a continuous NDJSON stream with source events marking boundaries

https://claude.ai/code/session_01AA7v87iZNvfeGKhs2czBqq

`assembly stream --from-stdin` reads a newline-delimited list of audio
files/URLs on stdin and streams each as its own realtime session, in
turn — distinct from `-` (raw PCM16 bytes on stdin). The realtime API
handles one session at a time, so the list streams sequentially: each
source gets a fresh StreamSession (its own transcript and --llm chain
state) and is announced via a header (human/text) or a `source` NDJSON
event (--json) before its turns.

A per-source failure (bad path, missing ffmpeg, decode error) is recorded
and the batch carries on, raising at the end so a script can trust the
exit code; NotAuthenticated aborts the whole batch. Ctrl-C or a closed
downstream pipe stops the batch cleanly (exit 0) — `StreamSession.run`
grows a `handle_interrupt` flag so the batch driver owns those signals
across the whole sequence instead of each session swallowing them.
@alexkroman alexkroman enabled auto-merge June 16, 2026 18:59
@alexkroman alexkroman added this pull request to the merge queue Jun 16, 2026
Merged via the queue into main with commit 5d9191a Jun 16, 2026
19 checks passed
@alexkroman alexkroman deleted the claude/compassionate-cori-emokef branch June 16, 2026 19:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants