From 5607a8220c9c4928d09de63e5dba1d395f4dacac Mon Sep 17 00:00:00 2001
From: Claude
Date: Tue, 16 Jun 2026 18:47:23 +0000
Subject: [PATCH] Add batch-streaming mode to `assembly stream` (--from-stdin)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

`assembly stream --from-stdin` reads a newline-delimited list of audio
files/URLs on stdin and streams each as its own realtime session, in
turn — distinct from `-` (raw PCM16 bytes on stdin). The realtime API
handles one session at a time, so the list streams sequentially: each
source gets a fresh StreamSession (its own transcript and --llm chain
state) and is announced via a header (human/text) or a `source` NDJSON
event (--json) before its turns. A per-source failure (bad path,
missing ffmpeg, decode error) is recorded and the batch carries on,
raising at the end so a script can trust the exit code;
NotAuthenticated aborts the whole batch. Ctrl-C or a closed downstream
pipe stops the batch cleanly (exit 0) — `StreamSession.run` grows a
`handle_interrupt` flag so the batch driver owns those signals across
the whole sequence instead of each session swallowing them.
--- REFERENCE.md | 2 +- aai_cli/commands/stream/__init__.py | 10 ++ aai_cli/commands/stream/_exec.py | 88 +++++++++- aai_cli/streaming/events.py | 15 +- aai_cli/streaming/render.py | 15 ++ aai_cli/streaming/session.py | 91 +++++++++- .../test_snapshots_help_run.ambr | 21 ++- tests/test_stream_batch.py | 161 ++++++++++++++++++ tests/test_stream_exec.py | 67 ++++++++ tests/test_streaming_events.py | 5 + tests/test_streaming_render.py | 32 ++++ 11 files changed, 491 insertions(+), 16 deletions(-) create mode 100644 tests/test_stream_batch.py diff --git a/REFERENCE.md b/REFERENCE.md index b45f7643..68ef0830 100644 --- a/REFERENCE.md +++ b/REFERENCE.md @@ -74,7 +74,7 @@ each carrying a `"type"` field to dispatch on: | Command | Event types | | ------- | ----------- | -| `assembly stream --json` | `begin`, `turn`, `termination` | +| `assembly stream --json` | `begin`, `turn`, `termination` (with `--from-stdin`, a `source` event precedes each file's events) | | `assembly agent --json` | `session.ready`, `transcript.user.delta`, `transcript.user`, `reply.started`, `transcript.agent`, `reply.done` | | `assembly agent-cascade --json` | `session.ready`, `transcript.user.delta`, `transcript.user`, `reply.started`, `transcript.agent`, `reply.done` | | `assembly dictate --json` | `utterance` | diff --git a/aai_cli/commands/stream/__init__.py b/aai_cli/commands/stream/__init__.py index 210ada1f..d9018b3a 100644 --- a/aai_cli/commands/stream/__init__.py +++ b/aai_cli/commands/stream/__init__.py @@ -30,6 +30,7 @@ [ ("Stream from your microphone", "assembly stream"), ("Stream a file or URL in real time", "assembly stream recording.wav"), + ("Stream a list of files in turn", "ls *.wav | assembly stream --from-stdin"), ("Stream the hosted sample", "assembly stream --sample"), ("Label speakers in the live transcript", "assembly stream --speaker-labels"), ( @@ -52,6 +53,11 @@ def stream( "PCM16/mono/16k on stdin. 
Omit to use the microphone.", ), sample: bool = typer.Option(False, "--sample", help="Stream the hosted wildfires.mp3 sample"), + from_stdin: bool = typer.Option( + False, + "--from-stdin", + help="Read a list of audio files/URLs on stdin (one per line) and stream each in turn", + ), # audio capture sample_rate: int | None = typer.Option( None, @@ -302,6 +308,9 @@ def stream( Pass - as the source to read raw PCM16/mono/16k audio on stdin, e.g. ffmpeg -i input.mp4 -f s16le -ar 16000 -ac 1 - | assembly stream -. + --from-stdin instead reads a list of file paths/URLs on stdin (one per line) + and streams each as its own realtime session, in turn. + --prompt biases the speech model. --llm runs a prompt over the live transcript in-process, refreshing the answer on every finalized turn; for a separate step instead, pipe the text out with -o text | assembly llm -f "…". @@ -309,6 +318,7 @@ def stream( opts = stream_exec.StreamOptions( source=source, sample=sample, + from_stdin=from_stdin, sample_rate=sample_rate, device=device, system_audio=system_audio, diff --git a/aai_cli/commands/stream/_exec.py b/aai_cli/commands/stream/_exec.py index 688b03ba..81c8aa50 100644 --- a/aai_cli/commands/stream/_exec.py +++ b/aai_cli/commands/stream/_exec.py @@ -10,6 +10,7 @@ from __future__ import annotations import tempfile +from collections.abc import Iterable from dataclasses import dataclass from pathlib import Path @@ -18,8 +19,8 @@ from aai_cli import code_gen from aai_cli.app.context import AppState -from aai_cli.core import choices, client, config_builder, youtube -from aai_cli.core.errors import UsageError +from aai_cli.core import choices, client, config_builder, stdio, youtube +from aai_cli.core.errors import UsageError, mutually_exclusive from aai_cli.core.microphone import MicrophoneSource from aai_cli.streaming import turn_presets from aai_cli.streaming.macos import MacSystemAudioSource @@ -28,6 +29,7 @@ SourceOptions, StreamSession, resolve_output_modes, + 
stream_batch_sources, validate_sources, ) from aai_cli.streaming.sources import TARGET_RATE, FileSource, StdinSource @@ -46,6 +48,7 @@ class StreamOptions: source: str | None sample: bool + from_stdin: bool sample_rate: int | None device: int | None system_audio: bool @@ -214,9 +217,90 @@ def _dispatch(session: StreamSession, opts: SourceOptions) -> None: session.run(mic, mic.sample_rate) +def _collect_batch_sources(opts: StreamOptions, *, text_mode: bool) -> list[str]: + """The newline-delimited source list for ``--from-stdin``, with the flag combos it + can't honor rejected first. + + ``--from-stdin`` reinterprets stdin as a list of file paths/URLs (one per line), + each streamed as its own realtime session — distinct from ``-`` (raw PCM bytes). It + therefore can't also take a positional source, ``--sample``, the mic/system-audio + inputs, the mic-only capture flags, or ``--show-code`` (which renders one source). + """ + mutually_exclusive( + ("--from-stdin", True), + ("a source argument", opts.source is not None), + ("--sample", opts.sample), + suggestion="--from-stdin reads the source list from stdin; don't also pass one.", + ) + mutually_exclusive( + ("--from-stdin", True), + ("--system-audio", opts.system_audio), + ("--system-audio-only", opts.system_audio_only), + suggestion="--from-stdin streams files/URLs, not live capture.", + ) + if opts.device is not None or opts.sample_rate is not None: + raise UsageError("--device and --sample-rate apply only to microphone input.") + mutually_exclusive( + ("--from-stdin", True), + ("--show-code", opts.show_code), + suggestion="--show-code renders one source; pass a single file or URL.", + ) + mutually_exclusive( + ("--llm", bool(opts.llm_prompt)), + ("-o text", text_mode), + suggestion="--llm renders a live panel (or NDJSON when piped).", + ) + sources = list(dict.fromkeys(stdio.iter_piped_stdin_lines())) # dedupe, keep order + if not sources: + raise UsageError( + "No sources received on stdin.", + suggestion="Pipe 
one path or URL per line, e.g. " + "ls *.wav | assembly stream --from-stdin.", + ) + return sources + + +def _run_batch(opts: StreamOptions, state: AppState, *, json_mode: bool, text_mode: bool) -> None: + """Stream a ``--from-stdin`` list of sources, one realtime session each, in turn.""" + sources = _collect_batch_sources(opts, text_mode=text_mode) + api_key = state.resolve_api_key() + base_flags = opts.base_flags() + llm_prompts = list(opts.llm_prompt or []) + renderer = StreamRenderer(json_mode=json_mode, text_mode=text_mode) + + def make_session() -> StreamSession: + return StreamSession( + api_key=api_key, + base_flags=base_flags, + overrides=opts.config_kv, + config_file=opts.config_file, + renderer=renderer, + follow=FollowRenderer(json_mode=json_mode) if llm_prompts else None, + llm_prompts=llm_prompts, + model=opts.model, + max_tokens=opts.max_tokens, + llm_interval=opts.llm_interval, + ) + + def open_source(source: str) -> tuple[Iterable[bytes], int]: + file_audio = FileSource(client.resolve_audio_source(source, sample=False)) + return file_audio, file_audio.sample_rate + + stream_batch_sources( + sources, + make_session=make_session, + open_source=open_source, + renderer=renderer, + json_mode=json_mode, + ) + + def run_stream(opts: StreamOptions, state: AppState, *, json_mode: bool) -> None: """Execute one `assembly stream` invocation from already-parsed flags.""" text_mode, json_mode = resolve_output_modes(opts.output_field, json_mode=json_mode) + if opts.from_stdin: + _run_batch(opts, state, json_mode=json_mode, text_mode=text_mode) + return sources = opts.source_options() base_flags = opts.base_flags() diff --git a/aai_cli/streaming/events.py b/aai_cli/streaming/events.py index 39501a3a..72846fd9 100644 --- a/aai_cli/streaming/events.py +++ b/aai_cli/streaming/events.py @@ -66,4 +66,17 @@ class Termination(_StreamEvent): source: str | None = None -Event = Begin | Turn | Termination +class Source(_StreamEvent): + """A ``--from-stdin`` batch advanced 
to its next audio source. + + Emitted once before each source's own ``begin``/``turn``/``termination`` events, + so a consumer can segment the NDJSON stream by source. ``index`` is 1-based. + """ + + type: Literal["source"] = "source" + source: str + index: int + total: int + + +Event = Begin | Turn | Termination | Source diff --git a/aai_cli/streaming/render.py b/aai_cli/streaming/render.py index e2a720a5..5e7ac867 100644 --- a/aai_cli/streaming/render.py +++ b/aai_cli/streaming/render.py @@ -104,6 +104,21 @@ def listening(self) -> None: elif not self.json_mode: self._line(Text("Listening… (Ctrl-C to stop)", style="aai.muted")) + def source(self, source: str, *, index: int, total: int) -> None: + """Announce the next source in a ``--from-stdin`` batch stream. + + JSON mode emits a ``source`` event so consumers can segment the stream; text + mode writes a header to stderr (stdout stays pure transcript lines); human + mode prints a muted header above the upcoming turns. + """ + with self._lock: + if self.json_mode: + self._emit(events.Source(source=source, index=index, total=total).wire()) + elif self.text_mode: + self._status(f"[{index}/{total}] {source}") + else: + self._line(Text(f"[{index}/{total}] {source}", style="aai.muted")) + def turn(self, event: object, *, source: str | None = None) -> None: text = getattr(event, "transcript", "") or "" end = bool(getattr(event, "end_of_turn", False)) diff --git a/aai_cli/streaming/session.py b/aai_cli/streaming/session.py index 64601c8c..4ad173bd 100644 --- a/aai_cli/streaming/session.py +++ b/aai_cli/streaming/session.py @@ -10,7 +10,13 @@ import typer from aai_cli.core import choices, client, config_builder, llm -from aai_cli.core.errors import APIError, CLIError, UsageError, mutually_exclusive +from aai_cli.core.errors import ( + APIError, + CLIError, + NotAuthenticated, + UsageError, + mutually_exclusive, +) from aai_cli.streaming.render import StreamRenderer, speaker_prefix from aai_cli.ui import output from 
aai_cli.ui.follow import FollowRenderer @@ -265,10 +271,15 @@ def stream_one( ), ) - def _guarded(self, work: Callable[[], None]) -> None: + def _guarded(self, work: Callable[[], None], *, handle_interrupt: bool = True) -> None: """Run a streaming body with the shared lifecycle handling: enter the FollowRenderer's live panel if present, treat Ctrl-C as a clean stop, exit 0 on - a closed downstream pipe, and always close the renderer.""" + a closed downstream pipe, and always close the renderer. + + ``handle_interrupt=False`` lets a Ctrl-C or a closed pipe propagate instead of + being swallowed here — the batch driver owns those signals across the whole + ``--from-stdin`` sequence, so one Ctrl-C stops the batch rather than just + advancing to the next source.""" try: if self.follow is not None: with self.follow: @@ -281,19 +292,33 @@ def _guarded(self, work: Callable[[], None]) -> None: else: work() except KeyboardInterrupt: + if not handle_interrupt: + raise # Ctrl-C is a normal "user stopped" signal -> exit 0. if self.follow is None: self.renderer.close() self.renderer.stopped() except BrokenPipeError: + if not handle_interrupt: + raise # Downstream consumer (e.g. `| head`) closed the pipe; stop quietly. 
raise typer.Exit(code=0) from None finally: if self.follow is None: self.renderer.close() - def run(self, audio: Iterable[bytes], rate: int, *, source_label: str | None = None) -> None: - self._guarded(lambda: self.stream_one(audio, rate, source_label=source_label)) + def run( + self, + audio: Iterable[bytes], + rate: int, + *, + source_label: str | None = None, + handle_interrupt: bool = True, + ) -> None: + self._guarded( + lambda: self.stream_one(audio, rate, source_label=source_label), + handle_interrupt=handle_interrupt, + ) def run_parallel(self, streams: _ParallelStreams) -> None: self._guarded(lambda: self._drive(streams)) @@ -331,3 +356,59 @@ def worker(source_label: str, audio: Iterable[bytes], rate: int) -> None: raise errors.get() if not errors.empty(): raise errors.get() + + +# A batch source string resolved to its real-time audio chunks and declared rate. +_OpenedSource = tuple[Iterable[bytes], int] + + +def stream_batch_sources( + sources: list[str], + *, + make_session: Callable[[], StreamSession], + open_source: Callable[[str], _OpenedSource], + renderer: StreamRenderer, + json_mode: bool, +) -> None: + """Stream each source in ``sources`` in turn — the ``assembly stream --from-stdin`` + batch mode. + + The realtime API is one session at a time, so a list of files/URLs streams + sequentially: each source gets a fresh ``StreamSession`` from ``make_session`` (its + own transcript and ``--llm`` chain state) and is announced via ``renderer.source`` + before its turns. ``open_source`` resolves a source string to ``(audio, rate)`` and + may raise ``CLIError`` (bad path, missing ffmpeg, decode failure), which is recorded + as a per-source failure so the batch carries on — except ``NotAuthenticated``, which + re-raises to abort the whole batch (one rejected key fails every source identically). + + A Ctrl-C or a closed downstream pipe stops the batch cleanly (exit 0). 
When any + source failed, raises a ``CLIError`` at the end so a script can trust the exit code. + """ + total = len(sources) + failures: list[str] = [] + try: + for index, source in enumerate(sources, start=1): + renderer.source(source, index=index, total=total) + try: + audio, rate = open_source(source) + make_session().run(audio, rate, handle_interrupt=False) + except NotAuthenticated: + raise + except CLIError as exc: + failures.append(source) + output.emit_warning(f"{source}: {exc.message}", json_mode=json_mode) + except KeyboardInterrupt: + # One Ctrl-C stops the whole batch, not just the current source -> exit 0. + renderer.stopped() + return + except BrokenPipeError: + # Downstream consumer (e.g. `| head`) closed the pipe; stop quietly. + raise typer.Exit(code=0) from None + finally: + renderer.close() + if failures: + raise CLIError( + f"{len(failures)} of {total} sources failed.", + error_type="batch_failed", + suggestion="Check each failed path or URL, then re-run.", + ) diff --git a/tests/__snapshots__/test_snapshots_help_run.ambr b/tests/__snapshots__/test_snapshots_help_run.ambr index 74c27476..08bd8bd3 100644 --- a/tests/__snapshots__/test_snapshots_help_run.ambr +++ b/tests/__snapshots__/test_snapshots_help_run.ambr @@ -669,6 +669,9 @@ Pass - as the source to read raw PCM16/mono/16k audio on stdin, e.g. ffmpeg -i input.mp4 -f s16le -ar 16000 -ac 1 - | assembly stream -. + --from-stdin instead reads a list of file paths/URLs on stdin (one per line) + and streams each as its own realtime session, in turn. + --prompt biases the speech model. --llm runs a prompt over the live transcript in-process, refreshing the answer on every finalized turn; for a separate step instead, pipe the text out with -o text | assembly llm -f "…". @@ -679,13 +682,15 @@ │ to use the microphone. 
│ ╰──────────────────────────────────────────────────────────────────────────────╯ ╭─ Options ────────────────────────────────────────────────────────────────────╮ - │ --sample Stream the hosted wildfires.mp3 sample │ - │ --json -j Emit newline-delimited JSON events │ - │ --output -o [text|json] Output mode: text (finalized turns as │ - │ plain lines, pipe-friendly) or json │ - │ --show-code Print the equivalent Python SDK code and │ - │ exit (does not stream) │ - │ --help Show this message and exit. │ + │ --sample Stream the hosted wildfires.mp3 sample │ + │ --from-stdin Read a list of audio files/URLs on stdin │ + │ (one per line) and stream each in turn │ + │ --json -j Emit newline-delimited JSON events │ + │ --output -o [text|json] Output mode: text (finalized turns as │ + │ plain lines, pipe-friendly) or json │ + │ --show-code Print the equivalent Python SDK code and │ + │ exit (does not stream) │ + │ --help Show this message and exit. │ ╰──────────────────────────────────────────────────────────────────────────────╯ ╭─ Audio Capture ──────────────────────────────────────────────────────────────╮ │ --sample-rate INTEGER RANGE [x>=1] Audio rate in Hz │ @@ -797,6 +802,8 @@ $ assembly stream Stream a file or URL in real time $ assembly stream recording.wav + Stream a list of files in turn + $ ls *.wav | assembly stream --from-stdin Stream the hosted sample $ assembly stream --sample Label speakers in the live transcript diff --git a/tests/test_stream_batch.py b/tests/test_stream_batch.py new file mode 100644 index 00000000..da586b24 --- /dev/null +++ b/tests/test_stream_batch.py @@ -0,0 +1,161 @@ +"""Batch streaming: `assembly stream --from-stdin` reads a list of audio files/URLs +on stdin and streams each as its own realtime session, in turn. 
+ +These drive the whole command through CliRunner with the file-resolution/streaming +boundary faked, so no real audio (or network) is needed — the focus is the +sequencing, the per-source failure handling, and the Ctrl-C/broken-pipe lifecycle +that distinguish the batch driver from a single stream. +""" + +import json +import types + +from typer.testing import CliRunner + +from aai_cli.core import config +from aai_cli.main import app + +runner = CliRunner() + + +class _FakeFile: + """A FileSource stand-in: yields one chunk; raises CLIError for a named source.""" + + def __init__(self, source): + from aai_cli.core.errors import CLIError + + if source == "missing.wav": + raise CLIError(f"File not found: {source}", error_type="file_not_found", exit_code=2) + self.source = source + self.sample_rate = 16000 + + def __iter__(self): + return iter([b"\x00\x00"]) + + +def _patch_batch_inputs(monkeypatch, fake_stream_audio): + """Wire the file-resolution/streaming boundary so --from-stdin needs no real files.""" + monkeypatch.setattr("aai_cli.commands.stream._exec.FileSource", _FakeFile) + monkeypatch.setattr( + "aai_cli.commands.stream._exec.client.resolve_audio_source", + lambda source, *, sample=False: source, + ) + monkeypatch.setattr("aai_cli.commands.stream._exec.client.stream_audio", fake_stream_audio) + + +def test_stream_from_stdin_streams_each_source_in_turn(monkeypatch): + config.set_api_key("default", "sk_live") + streamed = [] + + def fake_stream_audio(api_key, source, *, params, on_turn=None, **_kwargs): + streamed.append(source.source) + if on_turn: + on_turn(types.SimpleNamespace(transcript=source.source, end_of_turn=True)) + + _patch_batch_inputs(monkeypatch, fake_stream_audio) + result = runner.invoke(app, ["stream", "--from-stdin", "--json"], input="a.wav\nb.wav\n") + assert result.exit_code == 0 + # Sequential, in stdin order — the realtime API is one session at a time. 
+ assert streamed == ["a.wav", "b.wav"] + lines = [json.loads(x) for x in result.output.splitlines() if x.strip()] + assert {"type": "source", "source": "a.wav", "index": 1, "total": 2} in lines + assert {"type": "source", "source": "b.wav", "index": 2, "total": 2} in lines + assert {"type": "turn", "transcript": "a.wav", "end_of_turn": True} in lines + + +def test_stream_from_stdin_resolves_each_source_not_the_hosted_sample(monkeypatch): + # --from-stdin sources are real files/URLs, so each is resolved with sample=False + # — never coerced to the hosted --sample clip. + config.set_api_key("default", "sk_live") + sample_flags = [] + + def recording_resolve(source, *, sample=False): + sample_flags.append(sample) + return source + + def fake_stream_audio(api_key, source, *, params, **_kwargs): + pass + + monkeypatch.setattr("aai_cli.commands.stream._exec.FileSource", _FakeFile) + monkeypatch.setattr( + "aai_cli.commands.stream._exec.client.resolve_audio_source", recording_resolve + ) + monkeypatch.setattr("aai_cli.commands.stream._exec.client.stream_audio", fake_stream_audio) + result = runner.invoke(app, ["stream", "--from-stdin"], input="a.wav\nb.wav\n") + assert result.exit_code == 0 + assert sample_flags == [False, False] + + +def test_stream_from_stdin_failed_source_is_recorded_and_batch_continues(monkeypatch): + config.set_api_key("default", "sk_live") + streamed = [] + + def fake_stream_audio(api_key, source, *, params, **_kwargs): + streamed.append(source.source) + + _patch_batch_inputs(monkeypatch, fake_stream_audio) + result = runner.invoke(app, ["stream", "--from-stdin"], input="missing.wav\nb.wav\n") + # The good source still streamed; the batch fails (exit 1) because one source did. 
+ assert streamed == ["b.wav"] + assert result.exit_code == 1 + assert "1 of 2 sources failed" in result.output + assert "missing.wav" in result.output + + +def test_stream_from_stdin_not_authenticated_aborts_the_whole_batch(monkeypatch): + config.set_api_key("default", "sk_live") + from aai_cli.core.errors import NotAuthenticated + + streamed = [] + + def fake_stream_audio(api_key, source, *, params, **_kwargs): + streamed.append(source.source) + raise NotAuthenticated("rejected") + + _patch_batch_inputs(monkeypatch, fake_stream_audio) + result = runner.invoke(app, ["stream", "--from-stdin"], input="a.wav\nb.wav\n") + # One rejected key fails every source identically, so the batch aborts at the first. + assert streamed == ["a.wav"] + assert result.exit_code != 0 + + +def test_stream_from_stdin_keyboard_interrupt_stops_the_batch(monkeypatch): + config.set_api_key("default", "sk_live") + streamed = [] + + def fake_stream_audio(api_key, source, *, params, **_kwargs): + streamed.append(source.source) + raise KeyboardInterrupt + + _patch_batch_inputs(monkeypatch, fake_stream_audio) + result = runner.invoke(app, ["stream", "--from-stdin"], input="a.wav\nb.wav\n") + # One Ctrl-C stops the whole batch (exit 0), not just the current source. + assert streamed == ["a.wav"] + assert result.exit_code == 0 + assert "Stopped." 
in result.output + + +def test_stream_from_stdin_broken_pipe_exits_zero(monkeypatch): + config.set_api_key("default", "sk_live") + streamed = [] + + def fake_stream_audio(api_key, source, *, params, **_kwargs): + streamed.append(source.source) + raise BrokenPipeError + + _patch_batch_inputs(monkeypatch, fake_stream_audio) + result = runner.invoke(app, ["stream", "--from-stdin"], input="a.wav\nb.wav\n") + assert streamed == ["a.wav"] + assert result.exit_code == 0 + + +def test_stream_from_stdin_empty_pipe_is_a_usage_error(monkeypatch): + config.set_api_key("default", "sk_live") + + def fake_stream_audio(api_key, source, *, params, **_kwargs): + raise AssertionError("nothing should stream from an empty pipe") + + _patch_batch_inputs(monkeypatch, fake_stream_audio) + result = runner.invoke(app, ["stream", "--from-stdin"], input="") + assert result.exit_code == 2 + assert "No sources received on stdin" in result.output diff --git a/tests/test_stream_exec.py b/tests/test_stream_exec.py index 5781fea5..90b93983 100644 --- a/tests/test_stream_exec.py +++ b/tests/test_stream_exec.py @@ -23,6 +23,7 @@ DEFAULTS = stream_exec.StreamOptions( source=None, sample=False, + from_stdin=False, sample_rate=None, device=None, system_audio=False, @@ -155,3 +156,69 @@ def test_stream_options_are_immutable(): field_name = "sample" with pytest.raises(dataclasses.FrozenInstanceError): setattr(DEFAULTS, field_name, True) + + +# --- batch streaming (--from-stdin) validation ----------------------------- +# Each conflict is rejected before stdin is read, so these raise without a pipe. 
+@pytest.mark.parametrize( + "overrides", + [ + {"from_stdin": True, "source": "a.wav"}, # a positional source + {"from_stdin": True, "sample": True}, # the hosted sample + {"from_stdin": True, "system_audio": True}, # live system capture + {"from_stdin": True, "system_audio_only": True}, + {"from_stdin": True, "device": 2}, # mic-only capture flags + {"from_stdin": True, "sample_rate": 44100}, + {"from_stdin": True, "show_code": True}, # renders one source + ], +) +def test_from_stdin_rejects_incompatible_flags(overrides): + with pytest.raises(UsageError): + stream_exec.run_stream( + dataclasses.replace(DEFAULTS, **overrides), AppState(), json_mode=False + ) + + +def test_from_stdin_rejects_llm_with_text_output(): + # --llm renders a live panel; -o text is a contradictory output shape. + from aai_cli.core import choices + + with pytest.raises(UsageError): + stream_exec.run_stream( + dataclasses.replace( + DEFAULTS, + from_stdin=True, + llm_prompt=["summarize"], + output_field=choices.TextOrJson.text, + ), + AppState(), + json_mode=False, + ) + + +def test_from_stdin_empty_stdin_is_a_usage_error(monkeypatch): + # An empty pipe (nothing to stream) is a clean usage error, not a silent no-op. + monkeypatch.setattr(stream_exec.stdio, "iter_piped_stdin_lines", lambda: iter([])) + with pytest.raises(UsageError): + stream_exec.run_stream( + dataclasses.replace(DEFAULTS, from_stdin=True), AppState(), json_mode=True + ) + + +def test_from_stdin_dedupes_sources_keeping_order(monkeypatch): + # Duplicate lines stream once, in first-seen order — the batch driver receives the + # deduped list (mirrors `transcribe --from-stdin`). 
+ config.set_api_key("default", "sk_live") + monkeypatch.setattr( + stream_exec.stdio, "iter_piped_stdin_lines", lambda: iter(["a.wav", "a.wav", "b.wav"]) + ) + seen: dict[str, list[str]] = {"sources": []} + + def fake_stream_batch(sources, *, make_session, open_source, renderer, json_mode): + seen["sources"] = list(sources) + + monkeypatch.setattr(stream_exec, "stream_batch_sources", fake_stream_batch) + stream_exec.run_stream( + dataclasses.replace(DEFAULTS, from_stdin=True), AppState(), json_mode=True + ) + assert seen["sources"] == ["a.wav", "b.wav"] diff --git a/tests/test_streaming_events.py b/tests/test_streaming_events.py index 28f1bd68..489d7fd7 100644 --- a/tests/test_streaming_events.py +++ b/tests/test_streaming_events.py @@ -45,6 +45,11 @@ events.Termination(audio_duration_seconds=None), {"type": "termination", "audio_duration_seconds": None}, ), + # the batch (--from-stdin) source marker carries the 1-based position. + ( + events.Source(source="a.wav", index=1, total=3), + {"type": "source", "source": "a.wav", "index": 1, "total": 3}, + ), ], ) def test_wire_record(event: events.Event, expected: dict[str, object]): diff --git a/tests/test_streaming_render.py b/tests/test_streaming_render.py index 5ca462b3..476f369b 100644 --- a/tests/test_streaming_render.py +++ b/tests/test_streaming_render.py @@ -112,6 +112,38 @@ def test_text_mode_labels_sources_and_statuses_to_stderr(): assert "Stopped." 
in err.getvalue() +def test_source_header_human_mode_prints_position_and_name(): + r, buf = _human() + r.source("a.wav", index=2, total=3) + r.close() + out = buf.getvalue() + assert "[2/3]" in out + assert "a.wav" in out + + +def test_source_header_json_mode_emits_source_event(): + out = io.StringIO() + r = StreamRenderer(json_mode=True, out=out) + r.source("a.wav", index=1, total=2) + assert json.loads(out.getvalue()) == { + "type": "source", + "source": "a.wav", + "index": 1, + "total": 2, + } + + +def test_source_header_text_mode_goes_to_stderr_not_stdout(): + # Text mode keeps stdout pure transcript lines, so the batch header is a stderr + # status — a downstream `| assembly llm` pipeline never sees it. + out = io.StringIO() + err = io.StringIO() + r = StreamRenderer(json_mode=False, text_mode=True, out=out, err=err) + r.source("a.wav", index=1, total=2) + assert out.getvalue() == "" + assert "[1/2] a.wav" in err.getvalue() + + def test_human_begin_is_silent_until_mic_opens(): # The session opening (Begin) no longer prints "Listening…"; that waits for # the mic to actually open and start recording (renderer.listening()).