diff --git a/.github/workflows/harness-integration.yml b/.github/workflows/harness-integration.yml new file mode 100644 index 000000000..ab6b353b9 --- /dev/null +++ b/.github/workflows/harness-integration.yml @@ -0,0 +1,40 @@ +name: Harness Integration + +on: + push: + branches: [main] + pull_request: + paths: + - "src/agentex/lib/core/harness/**" + - "src/agentex/lib/adk/_modules/**" + - ".github/workflows/harness-integration.yml" + +jobs: + conformance: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + + - name: Install uv + uses: astral-sh/setup-uv@d4b2f3b6ecc6e67c4457f6d3e41ec42d3d0fcb86 # v5.4.2 + with: + version: '0.10.2' + + - name: Bootstrap + run: ./scripts/bootstrap + + # Defer to scripts/test so the harness suite runs under the exact same + # invocation as the main CI test job: DEFER_PYDANTIC_BUILD=false and + # `uv run --isolated --all-packages --all-extras pytest`, across the + # min/max supported Python versions. Running `uv run pytest` directly + # would risk an all-extras-only dep passing locally but failing in CI. + - name: Conformance suite + run: ./scripts/test tests/lib/core/harness/ -v + + # Live integration matrix (harness x {sync, async, temporal}) is added per-harness + # in the migration plans. Placeholder job keeps the workflow valid until then. + live-matrix: + runs-on: ubuntu-latest + if: false # enabled once the first harness's test agents land + steps: + - run: echo "populated by migration PRs" # TODO(harness-migration): enable per-harness; see docs/superpowers/plans migration PRs 4-8 diff --git a/docs/superpowers/plans/2026-06-18-unified-harness-surface-foundation.md b/docs/superpowers/plans/2026-06-18-unified-harness-surface-foundation.md new file mode 100644 index 000000000..0aefef060 --- /dev/null +++ b/docs/superpowers/plans/2026-06-18-unified-harness-surface-foundation.md @@ -0,0 +1,1309 @@ +# Unified Harness Surface — Foundation Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Build the shared, harness-independent machinery (span derivation, auto-send delivery, yield delivery, unified emitter, turn-usage types) that the per-harness taps will plug into — corresponding to PRs 1–3 of the design's rollout. + +**Architecture:** The Agentex `StreamTaskMessage*` stream is the single source of truth (design Approach A). A pure `SpanDeriver` reduces that stream into open/close span signals. Two delivery adapters consume the same stream — `yield_events` (sync HTTP ACP) and `auto_send` (async/temporal, via `adk.streaming`) — and both observe the deriver to drive `adk.tracing`. A `UnifiedEmitter` ties delivery + tracing + `TurnUsage` together. + +**Tech Stack:** Python 3, pydantic v2 (`BaseModel`), pytest + pytest-asyncio, the existing `agentex.lib.adk` streaming/tracing facades. + +**Spec:** `docs/superpowers/specs/2026-06-18-unified-harness-surface-design.md` + +**Scope note:** This plan covers only the foundation (PRs 1–3). The per-harness migration PRs (4–6: pydantic-ai, langgraph, openai) and parser PRs (7–8: claude-code, codex) each require close reading of that harness's existing converter and get their own plans once this foundation lands. PR 9 (cleanup) follows them. See "Subsequent plans" at the end. + +--- + +## File Structure + +- Create `src/agentex/lib/core/harness/__init__.py` — package marker + public re-exports. +- Create `src/agentex/lib/core/harness/types.py` — `OpenSpan`, `CloseSpan`, `SpanSignal`, `TurnUsage`, `TurnResult`, `HarnessTurn` protocol. +- Create `src/agentex/lib/core/harness/span_derivation.py` — `SpanDeriver` (pure reducer). +- Create `src/agentex/lib/core/harness/auto_send.py` — `auto_send()` (canonical stream → `adk.streaming` + tracing). +- Create `src/agentex/lib/core/harness/yield_delivery.py` — `yield_events()` (passthrough + tracing). +- Create `src/agentex/lib/core/harness/emitter.py` — `UnifiedEmitter` facade. +- Create tests under `tests/lib/core/harness/`. + +Each file has one responsibility; `span_derivation.py` has zero dependencies on `adk` so it is unit-testable in isolation. + +--- + +## Task 1: Foundation types + +**Files:** +- Create: `src/agentex/lib/core/harness/__init__.py` +- Create: `src/agentex/lib/core/harness/types.py` +- Test: `tests/lib/core/harness/test_types.py` + +- [ ] **Step 1: Create the package marker** + +Create `src/agentex/lib/core/harness/__init__.py`: + +```python +"""Shared, harness-independent machinery for the unified harness surface. + +The Agentex StreamTaskMessage* stream is the single source of truth; this +package derives spans from it and delivers it (yield or auto-send), so every +harness tap gets streaming + tracing + turn usage uniformly. +""" +``` + +- [ ] **Step 2: Write the failing test for the types** + +Create `tests/lib/core/harness/__init__.py` (empty) and `tests/lib/core/harness/test_types.py`: + +```python +from agentex.lib.core.harness.types import ( + OpenSpan, + CloseSpan, + TurnUsage, + TurnResult, +) + + +def test_open_close_span_construct(): + o = OpenSpan(key="call_1", kind="tool", name="Bash", input={"cmd": "ls"}) + c = CloseSpan(key="call_1", output="files", is_complete=True) + assert o.key == c.key == "call_1" + assert o.kind == "tool" + assert c.is_complete is True + + +def test_turn_usage_defaults_are_none(): + u = TurnUsage(model="claude-opus-4-6") + assert u.model == "claude-opus-4-6" + assert u.input_tokens is None + assert u.num_tool_calls == 0 + + +def test_turn_result_wraps_usage(): + r = TurnResult(final_text="hi", usage=TurnUsage(model="m")) + assert r.final_text == "hi" + assert r.usage.model == "m" +``` + +- [ ] **Step 3: Run test to verify it fails** + +Run: `pytest tests/lib/core/harness/test_types.py -v` +Expected: FAIL with `ModuleNotFoundError: agentex.lib.core.harness.types` + +- [ ] **Step 4: Implement the types** + +Create `src/agentex/lib/core/harness/types.py`: + +```python +"""Types for the unified harness surface.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, AsyncIterator, Literal, Protocol, Union, runtime_checkable + +from agentex.types.task_message_update import ( + StreamTaskMessageDelta, + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageStart, +) +from agentex.lib.utils.model_utils import BaseModel + +# The canonical stream element. Taps yield these; delivery adapters consume them. +StreamTaskMessage = Union[ + StreamTaskMessageStart, + StreamTaskMessageDelta, + StreamTaskMessageFull, + StreamTaskMessageDone, +] + +SpanKind = Literal["tool", "reasoning", "subagent"] + + +@dataclass +class OpenSpan: + """Signal to open a child span. `key` pairs an open with its close.""" + + key: str + kind: SpanKind + name: str + input: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class CloseSpan: + """Signal to close the span previously opened with the same `key`.""" + + key: str + output: Any = None + is_complete: bool = True # False when closed by flush() without a result + + +SpanSignal = Union[OpenSpan, CloseSpan] + + +class TurnUsage(BaseModel): + """Harness-independent turn usage/cost, attached to the turn span. + + Token field names align with agentex.lib.core.observability.llm_metrics. + """ + + model: str | None = None + input_tokens: int | None = None + output_tokens: int | None = None + cached_input_tokens: int | None = None + reasoning_tokens: int | None = None + total_tokens: int | None = None + cost_usd: float | None = None + duration_ms: int | None = None + num_llm_calls: int = 0 + num_tool_calls: int = 0 + num_reasoning_blocks: int = 0 + + +class TurnResult(BaseModel): + """Returned to the caller after a turn is delivered.""" + + final_text: str = "" + usage: TurnUsage = TurnUsage() + + +@runtime_checkable +class HarnessTurn(Protocol): + """A single harness turn: a canonical stream plus its normalized usage. + + Python async generators cannot cleanly return a value to their consumer, so + a tap exposes usage via `usage()` (valid only after `events` is exhausted) + rather than via StopAsyncIteration. + """ + + @property + def events(self) -> AsyncIterator[StreamTaskMessage]: ... + + def usage(self) -> TurnUsage: ... +``` + +- [ ] **Step 5: Run test to verify it passes** + +Run: `pytest tests/lib/core/harness/test_types.py -v` +Expected: PASS (3 passed) + +- [ ] **Step 6: Commit** + +```bash +git add src/agentex/lib/core/harness/__init__.py src/agentex/lib/core/harness/types.py tests/lib/core/harness/__init__.py tests/lib/core/harness/test_types.py +git commit -m "feat(harness): foundation types for unified harness surface" +``` + +--- + +## Task 2: SpanDeriver (pure span derivation) — PR 1 + +**Files:** +- Create: `src/agentex/lib/core/harness/span_derivation.py` +- Test: `tests/lib/core/harness/test_span_derivation.py` + +Derivation rules (from the spec): tool span opens on the `Done` of an index whose `Start` +was a `ToolRequestContent`, and closes on the matching `ToolResponseContent` by +`tool_call_id`; reasoning span opens on `Start(ReasoningContent)` and closes on that index's +`Done`. Parallel tools are keyed by `tool_call_id`. `flush()` closes anything still open. + +- [ ] **Step 1: Write failing tests (text, single tool, reasoning, parallel, streamed args, unclosed)** + +Create `tests/lib/core/harness/test_span_derivation.py`: + +```python +from agentex.lib.core.harness.span_derivation import SpanDeriver +from agentex.lib.core.harness.types import OpenSpan, CloseSpan +from agentex.types.task_message_update import ( + StreamTaskMessageStart, + StreamTaskMessageDelta, + StreamTaskMessageFull, + StreamTaskMessageDone, +) +from agentex.types.text_content import TextContent +from agentex.types.reasoning_content import ReasoningContent +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.types.tool_request_delta import ToolRequestDelta + + +def _signals(deriver, events): + out = [] + for e in events: + out.extend(deriver.observe(e)) + out.extend(deriver.flush()) + return out + + +def _tool_req(idx, tcid, name, args): + return StreamTaskMessageStart( + type="start", index=idx, + content=ToolRequestContent(type="tool_request", author="agent", + tool_call_id=tcid, name=name, arguments=args), + ) + + +def test_text_only_yields_no_spans(): + d = SpanDeriver() + events = [ + StreamTaskMessageStart(type="start", index=0, + content=TextContent(type="text", author="agent", content="")), + StreamTaskMessageDelta(type="delta", index=0, + delta=None), + StreamTaskMessageDone(type="done", index=0), + ] + assert _signals(d, events) == [] + + +def test_single_tool_opens_on_done_closes_on_response(): + d = SpanDeriver() + events = [ + _tool_req(0, "call_1", "Bash", {"cmd": "ls"}), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageFull(type="full", index=1, + content=ToolResponseContent(type="tool_response", author="agent", + tool_call_id="call_1", name="Bash", content="files")), + ] + sigs = _signals(d, events) + assert sigs == [ + OpenSpan(key="call_1", kind="tool", name="Bash", input={"cmd": "ls"}), + CloseSpan(key="call_1", output="files", is_complete=True), + ] + + +def test_reasoning_opens_on_start_closes_on_done(): + d = SpanDeriver() + events = [ + StreamTaskMessageStart(type="start", index=0, + content=ReasoningContent(type="reasoning", author="agent", summary=[], content=[])), + StreamTaskMessageDone(type="done", index=0), + ] + sigs = _signals(d, events) + assert sigs[0] == OpenSpan(key="reasoning:0", kind="reasoning", name="reasoning", input={}) + assert sigs[1] == CloseSpan(key="reasoning:0", output=None, is_complete=True) + + +def test_parallel_tools_pair_by_tool_call_id(): + d = SpanDeriver() + events = [ + _tool_req(0, "a", "T1", {}), + _tool_req(1, "b", "T2", {}), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageDone(type="done", index=1), + StreamTaskMessageFull(type="full", index=2, + content=ToolResponseContent(type="tool_response", author="agent", + tool_call_id="b", name="T2", content="rb")), + StreamTaskMessageFull(type="full", index=3, + content=ToolResponseContent(type="tool_response", author="agent", + tool_call_id="a", name="T1", content="ra")), + ] + sigs = _signals(d, events) + opens = [s for s in sigs if isinstance(s, OpenSpan)] + closes = [s for s in sigs if isinstance(s, CloseSpan)] + assert {o.key for o in opens} == {"a", "b"} + assert [c.key for c in closes] == ["b", "a"] + assert all(c.is_complete for c in closes) + + +def test_streamed_args_accumulate_into_open_input(): + d = SpanDeriver() + events = [ + StreamTaskMessageStart(type="start", index=0, + content=ToolRequestContent(type="tool_request", author="agent", + tool_call_id="c", name="Bash", arguments={})), + StreamTaskMessageDelta(type="delta", index=0, + delta=ToolRequestDelta(type="tool_request", tool_call_id="c", name="Bash", + arguments_delta='{"cmd":')), + StreamTaskMessageDelta(type="delta", index=0, + delta=ToolRequestDelta(type="tool_request", tool_call_id="c", name="Bash", + arguments_delta='"ls"}')), + StreamTaskMessageDone(type="done", index=0), + ] + sigs = _signals(d, events) + assert sigs[0] == OpenSpan(key="c", kind="tool", name="Bash", input={"cmd": "ls"}) + + +def test_unclosed_tool_closed_incomplete_on_flush(): + d = SpanDeriver() + events = [ + _tool_req(0, "x", "Bash", {}), + StreamTaskMessageDone(type="done", index=0), + ] + sigs = _signals(d, events) + assert sigs[0] == OpenSpan(key="x", kind="tool", name="Bash", input={}) + assert sigs[1] == CloseSpan(key="x", output=None, is_complete=False) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest tests/lib/core/harness/test_span_derivation.py -v` +Expected: FAIL with `ModuleNotFoundError: agentex.lib.core.harness.span_derivation` + +- [ ] **Step 3: Implement `SpanDeriver`** + +Create `src/agentex/lib/core/harness/span_derivation.py`: + +```python +"""Pure reducer: canonical StreamTaskMessage* stream -> span open/close signals. + +Has no dependency on adk; unit-testable in isolation. Delivery adapters feed it +every event and act on the returned signals. +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass, field +from typing import Any + +from agentex.types.task_message_update import ( + StreamTaskMessageDelta, + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageStart, +) + +from agentex.lib.core.harness.types import CloseSpan, OpenSpan, SpanSignal, StreamTaskMessage + + +@dataclass +class _ToolReqMeta: + tool_call_id: str + name: str + arguments: dict[str, Any] + args_buf: str = "" # accumulated streamed argument fragments + + +class SpanDeriver: + """Stateful reducer over the canonical stream. + + Tool span: open on Done of a ToolRequestContent index; close on matching + ToolResponseContent by tool_call_id. Reasoning span: open on + Start(ReasoningContent); close on that index's Done. + """ + + def __init__(self) -> None: + # index -> tool request metadata (present only for tool_request indices) + self._tool_by_index: dict[int, _ToolReqMeta] = {} + # index -> reasoning open (present only for reasoning indices) + self._reasoning_index_open: set[int] = set() + # tool_call_ids with a currently-open span + self._open_tool_ids: set[str] = set() + + def observe(self, event: StreamTaskMessage) -> list[SpanSignal]: + if isinstance(event, StreamTaskMessageStart): + return self._on_start(event) + if isinstance(event, StreamTaskMessageDelta): + return self._on_delta(event) + if isinstance(event, StreamTaskMessageFull): + return self._on_full(event) + if isinstance(event, StreamTaskMessageDone): + return self._on_done(event) + return [] + + def flush(self) -> list[SpanSignal]: + """Close anything still open at end of stream, marked incomplete.""" + signals: list[SpanSignal] = [] + for tcid in list(self._open_tool_ids): + signals.append(CloseSpan(key=tcid, output=None, is_complete=False)) + self._open_tool_ids.clear() + for idx in sorted(self._reasoning_index_open): + signals.append(CloseSpan(key=f"reasoning:{idx}", output=None, is_complete=False)) + self._reasoning_index_open.clear() + return signals + + def _on_start(self, event: StreamTaskMessageStart) -> list[SpanSignal]: + content = event.content + idx = event.index if event.index is not None else -1 + ctype = getattr(content, "type", None) + if ctype == "tool_request": + self._tool_by_index[idx] = _ToolReqMeta( + tool_call_id=content.tool_call_id, + name=content.name, + arguments=dict(content.arguments or {}), + ) + return [] + if ctype == "reasoning": + self._reasoning_index_open.add(idx) + return [OpenSpan(key=f"reasoning:{idx}", kind="reasoning", name="reasoning", input={})] + return [] + + def _on_delta(self, event: StreamTaskMessageDelta) -> list[SpanSignal]: + idx = event.index if event.index is not None else -1 + delta = event.delta + if delta is not None and getattr(delta, "type", None) == "tool_request": + meta = self._tool_by_index.get(idx) + if meta is not None and delta.arguments_delta: + meta.args_buf += delta.arguments_delta + return [] + + def _on_full(self, event: StreamTaskMessageFull) -> list[SpanSignal]: + content = event.content + if getattr(content, "type", None) == "tool_response": + tcid = content.tool_call_id + if tcid in self._open_tool_ids: + self._open_tool_ids.discard(tcid) + return [CloseSpan(key=tcid, output=content.content, is_complete=True)] + return [] + + def _on_done(self, event: StreamTaskMessageDone) -> list[SpanSignal]: + idx = event.index if event.index is not None else -1 + meta = self._tool_by_index.pop(idx, None) + if meta is not None: + args = meta.arguments + if meta.args_buf: + try: + args = json.loads(meta.args_buf) + except json.JSONDecodeError: + args = {"_raw": meta.args_buf} + self._open_tool_ids.add(meta.tool_call_id) + return [OpenSpan(key=meta.tool_call_id, kind="tool", name=meta.name, input=args)] + if idx in self._reasoning_index_open: + self._reasoning_index_open.discard(idx) + return [CloseSpan(key=f"reasoning:{idx}", output=None, is_complete=True)] + return [] +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest tests/lib/core/harness/test_span_derivation.py -v` +Expected: PASS (6 passed) + +- [ ] **Step 5: Commit** + +```bash +git add src/agentex/lib/core/harness/span_derivation.py tests/lib/core/harness/test_span_derivation.py +git commit -m "feat(harness): pure SpanDeriver reducing the canonical stream to span signals" +``` + +--- + +## Task 3: Tracer adapter (span signals -> adk.tracing) + +**Files:** +- Create: `src/agentex/lib/core/harness/tracer.py` +- Test: `tests/lib/core/harness/test_tracer.py` + +A thin adapter that turns `SpanSignal`s into `adk.tracing` spans, nesting them under a parent +span. Kept separate from `SpanDeriver` so derivation stays pure and tracing stays overridable. +Tracing failures are best-effort and never raise (spec error-handling contract). + +- [ ] **Step 1: Write the failing test (uses a fake adk.tracing)** + +Create `tests/lib/core/harness/test_tracer.py`: + +```python +import pytest + +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.lib.core.harness.types import OpenSpan, CloseSpan + + +class _FakeSpan: + def __init__(self, name): + self.name = name + + +class _FakeTracing: + def __init__(self): + self.started = [] + self.ended = [] + + async def start_span(self, *, trace_id, name, input=None, parent_id=None, data=None, task_id=None): + self.started.append((name, parent_id, input)) + return _FakeSpan(name) + + async def end_span(self, *, trace_id, span, output=None, data=None): + self.ended.append((span.name, output)) + + +@pytest.mark.asyncio +async def test_open_then_close_starts_and_ends_span(): + fake = _FakeTracing() + tracer = SpanTracer(trace_id="t1", parent_span_id="p1", tracing=fake) + await tracer.handle(OpenSpan(key="call_1", kind="tool", name="Bash", input={"cmd": "ls"})) + await tracer.handle(CloseSpan(key="call_1", output="files", is_complete=True)) + assert fake.started == [("Bash", "p1", {"cmd": "ls"})] + assert fake.ended == [("Bash", "files")] + + +@pytest.mark.asyncio +async def test_no_trace_id_is_noop(): + fake = _FakeTracing() + tracer = SpanTracer(trace_id="", parent_span_id=None, tracing=fake) + await tracer.handle(OpenSpan(key="k", kind="tool", name="X")) + await tracer.handle(CloseSpan(key="k")) + assert fake.started == [] and fake.ended == [] + + +@pytest.mark.asyncio +async def test_tracing_failure_is_swallowed(): + class _Boom(_FakeTracing): + async def start_span(self, **kw): + raise RuntimeError("backend down") + + tracer = SpanTracer(trace_id="t1", parent_span_id="p1", tracing=_Boom()) + # Must not raise. + await tracer.handle(OpenSpan(key="k", kind="tool", name="X")) + await tracer.handle(CloseSpan(key="k")) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `pytest tests/lib/core/harness/test_tracer.py -v` +Expected: FAIL with `ModuleNotFoundError: agentex.lib.core.harness.tracer` + +- [ ] **Step 3: Implement `SpanTracer`** + +Create `src/agentex/lib/core/harness/tracer.py`: + +```python +"""Adapter from SpanSignals to adk.tracing spans (best-effort, overridable).""" + +from __future__ import annotations + +from typing import Any + +from agentex.lib.utils.logging import make_logger +from agentex.lib.core.harness.types import CloseSpan, OpenSpan, SpanSignal + +logger = make_logger(__name__) + + +class SpanTracer: + """Opens/closes adk.tracing child spans in response to span signals. + + `tracing` defaults to the real `adk.tracing` module; inject a fake in tests + or a custom tracer to override. No-op when `trace_id` is falsy. Never raises. + """ + + def __init__(self, trace_id: str | None, parent_span_id: str | None, tracing: Any = None, task_id: str | None = None): + self.trace_id = trace_id + self.parent_span_id = parent_span_id + self.task_id = task_id + if tracing is None: + from agentex.lib import adk + + tracing = adk.tracing + self._tracing = tracing + self._open: dict[str, Any] = {} # span key -> span object + + async def handle(self, signal: SpanSignal) -> None: + if not self.trace_id: + return + try: + if isinstance(signal, OpenSpan): + span = await self._tracing.start_span( + trace_id=self.trace_id, + name=signal.name, + input=signal.input, + parent_id=self.parent_span_id, + task_id=self.task_id, + ) + if span is not None: + self._open[signal.key] = span + elif isinstance(signal, CloseSpan): + span = self._open.pop(signal.key, None) + if span is not None: + await self._tracing.end_span( + trace_id=self.trace_id, + span=span, + output=signal.output, + ) + except Exception as exc: # best-effort: tracing never breaks delivery + logger.warning("[harness.tracer] span signal failed: %s", exc) +``` + +Note for the implementer: confirm `adk.tracing.end_span` accepts `output=` (seen in +`src/agentex/lib/adk/_modules/tracing.py`). If the kwarg differs, adjust the call and the +fake in the test together. + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `pytest tests/lib/core/harness/test_tracer.py -v` +Expected: PASS (3 passed) + +- [ ] **Step 5: Commit** + +```bash +git add src/agentex/lib/core/harness/tracer.py tests/lib/core/harness/test_tracer.py +git commit -m "feat(harness): SpanTracer adapter from span signals to adk.tracing" +``` + +--- + +## Task 4: `yield_events` delivery adapter — PR 3 (part 1) + +**Files:** +- Create: `src/agentex/lib/core/harness/yield_delivery.py` +- Test: `tests/lib/core/harness/test_yield_delivery.py` + +`yield_events` passes the canonical stream through unchanged (for sync HTTP ACP agents) while +feeding the `SpanDeriver` + `SpanTracer` as a side effect. Streaming fidelity is untouched. + +- [ ] **Step 1: Write the failing test** + +Create `tests/lib/core/harness/test_yield_delivery.py`: + +```python +import pytest + +from agentex.lib.core.harness.yield_delivery import yield_events +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.types.task_message_update import ( + StreamTaskMessageStart, + StreamTaskMessageDone, + StreamTaskMessageFull, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent + + +class _RecordTracing: + def __init__(self): + self.started, self.ended = [], [] + + async def start_span(self, *, trace_id, name, input=None, parent_id=None, data=None, task_id=None): + self.started.append(name) + return object() + + async def end_span(self, *, trace_id, span, output=None, data=None): + self.ended.append(output) + + +async def _gen(events): + for e in events: + yield e + + +@pytest.mark.asyncio +async def test_yield_passes_events_through_and_traces(): + fake = _RecordTracing() + tracer = SpanTracer(trace_id="t", parent_span_id="p", tracing=fake) + events = [ + StreamTaskMessageStart(type="start", index=0, + content=ToolRequestContent(type="tool_request", author="agent", + tool_call_id="c", name="Bash", arguments={})), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageFull(type="full", index=1, + content=ToolResponseContent(type="tool_response", author="agent", + tool_call_id="c", name="Bash", content="ok")), + ] + out = [e async for e in yield_events(_gen(events), tracer=tracer)] + assert out == events # passthrough unchanged + assert fake.started == ["Bash"] # span derived + opened + assert fake.ended == ["ok"] # span closed with response +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `pytest tests/lib/core/harness/test_yield_delivery.py -v` +Expected: FAIL with `ModuleNotFoundError: agentex.lib.core.harness.yield_delivery` + +- [ ] **Step 3: Implement `yield_events`** + +Create `src/agentex/lib/core/harness/yield_delivery.py`: + +```python +"""Yield delivery: pass the canonical stream through, tracing as a side effect.""" + +from __future__ import annotations + +from typing import AsyncIterator + +from agentex.lib.core.harness.span_derivation import SpanDeriver +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.lib.core.harness.types import StreamTaskMessage + + +async def yield_events( + events: AsyncIterator[StreamTaskMessage], + tracer: SpanTracer | None = None, +) -> AsyncIterator[StreamTaskMessage]: + """Forward each event to the caller; derive + trace spans as a side effect. + + For sync HTTP ACP agents that yield events back over the response. When + `tracer` is None, this is a pure passthrough. + """ + deriver = SpanDeriver() if tracer is not None else None + try: + async for event in events: + if deriver is not None and tracer is not None: + for signal in deriver.observe(event): + await tracer.handle(signal) + yield event + finally: + if deriver is not None and tracer is not None: + for signal in deriver.flush(): + await tracer.handle(signal) +``` + +- [ ] **Step 4: Run test to verify it passes** + +Run: `pytest tests/lib/core/harness/test_yield_delivery.py -v` +Expected: PASS (1 passed) + +- [ ] **Step 5: Commit** + +```bash +git add src/agentex/lib/core/harness/yield_delivery.py tests/lib/core/harness/test_yield_delivery.py +git commit -m "feat(harness): yield_events delivery adapter (passthrough + tracing)" +``` + +--- + +## Task 5: `auto_send` delivery adapter — PR 2 + +**Files:** +- Create: `src/agentex/lib/core/harness/auto_send.py` +- Test: `tests/lib/core/harness/test_auto_send.py` + +`auto_send` consumes the canonical stream and drives `adk.streaming` context managers: it opens +a text context for `TextContent`, a reasoning context for `ReasoningContent`, switches cleanly +between them, and posts tool request/response as full messages. It feeds the same +`SpanDeriver`/`SpanTracer` and returns `TurnResult`. This generalizes the golden agent's +`AgentexStreamAdapter` (`teams/sgp/agents/golden_agent/project/harness/adapter.py`) to consume +`StreamTaskMessage*` instead of `HarnessEvent`. + +Reference while implementing: `src/agentex/lib/adk/_modules/_langgraph_async.py` +(`stream_langgraph_events`) shows the exact `adk.streaming` open/stream/close pattern to reuse; +`adapter.py` lines 87–130 show the text↔reasoning↔tool switching logic to mirror. + +- [ ] **Step 1: Write the failing test (fake streaming records context lifecycle)** + +Create `tests/lib/core/harness/test_auto_send.py`: + +```python +import pytest + +from agentex.lib.core.harness.auto_send import auto_send +from agentex.types.task_message_update import ( + StreamTaskMessageStart, + StreamTaskMessageDelta, + StreamTaskMessageDone, +) +from agentex.types.text_content import TextContent +from agentex.types.text_delta import TextDelta + + +class _FakeCtx: + def __init__(self, sink): + self.sink = sink + + async def __aenter__(self): + self.sink.append(("open",)) + return self + + async def __aexit__(self, *a): + self.sink.append(("close",)) + return False + + async def stream_update(self, update): + self.sink.append(("update", update)) + return update + + +class _FakeStreaming: + def __init__(self): + self.sink = [] + + def streaming_task_message_context(self, task_id, initial_content, streaming_mode="coalesced", created_at=None): + self.sink.append(("ctx", getattr(initial_content, "type", None))) + return _FakeCtx(self.sink) + + +async def _gen(events): + for e in events: + yield e + + +@pytest.mark.asyncio +async def test_auto_send_streams_text_and_returns_final_text(): + streaming = _FakeStreaming() + events = [ + StreamTaskMessageStart(type="start", index=0, + content=TextContent(type="text", author="agent", content="")), + StreamTaskMessageDelta(type="delta", index=0, delta=TextDelta(type="text", text_delta="Hel")), + StreamTaskMessageDelta(type="delta", index=0, delta=TextDelta(type="text", text_delta="lo")), + StreamTaskMessageDone(type="done", index=0), + ] + result = await auto_send(_gen(events), task_id="task1", tracer=None, streaming=streaming) + assert result.final_text == "Hello" + kinds = [s[0] for s in streaming.sink] + assert kinds[0] == "ctx" and "open" in kinds and "close" in kinds +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `pytest tests/lib/core/harness/test_auto_send.py -v` +Expected: FAIL with `ModuleNotFoundError: agentex.lib.core.harness.auto_send` + +- [ ] **Step 3: Implement `auto_send`** + +Create `src/agentex/lib/core/harness/auto_send.py`. The implementer mirrors the text↔reasoning +switching from `adapter.py` and the `adk.streaming` usage from `_langgraph_async.py`: + +```python +"""Auto-send delivery: canonical stream -> adk.streaming side effects + tracing.""" + +from __future__ import annotations + +from typing import Any, AsyncIterator + +from agentex.types.task_message_update import ( + StreamTaskMessageDelta, + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageStart, +) +from agentex.types.text_content import TextContent + +from agentex.lib.core.harness.span_derivation import SpanDeriver +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.lib.core.harness.types import StreamTaskMessage, TurnResult, TurnUsage + + +async def auto_send( + events: AsyncIterator[StreamTaskMessage], + task_id: str, + tracer: SpanTracer | None = None, + streaming: Any = None, + usage: TurnUsage | None = None, +) -> TurnResult: + """Push the canonical stream to the task stream via adk.streaming. + + Opens a streaming context per text/reasoning message, streams deltas, and + closes on Done; posts tool request/response as full messages; derives and + traces spans from the same stream. Returns the accumulated final text + + usage. For async + temporal agents (call from inside an activity). + """ + if streaming is None: + from agentex.lib import adk + + streaming = adk.streaming + + deriver = SpanDeriver() if tracer is not None else None + final_text_parts: list[str] = [] + current_ctx: Any = None + current_kind: str | None = None # "text" | "reasoning" + + async def _close_current() -> None: + nonlocal current_ctx, current_kind + if current_ctx is not None: + await current_ctx.__aexit__(None, None, None) + current_ctx = None + current_kind = None + + try: + async for event in events: + if deriver is not None and tracer is not None: + for signal in deriver.observe(event): + await tracer.handle(signal) + + if isinstance(event, StreamTaskMessageStart): + ctype = getattr(event.content, "type", None) + if ctype in ("text", "reasoning"): + await _close_current() + current_ctx = streaming.streaming_task_message_context( + task_id=task_id, initial_content=event.content, + ) + await current_ctx.__aenter__() + current_kind = ctype + elif isinstance(event, StreamTaskMessageDelta): + if current_ctx is not None and event.delta is not None: + await current_ctx.stream_update(event) + if getattr(event.delta, "type", None) == "text" and event.delta.text_delta: + final_text_parts.append(event.delta.text_delta) + elif isinstance(event, StreamTaskMessageDone): + await _close_current() + elif isinstance(event, StreamTaskMessageFull): + # Tool request/response (and any non-streamed full message): post as a + # standalone full message, not tied to the current text/reasoning ctx. + await _close_current() + ctx = streaming.streaming_task_message_context( + task_id=task_id, initial_content=event.content, + ) + await ctx.__aenter__() + await ctx.__aexit__(None, None, None) + finally: + await _close_current() + if deriver is not None and tracer is not None: + for signal in deriver.flush(): + await tracer.handle(signal) + + return TurnResult(final_text="".join(final_text_parts), usage=usage or TurnUsage()) +``` + +Note for the implementer: validate the exact `streaming_task_message_context` usage against +`_langgraph_async.py` (whether to call `stream_update` with the whole `StreamTaskMessageDelta` +or the inner delta). Adjust the call and the fake together; the test asserts behavior, not the +internal kwarg shape. + +- [ ] **Step 4: Run test to verify it passes** + +Run: `pytest tests/lib/core/harness/test_auto_send.py -v` +Expected: PASS (1 passed) + +- [ ] **Step 5: Commit** + +```bash +git add src/agentex/lib/core/harness/auto_send.py tests/lib/core/harness/test_auto_send.py +git commit -m "feat(harness): auto_send delivery adapter (canonical stream -> adk.streaming + tracing)" +``` + +--- + +## Task 6: `UnifiedEmitter` facade — PR 3 (part 2) + +**Files:** +- Create: `src/agentex/lib/core/harness/emitter.py` +- Modify: `src/agentex/lib/core/harness/__init__.py` (re-export public surface) +- Test: `tests/lib/core/harness/test_emitter.py` + +`UnifiedEmitter` is the single thing an agent author touches. It owns the trace context, builds +the `SpanTracer` (default-on when a trace context exists, overridable), and exposes both +delivery modes over a `HarnessTurn`. It attaches the turn's `TurnUsage` to delivery. + +- [ ] **Step 1: Write the failing test** + +Create `tests/lib/core/harness/test_emitter.py`: + +```python +import pytest + +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.lib.core.harness.types import TurnUsage +from agentex.types.task_message_update import StreamTaskMessageStart, StreamTaskMessageDone +from agentex.types.text_content import TextContent + + +class _Turn: + def __init__(self, events_list, usage): + self._events_list = events_list + self._usage = usage + + @property + async def events(self): + for e in self._events_list: + yield e + + def usage(self): + return self._usage + + +@pytest.mark.asyncio +async def test_emitter_yield_mode_passes_through(): + events = [ + StreamTaskMessageStart(type="start", index=0, + content=TextContent(type="text", author="agent", content="hi")), + StreamTaskMessageDone(type="done", index=0), + ] + turn = _Turn(events, TurnUsage(model="m")) + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) + out = [e async for e in emitter.yield_turn(turn)] + assert out == events + + +@pytest.mark.asyncio +async def test_emitter_tracing_default_on_when_trace_id_present(): + emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id="p") + assert emitter.tracer is not None + + +@pytest.mark.asyncio +async def test_emitter_tracing_overridable_off(): + emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id="p", tracer=False) + assert emitter.tracer is None +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `pytest tests/lib/core/harness/test_emitter.py -v` +Expected: FAIL with `ModuleNotFoundError: agentex.lib.core.harness.emitter` + +- [ ] **Step 3: Implement `UnifiedEmitter`** + +Create `src/agentex/lib/core/harness/emitter.py`: + +```python +"""UnifiedEmitter: the single facade agent authors use for either delivery mode.""" + +from __future__ import annotations + +from typing import Any, AsyncIterator + +from agentex.lib.core.harness.auto_send import auto_send +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.lib.core.harness.types import HarnessTurn, StreamTaskMessage, TurnResult +from agentex.lib.core.harness.yield_delivery import yield_events + + +class UnifiedEmitter: + """Ties trace context + chosen delivery together. + + Tracing is default-on whenever `trace_id` is truthy; pass `tracer=False` to + disable, or a custom `SpanTracer` to override. + """ + + def __init__( + self, + task_id: str, + trace_id: str | None, + parent_span_id: str | None, + tracer: SpanTracer | bool | None = None, + ): + self.task_id = task_id + self.trace_id = trace_id + self.parent_span_id = parent_span_id + if tracer is False: + self.tracer: SpanTracer | None = None + elif isinstance(tracer, SpanTracer): + self.tracer = tracer + elif trace_id: + self.tracer = SpanTracer(trace_id=trace_id, parent_span_id=parent_span_id, task_id=task_id) + else: + self.tracer = None + + async def yield_turn(self, turn: HarnessTurn) -> AsyncIterator[StreamTaskMessage]: + """Sync HTTP ACP delivery: forward events, trace as side effect.""" + async for event in yield_events(turn.events, tracer=self.tracer): + yield event + + async def auto_send_turn(self, turn: HarnessTurn) -> TurnResult: + """Async/temporal delivery: push to the task stream, return TurnResult.""" + return await auto_send( + turn.events, + task_id=self.task_id, + tracer=self.tracer, + usage=turn.usage(), + ) +``` + +- [ ] **Step 4: Re-export the public surface** + +Append to `src/agentex/lib/core/harness/__init__.py`: + +```python +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.lib.core.harness.types import ( + CloseSpan, + HarnessTurn, + OpenSpan, + SpanSignal, + StreamTaskMessage, + TurnResult, + TurnUsage, +) + +__all__ = [ + "UnifiedEmitter", + "SpanTracer", + "OpenSpan", + "CloseSpan", + "SpanSignal", + "StreamTaskMessage", + "TurnUsage", + "TurnResult", + "HarnessTurn", +] +``` + +- [ ] **Step 5: Run tests to verify they pass** + +Run: `pytest tests/lib/core/harness/ -v` +Expected: PASS (all harness tests green) + +- [ ] **Step 6: Commit** + +```bash +git add src/agentex/lib/core/harness/emitter.py src/agentex/lib/core/harness/__init__.py tests/lib/core/harness/test_emitter.py +git commit -m "feat(harness): UnifiedEmitter facade tying delivery + tracing + usage" +``` + +--- + +## Task 7: Conformance test scaffold + empty CI integration job — PR 3 (part 3) + +**Files:** +- Create: `tests/lib/core/harness/conformance/__init__.py` +- Create: `tests/lib/core/harness/conformance/runner.py` +- Create: `tests/lib/core/harness/conformance/test_conformance.py` +- Create: `.github/workflows/harness-integration.yml` + +The conformance runner is the shared parametrized engine each harness tap will register fixtures +with (in later plans). It asserts yield-vs-auto-send equivalence on the span signals derived +from a fixture's canonical-event sequence. + +- [ ] **Step 1: Write the conformance runner + a self-test fixture** + +Create `tests/lib/core/harness/conformance/__init__.py` (empty), then +`tests/lib/core/harness/conformance/runner.py`: + +```python +"""Shared conformance engine: every harness tap registers fixtures here. + +A fixture is (name, list[StreamTaskMessage]). The runner asserts that span +derivation over the events is identical regardless of delivery channel, which is +the cross-channel guarantee from the spec. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Callable + +from agentex.lib.core.harness.span_derivation import SpanDeriver +from agentex.lib.core.harness.types import SpanSignal, StreamTaskMessage + + +@dataclass +class Fixture: + name: str + events: list[StreamTaskMessage] + + +_REGISTRY: list[Fixture] = [] + + +def register(fixture: Fixture) -> None: + _REGISTRY.append(fixture) + + +def all_fixtures() -> list[Fixture]: + return list(_REGISTRY) + + +def derive_all(events: list[StreamTaskMessage]) -> list[SpanSignal]: + d = SpanDeriver() + out: list[SpanSignal] = [] + for e in events: + out.extend(d.observe(e)) + out.extend(d.flush()) + return out +``` + +- [ ] **Step 2: Write the conformance test (self-test on a built-in fixture)** + +Create `tests/lib/core/harness/conformance/test_conformance.py`: + +```python +import pytest + +from tests.lib.core.harness.conformance.runner import Fixture, derive_all, register, all_fixtures +from agentex.types.task_message_update import ( + StreamTaskMessageStart, StreamTaskMessageDone, StreamTaskMessageFull, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent + +register(Fixture( + name="builtin-single-tool", + events=[ + StreamTaskMessageStart(type="start", index=0, + content=ToolRequestContent(type="tool_request", author="agent", + tool_call_id="c", name="Bash", arguments={})), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageFull(type="full", index=1, + content=ToolResponseContent(type="tool_response", author="agent", + tool_call_id="c", name="Bash", content="ok")), + ], +)) + + +@pytest.mark.parametrize("fixture", all_fixtures(), ids=lambda f: f.name) +def test_span_derivation_is_deterministic(fixture): + # Deriving twice over the same events yields identical signals (the property + # that makes yield vs auto-send equivalent, since both observe the same stream). + assert derive_all(fixture.events) == derive_all(fixture.events) +``` + +- [ ] **Step 3: Run the conformance test** + +Run: `pytest tests/lib/core/harness/conformance/ -v` +Expected: PASS (1 passed) + +- [ ] **Step 4: Add the empty CI integration job** + +Create `.github/workflows/harness-integration.yml` (mirrors the structure of the existing +`agentex-tutorials-test.yml`; the matrix is populated in later plans): + +```yaml +name: Harness Integration + +on: + pull_request: + paths: + - "src/agentex/lib/core/harness/**" + - "src/agentex/lib/adk/_modules/**" + - ".github/workflows/harness-integration.yml" + +jobs: + conformance: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: astral-sh/setup-uv@v5 + - name: Install + run: uv sync + - name: Conformance suite + run: uv run pytest tests/lib/core/harness/ -v + + # Live integration matrix (harness x {sync, async, temporal}) is added per-harness + # in the migration plans. Placeholder job keeps the workflow valid until then. + live-matrix: + runs-on: ubuntu-latest + if: false # enabled once the first harness's test agents land + steps: + - run: echo "populated by migration PRs" +``` + +- [ ] **Step 5: Commit** + +```bash +git add tests/lib/core/harness/conformance .github/workflows/harness-integration.yml +git commit -m "test(harness): conformance scaffold + CI integration job skeleton" +``` + +--- + +## Task 8: Run the full suite + type check + +- [ ] **Step 1: Run the whole harness test tree** + +Run: `pytest tests/lib/core/harness/ -v` +Expected: PASS (all tasks' tests green) + +- [ ] **Step 2: Type check the new package** + +Run: `uv run mypy src/agentex/lib/core/harness/` (or the repo's configured type checker) +Expected: no errors. Fix any signature mismatches inline. + +- [ ] **Step 3: Final commit if the type check required fixes** + +```bash +git add -A && git commit -m "chore(harness): type-check fixes for foundation package" +``` + +--- + +## Subsequent plans (to be written after this lands) + +Each gets its own plan via the writing-plans skill, expanded with that harness's exact +converter code: + +- **PR 4 — Migrate pydantic-ai:** wrap `convert_pydantic_ai_to_agentex_events` as a + `HarnessTurn` (add `usage()` normalizing `result.usage()`), reimplement `_pydantic_ai_async` + on `auto_send`, retire `_pydantic_ai_tracing` in favor of `SpanTracer`, keep the public + `convert_*` signature. Add 3 test agents (sync/async/temporal) + register conformance + fixtures + enable the live-matrix row. +- **PR 5 — Migrate langgraph:** same shape; reimplement `stream_langgraph_events` on + `auto_send`; normalize `usage_metadata` into `TurnUsage`. +- **PR 6 — Migrate openai-agents:** same shape; reimplement `run_agent_streamed_auto_send` on + `auto_send`; normalize `response.usage`. +- **PR 7 — claude-code parser tap:** `convert_claude_code_to_agentex_events` (port the golden + agent's `_StreamJsonProcessor` to yield `StreamTaskMessage*`) + recorded stream-json + fixtures + feasible test agent(s). +- **PR 8 — codex parser tap:** same shape for `_CodexEventProcessor`. +- **PR 9 — Cleanup:** delete now-dead internal duplication, deprecate `_*_tracing` shims, docs. + +The `is_error` tool-error work is deferred and tracked in Linear as AGX1-371. diff --git a/docs/superpowers/specs/2026-06-18-unified-harness-surface-design.md b/docs/superpowers/specs/2026-06-18-unified-harness-surface-design.md new file mode 100644 index 000000000..e8a32f112 --- /dev/null +++ b/docs/superpowers/specs/2026-06-18-unified-harness-surface-design.md @@ -0,0 +1,313 @@ +# Unified Harness Tracing / Message-Emitting Surface + +Date: 2026-06-18 +Status: Approved design, pending implementation +Repo: `scale-agentex-python` + +## Problem + +The SDK integrates several agent harnesses (pydantic-ai, LangGraph, OpenAI Agents) by +converting each harness's native output into Agentex `StreamTaskMessage*` events. Today +that integration is triplicated per harness: + +- `__sync.py` — a converter that **yields** Agentex events back over the + HTTP/JSON-RPC response (sync ACP agents). +- `__async.py` — a converter that **auto-sends** to the task stream (Redis via + `adk.streaming`) for async + temporal agents. +- `__tracing.py` — a separate, opt-in tracing handler wired into the converter + by hand. + +Consequences: + +- The native-output → Agentex-event mapping exists in two places per harness (sync and + async) and can drift. +- Tracing is bolted on per harness and is inconsistent across harnesses. +- There is no shared notion of a tool/reasoning span tree or turn-level metadata. +- The golden agent grew a parallel "harness layer" (a neutral `HarnessEvent` vocabulary + plus an adapter that drives `adk.streaming` + `adk.tracing`) to solve the same problem + for its subprocess CLI harnesses (claude-code, codex). That logic is valuable but lives + outside the SDK. + +## Goal (end state) + +pydantic-ai, LangGraph, OpenAI Agents, claude-code, and codex all emit through one unified +surface. A single pass over a harness's output drives **streaming, message persistence, and +tracing** from one source of truth, in the same shape as Agentex events. The surface works +for **both** delivery channels (sync yield, async/temporal auto-send). Tracing is on by +default and overridable. The claude-code/codex *parsers* live in the SDK; their sandbox / +secret / MCP orchestration stays in the golden agent. + +## Approach: Agentex event stream is canonical (Approach A) + +The Agentex `StreamTaskMessage*` stream is the single source of truth. Each harness maps its +native output to that stream **once**. A single emitter consumes that one stream and fans it +out to delivery (yield or auto-send) and to tracing (spans derived from the same stream). + +We considered two alternatives and rejected them: + +- **Neutral `AgentEvent` vocabulary + dual projectors (Approach B):** richer (carries turn + usage/cost natively, clean start/end pairing) but reintroduces a parallel vocabulary to + keep in sync with Agentex types, for the same outcome. +- **Push-to-sink with typed emitter methods (Approach C):** very testable, but the *yield* + delivery channel fights a push API (needs a queue/generator bridge), and sync ACP agents + depend on yield. + +Approach A matches "same shape as Agentex events" most directly, makes the yield channel +free, and lets us delete the per-harness tracing code by deriving spans from the canonical +stream. + +## Components + +Four shared, harness-independent components plus one thin tap per harness. + +### 1. Per-harness tap (the only per-harness code) + +``` +convert__to_agentex_events(native_stream, ...) -> AsyncIterator[StreamTaskMessage*] +``` + +The existing sync converters (`convert_pydantic_ai_to_agentex_events`, +`convert_langgraph_to_agentex_events`, `convert_openai_to_agentex_events`) already have this +shape and *become* the taps. New taps: `convert_claude_code_to_agentex_events`, +`convert_codex_to_agentex_events` (pure parsers over the CLIs' newline-delimited +stream-json; no SGP/sandbox coupling). + +### 2. Auto-send adapter (shared) + +Consumes the canonical Agentex stream and drives `adk.streaming` context managers: open/close +text and reasoning contexts, switch cleanly between them, stream tool request/response. This +generalizes the golden agent's `AgentexStreamAdapter` and replaces the N hand-written +`_async` bodies with one. Returns the accumulated final text (preserving current +auto-send return values). + +### 3. Yield adapter (shared) + +Passes the canonical stream through to the caller (sync HTTP ACP), tee-ing each event to the +tracer as a side effect. + +### 4. Tracing tap (shared) + +A stateful reducer that derives spans from the canonical stream. It only *observes* +`index` and `tool_call_id`; it never mutates or reorders the stream, so streaming fidelity +is unchanged. + +Derivation rules: + +- **Tool span open:** on `StreamTaskMessageDone` for an index whose `Start` content was a + `ToolRequestContent`. Arguments are fully known by `Done` (covers both streamed-args and + one-shot tools). The open span is keyed by `tool_call_id`. +- **Tool span close:** on `StreamTaskMessageFull(ToolResponseContent)` matching by + `tool_call_id`. +- **Parallel / interleaved tools:** `ToolRequestContent`, `ToolResponseContent`, + `ToolRequestDelta`, and `ToolResponseDelta` all carry `tool_call_id` + `name`, so multiple + open tool spans pair correctly regardless of arrival order. +- **Reasoning span:** `Start(ReasoningContent)` → `Done` on that index. +- **Subagent span:** the Task/Agent tool's span (a tool span by another name), nested under + the turn span. + +Default-on whenever a trace context exists; **overridable** by passing a custom tracer, or +`None` to disable. Replaces the per-harness `_tracing.py` handlers. + +**Open decision — tool error status.** `ToolResponseContent` currently has no +`is_error`/`status` field (only `content`), so a derived tool span cannot mark failure. The +golden agent's `ToolCompleted` carried `is_error`. Recommended resolution: add an additive +optional `is_error: bool | None` to `ToolResponseContent`. This is a generated type, so it is +a small upstream API-spec change, not a local edit. **Deferred** — tracked in Linear as +AGX1-371 (Agentex "Starter Tasks"). Until it lands, derived spans omit tool error status +rather than inferring it. + +### Facade + +A `UnifiedEmitter` ties the chosen delivery adapter and the tracer together so an agent +author calls one thing. + +### Proposed layout + +- Shared components: `src/agentex/lib/core/harness/` (delivery adapters, tracing tap, span + derivation, facade). +- Taps: remain in `src/agentex/lib/adk/_modules/`. +- Public access: via the `adk` facade. + +## Data flow + +One pass over the canonical stream, fanned out by delivery mode. + +- **Sync agent:** `async for ev in emitter.yield_events(convert_X(native)): ...` — the tracer + observes each event; the event is yielded over the HTTP/JSON-RPC response. +- **Async + temporal agent:** `await emitter.auto_send(convert_X(native), task_id=...)` — the + auto-send adapter pushes deltas to Redis via `adk.streaming` while the tracer observes the + same events; returns accumulated final text. Temporal is identical, called from inside an + activity (converters run in activities, not workflows, so determinism is not a concern). +- **Tracing** is the same derivation in both modes (it observes the canonical stream), so + sync and auto-send produce identical spans. +- **Turn-level metadata** (usage / cost / model) is not an Agentex event, so it is surfaced + as a first-class `TurnUsage` shape rather than ad-hoc data (see below). + +Net dedup: **3 files × N harnesses → 1 tap × N harnesses + 3 shared components.** + +## Unified turn usage / cost + +Turn metadata is a first-class, harness-independent shape attached to the turn span and +returned to the caller — not a loose side-channel. + +``` +class TurnUsage(BaseModel): + model: str | None + input_tokens: int | None + output_tokens: int | None + cached_input_tokens: int | None # subset of input_tokens served from cache + reasoning_tokens: int | None # subset of output_tokens + total_tokens: int | None + cost_usd: float | None + duration_ms: int | None # wall-clock, measured by the emitter + num_llm_calls: int + num_tool_calls: int # derived from the canonical stream + num_reasoning_blocks: int # derived from the canonical stream + +class TurnResult(BaseModel): + final_text: str + usage: TurnUsage +``` + +- Token field names align with the existing `agentex.lib.core.observability.llm_metrics` + taxonomy (`input_tokens` / `output_tokens` / `cached_input_tokens` / `reasoning_tokens`), + not a new vocabulary. (The OpenAI-style `llm_messages.Usage` — + `prompt_tokens`/`completion_tokens` — is mapped into this richer shape.) +- **Each harness tap normalizes its native usage** into `TurnUsage`: pydantic-ai + `result.usage()`, LangGraph `usage_metadata`, OpenAI `response.usage`, claude-code/codex + the final `result` envelope (`cost_usd` + usage). Per-harness normalization, one output + shape. +- The stream-derived counts (`num_tool_calls`, `num_reasoning_blocks`) come for free from the + tracing tap's reduction; `duration_ms` is measured by the emitter; tokens/cost/model come + from the tap's native-usage normalization. +- The emitter attaches `TurnUsage` to the **turn span** via `adk.tracing.span(data=...)` + (which already accepts a `BaseModel`) and returns `TurnResult` to the caller. The same + object can feed the OTel `LLMMetrics` and downstream metrics (e.g. the golden agent's + per-turn DogStatsD emission), so traces and metrics share one shape. + +### Surfacing `TurnUsage` from the tap + +Python async generators cannot cleanly return a value to their consumer, so the tap does not +return `TurnUsage` via `StopAsyncIteration`. Instead the per-harness entry is a small object: + +``` +class HarnessTurn: + events: AsyncIterator[StreamTaskMessage*] # the canonical stream + def usage(self) -> TurnUsage # populated once `events` is exhausted +``` + +The emitter drives `events` (delivering + tracing), then reads `usage()` to finalize the turn +span and build `TurnResult`. This keeps the canonical stream pure (only `StreamTaskMessage*`) +while giving usage/cost a typed home. + +## Backwards compatibility (every change is additive) + +The end state "replaces" the old converters, but it is reached additively. No public symbol +is removed in this stack; nothing regresses. + +- **Taps:** existing `convert_*_to_agentex_events` keep exact signatures and output. Behavior + is unchanged when no trace context is present. +- **Auto-send entry points** (`stream_langgraph_events(stream, task_id)`, the pydantic/openai + `_async` helpers, `run_agent_streamed_auto_send`, `chat_completion_stream_auto_send`) keep + signatures and return values, reimplemented to delegate to the shared auto-send adapter. + Feature-add: they emit traces by default. The conformance suite asserts equivalent Redis + messages before/after. +- **`_tracing.py` handlers** stay importable as shims; the shared tracer supersedes them + internally. +- **Removal/deprecation** of dead internal duplication is the final PR, behind a deprecation + note, never mixed into a migration PR. + +## Rollout — stacked PRs (each < 1000 lines diff) + +1. **Span derivation (`TracingTap`)** — pure function: canonical stream → spans. + Unit-tested in isolation. No wiring. +2. **Auto-send adapter** — canonical stream → `adk.streaming` side effects. Fixture-tested. + Not yet wired into harnesses. +3. **Yield adapter + `UnifiedEmitter` facade + public `adk` surface** — plus the + conformance-test scaffold (fixture format + parametrized runner) and an empty CI + integration job. +4. **Migrate pydantic-ai** — reimplement its `_async` / tracing on the shared components; + keep `convert_pydantic_ai_to_agentex_events` signature; default tracing on. Add 3 test + agents (sync / async / temporal) + CI matrix entries + live smoke. +5. **Migrate LangGraph** — same pattern + 3 test agents + CI. +6. **Migrate OpenAI Agents** — same pattern + 3 test agents + CI. +7. **claude-code parser tap** — `convert_claude_code_to_agentex_events` + recorded stream-json + fixtures + feasible test agent(s) (likely temporal-only, given the sandbox requirement). +8. **codex parser tap** — same shape + fixtures + feasible test agent(s). +9. **Cleanup** — delete now-dead internal duplication, deprecate shims, docs. + +## Testing + +### Offline conformance suite (every PR) + +Committed raw harness outputs (pydantic `AgentStreamEvent`s, LangGraph chunks, OpenAI stream, +claude/codex stream-json) drive a shared parametrized suite. For each fixture, assert: + +- exact normalized `StreamTaskMessage*` sequence, +- derived span tree, +- **yield-vs-auto-send equivalence** — both channels produce the same logical events/spans. + +Every tap must pass the shared cases: text, reasoning, single tool, tool error, multi-step, +and interleaved reasoning + tool ordering. Deterministic, offline, no network. + +### Live integration matrix (CI) + +Three test agents per harness, one per agent type (sync / async / temporal), deployed and +driven with a fixed prompt. Assert the unified surface produced valid ordered messages and a +well-formed span tree. Modeled on the existing `agentex-tutorials-test.yml` / +`build-and-push-tutorial-agent.yml` CI precedent. + +Matrix: harness ∈ {pydantic-ai, langgraph, openai-agents, claude-code, codex} × agent-type ∈ +{sync, async, temporal}. claude-code/codex run the subset of agent types that is feasible; +any uncovered cell is logged/documented, never silently skipped. + +### Error handling + +- A tap that raises mid-stream closes open streaming contexts and open spans — no leaked + `adk.streaming` context, no dangling span. +- Tracing failures are best-effort and never break delivery (matches the golden agent's + contract). + +## Golden agent integration (SGP / sandbox coupling preserved) + +The unified surface is designed so the golden agent keeps **all** of its SGP-coupled layers +and only swaps its hand-rolled parsing/streaming/tracing internals for the SDK's taps + +emitter. Nothing SGP-specific moves into the SDK. + +What stays in the golden agent, untouched: + +- Sandbox pool acquire modes (cold-create / warm-claim / reconnect), lease coordination, and + the data-plane URL override. +- Secret resolution, OAuth/MCP reauth, and reconnect-notice emission (the notice is just + another standalone message on the task stream, independent of the harness tap). +- Spawning `claude -p` / `codex exec` inside the sandbox. + +What changes inside the golden agent's provider: + +1. Acquire/provision the sandbox and resolve secrets/MCP exactly as today (SGP-coupled). +2. Spawn the CLI in the sandbox and feed its stdout (stream-json lines) into the SDK tap + `convert_claude_code_to_agentex_events` / `convert_codex_to_agentex_events`. +3. Run that tap through the SDK emitter's **auto-send** path from inside the existing Temporal + activity, getting streaming + tracing + `TurnUsage` for free. The agent's + `_StreamJsonProcessor` and `AgentexStreamAdapter` are retired in favor of the SDK tap + + emitter. + +**Sandbox-setup events:** today the golden agent surfaces provisioning steps (reconnect / +find / create / configure-git / clone) as UI tool calls by yielding them into the same +adapter. Under the unified surface these become agent-produced `ToolRequestContent` / +`ToolResponseContent` messages, chained *before* the harness tap's stream into one canonical +stream for the turn (`chain(setup_events, convert_claude_code(stdout))`). The emitter then +delivers and traces the whole turn uniformly, so setup steps keep appearing in the UI and the +span tree. + +This means the claude-code/codex parser PRs (7, 8) deliver the SDK taps, and a corresponding +**golden-agent-side change** (out of this repo's PR stack) rewires its providers onto them. +The golden agent's in-process litellm / OpenAI-Agents harness can likewise adopt the OpenAI +tap, though that is optional and not required by this design. + +## Out of scope + +- Sandbox pool, sandbox lifecycle, MCP server provisioning, and OAuth/secret reauth — tracked + separately; only the pure claude-code/codex output parsers are in scope here. +- claude-code/codex sandbox / secret / MCP orchestration — stays in the golden agent and + feeds the SDK parser. diff --git a/src/agentex/lib/core/harness/__init__.py b/src/agentex/lib/core/harness/__init__.py new file mode 100644 index 000000000..067751d63 --- /dev/null +++ b/src/agentex/lib/core/harness/__init__.py @@ -0,0 +1,30 @@ +"""Shared, harness-independent machinery for the unified harness surface. + +The Agentex StreamTaskMessage* stream is the single source of truth; this +package derives spans from it and delivers it (yield or auto-send), so every +harness tap gets streaming + tracing + turn usage uniformly. +""" + +from agentex.lib.core.harness.types import ( + OpenSpan, + CloseSpan, + TurnUsage, + SpanSignal, + TurnResult, + HarnessTurn, + StreamTaskMessage, +) +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.lib.core.harness.emitter import UnifiedEmitter + +__all__ = [ + "UnifiedEmitter", + "SpanTracer", + "OpenSpan", + "CloseSpan", + "SpanSignal", + "StreamTaskMessage", + "TurnUsage", + "TurnResult", + "HarnessTurn", +] diff --git a/src/agentex/lib/core/harness/auto_send.py b/src/agentex/lib/core/harness/auto_send.py new file mode 100644 index 000000000..899429034 --- /dev/null +++ b/src/agentex/lib/core/harness/auto_send.py @@ -0,0 +1,141 @@ +"""Auto-send delivery: canonical stream -> adk.streaming side effects + tracing.""" + +from __future__ import annotations + +from typing import Any, AsyncIterator +from datetime import datetime + +from agentex.types.text_delta import TextDelta +from agentex.types.text_content import TextContent +from agentex.lib.core.harness.types import TurnUsage, TurnResult, StreamTaskMessage +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.lib.core.harness.span_derivation import SpanDeriver + + +async def auto_send( + events: AsyncIterator[StreamTaskMessage], + task_id: str, + tracer: SpanTracer | None = None, + streaming: Any = None, + usage: TurnUsage | None = None, + created_at: datetime | None = None, +) -> TurnResult: + """Push the canonical stream to the task stream via adk.streaming. + + Opens a streaming context per message (keyed by index), streams deltas via + ctx.stream_update, and closes via ctx.close() on Done. Posts tool + request/response full messages by opening a context with the content and + closing it immediately (no deltas). Derives and traces spans from the same + stream. Returns the last text segment's text + usage. + + Index-keyed routing: each Start(index=i) opens a context stored in + ctx_map[i]; Delta(index=i) routes to ctx_map.get(i); Done(index=i) closes + and removes ctx_map[i]. Events with index is None are skipped. The finally + block closes all remaining open contexts. + + final_text last-segment semantics: a new Start(TextContent) resets + final_text_parts so that multi-step turns return the LAST text segment. + Full(TextContent) also overwrites final_text_parts (same semantics). + + AGX1-378: created_at is forwarded to every streaming_task_message_context + call so callers can back-date message timestamps. + + Mirrors the open/close/stream_update pattern from + src/agentex/lib/adk/_modules/_langgraph_async.py: + - context opened via streaming_task_message_context(...).__aenter__() + - context closed via ctx.close() (not __aexit__) + - deltas pushed as StreamTaskMessageDelta with parent_task_message set + from ctx.task_message + + For async + temporal agents (call from inside an activity). + """ + if streaming is None: + from agentex.lib import adk + + streaming = adk.streaming + + deriver = SpanDeriver() if tracer is not None else None + final_text_parts: list[str] = [] + ctx_map: dict[int, Any] = {} + + async def _close_all() -> None: + for ctx in list(ctx_map.values()): + await ctx.close() + ctx_map.clear() + + try: + async for event in events: + if deriver is not None and tracer is not None: + for signal in deriver.observe(event): + await tracer.handle(signal) + + if isinstance(event, StreamTaskMessageStart): + if event.index is None: + continue + i = event.index + # Reset final_text_parts when a new text segment starts + if isinstance(event.content, TextContent): + final_text_parts = [] + ctx = streaming.streaming_task_message_context( + task_id=task_id, + initial_content=event.content, + created_at=created_at, + ) + ctx_map[i] = await ctx.__aenter__() + + elif isinstance(event, StreamTaskMessageDelta): + if event.index is None: + continue + ctx = ctx_map.get(event.index) + if ctx is not None and event.delta is not None: + # Reconstruct the delta with parent_task_message set from + # the context's task_message (mirrors _langgraph_async.py + # lines 72-78 and 117-127). + delta_with_parent = StreamTaskMessageDelta( + parent_task_message=ctx.task_message, + delta=event.delta, + type="delta", + index=event.index, + ) + await ctx.stream_update(delta_with_parent) + if isinstance(event.delta, TextDelta) and event.delta.text_delta: + final_text_parts.append(event.delta.text_delta) + + elif isinstance(event, StreamTaskMessageDone): + if event.index is None: + continue + ctx = ctx_map.pop(event.index, None) + if ctx is not None: + await ctx.close() + + elif isinstance(event, StreamTaskMessageFull): + # Full messages: post the full message by opening a context + # with the content and closing it immediately (no deltas; + # StreamingTaskMessageContext.close() persists initial_content + # when the accumulator is empty). Use async with so the context + # is closed even if close() raises (__aexit__ delegates to + # close()). + # Full(TextContent) also resets final_text_parts for + # last-segment semantics. + if isinstance(event.content, TextContent): + final_text_parts = [event.content.content] + async with streaming.streaming_task_message_context( + task_id=task_id, + initial_content=event.content, + created_at=created_at, + ): + pass + + finally: + await _close_all() + if deriver is not None and tracer is not None: + for signal in deriver.flush(): + await tracer.handle(signal) + + return TurnResult(final_text="".join(final_text_parts), usage=usage or TurnUsage()) diff --git a/src/agentex/lib/core/harness/emitter.py b/src/agentex/lib/core/harness/emitter.py new file mode 100644 index 000000000..85395fcff --- /dev/null +++ b/src/agentex/lib/core/harness/emitter.py @@ -0,0 +1,74 @@ +"""UnifiedEmitter: the single facade agent authors use for either delivery mode.""" + +from __future__ import annotations + +from typing import AsyncGenerator +from datetime import datetime + +from agentex.lib.core.harness.types import TurnResult, HarnessTurn, StreamTaskMessage +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.lib.core.harness.auto_send import auto_send +from agentex.lib.core.harness.yield_delivery import yield_events + + +class UnifiedEmitter: + """Ties trace context + chosen delivery together. + + Tracing modes (the `tracer` arg): + - tracer=None (default): auto-construct a SpanTracer if `trace_id` is present. + - tracer=False: disable tracing entirely, regardless of `trace_id`. + - tracer=: use the supplied instance. + + `tracing` and `streaming` are injection escape-hatches for tests/advanced + use; leave them None in production so the real adk modules are used. + """ + + tracer: SpanTracer | None + + def __init__( + self, + task_id: str, + trace_id: str | None, + parent_span_id: str | None, + tracer: SpanTracer | bool | None = None, + tracing: object | None = None, + streaming: object | None = None, + ): + self.task_id = task_id + self.trace_id = trace_id + self.parent_span_id = parent_span_id + self._streaming = streaming + if tracer is False: + self.tracer = None + elif isinstance(tracer, SpanTracer): + self.tracer = tracer + elif trace_id: + self.tracer = SpanTracer( + trace_id=trace_id, + parent_span_id=parent_span_id, + task_id=task_id, + tracing=tracing, + ) + else: + self.tracer = None + + async def yield_turn(self, turn: HarnessTurn) -> AsyncGenerator[StreamTaskMessage, None]: + """Sync HTTP ACP delivery: forward events, trace as side effect.""" + async for event in yield_events(turn.events, tracer=self.tracer): + yield event + + async def auto_send_turn(self, turn: HarnessTurn, created_at: datetime | None = None) -> TurnResult: + """Async/temporal delivery: push to the task stream, return TurnResult. + + Pass `created_at` (e.g. `workflow.now()` under Temporal) to stamp the + turn's messages with a deterministic timestamp; it is forwarded to the + streaming contexts. Default None preserves server-side timestamps. + """ + return await auto_send( + turn.events, + task_id=self.task_id, + tracer=self.tracer, + streaming=self._streaming, + usage=turn.usage(), + created_at=created_at, + ) diff --git a/src/agentex/lib/core/harness/span_derivation.py b/src/agentex/lib/core/harness/span_derivation.py new file mode 100644 index 000000000..503957582 --- /dev/null +++ b/src/agentex/lib/core/harness/span_derivation.py @@ -0,0 +1,147 @@ +"""Pure reducer: canonical StreamTaskMessage* stream -> span open/close signals. + +Has no dependency on adk; unit-testable in isolation. Delivery adapters feed it +every event and act on the returned signals. +""" + +from __future__ import annotations + +import json +from dataclasses import dataclass + +from agentex.lib.core.harness.types import OpenSpan, CloseSpan, SpanSignal, StreamTaskMessage +from agentex.types.tool_request_delta import ToolRequestDelta +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent + + +@dataclass +class _ToolReqMeta: + tool_call_id: str + name: str + arguments: dict[str, object] + args_buf: str = "" # accumulated streamed argument fragments + + +class SpanDeriver: + """Stateful reducer over the canonical stream. + + Tool span: open on Done of a ToolRequestContent index; close on matching + ToolResponseContent by tool_call_id. Reasoning span: open on + Start(ReasoningContent); close on that index's Done. + + Deliberate contracts: + - A `Full(ToolResponseContent)` whose tool_call_id was never opened is + ignored (no CloseSpan emitted). + - A `Done` for an index that was never a tool_request/reasoning Start is + ignored (no signal emitted). + - Events with `index is None` are skipped entirely; without a stable index + they cannot be reliably paired, and aliasing them to a sentinel would + let unrelated None-indexed events cross-match. + - `flush()` closes anything still open as incomplete; unclosed tool spans + are emitted in the order they were opened. + """ + + def __init__(self) -> None: + self._tool_by_index: dict[int, _ToolReqMeta] = {} + self._reasoning_index_open: set[int] = set() + # insertion-ordered set of open tool_call_ids (dict keys preserve order) + self._open_tool_ids: dict[str, None] = {} + + def observe(self, event: StreamTaskMessage) -> list[SpanSignal]: + if isinstance(event, StreamTaskMessageStart): + return self._on_start(event) + if isinstance(event, StreamTaskMessageDelta): + return self._on_delta(event) + if isinstance(event, StreamTaskMessageFull): + return self._on_full(event) + if isinstance(event, StreamTaskMessageDone): + return self._on_done(event) + return [] + + def flush(self) -> list[SpanSignal]: + """Close anything still open at end of stream, marked incomplete.""" + signals: list[SpanSignal] = [] + for tcid in list(self._open_tool_ids): + signals.append(CloseSpan(key=tcid, output=None, is_complete=False)) + self._open_tool_ids.clear() + for idx in sorted(self._reasoning_index_open): + signals.append(CloseSpan(key=f"reasoning:{idx}", output=None, is_complete=False)) + self._reasoning_index_open.clear() + return signals + + def _on_start(self, event: StreamTaskMessageStart) -> list[SpanSignal]: + if event.index is None: + return [] + idx = event.index + content = event.content + if isinstance(content, ToolRequestContent): + self._tool_by_index[idx] = _ToolReqMeta( + tool_call_id=content.tool_call_id, + name=content.name, + arguments=dict(content.arguments or {}), + ) + return [] + if content.type == "reasoning": + self._reasoning_index_open.add(idx) + return [OpenSpan(key=f"reasoning:{idx}", kind="reasoning", name="reasoning", input={})] + return [] + + def _on_delta(self, event: StreamTaskMessageDelta) -> list[SpanSignal]: + if event.index is None: + return [] + idx = event.index + delta = event.delta + if isinstance(delta, ToolRequestDelta): + meta = self._tool_by_index.get(idx) + if meta is not None and delta.arguments_delta: + meta.args_buf += delta.arguments_delta + return [] + + def _on_full(self, event: StreamTaskMessageFull) -> list[SpanSignal]: + """Handle a Full event. + + A `Full(ToolRequestContent)` opens a tool span (keyed by tool_call_id) + if it is not already open; the matching `Full(ToolResponseContent)` + closes it. This handles harnesses (e.g. LangGraph) that emit tool calls + as a single Full rather than Start+Done. + """ + content = event.content + if isinstance(content, ToolRequestContent): + tcid = content.tool_call_id + if tcid not in self._open_tool_ids: + self._open_tool_ids[tcid] = None + args = dict(content.arguments or {}) + return [OpenSpan(key=tcid, kind="tool", name=content.name, input=args)] + return [] + if isinstance(content, ToolResponseContent): + tcid = content.tool_call_id + if tcid in self._open_tool_ids: + self._open_tool_ids.pop(tcid, None) + return [CloseSpan(key=tcid, output=content.content, is_complete=True)] + return [] + + def _on_done(self, event: StreamTaskMessageDone) -> list[SpanSignal]: + if event.index is None: + return [] + idx = event.index + meta = self._tool_by_index.pop(idx, None) + if meta is not None: + args = meta.arguments + if meta.args_buf: + try: + args = json.loads(meta.args_buf) + except json.JSONDecodeError: + args = {"_raw": meta.args_buf} + self._open_tool_ids[meta.tool_call_id] = None + return [OpenSpan(key=meta.tool_call_id, kind="tool", name=meta.name, input=args)] + if idx in self._reasoning_index_open: + self._reasoning_index_open.discard(idx) + return [CloseSpan(key=f"reasoning:{idx}", output=None, is_complete=True)] + return [] diff --git a/src/agentex/lib/core/harness/tracer.py b/src/agentex/lib/core/harness/tracer.py new file mode 100644 index 000000000..8384407bd --- /dev/null +++ b/src/agentex/lib/core/harness/tracer.py @@ -0,0 +1,82 @@ +"""Adapter from SpanSignals to adk.tracing spans (best-effort, overridable).""" + +from __future__ import annotations + +from typing import Any + +from agentex.lib.core.harness.types import OpenSpan, CloseSpan, SpanSignal + +try: + from agentex.lib.utils.logging import make_logger + + logger = make_logger(__name__) +except Exception: # ddtrace may be absent in some envs; fall back to stdlib + import logging + + logger = logging.getLogger(__name__) + + +class SpanTracer: + """Opens/closes adk.tracing child spans in response to span signals. + + `tracing` defaults to the real `adk.tracing` module; inject a fake in tests + or a custom tracer to override. No-op when `trace_id` is falsy. Never raises. + + The real TracingModule.end_span does NOT accept an output kwarg — output is + recorded by mutating span.output before calling end_span, matching the pattern + used throughout the codebase (see _langgraph_tracing.py on_tool_end etc.). + + Span-lifecycle contract: the `_open` dict (span key -> span object) is scoped + to a single turn. Pairing is by `key`: + - A duplicate OpenSpan for a key already in `_open` silently replaces the + earlier span; the earlier span is then orphaned (never closed / leaked). + - A CloseSpan for an unknown key is a no-op. + - Unpaired opens accumulate in `_open` for the lifetime of the tracer; since + a tracer is expected to live for one turn, this is bounded and acceptable. + """ + + def __init__( + self, + trace_id: str | None, + parent_span_id: str | None, + tracing: Any = None, + task_id: str | None = None, + ): + self.trace_id = trace_id + self.parent_span_id = parent_span_id + self.task_id = task_id + if tracing is None: + from agentex.lib import adk + + tracing = adk.tracing + self._tracing = tracing + self._open: dict[str, Any] = {} # span key -> span object + + async def handle(self, signal: SpanSignal) -> None: + if not self.trace_id: + return + try: + if isinstance(signal, OpenSpan): + span = await self._tracing.start_span( + trace_id=self.trace_id, + name=signal.name, + input=signal.input, + parent_id=self.parent_span_id, + task_id=self.task_id, + ) + if span is not None: + self._open[signal.key] = span + elif isinstance(signal, CloseSpan): + span = self._open.pop(signal.key, None) + if span is not None: + # Output is recorded by mutating span.output before end_span. + # The real TracingModule.end_span signature is: + # end_span(trace_id, span, start_to_close_timeout, heartbeat_timeout, retry_policy) + # It does not accept an output= kwarg. + span.output = signal.output + await self._tracing.end_span( + trace_id=self.trace_id, + span=span, + ) + except Exception as exc: # best-effort: tracing never breaks delivery + logger.warning("[harness.tracer] span signal failed: %s", exc) diff --git a/src/agentex/lib/core/harness/types.py b/src/agentex/lib/core/harness/types.py new file mode 100644 index 000000000..64104d316 --- /dev/null +++ b/src/agentex/lib/core/harness/types.py @@ -0,0 +1,92 @@ +"""Types for the unified harness surface.""" + +from __future__ import annotations + +from typing import Any, Union, Literal, Protocol, AsyncIterator, runtime_checkable +from dataclasses import field, dataclass + +from pydantic import BaseModel, ConfigDict + +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) + +# The canonical stream element. Taps yield these; delivery adapters consume them. +StreamTaskMessage = Union[ + StreamTaskMessageStart, + StreamTaskMessageDelta, + StreamTaskMessageFull, + StreamTaskMessageDone, +] + +SpanKind = Literal["tool", "reasoning", "subagent"] + + +@dataclass +class OpenSpan: + """Signal to open a child span. `key` pairs an open with its close.""" + + key: str + kind: SpanKind + name: str + input: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class CloseSpan: + """Signal to close the span previously opened with the same `key`.""" + + key: str + output: Any = None + is_complete: bool = True # False when closed by flush() without a result + + +SpanSignal = Union[OpenSpan, CloseSpan] + + +class TurnUsage(BaseModel): + """Harness-independent turn usage/cost, attached to the turn span. + + Token field names align with agentex.lib.core.observability.llm_metrics. + """ + + model_config = ConfigDict(from_attributes=True, populate_by_name=True) + + model: str | None = None + input_tokens: int | None = None + output_tokens: int | None = None + cached_input_tokens: int | None = None + reasoning_tokens: int | None = None + total_tokens: int | None = None + cost_usd: float | None = None + duration_ms: int | None = None + num_llm_calls: int = 0 + num_tool_calls: int = 0 + num_reasoning_blocks: int = 0 + + +class TurnResult(BaseModel): + """Returned to the caller after a turn is delivered.""" + + model_config = ConfigDict(from_attributes=True, populate_by_name=True) + + final_text: str = "" + usage: TurnUsage = TurnUsage() + + +@runtime_checkable +class HarnessTurn(Protocol): + """A single harness turn: a canonical stream plus its normalized usage. + + Python async generators cannot cleanly return a value to their consumer, so + a tap exposes usage via `usage()` (valid only after `events` is exhausted) + rather than via StopAsyncIteration. + """ + + @property + def events(self) -> AsyncIterator[StreamTaskMessage]: ... + + def usage(self) -> TurnUsage: ... diff --git a/src/agentex/lib/core/harness/yield_delivery.py b/src/agentex/lib/core/harness/yield_delivery.py new file mode 100644 index 000000000..69b39f152 --- /dev/null +++ b/src/agentex/lib/core/harness/yield_delivery.py @@ -0,0 +1,31 @@ +"""Yield delivery: pass the canonical stream through, tracing as a side effect.""" + +from __future__ import annotations + +from typing import AsyncIterator, AsyncGenerator + +from agentex.lib.core.harness.types import StreamTaskMessage +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.lib.core.harness.span_derivation import SpanDeriver + + +async def yield_events( + events: AsyncIterator[StreamTaskMessage], + tracer: SpanTracer | None = None, +) -> AsyncGenerator[StreamTaskMessage, None]: + """Forward each event to the caller; derive + trace spans as a side effect. + + For sync HTTP ACP agents that yield events back over the response. When + `tracer` is None, this is a pure passthrough. + """ + deriver = SpanDeriver() if tracer is not None else None + try: + async for event in events: + if deriver is not None and tracer is not None: + for signal in deriver.observe(event): + await tracer.handle(signal) + yield event + finally: + if deriver is not None and tracer is not None: + for signal in deriver.flush(): + await tracer.handle(signal) diff --git a/tests/lib/core/harness/__init__.py b/tests/lib/core/harness/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lib/core/harness/conformance/__init__.py b/tests/lib/core/harness/conformance/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/lib/core/harness/conformance/runner.py b/tests/lib/core/harness/conformance/runner.py new file mode 100644 index 000000000..81a74860c --- /dev/null +++ b/tests/lib/core/harness/conformance/runner.py @@ -0,0 +1,48 @@ +"""Shared conformance engine: every harness tap registers fixtures here. + +A fixture is (name, list[StreamTaskMessage]). The runner asserts that span +derivation over the events is identical regardless of delivery channel, which is +the cross-channel guarantee from the spec. + +Registry shared-state hazard: `_REGISTRY` is process-global. Every `test_*.py` +module that calls `register()` at import time contributes to it, so a module +that parametrizes over `all_fixtures()` will see fixtures registered by ANY +other conformance module imported earlier in the same pytest process (collection +order is not guaranteed). To stay deterministic, each future harness conformance +module should register and parametrize over its OWN fixtures (e.g. keep a +module-local list it both registers and parametrizes), rather than relying on +cross-module global accumulation via `all_fixtures()`. +""" + +from __future__ import annotations + +from dataclasses import dataclass + +from agentex.lib.core.harness.types import SpanSignal, StreamTaskMessage +from agentex.lib.core.harness.span_derivation import SpanDeriver + + +@dataclass +class Fixture: + name: str + events: list[StreamTaskMessage] + + +_REGISTRY: list[Fixture] = [] + + +def register(fixture: Fixture) -> None: + _REGISTRY.append(fixture) + + +def all_fixtures() -> list[Fixture]: + return list(_REGISTRY) + + +def derive_all(events: list[StreamTaskMessage]) -> list[SpanSignal]: + d = SpanDeriver() + out: list[SpanSignal] = [] + for e in events: + out.extend(d.observe(e)) + out.extend(d.flush()) + return out diff --git a/tests/lib/core/harness/conformance/test_conformance.py b/tests/lib/core/harness/conformance/test_conformance.py new file mode 100644 index 000000000..d9eec1c15 --- /dev/null +++ b/tests/lib/core/harness/conformance/test_conformance.py @@ -0,0 +1,43 @@ +import pytest + +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageStart, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent + +from .runner import Fixture, register, derive_all, all_fixtures + +register( + Fixture( + name="builtin-single-tool", + events=[ + StreamTaskMessageStart( + type="start", + index=0, + content=ToolRequestContent( + type="tool_request", author="agent", tool_call_id="c", name="Bash", arguments={} + ), + ), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolResponseContent( + type="tool_response", author="agent", tool_call_id="c", name="Bash", content="ok" + ), + ), + ], + ) +) + + +@pytest.mark.parametrize("fixture", all_fixtures(), ids=lambda f: f.name) +def test_span_derivation_is_deterministic(fixture): + """Exercises the cross-channel guarantee: yield and auto-send observe the + same event stream, so span derivation must be deterministic/idempotent.""" + # Deriving twice over the same events yields identical signals (the property + # that makes yield vs auto-send equivalent, since both observe the same stream). + assert derive_all(fixture.events) == derive_all(fixture.events) diff --git a/tests/lib/core/harness/test_auto_send.py b/tests/lib/core/harness/test_auto_send.py new file mode 100644 index 000000000..1948e9196 --- /dev/null +++ b/tests/lib/core/harness/test_auto_send.py @@ -0,0 +1,490 @@ +"""Tests for auto_send delivery adapter. + +The fake mirrors the real StreamingTaskMessageContext API exactly: +- streaming_task_message_context(...) returns a context object (synchronously) +- open the context via __aenter__ (returns self after creating the task message) +- stream deltas via ctx.stream_update(StreamTaskMessageDelta(...)) +- close via ctx.close() (NOT __aexit__) + +This mirrors _langgraph_async.py lines 62-78 and 100-127. +""" + +import types as _types +from datetime import datetime + +import pytest + +from agentex.types.task_message import TaskMessage +from agentex.types.text_content import TextContent +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.types.task_message_delta import TextDelta +from agentex.types.tool_request_delta import ToolRequestDelta +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.lib.core.harness.auto_send import auto_send +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent + + +class _FakeCtx: + """Mirrors StreamingTaskMessageContext: __aenter__ opens (returns self with task_message set), + close() closes. stream_update records the call. + + task_message is a real TaskMessage instance so that auto_send can use it + as parent_task_message in StreamTaskMessageDelta without Pydantic validation errors. + """ + + def __init__(self, sink, content_type, initial_content): + self.sink = sink + self.content_type = content_type + # Real TaskMessage so StreamTaskMessageDelta(parent_task_message=...) passes validation + self.task_message = TaskMessage(id="msg-1", task_id="task1", content=initial_content) + + async def __aenter__(self): + self.sink.append(("open", self.content_type)) + return self + + async def __aexit__(self, *a): + # __aexit__ delegates to close in the real impl; keep for safety + await self.close() + return False + + async def close(self): + self.sink.append(("close", self.content_type)) + + async def stream_update(self, update): + self.sink.append(("update", update)) + return update + + +class _FakeStreaming: + """Mirrors StreamingService: streaming_task_message_context returns a context object.""" + + def __init__(self): + self.sink = [] + self.recorded_created_at: list[datetime | None] = [] + + def streaming_task_message_context(self, task_id, initial_content, streaming_mode="coalesced", created_at=None): + ctype = getattr(initial_content, "type", None) + self.sink.append(("ctx", ctype)) + self.recorded_created_at.append(created_at) + return _FakeCtx(self.sink, ctype, initial_content) + + +async def _gen(events): + for e in events: + yield e + + +# --------------------------------------------------------------------------- +# Test 1: text streaming — open, stream deltas, close; return accumulated text +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_auto_send_streams_text_and_returns_final_text(): + streaming = _FakeStreaming() + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=TextDelta(type="text", text_delta="Hel"), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=TextDelta(type="text", text_delta="lo"), + ), + StreamTaskMessageDone(type="done", index=0), + ] + result = await auto_send(_gen(events), task_id="task1", tracer=None, streaming=streaming) + + assert result.final_text == "Hello" + + kinds = [s[0] for s in streaming.sink] + # A context was created for the text content + assert kinds[0] == "ctx" + # It was opened and closed + assert "open" in kinds + assert "close" in kinds + # Exactly two updates were streamed (one per delta) + updates = [s for s in streaming.sink if s[0] == "update"] + assert len(updates) == 2 + + +# --------------------------------------------------------------------------- +# Test 2: tool_request Full + tool_response Full — each posts one full message +# (open context with the content, no deltas, close immediately) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_auto_send_posts_full_tool_messages(): + streaming = _FakeStreaming() + events = [ + # Two Full events post two messages (open+close immediately, no deltas). + StreamTaskMessageFull( + type="full", + index=0, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="c1", + name="Bash", + arguments={"cmd": "ls"}, + ), + ), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolResponseContent( + type="tool_response", + author="agent", + tool_call_id="c1", + name="Bash", + content="file.py", + ), + ), + ] + result = await auto_send(_gen(events), task_id="task1", tracer=None, streaming=streaming) + + assert result.final_text == "" + + # Each Full event opens and closes exactly one context. + ctx_events = [s for s in streaming.sink if s[0] == "ctx"] + assert len(ctx_events) == 2 + content_types = [s[1] for s in ctx_events] + assert content_types == ["tool_request", "tool_response"] + + # Each context is opened and closed + opens = [s for s in streaming.sink if s[0] == "open"] + closes = [s for s in streaming.sink if s[0] == "close"] + assert len(opens) == 2 + assert len(closes) == 2 + + # No stream_update calls (full messages have no deltas) + updates = [s for s in streaming.sink if s[0] == "update"] + assert len(updates) == 0 + + +# --------------------------------------------------------------------------- +# Test 3: tracing — spans are derived and handed to the tracer +# --------------------------------------------------------------------------- + + +class _RecordTracing: + def __init__(self): + self.started, self.ended = [], [] + + async def start_span(self, *, trace_id, name, input=None, parent_id=None, data=None, task_id=None): + self.started.append(name) + return _types.SimpleNamespace() + + async def end_span(self, *, trace_id, span): + self.ended.append(getattr(span, "output", None)) + + +@pytest.mark.asyncio +async def test_auto_send_derives_tool_spans_via_tracer(): + fake_tracing = _RecordTracing() + tracer = SpanTracer(trace_id="t", parent_span_id="p", tracing=fake_tracing) + streaming = _FakeStreaming() + + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="c1", + name="Bash", + arguments={}, + ), + ), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolResponseContent( + type="tool_response", + author="agent", + tool_call_id="c1", + name="Bash", + content="ok", + ), + ), + ] + + result = await auto_send(_gen(events), task_id="task1", tracer=tracer, streaming=streaming) + + assert result.final_text == "" + assert fake_tracing.started == ["Bash"] + assert fake_tracing.ended == ["ok"] + + +# --------------------------------------------------------------------------- +# Test 4: text followed by a tool Full — text context is closed before Full +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_auto_send_closes_text_context_before_full_message(): + streaming = _FakeStreaming() + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=TextDelta(type="text", text_delta="Hi"), + ), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="c2", + name="read_file", + arguments={}, + ), + ), + ] + result = await auto_send(_gen(events), task_id="task1", tracer=None, streaming=streaming) + assert result.final_text == "Hi" + + # Verify ordering: text ctx opens, updates, closes; then tool_request ctx opens, closes + event_sequence = [(s[0], s[1]) for s in streaming.sink] + text_open_idx = next(i for i, s in enumerate(event_sequence) if s == ("open", "text")) + text_close_idx = next(i for i, s in enumerate(event_sequence) if s == ("close", "text")) + tool_open_idx = next(i for i, s in enumerate(event_sequence) if s == ("open", "tool_request")) + assert text_open_idx < text_close_idx < tool_open_idx + + +# --------------------------------------------------------------------------- +# Test 5: midstream error — propagates AND the open context is closed (finally) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_open_context_closed_on_midstream_error(): + streaming = _FakeStreaming() + + async def _exploding_gen(): + yield StreamTaskMessageStart( + type="start", + index=0, + content=TextContent(type="text", author="agent", content=""), + ) + raise RuntimeError("boom") + + with pytest.raises(RuntimeError, match="boom"): + await auto_send(_exploding_gen(), task_id="task1", tracer=None, streaming=streaming) + + # The text context that was opened mid-stream was closed by the finally block. + assert ("open", "text") in [(s[0], s[1]) for s in streaming.sink] + assert ("close", "text") in [(s[0], s[1]) for s in streaming.sink] + + +# --------------------------------------------------------------------------- +# Test 6: streamed tool_request delivered (AGX1-377 core) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_auto_send_streams_tool_request(): + """A Start(ToolRequestContent) MUST open a streaming context (AGX1-377).""" + streaming = _FakeStreaming() + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="c_tool", + name="Bash", + arguments={}, + ), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=ToolRequestDelta( + type="tool_request", + tool_call_id="c_tool", + name="Bash", + arguments_delta='{"cmd": "ls"}', + ), + ), + StreamTaskMessageDone(type="done", index=0), + ] + result = await auto_send(_gen(events), task_id="task1", tracer=None, streaming=streaming) + + assert result.final_text == "" + + ctx_events = [s for s in streaming.sink if s[0] == "ctx"] + assert len(ctx_events) == 1 + assert ctx_events[0][1] == "tool_request" + + opens = [s for s in streaming.sink if s[0] == "open"] + closes = [s for s in streaming.sink if s[0] == "close"] + assert len(opens) == 1 + assert len(closes) == 1 + + updates = [s for s in streaming.sink if s[0] == "update"] + assert len(updates) == 1 + + +# --------------------------------------------------------------------------- +# Test 7: interleaved indexes route correctly +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_auto_send_interleaved_indexes_route_correctly(): + """Deltas must be routed to the correct index-keyed context.""" + streaming = _FakeStreaming() + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageStart( + type="start", + index=1, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=TextDelta(type="text", text_delta="A"), + ), + StreamTaskMessageDelta( + type="delta", + index=1, + delta=TextDelta(type="text", text_delta="B"), + ), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageDone(type="done", index=1), + ] + result = await auto_send(_gen(events), task_id="task1", tracer=None, streaming=streaming) + + ctx_events = [s for s in streaming.sink if s[0] == "ctx"] + assert len(ctx_events) == 2 + + opens = [s for s in streaming.sink if s[0] == "open"] + assert len(opens) == 2 + + updates = [s for s in streaming.sink if s[0] == "update"] + assert len(updates) == 2 + + update_deltas = [s[1].delta for s in streaming.sink if s[0] == "update"] + text_deltas = [d.text_delta for d in update_deltas if isinstance(d, TextDelta)] + assert set(text_deltas) == {"A", "B"} + + +# --------------------------------------------------------------------------- +# Test 8: final_text returns last text segment for multi-step +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_auto_send_final_text_last_segment(): + """final_text must be the LAST text segment, not accumulated across all turns.""" + streaming = _FakeStreaming() + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=TextDelta(type="text", text_delta="First"), + ), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageStart( + type="start", + index=1, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageDelta( + type="delta", + index=1, + delta=TextDelta(type="text", text_delta="Second"), + ), + StreamTaskMessageDone(type="done", index=1), + ] + result = await auto_send(_gen(events), task_id="task1", tracer=None, streaming=streaming) + assert result.final_text == "Second" + + +# --------------------------------------------------------------------------- +# Test 9: Full(TextContent) contributes to final_text +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_auto_send_full_text_content_sets_final_text(): + """A Full(TextContent) must contribute its text to final_text.""" + streaming = _FakeStreaming() + events = [ + StreamTaskMessageFull( + type="full", + index=0, + content=TextContent(type="text", author="agent", content="hello"), + ), + ] + result = await auto_send(_gen(events), task_id="task1", tracer=None, streaming=streaming) + assert result.final_text == "hello" + + +# --------------------------------------------------------------------------- +# Test 10: created_at is forwarded to streaming context (AGX1-378) +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_auto_send_created_at_forwarded(): + """created_at must be forwarded to every streaming_task_message_context call.""" + streaming = _FakeStreaming() + dt = datetime(2025, 1, 15, 12, 0, 0) + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=TextContent(type="text", author="agent", content=""), + ), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="c_ts", + name="Bash", + arguments={}, + ), + ), + ] + await auto_send(_gen(events), task_id="task1", tracer=None, streaming=streaming, created_at=dt) + + assert all(ts == dt for ts in streaming.recorded_created_at) diff --git a/tests/lib/core/harness/test_emitter.py b/tests/lib/core/harness/test_emitter.py new file mode 100644 index 000000000..ee3052f47 --- /dev/null +++ b/tests/lib/core/harness/test_emitter.py @@ -0,0 +1,110 @@ +import pytest + +from agentex.types.task_message import TaskMessage +from agentex.types.text_content import TextContent +from agentex.lib.core.harness.types import TurnUsage +from agentex.lib.core.harness.emitter import UnifiedEmitter +from agentex.types.task_message_delta import TextDelta +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) + + +class _FakeTracing: + async def start_span(self, **kw): + return None + + async def end_span(self, **kw): + pass + + +class _FakeCtx: + """Minimal StreamingTaskMessageContext fake (see test_auto_send.py).""" + + def __init__(self, sink, content_type, initial_content): + self.sink = sink + self.content_type = content_type + self.task_message = TaskMessage(id="msg-1", task_id="task1", content=initial_content) + + async def __aenter__(self): + self.sink.append(("open", self.content_type)) + return self + + async def __aexit__(self, *a): + await self.close() + return False + + async def close(self): + self.sink.append(("close", self.content_type)) + + async def stream_update(self, update): + self.sink.append(("update", update)) + return update + + +class _FakeStreaming: + def __init__(self): + self.sink = [] + + def streaming_task_message_context(self, task_id, initial_content, streaming_mode="coalesced", created_at=None): + ctype = getattr(initial_content, "type", None) + self.sink.append(("ctx", ctype)) + return _FakeCtx(self.sink, ctype, initial_content) + + +class _Turn: + def __init__(self, events_list, usage): + self._events_list = events_list + self._usage = usage + + @property + async def events(self): + for e in self._events_list: + yield e + + def usage(self): + return self._usage + + +@pytest.mark.asyncio +async def test_emitter_yield_mode_passes_through(): + events = [ + StreamTaskMessageStart(type="start", index=0, content=TextContent(type="text", author="agent", content="hi")), + StreamTaskMessageDone(type="done", index=0), + ] + turn = _Turn(events, TurnUsage(model="m")) + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None) + out = [e async for e in emitter.yield_turn(turn)] + assert out == events + + +@pytest.mark.asyncio +async def test_emitter_tracing_default_on_when_trace_id_present(): + # Inject a fake tracing backend so the test env doesn't need temporalio. + # This exercises the default-on path (tracer=None) when trace_id is truthy. + emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id="p", tracing=_FakeTracing()) + assert emitter.tracer is not None + + +@pytest.mark.asyncio +async def test_emitter_tracing_overridable_off(): + emitter = UnifiedEmitter(task_id="t", trace_id="trace1", parent_span_id="p", tracer=False) + assert emitter.tracer is None + + +@pytest.mark.asyncio +async def test_emitter_auto_send_turn_returns_usage(): + usage = TurnUsage(model="m", input_tokens=5) + events = [ + StreamTaskMessageStart(type="start", index=0, content=TextContent(type="text", author="agent", content="")), + StreamTaskMessageDelta(type="delta", index=0, delta=TextDelta(type="text", text_delta="Hello")), + StreamTaskMessageDone(type="done", index=0), + ] + turn = _Turn(events, usage) + fake = _FakeStreaming() + emitter = UnifiedEmitter(task_id="t", trace_id=None, parent_span_id=None, streaming=fake) + result = await emitter.auto_send_turn(turn) + assert result.usage == usage + assert result.final_text == "Hello" diff --git a/tests/lib/core/harness/test_span_derivation.py b/tests/lib/core/harness/test_span_derivation.py new file mode 100644 index 000000000..f22b83d54 --- /dev/null +++ b/tests/lib/core/harness/test_span_derivation.py @@ -0,0 +1,257 @@ +from agentex.types.text_content import TextContent +from agentex.lib.core.harness.types import OpenSpan, CloseSpan +from agentex.types.reasoning_content import ReasoningContent +from agentex.types.tool_request_delta import ToolRequestDelta +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageDelta, + StreamTaskMessageStart, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.lib.core.harness.span_derivation import SpanDeriver + + +def _signals(deriver, events): + out = [] + for e in events: + out.extend(deriver.observe(e)) + out.extend(deriver.flush()) + return out + + +def _tool_req(idx, tcid, name, args): + return StreamTaskMessageStart( + type="start", + index=idx, + content=ToolRequestContent(type="tool_request", author="agent", tool_call_id=tcid, name=name, arguments=args), + ) + + +def test_text_only_yields_no_spans(): + d = SpanDeriver() + events = [ + StreamTaskMessageStart(type="start", index=0, content=TextContent(type="text", author="agent", content="")), + StreamTaskMessageDelta(type="delta", index=0, delta=None), + StreamTaskMessageDone(type="done", index=0), + ] + assert _signals(d, events) == [] + + +def test_single_tool_opens_on_done_closes_on_response(): + d = SpanDeriver() + events = [ + _tool_req(0, "call_1", "Bash", {"cmd": "ls"}), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolResponseContent( + type="tool_response", author="agent", tool_call_id="call_1", name="Bash", content="files" + ), + ), + ] + sigs = _signals(d, events) + assert sigs == [ + OpenSpan(key="call_1", kind="tool", name="Bash", input={"cmd": "ls"}), + CloseSpan(key="call_1", output="files", is_complete=True), + ] + + +def test_reasoning_opens_on_start_closes_on_done(): + d = SpanDeriver() + events = [ + StreamTaskMessageStart( + type="start", index=0, content=ReasoningContent(type="reasoning", author="agent", summary=[], content=[]) + ), + StreamTaskMessageDone(type="done", index=0), + ] + sigs = _signals(d, events) + assert sigs[0] == OpenSpan(key="reasoning:0", kind="reasoning", name="reasoning", input={}) + assert sigs[1] == CloseSpan(key="reasoning:0", output=None, is_complete=True) + + +def test_parallel_tools_pair_by_tool_call_id(): + d = SpanDeriver() + events = [ + _tool_req(0, "a", "T1", {}), + _tool_req(1, "b", "T2", {}), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageDone(type="done", index=1), + StreamTaskMessageFull( + type="full", + index=2, + content=ToolResponseContent( + type="tool_response", author="agent", tool_call_id="b", name="T2", content="rb" + ), + ), + StreamTaskMessageFull( + type="full", + index=3, + content=ToolResponseContent( + type="tool_response", author="agent", tool_call_id="a", name="T1", content="ra" + ), + ), + ] + sigs = _signals(d, events) + opens = [s for s in sigs if isinstance(s, OpenSpan)] + closes = [s for s in sigs if isinstance(s, CloseSpan)] + assert {o.key for o in opens} == {"a", "b"} + assert [c.key for c in closes] == ["b", "a"] + assert all(c.is_complete for c in closes) + + +def test_streamed_args_accumulate_into_open_input(): + d = SpanDeriver() + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=ToolRequestContent( + type="tool_request", author="agent", tool_call_id="c", name="Bash", arguments={} + ), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=ToolRequestDelta(type="tool_request", tool_call_id="c", name="Bash", arguments_delta='{"cmd":'), + ), + StreamTaskMessageDelta( + type="delta", + index=0, + delta=ToolRequestDelta(type="tool_request", tool_call_id="c", name="Bash", arguments_delta='"ls"}'), + ), + StreamTaskMessageDone(type="done", index=0), + ] + sigs = _signals(d, events) + assert sigs[0] == OpenSpan(key="c", kind="tool", name="Bash", input={"cmd": "ls"}) + + +def test_unclosed_tool_closed_incomplete_on_flush(): + d = SpanDeriver() + events = [ + _tool_req(0, "x", "Bash", {}), + StreamTaskMessageDone(type="done", index=0), + ] + sigs = _signals(d, events) + assert sigs[0] == OpenSpan(key="x", kind="tool", name="Bash", input={}) + assert sigs[1] == CloseSpan(key="x", output=None, is_complete=False) + + +def test_none_index_is_skipped(): + d = SpanDeriver() + events = [ + StreamTaskMessageStart( + type="start", + index=None, + content=ToolRequestContent( + type="tool_request", author="agent", tool_call_id="n", name="Bash", arguments={} + ), + ), + StreamTaskMessageDone(type="done", index=None), + ] + assert _signals(d, events) == [] + + +def test_orphan_tool_response_ignored(): + d = SpanDeriver() + events = [ + StreamTaskMessageFull( + type="full", + index=0, + content=ToolResponseContent( + type="tool_response", author="agent", tool_call_id="z", name="Bash", content="r" + ), + ), + ] + assert _signals(d, events) == [] + + +def test_full_tool_request_opens_span(): + """Full(ToolRequestContent) must open a tool span (for LangGraph-style harnesses).""" + d = SpanDeriver() + events = [ + StreamTaskMessageFull( + type="full", + index=0, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="call_x", + name="Bash", + arguments={"cmd": "ls"}, + ), + ), + ] + sigs = _signals(d, events) + assert sigs[0] == OpenSpan(key="call_x", kind="tool", name="Bash", input={"cmd": "ls"}) + assert sigs[1] == CloseSpan(key="call_x", output=None, is_complete=False) + + +def test_full_tool_request_and_response_paired(): + """Full(ToolRequestContent) + Full(ToolResponseContent) produces a complete span pair.""" + d = SpanDeriver() + events = [ + StreamTaskMessageFull( + type="full", + index=0, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="call_y", + name="Grep", + arguments={}, + ), + ), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolResponseContent( + type="tool_response", + author="agent", + tool_call_id="call_y", + name="Grep", + content="result", + ), + ), + ] + sigs = _signals(d, events) + assert sigs == [ + OpenSpan(key="call_y", kind="tool", name="Grep", input={}), + CloseSpan(key="call_y", output="result", is_complete=True), + ] + + +def test_full_tool_request_does_not_double_open(): + """A Full(ToolRequestContent) for an already-open tool_call_id is a no-op.""" + d = SpanDeriver() + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="call_z", + name="X", + arguments={}, + ), + ), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolRequestContent( + type="tool_request", + author="agent", + tool_call_id="call_z", + name="X", + arguments={}, + ), + ), + ] + sigs = _signals(d, events) + opens = [s for s in sigs if isinstance(s, OpenSpan)] + assert len(opens) == 1 + assert opens[0].key == "call_z" diff --git a/tests/lib/core/harness/test_tracer.py b/tests/lib/core/harness/test_tracer.py new file mode 100644 index 000000000..315b74417 --- /dev/null +++ b/tests/lib/core/harness/test_tracer.py @@ -0,0 +1,70 @@ +from typing import override + +import pytest + +from agentex.lib.core.harness.types import OpenSpan, CloseSpan +from agentex.lib.core.harness.tracer import SpanTracer + + +class _FakeSpan: + def __init__(self, name): + self.name = name + self.output = None + + +class _FakeTracing: + def __init__(self): + self.started = [] + self.ended = [] + + async def start_span(self, *, trace_id, name, input=None, parent_id=None, data=None, task_id=None): + self.started.append((name, parent_id, input)) + return _FakeSpan(name) + + async def end_span(self, *, trace_id, span): + self.ended.append((span.name, span.output)) + + +@pytest.mark.asyncio +async def test_open_then_close_starts_and_ends_span(): + fake = _FakeTracing() + tracer = SpanTracer(trace_id="t1", parent_span_id="p1", tracing=fake) + await tracer.handle(OpenSpan(key="call_1", kind="tool", name="Bash", input={"cmd": "ls"})) + await tracer.handle(CloseSpan(key="call_1", output="files", is_complete=True)) + assert fake.started == [("Bash", "p1", {"cmd": "ls"})] + assert fake.ended == [("Bash", "files")] + + +@pytest.mark.asyncio +async def test_no_trace_id_is_noop(): + fake = _FakeTracing() + tracer = SpanTracer(trace_id="", parent_span_id=None, tracing=fake) + await tracer.handle(OpenSpan(key="k", kind="tool", name="X")) + await tracer.handle(CloseSpan(key="k")) + assert fake.started == [] and fake.ended == [] + + +@pytest.mark.asyncio +async def test_tracing_failure_is_swallowed(): + class _Boom(_FakeTracing): + @override + async def start_span(self, **kw): + raise RuntimeError("backend down") + + tracer = SpanTracer(trace_id="t1", parent_span_id="p1", tracing=_Boom()) + # Must not raise. + await tracer.handle(OpenSpan(key="k", kind="tool", name="X")) + await tracer.handle(CloseSpan(key="k")) + assert tracer._open == {} + + +@pytest.mark.asyncio +async def test_duplicate_open_replaces_silently(): + fake = _FakeTracing() + tracer = SpanTracer(trace_id="t1", parent_span_id="p1", tracing=fake) + await tracer.handle(OpenSpan(key="k", kind="tool", name="A")) + await tracer.handle(OpenSpan(key="k", kind="tool", name="B")) + await tracer.handle(CloseSpan(key="k")) + # Both opens started spans, but only the second ("B") is closed. + assert [name for name, _, _ in fake.started] == ["A", "B"] + assert fake.ended == [("B", None)] diff --git a/tests/lib/core/harness/test_types.py b/tests/lib/core/harness/test_types.py new file mode 100644 index 000000000..68bc89ce2 --- /dev/null +++ b/tests/lib/core/harness/test_types.py @@ -0,0 +1,53 @@ +from typing import AsyncIterator + +from agentex.lib.core.harness.types import ( + OpenSpan, + CloseSpan, + TurnUsage, + TurnResult, + HarnessTurn, + StreamTaskMessage, +) + + +def test_open_close_span_construct(): + o = OpenSpan(key="call_1", kind="tool", name="Bash", input={"cmd": "ls"}) + c = CloseSpan(key="call_1", output="files", is_complete=True) + assert o.key == c.key == "call_1" + assert o.kind == "tool" + assert c.is_complete is True + + +def test_turn_usage_defaults_are_none(): + u = TurnUsage(model="claude-opus-4-6") + assert u.model == "claude-opus-4-6" + assert u.input_tokens is None + assert u.num_tool_calls == 0 + + +def test_turn_result_wraps_usage(): + r = TurnResult(final_text="hi", usage=TurnUsage(model="m")) + assert r.final_text == "hi" + assert r.usage.model == "m" + + +def test_close_span_defaults(): + c = CloseSpan(key="x") + assert c.output is None + assert c.is_complete is True + + +def test_harness_turn_runtime_check(): + class _Turn: + @property + def events(self) -> AsyncIterator[StreamTaskMessage]: + async def _gen() -> AsyncIterator[StreamTaskMessage]: + if False: + yield # pragma: no cover + + return _gen() + + def usage(self) -> TurnUsage: + return TurnUsage(model="m") + + assert isinstance(_Turn(), HarnessTurn) is True diff --git a/tests/lib/core/harness/test_yield_delivery.py b/tests/lib/core/harness/test_yield_delivery.py new file mode 100644 index 000000000..f3f491d84 --- /dev/null +++ b/tests/lib/core/harness/test_yield_delivery.py @@ -0,0 +1,89 @@ +import types as _types + +import pytest + +from agentex.lib.core.harness.tracer import SpanTracer +from agentex.types.task_message_update import ( + StreamTaskMessageDone, + StreamTaskMessageFull, + StreamTaskMessageStart, +) +from agentex.types.tool_request_content import ToolRequestContent +from agentex.types.tool_response_content import ToolResponseContent +from agentex.lib.core.harness.yield_delivery import yield_events + + +class _RecordTracing: + def __init__(self): + self.started, self.ended = [], [] + + async def start_span(self, *, trace_id, name, input=None, parent_id=None, data=None, task_id=None): + self.started.append(name) + return _types.SimpleNamespace() # supports arbitrary attribute assignment (span.output = ...) + + async def end_span(self, *, trace_id, span): + self.ended.append(getattr(span, "output", None)) + + +async def _gen(events): + for e in events: + yield e + + +@pytest.mark.asyncio +async def test_yield_passes_events_through_and_traces(): + fake = _RecordTracing() + tracer = SpanTracer(trace_id="t", parent_span_id="p", tracing=fake) + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=ToolRequestContent( + type="tool_request", author="agent", tool_call_id="c", name="Bash", arguments={} + ), + ), + StreamTaskMessageDone(type="done", index=0), + StreamTaskMessageFull( + type="full", + index=1, + content=ToolResponseContent( + type="tool_response", author="agent", tool_call_id="c", name="Bash", content="ok" + ), + ), + ] + out = [e async for e in yield_events(_gen(events), tracer=tracer)] + assert out == events # passthrough unchanged + assert fake.started == ["Bash"] # span derived + opened + assert fake.ended == ["ok"] # span closed with response + + +@pytest.mark.asyncio +async def test_yield_without_tracer_is_pure_passthrough(): + events = [ + StreamTaskMessageDone(type="done", index=0), + ] + out = [e async for e in yield_events(_gen(events), tracer=None)] + assert out == events + + +@pytest.mark.asyncio +async def test_flush_runs_on_early_close(): + fake = _RecordTracing() + tracer = SpanTracer(trace_id="t", parent_span_id="p", tracing=fake) + events = [ + StreamTaskMessageStart( + type="start", + index=0, + content=ToolRequestContent( + type="tool_request", author="agent", tool_call_id="c", name="Bash", arguments={} + ), + ), + StreamTaskMessageDone(type="done", index=0), + # response intentionally never arrives + ] + gen = yield_events(_gen(events), tracer=tracer) + first = await gen.__anext__() # Start + second = await gen.__anext__() # Done -> tool span opens here + await gen.aclose() # triggers the finally -> flush() + assert fake.started == ["Bash"] + assert fake.ended == [None] # flush closed the unpaired span (incomplete, no output)