MCPcopy
hub / github.com/openai/openai-python / send_mic_audio

Method send_mic_audio

examples/realtime/push_to_talk_app.py:215–257  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

213 return self.connection
214
async def send_mic_audio(self) -> None:
    """Capture microphone audio and append it to the realtime input buffer.

    Opens a ``sounddevice`` input stream and loops forever: once at least
    one chunk of frames is available and ``self.should_send_audio`` has been
    awaited, the chunk is read, base64-encoded and appended to the
    connection's input audio buffer. Before the first chunk is sent, a
    single ``response.cancel`` event is dispatched in the background.

    The input stream is stopped and closed when the loop exits for any
    reason, including a failure while starting the stream.
    """
    import sounddevice as sd  # type: ignore

    sent_audio = False

    # Strong references to fire-and-forget tasks. The event loop only keeps
    # weak references, so an unreferenced task may be garbage-collected
    # before it has finished running.
    background_tasks: set[asyncio.Task[Any]] = set()

    device_info = sd.query_devices()
    print(device_info)

    # Number of frames per chunk: 0.02 s worth at SAMPLE_RATE frames/second.
    read_size = int(SAMPLE_RATE * 0.02)

    stream = sd.InputStream(
        channels=CHANNELS,
        samplerate=SAMPLE_RATE,
        dtype="int16",
    )

    # Everything after the stream is opened runs under try/finally so the
    # stream is released even if starting it or the widget lookup fails.
    try:
        stream.start()

        status_indicator = self.query_one(AudioStatusIndicator)

        while True:
            if stream.read_available < read_size:
                # Not enough frames buffered yet; yield to other tasks.
                await asyncio.sleep(0)
                continue

            # NOTE(review): presumably an asyncio.Event toggled elsewhere
            # in the app (e.g. by a key handler) - confirm against caller.
            await self.should_send_audio.wait()
            status_indicator.is_recording = True

            # The second element of the returned tuple is ignored here.
            data, _ = stream.read(read_size)

            connection = await self._get_connection()
            if not sent_audio:
                # Sent once, before the first audio chunk, without blocking
                # the capture loop on it.
                cancel_task = asyncio.create_task(connection.send({"type": "response.cancel"}))
                background_tasks.add(cancel_task)
                cancel_task.add_done_callback(background_tasks.discard)
                sent_audio = True

            await connection.input_audio_buffer.append(audio=base64.b64encode(cast(Any, data)).decode("utf-8"))

            await asyncio.sleep(0)
    except KeyboardInterrupt:
        # Deliberate: Ctrl-C simply ends the capture loop.
        pass
    finally:
        stream.stop()
        stream.close()
258
259 async def on_key(self, event: events.Key) -> None:
260 """Handle key press events."""

Callers 1

on_mountMethod · 0.95

Calls 8

_get_connectionMethod · 0.95
startMethod · 0.80
decodeMethod · 0.80
stopMethod · 0.80
readMethod · 0.45
sendMethod · 0.45
appendMethod · 0.45
closeMethod · 0.45

Tested by

no test coverage detected