Sending LLM Outputs to a WebSocket

Learn some tips and strategies to send LLM outputs to a WebSocket.

Below are some tips to handle sending text streams generated by large language models (LLMs) to a Deepgram WebSocket. This approach can be particularly useful for real-time applications that require immediate processing or display of data generated by LLMs such as ChatGPT, Claude (Anthropic), or LLaMA. By leveraging a Deepgram WebSocket, you can achieve low-latency, bidirectional communication between your LLM and client applications.

Text Streams as Output

An LLM like ChatGPT will send text streams as output via a process that involves converting input text into tokens, processing these tokens through a neural network to generate context-aware embeddings, and then using a decoding strategy to generate and stream tokens as output incrementally.

This approach allows users to see the text as it is being generated, creating an interactive and dynamic experience.

Example

Consider a user inputting the prompt: “Tell me a story about a dragon.”

  1. The input is tokenized into tokens like [“Tell”, “me”, “a”, “story”, “about”, “a”, “dragon”, ”.”].
  2. These tokens are processed through the model layers to understand the context.
  3. The model starts generating tokens, perhaps beginning with “Once” followed by “upon”, “a”, “time”.
  4. Each token is streamed to the user interface as it is generated, displaying the text incrementally.
  5. The model continues generating tokens until the story reaches a logical conclusion or the maximum length is reached.

Feeding Simple Text to the WebSocket

The code below demonstrates the basic use case of feeding plain text into the WebSocket.

Python
1import json
2import os
3import threading
4import asyncio
5import queue
6
7import websockets
8from websockets.sync.client import connect
9
10import pyaudio
11
12TIMEOUT = 0.050
13FORMAT = pyaudio.paInt16
14CHANNELS = 1
15RATE = 48000
16CHUNK = 8000
17
18DEFAULT_URL = f"wss://api.deepgram.com/v1/speak?encoding=linear16&sample_rate={RATE}"
19DEFAULT_TOKEN = os.environ.get("DEEPGRAM_API_KEY", None)
20
21def main():
22 print(f"Connecting to {DEFAULT_URL}")
23
24 _socket = connect(
25 DEFAULT_URL, additional_headers={"Authorization": f"Token {DEFAULT_TOKEN}"}
26 )
27 _exit = threading.Event()
28
29 _story = [
30 "The sun had just begun to rise over the sleepy town of Millfield.",
31 "Emily a young woman in her mid-twenties was already awake and bustling about.",
32 ]
33
34 async def receiver():
35 speaker = Speaker()
36 speaker.start()
37 try:
38 while True:
39 if _socket is None or _exit.is_set():
40 break
41
42 message = _socket.recv()
43 if message is None:
44 continue
45
46 if type(message) is str:
47 print(message)
48 elif type(message) is bytes:
49 speaker.play(message)
50 except Exception as e:
51 print(f"receiver: {e}")
52 finally:
53 speaker.stop()
54
55 _receiver_thread = threading.Thread(target=asyncio.run, args=(receiver(),))
56 _receiver_thread.start()
57
58 for text_input in _story:
59 print(f"Sending: {text_input}")
60 _socket.send(json.dumps({"type": "Speak", "text": text_input}))
61
62 print("Flushing...")
63 _socket.send(json.dumps({"type": "Flush"}))
64
65 input("Press Enter to exit...")
66 _exit.set()
67 _socket.close()
68
69 _listen_thread.join()
70 _listen_thread = None
71
class Speaker:
    """Plays 16-bit PCM audio chunks on an output device via PyAudio.

    Audio handed to `play()` is queued and written to the stream by a
    background thread, so callers are never blocked on playback.
    """

    _audio: pyaudio.PyAudio
    _chunk: int
    _rate: int
    _format: int
    _channels: int
    # Device index, or None to use the system default output device.
    _output_device_index: int

    _stream: pyaudio.Stream
    _thread: threading.Thread
    _queue: queue.Queue
    _exit: threading.Event

    def __init__(
        self,
        rate: int = RATE,
        chunk: int = CHUNK,
        channels: int = CHANNELS,
        output_device_index=None,  # int device index; None selects the default
    ):
        self._exit = threading.Event()
        self._queue = queue.Queue()

        self._audio = pyaudio.PyAudio()
        self._chunk = chunk
        self._rate = rate
        self._format = FORMAT
        self._channels = channels
        self._output_device_index = output_device_index

        # Set by start(); initialized here so stop() before start() is a
        # no-op instead of an AttributeError.
        self._stream = None
        self._thread = None

    def start(self) -> bool:
        """Open the output stream and start the playback thread."""
        self._stream = self._audio.open(
            format=self._format,
            channels=self._channels,
            rate=self._rate,
            input=False,
            output=True,
            frames_per_buffer=self._chunk,
            output_device_index=self._output_device_index,
        )

        self._exit.clear()

        self._thread = threading.Thread(
            target=_play, args=(self._queue, self._stream, self._exit), daemon=True
        )
        self._thread.start()

        self._stream.start_stream()

        return True

    def stop(self):
        """Stop playback and release the stream; safe to call more than once."""
        self._exit.set()

        if self._stream is not None:
            self._stream.stop_stream()
            self._stream.close()
            self._stream = None

        if self._thread is not None:
            self._thread.join()
            self._thread = None

        self._queue = None

    def play(self, data):
        """Queue a chunk of PCM audio bytes for playback."""
        self._queue.put(data)
