Server-Sent Events with Elixir

SSE has been around for a long time, but there isn't much Elixir material on it. It took me 3 days to get this working, so congrats on finding this :) This is not a tutorial but a code example from my project (it's open source, so you can see the actual code).

This post will show how to:

  1. Set up Elixir to send SSE events
  2. Set up Pubsub for Elixir, to trigger an SSE event
  3. Set up a JavaScript frontend to receive them

Elixir side (parts 1 and 2)

First we have to install pubsub, so change these files:

# mix.exs
defp deps do
  [
    {:pubsub, "~> 1.0"} # Add this line
  ]
end

# application.ex

  def start(_type, _args) do
    children = [
      Plug.Cowboy.child_spec(
        # The required scheme: and plug: options are left out here
        protocol_options: [idle_timeout: :infinity] # This is needed
      ),
      {PubSub, []} # Add this
    ]

    opts = [strategy: :one_for_one, name: Bashboard.Supervisor]
    Supervisor.start_link(children, opts)
  end

idle_timeout: :infinity is needed for Cowboy (also if you use Phoenix). Otherwise an idle connection times out after 60 seconds with a confusing error:

# Firefox
The connection to URL was interrupted while the page was loading.

# Chrome
net::ERR_INCOMPLETE_CHUNKED_ENCODING 200 (OK)

So here's the router code (using Plug only) with my specific parts removed:


  get "/sse" do
    conn =
      conn
      # Plug expects lowercase header names
      |> put_resp_header("cache-control", "no-cache")
      |> put_resp_header("connection", "keep-alive")
      |> put_resp_header("content-type", "text/event-stream; charset=utf-8")
      |> send_chunked(200)

    # Listen to events in the topic :cagg
    PubSub.subscribe(self(), :cagg)

    # This loop will wait for messages infinitely
    sse_loop(conn, self())
  end

  defp sse_loop(conn, pid) do
    # receive is what stops the router from processing
    # and waits for an event to come in
    receive do
      # Send updates when :cagg is finished.
      {:cagg, :done} ->
        # Query for updates.
        widget = get_new_data()

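        # Send update. chunk/2 returns {:error, reason} once the client is gone.
        case chunk(conn, "event: message\ndata: #{Poison.encode!(widget)}\n\n") do
          # Wait for next publish.
          {:ok, conn} -> sse_loop(conn, pid)
          # Client disconnected, so stop looping.
          {:error, _reason} -> conn
        end

      # Stop SSE if this conn is actually down.
      # Ignore other processes finishing.
      # Notice the "^" in front of pid.
      {:DOWN, _reference, :process, ^pid, _type} ->
        # Return the conn, since the router expects one back.
        conn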

      # Don't stop SSE because of unrelated events.
      _other ->
        sse_loop(conn, pid)
    end
  end

That's the Elixir part. Now if I call PubSub.publish(:cagg, {:cagg, :done}) anywhere in the app, the SSE loop will fetch updates and send them to the client.
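To make the string passed to chunk less magic: each SSE frame is a few "field: value" lines ended by a blank line. Here is a rough JavaScript sketch of how such a frame gets read on the receiving side. It is a simplification, not the full parsing algorithm EventSource uses, and parseSseFrame and the sample payload are made up for illustration.

```javascript
// Simplified sketch of how one SSE frame is interpreted.
// parseSseFrame is a hypothetical helper, not a browser API.
function parseSseFrame(frame) {
  let eventName = "message"; // default event type when no "event:" field is sent
  const dataLines = [];

  for (const line of frame.split("\n")) {
    // Skip the terminating blank lines and comment lines (those start with ":").
    if (line === "" || line.startsWith(":")) continue;

    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    if (value.startsWith(" ")) value = value.slice(1); // one leading space is dropped

    if (field === "event") eventName = value;
    else if (field === "data") dataLines.push(value);
  }

  // Several "data:" lines are joined with newlines into one payload.
  return { event: eventName, data: dataLines.join("\n") };
}

// Same shape as the string built in sse_loop (the payload is invented).
const frame = 'event: message\ndata: {"id":1,"value":42}\n\n';
const parsed = parseSseFrame(frame);
console.log(parsed.event, JSON.parse(parsed.data).value); // prints: message 42
```

Since "message" is the default event type, the "event: message" line is optional. Without it the frame would still reach the "message" listener on the client.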

JavaScript client (part 3)

const source = new EventSource(sseURL);

source.addEventListener("error", function(event) {
	console.error("[SSE close]", event);
});

source.addEventListener("open", function(event) {
	console.info("[SSE open]", event);
});

source.addEventListener("message", function(event) {
	// update your data or whatever
});
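Since the server sends JSON in the data field, the "message" handler will usually want to parse it. Below is one possible sketch of that. handleWidgetMessage and the render callback are names I made up for illustration (they are not part of the project), and it assumes the payload is a single JSON document.

```javascript
// Hypothetical handler: parses the JSON payload and hands it to `render`,
// which stands in for whatever updates your UI.
function handleWidgetMessage(event, render) {
  let widget;
  try {
    widget = JSON.parse(event.data);
  } catch (err) {
    // A malformed payload should not break the listener.
    console.warn("[SSE] ignoring malformed payload", err);
    return false;
  }
  render(widget);
  return true;
}

// In the browser it would be wired up roughly like this:
// source.addEventListener("message", (event) => handleWidgetMessage(event, updateWidget));

// Outside the browser it can be tried with a fake event object:
const received = [];
handleWidgetMessage({ data: '{"id":1,"value":42}' }, (w) => received.push(w));
console.log(received[0].value); // prints: 42
```

Keeping the parsing in a plain function makes it easy to try without a running server. EventSource also reconnects on its own after a dropped connection, so the handler does not need any retry logic.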

So it's not so hard at all, right? Well I saved you 3 days so high five!