Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ That's it. Run `assembly onboard` for a guided tour, or see [Installation](#-ins
- **🎯 One command for everything**: transcription, real-time streaming, voice agents, LLM prompts, and WER benchmarking — no SDK boilerplate.
- **🔌 Built for pipelines**: data goes to stdout, errors to stderr, `--json` gives stable machine-readable output, and `-` reads audio from stdin.
- **🔐 Secure by default**: your API key lives in the OS keyring, never in a dotfile — and run commands have no `--api-key` flag, so keys can't leak into `ps` or shell history.
- **🛠️ From demo to deployed app**: `assembly init` scaffolds a runnable FastAPI starter, `assembly dev` / `share` / `deploy` run, tunnel, and ship it, and `--show-code` prints the equivalent Python SDK script for any run command.
- **🛠️ From demo to deployed app**: `assembly init` scaffolds a runnable FastAPI starter, `assembly dev` / `share` / `deploy` run, tunnel, and ship it, and `--show-code` prints the equivalent Python SDK script for any run command (`transcribe` / `stream` / `agent` / `agent-cascade`).
- **🤖 Agent-ready**: `assembly setup install` wires your coding agent up with the AssemblyAI docs MCP server and skills.
- **📖 Open source**: MIT licensed.

Expand All @@ -62,7 +62,7 @@ That's it. Run `assembly onboard` for a guided tour, or see [Installation](#-ins
| `assembly transcripts` / `sessions` | Browse and fetch past transcripts and streaming sessions |
| `assembly keys` / `balance` / `usage` / `limits` / `audit` | Account self-service via browser login |

Add `--show-code` to `transcribe` / `stream` / `agent` to print the equivalent Python SDK script instead of running — the built-in path from CLI experiment to SDK code.
Add `--show-code` to `transcribe` / `stream` / `agent` / `agent-cascade` to print the equivalent Python SDK script instead of running — the built-in path from CLI experiment to SDK code.

## ✨ Things you can do with it

Expand Down Expand Up @@ -152,7 +152,7 @@ printf '%s\n' \
assembly agent --voice ivy --system-prompt "you're a helpful interviewer"
```

**Graduate to the SDK** — `--show-code` prints the equivalent Python script for any `transcribe`/`stream`/`agent` run instead of executing it:
**Graduate to the SDK** — `--show-code` prints the equivalent Python script for any `transcribe`/`stream`/`agent`/`agent-cascade` run instead of executing it:

```sh
assembly agent --system-prompt "you're a story generator" --show-code > story.py
Expand Down
17 changes: 17 additions & 0 deletions aai_cli/code_gen/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
from __future__ import annotations

from typing import TYPE_CHECKING

from aai_cli.code_gen import agent as _agent
from aai_cli.code_gen import agent_cascade as _agent_cascade
from aai_cli.code_gen import stream as _stream
from aai_cli.code_gen import transcribe as _transcribe

if TYPE_CHECKING:
from aai_cli.agent_cascade.config import CascadeConfig


