diff --git a/examples/basic/stream_tool_progress.py b/examples/basic/stream_tool_progress.py new file mode 100644 index 0000000000..4897b76738 --- /dev/null +++ b/examples/basic/stream_tool_progress.py @@ -0,0 +1,79 @@ +"""Example: streaming progress events from long-running tools. + +Demonstrates ToolContext.send_progress() — tools can emit intermediate +progress events that appear in stream_events() while the tool is still running. +""" + +import asyncio + +from agents import Agent, ItemHelpers, Runner, function_tool +from agents.stream_events import ToolProgressStreamEvent +from agents.tool_context import ToolContext + + +@function_tool +async def analyze_data(ctx: ToolContext, query: str) -> str: + """Analyze data for a given query. Use this for complex analysis requests.""" + ctx.send_progress({"status": "starting", "query": query}) + await asyncio.sleep(1) + + ctx.send_progress({"status": "fetching_data", "progress": 0.25}) + await asyncio.sleep(1) + + ctx.send_progress({"status": "processing", "progress": 0.5}) + await asyncio.sleep(1) + + ctx.send_progress({"status": "finalizing", "progress": 1.0}) + await asyncio.sleep(0.5) + + return f"Analysis complete for '{query}': found 42 results with 95% confidence." + + +@function_tool +async def quick_lookup(ctx: ToolContext, term: str) -> str: + """Look up a term quickly. Use this for simple lookups.""" + ctx.send_progress({"status": "searching", "term": term}) + await asyncio.sleep(0.5) + return f"Found definition for '{term}': a common search term." + + +async def main(): + agent = Agent( + name="Analyst", + instructions=( + "You are a data analyst. Use the analyze_data tool for complex queries " + "and quick_lookup for simple lookups. Always use the tools when asked." + ), + tools=[analyze_data, quick_lookup], + ) + + print("Interactive tool progress streaming example.") + print("Type a message to chat, or 'quit' to exit.\n") + + while True: + user_input = input("You: ").strip() + if not user_input or user_input.lower() == "quit": + print("Goodbye!") + break + + result = Runner.run_streamed(agent, input=user_input) + async for event in result.stream_events(): + if event.type == "raw_response_event": + continue + elif isinstance(event, ToolProgressStreamEvent): + print(f" [progress] {event.tool_name}: {event.data}") + elif event.type == "agent_updated_stream_event": + print(f"Agent: {event.new_agent.name}") + elif event.type == "run_item_stream_event": + if event.item.type == "tool_call_item": + print(f"\n-- Tool called: {getattr(event.item.raw_item, 'name', '?')}") + elif event.item.type == "tool_call_output_item": + print(f"-- Tool output: {event.item.output}") + elif event.item.type == "message_output_item": + print(f"\nAssistant: {ItemHelpers.text_message_output(event.item)}") + + print() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/sandbox/tutorials/misc.py b/examples/sandbox/tutorials/misc.py index 805524824c..3073d12efe 100644 --- a/examples/sandbox/tutorials/misc.py +++ b/examples/sandbox/tutorials/misc.py @@ -49,7 +49,9 @@ from agents.stream_events import ( AgentUpdatedStreamEvent, RawResponsesStreamEvent, + RunItemStreamEvent, StreamEvent, + ToolProgressStreamEvent, ) from examples.auto_mode import input_with_fallback, is_auto_mode @@ -291,6 +293,21 @@ def print_event(event: PrintableEvent) -> None: if isinstance(event, RawResponsesStreamEvent): return + if isinstance(event, ToolProgressStreamEvent): + console.print( + Panel( + Pretty(event.data, expand_all=True), + title=f"Tool progress: {event.tool_name}", + border_style="yellow", + box=box.ROUNDED, + expand=False, + ) + ) + return + + if not isinstance(event, RunItemStreamEvent): + return + body: PanelBody match event.item: case ReasoningItem() as item: diff --git a/pr_description.md b/pr_description.md new file mode 100644 index 0000000000..2f41e9dfc3 --- /dev/null +++ b/pr_description.md @@ -0,0 +1,98 @@ +# PR: feat: add ToolContext.send_progress() for streaming tool progress events + +### Summary + +Adds `ToolContext.send_progress(data)` — a simple API for function tools to emit intermediate progress events during execution. Events appear in `RunResultStreaming.stream_events()` as `ToolProgressStreamEvent` while the tool is still running. In non-streaming mode (`Runner.run()`), calls are silently ignored. + +**Motivation (re: #1333)** + +Existing lifecycle hooks (`on_tool_start` / `on_tool_end`) fire at the boundaries of tool execution, but they don't cover cases where a tool needs to emit multiple intermediate updates from inside the tool body. Without framework support, developers resort to external shared state or event buses, which add complexity and couple tool logic to infrastructure concerns. Providing an official way for tools to emit mid-execution progress events improves developer experience and makes responsive UIs and long-running workflows (data processing, web scraping, multi-step API calls) much easier to build. + +**New stream event type** + +A new `ToolProgressStreamEvent` is added to the `StreamEvent` union alongside the existing `RawResponsesStreamEvent`, `RunItemStreamEvent`, and `AgentUpdatedStreamEvent`. It carries: + +- `tool_name: str` — identifies which tool emitted the event +- `tool_call_id: str` — correlates with a specific tool call (important when parallel tools run) +- `data: Any` — arbitrary progress payload (dict, string, number, etc.) +- `type: Literal["tool_progress_stream_event"]` — discriminator for pattern matching + +Consumers can filter for progress events via `isinstance(event, ToolProgressStreamEvent)` or `event.type == "tool_progress_stream_event"`. + +**Design** + +- **Transport**: A `_StreamContext` dataclass (holding `event_queue` and `event_loop`) on `RunContextWrapper` — piggybacking on an object already threaded through the entire execution chain. Zero intermediate function signature changes. +- **API**: `send_progress()` method on `ToolContext` — scoped to function tools, reads per-tool identity (`tool_name`, `tool_call_id`) from the instance. No shared mutable state. +- **Thread safety**: Uses `loop.call_soon_threadsafe()` with a stored event loop reference so sync tools (`sync_invoker=True`) running in worker threads can safely call `send_progress()`. The loop is captured at wiring time (on the event loop thread), not at call time. +- **Nested agent-as-tool**: Each `Runner.run_streamed()` creates a new `RunContextWrapper` with its own `_stream_context`. No cross-contamination between outer and inner runs. + +**Usage — basic tool with progress** + +```python +from agents import Agent, Runner, function_tool, ToolProgressStreamEvent +from agents.tool_context import ToolContext + +@function_tool +async def analyze_data(ctx: ToolContext, query: str) -> str: + ctx.send_progress({"status": "fetching", "progress": 0.25}) + # ... work ... + ctx.send_progress({"status": "processing", "progress": 0.75}) + # ... more work ... + return "analysis complete" + +agent = Agent(name="Analyst", tools=[analyze_data]) + +result = Runner.run_streamed(agent, "Analyze Q4 sales") +async for event in result.stream_events(): + if isinstance(event, ToolProgressStreamEvent): + print(f"[{event.tool_name}] {event.data}") +``` + +**Usage — agent-as-tool with `on_stream` handler** + +When an agent is used as a tool via `as_tool()`, inner progress events are delivered to the `on_stream` callback: + +```python +from agents import Agent +from agents.stream_events import ToolProgressStreamEvent + +def handle_inner_stream(payload): + event = payload["event"] + if isinstance(event, ToolProgressStreamEvent): + print(f"Inner tool progress: {event.data}") + +inner_agent = Agent(name="Researcher", tools=[analyze_data]) +outer_agent = Agent( + name="Orchestrator", + tools=[inner_agent.as_tool(on_stream=handle_inner_stream)], +) +``` + +**Usage — non-streaming mode (no-op)** + +```python +# send_progress is silently ignored — no error, no side effects +result = await Runner.run(agent, "Analyze Q4 sales") +``` + +### Test plan + +- Unit tests for `send_progress` with active stream context, without context (no-op), and with broken context (failure isolation) +- Unit tests for `ToolProgressStreamEvent` field validation and `data: Any` flexibility +- Propagation tests: `_stream_context` survives `_fork_with_tool_input`, `_fork_without_tool_input`, and `ToolContext.from_agent_context` +- Integration: streaming run with progress events appearing in `stream_events()` +- Integration: non-streaming run with `send_progress` as no-op +- Integration: parallel tools emitting progress with correct `tool_call_id` attribution +- Integration: progress events arrive before `tool_output` event for the same tool +- 13 tests total, all passing + +### Issue number + +Closes #1333 + +### Checks + +- [x] I've added new tests (if relevant) +- [x] I've added/updated the relevant documentation +- [x] I've run `make lint` and `make format` +- [x] I've made sure tests pass diff --git a/src/agents/__init__.py b/src/agents/__init__.py index ce2f8fbca8..4649118587 100644 --- a/src/agents/__init__.py +++ b/src/agents/__init__.py @@ -126,6 +126,7 @@ RawResponsesStreamEvent, RunItemStreamEvent, StreamEvent, + ToolProgressStreamEvent, ) from .tool import ( ApplyPatchTool, @@ -442,6 +443,7 @@ def enable_verbose_stdout_logging(): "RawResponsesStreamEvent", "RunItemStreamEvent", "AgentUpdatedStreamEvent", + "ToolProgressStreamEvent", "StreamEvent", "FunctionTool", "FunctionToolResult", diff --git a/src/agents/run.py b/src/agents/run.py index d05878e5d2..bfaa086442 100644 --- a/src/agents/run.py +++ b/src/agents/run.py @@ -1840,6 +1840,14 @@ def run_streamed( if sandbox_runtime.enabled: sandbox_runtime.apply_result_metadata(streamed_result) + # Allow tools to emit progress events via ToolContext.send_progress(). + from .run_context import _StreamContext + + context_wrapper._stream_context = _StreamContext( + event_queue=streamed_result._event_queue, + event_loop=asyncio.get_running_loop(), + ) + # Kick off the actual agent loop in the background and return the streamed result object. streamed_result.run_loop_task = asyncio.create_task( start_streaming( diff --git a/src/agents/run_context.py b/src/agents/run_context.py index df7047eb38..b54af80b5e 100644 --- a/src/agents/run_context.py +++ b/src/agents/run_context.py @@ -25,6 +25,21 @@ TContext = TypeVar("TContext", default=Any) +@dataclass +class _StreamContext: + """Holds streaming plumbing for ToolContext.send_progress(). + + Bundles the event queue and the event loop together because sync tools + run in a worker thread (via asyncio.to_thread) and cannot call + asyncio.get_running_loop(). The loop reference is captured at wiring + time on the event loop thread so that send_progress() can use + loop.call_soon_threadsafe() from any thread. + """ + + event_queue: Any + event_loop: Any + + @dataclass(eq=False) class _ApprovalRecord: """Tracks approval/rejection state for a tool. @@ -61,6 +76,9 @@ class RunContextWrapper(Generic[TContext]): tool_input: Any | None = None """Structured input for the current agent tool run, when available.""" + # Set by Runner.run_streamed() for ToolContext.send_progress(). + _stream_context: _StreamContext | None = None + @staticmethod def _to_str_or_none(value: Any) -> str | None: if isinstance(value, str): @@ -461,6 +479,7 @@ def _fork_with_tool_input(self, tool_input: Any) -> RunContextWrapper[TContext]: fork._approvals = self._approvals fork.turn_input = self.turn_input fork.tool_input = tool_input + fork._stream_context = self._stream_context return fork def _fork_without_tool_input(self) -> RunContextWrapper[TContext]: @@ -469,6 +488,7 @@ def _fork_without_tool_input(self) -> RunContextWrapper[TContext]: fork.usage = self.usage fork._approvals = self._approvals fork.turn_input = self.turn_input + fork._stream_context = self._stream_context return fork diff --git a/src/agents/stream_events.py b/src/agents/stream_events.py index ac04251ae3..4e01bd2c1b 100644 --- a/src/agents/stream_events.py +++ b/src/agents/stream_events.py @@ -58,5 +58,23 @@ class AgentUpdatedStreamEvent: type: Literal["agent_updated_stream_event"] = "agent_updated_stream_event" -StreamEvent: TypeAlias = RawResponsesStreamEvent | RunItemStreamEvent | AgentUpdatedStreamEvent +@dataclass +class ToolProgressStreamEvent: + """Streaming event emitted by a tool to report intermediate progress.""" + + tool_name: str + """The name of the tool emitting progress.""" + + tool_call_id: str + """The tool call ID this progress event belongs to.""" + + data: Any + """Arbitrary progress payload provided by the tool.""" + + type: Literal["tool_progress_stream_event"] = "tool_progress_stream_event" + + +StreamEvent: TypeAlias = ( + RawResponsesStreamEvent | RunItemStreamEvent | AgentUpdatedStreamEvent | ToolProgressStreamEvent +) """A streaming event from an agent.""" diff --git a/src/agents/tool_context.py b/src/agents/tool_context.py index eaad0cc167..6e16881c6b 100644 --- a/src/agents/tool_context.py +++ b/src/agents/tool_context.py @@ -7,6 +7,7 @@ from ._tool_identity import get_tool_call_namespace, tool_trace_name from .agent_tool_state import get_agent_tool_state_scope, set_agent_tool_state_scope +from .logger import logger from .run_context import RunContextWrapper, TContext from .usage import Usage @@ -72,6 +73,7 @@ def __init__( turn_input: list[TResponseInputItem] | None = None, _approvals: dict[str, _ApprovalRecord] | None = None, tool_input: Any | None = None, + _stream_context: Any | None = None, ) -> None: """Preserve the v0.7 positional constructor while accepting new context fields.""" resolved_usage = Usage() if usage is _MISSING else cast(Usage, usage) @@ -81,6 +83,7 @@ def __init__( turn_input=list(turn_input or []), _approvals={} if _approvals is None else _approvals, tool_input=tool_input, + _stream_context=_stream_context, ) self.tool_name = ( _assert_must_pass_tool_name() if tool_name is _MISSING else cast(str, tool_name) @@ -109,6 +112,29 @@ def qualified_tool_name(self) -> str: """Return the tool name qualified by namespace when available.""" return tool_trace_name(self.tool_name, self.tool_namespace) or self.tool_name + def send_progress(self, data: Any) -> None: + """Emit a progress event to the streaming consumer. + + In streaming mode (``Runner.run_streamed``), pushes a + ``ToolProgressStreamEvent`` into the event stream. In non-streaming + mode, this is a no-op. Works from both async and sync tool functions. + """ + if self._stream_context is None: + return + try: + from .stream_events import ToolProgressStreamEvent + + event = ToolProgressStreamEvent( + tool_name=self.tool_name, + tool_call_id=self.tool_call_id, + data=data, + ) + self._stream_context.event_loop.call_soon_threadsafe( + self._stream_context.event_queue.put_nowait, event + ) + except Exception: + logger.debug("send_progress failed", exc_info=True) + @classmethod def from_agent_context( cls, diff --git a/tests/test_tool_progress.py b/tests/test_tool_progress.py new file mode 100644 index 0000000000..c66400d4d1 --- /dev/null +++ b/tests/test_tool_progress.py @@ -0,0 +1,289 @@ +"""Tests for ToolContext.send_progress and ToolProgressStreamEvent.""" + +from __future__ import annotations + +import asyncio +from typing import Any + +import pytest + +from agents import Agent, Runner +from agents.run_context import RunContextWrapper, _StreamContext +from agents.stream_events import ToolProgressStreamEvent +from agents.tool import function_tool +from agents.tool_context import ToolContext + +from .fake_model import FakeModel +from .test_responses import get_function_tool_call, get_text_message + + +def _make_stream_context( + queue: asyncio.Queue[Any], loop: asyncio.AbstractEventLoop +) -> _StreamContext: + return _StreamContext(event_queue=queue, event_loop=loop) + + +def _make_tool_context( + *, + stream_context: _StreamContext | None = None, + tool_name: str = "test_tool", + tool_call_id: str = "call-1", +) -> ToolContext[None]: + ctx: ToolContext[None] = ToolContext( + context=None, + tool_name=tool_name, + tool_call_id=tool_call_id, + tool_arguments="{}", + _stream_context=stream_context, + ) + return ctx + + +class TestSendProgress: + def test_send_progress_with_queue(self) -> None: + """send_progress pushes a ToolProgressStreamEvent to the queue.""" + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(self._send_and_verify(loop)) + finally: + loop.close() + + async def _send_and_verify(self, loop: asyncio.AbstractEventLoop) -> None: + queue: asyncio.Queue[Any] = asyncio.Queue() + sc = _make_stream_context(queue, loop) + ctx = _make_tool_context(stream_context=sc) + ctx.send_progress({"status": "working", "progress": 0.5}) + + # call_soon_threadsafe schedules on the loop, so we need to let it run. + await asyncio.sleep(0) + + assert not queue.empty() + event = queue.get_nowait() + assert isinstance(event, ToolProgressStreamEvent) + assert event.tool_name == "test_tool" + assert event.tool_call_id == "call-1" + assert event.data == {"status": "working", "progress": 0.5} + assert event.type == "tool_progress_stream_event" + + def test_send_progress_without_stream_context_is_noop(self) -> None: + """send_progress does nothing when _stream_context is None.""" + ctx = _make_tool_context(stream_context=None) + ctx.send_progress({"status": "working"}) + + def test_send_progress_failure_does_not_raise(self) -> None: + """If the queue operation fails, send_progress logs but doesn't crash.""" + bad_sc = _StreamContext(event_queue="not_a_queue", event_loop="not_a_loop") + ctx = _make_tool_context(stream_context=bad_sc) + ctx.send_progress({"status": "working"}) + + def test_multiple_progress_events(self) -> None: + """Multiple send_progress calls all arrive in order.""" + loop = asyncio.new_event_loop() + try: + loop.run_until_complete(self._send_multiple(loop)) + finally: + loop.close() + + async def _send_multiple(self, loop: asyncio.AbstractEventLoop) -> None: + queue: asyncio.Queue[Any] = asyncio.Queue() + sc = _make_stream_context(queue, loop) + ctx = _make_tool_context(stream_context=sc) + ctx.send_progress({"step": 1}) + ctx.send_progress({"step": 2}) + ctx.send_progress({"step": 3}) + await asyncio.sleep(0) + + events = [] + while not queue.empty(): + events.append(queue.get_nowait()) + assert len(events) == 3 + assert events[0].data == {"step": 1} + assert events[1].data == {"step": 2} + assert events[2].data == {"step": 3} + + +class TestToolProgressStreamEvent: + def test_fields(self) -> None: + event = ToolProgressStreamEvent( + tool_name="my_tool", + tool_call_id="call-123", + data={"progress": 0.75}, + ) + assert event.tool_name == "my_tool" + assert event.tool_call_id == "call-123" + assert event.data == {"progress": 0.75} + assert event.type == "tool_progress_stream_event" + + def test_data_accepts_any_type(self) -> None: + event_str = ToolProgressStreamEvent(tool_name="t", tool_call_id="c", data="hello") + assert event_str.data == "hello" + + event_num = ToolProgressStreamEvent(tool_name="t", tool_call_id="c", data=42) + assert event_num.data == 42 + + event_list = ToolProgressStreamEvent(tool_name="t", tool_call_id="c", data=[1, 2, 3]) + assert event_list.data == [1, 2, 3] + + +class TestStreamContextPropagation: + def test_fork_with_tool_input_propagates_stream_context(self) -> None: + """_fork_with_tool_input preserves _stream_context.""" + sc = _StreamContext(event_queue=object(), event_loop=object()) + wrapper: RunContextWrapper[None] = RunContextWrapper(context=None) + wrapper._stream_context = sc + + forked = wrapper._fork_with_tool_input(tool_input="test") + assert forked._stream_context is sc + + def test_fork_without_tool_input_propagates_stream_context(self) -> None: + """_fork_without_tool_input preserves _stream_context.""" + sc = _StreamContext(event_queue=object(), event_loop=object()) + wrapper: RunContextWrapper[None] = RunContextWrapper(context=None) + wrapper._stream_context = sc + + forked = wrapper._fork_without_tool_input() + assert forked._stream_context is sc + + def test_tool_context_from_agent_context_propagates_stream_context(self) -> None: + """ToolContext.from_agent_context copies _stream_context from parent.""" + sc = _StreamContext(event_queue=object(), event_loop=object()) + wrapper: RunContextWrapper[None] = RunContextWrapper(context=None) + wrapper._stream_context = sc + + tool_call = ToolContext.from_agent_context( + wrapper, + tool_call_id="call-1", + tool_name="test", + tool_arguments="{}", + ) + assert tool_call._stream_context is sc + + +class TestStreamingIntegration: + @pytest.mark.asyncio + async def test_progress_events_in_streamed_run(self) -> None: + """Integration test: send_progress events appear in stream_events().""" + + async def _progress_fn(ctx: ToolContext) -> str: + ctx.send_progress({"status": "starting"}) + ctx.send_progress({"status": "done"}) + return "result" + + tool = function_tool(_progress_fn, name_override="progress_tool") + model = FakeModel() + agent = Agent(name="test", model=model, tools=[tool]) + + model.add_multiple_turn_outputs( + [ + [get_function_tool_call("progress_tool", "{}")], + [get_text_message("final answer")], + ] + ) + + result = Runner.run_streamed(agent, input="test") + progress_events: list[ToolProgressStreamEvent] = [] + async for event in result.stream_events(): + if isinstance(event, ToolProgressStreamEvent): + progress_events.append(event) + + assert len(progress_events) == 2 + assert progress_events[0].data == {"status": "starting"} + assert progress_events[0].tool_name == "progress_tool" + assert progress_events[1].data == {"status": "done"} + + @pytest.mark.asyncio + async def test_progress_noop_in_non_streamed_run(self) -> None: + """send_progress is a no-op in non-streaming Runner.run().""" + + calls: list[dict[str, Any]] = [] + + async def _progress_fn(ctx: ToolContext) -> str: + ctx.send_progress({"status": "working"}) + calls.append({"tool": "progress_tool"}) + return "result" + + tool = function_tool(_progress_fn, name_override="progress_tool") + model = FakeModel() + agent = Agent(name="test", model=model, tools=[tool]) + + model.add_multiple_turn_outputs( + [ + [get_function_tool_call("progress_tool", "{}")], + [get_text_message("done")], + ] + ) + + result = await Runner.run(agent, input="test") + assert len(calls) == 1 + assert result.final_output == "done" + + @pytest.mark.asyncio + async def test_parallel_tools_with_progress(self) -> None: + """Two concurrent tools emitting progress; events arrive with correct tool_call_id.""" + + async def _tool_a(ctx: ToolContext) -> str: + ctx.send_progress({"tool": "a", "step": 1}) + ctx.send_progress({"tool": "a", "step": 2}) + return "a_done" + + async def _tool_b(ctx: ToolContext) -> str: + ctx.send_progress({"tool": "b", "step": 1}) + return "b_done" + + tool_a = function_tool(_tool_a, name_override="tool_a") + tool_b = function_tool(_tool_b, name_override="tool_b") + model = FakeModel() + agent = Agent(name="test", model=model, tools=[tool_a, tool_b]) + + model.add_multiple_turn_outputs( + [ + [ + get_function_tool_call("tool_a", "{}", call_id="call_a"), + get_function_tool_call("tool_b", "{}", call_id="call_b"), + ], + [get_text_message("final")], + ] + ) + + result = Runner.run_streamed(agent, input="test") + progress_events: list[ToolProgressStreamEvent] = [] + async for event in result.stream_events(): + if isinstance(event, ToolProgressStreamEvent): + progress_events.append(event) + + a_events = [e for e in progress_events if e.tool_name == "tool_a"] + b_events = [e for e in progress_events if e.tool_name == "tool_b"] + assert len(a_events) == 2 + assert len(b_events) == 1 + assert a_events[0].tool_call_id == "call_a" + assert b_events[0].tool_call_id == "call_b" + + @pytest.mark.asyncio + async def test_progress_events_arrive_before_tool_output(self) -> None: + """Progress events appear before the tool_output event for the same tool.""" + from agents.stream_events import RunItemStreamEvent + + async def _progress_fn(ctx: ToolContext) -> str: + ctx.send_progress({"status": "working"}) + return "result" + + tool = function_tool(_progress_fn, name_override="progress_tool") + model = FakeModel() + agent = Agent(name="test", model=model, tools=[tool]) + + model.add_multiple_turn_outputs( + [ + [get_function_tool_call("progress_tool", "{}")], + [get_text_message("done")], + ] + ) + + result = Runner.run_streamed(agent, input="test") + event_types: list[str] = [] + async for event in result.stream_events(): + if isinstance(event, ToolProgressStreamEvent): + event_types.append("progress") + elif isinstance(event, RunItemStreamEvent) and event.name == "tool_output": + event_types.append("tool_output") + + assert event_types.index("progress") < event_types.index("tool_output")