Skip to content

Feat: separate tool_call_item and tool_call_output_item in stream events #974

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/agents/_run_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -911,12 +911,12 @@ async def run_single_output_guardrail(
return result

@classmethod
def stream_step_result_to_queue(
def stream_step_items_to_queue(
cls,
step_result: SingleStepResult,
new_step_items: list[RunItem],
queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel],
):
for item in step_result.new_step_items:
for item in new_step_items:
if isinstance(item, MessageOutputItem):
event = RunItemStreamEvent(item=item, name="message_output_created")
elif isinstance(item, HandoffCallItem):
Expand All @@ -941,6 +941,14 @@ def stream_step_result_to_queue(
if event:
queue.put_nowait(event)

@classmethod
def stream_step_result_to_queue(
cls,
step_result: SingleStepResult,
queue: asyncio.Queue[StreamEvent | QueueCompleteSentinel],
):
cls.stream_step_items_to_queue(step_result.new_step_items, queue)

@classmethod
async def _check_for_final_output_from_tools(
cls,
Expand Down
59 changes: 53 additions & 6 deletions src/agents/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -904,10 +904,9 @@ async def _run_single_turn_streamed(
raise ModelBehaviorError("Model did not produce a final response!")

# 3. Now, we can process the turn as we do in the non-streaming case
single_step_result = await cls._get_single_step_result_from_response(
return await cls._get_single_step_result_from_streamed_response(
agent=agent,
original_input=streamed_result.input,
pre_step_items=streamed_result.new_items,
streamed_result=streamed_result,
new_response=final_response,
output_schema=output_schema,
all_tools=all_tools,
Expand All @@ -918,9 +917,6 @@ async def _run_single_turn_streamed(
tool_use_tracker=tool_use_tracker,
)

RunImpl.stream_step_result_to_queue(single_step_result, streamed_result._event_queue)
return single_step_result

@classmethod
async def _run_single_turn(
cls,
Expand Down Expand Up @@ -1023,6 +1019,57 @@ async def _get_single_step_result_from_response(
run_config=run_config,
)

@classmethod
async def _get_single_step_result_from_streamed_response(
cls,
*,
agent: Agent[TContext],
all_tools: list[Tool],
streamed_result: RunResultStreaming,
new_response: ModelResponse,
output_schema: AgentOutputSchemaBase | None,
handoffs: list[Handoff],
hooks: RunHooks[TContext],
context_wrapper: RunContextWrapper[TContext],
run_config: RunConfig,
tool_use_tracker: AgentToolUseTracker,
) -> SingleStepResult:

original_input = streamed_result.input
pre_step_items = streamed_result.new_items
event_queue = streamed_result._event_queue

processed_response = RunImpl.process_model_response(
agent=agent,
all_tools=all_tools,
response=new_response,
output_schema=output_schema,
handoffs=handoffs,
)
new_items_processed_response = processed_response.new_items
tool_use_tracker.add_tool_use(agent, processed_response.tools_used)
RunImpl.stream_step_items_to_queue(new_items_processed_response, event_queue)

single_step_result = await RunImpl.execute_tools_and_side_effects(
agent=agent,
original_input=original_input,
pre_step_items=pre_step_items,
new_response=new_response,
processed_response=processed_response,
output_schema=output_schema,
hooks=hooks,
context_wrapper=context_wrapper,
run_config=run_config,
)
new_step_items = [
item
for item in single_step_result.new_step_items
if item not in new_items_processed_response
]
RunImpl.stream_step_items_to_queue(new_step_items, event_queue)

return single_step_result

@classmethod
async def _run_input_guardrails(
cls,
Expand Down
53 changes: 53 additions & 0 deletions tests/test_stream_events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import asyncio
import time

import pytest

from agents import Agent, Runner, function_tool

from .fake_model import FakeModel
from .test_responses import get_function_tool_call, get_text_message


@function_tool
async def foo() -> str:
await asyncio.sleep(3)
return "success!"

@pytest.mark.asyncio
async def test_stream_events_main():
model = FakeModel()
agent = Agent(
name="Joker",
model=model,
tools=[foo],
)

model.add_multiple_turn_outputs(
[
# First turn: a message and tool call
[
get_text_message("a_message"),
get_function_tool_call("foo", ""),
],
# Second turn: text message
[get_text_message("done")],
]
)

result = Runner.run_streamed(
agent,
input="Hello",
)
tool_call_start_time = -1
tool_call_end_time = -1
async for event in result.stream_events():
if event.type == "run_item_stream_event":
if event.item.type == "tool_call_item":
tool_call_start_time = time.time_ns()
elif event.item.type == "tool_call_output_item":
tool_call_end_time = time.time_ns()

assert tool_call_start_time > 0, "tool_call_item was not observed"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be better to instead capture all the events and do a snapshot() to ensure they are in the right order + params

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be better to instead capture all the events and do a snapshot() to ensure they are in the right order + params

The order of these events is correct, but tool_call_item is triggered immediately after tool_call_output_item (upon tool call completion), not at the start of the tool call.

assert tool_call_end_time > 0, "tool_call_output_item was not observed"
assert tool_call_start_time < tool_call_end_time, "Tool call ended before or equals it started?"
Loading