145
146def _play(audio_out: queue, stream, stop):
147 while not stop.is_set():
148 try:
149 data = audio_out.get(True, TIMEOUT)
150 stream.write(data)
151 except queue.Empty as e:
152 # print(f"queue is empty")
153 pass
154 except Exception as e:
155 print(f"_play: {e}")
156
157if __name__ == "__main__":
158 main()
159

Using a Text Stream from ChatGPT

The code below demonstrates using the OpenAI API to initiate a conversation with ChatGPT and take the resulting stream to feed into the websocket. Ensure the response format is set to stream.

Python
1import json
2import os
3import threading
4import sys
5import queue
6import asyncio
7
8import websockets
9from websockets.sync.client import connect
10
11import pyaudio
12import openai
13
14TIMEOUT = 0.050
15FORMAT = pyaudio.paInt16
16CHANNELS = 1
17RATE = 48000
18CHUNK = 8000
19
20DEFAULT_URL = f"wss://api.deepgram.com/v1/speak?encoding=linear16&sample_rate={RATE}"
21DEFAULT_DEEPGRAM_TOKEN = os.environ.get("DEEPGRAM_API_KEY", None)
22DEFAULT_OPENAI_TOKEN = os.environ.get("OPENAI_API_KEY", None)
23
def main():
    """Stream a ChatGPT response into the Deepgram TTS WebSocket and
    play the synthesized audio as it arrives."""
    print(f"Connecting to {DEFAULT_URL}")

    # OpenAI client
    client = openai.OpenAI(
        api_key=DEFAULT_OPENAI_TOKEN,
    )

    # Deepgram TTS WS
    _socket = connect(
        DEFAULT_URL,
        additional_headers={"Authorization": f"Token {DEFAULT_DEEPGRAM_TOKEN}"},
    )
    _exit = threading.Event()

    def receiver():
        # Background thread: text frames go to stdout, audio frames to the
        # speaker. (The original wrapped this in asyncio.run for no benefit —
        # nothing here awaits.)
        speaker = Speaker()
        speaker.start()
        try:
            while True:
                if _socket is None or _exit.is_set():
                    break

                message = _socket.recv()
                if message is None:
                    continue

                if isinstance(message, str):
                    print(message)
                elif isinstance(message, bytes):
                    speaker.play(message)
        except Exception as e:
            print(f"receiver: {e}")
        finally:
            speaker.stop()

    _receiver_thread = threading.Thread(target=receiver)
    _receiver_thread.start()

    # ask away!
    print("\n\n")
    question = input("What would you like to ask ChatGPT?\n\n\n")

    # send to ChatGPT
    try:
        # With stream=True the API returns an iterator of ChatCompletionChunk
        # objects; the incremental text is chunk.choices[0].delta.content.
        # (The original iterated each chunk's pydantic fields as (name, value)
        # tuples, which only worked by accident.)
        stream = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": "You are ChatGPT, an AI assistant. Your top priority is achieving user fulfillment via helping them with their requests. Make your responses as concise as possible.",
                },
                {"role": "user", "content": f"{question}"},
            ],
            stream=True,
        )
        for chunk in stream:
            # Some chunks (e.g. the final usage chunk) carry no choices.
            if not chunk.choices:
                continue

            llm_output = chunk.choices[0].delta.content

            # skip any empty responses
            if not llm_output:
                continue

            # send to Deepgram TTS
            _socket.send(json.dumps({"type": "Speak", "text": llm_output}))
            sys.stdout.write(llm_output)
            sys.stdout.flush()

        # Flush tells Deepgram to synthesize any text still buffered server-side.
        _socket.send(json.dumps({"type": "Flush"}))
    except Exception as e:
        print(f"LLM Exception: {e}")

    input("\n\n\nPress Enter to exit...")
    _exit.set()
    _socket.close()

    # BUG FIX: the original joined an undefined `_listen_thread` (NameError);
    # join the receiver thread that was actually started.
    _receiver_thread.join()
