Skip to content

Commit 937437d

Browse files
committed
Realtime: move demo audio to separate thread
1 parent 479c171 commit 937437d

File tree

6 files changed

+97
-15
lines changed

6 files changed

+97
-15
lines changed

examples/realtime/demo.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ async def _on_event(self, event: RealtimeSessionEvent) -> None:
9393
self.ui.add_transcript("Audio ended")
9494
elif event.type == "audio":
9595
np_audio = np.frombuffer(event.audio.data, dtype=np.int16)
96-
self.ui.play_audio(np_audio)
96+
# Play audio in a separate thread to avoid blocking the event loop
97+
await asyncio.to_thread(self.ui.play_audio, np_audio)
9798
elif event.type == "audio_interrupted":
9899
self.ui.add_transcript("Audio interrupted")
99100
elif event.type == "error":

examples/realtime/no_ui_demo.py

Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import asyncio
2+
import queue
23
import sys
4+
import threading
35

46
import numpy as np
57
import sounddevice as sd
@@ -45,15 +47,76 @@ def __init__(self) -> None:
4547
self.audio_stream: sd.InputStream | None = None
4648
self.audio_player: sd.OutputStream | None = None
4749
self.recording = False
50+
51+
# Audio output state for callback system
52+
self.output_queue: queue.Queue[np.ndarray] = queue.Queue(maxsize=10) # Buffer more chunks
53+
self.interrupt_event = threading.Event()
54+
self.current_audio_chunk: np.ndarray | None = None
55+
self.chunk_position = 0
56+
57+
def _output_callback(self, outdata: np.ndarray, frames: int, time, status) -> None:
58+
"""Callback for audio output - handles continuous audio stream from server."""
59+
if status:
60+
print(f"Output callback status: {status}")
61+
62+
# Check if we should clear the queue due to interrupt
63+
if self.interrupt_event.is_set():
64+
# Clear the queue and current chunk state
65+
while not self.output_queue.empty():
66+
try:
67+
self.output_queue.get_nowait()
68+
except queue.Empty:
69+
break
70+
self.current_audio_chunk = None
71+
self.chunk_position = 0
72+
self.interrupt_event.clear()
73+
outdata.fill(0)
74+
return
75+
76+
# Fill output buffer from queue and current chunk
77+
outdata.fill(0) # Start with silence
78+
samples_filled = 0
79+
80+
while samples_filled < len(outdata):
81+
# If we don't have a current chunk, try to get one from queue
82+
if self.current_audio_chunk is None:
83+
try:
84+
self.current_audio_chunk = self.output_queue.get_nowait()
85+
self.chunk_position = 0
86+
except queue.Empty:
87+
# No more audio data available - this causes choppiness
88+
# Uncomment next line to debug underruns:
89+
# print(f"Audio underrun: {samples_filled}/{len(outdata)} samples filled")
90+
break
91+
92+
# Copy data from current chunk to output buffer
93+
remaining_output = len(outdata) - samples_filled
94+
remaining_chunk = len(self.current_audio_chunk) - self.chunk_position
95+
samples_to_copy = min(remaining_output, remaining_chunk)
96+
97+
if samples_to_copy > 0:
98+
chunk_data = self.current_audio_chunk[self.chunk_position:self.chunk_position + samples_to_copy]
99+
# More efficient: direct assignment for mono audio instead of reshape
100+
outdata[samples_filled:samples_filled + samples_to_copy, 0] = chunk_data
101+
samples_filled += samples_to_copy
102+
self.chunk_position += samples_to_copy
103+
104+
# If we've used up the entire chunk, reset for next iteration
105+
if self.chunk_position >= len(self.current_audio_chunk):
106+
self.current_audio_chunk = None
107+
self.chunk_position = 0
48108

49109
async def run(self) -> None:
50110
print("Connecting, may take a few seconds...")
51111

52-
# Initialize audio player
112+
# Initialize audio player with callback
113+
chunk_size = int(SAMPLE_RATE * CHUNK_LENGTH_S)
53114
self.audio_player = sd.OutputStream(
54115
channels=CHANNELS,
55116
samplerate=SAMPLE_RATE,
56117
dtype=FORMAT,
118+
callback=self._output_callback,
119+
blocksize=chunk_size, # Match our chunk timing for better alignment
57120
)
58121
self.audio_player.start()
59122

@@ -146,15 +209,24 @@ async def _on_event(self, event: RealtimeSessionEvent) -> None:
146209
elif event.type == "audio_end":
147210
print("Audio ended")
148211
elif event.type == "audio":
149-
# Play audio through speakers
212+
# Enqueue audio for callback-based playback
150213
np_audio = np.frombuffer(event.audio.data, dtype=np.int16)
151-
if self.audio_player:
152-
try:
153-
self.audio_player.write(np_audio)
154-
except Exception as e:
155-
print(f"Audio playback error: {e}")
214+
try:
215+
self.output_queue.put_nowait(np_audio)
216+
except queue.Full:
217+
# Queue is full - only drop if we have significant backlog
218+
# This prevents aggressive dropping that could cause choppiness
219+
if self.output_queue.qsize() > 8: # Keep some buffer
220+
try:
221+
self.output_queue.get_nowait()
222+
self.output_queue.put_nowait(np_audio)
223+
except queue.Empty:
224+
pass
225+
# If queue isn't too full, just skip this chunk to avoid blocking
156226
elif event.type == "audio_interrupted":
157227
print("Audio interrupted")
228+
# Signal the output callback to clear its queue and state
229+
self.interrupt_event.set()
158230
elif event.type == "error":
159231
print(f"Error: {event.error}")
160232
elif event.type == "history_updated":

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ requires-python = ">=3.9"
77
license = "MIT"
88
authors = [{ name = "OpenAI", email = "support@openai.com" }]
99
dependencies = [
10-
"openai>=1.96.0, <2",
10+
"openai>=1.96.1, <2",
1111
"pydantic>=2.10, <3",
1212
"griffe>=1.5.6, <2",
1313
"typing-extensions>=4.12.2, <5",

src/agents/realtime/items.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,15 @@ class AssistantText(BaseModel):
3030
model_config = ConfigDict(extra="allow")
3131

3232

33+
class AssistantAudio(BaseModel):
    # Audio content part of an assistant message: optional payload plus
    # optional transcript. (No class docstring: pydantic would surface it
    # as the JSON-schema description, changing the generated schema.)
    type: Literal["audio"] = "audio"
    audio: str | None = None
    transcript: str | None = None

    # Allow extra data
    model_config = ConfigDict(extra="allow")
40+
41+
3342
class SystemMessageItem(BaseModel):
3443
item_id: str
3544
previous_item_id: str | None = None
@@ -58,7 +67,7 @@ class AssistantMessageItem(BaseModel):
5867
type: Literal["message"] = "message"
5968
role: Literal["assistant"] = "assistant"
6069
status: Literal["in_progress", "completed", "incomplete"] | None = None
61-
content: list[AssistantText]
70+
content: list[Annotated[AssistantText | AssistantAudio, Field(discriminator="type")]]
6271

6372
# Allow extra data
6473
model_config = ConfigDict(extra="allow")

src/agents/realtime/openai_realtime.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ async def _handle_output_item(self, item: ConversationItem) -> None:
364364
"item_id": item.id or "",
365365
"type": item.type,
366366
"role": item.role,
367-
"content": item.content,
367+
"content": [content.model_dump() for content in item.content],
368368
"status": "in_progress",
369369
}
370370
)

uv.lock

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments (0)