Finalize

Learn how to send Deepgram a Finalize message, which flushes the WebSocket stream by forcing the server to process all unprocessed audio data immediately and return the results.

In real-time audio processing, there are scenarios where you may need to force the server to process (or flush) all unprocessed audio data immediately. Deepgram supports a Finalize message to handle such situations, ensuring that interim results are treated as final.

What is the Finalize Message?

The Finalize message is a JSON command that you send to the Deepgram server, instructing it to process and finalize all remaining audio data immediately. This is particularly useful in scenarios where an utterance has ended, or when transitioning to a keep-alive period to ensure that no previous transcripts reappear unexpectedly.
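
The keep-alive transition mentioned above could be sketched roughly as follows, using the message format described in the next section. This is a minimal sketch under stated assumptions: `pause_stream` and its `send` callable are hypothetical names, and the `KeepAlive` message type and the 5-second interval are assumptions that this page does not specify.

```python
import json
import time
from typing import Callable


def pause_stream(send: Callable[[str], None],
                 keepalives: int = 3,
                 interval: float = 5.0) -> None:
    """Flush pending audio with Finalize, then hold the connection open.

    `send` is any callable that writes one text message to the socket.
    The KeepAlive message type and the interval are assumptions.
    """
    # Finalize first, so buffered audio is transcribed before the pause
    send(json.dumps({"type": "Finalize"}))
    for _ in range(keepalives):
        time.sleep(interval)
        send(json.dumps({"type": "KeepAlive"}))


# Demo with a list standing in for the WebSocket
sent = []
pause_stream(sent.append, keepalives=2, interval=0.0)
print(sent)
```

Passing the `send` callable in, rather than a socket object, keeps the sketch independent of any particular WebSocket library.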

Sending Finalize

To finalize the stream, send the following JSON message to the server:

{
  "type": "Finalize"
}

You can optionally specify a channel field to finalize a specific channel. If the channel field is omitted, all channels in the audio are finalized. Channel indexing starts at 0, so to finalize only the first channel, send:

{
  "type": "Finalize",
  "channel": 0
}
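
Both message variants above can be produced by one small helper. The following is a minimal sketch; `build_finalize_message` is a hypothetical name for illustration, not part of any Deepgram SDK.

```python
import json
from typing import Optional


def build_finalize_message(channel: Optional[int] = None) -> str:
    """Return the Finalize command as a JSON string.

    `channel` is zero-based; leaving it as None omits the field,
    which finalizes every channel.
    """
    message = {"type": "Finalize"}
    if channel is not None:
        message["channel"] = channel
    return json.dumps(message)


print(build_finalize_message())           # all channels
print(build_finalize_message(channel=0))  # first channel only
```

The returned string can be passed directly to the `send` method of whichever WebSocket client you use.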

Finalize Confirmation

Upon receiving the Finalize message, the server will process all remaining audio data and return the final results. You may receive a response with the from_finalize attribute set to true, indicating that the finalization process is complete. This response typically occurs when there is a noticeable amount of audio buffered in the server.

If you specified a channel to be finalized, use the response's channel_index field to check which channel was finalized.

{
  "from_finalize": true
}

Note: In most cases, you will receive this response, but it is not guaranteed when there is no significant amount of audio data left to process.
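
Because the confirmation is not guaranteed, a client that waits for it probably wants a timeout. The sketch below shows one way to do that, assuming incoming messages are placed on a `queue.Queue` by the WebSocket's message handler. `check_finalize` and `wait_for_finalize` are hypothetical helper names, and the shape of the sample `channel_index` value is an assumption made only for the demo.

```python
import json
import queue
import time
from typing import Any, Optional, Tuple


def check_finalize(raw_message: str) -> Tuple[bool, Optional[Any]]:
    """Return (finalized, channel_index) for one server message."""
    response = json.loads(raw_message)
    if not isinstance(response, dict):
        return False, None
    # channel_index is returned as-is; its shape is not assumed here
    return response.get("from_finalize") is True, response.get("channel_index")


def wait_for_finalize(inbox: "queue.Queue[str]", timeout: float = 2.0) -> bool:
    """Read from `inbox` until a finalize confirmation arrives or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        try:
            raw = inbox.get(timeout=remaining)
        except queue.Empty:
            return False
        finalized, _ = check_finalize(raw)
        if finalized:
            return True


# Demo: two queued messages, the second one is the confirmation
inbox = queue.Queue()
inbox.put('{"type": "Results"}')
inbox.put('{"type": "Results", "from_finalize": true, "channel_index": [0, 1]}')
print(wait_for_finalize(inbox))  # True
```

A `False` return here only means no confirmation arrived in time. As noted above, that can happen when little audio was buffered, so it should not be treated as an error.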

Language-Specific Implementations

The following code examples will help you get started.

Sending a Finalize message in JSON Format

These snippets demonstrate how to construct a JSON message containing the "Finalize" type and send it over the WebSocket connection. They appear in this order: JavaScript, Python, Go, and C#.

const WebSocket = require("ws");

// Assuming 'headers' is already defined for authorization
const ws = new WebSocket("wss://api.deepgram.com/v1/listen", { headers });

ws.on('open', function open() {
  // Construct Finalize message
  const finalizeMsg = JSON.stringify({ type: "Finalize" });
  
  // Send Finalize message
  ws.send(finalizeMsg);
});

import json
import websocket

# Assuming 'headers' is already defined for authorization
ws = websocket.create_connection("wss://api.deepgram.com/v1/listen", header=headers)

# Construct Finalize message
finalize_msg = json.dumps({"type": "Finalize"})

# Send Finalize message
ws.send(finalize_msg)

package main

import (
    "encoding/json"
    "log"
    "net/http"
    "github.com/gorilla/websocket"
)

func main() {
    // Set the authorization header (replace the placeholder with your API key)
    headers := http.Header{}
    headers.Set("Authorization", "Token YOUR_DEEPGRAM_API_KEY")

    conn, _, err := websocket.DefaultDialer.Dial("wss://api.deepgram.com/v1/listen", headers)
    if err != nil {
        log.Fatal("Error connecting to WebSocket:", err)
    }
    defer conn.Close()

    // Construct Finalize message
    finalizeMsg := map[string]string{"type": "Finalize"}
    jsonMsg, err := json.Marshal(finalizeMsg)
    if err != nil {
        log.Fatal("Error encoding JSON:", err)
    }

    // Send Finalize message
    err = conn.WriteMessage(websocket.TextMessage, jsonMsg)
    if err != nil {
        log.Fatal("Error sending Finalize message:", err)
    }
}


using System;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static async Task Main(string[] args)
    {
        // Set up the WebSocket URL and headers
        Uri uri = new Uri("wss://api.deepgram.com/v1/listen");
        
        string apiKey = "DEEPGRAM_API_KEY";

        // Create a new client WebSocket instance
        using (ClientWebSocket ws = new ClientWebSocket())
        {
            // Set the authorization header
            ws.Options.SetRequestHeader("Authorization", "Token " + apiKey);

            // Connect to the WebSocket server
            await ws.ConnectAsync(uri, CancellationToken.None);

            // Construct the Finalize message
            string finalizeMsg = "{\"type\": \"Finalize\"}";

            // Convert the Finalize message to a byte array
            byte[] finalizeBytes = Encoding.UTF8.GetBytes(finalizeMsg);

            // Send the Finalize message asynchronously
            await ws.SendAsync(new ArraySegment<byte>(finalizeBytes), WebSocketMessageType.Text, true, CancellationToken.None);
        }
    }
}

Streaming Examples

Here are more complete examples, first in JavaScript and then in Python, that make a streaming request and use Finalize. Try running them to see how sending Finalize forces the API to process all unprocessed audio data and return the results immediately.

const WebSocket = require("ws");
const axios = require("axios");
const { PassThrough } = require("stream");

const apiKey = "YOUR_DEEPGRAM_API_KEY";
const headers = {
  Authorization: `Token ${apiKey}`,
};

// Initialize WebSocket connection
const ws = new WebSocket("wss://api.deepgram.com/v1/listen", { headers });

ws.on("open", async function open() {
  console.log("WebSocket connection established.");

  try {
    // Fetch the audio stream from the remote URL
    const response = await axios({
      method: "get",
      url: "http://stream.live.vc.bbcmedia.co.uk/bbc_world_service",
      responseType: "stream",
    });

    const passThrough = new PassThrough();
    response.data.pipe(passThrough);

    passThrough.on("data", (chunk) => {
      ws.send(chunk);
    });

    passThrough.on("end", () => {
      console.log("Audio stream ended.");
      finalizeWebSocket();
    });

    passThrough.on("error", (err) => {
      console.error("Stream error:", err.message);
    });

    // Send Finalize message after 10 seconds
    setTimeout(() => {
      finalizeWebSocket();
    }, 10000);
  } catch (error) {
    console.error("Error fetching audio stream:", error.message);
  }
});

// Handle WebSocket message event
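ws.on("message", function incoming(data) {
  const response = JSON.parse(data);
  if (response.type === "Results") {
    console.log("Transcript: ", response.channel.alternatives[0].transcript);
    // Results produced by a Finalize message carry from_finalize set to true
    if (response.from_finalize) {
      console.log("Finalization complete.");
    }
  }
});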

// Handle WebSocket close event
ws.on("close", function close() {
  console.log("WebSocket connection closed.");
});

// Handle WebSocket error event
ws.on("error", function error(err) {
  console.error("WebSocket error:", err.message);
});

// Send Finalize message to WebSocket
function finalizeWebSocket() {
  const finalizeMsg = JSON.stringify({ type: "Finalize" });
  ws.send(finalizeMsg);
  console.log("Finalize message sent.");
}

// Gracefully close the WebSocket connection when done
function closeWebSocket() {
  const closeMsg = JSON.stringify({ type: "CloseStream" });
  ws.send(closeMsg);
  ws.close();
}

// Close WebSocket when process is terminated
process.on("SIGINT", () => {
  closeWebSocket();
  process.exit();
});

from websocket import WebSocketApp
import websocket
import json
import threading
import requests
import time

auth_token = "YOUR_DEEPGRAM_API_KEY"  # Replace with your actual authorization token

headers = {
    "Authorization": f"Token {auth_token}"
}

# WebSocket URL
ws_url = "wss://api.deepgram.com/v1/listen"

# Audio stream URL
audio_url = "http://stream.live.vc.bbcmedia.co.uk/bbc_world_service"

# Define the WebSocket functions on_open, on_message, on_close, and on_error

def on_open(ws):
    print("WebSocket connection established.")
    
    # Start audio streaming thread
    audio_thread = threading.Thread(target=stream_audio, args=(ws,))
    audio_thread.daemon = True
    audio_thread.start()
    
    # Finalize test thread
    finalize_thread = threading.Thread(target=finalize_test, args=(ws,))
    finalize_thread.daemon = True
    finalize_thread.start()

def on_message(ws, message):
    try:
        response = json.loads(message)
        if response.get("type") == "Results":
            transcript = response["channel"]["alternatives"][0].get("transcript", "")
            if transcript:
                print("Transcript:", transcript)
            # Results produced by a Finalize message carry from_finalize set to true
            if response.get("from_finalize"):
                print("Finalization complete.")
    except json.JSONDecodeError as e:
        print(f"Error decoding JSON message: {e}")
    except KeyError as e:
        print(f"Key error: {e}")

def on_close(ws, close_status_code, close_msg):
    print(f"WebSocket connection closed with code: {close_status_code}, message: {close_msg}")

def on_error(ws, error):
    print("WebSocket error:", error)

# Define the function to stream audio to the WebSocket

def stream_audio(ws):
    response = requests.get(audio_url, stream=True)
    if response.status_code == 200:
        print("Audio stream opened.")
        for chunk in response.iter_content(chunk_size=4096):
            ws.send(chunk, opcode=websocket.ABNF.OPCODE_BINARY)
    else:
        print("Failed to open audio stream:", response.status_code)

# Define the function to send the Finalize message

def finalize_test(ws):
    # Wait for 10 seconds before sending the Finalize message to simulate the end of audio streaming
    time.sleep(10)
    finalize_message = json.dumps({"type": "Finalize"})
    ws.send(finalize_message)
    print("Finalize message sent.")

# Create WebSocket connection

ws = WebSocketApp(ws_url, on_open=on_open, on_message=on_message, on_close=on_close, on_error=on_error, header=headers)

# Run the WebSocket

ws.run_forever()

Conclusion

Using the Finalize message with Deepgram's API allows for precise control over the finalization of audio processing. This feature is essential for scenarios requiring immediate processing of the remaining audio data, ensuring accurate and timely results.