104
class Speaker:
    """Plays 16-bit PCM audio chunks on an output device via PyAudio.

    Audio handed to `play()` is queued and written to the stream by a
    background thread, so callers are never blocked on playback.
    """

    _audio: pyaudio.PyAudio
    _chunk: int
    _rate: int
    _format: int
    _channels: int
    # Device index, or None to use the system default output device.
    _output_device_index: int

    _stream: pyaudio.Stream
    _thread: threading.Thread
    _queue: queue.Queue
    _exit: threading.Event

    def __init__(
        self,
        rate: int = RATE,
        chunk: int = CHUNK,
        channels: int = CHANNELS,
        output_device_index=None,  # int device index; None selects the default
    ):
        self._exit = threading.Event()
        self._queue = queue.Queue()

        self._audio = pyaudio.PyAudio()
        self._chunk = chunk
        self._rate = rate
        self._format = FORMAT
        self._channels = channels
        self._output_device_index = output_device_index

        # Set by start(); initialized here so stop() before start() is a
        # no-op instead of an AttributeError.
        self._stream = None
        self._thread = None

    def start(self) -> bool:
        """Open the output stream and start the playback thread."""
        self._stream = self._audio.open(
            format=self._format,
            channels=self._channels,
            rate=self._rate,
            input=False,
            output=True,
            frames_per_buffer=self._chunk,
            output_device_index=self._output_device_index,
        )

        self._exit.clear()

        self._thread = threading.Thread(
            target=_play, args=(self._queue, self._stream, self._exit), daemon=True
        )
        self._thread.start()

        self._stream.start_stream()

        return True

    def stop(self):
        """Stop playback and release the stream; safe to call more than once."""
        self._exit.set()

        if self._stream is not None:
            self._stream.stop_stream()
            self._stream.close()
            self._stream = None

        if self._thread is not None:
            self._thread.join()
            self._thread = None

        self._queue = None

    def play(self, data):
        """Queue a chunk of PCM audio bytes for playback."""
        self._queue.put(data)
178
179def _play(audio_out: queue, stream, stop):
180 while not stop.is_set():
181 try:
182 data = audio_out.get(True, TIMEOUT)
183 stream.write(data)
184 except queue.Empty as e:
185 # print(f"queue is empty")
186 pass
187 except Exception as e:
188 print(f"_play: {e}")
189
190if __name__ == "__main__":
191 main()

Using a Text Stream from Anthropic

The code below demonstrates using the Anthropic API to initiate a conversation with Claude and take the resulting stream to feed into the websocket. Ensure the response format is set to stream.

Python
1import json
2import os
3import threading
4import sys
5import queue
6import asyncio
7
8import websockets
9from websockets.sync.client import connect
10
11import pyaudio
12from anthropic import Anthropic
13
14TIMEOUT = 0.050
15FORMAT = pyaudio.paInt16
16CHANNELS = 1
17RATE = 48000
18CHUNK = 8000
19
20DEFAULT_URL = (
21 f"wss://api.beta.deepgram.com/v1/speak?encoding=linear16&sample_rate={RATE}"
22)
23DEFAULT_DEEPGRAM_TOKEN = os.environ.get("DEEPGRAM_API_KEY", None)
24DEFAULT_ANTHROPIC_TOKEN = os.environ.get("ANTHROPIC_API_KEY", None)
25
def main():
    """Stream a Claude response into the Deepgram TTS WebSocket and
    play the synthesized audio as it arrives."""
    print(f"Connecting to {DEFAULT_URL}")

    # claude client (reads ANTHROPIC_API_KEY from the environment)
    client = Anthropic()

    # Deepgram TTS WS
    _socket = connect(
        DEFAULT_URL,
        additional_headers={"Authorization": f"Token {DEFAULT_DEEPGRAM_TOKEN}"},
    )
    _exit = threading.Event()

    def receiver():
        # Background thread: text frames go to stdout, audio frames to the
        # speaker. (The original wrapped this in asyncio.run for no benefit —
        # nothing here awaits.)
        speaker = Speaker()
        speaker.start()
        try:
            while True:
                if _socket is None or _exit.is_set():
                    break

                message = _socket.recv()
                if message is None:
                    continue

                if isinstance(message, str):
                    print(message)
                elif isinstance(message, bytes):
                    speaker.play(message)
        except Exception as e:
            print(f"receiver: {e}")
        finally:
            speaker.stop()

    _receiver_thread = threading.Thread(target=receiver)
    _receiver_thread.start()

    # ask away!
    print("\n\n")
    question = input("What would you like to ask Claude?\n\n\n")

    # send to Claude
    try:
        stream = client.messages.create(
            max_tokens=1024,
            messages=[
                {"role": "user", "content": f"{question}"},
            ],
            model="claude-3-opus-20240229",
            stream=True,
        )
        for event in stream:
            # Only delta events carry text; message_start/message_stop and
            # friends have no `delta` (or no `delta.text`) and are skipped.
            if event is None or not hasattr(event, "delta"):
                continue
            if not hasattr(event.delta, "text"):
                continue

            llm_output = event.delta.text

            # skip any empty responses
            if not llm_output:
                continue

            # send to Deepgram TTS
            _socket.send(json.dumps({"type": "Speak", "text": llm_output}))
            sys.stdout.write(llm_output)
            sys.stdout.flush()

        # Flush tells Deepgram to synthesize any text still buffered server-side.
        _socket.send(json.dumps({"type": "Flush"}))
        print("\n\n")
    except Exception as e:
        print(f"LLM Exception: {e}")

    input("Press Enter to exit...")
    _exit.set()
    _socket.close()

    # BUG FIX: the original joined an undefined `_listen_thread` (NameError);
    # join the receiver thread that was actually started.
    _receiver_thread.join()
