Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
# Unified Harness Surface — PR 4: pydantic-ai Migration Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Migrate the pydantic-ai harness onto the unified harness surface so it emits streaming + persisted messages + tracing + turn usage through ONE source of truth, over both delivery channels (yield + auto-send), with no public regression — and ship its 3 integration test agents (sync/async/temporal).

**Architecture:** Wrap a pydantic-ai run as a `HarnessTurn` (canonical `StreamTaskMessage*` stream + normalized `TurnUsage`). Reuse the existing `convert_pydantic_ai_to_agentex_events` mapping as the tap. Reimplement the existing public auto-send helper on top of `UnifiedEmitter.auto_send_turn`, and route sync ACP agents through `UnifiedEmitter.yield_turn`. Retire the bespoke `_pydantic_ai_tracing` handler in favor of the surface's derived spans (keep the old symbol as a deprecated shim).

**Tech Stack:** Python 3, pydantic-ai (`pydantic_ai`), pydantic v2, pytest + pytest-asyncio, the `agentex.lib.core.harness` package from PRs 1-3.

**Foundation:** `src/agentex/lib/core/harness/` (`UnifiedEmitter`, `SpanTracer`, `SpanDeriver`, `HarnessTurn`, `TurnUsage`, `TurnResult`, `yield_events`, `auto_send`, conformance scaffold). Design: `docs/superpowers/specs/2026-06-18-unified-harness-surface-design.md`.

---

## Dependencies (must land first)

- **AGX1-373** — cross-channel conformance equivalence + `Full` wire reconciliation. PR 4's conformance fixtures register into the upgraded cross-channel runner. **Do not start Task 6 until 373 is merged into the foundation branch.**
- **AGX1-375** — public `adk` import path for the harness surface. If merged, import the surface via the public path in this PR; if not, import from `agentex.lib.core.harness` and add a follow-up note. (Tasks below assume `from agentex.lib.core.harness import UnifiedEmitter, TurnUsage, ...`; swap to the public path if 375 landed.)

This is one PR (target < 1000 lines code, excluding any recorded fixtures). The 3 test agents are the largest chunk; if the diff exceeds budget, split the test agents into a follow-up PR 4b (note in the PR description).

---

## File Structure

- Modify `src/agentex/lib/adk/_modules/_pydantic_ai_sync.py` — add an optional `on_result` callback to `convert_pydantic_ai_to_agentex_events` (additive) so usage can be captured. Behavior unchanged when omitted.
- Create `src/agentex/lib/adk/_modules/_pydantic_ai_turn.py` — `PydanticAITurn(HarnessTurn)` + `pydantic_ai_usage_to_turn_usage(...)`.
- Modify `src/agentex/lib/adk/_modules/_pydantic_ai_async.py` — reimplement `stream_pydantic_ai_events` on `UnifiedEmitter.auto_send_turn`, preserving signature + return.
- Modify `src/agentex/lib/adk/_modules/_pydantic_ai_tracing.py` — mark `create_pydantic_ai_tracing_handler` / `AgentexPydanticAITracingHandler` deprecated (docstring + `DeprecationWarning`); keep importable.
- Create `tests/lib/core/harness/conformance/test_pydantic_ai_conformance.py` — register pydantic-ai fixtures into the cross-channel conformance runner.
- Create `examples/tutorials/harness-pydantic-ai-{sync,async,temporal}/` — 3 test agents (modeled on the `sync-pydantic-ai` / `default-pydantic-ai` / `temporal-pydantic-ai` CLI templates) using the unified surface.
- Modify `.github/workflows/harness-integration.yml` — enable the pydantic-ai rows of the `live-matrix` job.
- Modify `.github/workflows/agentex-tutorials-test.yml` (or its agent list) — include the 3 new test agents if that workflow enumerates agents.

---

## Task 1: Expose the pydantic-ai run result for usage capture

**Files:**
- Modify: `src/agentex/lib/adk/_modules/_pydantic_ai_sync.py`
- Test: `tests/lib/adk/test_pydantic_ai_sync.py` (create if absent)

The converter already iterates the pydantic-ai event stream and currently *ignores* `AgentRunResultEvent` (the terminal event carrying the run result + usage). Add an optional callback so a caller can capture it without changing existing behavior.

- [ ] **Step 1: Write the failing test.**

