diff --git a/integrations/pi/index.ts b/integrations/pi/index.ts index dee703f8..405d738c 100644 --- a/integrations/pi/index.ts +++ b/integrations/pi/index.ts @@ -1,4 +1,4 @@ -import type { ExtensionAPI } from "@mariozechner/pi-coding-agent"; +import type { ExtensionAPI } from "@earendil-works/pi-coding-agent"; import { Type } from "typebox"; import path from "node:path"; import crypto from "node:crypto"; @@ -29,7 +29,9 @@ type HealthResponse = { }; }; -const DEFAULT_URL = process.env.AGENTMEMORY_URL || "http://localhost:3111"; +function getBaseUrl(): string { + return (process.env.AGENTMEMORY_URL || "http://localhost:3111").replace(/\/+$/, ""); +} const guardPlaintextBearerAuth = createPlaintextBearerAuthGuard(); const TOOL_GUIDANCE = [ "agentmemory is available for cross-session memory.", @@ -37,8 +39,23 @@ const TOOL_GUIDANCE = [ "Use memory_save when you discover durable facts worth remembering beyond this session.", ].join(" "); -function normalizeBaseUrl(url: string): string { - return url.replace(/\/+$/, ""); +/** SHA-256 hex hash for dedup. */ +function sha256(data: string): string { + return crypto.createHash("sha256").update(data).digest("hex"); +} + +/** Return true if value looks like a base64-encoded image. */ +function isBase64Image(val: unknown): val is string { + return ( + typeof val === "string" && + (val.startsWith("data:image/") || val.startsWith("iVBORw0KGgo") || val.startsWith("/9j/")) + ); +} + +/** Strip base64 image data from tool output text. */ +function stripImageData(text: string): string { + if (isBase64Image(text)) return "[image data]"; + return text; } function getText(content: unknown): string { @@ -88,9 +105,10 @@ async function callAgentMemory( method?: "GET" | "POST"; body?: unknown; baseUrl?: string; + timeoutMs?: number; }, ): Promise { - const baseUrl = normalizeBaseUrl(options?.baseUrl || process.env.AGENTMEMORY_URL || DEFAULT_URL); + const baseUrl = options?.baseUrl?.replace(/\/+$/, "") || getBaseUrl(); const method = options?.method || "POST"; const url = `${baseUrl}/agentmemory/${pathname.replace(/^\/+/, "")}`; const headers: Record = {}; @@ -104,6 +122,7 @@ async function callAgentMemory( method, headers, body: options?.body !== undefined ? JSON.stringify(options.body) : undefined, + signal: options?.timeoutMs ? AbortSignal.timeout(options.timeoutMs) : undefined, }); if (!response.ok) return null; return (await response.json()) as T; @@ -114,18 +133,46 @@ async function callAgentMemory( export default function agentmemoryExtension(pi: ExtensionAPI) { if (process.env.AGENTMEMORY_REQUIRE_HTTPS === "1") { - guardPlaintextBearerAuth( - normalizeBaseUrl(process.env.AGENTMEMORY_URL || DEFAULT_URL), - process.env.AGENTMEMORY_SECRET, - ); + guardPlaintextBearerAuth(getBaseUrl(), process.env.AGENTMEMORY_SECRET); } let sessionId = `ephemeral-${crypto.randomUUID().slice(0, 8)}`; let currentProject = process.cwd(); let lastPrompt = ""; let lastHealthOk = false; + /** + * Dedup window: skip observations with the same content hash within this + * interval. Mirrors the 5-minute SHA-256 dedup the server applies, but + * avoids unnecessary HTTP round-trips for rapid tool bursts. + */ + const DEDUP_WINDOW_MS = 5 * 60 * 1000; + const recentHashes = new Map(); + + /** Track in-flight fire-and-forget POSTs so session_shutdown can drain them. */ + const pendingPosts = new Set>(); + + function trackPost(promise: Promise): void { + pendingPosts.add(promise); + void promise.finally(() => pendingPosts.delete(promise)); + } + + function isDuplicate(data: string): boolean { + const hash = sha256(data); + const now = Date.now(); + const prev = recentHashes.get(hash); + if (prev && now - prev < DEDUP_WINDOW_MS) return true; + // Prune stale entries to avoid unbounded growth. + if (recentHashes.size > 500) { + for (const [k, ts] of recentHashes) { + if (now - ts >= DEDUP_WINDOW_MS) recentHashes.delete(k); + } + } + recentHashes.set(hash, now); + return false; + } + async function getHealth() { - return await callAgentMemory("health", { method: "GET" }); + return await callAgentMemory("health", { method: "GET", timeoutMs: 2000 }); } async function refreshStatus(ctx: { ui: { setStatus: (key: string, text: string) => void } }) { @@ -134,12 +181,16 @@ export default function agentmemoryExtension(pi: ExtensionAPI) { ctx.ui.setStatus("agentmemory", lastHealthOk ? "🧠 agentmemory" : "🧠 agentmemory off"); } + // --------------------------------------------------------------------------- + // Commands + // --------------------------------------------------------------------------- + pi.registerCommand("agentmemory-status", { description: "Check local agentmemory server health", handler: async (_args, ctx) => { const health = await getHealth(); if (!health) { - ctx.ui.notify("agentmemory is unreachable at http://localhost:3111", "warning"); + ctx.ui.notify(`agentmemory is unreachable at ${getBaseUrl()}`, "warning"); return; } ctx.ui.notify( @@ -149,6 +200,10 @@ export default function agentmemoryExtension(pi: ExtensionAPI) { }, }); + // --------------------------------------------------------------------------- + // Tools + // --------------------------------------------------------------------------- + pi.registerTool({ name: "memory_health", label: "Memory Health", @@ -158,7 +213,7 @@ export default function agentmemoryExtension(pi: ExtensionAPI) { const health = await getHealth(); if (!health) { return { - content: [{ type: "text", text: "agentmemory is unreachable at http://localhost:3111" }], + content: [{ type: "text", text: `agentmemory is unreachable at ${getBaseUrl()}` }], details: { ok: false }, }; } @@ -224,27 +279,76 @@ export default function agentmemoryExtension(pi: ExtensionAPI) { }, }); + // --------------------------------------------------------------------------- + // Lifecycle hooks + // --------------------------------------------------------------------------- + + /** Build the common envelope for POST /observe calls. */ + function observePayload(data: Record) { + return { + hookType: "post_tool_use", + sessionId, + project: currentProject, + cwd: currentProject, + timestamp: new Date().toISOString(), + data, + }; + } + + /** + * session_start — Notify the server that a session is starting so it can + * load project profiles, top concepts, and prior context. + */ pi.on("session_start", async (_event, ctx) => { const sessionFile = ctx.sessionManager.getSessionFile(); - sessionId = sessionFile ? path.basename(sessionFile).replace(/\.[^.]+$/, "") : `ephemeral-${crypto.randomUUID().slice(0, 8)}`; + sessionId = sessionFile + ? path.basename(sessionFile).replace(/\.[^.]+$/, "") + : `ephemeral-${crypto.randomUUID().slice(0, 8)}`; currentProject = process.cwd(); + await refreshStatus(ctx); + + if (lastHealthOk) { + void callAgentMemory("session/start", { + body: { + sessionId, + project: currentProject, + cwd: currentProject, + }, + timeoutMs: 800, + }); + } }); + /** + * before_agent_start — Inject relevant memories into the system prompt so the + * model has cross-session context from the first turn. + */ pi.on("before_agent_start", async (event, ctx) => { currentProject = event.systemPromptOptions.cwd || process.cwd(); lastPrompt = event.prompt?.trim() || ""; if (!lastPrompt) return; + // Observe the prompt so the server knows what was asked. + trackPost(callAgentMemory("observe", { + body: { + hookType: "prompt_submit", + sessionId, + project: currentProject, + cwd: currentProject, + timestamp: new Date().toISOString(), + data: { prompt: lastPrompt }, + }, + timeoutMs: 3000, + })); + const result = await callAgentMemory<{ results?: SmartSearchResult[] }>("smart-search", { body: { query: lastPrompt, limit: 5 }, + timeoutMs: 3000, }); const results = result?.results || []; const recallBlock = results.length - ? [ - "Relevant long-term memory from agentmemory:", - formatSearchResults(results), - ].join("\n") + ? ["Relevant long-term memory from agentmemory:", formatSearchResults(results)].join("\n") : ""; await refreshStatus(ctx); @@ -253,23 +357,97 @@ export default function agentmemoryExtension(pi: ExtensionAPI) { }; }); - pi.on("agent_end", async (event) => { + /** + * tool_result — Capture every tool execution as a granular observation. + * This is the primary data-collection hook: the server receives each tool + * call with its name, input arguments, output, and error status. + */ + pi.on("tool_result", (event) => { + if (!lastHealthOk) return; + + const output = stripImageData(getText(event.content)); + const input = JSON.stringify(event.input ?? {}); + if (isDuplicate(`${event.toolName}:${input}:${output}`)) return; + + trackPost(callAgentMemory("observe", { + body: observePayload({ + tool_name: event.toolName, + tool_input: input.slice(0, 8000), + tool_output: output.slice(0, 8000), + is_error: event.isError ?? false, + }), + timeoutMs: 3000, + })); + }); + + /** + * session_before_compact — Before pi compacts the context window, request a + * context summary from the server. This mirrors the Codex PreCompact hook + * which calls POST /context (not /summarize — that is for session end). + * + * Fire-and-forget is acceptable here: the session continues after + * compaction, so the HTTP call can complete in the background. + */ + pi.on("session_before_compact", () => { + if (!lastHealthOk) return; + + void callAgentMemory("context", { + body: { + sessionId, + project: currentProject, + budget: 1500, + }, + timeoutMs: 5000, + }); + }); + + /** + * agent_end — Capture a high-level summary of the completed agent turn + * (prompt + final assistant response). + */ + pi.on("agent_end", (event) => { if (!lastHealthOk || !lastPrompt) return; const assistantText = getLastAssistantText(event.messages as unknown[]); if (!assistantText) return; - void callAgentMemory("observe", { + + if (isDuplicate(`conversation:${lastPrompt}:${assistantText}`)) return; + + trackPost(callAgentMemory("observe", { + body: observePayload({ + tool_name: "conversation", + tool_input: lastPrompt.slice(0, 8000), + tool_output: assistantText.slice(0, 8000), + }), + timeoutMs: 3000, + })); + }); + + /** + * session_shutdown — Notify the server that the session is ending so it can + * run final summarization, knowledge-graph extraction, and consolidation. + * + * Both calls are awaited sequentially: the process may exit immediately + * after this hook returns, so fire-and-forget would risk losing data. + */ + pi.on("session_shutdown", async () => { + if (!lastHealthOk) return; + + // Drain any in-flight observe POSTs before summarizing. + await Promise.allSettled([...pendingPosts]); + + await callAgentMemory("summarize", { + body: { sessionId }, + timeoutMs: 120_000, + }); + + await callAgentMemory("session/end", { body: { - hookType: "post_tool_use", sessionId, project: currentProject, cwd: currentProject, timestamp: new Date().toISOString(), - data: { - tool_name: "conversation", - input: lastPrompt.slice(0, 500), - output: assistantText.slice(0, 4000), - }, }, + timeoutMs: 5000, }); }); }