Sending LLM Outputs to a WebSocket
Learn some tips and strategies for sending LLM outputs to a WebSocket.
Below are some tips for sending text streams generated by Large Language Models (LLMs) to a Deepgram WebSocket. This approach can be particularly useful for real-time applications that require immediate processing or display of data generated by LLMs such as ChatGPT, Claude, or Llama. By leveraging a Deepgram WebSocket, you can achieve low-latency, bidirectional communication between your LLM and client applications.
Text Streams as Output
An LLM like ChatGPT will send text streams as output via a process that involves converting input text into tokens, processing these tokens through a neural network to generate context-aware embeddings, and then using a decoding strategy to generate and stream tokens as output incrementally.
This approach allows users to see the text as it is being generated, creating an interactive and dynamic experience.
Example
Consider a user inputting the prompt: "Tell me a story about a dragon."
- The input is tokenized into tokens like ["Tell", "me", "a", "story", "about", "a", "dragon", "."].
- These tokens are processed through the model layers to understand the context.
- The model starts generating tokens, perhaps beginning with "Once" followed by "upon", "a", "time".
- Each token is streamed to the user interface as it is generated, displaying the text incrementally.
- The model continues generating tokens until the story reaches a logical conclusion or the maximum length is reached.
Feeding Simple Text to the WebSocket
The code below demonstrates the simplest use case: feeding plain text into the WebSocket.
import json
import os
import threading
import asyncio
import queue
import websockets
from websockets.sync.client import connect
import pyaudio
# Seconds the playback thread waits on the audio queue per attempt.
TIMEOUT = 0.050

# Audio output settings handed to PyAudio when the output stream is opened.
RATE = 48000  # also sent to Deepgram as the requested sample_rate
CHANNELS = 1
FORMAT = pyaudio.paInt16
CHUNK = 8000  # used as frames_per_buffer for the output stream

# Deepgram text-to-speech WebSocket endpoint and the API key used to authenticate.
DEFAULT_URL = f"wss://api.deepgram.com/v1/speak?encoding=linear16&sample_rate={RATE}"
DEFAULT_TOKEN = os.getenv("DEEPGRAM_API_KEY")
def main():
    """Send a short hard-coded story to Deepgram TTS and play the audio.

    Connects to ``DEFAULT_URL``, starts a background thread that plays every
    binary message through a ``Speaker``, sends each sentence as a ``Speak``
    message followed by one ``Flush``, then waits for Enter before closing
    the socket and joining the receiver thread.
    """
    print(f"Connecting to {DEFAULT_URL}")

    _socket = connect(
        DEFAULT_URL, additional_headers={"Authorization": f"Token {DEFAULT_TOKEN}"}
    )
    _exit = threading.Event()

    _story = [
        "The sun had just begun to rise over the sleepy town of Millfield.",
        "Emily a young woman in her mid-twenties was already awake and bustling about.",
    ]

    def receiver():
        """Print text messages and play binary messages until shutdown."""
        speaker = Speaker()
        speaker.start()
        try:
            # recv() is a blocking call on the sync client; closing the socket
            # from main() is what ultimately ends this loop.
            while not _exit.is_set():
                message = _socket.recv()
                if message is None:
                    continue
                if isinstance(message, str):
                    print(message)
                elif isinstance(message, bytes):
                    speaker.play(message)
        except Exception as e:
            print(f"receiver: {e}")
        finally:
            speaker.stop()

    # The receiver never awaits anything, so it runs as a plain thread target
    # rather than as a coroutine wrapped in asyncio.run().
    _receiver_thread = threading.Thread(target=receiver)
    _receiver_thread.start()

    try:
        for text_input in _story:
            print(f"Sending: {text_input}")
            _socket.send(json.dumps({"type": "Speak", "text": text_input}))

        # Flush once after the last sentence so the final fragment is spoken.
        print("Flushing...")
        _socket.send(json.dumps({"type": "Flush"}))

        input("Press Enter to exit...")
    finally:
        # Always shut down, even if sending failed; otherwise the non-daemon
        # receiver thread would keep the process alive.
        _exit.set()
        _socket.close()
        _receiver_thread.join()
