Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/proud-cooks-pump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/agents": patch
---

brianyin/agt-2866-delete-room-on-session-close
90 changes: 90 additions & 0 deletions agents/src/job.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import type { Room } from '@livekit/rtc-node';
import { describe, expect, it, vi } from 'vitest';
import type { InferenceExecutor } from './ipc/inference_executor.js';
import { JobContext, type JobProcess, type RunningJobInfo } from './job.js';

const { deleteRoomMock, roomServiceClientMock } = vi.hoisted(() => ({
deleteRoomMock: vi.fn(async () => {}),
roomServiceClientMock: vi.fn(function RoomServiceClient() {
return { deleteRoom: deleteRoomMock };
}),
}));

vi.mock('livekit-server-sdk', () => ({
RoomServiceClient: roomServiceClientMock,
}));

function createJobContext(infoOverrides: Partial<RunningJobInfo> = {}) {
const room = {
name: 'connected-room',
on: vi.fn(),
off: vi.fn(),
isConnected: false,
remoteParticipants: new Map(),
};

return new JobContext(
{} as unknown as JobProcess,
{
acceptArguments: {
name: 'agent',
identity: 'agent',
metadata: '',
},
job: {
id: 'job-id',
room: { name: 'assigned-room' },
},
url: 'wss://example.livekit.cloud',
token: 'token',
workerId: 'worker-id',
...infoOverrides,
} as unknown as RunningJobInfo,
room as unknown as Room,
vi.fn(),
vi.fn(),
{} as unknown as InferenceExecutor,
);
}

