Scaling Requests to Queryable Apache Kafka Topics with nginx

Torben Meyer · Published in bakdata · Dec 2, 2020 · 6 min read

In our previous blog post Queryable Kafka Topics with Kafka Streams, we introduced how to efficiently scale Apache Kafka-backed key-value stores by exposing additional metadata. However, applications should not have to know about this detail. Since they usually communicate with the stores through a reverse proxy, the proxy is the perfect place for such custom routing logic. In this blog post, we explore how to implement this idea in the popular nginx proxy.

You can find the code for this blog post in our GitHub repository.

Queryable Kafka Topics with Kafka Streams

Before looking into the proxy, we quickly recap the scalable architecture introduced by our previous blog post. For one topic, there are n Kafka Streams replicas exposing a key-value store API, and each is responsible for a subset of the topic’s partitions. When a replica processes a request, it has to check whether the key belongs to its set of partitions. Otherwise, the replica has to forward the request to the responsible peer. To avoid such overhead, the stores expose metadata containing the mapping of partitions to hosts. For a given key, applications then compute the corresponding partition by mirroring Kafka’s DefaultPartitioner, which is based on the murmur2 hash function. This way, they send each request directly to the host responsible for the key, and we avoid needless communication overhead.
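For a non-null key, that computation boils down to the following sketch. It assumes a Kafka-compatible, seeded murmur2 implementation exposed as a Lua module; we come back to its implementation later in this post.

```lua
-- Sketch of the computation Kafka's DefaultPartitioner performs for a
-- non-null key, mirroring Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions.
local bit = require("bit")
local murmur2 = require("murmur2") -- assumed Kafka-compatible module, see below

local function partition_for(key, num_partitions)
    -- Clear the sign bit of the 32-bit hash and take it modulo the partition count.
    return bit.band(murmur2.hash(key), 0x7fffffff) % num_partitions
end
```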

The final application architecture from our last blog post

Partitioning-aware Proxy

Implementing the DefaultPartitioner-based routing logic broadly requires two steps: First, the load balancer has to fetch and store the mapping of partitions to hosts. Second, for every incoming request, it computes the queried key’s partition and passes the request to the responsible host.

Considering these requirements, we choose OpenResty as our nginx distribution. It bundles the lua-nginx-module and LuaJIT, allowing us to implement the additional routing logic in Lua scripts.

Partition to Host Mapping

The first step requires requesting the metadata from one of the stores’ metadata APIs and parsing the returned JSON. The latter can be done with the cjson module already included in OpenResty. For requesting the stores’ metadata, we use the lua-resty-http library. OpenResty comes with its own package manager, opm, so installing it boils down to a single command: opm get ledgetech/lua-resty-http.

With our dependencies in place, we configure nginx:
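A minimal sketch of the relevant configuration, in which the dictionary size and the script path are assumptions, could look like this:

```nginx
# Make the metadata endpoint available to Lua workers via os.getenv
# (the variable name follows the post).
env METADATA_URI;

http {
    # Shared memory zone holding the partition-to-host mapping for all
    # nginx worker processes (the size is an assumption).
    lua_shared_dict partitions 1m;

    # Fetch and store the metadata once per worker via the initialization script.
    init_worker_by_lua_file lua/init.lua;

    # Note: lua-resty-http needs a resolver directive to resolve the stores'
    # hostname; its address depends on the environment.

    # ... the upstream and server blocks follow in the next section ...
}
```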

lua_shared_dict instantiates a shared dictionary called partitions for storing the mapping. Additionally, we use the init_worker_by_lua_file directive to reference the Lua script that initializes it.

With the power of Lua, the initialization becomes almost trivial. The aforementioned libraries take care of the tedious tasks in the fetch_partition_hosts() function: requesting the metadata from the METADATA_URI, e.g. http://kafka-store/metadata/partitions, and parsing the JSON data. The function set_partitions stores the decoded JSON in the shared dictionary partitions returned by the ngx.shared API. Additionally, the shared dictionary keeps track of the number of partitions, which helps to route the requests later.

Note: We recommend using ngx.timer.at to schedule this function. This workaround is needed because limitations in nginx’s core make cosockets, and thus HTTP requests, unavailable in the init_worker_by_lua context.
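A minimal sketch of such an init script, assuming the metadata endpoint returns a JSON object that maps partition numbers to "host:port" strings and that METADATA_URI is exposed via nginx’s env directive, might look as follows:

```lua
-- lua/init.lua: fetch the partition-to-host mapping and store it in the
-- shared dictionary (a sketch; names and response format are assumptions).
local http = require("resty.http")
local cjson = require("cjson")

local METADATA_URI = os.getenv("METADATA_URI") -- e.g. http://kafka-store/metadata/partitions

local function fetch_partition_hosts()
    local httpc = http.new()
    local res, err = httpc:request_uri(METADATA_URI)
    if not res then
        ngx.log(ngx.ERR, "failed to fetch partition metadata: ", err)
        return nil
    end
    return cjson.decode(res.body)
end

local function set_partitions(mapping)
    local partitions = ngx.shared.partitions
    local count = 0
    for partition, host in pairs(mapping) do
        partitions:set(partition, host)
        count = count + 1
    end
    -- Remember the total number of partitions for the routing step.
    partitions:set("count", count)
end

local function init(premature)
    if premature then
        return
    end
    local mapping = fetch_partition_hosts()
    if mapping then
        set_partitions(mapping)
    end
end

-- Cosockets are unavailable in this context, so defer the HTTP call to a timer.
ngx.timer.at(0, init)
```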

Routing Fetch Requests

Having access to the mapping of partitions to hosts, the proxy is ready to serve requests. For that, we extend our nginx configuration:
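A sketch of the additional configuration could look like this; the script path and the /kv/ location prefix are assumptions, while the upstream name kafka-store matches the description below:

```nginx
# Upstream whose peer is chosen per request by our Lua balancer script.
upstream kafka-store {
    server 0.0.0.1; # placeholder address, replaced by the balancer at runtime
    balancer_by_lua_file lua/kafka_routing.lua;
}

server {
    listen 80;

    # All key-value requests are passed to the dynamically balanced upstream.
    location /kv/ {
        proxy_pass http://kafka-store;
    }
}
```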

As shown, we define an upstream element called kafka-store and reference a Lua script balancing the requests. Additionally, we add a location responsible for requests to the key-value stores. All it does is a proxy pass to the upstream element defined above.

Let’s take a look into the routing logic of our Lua script:
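A minimal sketch of such a balancer script, assuming keys are addressed as /kv/<key>, hosts are stored as "host:port" strings, and the murmur2 module described below is available, could look roughly like this:

```lua
-- lua/kafka_routing.lua: route each request to the host owning the key's
-- partition (a sketch; path layout and value format are assumptions).
local balancer = require("ngx.balancer")
local murmur2 = require("murmur2")
local bit = require("bit")

local partitions = ngx.shared.partitions

-- Extract the queried key from the last segment of the request path.
local key = ngx.var.uri:match("([^/]+)$")

-- Compute the partition exactly like Kafka's DefaultPartitioner.
local hash = murmur2.hash(key)
local partition = bit.band(hash, 0x7fffffff) % partitions:get("count")

-- Look up the responsible host and split it into address and port.
local host, port = partitions:get(tostring(partition)):match("^(.+):(%d+)$")

-- Dynamically set the upstream peer for this request.
local ok, err = balancer.set_current_peer(host, tonumber(port))
if not ok then
    ngx.log(ngx.ERR, "failed to set the current peer: ", err)
end
```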

At a glance, this script is straightforward. First, we extract the queried key from the request path and calculate its murmur2 hash, and with that its partition. The partition leads us to the host which should handle the request for this particular key. Finally, the ngx.balancer API allows us to set that host dynamically.

However, there is one open question concerning the murmur2 hash: how is it implemented? It is the most crucial part of the routing logic, because we have to ensure that it is equivalent to the implementation provided by Kafka. Otherwise, the proxy routes requests to the wrong hosts. Since existing murmur2 implementations in the Lua and nginx ecosystem do not allow passing Kafka’s seed, they cannot be used in our case.

Thus, we provide our own implementation written in C, which is equivalent to the Kafka version. In the murmur2 module, we use LuaJIT’s FFI library to load the function from the compiled shared object and then expose the wrapper function that is called in our routing script.
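A sketch of this wrapper module, assuming the C function has the signature int32_t murmur2(const char *data, uint32_t length) and the shared object ends up in OpenResty’s lualib directory, could look like this:

```lua
-- lualib/murmur2.lua: FFI wrapper around the Kafka-compatible murmur2
-- implementation (a sketch; the signature and path are assumptions).
local ffi = require("ffi")

ffi.cdef[[
int32_t murmur2(const char *data, uint32_t length);
]]

local lib = ffi.load("/usr/local/openresty/lualib/libmurmur2.so")

local _M = {}

-- Hash a Lua string with the same seed and logic as Kafka's
-- org.apache.kafka.common.utils.Utils#murmur2.
function _M.hash(key)
    return lib.murmur2(key, #key)
end

return _M
```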

Up and Running

With the custom routing logic implemented, we want to get our improved proxy up and running. For that, we create a new Docker image:
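A minimal two-stage sketch of such an image, with base images, tags, and paths as assumptions, might look as follows:

```dockerfile
# Stage 1: compile the Kafka-compatible murmur2 C implementation into a shared object.
FROM gcc:10 AS build
WORKDIR /build
COPY murmur2.c .
RUN gcc -O2 -fPIC -shared -o libmurmur2.so murmur2.c

# Stage 2: extend OpenResty with our configuration and Lua scripts
# (assumes a base image that ships opm).
FROM openresty/openresty:buster
RUN opm get ledgetech/lua-resty-http
COPY nginx.conf /usr/local/openresty/nginx/conf/nginx.conf
COPY lua /usr/local/openresty/nginx/lua
COPY lualib/murmur2.lua /usr/local/openresty/lualib/murmur2.lua
COPY --from=build /build/libmurmur2.so /usr/local/openresty/lualib/libmurmur2.so
```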

In the first build stage, we compile the murmur2 C function into a shared object. The second stage uses the OpenResty image and adds our implementation. First, we install the lua-resty-http dependency as described before. Then, we copy the nginx.conf as well as the scripts in the lua and lualib directories. The first directory contains init.lua and kafka_routing.lua, the latter the murmur2.lua module. Lastly, we add the compiled shared object from the first stage to the lualib directory.

Comparison to the Classic Approach

In the final step, we want to verify that our approach is more efficient. We compare our custom approach, i.e. routing each request to the responsible host based on its key, to nginx’s classic load balancing, which does not take the key into account. To that end, we measure the requests per second (RPS) and the response latency achieved by both. The testing is done with the great wrk, an HTTP benchmarking tool capable of generating significant load.

In a Kubernetes cluster, we deploy three replicas of the key-value stores with fixed resources (300m CPU and 500Mi memory). Overall, they contain one million keys with JSON objects as values. Additionally, we deploy one custom nginx and one ingress-nginx proxy. Their configurations are based on the defaults given by ingress-nginx. For the test, we let wrk run for five minutes with 12 threads and 48 connections: wrk -t12 -c48 -d300s -s base.lua http://nginx-host. The Lua script base.lua allows us to fetch a random key in each request, making the benchmark more realistic.
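A minimal sketch of such a wrk script, assuming one million numeric keys served under the /kv/ prefix used above, could look like this:

```lua
-- base.lua: wrk request script that queries a random key per request so the
-- load spreads across all partitions and hosts (key space is an assumption).
math.randomseed(os.time())

request = function()
    local key = math.random(1, 1000000)
    return wrk.format("GET", "/kv/" .. key)
end
```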

Let’s take a look at the results:

We observe that the custom routing logic serves twice as many RPS. Additionally, the custom proxy provides a median latency of 15 ms compared to the classic approach’s 40 ms. For the custom proxy, the latency’s standard deviation is also much smaller, and therefore the response time more predictable. With three replicas, the classic proxy routes a request to the wrong host in two out of three cases, which requires an additional request. Thus, the latency is higher on average and has a greater deviation.

Summary

In this blog post, we illustrated how to implement custom routing logic in nginx. This way, we can efficiently scale key-value stores backed by Kafka Streams without applications having to adapt. We have shown that this approach provides a significantly lower response latency and allows us to serve more requests per second.