107
class Speaker:
    """Plays 16-bit PCM audio chunks on an output device via PyAudio.

    Audio handed to `play()` is queued and written to the stream by a
    background thread, so callers are never blocked on playback.
    """

    _audio: pyaudio.PyAudio
    _chunk: int
    _rate: int
    _format: int
    _channels: int
    # Device index, or None to use the system default output device.
    _output_device_index: int

    _stream: pyaudio.Stream
    _thread: threading.Thread
    _queue: queue.Queue
    _exit: threading.Event

    def __init__(
        self,
        rate: int = RATE,
        chunk: int = CHUNK,
        channels: int = CHANNELS,
        output_device_index=None,  # int device index; None selects the default
    ):
        self._exit = threading.Event()
        self._queue = queue.Queue()

        self._audio = pyaudio.PyAudio()
        self._chunk = chunk
        self._rate = rate
        self._format = FORMAT
        self._channels = channels
        self._output_device_index = output_device_index

        # Set by start(); initialized here so stop() before start() is a
        # no-op instead of an AttributeError.
        self._stream = None
        self._thread = None

    def start(self) -> bool:
        """Open the output stream and start the playback thread."""
        self._stream = self._audio.open(
            format=self._format,
            channels=self._channels,
            rate=self._rate,
            input=False,
            output=True,
            frames_per_buffer=self._chunk,
            output_device_index=self._output_device_index,
        )

        self._exit.clear()

        self._thread = threading.Thread(
            target=_play, args=(self._queue, self._stream, self._exit), daemon=True
        )
        self._thread.start()

        self._stream.start_stream()

        return True

    def stop(self):
        """Stop playback and release the stream; safe to call more than once."""
        self._exit.set()

        if self._stream is not None:
            self._stream.stop_stream()
            self._stream.close()
            self._stream = None

        if self._thread is not None:
            self._thread.join()
            self._thread = None

        self._queue = None

    def play(self, data):
        """Queue a chunk of PCM audio bytes for playback."""
        self._queue.put(data)
181
182def _play(audio_out: queue, stream, stop):
183 while not stop.is_set():
184 try:
185 data = audio_out.get(True, TIMEOUT)
186 stream.write(data)
187 except queue.Empty as e:
188 # print(f"queue is empty")
189 pass
190 except Exception as e:
191 print(f"_play: {e}")
192
193if __name__ == "__main__":
194 main()

Considerations

When implementing WebSocket communication for LLM outputs, consider the following:

  1. Flushing the Last Output: Send a Flush message after the final fragment of the LLM response so that any text still buffered by the service is synthesized. This is reflected in all the examples above.
  2. Error Handling: Implement robust error handling for both the WebSocket server and the API requests to ensure the system can recover gracefully from any failures.
  3. Security: Ensure that the WebSocket connection is secure by using appropriate authentication mechanisms and encrypting data in transit.
  4. Scalability: Depending on the number of expected clients, you may need to scale your WebSocket server horizontally to handle multiple concurrent connections efficiently.
  5. Latency: Monitor the latency of your WebSocket communication. Ensure that the data is transmitted with minimal delay to meet the requirements of real-time applications.

By following these guidelines, you can effectively stream LLM outputs to a WebSocket, enabling real-time interaction with advanced language models.

Built with