Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 203 additions & 25 deletions integrations/pi/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { ExtensionAPI } from "@mariozechner/pi-coding-agent";
import type { ExtensionAPI } from "@earendil-works/pi-coding-agent";
import { Type } from "typebox";
import path from "node:path";
import crypto from "node:crypto";
Expand Down Expand Up @@ -29,16 +29,33 @@ type HealthResponse = {
};
};

const DEFAULT_URL = process.env.AGENTMEMORY_URL || "http://localhost:3111";
/**
 * Resolve the agentmemory base URL.
 *
 * Reads `AGENTMEMORY_URL` on every call (falling back to the local default
 * when it is unset or empty) and removes any trailing slashes so path
 * segments can be appended with a single `/`.
 */
function getBaseUrl(): string {
  const configured = process.env.AGENTMEMORY_URL || "http://localhost:3111";
  return configured.replace(/\/+$/, "");
}
const guardPlaintextBearerAuth = createPlaintextBearerAuthGuard();
const TOOL_GUIDANCE = [
"agentmemory is available for cross-session memory.",
"Use memory_search to recall prior decisions, preferences, bugs, and workflows.",
"Use memory_save when you discover durable facts worth remembering beyond this session.",
].join(" ");

function normalizeBaseUrl(url: string): string {
return url.replace(/\/+$/, "");
/** Hex-encoded SHA-256 digest of `data`; used as the key for observation dedup. */
function sha256(data: string): string {
  const hasher = crypto.createHash("sha256");
  hasher.update(data);
  return hasher.digest("hex");
}

/**
 * Type guard: true when `val` is a string that starts with an image data-URI
 * scheme (`data:image/`) or with one of the two raw base64 prefixes checked
 * here (`iVBORw0KGgo`, `/9j/`). Only the prefix is inspected.
 */
function isBase64Image(val: unknown): val is string {
  if (typeof val !== "string") return false;
  for (const prefix of ["data:image/", "iVBORw0KGgo", "/9j/"]) {
    if (val.startsWith(prefix)) return true;
  }
  return false;
}

/**
 * Swap tool output that is itself a base64 image payload for a short
 * placeholder. The check is on the start of the whole string (see
 * `isBase64Image`); any other text is returned unchanged.
 */
function stripImageData(text: string): string {
  return isBase64Image(text) ? "[image data]" : text;
}

function getText(content: unknown): string {
Expand Down Expand Up @@ -88,9 +105,10 @@ async function callAgentMemory<T>(
method?: "GET" | "POST";
body?: unknown;
baseUrl?: string;
timeoutMs?: number;
},
): Promise<T | null> {
const baseUrl = normalizeBaseUrl(options?.baseUrl || process.env.AGENTMEMORY_URL || DEFAULT_URL);
const baseUrl = options?.baseUrl?.replace(/\/+$/, "") || getBaseUrl();
const method = options?.method || "POST";
const url = `${baseUrl}/agentmemory/${pathname.replace(/^\/+/, "")}`;
const headers: Record<string, string> = {};
Expand All @@ -104,6 +122,7 @@ async function callAgentMemory<T>(
method,
headers,
body: options?.body !== undefined ? JSON.stringify(options.body) : undefined,
signal: options?.timeoutMs ? AbortSignal.timeout(options.timeoutMs) : undefined,
});
if (!response.ok) return null;
return (await response.json()) as T;
Expand All @@ -114,18 +133,46 @@ async function callAgentMemory<T>(

export default function agentmemoryExtension(pi: ExtensionAPI) {
if (process.env.AGENTMEMORY_REQUIRE_HTTPS === "1") {
guardPlaintextBearerAuth(
normalizeBaseUrl(process.env.AGENTMEMORY_URL || DEFAULT_URL),
process.env.AGENTMEMORY_SECRET,
);
guardPlaintextBearerAuth(getBaseUrl(), process.env.AGENTMEMORY_SECRET);
}
let sessionId = `ephemeral-${crypto.randomUUID().slice(0, 8)}`;
let currentProject = process.cwd();
let lastPrompt = "";
let lastHealthOk = false;

/**
* Dedup window: skip observations with the same content hash within this
* interval. Mirrors the 5-minute SHA-256 dedup the server applies, but
* avoids unnecessary HTTP round-trips for rapid tool bursts.
*/
const DEDUP_WINDOW_MS = 5 * 60 * 1000;
const recentHashes = new Map<string, number>();

/** Track in-flight fire-and-forget POSTs so session_shutdown can drain them. */
const pendingPosts = new Set<Promise<unknown>>();

/**
 * Register a fire-and-forget request in `pendingPosts` and remove it again
 * once it settles, whether it fulfils or rejects.
 *
 * @param promise In-flight request to track until settlement.
 */
function trackPost(promise: Promise<unknown>): void {
  pendingPosts.add(promise);
  const untrack = (): void => {
    pendingPosts.delete(promise);
  };
  // Use then(untrack, untrack) instead of finally(): the promise returned by
  // finally() re-rejects with the original reason, so a failed POST would
  // surface as an unhandled rejection from this bookkeeping chain.
  void promise.then(untrack, untrack);
}

/**
 * Report whether `data` was already seen within `DEDUP_WINDOW_MS`.
 *
 * Content is keyed by its SHA-256 hash. A first sighting (or a sighting whose
 * previous timestamp has aged out of the window) is recorded and reported as
 * not a duplicate.
 *
 * @param data Serialized observation content to fingerprint.
 * @returns `true` when the same content was recorded inside the window.
 */
function isDuplicate(data: string): boolean {
  // Upper bound on tracked hashes; also the threshold that triggers pruning.
  const MAX_TRACKED_HASHES = 500;

  const hash = sha256(data);
  const now = Date.now();
  const prev = recentHashes.get(hash);
  // Compare against undefined explicitly: a timestamp is a number, so a
  // truthiness test would misclassify a stored value of 0.
  if (prev !== undefined && now - prev < DEDUP_WINDOW_MS) return true;

  if (recentHashes.size > MAX_TRACKED_HASHES) {
    // First drop everything that has aged out of the window.
    for (const [key, ts] of recentHashes) {
      if (now - ts >= DEDUP_WINDOW_MS) recentHashes.delete(key);
    }
    // If a burst left more live entries than the cap, evict the oldest ones
    // (Map iterates in insertion order) so the map stays bounded.
    for (const key of recentHashes.keys()) {
      if (recentHashes.size <= MAX_TRACKED_HASHES) break;
      recentHashes.delete(key);
    }
  }

  // Delete before set so a refreshed hash moves to the end of the insertion
  // order and is not treated as the oldest entry by the eviction above.
  recentHashes.delete(hash);
  recentHashes.set(hash, now);
  return false;
}

async function getHealth() {
return await callAgentMemory<HealthResponse>("health", { method: "GET" });
return await callAgentMemory<HealthResponse>("health", { method: "GET", timeoutMs: 2000 });
}

async function refreshStatus(ctx: { ui: { setStatus: (key: string, text: string) => void } }) {
Expand All @@ -134,12 +181,16 @@ export default function agentmemoryExtension(pi: ExtensionAPI) {
ctx.ui.setStatus("agentmemory", lastHealthOk ? "🧠 agentmemory" : "🧠 agentmemory off");
}

// ---------------------------------------------------------------------------
// Commands
// ---------------------------------------------------------------------------

pi.registerCommand("agentmemory-status", {
description: "Check local agentmemory server health",
handler: async (_args, ctx) => {
const health = await getHealth();
if (!health) {
ctx.ui.notify("agentmemory is unreachable at http://localhost:3111", "warning");
ctx.ui.notify(`agentmemory is unreachable at ${getBaseUrl()}`, "warning");
return;
}
ctx.ui.notify(
Expand All @@ -149,6 +200,10 @@ export default function agentmemoryExtension(pi: ExtensionAPI) {
},
});

// ---------------------------------------------------------------------------
// Tools
// ---------------------------------------------------------------------------

pi.registerTool({
name: "memory_health",
label: "Memory Health",
Expand All @@ -158,7 +213,7 @@ export default function agentmemoryExtension(pi: ExtensionAPI) {
const health = await getHealth();
if (!health) {
return {
content: [{ type: "text", text: "agentmemory is unreachable at http://localhost:3111" }],
content: [{ type: "text", text: `agentmemory is unreachable at ${getBaseUrl()}` }],
details: { ok: false },
};
}
Expand Down Expand Up @@ -224,27 +279,76 @@ export default function agentmemoryExtension(pi: ExtensionAPI) {
},
});

// ---------------------------------------------------------------------------
// Lifecycle hooks
// ---------------------------------------------------------------------------

/**
 * Wrap `data` in the envelope sent to the `observe` endpoint: a fixed
 * `post_tool_use` hook type, the current session id, the current project
 * (also sent as `cwd`), and an ISO-8601 timestamp taken at call time.
 */
function observePayload(data: Record<string, unknown>) {
  const project = currentProject;
  const timestamp = new Date().toISOString();
  return { hookType: "post_tool_use", sessionId, project, cwd: project, timestamp, data };
}

/**
* session_start — Notify the server that a session is starting so it can
* load project profiles, top concepts, and prior context.
*/
pi.on("session_start", async (_event, ctx) => {
const sessionFile = ctx.sessionManager.getSessionFile();
sessionId = sessionFile ? path.basename(sessionFile).replace(/\.[^.]+$/, "") : `ephemeral-${crypto.randomUUID().slice(0, 8)}`;
sessionId = sessionFile
? path.basename(sessionFile).replace(/\.[^.]+$/, "")
: `ephemeral-${crypto.randomUUID().slice(0, 8)}`;
currentProject = process.cwd();

await refreshStatus(ctx);

if (lastHealthOk) {
void callAgentMemory("session/start", {
body: {
sessionId,
project: currentProject,
cwd: currentProject,
},
timeoutMs: 800,
});
Comment on lines +311 to +319
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Await session/start before relying on first-turn recall.

This hook is supposed to initialize session context, but it is fired in the background with an 800 ms timeout. `before_agent_start` can race ahead and query `smart-search` before the server has finished loading profiles/context, so the first recall block can miss the very data this PR is adding.

Suggested fix
     if (lastHealthOk) {
-      void callAgentMemory("session/start", {
+      await callAgentMemory("session/start", {
         body: {
           sessionId,
           project: currentProject,
           cwd: currentProject,
         },
-        timeoutMs: 800,
+        timeoutMs: 3000,
       });
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (lastHealthOk) {
void callAgentMemory("session/start", {
body: {
sessionId,
project: currentProject,
cwd: currentProject,
},
timeoutMs: 800,
});
if (lastHealthOk) {
await callAgentMemory("session/start", {
body: {
sessionId,
project: currentProject,
cwd: currentProject,
},
timeoutMs: 3000,
});
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@integrations/pi/index.ts` around lines 311 - 319, The background call to
callAgentMemory("session/start") (with sessionId/currentProject) can race with
before_agent_start/smart-search and cause first-turn recall to miss newly-loaded
profiles; change this to await the call (or otherwise wait for its completion)
instead of firing it void with an 800ms timeout so session/context
initialization finishes before before_agent_start runs — update the
callAgentMemory invocation near the lastHealthOk branch to await the promise
(and remove or extend the timeout) or gate before_agent_start on the
callAgentMemory promise so the server has finished loading profiles/context
before first-turn recall executes.

}
});

/**
* before_agent_start — Inject relevant memories into the system prompt so the
* model has cross-session context from the first turn.
*/
pi.on("before_agent_start", async (event, ctx) => {
currentProject = event.systemPromptOptions.cwd || process.cwd();
lastPrompt = event.prompt?.trim() || "";
if (!lastPrompt) return;

// Observe the prompt so the server knows what was asked.
trackPost(callAgentMemory("observe", {
body: {
hookType: "prompt_submit",
sessionId,
project: currentProject,
cwd: currentProject,
timestamp: new Date().toISOString(),
data: { prompt: lastPrompt },
},
timeoutMs: 3000,
}));

const result = await callAgentMemory<{ results?: SmartSearchResult[] }>("smart-search", {
body: { query: lastPrompt, limit: 5 },
timeoutMs: 3000,
});
const results = result?.results || [];
const recallBlock = results.length
? [
"Relevant long-term memory from agentmemory:",
formatSearchResults(results),
].join("\n")
? ["Relevant long-term memory from agentmemory:", formatSearchResults(results)].join("\n")
: "";

await refreshStatus(ctx);
Comment on lines +332 to 354
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Refresh health before the blocking recall path.

`before_agent_start` starts `observe` and then awaits `smart-search` before it checks health. When agentmemory is down, every turn pays the search timeout first and only marks the backend unhealthy afterward, which turns an outage into repeated prompt-start latency.

Suggested fix
   pi.on("before_agent_start", async (event, ctx) => {
     currentProject = event.systemPromptOptions.cwd || process.cwd();
     lastPrompt = event.prompt?.trim() || "";
     if (!lastPrompt) return;
+
+    await refreshStatus(ctx);
+    if (!lastHealthOk) {
+      return {
+        systemPrompt: [event.systemPrompt, TOOL_GUIDANCE].filter(Boolean).join("\n\n"),
+      };
+    }
 
     // Observe the prompt so the server knows what was asked.
     trackPost(callAgentMemory("observe", {
@@
     const results = result?.results || [];
     const recallBlock = results.length
       ? ["Relevant long-term memory from agentmemory:", formatSearchResults(results)].join("\n")
       : "";
 
-    await refreshStatus(ctx);
     return {
       systemPrompt: [event.systemPrompt, TOOL_GUIDANCE, recallBlock].filter(Boolean).join("\n\n"),
     };
   });
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@integrations/pi/index.ts` around lines 332 - 354, The health check
(refreshStatus(ctx)) must run before blocking agentmemory calls so an outage
doesn't cause repeated turn latency; in before_agent_start call
refreshStatus(ctx) prior to invoking trackPost/callAgentMemory("observe") and
before awaiting callAgentMemory("smart-search"), and if refreshStatus indicates
the backend is unhealthy, skip or short-circuit the smart-search/recall logic
(i.e., avoid awaiting callAgentMemory and set recallBlock = ""), keeping the
rest of the flow (lastPrompt, results/recallBlock usage) unchanged.

Expand All @@ -253,23 +357,97 @@ export default function agentmemoryExtension(pi: ExtensionAPI) {
};
});

pi.on("agent_end", async (event) => {
/**
 * tool_result — Forward each tool execution to the server as an observation
 * carrying the tool name, its JSON-serialized input, its text output, and the
 * error flag.
 *
 * Nothing is sent while the last health check failed, or when the same
 * name/input/output combination was already recorded inside the dedup window.
 * Input and output are truncated to 8000 characters in the payload; the dedup
 * key is built from the untruncated values.
 */
pi.on("tool_result", (event) => {
  if (!lastHealthOk) return;

  const sanitizedOutput = stripImageData(getText(event.content));
  const serializedInput = JSON.stringify(event.input ?? {});
  const dedupKey = `${event.toolName}:${serializedInput}:${sanitizedOutput}`;
  if (isDuplicate(dedupKey)) return;

  const observation = observePayload({
    tool_name: event.toolName,
    tool_input: serializedInput.slice(0, 8000),
    tool_output: sanitizedOutput.slice(0, 8000),
    is_error: event.isError ?? false,
  });
  // Tracked so session_shutdown can wait for the POST to settle.
  trackPost(callAgentMemory("observe", { body: observation, timeoutMs: 3000 }));
});
Comment thread
paulodearaujo marked this conversation as resolved.

/**
 * session_before_compact — Ask the server for a context summary just before
 * pi compacts the context window. This mirrors the Codex PreCompact hook,
 * which calls POST /context (not /summarize — that one is for session end).
 *
 * The request is deliberately not awaited: the session keeps running after
 * compaction, so the call may finish in the background. It is skipped when
 * the last health check failed.
 */
pi.on("session_before_compact", () => {
  if (lastHealthOk) {
    const body = { sessionId, project: currentProject, budget: 1500 };
    void callAgentMemory("context", { body, timeoutMs: 5000 });
  }
});

/**
* agent_end — Capture a high-level summary of the completed agent turn
* (prompt + final assistant response).
*/
pi.on("agent_end", (event) => {
if (!lastHealthOk || !lastPrompt) return;
const assistantText = getLastAssistantText(event.messages as unknown[]);
if (!assistantText) return;
void callAgentMemory("observe", {

if (isDuplicate(`conversation:${lastPrompt}:${assistantText}`)) return;

trackPost(callAgentMemory("observe", {
body: observePayload({
tool_name: "conversation",
tool_input: lastPrompt.slice(0, 8000),
tool_output: assistantText.slice(0, 8000),
}),
timeoutMs: 3000,
}));
});

/**
* session_shutdown — Notify the server that the session is ending so it can
* run final summarization, knowledge-graph extraction, and consolidation.
*
* Both calls are awaited sequentially: the process may exit immediately
* after this hook returns, so fire-and-forget would risk losing data.
*/
pi.on("session_shutdown", async () => {
if (!lastHealthOk) return;

// Drain any in-flight observe POSTs before summarizing.
await Promise.allSettled([...pendingPosts]);

await callAgentMemory("summarize", {
body: { sessionId },
timeoutMs: 120_000,
});

await callAgentMemory("session/end", {
body: {
hookType: "post_tool_use",
sessionId,
project: currentProject,
cwd: currentProject,
timestamp: new Date().toISOString(),
data: {
tool_name: "conversation",
input: lastPrompt.slice(0, 500),
output: assistantText.slice(0, 4000),
},
},
timeoutMs: 5000,
});
});
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}