class Speaker:
    """Queue audio chunks and write them to a PyAudio output stream.

    Chunks passed to ``play`` are put on an internal queue; a background
    thread running ``_play`` takes them off the queue and writes them to the
    stream opened by ``start``.
    """

    _audio: pyaudio.PyAudio
    _chunk: int
    _rate: int
    _format: int
    _channels: int
    _output_device_index: int

    _stream: pyaudio.Stream
    _thread: threading.Thread
    _asyncio_loop: asyncio.AbstractEventLoop
    _asyncio_thread: threading.Thread
    _queue: queue.Queue
    _exit: threading.Event

    def __init__(
        self,
        rate: int = RATE,
        chunk: int = CHUNK,
        channels: int = CHANNELS,
        output_device_index: int = None,
    ):
        """Store the output settings and create the queue and stop flag.

        Args:
            rate: Sample rate passed to the output stream.
            chunk: Value passed as ``frames_per_buffer``.
            channels: Number of output channels.
            output_device_index: Output device to open, or ``None``.
        """
        self._exit = threading.Event()
        self._queue = queue.Queue()

        self._audio = pyaudio.PyAudio()
        self._chunk = chunk
        self._rate = rate
        self._format = FORMAT
        self._channels = channels
        self._output_device_index = output_device_index

        # Created by start(); initialised here so stop() is safe to call even
        # if start() was never called.
        self._stream = None
        self._thread = None

    def _start_asyncio_loop(self) -> None:
        """Create a new event loop, store it, and run it forever.

        Nothing in this class calls this method.
        """
        self._asyncio_loop = asyncio.new_event_loop()
        self._asyncio_loop.run_forever()

    def start(self) -> bool:
        """Open the output stream and start the playback thread.

        Returns:
            Always ``True``.
        """
        self._stream = self._audio.open(
            format=self._format,
            channels=self._channels,
            rate=self._rate,
            input=False,
            output=True,
            frames_per_buffer=self._chunk,
            output_device_index=self._output_device_index,
        )

        self._exit.clear()

        self._thread = threading.Thread(
            target=_play, args=(self._queue, self._stream, self._exit), daemon=True
        )
        self._thread.start()

        self._stream.start_stream()

        return True

    def stop(self):
        """Stop the playback thread, then close the output stream."""
        self._exit.set()

        # Join the writer first so it is never mid-write on a stream that is
        # being stopped and closed underneath it.
        if self._thread is not None:
            self._thread.join()
            self._thread = None

        if self._stream is not None:
            self._stream.stop_stream()
            self._stream.close()
            self._stream = None

        # Discard unplayed audio but keep a usable queue, so a later play()
        # or start() does not fail on a missing queue.
        self._queue = queue.Queue()

    def play(self, data):
        """Queue one chunk of audio data for playback."""
        self._queue.put(data)
def _play(audio_out: queue.Queue, stream, stop):
    """Write queued audio chunks to ``stream`` until ``stop`` is set.

    Args:
        audio_out: Queue holding the audio chunks to play.
        stream: Object whose ``write(data)`` method receives each chunk.
        stop: Object whose ``is_set()`` result ends the loop when true.
    """
    while not stop.is_set():
        try:
            # Bounded wait so the stop flag is re-checked regularly.
            data = audio_out.get(True, TIMEOUT)
            stream.write(data)
        except queue.Empty:
            # Nothing arrived within TIMEOUT; check the stop flag again.
            pass
        except Exception as e:
            print(f"_play: {e}")
