Southpaw

Morrigan Jones
JW Player Engineering
Dec 12, 2018

Streaming Left Joins with Change Data Capture

We here at JW Player are breaking up the monolithic database that powers our media services and moving towards a microservice architecture. While this database served us well in the past, it has become a bottleneck as we’ve grown and our teams have become increasingly specialized. Strong coupling and slow iteration on both new and existing products are issues we need to solve.

Our search and recommendations products depend heavily on data from this monolithic database such as titles, tags, and other metadata. Additionally, our Media Intelligence team is generating numerous artifacts that we are using to improve our algorithms. It isn’t enough to be able to request data from the APIs for these other services at query time. For our recommendations to function, we need to pull in entire datasets from other services in order to build our service long before the first request arrives. And we can’t just take an initial snapshot and be done. We require our recommendations service to update whenever a piece of media or other required data updates. Because of this, we need the ability to perform both iterative updates and full rebuilds of existing downstream services while decoupling these systems.

Enter the Polylog.

The Polylog

In building our Polylog architecture, we started with inspiration from the New York Times’s Monolog project. Similar to them, we implemented a change data capture system using Apache Kafka, which is a distributed, log-based platform. We chose Kafka because of its high availability and fault tolerance. Kafka can also be configured to maintain at least the latest version of any record, so our services can be built and rebuilt from the upstream data. In addition to Kafka, we are leveraging a tool called Debezium that allows us to easily snapshot upstream databases and then stream changes into Kafka.
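
To make the "latest version of any record" idea concrete, here is a minimal Python sketch of how a downstream service could rebuild its state by replaying a keyed log. It only models the semantics (the last write per key wins, and a null value marks a delete). The function name `rebuild_state` and the sample records are made up for illustration, and this is not how Kafka or Debezium work internally.

```python
def rebuild_state(log):
    """Replay (key, value) pairs in order, keeping only the latest value per key."""
    state = {}
    for key, value in log:
        if value is None:
            # A null value is treated as a delete marker for that key.
            state.pop(key, None)
        else:
            state[key] = value
    return state


# Hypothetical change events for two media records.
log = [
    (1, {"title": "Draft title"}),
    (2, {"title": "Another video"}),
    (1, {"title": "Final title"}),  # update to record 1
    (2, None),                      # record 2 deleted upstream
]
state = rebuild_state(log)
```

Because only the final value per key matters for the rebuilt state, older versions of a record can be discarded from the log without changing what a new consumer ends up with.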

This enabled us to get the Polylog up and running quickly with minimal fuss. Once these changes are captured in Kafka, they are available on a log (“topic” in Kafka parlance) to downstream services. If you want to learn more, check out the presentation Eric Weaver and I did for the NYC Data Engineering meetup. You can also see the slides here.

With the Polylog in place, we now have multiple topics of records, one topic per type of record. With Debezium, each table corresponds to its own topic. But our search and recommendations use case requires pulling many different topics of data together based on the relationships between these datasets. And not only is the data split up, it is also very hierarchical.

Back when I was first planning out the Polylog project, I looked around at various stream processors such as Flink and Kafka Streams to see what their streaming join support was like. While these tools offer impressive functionality and performance, their join support was limited or came with multiple caveats. The joins we needed to do would:

  • Go multiple levels deep
  • Join on primary and non-primary keys depending on the particular relation
  • Preferably perform multiple joins without writing intermediate results out to different topics or adding operational complexity
  • Not be windowed joins on time series data
  • Treat all input topics as if they were ‘tables’
  • Support 1 to 1, 1 to many, and many to 1 relations

In other words, we needed join functionality that could produce results similar to those of a traditional SQL database while also continuously producing and updating output records whenever an input record, any input record, was created or changed. This was when I was inspired to write Southpaw.

Southpaw

Southpaw works by reading in records from input topics, recording these records in its local state, updating indices, then creating output (aka denormalized) records based on these indices and the input records. The relations between the different input records are defined in a simple JSON format that mimics SQL joins.

Example

Let’s say we have a music service that contains information about both songs and playlists. We capture the data and any changes from this service and populate different topics in Kafka. For this example, we have three different topics at a minimum:

  • playlist — This represents the top-level playlist record.
  • song — This represents a song in our music service.
  • song_to_playlist — This maps the songs to playlists. An individual song can appear in zero, one, or many different playlists.

Now we want to make a downstream service that requires data from all three of these topics. Additionally, it wants this data denormalized into documents around the root playlist records. An example relations JSON for this could look like:

[
  {
    "DenormalizedName": "MusicPlaylists",
    "Entity": "playlist",
    "Children": [
      {
        "Entity": "song_to_playlist",
        "JoinKey": "playlist_id",
        "ParentKey": "playlist_id",
        "Children": [
          {
            "Entity": "song",
            "JoinKey": "song_id",
            "ParentKey": "song_id"
          }
        ]
      }
    ]
  }
]

Which would mimic a SQL query like:

SELECT ...
FROM
    playlist
    LEFT OUTER JOIN song_to_playlist
        ON playlist.playlist_id = song_to_playlist.playlist_id
    LEFT OUTER JOIN song
        ON song_to_playlist.song_id = song.song_id

Unlike SQL, where a 1 to many join will give you many different records, Southpaw will package everything together in a single document. The root of the document is the input record of the root relation. In our example above, it would be the playlist. There is then a hierarchy of child records. To use the above example relations, you could have a playlist (the root) that contains a list of song_to_playlist records (assuming a 1 to many relationship), each of which would contain a single song record (assuming a 1 to 1 relationship). Additionally, the root playlist could also join to tags or other metadata, and each song could contain one or more genres. Each playlist would be its own JSON document.

An output record for our denormalized playlist could look like:

{
  "Record": {
    "playlist_id": 4321,
    "title": "I'm a playlist!"
  },
  "Children": {
    "song_to_playlist": [{
      "Record": {
        "playlist_id": 4321,
        "song_id": 1
      },
      "Children": {
        "song": [{
          "Record": {
            "artist": "A Real Band",
            "song_id": 1,
            "title": "That #1 Hit Single"
          },
          "Children": {}
        }]
      }
    }, {
      "Record": {
        "playlist_id": 4321,
        "song_id": 2
      },
      "Children": {
        "song": [{
          "Record": {
            "artist": "Infinity and the Reals",
            "song_id": 2,
            "title": "We're not imaginary"
          },
          "Children": {}
        }]
      }
    }]
  }
}

To be able to generate the output records based on any incoming record, Southpaw uses two different types of indices that each answer a different question:

  • Parent indices: What output records should be created based on the join key of a given input record? In other words, if I read a song record, what playlist output records need to be generated that contain that song?
  • Join indices: What previously recorded input records should be included in an output record? In other words, if I’m creating an output record for playlist X, what songs do I need to include?

With these two types of indices, Southpaw is able to create fully denormalized records anytime any input record is consumed.
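
The two questions can be sketched with a pair of plain dictionaries. This is a deliberately simplified Python illustration for the playlist example only: the variable and function names are invented, the second playlist ID (9999) is made up, and Southpaw's real indices are generic over the relation definitions and persisted in its local state.

```python
from collections import defaultdict

# Join index: playlist_id -> song_ids mapped to that playlist.
join_index = defaultdict(set)
# Parent index: song_id -> playlist_ids whose output record contains that song.
parent_index = defaultdict(set)


def on_song_to_playlist(record):
    """Update both indices when a song_to_playlist record is consumed."""
    join_index[record["playlist_id"]].add(record["song_id"])
    parent_index[record["song_id"]].add(record["playlist_id"])


def playlists_to_rebuild(song_record):
    """Parent index lookup: which playlist outputs does this song affect?"""
    return set(parent_index.get(song_record["song_id"], set()))


def songs_to_include(playlist_id):
    """Join index lookup: which songs belong in this playlist's output?"""
    return set(join_index.get(playlist_id, set()))


for mapping in [
    {"playlist_id": 4321, "song_id": 1},
    {"playlist_id": 4321, "song_id": 2},
    {"playlist_id": 9999, "song_id": 2},  # hypothetical second playlist
]:
    on_song_to_playlist(mapping)

# A changed song record tells us which playlist documents to regenerate.
affected = playlists_to_rebuild({"song_id": 2, "title": "We're not imaginary"})
# Regenerating a playlist document tells us which songs to pull in.
included = songs_to_include(4321)
```

The point of keeping both directions is that a change to any input record, root or child, can be traced to the output documents it touches without scanning every record.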

Performance

Early on it was clear that performance was an important issue I needed to tackle. I accomplished this by making optimizations on multiple different fronts:

  • Read child records first when doing the initial build — Denormalized records only need to be created when root records are read. Southpaw can read in all of the child records first efficiently since it only needs to store those records and create the appropriate join indices.
  • When lagging, keep a list of records to create to avoid creating records multiple times — Often when one record is updated, many related records will be updated around the same time. This is especially true during an initial create event. For example, if a new piece of media is created, it will also have many related records created at the same time, such as tags, formats, and other related data. If we record the primary keys of the records that need to be created, we can avoid unnecessarily creating records over and over, while reverting to creating records as soon as input records come in when lag drops to near 0.
  • Store sets of keys (index entries and records to create) in a byte packed array format to make reading, writing, serialization and deserialization faster — This was especially helpful for large index entries where a single key maps to 10k or even 1M keys! Southpaw was spending an inordinate amount of time serializing and deserializing between a Java set and a byte representation of that set that gets stored in the state. So I made a set implementation that was very close to its byte representation that completely removed this penalty. Large index entries were an issue of the past!
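
The byte-packed set idea from the last bullet can be sketched as follows. This Python version assumes every key is an unsigned 64-bit integer stored big-endian in sorted order, which is a simplification chosen for the example. The class name `PackedKeySet` is invented, and Southpaw's own set implementation may be laid out differently.

```python
import struct

KEY_SIZE = 8  # assumption: every key is an unsigned 64-bit integer


class PackedKeySet:
    """A sorted set of integer keys whose in-memory form is its serialized form."""

    def __init__(self, data=b""):
        if len(data) % KEY_SIZE:
            raise ValueError("data length must be a multiple of KEY_SIZE")
        # Assumes data was produced by to_bytes(), i.e. keys are sorted and unique.
        self._data = bytearray(data)

    def __len__(self):
        return len(self._data) // KEY_SIZE

    def _key_at(self, i):
        return struct.unpack_from(">Q", self._data, i * KEY_SIZE)[0]

    def _find(self, key):
        """Binary search for the slot where key is, or would be inserted."""
        lo, hi = 0, len(self)
        while lo < hi:
            mid = (lo + hi) // 2
            if self._key_at(mid) < key:
                lo = mid + 1
            else:
                hi = mid
        return lo

    def __contains__(self, key):
        i = self._find(key)
        return i < len(self) and self._key_at(i) == key

    def add(self, key):
        """Insert key in sorted position; return False if it was already present."""
        i = self._find(key)
        if i < len(self) and self._key_at(i) == key:
            return False
        self._data[i * KEY_SIZE:i * KEY_SIZE] = struct.pack(">Q", key)
        return True

    def to_bytes(self):
        # No per-element encoding step: the buffer is already the stored form.
        return bytes(self._data)


keys = PackedKeySet()
keys.add(7)
keys.add(3)
duplicate_added = keys.add(7)
restored = PackedKeySet(keys.to_bytes())
```

Reading a large index entry back from the state then costs one buffer copy instead of decoding every key into an object, which is the penalty the bullet describes removing.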

Future Improvements

We already have numerous ideas on how to further improve Southpaw. Some examples include:

  • Further optimizations — Instead of having a join index, store child records together to reduce the number of gets to the state. If a record contains 20 children, we currently have to do 20 gets to the state to get each of those child records. If those records were stored together, we could speed up record creation.
  • Add an API to query or issue commands to the process — This would allow you to control Southpaw without having to redeploy. Additionally, being able to query an index or have Southpaw create and return a record to you would facilitate debugging.
  • Add the ability to include a transformation step after the denormalized documents are created — The denormalized records are large and include everything from the input records. With a transform step, we could craft the output record to be what we need it to be and nothing more. We could also include serialization to a format such as Avro.

Conclusion

We’ve been using Southpaw in production for months now. It efficiently and automatically generates and regenerates the denormalized documents as upstream services change. Because Southpaw solves a problem for us that we couldn’t solve with existing technologies, we wanted to open source it and share it with the greater software community. You can find Southpaw on GitHub here. We hope others find it as useful as we have and would love feedback and contributions.
