New Features of Kafka 2.1

The text version for those who like to read


A short while ago I made a video on the new features in Kafka 2.1, which you can watch below, but I also wrote this blog post so you can read about them in your own time:


Kafka Upgrade Notes

Kafka 2.1 is quite a special upgrade because you cannot downgrade afterwards, due to a schema change in the consumer offsets topic. Otherwise, the procedure to upgrade Kafka is still the same as before, see: https://kafka.apache.org/documentation/#upgrade


Kafka Internals

Java 11

Kafka 2.1 now runs on Java 11! Java 11 was released in September 2018, and we get all the benefits from it, such as improved SSL and TLS performance (the improvements come from Java 9). According to one of the main Kafka committers, TLS is 2.5 times faster than on Java 8.

We also get garbage collector improvements: G1 is the default collector (since Java 9), and its full collections can now run in parallel (since Java 10). That means you are likely to see fewer long GC pauses.

Other Java 11 goodies include:

  • var allows you to reduce the amount of boilerplate code in your Java applications. It is easier to read, as you can see in this example where we create a new producer:
var producer = new KafkaProducer<String, String>(properties);
  • Improvements to Future, if your code uses it.
  • Project Jigsaw, the Java module system introduced in Java 9, is now available to you. By modularizing Java, it allows smaller compiled program binaries and more efficient runtimes.

Fencing of Zombie Replicas (KIP-320)

This KIP fixes a rare data loss condition, which should now be completely gone.
The KIP also describes a new exception that the consumer can get when calling .poll(): LogTruncationException, a subclass of OffsetOutOfRangeException.

If you want to know how to deal with it, read the KIP details, where it is well explained: https://cwiki.apache.org/confluence/display/KAFKA/KIP-320%3A+Allow+fetchers+to+detect+and+handle+log+truncation


Kafka Clients

In this new version of Kafka, the biggest change by far is the support for Zstandard compression (KIP-110).

Support for Zstandard Compression (KIP-110)

This algorithm was created by Facebook in September 2016 and it has been in the works for 2 years. The compression ratio is as good as gzip on the first pass. But if we now look at the compression and decompression speed, meaning how many megabytes can be compressed or decompressed per second, we get roughly 5 times the performance of gzip.

In Kafka, according to the KIP, the compression works great, with a 4.28 compression ratio. Shopify is an example of a production environment using Zstandard, and they noted a massive decrease in CPU usage. Overall, if you use Zstandard you should get more throughput for a fraction of the cost.

Here is how to use ZStandard compression:

  • Producers (2.1): compression.type=zstd
  • Consumers (<2.1): UNSUPPORTED_COMPRESSION_TYPE (error)
  • Consumers (>=2.1): It should work out of the box

To use zstd compression, you also have to upgrade the brokers to 2.1. To summarize, if you want to use zstd, first upgrade your consumers, then your brokers, and finally your producers.
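To make the producer side concrete, here is a minimal sketch of the settings a 2.1 producer would need. It only builds a plain java.util.Properties object, so it runs without the Kafka client on the classpath. The class name ZstdProducerConfig, the localhost:9092 address and the choice of String serializers are assumptions made for illustration; only compression.type=zstd comes from the text above.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): collects producer settings for zstd.
final class ZstdProducerConfig {

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        // Assumed broker address, passed in by the caller.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Assumed serializers; pick the ones matching your key and value types.
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // The setting discussed above: requires 2.1 brokers and 2.1 consumers.
        props.setProperty("compression.type", "zstd");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration for inspection.
        build("localhost:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With the Kafka client library available, these properties could be handed to the producer constructor, as in the var producer = new KafkaProducer<String, String>(properties); line shown earlier.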

Link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-110%3A+Add+Codec+for+ZStandard+Compression

Avoid Expiring Consumer Offsets for Active Consumer Groups (KIP-211)

In previous versions of Kafka, there was a bug. A consumer that was still active but not receiving data (because of a dormant producer or a pipeline that was down, for example) would eventually lose its committed offsets. If there was then a restart or a rebalance, the offsets would be reset, which would result in either data loss or duplicate reads (depending on the auto.offset.reset setting). This has been fixed in Kafka 2.1: the offsets of a consumer group no longer expire while the group is active. It is one of the reasons why we cannot downgrade to a previous version after upgrading our brokers to 2.1.

Link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets

Intuitive Producer Timeout (KIP-91)

In Kafka, there was never an easy setting to provide a timeout for an entire send request. We had to use four settings: max.block.ms, linger.ms, retry.backoff.ms and request.timeout.ms, all of which interact with one another.

This has now been fixed by wrapping all these settings in a new setting called delivery.timeout.ms, which provides an upper bound on how long you will wait until a message is delivered.

Here is how to use delivery.timeout.ms: by default, it is quite a big number, 120000 ms (120 seconds, or 2 minutes). However, if you set it to Integer.MAX_VALUE, you can delegate the retry mechanism to Kafka (wait practically indefinitely), which is quite nice.

Also, this KIP brought a big change for retries: it now defaults to Integer.MAX_VALUE instead of 0. This means you can get out-of-order data if sends get retried, unless you do one of the following:

  1. You set max.in.flight.requests.per.connection=1. That works perfectly, because even with retries nothing arrives out of order, but you lose throughput.
  2. You set enable.idempotence=true (which allows max.in.flight.requests.per.connection=5). You do not get any out-of-order data, and you still get maximum throughput.
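The second option, combined with the new timeout, can be sketched as a set of producer properties. As before, this is only a plain java.util.Properties builder, so it runs without the Kafka client. The class name OrderedProducerConfig is hypothetical, and acks=all is an addition of mine (idempotence relies on it); the other keys and values are the ones discussed above.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): settings for ordered, retried sends.
final class OrderedProducerConfig {

    static Properties build() {
        Properties props = new Properties();
        // Upper bound on the total time to deliver a record (the 2.1 default).
        props.setProperty("delivery.timeout.ms", "120000");
        // The 2.1 default for retries, written out for clarity.
        props.setProperty("retries", String.valueOf(Integer.MAX_VALUE));
        // Idempotence keeps ordering even when sends are retried...
        props.setProperty("enable.idempotence", "true");
        // ...while still allowing up to 5 in-flight requests per connection.
        props.setProperty("max.in.flight.requests.per.connection", "5");
        // Assumption: set explicitly here, since idempotence relies on acks=all.
        props.setProperty("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration for inspection.
        build().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

For the first option, you would drop enable.idempotence and set max.in.flight.requests.per.connection to 1 instead.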

Link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-91+Provide+Intuitive+User+Timeouts+in+The+Producer


Kafka Streams

Kafka Streams had lots of small changes:

  • KIP-358: Kafka Streams has migrated to using Duration instead of long millisecond values. Thus, you might get a lot of deprecation warnings when you upgrade your Kafka Streams application to 2.1.
  • KIP-353: Kafka Streams had complicated logic to select the “next record to process”, especially when there were multiple input partitions to read from. In older versions of Kafka Streams, the algorithm was quite hard to follow and non-deterministic. There is now a new setting, max.task.idle.ms, which allows you to control how long you are willing to wait for the synchronization of timestamps across different partitions. It defaults to 0 ms to favor latency. If you want to try that setting out, and accept a little bit of latency in exchange for possibly better timestamp synchronization between partitions, set it to something a little bit higher.
  • KIP-319: Internal fix when using WindowBytesStoreSupplier.
  • KIP-321: It was already possible to extract the TopologyDescription at runtime (KIP-120), but this has been improved in KIP-321.
  • KIP-330: This new version adds a missing interface to get the retention period of a SessionBytesStoreSupplier.
  • KIP-356: Kafka 2.1 adds a withCachingDisabled() function to StoreBuilder to complement the existing withCachingEnabled() function. Nevertheless, caching is still disabled by default.
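To illustrate the first two items, here is a small sketch that uses only the JDK, so it runs without Kafka Streams on the classpath. It shows a long millisecond value next to its Duration equivalent, and builds the max.task.idle.ms setting from a Duration. The class name StreamsDurationSketch, the 5 minute window and the 500 ms idle time are assumptions chosen for illustration.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper (not part of Kafka Streams).
final class StreamsDurationSketch {

    // Old style: a bare long, where the unit is only implied by the name.
    static final long LEGACY_WINDOW_MS = 5 * 60 * 1000L;

    // New style (KIP-358): the unit is carried by the type.
    static final Duration WINDOW_SIZE = Duration.ofMinutes(5);

    // Builds the KIP-353 setting; the config value itself is still in milliseconds.
    static Properties withTaskIdle(Duration maxIdle) {
        Properties props = new Properties();
        props.setProperty("max.task.idle.ms", Long.toString(maxIdle.toMillis()));
        return props;
    }

    public static void main(String[] args) {
        // Both representations describe the same 5 minute window.
        System.out.println(WINDOW_SIZE.toMillis() == LEGACY_WINDOW_MS);
        // Trade a little latency (500 ms, an arbitrary value) for better ordering.
        System.out.println(withTaskIdle(Duration.ofMillis(500)));
    }
}
```

In a real Streams application, the Duration would be passed to the new overloads (for example when defining a window), and the property would go into the configuration given to KafkaStreams.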

Kafka Security

In my mind, Kafka 2.1 brings lots of really amazing changes regarding security.

DNS (KIP-235): You can now resolve CNAMEs before authentication using client.dns.lookup=resolve_canonical_bootstrap_servers_only. This allows you to be a bit smarter about how you use DNS with Kafka while keeping security guarantees.

DNS (KIP-302): Now, if a DNS record returns multiple IPs, the clients have a new setting to resolve and try all of the IPs instead of just the first one: client.dns.lookup=use_all_dns_ips. This new feature allows us to create more complex DNS records with round robin and give only that record to our clients, instead of a longer bootstrap.servers list.

ACL (KIP-231): The ListGroups API can now be used with the Describe(Group) ACL instead of requiring the Describe(Cluster) ACL.
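As a sketch of the KIP-302 setup described above, a client configuration could look like the following. It is again a plain java.util.Properties builder with no Kafka dependency. The class name DnsLookupConfig and the kafka.example.com:9092 record are hypothetical; the client.dns.lookup key and its use_all_dns_ips value are the ones from the text.

```java
import java.util.Properties;

// Hypothetical helper (not part of Kafka): client settings for multi-IP DNS records.
final class DnsLookupConfig {

    static Properties build(String bootstrapRecord) {
        Properties props = new Properties();
        // A single DNS name that resolves to several broker IPs (assumed setup).
        props.setProperty("bootstrap.servers", bootstrapRecord);
        // Try every IP returned for that name, not only the first one.
        props.setProperty("client.dns.lookup", "use_all_dns_ips");
        return props;
    }

    public static void main(String[] args) {
        // kafka.example.com is a placeholder, not a real cluster.
        build("kafka.example.com:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The same two properties apply to producers, consumers and admin clients alike, since they are common client settings.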


Kafka Administration

The admin improvements in Kafka 2.1 are mostly around the CLI (command line interface). Here is a list of these improvements:

KIP-308: There is a new CLI named kafka-get-offsets.sh to get offsets for multiple topics at a time, including partitions. (edit: actually did not make it to 2.1.0, should be 2.2.0)

KIP-338: You now have the ability to exclude internal topics in kafka-topics.sh using the --exclude-internal option. This was not achievable before unless you parsed the output with some complex regex.

KIP-340: The kafka-reassign-partitions and kafka-log-dirs CLIs now accept a properties file as input. The producer and the consumer CLIs already had that logic, but it is now implemented for other CLIs as well.

KIP-324: It is now possible to get the AdminClient metrics using a .metrics() function, the same way we already can for producers and consumers.


Closing Comments

Overall, I would say that this upgrade is a really good one. It has mostly been around stability, but nevertheless, I strongly recommend that you upgrade your Kafka software to this version.

If you want to learn more about Kafka 2.1, please go check out my online courses: https://kafka-tutorials.com

You can also subscribe to my YouTube channel, follow me on Twitter, and connect with me on LinkedIn.

Happy learning!