Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions examples/basic/stream_tool_progress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""Example: streaming progress events from long-running tools.

Demonstrates ToolContext.send_progress() — tools can emit intermediate
progress events that appear in stream_events() while the tool is still running.
"""

import asyncio

from agents import Agent, ItemHelpers, Runner, function_tool
from agents.stream_events import ToolProgressStreamEvent
from agents.tool_context import ToolContext


@function_tool
async def analyze_data(ctx: ToolContext, query: str) -> str:
"""Analyze data for a given query. Use this for complex analysis requests."""
ctx.send_progress({"status": "starting", "query": query})
await asyncio.sleep(1)

ctx.send_progress({"status": "fetching_data", "progress": 0.25})
await asyncio.sleep(1)

ctx.send_progress({"status": "processing", "progress": 0.5})
await asyncio.sleep(1)

ctx.send_progress({"status": "finalizing", "progress": 1.0})
await asyncio.sleep(0.5)

return f"Analysis complete for '{query}': found 42 results with 95% confidence."


@function_tool
async def quick_lookup(ctx: ToolContext, term: str) -> str:
"""Look up a term quickly. Use this for simple lookups."""
ctx.send_progress({"status": "searching", "term": term})
await asyncio.sleep(0.5)
return f"Found definition for '{term}': a common search term."


async def main():
agent = Agent(
name="Analyst",
instructions=(
"You are a data analyst. Use the analyze_data tool for complex queries "
"and quick_lookup for simple lookups. Always use the tools when asked."
),
tools=[analyze_data, quick_lookup],
)

print("Interactive tool progress streaming example.")
print("Type a message to chat, or 'quit' to exit.\n")

while True:
user_input = input("You: ").strip()
if not user_input or user_input.lower() == "quit":
print("Goodbye!")
break

result = Runner.run_streamed(agent, input=user_input)
async for event in result.stream_events():
if event.type == "raw_response_event":
continue
elif isinstance(event, ToolProgressStreamEvent):
print(f" [progress] {event.tool_name}: {event.data}")
elif event.type == "agent_updated_stream_event":
print(f"Agent: {event.new_agent.name}")
elif event.type == "run_item_stream_event":
if event.item.type == "tool_call_item":
print(f"\n-- Tool called: {getattr(event.item.raw_item, 'name', '?')}")
elif event.item.type == "tool_call_output_item":
print(f"-- Tool output: {event.item.output}")
elif event.item.type == "message_output_item":
print(f"\nAssistant: {ItemHelpers.text_message_output(event.item)}")

print()


if __name__ == "__main__":
asyncio.run(main())
17 changes: 17 additions & 0 deletions examples/sandbox/tutorials/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@
from agents.stream_events import (
AgentUpdatedStreamEvent,
RawResponsesStreamEvent,
RunItemStreamEvent,
StreamEvent,
ToolProgressStreamEvent,
)
from examples.auto_mode import input_with_fallback, is_auto_mode

Expand Down Expand Up @@ -291,6 +293,21 @@ def print_event(event: PrintableEvent) -> None:
if isinstance(event, RawResponsesStreamEvent):
return

if isinstance(event, ToolProgressStreamEvent):
console.print(
Panel(
Pretty(event.data, expand_all=True),
title=f"Tool progress: {event.tool_name}",
border_style="yellow",
box=box.ROUNDED,
expand=False,
)
)
return

if not isinstance(event, RunItemStreamEvent):
return

body: PanelBody
match event.item:
case ReasoningItem() as item:
Expand Down
98 changes: 98 additions & 0 deletions pr_description.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# PR: feat: add ToolContext.send_progress() for streaming tool progress events

### Summary

Adds `ToolContext.send_progress(data)` — a simple API for function tools to emit intermediate progress events during execution. Events appear in `RunResultStreaming.stream_events()` as `ToolProgressStreamEvent` while the tool is still running. In non-streaming mode (`Runner.run()`), calls are silently ignored.

**Motivation (re: #1333)**

Existing lifecycle hooks (`on_tool_start` / `on_tool_end`) fire at the boundaries of tool execution, but they don't cover cases where a tool needs to emit multiple intermediate updates from inside the tool body. Without framework support, developers resort to external shared state or event buses, which add complexity and couple tool logic to infrastructure concerns. Providing an official way for tools to emit mid-execution progress events improves developer experience and makes responsive UIs and long-running workflows (data processing, web scraping, multi-step API calls) much easier to build.

**New stream event type**

A new `ToolProgressStreamEvent` is added to the `StreamEvent` union alongside the existing `RawResponsesStreamEvent`, `RunItemStreamEvent`, and `AgentUpdatedStreamEvent`. It carries:

- `tool_name: str` — identifies which tool emitted the event
- `tool_call_id: str` — correlates with a specific tool call (important when parallel tools run)
- `data: Any` — arbitrary progress payload (dict, string, number, etc.)
- `type: Literal["tool_progress_stream_event"]` — discriminator for pattern matching

Consumers can filter for progress events via `isinstance(event, ToolProgressStreamEvent)` or `event.type == "tool_progress_stream_event"`.

**Design**

- **Transport**: A `_StreamContext` dataclass (holding `event_queue` and `event_loop`) on `RunContextWrapper` — piggybacking on an object already threaded through the entire execution chain. Zero intermediate function signature changes.
- **API**: `send_progress()` method on `ToolContext` — scoped to function tools, reads per-tool identity (`tool_name`, `tool_call_id`) from the instance. No shared mutable state.
- **Thread safety**: Uses `loop.call_soon_threadsafe()` with a stored event loop reference so sync tools (`sync_invoker=True`) running in worker threads can safely call `send_progress()`. The loop is captured at wiring time (on the event loop thread), not at call time.
- **Nested agent-as-tool**: Each `Runner.run_streamed()` creates a new `RunContextWrapper` with its own `_stream_context`. No cross-contamination between outer and inner runs.

**Usage — basic tool with progress**

```python
from agents import Agent, Runner, function_tool, ToolProgressStreamEvent
from agents.tool_context import ToolContext

@function_tool
async def analyze_data(ctx: ToolContext, query: str) -> str:
ctx.send_progress({"status": "fetching", "progress": 0.25})
# ... work ...
ctx.send_progress({"status": "processing", "progress": 0.75})
# ... more work ...
return "analysis complete"

agent = Agent(name="Analyst", tools=[analyze_data])

result = Runner.run_streamed(agent, "Analyze Q4 sales")
async for event in result.stream_events():
if isinstance(event, ToolProgressStreamEvent):
print(f"[{event.tool_name}] {event.data}")
```

**Usage — agent-as-tool with `on_stream` handler**

When an agent is used as a tool via `as_tool()`, inner progress events are delivered to the `on_stream` callback:

```python
from agents import Agent
from agents.stream_events import ToolProgressStreamEvent

def handle_inner_stream(payload):
event = payload["event"]
if isinstance(event, ToolProgressStreamEvent):
print(f"Inner tool progress: {event.data}")

inner_agent = Agent(name="Researcher", tools=[analyze_data])
outer_agent = Agent(
name="Orchestrator",
tools=[inner_agent.as_tool(on_stream=handle_inner_stream)],
)
```

**Usage — non-streaming mode (no-op)**

```python
# send_progress is silently ignored — no error, no side effects
result = await Runner.run(agent, "Analyze Q4 sales")
```

### Test plan

- Unit tests for `send_progress` with active stream context, without context (no-op), and with broken context (failure isolation)
- Unit tests for `ToolProgressStreamEvent` field validation and `data: Any` flexibility
- Propagation tests: `_stream_context` survives `_fork_with_tool_input`, `_fork_without_tool_input`, and `ToolContext.from_agent_context`
- Integration: streaming run with progress events appearing in `stream_events()`
- Integration: non-streaming run with `send_progress` as no-op
- Integration: parallel tools emitting progress with correct `tool_call_id` attribution
- Integration: progress events arrive before `tool_output` event for the same tool
- 13 tests total, all passing

### Issue number

Closes #1333

### Checks

- [x] I've added new tests (if relevant)
- [x] I've added/updated the relevant documentation
- [x] I've run `make lint` and `make format`
- [x] I've made sure tests pass
2 changes: 2 additions & 0 deletions src/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
RawResponsesStreamEvent,
RunItemStreamEvent,
StreamEvent,
ToolProgressStreamEvent,
)
from .tool import (
ApplyPatchTool,
Expand Down Expand Up @@ -442,6 +443,7 @@ def enable_verbose_stdout_logging():
"RawResponsesStreamEvent",
"RunItemStreamEvent",
"AgentUpdatedStreamEvent",
"ToolProgressStreamEvent",
"StreamEvent",
"FunctionTool",
"FunctionToolResult",
Expand Down
8 changes: 8 additions & 0 deletions src/agents/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -1840,6 +1840,14 @@ def run_streamed(
if sandbox_runtime.enabled:
sandbox_runtime.apply_result_metadata(streamed_result)

# Allow tools to emit progress events via ToolContext.send_progress().
from .run_context import _StreamContext

context_wrapper._stream_context = _StreamContext(
event_queue=streamed_result._event_queue,
event_loop=asyncio.get_running_loop(),
)

# Kick off the actual agent loop in the background and return the streamed result object.
streamed_result.run_loop_task = asyncio.create_task(
start_streaming(
Expand Down
20 changes: 20 additions & 0 deletions src/agents/run_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,21 @@
TContext = TypeVar("TContext", default=Any)


@dataclass
class _StreamContext:
"""Holds streaming plumbing for ToolContext.send_progress().