```python
import pytest
from agentex.lib.adk._modules._pydantic_ai_sync import convert_pydantic_ai_to_agentex_events


class _FakeResultEvent: # stand-in for pydantic_ai.run.AgentRunResultEvent
def __init__(self, result):
self.result = result


async def _stream(events):
for e in events:
yield e


@pytest.mark.asyncio
async def test_on_result_callback_receives_terminal_event(monkeypatch):
# When the stream ends with an AgentRunResultEvent, on_result is invoked with it,
# and the converter still yields no extra events for it.
captured = {}
# Use a real AgentRunResultEvent if constructable; otherwise patch isinstance check.
# (Implementer: see Step 3 note — match the real terminal event type.)
...
```

Implementer note: the exact terminal event type is `pydantic_ai.run.AgentRunResultEvent` (already imported in `_pydantic_ai_sync.py`). Write the test to feed a stream ending in a real `AgentRunResultEvent` (construct it as the installed pydantic-ai version requires; inspect `python -c "import pydantic_ai.run, inspect; print(inspect.signature(pydantic_ai.run.AgentRunResultEvent))"`). Assert `on_result` is called once with that event and that the converter yields the same `StreamTaskMessage*` sequence as without the callback (no behavior change for the streaming output).

- [ ] **Step 2: Run** `uv run pytest tests/lib/adk/test_pydantic_ai_sync.py -v` — expect FAIL (no `on_result` param).

