Counting with domain-specific databases

Josh Yudaken
Smyte Blog
Apr 4, 2016

Counting is core to what we do at Smyte. Our realtime spam and fraud fighting software analyzes an event stream and needs to answer many questions about it. Effective questions often begin with “how often” or “how many”.

For example, if we want to find out which IP addresses are likely to be spammy, we may first look at how many messages have been sent from a given IP address in the last 24 hours. We might also want to know how many distinct users have been seen on the IP, as that may indicate an internet cafe.

This seems like a trivial problem to solve, but everything gets a lot more interesting at scale. If we want to keep track of which IPs are sending lots of messages, we’ll want to count how many messages every IP we have ever seen has sent over various time windows, such as the past hour, past day, and past week. Additionally, we often want to track more than just IP addresses. We count as many as thirty unique features, some of which can be very sparse or very dense. To make matters worse, we want to track pairwise features to answer questions like “how many messages has email address X sent while using IP address Y in the last hour?” This quickly leads to a combinatorial explosion of data, and though we’re only storing a single integer for each (key, time window) pair, those keys can take up a lot of space.
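To make the explosion concrete, here is a minimal sketch of what such counter keys could look like. The naming scheme is purely illustrative, not our actual format:

```python
# Illustrative key layout only -- not our actual format. One integer is stored
# per (feature value, time window) pair, but the number of keys multiplies fast.
def counter_key(feature: str, value: str, window: str) -> str:
    return f"c:{feature}:{window}:{value}"

def pairwise_key(f1: str, v1: str, f2: str, v2: str, window: str) -> str:
    # Pairwise features multiply the key space again.
    return f"c:{f1}|{f2}:{window}:{v1}|{v2}"

print(counter_key("ip", "203.0.113.7", "24h"))                            # messages from an IP, last day
print(pairwise_key("email", "x@example.com", "ip", "203.0.113.7", "1h"))  # email X on IP Y, last hour
```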

The sheer amount of data is one hard problem; latency is another. One common use case is running Smyte before sending push notifications. If Smyte is slow, push notifications are delayed and our customers are unhappy. This leaves our latency budget for counters (≈ 10ms) too tight to do anything complicated.

There are clever data structures that can help. One in particular, a count-min sketch, can provide an efficient approximate count. While we have some use cases for count-min sketch, it has a big problem: it tends to overestimate. Since counts tend to be small and false positives are unacceptable, we needed a different solution.
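For the curious, a toy count-min sketch makes the overestimation obvious: every update increments one cell per hash row, colliding keys can only inflate those cells, and a query takes the minimum across rows, so the estimate can overcount but never undercount. This sketch uses a plain hashlib-based bucket function purely for illustration:

```python
import hashlib

# Minimal count-min sketch: `depth` hash rows of `width` cells each.
class CountMinSketch:
    def __init__(self, width: int = 1000, depth: int = 4):
        self.width, self.depth = width, depth
        self.rows = [[0] * width for _ in range(depth)]

    def _bucket(self, row: int, key: str) -> int:
        digest = hashlib.sha1(f"{row}:{key}".encode()).hexdigest()
        return int(digest, 16) % self.width

    def add(self, key: str, amount: int = 1) -> None:
        for row in range(self.depth):
            self.rows[row][self._bucket(row, key)] += amount

    def estimate(self, key: str) -> int:
        # Collisions only ever add to a cell, so min() is an upper bound.
        return min(self.rows[row][self._bucket(row, key)] for row in range(self.depth))

cms = CountMinSketch()
for _ in range(5):
    cms.add("ip:203.0.113.7")
print(cms.estimate("ip:203.0.113.7"))  # >= 5, never less
```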

We’re a small startup, so we need to keep our burn rate low. It turns out that storing all the different counts is one of the primary costs of providing our service. Often (but not always) we prioritize performance and cost, and we can tolerate eventual consistency. With that said, as a startup we usually don’t have time to “get it right” the first time, so we made a plan to get something done quickly that could be improved later.

v0. Single Redis server

When we need a new feature, we’ll generally hack out an initial prototype with Redis. The fast, easy-to-use database lets us get something up in a few hours, and we can move to something different when we need stronger durability guarantees or it gets too expensive. This seems to be a common pattern at other companies as well.

Redis already supports INCR and DECR, but it does not intrinsically support time windows. By leveraging Kafka, a distributed message queue, we were able to support time-windowing pretty easily. The sketch below demonstrates the very basic algorithm.

The strategy here is fairly simple. We use Kafka as a source of truth and pipe every increment into a super simple topic containing (timestamp, key, amount). One consumer is responsible for incrementing (Redis INCRBY), and then a separate consumer waits until the timestamps are an hour old before decrementing (Redis DECRBY).
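A minimal sketch of the two tailers, assuming the redis-py and kafka-python packages, JSON-encoded records, and an illustrative topic name (the real pipeline runs one decrement tailer per window, not just the hour):

```python
import json
import time

import redis
from kafka import KafkaConsumer  # assumes the kafka-python package

WINDOW_SECONDS = 3600  # the "last hour" window
r = redis.Redis()

def increment_tailer():
    # Applies every (timestamp, key, amount) record as soon as it arrives.
    consumer = KafkaConsumer("counter-increments", group_id="incr")
    for msg in consumer:
        record = json.loads(msg.value)
        r.incrby(record["key"], record["amount"])

def decrement_tailer():
    # Reads the same topic, but holds each record until it is an hour old and
    # only then subtracts it, so each key always holds the last hour's total.
    # (A real tailer would pause the consumer rather than sleep in the loop.)
    consumer = KafkaConsumer("counter-increments", group_id="decr-hour")
    for msg in consumer:
        record = json.loads(msg.value)
        age = time.time() - record["timestamp"]
        if age < WINDOW_SECONDS:
            time.sleep(WINDOW_SECONDS - age)
        r.decrby(record["key"], record["amount"])
```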

Exactly-once delivery of the increments and decrements is achieved by storing the Kafka offsets for each tailer in Redis, and using Redis MULTI/WATCH/EXEC transactions. We commit our offset to Kafka as well, but the source of truth is the offset stored in Redis.
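Sketched with redis-py (the offset key name is illustrative, and in practice this would apply to batches rather than single records), the idea looks roughly like this: WATCH the stored offset, skip anything already applied, and commit the INCRBY together with the new offset in one transaction.

```python
import redis

r = redis.Redis()
OFFSET_KEY = "offset:counter-increments:0"  # illustrative per-partition offset key

def apply_increment(offset: int, key: str, amount: int) -> None:
    # The offset stored in Redis is the source of truth; WATCH aborts the
    # transaction if another writer advances it underneath us.
    with r.pipeline() as pipe:
        while True:
            try:
                pipe.watch(OFFSET_KEY)
                applied = int(pipe.get(OFFSET_KEY) or -1)
                if offset <= applied:
                    pipe.unwatch()
                    return          # already applied; skip the duplicate
                pipe.multi()
                pipe.incrby(key, amount)
                pipe.set(OFFSET_KEY, offset)
                pipe.execute()      # INCRBY and offset update commit atomically
                return
            except redis.WatchError:
                continue            # the offset moved; re-check and retry
```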

By using Secor we can easily save older data into longer-term storage. We wrote our day & week consumers to read from this storage instead of Kafka. This also makes it trivial to switch away from Redis and simply reprocess all our data. Since we’re in Google Cloud, we can use Nearline storage, which provides cheap storage for this type of data.

Choice of sharding algorithm

The strategy described above works great for a single server, and by sharding the topics by key we’re able to horizontally scale out to any number of counter servers. But how do you pick which key goes to which partition?

Consistent hashing is a popular choice, but we chose a much simpler sharding scheme that gives us the flexibility we need later. Every increment is written to a Kafka partition chosen with the simple formula `MurmurHash3(key) % N`, where `N` is our number of shards. MurmurHash3 is a well-known, fast hash function that is well suited to this use case and has solid implementations in the languages we use. We then run the entire set of tailers (incr, decr-hour, decr-day, …) for each partition.
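In code, the placement is a one-liner (shown here with the mmh3 Python bindings and an arbitrary shard count):

```python
import mmh3  # assumes the mmh3 MurmurHash3 bindings are available

NUM_SHARDS = 16  # illustrative; N is whatever the real shard count is

def partition_for(key: str, num_shards: int = NUM_SHARDS) -> int:
    # mmh3.hash returns a signed 32-bit int; mask it to get the unsigned value.
    return (mmh3.hash(key) & 0xFFFFFFFF) % num_shards

print(partition_for("ip:203.0.113.7"))
```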

Since most of our keys include a timestamp, we’re able to vary this simple scheme based on when the key was initially created. This will let us easily add or remove shards in the future without ever having to move data around, as standard consistent hashing designs require.
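Roughly, the idea is to keep a small table of “as of this timestamp, there are N shards” and hash each key against the shard count that was in effect when the key was created. A sketch extending the placement function above, with a made-up resharding history:

```python
import mmh3

# Made-up resharding history: (effective unix timestamp, shard count), sorted
# by timestamp. Keys created before a cutover keep hashing against the old
# shard count, so existing data never has to move.
SHARD_EPOCHS = [
    (0,          16),
    (1451606400, 32),   # e.g. doubled the shard count on 2016-01-01
]

def shards_at(created_at: int) -> int:
    count = SHARD_EPOCHS[0][1]
    for ts, n in SHARD_EPOCHS:
        if created_at >= ts:
            count = n
    return count

def partition_for(key: str, created_at: int) -> int:
    return (mmh3.hash(key) & 0xFFFFFFFF) % shards_at(created_at)
```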

As an added bonus, it is possible for us to split a shard in two if we need to. Using backups we can bring up a second copy of the same data, then use a single unused bit of the MurmurHash3 of the key to decide which side of the split each key belongs to. Once the split is complete, we can clean up the duplicate copies by running DEL on any keys that no longer belong to the shard. Instead of running expensive SCAN/KEYS queries on a production cluster, we use rdb-rs to read the backup files, write the keys to delete into text files in the Redis protocol, and use `redis-cli --pipe` to feed them to our production boxes.
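A sketch of the split decision and the cleanup commands, assuming a power-of-two shard count so that `% N` only consumes the low bits of the hash. The keys themselves would come from reading the backup with rdb-rs; that part is omitted here:

```python
import mmh3

def side_of_split(key: str) -> int:
    # Use a hash bit above the ones consumed by "% N" to split the shard in
    # two. With N a power of two, "% N" only looks at the low bits, so bit 16
    # (for example) is untouched by the original placement.
    return (mmh3.hash(key) & 0xFFFFFFFF) >> 16 & 1

def redis_del_command(key: str) -> bytes:
    # RESP-encoded DEL, suitable for feeding to `redis-cli --pipe`.
    k = key.encode()
    return b"*2\r\n$3\r\nDEL\r\n$%d\r\n%s\r\n" % (len(k), k)

def del_commands_for(keys, keep_side: int):
    # On the replica that keeps `keep_side`, delete every key from the other side.
    for key in keys:
        if side_of_split(key) != keep_side:
            yield redis_del_command(key)
```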

Productionising with RocksDB

Now that we had a solution we knew worked, we focused on reducing cost. The Redis protocol made a lot of sense for the server, but storing everything in memory is not an option.

We took a look at leveraging the existing data stores we were running:

  • While Riak includes a counter CRDT, it does not support the atomic write batches necessary for exactly-once delivery of increments and decrements. There is also a performance penalty, as the clients are not smart about sharding.
  • MySQL does not support efficient key compression, requests cannot be efficiently pipelined, and the entire setup is generally far less flexible than managing the storage ourselves.

The route we chose was to create our own super-simple database using RocksDB within a C++ server we implemented on top of Wangle. Normally, building your own database server is a bad idea™, but RocksDB + Redis protocol already solved most of the hard problems for us:

  • The Redis protocol is flexible, well-documented, reasonably efficient and easy to implement.
  • We already have efficient Redis clients in all of our production languages. We could test both backends with no changes on the client side.
  • RocksDB got us huge wins in key compression as most of our keys share similar prefixes.
  • Backing up RocksDB to cloud storage is straightforward due to its use of immutable files for the levels and the BackupEngine API.
  • RocksDB supports safe, atomic writes via WriteBatch (see the sketch after this list).
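The actual server is C++ on Wangle, but the write path is simple enough to sketch in a few lines of Python against the python-rocksdb bindings (key names and encodings here are illustrative): each Kafka record’s counter update and offset advance go into a single WriteBatch.

```python
import struct

import rocksdb  # assumes the python-rocksdb bindings; the real server is C++

db = rocksdb.DB("counters.db", rocksdb.Options(create_if_missing=True))
OFFSET_KEY = b"\x00meta:offset"   # illustrative metadata key

def apply_increment(offset: int, key: bytes, amount: int) -> None:
    # Skip records that were already applied; the offset lives in the same DB.
    raw = db.get(OFFSET_KEY)
    if raw is not None and offset <= struct.unpack(">q", raw)[0]:
        return

    current = db.get(key)
    count = struct.unpack(">q", current)[0] if current else 0

    # The counter update and the offset advance commit in one atomic batch,
    # so a crash can never apply one without the other.
    batch = rocksdb.WriteBatch()
    batch.put(key, struct.pack(">q", count + amount))
    batch.put(OFFSET_KEY, struct.pack(">q", offset))
    db.write(batch)
```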

Data replication and backup

Another problem with building your own distributed database is replication. Our initial Redis implementation used the built-in Redis replication, which doubled the number of high-memory instances we had to pay for, but the RocksDB-based version could be much more efficient: using disk instead of memory, compressing common key prefixes, expiring keys during compaction, and so on.

We were able to leverage Kafka to get replication for free. Each replica is associated with exactly one Kafka partition. It stores its Kafka offset locally in RocksDB and has no knowledge of other replicas. Each new replica starts from the most recent backup and then consumes from Kafka to catch up to where the live data stream is. This setup makes it trivial to create new replicas, either using cloud persistent disks or local drives on the machines.
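A rough sketch of the catch-up step, again with the python-rocksdb and kafka-python packages and illustrative names: the replica reads its own offset out of RocksDB (freshly restored from the latest backup if it has no local data) and seeks its single partition to that point.

```python
import struct

import rocksdb
from kafka import KafkaConsumer, TopicPartition

OFFSET_KEY = b"\x00meta:offset"   # same illustrative metadata key as the write path
PARTITION = 4                     # this replica owns exactly one Kafka partition

def resume_consumer(db: rocksdb.DB) -> KafkaConsumer:
    # The replica's own RocksDB tells it how far it has already consumed.
    raw = db.get(OFFSET_KEY)
    next_offset = struct.unpack(">q", raw)[0] + 1 if raw else 0

    # No consumer group and no coordination with other replicas: each replica
    # simply assigns itself its partition and seeks to its stored offset.
    consumer = KafkaConsumer(group_id=None, enable_auto_commit=False)
    tp = TopicPartition("counter-increments", PARTITION)
    consumer.assign([tp])
    consumer.seek(tp, next_offset)
    return consumer
```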

In our persistent disk setup we have two Kubernetes replication controllers configured for each partition, with names `counters-{partition}{replica}` (e.g. `counters-4a` and `counters-4b`). Using Kubernetes services, each shard receives an internal service IP for all its replicas. We run the Kubernetes `kube-proxy` on all of our machines, and it sets up iptables rules to load-balance the traffic across replicas.

This setup gives us easy failover and quick recovery. Should one replica fail, for example due to the decommissioning of the underlying host, Kubernetes will unmount the persistent disk from that host and schedule the affected replica onto a different host, reattaching the same persistent disk. As a result, catching up with Kafka only involves consuming the data that arrived during the replica rescheduling, which is usually no more than two minutes, as opposed to restoring from the latest backup, which may lag by as much as 24 hours.

Some of our databases use local SSD drives, which offer higher I/O performance than persistent SSDs. Local disks slightly complicate the Kubernetes setup, since it becomes important for pods to stick to specific servers. As we run open-source Kubernetes (rather than Google Container Engine), it is trivial for us to change the code to make the pods stickier.

The end result

With Redis and Kafka we were able to quickly build a system that could count hundreds of millions of unique keys. More importantly, this architecture let us “throw money at the problem” for a while as we scaled other parts of our infrastructure.

Eventually, when costs became an issue, we migrated from Redis to RocksDB and added Kubernetes. We were able to save thousands of dollars a month and reduce the RAM requirements of our application by about 1 TB.

At the end of the day, all of the open-source technology available in 2016 — especially Kafka, Kubernetes and RocksDB — made it possible for us to build a domain-specific database for counting things. Our setup still isn’t perfect, but we’ve built even more interesting servers (based on the Sliding HyperLogLog paper for one) using a similar approach. But that’s another blog post.

Photo credit: Greg Rakozy
