Skip to content

Commit a87cd6d

Browse files
committed
feat: add ToolContext.send_progress() for streaming tool progress events
Allow function tools to emit intermediate progress events during execution via ToolContext.send_progress(data). Events appear in RunResultStreaming.stream_events() as ToolProgressStreamEvent while the tool is still running. No-op in non-streaming mode.
1 parent 8dc30e4 commit a87cd6d

9 files changed

Lines changed: 551 additions & 1 deletion

File tree

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
"""Example: streaming progress events from long-running tools.
2+
3+
Demonstrates ToolContext.send_progress() — tools can emit intermediate
4+
progress events that appear in stream_events() while the tool is still running.
5+
"""
6+
7+
import asyncio
8+
9+
from agents import Agent, ItemHelpers, Runner, function_tool
10+
from agents.stream_events import ToolProgressStreamEvent
11+
from agents.tool_context import ToolContext
12+
13+
14+
@function_tool
15+
async def analyze_data(ctx: ToolContext, query: str) -> str:
16+
"""Analyze data for a given query. Use this for complex analysis requests."""
17+
ctx.send_progress({"status": "starting", "query": query})
18+
await asyncio.sleep(1)
19+
20+
ctx.send_progress({"status": "fetching_data", "progress": 0.25})
21+
await asyncio.sleep(1)
22+
23+
ctx.send_progress({"status": "processing", "progress": 0.5})
24+
await asyncio.sleep(1)
25+
26+
ctx.send_progress({"status": "finalizing", "progress": 1.0})
27+
await asyncio.sleep(0.5)
28+
29+
return f"Analysis complete for '{query}': found 42 results with 95% confidence."
30+
31+
32+
@function_tool
33+
async def quick_lookup(ctx: ToolContext, term: str) -> str:
34+
"""Look up a term quickly. Use this for simple lookups."""
35+
ctx.send_progress({"status": "searching", "term": term})
36+
await asyncio.sleep(0.5)
37+
return f"Found definition for '{term}': a common search term."
38+
39+
40+
async def main():
41+
agent = Agent(
42+
name="Analyst",
43+
instructions=(
44+
"You are a data analyst. Use the analyze_data tool for complex queries "
45+
"and quick_lookup for simple lookups. Always use the tools when asked."
46+
),
47+
tools=[analyze_data, quick_lookup],
48+
)
49+
50+
print("Interactive tool progress streaming example.")
51+
print("Type a message to chat, or 'quit' to exit.\n")
52+
53+
while True:
54+
user_input = input("You: ").strip()
55+
if not user_input or user_input.lower() == "quit":
56+
print("Goodbye!")
57+
break
58+
59+
result = Runner.run_streamed(agent, input=user_input)
60+
async for event in result.stream_events():
61+
if event.type == "raw_response_event":
62+
continue
63+
elif isinstance(event, ToolProgressStreamEvent):
64+
print(f" [progress] {event.tool_name}: {event.data}")
65+
elif event.type == "agent_updated_stream_event":
66+
print(f"Agent: {event.new_agent.name}")
67+
elif event.type == "run_item_stream_event":
68+
if event.item.type == "tool_call_item":
69+
print(f"\n-- Tool called: {getattr(event.item.raw_item, 'name', '?')}")
70+
elif event.item.type == "tool_call_output_item":
71+
print(f"-- Tool output: {event.item.output}")
72+
elif event.item.type == "message_output_item":
73+
print(f"\nAssistant: {ItemHelpers.text_message_output(event.item)}")
74+
75+
print()
76+
77+
78+
if __name__ == "__main__":
79+
asyncio.run(main())

examples/sandbox/tutorials/misc.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@
4949
from agents.stream_events import (
5050
AgentUpdatedStreamEvent,
5151
RawResponsesStreamEvent,
52+
RunItemStreamEvent,
5253
StreamEvent,
54+
ToolProgressStreamEvent,
5355
)
5456
from examples.auto_mode import input_with_fallback, is_auto_mode
5557

@@ -291,6 +293,21 @@ def print_event(event: PrintableEvent) -> None:
291293
if isinstance(event, RawResponsesStreamEvent):
292294
return
293295

