At Zendesk, we have been using Kafka in production for several years. Primarily we have been using it to distribute events between services. For example, “a ticket has been changed, and this is what changed”. We are now considering using Kafka to share entities between systems. For example, each time a ticket is updated we will publish the full latest version of that ticket.
With events we were describing what happened. With entities we are simply presenting the latest version of the entity to other interested parties.
- Consumers may be interested in seeing all the events that occurred, not just the latest version.
- The topic has a retention time to ensure if a consumer is offline, the events are not cleaned up before the consumer has a chance to recover.
- Consumers are only interested in the latest version.
- Each entity has a unique identifier which will be set as the key of the Kafka record.
- Kafka topics with compaction enabled will be used so that older versions of entities are cleaned up to limit the amount of history consumers need to process.
The process of removing old versions of entities from compacted topics is called log cleaning.
Log cleaner 101
Lets break the compacted topic log up into 3 pieces:
The dirty ratio for a log in Kafka is defined as:
(size of the head) / (size of head + tail)
For simplicity, let’s assume all the records are the same size, and just consider the number of records.
In the example above the dirty ratio is
6 / (3 + 6) which is equal to
0.66: the log is eligible for compaction.
Log cleaner index
Log compaction starts by building an index of the latest offset for each key in the head. In this example the index would be:
This is a simplification: it’s actually a hash of the key which is stored in the index, so each entry is guaranteed to be 24 bytes according to the documentation.
Note that the maximum size of this index is controlled the broker configuration
log.cleaner.dedupe.buffer.size. If it is not big enough to fit the entire head, log cleaning will only progress up to the position in the head that the index could be built for.
Subsequent log cleaner runs would be required.
Given the example before, and the index built above:
Note there wouldn’t be empty space in the Kafka segment on disk: a new segment would be written.
While consumers are only interested in the latest version of an entity, a new consumer which processes the entire topic may see multiple versions of the entity depending on the state of log cleaning.
Kafka log cleaning has overheads. At the scale Zendesk operates, we needed to quantifiably explore how expensive it is, and what limitations exist.
Tickets is one of the domains under consideration for publishing entities. In one Kafka cluster, we may have up to 1.6 billion tickets, average about 5kb.
At times we will need to publish all 1.6 billion records, for example when initially bootstrapping. However, the majority of the time the topic will just be handling the regular trickle of updates. For the purposes of this test we are using 300 updates per second.
We wanted to evaluate the impact on our Kafka brokers of the following:
- Initial bulk publication
- Bulk republication
- New consumer group consuming all records from the beginning of the topic
- Consumer performance during republication
We spun up a dedicated test Kafka cluster setup as follows:
- 9 Kafka brokers (AWS EC2 m5.2xlarge)
- Kafka version 2.2.2
- 5tb EBS volume gp2 (SSD)
- Topic settings
— config cleanup.policy=compact min.insync.replicas=2 — replication-factor 3 — partitions 8
- When publishing we are using
acks = alland
Test 1 — Initial publication
The Kafka tool
kafka-producer-perf-test could not be used because it doesn’t support keys which are required for compacted topics. We built a simple Scala application which published random 5kb blobs for the keys 1 through to 1.6 billion. (Because we know the set of keys, we can later republish using the same keys to evaluate performance of a full republication or a trickle of regular changes to existing entities).
The full publish took 10.08 hours, sitting at about 43,000 messages per second. While 10 hours is a long time, this is not overly concerning as we could scale this out with further brokers / topic partitions if required.
Test 2 — Republication
The purpose of this test was to determine how Kafka behaved when a producer needed to republish all data. We have assumed that the data is going to be published in the same order as it originally was (eg a DB cursor is being used to publish ordered values). The keys are the same as the initial run, while the values are seeded from a different set of random bytes.
During the republication, I expected disk usage to increase, but shortly after the republication I expected that increase to be reclaimed at the log cleaner executed.
Throughput was slightly (6%) slower than the initial publication. I wasn’t concerned by this small variation.
During the test execution, disk usage doubled. However at the end of the test execution, disk usage was not reclaimed as expected. After 24 hours, no disk usage was reclaimed.
Log cleaner investigation
At the time I was investigating this, I didn’t have a great understanding of the log cleaner. So this follows roughly the thought process I went through to investigate this.
The logs also confirmed that the log cleaner executions were not reclaiming any space.
I double checked my code, it was using the same set of keys for the Kafka records produced.
I could see from the logs that the log cleaner was taking multiple cleaner iterations to get through a single partition. The head section of the logs which required indexing consisted of almost 1000 segments. However each cleaner iteration was only handling about 18 segments.
At that rate, it would take 55 log cleaner runs to index the head. And each iteration was taking 80–100 minutes. At that rate it’s going to take 3 days (to clean a single partition).
Back to the logs. Here is the summary of a single log cleaner run:
0.0% size reduction (0.0% fewer messages)
End size: 535,802.3 MB (148,931,380 messages)
Start size: 535,802.3 MB (148,931,380 messages)
Cleaned 535,802.3 MB in 6715.0 seconds (79.8 Mb/sec, 97.7% of total time)
Buffer utilization: 90.0%
Indexed 18,106.9 MB in 157.5 seconds (115.0 Mb/sec, 2.3% of total time)
535,802.3 MB of log processed in 6,872.5 seconds (78.0 MB/sec).
Log cleaner thread 0 cleaned log goanna.compacted.tickets.test-3 (dirty section = [158675344, 158675344])
It was clear we were not indexing enough records in a single iteration and that needed to be fixed. I looked up the docs and could see configuration settings that could be tweaked, but I was still confused about why each iteration was reclaiming no messages. Surely even if I had an suboptimal configuration, each iteration should still reclaim some space.
Eventually I realised this wasn’t related to the republication, but the initial publication. Lets go back to our initial publish of 1.6 billion tickets (spread across 8 partitions). The log cleaner dirty ratio is set to the default of 0.5. This means that up to 800 million tickets have not yet been cleaned, none of which will have any duplicates.
When we republish 1.6 billion tickets, the dirty section now contains 2.4 billion tickets, resulting in a dirty ratio of 0.75, and triggering the log cleaner to run. The log cleaner builds up the index of keys to offsets from the beginning of the dirty section. However the first 800 million tickets in the dirty section are from the first publication and are guaranteed to be unique. As a result all the cleans result in a 0% size reduction until we have progressed.
Tuning the log cleaner
We have a few levers we can pull.
With the default size and the knowledge each entry uses 24 bytes, we can index 5,592,405 entries. Given we had 2.4 billion dirty tickets across 8 partitions: that’s 300 million messages that need to be indexed / cleaned in a single log cleaner run. With the default setting, that is 54 log cleaner runs that are required, and each run is IO expensive.
I think it makes sense to increase this value. But it’s also worth keeping in mind that the majority of the time the log cleaner will not be cleaning large logs, so this is dead memory that otherwise the OS could be using to cache inbound / outbound data from Kafka. So we don’t just want to set it too high. I’m picking 1GB as a nice round number to try next. That would support indexing approximately 45 million messages. That would still require 7 log cleaner runs though, so this isn’t good enough in isolation.
To handle the 300 million dirty items in a single clean would require 7.2 GB memory and I don’t want this much memory sitting around unused on all the brokers most of the time.
I don’t think I want to mess around with this. If I understand this correctly, at best it’s going to give 10% more items that we can index in a single run. It appears to be a sensible default.
While it could help reduce overall log cleaning time, that would be at the expense of the brokers being more utilised (disk IO that could otherwise be used to serve producers and consumers). At this stage I’m looking for efficiency gains where I’ve been suboptimal.
Number of topic partitions
Given the decision to allocate 1GB memory to the dedupe buffer, and this meaning we can handle 45 million messages in a single cleaner run, I feel the following number of partitions is appropriate:
2.4 billion dirty tickets / 45 million = 53.333. Let’s round that up to 60 partitions.
More partitions has the benefit of being able to distribute load across more brokers.
Test results (round 2)
The dedicated test Kafka cluster setup is as before with the following adjustments:
log.cleaner.dedupe.buffer.size = 1073741824(1gb)
- Topic increased from 8 to 60 partitions
- The topic was deleted and recreated.
Test 1- Initial publication
It was a nice but unexpected improvement that the initial publication time improved.
The log cleaner execution completed much faster than before as well.
Test 2 — Republication
The log cleaner execution completed much faster than before as well.
While 12.5 hours could still be considered slow, this can be improved with more brokers or log cleaner threads.
Other considerations — dirty ratio
While the topic has 1.6 billion records, due to the default dirty ratio of 0.5, new consumers may need to consume 3.2 billion records. After the republication and log cleaning had completed, we observed the topics had grown by 25%.
I suspect most records change infrequently, but more recent tickets will be more active and have multiple changes. At a rate of 300 changes per second, it would take 61 days for enough changes to come through to trigger the log cleaner. Prior to this point, the consumers would be seeing a lot of duplicate versions of entities for those “recent” tickets. Consumers may benefit from an in memory sliding window where recent duplicates could be removed, however this would require experimentation with real data. This would only be relevant if consumers were doing a lot of full consumption from the beginning of the topic.
We can tweak the value of
min.cleanable.dirty.ratio for each topic. It is a tradeoff between several factors.
min.cleanable.dirty.ratio is guaranteed to result in lower EBS volume sizing on average, but we can only provision to a lower size if the log cleaner is keeping up. In our example, the topic requires the 16tb for the base set of records. If compaction only runs when we have a dirty ratio of 0.5, we need the same space again in reserve. At $0.10 per GB-month of provisioned storage, that’s $1,600 for “reserve” storage for the example here. We could half the amount of reserve storage required by decreasing the dirty ratio to 0.25 so that log cleaning is more aggressive.
A higher setting results in more CPU and disk IO during cleaning, but this could be offset by the reduced cost of storage mentioned above. Also, if it is a common need for consumers to replay the entire topic from the beginning, then having less records to read at consumption time will reduce IO on the brokers (and potentially reduce downstream costs as well in systems that need to process less records). However, we are unlikely to know how often consumers do this until we are in production.