Getting Started with Flux

Flux is the first conversational speech recognition model built specifically for voice agents. Unlike traditional STT that just transcribes words, Flux understands conversational flow and automatically handles turn-taking.

Flux tackles the most critical challenges for voice agents today: knowing when to listen, when to think, and when to speak. The model features first-of-its-kind model-integrated end-of-turn detection, configurable turn-taking dynamics, and ultra-low latency optimized for voice agent pipelines, all with Nova-3 level accuracy.

Flux is Perfect for: turn-based voice agents, customer service bots, phone assistants, and real-time conversation tools.

Key Benefits:

  • Smart turn detection β€” Knows when speakers finish talking
  • Ultra-low latency β€” ~260ms end-of-turn detection
  • Early LLM responses β€” EagerEndOfTurn events for faster replies
  • Turn-based transcripts β€” Clean conversation structure
  • Natural interruptions β€” Built-in barge-in handling
  • Nova-3 accuracy β€” Best-in-class transcription quality

For more information on how Flux manages turns, see the Flux State Machine Guide.

Important: Flux Connection Requirements

Flux requires the /v2/listen endpoint β€” Using /v1/listen will not work with Flux.

When connecting to Flux, you must use:

  • Endpoint: /v2/listen (not /v1/listen)
  • Model: flux-general-en
  • Audio Format: See Audio Format Requirements table below
  • Chunk Size: 80ms audio chunks strongly recommended for optimal model performance and latency

Audio Format Requirements

| Audio Type | Encoding | Container | `encoding` param | `sample_rate` param | Supported Sample Rates |
| --- | --- | --- | --- | --- | --- |
| Raw | linear16, linear32, mulaw, alaw, opus, ogg-opus | None | Required | Required (16000 recommended) | 8000, 16000, 24000, 44100, 48000 |
| Containerized | linear16 | WAV | Omit | Omit | Auto-detected from container |
| Containerized | opus | Ogg | Omit | Omit | Auto-detected from container |
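The 80ms chunk recommendation above translates into a byte count that depends on sample rate and bytes per sample. As a rough sketch for raw mono PCM (`chunk_bytes` is an illustrative helper, not part of the Deepgram SDK):

```python
def chunk_bytes(sample_rate_hz: int, bytes_per_sample: int, chunk_ms: int = 80) -> int:
    """Bytes per chunk of raw mono PCM audio for a given chunk duration."""
    return sample_rate_hz * bytes_per_sample * chunk_ms // 1000

# linear16 at the recommended 16000 Hz uses 2 bytes per sample,
# so an 80ms chunk is 2560 bytes.
print(chunk_bytes(16000, 2))  # 2560
```

The same arithmetic applies to other raw encodings; for example, mulaw at 8000 Hz (1 byte per sample) gives 640 bytes per 80ms chunk.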

WebSocket URL Format:

wss://api.deepgram.com/v2/listen?model=flux-general-en

When using the Deepgram SDK, use client.listen.v2.connect() to access the v2 endpoint. For direct WebSocket connections, ensure you’re using /v2/listen in your URL.

Configurable Parameters

Flux provides three key parameters to control end-of-turn detection behavior and optimize your voice agent’s conversational flow:

End-of-Turn Detection Parameters

| Parameter | Range | Default | Description |
| --- | --- | --- | --- |
| eot_threshold | 0.5 - 0.9 | 0.7 | Confidence required to trigger an EndOfTurn event. Higher values = more reliable turn detection but slightly increased latency. |
| eager_eot_threshold | 0.3 - 0.9 | None | Confidence required to trigger an EagerEndOfTurn event. Required to enable early response generation. Lower values = earlier triggers but more false starts. |
| eot_timeout_ms | 500 - 10000 | 5000 | Maximum milliseconds of silence before forcing an EndOfTurn, regardless of confidence. |
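As an illustration of how a direct WebSocket connection might carry these settings, the sketch below builds a `/v2/listen` URL. This assumes the parameters ride on the query string alongside `model`, `encoding`, and `sample_rate`, in the same way the WebSocket URL format shown elsewhere in this guide passes `model`; the specific values are examples, not recommendations:

```python
from urllib.parse import urlencode

# Assumption: end-of-turn parameters are sent as query parameters
# on the /v2/listen WebSocket URL, like model and encoding.
params = {
    "model": "flux-general-en",
    "encoding": "linear16",
    "sample_rate": 16000,
    "eot_threshold": 0.8,        # more conservative turn detection
    "eager_eot_threshold": 0.5,  # enables EagerEndOfTurn events
    "eot_timeout_ms": 4000,      # force EndOfTurn after 4s of silence
}
url = f"wss://api.deepgram.com/v2/listen?{urlencode(params)}"
print(url)
```

When using the SDK, the equivalent would be passing these as arguments to `client.listen.v2.connect()` rather than building the URL by hand.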

When to Configure These Parameters

For most use cases, the default eot_threshold=0.7 works well. You only need to configure these parameters if:

  • You want faster responses: Set eager_eot_threshold to enable EagerEndOfTurn events and start LLM processing before the user fully finishes speaking
  • Your users speak with long pauses: Increase eot_timeout_ms to avoid cutting off turns prematurely
  • You need more reliable turn detection: Increase eot_threshold to reduce false positives (at the cost of slightly higher latency)
  • You want more aggressive turn detection: Lower eot_threshold to trigger turns earlier

Important: Setting eager_eot_threshold enables EagerEndOfTurn and TurnResumed events. These events allow you to start preparing LLM responses early, reducing end-to-end latency by hundreds of milliseconds. See the Eager End-of-Turn Optimization Guide for implementation strategies.

Cost Consideration: Using EagerEndOfTurn can increase LLM API calls by 50-70% due to speculative response generation. The TurnResumed event signals when to cancel a draft response because the user continued speaking.
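The EagerEndOfTurn / TurnResumed / EndOfTurn interplay described above can be sketched as a small event handler. This is an illustrative outline only: `generate_reply` is a hypothetical stand-in for your LLM call, and event types are matched as plain strings rather than through SDK types:

```python
import asyncio
from typing import Optional

draft_task: Optional[asyncio.Task] = None

async def generate_reply(transcript: str) -> str:
    # Hypothetical stand-in for your actual LLM request.
    await asyncio.sleep(0.1)
    return f"Reply to: {transcript}"

async def on_turn_event(event_type: str, transcript: str) -> Optional[str]:
    """Start a speculative draft on EagerEndOfTurn; cancel it on TurnResumed."""
    global draft_task
    if event_type == "EagerEndOfTurn":
        # Begin drafting before the turn is confirmed.
        # This is the source of the extra LLM calls noted above.
        draft_task = asyncio.create_task(generate_reply(transcript))
    elif event_type == "TurnResumed":
        # The user kept speaking: discard the speculative draft.
        if draft_task:
            draft_task.cancel()
            draft_task = None
    elif event_type == "EndOfTurn":
        task, draft_task = draft_task, None
        if task:
            return await task  # the speculative draft pays off
        return await generate_reply(transcript)
    return None
```

In a real agent you would also compare the eager transcript against the final one before speaking the drafted reply.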

For comprehensive parameter documentation and tuning guidance, see the End-of-Turn Configuration guide.

Using Flux: SDK vs Direct WebSocket

```python
from deepgram import AsyncDeepgramClient

client = AsyncDeepgramClient()

# SDK automatically uses /v2/listen endpoint
async with client.listen.v2.connect(
    model="flux-general-en",
    encoding="linear16",
    sample_rate="16000"
) as connection:
    # Your code here
    pass
```

Common Mistakes to Avoid:

  • ❌ Using /v1/listen instead of /v2/listen
  • ❌ Using model=flux instead of model=flux-general-en
  • ❌ Using language=en parameter (use model=flux-general-en instead)
  • ❌ Specifying encoding or sample_rate when sending containerized audio (omit these for containerized formats)

Let’s Build!

This guide walks you through building a basic streaming transcription application powered by Deepgram Flux and the Deepgram SDK.

By the end of this guide, you’ll have:

  • A real-time streaming transcription application with sub-second response times, using the BBC Real Time Live Stream as your audio source
  • Natural conversation flow with Flux’s advanced turn detection model
  • Voice Activity Detection based interruption handling for responsive interactions
  • A working demo you can build on!

Audio Stream

To handle the audio stream, we will use the following conversion approach:

1. Install the Deepgram SDK

```shell
# Install the Deepgram Python SDK
# https://github.com/deepgram/deepgram-python-sdk
pip install deepgram-sdk
```

2. Add Dependencies

Install the additional dependencies:

```shell
# Install python-dotenv to protect your API key
pip install python-dotenv
```

3. Install FFmpeg on your machine

You will need the actual FFmpeg binary installed to run this demo:

  • macOS: brew install ffmpeg
  • Ubuntu/Debian: sudo apt install ffmpeg
  • Windows: Download from https://ffmpeg.org/

4. Create a .env file

Create a .env file in your project root with your Deepgram API key:

```shell
touch .env
```

Then add your key to `.env`:

```
DEEPGRAM_API_KEY="your_deepgram_api_key"
```

Replace your_deepgram_api_key with your actual Deepgram API key.

5. Set Imports and Audio Stream Colors

Core Dependencies:

  • asyncio - Handles concurrent audio streaming and Deepgram connection
  • subprocess - Manages FFmpeg process for audio conversion
  • dotenv - Loads Deepgram API key from .env file

Deepgram SDK:

  • AsyncDeepgramClient - Main client for Flux API connection
  • EventType - WebSocket event constants (OPEN, MESSAGE, CLOSE, ERROR)
  • ListenV2SocketClientResponse - Type hints for incoming transcription messages

Configuration:

  • STREAM_URL - BBC World Service streaming audio endpoint

Visual Feedback System:

  • Colors class - ANSI terminal color codes for confidence visualization
  • get_confidence_color() - Maps confidence scores to colors:
    • Green (0.90-1.00): High confidence
    • Yellow (0.80-0.90): Good confidence
    • Orange (0.70-0.80): Lower confidence
    • Red (≀0.69): Low confidence

Purpose: Sets up the foundation for real-time streaming transcription with visual quality indicators, making it easy to spot transcription accuracy at a glance.

```python
import asyncio
import subprocess
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

from deepgram import AsyncDeepgramClient
from deepgram.core.events import EventType
from deepgram.extensions.types.sockets import ListenV2SocketClientResponse

# URL for the realtime streaming audio to transcribe
STREAM_URL = "http://stream.live.vc.bbcmedia.co.uk/bbc_world_service"

# Terminal color codes
class Colors:
    GREEN = '\033[92m'   # 0.90-1.00
    YELLOW = '\033[93m'  # 0.80-0.90
    ORANGE = '\033[91m'  # 0.70-0.80 (using red as orange isn't standard)
    RED = '\033[31m'     # <=0.69
    RESET = '\033[0m'    # Reset to default

def get_confidence_color(confidence: float) -> str:
    """Return the appropriate color code based on confidence score"""
    if confidence >= 0.90:
        return Colors.GREEN
    elif confidence >= 0.80:
        return Colors.YELLOW
    elif confidence >= 0.70:
        return Colors.ORANGE
    else:
        return Colors.RED
```

6. Connect to Flux and Process Audio

The main function orchestrates real-time transcription of streaming audio URLs:

  • Initialize: Creates AsyncDeepgramClient and connects to Flux with required linear16 format
  • Event Handling: Sets up message handler that displays transcriptions with color-coded confidence scores
  • Audio Pipeline: Launches FFmpeg subprocess to convert compressed stream URL to linear16 PCM format
  • Streaming Loop: Reads converted audio chunks and pipes them to Deepgram Flux connection
  • Concurrent Tasks: Runs Deepgram listener and audio conversion simultaneously using asyncio
  • Error Handling: Manages FFmpeg errors and connection timeouts (60s default)

The function handles both the audio conversion requirement (Flux only accepts linear16) and real-time streaming coordination between multiple async processes.

```python
async def main():
    """Main async function to handle URL streaming to Deepgram Flux"""

    # Create the Deepgram async client
    client = AsyncDeepgramClient()  # The API key retrieval happens automatically in the constructor

    try:
        # Connect to Flux with auto-detection for streaming audio
        # SDK automatically connects to: wss://api.deepgram.com/v2/listen?model=flux-general-en&encoding=linear16&sample_rate=16000
        async with client.listen.v2.connect(
            model="flux-general-en",
            encoding="linear16",
            sample_rate="16000"
        ) as connection:

            # Define message handler function
            def on_message(message: ListenV2SocketClientResponse) -> None:
                msg_type = getattr(message, "type", "Unknown")

                # Show transcription results
                if hasattr(message, 'transcript') and message.transcript:
                    print(f"🎤 {message.transcript}")

                    # Show word-level confidence with color coding
                    if hasattr(message, 'words') and message.words:
                        colored_words = []
                        for word in message.words:
                            color = get_confidence_color(word.confidence)
                            colored_words.append(f"{color}{word.word}({word.confidence:.2f}){Colors.RESET}")
                        words_info = " | ".join(colored_words)
                        print(f"   📝 {words_info}")
                elif msg_type == "Connected":
                    print(f"✅ Connected to Deepgram Flux - Ready for audio!")

            # Set up event handlers
            connection.on(EventType.OPEN, lambda _: print("Connection opened"))
            connection.on(EventType.MESSAGE, on_message)
            connection.on(EventType.CLOSE, lambda _: print("Connection closed"))
            connection.on(EventType.ERROR, lambda error: print(f"Caught: {error}"))

            # Start the connection listening in background (it's already async)
            deepgram_task = asyncio.create_task(connection.start_listening())

            # Convert BBC stream to linear16 PCM using ffmpeg
            print(f"Starting to stream and convert audio from: {STREAM_URL}")

            # Use ffmpeg to convert the compressed BBC stream to linear16 PCM at 16kHz
            ffmpeg_cmd = [
                'ffmpeg',
                '-i', STREAM_URL,  # Input: BBC World Service stream
                '-f', 's16le',     # Output format: 16-bit little-endian PCM (linear16)
                '-ar', '16000',    # Sample rate: 16kHz
                '-ac', '1',        # Channels: mono
                '-'                # Output to stdout
            ]

            try:
                # Start ffmpeg process
                process = await asyncio.create_subprocess_exec(
                    *ffmpeg_cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE
                )

                print(f"✅ Audio conversion started (BBC → linear16 PCM)")

                # Read converted PCM data and send to Deepgram
                # Note: 1024 bytes = ~32ms of audio at 16kHz linear16
                # For optimal performance, consider using ~2560 bytes (~80ms at 16kHz)
                while True:
                    chunk = await process.stdout.read(1024)
                    if not chunk:
                        break

                    # Send converted linear16 PCM data to Flux
                    await connection._send(chunk)

                await process.wait()

            except Exception as e:
                print(f"Error during audio conversion: {e}")
                if 'process' in locals():
                    stderr = await process.stderr.read()
                    print(f"FFmpeg error: {stderr.decode()}")

            # Wait for Deepgram task to complete (or cancel after timeout)
            try:
                await asyncio.wait_for(deepgram_task, timeout=60)
            except asyncio.TimeoutError:
                print("Stream timeout after 60 seconds")
                deepgram_task.cancel()

    except Exception as e:
        print(f"Caught: {e}")

if __name__ == "__main__":
    asyncio.run(main())
```

7. Complete Code Example

Here’s the complete working example that combines all the steps. You can also find this code on GitHub.

```python
import asyncio
import subprocess
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

from deepgram import AsyncDeepgramClient
from deepgram.core.events import EventType
from deepgram.extensions.types.sockets import ListenV2SocketClientResponse

# URL for the realtime streaming audio to transcribe
STREAM_URL = "http://stream.live.vc.bbcmedia.co.uk/bbc_world_service"

# Terminal color codes
class Colors:
    GREEN = '\033[92m'   # 0.90-1.00
    YELLOW = '\033[93m'  # 0.80-0.90
    ORANGE = '\033[91m'  # 0.70-0.80 (using red as orange isn't standard)
    RED = '\033[31m'     # <=0.69
    RESET = '\033[0m'    # Reset to default

def get_confidence_color(confidence: float) -> str:
    """Return the appropriate color code based on confidence score"""
    if confidence >= 0.90:
        return Colors.GREEN
    elif confidence >= 0.80:
        return Colors.YELLOW
    elif confidence >= 0.70:
        return Colors.ORANGE
    else:
        return Colors.RED

async def main():
    """Main async function to handle URL streaming to Deepgram Flux"""

    # Create the Deepgram async client
    client = AsyncDeepgramClient()

    try:
        # Connect to Flux with auto-detection for streaming audio
        # SDK automatically connects to: wss://api.deepgram.com/v2/listen?model=flux-general-en&encoding=linear16&sample_rate=16000
        async with client.listen.v2.connect(
            model="flux-general-en",
            encoding="linear16",
            sample_rate="16000"
        ) as connection:

            # Define message handler function
            def on_message(message: ListenV2SocketClientResponse) -> None:
                msg_type = getattr(message, "type", "Unknown")

                # Show transcription results
                if hasattr(message, 'transcript') and message.transcript:
                    print(f"🎤 {message.transcript}")

                    # Show word-level confidence with color coding
                    if hasattr(message, 'words') and message.words:
                        colored_words = []
                        for word in message.words:
                            color = get_confidence_color(word.confidence)
                            colored_words.append(f"{color}{word.word}({word.confidence:.2f}){Colors.RESET}")
                        words_info = " | ".join(colored_words)
                        print(f"   📝 {words_info}")
                elif msg_type == "Connected":
                    print(f"✅ Connected to Deepgram Flux - Ready for audio!")

            # Set up event handlers
            connection.on(EventType.OPEN, lambda _: print("Connection opened"))
            connection.on(EventType.MESSAGE, on_message)
            connection.on(EventType.CLOSE, lambda _: print("Connection closed"))
            connection.on(EventType.ERROR, lambda error: print(f"Caught: {error}"))

            # Start the connection listening in background (it's already async)
            deepgram_task = asyncio.create_task(connection.start_listening())

            # Convert BBC stream to linear16 PCM using ffmpeg
            print(f"Starting to stream and convert audio from: {STREAM_URL}")

            # Use ffmpeg to convert the compressed BBC stream to linear16 PCM at 16kHz
            ffmpeg_cmd = [
                'ffmpeg',
                '-i', STREAM_URL,  # Input: BBC World Service stream
                '-f', 's16le',     # Output format: 16-bit little-endian PCM (linear16)
                '-ar', '16000',    # Sample rate: 16kHz
                '-ac', '1',        # Channels: mono
                '-'                # Output to stdout
            ]

            try:
                # Start ffmpeg process
                process = await asyncio.create_subprocess_exec(
                    *ffmpeg_cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE
                )

                print(f"✅ Audio conversion started (BBC → linear16 PCM)")

                # Read converted PCM data and send to Deepgram
                # Note: 1024 bytes = ~32ms of audio at 16kHz linear16
                # For optimal performance, consider using ~2560 bytes (~80ms at 16kHz)
                while True:
                    chunk = await process.stdout.read(1024)
                    if not chunk:
                        break

                    # Send converted linear16 PCM data to Flux
                    await connection._send(chunk)

                await process.wait()

            except Exception as e:
                print(f"Error during audio conversion: {e}")
                if 'process' in locals():
                    stderr = await process.stderr.read()
                    print(f"FFmpeg error: {stderr.decode()}")

            # Wait for Deepgram task to complete (or cancel after timeout)
            try:
                await asyncio.wait_for(deepgram_task, timeout=60)
            except asyncio.TimeoutError:
                print("Stream timeout after 60 seconds")
                deepgram_task.cancel()

    except Exception as e:
        print(f"Caught: {e}")

if __name__ == "__main__":
    asyncio.run(main())
```

Additional Flux Demos

For additional demos showcasing Flux, check out the following repositories:

| Demo Link | Repository | Tech Stack | Use Case |
| --- | --- | --- | --- |
| Demo Link | Repository | Node, JS, HTML, CSS | Flux Streaming Transcription |
| N/A | Repository | Rust | Flux Streaming Transcription |

Building a Voice Agent with Flux

Are you ready to build a voice agent with Flux? See our Build a Flux-enabled Voice Agent Guide to get started.