# Run the demo only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()
Using a Text Stream from ChatGPT
The code below demonstrates using the OpenAI API to initiate a conversation with ChatGPT and feed the resulting text stream into the WebSocket. Make sure streaming is enabled on the request (`stream=True`).
import json
import os
import threading
import sys
import queue
import asyncio
import websockets
from websockets.sync.client import connect
import pyaudio
import openai
# Seconds the playback thread waits on the audio queue per attempt.
TIMEOUT = 0.050

# Audio output settings handed to PyAudio when the output stream is opened.
RATE = 48000  # also sent to Deepgram as the requested sample_rate
CHANNELS = 1
FORMAT = pyaudio.paInt16
CHUNK = 8000  # used as frames_per_buffer for the output stream

# Deepgram text-to-speech WebSocket endpoint.
DEFAULT_URL = f"wss://api.deepgram.com/v1/speak?encoding=linear16&sample_rate={RATE}"

# API keys, read from the environment (None when unset).
DEFAULT_DEEPGRAM_TOKEN = os.getenv("DEEPGRAM_API_KEY")
DEFAULT_OPENAI_TOKEN = os.getenv("OPENAI_API_KEY")
def main():
    """Ask ChatGPT a question and speak the streamed answer via Deepgram TTS.

    Connects to ``DEFAULT_URL``, starts a background thread that plays every
    binary message through a ``Speaker``, forwards each streamed text delta
    from the OpenAI chat completion as a ``Speak`` message, sends one
    ``Flush`` when the response ends, then waits for Enter before closing
    the socket and joining the receiver thread.
    """
    print(f"Connecting to {DEFAULT_URL}")

    # openai client
    client = openai.OpenAI(
        api_key=DEFAULT_OPENAI_TOKEN,
    )

    # Deepgram TTS WS
    _socket = connect(
        DEFAULT_URL,
        additional_headers={"Authorization": f"Token {DEFAULT_DEEPGRAM_TOKEN}"},
    )
    _exit = threading.Event()

    def receiver():
        """Print text messages and play binary messages until shutdown."""
        speaker = Speaker()
        speaker.start()
        try:
            # recv() is a blocking call on the sync client; closing the socket
            # from main() is what ultimately ends this loop.
            while not _exit.is_set():
                message = _socket.recv()
                if message is None:
                    continue
                if isinstance(message, str):
                    print(message)
                elif isinstance(message, bytes):
                    speaker.play(message)
        except Exception as e:
            print(f"receiver: {e}")
        finally:
            speaker.stop()

    # The receiver never awaits anything, so it runs as a plain thread target
    # rather than as a coroutine wrapped in asyncio.run().
    _receiver_thread = threading.Thread(target=receiver)
    _receiver_thread.start()

    try:
        # ask away!
        print("\n\n")
        question = input("What would you like to ask ChatGPT?\n\n\n")

        # send to ChatGPT
        try:
            stream = client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {
                        "role": "system",
                        "content": "You are ChatGPT, an AI assistant. Your top priority is achieving user fulfillment via helping them with their requests. Make your responses as concise as possible.",
                    },
                    {"role": "user", "content": f"{question}"},
                ],
                stream=True,
            )
            for chunk in stream:
                # A chunk with no choices carries no text to speak.
                if not chunk.choices:
                    continue
                llm_output = chunk.choices[0].delta.content

                # skip any empty responses
                if not llm_output:
                    continue

                # send to Deepgram TTS
                _socket.send(json.dumps({"type": "Speak", "text": llm_output}))
                sys.stdout.write(llm_output)
                sys.stdout.flush()

            # Flush once at the end so the final fragment is spoken.
            _socket.send(json.dumps({"type": "Flush"}))
        except Exception as e:
            print(f"LLM Exception: {e}")

        input("\n\n\nPress Enter to exit...")
    finally:
        # Always shut down; otherwise the non-daemon receiver thread would
        # keep the process alive after an error.
        _exit.set()
        _socket.close()
        _receiver_thread.join()