def gateway_options(
prompts: list[str], model: str, max_tokens: int, *, interval: float = 0.0
Expand All @@ -28,6 +34,17 @@ def agent(voice: str, system_prompt: str, greeting: str) -> str:
return _agent.render(voice, system_prompt, greeting)


def agent_cascade(config: CascadeConfig, *, speech_model: str) -> str:
    """Return runnable Python source that reproduces this terminal cascade session.

    The `agent` generator targets a single Voice Agent socket; the cascade instead
    wires its three primitives itself (Streaming STT, the LLM Gateway, and streaming
    TTS), so the emitted script mirrors the CLI's client-side orchestration. Only
    sandbox hosts are targeted, because streaming TTS has no production host.

    Args:
        config: The cascade configuration to reflect in the script.
        speech_model: The Streaming STT model, forwarded to the renderer.

    Returns:
        The generated script as a single string.
    """
    script = _agent_cascade.render(config, speech_model=speech_model)
    return script


def transcribe(
merged: dict[str, object],
source: str,
Expand Down
119 changes: 119 additions & 0 deletions aai_cli/code_gen/agent_cascade.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from __future__ import annotations

import json
from typing import TYPE_CHECKING
from urllib.parse import urlencode

from aai_cli.code_gen import agent_cascade_body
from aai_cli.core import environments

if TYPE_CHECKING:
from aai_cli.agent_cascade.config import CascadeConfig

# The header carries only the injected constants and the reply-cue predicate, so it
# has no literal braces and is safe to fill with str.format. All the brace-heavy
# orchestration (dict/set literals, the protocol loops) lives in the static body,
# which is never formatted — so no brace has to be doubled.
#
# Every placeholder is substituted verbatim into Python source, so callers must pass
# values that are already rendered as Python literals/expressions. The two lines
# under `def is_reply_cue` must stay indented: they are the body of the generated
# function, and without the indent the emitted script does not compile.
_HEADER = """\
# Live voice cascade: Streaming STT -> LLM Gateway -> streaming TTS, wired client-side.
# This is what `assembly --sandbox agent-cascade` runs: it transcribes your speech,
# sends each finalized turn to the LLM Gateway, and speaks the reply through streaming
# TTS — the same three primitives the agent-cascade init template wires server-side.
# Requires audio + websockets: pip install sounddevice websockets openai
# Tip: use headphones — the mic stays open while the agent speaks, so on speakers it
# would hear itself and loop.
import base64
import json
import os
import queue
import threading

import sounddevice as sd
from openai import OpenAI
from websockets.sync.client import connect

# Export your key first: export ASSEMBLYAI_API_KEY="<your key>"
API_KEY = os.environ["ASSEMBLYAI_API_KEY"]
STT_URL = {stt_url}
TTS_URL = {tts_url}
GATEWAY_URL = {gateway_url}
MODEL = {model}
MAX_TOKENS = {max_tokens}
MAX_HISTORY = {max_history}
SYSTEM_PROMPT = {system_prompt}
GREETING = {greeting}
RATE = 24000  # one full-duplex rate for mic capture + TTS playback (TTS native PCM16 mono)


def is_reply_cue(event):
    # The cue to generate a reply. {cue_comment}
    return {cue_expr}
"""


def _stt_url(speech_model: str, *, format_turns: bool) -> str:
    """Build the Streaming v3 websocket URL on the active environment's host.

    The query string pins `sample_rate` to 24000 to match the rate the generated
    script captures the mic at (its single full-duplex rate); per the original
    design note, a mismatch corrupts the audio server-side.

    Args:
        speech_model: Value for the `speech_model` query parameter.
        format_turns: Emitted as the literal `"true"` or `"false"`.

    Returns:
        A `wss://…/v3/ws?…` URL with the URL-encoded query string appended.
    """
    turns_flag = "true" if format_turns else "false"
    query = urlencode(
        [
            ("sample_rate", 24000),
            ("encoding", "pcm_s16le"),
            ("speech_model", speech_model),
            ("format_turns", turns_flag),
        ]
    )
    host = environments.active().streaming_host
    return f"wss://{host}/v3/ws?{query}"


def _tts_url(voice: str, language: str | None) -> str:
    """Build the streaming-TTS websocket URL for a voice (sandbox-only host).

    Args:
        voice: Value for the `voice` query parameter.
        language: Optional `language` query parameter; left out entirely when None.

    Returns:
        A `wss://…/v1/ws/?…` URL on the active environment's streaming-TTS host,
        always carrying `sample_rate=24000`.
    """
    query: dict[str, str] = {
        "voice": voice,
        "sample_rate": "24000",
        # Only add the key when a language was given, keeping it last in the query.
        **({} if language is None else {"language": language}),
    }
    host = environments.active().streaming_tts_host
    return f"wss://{host}/v1/ws/?{urlencode(query)}"


def _cue(*, format_turns: bool) -> tuple[str, str]:
    """Return the `(comment, predicate)` pair describing the reply trigger.

    Both items are source text for the generated script: the comment explains the
    choice and the predicate is a Python expression over an `event` dict.

    With formatting on, the predicate also requires `turn_is_formatted`, i.e. it
    waits for the *formatted* end-of-turn (better text for the LLM). With formatting
    off the server never sets that flag, so a bare end-of-turn is the cue.
    """
    end_of_turn = 'bool(event.get("end_of_turn"))'
    if not format_turns:
        comment = (
            "With --no-format-turns the server never formats, so a bare end-of-turn is the cue."
        )
        return (comment, end_of_turn)
    comment = "With --format-turns, wait for the punctuated end-of-turn."
    return (comment, end_of_turn + ' and bool(event.get("turn_is_formatted"))')


def render(config: CascadeConfig, *, speech_model: str) -> str:
    """Generate a runnable terminal cascade script from a cascade config + STT model.

    Hosts come from the active environment, so a sandbox run generates a script that
    targets the sandbox its key was minted for. The script mirrors the CLI run path:
    one full-duplex mic+speaker stream, one LLM completion per finalized turn, spoken
    sentence-by-sentence through a fresh TTS socket, with barge-in on the next turn.
    The named per-leg knobs are reflected; the --stt/--llm/--tts-config escape hatches
    (config.llm_extra / config.tts_extra) are not.

    Args:
        config: The cascade configuration; reads format_turns, voice, language, model,
            max_tokens, max_history, system_prompt and greeting.
        speech_model: The Streaming STT model placed in the STT socket URL.

    Returns:
        The formatted header followed by the static body. Non-ASCII text (for example
        an emoji in the greeting) is written verbatim, so save the script as UTF-8.
    """

    def literal(value: str) -> str:
        # json.dumps yields a double-quoted literal whose escapes are also valid
        # Python. ensure_ascii must be off: the default writes non-BMP characters as
        # a surrogate-pair escape (e.g. "\ud83d\ude00"), which Python reads as two
        # lone surrogates rather than the character, so the generated script would
        # fail as soon as it prints or UTF-8-encodes the text.
        return json.dumps(value, ensure_ascii=False)

    cue_comment, cue_expr = _cue(format_turns=config.format_turns)
    header = _HEADER.format(
        stt_url=literal(_stt_url(speech_model, format_turns=config.format_turns)),
        tts_url=literal(_tts_url(config.voice, config.language)),
        gateway_url=literal(environments.active().llm_gateway_base),
        model=literal(config.model),
        max_tokens=config.max_tokens,
        max_history=config.max_history,
        system_prompt=literal(config.system_prompt),
        greeting=literal(config.greeting),
        cue_comment=cue_comment,
        cue_expr=cue_expr,
    )
    return header + agent_cascade_body.BODY
173 changes: 173 additions & 0 deletions aai_cli/code_gen/agent_cascade_body.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
"""The static body of the generated agent-cascade script.

Kept separate from the header so the orchestration's many literal braces (dict/set
literals, the STT/TTS protocol loops) stay verbatim — this string is concatenated
onto the formatted header, never passed through str.format itself.
"""

from __future__ import annotations

# The constants (API_KEY, STT_URL, TTS_URL, GATEWAY_URL, MODEL, …) and is_reply_cue()
# are defined by the header above this body; everything here references them.
#
# This string is Python source, so its indentation is significant. Backslash escapes
# meant for the generated script (the NUL byte, the newline before "Stopped.") are
# doubled here so they survive this non-raw literal.
BODY = """

gateway = OpenAI(api_key=API_KEY, base_url=GATEWAY_URL)
history = []  # alternating user/assistant turns — the sliding LLM-context window
stop_reply = threading.Event()  # set on barge-in to cut a reply short
reply_thread = None

# ONE full-duplex stream (mic + speaker together) at 24 kHz. Opening two separate
# input/output streams on one device fails on macOS CoreAudio, which silently kills
# capture; a single sd.RawStream callback handles both directions.
mic_queue: queue.Queue = queue.Queue()
play_buffer = bytearray()
buffer_lock = threading.Lock()


def on_audio(indata, outdata, _frames, _time, _status):
    mic_queue.put_nowait(bytes(indata))  # capture -> queue for STT
    # Playback: drain the agent's audio into the output, zero-filling any shortfall.
    needed = len(outdata)
    with buffer_lock:
        take = bytes(play_buffer[:needed])
        del play_buffer[:needed]
    outdata[: len(take)] = take
    if len(take) < needed:
        outdata[len(take) :] = b"\\x00" * (needed - len(take))


def enqueue_audio(pcm):
    with buffer_lock:
        play_buffer.extend(pcm)


def flush_audio():  # drop queued-but-unplayed audio (used on barge-in)
    with buffer_lock:
        play_buffer.clear()


def trim_history():  # cap the running history to the most recent MAX_HISTORY messages
    if len(history) > MAX_HISTORY:
        del history[: len(history) - MAX_HISTORY]


def split_sentences(text):
    # Split a reply into sentences (each ending in . ! ?) so the first audio can play
    # before the whole answer is synthesized; a trailing fragment is kept too.
    sentences, start = [], 0
    for i, ch in enumerate(text):
        if ch in ".!?":
            piece = text[start : i + 1].strip()
            if piece:
                sentences.append(piece)
            start = i + 1
    tail = text[start:].strip()
    if tail:
        sentences.append(tail)
    return sentences


def synthesize(text):
    # Open a fresh streaming-TTS socket (the voice is fixed at connect time), drive the
    # Begin -> Generate -> Flush -> Audio protocol, and return the concatenated PCM. TTS
    # authenticates with the raw API key, not a Bearer token (the streaming convention).
    pcm = bytearray()
    with connect(TTS_URL, additional_headers={"Authorization": API_KEY}, max_size=None) as ws:
        if json.loads(ws.recv()).get("type") != "Begin":
            return b""
        ws.send(json.dumps({"type": "Generate", "text": text}))
        ws.send(json.dumps({"type": "Flush"}))
        for raw in ws:
            frame = json.loads(raw)
            kind = frame.get("type")
            if kind == "Audio":
                pcm += base64.b64decode(frame.get("audio", ""))
                if frame.get("is_final"):
                    break
            elif kind in ("FlushDone", "Error"):
                break
        ws.send(json.dumps({"type": "Terminate"}))
    return bytes(pcm)


def speak(text):  # show + synthesize one chunk of agent speech, honoring a barge-in
    print("agent:", text)
    if stop_reply.is_set():
        return
    pcm = synthesize(text)
    # Synthesis blocks on the network; if a barge-in landed meanwhile, this audio is
    # already stale, so drop it instead of queueing it behind the flush.
    if not stop_reply.is_set():
        enqueue_audio(pcm)


def generate_reply():
    # One LLM completion over the running history, spoken sentence-by-sentence. Record
    # what was actually spoken so a barge-in still leaves the history alternating.
    messages = [{"role": "system", "content": SYSTEM_PROMPT}, *history]
    reply = gateway.chat.completions.create(
        model=MODEL, messages=messages, max_tokens=MAX_TOKENS
    ).choices[0].message.content or ""
    spoken = []
    for sentence in split_sentences(reply):
        if stop_reply.is_set():
            break
        speak(sentence)
        spoken.append(sentence)
    said = " ".join(spoken).strip()
    if said:
        history.append({"role": "assistant", "content": said})
        trim_history()


def barge_in():
    # A new user turn cuts off any reply still playing. The worker only synthesizes
    # and enqueues, so it usually exits well before its audio has finished playing
    # (and the greeting never has a worker at all); the flush therefore must not
    # depend on the thread still being alive.
    if reply_thread is not None and reply_thread.is_alive():
        stop_reply.set()
        flush_audio()  # silence right away, before waiting on the worker
        reply_thread.join()
    flush_audio()  # drop whatever is still buffered, or slipped in during the join


def send_mic(stt):
    while True:
        chunk = mic_queue.get()
        try:
            stt.send(chunk)
        except Exception:
            return  # socket closed (session over): end the mic thread quietly


stream = sd.RawStream(
    samplerate=RATE, channels=1, dtype="int16", blocksize=RATE // 10, callback=on_audio
)
stream.start()

# Greet first, seeding the opening line into the history so the model has a record of it.
if GREETING:
    history.append({"role": "assistant", "content": GREETING})
    speak(GREETING)

with connect(STT_URL, additional_headers={"Authorization": API_KEY}) as stt:
    threading.Thread(target=send_mic, args=(stt,), daemon=True).start()
    print("Connected — start talking. (Ctrl-C to stop)")
    try:
        for raw in stt:
            event = json.loads(raw)
            if event.get("type") != "Turn":
                continue
            text = (event.get("transcript") or "").strip()
            if not text:
                continue
            if is_reply_cue(event):
                print("you: ", text)
                barge_in()
                history.append({"role": "user", "content": text})
                trim_history()
                stop_reply.clear()
                reply_thread = threading.Thread(target=generate_reply, daemon=True)
                reply_thread.start()
            else:
                barge_in()  # an interim turn only interrupts a playing reply
    except KeyboardInterrupt:
        print("\\nStopped.")
    finally:
        stop_reply.set()
        stream.stop()
        stream.close()
"""
10 changes: 10 additions & 0 deletions aai_cli/commands/agent_cascade/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ def _emit_voice_list(_state: AppState, json_mode: bool) -> None:
'assembly --sandbox agent-cascade --system-prompt "You are a terse pirate."',
),
("See available voices", "assembly --sandbox agent-cascade --list-voices"),
(
"Print equivalent Python instead of running",
"assembly --sandbox agent-cascade --show-code",
),
]
),
)
Expand Down Expand Up @@ -159,6 +163,11 @@ def agent_cascade(
"--output",
help="Output mode: text (you:/agent: lines as plain stdout, pipe-friendly) or json",
),
show_code: bool = typer.Option(
False,
"--show-code",
help="Print the equivalent Python SDK code and exit (does not start a session)",
),
) -> None:
"""\\[sandbox] Hold a live voice conversation through a self-wired cascade

Expand Down Expand Up @@ -201,5 +210,6 @@ def agent_cascade(
llm_config=tuple(llm_config or ()),
language=language,
tts_config=tuple(tts_config or ()),
show_code=show_code,
)
run_with_options(ctx, agent_cascade_exec.run_agent_cascade, opts, json=json_out)
Loading
Loading