Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .importlinter
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ source_modules =
aai_cli.transcribe_batch
aai_cli.transcribe_exec
aai_cli.transcribe_render
aai_cli.transcribe_sources
aai_cli.transcribe_validate
aai_cli.tts
aai_cli.typer_patches
aai_cli.update_check
Expand Down
266 changes: 15 additions & 251 deletions aai_cli/commands/dub/_exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@

from rich.markup import escape

from aai_cli import jsonshape, mediafile, output, youtube
from aai_cli import llm as gateway
from aai_cli import mediafile, output, youtube
from aai_cli.commands.dub import _pipeline as pipeline
from aai_cli.context import AppState
from aai_cli.errors import APIError, CLIError, UsageError
from aai_cli.tts import audio, dialogue, session, voices
from aai_cli.tts.session import SpeakConfig
from aai_cli.errors import UsageError
from aai_cli.tts import audio, dialogue, session

# ISO-639-1 codes accepted by --lang, mapped to the language *name* both the
# translation prompt and the streaming-TTS `language` param expect. A value not
Expand All @@ -55,16 +54,6 @@
"zh": "Chinese",
}

# System prompt for the per-utterance translation calls. Length matters: the dub
# replaces speech that occupied a fixed window, so the model is told to keep the
# spoken length close to the original.
TRANSLATION_SYSTEM_TEMPLATE = (
"You translate dialogue for dubbing. Translate the user's text to {language}. "
"Keep the meaning and register, and stay close to the original spoken length so "
"the dub fits the original timing. Reply with only the translated text — no "
"quotes, notes, or extra commentary."
)


@dataclass(frozen=True)
class DubOptions:
Expand Down Expand Up @@ -108,236 +97,12 @@ def default_out_path(media: Path, language: str) -> Path:
return media.parent / f"{media.stem}.dub.{slug}{media.suffix}"


def assemble_timeline(
placed: list[tuple[int, bytes]],
sample_rate: int,
total_seconds: float | None,
) -> bytearray:
"""Lay each ``(start_ms, pcm)`` segment onto a silence timeline.

Gaps before a segment's start are filled with silence; a segment whose
predecessor overran its start time is appended immediately (the dub drifts
rather than dropping speech). The tail is padded out to ``total_seconds``
(the source duration) so the dubbed track never ends early.
"""
pcm = bytearray()
for start_ms, segment in placed:
gap = start_ms / 1000 - _pcm_seconds(pcm, sample_rate)
if gap > 0:
pcm.extend(audio.silence(sample_rate, gap))
pcm.extend(segment)
if total_seconds is not None:
tail = total_seconds - _pcm_seconds(pcm, sample_rate)
if tail > 0:
pcm.extend(audio.silence(sample_rate, tail))
return pcm


def _pcm_seconds(pcm: bytes | bytearray, sample_rate: int) -> float:
"""Seconds of audio in 16-bit mono PCM: two bytes per sample."""
return len(pcm) / 2 / sample_rate


def _mux(ffmpeg: str, media: Path, track: Path, out: Path) -> None:
"""Swap ``track`` in as the audio of ``media``, writing ``out``.

``-map 0:v?`` carries the video stream over untouched (``-c:v copy``) when
there is one, and maps nothing for audio-only input, so the same invocation
dubs both a video and a plain audio file. ``-y`` makes a re-run overwrite
its own earlier output instead of stalling on ffmpeg's prompt.
"""
result = mediafile.run_ffmpeg(
[
ffmpeg,
"-hide_banner",
"-loglevel",
"error",
"-y",
"-i",
str(media),
"-i",
str(track),
"-map",
"0:v?",
"-map",
"1:a",
"-c:v",
"copy",
mediafile.path_arg(out),
]
)
if result.returncode != 0:
raise mediafile.ffmpeg_failure(result, "write", out, error_type="dub_failed")


@dataclass(frozen=True)
class _Utterance:
"""One diarized utterance reduced to the fields the dub pipeline needs."""

start_ms: int
speaker: str
text: str


def _utterances_of(transcript: object, transcript_id: str) -> list[_Utterance]:
"""The transcript's spoken utterances, with empty-text ones dropped."""
utterances = [
_Utterance(
start_ms=jsonshape.as_int(getattr(item, "start", 0)),
speaker=str(getattr(item, "speaker", None) or "A"),
text=str(getattr(item, "text", "") or "").strip(),
)
for item in jsonshape.object_list(getattr(transcript, "utterances", None))
]
spoken = [utterance for utterance in utterances if utterance.text]
if not spoken:
raise CLIError(
f"Transcript {transcript_id} has no utterances to dub.",
error_type="no_utterances",
exit_code=2,
suggestion=(
"Dubbing needs a diarized transcript. Pass a --transcript-id created "
"with --speaker-labels, or drop -t to let dub transcribe the file."
),
)
return spoken


def _total_seconds(transcript: object) -> float | None:
"""The source duration in seconds (used to pad the dubbed track's tail)."""
duration = getattr(transcript, "audio_duration", None)
if isinstance(duration, int | float) and not isinstance(duration, bool):
return float(duration)
return None


def _translate(
api_key: str,
utterances: list[_Utterance],
language: str,
opts: DubOptions,
*,
json_mode: bool,
quiet: bool,
) -> list[str]:
"""Translate each utterance to ``language`` with the LLM Gateway, in order.

One call per utterance keeps the translation↔timestamp alignment exact —
no reply-parsing step that could shift a line against its window.
"""
system = TRANSLATION_SYSTEM_TEMPLATE.format(language=language)
translating = f"Translating {len(utterances)} utterance(s) to {language} with {opts.model}…"
translations: list[str] = []
with output.status(translating, json_mode=json_mode, quiet=quiet):
for index, utterance in enumerate(utterances, 1):
messages = gateway.build_messages(utterance.text, system=system)
response = gateway.complete(
api_key, model=opts.model, messages=messages, max_tokens=opts.max_tokens
)
translated = gateway.content_of(response).strip()
# "length" is OpenAI's truncation marker; the gateway's Anthropic-flavored
# responses use "max_tokens". A clipped translation must never be dubbed.
if getattr(response.choices[0], "finish_reason", None) in {"length", "max_tokens"}:
raise APIError(
f"The translation of utterance {index} was cut off at --max-tokens "
f"({opts.max_tokens}).",
suggestion="Re-run with a higher --max-tokens.",
)
if not translated:
raise APIError(
f"The model returned an empty translation for utterance {index} "
f"({utterance.text[:50]!r})."
)
translations.append(translated)
return translations


def _synthesize(
api_key: str,
segments: list[tuple[str, str]],
language: str,
*,
json_mode: bool,
quiet: bool,
) -> tuple[list[bytes], int]:
"""Synthesize each ``(voice, text)`` segment; returns the PCM list + sample rate.

Every segment must come back at one rate — the timeline math places segments
by sample position, so a mid-run rate change would silently shift timing.
"""
synthesizing = f"Synthesizing {len(segments)} segment(s)…"
with output.status(synthesizing, json_mode=json_mode, quiet=quiet):
results = [
session.synthesize(
api_key,
SpeakConfig(text=text, voice=voice, language=language),
on_warning=lambda m: output.emit_warning(m, json_mode=json_mode),
)
for voice, text in segments
]
rates = {result.sample_rate for result in results}
if len(rates) > 1:
raise APIError(f"TTS service returned mixed sample rates ({sorted(rates)}).")
# `segments` is never empty (_utterances_of raised otherwise), so results[0] exists.
return [result.pcm for result in results], results[0].sample_rate


def _warn_ignored_voice_pins(
overrides: dict[str, str], speakers: dict[str, str], *, json_mode: bool
) -> None:
"""Mirror `assembly speak`: a requested --voice mapping is never dropped
silently, so a pin for a speaker the diarization didn't produce is called out."""
present = {speaker.casefold() for speaker in speakers}
ignored = [speaker for speaker in overrides if speaker not in present]
if ignored:
output.emit_warning(
"Ignoring --voice mapping(s) for speaker(s) not in the transcript: "
f"{', '.join(ignored)}.",
json_mode=json_mode,
)


@dataclass(frozen=True)
class _VoicePlan:
"""The parsed --voice flags: the bare voice (if any) plus SPEAKER=VOICE pins.

Parsed in run_dub — before the billed pipeline, so a malformed mapping
fails fast — and carried as one value through _dub_and_emit."""