class Speaker:
    """Queue audio chunks and write them to a PyAudio output stream.

    Chunks passed to ``play`` are put on an internal queue; a background
    thread running ``_play`` takes them off the queue and writes them to the
    stream opened by ``start``.
    """

    _audio: pyaudio.PyAudio
    _chunk: int
    _rate: int
    _format: int
    _channels: int
    _output_device_index: int

    _stream: pyaudio.Stream
    _thread: threading.Thread
    _asyncio_loop: asyncio.AbstractEventLoop
    _asyncio_thread: threading.Thread
    _queue: queue.Queue
    _exit: threading.Event

    def __init__(
        self,
        rate: int = RATE,
        chunk: int = CHUNK,
        channels: int = CHANNELS,
        output_device_index: int = None,
    ):
        """Store the output settings and create the queue and stop flag.

        Args:
            rate: Sample rate passed to the output stream.
            chunk: Value passed as ``frames_per_buffer``.
            channels: Number of output channels.
            output_device_index: Output device to open, or ``None``.
        """
        self._exit = threading.Event()
        self._queue = queue.Queue()

        self._audio = pyaudio.PyAudio()
        self._chunk = chunk
        self._rate = rate
        self._format = FORMAT
        self._channels = channels
        self._output_device_index = output_device_index

        # Created by start(); initialised here so stop() is safe to call even
        # if start() was never called.
        self._stream = None
        self._thread = None

    def _start_asyncio_loop(self) -> None:
        """Create a new event loop, store it, and run it forever.

        Nothing in this class calls this method.
        """
        self._asyncio_loop = asyncio.new_event_loop()
        self._asyncio_loop.run_forever()

    def start(self) -> bool:
        """Open the output stream and start the playback thread.

        Returns:
            Always ``True``.
        """
        self._stream = self._audio.open(
            format=self._format,
            channels=self._channels,
            rate=self._rate,
            input=False,
            output=True,
            frames_per_buffer=self._chunk,
            output_device_index=self._output_device_index,
        )

        self._exit.clear()

        self._thread = threading.Thread(
            target=_play, args=(self._queue, self._stream, self._exit), daemon=True
        )
        self._thread.start()

        self._stream.start_stream()

        return True

    def stop(self):
        """Stop the playback thread, then close the output stream."""
        self._exit.set()

        # Join the writer first so it is never mid-write on a stream that is
        # being stopped and closed underneath it.
        if self._thread is not None:
            self._thread.join()
            self._thread = None

        if self._stream is not None:
            self._stream.stop_stream()
            self._stream.close()
            self._stream = None

        # Discard unplayed audio but keep a usable queue, so a later play()
        # or start() does not fail on a missing queue.
        self._queue = queue.Queue()

    def play(self, data):
        """Queue one chunk of audio data for playback."""
        self._queue.put(data)
def _play(audio_out: queue.Queue, stream, stop):
    """Write queued audio chunks to ``stream`` until ``stop`` is set.

    Args:
        audio_out: Queue holding the audio chunks to play.
        stream: Object whose ``write(data)`` method receives each chunk.
        stop: Object whose ``is_set()`` result ends the loop when true.
    """
    while not stop.is_set():
        try:
            # Bounded wait so the stop flag is re-checked regularly.
            data = audio_out.get(True, TIMEOUT)
            stream.write(data)
        except queue.Empty:
            # Nothing arrived within TIMEOUT; check the stop flag again.
            pass
        except Exception as e:
            print(f"_play: {e}")
# Run the demo only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()
Using a Text Stream from Anthropic
The code below demonstrates using the Anthropic API to initiate a conversation with Claude and feed the resulting text stream into the WebSocket. Make sure streaming is enabled on the request (`stream=True`).
import json
import os
import threading
import sys
import queue
import asyncio
import websockets
from websockets.sync.client import connect
import pyaudio
from anthropic import Anthropic
# Seconds the playback thread waits on the audio queue per attempt.
TIMEOUT = 0.050

