Scaling Requests to Queryable Apache Kafka Topics with nginx

Torben Meyer · Dec 2, 2020

In our previous blog post, Queryable Kafka Topics with Kafka Streams, we showed how to efficiently scale Apache Kafka-backed key-value stores by exposing additional metadata. However, applications should not have to know about this detail. Since they usually communicate with the stores through a reverse proxy, the proxy is the perfect place for such custom routing logic. In this blog post, we explore how to implement this idea in the popular nginx proxy.

You can find the code for this blog post in our GitHub repository.

[Figure: The final application architecture from our last blog post]

Partitioning-aware Proxy

Considering these requirements, we choose OpenResty as our nginx distribution. It bundles the lua-nginx-module and LuaJIT, allowing us to use Lua scripts that implement the additional routing logic.

Partition to Host Mapping

With our dependencies in place, we configure nginx:
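A minimal sketch of the relevant directives looks like this; the dictionary size and the script path are illustrative rather than taken verbatim from our repository:

    http {
        # Shared memory zone, accessible from all worker processes,
        # that holds the partition-to-host mapping.
        lua_shared_dict partitions 1m;

        # Run the initialization script once per worker on startup.
        init_worker_by_lua_file luaapi/init.lua;

        # ... upstream and server blocks follow below ...
    }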

lua_shared_dict instantiates a shared dictionary called partitions for storing the mapping. Additionally, we use the init_worker_by_lua_file directive to reference the Lua script that implements the initialization of this dictionary.

With the power of Lua, the initialization becomes almost trivial. The aforementioned libraries take care of the tedious tasks in the fetch_partition_hosts() function: requesting the metadata from the METADATA_URI, e.g. http://kafka-store/metadata/partitions, and parsing the JSON data. The function set_partitions stores the decoded JSON in the shared dictionary partitions returned by the ngx.shared API. Additionally, the shared dictionary keeps track of the number of partitions, which helps to route the requests later.

Note: We recommend using ngx.timer.at to schedule this function. This workaround is needed because limitations in nginx’s core make cosockets unavailable in the init_worker_by_lua context.
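Putting this together, a sketch of what init.lua might look like. It assumes lua-resty-http for the HTTP request and cjson for the JSON parsing, and that the metadata endpoint returns a JSON object mapping partition numbers to "ip:port" addresses:

    -- init.lua: populates the shared partition-to-host mapping at worker startup.
    local http = require "resty.http"
    local cjson = require "cjson.safe"

    -- Illustrative default, matching the example above.
    local METADATA_URI = "http://kafka-store/metadata/partitions"

    local function fetch_partition_hosts()
        local httpc = http.new()
        local res, err = httpc:request_uri(METADATA_URI)
        if not res then
            ngx.log(ngx.ERR, "failed to fetch partition metadata: ", err)
            return nil
        end
        -- Assumed shape: { ["0"] = "10.0.0.12:8080", ["1"] = ... }
        return cjson.decode(res.body)
    end

    local function set_partitions(mapping)
        local partitions = ngx.shared.partitions
        local count = 0
        for partition, host in pairs(mapping) do
            partitions:set(partition, host)
            count = count + 1
        end
        -- The routing script needs the partition count to map keys to partitions.
        partitions:set("count", count)
    end

    -- Cosockets are unavailable while a worker initializes, so we defer the
    -- HTTP request with a zero-delay timer.
    local ok, err = ngx.timer.at(0, function(premature)
        if premature then
            return
        end
        local mapping = fetch_partition_hosts()
        if mapping then
            set_partitions(mapping)
        end
    end)
    if not ok then
        ngx.log(ngx.ERR, "failed to schedule the metadata fetch: ", err)
    end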

Routing Fetch Requests
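The relevant part of the configuration might look roughly like this; the upstream name matches the text, while the port and the location path are illustrative:

    upstream kafka-store {
        # Placeholder address; the actual peer is chosen per request
        # in the balancer script.
        server 0.0.0.1;
        balancer_by_lua_file luaapi/kafka_routing.lua;
    }

    server {
        listen 80;

        # Fetch requests for the key-value stores are proxied to the
        # dynamically balanced upstream.
        location / {
            proxy_pass http://kafka-store;
        }
    }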

As shown, we define an upstream element called kafka-store and reference a Lua script that balances the requests. Additionally, we add a location responsible for requests to the key-value stores. All it does is proxy-pass to the upstream element defined above.

Let’s take a look at the routing logic of our Lua script:
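A sketch of kafka_routing.lua, assuming the queried key is the last path segment and that hosts are stored as "ip:port" strings, as in the initialization sketch above:

    -- kafka_routing.lua: picks the upstream peer that owns the queried key.
    local balancer = require "ngx.balancer"
    local murmur2 = require "murmur2" -- our FFI wrapper, shown below

    local partitions = ngx.shared.partitions
    local num_partitions = partitions:get("count")

    -- Extract the key, assuming request paths like /kafka-store/v1/<key>.
    local key = ngx.var.uri:match("([^/]+)$")

    -- Kafka's default partitioner: positive murmur2 hash modulo the
    -- number of partitions.
    local partition = murmur2.hash(key) % num_partitions

    -- Look up the host responsible for this partition.
    local peer = partitions:get(tostring(partition))
    local ip, port = peer:match("^(.+):(%d+)$")

    -- Note that set_current_peer expects an IP address, not a domain name.
    local ok, err = balancer.set_current_peer(ip, tonumber(port))
    if not ok then
        ngx.log(ngx.ERR, "failed to set the current peer: ", err)
        return ngx.exit(500)
    end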

At a glance, this script is pretty straightforward. First, we extract the queried key from the request path and calculate its murmur2 hash, which yields the partition. The partition leads us to the host that should handle the request for this particular key. Finally, the ngx.balancer API allows us to set this host as the upstream peer dynamically.

However, there is one open question concerning the murmur2 hash: how is it implemented? It is the most crucial part of the routing logic, because we have to ensure that it is equivalent to the implementation provided by Kafka. Otherwise, the proxy routes requests to the wrong hosts. Since existing murmur2 implementations in the Lua and nginx ecosystem do not allow passing the so-called seed, they cannot be used in our case.

Thus, we provide our own implementation written in C, which is equivalent to the Kafka version. In the murmur2 module, we use the FFI library to load the function compiled as a shared object and then expose the wrapper function that is called in our routing script.
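For illustration, a C implementation equivalent to Kafka’s Utils#murmur2, which fixes the seed at 0x9747b28c, could look like this sketch:

    #include <stdint.h>
    #include <stddef.h>

    /* murmur2.c: equivalent to org.apache.kafka.common.utils.Utils#murmur2. */
    uint32_t murmur2(const char *key, size_t len) {
        const uint8_t *data = (const uint8_t *) key;
        const uint32_t m = 0x5bd1e995;
        const int r = 24;
        uint32_t h = 0x9747b28c ^ (uint32_t) len; /* Kafka's fixed seed */

        /* Mix the key in 4-byte little-endian chunks, as Kafka does. */
        while (len >= 4) {
            uint32_t k = data[0] | ((uint32_t) data[1] << 8)
                       | ((uint32_t) data[2] << 16) | ((uint32_t) data[3] << 24);
            k *= m;
            k ^= k >> r;
            k *= m;
            h *= m;
            h ^= k;
            data += 4;
            len -= 4;
        }

        /* Mix the remaining bytes. */
        switch (len) {
            case 3: h ^= (uint32_t) data[2] << 16; /* fall through */
            case 2: h ^= (uint32_t) data[1] << 8;  /* fall through */
            case 1: h ^= data[0];
                    h *= m;
        }

        /* Final avalanche. */
        h ^= h >> 13;
        h *= m;
        h ^= h >> 15;
        return h;
    }

The wrapper module then only needs to load the shared object and apply the same sign mask as Kafka’s default partitioner. A sketch, with a path that matches the Dockerfile in the next section:

    -- murmur2.lua: FFI wrapper around the C implementation.
    local ffi = require "ffi"
    local bit = require "bit"

    ffi.cdef[[
    uint32_t murmur2(const char *key, size_t len);
    ]]

    -- Illustrative location of the compiled shared object.
    local lib = ffi.load("/usr/local/openresty/lualib/libmurmur2.so")

    local M = {}

    -- Hashes a key like Kafka's default partitioner: murmur2 with Kafka's
    -- seed, masked to a non-negative 31-bit value.
    function M.hash(key)
        local h = tonumber(lib.murmur2(key, #key))
        return bit.band(h, 0x7fffffff)
    end

    return M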

Up and Running

In the first build stage, we compile the murmur2 C function into a shared object. The second stage uses the OpenResty image and adds our implementation. First, we install the lua-resty-http dependency as described before. Then, we copy the nginx.conf as well as the scripts in the luaapi and lualib directories. The first directory contains init.lua and kafka_routing.lua, the latter the murmur2.lua module. Lastly, we add the compiled shared object from the first stage to the lualib directory.
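A sketch of such a multi-stage Dockerfile; the base images, versions, and target paths are illustrative:

    # Stage 1: compile the murmur2 C function into a shared object.
    FROM gcc:10 AS build
    WORKDIR /src
    COPY murmur2.c .
    RUN gcc -O2 -fPIC -shared -o libmurmur2.so murmur2.c

    # Stage 2: OpenResty plus our configuration, scripts, and the shared object.
    FROM openresty/openresty:1.19.3.1-alpine-fat

    # Install the HTTP client library used by the initialization script.
    RUN opm get ledgetech/lua-resty-http

    COPY nginx.conf /usr/local/openresty/nginx/conf/nginx.conf
    COPY luaapi /usr/local/openresty/nginx/luaapi
    COPY lualib /usr/local/openresty/lualib

    # Add the compiled shared object next to the Lua modules.
    COPY --from=build /src/libmurmur2.so /usr/local/openresty/lualib/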

Comparison to the Classic Approach

In a Kubernetes cluster, we deploy three replicas of the key-value stores with fixed resources (300m CPU and 500Mi memory). Overall, they contain one million keys with JSON objects as values. Additionally, we deploy one custom nginx and one ingress-nginx proxy. Their configurations are based on the defaults given by ingress-nginx. For the test, we let wrk run for five minutes with 12 threads and 48 connections: wrk -t12 -c48 -d300s -s base.lua http://nginx-host. The Lua script base.lua allows us to fetch a random key in each request, making the benchmark more realistic.
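For reference, a sketch of what base.lua might look like; the key range and the URL layout are assumptions matching the examples above:

    -- base.lua: lets wrk fetch a random key on every request.
    math.randomseed(os.time())

    request = function()
        local key = math.random(0, 999999)
        return wrk.format("GET", "/kafka-store/v1/" .. key)
    end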

Let’s take a look at the results:

[Figure: Benchmark results, requests per second and latency, for the custom and the classic proxy]

We observe that the custom routing logic serves twice as many requests per second. Additionally, the custom proxy provides a median latency of 15 ms, compared to 40 ms for the classic approach. The custom proxy’s latency also has a much smaller standard deviation, making response times more predictable. The classic proxy routes a request to a wrong host in two out of three cases (with three replicas, a blind choice hits the right one only a third of the time), and each miss requires an additional request. Thus, its latency is higher on average and deviates more.

Summary

By extending nginx with a few Lua scripts, we moved the partition-aware routing logic into the reverse proxy: at startup, the proxy fetches the partition-to-host mapping from the key-value stores, and for every fetch request it hashes the queried key with a Kafka-equivalent murmur2 implementation to route the request directly to the store instance holding the key. In our benchmark, this doubles the served requests per second and cuts the median latency from 40 ms to 15 ms compared to the classic setup.