296+
if isinstance(event, ToolProgressStreamEvent):
297+
console.print(
298+
Panel(
299+
Pretty(event.data, expand_all=True),
300+
title=f"Tool progress: {event.tool_name}",
301+
border_style="yellow",
302+
box=box.ROUNDED,
303+
expand=False,
304+
)
305+
)
306+
return
307+
308+
if not isinstance(event, RunItemStreamEvent):
309+
return
310+
294311
body: PanelBody
295312
match event.item:
296313
case ReasoningItem() as item:

pr_description.md

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
# PR: feat: add ToolContext.send_progress() for streaming tool progress events
2+
3+
### Summary
4+
5+
Adds `ToolContext.send_progress(data)` — a simple API for function tools to emit intermediate progress events during execution. Events appear in `RunResultStreaming.stream_events()` as `ToolProgressStreamEvent` while the tool is still running. In non-streaming mode (`Runner.run()`), calls are silently ignored.
6+
7+
**Motivation (re: #1333)**
8+
9+
Existing lifecycle hooks (`on_tool_start` / `on_tool_end`) fire at the boundaries of tool execution, but they don't cover cases where a tool needs to emit multiple intermediate updates from inside the tool body. Without framework support, developers resort to external shared state or event buses, which add complexity and couple tool logic to infrastructure concerns. Providing an official way for tools to emit mid-execution progress events improves developer experience and makes responsive UIs and long-running workflows (data processing, web scraping, multi-step API calls) much easier to build.
10+
11+
**New stream event type**
12+
13+
A new `ToolProgressStreamEvent` is added to the `StreamEvent` union alongside the existing `RawResponsesStreamEvent`, `RunItemStreamEvent`, and `AgentUpdatedStreamEvent`. It carries:
14+
15+
- `tool_name: str` — identifies which tool emitted the event
16+
- `tool_call_id: str` — correlates with a specific tool call (important when parallel tools run)
17+
- `data: Any` — arbitrary progress payload (dict, string, number, etc.)
18+
- `type: Literal["tool_progress_stream_event"]` — discriminator for pattern matching
19+
20+
Consumers can filter for progress events via `isinstance(event, ToolProgressStreamEvent)` or `event.type == "tool_progress_stream_event"`.
21+
22+
**Design**
23+
24+
- **Transport**: A `_StreamContext` dataclass (holding `event_queue` and `event_loop`) on `RunContextWrapper` — piggybacking on an object already threaded through the entire execution chain. Zero intermediate function signature changes.
25+
- **API**: `send_progress()` method on `ToolContext` — scoped to function tools, reads per-tool identity (`tool_name`, `tool_call_id`) from the instance. No shared mutable state.
26+
- **Thread safety**: Uses `loop.call_soon_threadsafe()` with a stored event loop reference so sync tools (`sync_invoker=True`) running in worker threads can safely call `send_progress()`. The loop is captured at wiring time (on the event loop thread), not at call time.
27+
- **Nested agent-as-tool**: Each `Runner.run_streamed()` creates a new `RunContextWrapper` with its own `_stream_context`. No cross-contamination between outer and inner runs.
28+
29+
**Usage — basic tool with progress**
30+
31+
```python
32+
from agents import Agent, Runner, function_tool, ToolProgressStreamEvent
33+
from agents.tool_context import ToolContext
34+
35+
@function_tool
36+
async def analyze_data(ctx: ToolContext, query: str) -> str:
37+
ctx.send_progress({"status": "fetching", "progress": 0.25})
38+
# ... work ...
39+
ctx.send_progress({"status": "processing", "progress": 0.75})
40+
# ... more work ...
41+
return "analysis complete"
42+
43+
agent = Agent(name="Analyst", tools=[analyze_data])
44+
45+
result = Runner.run_streamed(agent, "Analyze Q4 sales")
46+
async for event in result.stream_events():
47+
if isinstance(event, ToolProgressStreamEvent):
48+
print(f"[{event.tool_name}] {event.data}")
49+
```
50+
51+
**Usage — agent-as-tool with `on_stream` handler**
52+
53+
When an agent is used as a tool via `as_tool()`, inner progress events are delivered to the `on_stream` callback:
54+
55+
```python
56+
from agents import Agent
57+
from agents.stream_events import ToolProgressStreamEvent
58+
59+
def handle_inner_stream(payload):
60+
event = payload["event"]
61+
if isinstance(event, ToolProgressStreamEvent):
62+
print(f"Inner tool progress: {event.data}")
63+
64+
inner_agent = Agent(name="Researcher", tools=[analyze_data])
65+
outer_agent = Agent(
66+
name="Orchestrator",
67+
tools=[inner_agent.as_tool(on_stream=handle_inner_stream)],
68+
)
69+
```
70+
71+
**Usage — non-streaming mode (no-op)**
72+
73+
```python
74+
# send_progress is silently ignored — no error, no side effects
75+
result = await Runner.run(agent, "Analyze Q4 sales")
76+
```
77+
78+
### Test plan
79+
80+
- Unit tests for `send_progress` with active stream context, without context (no-op), and with broken context (failure isolation)
81+
- Unit tests for `ToolProgressStreamEvent` field validation and `data: Any` flexibility
82+
- Propagation tests: `_stream_context` survives `_fork_with_tool_input`, `_fork_without_tool_input`, and `ToolContext.from_agent_context`
83+
- Integration: streaming run with progress events appearing in `stream_events()`
84+
- Integration: non-streaming run with `send_progress` as no-op
85+
- Integration: parallel tools emitting progress with correct `tool_call_id` attribution
86+
- Integration: progress events arrive before `tool_output` event for the same tool
87+
- 13 tests total, all passing
88+
89+
### Issue number
90+
91+
Closes #1333
92+
93+
### Checks
94+
95+
- [x] I've added new tests (if relevant)
96+
- [x] I've added/updated the relevant documentation
97+
- [x] I've run `make lint` and `make format`
98+
- [x] I've made sure tests pass

src/agents/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@
126126
RawResponsesStreamEvent,
127127
RunItemStreamEvent,
128128
StreamEvent,
129+
ToolProgressStreamEvent,
129130
)
130131
from .tool import (
131132
ApplyPatchTool,
@@ -442,6 +443,7 @@ def enable_verbose_stdout_logging():
442443
"RawResponsesStreamEvent",
443444
"RunItemStreamEvent",
444445
"AgentUpdatedStreamEvent",
446+
"ToolProgressStreamEvent",
445447
"StreamEvent",
446448
"FunctionTool",
447449
"FunctionToolResult",

src/agents/run.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1840,6 +1840,14 @@ def run_streamed(
18401840
if sandbox_runtime.enabled:
18411841
sandbox_runtime.apply_result_metadata(streamed_result)
18421842

1843+
# Allow tools to emit progress events via ToolContext.send_progress().
1844+
from .run_context import _StreamContext
1845+
1846+
context_wrapper._stream_context = _StreamContext(
1847+
event_queue=streamed_result._event_queue,
1848+
event_loop=asyncio.get_running_loop(),
1849+
)
1850+
18431851
# Kick off the actual agent loop in the background and return the streamed result object.
18441852
streamed_result.run_loop_task = asyncio.create_task(
18451853
start_streaming(

src/agents/run_context.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,14 @@
2525
TContext = TypeVar("TContext", default=Any)
2626

2727

28+
@dataclass
29+
class _StreamContext:
30+
"""Holds streaming plumbing for ToolContext.send_progress()."""
31+
32+
event_queue: Any
33+
event_loop: Any
34+
35+
2836
@dataclass(eq=False)
2937
class _ApprovalRecord:
3038
"""Tracks approval/rejection state for a tool.
@@ -61,6 +69,9 @@ class RunContextWrapper(Generic[TContext]):
6169
tool_input: Any | None = None
6270
"""Structured input for the current agent tool run, when available."""
6371

72+
# Set by Runner.run_streamed() for ToolContext.send_progress().
73+
_stream_context: _StreamContext | None = None
74+
6475
@staticmethod
6576
def _to_str_or_none(value: Any) -> str | None:
6677
if isinstance(value, str):
@@ -461,6 +472,7 @@ def _fork_with_tool_input(self, tool_input: Any) -> RunContextWrapper[TContext]:
461472
fork._approvals = self._approvals
462473
fork.turn_input = self.turn_input
463474
fork.tool_input = tool_input
475+
fork._stream_context = self._stream_context
464476
return fork
465477

466478
def _fork_without_tool_input(self) -> RunContextWrapper[TContext]:
@@ -469,6 +481,7 @@ def _fork_without_tool_input(self) -> RunContextWrapper[TContext]:
469481
fork.usage = self.usage
470482
fork._approvals = self._approvals
471483
fork.turn_input = self.turn_input
484+
fork._stream_context = self._stream_context
472485
return fork
473486

474487

src/agents/stream_events.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,5 +58,23 @@ class AgentUpdatedStreamEvent:
5858
type: Literal["agent_updated_stream_event"] = "agent_updated_stream_event"
5959

6060

61-
StreamEvent: TypeAlias = RawResponsesStreamEvent | RunItemStreamEvent | AgentUpdatedStreamEvent
61+
@dataclass
62+
class ToolProgressStreamEvent:
63+
"""Streaming event emitted by a tool to report intermediate progress."""
64+
65+
tool_name: str
66+
"""The name of the tool emitting progress."""
67+
68+
tool_call_id: str
69+
"""The tool call ID this progress event belongs to."""
70+
71+
data: Any
72+
"""Arbitrary progress payload provided by the tool."""
73+
74+
type: Literal["tool_progress_stream_event"] = "tool_progress_stream_event"
75+
76+
77+
StreamEvent: TypeAlias = (
78+
RawResponsesStreamEvent | RunItemStreamEvent | AgentUpdatedStreamEvent | ToolProgressStreamEvent
79+
)
6280
"""A streaming event from an agent."""

src/agents/tool_context.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from ._tool_identity import get_tool_call_namespace, tool_trace_name
99
from .agent_tool_state import get_agent_tool_state_scope, set_agent_tool_state_scope
10+
from .logger import logger
1011
from .run_context import RunContextWrapper, TContext
1112
from .usage import Usage
1213

@@ -72,6 +73,7 @@ def __init__(
7273
turn_input: list[TResponseInputItem] | None = None,
7374
_approvals: dict[str, _ApprovalRecord] | None = None,
7475
tool_input: Any | None = None,
76+
_stream_context: Any | None = None,
7577
) -> None:
7678
"""Preserve the v0.7 positional constructor while accepting new context fields."""
7779
resolved_usage = Usage() if usage is _MISSING else cast(Usage, usage)
@@ -81,6 +83,7 @@ def __init__(
8183
turn_input=list(turn_input or []),
8284
_approvals={} if _approvals is None else _approvals,
8385
tool_input=tool_input,
86+
_stream_context=_stream_context,
8487
)
8588
self.tool_name = (
8689
_assert_must_pass_tool_name() if tool_name is _MISSING else cast(str, tool_name)
@@ -109,6 +112,29 @@ def qualified_tool_name(self) -> str:
109112
"""Return the tool name qualified by namespace when available."""
110113
return tool_trace_name(self.tool_name, self.tool_namespace) or self.tool_name
111114

115+
def send_progress(self, data: Any) -> None:
116+
"""Emit a progress event to the streaming consumer.
117+
118+
In streaming mode (``Runner.run_streamed``), pushes a
119+
``ToolProgressStreamEvent`` into the event stream. In non-streaming
120+
mode, this is a no-op. Works from both async and sync tool functions.
121+
"""
122+
if self._stream_context is None:
123+
return
124+
try:
125+
from .stream_events import ToolProgressStreamEvent
126+
127+
event = ToolProgressStreamEvent(
128+
tool_name=self.tool_name,
129+
tool_call_id=self.tool_call_id,
130+
data=data,
131+
)
132+
self._stream_context.event_loop.call_soon_threadsafe(
133+
self._stream_context.event_queue.put_nowait, event
134+
)
135+
except Exception:
136+
logger.debug("send_progress failed", exc_info=True)
137+
112138
@classmethod
113139
def from_agent_context(
114140
cls,

0 commit comments

Comments
 (0)