Messaging — Kafka implementation using puppet

Sorin Tudor
METRO SYSTEMS Romania
9 min read · Oct 27, 2017


As human beings need to interconnect and interact with each other, so do systems. And our corporate landscape has lots of them, be it legacy or brand-new systems.

So far, we have been using traditional messaging solutions from well-known vendors. These proved to be reliable and fast. However, requirements to interconnect new platforms led us to think about extending our portfolio in terms of messaging.

It has been a very pleasant journey, and we would like to look back and share it with you.

Today we will talk about Apache Kafka and the points we found essential for deploying it safely in production. Surely, there are many other things to be implemented in time, but let's start with small and well-established steps, because the overall picture is big.

The easiest way to clarify these steps is by giving short and concise answers to a couple of important questions.

Why Apache Kafka?

Well, because it's fast. Very fast, actually: it can transfer hundreds of thousands of messages per second, provided you scale your clusters with care and make a correct estimation in the first place.

It is cloud-oriented and scalable. This matters more and more these days, since the majority of applications and computing are being migrated to clouds in different data centers. One cannot ignore that anymore.

Data is distributed across multiple nodes. Redundancy and fail-over are equally important, since you no longer have full control of your infrastructure layer; and even if you do, you can avoid a lot of trouble by designing your solutions with that in mind.

And last but not least, it helps with parallel processing and your delivery pipeline.

How do we scale?

Infrastructure

The primary factor to take into consideration is size. You don't have to create a large cluster if the load does not require it. That being said, an estimation is needed before you make a decision. Depending on it, there are two scenarios:

  • small / medium clusters — instances are installed on the same machine
  • medium / large clusters — instances are installed on different machines

The only constraint common to both cases is that you will need at least three Zookeeper instances in order to form a cluster.

We generally recommend the second option, because it provides looser coupling and a better fail-over recovery time.
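Either way, for a three-node quorum the ensemble section of zoo.cfg ends up looking roughly like this (hostnames are illustrative; 2888 and 3888 are the default peer and leader-election ports):

```
server.1=kafka1.example.com:2888:3888
server.2=kafka2.example.com:2888:3888
server.3=kafka3.example.com:2888:3888
```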

Independent of the way you choose to scale, there are general infrastructure points to consider:

  • the server flavor is chosen depending on the requirements
  • JVM process memory is allocated only for the input / output operations; the working set is cached by the filesystem, so swap needs to be disabled by default — it will not help you
  • multiple active log directories need to be configured on high-speed storage (SSDs, with LVM in order to provide size-increase flexibility)
  • log rotation via a properly configured log4j.properties, plus raised system limits, to avoid process-hang situations
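As a sketch of the swap and limits points above, the corresponding system configuration could look like this (the values are our assumptions; tune them for your machines):

```
# /etc/sysctl.d/99-kafka.conf -- keep swapping to an absolute minimum
vm.swappiness=1

# /etc/security/limits.d/kafka.conf -- raise file-descriptor limits
# so the broker does not hang when it runs out of handles
kafka  soft  nofile  100000
kafka  hard  nofile  100000
```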

Modules and implementation

The modules that we used for our implementation are the zookeeper and kafka modules from Puppet Forge, which you will see instantiated below.

Essential details before component deployment are zookeeper.connect and broker.list. The code used to compute them:

$hosts = query_nodes("role='kafka'").sort
$hosts_hash = $hosts.map |$value| { [$value, seeded_rand(254, $value) + 1] }.hash
$overide_hosts_hash = hiera_hash('profiles::kafka_hosts_hash', $hosts_hash)
$overide_hosts = $overide_hosts_hash.keys.sort
if $overide_hosts_hash.size() != $overide_hosts_hash.values.unique.size() {
  # Duplicate ids detected, fall back to index-based ids.
  $overide_hosts_hash2 = $hosts.map |$index, $value| { [$value, $index + 1] }.hash
} else {
  $overide_hosts_hash2 = $overide_hosts_hash
}
$zookeeper_connect = $overide_hosts_hash2.keys.sort.suffix(':2181').join(',')

These parameters are needed to configure and establish the cluster details. The list of servers is generated dynamically from PuppetDB, and since each Zookeeper node needs a unique id between 1 and 255, we tried to use a special Ruby function (seeded_rand). We observed that in some cases the same ids get generated, so we fell back to index-based ids to work around this bug.
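The collision is easy to reproduce outside Puppet. This small shell sketch (the hostnames are made up; in the real setup the list comes from PuppetDB) maps hostname hashes into the same 1..254 range and checks for duplicates:

```shell
#!/bin/sh
# Hypothetical hostnames, for illustration only.
hosts="kafka1.example.com kafka2.example.com kafka3.example.com"

ids=""
total=0
for h in $hosts; do
  # Map a hash of the hostname into 1..254, mimicking seeded_rand(254, $value) + 1.
  id=$(( ($(printf '%s' "$h" | cksum | cut -d' ' -f1) % 254) + 1 ))
  ids="$ids $id"
  total=$((total + 1))
  echo "$h -> $id"
done

# With n hosts drawing from only 254 values, collisions become likely as n
# grows (the birthday problem); when one occurs, fall back to index-based ids.
unique=$(echo $ids | tr ' ' '\n' | sort -u | wc -l)
if [ "$unique" -lt "$total" ]; then
  echo "duplicate ids detected, falling back to index-based ids"
fi
```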

What else do we need to know about the Kafka broker? Well, it helps to configure the persistence parameters properly.

$broker_config = {
  'broker.id'                     => '-1',
  'zookeeper.connect'             => hiera('profiles::kafka::zookeeper_connect', $zookeeper_connect),
  'inter.broker.protocol.version' => hiera('profiles::kafka::inter_broker_protocol_version', $kafka_version),
  'log.dir'                       => hiera('profiles::kafka::log_dir', '/srv/kafka-logs'),
  'log.dirs'                      => hiera('profiles::kafka::log_dirs', '/srv/kafka-logs'),
  'log.retention.hours'           => hiera('profiles::kafka::log_retention_hours', '168'),
  'log.retention.bytes'           => hiera('profiles::kafka::log_retention_bytes', '1073741824'),
  'num.partitions'                => hiera('profiles::kafka::num_partitions', 10),
  'default.replication.factor'    => hiera('profiles::kafka::default_replication_factor', '2'),
  'delete.topic.enable'           => hiera('profiles::kafka::delete_topic_enable', 'true'),
  'auto.create.topics.enable'     => hiera('profiles::kafka::auto_create_topics_enable', 'true'),
}
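For orientation, with the defaults above (and illustrative hostnames) this hash renders into a server.properties along these lines:

```
broker.id=-1
zookeeper.connect=kafka1.example.com:2181,kafka2.example.com:2181,kafka3.example.com:2181
inter.broker.protocol.version=0.10.1.1
log.dir=/srv/kafka-logs
log.dirs=/srv/kafka-logs
log.retention.hours=168
log.retention.bytes=1073741824
num.partitions=10
default.replication.factor=2
delete.topic.enable=true
auto.create.topics.enable=true
```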

You have most surely recognized the values most important for a working setup. What I can tell you is that most of them are also the ones suggested by the creators of Kafka themselves, and they definitely have more experience than anyone on this topic.

There is extensive documentation that clarifies what each parameter means and when it is appropriate; the important facts are covered there. We now have the structure for the deployment; the rest is done easily by three extra blocks of code.

class { '::zookeeper':
  hosts => $overide_hosts_hash2,
}

$kafka_version = hiera('kafka::version', '0.10.1.1')

class { '::kafka':
  version       => $kafka_version,
  scala_version => '2.11',
  install_java  => false,
}

class { '::kafka::broker':
  config    => $broker_config,
  heap_opts => "-Xmx${jvm_heap_size}M -Xms${jvm_heap_size}M",
}

The heap size is either calculated dynamically as a fraction of the memory actually available on the machine, or saved statically in Hiera (like the rest of the parameters shown earlier).
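As an illustration of the dynamic variant, the heap could be derived from the machine's memory like this (the one-half fraction and the 4 GB cap are our assumptions, not a Kafka recommendation):

```shell
#!/bin/sh
# Take half of the machine's RAM (from /proc/meminfo, so Linux only)
# as the broker heap in MB, capped at 4096 MB.
total_mb=$(( $(grep MemTotal /proc/meminfo | awk '{print $2}') / 1024 ))
heap=$(( total_mb / 2 ))
if [ "$heap" -gt 4096 ]; then
  heap=4096
fi
echo "jvm_heap_size=${heap}"
```

The resulting number is what ends up in the `-Xmx`/`-Xms` options of heap_opts above.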

Congratulations, your Zookeeper and Kafka cluster should now be working.

What else do we need to deploy in production?

There are a lot of things that matter when we talk about a production environment, but in our opinion the most essential topics are security, monitoring and tooling.

Security

We work every day with essential and confidential information related to our customers; a security mechanism to protect it is a must. The basic form is to provide SSL encryption for clients and also at the inter-broker level.

The easiest way to do this is with keytool and openssl commands put into an ERB template. One primary server is always needed to generate the server and client keystores / truststores. Here is the actual ERB template that we use:

#!/bin/bash
HOST=<%= @fqdn %>
PASSWORD=<%= @pass %>
KEYSTOREPASS=<%= @keystorepass %>
VALIDITY=365

keytool -keystore kafka.server.keystore.jks -alias $HOST -validity $VALIDITY -genkey -dname "CN=${HOST}, OU=Messaging, O=MSYS, L=Bucharest, S=Romania, C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
openssl req -new -x509 -keyout ca-key -out ca-cert -days $VALIDITY -subj "/CN=${HOST}/OU=Messaging/O=MSYS/L=Bucharest/ST=Romania/C=RO" -passout pass:$PASSWORD
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
keytool -keystore kafka.client.keystore.jks -alias $HOST -validity $VALIDITY -genkey -dname "CN=${HOST}, OU=Messaging, O=MSYS, L=Bucharest, S=Romania, C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert -storepass $KEYSTOREPASS -noprompt

<% @servers.each do |server| -%>
# <%= server %>
keytool -keystore kafka.server.keystore.jks -alias <%= server %> -validity $VALIDITY -genkey -dname "CN=<%= server %>, OU=Messaging, O=MSYS, L=Bucharest, S=Romania, C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
keytool -keystore kafka.server.keystore.jks -alias <%= server %> -certreq -file cert-file-<%= server %>.host -storepass $KEYSTOREPASS
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-<%= server %>.host -out cert-signed-<%= server %>.host -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
keytool -keystore kafka.server.keystore.jks -alias <%= server %> -import -file cert-signed-<%= server %>.host -storepass $KEYSTOREPASS -noprompt

keytool -keystore kafka.client.keystore.jks -alias <%= server %> -validity $VALIDITY -genkey -dname "CN=<%= server %>, OU=Messaging, O=MSYS, L=Bucharest, S=Romania, C=RO" -storepass $KEYSTOREPASS -keypass $KEYSTOREPASS
keytool -keystore kafka.client.keystore.jks -alias <%= server %> -certreq -file cert-file-<%= server %>.client -storepass $KEYSTOREPASS
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-<%= server %>.client -out cert-signed-<%= server %>.client -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
keytool -keystore kafka.client.keystore.jks -alias <%= server %> -import -file cert-signed-<%= server %>.client -storepass $KEYSTOREPASS -noprompt

<% end -%>

I know, it doesn't look that elegant, but it's needed. The passwords are stored as Puppet variables taken from Hiera, the host equals the fqdn taken from Facter, and the list of servers is returned by the first piece of code that I showed you.

Besides this, you will also have to write code that generates an SSH key pair in order to distribute the keystore / truststore to the rest of the nodes. The public key is shared using Puppet shared_resources, and an extra fact needs to be written to check whether a node is already configured.

Once they are copied to each node, the brokers need to be reconfigured. Here is the list of parameters for this task:

'security.inter.broker.protocol' => 'SSL',
'ssl.client.auth'                => 'required',
'ssl.enabled.protocols'          => ['TLSv1.2', 'TLSv1.1', 'TLSv1'],
'ssl.key.password'               => hiera('profiles::kafka_security_gen::password', 'password'),
'ssl.keystore.location'          => $keystore_location,
'ssl.keystore.password'          => hiera('profiles::kafka_security_gen::keystorepass', 'password'),
'ssl.keystore.type'              => 'JKS',
'ssl.protocol'                   => 'TLS',
'ssl.truststore.location'        => $truststore_location,
'ssl.truststore.password'        => hiera('profiles::kafka_security_gen::keystorepass', 'password'),
'ssl.truststore.type'            => 'JKS',
'listeners'                      => "PLAINTEXT://${::fqdn}:9092,SSL://${::fqdn}:9093",
'advertised.listeners'           => "PLAINTEXT://${::fqdn}:9092,SSL://${::fqdn}:9093",

Nothing out of the ordinary in this list; the single thing I wanted to underline is that the keystore / truststore locations are specified as variables directly in the Puppet code.
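For completeness, a client connecting to the SSL listener then needs a matching configuration, along these lines (paths and passwords are illustrative):

```
security.protocol=SSL
ssl.truststore.location=/etc/kafka/ssl/kafka.client.truststore.jks
ssl.truststore.password=<truststore password>
ssl.keystore.location=/etc/kafka/ssl/kafka.client.keystore.jks
ssl.keystore.password=<keystore password>
ssl.key.password=<key password>
```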

Once they are set, you will have a secured cluster.
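If you want to sanity-check the generated CA before distributing the keystores, the first openssl step of the template can be reproduced standalone with throwaway values (CN, password and directory here are illustrative, not our real ones):

```shell
#!/bin/sh
# Generate a test CA in a temporary directory, non-interactively.
workdir=$(mktemp -d)
cd "$workdir" || exit 1
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365 \
  -subj "/CN=test.example.com/OU=Messaging/O=MSYS/L=Bucharest/ST=Romania/C=RO" \
  -passout pass:changeit 2>/dev/null
# Confirm the subject fields landed where we expect them.
subject=$(openssl x509 -in ca-cert -noout -subject)
echo "$subject"
```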

Monitoring

Nothing goes into production these days without at least a basic form of monitoring, and our case is no exception, as I will show you next.

Multiple solutions can perform this task just fine. Our options were Prometheus, Datadog and Nagios; Datadog was the right one for us, since it is easy to deploy and manage.

In order to install the agent, we used another module available on Puppet Forge (https://forge.puppet.com/datadog/datadog_agent); you will also need an API key to register it.

To cover what we need in order to be safe, there are two types of monitoring:

  • JMX monitoring
  • process monitoring

Let’s treat them both in a few words.

First, there is no out-of-the-box integration for Apache Kafka. You will have to construct it yourself, but it's pretty easy and straightforward.

For the JMX integration, you can take the template provided by Datadog for Apache Kafka and just set the hostname to localhost and the port to the Kafka JMX port. Deployment is done using this block:

file { "${datadog_agent::params::conf_dir}/kafka.yaml":
  ensure  => file,
  owner   => $datadog_agent::params::dd_user,
  group   => $datadog_agent::params::dd_group,
  mode    => '0600',
  content => template("${module_name}/kafka.yaml.erb"),
  require => Package[$datadog_agent::params::package_name],
  notify  => Service[$datadog_agent::params::service_name],
}
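For orientation, a minimal kafka.yaml rendered by such a template could look like this (the JMX port 9999 is an assumption; use whatever JMX_PORT your broker exposes):

```yaml
init_config:
  is_jmx: true
  collect_default_metrics: true

instances:
  - host: localhost
    port: 9999
```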

As for the process integration, it's quite similar, except that the file has to be called process.yaml and contain the following structure:

- name: kafka
  pid_file: /var/run/kafka.pid

For JMX monitoring you can export whatever MBeans you like, but please keep in mind that the number of metrics that can be sent is limited, and this can affect your implementation.

As for process monitoring, it dynamically checks the process from the PID file and reports its status. Custom monitors can also be created for alerting on this check, and you can choose how you would like to be notified.

With monitoring also finished, there is still one last point to cover.

Tooling

Lots of tools are available for managing Apache Kafka. Some scripts come with the standard installation, but we chose Kafka-Manager for its versatility.

You can find details regarding the installation process in the official documentation. The package can be built easily using sbt and uploaded to a repository.

What must be said is that, since it's a powerful tool for this kind of task, it is better to deploy it behind a reverse proxy. The solution we used is Traefik, but you can also choose HAProxy or any other option.

Putting Kafka-Manager behind such a proxy will provide you with security, high availability and also load balancing functionality.

The details of the Puppet implementation depend on the level of complexity and your needs, so I will not go into elaborate detail.

However, one extra thing that should not be left behind is limiting connectivity from outside to port 9000 (the default Kafka-Manager port):

$kafka_hosts.each | Integer $index, String $host | {
  firewall { "10${index} adding tcp rule kafka manager node ${index}":
    proto       => 'tcp',
    dport       => 9000,
    source      => $host,
    destination => $fqdn,
    action      => 'accept',
  }
}
firewall { "10${hosts_count} dropping rest of kafka manager calls":
  proto       => 'tcp',
  dport       => 9000,
  destination => $fqdn,
  action      => 'drop',
}
Have this deployed and you are finally done; there is enough information and there are enough components to safely start your Apache Kafka adoption.

Our journey will continue with:

  • centralizing the logs in ELK for a better overview of the cluster state and health
  • integration of the Confluent Kafka package that provides extra functionalities like Schema Registry and REST Proxy
  • testing the Kafka image for Docker and different orchestrators like Kubernetes, Mesos or Swarm

and we will keep you posted.

Should you have questions or comments, they are welcome. Also, we are interested in different ideas and implementations; in the end, your journey is custom to your goals.

Sorin Tudor
METRO SYSTEMS Romania

DevOps Engineer, Technical blogger (log-it.tech) and amateur photographer.