Flux Voice Agent

Use Flux to build a composite voice agent with OpenAI and Deepgram TTS.

Flux tackles the most critical challenges for voice agents today: knowing when to listen, when to think, and when to speak. The model features first-of-its-kind model-integrated end-of-turn detection, configurable turn-taking dynamics, and ultra-low latency optimized for voice agent pipelines, all with Nova-3 level accuracy.

If you'd prefer to skip building, managing, and scaling a voice agent yourself, explore our Voice Agent API.

Let's Build!

This guide walks you through building a basic voice agent powered by Deepgram Flux, OpenAI, and Deepgram TTS, using streaming speech-to-text with advanced turn detection to create natural, real-time conversations with users.

By the end of this guide, you'll have:

  • A real-time voice agent with sub-second response times
  • A voice agent that uses a static audio file for mocking out a conversation
  • Natural conversation flow with Flux’s advanced turn detection model
  • Voice Activity Detection-based interruption handling for responsive interactions
  • A complete setup ready for a demo deployment

Choosing an LLM

Flux works with any LLM, so you can choose the one that best fits your use case. For this demo we'll be using OpenAI.

Voice Agent Patterns

Flux enables two voice agent patterns. You can decide which one to use based on your latency vs. complexity/cost trade-offs. For this demo we'll use the EndOfTurn-only pattern for simplicity.

EndOfTurn Only

Considerations:

Factor      | Details
Performance | Higher latency but fewer LLM calls
Complexity  | Simpler logic to implement
Experience  | Requires less experience interfacing with LLMs directly

We recommend starting with a purely EndOfTurn-driven implementation to get up and running. This means (see the sketch after this list):

  • Update/EagerEndOfTurn/TurnResumed: Use only for transcript reference
  • EndOfTurn: Send transcript to LLM and trigger agent response
  • StartOfTurn: Interrupt agent if speaking, otherwise wait
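
To make this concrete, here is a minimal sketch of an EndOfTurn-only event dispatcher. It assumes the TurnInfo message shape used later in this guide; send_to_llm_and_speak and stop_agent_playback are hypothetical helpers standing in for your own LLM/TTS and playback code.

def handle_turn_info(message) -> None:
    # Sketch only: dispatch Flux TurnInfo events for the EndOfTurn-only pattern.
    if getattr(message, "type", None) != "TurnInfo":
        return

    if message.event in ("Update", "EagerEndOfTurn", "TurnResumed"):
        # Reference only: useful for logging or showing a live transcript.
        print(f"[{message.event}] {message.transcript}")
    elif message.event == "EndOfTurn":
        # The user finished speaking: send the transcript to the LLM and speak the reply.
        send_to_llm_and_speak(message.transcript)  # hypothetical helper
    elif message.event == "StartOfTurn":
        # The user started speaking: interrupt the agent if it is currently talking.
        stop_agent_playback()  # hypothetical helper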

EagerEndOfTurn + EndOfTurn

For more information about EagerEndOfTurn, see our guide Optimize Voice Agent Latency with Eager End of Turn.

Considerations:

Factor      | Details
Performance | Lower latency but more LLM calls
Complexity  | More complex to implement
Experience  | Requires more experience interfacing with LLMs directly
Accuracy    | Transcripts may differ from final version (~1% beyond punctuation changes)

Once you're comfortable with EndOfTurn, you can decide whether you need to optimize latency using EagerEndOfTurn. Eager end-of-turn processing sends medium-confidence transcripts to your LLM before final EndOfTurn certainty, reducing response time, though consider the LLM trade-offs you might need to make. The resulting event handling looks like this (see the sketch after the list):

  • EagerEndOfTurn: Start preparing agent reply (moderate confidence user finished speaking)
  • TurnResumed: Cancel agent reply preparation (user still speaking)
  • EndOfTurn: Proceed with prepared response (user definitely finished)
  • StartOfTurn: Interrupt agent if speaking, otherwise wait
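
A minimal asyncio sketch of this flow, assuming the handler runs on the event loop; prepare_reply, speak, and stop_agent_playback are hypothetical helpers standing in for your own LLM, TTS, and playback code.

import asyncio

pending_reply = None  # draft LLM call in flight, if any

async def handle_turn_info(message) -> None:
    global pending_reply
    if getattr(message, "type", None) != "TurnInfo":
        return

    if message.event == "EagerEndOfTurn":
        # Moderate confidence the user is done: start drafting a reply early.
        pending_reply = asyncio.create_task(prepare_reply(message.transcript))  # hypothetical
    elif message.event == "TurnResumed":
        # The user kept talking: discard the draft.
        if pending_reply:
            pending_reply.cancel()
            pending_reply = None
    elif message.event == "EndOfTurn":
        # The user definitely finished: use the draft if one exists, otherwise generate now.
        if pending_reply:
            reply = await pending_reply
        else:
            reply = await prepare_reply(message.transcript)  # hypothetical
        pending_reply = None
        await speak(reply)  # hypothetical TTS helper
    elif message.event == "StartOfTurn":
        stop_agent_playback()  # hypothetical: interrupt the agent if it is speaking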

Voice Agent vs Flux Agent Pipeline

Using the Voice Agent API, your pipeline will look like this:

If you want to use Flux with the Voice Agent API, set your listen.provider.model to flux-general-en.
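
For illustration, the relevant fragment of the Settings payload might look like the sketch below (shown as a Python dict with other fields omitted); check the Voice Agent API reference for the exact shape.

# Illustrative fragment only; the surrounding Settings fields are omitted.
settings = {
    "type": "Settings",
    "agent": {
        "listen": {
            "provider": {
                "type": "deepgram",
                "model": "flux-general-en",  # use Flux for speech-to-text
            }
        },
        # "think" (LLM) and "speak" (TTS) providers omitted for brevity
    },
}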

In comparison, Flux only handles the STT processing; everything else becomes modular and under your control.

You'll now be responsible for the following (a rough interruption-handling sketch follows the list):

  • Managing audio playback interruptions (barge-in)
  • Sending STT output to your LLM
  • Cancelling LLM responses if user resumes talking
  • Converting LLM output to speech via your chosen TTS provider
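
As a rough illustration of the interruption handling in particular, the sketch below cancels any in-flight agent turn when the user starts speaking again; call_llm, synthesize_speech, and player are hypothetical stand-ins for your own LLM, TTS, and audio playback components, and the handler is assumed to run on the asyncio event loop.

import asyncio

agent_task = None  # the currently running LLM -> TTS -> playback pipeline, if any

async def run_agent_turn(transcript: str) -> None:
    reply = await call_llm(transcript)        # hypothetical LLM call
    audio = await synthesize_speech(reply)    # hypothetical TTS call
    await player.play(audio)                  # hypothetical audio playback

def on_turn_info(message) -> None:
    global agent_task
    if getattr(message, "type", None) != "TurnInfo":
        return

    if message.event == "StartOfTurn":
        # Barge-in: stop playback and cancel any in-flight LLM/TTS work.
        if agent_task and not agent_task.done():
            agent_task.cancel()
        player.stop()  # hypothetical
    elif message.event == "EndOfTurn":
        # Start a fresh agent turn for the completed utterance.
        agent_task = asyncio.create_task(run_agent_turn(message.transcript))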

EndOfTurn Only Voice Agent Example

Here's a sample voice agent implementation using Flux with the EndOfTurn-only pattern:

1. Install the Deepgram SDK

# Install the Deepgram Python SDK
# https://github.com/deepgram/deepgram-python-sdk
pip install deepgram-sdk

2. Add Dependencies

Install the additional dependencies:

# Install python-dotenv to protect your API key
pip install python-dotenv

3. Create a .env file

Create a .env file in your project root with your Deepgram API key and OpenAI API Key.

touch .env

# .env
DEEPGRAM_API_KEY="your_deepgram_api_key"
OPENAI_API_KEY="your_open_ai_api_key"

Replace your_deepgram_api_key with your actual Deepgram API key. Replace your_open_ai_api_key with your actual OpenAI API key.

4. Set Imports & Audio File

import asyncio
import os
import sys
import json
import urllib.request

from dotenv import load_dotenv

# Load environment variables
load_dotenv()

AUDIO_FILE = "audio/spacewalk_linear16.wav"  # Your audio file. Must be linear16.
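
Flux is configured later in this guide for linear16 audio at a 16 kHz sample rate, so it can help to verify the file format up front. Here's an optional sanity check (not part of the original demo) using Python's built-in wave module:

import wave

# Optional: confirm the WAV file matches the encoding/sample_rate sent to Flux.
with wave.open(AUDIO_FILE, "rb") as wav:
    assert wav.getsampwidth() == 2, "expected 16-bit (linear16) samples"
    assert wav.getnchannels() == 1, "expected mono audio"
    assert wav.getframerate() == 16000, "expected a 16 kHz sample rate"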

5. Transcribe with Flux

    # Transcribe with Flux
    print("\n🎤 Transcribing with Flux...")
    transcript = ""
    done = asyncio.Event()

    def on_flux_message(message: ListenV2SocketClientResponse) -> None:
        nonlocal transcript
        if hasattr(message, 'type') and message.type == 'TurnInfo':
            if hasattr(message, 'event') and message.event == 'EndOfTurn':
                if hasattr(message, 'transcript') and message.transcript:
                    transcript = message.transcript.strip()
                    print(f"✓ Transcript: '{transcript}'")
                    done.set()

    with client.listen.v2.connect(model="flux-general-en", encoding="linear16", sample_rate=16000) as connection:
        connection.on(EventType.MESSAGE, on_flux_message)

        import threading
        threading.Thread(target=connection.start_listening, daemon=True).start()

        # Send audio in chunks
        chunk_size = 4096
        for i in range(0, len(audio_data), chunk_size):
            connection.send_media(audio_data[i:i + chunk_size])
            await asyncio.sleep(0.01)

        # Wait for transcript
        await asyncio.wait_for(done.wait(), timeout=30.0)

    if not transcript:
        print("❌ No transcript received")
        return

6. Generate OpenAI Response

    # Generate OpenAI response
    print("\n🤖 Generating OpenAI response...")

    # Direct HTTP request to OpenAI API
    openai_data = {
        "model": "gpt-4o-mini",
        "messages": [
            {"role": "system", "content": "You are a helpful assistant. Keep responses concise and conversational."},
            {"role": "user", "content": transcript}
        ],
        "temperature": 0.7,
        "max_tokens": 100
    }

    req = urllib.request.Request(
        "https://api.openai.com/v1/chat/completions",
        data=json.dumps(openai_data).encode(),
        headers={
            "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
            "Content-Type": "application/json"
        }
    )

    try:
        with urllib.request.urlopen(req) as response_obj:
            openai_response = json.loads(response_obj.read().decode())
            response = openai_response["choices"][0]["message"]["content"]
            print(f"✓ Response: '{response}'")
    except Exception as e:
        print(f"❌ OpenAI API error: {e}")
        response = f"I heard you say: {transcript}"  # Fallback
        print(f"✓ Fallback response: '{response}'")

7. Generate TTS Response

    # Generate TTS Response
    print("\n🔊 Generating TTS...")
    tts_audio = []
    tts_done = asyncio.Event()

    def on_tts_message(message: SpeakV1SocketClientResponse) -> None:
        if isinstance(message, bytes):
            tts_audio.append(message)
        elif hasattr(message, 'type') and message.type == 'Flushed':
            tts_done.set()

    with client.speak.v1.connect(model="aura-2-phoebe-en", encoding="linear16", sample_rate=16000) as connection:
        connection.on(EventType.MESSAGE, on_tts_message)

        threading.Thread(target=connection.start_listening, daemon=True).start()

        connection.send_text(SpeakV1TextMessage(type="Speak", text=response))
        connection.send_control(SpeakV1ControlMessage(type="Flush"))

        # Wait for TTS completion
        await asyncio.wait_for(tts_done.wait(), timeout=15.0)

8. Save TTS Audio

    if tts_audio:
        output_file = "audio/responses/agent_response.wav"  # ensure this directory exists
        combined_audio = b''.join(tts_audio)

        # Create simple WAV header
        # (PCM, mono, 16 kHz, 16-bit; byte rate = 16000 * 1 channel * 2 bytes = 32000)
        import struct
        wav_header = struct.pack(
            '<4sI4s4sIHHIIHH4sI',
            b'RIFF', 36 + len(combined_audio), b'WAVE', b'fmt ', 16, 1, 1,
            16000, 32000, 2, 16, b'data', len(combined_audio)
        )

        with open(output_file, 'wb') as f:
            f.write(wav_header + combined_audio)

        print(f"💾 Saved TTS audio: {output_file}")

    print("\n🎉 Demo complete!")
    print(f"📝 User: '{transcript}'")
    print(f"🤖 Agent: '{response}'")

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n👋 Demo stopped")
    except Exception as e:
        print(f"❌ Error: {e}")

9. Complete Code Example

Here's the complete working example that combines all the steps. You can also find this code on GitHub.

import asyncio
import os
import sys
import json
import urllib.request

from dotenv import load_dotenv

# Load environment variables
load_dotenv()

AUDIO_FILE = "audio/spacewalk_linear16.wav"  # Your audio file. Must be linear16.

async def main():
    """Main demo function."""
    print("🚀 Deepgram Flux Agent Demo")
    print("=" * 40)

    # Check for audio file
    if not os.path.exists(AUDIO_FILE):
        print(f"❌ Audio file '{AUDIO_FILE}' not found")
        print("Please add an audio.wav file to this directory")
        return

    # Read audio file
    print(f"📁 Reading {AUDIO_FILE}...")
    with open(AUDIO_FILE, 'rb') as f:
        audio_data = f.read()

    print(f"✓ Read {len(audio_data)} bytes")

    # Import Deepgram
    from deepgram import DeepgramClient
    from deepgram.core.events import EventType
    from deepgram.extensions.types.sockets import ListenV2SocketClientResponse, SpeakV1SocketClientResponse, SpeakV1ControlMessage, ListenV2MediaMessage, SpeakV1TextMessage

    client = DeepgramClient()  # The API key retrieval happens automatically in the constructor

    # Transcribe with Flux
    print("\n🎤 Transcribing with Flux...")
    transcript = ""
    done = asyncio.Event()

    def on_flux_message(message: ListenV2SocketClientResponse) -> None:
        nonlocal transcript
        if hasattr(message, 'type') and message.type == 'TurnInfo':
            if hasattr(message, 'event') and message.event == 'EndOfTurn':
                if hasattr(message, 'transcript') and message.transcript:
                    transcript = message.transcript.strip()
                    print(f"✓ Transcript: '{transcript}'")
                    done.set()

    with client.listen.v2.connect(model="flux-general-en", encoding="linear16", sample_rate=16000) as connection:
        connection.on(EventType.MESSAGE, on_flux_message)

        import threading
        threading.Thread(target=connection.start_listening, daemon=True).start()

        # Send audio in chunks
        chunk_size = 4096
        for i in range(0, len(audio_data), chunk_size):
            connection.send_media(audio_data[i:i + chunk_size])
            await asyncio.sleep(0.01)

        # Wait for transcript
        await asyncio.wait_for(done.wait(), timeout=30.0)

    if not transcript:
        print("❌ No transcript received")
        return

    # Generate OpenAI response
    print("\n🤖 Generating OpenAI response...")

    # Direct HTTP request to OpenAI API
    openai_data = {
        "model": "gpt-4o-mini",
        "messages": [
            {"role": "system", "content": "You are a helpful assistant. Keep responses concise and conversational."},
            {"role": "user", "content": transcript}
        ],
        "temperature": 0.7,
        "max_tokens": 100
    }

    req = urllib.request.Request(
        "https://api.openai.com/v1/chat/completions",
        data=json.dumps(openai_data).encode(),
        headers={
            "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
            "Content-Type": "application/json"
        }
    )

    try:
        with urllib.request.urlopen(req) as response_obj:
            openai_response = json.loads(response_obj.read().decode())
            response = openai_response["choices"][0]["message"]["content"]
            print(f"✓ Response: '{response}'")
    except Exception as e:
        print(f"❌ OpenAI API error: {e}")
        response = f"I heard you say: {transcript}"  # Fallback
        print(f"✓ Fallback response: '{response}'")

    # Generate TTS Response
    print("\n🔊 Generating TTS...")
    tts_audio = []
    tts_done = asyncio.Event()

    def on_tts_message(message: SpeakV1SocketClientResponse) -> None:
        if isinstance(message, bytes):
            tts_audio.append(message)
        elif hasattr(message, 'type') and message.type == 'Flushed':
            tts_done.set()

    with client.speak.v1.connect(model="aura-2-phoebe-en", encoding="linear16", sample_rate=16000) as connection:
        connection.on(EventType.MESSAGE, on_tts_message)

        threading.Thread(target=connection.start_listening, daemon=True).start()

        connection.send_text(SpeakV1TextMessage(type="Speak", text=response))
        connection.send_control(SpeakV1ControlMessage(type="Flush"))

        # Wait for TTS completion
        await asyncio.wait_for(tts_done.wait(), timeout=15.0)

    # Save TTS audio
    if tts_audio:
        output_file = "audio/responses/agent_response.wav"
        combined_audio = b''.join(tts_audio)

        # Create simple WAV header
        import struct
        wav_header = struct.pack(
            '<4sI4s4sIHHIIHH4sI',
            b'RIFF', 36 + len(combined_audio), b'WAVE', b'fmt ', 16, 1, 1,
            16000, 32000, 2, 16, b'data', len(combined_audio)
        )

        with open(output_file, 'wb') as f:
            f.write(wav_header + combined_audio)

        print(f"💾 Saved TTS audio: {output_file}")

    print("\n🎉 Demo complete!")
    print(f"📝 User: '{transcript}'")
    print(f"🤖 Agent: '{response}'")

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n👋 Demo stopped")
    except Exception as e:
        print(f"❌ Error: {e}")

Additional Flux Demos

For additional demos showcasing Flux, check out the following repositories:

Demo Link | Repository | Tech Stack             | Use Case
Demo Link | Repository | Python, JS, HTML, CSS  | Flux Voice Agent
N/A       | Repository | Rust                   | Flux Voice Agent