123 changes: 100 additions & 23 deletions openkb/agent/compiler.py
@@ -6,6 +6,13 @@
Step 3: A + summary → concepts plan (create/update/related).
Step 4: Concurrent LLM calls (A cached) → generate new + rewrite updated concepts.
Step 5: Code adds cross-ref links to related concepts, updates index.

Anthropic prompt caching is enabled via ``cache_control`` markers at two
breakpoints: end of the document message (caches system + doc across all
N+M+2 calls) and end of the assistant summary message (caches the additional
summary prefix across N+M concept-generation calls). Providers that do not
support cache_control receive a normalized list-of-blocks content payload,
which LiteLLM passes through cleanly.
"""
from __future__ import annotations
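A minimal sketch of the message shapes these two breakpoints produce (illustrative only, not part of the diff; the literal strings are placeholders):

messages = [
    {"role": "system", "content": "...schema + language..."},
    # Breakpoint 1: closes the (system + doc) prefix shared by all N+M+2 calls.
    {"role": "user", "content": [
        {"type": "text", "text": "...document...",
         "cache_control": {"type": "ephemeral"}},
    ]},
    # Breakpoint 2: extends the cached prefix with the summary, shared by the
    # plan call and the N+M concept calls.
    {"role": "assistant", "content": [
        {"type": "text", "text": "...summary...",
         "cache_control": {"type": "ephemeral"}},
    ]},
    {"role": "user", "content": "...per-step instruction..."},
]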

@@ -131,6 +138,50 @@
# LLM helpers
# ---------------------------------------------------------------------------

def _cached_text(text: str) -> list[dict]:
"""Wrap a text payload into a content-block list with an Anthropic
ephemeral cache_control marker.

LiteLLM passes the marker through to Anthropic (and OpenRouter →
Anthropic). For providers that ignore cache_control, the list-of-blocks
payload remains a valid OpenAI-compatible content shape.
"""
return [{"type": "text", "text": text, "cache_control": {"type": "ephemeral"}}]


def _response_cache_headers(config: dict, model: str) -> dict:
"""Build OpenRouter Response Caching headers from config.

Returns an empty dict when the feature is disabled or the active model
is not routed through OpenRouter (the headers would have no effect on
direct provider calls). When enabled, emits ``X-OpenRouter-Cache: true``
and, if a TTL is configured, ``X-OpenRouter-Cache-TTL: <seconds>``.
"""
if not config.get("response_cache", False):
return {}
if not model.startswith("openrouter/"):
return {}
headers = {"X-OpenRouter-Cache": "true"}
ttl = config.get("response_cache_ttl")
if ttl is not None:
headers["X-OpenRouter-Cache-TTL"] = str(int(ttl))
return headers
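A quick sketch of the headers this produces, assuming the config keys added in openkb/config.py below (model ids are placeholders):

config = {"response_cache": True, "response_cache_ttl": 600}
_response_cache_headers(config, "openrouter/anthropic/claude-sonnet-4")
# -> {"X-OpenRouter-Cache": "true", "X-OpenRouter-Cache-TTL": "600"}
_response_cache_headers(config, "anthropic/claude-sonnet-4")
# -> {} (direct provider call, the headers would have no effect)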


def _build_llm_kwargs(config: dict, model: str) -> dict:
"""Compose extra LiteLLM kwargs derived from config (e.g. response cache).

Currently only emits an ``extra_headers`` entry when OpenRouter Response
Caching is enabled. Returns an empty dict when no extras apply, so the
caller can splat with ``**`` and fall back to existing behaviour.
"""
extras: dict = {}
cache_headers = _response_cache_headers(config, model)
if cache_headers:
extras["extra_headers"] = cache_headers
return extras
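A sketch of the intended call-site pattern, mirroring the _llm_call changes below:

extra_kwargs = _build_llm_kwargs(config, model)
# Splatting an empty dict is a no-op, so existing behaviour is preserved
# when no extras apply.
response = litellm.completion(model=model, messages=messages, **extra_kwargs)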


class _Spinner:
"""Animated dots spinner that runs in a background thread."""

@@ -168,15 +219,23 @@ def _format_usage(elapsed: float, usage) -> str:


def _fmt_messages(messages: list[dict], max_content: int = 200) -> str:
"""Format messages for debug output, truncating long content."""
"""Format messages for debug output, truncating long content.

Accepts both plain-string content and the list-of-blocks shape used by
cache_control-tagged messages (joins all text blocks for preview).
"""
parts = []
for msg in messages:
role = msg["role"]
content = msg["content"]
if len(content) > max_content:
preview = content[:max_content] + f"... ({len(content)} chars)"
raw = msg["content"]
if isinstance(raw, list):
text = "".join(b.get("text", "") for b in raw if isinstance(b, dict))
else:
text = raw
if len(text) > max_content:
preview = text[:max_content] + f"... ({len(text)} chars)"
else:
preview = content
preview = text
parts.append(f" [{role}] {preview}")
return "\n".join(parts)

@@ -199,13 +258,15 @@ def _llm_call(model: str, messages: list[dict], step_name: str, **kwargs) -> str
return content.strip()


async def _llm_call_async(model: str, messages: list[dict], step_name: str) -> str:
async def _llm_call_async(model: str, messages: list[dict], step_name: str, **kwargs) -> str:
"""Async LLM call with timing output and debug logging."""
logger.debug("LLM request [%s]:\n%s", step_name, _fmt_messages(messages))
if kwargs:
logger.debug("LLM kwargs [%s]: %s", step_name, kwargs)

t0 = time.time()

response = await litellm.acompletion(model=model, messages=messages)
response = await litellm.acompletion(model=model, messages=messages, **kwargs)
content = response.choices[0].message.content or ""

elapsed = time.time() - t0
@@ -576,25 +637,34 @@ async def _compile_concepts(
max_concurrency: int,
doc_brief: str = "",
doc_type: str = "short",
extra_kwargs: dict | None = None,
) -> None:
"""Shared Steps 2-4: concepts plan → generate/update → index.

Uses ``_CONCEPTS_PLAN_USER`` to get a plan with create/update/related
actions, then executes each action type accordingly.

``extra_kwargs`` is forwarded to every LiteLLM call (e.g. response-cache
headers). Defaults to no extras.
"""
source_file = f"summaries/{doc_name}.md"
extra_kwargs = extra_kwargs or {}

# --- Step 2: Get concepts plan (A cached) ---
concept_briefs = _read_concept_briefs(wiki_dir)

# Second cache breakpoint: end of the assistant summary message. Covers
# (system + doc + summary) for the plan call and every concept call.
summary_msg = {"role": "assistant", "content": _cached_text(summary)}

plan_raw = _llm_call(model, [
system_msg,
doc_msg,
{"role": "assistant", "content": summary},
summary_msg,
{"role": "user", "content": _CONCEPTS_PLAN_USER.format(
concept_briefs=concept_briefs,
)},
], "concepts-plan", max_tokens=1024)
], "concepts-plan", max_tokens=1024, **extra_kwargs)

try:
parsed = _parse_json(plan_raw)
@@ -632,12 +702,12 @@ async def _gen_create(concept: dict) -> tuple[str, str, bool, str]:
raw = await _llm_call_async(model, [
system_msg,
doc_msg,
{"role": "assistant", "content": summary},
summary_msg,
{"role": "user", "content": _CONCEPT_PAGE_USER.format(
title=title, doc_name=doc_name,
update_instruction="",
)},
], f"concept: {name}")
], f"concept: {name}", **extra_kwargs)
try:
parsed = _parse_json(raw)
brief = parsed.get("brief", "")
@@ -663,12 +733,12 @@ async def _gen_update(concept: dict) -> tuple[str, str, bool, str]:
raw = await _llm_call_async(model, [
system_msg,
doc_msg,
{"role": "assistant", "content": summary},
summary_msg,
{"role": "user", "content": _CONCEPT_UPDATE_USER.format(
title=title, doc_name=doc_name,
existing_content=existing_content,
)},
], f"update: {name}")
], f"update: {name}", **extra_kwargs)
try:
parsed = _parse_json(raw)
brief = parsed.get("brief", "")
@@ -741,16 +811,20 @@ async def compile_short_doc(
schema_md = get_agents_md(wiki_dir)
content = source_path.read_text(encoding="utf-8")

# Base context A: system + document
# Base context A: system + document. cache_control marker on the doc
# message creates a cache breakpoint that covers (system + doc) for
# every downstream call (summary, concepts-plan, every concept page).
system_msg = {"role": "system", "content": _SYSTEM_TEMPLATE.format(
schema_md=schema_md, language=language,
)}
doc_msg = {"role": "user", "content": _SUMMARY_USER.format(
doc_msg = {"role": "user", "content": _cached_text(_SUMMARY_USER.format(
doc_name=doc_name, content=content,
)}
))}

extra_kwargs = _build_llm_kwargs(config, model)

# --- Step 1: Generate summary ---
summary_raw = _llm_call(model, [system_msg, doc_msg], "summary")
summary_raw = _llm_call(model, [system_msg, doc_msg], "summary", **extra_kwargs)
try:
summary_parsed = _parse_json(summary_raw)
doc_brief = summary_parsed.get("brief", "")
Expand All @@ -764,7 +838,7 @@ async def compile_short_doc(
await _compile_concepts(
wiki_dir, kb_dir, model, system_msg, doc_msg,
summary, doc_name, max_concurrency, doc_brief=doc_brief,
doc_type="short",
doc_type="short", extra_kwargs=extra_kwargs,
)


@@ -792,20 +866,23 @@ async def compile_long_doc(
schema_md = get_agents_md(wiki_dir)
summary_content = summary_path.read_text(encoding="utf-8")

# Base context A
# Base context A. cache_control marker on the doc message creates a
# cache breakpoint covering (system + doc) for every concept call.
system_msg = {"role": "system", "content": _SYSTEM_TEMPLATE.format(
schema_md=schema_md, language=language,
)}
doc_msg = {"role": "user", "content": _LONG_DOC_SUMMARY_USER.format(
doc_msg = {"role": "user", "content": _cached_text(_LONG_DOC_SUMMARY_USER.format(
doc_name=doc_name, doc_id=doc_id, content=summary_content,
)}
))}

extra_kwargs = _build_llm_kwargs(config, model)

# --- Step 1: Generate overview ---
overview = _llm_call(model, [system_msg, doc_msg], "overview")
overview = _llm_call(model, [system_msg, doc_msg], "overview", **extra_kwargs)

# --- Steps 2-4: Concept plan → generate/update → index ---
await _compile_concepts(
wiki_dir, kb_dir, model, system_msg, doc_msg,
overview, doc_name, max_concurrency, doc_brief=doc_description,
doc_type="pageindex",
doc_type="pageindex", extra_kwargs=extra_kwargs,
)
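Worked cache arithmetic for a hypothetical run with N=3 new and M=2 updated concepts (N + M + 2 = 7 calls total):

# call 1 (summary/overview): writes breakpoint 1 (system + doc)
# call 2 (concepts-plan):    hits breakpoint 1, writes breakpoint 2 (+ summary)
# calls 3-7 (concepts):      hit breakpoint 2, so only the short per-concept
#                            user message is sent as fresh, uncached input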
9 changes: 9 additions & 0 deletions openkb/config.py
@@ -9,6 +9,15 @@
"model": "gpt-5.4-mini",
"language": "en",
"pageindex_threshold": 20,
# Opt-in OpenRouter Response Caching for compiler LLM calls.
# When enabled and the active model is routed via openrouter/, identical
# requests (same model, messages, params) return a cached response with
# zero token billing. Default off because responses are stored on
# OpenRouter's servers, which conflicts with strict zero-data-retention
# postures.
"response_cache": False,
# Optional TTL override in seconds (1..86400). When None, OpenRouter's
# default of 300s applies.
"response_cache_ttl": None,
}

GLOBAL_CONFIG_DIR = Path.home() / ".config" / "openkb"
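A hypothetical user override enabling the feature (keys only; the model id and the on-disk config format are assumptions here):

overrides = {
    "model": "openrouter/anthropic/claude-sonnet-4",  # placeholder model id
    "response_cache": True,        # opt in to OpenRouter Response Caching
    "response_cache_ttl": 3600,    # keep cached responses for 1 hour
}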