bare: str | None
overrides: dict[str, str]


def _assign_voices(
utterances: list[_Utterance],
translations: list[str],
plan: _VoicePlan,
language: str,
) -> tuple[list[tuple[str, str]], dict[str, str]]:
"""Resolve each translated utterance to ``(voice, text)`` plus the speaker→voice map.

A bare ``--voice`` dubs every speaker with that one voice; ``SPEAKER=VOICE``
mappings pin individual speakers; everyone else takes the target language's
rotation in first-appearance order (the same rules as `assembly speak`) —
each voice speaks one language, so a non-English dub switches to that
language's native voice(s).
"""
rotation = (plan.bare,) if plan.bare is not None else voices.rotation_for(language)
segments = [
dialogue.Segment(utterance.speaker, translated)
# strict=True is an invariant guard only: _translate returns exactly one
# translation per utterance, so the lengths can never differ.
for utterance, translated in zip(utterances, translations, strict=True) # pragma: no mutate
]
return dialogue.assign_voices(segments, rotation, plan.overrides)


def run_dub(opts: DubOptions, state: AppState, *, json_mode: bool) -> None:
"""Execute one `assembly dub` invocation from already-parsed flags."""
language = resolve_language(opts.language)
session.require_available("dub")
# Parse --voice now: a malformed mapping must fail before the billed pipeline.
voice_plan = _VoicePlan(*dialogue.parse_voice_overrides(opts.voice))
voice_plan = pipeline.VoicePlan(*dialogue.parse_voice_overrides(opts.voice))
youtube.validate_video_flag(opts.media, video=opts.video)
youtube.validate_sections_flag(opts.media, opts.download_sections)
# ffmpeg is checked before any (billed) download/transcription so a missing
Expand Down Expand Up @@ -371,7 +136,7 @@ def _dub_and_emit(
out: Path,
language: str,
ffmpeg: str,
voice_plan: _VoicePlan,
voice_plan: pipeline.VoicePlan,
state: AppState,
*,
json_mode: bool,
Expand All @@ -391,30 +156,29 @@ def _dub_and_emit(
detect_language=opts.source_language is None,
)
transcript_id = str(getattr(transcript, "id", ""))
utterances = _utterances_of(transcript, transcript_id)
translations = _translate(
utterances = pipeline.utterances_of(transcript, transcript_id)
translations = pipeline.translate(
api_key, utterances, language, opts, json_mode=json_mode, quiet=state.quiet
)
resolved, speakers = _assign_voices(utterances, translations, voice_plan, language)
_warn_ignored_voice_pins(voice_plan.overrides, speakers, json_mode=json_mode)
pcm_segments, sample_rate = _synthesize(
resolved, speakers = pipeline.assign_voices(utterances, translations, voice_plan, language)
pipeline.warn_ignored_voice_pins(voice_plan.overrides, speakers, json_mode=json_mode)
pcm_segments, sample_rate = pipeline.synthesize(
api_key, resolved, language, json_mode=json_mode, quiet=state.quiet
)

# strict=True is an invariant guard only: _synthesize returns one PCM per segment.
# strict=True is an invariant guard only: synthesize returns one PCM per segment.
placed = [
(utterance.start_ms, pcm)
for utterance, pcm in zip(utterances, pcm_segments, strict=True) # pragma: no mutate
]
track = assemble_timeline(placed, sample_rate, _total_seconds(transcript))
track = pipeline.assemble_timeline(placed, sample_rate, pipeline.total_seconds(transcript))
with tempfile.TemporaryDirectory(prefix="aai-dub-") as tmp:
wav = Path(tmp) / "dub.wav"
audio.write_wav(wav, track, sample_rate)
with output.status("Writing the dubbed file…", json_mode=json_mode, quiet=state.quiet):
_mux(ffmpeg, media, wav, out)
pipeline.mux(ffmpeg, media, wav, out)

duration = round(_pcm_seconds(track, sample_rate), 3)
# Not named `voices`: that would shadow the tts.voices module imported above.
duration = round(pipeline.pcm_seconds(track, sample_rate), 3)
voices_text = ", ".join(f"{speaker}={voice}" for speaker, voice in speakers.items())
payload: dict[str, object] = {
"source": opts.media,
Expand Down
Loading
Loading