Keeping a Cache in Sync with Postgres Pub/Sub


There has been a huge shift to cloud computing. In a cloud-powered application, replication and redundancy are critical: a service needs to be not only easy to recreate but also self-contained. For example, a service with high traffic usually needs some sort of load balancing and failover mechanism to reduce latency and avoid downtime if a server in the cluster fails. These requirements complicate software architecture and push designs towards a more distributed-systems approach.

Let’s say you have a web service made up of an application server and a database. A simple architecture would be to run the web server, the application, and the database all on the same machine.

For a simple application like a personal blog, this approach works great. However, let’s say the blog gets really popular; suddenly the application server cannot keep up with the overwhelming demand for the site. So you scale out the application horizontally and create multiple instances of the application and web server, each on their own machine. To maintain the illusion that you’re still running a single application, you add another machine with just a web server to act as a reverse proxy and route requests between the users and each application server in the cluster. So you now have a distributed system with two layers: the load balancer and the application layer.
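
As a rough sketch of that routing layer, Go’s standard library can act as a simple reverse proxy. The backend addresses and the random-pick strategy below are placeholders for a real load-balancing setup, not part of the system described in this post:

package main

import (
	"log"
	"math/rand"
	"net/http"
	"net/http/httputil"
	"net/url"
)

func main() {
	// Hypothetical application-server addresses; in practice these come from config.
	backends := []*url.URL{
		mustParse("http://app1.internal:8080"),
		mustParse("http://app2.internal:8080"),
	}

	proxy := &httputil.ReverseProxy{
		Director: func(r *http.Request) {
			// Pick a backend at random -- a stand-in for a real balancing strategy.
			target := backends[rand.Intn(len(backends))]
			r.URL.Scheme = target.Scheme
			r.URL.Host = target.Host
		},
	}

	log.Fatal(http.ListenAndServe(":80", proxy))
}

func mustParse(raw string) *url.URL {
	u, err := url.Parse(raw)
	if err != nil {
		panic(err)
	}
	return u
}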

Now this solves the traffic problem, but it creates a new one: each application server was cloned from the original machine, so each one has its own copy of the database, and those copies can fall out of sync. To solve this, you move the database off the application servers and onto its own machine; or better yet, you use a database-as-a-service (DBaaS) like Amazon RDS. We’ve now added a third layer to the system: the storage layer. It ensures that however many servers we add to scale out the application, they all see the same data. Simple, right?

Alternatively, let’s say you have some kind of high-performance application. It does some form of intense computation and keeps an in-memory cache of part of the database state. Generally, we’d update the cache whenever we update the database. However, we run into the same kind of problem we had with multiple databases: each server has its own copy of the cache, and we need some way to keep those copies consistent. We could drop the cache entirely and query the database on demand, but that would be far too slow. We could also apply the same solution as before and move the cache into its own layer using something like Redis or another key-value store. That’s not a bad idea, but it puts a network round trip in front of every lookup, which is exactly the latency the cache was meant to avoid.

But wait: we already have a common layer shared by all the application instances, the database itself. And since we really only care about updating the cache when the database is updated, we can let the database broadcast changes to the caches as they happen. PostgreSQL provides publish-subscribe functionality called LISTEN/NOTIFY. Like any pub/sub implementation, LISTEN/NOTIFY lets you define channels on which the database can broadcast short text messages; clients can listen on those channels and receive the messages asynchronously. PostgreSQL holds pending notifications in a queue and discards them only once every listening session has received them. That is worth keeping in mind: if a listener stalls (for example, inside a long-running transaction), the queue can fill up, and subsequent NOTIFY calls will fail with an error. Lastly, we can write a simple trigger in PostgreSQL that calls NOTIFY on every insert into a table.
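
To see the sending half in isolation before we wire it into a trigger: any session can publish on a channel by calling pg_notify, for example from Go through database/sql. The connection string, channel name, and payload below are just placeholders; receiving notifications is what the rest of this post builds up to.

package main

import (
	"database/sql"
	"fmt"

	_ "github.com/lib/pq" // registers the "postgres" driver
)

func main() {
	// Placeholder connection string.
	db, err := sql.Open("postgres", "user=postgres dbname=company_db sslmode=disable")
	if err != nil {
		fmt.Println(err)
		return
	}

	// pg_notify(channel, payload) behaves like the NOTIFY command, but it can
	// be parameterized like any other query.
	if _, err := db.Exec("SELECT pg_notify($1, $2)", "new_hire", "hello from Go"); err != nil {
		fmt.Println(err)
	}
}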

For example, let’s say we have an application that keeps track of employees and the departments they belong to. Each department has an employee designated as the manager of that department. For processing purposes, it’d be helpful if we kept a directory in memory of all the employees and who their department manager is.

CREATE TABLE IF NOT EXISTS employee (
    -- uuid_generate_v4() comes from the uuid-ossp extension:
    -- CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
    id uuid PRIMARY KEY NOT NULL DEFAULT uuid_generate_v4(),
    title text NOT NULL,
    name text NOT NULL,
    department uuid NOT NULL
);

CREATE TABLE IF NOT EXISTS department (
    id uuid PRIMARY KEY NOT NULL DEFAULT uuid_generate_v4(),
    name text NOT NULL,
    location text NOT NULL,
    manager_id uuid NOT NULL
);

CREATE OR REPLACE FUNCTION new_hire_notify() RETURNS trigger AS $$
DECLARE
    payload varchar;
    mid uuid;
BEGIN
    -- Look up the manager of the new employee's department.
    SELECT manager_id INTO mid FROM department
    WHERE id = NEW.department;

    -- NOTIFY payloads are plain strings, so concatenate the two IDs.
    payload := CAST(NEW.id AS text) || ', ' || CAST(mid AS text);
    PERFORM pg_notify('new_hire', payload);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER value_insert
    AFTER INSERT ON employee
    FOR EACH ROW
    EXECUTE PROCEDURE new_hire_notify();

A small gotcha with NOTIFY in PostgreSQL is that it can only send strings, so in our trigger we concatenate the information we need into one. In this example, all we want is the new employee’s ID and the ID of their department’s manager.

In our application, we need some way to listen for the NOTIFY from PostgreSQL in the background. Go is excellent for this, as it supports concurrency as a language primitive. In our Go application we’ll use the lib/pq PostgreSQL driver. The driver comes with listener functionality, which our application can use to subscribe to notification channels.
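
For reference, the snippets that follow assume a single main package with roughly these imports (the exact layout is up to you):

package main

import (
	"database/sql"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/lib/pq" // also registers the "postgres" driver with database/sql
)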

First, let’s define some data structures:

// officeCache maps an employee ID to their department manager's ID.
type officeCache struct {
	sync.Mutex
	Directory map[string]string
}

// DBManager holds the database handle and the shared in-memory cache.
type DBManager struct {
	DB    *sql.DB
	Cache *officeCache
}
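
Reads of the Directory map should take the same lock the listener uses when writing. A small lookup helper along these lines (my addition, not part of the original code) keeps that discipline in one place:

// ManagerFor returns the manager ID cached for the given employee, if any.
func (oc *officeCache) ManagerFor(employeeID string) (string, bool) {
	oc.Lock()
	defer oc.Unlock()
	managerID, ok := oc.Directory[employeeID]
	return managerID, ok
}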

Next, we’ll create our Connect method:

func (pg *DBManager) Connect() error {
	uri := "user=postgres dbname=company_db sslmode=disable"
	db, err := sql.Open("postgres", uri)
	if err != nil {
		return err
	}
	if err := db.Ping(); err != nil {
		return err
	}

	// Log any problems with the listener connection (e.g. disconnects).
	reportProblem := func(et pq.ListenerEventType, err error) {
		if err != nil {
			fmt.Println(err)
		}
	}

	// The listener holds its own connection to PostgreSQL, separate from db.
	listener := pq.NewListener(uri, 10*time.Second, time.Minute, reportProblem)
	if err := listener.Listen("new_hire"); err != nil {
		return err
	}

	pg.Cache = &officeCache{
		Directory: make(map[string]string),
	}
	go pg.Cache.Listen(listener)

	pg.DB = db
	return nil
}

In the Connect method, we open and ping the database connection in the usual database/sql way. Then we create a new listener, which holds a separate TCP connection to PostgreSQL. On that connection, we can specify channels to listen to; we can subscribe to multiple channels on the same listener by calling listener.Listen once per channel. Finally, we pass the listener to the Cache.Listen method and spin it off into a goroutine, so it runs concurrently with the rest of the application and updates the cache asynchronously.
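
Putting it together, usage might look roughly like this: connect, then insert a new employee through any application instance; the trigger fires, PostgreSQL broadcasts on new_hire, and the background goroutine updates the cache. The department UUID is a placeholder for a department row that already exists, and ManagerFor is the small helper sketched earlier:

func main() {
	pg := &DBManager{}
	if err := pg.Connect(); err != nil {
		fmt.Println(err)
		return
	}

	// Placeholder ID of an existing department row.
	departmentID := "11111111-1111-1111-1111-111111111111"

	// Inserting into employee fires the new_hire_notify trigger, which sends
	// the notification our background goroutine turns into a cache entry.
	var employeeID string
	err := pg.DB.QueryRow(
		"INSERT INTO employee (title, name, department) VALUES ($1, $2, $3) RETURNING id",
		"Engineer", "Ada Lovelace", departmentID,
	).Scan(&employeeID)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Give the listener goroutine a moment to catch up (demo only).
	time.Sleep(2 * time.Second)
	fmt.Println(pg.Cache.ManagerFor(employeeID))
}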

func (oc *officeCache) Listen(l *pq.Listener) {
	for {
		// Blocks until a notification arrives. lib/pq delivers a nil
		// notification after re-establishing a dropped connection.
		n := <-l.Notify
		if n == nil {
			continue
		}

		switch n.Channel {
		case "new_hire":
			// Payload format from the trigger: "<employee id>, <manager id>".
			parr := strings.Split(n.Extra, ", ")
			if len(parr) != 2 {
				break
			}
			employeeId := parr[0]
			managerId := parr[1]

			oc.Lock()
			oc.Directory[employeeId] = managerId
			oc.Unlock()
		}
	}
}

In the officeCache.Listen method we create an infinite loop. The loop blocks while it waits for data on the listener.Notify channel. Once a notification arrives, it checks which PostgreSQL channel triggered it, parses the text payload into the employee ID and manager ID, and adds them to the cache. Note that notifications are not persisted for future subscribers: a session only receives the NOTIFYs sent while it is connected and listening, so messages sent before the listener registered (or while its connection was down) are never delivered to it.
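
To cover that gap, each instance can seed the cache once at startup. A minimal sketch, assuming the schema above (loadDirectory is a helper name I’m introducing here, not part of the original code), rebuilds the whole directory from a single join:

// loadDirectory rebuilds the cache from the current contents of the database.
// Called once at startup it seeds the cache; called again later it acts as a
// full drop-and-refresh.
func (pg *DBManager) loadDirectory() error {
	rows, err := pg.DB.Query(
		"SELECT e.id, d.manager_id FROM employee e JOIN department d ON d.id = e.department")
	if err != nil {
		return err
	}
	defer rows.Close()

	fresh := make(map[string]string)
	for rows.Next() {
		var employeeID, managerID string
		if err := rows.Scan(&employeeID, &managerID); err != nil {
			return err
		}
		fresh[employeeID] = managerID
	}
	if err := rows.Err(); err != nil {
		return err
	}

	// Swap in the freshly built map under the cache lock.
	pg.Cache.Lock()
	pg.Cache.Directory = fresh
	pg.Cache.Unlock()
	return nil
}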

It is worth noting that this method alone may not be sufficient for a production-ready cache. It would be better if there were also a way to drop and refresh the cache to guarantee it eventually matches the database.
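
One rough way to approximate that, reusing the loadDirectory sketch above, is a periodic full rebuild in its own goroutine, started with something like go pg.refreshPeriodically(5 * time.Minute):

// refreshPeriodically reruns the full load at a fixed interval, papering over
// any notifications missed while a listener connection was down.
func (pg *DBManager) refreshPeriodically(interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		if err := pg.loadDirectory(); err != nil {
			fmt.Println(err)
		}
	}
}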

Data consistency with high availability in a distributed system is still a hard problem. This basic solution works in simple scenarios, but issues can still arise as you continue to scale out. PostgreSQL NOTIFY is a little crude: payloads are plain strings and are limited to just under 8000 bytes by default, so more complicated data can become a problem (one workaround, sketched below, is to send JSON). The system also gets considerably more complicated once the database itself is distributed, and under extremely high demand you could see stale cache entries or misses while notifications are still crossing the network. However, this approach fits our needs for now and hopefully serves as a good starting point for bigger and better solutions.
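
As an aside on richer payloads: one option (not used in the code above) is to have the trigger build a JSON string, for instance with PostgreSQL’s json_build_object, and decode it in the listener. A minimal sketch of the Go side, assuming an encoding/json import and a hypothetical payload shape:

// newHirePayload mirrors a hypothetical JSON payload such as
// {"employee_id": "...", "manager_id": "..."} emitted by the trigger.
type newHirePayload struct {
	EmployeeID string `json:"employee_id"`
	ManagerID  string `json:"manager_id"`
}

// parseNewHire decodes the notification payload instead of splitting a string.
func parseNewHire(extra string) (newHirePayload, error) {
	var p newHirePayload
	err := json.Unmarshal([]byte(extra), &p)
	return p, err
}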