Sending LLM Outputs to a WebSocket

Learn some tips and strategies for sending LLM outputs to a Deepgram WebSocket.

Below are some tips for handling text streams generated by large language models (LLMs) and sending them to a Deepgram WebSocket. This approach is particularly useful for real-time applications that need to process or play LLM output from models such as ChatGPT, Claude, or Llama as soon as it is generated. By leveraging a Deepgram WebSocket, you can achieve low-latency, bidirectional communication between your LLM and client applications.

Text Streams as Output

An LLM like ChatGPT produces text streams as output via a process that involves converting the input text into tokens, processing those tokens through a neural network to generate context-aware embeddings, and then using a decoding strategy to generate and stream output tokens incrementally.

This approach allows users to see the text as it is being generated, creating an interactive and dynamic experience.

Example

Consider a user inputting the prompt: "Tell me a story about a dragon."

  1. The input is tokenized into tokens like ["Tell", "me", "a", "story", "about", "a", "dragon", "."].
  2. These tokens are processed through the model layers to understand the context.
  3. The model starts generating tokens, perhaps beginning with "Once" followed by "upon", "a", "time".
  4. Each token is streamed to the user interface as it is generated, displaying the text incrementally.
  5. The model continues generating tokens until the story reaches a logical conclusion or the maximum length is reached (see the sketch below).
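
To make this concrete, the toy sketch below fakes an LLM with a Python generator that yields tokens one at a time; the consumer prints each token as soon as it arrives, exactly as a UI (or a TTS WebSocket) would consume a real stream. The fake_llm_stream function is purely illustrative and stands in for a real streaming API.

import time

def fake_llm_stream(prompt: str):
    # Stand-in for a real streaming LLM API (the prompt is ignored in this toy):
    # yields one token at a time, with a small delay to simulate generation.
    for token in ["Once", " upon", " a", " time", "..."]:
        time.sleep(0.1)
        yield token

for token in fake_llm_stream("Tell me a story about a dragon."):
    # Display (or forward) each token immediately as it arrives.
    print(token, end="", flush=True)
print()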

Feeding Simple Text to the WebSocket

The code below demonstrates the basic use case of feeding plain text into the WebSocket.

import json
import os
import threading
import asyncio
import queue

from websockets.sync.client import connect

import pyaudio

TIMEOUT = 0.050
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 48000
CHUNK = 8000

DEFAULT_URL = f"wss://api.deepgram.com/v1/speak?encoding=linear16&sample_rate={RATE}"
DEFAULT_TOKEN = os.environ.get("DEEPGRAM_API_KEY", None)

def main():
    print(f"Connecting to {DEFAULT_URL}")

    _socket = connect(
        DEFAULT_URL, additional_headers={"Authorization": f"Token {DEFAULT_TOKEN}"}
    )
    _exit = threading.Event()

    _story = [
        "The sun had just begun to rise over the sleepy town of Millfield.",
        "Emily a young woman in her mid-twenties was already awake and bustling about.",
    ]

    async def receiver():
        speaker = Speaker()
        speaker.start()
        try:
            while True:
                if _socket is None or _exit.is_set():
                    break

                message = _socket.recv()
                if message is None:
                    continue

                if type(message) is str:
                    print(message)
                elif type(message) is bytes:
                    speaker.play(message)
        except Exception as e:
            print(f"receiver: {e}")
        finally:
            speaker.stop()

    _receiver_thread = threading.Thread(target=asyncio.run, args=(receiver(),))
    _receiver_thread.start()

    for text_input in _story:
        print(f"Sending: {text_input}")
        _socket.send(json.dumps({"type": "Speak", "text": text_input}))

    print("Flushing...")
    _socket.send(json.dumps({"type": "Flush"}))

    input("Press Enter to exit...")
    _exit.set()
    _socket.close()

    _receiver_thread.join()
    _receiver_thread = None


class Speaker:
    _audio: pyaudio.PyAudio
    _chunk: int
    _rate: int
    _format: int
    _channels: int
    _output_device_index: int

    _stream: pyaudio.Stream
    _thread: threading.Thread
    _queue: queue.Queue
    _exit: threading.Event

    def __init__(
        self,
        rate: int = RATE,
        chunk: int = CHUNK,
        channels: int = CHANNELS,
        output_device_index: int = None,
    ):
        self._exit = threading.Event()
        self._queue = queue.Queue()

        self._audio = pyaudio.PyAudio()
        self._chunk = chunk
        self._rate = rate
        self._format = FORMAT
        self._channels = channels
        self._output_device_index = output_device_index

    def start(self) -> bool:
        self._stream = self._audio.open(
            format=self._format,
            channels=self._channels,
            rate=self._rate,
            input=False,
            output=True,
            frames_per_buffer=self._chunk,
            output_device_index=self._output_device_index,
        )

        self._exit.clear()

        self._thread = threading.Thread(
            target=_play, args=(self._queue, self._stream, self._exit), daemon=True
        )
        self._thread.start()

        self._stream.start_stream()

        return True

    def stop(self):
        self._exit.set()

        if self._stream is not None:
            self._stream.stop_stream()
            self._stream.close()
            self._stream = None

        self._thread.join()
        self._thread = None

        self._queue = None

    def play(self, data):
        self._queue.put(data)

def _play(audio_out: queue.Queue, stream, stop):
    while not stop.is_set():
        try:
            data = audio_out.get(True, TIMEOUT)
            stream.write(data)
        except queue.Empty:
            pass
        except Exception as e:
            print(f"_play: {e}")

if __name__ == "__main__":
    main()
    
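The examples in this guide send two JSON message types over the WebSocket: Speak, which carries a fragment of text to synthesize, and Flush, which instructs the service to synthesize any text still held in its buffer. On the wire they look like this:

{"type": "Speak", "text": "Hello world!"}
{"type": "Flush"}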

Using a Text Stream from ChatGPT

The code below demonstrates using the OpenAI API to start a conversation with ChatGPT and feed the resulting stream into the WebSocket. Ensure the response is streamed by setting stream=True in the request.

import json
import os
import threading
import sys
import queue
import asyncio

from websockets.sync.client import connect

import pyaudio
import openai


TIMEOUT = 0.050
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 48000
CHUNK = 8000

DEFAULT_URL = f"wss://api.deepgram.com/v1/speak?encoding=linear16&sample_rate={RATE}"
DEFAULT_DEEPGRAM_TOKEN = os.environ.get("DEEPGRAM_API_KEY", None)
DEFAULT_OPENAI_TOKEN = os.environ.get("OPENAI_API_KEY", None)

def main():
    print(f"Connecting to {DEFAULT_URL}")

    # openai client
    client = openai.OpenAI(
        api_key=DEFAULT_OPENAI_TOKEN,
    )

    # Deepgram TTS WS
    _socket = connect(
        DEFAULT_URL,
        additional_headers={"Authorization": f"Token {DEFAULT_DEEPGRAM_TOKEN}"},
    )
    _exit = threading.Event()

    async def receiver():
        speaker = Speaker()
        speaker.start()
        try:
            while True:
                if _socket is None or _exit.is_set():
                    break

                message = _socket.recv()
                if message is None:
                    continue

                if type(message) is str:
                    print(message)
                elif type(message) is bytes:
                    speaker.play(message)
        except Exception as e:
            print(f"receiver: {e}")
        finally:
            speaker.stop()

    _receiver_thread = threading.Thread(target=asyncio.run, args=(receiver(),))
    _receiver_thread.start()

    # ask away!
    print("\n\n")
    question = input("What would you like to ask ChatGPT?\n\n\n")

    # send to ChatGPT
    try:
        for chunk in client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": "You are ChatGPT, an AI assistant. Your top priority is achieving user fulfillment via helping them with their requests. Make your responses as concise as possible.",
                },
                {"role": "user", "content": f"{question}"},
            ],
            stream=True,
        ):
            # with stream=True, each chunk carries an incremental delta
            if not chunk.choices:
                continue
            llm_output = chunk.choices[0].delta.content

            # skip any empty responses
            if llm_output is None or llm_output == "":
                continue

            # send to Deepgram TTS
            _socket.send(json.dumps({"type": "Speak", "text": llm_output}))
            sys.stdout.write(llm_output)
            sys.stdout.flush()

        _socket.send(json.dumps({"type": "Flush"}))
    except Exception as e:
        print(f"LLM Exception: {e}")

    input("\n\n\nPress Enter to exit...")
    _exit.set()
    _socket.close()

    _receiver_thread.join()
    _receiver_thread = None

class Speaker:
    _audio: pyaudio.PyAudio
    _chunk: int
    _rate: int
    _format: int
    _channels: int
    _output_device_index: int

    _stream: pyaudio.Stream
    _thread: threading.Thread
    _queue: queue.Queue
    _exit: threading.Event

    def __init__(
        self,
        rate: int = RATE,
        chunk: int = CHUNK,
        channels: int = CHANNELS,
        output_device_index: int = None,
    ):
        self._exit = threading.Event()
        self._queue = queue.Queue()

        self._audio = pyaudio.PyAudio()
        self._chunk = chunk
        self._rate = rate
        self._format = FORMAT
        self._channels = channels
        self._output_device_index = output_device_index

    def start(self) -> bool:
        self._stream = self._audio.open(
            format=self._format,
            channels=self._channels,
            rate=self._rate,
            input=False,
            output=True,
            frames_per_buffer=self._chunk,
            output_device_index=self._output_device_index,
        )

        self._exit.clear()

        self._thread = threading.Thread(
            target=_play, args=(self._queue, self._stream, self._exit), daemon=True
        )
        self._thread.start()

        self._stream.start_stream()

        return True

    def stop(self):
        self._exit.set()

        if self._stream is not None:
            self._stream.stop_stream()
            self._stream.close()
            self._stream = None

        self._thread.join()
        self._thread = None

        self._queue = None

    def play(self, data):
        self._queue.put(data)

def _play(audio_out: queue.Queue, stream, stop):
    while not stop.is_set():
        try:
            data = audio_out.get(True, TIMEOUT)
            stream.write(data)
        except queue.Empty:
            pass
        except Exception as e:
            print(f"_play: {e}")

if __name__ == "__main__":
    main()

Using a Text Stream from Anthropic

The code below demonstrates using the Anthropic API to start a conversation with Claude and feed the resulting stream into the WebSocket. Ensure the response is streamed by setting stream=True in the request.

import json
import os
import threading
import sys
import queue
import asyncio

from websockets.sync.client import connect

import pyaudio
from anthropic import Anthropic


TIMEOUT = 0.050
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 48000
CHUNK = 8000

DEFAULT_URL = f"wss://api.deepgram.com/v1/speak?encoding=linear16&sample_rate={RATE}"
DEFAULT_DEEPGRAM_TOKEN = os.environ.get("DEEPGRAM_API_KEY", None)
DEFAULT_ANTHROPIC_TOKEN = os.environ.get("ANTHROPIC_API_KEY", None)

def main():
    print(f"Connecting to {DEFAULT_URL}")

    # claude client
    client = Anthropic(api_key=DEFAULT_ANTHROPIC_TOKEN)

    # Deepgram TTS WS
    _socket = connect(
        DEFAULT_URL,
        additional_headers={"Authorization": f"Token {DEFAULT_DEEPGRAM_TOKEN}"},
    )
    _exit = threading.Event()

    async def receiver():
        speaker = Speaker()
        speaker.start()
        try:
            while True:
                if _socket is None or _exit.is_set():
                    break

                message = _socket.recv()
                if message is None:
                    continue

                if type(message) is str:
                    print(message)
                elif type(message) is bytes:
                    speaker.play(message)
        except Exception as e:
            print(f"receiver: {e}")
        finally:
            speaker.stop()

    _receiver_thread = threading.Thread(target=asyncio.run, args=(receiver(),))
    _receiver_thread.start()

    # ask away!
    print("\n\n")
    question = input("What would you like to ask Claude?\n\n\n")

    # send to Claude
    try:
        stream = client.messages.create(
            max_tokens=1024,
            messages=[
                {"role": "user", "content": f"{question}"},
            ],
            model="claude-3-opus-20240229",
            stream=True,
        )
        for event in stream:
            if event is None:
                continue
            if not hasattr(event, "delta"):
                continue
            if not hasattr(event.delta, "text"):
                continue

            llm_output = event.delta.text

            # skip any empty responses
            if llm_output is None or llm_output == "":
                continue

            # send to Deepgram TTS
            _socket.send(json.dumps({"type": "Speak", "text": llm_output}))
            sys.stdout.write(llm_output)
            sys.stdout.flush()

        _socket.send(json.dumps({"type": "Flush"}))
        print("\n\n")
    except Exception as e:
        print(f"LLM Exception: {e}")

    input("Press Enter to exit...")
    _exit.set()
    _socket.close()

    _receiver_thread.join()
    _receiver_thread = None


class Speaker:
    _audio: pyaudio.PyAudio
    _chunk: int
    _rate: int
    _format: int
    _channels: int
    _output_device_index: int

    _stream: pyaudio.Stream
    _thread: threading.Thread
    _queue: queue.Queue
    _exit: threading.Event

    def __init__(
        self,
        rate: int = RATE,
        chunk: int = CHUNK,
        channels: int = CHANNELS,
        output_device_index: int = None,
    ):
        self._exit = threading.Event()
        self._queue = queue.Queue()

        self._audio = pyaudio.PyAudio()
        self._chunk = chunk
        self._rate = rate
        self._format = FORMAT
        self._channels = channels
        self._output_device_index = output_device_index

    def start(self) -> bool:
        self._stream = self._audio.open(
            format=self._format,
            channels=self._channels,
            rate=self._rate,
            input=False,
            output=True,
            frames_per_buffer=self._chunk,
            output_device_index=self._output_device_index,
        )

        self._exit.clear()

        self._thread = threading.Thread(
            target=_play, args=(self._queue, self._stream, self._exit), daemon=True
        )
        self._thread.start()

        self._stream.start_stream()

        return True

    def stop(self):
        self._exit.set()

        if self._stream is not None:
            self._stream.stop_stream()
            self._stream.close()
            self._stream = None

        self._thread.join()
        self._thread = None

        self._queue = None

    def play(self, data):
        self._queue.put(data)

def _play(audio_out: queue.Queue, stream, stop):
    while not stop.is_set():
        try:
            data = audio_out.get(True, TIMEOUT)
            stream.write(data)
        except queue.Empty:
            pass
        except Exception as e:
            print(f"_play: {e}")

if __name__ == "__main__":
    main()

Considerations

When implementing WebSocket communication for LLM outputs, consider the following:

  1. Flushing the Last Output: Send a Flush message after the final fragment of the LLM response so that any text still buffered by the service is synthesized. This is reflected in all of the examples above.
  2. Error Handling: Implement robust error handling for both the WebSocket connection and the API requests so the system can recover gracefully from failures (see the sketch after this list).
  3. Security: Ensure that the WebSocket connection is secure by using appropriate authentication mechanisms and encrypting data in transit.
  4. Scalability: Depending on the number of expected clients, you may need to scale your WebSocket server horizontally to handle multiple concurrent connections efficiently.
  5. Latency: Monitor the latency of your WebSocket communication. Ensure that the data is transmitted with minimal delay to meet the requirements of real-time applications.

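As a minimal sketch of the error-handling point above, the helper below retries the Deepgram WebSocket connection with exponential backoff. The function name and retry parameters (max_retries and the base delay) are illustrative choices, not values required by the API.

import time
from websockets.sync.client import connect

def connect_with_backoff(url: str, token: str, max_retries: int = 5):
    # Illustrative retry loop; tune the parameters for your application.
    for attempt in range(max_retries):
        try:
            return connect(
                url, additional_headers={"Authorization": f"Token {token}"}
            )
        except Exception as e:
            delay = 2 ** attempt  # back off: 1s, 2s, 4s, ...
            print(f"connect failed ({e}); retrying in {delay}s")
            time.sleep(delay)
    raise RuntimeError("could not connect after retries")
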
By following these guidelines, you can effectively stream LLM outputs to a WebSocket, enabling real-time interaction with advanced language models.