- [ ] **Step 3: Implement.** Add `on_result: Callable[[AgentRunResultEvent], None] | None = None` (and an async-callable variant if needed) to `convert_pydantic_ai_to_agentex_events`. In the existing `elif isinstance(event, (FunctionToolCallEvent, FinalResultEvent, AgentRunResultEvent))` branch, when the event is an `AgentRunResultEvent` and `on_result` is set, call it (await if it's a coroutine). Keep yielding nothing for it. No other change.

- [ ] **Step 4: Run** the test — expect PASS, plus run the existing `_pydantic_ai_sync` tests if any to confirm no regression.

- [ ] **Step 5: Commit** `feat(pydantic-ai): optional on_result callback to expose run result for usage capture`.

---

## Task 2: Normalize pydantic-ai usage to `TurnUsage`

**Files:**
- Create: `src/agentex/lib/adk/_modules/_pydantic_ai_turn.py`
- Test: `tests/lib/adk/test_pydantic_ai_turn.py`

- [ ] **Step 1: Verify the real usage shape FIRST.** Run `uv run python -c "from pydantic_ai.usage import RunUsage; import inspect; print([f for f in RunUsage.model_fields])"` (the type/name may be `RunUsage` or `Usage` depending on the installed version). Record the exact field names (commonly: `input_tokens`, `output_tokens`, `total_tokens`, `requests`, and a cache/`details` field). The mapping in Step 3 MUST use the real field names.

- [ ] **Step 2: Write the failing test.**

```python
from agentex.lib.adk._modules._pydantic_ai_turn import pydantic_ai_usage_to_turn_usage


def test_usage_normalization_maps_fields():
# Build a usage object matching the installed pydantic-ai RunUsage shape
# (see Task 2 Step 1 for the real fields), then assert the mapping.
usage_obj = ... # construct RunUsage(input_tokens=10, output_tokens=20, requests=2, ...)
tu = pydantic_ai_usage_to_turn_usage(usage_obj, model="openai:gpt-4o")
assert tu.model == "openai:gpt-4o"
assert tu.input_tokens == 10
assert tu.output_tokens == 20
assert tu.num_llm_calls == 2
```

- [ ] **Step 3: Implement** `pydantic_ai_usage_to_turn_usage(usage, model) -> TurnUsage` mapping the verified RunUsage fields onto `TurnUsage` (`input_tokens`, `output_tokens`, `total_tokens`, `cached_input_tokens` if available, `num_llm_calls` ← `requests`). Use `getattr(usage, "<field>", None)` defensively so a version field rename degrades to `None` rather than crashing. Then implement `PydanticAITurn`:

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Cache tokens dropped

The plan tells the implementer to populate TurnUsage.cached_input_tokens only from a same-named pydantic-ai field. Current pydantic-ai RunUsage exposes cache input usage as cache_read_tokens and cache_write_tokens, so following this mapping silently drops cached-token usage even when the run reports it. Please spell out the real mapping, such as combining the cache read/write token fields if that matches Agentex semantics, and add a fixture that asserts cache usage survives normalization.

Artifacts

Repro: Python script constructing pydantic-ai RunUsage and applying same-name-only cache-token normalization

  • Contains supporting evidence from the run (text/x-python; charset=utf-8).

Repro: failing script output showing cache_read_tokens and cache_write_tokens are dropped by cached_input_tokens-only mapping

  • Keeps the command output available without making the summary code-heavy.

View artifacts

T-Rex Ran code and verified through T-Rex

Prompt To Fix With AI
This is a comment left during a code review.
Path: docs/superpowers/plans/2026-06-18-unified-harness-surface-pr4-pydantic-ai.md
Line: 109

Comment:
**Cache tokens dropped**

The plan tells the implementer to populate `TurnUsage.cached_input_tokens` only from a same-named pydantic-ai field. Current pydantic-ai `RunUsage` exposes cache input usage as `cache_read_tokens` and `cache_write_tokens`, so following this mapping silently drops cached-token usage even when the run reports it. Please spell out the real mapping, such as combining the cache read/write token fields if that matches Agentex semantics, and add a fixture that asserts cache usage survives normalization.

How can I resolve this? If you propose a fix, please make it concise.

Fix in Claude Code


```python
class PydanticAITurn:
"""A pydantic-ai run as a HarnessTurn: canonical event stream + normalized usage."""

def __init__(self, stream, model: str | None = None):
self._stream = stream
self._model = model
self._usage = TurnUsage(model=model)

@property
async def events(self):
def _capture(result_event):
run_result = getattr(result_event, "result", None)
usage_obj = run_result.usage() if run_result is not None else None
if usage_obj is not None:
self._usage = pydantic_ai_usage_to_turn_usage(usage_obj, self._model)
async for ev in convert_pydantic_ai_to_agentex_events(self._stream, on_result=_capture):
Comment thread
greptile-apps[bot] marked this conversation as resolved.
Comment thread
greptile-apps[bot] marked this conversation as resolved.
yield ev

def usage(self) -> TurnUsage:
return self._usage
```

(Verify `run_result.usage()` is the correct accessor for the installed version; adjust if it's an attribute.)

- [ ] **Step 4: Add a `PydanticAITurn` test** that feeds a small stream ending in an `AgentRunResultEvent` whose `result.usage()` returns a known usage, drives `turn.events` to exhaustion, then asserts `turn.usage()` reflects the normalized values and that `events` yielded the expected `StreamTaskMessage*`. Confirm `usage()` BEFORE exhaustion returns the default (documented single-pass contract).

- [ ] **Step 5: Run** the tests — expect PASS.

- [ ] **Step 6: Commit** `feat(pydantic-ai): PydanticAITurn HarnessTurn + usage normalization`.

---

## Task 3: Reimplement the auto-send helper on the unified surface

**Files:**
- Modify: `src/agentex/lib/adk/_modules/_pydantic_ai_async.py`
- Test: `tests/lib/adk/test_pydantic_ai_async.py`

`stream_pydantic_ai_events(stream, task_id, ...)` currently hand-drives `adk.streaming`. Reimplement it to delegate to `UnifiedEmitter.auto_send_turn(PydanticAITurn(stream, model))`, preserving its signature and return value (the accumulated final text). Feature-add: traces by default.

- [ ] **Step 1: Capture current behavior as a characterization test.** Before changing anything, write a test that runs the CURRENT `stream_pydantic_ai_events` over a fixture stream with a fake `adk.streaming` and records the messages produced (text, tool request/response). This is the backward-compat baseline ("equivalent messages before/after" from the design).

- [ ] **Step 2: Run** it green against the current implementation. Commit the test alone: `test(pydantic-ai): characterize stream_pydantic_ai_events output`.

- [ ] **Step 3: Reimplement** `stream_pydantic_ai_events` to build a `PydanticAITurn` and call `UnifiedEmitter(task_id=task_id, trace_id=<resolved>, parent_span_id=<resolved>, streaming=<injected or None>).auto_send_turn(turn)`, returning `result.final_text`. Resolve `trace_id`/`parent_span_id` the same way the module does today (from the streaming/tracing context vars it already reads). Preserve the exact public signature and return type.
Comment thread
greptile-apps[bot] marked this conversation as resolved.
Comment thread
greptile-apps[bot] marked this conversation as resolved.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Tracing context has no source

This step says to resolve trace_id and parent_span_id "the same way the module does today", but _pydantic_ai_async.py currently does not read any tracing context vars. Current callers pass those values through tracing_handler; if the implementation preserves the signature but builds UnifiedEmitter with no actual trace context, default tracing is disabled and existing callers that still pass tracing_handler silently lose their tool spans. Please specify how to bridge the preserved handler into the emitter, or add explicit context access, with a regression test for an existing caller that passes a handler.

Artifacts

Repro: focused harness that imports the pydantic-ai async streamer and exercises handler-derived tracing context

  • Contains supporting evidence from the run (text/x-python; charset=utf-8).

Repro: harness output showing absent context-var reads and handler-sourced tool span tracing

  • Keeps the command output available without making the summary code-heavy.

View artifacts

T-Rex Ran code and verified through T-Rex

Prompt To Fix With AI
This is a comment left during a code review.
Path: docs/superpowers/plans/2026-06-18-unified-harness-surface-pr4-pydantic-ai.md
Line: 156

Comment:
**Tracing context has no source**

This step says to resolve `trace_id` and `parent_span_id` "the same way the module does today", but `_pydantic_ai_async.py` currently does not read any tracing context vars. Current callers pass those values through `tracing_handler`; if the implementation preserves the signature but builds `UnifiedEmitter` with no actual trace context, default tracing is disabled and existing callers that still pass `tracing_handler` silently lose their tool spans. Please specify how to bridge the preserved handler into the emitter, or add explicit context access, with a regression test for an existing caller that passes a handler.

How can I resolve this? If you propose a fix, please make it concise.

Fix in Claude Code


- [ ] **Step 4: Run** the characterization test — it must still pass (same messages). Adjust the test only if AGX1-373 deliberately changed the tool-message wire shape; in that case assert the post-373 shape and note it. Confirm tracing now occurs by default (assert spans via a fake tracer).
Comment thread
greptile-apps[bot] marked this conversation as resolved.

- [ ] **Step 5: Commit** `refactor(pydantic-ai): reimplement stream_pydantic_ai_events on UnifiedEmitter (default tracing)`.

---

## Task 4: Route sync ACP delivery through the surface + deprecate the bespoke tracing handler

**Files:**
- Modify: `src/agentex/lib/adk/_modules/_pydantic_ai_tracing.py`
- (Reference) the sync ACP usage pattern in the pydantic-ai docs/templates.

- [ ] **Step 1: Deprecate the bespoke tracing handler.** Add a `DeprecationWarning` (via `warnings.warn(...)`) and a docstring note to `create_pydantic_ai_tracing_handler` / `AgentexPydanticAITracingHandler` stating the unified surface (`UnifiedEmitter`, which derives spans from the canonical stream) supersedes it. Keep the symbols importable and functional (no removal — backward compat).

- [ ] **Step 2: Confirm the sync path.** The sync tap remains `convert_pydantic_ai_to_agentex_events`. Document (in the module docstring of `_pydantic_ai_sync.py`) the recommended sync ACP usage:

```python
turn = PydanticAITurn(agent.run_stream_events(...), model=...)
async for event in emitter.yield_turn(turn):
yield event
```

No code change beyond the docstring (the sync converter already yields the canonical stream; `yield_turn` adds tracing). Add a test that `emitter.yield_turn(PydanticAITurn(...))` forwards the same events the bare converter would and derives spans.

- [ ] **Step 3: Run** tests; **Commit** `refactor(pydantic-ai): deprecate bespoke tracing handler; document unified sync path`.

---

## Task 5: pydantic-ai cross-channel conformance fixtures

**Files:**
- Create: `tests/lib/core/harness/conformance/test_pydantic_ai_conformance.py`

**Blocked by AGX1-373** (the cross-channel conformance runner). Once 373 is merged into the foundation branch:

- [ ] **Step 1: Record canonical fixtures.** For 3-4 representative pydantic-ai runs (text-only; single tool; reasoning/thinking; multi-step text+tool), capture the `StreamTaskMessage*` sequence the tap produces (run `convert_pydantic_ai_to_agentex_events` over recorded `AgentStreamEvent` inputs, or hand-author the canonical sequences). Store as `Fixture(name=..., events=[...])`.

- [ ] **Step 2: Register** each fixture with the conformance runner and let the cross-channel parametrized test (from AGX1-373) assert yield-vs-auto-send equivalence + span equivalence for each. Register/parametrize within THIS module (per the runner's documented per-module registry semantics).

- [ ] **Step 3: Run** `./scripts/test tests/lib/core/harness/ -v` — all green. **Commit** `test(pydantic-ai): cross-channel conformance fixtures`.

---

## Task 6: Three integration test agents (sync / async / temporal)

**Files:**
- Create: `examples/tutorials/harness-pydantic-ai-sync/` , `…-async/` , `…-temporal/` (each a minimal Agentex agent).
- Modify: `.github/workflows/harness-integration.yml` (enable pydantic-ai `live-matrix` rows).
- Modify: `.github/workflows/agentex-tutorials-test.yml` if it enumerates agents.

Each agent is the smallest agent that exercises one delivery channel through the unified surface with the pydantic-ai harness.

- [ ] **Step 1: Scaffold from the existing templates.** Base each agent on the corresponding CLI template: `sync-pydantic-ai`, `default-pydantic-ai` (async), `temporal-pydantic-ai` (under `src/agentex/lib/cli/templates/`). In each, the message handler builds `PydanticAITurn(agent.run_stream_events(params.content.content), model=...)` and:
- sync agent: `async for ev in emitter.yield_turn(turn): yield ev`
- async + temporal agents: `await emitter.auto_send_turn(turn)` (temporal: inside the activity, as the template already structures it).
Comment thread
greptile-apps[bot] marked this conversation as resolved.
Use a tiny pydantic-ai agent with ONE trivial tool so the run exercises text + a tool call + tool response.

- [ ] **Step 2: Write an integration test per agent** that drives it with a fixed prompt and asserts: valid ordered messages (text + tool request + tool response) and a well-formed span tree. Use the repo's existing tutorial-agent test harness pattern (see `agentex-tutorials-test.yml` and how current tutorial agents are tested).

- [ ] **Step 3: Wire CI.** In `.github/workflows/harness-integration.yml`, replace the `if: false` placeholder `live-matrix` job (or add a real matrix) with the pydantic-ai × {sync, async, temporal} entries, each running its agent's integration test. If `agentex-tutorials-test.yml` enumerates agents, add the three there too. `log`/document any agent-type not covered (none expected for pydantic-ai).

- [ ] **Step 4: Run** the integration tests locally (as far as the env allows) and the conformance + unit suites. **Commit** `test(pydantic-ai): sync/async/temporal integration agents + enable CI live-matrix rows`.

---

## Task 7: Full suite, type check, and backward-compat audit

- [ ] **Step 1:** `./scripts/test tests/lib/core/harness/ tests/lib/adk/ -v` — all green on 3.12 + 3.13.
- [ ] **Step 2:** `uv run pyright src/agentex/lib/` (or the harness + pydantic modules) — 0 new errors.
- [ ] **Step 3: Backward-compat audit.** Confirm the public signatures are unchanged: `convert_pydantic_ai_to_agentex_events` (only gained an optional kwarg), `stream_pydantic_ai_events` (same signature + return), `create_pydantic_ai_tracing_handler` (still importable, now warns). Grep the repo + templates for callers and confirm none broke.
- [ ] **Step 4:** If any fix was needed, **Commit** `chore(pydantic-ai): type/back-compat fixes`.

---

## Self-Review checklist (run before opening the PR)

- Every public symbol that existed before still exists with the same signature (additive-only): `convert_pydantic_ai_to_agentex_events`, `stream_pydantic_ai_events`, `create_pydantic_ai_tracing_handler`.
- The auto-send helper returns the same final text as before (characterization test passes, or the post-373 shape is asserted with a note).
- Tracing is now on by default for both channels and is overridable (emitter `tracer=False`).
- Usage normalization uses the REAL pydantic-ai usage field names (verified in Task 2 Step 1), with defensive `getattr`.
- Conformance fixtures register per-module and pass the cross-channel assertion from AGX1-373.
- 3 test agents exist and their CI rows are enabled.
- No `# type: ignore` added without justification.

## Notes for the PR description

- Link AGX1-373 (dependency) and AGX1-375 (import path); note AGX1-374 (reasoning/mixed-ordering auto_send tests) is foundation-level and orthogonal.
- State the diff size; if test agents pushed it over budget, note the PR 4b split.
- This is the template the langgraph (PR 5) and openai (PR 6) migrations follow.
Loading