Closed
Labels
help wanted: [Community] Extra attention is needed
live: [Component] This issue is related to live, voice and video chat
Description
Describe the bug
Streaming tools do not work with the latest model on the Vertex endpoint. They do work on the AIS (AI Studio) endpoint.
Tested on top of https://github.com/google/adk-python/pull/590/files
To Reproduce
Apply https://github.com/google/adk-python/pull/590/files, then run the following agent against the Vertex endpoint:
import asyncio
from typing import AsyncGenerator

from google.adk.agents import LiveRequestQueue
from google.adk.agents.llm_agent import Agent
from google.adk.tools.function_tool import FunctionTool
from google.genai import Client
from google.genai import types as genai_types


async def monitor_stock_price(stock_symbol: str) -> AsyncGenerator[str, None]:
    """This function will monitor the price for the given stock_symbol in a continuous, streaming and asynchronously way."""
    print(f"Start monitor stock price for {stock_symbol}!")

    # Let's mock stock price change.
    await asyncio.sleep(4)
    price_alert1 = f"the price for {stock_symbol} is 300"
    yield price_alert1
    print(price_alert1)

    await asyncio.sleep(4)
    price_alert1 = f"the price for {stock_symbol} is 400"
    yield price_alert1
    print(price_alert1)

    await asyncio.sleep(20)
    price_alert1 = f"the price for {stock_symbol} is 900"
    yield price_alert1
    print(price_alert1)

    await asyncio.sleep(20)
    price_alert1 = f"the price for {stock_symbol} is 500"
    yield price_alert1
    print(price_alert1)


# for video streaming, `input_stream: LiveRequestQueue` is required and reserved key parameter for ADK to pass the video streams in.
async def monitor_video_stream(
    input_stream: LiveRequestQueue,
) -> AsyncGenerator[str, None]:
    """Monitor how many people are in the video streams."""
    print("start monitor_video_stream!")
    client = Client(vertexai=False)
    prompt_text = (
        "Count the number of people in this image. Just respond with a numeric"
        " number."
    )
    last_count = None
    while True:
        last_valid_req = None
        print("Start monitoring loop")

        # use this loop to pull the latest images and discard the old ones
        while input_stream._queue.qsize() != 0:
            live_req = await input_stream.get()

            if live_req.blob is not None and live_req.blob.mime_type == "image/jpeg":
                last_valid_req = live_req

        # If we found a valid image, process it
        if last_valid_req is not None:
            print("Processing the most recent frame from the queue")

            # Create an image part using the blob's data and mime type
            image_part = genai_types.Part.from_bytes(
                data=last_valid_req.blob.data, mime_type=last_valid_req.blob.mime_type
            )

            contents = genai_types.Content(
                role="user",
                parts=[image_part, genai_types.Part.from_text(prompt_text)],
            )

            # Call the model to generate content based on the provided image and prompt
            response = client.models.generate_content(
                model="gemini-2.0-flash-exp",
                contents=contents,
                config=genai_types.GenerateContentConfig(
                    system_instruction=(
                        "You are a helpful video analysis assistant. You can count"
                        " the number of people in this image or video. Just respond"
                        " with a numeric number."
                    )
                ),
            )
            if not last_count:
                last_count = response.candidates[0].content.parts[0].text
            elif last_count != response.candidates[0].content.parts[0].text:
                last_count = response.candidates[0].content.parts[0].text
                yield response
                print("response:", response)

        # Wait before checking for new images
        await asyncio.sleep(0.5)


# Use this exact function to help ADK stop your streaming tools when requested.
# for example, if we want to stop `monitor_stock_price`, then the agent will
# invoke this function with stop_streaming(function_name=monitor_stock_price).
def stop_streaming(function_name: str):
    """Stop the streaming

    Args:
      function_name: The name of the streaming function to stop.
    """
    pass


root_agent = Agent(
    model="gemini-2.0-flash-live-preview-04-09",
    name="video_streaming_agent",
    instruction="""
      You are a monitoring agent. You can do video monitoring and stock price monitoring
      using the provided tools/functions.
      When users want to monitor a video stream,
      You can use monitor_video_stream function to do that. When monitor_video_stream
      returns the alert, you should tell the users.
      When users want to monitor a stock price, you can use monitor_stock_price.
      Don't ask too many questions. Don't be too talkative.
    """,
    tools=[
        monitor_video_stream,
        monitor_stock_price,
        FunctionTool(stop_streaming),
    ]
)
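For readers unfamiliar with the pattern, here is a minimal sketch of what "stopping a streaming tool by name" can look like in plain asyncio. It is not ADK's implementation: the `active_streams` registry and the `ticker`, `start_streaming`, and `cancel_stream` names are hypothetical and exist only for this illustration. The assumption is that each streaming tool is an async generator consumed by its own task, so stopping it amounts to cancelling that task.

```python
import asyncio
from typing import AsyncGenerator, Dict, List

# Hypothetical registry of running streaming tools, keyed by function name.
active_streams: Dict[str, asyncio.Task] = {}


async def ticker(symbol: str) -> AsyncGenerator[str, None]:
    """Stand-in streaming tool: yields a mock price alert every 10 ms, forever."""
    price = 300
    while True:
        await asyncio.sleep(0.01)
        yield f"the price for {symbol} is {price}"
        price += 100


async def _consume(stream: AsyncGenerator[str, None], sink: List[str]) -> None:
    """Append every yielded item to `sink` until the task is cancelled."""
    async for item in stream:
        sink.append(item)


def start_streaming(name: str, stream: AsyncGenerator[str, None], sink: List[str]) -> None:
    """Run the stream in a background task and remember it under `name`."""
    active_streams[name] = asyncio.create_task(_consume(stream, sink))


async def cancel_stream(function_name: str) -> bool:
    """Cancel the named stream; return False if no such stream is running."""
    task = active_streams.pop(function_name, None)
    if task is None:
        return False
    task.cancel()
    try:
        await task  # wait until the cancellation has been delivered
    except asyncio.CancelledError:
        pass
    return True


async def main() -> List[str]:
    alerts: List[str] = []
    start_streaming("ticker", ticker("XYZ"), alerts)
    await asyncio.sleep(0.2)  # let a few alerts arrive
    await cancel_stream("ticker")
    return alerts


alerts = asyncio.run(main())
print(alerts[:2])
```

Keying the registry by function name mirrors how the agent is told to call `stop_streaming(function_name=...)` in the demo above.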
Expected behavior
The demo agent above should also work on the Vertex endpoint.
Screenshots
2025-05-07 20:55:55,358 - INFO - fast_api.py:313 - New session created
INFO: 127.0.0.1:54630 - "POST /apps/streaming_tool_agent/users/user/sessions HTTP/1.1" 200 OK
INFO: 127.0.0.1:54633 - "GET /apps/streaming_tool_agent/eval_sets HTTP/1.1" 200 OK
INFO: 127.0.0.1:54633 - "GET /apps/streaming_tool_agent/users/user/sessions HTTP/1.1" 200 OK
INFO: 127.0.0.1:54633 - "GET /apps/streaming_tool_agent/users/user/sessions HTTP/1.1" 200 OK
INFO: ('127.0.0.1', 54636) - "WebSocket /run_live?app_name=streaming_tool_agent&user_id=user&session_id=971993c5-303c-4a40-a7e0-d77cc7b73274" [accepted]
INFO: connection open
2025-05-07 20:55:56,016 - INFO - envs.py:54 - No .env file found for streaming_tool_agent
2025-05-07 20:55:56,023 - ERROR - fast_api.py:757 - Error during live websocket communication: Failed to parse the parameter return_value: AsyncGenerator[str, NoneType] of function monitor_video_stream for automatic function calling. Automatic function calling works best with simpler function signature schema,consider manually parse your function declaration for function monitor_video_stream.
Traceback (most recent call last):
  File "/Users/hangfeilin/cursor/adk-python/src/google/adk/cli/fast_api.py", line 753, in agent_live_run
    task.result()
  File "/Users/hangfeilin/cursor/adk-python/src/google/adk/cli/fast_api.py", line 726, in forward_events
    async for event in runner.run_live(
  File "/Users/hangfeilin/cursor/adk-python/src/google/adk/runners.py", line 296, in run_live
    async for event in invocation_context.agent.run_live(invocation_context):
  File "/Users/hangfeilin/cursor/adk-python/src/google/adk/agents/base_agent.py", line 160, in run_live
    async for event in self._run_live_impl(ctx):
  File "/Users/hangfeilin/cursor/adk-python/src/google/adk/agents/llm_agent.py", line 254, in _run_live_impl
    async for event in self._llm_flow.run_live(ctx):
  File "/Users/hangfeilin/cursor/adk-python/src/google/adk/flows/llm_flows/base_llm_flow.py", line 72, in run_live
    async for event in self._preprocess_async(invocation_context, llm_request):
  File "/Users/hangfeilin/cursor/adk-python/src/google/adk/flows/llm_flows/base_llm_flow.py", line 296, in _preprocess_async
    await tool.process_llm_request(
  File "/Users/hangfeilin/cursor/adk-python/src/google/adk/tools/base_tool.py", line 96, in process_llm_request
    if (function_declaration := self._get_declaration()) is None:
                                ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hangfeilin/cursor/adk-python/src/google/adk/tools/function_tool.py", line 42, in _get_declaration
    build_function_declaration(
  File "/Users/hangfeilin/cursor/adk-python/src/google/adk/tools/_automatic_function_calling_util.py", line 234, in build_function_declaration
    else from_function_with_options(new_func, variant)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/hangfeilin/cursor/adk-python/src/google/adk/tools/_automatic_function_calling_util.py", line 336, in from_function_with_options
    function_parameter_parse_util._parse_schema_from_parameter(
  File "/Users/hangfeilin/cursor/adk-python/src/google/adk/tools/function_parameter_parse_util.py", line 292, in _parse_schema_from_parameter
    raise ValueError(
ValueError: Failed to parse the parameter return_value: AsyncGenerator[str, NoneType] of function monitor_video_stream for automatic function calling. Automatic function calling works best with simpler function signature schema,consider manually parse your function declaration for function monitor_video_stream.
INFO: connection closed
^CINFO: Shutting down
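The ValueError in the log above names the return annotation `AsyncGenerator[str, NoneType]` as the value that could not be parsed. The sketch below uses only the standard library to show what a schema builder sees when it inspects such a function. `describe_return_annotation` and `signature_without_return` are hypothetical helpers written for this illustration, not ADK functions, and dropping the return annotation is only one possible way to handle streaming tools, not necessarily the fix ADK adopts.

```python
import collections.abc
import inspect
import typing
from typing import AsyncGenerator


async def monitor_stock_price(stock_symbol: str) -> AsyncGenerator[str, None]:
    """Stand-in for the streaming tool in the report; yields a single alert."""
    yield f"the price for {stock_symbol} is 300"


def describe_return_annotation(func) -> dict:
    """Summarize the return annotation a declaration builder would inspect."""
    hints = typing.get_type_hints(func)
    annotation = hints.get("return", inspect.Signature.empty)
    return {
        "is_async_generator_function": inspect.isasyncgenfunction(func),
        "origin": typing.get_origin(annotation),  # the generic's runtime class
        "args": typing.get_args(annotation),      # (yield type, send type)
    }


def signature_without_return(func) -> inspect.Signature:
    """Hypothetical helper: keep the parameters, drop the return annotation."""
    sig = inspect.signature(func)
    return sig.replace(return_annotation=inspect.Signature.empty)


info = describe_return_annotation(monitor_stock_price)
print(info["origin"])
print(info["args"])
print(signature_without_return(monitor_stock_price))
```

The `None` send type is normalized to `NoneType` by `typing`, which is why the error message prints `AsyncGenerator[str, NoneType]` rather than the annotation as written.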
Desktop (please complete the following information):
- OS: macOS
- Python version (python -V): 3.11
- ADK version (pip show google-adk): 0.4.0