# Audio output settings handed to PyAudio when the output stream is opened.
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 48000  # also sent to Deepgram as the requested sample_rate
CHUNK = 8000  # used as frames_per_buffer for the output stream

# Deepgram text-to-speech WebSocket endpoint. Uses the same api.deepgram.com
# host as the other examples in this guide rather than the beta host.
DEFAULT_URL = (
    f"wss://api.deepgram.com/v1/speak?encoding=linear16&sample_rate={RATE}"
)

# API keys, read from the environment (None when unset).
DEFAULT_DEEPGRAM_TOKEN = os.environ.get("DEEPGRAM_API_KEY", None)
DEFAULT_ANTHROPIC_TOKEN = os.environ.get("ANTHROPIC_API_KEY", None)
def main():
    """Ask Claude a question and speak the streamed answer via Deepgram TTS.

    Connects to ``DEFAULT_URL``, starts a background thread that plays every
    binary message through a ``Speaker``, forwards each streamed text delta
    from the Anthropic response as a ``Speak`` message, sends one ``Flush``
    when the response ends, then waits for Enter before closing the socket
    and joining the receiver thread.
    """
    print(f"Connecting to {DEFAULT_URL}")

    # claude client; the key is passed explicitly, as in the OpenAI example
    client = Anthropic(api_key=DEFAULT_ANTHROPIC_TOKEN)

    # Deepgram TTS WS
    _socket = connect(
        DEFAULT_URL,
        additional_headers={"Authorization": f"Token {DEFAULT_DEEPGRAM_TOKEN}"},
    )
    _exit = threading.Event()

    def receiver():
        """Print text messages and play binary messages until shutdown."""
        speaker = Speaker()
        speaker.start()
        try:
            # recv() is a blocking call on the sync client; closing the socket
            # from main() is what ultimately ends this loop.
            while not _exit.is_set():
                message = _socket.recv()
                if message is None:
                    continue
                if isinstance(message, str):
                    print(message)
                elif isinstance(message, bytes):
                    speaker.play(message)
        except Exception as e:
            print(f"receiver: {e}")
        finally:
            speaker.stop()

    # The receiver never awaits anything, so it runs as a plain thread target
    # rather than as a coroutine wrapped in asyncio.run().
    _receiver_thread = threading.Thread(target=receiver)
    _receiver_thread.start()

    try:
        # ask away!
        print("\n\n")
        question = input("What would you like to ask Claude?\n\n\n")

        # send to Claude
        try:
            stream = client.messages.create(
                max_tokens=1024,
                messages=[
                    {"role": "user", "content": f"{question}"},
                ],
                model="claude-3-opus-20240229",
                stream=True,
            )
            for event in stream:
                # Only events that carry a text delta have anything to speak.
                delta = getattr(event, "delta", None)
                llm_output = getattr(delta, "text", None)

                # skip any empty responses
                if not llm_output:
                    continue

                # send to Deepgram TTS
                _socket.send(json.dumps({"type": "Speak", "text": llm_output}))
                sys.stdout.write(llm_output)
                sys.stdout.flush()

            # Flush once at the end so the final fragment is spoken.
            _socket.send(json.dumps({"type": "Flush"}))
            print("\n\n")
        except Exception as e:
            print(f"LLM Exception: {e}")

        input("Press Enter to exit...")
    finally:
        # Always shut down; otherwise the non-daemon receiver thread would
        # keep the process alive after an error.
        _exit.set()
        _socket.close()
        _receiver_thread.join()
