Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion REFERENCE.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ each carrying a `"type"` field to dispatch on:

| Command | Event types |
| ------- | ----------- |
| `assembly stream --json` | `begin`, `turn`, `termination` |
| `assembly stream --json` | `begin`, `turn`, `termination` (with `--from-stdin`, a `source` event precedes each file's events) |
| `assembly agent --json` | `session.ready`, `transcript.user.delta`, `transcript.user`, `reply.started`, `transcript.agent`, `reply.done` |
| `assembly agent-cascade --json` | `session.ready`, `transcript.user.delta`, `transcript.user`, `reply.started`, `transcript.agent`, `reply.done` |
| `assembly dictate --json` | `utterance` |
Expand Down
10 changes: 10 additions & 0 deletions aai_cli/commands/stream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
[
("Stream from your microphone", "assembly stream"),
("Stream a file or URL in real time", "assembly stream recording.wav"),
("Stream a list of files in turn", "ls *.wav | assembly stream --from-stdin"),
("Stream the hosted sample", "assembly stream --sample"),
("Label speakers in the live transcript", "assembly stream --speaker-labels"),
(
Expand All @@ -52,6 +53,11 @@ def stream(
"PCM16/mono/16k on stdin. Omit to use the microphone.",
),
sample: bool = typer.Option(False, "--sample", help="Stream the hosted wildfires.mp3 sample"),
from_stdin: bool = typer.Option(
False,
"--from-stdin",
help="Read a list of audio files/URLs on stdin (one per line) and stream each in turn",
),
# audio capture
sample_rate: int | None = typer.Option(
None,
Expand Down Expand Up @@ -302,13 +308,17 @@ def stream(
Pass - as the source to read raw PCM16/mono/16k audio on stdin, e.g.
ffmpeg -i input.mp4 -f s16le -ar 16000 -ac 1 - | assembly stream -.

--from-stdin instead reads a list of file paths/URLs on stdin (one per line)
and streams each as its own realtime session, in turn.

--prompt biases the speech model. --llm runs a prompt over the live transcript
in-process, refreshing the answer on every finalized turn; for a separate step
instead, pipe the text out with -o text | assembly llm -f "…".
"""
opts = stream_exec.StreamOptions(
source=source,
sample=sample,
from_stdin=from_stdin,
sample_rate=sample_rate,
device=device,
system_audio=system_audio,
Expand Down
88 changes: 86 additions & 2 deletions aai_cli/commands/stream/_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from __future__ import annotations

import tempfile
from collections.abc import Iterable
from dataclasses import dataclass
from pathlib import Path

Expand All @@ -18,8 +19,8 @@

from aai_cli import code_gen
from aai_cli.app.context import AppState
from aai_cli.core import choices, client, config_builder, youtube
from aai_cli.core.errors import UsageError
from aai_cli.core import choices, client, config_builder, stdio, youtube
from aai_cli.core.errors import UsageError, mutually_exclusive
from aai_cli.core.microphone import MicrophoneSource
from aai_cli.streaming import turn_presets
from aai_cli.streaming.macos import MacSystemAudioSource
Expand All @@ -28,6 +29,7 @@
SourceOptions,
StreamSession,
resolve_output_modes,
stream_batch_sources,
validate_sources,
)
from aai_cli.streaming.sources import TARGET_RATE, FileSource, StdinSource
Expand All @@ -46,6 +48,7 @@ class StreamOptions:

source: str | None
sample: bool
from_stdin: bool
sample_rate: int | None
device: int | None
system_audio: bool
Expand Down Expand Up @@ -214,9 +217,90 @@ def _dispatch(session: StreamSession, opts: SourceOptions) -> None:
session.run(mic, mic.sample_rate)


def _collect_batch_sources(opts: StreamOptions, *, text_mode: bool) -> list[str]:
"""The newline-delimited source list for ``--from-stdin``, with the flag combos it
can't honor rejected first.

``--from-stdin`` reinterprets stdin as a list of file paths/URLs (one per line),
each streamed as its own realtime session — distinct from ``-`` (raw PCM bytes). It
therefore can't also take a positional source, ``--sample``, the mic/system-audio
inputs, the mic-only capture flags, or ``--show-code`` (which renders one source).
"""
mutually_exclusive(
("--from-stdin", True),
("a source argument", opts.source is not None),
("--sample", opts.sample),
suggestion="--from-stdin reads the source list from stdin; don't also pass one.",
)
mutually_exclusive(
("--from-stdin", True),
("--system-audio", opts.system_audio),
("--system-audio-only", opts.system_audio_only),
suggestion="--from-stdin streams files/URLs, not live capture.",
)
if opts.device is not None or opts.sample_rate is not None:
raise UsageError("--device and --sample-rate apply only to microphone input.")
mutually_exclusive(
("--from-stdin", True),
("--show-code", opts.show_code),
suggestion="--show-code renders one source; pass a single file or URL.",
)
mutually_exclusive(
("--llm", bool(opts.llm_prompt)),
("-o text", text_mode),
suggestion="--llm renders a live panel (or NDJSON when piped).",
)
sources = list(dict.fromkeys(stdio.iter_piped_stdin_lines())) # dedupe, keep order
if not sources:
raise UsageError(
"No sources received on stdin.",
suggestion="Pipe one path or URL per line, e.g. "
"ls *.wav | assembly stream --from-stdin.",
)
return sources


def _run_batch(opts: StreamOptions, state: AppState, *, json_mode: bool, text_mode: bool) -> None:
"""Stream a ``--from-stdin`` list of sources, one realtime session each, in turn."""
sources = _collect_batch_sources(opts, text_mode=text_mode)
api_key = state.resolve_api_key()
base_flags = opts.base_flags()
llm_prompts = list(opts.llm_prompt or [])
renderer = StreamRenderer(json_mode=json_mode, text_mode=text_mode)

def make_session() -> StreamSession:
return StreamSession(
api_key=api_key,
base_flags=base_flags,
overrides=opts.config_kv,
config_file=opts.config_file,
renderer=renderer,
follow=FollowRenderer(json_mode=json_mode) if llm_prompts else None,
llm_prompts=llm_prompts,
model=opts.model,
max_tokens=opts.max_tokens,
llm_interval=opts.llm_interval,
)

def open_source(source: str) -> tuple[Iterable[bytes], int]:
file_audio = FileSource(client.resolve_audio_source(source, sample=False))
return file_audio, file_audio.sample_rate

stream_batch_sources(
sources,
make_session=make_session,
open_source=open_source,
renderer=renderer,
json_mode=json_mode,
)


def run_stream(opts: StreamOptions, state: AppState, *, json_mode: bool) -> None:
"""Execute one `assembly stream` invocation from already-parsed flags."""
text_mode, json_mode = resolve_output_modes(opts.output_field, json_mode=json_mode)
if opts.from_stdin:
_run_batch(opts, state, json_mode=json_mode, text_mode=text_mode)
return
sources = opts.source_options()
base_flags = opts.base_flags()

Expand Down
15 changes: 14 additions & 1 deletion aai_cli/streaming/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,17 @@ class Termination(_StreamEvent):
source: str | None = None


Event = Begin | Turn | Termination
class Source(_StreamEvent):
"""A ``--from-stdin`` batch advanced to its next audio source.

Emitted once before each source's own ``begin``/``turn``/``termination`` events,
so a consumer can segment the NDJSON stream by source. ``index`` is 1-based.
"""

type: Literal["source"] = "source"
source: str
index: int
total: int


Event = Begin | Turn | Termination | Source
15 changes: 15 additions & 0 deletions aai_cli/streaming/render.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,21 @@ def listening(self) -> None:
elif not self.json_mode:
self._line(Text("Listening… (Ctrl-C to stop)", style="aai.muted"))

def source(self, source: str, *, index: int, total: int) -> None:
"""Announce the next source in a ``--from-stdin`` batch stream.

JSON mode emits a ``source`` event so consumers can segment the stream; text
mode writes a header to stderr (stdout stays pure transcript lines); human
mode prints a muted header above the upcoming turns.
"""
with self._lock:
if self.json_mode:
self._emit(events.Source(source=source, index=index, total=total).wire())
elif self.text_mode:
self._status(f"[{index}/{total}] {source}")
else:
self._line(Text(f"[{index}/{total}] {source}", style="aai.muted"))

def turn(self, event: object, *, source: str | None = None) -> None:
text = getattr(event, "transcript", "") or ""
end = bool(getattr(event, "end_of_turn", False))
Expand Down
91 changes: 86 additions & 5 deletions aai_cli/streaming/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@
import typer

from aai_cli.core import choices, client, config_builder, llm
from aai_cli.core.errors import APIError, CLIError, UsageError, mutually_exclusive
from aai_cli.core.errors import (
APIError,
CLIError,
NotAuthenticated,
UsageError,
mutually_exclusive,
)
from aai_cli.streaming.render import StreamRenderer, speaker_prefix
from aai_cli.ui import output
from aai_cli.ui.follow import FollowRenderer
Expand Down Expand Up @@ -265,10 +271,15 @@ def stream_one(
),
)

def _guarded(self, work: Callable[[], None]) -> None:
def _guarded(self, work: Callable[[], None], *, handle_interrupt: bool = True) -> None:
"""Run a streaming body with the shared lifecycle handling: enter the
FollowRenderer's live panel if present, treat Ctrl-C as a clean stop, exit 0 on
a closed downstream pipe, and always close the renderer."""
a closed downstream pipe, and always close the renderer.

``handle_interrupt=False`` lets a Ctrl-C or a closed pipe propagate instead of
being swallowed here — the batch driver owns those signals across the whole
``--from-stdin`` sequence, so one Ctrl-C stops the batch rather than just
advancing to the next source."""
try:
if self.follow is not None:
with self.follow:
Expand All @@ -281,19 +292,33 @@ def _guarded(self, work: Callable[[], None]) -> None:
else:
work()
except KeyboardInterrupt:
if not handle_interrupt:
raise
# Ctrl-C is a normal "user stopped" signal -> exit 0.
if self.follow is None:
self.renderer.close()
self.renderer.stopped()
except BrokenPipeError:
if not handle_interrupt:
raise
# Downstream consumer (e.g. `| head`) closed the pipe; stop quietly.
raise typer.Exit(code=0) from None
finally:
if self.follow is None:
self.renderer.close()

def run(self, audio: Iterable[bytes], rate: int, *, source_label: str | None = None) -> None:
self._guarded(lambda: self.stream_one(audio, rate, source_label=source_label))
def run(
self,
audio: Iterable[bytes],
rate: int,
*,
source_label: str | None = None,
handle_interrupt: bool = True,
) -> None:
self._guarded(
lambda: self.stream_one(audio, rate, source_label=source_label),
handle_interrupt=handle_interrupt,
)

def run_parallel(self, streams: _ParallelStreams) -> None:
self._guarded(lambda: self._drive(streams))
Expand Down Expand Up @@ -331,3 +356,59 @@ def worker(source_label: str, audio: Iterable[bytes], rate: int) -> None:
raise errors.get()
if not errors.empty():
raise errors.get()


# A batch source string resolved to its real-time audio chunks and declared rate.
_OpenedSource = tuple[Iterable[bytes], int]


def stream_batch_sources(
sources: list[str],
*,
make_session: Callable[[], StreamSession],
open_source: Callable[[str], _OpenedSource],
renderer: StreamRenderer,
json_mode: bool,
) -> None:
"""Stream each source in ``sources`` in turn — the ``assembly stream --from-stdin``
batch mode.

The realtime API is one session at a time, so a list of files/URLs streams
sequentially: each source gets a fresh ``StreamSession`` from ``make_session`` (its
own transcript and ``--llm`` chain state) and is announced via ``renderer.source``
before its turns. ``open_source`` resolves a source string to ``(audio, rate)`` and
may raise ``CLIError`` (bad path, missing ffmpeg, decode failure), which is recorded
as a per-source failure so the batch carries on — except ``NotAuthenticated``, which
re-raises to abort the whole batch (one rejected key fails every source identically).

A Ctrl-C or a closed downstream pipe stops the batch cleanly (exit 0). When any
source failed, raises a ``CLIError`` at the end so a script can trust the exit code.
"""
total = len(sources)
failures: list[str] = []
try:
for index, source in enumerate(sources, start=1):
renderer.source(source, index=index, total=total)
try:
audio, rate = open_source(source)
make_session().run(audio, rate, handle_interrupt=False)
except NotAuthenticated:
raise
except CLIError as exc:
failures.append(source)
output.emit_warning(f"{source}: {exc.message}", json_mode=json_mode)
except KeyboardInterrupt:
# One Ctrl-C stops the whole batch, not just the current source -> exit 0.
renderer.stopped()
return
except BrokenPipeError:
# Downstream consumer (e.g. `| head`) closed the pipe; stop quietly.
raise typer.Exit(code=0) from None
finally:
renderer.close()
if failures:
raise CLIError(
f"{len(failures)} of {total} sources failed.",
error_type="batch_failed",
suggestion="Check each failed path or URL, then re-run.",
)
21 changes: 14 additions & 7 deletions tests/__snapshots__/test_snapshots_help_run.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,9 @@
Pass - as the source to read raw PCM16/mono/16k audio on stdin, e.g.
ffmpeg -i input.mp4 -f s16le -ar 16000 -ac 1 - | assembly stream -.

--from-stdin instead reads a list of file paths/URLs on stdin (one per line)
and streams each as its own realtime session, in turn.

--prompt biases the speech model. --llm runs a prompt over the live transcript
in-process, refreshing the answer on every finalized turn; for a separate step
instead, pipe the text out with -o text | assembly llm -f "…".
Expand All @@ -679,13 +682,15 @@
│ to use the microphone. │
╰──────────────────────────────────────────────────────────────────────────────╯
╭─ Options ────────────────────────────────────────────────────────────────────╮
│ --sample Stream the hosted wildfires.mp3 sample │
│ --json -j Emit newline-delimited JSON events │
│ --output -o [text|json] Output mode: text (finalized turns as │
│ plain lines, pipe-friendly) or json │
│ --show-code Print the equivalent Python SDK code and │
│ exit (does not stream) │
│ --help Show this message and exit. │
│ --sample Stream the hosted wildfires.mp3 sample │
│ --from-stdin Read a list of audio files/URLs on stdin │
│ (one per line) and stream each in turn │
│ --json -j Emit newline-delimited JSON events │
│ --output -o [text|json] Output mode: text (finalized turns as │
│ plain lines, pipe-friendly) or json │
│ --show-code Print the equivalent Python SDK code and │
│ exit (does not stream) │
│ --help Show this message and exit. │
╰──────────────────────────────────────────────────────────────────────────────╯
╭─ Audio Capture ──────────────────────────────────────────────────────────────╮
│ --sample-rate INTEGER RANGE [x>=1] Audio rate in Hz │
Expand Down Expand Up @@ -797,6 +802,8 @@
$ assembly stream
Stream a file or URL in real time
$ assembly stream recording.wav
Stream a list of files in turn
$ ls *.wav | assembly stream --from-stdin
Stream the hosted sample
$ assembly stream --sample
Label speakers in the live transcript
Expand Down
Loading
Loading