Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@ jobs:
test:
name: Python ${{ matrix.py }} on ${{ matrix.os }}
runs-on: ${{ matrix.os }}
env:
# pip>=24.1 rejects legacy dep metadata (e.g. pytest-httpx's `pytest (<8.*,>=6.*)`)
# and backtracks through dozens of versions, stalling the matrix 30+ min.
# Seed each tox venv with the last pip that tolerates it.
VIRTUALENV_PIP: "24.0"
strategy:
fail-fast: false
matrix:
py:
- "3.12"
- "3.11"
- "3.10"
- "3.9"
Expand Down
7 changes: 0 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@

With a single API call, get access to AI models built on the latest AI breakthroughs to transcribe and understand audio and speech data securely at large scale.

> **⚠️ WARNING**
> This SDK is intended for **testing and light usage only**. It is not
> recommended for use at scale or with production traffic. For best
> results, we recommend calling the AssemblyAI API directly via HTTP
> request. See our [official documentation](https://www.assemblyai.com/docs)
> for more information, including HTTP code examples.

## Using with AI coding agents

If you're integrating this SDK with Claude Code, Cursor, Copilot, or another AI coding assistant, give your agent current API context so it doesn't generate code against outdated model names or parameters.
Expand Down
2 changes: 0 additions & 2 deletions assemblyai/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
SyncTranscriptError,
SyncTranscriptionConfig,
SyncTranscriptResponse,
SyncWord,
Timestamp,
TranscriptError,
TranscriptionConfig,
Expand Down Expand Up @@ -149,7 +148,6 @@
"SyncTranscriptError",
"SyncTranscriptionConfig",
"SyncTranscriptResponse",
"SyncWord",
"Timestamp",
"Transcriber",
"TranscriptionConfig",
Expand Down
2 changes: 1 addition & 1 deletion assemblyai/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.64.11"
__version__ = "0.64.16"
2 changes: 2 additions & 0 deletions assemblyai/streaming/v3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
EventMessage,
LLMGatewayResponseEvent,
NoiseSuppressionModel,
SessionConfiguration,
SpeakerRevisionEvent,
SpeakerRevisionItem,
SpeechModel,
Expand Down Expand Up @@ -35,6 +36,7 @@
"SpeakerRevisionEvent",
"SpeakerRevisionItem",
"SpeechModel",
"SessionConfiguration",
"SpeechStartedEvent",
"StreamingClient",
"StreamingClientOptions",
Expand Down
87 changes: 60 additions & 27 deletions assemblyai/streaming/v3/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,35 +132,52 @@ async def connect(self, params: StreamingParameters) -> None:

uri = _build_uri(self._options.api_host, params)
headers = _build_headers(self._options)
options = self._options

try:
self._websocket = await asyncio.wait_for(
websocket_connect_async(uri, additional_headers=headers),
timeout=15,
)
except websockets.exceptions.InvalidStatus as exc:
status_code = getattr(getattr(exc, "response", None), "status_code", None)
await self._report_connection_closed(
StreamingError(
message=f"WebSocket handshake rejected (HTTP {status_code})",
code=status_code,
for attempt in range(options.max_connection_retries + 1):
Comment thread
bgotthold-aai marked this conversation as resolved.
try:
self._websocket = await asyncio.wait_for(
websocket_connect_async(uri, additional_headers=headers),
timeout=options.connect_timeout,
)
)
# Single-use design: a failed handshake terminates the client. Close
# the HTTP client now so users who treat ``on_error`` as the
# terminal signal don't leak the httpx pool.
await self._client.aclose()
return
except (
websockets.exceptions.InvalidHandshake,
websockets.exceptions.ConnectionClosed,
OSError,
asyncio.TimeoutError,
TimeoutError,
) as exc:
await self._report_connection_closed(exc)
await self._client.aclose()
return
break
except websockets.exceptions.InvalidStatus as exc:
# HTTP-level rejection (auth, quota, bad request): a retry would
# hit the same response, so fail fast.
status_code = getattr(
getattr(exc, "response", None), "status_code", None
)
await self._report_connection_closed(
StreamingError(
message=f"WebSocket handshake rejected (HTTP {status_code})",
code=status_code,
)
)
# Single-use design: a failed handshake terminates the client.
# Close the HTTP client now so users who treat ``on_error`` as
# the terminal signal don't leak the httpx pool.
await self._client.aclose()
return
except (
websockets.exceptions.InvalidHandshake,
websockets.exceptions.ConnectionClosed,
OSError,
asyncio.TimeoutError,
TimeoutError,
) as exc:
if attempt < options.max_connection_retries:
logger.debug(
"WebSocket connect attempt %d/%d failed (%s); retrying",
attempt + 1,
options.max_connection_retries + 1,
exc,
)
if options.connection_retry_delay > 0:
await asyncio.sleep(options.connection_retry_delay)
continue
await self._report_connection_closed(exc)
await self._client.aclose()
return

self._read_task = asyncio.create_task(
self._read_loop(), name="AsyncStreamingClient._read_loop"
Expand Down Expand Up @@ -188,6 +205,22 @@ async def disconnect(self, terminate: bool = False) -> None:
# cancel the awaited task on timeout, unlike ``wait_for``.
if self._write_task is not None and not self._write_task.done():
await asyncio.wait({self._write_task}, timeout=2.0)
# Don't stop the read task yet — the server sends the final Turn
# and TerminationEvent after receiving Terminate. Every terminal
# path sets ``_stop_event``, so waiting on it here lets those
# messages dispatch before teardown.
if (
self._read_task is not None
and not self._read_task.done()
and asyncio.current_task() is not self._read_task
):
try:
await asyncio.wait_for(
self._stop_event.wait(),
timeout=self._options.terminate_timeout,
)
except asyncio.TimeoutError:
pass

self._stop_event.set()

Expand Down
87 changes: 59 additions & 28 deletions assemblyai/streaming/v3/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import queue
import threading
import time
from typing import Any, Dict, Generator, Iterable, Optional, Union

import httpx
Expand Down Expand Up @@ -57,39 +58,59 @@ def __init__(self, options: StreamingClientOptions):
def connect(self, params: StreamingParameters) -> None:
"""Open the WebSocket session and start the read/write threads.

Blocks until the handshake completes. If the server rejects the
handshake (auth error, etc.) ``Error`` is dispatched to any
Blocks until the handshake completes. A transient handshake failure
(timeout, network drop) is retried up to
``options.max_connection_retries`` times before the failure is
reported. If the server rejects the handshake at the HTTP layer (auth
error, etc.) ``Error`` is dispatched to any
``on(StreamingEvents.Error, ...)`` handler rather than raised, so
registration order matters: call ``on()`` before ``connect()``.
"""
_emit_param_warnings(params)

uri = _build_uri(self._options.api_host, params)
headers = _build_headers(self._options)
options = self._options

try:
self._websocket = websocket_connect(
uri,
additional_headers=headers,
open_timeout=15,
)
except websockets.exceptions.InvalidStatus as exc:
status_code = getattr(getattr(exc, "response", None), "status_code", None)
self._report_connection_closed(
StreamingError(
message=f"WebSocket handshake rejected (HTTP {status_code})",
code=status_code,
for attempt in range(options.max_connection_retries + 1):
Comment thread
bgotthold-aai marked this conversation as resolved.
try:
self._websocket = websocket_connect(
uri,
additional_headers=headers,
open_timeout=options.connect_timeout,
)
)
return
except (
websockets.exceptions.InvalidHandshake,
websockets.exceptions.ConnectionClosed,
OSError,
TimeoutError,
) as exc:
self._report_connection_closed(exc)
return
break
except websockets.exceptions.InvalidStatus as exc:
# HTTP-level rejection (auth, quota, bad request): a retry
# would hit the same response, so fail fast.
status_code = getattr(
getattr(exc, "response", None), "status_code", None
)
self._report_connection_closed(
StreamingError(
message=f"WebSocket handshake rejected (HTTP {status_code})",
code=status_code,
)
)
return
except (
websockets.exceptions.InvalidHandshake,
websockets.exceptions.ConnectionClosed,
OSError,
TimeoutError,
) as exc:
if attempt < options.max_connection_retries:
logger.debug(
"WebSocket connect attempt %d/%d failed (%s); retrying",
attempt + 1,
options.max_connection_retries + 1,
exc,
)
if options.connection_retry_delay > 0:
time.sleep(options.connection_retry_delay)
continue
self._report_connection_closed(exc)
return

self._write_thread.start()
self._read_thread.start()
Expand All @@ -100,16 +121,26 @@ def disconnect(self, terminate: bool = False) -> None:
"""Stop the read/write threads and close the WebSocket.

Pass ``terminate=True`` for a graceful close — the client sends a
``TerminateSession`` frame and waits for the server's
``TerminationEvent`` (which reports total audio duration). Without
``terminate=True`` the WebSocket is closed without notifying the
server.
``TerminateSession`` frame and waits up to ``options.terminate_timeout``
seconds for the server's ``TerminationEvent`` (which reports total
audio duration). Without ``terminate=True`` the WebSocket is closed
without notifying the server.
"""
# Enqueue Terminate even when stop is already set: `_write_message`
# bypasses the stop gate for TerminateSession so the frame still
# reaches the server when the write thread is alive.
if terminate:
self._write_queue.put(TerminateSession())
# Don't stop the read thread yet — the server sends the final Turn
# and TerminationEvent after receiving Terminate. Every terminal
# path sets `_stop_event` (TerminationEvent via `_handle_message`,
# server close, server error), so waiting on it here lets those
# messages dispatch before teardown.
if (
self._read_thread.is_alive()
and threading.current_thread() is not self._read_thread
):
self._stop_event.wait(timeout=self._options.terminate_timeout)

self._stop_event.set()

Expand Down
21 changes: 21 additions & 0 deletions assemblyai/streaming/v3/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,19 @@ class TurnEvent(BaseModel):
speaker_label: Optional[str] = None


class SessionConfiguration(BaseModel):
# `mode` stays a plain str so a new server-side mode value can't fail
# validation and drop the whole BeginEvent.
model: Optional[str] = None
mode: Optional[str] = None
api_version: Optional[str] = None


class BeginEvent(BaseModel):
type: Literal["Begin"] = "Begin"
id: str
expires_at: datetime
configuration: Optional[SessionConfiguration] = None


class TerminationEvent(BaseModel):
Expand Down Expand Up @@ -285,6 +294,18 @@ class StreamingClientOptions(BaseModel):
api_host: str = "streaming.assemblyai.com"
api_key: Optional[str] = None
token: Optional[str] = None
# Seconds to wait for the WebSocket handshake to complete before treating
# the attempt as failed.
connect_timeout: float = 1.0
# Additional handshake attempts after the first one fails on a transient
# error (timeout, network drop). 0 disables retries. HTTP-level rejections
# (auth, quota, bad request) are never retried.
max_connection_retries: int = 2
# Seconds to wait between handshake attempts.
connection_retry_delay: float = 0.5
# Seconds disconnect(terminate=True) waits for the server's
# TerminationEvent (and any final Turn) before tearing down.
terminate_timeout: float = 5.0


class StreamingError(Exception):
Expand Down
16 changes: 1 addition & 15 deletions assemblyai/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3101,24 +3101,13 @@ def _normalize_conversation_context(cls, v):
return _normalize_conversation_context(v)


class SyncWord(BaseModel):
"""A single word from a synchronous transcript, with timing and confidence."""

text: str
start_ms: int
"Word start time in milliseconds."
end_ms: int
"Word end time in milliseconds."
confidence: float


class SyncTranscriptResponse(BaseModel):
"""The result of a synchronous transcription request."""

text: str
"The full transcript text."

words: List[SyncWord] = Field(default_factory=list)
words: List[Word] = Field(default_factory=list)
"Per-word timing and confidence."

confidence: float
Expand All @@ -3127,8 +3116,5 @@ class SyncTranscriptResponse(BaseModel):
audio_duration_ms: int
"Total audio duration in milliseconds."

inference_time_ms: float
"Model inference time in milliseconds. Excludes auth, decode, and queue wait."

session_id: str
"Server-generated UUID for this request. Record it to correlate with support."
Loading
Loading