Build a Flux-enabled Voice Agent

Build a cascaded voice agent using Flux conversational speech to text, an OpenAI LLM, and Deepgram Aura-2 text to speech.

Flux tackles the most critical challenges for voice agents today: knowing when to listen, when to think, and when to speak. The model features first-of-its-kind model-integrated end-of-turn detection, configurable turn-taking dynamics, and ultra-low latency optimized for voice agent pipelines, all with Nova-3 level accuracy.

If you’d prefer to skip building, managing, and scaling a voice agent yourself, explore our Voice Agent API.

Let’s Build!

This guide walks you through building a basic voice agent powered by Deepgram Flux streaming speech-to-text with advanced turn detection, an OpenAI LLM, and Deepgram TTS, to create natural, real-time conversations with users.

By the end of this guide, you’ll have:

  • A real-time voice agent with sub-second response times
  • A voice agent that uses a static audio file for mocking out a conversation
  • Natural conversation flow with Flux’s advanced turn detection model
  • Voice Activity Detection based interruption handling for responsive interactions
  • A complete setup ready for a demo deployment

Choosing an LLM

Flux works with any LLM, so you can choose the model best suited to your use case. For this demo we’ll use OpenAI.

Voice Agent Patterns

Flux enables two voice agent patterns; choose between them based on your latency vs. complexity/cost trade-offs. For this demo we’ll use the EndOfTurn-only pattern for simplicity.

EndOfTurn Only

Considerations:

Factor        Details
Performance   Higher latency but fewer LLM calls
Complexity    Simpler logic to implement
Experience    Requires less experience interfacing with LLMs directly

We recommend starting with a purely EndOfTurn-driven implementation to get up and running (a minimal event-handling sketch follows the list below). This means:

  • Update/EagerEndOfTurn/TurnResumed: Use only for transcript reference
  • EndOfTurn: Send transcript to LLM and trigger agent response
  • StartOfTurn: Interrupt agent if speaking, otherwise wait
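Here is a minimal sketch of that dispatch logic. The send_to_llm_and_speak() and stop_agent_playback() helpers are hypothetical placeholders for your own LLM, TTS, and playback stages; only the TurnInfo event names come from Flux.

# Minimal EndOfTurn-only dispatch sketch. send_to_llm_and_speak() and
# stop_agent_playback() are hypothetical helpers you wire to your own
# LLM, TTS, and audio playback stages.
agent_speaking = False

def handle_flux_message(message) -> None:
    global agent_speaking
    if getattr(message, "type", None) != "TurnInfo":
        return

    if message.event == "StartOfTurn":
        # User started speaking: interrupt the agent if it is talking.
        if agent_speaking:
            stop_agent_playback()
            agent_speaking = False
    elif message.event == "EndOfTurn" and message.transcript:
        # User finished speaking: hand the transcript to the LLM and speak the reply.
        agent_speaking = True
        send_to_llm_and_speak(message.transcript.strip())
    # Update / EagerEndOfTurn / TurnResumed are used only for transcript reference here.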

EagerEndOfTurn + EndOfTurn

For more information on EagerEndOfTurn, see our guide Optimize Voice Agent Latency with Eager End of Turn.

Considerations:

Factor        Details
Performance   Lower latency but more LLM calls
Complexity    More complex to implement
Experience    Requires more experience interfacing with LLMs directly
Accuracy      EagerEndOfTurn may be followed by TurnResumed if the user continues speaking

Once you’re comfortable with EndOfTurn, you can decide whether to optimize latency using EagerEndOfTurn. Eager end-of-turn processing sends medium-confidence transcripts to your LLM before final EndOfTurn certainty, reducing response time, but consider the trade-off: some of those LLM calls will be discarded if the user keeps speaking. A minimal sketch follows the event list below.

  • EagerEndOfTurn: Start preparing agent reply (moderate confidence user finished speaking)
  • TurnResumed: Cancel agent reply preparation (user still speaking)
  • EndOfTurn: Proceed with prepared response (user definitely finished)
  • StartOfTurn: Interrupt agent if speaking, otherwise wait
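The sketch below shows one way to wire these events together, assuming hypothetical async helpers generate_reply() (your LLM call) and speak() (your TTS call). It is a starting point, not a production implementation.

import asyncio
from typing import Optional

async def generate_reply(text: str) -> str:
    """Hypothetical placeholder: call your LLM here."""
    ...

async def speak(text: str) -> None:
    """Hypothetical placeholder: call your TTS provider here."""
    ...

draft_task: Optional[asyncio.Task] = None

async def handle_turn_event(message) -> None:
    global draft_task
    if getattr(message, "type", None) != "TurnInfo":
        return

    if message.event == "EagerEndOfTurn":
        # Moderate confidence the user is done: start drafting a reply early.
        draft_task = asyncio.create_task(generate_reply(message.transcript))
    elif message.event == "TurnResumed":
        # The user kept talking: throw away the speculative draft.
        if draft_task is not None:
            draft_task.cancel()
            draft_task = None
    elif message.event == "EndOfTurn":
        # The user definitely finished: reuse the draft if one survived, else generate now.
        if draft_task is not None:
            reply = await draft_task
        else:
            reply = await generate_reply(message.transcript)
        draft_task = None
        await speak(reply)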

Tuning Turn Detection: You can fine-tune the behavior of these events using the eot_threshold, eager_eot_threshold, and eot_timeout_ms parameters. See the End-of-Turn Configuration guide for detailed tuning guidance and use-case-specific recommendations.
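For example, the thresholds can be supplied when opening the Flux connection. This sketch reuses the client, EventType, and on_flux_message names from the example later in this guide; the numeric values are purely illustrative, and it assumes the SDK forwards these keyword arguments as connection parameters the same way it forwards model, encoding, and sample_rate.

# Illustrative turn-detection tuning (values are examples, not recommendations).
with client.listen.v2.connect(
    model="flux-general-en",
    encoding="linear16",
    sample_rate=16000,
    eot_threshold=0.7,        # confidence required before emitting EndOfTurn
    eager_eot_threshold=0.5,  # lower-confidence threshold that triggers EagerEndOfTurn
    eot_timeout_ms=5000,      # silence fallback that forces an EndOfTurn
) as connection:
    connection.on(EventType.MESSAGE, on_flux_message)
    # ... stream audio and wait for EndOfTurn as in the full example below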

Voice Agent vs Flux Agent Pipeline

Using the Voice Agent API, Deepgram manages the full speech-to-text, LLM, and text-to-speech pipeline for you.

If you want to use Flux with the Voice Agent API, set your listen.provider.model to flux-general-en.

If you opt to build your own voice agent from scratch, you can use Flux to handle the speech to text and rely on its turn-taking cues to coordinate the rest of your pipeline.

You’ll now be responsible for:

  • Managing audio playback interruptions (barge-in)
  • Sending STT output to your LLM
  • Cancelling LLM responses if user resumes talking
  • Converting LLM output to speech via your chosen TTS provider

EndOfTurn Only Voice Agent Example

Here’s a sample voice agent implementation using Flux with the EndOfTurn only pattern:

1. Install the Deepgram SDK

# Install the Deepgram Python SDK
# https://github.com/deepgram/deepgram-python-sdk
pip install deepgram-sdk

2. Add Dependencies

Install the additional dependencies:

# Install python-dotenv to protect your API key
pip install python-dotenv

3. Create a .env file

Create a .env file in your project root with your Deepgram API key and OpenAI API Key.

$ touch .env

Then add your keys to .env:

DEEPGRAM_API_KEY="your_deepgram_api_key"
OPENAI_API_KEY="your_open_ai_api_key"

Replace your_deepgram_api_key with your actual Deepgram API key and your_open_ai_api_key with your actual OpenAI API key.

4. Set Imports & Audio File

import asyncio
import os
import sys
import json
import urllib.request

from dotenv import load_dotenv

# Load environment variables
load_dotenv()

AUDIO_FILE = "audio/spacewalk_linear16.wav"  # Raw: linear16, linear32, mulaw, alaw, opus, ogg-opus; Containerized: linear16 in WAV, opus in Ogg
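The Flux connection in the next step is opened with encoding="linear16" and sample_rate=16000, so your file should match. If you swap in your own containerized WAV file, a quick sanity check with Python's standard-library wave module might look like the sketch below (the mono/16-bit expectations are assumptions based on that connection setup).

import wave

# Optional sanity check for containerized WAV input: confirm the file matches
# what we will tell Flux to expect (16 kHz, mono, 16-bit linear PCM).
with wave.open(AUDIO_FILE, "rb") as wav_file:
    assert wav_file.getframerate() == 16000, "expected 16 kHz audio"
    assert wav_file.getnchannels() == 1, "expected mono audio"
    assert wav_file.getsampwidth() == 2, "expected 16-bit (2-byte) samples"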

5. Transcribe with Flux

# Transcribe with Flux
print("\n🎤 Transcribing with Flux...")
transcript = ""
done = asyncio.Event()

def on_flux_message(message: ListenV2SocketClientResponse) -> None:
    nonlocal transcript
    if hasattr(message, 'type') and message.type == 'TurnInfo':
        if hasattr(message, 'event') and message.event == 'EndOfTurn':
            if hasattr(message, 'transcript') and message.transcript:
                transcript = message.transcript.strip()
                print(f"✓ Transcript: '{transcript}'")
                done.set()

with client.listen.v2.connect(model="flux-general-en", encoding="linear16", sample_rate=16000) as connection:
    connection.on(EventType.MESSAGE, on_flux_message)

    import threading
    threading.Thread(target=connection.start_listening, daemon=True).start()

    # Send audio in chunks
    # Note: For optimal Flux performance, use ~80ms audio chunks
    # At 16kHz linear16: 80ms = ~2560 bytes. Using 4096 (~128ms) for simplicity in this demo
    chunk_size = 4096
    for i in range(0, len(audio_data), chunk_size):
        connection.send_media(audio_data[i:i + chunk_size])
        await asyncio.sleep(0.01)

    # Wait for transcript
    await asyncio.wait_for(done.wait(), timeout=30.0)

if not transcript:
    print("❌ No transcript received")
    return

6. Generate OpenAI Response

# Generate OpenAI response
print("\n🤖 Generating OpenAI response...")

# Direct HTTP request to OpenAI API
openai_data = {
    "model": "gpt-4o-mini",
    "messages": [
        {"role": "system", "content": "You are a helpful assistant. Keep responses concise and conversational."},
        {"role": "user", "content": transcript}
    ],
    "temperature": 0.7,
    "max_tokens": 100
}

req = urllib.request.Request(
    "https://api.openai.com/v1/chat/completions",
    data=json.dumps(openai_data).encode(),
    headers={
        "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
        "Content-Type": "application/json"
    }
)

try:
    with urllib.request.urlopen(req) as response_obj:
        openai_response = json.loads(response_obj.read().decode())
        response = openai_response["choices"][0]["message"]["content"]
        print(f"✓ Response: '{response}'")
except Exception as e:
    print(f"❌ OpenAI API error: {e}")
    response = f"I heard you say: {transcript}"  # Fallback
    print(f"✓ Fallback response: '{response}'")

7. Generate TTS Response

# Generate TTS Response
print("\n🔊 Generating TTS...")
tts_audio = []
tts_done = asyncio.Event()

def on_tts_message(message: SpeakV1SocketClientResponse) -> None:
    if isinstance(message, bytes):
        tts_audio.append(message)
    elif hasattr(message, 'type') and message.type == 'Flushed':
        tts_done.set()

with client.speak.v1.connect(model="aura-2-phoebe-en", encoding="linear16", sample_rate=16000) as connection:
    connection.on(EventType.MESSAGE, on_tts_message)

    threading.Thread(target=connection.start_listening, daemon=True).start()

    connection.send_text(SpeakV1TextMessage(type="Speak", text=response))
    connection.send_control(SpeakV1ControlMessage(type="Flush"))

    # Wait for TTS completion
    await asyncio.wait_for(tts_done.wait(), timeout=15.0)

8. Save TTS Audio

# Save TTS audio
if tts_audio:
    output_file = "audio/responses/agent_response.wav"
    combined_audio = b''.join(tts_audio)

    # Create simple WAV header
    import struct
    wav_header = struct.pack(
        '<4sI4s4sIHHIIHH4sI',
        b'RIFF', 36 + len(combined_audio), b'WAVE', b'fmt ', 16, 1, 1,
        16000, 32000, 2, 16, b'data', len(combined_audio)
    )

    with open(output_file, 'wb') as f:
        f.write(wav_header + combined_audio)

    print(f"💾 Saved TTS audio: {output_file}")

print("\n🎉 Demo complete!")
print(f"📝 User: '{transcript}'")
print(f"🤖 Agent: '{response}'")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n👋 Demo stopped")
    except Exception as e:
        print(f"❌ Error: {e}")
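The struct.pack call above writes a minimal 44-byte PCM WAV header (mono, 16 kHz, 16-bit samples, hence the 32000 byte rate). If you prefer not to hand-roll the header, Python's standard-library wave module produces an equivalent file; here is a sketch using the same output_file and combined_audio from this step.

import wave

# Equivalent WAV output using the standard-library wave module
# (mono, 16-bit samples, 16 kHz, matching the Aura-2 connection settings).
with wave.open(output_file, "wb") as wav_file:
    wav_file.setnchannels(1)      # mono
    wav_file.setsampwidth(2)      # 16-bit samples = 2 bytes
    wav_file.setframerate(16000)  # matches the sample_rate requested from TTS
    wav_file.writeframes(combined_audio)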

9. Complete Code Example

Here’s the complete working example that combines all the steps. You can also find this code on GitHub.

import asyncio
import os
import sys
import json
import urllib.request

from dotenv import load_dotenv

# Load environment variables
load_dotenv()

AUDIO_FILE = "audio/spacewalk_linear16.wav"  # Raw: linear16, linear32, mulaw, alaw, opus, ogg-opus; Containerized: linear16 in WAV, opus in Ogg


async def main():
    """Main demo function."""
    print("🚀 Deepgram Flux Agent Demo")
    print("=" * 40)

    # Check for audio file
    if not os.path.exists(AUDIO_FILE):
        print(f"❌ Audio file '{AUDIO_FILE}' not found")
        print("Please add an audio.wav file to this directory")
        return

    # Read audio file
    print(f"📁 Reading {AUDIO_FILE}...")
    with open(AUDIO_FILE, 'rb') as f:
        audio_data = f.read()

    print(f"✓ Read {len(audio_data)} bytes")

    # Import Deepgram
    from deepgram import DeepgramClient
    from deepgram.core.events import EventType
    from deepgram.extensions.types.sockets import ListenV2SocketClientResponse, SpeakV1SocketClientResponse, SpeakV1ControlMessage, ListenV2MediaMessage, SpeakV1TextMessage

    client = DeepgramClient()  # The API key retrieval happens automatically in the constructor

    # Transcribe with Flux
    print("\n🎤 Transcribing with Flux...")
    transcript = ""
    done = asyncio.Event()

    def on_flux_message(message: ListenV2SocketClientResponse) -> None:
        nonlocal transcript
        if hasattr(message, 'type') and message.type == 'TurnInfo':
            if hasattr(message, 'event') and message.event == 'EndOfTurn':
                if hasattr(message, 'transcript') and message.transcript:
                    transcript = message.transcript.strip()
                    print(f"✓ Transcript: '{transcript}'")
                    done.set()

    with client.listen.v2.connect(model="flux-general-en", encoding="linear16", sample_rate=16000) as connection:
        connection.on(EventType.MESSAGE, on_flux_message)

        import threading
        threading.Thread(target=connection.start_listening, daemon=True).start()

        # Send audio in chunks
        # Note: For optimal Flux performance, use ~80ms audio chunks
        # At 16kHz linear16: 80ms = ~2560 bytes. Using 4096 (~128ms) for simplicity in this demo
        chunk_size = 4096
        for i in range(0, len(audio_data), chunk_size):
            connection.send_media(audio_data[i:i + chunk_size])
            await asyncio.sleep(0.01)

        # Wait for transcript
        await asyncio.wait_for(done.wait(), timeout=30.0)

    if not transcript:
        print("❌ No transcript received")
        return

    # Generate OpenAI response
    print("\n🤖 Generating OpenAI response...")

    # Direct HTTP request to OpenAI API
    openai_data = {
        "model": "gpt-4o-mini",
        "messages": [
            {"role": "system", "content": "You are a helpful assistant. Keep responses concise and conversational."},
            {"role": "user", "content": transcript}
        ],
        "temperature": 0.7,
        "max_tokens": 100
    }

    req = urllib.request.Request(
        "https://api.openai.com/v1/chat/completions",
        data=json.dumps(openai_data).encode(),
        headers={
            "Authorization": f"Bearer {os.environ.get('OPENAI_API_KEY')}",
            "Content-Type": "application/json"
        }
    )

    try:
        with urllib.request.urlopen(req) as response_obj:
            openai_response = json.loads(response_obj.read().decode())
            response = openai_response["choices"][0]["message"]["content"]
            print(f"✓ Response: '{response}'")
    except Exception as e:
        print(f"❌ OpenAI API error: {e}")
        response = f"I heard you say: {transcript}"  # Fallback
        print(f"✓ Fallback response: '{response}'")

    # Generate TTS Response
    print("\n🔊 Generating TTS...")
    tts_audio = []
    tts_done = asyncio.Event()

    def on_tts_message(message: SpeakV1SocketClientResponse) -> None:
        if isinstance(message, bytes):
            tts_audio.append(message)
        elif hasattr(message, 'type') and message.type == 'Flushed':
            tts_done.set()

    with client.speak.v1.connect(model="aura-2-phoebe-en", encoding="linear16", sample_rate=16000) as connection:
        connection.on(EventType.MESSAGE, on_tts_message)

        threading.Thread(target=connection.start_listening, daemon=True).start()

        connection.send_text(SpeakV1TextMessage(type="Speak", text=response))
        connection.send_control(SpeakV1ControlMessage(type="Flush"))

        # Wait for TTS completion
        await asyncio.wait_for(tts_done.wait(), timeout=15.0)

    # Save TTS audio
    if tts_audio:
        output_file = "audio/responses/agent_response.wav"
        combined_audio = b''.join(tts_audio)

        # Create simple WAV header
        import struct
        wav_header = struct.pack(
            '<4sI4s4sIHHIIHH4sI',
            b'RIFF', 36 + len(combined_audio), b'WAVE', b'fmt ', 16, 1, 1,
            16000, 32000, 2, 16, b'data', len(combined_audio)
        )

        with open(output_file, 'wb') as f:
            f.write(wav_header + combined_audio)

        print(f"💾 Saved TTS audio: {output_file}")

    print("\n🎉 Demo complete!")
    print(f"📝 User: '{transcript}'")
    print(f"🤖 Agent: '{response}'")


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n👋 Demo stopped")
    except Exception as e:
        print(f"❌ Error: {e}")

Additional Flux Demos

For additional demos showcasing Flux, check out the following repositories:

Demo Link    Repository    Tech Stack               Use Case
Demo Link    Repository    Python, JS, HTML, CSS    Build a Flux-enabled Voice Agent
N/A          Repository    Rust                     Build a Flux-enabled Voice Agent