Server Sent Events with Elixir
SSE is old, but there aren't a lot of Elixir materials out there. It took me 3 days to get this working, so congrats on finding this :) This is not a tutorial, but a code example from my project (it's open source, so you can see the actual code).
This post will show how to:
- Set up Elixir to send SSE events
- Set up Pubsub for Elixir, to trigger an SSE event
- Set up a JavaScript frontend to receive the events
Elixir side (parts 1 and 2)
First we have to install pubsub
so change these files:
# mix.exs
# Project dependencies. The :pubsub package supplies the PubSub process
# that the SSE route subscribes to.
defp deps do
  [
    # Add this line
    {:pubsub, "~> 1.0"}
  ]
end
# application.ex
# Application callback: starts the supervision tree containing the Cowboy
# web server and the PubSub process.
def start(_type, _args) do
  # This is needed: without idle_timeout: :infinity the long-lived SSE
  # connection gets closed after the idle timeout.
  web_server =
    Plug.Cowboy.child_spec(protocol_options: [idle_timeout: :infinity])

  # Add this: the PubSub process used to publish/subscribe SSE triggers.
  pubsub = {PubSub, []}

  Supervisor.start_link(
    [web_server, pubsub],
    strategy: :one_for_one,
    name: Bashboard.Supervisor
  )
end
idle_timeout: :infinity
is needed for cowboy (also if you use Phoenix), as otherwise your connection will time out in 60 sec with a confusing error:
# Firefox
The connection to URL was interrupted while the page was loading.
# Chrome
net::ERR_INCOMPLETE_CHUNKED_ENCODING 200 (OK)
So here's the router code (using Plug only) with my specific parts removed:
# SSE endpoint: starts a chunked text/event-stream response, subscribes the
# request process to the :cagg topic and then blocks in sse_loop/2.
get "/sse" do
  conn =
    conn
    # Header names are lowercase on purpose. Plug stores keys as given, so a
    # capitalised "Cache-Control" would not replace Plug's default lowercase
    # "cache-control" entry, and mixed-case keys are invalid over HTTP/2
    # (Plug's test adapter raises on them).
    |> put_resp_header("cache-control", "no-cache")
    |> put_resp_header("connection", "keep-alive")
    |> put_resp_header("content-type", "text/event-stream; charset=utf-8")
    |> send_chunked(200)

  # Listen to events in the topic :cagg
  PubSub.subscribe(self(), :cagg)

  # This loop will wait for messages infinitely
  sse_loop(conn, self())
end
# Blocks the calling (request) process and pushes one SSE "message" event to
# the client every time a {:cagg, :done} message arrives.
#
# conn - a connection on which send_chunked/2 has already been called
# pid  - the process whose :DOWN message should end the loop
#
# Always returns a %Plug.Conn{}, because the route that calls this must
# return a conn to Plug.
defp sse_loop(conn, pid) do
  # receive is what stops the router from processing
  # and waits for an event to come in
  receive do
    # Send updates when :cagg is finished.
    {:cagg, :done} ->
      # Query for updates.
      widget = get_new_data()

      # Send update. chunk/2 returns {:error, reason} once the client has
      # gone away; stop looping then instead of waiting forever on a dead
      # connection.
      case chunk(conn, "event: message\ndata: #{Poison.encode!(widget)}\n\n") do
        # Wait for next publish, carrying the updated conn forward.
        {:ok, conn} -> sse_loop(conn, pid)
        {:error, _reason} -> conn
      end

    # Stop SSE if this conn is actually down.
    # Ignore other processes finishing.
    # Notice the "^" in front of pid.
    # NOTE(review): a :DOWN message only arrives for a monitored process, and
    # no Process.monitor/1 call is visible here - confirm one is set up.
    {:DOWN, _reference, :process, ^pid, _type} ->
      conn

    # Don't stop SSE because of unrelated events.
    _other ->
      sse_loop(conn, pid)
  end
end
That's the Elixir part. Now if I call PubSub.publish(:cagg, {:cagg, :done})
anywhere in the app, SSE will fetch updates and send them to the client.
Javascript client (part 3)
// Open the event stream served by the Elixir /sse route.
const source = new EventSource(sseURL);

// Log connection lifecycle events.
source.addEventListener("error", (event) => console.error("[SSE close]", event));
source.addEventListener("open", (event) => console.info("[SSE open]", event));

// Fired for every "event: message" chunk the server pushes.
source.addEventListener("message", (event) => {
  // update your data or whatever
});
So it's not so hard at all, right? Well I saved you 3 days so high five!