Bundles the event queue and the event loop together because sync tools
run in a worker thread (via asyncio.to_thread) and cannot call
asyncio.get_running_loop(). The loop reference is captured at wiring
time on the event loop thread so that send_progress() can use
loop.call_soon_threadsafe() from any thread.
"""

event_queue: Any
event_loop: Any


@dataclass(eq=False)
class _ApprovalRecord:
"""Tracks approval/rejection state for a tool.
Expand Down Expand Up @@ -61,6 +76,9 @@ class RunContextWrapper(Generic[TContext]):
tool_input: Any | None = None
"""Structured input for the current agent tool run, when available."""

# Set by Runner.run_streamed() for ToolContext.send_progress().
_stream_context: _StreamContext | None = None

@staticmethod
def _to_str_or_none(value: Any) -> str | None:
if isinstance(value, str):
Expand Down Expand Up @@ -461,6 +479,7 @@ def _fork_with_tool_input(self, tool_input: Any) -> RunContextWrapper[TContext]:
fork._approvals = self._approvals
fork.turn_input = self.turn_input
fork.tool_input = tool_input
fork._stream_context = self._stream_context
return fork

def _fork_without_tool_input(self) -> RunContextWrapper[TContext]:
Expand All @@ -469,6 +488,7 @@ def _fork_without_tool_input(self) -> RunContextWrapper[TContext]:
fork.usage = self.usage
fork._approvals = self._approvals
fork.turn_input = self.turn_input
fork._stream_context = self._stream_context
return fork


