Realtime Market-Data Updates with Elixir

Cryptocurrency exchanges usually open their realtime feed for free and, like Coinbase Pro, without even having to create an account. This gives us a great way to build an architecture around realtime market data.
In this article we see how to build an Elixir application to get realtime updates from the coinbase websocket feed, handling crashes and disconnections.

Realtime Crypto Market Data

We mainly focus on the stream of new trades at the bottom-right, in the Trade History column, where it shows the trades of the last few minutes in descending order. We see the last trade price on the top-bar is $3356.01, which reflects the trade at the first row happened at 12:36:30. The data we want to download, at the moment, is just the stream of trades.
Let’s take a look at the documentation of the websocket-feed.

We need first to connect to the websocket service, using the wss:// URI.
Once connected we just need to send a subscription JSON message like this.

"type": "subscribe",
"product_ids": [
"channels": [

There are many channels: level2, ticker, heartbeat, matches etc.. We focus just on the matches channel, which represents the stream of trades. In this case we subscribe for the “BTC-USD” product, but we could subscribe to many different products at the same time.

WebSocket Connection

$ mix new coinbase --sup

We need now a websocket client. I’ve used WebSockex in the last few months which, as of now, it’s actively maintained. Let’s add it to our dependencies in the mix.exs file, along with the Poison JSON library. We will need Poison to encode and decode JSON strings.

defp deps do
{:websockex, "~> 0.4.2"},
{:poison, "~> 4.0"},

And run the deps.get command to download the dependencies.

$ mix deps.get
Resolving Hex dependencies...
Dependency resolution completed:
poison 4.0.1
websockex 0.4.2
* Getting websockex (Hex package)
* Getting poison (Hex package)

Great, we can now start building our Coinbase client using Websockex.
Let’s focus, at first, on the client’s connection

defmodule Coinbase.Client do
use WebSockex

@url "wss://"

def start_link(product_ids[]) do
WebSockex.start_link(@url, __MODULE__, :no_state)

def handle_connect(conn, state) do
IO.puts "Connected!"
{:ok, state}

We’ve created a Coinbase.Client module that simply connects to the Coinbase server.

  • line 2: we’ve used use WebSockex to inject WebSockex functionalities in our module
  • line 7: WebSockex.start_link/3 starts a websocket client in a seperate process and returns {:ok, pid} tuple.
  • line 10: the handle_connect/2 callback is called when the client is connected.

Let’s run it in the Elixir’s interactive shell:

$ iex -S mix

iex> {:ok, _pid} = Cryptex.CoinbaseClient.start_link []
{:ok, #PID<0.393.0>}

Our module connects correctly to the Coinbase websocket server. To be able to receive the trades through the websocket connection, we need to subscribe to the matches channel.

def subscribtion_frame(products) do

subscription_msg = %{

type: "subscribe",
product_ids: products,
channels: ["matches"]

} |> Poison.encode!()

{:text, subscription_msg}

The subscription_frame/1 function returns a frame ready to be sent to the server. A frame is a tuple {type, data}, where in our case type is :text and data is a JSON string.
The products argument is a list of Coinbase product ids, which are strings like "BTC-USD", "ETH-USD", "LTC-USD".
We build the subscription message using a Map with type, product_ids and channels keys. This map is then converted to a JSON string using the Poison.encode!/1 function. The returned frame looks like this

{:text, "{"type":"subscribe","product_ids":["BTC-USD"],"channels":["matches"]}"}

We then add a subscribe function to our module, which we will use once connected.

defp subscribe(pid, products) do
WebSockex.send_frame pid, subscribtion_frame(products)

The subscribe/2 function builds the subscription frame {:text, json_msg} using the subscription_frame(products) we’ve just seen and sends the frame to the server using the WebSockex.send_frame(pid, frame) function, where the pid is the websocket client process id.

Handling Frames

def handle_frame(_frame={:text, msg}, state) do
|> Poison.decode!()
|> IO.inspect()
{:ok, state}

We’ve just implemented a simple handle_frame(frame, state) function where we

  • pattern match the frame, extracting the JSON message string
  • decode it using Poison.decode!/1, which converts the JSON string to a Map
  • print the map to the standard output

The callback returns a {:ok, new_state} tuple, useful if we need to update our state (like handle_cast/2 in GenServer).

We are ready to test our client on iex and manually connect and subscribe, hoping to see some trades flowing in.

iex> products = ["BTC-USD"]
iex> {:ok, pid} = Coinbase.Client.start_link products
{:ok, #PID<0.200.0>}

iex> Coinbase.Client.subscribe pid, products

"price" => "3562.91000000", "product_id" => "BTC-USD",
"side" => "sell", "size" => "0.01341129",
"time" => "2018-12-18T14:43:13.254000Z","trade_id" => 56138234,
"type" => "last_match",
"channels" => [%{"name" => "matches", "product_ids" => ["BTC-USD"]}],
"type" => "subscriptions"
"maker_order_id" => "......",
"price" => "3562.91000000",
"product_id" => "BTC-USD",
"sequence" => .....,
"side" => "sell",
"size" => "0.13762144",
"taker_order_id" => ".....",
"time" => "2018-12-18T14:43:15.177000Z",
"trade_id" => 56138235,
"type" => "match"

It’s working! We are receiving data from the server.

The first message is always a "type" => "last_match", which is necessary when we have a recovery process that downloads the missed trades after a disconnection (beyond the scope of this article). There is also a confirmation of our subscription "type" => "subscriptions". But what we are really interested about are the "type" => "match" messages, which are the live trades.

To now make the subscription automatic, we need to change the Coinbase.Client.start_link function

def start_link(products[]) do
{:ok, pid} = WebSockex.start_link(@url, __MODULE__, :no_state)
subscribe(pid, products)
{:ok, pid}

In this way we start the WebSockex process with WebSockex.start_link/3, which connects to the server and returns the pid. We then use this pid to subscribe, calling the subscription function implemented before, and then let the function to return the {:ok, pid} tuple.

Filtering the trades is really easy with pattern matching. We can’t pattern match the message directly in the handle_frame/2 function, since msg is a JSON string and we need first to convert it into a Map, using Poison.decode!/1. We define then a new function handle_msg/2 .

def handle_frame({:text, msg}, state) do
handle_msg(Poison.decode!(msg), state)

def handle_msg(%{"type" => "match"}=trade, state) do
{:ok, state}

def handle_msg(_, state), do: {:ok, state}

The first handle_msg/2 function imposes that the "type" => "match" and is called only when this condition is true.
The second handle_msg(_,state) works as a catch all, which ignore the messages returning {:ok, state}.

This works like a filter and it’s exactly the same as using case

def handle_msg(msg, state) do
case msg do
%{"type" => "match"}=trade -> IO.inspect(trade)
_ -> :ignore
{:ok, state}

Let’s go back to iex and see now what happens just starting our Coinbase Client

iex> Coinbase.Client.start_link ["BTC-USD"]
{:ok, #PID<0.184.0>}
"maker_order_id" => "9050ea18-440f-442e-9129-358d77351685",
"price" => "3531.42000000",
"product_id" => "BTC-USD",
"sequence" => 7584073701,
"side" => "sell",
"size" => "0.04000084",
"taker_order_id" => "e5df8af2-20d0-4592-af98-e41bbb2ed8a9",
"time" => "2018-12-18T15:20:01.237000Z",
"trade_id" => 56140459,
"type" => "match"

Great, it works!

You can find the full code at this link: client.ex


def handle_disconnect(_conn, state) do
IO.puts "disconnected"
{:reconnect, state}

Returning {:reconnect, state} we ask to WebSockex to reconnect using the same process. Unfortunately this solution isn’t the best in our case because we have the subscription process inside the Coinbase.Client.start_link function, which is not called when the connection is restarted.
We opt then to let the process exit, returning :ok instead of :reconnect.

def handle_disconnect(_conn, state) do
IO.puts "disconnected"
{:ok, state}

It’s easy to test a disconnection. Just switching off the wifi (or disconnecting the ethernet cable), inducing the client to fire a timeout error.

iex> Coinbase.Client.start_link ["BTC-USD"]
%{"type" => "match", ...}
%{"type" => "match", ...}
# switch off the connection and wait

15:33:51.839 [error] [83, 83, 76, 58, 32, 83, 111, 99, 107, 101, 116, 32, 101, 114, 114, 111, 114, 58, 32, 'etimedout', 32, 10]
** (EXIT from #PID<0.182.0>) shell process exited with reason: {:remote, :closed}

Since we’ve used the WebSockex.start_link function, the websockex process was linked to our iex process which is taken down after the error.

To cope with client’s disconnections and crashes, we need to add some basic supervision. When we created the project, we’ve passed the --sup option to the mix command. This created a supervision skeleton we can now use.
Let’s open the application.ex file and add the processes we want to monitor.

defmodule Coinbase.Application do
use Application

def start(_type, _args) do
children = [
{Coinbase.Client, ["BTC-USD"]}

opts = [strategy: :one_for_one, name: Coinbase.Supervisor]
Supervisor.start_link(children, opts)

At line 6, we’ve added the {Coinbase.Client, products}tuple. The Supervisor will then start, running and monitoring our process, started with Coinbase.Client.start_link ["BTC-USD"]. In the case of a disconnection (or a crash) it will start a new client.

Running the Application

$ mix run --no-halt

"price" => "3521.20000000",
"type" => "match"

In both cases the application starts, running the Coinbase.Supervisor which, in turn, starts and monitors the Coinbase.Client.

What happens if we kill the Coinbase.Client process? We can get the pid from the list of processes monitored by the Coinbase.Supervisor and kill it using the Process.exit(pid, reason) function.

[{Coinbase.Client, pid, _, _}]= Supervisor.which_children(Coinbase.Supervisor)
Process.exit(pid, :kill)

Let’s see it in action

$ iex -S mix
%{ ..., "price" => "3522.85000000", "type" => "match"}

iex> [{Coinbase.Client, pid, _, _}]= Supervisor.which_children(Coinbase.Supervisor)
Process.exit(pid, :kill)

iex(6)> connected!
%{ ..., "price" => "3535.00000000", "type" => "match"}

We see that after we kill the client process, the supervisor starts a new client that connects immediately to the Coinbase server.

Wrap Up

A solution, to properly recover the lost trades, is to check the id of the last trade and then download the lost trades using a different API, like the Get Trades in Coinbase.

Originally published at

Passionate Software Engineer, CTO in a London-based Hedge Fund. Eager to learn new things. Fell in love with Elixir.

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store