diff --git a/examples/realtime/demo.py b/examples/realtime/demo.py index e366212d2..65ca63f27 100644 --- a/examples/realtime/demo.py +++ b/examples/realtime/demo.py @@ -93,7 +93,8 @@ async def _on_event(self, event: RealtimeSessionEvent) -> None: self.ui.add_transcript("Audio ended") elif event.type == "audio": np_audio = np.frombuffer(event.audio.data, dtype=np.int16) - self.ui.play_audio(np_audio) + # Play audio in a separate thread to avoid blocking the event loop + await asyncio.to_thread(self.ui.play_audio, np_audio) elif event.type == "audio_interrupted": self.ui.add_transcript("Audio interrupted") elif event.type == "error": diff --git a/examples/realtime/no_ui_demo.py b/examples/realtime/no_ui_demo.py index ff53fea84..be610b43e 100644 --- a/examples/realtime/no_ui_demo.py +++ b/examples/realtime/no_ui_demo.py @@ -1,5 +1,8 @@ import asyncio +import queue import sys +import threading +from typing import Any import numpy as np import sounddevice as sd @@ -46,14 +49,77 @@ def __init__(self) -> None: self.audio_player: sd.OutputStream | None = None self.recording = False + # Audio output state for callback system + self.output_queue: queue.Queue[Any] = queue.Queue(maxsize=10) # Buffer more chunks + self.interrupt_event = threading.Event() + self.current_audio_chunk: np.ndarray | None = None # type: ignore + self.chunk_position = 0 + + def _output_callback(self, outdata, frames: int, time, status) -> None: + """Callback for audio output - handles continuous audio stream from server.""" + if status: + print(f"Output callback status: {status}") + + # Check if we should clear the queue due to interrupt + if self.interrupt_event.is_set(): + # Clear the queue and current chunk state + while not self.output_queue.empty(): + try: + self.output_queue.get_nowait() + except queue.Empty: + break + self.current_audio_chunk = None + self.chunk_position = 0 + self.interrupt_event.clear() + outdata.fill(0) + return + + # Fill output buffer from queue and current chunk + outdata.fill(0) # Start with silence + samples_filled = 0 + + while samples_filled < len(outdata): + # If we don't have a current chunk, try to get one from queue + if self.current_audio_chunk is None: + try: + self.current_audio_chunk = self.output_queue.get_nowait() + self.chunk_position = 0 + except queue.Empty: + # No more audio data available - this causes choppiness + # Uncomment next line to debug underruns: + # print(f"Audio underrun: {samples_filled}/{len(outdata)} samples filled") + break + + # Copy data from current chunk to output buffer + remaining_output = len(outdata) - samples_filled + remaining_chunk = len(self.current_audio_chunk) - self.chunk_position + samples_to_copy = min(remaining_output, remaining_chunk) + + if samples_to_copy > 0: + chunk_data = self.current_audio_chunk[ + self.chunk_position : self.chunk_position + samples_to_copy + ] + # More efficient: direct assignment for mono audio instead of reshape + outdata[samples_filled : samples_filled + samples_to_copy, 0] = chunk_data + samples_filled += samples_to_copy + self.chunk_position += samples_to_copy + + # If we've used up the entire chunk, reset for next iteration + if self.chunk_position >= len(self.current_audio_chunk): + self.current_audio_chunk = None + self.chunk_position = 0 + async def run(self) -> None: print("Connecting, may take a few seconds...") - # Initialize audio player + # Initialize audio player with callback + chunk_size = int(SAMPLE_RATE * CHUNK_LENGTH_S) self.audio_player = sd.OutputStream( channels=CHANNELS, samplerate=SAMPLE_RATE, dtype=FORMAT, + callback=self._output_callback, + blocksize=chunk_size, # Match our chunk timing for better alignment ) self.audio_player.start() @@ -146,15 +212,24 @@ async def _on_event(self, event: RealtimeSessionEvent) -> None: elif event.type == "audio_end": print("Audio ended") elif event.type == "audio": - # Play audio through speakers + # Enqueue audio for callback-based playback np_audio = np.frombuffer(event.audio.data, dtype=np.int16) - if self.audio_player: - try: - self.audio_player.write(np_audio) - except Exception as e: - print(f"Audio playback error: {e}") + try: + self.output_queue.put_nowait(np_audio) + except queue.Full: + # Queue is full - only drop if we have significant backlog + # This prevents aggressive dropping that could cause choppiness + if self.output_queue.qsize() > 8: # Keep some buffer + try: + self.output_queue.get_nowait() + self.output_queue.put_nowait(np_audio) + except queue.Empty: + pass + # If queue isn't too full, just skip this chunk to avoid blocking elif event.type == "audio_interrupted": print("Audio interrupted") + # Signal the output callback to clear its queue and state + self.interrupt_event.set() elif event.type == "error": print(f"Error: {event.error}") elif event.type == "history_updated": diff --git a/pyproject.toml b/pyproject.toml index b654f0c3d..df6915c35 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ requires-python = ">=3.9" license = "MIT" authors = [{ name = "OpenAI", email = "support@openai.com" }] dependencies = [ - "openai>=1.96.0, <2", + "openai>=1.96.1, <2", "pydantic>=2.10, <3", "griffe>=1.5.6, <2", "typing-extensions>=4.12.2, <5", diff --git a/src/agents/realtime/items.py b/src/agents/realtime/items.py index 4d618f3a9..fc05ebc51 100644 --- a/src/agents/realtime/items.py +++ b/src/agents/realtime/items.py @@ -30,6 +30,15 @@ class AssistantText(BaseModel): model_config = ConfigDict(extra="allow") +class AssistantAudio(BaseModel): + type: Literal["audio"] = "audio" + audio: str | None = None + transcript: str | None = None + + # Allow extra data + model_config = ConfigDict(extra="allow") + + class SystemMessageItem(BaseModel): item_id: str previous_item_id: str | None = None @@ -58,7 +67,7 @@ class AssistantMessageItem(BaseModel): type: Literal["message"] = "message" role: Literal["assistant"] = "assistant" status: Literal["in_progress", "completed", "incomplete"] | None = None - content: list[AssistantText] + content: list[Annotated[AssistantText | AssistantAudio, Field(discriminator="type")]] # Allow extra data model_config = ConfigDict(extra="allow") diff --git a/src/agents/realtime/openai_realtime.py b/src/agents/realtime/openai_realtime.py index bbffaeefa..e8a4749e7 100644 --- a/src/agents/realtime/openai_realtime.py +++ b/src/agents/realtime/openai_realtime.py @@ -364,7 +364,9 @@ async def _handle_output_item(self, item: ConversationItem) -> None: "item_id": item.id or "", "type": item.type, "role": item.role, - "content": item.content, + "content": ( + [content.model_dump() for content in item.content] if item.content else [] + ), "status": "in_progress", } ) diff --git a/tests/realtime/test_session.py b/tests/realtime/test_session.py index d95e4e33d..7c1eb53ff 100644 --- a/tests/realtime/test_session.py +++ b/tests/realtime/test_session.py @@ -295,7 +295,7 @@ async def test_item_updated_event_updates_existing_item(self, mock_model, mock_a # Check that item was updated assert len(session._history) == 1 updated_item = cast(AssistantMessageItem, session._history[0]) - assert updated_item.content[0].text == "Updated" + assert updated_item.content[0].text == "Updated" # type: ignore # Should have 2 events: raw + history updated (not added) assert session._event_queue.qsize() == 2 @@ -526,7 +526,7 @@ def test_update_existing_item_by_id(self): # Item should be updated result_item = cast(AssistantMessageItem, new_history[0]) assert result_item.item_id == "item_1" - assert result_item.content[0].text == "Updated" + assert result_item.content[0].text == "Updated" # type: ignore def test_update_existing_item_preserves_order(self): """Test that updating existing item preserves its position in history""" @@ -559,13 +559,13 @@ def test_update_existing_item_preserves_order(self): # Middle item should be updated updated_result = cast(AssistantMessageItem, new_history[1]) - assert updated_result.content[0].text == "Updated Second" + assert updated_result.content[0].text == "Updated Second" # type: ignore # Other items should be unchanged item1_result = cast(AssistantMessageItem, new_history[0]) item3_result = cast(AssistantMessageItem, new_history[2]) - assert item1_result.content[0].text == "First" - assert item3_result.content[0].text == "Third" + assert item1_result.content[0].text == "First" # type: ignore + assert item3_result.content[0].text == "Third" # type: ignore def test_insert_new_item_after_previous_item(self): """Test inserting new item after specified previous_item_id""" @@ -600,7 +600,7 @@ def test_insert_new_item_after_previous_item(self): # Content should be correct item2_result = cast(AssistantMessageItem, new_history[1]) - assert item2_result.content[0].text == "Second" + assert item2_result.content[0].text == "Second" # type: ignore def test_insert_new_item_after_nonexistent_previous_item(self): """Test that item with nonexistent previous_item_id gets added to end""" @@ -703,7 +703,7 @@ def test_complex_insertion_scenario(self): assert len(history) == 4 assert [item.item_id for item in history] == ["A", "B", "D", "C"] itemB_result = cast(AssistantMessageItem, history[1]) - assert itemB_result.content[0].text == "Updated B" + assert itemB_result.content[0].text == "Updated B" # type: ignore # Test 3: Tool call execution flow (_handle_tool_call method) diff --git a/uv.lock b/uv.lock index 3bff55bf9..918c3d0be 100644 --- a/uv.lock +++ b/uv.lock @@ -1461,7 +1461,7 @@ wheels = [ [[package]] name = "openai" -version = "1.96.0" +version = "1.96.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "anyio" }, @@ -1473,9 +1473,9 @@ dependencies = [ { name = "tqdm" }, { name = "typing-extensions" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/47/7d/dbc636786f8bf029600abdbf89da74706bdde37c4fe1471ce78834a7ecaa/openai-1.96.0.tar.gz", hash = "sha256:36e34b5aa2c9c0380c1934fa16ba53b3b3c6462450b4c008b98859b9b6424cf7", size = 488898, upload-time = "2025-07-15T15:56:52.853Z" } +sdist = { url = "https://files.pythonhosted.org/packages/2f/b5/18fd5e1b6b6c7dca52d60307b3637f9e9e3206a8041a9c8028985dbc6260/openai-1.96.1.tar.gz", hash = "sha256:6d505b5cc550e036bfa3fe99d6cff565b11491d12378d4c353f92ef72b0a408a", size = 489065, upload-time = "2025-07-15T21:39:37.215Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/dc/63/e29319a52449b7ac4c3a13a1448a9fa326aa1263478eb4c4a9bcbbe95648/openai-1.96.0-py3-none-any.whl", hash = "sha256:4dee023520f8a70ddeaa9f4d6fb247e6dcafd79d2ebb415e3f85932d95aa64a0", size = 757092, upload-time = "2025-07-15T15:56:50.387Z" }, + { url = "https://files.pythonhosted.org/packages/4f/57/325bbdbdc27b47309be35cb4e0eb8980b0c1bc997194c797c3691d88ae41/openai-1.96.1-py3-none-any.whl", hash = "sha256:0afaab2019bae8e145e7a1baf6953167084f019dd15042c65edd117398c1eb1c", size = 757454, upload-time = "2025-07-15T21:39:34.517Z" }, ] [[package]] @@ -1539,7 +1539,7 @@ requires-dist = [ { name = "litellm", marker = "extra == 'litellm'", specifier = ">=1.67.4.post1,<2" }, { name = "mcp", marker = "python_full_version >= '3.10'", specifier = ">=1.9.4,<2" }, { name = "numpy", marker = "python_full_version >= '3.10' and extra == 'voice'", specifier = ">=2.2.0,<3" }, - { name = "openai", specifier = ">=1.96.0,<2" }, + { name = "openai", specifier = ">=1.96.1,<2" }, { name = "pydantic", specifier = ">=2.10,<3" }, { name = "requests", specifier = ">=2.0,<3" }, { name = "types-requests", specifier = ">=2.0,<3" },