This post was co-written with Thomas Lambert
If Kafka is part of your everyday work, you might already have seen this kind of error:
org.apache.Kafka.common.errors.TimeoutException: Expiring XX record(s) for [topic]: XXXX ms has passed since batch creation .
Basically, this means that messages are dropped because the service cannot reach the Kafka node.
Before we explain the issue we faced, we can define some naming conventions that will be used in this article:
- A producer is an EC2 instance producing messages for Kafka.
- A broker or a Kafka node is an EC2 instance that stores messages on disks. The messages come from producers.
- A record is an event in the sense of a streaming system.
From time to time, our producer expired records. It seemed random and restarting the failing producer seemed enough to fix it.
After digging through the logs a bit, we found that some producers could not establish the (TCP) connection with some Kafka nodes with the following log:
Connection to node [kafka-node-number] (/[ip-address]:9092) could not be established. Broker may not be available.
To produce records and push them to Kafka, the producer needs to establish several TCP connections to different Kafka nodes (the partitions leaders).
In our case, there was no apparent correlation between the different connection failures, and the issue seemed a bit random.
Connections between a given producer and a given broker could be established, while other connections to this same broker from other producers could fail. An example of the situation is depicted below:
Of course, transient network glitches may occur, but we faced too many occurrences for it to be a random network issue.
To drill down the issue at the network level, we need to have a grasp of how a TCP connection is established.
How is a TCP connection established?
The TCP (Transmission Control Protocol) uses a three-way handshake to establish a connection. Each machine needs to synchronize and acknowledge the other before sending data.
In a nutshell, this schema explains quite well how it works under the hood.
The sender needs to send a
SYN packet, the receiver receives this packet and sends a
SYN_ACK packet and ultimately, the sender receives the
SYN_ACK and sends an
Now let’s see in which state is our TCP connection during our outage.
What is our TCP connection state?
tcpdump session on both a producer and a broker, we saw the
SYN from the producer on the Kafka node, and the
SYN_ACK sent, but no
SYN_ACK on the producer. Lost in between the two instances.
As a result, in
netstat, we saw that the connection to Kafka from the producer was in
SYN_SENT state: It sent the
SYN_ACK but never received a response. And the producer was waiting for the
ACK from the Kafka node.
So we found the issue: the packet
SYN_ACK gets systematically lost in the network. How does this happen?
The culprits: ARP cache and AWS network behavior
The ARP process
Before sending a packet on a private subnet, the kernel must resolve the destination IP address to a Mac address. MAC addresses are, for example, used for device communications via ethernet or wifi. The IP to MAC resolution is done by the Address Resolution Protocol (ARP) and is then cached in the ARP table.
When opening a new socket, the ARP cache is first checked for an ARP entry for the targeted IP. If the IP is not present, an ARP broadcast is sent to ask if any device on the network has the requested IP address.
On AWS, the behavior of the kernel is similar even though there is no real ARP in a VPC. As per the AWS documentation:
All traffic is unicast (the Address Resolution Protocol, or ARP, is gone too). The VPC forwarding design knows where all IP addresses are, and proxies ARP responses for all ARP requests locally at the hypervisor.
In our case, some ARP cache entries from the broker were stale: we saw that some of the entries pointed to wrong MAC addresses.
This is because some newly launched EC2 instances reused IP addresses of terminated instances and did not update the ARP entry.
Hence, the ARP entry of the Kafka node was stale, the ping
SYN_ACK was dropped each time.
How did we fix it?
The ARP cache has a garbage collection (GC) mechanism to evict stale entries. The GC is ruled by several parameters including those:
gc_interval= How frequently the garbage collector for neighbor entries should attempt to run.
gc_stale_time= Determines how often to check for stale neighbor entries. When a neighbor entry is considered stale, it is resolved again before sending data to it.
gc_thresh1= The minimum number of entries to keep in the ARP cache. The garbage collector will not run if there are fewer than this number of entries in the cache. → This is the key value for our scenario.
gc_thresh2= The soft maximum number of entries to keep in the ARP cache. The garbage collector will allow the number of entries to exceed this for 5 seconds before collection will be performed.
gc_thresh3= The hard maximum number of entries to keep in the ARP cache. The garbage collector will always run if there are more than this number of entries in the cache.
So our issue was that the GC did not clean the ARP entries often enough, because we did not reach the
gc_thresh1 value (default at 128) often enough.
The solution we adopted was to set the
gc_thresh1 value to 0, to force a garbage collection every
gc_interval time. It does add a bit more pressure on the kernel but this is acceptable for our workload and we are now sure the mapping will be garbage collected regularly enough for our use case.
In the end, what seemed to be a random packet loss was a deterministic issue. We were reusing IPs and the ARP cache entries were not updated resulting in a wrong IP ↔ MAC mapping.
This issue did not occur anymore since the deployment of the fix, and thankfully we did not notice any side effects on production.
Shortly after having fixed this issue on our Kafka, it did appear on our Cassandra as well. Specifically, the typical context for this issue to happen is:
- 1 pool of clients whose instances are rolled regularly, reusing IPs over time
- 1 cluster of servers with long-lived instances, and each instance has a low number of instances connecting to it
If you encounter some connection issues, do not hesitate to have a look at this ARP cache tuning.
Thanks to all those who reviewed this article. Especially Benjamin Davy, Joseph Rocca, and Marc-Enzo Bonnafon.
Thanks to the AWS support who helped us dig through the issue.