Expand Down
20 changes: 19 additions & 1 deletion src/agents/stream_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,23 @@ class AgentUpdatedStreamEvent:
type: Literal["agent_updated_stream_event"] = "agent_updated_stream_event"


StreamEvent: TypeAlias = RawResponsesStreamEvent | RunItemStreamEvent | AgentUpdatedStreamEvent
@dataclass
class ToolProgressStreamEvent:
"""Streaming event emitted by a tool to report intermediate progress."""

tool_name: str
"""The name of the tool emitting progress."""

tool_call_id: str
"""The tool call ID this progress event belongs to."""

data: Any
"""Arbitrary progress payload provided by the tool."""

type: Literal["tool_progress_stream_event"] = "tool_progress_stream_event"


StreamEvent: TypeAlias = (
RawResponsesStreamEvent | RunItemStreamEvent | AgentUpdatedStreamEvent | ToolProgressStreamEvent
)
"""A streaming event from an agent."""
26 changes: 26 additions & 0 deletions src/agents/tool_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from ._tool_identity import get_tool_call_namespace, tool_trace_name
from .agent_tool_state import get_agent_tool_state_scope, set_agent_tool_state_scope
from .logger import logger
from .run_context import RunContextWrapper, TContext
from .usage import Usage

Expand Down Expand Up @@ -72,6 +73,7 @@ def __init__(
turn_input: list[TResponseInputItem] | None = None,
_approvals: dict[str, _ApprovalRecord] | None = None,
tool_input: Any | None = None,
_stream_context: Any | None = None,
) -> None:
"""Preserve the v0.7 positional constructor while accepting new context fields."""
resolved_usage = Usage() if usage is _MISSING else cast(Usage, usage)
Expand All @@ -81,6 +83,7 @@ def __init__(
turn_input=list(turn_input or []),
_approvals={} if _approvals is None else _approvals,
tool_input=tool_input,
_stream_context=_stream_context,
)
self.tool_name = (
_assert_must_pass_tool_name() if tool_name is _MISSING else cast(str, tool_name)
Expand Down Expand Up @@ -109,6 +112,29 @@ def qualified_tool_name(self) -> str:
"""Return the tool name qualified by namespace when available."""
return tool_trace_name(self.tool_name, self.tool_namespace) or self.tool_name

def send_progress(self, data: Any) -> None:
"""Emit a progress event to the streaming consumer.

In streaming mode (``Runner.run_streamed``), pushes a
``ToolProgressStreamEvent`` into the event stream. In non-streaming
mode, this is a no-op. Works from both async and sync tool functions.
"""
if self._stream_context is None:
return
try:
from .stream_events import ToolProgressStreamEvent

event = ToolProgressStreamEvent(
tool_name=self.tool_name,
tool_call_id=self.tool_call_id,
data=data,
)
self._stream_context.event_loop.call_soon_threadsafe(
self._stream_context.event_queue.put_nowait, event
)
except Exception:
logger.debug("send_progress failed", exc_info=True)

@classmethod
def from_agent_context(
cls,
Expand Down
Loading
Loading