Apache Kafka Deployments and Systems Reliability — Part 2

Joseph Niemiec
Published in Cloudera
Apr 12, 2022
Photo by Alina Grubnyak on Unsplash

In part 1 of this series we talked about serial and parallel reliability in the context of systems reliability. We also discussed some common deployment patterns used in the field, Apache Kafka clusters with and without co-located ZooKeeper and VM-based Kafka clusters, and looked at the benefits and challenges of each. In part 2 the discussion shifts toward classic failover use cases with two clusters, for disaster recovery, and geographically distant clusters, for aggregation or distribution.

Failover, or disaster recovery, is one of the most common production requirements for ensuring business continuity when something goes wrong. This could be a software failure that takes down the system, a physical failure such as a core network or data center cooling outage, or a complete disaster, like an asteroid or tornado strike, that destroys the data center beyond repair. To enable failover, messages are usually replicated continuously into another Kafka cluster, which is typically deployed in a location geographically distant from the source Kafka cluster. With this setup in place, consumers and producers can fail over to the replicated side when the source cluster fails and continue processing data.

Aggregation and Distribution of data from multiple locations is another common pattern. Aggregation occurs when producers write data to local Kafka clusters at distinct geographical locations, and data from these clusters is replicated into a central Kafka cluster for further processing. For example, collecting local device logs at a smaller local cluster for improved performance, and then transferring them to a primary cluster where analytics processing is performed on these messages, is an aggregation pattern. Distribution is the reverse of aggregation: messages are copied from a source cluster into multiple destination clusters. Transferring change data capture records collected from a source system to multiple data centers, so that applications in each can apply the changes, is a distribution pattern.

We use the term “Classic Failover” to refer to failover between two clusters in different data centers, where messages are replicated across the WAN (wide area network). In the event of a catastrophic failure of one data center, clients can fail over and resume processing on the alternate cluster. The Aggregation and Distribution pattern is similar, but without the expectation that applications fail over between clusters.

Copying Data for Failover or Aggregation/Distribution

Kafka provides an out-of-the-box utility, called MirrorMaker, for copying data across different clusters. There are currently two versions of MirrorMaker, v1 and v2, which provide very different capabilities. Cloudera’s Streams Replication Manager (SRM) provides additional ease-of-use capabilities on top of MirrorMaker v2, like improved monitoring in a graphical UI.

In the replication scenario the high-level failure domains are the data centers themselves and then the MirrorMaker deployments. The underlying cluster within each data center is assumed to have some parallel availability due to the design of Kafka. For replication of data, a MirrorMaker cluster of sufficient size to move all the required data should be set up. If your data volume is low enough, a single MirrorMaker could achieve this, but the replication flow would then be a serial availability problem, because that one MirrorMaker is a single point of failure. Because of this, one should always deploy multiple MirrorMaker instances to provide parallel availability of the replication service.

You will not need to deploy the maximum number of instances, as a far smaller number will most likely be able to replicate all of the data in your cluster with ease. It is important to test how many instances are required to achieve the required replication throughput, and then decide how many additional parallel instances you want to run to keep the failure domain available should some of them fail. Much like how part 1 talked about the placement of Kafka brokers in racks, the MirrorMaker instances should be distributed across the racks to ensure that hardware failures do not result in serial reliability of the MirrorMakers.
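To make the sizing decision above more concrete, here is a minimal sketch of the parallel availability calculation. It assumes instances fail independently and that each one is available with the same probability; the function name and the numbers (3 instances needed for throughput, 99% availability each) are hypothetical and chosen only for illustration.

```python
from math import comb


def k_of_n_availability(n: int, k: int, a: float) -> float:
    """Probability that at least k of n independent instances are up,
    when each instance is up with probability a."""
    return sum(comb(n, i) * a**i * (1 - a) ** (n - i) for i in range(k, n + 1))


# Hypothetical numbers: testing showed 3 instances are needed for throughput,
# and each instance is assumed to be available 99% of the time.
required = 3
for deployed in (3, 4, 5):
    availability = k_of_n_availability(deployed, required, 0.99)
    print(f"{deployed} deployed, {required} required: {availability:.6f}")
```

Under these assumptions, running exactly the required 3 instances gives roughly 0.970 availability of full replication throughput, while one spare instance raises it to about 0.9994 and two spares to about 0.99999. Instances sharing a rack do not fail independently, which is one reason to spread them across racks.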

If you’re using MirrorMaker v1, the maximum number of instances that can contribute to replication throughput is limited by the total number of partitions being replicated across all topics. For example: if you have a single topic in your cluster with 8 partitions, deploying more than 8 MirrorMaker worker instances will not increase the throughput of replication, but it will increase the reliability of the system should one of the 8 instances that are replicating fail. This is somewhat more complex in the case of MirrorMaker v2, which also replicates offsets and heartbeats, but for general purposes you can treat the maximum number of instances the same as with MirrorMaker v1: based on the total number of partitions being replicated across all topics.
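The partition limit can be expressed as a small calculation. This is a simplified sketch that assumes each instance behaves as a single consumer, so that at most one instance per partition does useful work; the function and topic names are hypothetical.

```python
def replication_capacity(partitions_per_topic: dict[str, int], instances: int) -> tuple[int, int]:
    """Return (active, spare): instances that can replicate at once, and
    instances that only add availability because no partition is left for them."""
    total_partitions = sum(partitions_per_topic.values())
    active = min(instances, total_partitions)
    spare = instances - active
    return active, spare


# The example from the text: one topic with 8 partitions, 10 instances deployed.
active, spare = replication_capacity({"device-logs": 8}, instances=10)
print(f"active={active}, spare={spare}")
```

With one 8-partition topic and 10 instances, 8 instances replicate and 2 sit idle as spares that can take over if a replicating instance fails.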

Offsets during failover

In the case of the classic failover scenario, consumer group offsets are of the utmost importance, as they represent the position (message) the consumer (application) has reached in the topic. Topic offsets are not identical between clusters. For example, when the message at offset #1001 of partition 1 is replicated to the target Kafka cluster, it can, and probably will, end up at a completely different offset, for example #2099. If you’re using MirrorMaker v1 there is no guarantee that the message ends up in the same partition that it was replicated from. MirrorMaker v2 and Streams Replication Manager ensure that the message is replicated to the same partition that it originated from. They also provide offset translation capabilities, which help applications find the correct current offsets on the target cluster.

Offset translation does not interfere with the offsets of the replicated messages in the target cluster. MirrorMaker manages this by keeping a mapping between source offsets and target offsets, which can be later used to correctly update the consumer group offsets on the target cluster in the event of a failover. This allows consumers to resume processing from where they left off on the origin cluster. Periodically, consumer group offsets on the origin cluster are replicated and translated into the target cluster. These offsets can be applied to the target cluster consumer groups at any time by running a few commands and in the future can be automatically applied.
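The idea of a source-to-target offset mapping can be illustrated with a toy model. This is not MirrorMaker's actual implementation or API: the class, its methods, and the sync points are hypothetical, and the sketch deliberately translates conservatively, so a consumer may reprocess a few messages after failover but does not skip any.

```python
from bisect import bisect_right


class OffsetSyncTable:
    """Toy model of offset sync points for a single topic partition."""

    def __init__(self):
        self._source_offsets = []
        self._target_offsets = []

    def record(self, source_offset, target_offset):
        # Assumption: sync points arrive in increasing source-offset order.
        self._source_offsets.append(source_offset)
        self._target_offsets.append(target_offset)

    def translate(self, committed_source_offset):
        """Map a committed source offset to a safe target offset, or None
        if no sync point at or before that offset is known."""
        i = bisect_right(self._source_offsets, committed_source_offset) - 1
        if i < 0:
            return None
        return self._target_offsets[i]


# Hypothetical sync points, reusing the offsets from the example in the text.
syncs = OffsetSyncTable()
syncs.record(1001, 2099)
syncs.record(1500, 2598)

print(syncs.translate(1200))  # falls back to the sync point at source offset 1001
print(syncs.translate(1500))  # exact sync point
print(syncs.translate(500))   # earlier than any known sync point
```

In this model a consumer group that committed source offset 1200 would resume at target offset 2099, the most recent point known to correspond to a source position it had already passed.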

Wide Area Network Challenges for Replication

The WAN presents another set of challenges that apply to both the Classic Failover and the Aggregation and Distribution scenarios:

  • Bandwidth for copying data between the clusters
  • Connectivity challenges in order to allow full mesh connectivity of both clusters to the MirrorMaker instances.

Bandwidth between the sites can be impacted by what is commonly called ‘The long fat pipe problem’ but is more properly described as the ‘Bandwidth-Delay Product.’ The Bandwidth-Delay Product describes how TCP’s need for acknowledgements limits the amount of data that can be in flight on the network uplink: the rate at which it’s possible to acknowledge packets depends on the latency of the uplink between the sites. Servers have to keep track of each TCP packet sent while waiting for its acknowledgement to come back. This tracking takes place in a TCP window, a fixed amount of memory that can only hold so many in-flight TCP packets. If this window is too small, part of the uplink’s bandwidth is wasted, because the server cannot send or receive enough packets to fill the bandwidth provided.

A TCP send or receive window smaller than the value given by the formula below will result in performance lower than the uplink can support. To make the best use of your uplink, size your TCP send and receive windows to match the bandwidth-delay product. Latency here means the round-trip time between the sites, since the sender has to wait for each acknowledgement to travel back.

Bandwidth Delay Product in bits (proper tcp send or receive window size) = uplink-bandwidth-in-bits-per-second * latency-in-seconds
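As a worked example of this formula, the sketch below computes the window size for a hypothetical 1 Gbit/s uplink with 80 ms of round-trip latency. The function name and the link figures are illustrative assumptions, not measurements.

```python
def bandwidth_delay_product_bits(bandwidth_bps: float, latency_seconds: float) -> float:
    """Window size in bits needed to keep the uplink full."""
    return bandwidth_bps * latency_seconds


# Hypothetical uplink: 1 Gbit/s with 80 ms round-trip latency.
bdp_bits = bandwidth_delay_product_bits(1_000_000_000, 0.080)
bdp_bytes = bdp_bits / 8
print(f"{bdp_bits:,.0f} bits = {bdp_bytes / 1_000_000:.1f} MB")
```

For this link the product is 80,000,000 bits, or about 10 MB, which is much larger than typical default socket buffer sizes. Operating system limits on TCP buffers and, in Kafka clients, settings such as `send.buffer.bytes` and `receive.buffer.bytes` are the usual places to look when applying a value like this, though the right settings depend on your environment.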

Another way to look at this, if you know your uplink latency and TCP window sizes, is as the maximum possible throughput, in bits per second.

Throughput in bits-per-second = window-in-bits / latency-in-seconds
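The same relationship can be run in the other direction. This sketch assumes a hypothetical 64 KiB window and 80 ms of round-trip latency to show how low the throughput ceiling can be regardless of the uplink's capacity; the function name is illustrative.

```python
def max_throughput_bps(window_bytes: int, latency_seconds: float) -> float:
    """Upper bound on single-connection throughput for a given window size."""
    window_bits = window_bytes * 8
    return window_bits / latency_seconds


# Hypothetical values: a 64 KiB window over a link with 80 ms round-trip latency.
ceiling = max_throughput_bps(64 * 1024, 0.080)
print(f"{ceiling / 1_000_000:.2f} Mbit/s")
```

Here a single connection tops out at roughly 6.55 Mbit/s, even if the uplink itself can carry far more.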

Full Mesh Connectivity

Another problem that can occur when designing a replication topology is a lack of full mesh connectivity between the MirrorMaker or Streams Replication Manager deployments and both the origin and target cluster brokers. This causes problems when the networks are not designed in a way that allows it. Solutions are very specific to the environment that the clusters are deployed in, but some potential options include deploying MirrorMaker on the source cluster and pushing to publicly exposed target brokers, VPN tunneling, and possibly other options that enable the required connectivity.

For deployments where the source cluster is in an enterprise data center and the target cluster is deployed in the public cloud, the MirrorMaker deployment can be run on the source cluster. Running it there lets it connect to all source cluster brokers and, given internet connectivity and the correct firewall rules, to all target brokers deployed in the public cloud, allowing replication to take place. Some cloud vendors support direct connection capabilities that connect the data center network to the private VPC in the cloud, which provides full mesh connectivity. When deployments are between data centers where no publicly exposed cluster endpoints can exist, a potential option is to use a VPN. The servers connected over the VPN can then reach into the opposing network, which gives MirrorMaker full mesh connectivity so that replication can take place.
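Whichever option is chosen, it helps to verify connectivity from every host that runs MirrorMaker before starting replication. The sketch below is one simple way to do that using plain TCP connection attempts; the function name is hypothetical, and the broker addresses in the usage comment are placeholders you would replace with your own.

```python
import socket


def unreachable_brokers(brokers, timeout_seconds=3.0):
    """Return the (host, port) pairs that could not be reached over TCP."""
    failed = []
    for host, port in brokers:
        try:
            # Open and immediately close a TCP connection to the broker.
            with socket.create_connection((host, port), timeout=timeout_seconds):
                pass
        except OSError:
            # Covers refused connections, timeouts, and DNS failures.
            failed.append((host, port))
    return failed


# Usage, run on each MirrorMaker host (placeholder addresses):
#   brokers = [("source-broker-1.example.internal", 9092),
#              ("target-broker-1.example.internal", 9092)]
#   print(unreachable_brokers(brokers))
```

An empty result only shows that the TCP ports are reachable. Kafka clients connect to the addresses that brokers advertise, so the list should contain every advertised broker address from both clusters, not just the bootstrap servers.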

Conclusion

Classic Failover for Disaster Recovery and Aggregation and Distribution both focus on one core capability: replication of messages. Failover also touches on consumer group offsets, which the consumer application uses to resume processing where it left off when switching between Kafka clusters, by using offset translation in MirrorMaker v2 or Streams Replication Manager. Ensuring that your MirrorMaker or Streams Replication Manager deployments are sized correctly, so that they can replicate all the data required and handle failures by providing additional availability, is an important aspect to take into consideration. All replication between clusters is impacted by the effect of the Bandwidth-Delay Product. Clusters that have higher latency between them are more affected and require network tuning in order to fully utilize the available bandwidth.
