Skip to content

Commit 58bdb16

Browse files
declan-scaleclaudeOpenAI
authored
refactor(harness): move OpenAI harness into adk/_modules + facade export (#432)
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Co-authored-by: OpenAI <openai@example.com>
1 parent 48c3da8 commit 58bdb16

8 files changed

Lines changed: 534 additions & 493 deletions

File tree

src/agentex/lib/adk/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
)
1414
from agentex.lib.adk._modules._pydantic_ai_turn import PydanticAITurn, stream_pydantic_ai_events
1515
from agentex.lib.adk._modules._pydantic_ai_sync import convert_pydantic_ai_to_agentex_events
16+
from agentex.lib.adk._modules._openai_sync import convert_openai_to_agentex_events
17+
from agentex.lib.adk._modules._openai_turn import OpenAITurn, openai_usage_to_turn_usage
1618
from agentex.lib.adk._modules._claude_code_sync import convert_claude_code_to_agentex_events
1719
from agentex.lib.adk._modules._claude_code_turn import (
1820
ClaudeCodeTurn,
@@ -74,6 +76,10 @@
7476
"stream_pydantic_ai_events",
7577
"convert_pydantic_ai_to_agentex_events",
7678
"PydanticAITurn",
79+
# OpenAI Agents
80+
"convert_openai_to_agentex_events",
81+
"OpenAITurn",
82+
"openai_usage_to_turn_usage",
7783
# Claude Code
7884
"convert_claude_code_to_agentex_events",
7985
"ClaudeCodeTurn",

src/agentex/lib/adk/_modules/_openai_sync.py

Lines changed: 371 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
"""OpenAITurn: adapt an OpenAI Agents SDK streamed run onto the harness surface.
2+
3+
A ``HarnessTurn`` exposes a single canonical ``StreamTaskMessage*`` stream plus
4+
normalized usage. ``OpenAITurn`` wraps a ``RunResultStreaming`` (from
5+
``Runner.run_streamed``), converts its native OpenAI events into the canonical
6+
stream via ``convert_openai_to_agentex_events``, and after exhaustion reads the
7+
run's ``raw_responses`` to aggregate usage into a provider-independent
8+
``TurnUsage``.
9+
10+
Delivery (yield vs auto-send) and tracing are owned by ``UnifiedEmitter``; this
11+
module is purely the provider->canonical adapter.
12+
"""
13+
14+
from __future__ import annotations
15+
16+
from typing import TYPE_CHECKING, Any, AsyncIterator
17+
18+
from agents.usage import Usage
19+
20+
from agentex.lib.utils.logging import make_logger
21+
from agentex.lib.core.harness.types import TurnUsage, StreamTaskMessage
22+
from agentex.lib.adk._modules._openai_sync import (
23+
convert_openai_to_agentex_events,
24+
)
25+
26+
if TYPE_CHECKING:
27+
from agents import ModelResponse, RunResultStreaming
28+
29+
logger = make_logger(__name__)
30+
31+
32+
def openai_usage_to_turn_usage(usage: Usage | None, model: str | None) -> TurnUsage:
33+
"""Map an ``agents.Usage`` to a harness-independent ``TurnUsage``.
34+
35+
All field access is defensive (``getattr(..., None)``): different model
36+
backends populate different subsets of the usage object, and real zeros are
37+
valid values (e.g. 0 output tokens on a pure cache hit), so we never coerce
38+
a present-but-zero value into ``None``.
39+
"""
40+
if usage is None:
41+
return TurnUsage(model=model)
42+
43+
input_details = getattr(usage, "input_tokens_details", None)
44+
output_details = getattr(usage, "output_tokens_details", None)
45+
46+
return TurnUsage(
47+
model=model,
48+
num_llm_calls=getattr(usage, "requests", None) or 0,
49+
input_tokens=getattr(usage, "input_tokens", None),
50+
cached_input_tokens=getattr(input_details, "cached_tokens", None),
51+
output_tokens=getattr(usage, "output_tokens", None),
52+
reasoning_tokens=getattr(output_details, "reasoning_tokens", None),
53+
total_tokens=getattr(usage, "total_tokens", None),
54+
)
55+
56+
57+
def _aggregate_usage(raw_responses: list[ModelResponse]) -> Usage | None:
58+
"""Sum the per-response ``Usage`` across a run's ``ModelResponse`` list.
59+
60+
Returns ``None`` when no response carries usage so the caller can emit a
61+
usage object with only the model name set. ``Usage.add`` accumulates
62+
requests/tokens (including cached/reasoning detail fields).
63+
"""
64+
total: Usage | None = None
65+
for response in raw_responses:
66+
resp_usage = getattr(response, "usage", None)
67+
if resp_usage is None:
68+
continue
69+
if total is None:
70+
total = Usage()
71+
total.add(resp_usage)
72+
return total
73+
74+
75+
class OpenAITurn:
76+
"""A single OpenAI Agents SDK turn adapted to the ``HarnessTurn`` protocol.
77+
78+
Construct with exactly one of:
79+
- ``result``: a ``RunResultStreaming`` from ``Runner.run_streamed``. Its
80+
``stream_events()`` is converted to the canonical stream, and after the
81+
stream is exhausted ``raw_responses`` is read to compute usage.
82+
- ``stream``: a pre-built async iterator of canonical ``StreamTaskMessage``
83+
events (bypasses ``convert_openai_to_agentex_events``). Useful for tests
84+
and for callers that have already produced canonical events. Usage stays
85+
at ``TurnUsage(model=...)`` because there is no run to read usage from.
86+
87+
``coalesce_tool_requests`` is accepted for API parity with other provider
88+
turns but is a no-op for OpenAI: the OpenAI converter already emits a single
89+
``Full(ToolRequestContent)`` per tool call rather than streamed argument
90+
deltas, so there is nothing to coalesce.
91+
"""
92+
93+
def __init__(
94+
self,
95+
result: RunResultStreaming | None = None,
96+
model: str | None = None,
97+
stream: AsyncIterator[StreamTaskMessage] | None = None,
98+
coalesce_tool_requests: bool = False, # noqa: ARG002 - API parity, no-op for OpenAI
99+
) -> None:
100+
if result is None and stream is None:
101+
raise ValueError("OpenAITurn requires either `result` or `stream`")
102+
self._result = result
103+
self._model = model
104+
self._stream = stream
105+
self._usage: TurnUsage = TurnUsage(model=model)
106+
107+
@property
108+
def events(self) -> AsyncIterator[StreamTaskMessage]:
109+
return self._iter_events()
110+
111+
async def _iter_events(self) -> AsyncIterator[StreamTaskMessage]:
112+
if self._stream is not None:
113+
async for event in self._stream:
114+
yield event
115+
return
116+
117+
result = self._result
118+
assert result is not None # guaranteed by __init__
119+
async for event in convert_openai_to_agentex_events(result.stream_events()):
120+
yield event
121+
122+
# Stream is exhausted: the run has finished and raw_responses is now
123+
# populated, so usage can be aggregated and normalized.
124+
try:
125+
raw_responses: list[Any] = list(getattr(result, "raw_responses", None) or [])
126+
aggregated = _aggregate_usage(raw_responses)
127+
self._usage = openai_usage_to_turn_usage(aggregated, self._model)
128+
except Exception as exc: # pragma: no cover - defensive: never break delivery on usage
129+
logger.warning(f"Failed to aggregate OpenAI usage: {exc}")
130+
self._usage = TurnUsage(model=self._model)
131+
132+
def usage(self) -> TurnUsage:
133+
"""Normalized turn usage. Valid only after ``events`` is exhausted."""
134+
return self._usage
Lines changed: 8 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -1,134 +1,12 @@
1-
"""OpenAITurn: adapt an OpenAI Agents SDK streamed run onto the harness surface.
1+
"""Back-compat shim: ``OpenAITurn`` and ``openai_usage_to_turn_usage`` now live
2+
in ``agentex.lib.adk._modules._openai_turn``.
23
3-
A ``HarnessTurn`` exposes a single canonical ``StreamTaskMessage*`` stream plus
4-
normalized usage. ``OpenAITurn`` wraps a ``RunResultStreaming`` (from
5-
``Runner.run_streamed``), converts its native OpenAI events into the canonical
6-
stream via ``convert_openai_to_agentex_events``, and after exhaustion reads the
7-
run's ``raw_responses`` to aggregate usage into a provider-independent
8-
``TurnUsage``.
9-
10-
Delivery (yield vs auto-send) and tracing are owned by ``UnifiedEmitter``; this
11-
module is purely the provider->canonical adapter.
4+
Existing importers of
5+
``agentex.lib.adk.providers._modules.openai_turn.{OpenAITurn,openai_usage_to_turn_usage}``
6+
keep working.
127
"""
138

14-
from __future__ import annotations
15-
16-
from typing import TYPE_CHECKING, Any, AsyncIterator
17-
18-
from agents.usage import Usage
19-
20-
from agentex.lib.utils.logging import make_logger
21-
from agentex.lib.core.harness.types import TurnUsage, StreamTaskMessage
22-
from agentex.lib.adk.providers._modules.sync_provider import (
23-
convert_openai_to_agentex_events,
9+
from agentex.lib.adk._modules._openai_turn import ( # noqa: F401
10+
OpenAITurn,
11+
openai_usage_to_turn_usage,
2412
)
25-
26-
if TYPE_CHECKING:
27-
from agents import ModelResponse, RunResultStreaming
28-
29-
logger = make_logger(__name__)
30-
31-
32-
def openai_usage_to_turn_usage(usage: Usage | None, model: str | None) -> TurnUsage:
33-
"""Map an ``agents.Usage`` to a harness-independent ``TurnUsage``.
34-
35-
All field access is defensive (``getattr(..., None)``): different model
36-
backends populate different subsets of the usage object, and real zeros are
37-
valid values (e.g. 0 output tokens on a pure cache hit), so we never coerce
38-
a present-but-zero value into ``None``.
39-
"""
40-
if usage is None:
41-
return TurnUsage(model=model)
42-
43-
input_details = getattr(usage, "input_tokens_details", None)
44-
output_details = getattr(usage, "output_tokens_details", None)
45-
46-
return TurnUsage(
47-
model=model,
48-
num_llm_calls=getattr(usage, "requests", None) or 0,
49-
input_tokens=getattr(usage, "input_tokens", None),
50-
cached_input_tokens=getattr(input_details, "cached_tokens", None),
51-
output_tokens=getattr(usage, "output_tokens", None),
52-
reasoning_tokens=getattr(output_details, "reasoning_tokens", None),
53-
total_tokens=getattr(usage, "total_tokens", None),
54-
)
55-
56-
57-
def _aggregate_usage(raw_responses: list[ModelResponse]) -> Usage | None:
58-
"""Sum the per-response ``Usage`` across a run's ``ModelResponse`` list.
59-
60-
Returns ``None`` when no response carries usage so the caller can emit a
61-
usage object with only the model name set. ``Usage.add`` accumulates
62-
requests/tokens (including cached/reasoning detail fields).
63-
"""
64-
total: Usage | None = None
65-
for response in raw_responses:
66-
resp_usage = getattr(response, "usage", None)
67-
if resp_usage is None:
68-
continue
69-
if total is None:
70-
total = Usage()
71-
total.add(resp_usage)
72-
return total
73-
74-
75-
class OpenAITurn:
76-
"""A single OpenAI Agents SDK turn adapted to the ``HarnessTurn`` protocol.
77-
78-
Construct with exactly one of:
79-
- ``result``: a ``RunResultStreaming`` from ``Runner.run_streamed``. Its
80-
``stream_events()`` is converted to the canonical stream, and after the
81-
stream is exhausted ``raw_responses`` is read to compute usage.
82-
- ``stream``: a pre-built async iterator of canonical ``StreamTaskMessage``
83-
events (bypasses ``convert_openai_to_agentex_events``). Useful for tests
84-
and for callers that have already produced canonical events. Usage stays
85-
at ``TurnUsage(model=...)`` because there is no run to read usage from.
86-
87-
``coalesce_tool_requests`` is accepted for API parity with other provider
88-
turns but is a no-op for OpenAI: the OpenAI converter already emits a single
89-
``Full(ToolRequestContent)`` per tool call rather than streamed argument
90-
deltas, so there is nothing to coalesce.
91-
"""
92-
93-
def __init__(
94-
self,
95-
result: RunResultStreaming | None = None,
96-
model: str | None = None,
97-
stream: AsyncIterator[StreamTaskMessage] | None = None,
98-
coalesce_tool_requests: bool = False, # noqa: ARG002 - API parity, no-op for OpenAI
99-
) -> None:
100-
if result is None and stream is None:
101-
raise ValueError("OpenAITurn requires either `result` or `stream`")
102-
self._result = result
103-
self._model = model
104-
self._stream = stream
105-
self._usage: TurnUsage = TurnUsage(model=model)
106-
107-
@property
108-
def events(self) -> AsyncIterator[StreamTaskMessage]:
109-
return self._iter_events()
110-
111-
async def _iter_events(self) -> AsyncIterator[StreamTaskMessage]:
112-
if self._stream is not None:
113-
async for event in self._stream:
114-
yield event
115-
return
116-
117-
result = self._result
118-
assert result is not None # guaranteed by __init__
119-
async for event in convert_openai_to_agentex_events(result.stream_events()):
120-
yield event
121-
122-
# Stream is exhausted: the run has finished and raw_responses is now
123-
# populated, so usage can be aggregated and normalized.
124-
try:
125-
raw_responses: list[Any] = list(getattr(result, "raw_responses", None) or [])
126-
aggregated = _aggregate_usage(raw_responses)
127-
self._usage = openai_usage_to_turn_usage(aggregated, self._model)
128-
except Exception as exc: # pragma: no cover - defensive: never break delivery on usage
129-
logger.warning(f"Failed to aggregate OpenAI usage: {exc}")
130-
self._usage = TurnUsage(model=self._model)
131-
132-
def usage(self) -> TurnUsage:
133-
"""Normalized turn usage. Valid only after ``events`` is exhausted."""
134-
return self._usage

0 commit comments

Comments
 (0)