Open Sourcing pg2k4j, Our Change Data Capture Solution

Ryan Kass · Published in disney-streaming
Dec 12, 2018 · 6 min read

Last month, Disney Streaming Services published our first open source project, pg2k4j, which is short for PostgreSQL to Kinesis For Java. pg2k4j is an app that sits between a PostgreSQL instance and an AWS Kinesis Stream, broadcasting every mutation made to the database onto the stream. It’s a simple Java library (a few hundred lines of code) that accomplishes an essential task within our ecosystem in an extremely performant manner.

Use Case

I come home on a Saturday afternoon and turn on ESPN+. Behind the scenes, hundreds of micro-services are responsible for providing data to the client app. On this particular day and time, a college basketball game begins auto-playing, and I see an episode of Earn Everything headlining the popular section. As I keep watching the college basketball game, many of those micro-services want to be kept abreast of my progress in the video:

  • The Recommendation Service uses my current place in the stream to make better recommendations about videos I’d want to watch.
  • The Content Service needs to know my current place in the stream in order to inform the Continue Watching feature.
  • Most vitally, the Playback Service uses this value to know where to resume playback if I were to exit and re-enter the stream.

We wanted to build a system that allows all of these downstream services to be notified when a user’s place in a stream has changed. Such a system should have the following guarantees:

  • Immediate Propagation of Events: Downstream clients are made aware of updates within seconds.
  • Replayability: Any client should be able to know the state of a user’s bookmarks at any point in time. Similarly, new clients may come on board and glean the entire history of updates in order.
  • Scalability: We have millions of users who may be active at a given time. This system must be able to propagate bookmarks at the rate at which they occur.

Solution

A simple approach to satisfying these requirements would be to have the client device (Roku, Apple TV, web browser, etc.) put a user’s latest playhead on a Kinesis stream. Downstream clients like the Playback Service would then be able to consume this event to know the most up-to-date playhead for a given (user, content) pair. However, client devices cannot guarantee that these events will be delivered in order: due to network irregularities, client devices will sometimes put updates on the stream out of order.

Instead of having every downstream service interested in the latest bookmarks perform logic for ordering these events, we solve this problem in one place using PostgreSQL. Then, when an insert or update occurs in PostgreSQL, that change is propagated to a new stream by pg2k4j. Here’s a visual summary of this architecture:

Why Postgres?

Postgres has two essential features that tipped the scales for us when considering which datastore to use:

  • Conditional Upserts: As of Postgres 9.5, we can write statements like the one below, which lets us update our bookmarks table only if the item we’re inserting occurred after the one that already exists in our database (a sketch of executing this statement from Java follows this list).

    INSERT INTO bookmarks (userId, contentId, playhead, occurredOn)
    VALUES (?, ?, ?, ?)
    ON CONFLICT (userId, contentId) DO UPDATE
    SET playhead = excluded.playhead, occurredOn = excluded.occurredOn
    WHERE excluded.occurredOn > bookmarks.occurredOn

This statement allows us to fully offload the recency check to the database. Through load testing we’ve seen our Postgres instance able to handle upwards of 1 million of these upserts per minute.

  • Write Ahead Log (WAL): Postgres’ way of ensuring data integrity while maximizing its write throughput. All changes to data files are first written to this log before changes get flushed to disk. This provides a suitable resource to implement a Change Data Capture (CDC) solution against. (CDC refers to the design approach we take here of broadcasting changes to a datastore so that clients may know the state of the datastore at any point in time).
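Here is a minimal sketch of what executing that conditional upsert from Java might look like, using plain JDBC. The table and column names mirror the statement above; the connection string, credentials, and sample values are placeholders, and this is not code from pg2k4j itself.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.Timestamp;
    import java.time.Instant;

    public class BookmarkUpserter {
        // The conditional upsert from the Conditional Upserts bullet above.
        private static final String UPSERT_SQL =
            "INSERT INTO bookmarks (userId, contentId, playhead, occurredOn) "
            + "VALUES (?, ?, ?, ?) "
            + "ON CONFLICT (userId, contentId) DO UPDATE "
            + "SET playhead = excluded.playhead, occurredOn = excluded.occurredOn "
            + "WHERE excluded.occurredOn > bookmarks.occurredOn";

        public static void main(String[] args) throws Exception {
            // Placeholder connection details.
            try (Connection conn = DriverManager.getConnection(
                     "jdbc:postgresql://localhost:5432/bookmarks", "user", "password");
                 PreparedStatement stmt = conn.prepareStatement(UPSERT_SQL)) {
                stmt.setString(1, "user-123");       // userId
                stmt.setString(2, "content-456");    // contentId
                stmt.setLong(3, 1_350_000L);         // playhead, e.g. milliseconds into the stream
                stmt.setTimestamp(4, Timestamp.from(Instant.now())); // occurredOn
                int rows = stmt.executeUpdate();
                // rows == 0 means the database kept the existing bookmark because
                // the incoming event was older than the one already stored.
            }
        }
    }

Because the WHERE clause performs the recency check, the application never has to read, compare, and then write.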

Why Kinesis?

As a company that deploys its infrastructure on AWS, we see a lot of value in choosing an AWS-managed service. Furthermore, AWS Kinesis satisfies all of our requirements: records put onto Kinesis are available to consumers within a few seconds; each client may consume or replay events from any point in time (firehosing events to S3 preserves them in perpetuity); and Kinesis can scale horizontally by adding more shards, enabling continuously higher read and write throughput.

Making pg2k4j

wal2json is a Postgres plugin that decodes the WAL into JSON documents with a predictable schema. This plugin provides a clean interface for reading messages from the WAL. Before we wrote pg2k4j to forward wal2json output to Kinesis, we ran software called pg2kinesis to solve this problem. This open source solution is essentially the Python version of pg2k4j. Though we found some minor bugs we needed to correct, pg2kinesis allowed us to quickly get our feet wet with CDC using Postgres.
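If you want to see what the plugin emits before wiring up any tooling, you can create a wal2json slot and peek at it by hand. The sketch below uses Postgres’ SQL-level replication-slot functions via JDBC; the slot name and connection details are made up, and this is only a way to eyeball the output, not how pg2k4j consumes the slot in production. The server must be configured with wal_level = logical.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class Wal2JsonPeek {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection(
                     "jdbc:postgresql://localhost:5432/bookmarks", "user", "password");
                 Statement stmt = conn.createStatement()) {
                // Create a logical replication slot backed by the wal2json output plugin.
                stmt.execute("SELECT * FROM pg_create_logical_replication_slot('demo_slot', 'wal2json')");

                // ... perform some inserts/updates on the bookmarks table here ...

                // Peek at (without consuming) the JSON change documents queued on the slot.
                try (ResultSet rs = stmt.executeQuery(
                         "SELECT data FROM pg_logical_slot_peek_changes('demo_slot', NULL, NULL)")) {
                    while (rs.next()) {
                        System.out.println(rs.getString("data"));
                    }
                }

                // Clean up: an abandoned slot keeps the server from recycling WAL segments.
                stmt.execute("SELECT pg_drop_replication_slot('demo_slot')");
            }
        }
    }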

It became clear, however, that pg2kinesis would not be able to keep up with the load we needed it to handle. Though our Postgres instance could easily handle 1 million inserts and updates per minute, pg2kinesis would top out at around 100k. Through profiling, we found two major bottlenecks within pg2kinesis:

  1. Parsing the JSON output from the wal2json plugin
  2. Putting the CDC records on the Kinesis Stream

These two bottlenecks immediately stood out as curable via a Java implementation. Even though wal2json outputs well-defined JSON models, Python needs to perform type inference to convert these strings into dicts. Java does this orders of magnitude faster, since it knows the exact schema of the models the wal2json plugin emits (represented as a SlotMessage in pg2k4j).
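As an illustration, here is a simplified sketch of binding wal2json output to typed Java classes with Jackson. The classes and the trimmed sample payload below are illustrative stand-ins, not pg2k4j’s actual SlotMessage model.

    import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.List;

    public class Wal2JsonParseExample {

        @JsonIgnoreProperties(ignoreUnknown = true)
        public static class ChangeSet {
            public List<Change> change;
        }

        @JsonIgnoreProperties(ignoreUnknown = true)
        public static class Change {
            public String kind;          // insert | update | delete
            public String schema;
            public String table;
            public List<String> columnnames;
            public List<Object> columnvalues;
        }

        public static void main(String[] args) throws Exception {
            // A trimmed, hand-written example of a wal2json change document.
            String payload = "{\"change\":[{\"kind\":\"update\",\"schema\":\"public\","
                + "\"table\":\"bookmarks\","
                + "\"columnnames\":[\"userid\",\"contentid\",\"playhead\"],"
                + "\"columnvalues\":[\"user-123\",\"content-456\",1350000]}]}";

            // Because the target types are declared up front, Jackson maps each JSON
            // field straight onto a known Java field instead of inferring types.
            ChangeSet changes = new ObjectMapper().readValue(payload, ChangeSet.class);
            System.out.println(changes.change.get(0).table);  // prints "bookmarks"
        }
    }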

As for the put to the Kinesis Stream, this is where the Java implementation really shines. By running on the JVM, pg2k4j can make use of the Kinesis Producer Library (KPL), Amazon’s open source library for publishing high-throughput data to Kinesis. It’s not possible to use this library from a non-JVM language, and while pg2kinesis does make use of an aggregation library, that library is significantly less performant than Amazon’s native KPL.
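For context, here is a bare-bones sketch of publishing a single record through the KPL. The stream name, region, partition key, and payload are placeholders; pg2k4j configures and drives the KPL with its own settings.

    import com.amazonaws.services.kinesis.producer.KinesisProducer;
    import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
    import com.amazonaws.services.kinesis.producer.UserRecordResult;
    import com.google.common.util.concurrent.ListenableFuture;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class KplPublishExample {
        public static void main(String[] args) throws Exception {
            KinesisProducerConfiguration config = new KinesisProducerConfiguration()
                .setRegion("us-east-1")
                .setAggregationEnabled(true)        // pack many small records per Kinesis record
                .setRecordMaxBufferedTime(100);     // milliseconds to wait while batching

            KinesisProducer producer = new KinesisProducer(config);

            String cdcJson = "{\"table\":\"bookmarks\",\"kind\":\"update\"}"; // illustrative payload
            ListenableFuture<UserRecordResult> result = producer.addUserRecord(
                "bookmark-changes",                  // stream name (placeholder)
                "user-123",                          // partition key determines shard placement
                ByteBuffer.wrap(cdcJson.getBytes(StandardCharsets.UTF_8)));

            producer.flushSync();                    // block until buffered records have been sent
            System.out.println("Sequence number: " + result.get().getSequenceNumber());
            producer.destroy();                      // shut down the native producer process
        }
    }

Aggregation lets the KPL pack many small CDC records into each Kinesis record, which accounts for much of its throughput advantage.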

These hypotheses turned out to be true: pg2k4j is orders of magnitude faster than pg2kinesis. It has proved itself capable of handling 1 million events per minute, and we are confident that it can keep up with any write speed Postgres can handle. It’s been up and running for four months in production serving many downstream teams without incident.

Here’s a snapshot of the Replication Slot Lag during one of our load tests.

The Replication Slot is the abstraction wal2json uses to read the changes to the WAL, and the lag refers to how far behind it is in doing so. When deploying pg2k4j, be sure to watch this number — if you see it consistently increasing, consider devoting more compute power to the app.
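If you would rather check the lag directly in Postgres than rely on dashboards, a query against pg_replication_slots gives the same signal. The sketch below uses the Postgres 10+ function names (on 9.x they are pg_current_xlog_location and pg_xlog_location_diff); connection details are placeholders.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class SlotLagCheck {
        public static void main(String[] args) throws Exception {
            // Bytes of WAL each replication slot still needs to consume.
            String sql = "SELECT slot_name, "
                + "pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS lag_bytes "
                + "FROM pg_replication_slots";
            try (Connection conn = DriverManager.getConnection(
                     "jdbc:postgresql://localhost:5432/bookmarks", "user", "password");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(sql)) {
                while (rs.next()) {
                    System.out.println(rs.getString("slot_name") + ": "
                        + rs.getLong("lag_bytes") + " bytes behind");
                }
            }
        }
    }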

When making pg2k4j public, we wanted to strike a balance between releasing a feature-rich library and a tool that’s extremely simple to use. Internally, we use pg2k4j as a library, extending its behavior to integrate with NewRelic, add custom fields, and publish records to predictable shards to ensure consistent ordering. We’re optimistic that other teams can find value in this software, both as an out-of-the-box solution that can be run via the command line and as a library that can be customized and extended. Please reach out with any questions or feedback about pg2k4j, and see the README for details about how to use or contribute.

Photo by Jonny Lindner on pixabay
