-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Description
Currently, I'm working on one POC in ADK, which has one root agent and multiple subagents, and for the query forwarding, I've used the LLM-driven delegation. It was working properly but taking much time, so I thought I should integrate the Gemini Live API streaming, which is working properly in the root agent only, but when I ask any query related to the subagents, then it doesn't respond; it only responds with the host agent data, which is given in the prompt. and in Postman, it shows this type of error:
I apologize for the error. It seems I was unable to call the `transfer_to_agent` function correctly. As your query relates to KYC verification, which falls under the domain of the preoss_Agent, I will need to transfer you to that agent.
What I expect to happen is my query goes to the root agent, and if the root agent itself could answer that, then that is okay, but if the query matches with the sub-agent's description, then that query should be forwarded to that sub-agent, which is not currently happening, and I guess it is failing in the transfer_to_agent
or in AutoFlow mechanism
.
Please guide the solution that can solve my scenario perfectly and give good results.
This is what I'm using right now:
My root agent
def create_agent() -> LlmAgent:
return LlmAgent(
name=ORCHESTRATOR_HOST_AGENT_NAME,
model=MODEL,
description=(
"Intelligently routes user queries to <sub agents name> ",
"Handling basic queries like Greetings"
),
instruction=instruction,
tools=[
get_mongodb_tool,
send_email_tool,
],
sub_agents=[
abc_agent,
],
planner=BuiltInPlanner(
thinking_config=types.ThinkingConfig(
thinking_budget=0,
)
),
)
root_agent = create_agent()
Where I want to use
session_service = InMemorySessionService()
APP_NAME = root_agent.name
runner = Runner(
agent=root_agent,
app_name=APP_NAME,
session_service=session_service,
)
@app.post("/chat_sse")
async def chat_sse(request: ChatRequest):
try:
session = await session_service.get_session_sync(
app_name=APP_NAME,
user_id=request.user_id,
session_id=request.session_id,
)
logger.info(f"Fetched existing session: {session.id}")
except Exception as e:
logger.info(f"Session not found, creating new one: {e}")
session = session_service.create_session_sync(
app_name=APP_NAME,
user_id=request.user_id,
)
request.session_id = session.id
logger.info(f"Created new session with id={session.id}")
if session is None or session.id is None:
raise HTTPException(status_code=500, detail="Failed to initialize session")
live_request_queue = LiveRequestQueue()
logger.info("Initialized LiveRequestQueue")
live_request_queue.send_content(
types.Content(role="user", parts=[types.Part(text=request.message)])
)
logger.info("Sent user content to LiveRequestQueue")
run_config = RunConfig(response_modalities=[Modality.TEXT])
async def event_generator():
try:
async for event in runner.run_live(
user_id=request.user_id,
session_id=session.id,
live_request_queue=live_request_queue,
run_config=run_config,
):
if event.content and event.content.parts:
text = event.content.parts[0].text
yield f"data: {text}\n\n"
except Exception as e:
logger.info(f"Error during live streaming: {e}")
yield f"data: [ERROR] {str(e)}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
)```