Rephink: A real-time stack based on Phoenix Framework, RethinkDB, Elm, Webpack (Part 2: The Flow)

In the first part we saw how to set up all the components of our stack and glue them together. However, we’re not done with the gluing yet, and in this part we’re going to see how to make the data flow through a Phoenix channel from the Elm frontend to the RethinkDB database and back.

The code

Just a reminder: in case you’re not a fan of step-by-step copy-pasting / retyping or if you just want to share the working version of the code, you can find it here: https://github.com/bredikhin/phoenix-rethinkdb-elm-webpack-example.

Create a channel and connect to it using Elm

Phoenix channels are essentially a message-passing engine that plays perfectly with WebSockets to provide your application with real-time functionality. Using channels, you can send and receive messages grouped by topic, broadcast to multiple clients, and so on.
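For instance, broadcasting an event to every client subscribed to a topic is a one-liner from anywhere in your Elixir code (the topic, event name and payload here are just placeholders for illustration):

```elixir
# Push a "new_msg" event with a payload to all subscribers of "room:lobby"
Rephink.Web.Endpoint.broadcast!("room:lobby", "new_msg", %{body: "Hello!"})
```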

Let’s just go ahead and create a channel, so we can learn it all in practice (just as before, every command or path here assumes we’re in the root folder of our Phoenix application):

mix phx.gen.channel Todo

After that, as the message says, we need to add our channel to the socket handler in lib/rephink/web/channels/user_socket.ex:

channel "todo:*", Rephink.Web.TodoChannel

Once it’s taken care of, let’s write our channel module. The generator should have already created all the necessary functions with example signatures and bodies, so all we need to do is to open lib/rephink/web/channels/todo_channel.ex and edit it according to our goals:

defmodule Rephink.Web.TodoChannel do
  use Rephink.Web, :channel

  def join("todo:list", payload, socket) do
    if authorized?(payload) do
      {:ok, socket}
    else
      {:error, %{reason: "unauthorized"}}
    end
  end

  def handle_in("ping", _payload, socket) do
    Rephink.Web.Endpoint.broadcast!(socket.topic, "pong", %{"response" => "pong"})
    {:noreply, socket}
  end

  defp authorized?(_payload) do
    true
  end
end

Nothing special so far: we’re keeping the standard placeholder for joining / authorization, but we’re not going to elaborate on it in this post, assuming for the sake of simplicity that we only have one common task list that everybody can read and modify. As for the “ping” message, it’s a temporary one, and we’ll use it to make sure our channel can connect to the Elm client side.

Before we go any further, let’s strip that local storage functionality that came with the original TodoMVC example, since we’re not going to use it. In elm/Todo.elm, perform the following changes:

  • remove port from the first line,
  • remove the line
port setStorage : Model -> Cmd msg 
  • change
main : Program (Maybe Model) Model Msg

to be

main : Program Never Model Msg
  • change
Html.programWithFlags

to

Html.program
  • change the line
, update = updateWithStorage

to

, update = update
  • remove the updateWithStorage declaration and body.

Also, the Elm app initialization in assets/js/app.js becomes much simpler:

// Elm application
let Elm = require('../../elm/Todo.elm')
let todomvc = Elm.Todo.fullscreen()

Okay, now that it’s taken care of, let’s add an Elm package that will help us communicate with Phoenix:

cd elm && elm package install -y fbonetti/elm-phoenix-socket

Next, open elm/Todo.elm and add the following to the imports section:

import Phoenix.Socket
import Phoenix.Channel
import Phoenix.Push

Now we need to add the socket to our application state (aka Model):

type alias Model =
    { entries : List Entry
    , ...
    , socket : Phoenix.Socket.Socket Msg
    }

We must also initialize it properly, so init combined with emptyModel (which you can safely remove at this point) gives us the following:

init : ( Model, Cmd Msg )
init =
    let
        channelName =
            "todo:list"

        channel =
            Phoenix.Channel.init channelName
                |> Phoenix.Channel.onJoin (always RequestEntries)

        socketInit =
            Phoenix.Socket.init "ws://localhost:4000/socket/websocket"
                |> Phoenix.Socket.on "pong" channelName ReceiveEntries

        ( socket, cmd ) =
            Phoenix.Socket.join channel socketInit
    in
        { entries = []
        , visibility = "All"
        , field = ""
        , uid = 0
        , socket = socket
        }
            ! [ Cmd.map SocketMsg cmd ]

As you can see, we included the socket / channel initialization and a request to join, which in turn should trigger requesting the todo entries from the database.

Next stop: we need to add some new messages to our Msg type:

type Msg
    = NoOp
    ...
    | SocketMsg (Phoenix.Socket.Msg Msg)
    | RequestEntries
    | ReceiveEntries Encode.Value

The latter requires a new import to be added:

import Json.Encode as Encode
import Debug exposing (log)

(here we’ve also added Elm’s logging function, so we can analyze responses coming from our Phoenix server).

The first of these messages corresponds to the socket updates, the second one, RequestEntries, will send requests to the server via our Phoenix channel to fetch a specific set of entries, and finally ReceiveEntries defines how to handle the data we receive.

Now, we should describe how those messages are supposed to be handled within the update function:

update msg model =
    case msg of
        NoOp ->
            ...

        SocketMsg msg ->
            let
                ( socket, cmd ) =
                    Phoenix.Socket.update msg model.socket
            in
                { model | socket = socket } ! [ Cmd.map SocketMsg cmd ]

        RequestEntries ->
            let
                push =
                    Phoenix.Push.init "ping" "todo:list"
                        |> Phoenix.Push.onOk ReceiveEntries

                ( socket, cmd ) =
                    Phoenix.Socket.push push model.socket
            in
                { model | socket = socket } ! [ Cmd.map SocketMsg cmd ]

        ReceiveEntries raw ->
            let
                entries =
                    log "Ping" raw
            in
                model ! []

Here, for the sake of demonstration, we’re sending a ping to our Phoenix server and simply logging whatever is received.

The last thing we need to add is a subscription to our socket: change the subscriptions line in the main definition to

main =
    ...
    , subscriptions = subscriptions
    }

and add the subscriptions section at the bottom of the elm/Todo.elm:

-- SUBSCRIPTIONS

subscriptions : Model -> Sub Msg
subscriptions model =
    Phoenix.Socket.listen model.socket SocketMsg

Perfect, let’s take it for a spin using mix phx.server. If all is done properly, you should see Ping: { response = "pong" } in the browser console when you go to http://localhost:4000/ with the developer tools open. This means our Elm application was able to successfully communicate with our server using a Phoenix channel.

However, as impressive as this ping-pong demonstration is, it’s hardly useful until we somehow combine these results with the main purpose of our todo list app. So, let’s figure out how to do it.

We will start by fetching a list of todos from the server, and we will use the same message (we will just call it “todos”) to request and receive the current list. On the server side, change that handle_in inside lib/rephink/web/channels/todo_channel.ex to the following:

@table_name "todos"

def handle_in("todos", _payload, socket) do
  %{data: todos} = table(@table_name) |> RethinkDB.run(Rephink.DB)
  Rephink.Web.Endpoint.broadcast!(socket.topic, "todos", %{todos: todos})
  {:noreply, socket}
end

In order for this code to work, you should also:

  • add import RethinkDB.Query at the beginning of that file,
  • establish a database connection by creating lib/rephink/db.ex with the following content:
defmodule Rephink.DB do
  use RethinkDB.Connection
end
  • add it as a worker to the main supervision tree in lib/rephink/application.ex:
...
def start(_type, _args) do
  ...
  children = [
    ...
    # Start your own worker by calling: Rephink.Worker.start_link(arg1, arg2, arg3)
    worker(Rephink.DB, [Application.get_env(:rephink, Rephink.Repo)]),
  ]
  ...
end
...
  • make sure your mix.exs starts the rethinkdb application:
...
def application do
  [mod: {Rephink.Application, []},
   extra_applications: [:logger, :runtime_tools, :rethinkdb]]
end
...

Next, let’s take care of the client side. In your elm/Todo.elm, start by replacing the message names "ping" and "pong" with "todos":

...
socketInit =
    Phoenix.Socket.init "ws://localhost:4000/socket/websocket"
        |> Phoenix.Socket.on "todos" channelName ReceiveEntries
...

RequestEntries ->
    let
        push =
            Phoenix.Push.init "todos" "todo:list"
...

At this point we should be able to receive the todo list entries from the server, but still need to figure out how to update our model.

We will use the ReceiveEntries message to do it:

...
ReceiveEntries raw ->
    let
        decoded =
            Json.decodeValue
                (Json.field "todos"
                    (Json.list
                        (Json.map4
                            Entry
                            (Json.field "task" Json.string)
                            (Json.field "completed" Json.bool)
                            (Json.succeed False)
                            (Json.field "id" Json.int)
                        )
                    )
                )
                raw
    in
        case decoded of
            Ok entries ->
                { model | entries = entries } ! []

            Err error ->
                model ! []
...

Done? Let’s try it out. For now, we’re just going to add some todos manually via the RethinkDB admin, which runs on http://localhost:8080 by default. Open http://localhost:8080/#tables and make sure your database was created and contains a todos table. Then switch to http://localhost:8080/#dataexplorer and enter the following:

r.db('rephink').table('todos').insert([
  {id: 1, task: "Task 1", completed: false},
  {id: 2, task: "Task 2", completed: false},
  {id: 3, task: "Task 3", completed: false}
])

(or, well, any other tasks you have in mind).

Once it’s run, r.db('rephink').table('todos') should show you that we now have three rows in our table. And actually, starting your Phoenix server and opening http://localhost:4000 should also show you your todo list with the same entries. You can even edit / delete / complete items; the only problem is that none of it will be written to the database. But we’ll fix that in a bit.

Update the database

Let’s start with adding new entries. On the server side, we need another handle_in clause in lib/rephink/web/channels/todo_channel.ex:

...
def handle_in("insert", %{"todo" => todo}, socket) do
  table(@table_name)
  |> insert(todo)
  |> RethinkDB.run(Rephink.DB)

  %{data: todos} = table(@table_name) |> RethinkDB.run(Rephink.DB)
  Rephink.Web.Endpoint.broadcast!(socket.topic, "todos", %{todos: todos})
  {:noreply, socket}
end
...

On the client side, it’s a bit more work. First, a little side note: in this example we’ll be using integer row ids, which is not the best idea, and it’s not what RethinkDB uses by default. The reason we want them is that we’d like to keep the changes to a minimum, and the original TodoMVC example was based on integer ids. With that in mind, let’s add a little update to the model which will get triggered whenever we fetch our entries from the server:

...
ReceiveEntries raw ->
    let
        decoded =
            ...

        nextId xs =
            List.foldl (\x y -> if x.id > y then x.id else y) 0 xs
    in
        case decoded of
            Ok entries ->
                { model | entries = entries, uid = nextId entries } ! []
...

This finds the biggest row id among the entries and stores it in the uid field. It comes in extremely handy when we insert a new record into the database:

...
update msg model =
    ...
        Add ->
            let
                payload =
                    Encode.object
                        [ ( "todo"
                          , Encode.object
                                [ ( "task", Encode.string model.field )
                                , ( "id", Encode.int (model.uid + 1) )
                                , ( "completed", Encode.bool False )
                                ]
                          )
                        ]

                push =
                    Phoenix.Push.init "insert" "todo:list"
                        |> Phoenix.Push.withPayload payload

                ( socket, cmd ) =
                    Phoenix.Socket.push push model.socket
            in
                { model | socket = socket, field = "" }
                    ! [ Cmd.map SocketMsg cmd ]
...

Now, once again, using integer ids and generating them on the client could easily be nominated for the “Worst Idea Ever” prize, but for the sake of simplicity we’ll just assume that every client gets its entries updated before inserting any new row into the database, and that no race conditions are possible. In the real world, of course, the way to go is to use the UUIDs RethinkDB generates by default.
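For the record, switching to server-generated UUIDs would mostly mean not sending an id from the client at all: RethinkDB generates one for each inserted document and reports it back under generated_keys. A hypothetical sketch (not part of our example app) of what that could look like on the server side:

```elixir
# Insert without an "id" field and let RethinkDB generate a UUID primary key
result =
  table("todos")
  |> insert(%{"task" => "Buy milk", "completed" => false})
  |> RethinkDB.run(Rephink.DB)

# The generated UUIDs come back in the insert result, e.g.:
# %{data: %{"generated_keys" => ["b5f9cf34-..."], "inserted" => 1}}
[new_id | _] = result.data["generated_keys"]
```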

So, with this disclaimer out of the way, run mix phx.server, open http://localhost:4000/ and add a new entry (type something in the input field and hit Enter). You should see your new entry appear in the list, and it should still be there after you refresh the page: it’s stored in the database now.

Perfect, let’s proceed with updating the existing records then. The flow is very similar; we start by adding a server-side handler to lib/rephink/web/channels/todo_channel.ex:

...
def handle_in("update", %{"todo" => todo}, socket) do
  table(@table_name)
  |> update(todo)
  |> RethinkDB.run(Rephink.DB)

  %{data: todos} = table(@table_name) |> RethinkDB.run(Rephink.DB)
  Rephink.Web.Endpoint.broadcast!(socket.topic, "todos", %{todos: todos})
  {:noreply, socket}
end
...

Then, on the client side, we will change the EditingEntry message and add a new one called SyncEntry (which should also be added to the Msg type):

...
type Msg
    = NoOp
    | SyncEntry Int
    ...

update msg model =
    ...
        EditingEntry id isEditing ->
            let
                updateEntry t =
                    if t.id == id then
                        { t | editing = isEditing }
                    else
                        t

                focus =
                    Dom.focus ("todo-" ++ toString id)

                ( updatedModel, cmd ) =
                    if not isEditing then
                        update (SyncEntry id) model
                    else
                        ( model, Cmd.none )
            in
                { updatedModel | entries = List.map updateEntry updatedModel.entries }
                    ! [ Task.attempt (\_ -> NoOp) focus, cmd ]

        SyncEntry id ->
            let
                edited =
                    List.head (List.filter (\x -> x.id == id) model.entries)

                ( socket, cmd ) =
                    case edited of
                        Nothing ->
                            ( model.socket, Cmd.none )

                        Just entry ->
                            let
                                payload =
                                    Encode.object
                                        [ ( "todo"
                                          , Encode.object
                                                [ ( "task", Encode.string entry.description )
                                                , ( "id", Encode.int entry.id )
                                                , ( "completed", Encode.bool entry.completed )
                                                ]
                                          )
                                        ]

                                push =
                                    Phoenix.Push.init "update" "todo:list"
                                        |> Phoenix.Push.withPayload payload
                            in
                                Phoenix.Socket.push push model.socket
            in
                { model | socket = socket }
                    ! [ Cmd.map SocketMsg cmd ]
...

What’s happening here is that we’re sending our updated entry to the server (via SyncEntry) whenever we finish editing (i.e., whenever we receive EditingEntry with isEditing equal to False). This doesn’t let us handle the completion of tasks though, so we also need to update the Check and CheckAll handlers:

...
update msg model =
    ...
        Check id isComplete ->
            let
                updateEntry t =
                    if t.id == id then
                        { t | completed = isComplete }
                    else
                        t

                updatedModel =
                    { model | entries = List.map updateEntry model.entries }
            in
                update (SyncEntry id) updatedModel

        CheckAll isCompleted ->
            let
                updateEntry t =
                    { t | completed = isCompleted }

                allCheckedModel =
                    { model | entries = List.map updateEntry model.entries }

                syncEntry t ( model, cmdList ) =
                    let
                        ( updatedModel, newCmd ) =
                            update (SyncEntry t.id) model
                    in
                        ( updatedModel, List.append cmdList [ newCmd ] )

                ( updatedModel, cmdList ) =
                    List.foldr syncEntry ( allCheckedModel, [] ) allCheckedModel.entries
            in
                updatedModel ! cmdList
...

As you might have noticed, the last update handler is extremely inefficient, since it sends a number of messages essentially equal to the number of todo entries being checked. Instead, we should introduce a command for bulk-updating a list of entries, but for the sake of simplicity we’re leaving that as a refactoring exercise for the reader (the same applies to the updated DeleteComplete handler below).
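If you do decide to tackle that exercise, one possible shape for the server side is a single bulk handler. This is only a sketch under the same assumptions as the rest of our channel code; the "update_all" message name and payload shape are made up for illustration:

```elixir
# Hypothetical bulk variant: apply one change to every document in a list of ids
def handle_in("update_all", %{"ids" => ids, "change" => change}, socket) do
  table(@table_name)
  |> get_all(ids)
  |> update(change)
  |> RethinkDB.run(Rephink.DB)

  %{data: todos} = table(@table_name) |> RethinkDB.run(Rephink.DB)
  Rephink.Web.Endpoint.broadcast!(socket.topic, "todos", %{todos: todos})
  {:noreply, socket}
end
```

The client would then send one message carrying the whole list of ids instead of one message per entry.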

Now, the only thing left to handle is removing entries, which also consists of a server and a client side. Let’s start with the server side and add a removal clause to lib/rephink/web/channels/todo_channel.ex:

def handle_in("delete", %{"todo" => todo}, socket) do
  table(@table_name)
  |> get(todo["id"])
  |> delete()
  |> RethinkDB.run(Rephink.DB)

  %{data: todos} = table(@table_name) |> RethinkDB.run(Rephink.DB)
  Rephink.Web.Endpoint.broadcast!(socket.topic, "todos", %{todos: todos})
  {:noreply, socket}
end

As for the client side code for entry removal, it looks like this:

...
update msg model =
    ...
        Delete id ->
            let
                deleted =
                    List.head (List.filter (\x -> x.id == id) model.entries)

                ( socket, cmd ) =
                    case deleted of
                        Nothing ->
                            ( model.socket, Cmd.none )

                        Just entry ->
                            let
                                payload =
                                    Encode.object
                                        [ ( "todo"
                                          , Encode.object
                                                [ ( "id", Encode.int entry.id ) ]
                                          )
                                        ]

                                push =
                                    Phoenix.Push.init "delete" "todo:list"
                                        |> Phoenix.Push.withPayload payload
                            in
                                Phoenix.Socket.push push model.socket
            in
                { model | socket = socket }
                    ! [ Cmd.map SocketMsg cmd ]

        DeleteComplete ->
            let
                deleteEntry t ( model, cmdList ) =
                    let
                        ( updatedModel, newCmd ) =
                            update (Delete t.id) model
                    in
                        ( updatedModel, List.append cmdList [ newCmd ] )

                ( updatedModel, cmdList ) =
                    List.foldr deleteEntry ( model, [] ) (List.filter .completed model.entries)
            in
                updatedModel ! cmdList
...

With this in place, we should have a fully functional (albeit not perfect in terms of performance) todo list example in Elm connected to a RethinkDB database via a Phoenix backend. But wait, is there anything special about it? Was it even worth it? It will be, in just a couple of minutes.

Subscribe to the changes

Changefeeds are arguably one of the most attractive features of RethinkDB. Remember Meteor.js? All the performance and compatibility issues aside, it was kind of cool getting real-time updates for free, and it even was (almost) scalable via MongoDB oplog tailing. So, RethinkDB changefeeds let you essentially get the same functionality, once again, for free. Well, almost.

Changefeeds allow you to subscribe and receive changes in the results of virtually any RethinkDB query. You can easily track a query, a table or even a single document using this functionality. And given the fact that the clustering, replication and sharding are built-in, you don’t have to worry about scalability for a while in case you had some (most probably premature) concerns about it.
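To give you an idea of how little it takes, here is what subscribing to all changes on our todos table looks like in raw rethinkdb-elixir terms. This is just a sketch; the library’s changes/1 wraps ReQL’s .changes() command, and each change arrives as a map with "old_val" and "new_val" keys:

```elixir
import RethinkDB.Query

# Open a changefeed: the result is a feed you can consume change by change,
# covering every insert, update and delete in the "todos" table
table("todos")
|> changes()
|> RethinkDB.run(Rephink.DB)
```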

So, how do we arrange it? Say we want to sync our todo list across multiple devices and get real-time pushes whenever we edit it on one of them. Let’s see.

First, we’re going to be using a dedicated library to deal with the changefeeds: https://github.com/hamiltop/rethinkdb_changefeed. Configuration is pretty straightforward: we just need to add {:rethinkdb_changefeed, "~> 0.0.1"} to the list of dependencies and start :rethinkdb_changefeed alongside the other extra_applications in our mix.exs. Don’t forget to run mix deps.get to fetch the library.

Next, let’s create lib/rephink/changefeed.ex with the following content:

defmodule Rephink.Changefeed do
  use RethinkDB.Changefeed
  import RethinkDB.Query

  @table_name "todos"
  @topic "todo:list"

  def start_link(db, gen_server_opts \\ [name: Rephink.Changefeed]) do
    RethinkDB.Changefeed.start_link(__MODULE__, db, gen_server_opts)
  end

  def init(db) do
    query = table(@table_name)
    %{data: data} = RethinkDB.run(query, db)

    todos =
      Enum.map(data, fn x ->
        {x["id"], x}
      end)
      |> Enum.into(%{})

    {:subscribe, changes(query), db, {db, todos}}
  end

  def handle_update(data, {db, todos}) do
    todos =
      Enum.reduce(data, todos, fn
        %{"new_val" => nv, "old_val" => ov}, p ->
          case nv do
            nil ->
              Map.delete(p, ov["id"])

            %{"id" => id} ->
              Map.put(p, id, nv)
          end
      end)

    Rephink.Web.Endpoint.broadcast!(@topic, @table_name, %{todos: Map.values(todos)})
    {:next, {db, todos}}
  end
end

What we’ve got here is essentially a GenServer that holds the current list of todos as its state and updates it whenever the database subscription receives change notifications.

So, all we need to do now with this changefeed process is to add it to our supervision tree in lib/rephink/application.ex:

...
def start(_type, _args) do
  ...
  children = [
    ...
    worker(Rephink.Changefeed, [Rephink.DB])
  ]
  ...
end
...

Note that the update handler also broadcasts the updated todo list, which means that if you start your Phoenix server now and, for example, open the application in two browser tabs simultaneously, updating something in one tab will automatically update the other. Go see it for yourself.

It’s interesting that in this particular case, where we only have one topic common to all users, simply broadcasting to the channel on any update would do the trick; in more elaborate cases, however, different users should be subscribed to receive different sets of entries. Besides, the database content can also be modified via other applications / tools (you can try updating one of your entries directly using the RethinkDB admin console at http://localhost:8080/, for example), and that’s where our database subscription really shines.

Although at this point we’ve basically reached our goal, let’s apply one final touch to improve the structure of our code. We’re going to move the database interactions out of our channel code, which will let us use our changefeed process’s state to store the current todos and only hit the database when it’s really needed. In order to do that, let’s add the following to our lib/rephink/changefeed.ex:

...
def handle_call(:todos, _from, {db, todos}) do
  Rephink.Web.Endpoint.broadcast!(@topic, @table_name, %{todos: Map.values(todos)})
  {:reply, Map.values(todos), {db, todos}}
end

def handle_call({:insert, todo}, _from, {db, todos}) do
  table(@table_name)
  |> insert(todo)
  |> RethinkDB.run(db)

  {:reply, Map.values(todos), {db, todos}}
end

def handle_call({:update, todo}, _from, {db, todos}) do
  table(@table_name)
  |> update(todo)
  |> RethinkDB.run(db)

  {:reply, Map.values(todos), {db, todos}}
end

def handle_call({:delete, todo}, _from, {db, todos}) do
  table(@table_name)
  |> filter(todo)
  |> delete()
  |> RethinkDB.run(db)

  {:reply, Map.values(todos), {db, todos}}
end
...

This allows us to simplify lib/rephink/web/channels/todo_channel.ex (let’s just provide the complete listing here):

defmodule Rephink.Web.TodoChannel do
  use Rephink.Web, :channel

  alias RethinkDB.Changefeed

  def join("todo:list", payload, socket) do
    if authorized?(payload) do
      {:ok, socket}
    else
      {:error, %{reason: "unauthorized"}}
    end
  end

  def handle_in("todos", _payload, socket) do
    Changefeed.call(Rephink.Changefeed, :todos)
    {:noreply, socket}
  end

  def handle_in("insert", %{"todo" => todo}, socket) do
    Changefeed.call(Rephink.Changefeed, {:insert, todo})
    {:noreply, socket}
  end

  def handle_in("update", %{"todo" => todo}, socket) do
    Changefeed.call(Rephink.Changefeed, {:update, todo})
    {:noreply, socket}
  end

  def handle_in("delete", %{"todo" => todo}, socket) do
    Changefeed.call(Rephink.Changefeed, {:delete, todo})
    {:noreply, socket}
  end

  defp authorized?(_payload) do
    true
  end
end

As you can see, all the database queries were moved out of the channel code and replaced with calls to our changefeed server, which holds all the data we need in its state and updates it whenever the database gets updated.

Conclusion

Thus, what we’ve described here is essentially a real-time platform that can be used to create applications for multiple devices with real-time update and syncing capabilities. So what, you may say, that’s so 2012. Well, yes and no: this time it’s faster, more reliable, more robust, and better structured. Besides, the tools we’re using now have built-in scalability features, so if at some point you feel like you need to scale horizontally, you can easily leverage the power of distributed Elixir and RethinkDB clustering.

Obviously, it’s a somewhat artificial example based on multiple assumptions that are hardly applicable to real-life situations, but its purpose is mostly to show how these amazing tools can be connected and work together. I really hope I managed at least to get you interested. And of course, your constructive feedback is always welcome.

Credits