Postgrex notifications

Sylvain Kieffer
3 min read · Feb 22, 2016

--

2018-10-03: this article is quite old now. Somebody recently wrote about the same subject in a cleaner and more up-to-date way. Check it out: https://blog.lelonek.me/listen-and-notify-postgresql-commands-in-elixir-187c49597851

Joseph Kain wrote an article about how to publish model changes to Phoenix channels. In his solution, you broadcast a message from the controller whenever you make a change through the repo (that is, in the :create, :update and :delete actions).

Postgres implements a pub/sub system called LISTEN/NOTIFY. Combined with triggers, it lets us broadcast a message whenever a row is changed. This solution is Postgres/Postgrex specific, but it will broadcast messages even if the change comes from another app.

You’ll need at least postgrex 0.11 and ecto 1.1.4.

As usual, I have a Post resource, in a table posts.

The SQL part

First let’s create the function which will be called by the trigger.

CREATE OR REPLACE FUNCTION notify_posts_changes()
RETURNS trigger AS $$
DECLARE
  current_row RECORD;
BEGIN
  -- NEW is only set for INSERT and UPDATE; DELETE only provides OLD.
  IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN
    current_row := NEW;
  ELSE
    current_row := OLD;
  END IF;
  -- Send the change as a JSON string on the 'posts_changes' channel.
  PERFORM pg_notify(
    'posts_changes',
    json_build_object(
      'table', TG_TABLE_NAME,
      'type', TG_OP,
      'id', current_row.id,
      'data', row_to_json(current_row)
    )::text
  );
  -- The return value of an AFTER row trigger is ignored.
  RETURN current_row;
END;
$$ LANGUAGE plpgsql;

We declare a variable current_row of type RECORD. If the operation is an INSERT or an UPDATE we’ll use the NEW row (the record after the modifications). In the case of a DELETE operation, only the OLD row is available.

Then we use PERFORM pg_notify(channel, text), which sends a message to everyone listening on the given channel. json_build_object and row_to_json are Postgres functions for building JSON. See the Postgres documentation on JSON functions for more information. Our message will contain the table name, the type of the operation, the id of the row and a JSON representation of the row itself.

In the case of an UPDATE, we could have sent both the NEW and the OLD row to see the changes.
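If the trigger did send both rows, the receiving side could work out which fields changed. Here is a minimal sketch in Elixir, assuming the two rows have already been decoded into maps with string keys. The RowDiff module and the sample fields are hypothetical and only serve the illustration.

```elixir
defmodule RowDiff do
  # Hypothetical helper: returns a map of field => {old_value, new_value}
  # for every field of `new` whose value differs from the one in `old`.
  def changes(old, new) do
    for {key, new_value} <- new,
        Map.get(old, key) != new_value,
        into: %{} do
      {key, {Map.get(old, key), new_value}}
    end
  end
end

# Made-up rows standing in for the decoded OLD and NEW records.
old = %{"id" => 1, "title" => "Hello", "body" => "First draft"}
new = %{"id" => 1, "title" => "Hello", "body" => "Final text"}

RowDiff.changes(old, new)
# => %{"body" => {"First draft", "Final text"}}
```

Keep in mind that sending two rows doubles the size of the payload.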

Next, add the trigger :

CREATE TRIGGER notify_posts_changes_trg
  AFTER INSERT OR UPDATE OR DELETE
  ON posts
  FOR EACH ROW
  EXECUTE PROCEDURE notify_posts_changes();

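To try it out, open psql and start listening first:

LISTEN posts_changes;

Then perform some operations on the posts table. You should see some messages containing the changes (psql prints pending notifications after each command you run).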

The Postgrex part

Postgrex has a Notifications module which exposes a start_link/1 function and a listen/3 function.

It’s used like this. First you start a process, giving it the repo config :

pg_config = Application.get_env(:posts, Posts.Repo)
{:ok, pid} = Postgrex.Notifications.start_link(pg_config)

Then you listen to a channel :

{:ok, ref} = Postgrex.Notifications.listen(pid, channel)

Now, whenever the notifications process (pid) receives a notification on that channel, it will forward it to the process that called listen.

So you can write this in iex :

receive do
  notification -> notification
end

The message will have this form :

{:notification, pid, ref, channel, payload}

So let’s pattern match on this :

receive do
  {:notification, pid, ref, "posts_changes", payload} -> payload
end
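One detail worth knowing: a bare variable in a pattern is rebound, so pid and ref in the receive above match any value. Here is a minimal sketch that pins them with ^ and adds a timeout so the call cannot block forever. The NotificationWait module is a hypothetical helper, not part of Postgrex, and the notification is simulated with send/2 so the snippet runs without a database.

```elixir
defmodule NotificationWait do
  # Hypothetical helper: waits for one notification on `channel` coming
  # from the given connection `pid` and listen `ref`.
  def await(pid, ref, channel, timeout \\ 5_000) do
    receive do
      # ^ pins the values, so only this exact pid/ref/channel matches.
      {:notification, ^pid, ^ref, ^channel, payload} -> {:ok, payload}
    after
      timeout -> {:error, :timeout}
    end
  end
end

# Simulated notification, standing in for a real one from Postgres.
ref = make_ref()
send(self(), {:notification, self(), ref, "posts_changes", ~s({"type":"INSERT"})})

NotificationWait.await(self(), ref, "posts_changes")
```

With a real connection, you would pass the pid returned by start_link and the ref returned by listen.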

You can decode the payload with Poison.decode!/2.

Poison.decode! payload
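Once decoded, the payload is a plain map, so you can pattern match on it. Here is a minimal sketch, assuming the decoder returns string keys and the payload has the shape built by the trigger function above. The PostChange module and the "title" field are hypothetical.

```elixir
defmodule PostChange do
  # Hypothetical helper: turns a decoded payload into a tagged tuple
  # based on the "type" key set by the trigger (TG_OP).
  def classify(%{"type" => "INSERT", "data" => data}), do: {:created, data}
  def classify(%{"type" => "UPDATE", "data" => data}), do: {:updated, data}
  def classify(%{"type" => "DELETE", "id" => id}), do: {:deleted, id}
  def classify(_other), do: :ignored
end

# Example of a decoded payload; the "title" field is made up.
decoded = %{
  "table" => "posts",
  "type" => "UPDATE",
  "id" => 1,
  "data" => %{"id" => 1, "title" => "Hello"}
}

PostChange.classify(decoded)
# => {:updated, %{"id" => 1, "title" => "Hello"}}
```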

Integrating with Phoenix

Let’s add a worker to our application (lib/posts.ex) :

...
children = [
  ...,
  worker(Posts.PgListener, ["posts_changes"], id: :posts_changes),
  ...
]
...

Create the file lib/posts/pg_listener.ex :

defmodule Posts.PgListener do
  use GenServer
  import Poison, only: [decode!: 1]

  def start_link(channel) do
    GenServer.start_link(__MODULE__, channel)
  end

  def init(channel) do
    pg_config = Application.get_env(:posts, Posts.Repo)
    {:ok, pid} = Postgrex.Notifications.start_link(pg_config)
    {:ok, ref} = Postgrex.Notifications.listen(pid, channel)
    # Keep pid and ref in the state so handle_info can match on them.
    {:ok, {pid, channel, ref}}
  end

  def handle_info(
        {:notification, pid, ref, "posts_changes", payload},
        {pid, _channel, ref} = state
      ) do
    Posts.Endpoint.broadcast("posts", "change", decode!(payload))
    {:noreply, state}
  end
end

The interesting part is the handle_info. We pattern match on the notification. We can extract the payload, convert it to a Map with decode!, and broadcast it.

We can define several handle_info with different channels and add workers to our application, using a unique :id each time.
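Instead of one handle_info clause per channel, a single worker could also look up a handler by channel name. Here is a minimal sketch of that dispatch, not a finished implementation. MultiChannelListener and its handlers map are hypothetical, the Postgrex wiring is left out on purpose, and the notification is simulated with send/2 so the snippet runs on its own.

```elixir
defmodule MultiChannelListener do
  use GenServer

  # `handlers` maps a channel name to a one-argument function.
  def start_link(handlers), do: GenServer.start_link(__MODULE__, handlers)

  def init(handlers) do
    # A real worker would start Postgrex.Notifications here and call
    # listen once per key of `handlers`; that wiring is omitted.
    {:ok, handlers}
  end

  def handle_info({:notification, _pid, _ref, channel, payload}, handlers) do
    case Map.fetch(handlers, channel) do
      {:ok, handler} -> handler.(payload)
      # No handler registered for this channel: drop the notification.
      :error -> :ok
    end

    {:noreply, handlers}
  end

  # Ignore any other message.
  def handle_info(_msg, handlers), do: {:noreply, handlers}
end

parent = self()

{:ok, listener} =
  MultiChannelListener.start_link(%{
    "posts_changes" => fn payload -> send(parent, {:posts, payload}) end,
    "comments_changes" => fn payload -> send(parent, {:comments, payload}) end
  })

# Simulated notification, standing in for a real one from Postgres.
send(listener, {:notification, self(), make_ref(), "posts_changes", "{}"})
```

In the Phoenix application, the handler functions would call Posts.Endpoint.broadcast instead of sending a message back to the caller.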

And you’re good to go! You’re not limited to broadcasting messages though. You could, for instance, use the notifications to generate files each time a row is created. Have fun!

I know there is a lot of room for improvement. For instance, a macro could generate a handle_info clause for each channel. A single worker that could handle several channels at once would also be great.
This article is a quick’n’dirty attempt to show you how to wire together Postgres, Postgrex and Phoenix. Feel free to submit a better implementation, and I’ll be glad to replace the current one.
