diff --git a/apps/memos-local-plugin/agent-contract/jsonrpc.ts b/apps/memos-local-plugin/agent-contract/jsonrpc.ts index 876fbaccc..8db1f564e 100644 --- a/apps/memos-local-plugin/agent-contract/jsonrpc.ts +++ b/apps/memos-local-plugin/agent-contract/jsonrpc.ts @@ -70,6 +70,8 @@ export const RPC_METHODS = { SESSION_CLOSE: "session.close", EPISODE_OPEN: "episode.open", EPISODE_CLOSE: "episode.close", + EPISODE_DELETE: "episode.delete", + EPISODE_DELETE_BULK: "episode.delete_bulk", // ── pipeline (per turn) ── TURN_START: "turn.start", diff --git a/apps/memos-local-plugin/agent-contract/memory-core.ts b/apps/memos-local-plugin/agent-contract/memory-core.ts index fbc4c2fc3..54dbdef94 100644 --- a/apps/memos-local-plugin/agent-contract/memory-core.ts +++ b/apps/memos-local-plugin/agent-contract/memory-core.ts @@ -91,6 +91,10 @@ export interface MemoryCore { userMessage?: string; }): Promise; closeEpisode(episodeId: EpisodeId): Promise; + /** Hard-delete a closed episode by id (idempotent). Rejects if the episode is still open. */ + deleteEpisode(id: string): Promise<{ deleted: boolean }>; + /** Bulk delete closed episodes — returns how many rows were actually removed. */ + deleteEpisodes(ids: readonly string[]): Promise<{ deleted: number }>; // ── pipeline (per turn) ── /** Called *before* the agent acts. Returns the context to inject. */ diff --git a/apps/memos-local-plugin/bridge/methods.ts b/apps/memos-local-plugin/bridge/methods.ts index d81829471..3c3a41457 100644 --- a/apps/memos-local-plugin/bridge/methods.ts +++ b/apps/memos-local-plugin/bridge/methods.ts @@ -134,6 +134,29 @@ export function makeDispatcher( await core.closeEpisode(requireString(p, "episodeId", method) as EpisodeId); return { ok: true }; } + case RPC_METHODS.EPISODE_DELETE: { + const p = asRecord(params, method); + return await core.deleteEpisode(requireString(p, "episodeId", method) as EpisodeId); + } + case RPC_METHODS.EPISODE_DELETE_BULK: { + const p = asRecord(params, method); + const ids = p.ids; + if (!Array.isArray(ids) || ids.length === 0) { + throw new MemosError( + "invalid_argument", + "ids must be a non-empty array", + ); + } + for (const id of ids) { + if (typeof id !== "string" || id.trim().length === 0) { + throw new MemosError( + "invalid_argument", + "each element in ids must be a non-empty string", + ); + } + } + return await core.deleteEpisodes(ids as EpisodeId[]); + } // ── turn lifecycle ── case RPC_METHODS.TURN_START: { diff --git a/apps/memos-local-plugin/core/pipeline/memory-core.ts b/apps/memos-local-plugin/core/pipeline/memory-core.ts index 8f5c1d60b..5dbb6552f 100644 --- a/apps/memos-local-plugin/core/pipeline/memory-core.ts +++ b/apps/memos-local-plugin/core/pipeline/memory-core.ts @@ -332,9 +332,25 @@ export function createMemoryCore( // to `closed` + sets `closeReason: "abandoned"` without touching // trace_ids_json. try { - const orphans = handle.repos.episodes.list({ status: "open", limit: 500 }); + const openEpisodes = handle.repos.episodes.list({ status: "open", limit: 500 }); + // Batch-fetch sessions to avoid N+1 lookups per episode. + const sessionIds = new Set(openEpisodes.map((ep) => ep.sessionId)); + const sessionById = new Map>(); + for (const sid of sessionIds) { + sessionById.set(sid, handle.repos.sessions.getById(sid)); + } + // Only treat an open episode as an orphan if its session has been + // explicitly closed (meta.closedAt is set) or no longer exists. + // Otherwise the session might reconnect — leave it alone. + const orphans = openEpisodes.filter((ep) => { + const session = sessionById.get(ep.sessionId); + if (!session) return true; + if (session.meta?.closedAt != null) return true; + return false; + }); if (orphans.length > 0) { - log.info("init.orphan_episodes.close", { count: orphans.length }); + const skipped = openEpisodes.length - orphans.length; + log.info("init.orphan_episodes.close", { count: orphans.length, skipped }); const endedAt = Date.now(); for (const ep of orphans) { try { @@ -632,6 +648,49 @@ export function createMemoryCore( handle.sessionManager.finalizeEpisode(episodeId); } + function assertEpisodeDeletable(episodeId: EpisodeId): void { + const snap = handle.sessionManager.getEpisode(episodeId); + if (snap?.status === "open") { + throw new MemosError( + "conflict", + `cannot delete open episode: ${episodeId}`, + ); + } + if (!snap && handle.repos.episodes.getById(episodeId)?.status === "open") { + throw new MemosError( + "conflict", + `cannot delete open episode: ${episodeId} (open in DB)`, + ); + } + } + + function deleteClosedEpisode(episodeId: EpisodeId): boolean { + const existing = handle.repos.episodes.getById(episodeId); + assertEpisodeDeletable(episodeId); + const deleted = handle.repos.episodes.deleteById(episodeId); + if (existing && !deleted) { + throw new MemosError( + "internal", + `failed to delete closed episode: ${episodeId}`, + ); + } + return deleted; + } + + async function deleteEpisode(episodeId: EpisodeId): Promise<{ deleted: boolean }> { + ensureLive(); + return { deleted: deleteClosedEpisode(episodeId) }; + } + + async function deleteEpisodes(ids: readonly EpisodeId[]): Promise<{ deleted: number }> { + ensureLive(); + let deleted = 0; + for (const id of ids) { + if (deleteClosedEpisode(id)) deleted++; + } + return { deleted }; + } + // ─── Pipeline (per turn) ── async function onTurnStart( turn: Parameters[0], @@ -1947,6 +2006,8 @@ export function createMemoryCore( closeSession, openEpisode, closeEpisode, + deleteEpisode, + deleteEpisodes, onTurnStart, onTurnEnd, submitFeedback,