Getting Started with Flux

Flux is the first conversational speech recognition model built specifically for voice agents. Unlike traditional STT that just transcribes words, Flux understands conversational flow and automatically handles turn-taking.

Flux tackles the most critical challenges for voice agents today: knowing when to listen, when to think, and when to speak. The model features first-of-its-kind model-integrated end-of-turn detection, configurable turn-taking dynamics, and ultra-low latency optimized for voice agent pipelines, all with Nova-3 level accuracy.

Flux is Perfect for: turn-based voice agents, customer service bots, phone assistants, and real-time conversation tools.

Key Benefits:

  • Smart turn detection - Knows when speakers finish talking
  • Ultra-low latency - ~260ms end-of-turn detection
  • Early LLM responses - EagerEndOfTurn events for faster replies
  • Turn-based transcripts - Clean conversation structure
  • Natural interruptions - Built-in barge-in handling
  • Nova-3 accuracy - Best-in-class transcription quality

For more information on how Flux manages turns, see the Flux State Machine Guide.
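To make those turn events concrete, here is a minimal sketch of how a message handler might branch on them. It is illustrative only: the EagerEndOfTurn event comes from the feature list above, but the exact message attributes (such as an event field) and the other event names are assumptions; consult the Flux State Machine Guide for the authoritative event schema.

# Illustrative sketch only: the "event" attribute and the event names other than
# EagerEndOfTurn are assumptions, not a verbatim Flux message schema.
def handle_turn_event(message) -> None:
    event = getattr(message, "event", None)

    if event == "EagerEndOfTurn":
        # Speaker has likely finished: start drafting an LLM reply early
        print("Eager end of turn - start preparing a response")
    elif event == "TurnResumed":
        # Speaker kept talking: discard the draft reply and keep listening
        print("Turn resumed - discard the draft response")
    elif event == "EndOfTurn":
        # Turn is final: respond and hand the floor back to the user
        print(f"End of turn - respond to: {getattr(message, 'transcript', '')}")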

Let’s Build!

This guide walks you through building a basic streaming transcription application powered by Deepgram Flux and the Deepgram SDK.

By the end of this guide, you’ll have:

  • A real-time streaming transcription application with sub-second response times, using the BBC World Service live stream as your audio source.
  • Natural conversation flow with Flux’s advanced turn detection model
  • Voice Activity Detection based interruption handling for responsive interactions
  • A working demo you can build on!

Audio Stream

To handle the audio stream, we'll use the following conversion approach:

1. Install the Deepgram SDK

# Install the Deepgram Python SDK
# https://github.com/deepgram/deepgram-python-sdk
pip install deepgram-sdk

2. Add Dependencies

Install the additional dependencies:

# Install python-dotenv to protect your API key
pip install python-dotenv

3. Install FFmpeg on your machine

You will need the actual FFmpeg binary installed to run this demo:

  • macOS: brew install ffmpeg
  • Ubuntu/Debian: sudo apt install ffmpeg
  • Windows: Download from https://ffmpeg.org/
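To confirm the FFmpeg binary is available on your PATH, you can print its version:

ffmpeg -version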

4. Create a .env file

Create a .env file in your project root with your Deepgram API key:

$ touch .env

Then add your API key to the file:

DEEPGRAM_API_KEY="your_deepgram_api_key"

Replace your_deepgram_api_key with your actual Deepgram API key.
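If you want to verify that python-dotenv is picking up the key before wiring up the full app, a quick sanity check like the following works; it only reports whether the variable is set and never prints the key itself:

import os
from dotenv import load_dotenv

# Load .env from the current working directory and confirm the key is visible
load_dotenv()
print("DEEPGRAM_API_KEY loaded:", bool(os.getenv("DEEPGRAM_API_KEY")))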

5. Set Imports and Confidence Colors

Core Dependencies:

  • asyncio - Handles concurrent audio streaming and Deepgram connection
  • subprocess - Manages FFmpeg process for audio conversion
  • dotenv - Loads Deepgram API key from .env file

Deepgram SDK:

  • AsyncDeepgramClient - Main client for Flux API connection
  • EventType - WebSocket event constants (OPEN, MESSAGE, CLOSE, ERROR)
  • ListenV2SocketClientResponse - Type hints for incoming transcription messages

Configuration:

  • STREAM_URL - BBC World Service streaming audio endpoint

Visual Feedback System:

  • Colors class - ANSI terminal color codes for confidence visualization
  • get_confidence_color() - Maps confidence scores to colors:
    • Green (0.90-1.00): High confidence
    • Yellow (0.80-0.90): Good confidence
    • Orange (0.70-0.80): Lower confidence
    • Red (< 0.70): Low confidence

Purpose: Sets up the foundation for real-time streaming transcription with visual quality indicators, making it easy to spot transcription accuracy at a glance.

import asyncio
import subprocess
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

from deepgram import AsyncDeepgramClient
from deepgram.core.events import EventType
from deepgram.extensions.types.sockets import ListenV2SocketClientResponse

# URL for the realtime streaming audio to transcribe
STREAM_URL = "http://stream.live.vc.bbcmedia.co.uk/bbc_world_service"

# Terminal color codes
class Colors:
    GREEN = '\033[92m'   # 0.90-1.00
    YELLOW = '\033[93m'  # 0.80-0.90
    ORANGE = '\033[91m'  # 0.70-0.80 (using red as orange isn't standard)
    RED = '\033[31m'     # <=0.69
    RESET = '\033[0m'    # Reset to default

def get_confidence_color(confidence: float) -> str:
    """Return the appropriate color code based on confidence score"""
    if confidence >= 0.90:
        return Colors.GREEN
    elif confidence >= 0.80:
        return Colors.YELLOW
    elif confidence >= 0.70:
        return Colors.ORANGE
    else:
        return Colors.RED
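As a quick, purely illustrative check of these helpers (run in the same file or session as the snippet above), you can print a single word colored by its confidence score, the same pattern the message handler uses later:

# Illustrative usage: color one word by its confidence, as the message handler does below
word, confidence = "hello", 0.85
print(f"{get_confidence_color(confidence)}{word}({confidence:.2f}){Colors.RESET}")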

6. Connect to Flux and Process Audio

The main function orchestrates real-time transcription of streaming audio URLs:

  • Initialize: Creates AsyncDeepgramClient and connects to Flux with required linear16 format
  • Event Handling: Sets up message handler that displays transcriptions with color-coded confidence scores
  • Audio Pipeline: Launches FFmpeg subprocess to convert compressed stream URL to linear16 PCM format
  • Streaming Loop: Reads converted audio chunks and pipes them to Deepgram Flux connection
  • Concurrent Tasks: Runs Deepgram listener and audio conversion simultaneously using asyncio
  • Error Handling: Manages FFmpeg errors and connection timeouts (60 seconds in this example)

The function handles both the audio conversion requirement (Flux only accepts linear16) and real-time streaming coordination between multiple async processes.

async def main():
    """Main async function to handle URL streaming to Deepgram Flux"""

    # Create the Deepgram async client
    client = AsyncDeepgramClient()  # The API key retrieval happens automatically in the constructor

    try:
        # Connect to Flux with auto-detection for streaming audio
        async with client.listen.v2.connect(
            model="flux-general-en",
            encoding="linear16",
            sample_rate="16000"
        ) as connection:

            # Define message handler function
            def on_message(message: ListenV2SocketClientResponse) -> None:
                msg_type = getattr(message, "type", "Unknown")

                # Show transcription results
                if hasattr(message, 'transcript') and message.transcript:
                    print(f"🎤 {message.transcript}")

                    # Show word-level confidence with color coding
                    if hasattr(message, 'words') and message.words:
                        colored_words = []
                        for word in message.words:
                            color = get_confidence_color(word.confidence)
                            colored_words.append(f"{color}{word.word}({word.confidence:.2f}){Colors.RESET}")
                        words_info = " | ".join(colored_words)
                        print(f"   📝 {words_info}")
                elif msg_type == "Connected":
                    print(f"✅ Connected to Deepgram Flux - Ready for audio!")

            # Set up event handlers
            connection.on(EventType.OPEN, lambda _: print("Connection opened"))
            connection.on(EventType.MESSAGE, on_message)
            connection.on(EventType.CLOSE, lambda _: print("Connection closed"))
            connection.on(EventType.ERROR, lambda error: print(f"Caught: {error}"))

            # Start the connection listening in background (it's already async)
            deepgram_task = asyncio.create_task(connection.start_listening())

            # Convert BBC stream to linear16 PCM using ffmpeg
            print(f"Starting to stream and convert audio from: {STREAM_URL}")

            # Use ffmpeg to convert the compressed BBC stream to linear16 PCM at 16kHz
            ffmpeg_cmd = [
                'ffmpeg',
                '-i', STREAM_URL,   # Input: BBC World Service stream
                '-f', 's16le',      # Output format: 16-bit little-endian PCM (linear16)
                '-ar', '16000',     # Sample rate: 16kHz
                '-ac', '1',         # Channels: mono
                '-'                 # Output to stdout
            ]

            try:
                # Start ffmpeg process
                process = await asyncio.create_subprocess_exec(
                    *ffmpeg_cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE
                )

                print(f"✅ Audio conversion started (BBC → linear16 PCM)")

                # Read converted PCM data and send to Deepgram
                while True:
                    chunk = await process.stdout.read(1024)
                    if not chunk:
                        break

                    # Send converted linear16 PCM data to Flux
                    await connection._send(chunk)

                await process.wait()

            except Exception as e:
                print(f"Error during audio conversion: {e}")
                if 'process' in locals():
                    stderr = await process.stderr.read()
                    print(f"FFmpeg error: {stderr.decode()}")

            # Wait for Deepgram task to complete (or cancel after timeout)
            try:
                await asyncio.wait_for(deepgram_task, timeout=60)
            except asyncio.TimeoutError:
                print("Stream timeout after 60 seconds")
                deepgram_task.cancel()

    except Exception as e:
        print(f"Caught: {e}")

if __name__ == "__main__":
    asyncio.run(main())

7. Complete Code Example

Here’s the complete working example that combines all the steps. You can also find this code on GitHub.

import asyncio
import subprocess
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

from deepgram import AsyncDeepgramClient
from deepgram.core.events import EventType
from deepgram.extensions.types.sockets import ListenV2SocketClientResponse

# URL for the realtime streaming audio to transcribe
STREAM_URL = "http://stream.live.vc.bbcmedia.co.uk/bbc_world_service"

# Terminal color codes
class Colors:
    GREEN = '\033[92m'   # 0.90-1.00
    YELLOW = '\033[93m'  # 0.80-0.90
    ORANGE = '\033[91m'  # 0.70-0.80 (using red as orange isn't standard)
    RED = '\033[31m'     # <=0.69
    RESET = '\033[0m'    # Reset to default

def get_confidence_color(confidence: float) -> str:
    """Return the appropriate color code based on confidence score"""
    if confidence >= 0.90:
        return Colors.GREEN
    elif confidence >= 0.80:
        return Colors.YELLOW
    elif confidence >= 0.70:
        return Colors.ORANGE
    else:
        return Colors.RED

async def main():
    """Main async function to handle URL streaming to Deepgram Flux"""

    # Create the Deepgram async client
    client = AsyncDeepgramClient()

    try:
        # Connect to Flux with auto-detection for streaming audio
        async with client.listen.v2.connect(
            model="flux-general-en",
            encoding="linear16",
            sample_rate="16000"
        ) as connection:

            # Define message handler function
            def on_message(message: ListenV2SocketClientResponse) -> None:
                msg_type = getattr(message, "type", "Unknown")

                # Show transcription results
                if hasattr(message, 'transcript') and message.transcript:
                    print(f"🎤 {message.transcript}")

                    # Show word-level confidence with color coding
                    if hasattr(message, 'words') and message.words:
                        colored_words = []
                        for word in message.words:
                            color = get_confidence_color(word.confidence)
                            colored_words.append(f"{color}{word.word}({word.confidence:.2f}){Colors.RESET}")
                        words_info = " | ".join(colored_words)
                        print(f"   📝 {words_info}")
                elif msg_type == "Connected":
                    print(f"✅ Connected to Deepgram Flux - Ready for audio!")

            # Set up event handlers
            connection.on(EventType.OPEN, lambda _: print("Connection opened"))
            connection.on(EventType.MESSAGE, on_message)
            connection.on(EventType.CLOSE, lambda _: print("Connection closed"))
            connection.on(EventType.ERROR, lambda error: print(f"Caught: {error}"))

            # Start the connection listening in background (it's already async)
            deepgram_task = asyncio.create_task(connection.start_listening())

            # Convert BBC stream to linear16 PCM using ffmpeg
            print(f"Starting to stream and convert audio from: {STREAM_URL}")

            # Use ffmpeg to convert the compressed BBC stream to linear16 PCM at 16kHz
            ffmpeg_cmd = [
                'ffmpeg',
                '-i', STREAM_URL,   # Input: BBC World Service stream
                '-f', 's16le',      # Output format: 16-bit little-endian PCM (linear16)
                '-ar', '16000',     # Sample rate: 16kHz
                '-ac', '1',         # Channels: mono
                '-'                 # Output to stdout
            ]

            try:
                # Start ffmpeg process
                process = await asyncio.create_subprocess_exec(
                    *ffmpeg_cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE
                )

                print(f"✅ Audio conversion started (BBC → linear16 PCM)")

                # Read converted PCM data and send to Deepgram
                while True:
                    chunk = await process.stdout.read(1024)
                    if not chunk:
                        break

                    # Send converted linear16 PCM data to Flux
                    await connection._send(chunk)

                await process.wait()

            except Exception as e:
                print(f"Error during audio conversion: {e}")
                if 'process' in locals():
                    stderr = await process.stderr.read()
                    print(f"FFmpeg error: {stderr.decode()}")

            # Wait for Deepgram task to complete (or cancel after timeout)
            try:
                await asyncio.wait_for(deepgram_task, timeout=60)
            except asyncio.TimeoutError:
                print("Stream timeout after 60 seconds")
                deepgram_task.cancel()

    except Exception as e:
        print(f"Caught: {e}")

if __name__ == "__main__":
    asyncio.run(main())
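Save the complete example to a file (for instance flux_url_stream.py; the filename is just an illustration) and run it from the directory that contains your .env file:

python flux_url_stream.py

You should see the connection open, followed by color-coded transcripts of the live stream.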

Additional Flux Demos

For additional demos showcasing Flux, check out the following repositories:

| Demo Link | Repository | Tech Stack | Use Case |
| --- | --- | --- | --- |
| Demo Link | Repository | Node, JS, HTML, CSS | Flux Streaming Transcription |
| N/A | Repository | Rust | Flux Streaming Transcription |

Building a Voice Agent with Flux

Are you ready to build a voice agent with Flux? See our Flux Voice Agent Guide to get started.