Callbacks not getting executed with ADK Bidi-streaming [Audio to Audio conversation] #1897

@ravi-incred

Description

Describe the bug
I am trying to log request timestamps using the before and after callbacks described in the ADK documentation. They work with traditional text-to-text communication, but the callbacks are not executed during an audio-to-audio conversation.
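
For context, in the text-to-text case, where all four callbacks fire as expected, I drive the agent roughly like this. This is a sketch from my test harness: APP_NAME and the user id are mine, and the exact create_session / run_async signatures may vary between ADK versions.

from google.adk.runners import InMemoryRunner
from google.genai import types

from .agent import root_agent  # the agent defined in the Code section below

APP_NAME = "adk-callback-demo"


async def ask_text(question: str) -> None:
    runner = InMemoryRunner(app_name=APP_NAME, agent=root_agent)
    # create_session is async on recent ADK versions; older ones are sync.
    session = await runner.session_service.create_session(
        app_name=APP_NAME, user_id="test-user"
    )
    message = types.Content(role="user", parts=[types.Part(text=question)])
    # Each turn through run_async triggers the before/after agent and model callbacks.
    async for event in runner.run_async(
        user_id="test-user", session_id=session.id, new_message=message
    ):
        if event.content and event.content.parts and event.content.parts[0].text:
            print(event.content.parts[0].text)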

Setup
I've set up a very basic client and backend that communicate over a WebSocket.
The backend creates an in-memory session whenever a client connects over the WebSocket.
The client sends a stream of audio over the WebSocket, and the backend forwards it to the Gemini model.
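
For reference, this is roughly how my backend wires the WebSocket to the live run. It is a minimal sketch: the helper name start_agent_session, the endpoint path, and APP_NAME are from my local setup, and the exact run_live / create_session signatures may differ between ADK versions.

import asyncio

from fastapi import FastAPI, WebSocket

from google.adk.agents import LiveRequestQueue
from google.adk.agents.run_config import RunConfig
from google.adk.runners import InMemoryRunner
from google.genai import types

from .agent import root_agent  # the agent defined in the Code section below

APP_NAME = "adk-bidi-demo"
app = FastAPI()


async def start_agent_session(user_id: str):
    """Create an in-memory session and start a live (bidi) run for it."""
    runner = InMemoryRunner(app_name=APP_NAME, agent=root_agent)
    # create_session is async on recent ADK versions; older ones are sync.
    session = await runner.session_service.create_session(
        app_name=APP_NAME, user_id=user_id
    )
    run_config = RunConfig(response_modalities=["AUDIO"])
    live_request_queue = LiveRequestQueue()
    # Depending on the ADK version this may take user_id/session_id instead of session.
    live_events = runner.run_live(
        session=session,
        live_request_queue=live_request_queue,
        run_config=run_config,
    )
    return live_events, live_request_queue


@app.websocket("/ws/{user_id}")
async def ws_endpoint(websocket: WebSocket, user_id: str):
    await websocket.accept()
    live_events, live_request_queue = await start_agent_session(user_id)

    async def client_to_agent():
        # Forward raw PCM audio chunks from the client into the live request queue.
        while True:
            audio_chunk = await websocket.receive_bytes()
            live_request_queue.send_realtime(
                types.Blob(data=audio_chunk, mime_type="audio/pcm")
            )

    async def agent_to_client():
        # Relay audio bytes produced by the model back to the client.
        async for event in live_events:
            if event.content and event.content.parts:
                for part in event.content.parts:
                    if part.inline_data and part.inline_data.data:
                        await websocket.send_bytes(part.inline_data.data)

    await asyncio.gather(client_to_agent(), agent_to_client())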

Code

import os

from datetime import datetime
from typing import Optional
import copy

## Third Party Imports
from google.adk.agents import Agent
from google.adk.tools.agent_tool import AgentTool
from google.adk.agents.callback_context import CallbackContext
from google.genai import types
from google.adk.models import LlmRequest, LlmResponse

## Import SUB AGENTS
from .sub_agents.emi_loan_agent.agent import emi_loan_agent

## Import Tools - Manager Agent
from .tools.date_time import get_current_time


def before_agent_callback(callback_context: CallbackContext) -> Optional[types.Content]:
    """
    Simple callback that logs when the agent starts processing a request.

    Args:
        callback_context: Contains state and context information

    Returns:
        None to continue with normal agent processing
    """
    # Get the session state
    state = callback_context.state

    # Record timestamp
    timestamp = datetime.now()

    # Set agent name if not present
    if "agent_name" not in state:
        state["agent_name"] = "SimpleChatBot"

    # Initialize request counter
    if "request_counter" not in state:
        state["request_counter"] = 1
    else:
        state["request_counter"] += 1

    # Store start time for duration calculation in after_agent_callback
    state["request_start_time"] = timestamp

    # Log the request
    print("=== AGENT EXECUTION STARTED ===")
    print(f"Request #: {state['request_counter']}")
    print(f"Timestamp: {timestamp.strftime('%Y-%m-%d %H:%M:%S')}")

    # Print to console
    print(f"\n[BEFORE CALLBACK] Agent processing request #{state['request_counter']}")

    return None

def after_agent_callback(callback_context: CallbackContext) -> Optional[types.Content]:
    """
    Simple callback that logs when the agent finishes processing a request.

    Args:
        callback_context: Contains state and context information

    Returns:
        None to continue with normal agent processing
    """
    # Get the session state
    state = callback_context.state

    # Calculate request duration if start time is available
    timestamp = datetime.now()
    duration = None
    if "request_start_time" in state:
        duration = (timestamp - state["request_start_time"]).total_seconds()

    # Log the completion
    print("=== AGENT EXECUTION COMPLETED ===")
    print(f"Request #: {state.get('request_counter', 'Unknown')}")
    if duration is not None:
        print(f"Duration: {duration:.2f} seconds")

    # Print to console
    print(
        f"[AFTER CALLBACK] Agent completed request #{state.get('request_counter', 'Unknown')}"
    )
    if duration is not None:
        print(f"[AFTER CALLBACK] Processing took {duration:.2f} seconds")

    return None

def before_model_callback(
    callback_context: CallbackContext, llm_request: LlmRequest
) -> Optional[LlmResponse]:
    """
    This callback runs before the model processes a request.
    It filters inappropriate content and logs request info.

    Args:
        callback_context: Contains state and context information
        llm_request: The LLM request being sent

    Returns:
        Optional LlmResponse to override model response
    """
    # Get the state and agent name
    state = callback_context.state
    agent_name = callback_context.agent_name

    # Extract the last user message
    last_user_message = ""
    if llm_request.contents and len(llm_request.contents) > 0:
        for content in reversed(llm_request.contents):
            if content.role == "user" and content.parts and len(content.parts) > 0:
                if hasattr(content.parts[0], "text") and content.parts[0].text:
                    last_user_message = content.parts[0].text
                    break

    # Log the request
    print("=== MODEL REQUEST STARTED ===")
    print(f"Agent: {agent_name}")
    if last_user_message:
        print(f"User message: {last_user_message[:100]}...")
        # Store for later use
        state["last_user_message"] = last_user_message
    else:
        print("User message: <empty>")

    print(f"Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    # Check for inappropriate content
    if last_user_message and "sucks" in last_user_message.lower():
        print("=== INAPPROPRIATE CONTENT BLOCKED ===")
        print("Blocked text containing prohibited word: 'sucks'")

        print("[BEFORE MODEL] ⚠️ Request blocked due to inappropriate content")

        # Return a response to skip the model call
        return LlmResponse(
            content=types.Content(
                role="model",
                parts=[
                    types.Part(
                        text="I cannot respond to messages containing inappropriate language. "
                        "Please rephrase your request without using words like 'sucks'."
                    )
                ],
            )
        )

    # Record start time for duration calculation
    state["model_start_time"] = datetime.now()
    print("[BEFORE MODEL] ✓ Request approved for processing")

    # Return None to proceed with normal model request
    return None

def after_model_callback(
    callback_context: CallbackContext, llm_response: LlmResponse
) -> Optional[LlmResponse]:
    """
    Simple callback that replaces negative words with more positive alternatives.

    Args:
        callback_context: Contains state and context information
        llm_response: The LLM response received

    Returns:
        Optional LlmResponse to override model response
    """
    # Log completion
    print("[AFTER MODEL] Processing response")

    # Skip processing if response is empty or has no text content
    if not llm_response or not llm_response.content or not llm_response.content.parts:
        return None

    # Extract text from the response
    response_text = ""
    for part in llm_response.content.parts:
        if hasattr(part, "text") and part.text:
            response_text += part.text

    if not response_text:
        return None

    # Simple word replacements
    replacements = {
        "problem": "challenge",
        "difficult": "complex",
    }

    # Perform replacements
    modified_text = response_text
    modified = False

    for original, replacement in replacements.items():
        if original in modified_text.lower():
            modified_text = modified_text.replace(original, replacement)
            modified_text = modified_text.replace(
                original.capitalize(), replacement.capitalize()
            )
            modified = True

    # Return modified response if changes were made
    if modified:
        print("[AFTER MODEL] ↺ Modified response text")

        modified_parts = [copy.deepcopy(part) for part in llm_response.content.parts]
        for i, part in enumerate(modified_parts):
            if hasattr(part, "text") and part.text:
                modified_parts[i].text = modified_text

        return LlmResponse(content=types.Content(role="model", parts=modified_parts))

    # Return None to use the original response
    return None

root_agent = Agent(
    name="manager",
    model=os.getenv("GEMINI_MODEL"),
    description="Manager agent",
    instruction="""
    You are a manager of support agents at Incred and your responsibility is to help the customer by understanding their
    queries and delegating the work to the appropriate support agents.

    ## Responsibilities:
    - Always delegate the task to the appropriate support agent. Use your best judgement to determine which agent to delegate to.
    - You are responsible for delegating tasks to the following support agent:
    - emi_loan_agent: This agent is responsible for handling customer inquiries about their EMIs at Incred Finance.
    - todo_agent: This agent is responsible for fetching the user's todos.

    You can also use the tools available to you to assist in your tasks.
    - get_current_time: To fetch current date and time. 
    """,
    sub_agents=[emi_loan_agent,],
    tools=[
        get_current_time,
    ],
    before_agent_callback=before_agent_callback,
    after_agent_callback=after_agent_callback,
    before_model_callback=before_model_callback,
    after_model_callback=after_model_callback,
)

Expected behavior
ADK should execute the before and after callback functions I've provided:

  1. before_agent_callback
  2. after_agent_callback
  3. before_model_callback
  4. after_model_callback

Model information:
Live model used (for audio-to-audio): gemini-2.0-flash-live-001

Labels

bot triaged: [Bot] This issue is triaged by ADK bot
live: [Component] This issue is related to live, voice and video chat