class Speaker:
    """Queue audio chunks and write them to a PyAudio output stream.

    Chunks passed to ``play`` are put on an internal queue; a background
    thread running ``_play`` takes them off the queue and writes them to the
    stream opened by ``start``.
    """

    _audio: pyaudio.PyAudio
    _chunk: int
    _rate: int
    _format: int
    _channels: int
    _output_device_index: int

    _stream: pyaudio.Stream
    _thread: threading.Thread
    _asyncio_loop: asyncio.AbstractEventLoop
    _asyncio_thread: threading.Thread
    _queue: queue.Queue
    _exit: threading.Event

    def __init__(
        self,
        rate: int = RATE,
        chunk: int = CHUNK,
        channels: int = CHANNELS,
        output_device_index: int = None,
    ):
        """Store the output settings and create the queue and stop flag.

        Args:
            rate: Sample rate passed to the output stream.
            chunk: Value passed as ``frames_per_buffer``.
            channels: Number of output channels.
            output_device_index: Output device to open, or ``None``.
        """
        self._exit = threading.Event()
        self._queue = queue.Queue()

        self._audio = pyaudio.PyAudio()
        self._chunk = chunk
        self._rate = rate
        self._format = FORMAT
        self._channels = channels
        self._output_device_index = output_device_index

        # Created by start(); initialised here so stop() is safe to call even
        # if start() was never called.
        self._stream = None
        self._thread = None

    def _start_asyncio_loop(self) -> None:
        """Create a new event loop, store it, and run it forever.

        Nothing in this class calls this method.
        """
        self._asyncio_loop = asyncio.new_event_loop()
        self._asyncio_loop.run_forever()

    def start(self) -> bool:
        """Open the output stream and start the playback thread.

        Returns:
            Always ``True``.
        """
        self._stream = self._audio.open(
            format=self._format,
            channels=self._channels,
            rate=self._rate,
            input=False,
            output=True,
            frames_per_buffer=self._chunk,
            output_device_index=self._output_device_index,
        )

        self._exit.clear()

        self._thread = threading.Thread(
            target=_play, args=(self._queue, self._stream, self._exit), daemon=True
        )
        self._thread.start()

        self._stream.start_stream()

        return True

    def stop(self):
        """Stop the playback thread, then close the output stream."""
        self._exit.set()

        # Join the writer first so it is never mid-write on a stream that is
        # being stopped and closed underneath it.
        if self._thread is not None:
            self._thread.join()
            self._thread = None

        if self._stream is not None:
            self._stream.stop_stream()
            self._stream.close()
            self._stream = None

        # Discard unplayed audio but keep a usable queue, so a later play()
        # or start() does not fail on a missing queue.
        self._queue = queue.Queue()

    def play(self, data):
        """Queue one chunk of audio data for playback."""
        self._queue.put(data)
def _play(audio_out: queue.Queue, stream, stop):
    """Write queued audio chunks to ``stream`` until ``stop`` is set.

    Args:
        audio_out: Queue holding the audio chunks to play.
        stream: Object whose ``write(data)`` method receives each chunk.
        stop: Object whose ``is_set()`` result ends the loop when true.
    """
    while not stop.is_set():
        try:
            # Bounded wait so the stop flag is re-checked regularly.
            data = audio_out.get(True, TIMEOUT)
            stream.write(data)
        except queue.Empty:
            # Nothing arrived within TIMEOUT; check the stop flag again.
            pass
        except Exception as e:
            print(f"_play: {e}")
# Run the demo only when this file is executed directly, not when imported.
if __name__ == "__main__":
    main()
Considerations
When implementing WebSocket communication for LLM outputs, consider the following:
- Flushing the Last Output: The last fragment of speech must be flushed by sending a `Flush` message once the LLM has finished its response. This is reflected in all the examples above.
- Error Handling: Implement robust error handling for both the WebSocket server and the API requests to ensure the system can recover gracefully from any failures.
- Security: Ensure that the WebSocket connection is secure by using appropriate authentication mechanisms and encrypting data in transit.
- Scalability: Depending on the number of expected clients, you may need to scale your WebSocket server horizontally to handle multiple concurrent connections efficiently.
- Latency: Monitor the latency of your WebSocket communication. Ensure that the data is transmitted with minimal delay to meet the requirements of real-time applications.
By following these guidelines, you can effectively stream LLM outputs to a WebSocket, enabling real-time interaction with advanced language models.
Updated 3 months ago