diff --git a/.changeset/elevenlabs-recv-error-var.md b/.changeset/elevenlabs-recv-error-var.md new file mode 100644 index 000000000..a7bce847e --- /dev/null +++ b/.changeset/elevenlabs-recv-error-var.md @@ -0,0 +1,5 @@ +--- +'@livekit/agents-plugin-elevenlabs': patch +--- + +refactor(elevenlabs): abort the TTS recv channel on websocket close/error instead of holding the error in a Future diff --git a/plugins/elevenlabs/src/tts.ts b/plugins/elevenlabs/src/tts.ts index 561a795ee..144604ab9 100644 --- a/plugins/elevenlabs/src/tts.ts +++ b/plugins/elevenlabs/src/tts.ts @@ -8,7 +8,6 @@ import { APIStatusError, APITimeoutError, AudioByteStream, - Future, type TimedString, asError, createTimedString, @@ -429,181 +428,165 @@ class Connection { } async #recvLoop(): Promise { - try { - const messageChannel = stream.createStreamChannel>(); - const errorFuture = new Future(); - - const onMessage = (rawData: Buffer) => { - try { - const parsed = JSON.parse(rawData.toString()); - messageChannel.write(parsed); - } catch (e) { - this.#logger.warn({ error: e }, 'failed to parse WebSocket message'); - } - }; + if (!this.#ws) return; - const onClose = (code: number) => { - if (!this.#closed && this.#contextData.size > 0) { - errorFuture.reject( - new APIStatusError({ - message: 'ElevenLabs websocket connection closed unexpectedly', - options: { statusCode: code }, - }), - ); - } - messageChannel.close(); - }; + const messageChannel = stream.createStreamChannel>(); + + const onMessage = (rawData: Buffer) => { + try { + const parsed = JSON.parse(rawData.toString()); + messageChannel.write(parsed); + } catch (e) { + this.#logger.warn({ error: e }, 'failed to parse WebSocket message'); + } + }; - const onError = (error: Error) => { - errorFuture.reject(error); + const onClose = (code: number) => { + if (!this.#closed && this.#contextData.size > 0) { + messageChannel.abort( + new APIStatusError({ + message: 'ElevenLabs websocket connection closed unexpectedly', + options: { statusCode: code }, + }), + ); + } else { messageChannel.close(); - }; + } + }; - // Set up persistent listeners - if (!this.#ws) return; - this.#ws.on('message', onMessage); - this.#ws.on('close', onClose); - this.#ws.on('error', onError); + const onError = (error: Error) => { + messageChannel.abort(error); + }; - const cleanup = () => { - this.#ws?.off('message', onMessage); - this.#ws?.off('close', onClose); - this.#ws?.off('error', onError); - }; + this.#ws.on('message', onMessage); + this.#ws.on('close', onClose); + this.#ws.on('error', onError); - const reader = messageChannel.stream().getReader(); - try { - while (!this.#closed) { - const result = await reader.read(); - if (result.done || this.#closed) break; - - const data = result.value; - const contextId = data.contextId as string | undefined; - const ctx = contextId ? this.#contextData.get(contextId) : undefined; - - if (data.error) { - this.#logger.error( - { context_id: contextId, error: data.error, data }, - 'elevenlabs tts returned error', - ); - if (contextId) { - if (ctx) { - ctx.waiter.reject(new APIError(data.error as string)); - } - this.#cleanupContext(contextId); + const reader = messageChannel.stream().getReader(); + + try { + while (!this.#closed) { + const result = await reader.read(); + if (result.done || this.#closed) break; + + const data = result.value; + const contextId = data.contextId as string | undefined; + const ctx = contextId ? this.#contextData.get(contextId) : undefined; + + if (data.error) { + this.#logger.error( + { context_id: contextId, error: data.error, data }, + 'elevenlabs tts returned error', + ); + if (contextId) { + if (ctx) { + ctx.waiter.reject(new APIError(data.error as string)); } - continue; + this.#cleanupContext(contextId); } + continue; + } - if (!ctx) { - this.#logger.warn({ data }, 'unexpected message received from elevenlabs tts'); - continue; - } + if (!ctx) { + this.#logger.warn({ data }, 'unexpected message received from elevenlabs tts'); + continue; + } - const stream = ctx.stream; - - // Process alignment data - const alignment = - this.#opts.preferredAlignment === 'normalized' - ? (data.normalizedAlignment as Record) - : (data.alignment as Record); - - if (alignment && stream) { - const chars = alignment.chars as string[] | undefined; - const starts = (alignment.charStartTimesMs || alignment.charsStartTimesMs) as - | number[] - | undefined; - const durs = (alignment.charDurationsMs || alignment.charsDurationsMs) as - | number[] - | undefined; - - if ( - chars && - starts && - durs && - chars.length === durs.length && - starts.length === durs.length - ) { - ctx.textBuffer += chars.join(''); - - // Handle chars with multiple characters - for (let i = 0; i < chars.length; i++) { - const char = chars[i]!; - const start = starts[i]!; - const dur = durs[i]!; - - // Capture the first word's start time for normalization - // This removes leading silence from timestamps - if (ctx.firstWordOffsetMs === null && start > 0) { - ctx.firstWordOffsetMs = start; - } + const stream = ctx.stream; + + // Process alignment data + const alignment = + this.#opts.preferredAlignment === 'normalized' + ? (data.normalizedAlignment as Record) + : (data.alignment as Record); + + if (alignment && stream) { + const chars = alignment.chars as string[] | undefined; + const starts = (alignment.charStartTimesMs || alignment.charsStartTimesMs) as + | number[] + | undefined; + const durs = (alignment.charDurationsMs || alignment.charsDurationsMs) as + | number[] + | undefined; + + if ( + chars && + starts && + durs && + chars.length === durs.length && + starts.length === durs.length + ) { + ctx.textBuffer += chars.join(''); + + // Handle chars with multiple characters + for (let i = 0; i < chars.length; i++) { + const char = chars[i]!; + const start = starts[i]!; + const dur = durs[i]!; + + // Capture the first word's start time for normalization + // This removes leading silence from timestamps + if (ctx.firstWordOffsetMs === null && start > 0) { + ctx.firstWordOffsetMs = start; + } - if (char.length > 1) { - for (let j = 0; j < char.length - 1; j++) { - ctx.startTimesMs.push(start); - ctx.durationsMs.push(0); - } + if (char.length > 1) { + for (let j = 0; j < char.length - 1; j++) { + ctx.startTimesMs.push(start); + ctx.durationsMs.push(0); } - ctx.startTimesMs.push(start); - ctx.durationsMs.push(dur); } + ctx.startTimesMs.push(start); + ctx.durationsMs.push(dur); + } - const [timedWords, remainingText] = toTimedWords( - ctx.textBuffer, - ctx.startTimesMs, - ctx.durationsMs, - false, - ctx.firstWordOffsetMs ?? 0, - ); - - if (timedWords.length > 0) { - stream.pushTimedTranscript(timedWords); - } + const [timedWords, remainingText] = toTimedWords( + ctx.textBuffer, + ctx.startTimesMs, + ctx.durationsMs, + false, + ctx.firstWordOffsetMs ?? 0, + ); - ctx.textBuffer = remainingText; - ctx.startTimesMs = ctx.startTimesMs.slice(-remainingText.length); - ctx.durationsMs = ctx.durationsMs.slice(-remainingText.length); + if (timedWords.length > 0) { + stream.pushTimedTranscript(timedWords); } - } - if (data.audio) { - const audioData = Buffer.from(data.audio as string, 'base64'); - stream.pushAudio(audioData); + ctx.textBuffer = remainingText; + ctx.startTimesMs = ctx.startTimesMs.slice(-remainingText.length); + ctx.durationsMs = ctx.durationsMs.slice(-remainingText.length); } + } - if (data.isFinal) { - // Flush remaining alignment data - if (ctx.textBuffer) { - const [timedWords] = toTimedWords( - ctx.textBuffer, - ctx.startTimesMs, - ctx.durationsMs, - true, - ctx.firstWordOffsetMs ?? 0, - ); - if (timedWords.length > 0) { - stream.pushTimedTranscript(timedWords); - } - } - - stream.markDone(); - ctx.waiter.resolve(); - this.#cleanupContext(contextId!); + if (data.audio) { + const audioData = Buffer.from(data.audio as string, 'base64'); + stream.pushAudio(audioData); + } - if (!this.#isCurrent && this.#activeContexts.size === 0) { - this.#logger.debug('no active contexts, shutting down connection'); - break; + if (data.isFinal) { + // Flush remaining alignment data + if (ctx.textBuffer) { + const [timedWords] = toTimedWords( + ctx.textBuffer, + ctx.startTimesMs, + ctx.durationsMs, + true, + ctx.firstWordOffsetMs ?? 0, + ); + if (timedWords.length > 0) { + stream.pushTimedTranscript(timedWords); } } - } - // Throw any error that occurred - if (errorFuture.done) { - await errorFuture.await; + stream.markDone(); + ctx.waiter.resolve(); + this.#cleanupContext(contextId!); + + if (!this.#isCurrent && this.#activeContexts.size === 0) { + this.#logger.debug('no active contexts, shutting down connection'); + break; + } } - } finally { - reader.releaseLock(); - cleanup(); } } catch (e) { this.#logger.warn({ error: e }, 'recv loop error'); @@ -612,6 +595,10 @@ class Connection { } this.#contextData.clear(); } finally { + reader.releaseLock(); + this.#ws?.off('message', onMessage); + this.#ws?.off('close', onClose); + this.#ws?.off('error', onError); if (!this.#closed) { await this.close(); }