describe('JobContext.deleteRoom', () => {
it('deletes the connected room by default using the job URL and credentials', async () => {
const ctx = createJobContext({
apiKey: 'api-key',
apiSecret: 'api-secret',
});

await ctx.deleteRoom();

expect(roomServiceClientMock).toHaveBeenCalledWith(
'wss://example.livekit.cloud',
'api-key',
'api-secret',
);
expect(deleteRoomMock).toHaveBeenCalledWith('connected-room');
});

it('falls back to environment credentials when job credentials are absent', async () => {
const ctx = createJobContext();

await ctx.deleteRoom();

expect(roomServiceClientMock).toHaveBeenCalledWith(
'wss://example.livekit.cloud',
undefined,
undefined,
);
expect(deleteRoomMock).toHaveBeenCalledWith('connected-room');
});

it('deletes the provided room name when specified', async () => {
const ctx = createJobContext();

await ctx.deleteRoom('other-room');

expect(deleteRoomMock).toHaveBeenCalledWith('other-room');
});
});
22 changes: 21 additions & 1 deletion agents/src/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type {
} from '@livekit/rtc-node';
import { ParticipantKind, RoomEvent, TrackKind } from '@livekit/rtc-node';
import { ThrowsPromise } from '@livekit/throws-transformer/throws';
import { RoomServiceClient } from 'livekit-server-sdk';
import { AsyncLocalStorage } from 'node:async_hooks';
import * as os from 'node:os';
import * as path from 'node:path';
Expand All @@ -29,7 +30,7 @@ const jobContextStorage = new AsyncLocalStorage<JobContext<unknown>>();
* Returns the current job context.
*
* @param required - If true (default), throws when no context is found. If false, returns undefined.
* @throws {Error} if no job context is found and required is true
* @throws Error if no job context is found and required is true
*/
export function getJobContext<ProcessUserData = Record<string, unknown>>(
required?: true,
Expand Down Expand Up @@ -84,6 +85,8 @@ export type RunningJobInfo = {
url: string;
token: string;
workerId: string;
apiKey?: string;
apiSecret?: string;
};

/** Attempted to add a function callback, but the function already exists. */
Expand Down Expand Up @@ -272,6 +275,23 @@ export class JobContext<ProcessUserData = Record<string, unknown>> {
this.connected = true;
}

/** Deletes the room and disconnects all participants. */
async deleteRoom(roomName?: string): Promise<void> {
const targetRoomName = roomName ?? this.#room.name;
if (!targetRoomName) {
this.#logger.warn('cannot delete room because room name is missing');
return;
}

try {
const client = new RoomServiceClient(this.#info.url, this.#info.apiKey, this.#info.apiSecret);
await client.deleteRoom(targetRoomName);
this.#logger.info({ roomName: targetRoomName }, 'room deleted');
} catch (error) {
this.#logger.warn({ error, roomName: targetRoomName }, 'error while deleting room');
}
}

makeSessionReport(session?: AgentSession): SessionReport {
const targetSession = session || this._primaryAgentSession;

Expand Down
12 changes: 6 additions & 6 deletions agents/src/voice/agent_session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1325,12 +1325,6 @@ export class AgentSession<
this.output.audio = null;
this.output.transcription = null;

await this.sessionHost?.close();
this.sessionHost = undefined;

await this._roomIO?.close();
this._roomIO = undefined;

await this.activity?.close();
this.activity = undefined;

Expand Down Expand Up @@ -1359,6 +1353,12 @@ export class AgentSession<
this.llmErrorCounts = 0;
this.ttsErrorCounts = 0;

await this.sessionHost?.close();
this.sessionHost = undefined;

await this._roomIO?.close();
this._roomIO = undefined;

this.logger.info({ reason, error }, 'AgentSession closed');
}
}
184 changes: 183 additions & 1 deletion agents/src/voice/room_io/room_io.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
// SPDX-FileCopyrightText: 2026 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import { describe, expect, it } from 'vitest';
import { EventEmitter } from 'node:events';
import { afterEach, describe, expect, it, vi } from 'vitest';
import * as jobModule from '../../job.js';
import { IdentityTransform } from '../../stream/identity_transform.js';
import { DEFAULT_API_CONNECT_OPTIONS } from '../../types.js';
import { AgentSessionEventTypes, CloseReason, createCloseEvent } from '../events.js';
import { RoomIO } from './room_io.js';

type RoomIOArgs = ConstructorParameters<typeof RoomIO>[0];

/**
* Regression tests proving WritableStream.close() rejects when the writer is
Expand Down Expand Up @@ -36,3 +43,178 @@ describe('RoomIO WritableStream close guard', () => {
await expect(writer.close()).rejects.toThrow();
});
});

function createFakeRoom() {
const emitter = new EventEmitter();

return {
name: 'test-room',
isConnected: false,
remoteParticipants: new Map(),
localParticipant: { identity: 'agent' },
on: vi.fn((event: string | symbol, listener: (...args: unknown[]) => void) => {
emitter.on(event, listener);
return emitter;
}),
off: vi.fn((event: string | symbol, listener: (...args: unknown[]) => void) => {
emitter.off(event, listener);
return emitter;
}),
registerTextStreamHandler: vi.fn(),
unregisterTextStreamHandler: vi.fn(),
};
}

function createFakeSession() {
const emitter = new EventEmitter();

return {
input: { audio: null },
output: { audio: null, transcription: null },
currentAgent: undefined,
llm: undefined,
on: vi.fn((event: string | symbol, listener: (...args: unknown[]) => void) => {
emitter.on(event, listener);
return emitter;
}),
off: vi.fn((event: string | symbol, listener: (...args: unknown[]) => void) => {
emitter.off(event, listener);
return emitter;
}),
emit: (event: string | symbol, value: unknown) => emitter.emit(event, value),
_closeSoon: vi.fn(),
};
}

describe('RoomIO deleteRoomOnClose', () => {
afterEach(() => {
vi.restoreAllMocks();
vi.useRealTimers();
});

it('does not delete the room by default when the session closes', async () => {
const deleteRoom = vi.fn(async () => {});
vi.spyOn(jobModule, 'getJobContext').mockReturnValue({
deleteRoom,
} as unknown as ReturnType<typeof jobModule.getJobContext>);
const room = createFakeRoom();
const session = createFakeSession();
const roomIO = new RoomIO({
agentSession: session as unknown as RoomIOArgs['agentSession'],
room: room as unknown as RoomIOArgs['room'],
inputOptions: {
audioEnabled: false,
textEnabled: false,
},
outputOptions: {
audioEnabled: false,
transcriptionEnabled: false,
},
});

roomIO.start();
session.emit(AgentSessionEventTypes.Close, createCloseEvent(CloseReason.USER_INITIATED, null));
await roomIO.close();

expect(deleteRoom).not.toHaveBeenCalled();
});

it('deletes the room once when deleteRoomOnClose is enabled and the session closes', async () => {
const deleteRoom = vi.fn(async () => {});
vi.spyOn(jobModule, 'getJobContext').mockReturnValue({
deleteRoom,
} as unknown as ReturnType<typeof jobModule.getJobContext>);
const room = createFakeRoom();
const session = createFakeSession();
const roomIO = new RoomIO({
agentSession: session as unknown as RoomIOArgs['agentSession'],
room: room as unknown as RoomIOArgs['room'],
inputOptions: {
audioEnabled: false,
textEnabled: false,
deleteRoomOnClose: true,
},
outputOptions: {
audioEnabled: false,
transcriptionEnabled: false,
},
});

roomIO.start();
session.emit(AgentSessionEventTypes.Close, createCloseEvent(CloseReason.USER_INITIATED, null));
session.emit(AgentSessionEventTypes.Close, createCloseEvent(CloseReason.USER_INITIATED, null));
await roomIO.close();

expect(deleteRoom).toHaveBeenCalledTimes(1);
expect(deleteRoom).toHaveBeenCalledWith(room.name);
});

it('uses the job context captured at construction when close runs outside job context', async () => {
const deleteRoom = vi.fn(async () => {});
vi.spyOn(jobModule, 'getJobContext')
.mockReturnValueOnce({
deleteRoom,
} as unknown as ReturnType<typeof jobModule.getJobContext>)
.mockReturnValue(undefined);
const room = createFakeRoom();
const session = createFakeSession();
const roomIO = new RoomIO({
agentSession: session as unknown as RoomIOArgs['agentSession'],
room: room as unknown as RoomIOArgs['room'],
inputOptions: {
audioEnabled: false,
textEnabled: false,
deleteRoomOnClose: true,
},
outputOptions: {
audioEnabled: false,
transcriptionEnabled: false,
},
});

roomIO.start();
session.emit(AgentSessionEventTypes.Close, createCloseEvent(CloseReason.USER_INITIATED, null));
await roomIO.close();

expect(deleteRoom).toHaveBeenCalledTimes(1);
expect(deleteRoom).toHaveBeenCalledWith(room.name);
});

it('waits up to the API timeout for an in-flight room deletion during close', async () => {
vi.useFakeTimers();
const deleteRoom = vi.fn(() => new Promise<void>(() => {}));
vi.spyOn(jobModule, 'getJobContext').mockReturnValue({
deleteRoom,
} as unknown as ReturnType<typeof jobModule.getJobContext>);
const room = createFakeRoom();
const session = createFakeSession();
const roomIO = new RoomIO({
agentSession: session as unknown as RoomIOArgs['agentSession'],
room: room as unknown as RoomIOArgs['room'],
inputOptions: {
audioEnabled: false,
textEnabled: false,
deleteRoomOnClose: true,
},
outputOptions: {
audioEnabled: false,
transcriptionEnabled: false,
},
});

roomIO.start();
session.emit(AgentSessionEventTypes.Close, createCloseEvent(CloseReason.USER_INITIATED, null));

let closed = false;
const closePromise = roomIO.close().then(() => {
closed = true;
});

await vi.advanceTimersByTimeAsync(DEFAULT_API_CONNECT_OPTIONS.timeoutMs - 1);
expect(closed).toBe(false);

await vi.advanceTimersByTimeAsync(1);
await closePromise;
expect(closed).toBe(true);
});
});
Loading
Loading