We're Fly.io. We run apps for our users on hardware we host around the world. Fly.io happens to be a great place to run Phoenix applications. Check out how to get started!
Ever wanted to create something like a ChatGPT interface that asynchronously streams in a response? I have! Assuming we're building it with LiveView, we want to run that streaming conversation in a separate process because we don't want to block our LiveView from being highly responsive to user input. In this post, we'll cover how to build that type of async code in our LiveView using the awesome building blocks available to us in Elixir. We'll also have some fun learning a bit more about Elixir's concurrency primitives along the way!
Dramatic Version: This is a story of two star-crossed processes. When linked, their fates are intertwined and a tragedy spreads its destruction. Can the fates be changed? Perhaps… but at what cost? Actually, it's really easy.
What we’re solving
Elixir is awesome at concurrency. Processes, links, and monitors form a powerful foundation. Built on top of that, we have Supervisors, Tasks and even LiveView. Sometimes, the challenge can be knowing how to use the different tools available to us to build what we want.
In this case, there isn't a ready-made solution already built for us called something like an "AsyncPatternGoodForChatGPTStyleUI". Fortunately, it isn't hard to build what we want. In fact, once we see how easy it is, it'll hopefully be clear why we don't need a pre-built library for it.
Let’s visualize what we’re building. It’s a LiveView that launches a Task, and we’re more interested in the messages sent back while that Task is running than the final result of the Task. Our focus on the messages during the process changes how we design our solution. If you are more focused on the final result of a Task, then check out this excellent post.
Before we jump into the code, let's define a bit more of what we want the code to do.
Goals
Here we'll outline how we want the application to behave, particularly around the async process and our LiveView.
- The LiveView process remains unblocked. Our blocking external calls should happen in a separate process. Let's keep the UI buttery smooth.
- When the LiveView process goes away, the other process should too. Because the worker process only exists to fetch and provide data to the LiveView, we want the async process to stop if the user closes the page or navigates away.
- When the async process crashes, it should NOT kill our LiveView. We expect the async process talking to an external API will fail sometimes. We do not want that to crash the UI for the user.
- Ability to cancel a running worker process. We want the ability to cancel a running async worker from the LiveView.
- We care about the side-effects, not the final result. Thinking back to ChatGPT as an example, the stream of text coming in is the side-effect we want. We could wait for the full text to be collected and then returned as the final "here it all is!" result, but that's not actually what we want. The user can start reading the streamed-in result long before the full text is available. These small chunks of data can be sent over as they are received, and this is the side-effect we're talking about.
All right! That may seem like a hefty set of requirements, but it'll give us the user experience we want and is actually easier to do than you may suspect.
Here is a visual example of the behavior we want.
What Building Blocks to Use?
Well, we know we want a separate process to do the work of talking to an external API. We could use spawn, a GenServer, or a Task. Actually, both GenServer and Task are abstractions on top of spawn. For this solution, we'll use a Task. Why? We only need the additional process to run when there's an API request to make. That's actually a really good fit for a Task.
Great, so we know we want a Task, but there are a number of different ways to run one. Ever gotten confused about how they differ? Yup. Me too.
The one we want is Task.start_link/1. Why? This one links the spawned task to our LiveView process. Let's look at that next.
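Before we do, here's a minimal sketch of the call itself, just to show its shape (this isn't our final code):

{:ok, pid} = Task.start_link(fn ->
  # this anonymous function runs concurrently, linked to the calling process
  Process.sleep(1_000)
  IO.puts("task finished")
end)

Task.start_link/1 returns {:ok, pid} right away while the function keeps running in the new process.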
Thinking about Linking
Processes can be easily linked manually using Process.link/1 or when starting a new process using spawn_link/1. In our situation, we want a Task, and when we call Task.start_link/1, it makes it easy to link the Task process to our LiveView.
When two processes are linked together and one process dies, the linked partner dies as well. It's Romeo and Juliet on the BEAM. So sad.
A visual story of our linked star-crossed processes:
When the Juliet process dies…
…a signal is sent to the Romeo process, killing it as well.
It's important to note that this linking goes both ways. If the Romeo process died first, a signal would be sent to the Juliet process, killing it.
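If you want to see this both-ways behavior for yourself, here's a small experiment you can paste into IEx. It's just a sketch with made-up names; the only functions involved are spawn/1, Process.link/1, Process.exit/2, and Process.alive?/1.

# Juliet sleeps forever; Romeo links himself to her
juliet = spawn(fn -> Process.sleep(:infinity) end)

romeo =
  spawn(fn ->
    Process.link(juliet)
    Process.sleep(:infinity)
  end)

# give Romeo a moment to establish the link, then kill Juliet
Process.sleep(50)
Process.exit(juliet, :kill)
Process.sleep(50)

Process.alive?(juliet) # => false
Process.alive?(romeo)  # => false -- the exit signal travelled across the link

Because we used spawn (not spawn_link) from the shell, the IEx process itself isn't linked to either of them and survives the drama.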
Tip: I find it helpful to think about processes as people. This makes the interactions easier to understand and feels more natural.
Back to our Problem
When we start a Task using Task.start_link/1, it creates the process and links it to our LiveView process for us. Neat!
This means when our LiveView closes, any running Task automatically gets killed. But wait… doesn't that mean that when our Task ends, it kills our LiveView? Uh… yes. But hold on! Unlike the famous Shakespearean play, our processes can change their fates.
The signal sent when a process dies is a special system-level message telling the other linked process to exit. Normally, we never see those messages. However, if we use this snippet of code during the setup of our LiveView (like in the mount), then we will see those messages telling our process to exit.
Process.flag(:trap_exit, true)
If we remember that our LiveView is a process, then this elegant solution isn't far-fetched. This is also referred to as trapping exits.
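Here's a tiny IEx experiment (again, just a sketch, not part of our LiveView) that shows what trapping exits changes:

# tell the BEAM to deliver exit signals to us as ordinary messages
Process.flag(:trap_exit, true)

# a linked process that immediately exits abnormally
spawn_link(fn -> exit(:boom) end)

# without trapping, that exit would have taken our process down with it;
# with trapping, it shows up in our mailbox instead (flush/0 is an IEx helper)
flush()
# {:EXIT, #PID<0.123.0>, :boom}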
Now, when our linked Task either completes normally or crashes, the LiveView gets the message that it should exit also. It comes in as a handle_info callback and looks like this:
def handle_info({:EXIT, pid, reason}, socket) do
  IO.puts("Received exit signal for pid #{inspect(pid)} with reason: #{inspect(reason)}")
  {:noreply, socket}
end
By catching this message and NOT returning a response like {:stop, reason}, we effectively handle the request for our LiveView process to close, but choose not to close. Excellent! That means we've got the linking working the way we want.
Let’s recap how the linking will behave:
- When our LiveView closes, it kills a running task.
- When our Task crashes/dies/finishes, it notifies our LiveView.
Let’s see what we can do with that.
Trapping and Handling Exits
Here’s a LiveView code sample of what we’ve covered so far. We’ll discuss it after.
def mount(_params, _session, socket) do
  # Trap exits
  Process.flag(:trap_exit, true)

  socket =
    socket
    # keep track of a running task for the UI
    |> assign(:running_task, nil)

  {:ok, socket}
end
def handle_info({:EXIT, pid, reason}, socket) do
  socket =
    if pid == socket.assigns.running_task do
      # the closed PID was our task, remove it
      assign(socket, :running_task, nil)
    else
      socket
    end

  {:noreply, socket}
end
In the LiveView's mount callback, we set up to trap exits. It would be great if our UI displayed when a task was running and even let us cancel it. To do that, we can track our :running_task as a pid (Process ID).
In our handle_info callback, we keep an exiting Task from killing the LiveView process, but we can also clear our reference to the running Task! An argument in the message is the pid of the process that exited.
This makes it really easy to keep our :running_task value up-to-date without us having to manually track what's happening with it.
Now we’re ready to create our Task to do some work!
Starting the Task
In this article we’re more concerned with how the LiveView and Task processes interact and we’re not focusing on the actual work the Task is doing.
When the user clicks the "Start" button, the handle_event clears the messages and starts the Task.
def handle_event("start", _params, socket) do
  socket =
    socket
    |> assign(:messages, [])
    |> start_test_task()

  {:noreply, socket}
end
In our start_test_task function, we use Task.start_link/1, which takes an anonymous function to execute asynchronously.
def start_test_task(socket) do
  live_view_pid = self()

  {:ok, task_pid} =
    Task.start_link(fn ->
      # the code to run async
      Enum.each(1..5, fn n ->
        Process.sleep(1_000)
        send(live_view_pid, {:task_message, "Async work chunk #{n}"})
      end)

      # function result discarded -- it isn't used
      :ok
    end)

  # returning the socket so it's pipe-friendly
  assign(socket, :running_task, task_pid)
end
Before we start the Task, we call self() and assign the pid of the LiveView process to a variable that can be referenced in our async function. This passes the pid of the LiveView via a closure to our async function.
The “work” being done in our function is looping 5 times, sleeping for 1 second, then sending a message to the LiveView process about the chunk of work we completed.
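The handler for those {:task_message, ...} messages isn't shown here (the full code is in the Gist linked at the end), but a minimal sketch could look like this, assuming :messages is the list we cleared in the "start" event:

def handle_info({:task_message, message}, socket) do
  # append the new chunk so the UI streams it in as it arrives
  socket = assign(socket, :messages, socket.assigns.messages ++ [message])
  {:noreply, socket}
end

The template just renders @messages, so each chunk appears as soon as it's received.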
When the task finishes running the function, it auto-exits.
Concurrency Overview
For those new to Elixir’s actor model concurrency, the code may appear confusing, but it’s actually quite elegant. Here’s a different perspective of what’s happening:
In Elixir, processes are cheap and quick to start. As soon as we start the Task, we get back the new pid (Process ID) and store it in our LiveView's assigns. The LiveView now knows we're running an async task and our UI automatically updates to reflect it. Nothing is blocked in our main LiveView process. Yay!
As the Task runs, it does work and sends messages back to our LiveView, which we respond to in a handle_info function with the pattern match for the message.
When the Task completes, it auto-exits. Because the processes are linked and we are trapping exits, the LiveView process receives the system :EXIT message and we remove the :running_task pid from the assigns. Our LiveView's UI automatically updates to remove the "Cancel" button and display the "Start" button again.
What’s amazing to me is how elegantly this approach works and how natural it feels.
More Insight into Task Exits
We’d like some more insight into when our Task exits. After all, we want to make sure it has the behavior we want.
Let's sprinkle in some IO.puts print messages to see what's happening.
def handle_info({:EXIT, pid, reason}, socket) do
  IO.puts("Received exit signal for pid #{inspect(pid)} with reason: #{inspect(reason)}")
  # ...
end
def start_test_task(socket) do
  live_view_pid = self()

  {:ok, task_pid} =
    Task.start_link(fn ->
      # the code to run async
      Enum.each(1..5, fn n ->
        Process.sleep(1_000)
        IO.puts("SENDING ASYNC TASK MESSAGE #{n}")
        # raise "TASK RAISED EXCEPTION"
        send(live_view_pid, {:task_message, "Async work chunk #{n}"})
      end)
      # ...
    end)

  # ...
end
We added two IO.puts commands. Now let's see the results in the console.
Normal completion
This is what a normal Task completion looks like.
SENDING ASYNC TASK MESSAGE 1
SENDING ASYNC TASK MESSAGE 2
SENDING ASYNC TASK MESSAGE 3
SENDING ASYNC TASK MESSAGE 4
SENDING ASYNC TASK MESSAGE 5
Received exit signal for pid #PID<0.6890.0> with reason: :normal
Notice that the exit reason is :normal. This tells us under what circumstances the Task exited. It was done!
If you’re thinking, “Hey, couldn’t we pattern match on the exit reason if we wanted to?” then yes, you’re right! We’ll look a bit more at some other exit reasons in a minute.
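For instance, we could split the callback into separate clauses on the reason. Here's a sketch of that idea (maybe_clear_task is a hypothetical helper, not part of the article's code):

def handle_info({:EXIT, pid, :normal}, socket) do
  # the Task finished its work -- just clear our reference to it
  {:noreply, maybe_clear_task(socket, pid)}
end

def handle_info({:EXIT, pid, reason}, socket) do
  # anything else means a crash or a kill -- clear it and tell the user
  socket =
    socket
    |> maybe_clear_task(pid)
    |> put_flash(:error, "Background task stopped: #{inspect(reason)}")

  {:noreply, socket}
end

defp maybe_clear_task(socket, pid) do
  if socket.assigns.running_task == pid do
    assign(socket, :running_task, nil)
  else
    socket
  end
end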
Navigate away from the LiveView
One of our goals is that when the user navigates away from the LiveView, a running Task should immediately be cancelled. Let’s see what happens when we navigate away.
SENDING ASYNC TASK MESSAGE 1
SENDING ASYNC TASK MESSAGE 2
SENDING ASYNC TASK MESSAGE 3
We received 3 messages before navigating somewhere else. In the console we see that the Task stopped executing. Excellent!
The Task was notified to close because of the process link. Since the Task isn't set up to trap exits, we don't see messages about that happening. What we do see is that the Task stopped running as soon as the LiveView process exited. This is exactly what we want here. If the LiveView is gone, the Task should stop immediately, and it did.
Task crashes
What happens if the Task crashes? In our Task function, right after our sleep, we'll add raise "TASK RAISED EXCEPTION" and see what happens.
SENDING ASYNC TASK MESSAGE 1
[error] Task #PID<0.7046.0> started from #PID<0.7035.0> terminating
** (RuntimeError) TASK RAISED EXCEPTION
(my_app 0.1.0) lib/my_app_web/live/task_test_live/index.ex:81: anonymous fn/2 in MyAppWeb.TaskTestLive.Index.start_test_task/1
(elixir 1.15.4) lib/enum.ex:989: anonymous fn/3 in Enum.each/2
(elixir 1.15.4) lib/enum.ex:4379: Enum.reduce_range/5
(elixir 1.15.4) lib/enum.ex:2514: Enum.each/2
(my_app 0.1.0) lib/my_app_web/live/task_test_live/index.ex:78: anonymous fn/1 in MyAppWeb.TaskTestLive.Index.start_test_task/1
(elixir 1.15.4) lib/task/supervised.ex:101: Task.Supervised.invoke_mfa/2
Function: #Function<0.12965136/0 in MyAppWeb.TaskTestLive.Index.start_test_task/1>
Args: []
Received exit signal for pid #PID<0.7046.0> with reason: {%RuntimeError{message: "TASK RAISED EXCEPTION"}, [{MyAppWeb.TaskTestLive.Index, :"-start_test_task/1-fun-0-", 2, [file: ~c"lib/my_app_web/live/task_test_live/index.ex", line: 81, error_info: %{module: Exception}]}, {Enum, :"-each/2-fun-0-", 3, [file: ~c"lib/enum.ex", line: 989]}, {Enum, :reduce_range, 5, [file: ~c"lib/enum.ex", line: 4379]}, {Enum, :each, 2, [file: ~c"lib/enum.ex", line: 2514]}, {MyAppWeb.TaskTestLive.Index, :"-start_test_task/1-fun-1-", 1, [file: ~c"lib/my_app_web/live/task_test_live/index.ex", line: 78]}, {Task.Supervised, :invoke_mfa, 2, [file: ~c"lib/task/supervised.ex", line: 101]}]}
When our Task crashed, we notice that our LiveView didn't crash but still got notified of the exit. The last message in the console output is for the EXIT. Note that the reason is a tuple with the exception that was raised. Nice! That could be helpful information, especially if we wanted to pattern match on an error.
In our application, we're already removing the :running_task from our assigns, and our UI updates correctly when the Task crashes. No extra work needed there!
Cancelling the Task
One of our goals was to be able to cancel a running Task. How do we do that?
To review, we are tracking the pid of the Task in :running_task. This serves two purposes:
- we know when a task is running
- we have the pid of the task
Let’s add a “Cancel” button to the UI. The following markup displays a “Start” button when no Task is running and the “Cancel” button when a Task is running.
<div class="...">
  <.button :if={is_nil(@running_task)} phx-click="start">Start</.button>
  <.button :if={@running_task} phx-click="cancel">Cancel</.button>
</div>
Next we’ll handle when the “Cancel” button is clicked. Let’s see how we can do that.
def handle_event("cancel", _params, socket) do
  task_id = socket.assigns.running_task

  socket =
    if task_id do
      Process.exit(task_id, :kill)
      # display it was cancelled.
      put_flash(socket, :info, "Cancelled")
    else
      socket
    end

  {:noreply, socket}
end
It's really simple. We send an exit signal to the Task using its pid. The reason we send is :kill, to let the system know we want it closed immediately.
Finally, we add a flash message to display that it was cancelled.
Let’s see what happens in the console when we cancel a running Task.
SENDING ASYNC TASK MESSAGE 1
SENDING ASYNC TASK MESSAGE 2
Received exit signal for pid #PID<0.6888.0> with reason: :killed
The Cancel button was clicked and the Task exited. Our LiveView was notified that it exited and the reason was :killed. How cool is that?
The whole “cancel” feature almost feels anti-climactic because it was so easy.
It's worth pointing out that we don't have to explicitly handle removing the Task pid from the :running_task in our assigns because, even on a cancel, the EXIT message is received and handled in one place. In fact, let's look more at that next.
Making a Nice, Orderly Exit
I really like how the Task can exit under different circumstances, but it all passes through the same handle_info callback. The same handler fires when the Task:
- completes successfully
- is cancelled
- errors and crashes
This makes keeping the UI up-to-date with a Task’s running state very simple and clean.
Also, the reason for the exit is sent along in the message which we can pattern match on to handle in the most appropriate way for our project.
It's a really powerful feature of the BEAM (the VM runtime) that all we have to do is tell it we want to be notified when a linked process exits. Love it.
Where We Ended Up
We achieved our goal. We set out to figure out how to architect the building blocks so our LiveView could run an asynchronous Task and receive updates on its progress along the way. The example use case was a ChatGPT-style flow of messages that builds up a result.
In the process, we created a beautifully working async solution in less than 100 lines of code. Here’s a Gist of the code.
We also learned more about how processes can be linked together, binding the fates of our star-crossed processes. Then we saw that trapping exits lets us change that fate and enables a simple and elegant solution.
Sometimes the challenge is knowing which of the powerful building blocks are the right tools for our specific situation. Hopefully these building blocks are a bit more approachable now and you’ll see more opportunities for using them in the future too!