Skip to content

[Feature Request] DakeraMemoryService — self-hosted, non-GCP persistent agent memory #6254

Description

@ferhimedamine

Problem

InMemoryMemoryService is explicitly labeled as non-production (its docstring warns it loses data on restart). Both VertexAiMemoryBankService and VertexAiRagMemoryService require Google Cloud Platform, making ADK unusable for:

  • Air-gapped deployments
  • EU data-residency requirements (GDPR)
  • On-premise enterprise environments
  • Developers without GCP access during prototyping

Proposed: DakeraMemoryService

Dakera is a self-hosted, open-source-deployable AI agent memory server. It exposes a REST API backed by RocksDB + HNSW vector index and returns decay-weighted results (recent + frequently-accessed memories score higher).

The proposed implementation fits directly into src/google/adk/memory/dakera_memory_service.py:

from typing import AsyncIterator
import httpx
from google.adk.memory.base_memory_service import BaseMemoryService
from google.adk.sessions.session import Session
from google.adk.memory.base_memory_service import SearchMemoryResponse, MemoryResult

class DakeraMemoryService(BaseMemoryService):
    """Self-hosted memory service backed by Dakera.

    Dakera is a persistent, decay-weighted vector memory server for AI agents.
    Run locally: docker run -p 3000:3000 dakera/dakera:latest

    Args:
        base_url: Dakera server URL. Defaults to http://localhost:3000.
        api_key: Optional API key. Defaults to DAKERA_API_KEY env var.
        top_k: Number of memories to return per query. Defaults to 5.
    """

    def __init__(
        self,
        base_url: str = "http://localhost:3000",
        api_key: str | None = None,
        top_k: int = 5,
    ) -> None:
        import os
        self._base_url = base_url.rstrip("/")
        self._api_key = api_key or os.getenv("DAKERA_API_KEY", "")
        self._top_k = top_k

    def _headers(self) -> dict[str, str]:
        headers: dict[str, str] = {"Content-Type": "application/json"}
        if self._api_key:
            headers["Authorization"] = f"Bearer {self._api_key}"
        return headers

    async def add_session_to_memory(self, session: Session) -> None:
        """Persist all events from a completed session to Dakera."""
        async with httpx.AsyncClient() as client:
            for event in session.events:
                if not event.content:
                    continue
                for part in event.content.parts or []:
                    if not part.text:
                        continue
                    await client.post(
                        f"{self._base_url}/v1/memories",
                        headers=self._headers(),
                        json={
                            "content": part.text,
                            "agent_id": session.app_name,
                            "session_id": session.id,
                            "metadata": {"user_id": session.user_id, "role": event.author},
                        },
                        timeout=10.0,
                    )

    async def search_memory(
        self,
        *,
        app_name: str,
        user_id: str,
        query: str,
    ) -> SearchMemoryResponse:
        """Search Dakera for memories relevant to the query."""
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{self._base_url}/v1/memories/search",
                headers=self._headers(),
                json={
                    "query": query,
                    "agent_id": app_name,
                    "top_k": self._top_k,
                    "filter": {"metadata.user_id": user_id},
                },
                timeout=10.0,
            )
            resp.raise_for_status()
        results = resp.json().get("results", [])
        return SearchMemoryResponse(
            memories=[
                MemoryResult(
                    content=r["content"],
                    score=r.get("score", 0.0),
                )
                for r in results
            ]
        )

Usage

from google.adk.agents import Agent
from google.adk.runners import Runner
from dakera_memory_service import DakeraMemoryService  # from this PR

root_agent = Agent(
    name="support_agent",
    model="gemini-2.0-flash",
    instruction="You are a helpful support agent with persistent memory.",
)

runner = Runner(
    agent=root_agent,
    app_name="support",
    memory_service=DakeraMemoryService(
        base_url="http://localhost:3000",
        api_key="dk_your_key",
    ),
)

What's needed in this PR

  1. src/google/adk/memory/dakera_memory_service.py — the implementation above
  2. tests/unittests/memory/test_dakera_memory_service.py — using pytest-httpx for mocked HTTP
  3. docs/memory/dakera.md (separate PR to google/adk-docs)
  4. Export from src/google/adk/memory/__init__.py

Implementation notes

  • Only dependency: httpx (already a transitive dep in many ADK setups; add to pyproject.toml optional extras)
  • Both abstract methods from BaseMemoryService are implemented: add_session_to_memory and search_memory
  • No GCP credentials required — Dakera is self-hosted
  • Graceful: httpx.TimeoutException and non-2xx responses are surfaced as typed exceptions

Happy to submit a PR. Wanted to open the issue first per the contribution guidelines.

Metadata

Metadata

Assignees

Labels

services[Component] This issue is related to runtime services, e.g. sessions, memory, artifacts, etc
No fields configured for Feature.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions