This post is about using Elixir to stream OpenAI chat responses in real time! If you want to keep your latency low, read on: you could be up and running in minutes.
Problem
You are building an application that interfaces with OpenAI’s ChatGPT and want to create a real-time interactive experience just like the OpenAI Chat UI.
To do this we will need to work with the ChatGPT streaming API, which is built on HTTP Server-Sent Events. Elixir is great for real-time applications, so how can we use the streaming API with Elixir?
Solution
Server-Sent Events are a streaming response protocol compatible with HTTP/1.1. A GET request is made to a server, which keeps the connection alive and sends messages in the format data: <message>\n\n until the connection closes. Browsers handle this by parsing the data line by line and handing you the message stream. If you are curious: yes, Plug does support it!
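For example, a hypothetical stream carrying two messages would look like this on the wire:

data: first message

data: second message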
Let’s start by adding the fantastic Req to your dependencies. Req is a high-level HTTP client library built by Elixir contributor Wojtek Mach. It builds on pure Elixir libraries and uses common Elixir idioms and patterns. It also comes with tons of developer UX niceties, such as handlers for common response types, streaming requests, and common header values.
Overall, if we want a “just works” HTTP client, use Req; if we want something a little lower level, use Finch, which is what Req is built on top of. Today we will end up using both!
{:req, github: "wojtekmach/req"}
We’re using the main branch here until a version > 0.3.6 is deployed. Fine-grained control of streams was just added to Req and will be available in the next release. We could have used the Finch library directly, but Req is handy enough that I still grabbed it!
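If you’re following along in a Mix project, the dependency goes in your mix.exs deps list (a minimal sketch; at the time of writing Req pulls in Finch and Jason itself, so we don’t need to add them separately):

defp deps do
  [
    {:req, github: "wojtekmach/req"}
  ]
end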
We’re going to write a single function called gpt_stream that takes a prompt and a callback function. And luckily for us, the Req documentation has an example that handles this case! So, building off of that:
defmodule OpenAI do
  # Streams a chat completion for `prompt`, invoking `cb` with each decoded chunk.
  def gpt_stream(prompt, cb) do
    fun = fn request, finch_request, finch_name, finch_options ->
      # Finch.stream/5 folds each part of the response into an accumulator.
      fun = fn
        {:status, status}, response ->
          %{response | status: status}

        {:headers, headers}, response ->
          %{response | headers: headers}

        {:data, data}, response ->
          # Each chunk arrives as one or more "data: <JSON>\n\n" lines.
          body =
            data
            |> String.split("data: ")
            |> Enum.map(fn str ->
              str
              |> String.trim()
              |> decode_body(cb)
            end)
            |> Enum.filter(fn d -> d != :ok end)

          # Accumulate decoded chunks so the full body is available afterwards.
          old_body = if response.body == "", do: [], else: response.body
          %{response | body: old_body ++ body}
      end

      case Finch.stream(finch_request, finch_name, Req.Response.new(), fun, finch_options) do
        {:ok, response} -> {request, response}
        {:error, exception} -> {request, exception}
      end
    end

    Req.post!("https://api.openai.com/v1/chat/completions",
      json: %{
        # Pick your model here
        model: "gpt-3.5-turbo-0301",
        messages: [%{role: "user", content: prompt}],
        stream: true
      },
      auth: {:bearer, System.fetch_env!("OPENAI_KEY")},
      finch_request: fun
    )
  end

  # Empty fragments and the [DONE] sentinel are skipped; everything else is
  # decoded, passed to the callback, and returned so it accumulates in the body.
  defp decode_body("", _), do: :ok
  defp decode_body("[DONE]", _), do: :ok

  defp decode_body(json, cb) do
    decoded = Jason.decode!(json)
    cb.(decoded)
    decoded
  end
end
Some functions are easier to read from the bottom up, so let’s start there. Req.post!() takes the usual parameters:
- URL
- JSON body with arguments
- auth header with our Bearer token
- finch_request: this one requires some explaining. Req is a high-level HTTP library built on top of the lower-level Finch HTTP library. With this option, we can configure the Finch request handling manually using a function callback. That’s what we’re doing here.
The Finch.stream/5 function takes a callback in which we define how to handle the streamed status, headers, and data, each time returning the updated response. In our case we handle status by setting the status on the response, headers by setting the headers, and data by decoding it and calling our callback (cb) function with each chunk. Finch.stream/5 itself returns either the final response or an exception, which we hand back to Req.
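To see that contract in isolation, here is a minimal sketch of Finch.stream/5 on its own (the MyFinch name and example URL are placeholders; normally the Finch pool lives in your supervision tree):

# Start a Finch pool for this example.
{:ok, _} = Finch.start_link(name: MyFinch)

request = Finch.build(:get, "https://example.com")

# The accumulator (here a list) is threaded through every callback invocation.
{:ok, parts} =
  Finch.stream(request, MyFinch, [], fn
    {:status, status}, acc -> [{:status, status} | acc]
    {:headers, headers}, acc -> [{:headers, headers} | acc]
    {:data, data}, acc -> [{:data, data} | acc]
  end)

IO.inspect(Enum.reverse(parts))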
The Chat Completions API returns the streamed data in lines with the format data: <JSON>\n\ndata: <JSON>... until it sends a final data: [DONE], which is a little strange since Server-Sent Events normally end when the connection closes, but so it goes! We handle this in decode_body, which checks for empty strings and [DONE] via pattern matching.
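Abbreviated, the stream looks roughly like this on the wire (an illustrative sample, not the full schema; fields vary by model and API version):

data: {"id":"chatcmpl-...","object":"chat.completion.chunk","choices":[{"delta":{"content":"Hello"},"index":0}]}

data: {"id":"chatcmpl-...","object":"chat.completion.chunk","choices":[{"delta":{"content":" there"},"index":0}]}

data: [DONE]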
We are also appending the data to the body just in case we want to use it after the stream is complete.
And that’s basically it! We can call our function like so:
OpenAI.gpt_stream("How do I train a cat to shake hands?", fn data ->
IO.inspect(data)
end)
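For example, to print just the generated text as it streams in, you can pull the content out of each chunk’s delta (a sketch; get_in/2 returns nil for chunks without content, such as the initial role-only delta):

OpenAI.gpt_stream("How do I train a cat to shake hands?", fn chunk ->
  # Each streamed chunk carries its text under choices -> delta -> content.
  case get_in(chunk, ["choices", Access.at(0), "delta", "content"]) do
    nil -> :ok
    token -> IO.write(token)
  end
end)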
You can do whatever you want with the data, such as sending it to a pid or broadcasting it with PubSub.broadcast, but I will leave that as an exercise for the reader!
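As a starting point for that exercise, here is a minimal sketch that broadcasts each chunk over Phoenix.PubSub (assuming a pubsub named MyApp.PubSub is already running, as in a standard Phoenix app; the topic name is made up):

topic = "gpt:demo"

OpenAI.gpt_stream("How do I train a cat to shake hands?", fn chunk ->
  # Any process subscribed to the topic receives each chunk as it arrives.
  Phoenix.PubSub.broadcast(MyApp.PubSub, topic, {:gpt_chunk, chunk})
end)

# Elsewhere, e.g. in a LiveView mount/3, subscribe to receive {:gpt_chunk, chunk} messages:
Phoenix.PubSub.subscribe(MyApp.PubSub, topic)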