Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/elevenlabs-recv-error-var.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@livekit/agents-plugin-elevenlabs': patch
---

refactor(elevenlabs): abort the TTS recv channel on websocket close/error instead of holding the error in a Future
291 changes: 139 additions & 152 deletions plugins/elevenlabs/src/tts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {
APIStatusError,
APITimeoutError,
AudioByteStream,
Future,
type TimedString,
asError,
createTimedString,
Expand Down Expand Up @@ -429,181 +428,165 @@ class Connection {
}

async #recvLoop(): Promise<void> {
try {
const messageChannel = stream.createStreamChannel<Record<string, unknown>>();
const errorFuture = new Future<Error>();

const onMessage = (rawData: Buffer) => {
try {
const parsed = JSON.parse(rawData.toString());
messageChannel.write(parsed);
} catch (e) {
this.#logger.warn({ error: e }, 'failed to parse WebSocket message');
}
};
if (!this.#ws) return;

const onClose = (code: number) => {
if (!this.#closed && this.#contextData.size > 0) {
errorFuture.reject(
new APIStatusError({
message: 'ElevenLabs websocket connection closed unexpectedly',
options: { statusCode: code },
}),
);
}
messageChannel.close();
};
const messageChannel = stream.createStreamChannel<Record<string, unknown>>();

const onMessage = (rawData: Buffer) => {
try {
const parsed = JSON.parse(rawData.toString());
messageChannel.write(parsed);
} catch (e) {
this.#logger.warn({ error: e }, 'failed to parse WebSocket message');
}
};

const onError = (error: Error) => {
errorFuture.reject(error);
const onClose = (code: number) => {
if (!this.#closed && this.#contextData.size > 0) {
messageChannel.abort(
new APIStatusError({
message: 'ElevenLabs websocket connection closed unexpectedly',
options: { statusCode: code },
}),
);
} else {
messageChannel.close();
};
}
};

// Set up persistent listeners
if (!this.#ws) return;
this.#ws.on('message', onMessage);
this.#ws.on('close', onClose);
this.#ws.on('error', onError);
const onError = (error: Error) => {
messageChannel.abort(error);
};

const cleanup = () => {
this.#ws?.off('message', onMessage);
this.#ws?.off('close', onClose);
this.#ws?.off('error', onError);
};
this.#ws.on('message', onMessage);
this.#ws.on('close', onClose);
this.#ws.on('error', onError);

const reader = messageChannel.stream().getReader();
try {
while (!this.#closed) {
const result = await reader.read();
if (result.done || this.#closed) break;

const data = result.value;
const contextId = data.contextId as string | undefined;
const ctx = contextId ? this.#contextData.get(contextId) : undefined;

if (data.error) {
this.#logger.error(
{ context_id: contextId, error: data.error, data },
'elevenlabs tts returned error',
);
if (contextId) {
if (ctx) {
ctx.waiter.reject(new APIError(data.error as string));
}
this.#cleanupContext(contextId);
const reader = messageChannel.stream().getReader();

try {
while (!this.#closed) {
const result = await reader.read();
if (result.done || this.#closed) break;

const data = result.value;
const contextId = data.contextId as string | undefined;
const ctx = contextId ? this.#contextData.get(contextId) : undefined;

if (data.error) {
this.#logger.error(
{ context_id: contextId, error: data.error, data },
'elevenlabs tts returned error',
);
if (contextId) {
if (ctx) {
ctx.waiter.reject(new APIError(data.error as string));
}
continue;
this.#cleanupContext(contextId);
}
continue;
}

if (!ctx) {
this.#logger.warn({ data }, 'unexpected message received from elevenlabs tts');
continue;
}
if (!ctx) {
this.#logger.warn({ data }, 'unexpected message received from elevenlabs tts');
continue;
}

const stream = ctx.stream;

// Process alignment data
const alignment =
this.#opts.preferredAlignment === 'normalized'
? (data.normalizedAlignment as Record<string, unknown>)
: (data.alignment as Record<string, unknown>);

if (alignment && stream) {
const chars = alignment.chars as string[] | undefined;
const starts = (alignment.charStartTimesMs || alignment.charsStartTimesMs) as
| number[]
| undefined;
const durs = (alignment.charDurationsMs || alignment.charsDurationsMs) as
| number[]
| undefined;

if (
chars &&
starts &&
durs &&
chars.length === durs.length &&
starts.length === durs.length
) {
ctx.textBuffer += chars.join('');

// Handle chars with multiple characters
for (let i = 0; i < chars.length; i++) {
const char = chars[i]!;
const start = starts[i]!;
const dur = durs[i]!;

// Capture the first word's start time for normalization
// This removes leading silence from timestamps
if (ctx.firstWordOffsetMs === null && start > 0) {
ctx.firstWordOffsetMs = start;
}
const stream = ctx.stream;

// Process alignment data
const alignment =
this.#opts.preferredAlignment === 'normalized'
? (data.normalizedAlignment as Record<string, unknown>)
: (data.alignment as Record<string, unknown>);

if (alignment && stream) {
const chars = alignment.chars as string[] | undefined;
const starts = (alignment.charStartTimesMs || alignment.charsStartTimesMs) as
| number[]
| undefined;
const durs = (alignment.charDurationsMs || alignment.charsDurationsMs) as
| number[]
| undefined;

if (
chars &&
starts &&
durs &&
chars.length === durs.length &&
starts.length === durs.length
) {
ctx.textBuffer += chars.join('');

// Handle chars with multiple characters
for (let i = 0; i < chars.length; i++) {
const char = chars[i]!;
const start = starts[i]!;
const dur = durs[i]!;

// Capture the first word's start time for normalization
// This removes leading silence from timestamps
if (ctx.firstWordOffsetMs === null && start > 0) {
ctx.firstWordOffsetMs = start;
}

if (char.length > 1) {
for (let j = 0; j < char.length - 1; j++) {
ctx.startTimesMs.push(start);
ctx.durationsMs.push(0);
}
if (char.length > 1) {
for (let j = 0; j < char.length - 1; j++) {
ctx.startTimesMs.push(start);
ctx.durationsMs.push(0);
}
ctx.startTimesMs.push(start);
ctx.durationsMs.push(dur);
}
ctx.startTimesMs.push(start);
ctx.durationsMs.push(dur);
}

const [timedWords, remainingText] = toTimedWords(
ctx.textBuffer,
ctx.startTimesMs,
ctx.durationsMs,
false,
ctx.firstWordOffsetMs ?? 0,
);

if (timedWords.length > 0) {
stream.pushTimedTranscript(timedWords);
}
const [timedWords, remainingText] = toTimedWords(
ctx.textBuffer,
ctx.startTimesMs,
ctx.durationsMs,
false,
ctx.firstWordOffsetMs ?? 0,
);

ctx.textBuffer = remainingText;
ctx.startTimesMs = ctx.startTimesMs.slice(-remainingText.length);
ctx.durationsMs = ctx.durationsMs.slice(-remainingText.length);
if (timedWords.length > 0) {
stream.pushTimedTranscript(timedWords);
}
}

if (data.audio) {
const audioData = Buffer.from(data.audio as string, 'base64');
stream.pushAudio(audioData);
ctx.textBuffer = remainingText;
ctx.startTimesMs = ctx.startTimesMs.slice(-remainingText.length);
ctx.durationsMs = ctx.durationsMs.slice(-remainingText.length);
}
}

if (data.isFinal) {
// Flush remaining alignment data
if (ctx.textBuffer) {
const [timedWords] = toTimedWords(
ctx.textBuffer,
ctx.startTimesMs,
ctx.durationsMs,
true,
ctx.firstWordOffsetMs ?? 0,
);
if (timedWords.length > 0) {
stream.pushTimedTranscript(timedWords);
}
}

stream.markDone();
ctx.waiter.resolve();
this.#cleanupContext(contextId!);
if (data.audio) {
const audioData = Buffer.from(data.audio as string, 'base64');
stream.pushAudio(audioData);
}

if (!this.#isCurrent && this.#activeContexts.size === 0) {
this.#logger.debug('no active contexts, shutting down connection');
break;
if (data.isFinal) {
// Flush remaining alignment data
if (ctx.textBuffer) {
const [timedWords] = toTimedWords(
ctx.textBuffer,
ctx.startTimesMs,
ctx.durationsMs,
true,
ctx.firstWordOffsetMs ?? 0,
);
if (timedWords.length > 0) {
stream.pushTimedTranscript(timedWords);
}
}
}

// Throw any error that occurred
if (errorFuture.done) {
await errorFuture.await;
stream.markDone();
ctx.waiter.resolve();
this.#cleanupContext(contextId!);

if (!this.#isCurrent && this.#activeContexts.size === 0) {
this.#logger.debug('no active contexts, shutting down connection');
break;
}
}
} finally {
reader.releaseLock();
cleanup();
}
} catch (e) {
this.#logger.warn({ error: e }, 'recv loop error');
Expand All @@ -612,6 +595,10 @@ class Connection {
}
this.#contextData.clear();
} finally {
reader.releaseLock();
this.#ws?.off('message', onMessage);
this.#ws?.off('close', onClose);
this.#ws?.off('error', onError);
if (!this.#closed) {
await this.close();
}
Expand Down
Loading