|
| 1 | +"""Integration test: async (Redis-streaming) channel with a claude-code turn. |
| 2 | +
|
| 3 | +Exercises the unified harness surface (UnifiedEmitter.auto_send_turn + ClaudeCodeTurn) |
| 4 | +with hand-built claude-code ``stream-json`` envelopes and a fake streaming |
| 5 | +backend so the test runs fully offline (no claude-code CLI subprocess, no |
| 6 | +Redis, no Agentex server). |
| 7 | +
|
| 8 | +Native envelope shapes are copied verbatim from the claude-code turn test and |
| 9 | +conformance fixtures (assistant tool_use -> Start(ToolRequestContent)+Done; |
| 10 | +user tool_result -> Full(ToolResponseContent); assistant text -> |
| 11 | +Start(TextContent)+Delta+Done; result envelope -> usage). |
| 12 | +
|
| 13 | +What is tested |
| 14 | +-------------- |
| 15 | +- auto_send pushes the correct message contexts: tool_request + tool_response |
| 16 | + + text (in that order). |
| 17 | +- TurnResult.final_text equals the final assistant text. |
| 18 | +- TurnResult.usage reflects the claude-code ``result`` envelope (input/output |
| 19 | + tokens, cost, num_llm_calls from num_turns). |
| 20 | +- With a SpanTracer + fake tracing, a tool span is derived on the async path. |
| 21 | +
|
| 22 | +What is NOT covered without live infrastructure |
| 23 | +----------------------------------------------- |
| 24 | +- Actual Redis streaming. |
| 25 | +- The ACP on_task_event_send / on_task_create / on_task_cancel lifecycle. |
| 26 | +- A real claude-code CLI subprocess / live model behaviour. |
| 27 | +
|
| 28 | +See also: test_harness_claude_code_sync.py and test_harness_claude_code_temporal.py. |
| 29 | +""" |
| 30 | + |
| 31 | +from __future__ import annotations |
| 32 | + |
| 33 | +from typing import Any, AsyncIterator |
| 34 | + |
| 35 | +import pytest |
| 36 | + |
| 37 | +from agentex.types.task_message import TaskMessage |
| 38 | +from agentex.lib.core.harness.types import TurnResult |
| 39 | +from agentex.lib.core.harness.tracer import SpanTracer |
| 40 | +from agentex.lib.core.harness.emitter import UnifiedEmitter |
| 41 | +from agentex.types.tool_request_content import ToolRequestContent |
| 42 | +from agentex.types.tool_response_content import ToolResponseContent |
| 43 | +from agentex.lib.adk._modules._claude_code_turn import ClaudeCodeTurn |
| 44 | + |
| 45 | +from ._fakes import FakeTracing |
| 46 | + |
| 47 | +# --------------------------------------------------------------------------- |
| 48 | +# Native claude-code envelope fixtures |
| 49 | +# --------------------------------------------------------------------------- |
| 50 | + |
| 51 | + |
| 52 | +def _tool_then_text_envelopes() -> list[dict[str, Any]]: |
| 53 | + return [ |
| 54 | + { |
| 55 | + "type": "assistant", |
| 56 | + "message": { |
| 57 | + "content": [ |
| 58 | + { |
| 59 | + "type": "tool_use", |
| 60 | + "id": "call_read", |
| 61 | + "name": "Read", |
| 62 | + "input": {"path": "/workspace/README.md"}, |
| 63 | + } |
| 64 | + ] |
| 65 | + }, |
| 66 | + }, |
| 67 | + { |
| 68 | + "type": "user", |
| 69 | + "message": { |
| 70 | + "content": [ |
| 71 | + { |
| 72 | + "type": "tool_result", |
| 73 | + "tool_use_id": "call_read", |
| 74 | + "content": "# My Project — temperature 72F", |
| 75 | + } |
| 76 | + ] |
| 77 | + }, |
| 78 | + }, |
| 79 | + { |
| 80 | + "type": "assistant", |
| 81 | + "message": {"content": [{"type": "text", "text": "The project file says 72F."}]}, |
| 82 | + }, |
| 83 | + { |
| 84 | + "type": "result", |
| 85 | + "usage": {"input_tokens": 200, "output_tokens": 80}, |
| 86 | + "cost_usd": 0.015, |
| 87 | + "num_turns": 2, |
| 88 | + }, |
| 89 | + ] |
| 90 | + |
| 91 | + |
| 92 | +async def _aiter(envelopes: list[dict[str, Any]]) -> AsyncIterator[dict[str, Any]]: |
| 93 | + for e in envelopes: |
| 94 | + yield e |
| 95 | + |
| 96 | + |
| 97 | +# --------------------------------------------------------------------------- |
| 98 | +# Fake streaming backend |
| 99 | +# --------------------------------------------------------------------------- |
| 100 | + |
| 101 | + |
| 102 | +class _FakeCtx: |
| 103 | + def __init__(self, sink: list[Any], ctype: str, initial_content: Any) -> None: |
| 104 | + self.sink = sink |
| 105 | + self.ctype = ctype |
| 106 | + self.task_message = TaskMessage(id="msg-1", task_id="task1", content=initial_content) |
| 107 | + |
| 108 | + async def __aenter__(self) -> "_FakeCtx": |
| 109 | + self.sink.append(("open", self.ctype, self.task_message.content)) |
| 110 | + return self |
| 111 | + |
| 112 | + async def __aexit__(self, *args: Any) -> bool: |
| 113 | + await self.close() |
| 114 | + return False |
| 115 | + |
| 116 | + async def close(self) -> None: |
| 117 | + self.sink.append(("close", self.ctype)) |
| 118 | + |
| 119 | + async def stream_update(self, update: Any) -> Any: |
| 120 | + self.sink.append(("delta", self.ctype, update)) |
| 121 | + return update |
| 122 | + |
| 123 | + |
| 124 | +class _FakeStreaming: |
| 125 | + def __init__(self) -> None: |
| 126 | + self.sink: list[Any] = [] |
| 127 | + self.messages_opened: list[Any] = [] |
| 128 | + |
| 129 | + def streaming_task_message_context( |
| 130 | + self, |
| 131 | + task_id: str, |
| 132 | + initial_content: Any, |
| 133 | + streaming_mode: str = "coalesced", |
| 134 | + created_at: Any = None, |
| 135 | + ) -> _FakeCtx: |
| 136 | + ctype = getattr(initial_content, "type", None) or "" |
| 137 | + self.messages_opened.append(initial_content) |
| 138 | + return _FakeCtx(self.sink, ctype, initial_content) |
| 139 | + |
| 140 | + |
| 141 | +# --------------------------------------------------------------------------- |
| 142 | +# Helpers |
| 143 | +# --------------------------------------------------------------------------- |
| 144 | + |
| 145 | + |
| 146 | +async def _run_auto_send_turn( |
| 147 | + envelopes: list[dict[str, Any]], |
| 148 | + trace_id: str | None = None, |
| 149 | + parent_span_id: str | None = None, |
| 150 | + fake_tracing: FakeTracing | None = None, |
| 151 | +) -> tuple[TurnResult, _FakeStreaming]: |
| 152 | + fake_streaming = _FakeStreaming() |
| 153 | + tracer: SpanTracer | bool | None = None |
| 154 | + if trace_id and fake_tracing is not None: |
| 155 | + tracer = SpanTracer( |
| 156 | + trace_id=trace_id, |
| 157 | + parent_span_id=parent_span_id, |
| 158 | + task_id="task1", |
| 159 | + tracing=fake_tracing, |
| 160 | + ) |
| 161 | + |
| 162 | + turn = ClaudeCodeTurn(_aiter(envelopes)) |
| 163 | + emitter = UnifiedEmitter( |
| 164 | + task_id="task1", |
| 165 | + trace_id=trace_id, |
| 166 | + parent_span_id=parent_span_id, |
| 167 | + tracer=tracer if tracer is not None else False, |
| 168 | + streaming=fake_streaming, |
| 169 | + ) |
| 170 | + result = await emitter.auto_send_turn(turn) |
| 171 | + return result, fake_streaming |
| 172 | + |
| 173 | + |
| 174 | +# --------------------------------------------------------------------------- |
| 175 | +# Tests |
| 176 | +# --------------------------------------------------------------------------- |
| 177 | + |
| 178 | + |
| 179 | +class TestAsyncAutoSendMessageOrder: |
| 180 | + async def test_tool_request_pushed_before_tool_response(self) -> None: |
| 181 | + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_envelopes()) |
| 182 | + types = [getattr(m, "type", None) for m in fake_streaming.messages_opened] |
| 183 | + assert "tool_request" in types |
| 184 | + assert "tool_response" in types |
| 185 | + assert types.index("tool_request") < types.index("tool_response") |
| 186 | + |
| 187 | + async def test_text_pushed_last(self) -> None: |
| 188 | + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_envelopes()) |
| 189 | + types = [getattr(m, "type", None) for m in fake_streaming.messages_opened] |
| 190 | + assert types[-1] == "text", f"Expected last type=text, got {types}" |
| 191 | + |
| 192 | + |
| 193 | +class TestAsyncAutoSendContent: |
| 194 | + async def test_tool_request_content(self) -> None: |
| 195 | + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_envelopes()) |
| 196 | + tool_reqs = [m for m in fake_streaming.messages_opened if isinstance(m, ToolRequestContent)] |
| 197 | + assert len(tool_reqs) == 1 |
| 198 | + assert tool_reqs[0].name == "Read" |
| 199 | + |
| 200 | + async def test_tool_response_content(self) -> None: |
| 201 | + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_envelopes()) |
| 202 | + tool_resps = [m for m in fake_streaming.messages_opened if isinstance(m, ToolResponseContent)] |
| 203 | + assert len(tool_resps) == 1 |
| 204 | + assert "72F" in str(tool_resps[0].content) |
| 205 | + |
| 206 | + async def test_tool_call_ids_match(self) -> None: |
| 207 | + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_envelopes()) |
| 208 | + tool_req = next(m for m in fake_streaming.messages_opened if isinstance(m, ToolRequestContent)) |
| 209 | + tool_resp = next(m for m in fake_streaming.messages_opened if isinstance(m, ToolResponseContent)) |
| 210 | + assert tool_req.tool_call_id == tool_resp.tool_call_id == "call_read" |
| 211 | + |
| 212 | + |
| 213 | +class TestAsyncAutoSendFinalTextAndUsage: |
| 214 | + async def test_final_text_matches_last_text(self) -> None: |
| 215 | + result, _ = await _run_auto_send_turn(_tool_then_text_envelopes()) |
| 216 | + assert result.final_text == "The project file says 72F." |
| 217 | + |
| 218 | + async def test_usage_from_result_envelope(self) -> None: |
| 219 | + """TurnResult.usage reflects the claude-code result envelope.""" |
| 220 | + result, _ = await _run_auto_send_turn(_tool_then_text_envelopes()) |
| 221 | + assert result.usage is not None |
| 222 | + assert result.usage.input_tokens == 200 |
| 223 | + assert result.usage.output_tokens == 80 |
| 224 | + assert result.usage.total_tokens == 280 |
| 225 | + assert result.usage.cost_usd == pytest.approx(0.015) |
| 226 | + assert result.usage.num_llm_calls == 2 |
| 227 | + |
| 228 | + async def test_context_lifecycle_open_then_close(self) -> None: |
| 229 | + _, fake_streaming = await _run_auto_send_turn(_tool_then_text_envelopes()) |
| 230 | + opens = [e for e in fake_streaming.sink if e[0] == "open"] |
| 231 | + closes = [e for e in fake_streaming.sink if e[0] == "close"] |
| 232 | + assert len(opens) == len(closes) |
| 233 | + assert len(opens) == len(fake_streaming.messages_opened) |
| 234 | + |
| 235 | + |
| 236 | +class TestAsyncAutoSendSpanDerivation: |
| 237 | + async def test_tool_span_derived_on_async_path(self) -> None: |
| 238 | + fake_tracing = FakeTracing() |
| 239 | + await _run_auto_send_turn( |
| 240 | + _tool_then_text_envelopes(), |
| 241 | + trace_id="trace1", |
| 242 | + parent_span_id="parent", |
| 243 | + fake_tracing=fake_tracing, |
| 244 | + ) |
| 245 | + assert len(fake_tracing.started) == 1 |
| 246 | + assert fake_tracing.started[0][0] == "Read" |
| 247 | + assert len(fake_tracing.ended) == 1 |
| 248 | + assert "72F" in str(fake_tracing.ended[0][1]) |
0 commit comments