Securing Apache Kafka Cluster using SSL, SASL and ACL

Prerequisite: basic familiarity with Apache Kafka, Kafka producers and consumers.
This blog focuses on SASL, SSL and ACLs on top of an Apache Kafka cluster.

Securing Apache Kafka Cluster

In this blog, we will go over the configuration for enabling authentication using SCRAM, authorization using SimpleAclAuthorizer, and encryption between clients and the server using SSL.

Apache Kafka is now widely used to exchange sensitive information between systems within a company and also between different companies. Secure connections, comparable to HTTPS for web traffic, are therefore essential between client applications and the Kafka server. The Apache Kafka open source community has contributed multiple Kafka security options for authentication, authorization and encryption.

Authentication in Kafka:

  • SSL
  • SASL: PLAIN, SCRAM(SHA-256 and SHA-512), OAUTHBEARER, GSSAPI(Kerberos)

Authorization in Kafka: Kafka comes with a simple authorization class, kafka.security.auth.SimpleAclAuthorizer, for handling ACLs (create, read, write, describe, delete). We can also plug in our own custom implementation of the Authorizer class.

Encryption: Data is encrypted over the network between clients and the Kafka server, i.e. between producers and the Kafka server, and between consumers and the Kafka server.

Authentication using SCRAM:

Salted Challenge Response Authentication Mechanism (SCRAM) authenticates clients with a username and password. Apache Kafka supports SCRAM-SHA-256 and SCRAM-SHA-512. For this mechanism, Kafka's default implementation (ScramLoginModule) stores SCRAM credentials in ZooKeeper along with the salt, so ZooKeeper needs to be secured on a private network with very limited access. A random salt is created, and the SCRAM identity consisting of the salt, iterations, StoredKey and ServerKey is stored in ZooKeeper. The default iteration count of 4096 is used if iterations are not specified. In our example below, we will use SCRAM-SHA-512 for authentication.
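The stored identity described above can be sketched in a few lines of Python. This is a minimal illustration of the RFC 5802 key derivation, not Kafka's own code: the function name scram_credential and the layout of the returned dictionary are assumptions made for this example, and SASLprep normalization of the password is skipped.

```python
import base64
import hashlib
import hmac
import os


def scram_credential(password, salt, iterations=4096, hash_name="sha512"):
    """Derive the values a SCRAM server stores instead of the password.

    Hypothetical helper for illustration; follows the RFC 5802 formulas.
    """
    # SaltedPassword = Hi(password, salt, iterations), which is PBKDF2 with HMAC
    salted = hashlib.pbkdf2_hmac(hash_name, password.encode("utf-8"), salt, iterations)
    # ClientKey = HMAC(SaltedPassword, "Client Key"); StoredKey = H(ClientKey)
    client_key = hmac.new(salted, b"Client Key", hash_name).digest()
    stored_key = hashlib.new(hash_name, client_key).digest()
    # ServerKey = HMAC(SaltedPassword, "Server Key")
    server_key = hmac.new(salted, b"Server Key", hash_name).digest()
    return {
        "salt": base64.b64encode(salt).decode("ascii"),
        "iterations": iterations,
        "stored_key": base64.b64encode(stored_key).decode("ascii"),
        "server_key": base64.b64encode(server_key).decode("ascii"),
    }


# A fresh random salt per user, as described in the text above
credential = scram_credential("secret", os.urandom(16))
print(sorted(credential))
```

Only the salt, iteration count, StoredKey and ServerKey are kept, so the password itself is not stored. The stored values are still sensitive, which is why access to ZooKeeper should stay limited.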

Authorization using SimpleAclAuthorizer:

Apache Kafka ships with a simple default authorization implementation, kafka.security.auth.SimpleAclAuthorizer, for all ACL operations. SimpleAclAuthorizer supports the create, read, write, cluster_action, alter_configs, describe and delete permissions. In the examples below, we will see the role of these permissions and the application errors that occur if they are not assigned before an operation.
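To make the permission model concrete, here is a deliberately simplified Python model of an ACL decision. It is a sketch, not Kafka's implementation: the Acl class and is_authorized function are hypothetical names, and the model ignores host filters, wildcard and prefixed resources, implied permissions and the allow.everyone.if.no.acl.found setting. It only captures three rules: super users are always allowed, a matching deny wins over an allow, and nothing is allowed without a matching allow.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Acl:
    principal: str   # e.g. "User:demouser"
    operation: str   # e.g. "Write"
    resource: str    # e.g. "Topic:demo-topic"
    allow: bool = True


def is_authorized(acls, super_users, principal, operation, resource):
    """Toy decision: super users pass, deny beats allow, default is deny."""
    if principal in super_users:
        return True
    matching = [
        acl for acl in acls
        if (acl.principal, acl.operation, acl.resource) == (principal, operation, resource)
    ]
    if any(not acl.allow for acl in matching):
        return False
    return any(acl.allow for acl in matching)


acls = [
    Acl("User:demouser", "Create", "Topic:demo-topic"),
    Acl("User:demouser", "Describe", "Topic:demo-topic"),
]
super_users = {"User:admin"}

print(is_authorized(acls, super_users, "User:demouser", "Create", "Topic:demo-topic"))
print(is_authorized(acls, super_users, "User:demouser", "Write", "Topic:demo-topic"))
print(is_authorized(acls, super_users, "User:admin", "Write", "Topic:demo-topic"))
```

This prints True, False and True: demouser may create the topic, may not write to it until a Write ACL is added, and the super user admin is allowed regardless of ACLs.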

Encryption using SSL:

SSL (Secure Sockets Layer) can be configured for encryption and can also serve as 2-way authentication between client and server, i.e. the broker authenticates the client by validating the client's certificate against the broker's truststore, and the client authenticates the broker by validating the broker's certificate against the client's truststore. In this article, we will configure SSL for encryption only. SSL uses private-key/certificate pairs during the SSL handshake process.

  • Each Kafka broker needs its own private-key/certificate pair, and the client uses the certificate to authenticate the broker.

You can refer to this GitHub repo for spinning up a secure cluster locally on your machine with the required configs added.
Now, let's get on to a working example:

Download the Apache Kafka binary from the open source Apache Kafka Downloads page.

SSL certificate and key generation: Create the Kafka broker SSL keystore and truststore certificates using the confluent-platform-security-tools generate ssl script. Make a note of the keystore and truststore passwords that you pass in while executing this script; we will need them for configuring Kafka's server.properties.

After generating the truststore and keystore folders with this script, copy them into the config folder of the downloaded Kafka binary directory.

We will execute all of the commands below from <kafka-binary-dir>/ in a terminal.

Start Zookeeper

./bin/zookeeper-server-start.sh config/zookeeper.properties

Create Kafka Super User:

./bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-512=[password='admin-secret']' --entity-type users --entity-name admin
Completed Updating config for entity: user-principal 'admin'.

Create kafka_server_jaas.conf file in config folder

KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="admin"
password="admin-secret";
};

Create ssl-user-config.properties file in config folder

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="demouser" password="secret";
ssl.truststore.location=<kafka-binary-dir>/config/truststore/kafka.truststore.jks
ssl.truststore.password=password

Append below security properties to existing Kafka server.properties file in config folder.

########### SECURITY using SCRAM-SHA-512 and SSL ###################
listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093,SASL_SSL://localhost:9094
advertised.listeners=PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093,SASL_SSL://localhost:9094
security.inter.broker.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=
ssl.client.auth=required
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512
# Broker security settings
ssl.truststore.location=<kafka-binary-dir>/config/truststore/kafka.truststore.jks
ssl.truststore.password=password
ssl.keystore.location=<kafka-binary-dir>/config/keystore/kafka.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
# ACLs
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
#zookeeper SASL
zookeeper.set.acl=false
########### SECURITY using SCRAM-SHA-512 and SSL ###################

Now Start Kafka with jaas conf file:

export KAFKA_OPTS=-Djava.security.auth.login.config=<kafka-binary-dir>/config/kafka_server_jaas.conf
./bin/kafka-server-start.sh ./config/server.properties

With the above server.properties configuration, clients can now connect to the Kafka cluster in 3 ways:

  • localhost:9092: PLAINTEXT, with no authentication and authorization.
  • localhost:9093: SASL_PLAINTEXT, with authentication and authorization but no encryption.
  • localhost:9094: SASL_SSL, with encryption along with authentication and authorization.
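The mapping from listener name to port can be read straight off the listeners value in server.properties. The following Python sketch parses that value and reports, for each listener, whether it uses SASL authentication and whether it uses SSL encryption. The helper names parse_listeners and sasl_and_ssl are made up for this illustration, and the check relies only on Kafka's built-in protocol names (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL).

```python
def parse_listeners(value):
    """Split a Kafka listeners string into {name: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


def sasl_and_ssl(name):
    """Return (uses_sasl, uses_ssl) for a built-in security protocol name."""
    return name.startswith("SASL_"), name.endswith("SSL")


value = "PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093,SASL_SSL://localhost:9094"
for name, (host, port) in parse_listeners(value).items():
    uses_sasl, uses_ssl = sasl_and_ssl(name)
    print(f"{host}:{port} {name} sasl={uses_sasl} ssl={uses_ssl}")
```

Only the listener on port 9094 reports both SASL and SSL, which is why the remaining examples connect to localhost:9094.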

Now let’s go over different workflows using the Kafka cluster.

Create User:

./bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-512=[password='secret']' --entity-type users --entity-name demouser
Completed Updating config for entity: user-principal 'demouser'.

Create ssl-user-config.properties in /config folder:

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="demouser" password="secret";
ssl.truststore.location=<kafka-binary-dir>/config/truststore/kafka.truststore.jks
ssl.truststore.password=password
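A client properties file like this is easy to get subtly wrong, for example by leaving a value empty. As a rough sanity check, the Python sketch below parses key=value lines and reports which of the settings used in this file are missing or empty. The names REQUIRED_KEYS, parse_properties and missing_settings are assumptions for this example, and the parser is intentionally minimal (no line continuations or other Java properties features).

```python
REQUIRED_KEYS = (
    "security.protocol",
    "sasl.mechanism",
    "sasl.jaas.config",
    "ssl.truststore.location",
    "ssl.truststore.password",
)


def parse_properties(text):
    """Parse simple key=value lines, skipping blanks and # comments."""
    properties = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split on the first "=" only; values such as sasl.jaas.config contain more
        key, _, value = line.partition("=")
        properties[key.strip()] = value.strip()
    return properties


def missing_settings(text):
    """Return the required keys that are absent or have an empty value."""
    properties = parse_properties(text)
    return [key for key in REQUIRED_KEYS if not properties.get(key)]


sample = """\
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="demouser" password="secret";
ssl.truststore.location=
ssl.truststore.password=password
"""
print(missing_settings(sample))
```

For the sample above, where the truststore path was left empty, this prints ['ssl.truststore.location'].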

Creating Topic without Create Permissions:

./bin/kafka-topics.sh --create --bootstrap-server localhost:9094 --command-config ./config/ssl-user-config.properties --replication-factor 1 --partitions 1 --topic demo-topic
Error while executing topic command : org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
[2019-06-20 11:27:29,244] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:175)
at kafka.admin.TopicCommand$TopicService.createTopic(TopicCommand.scala:134)
at kafka.admin.TopicCommand$TopicService.createTopic$(TopicCommand.scala:129)
at kafka.admin.TopicCommand$AdminClientTopicService.createTopic(TopicCommand.scala:157)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
(kafka.admin.TopicCommand$)

Assign Create and Describe permissions to demouser and then create topic:

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:demouser --operation Create --operation Describe --topic demo-topic
Adding ACLs for resource `Topic:LITERAL:demo-topic`:
User:demouser has Allow permission for operations: Create from hosts: *
User:demouser has Allow permission for operations: Describe from hosts: *
Current ACLs for resource `Topic:LITERAL:demo-topic`:
User:demouser has Allow permission for operations: Create from hosts: *
User:demouser has Allow permission for operations: Describe from hosts: *
./bin/kafka-topics.sh --create --bootstrap-server localhost:9094 --command-config ./config/ssl-user-config.properties --replication-factor 1 --partitions 1 --topic demo-topic
Topic demo-topic created

Create ssl-producer.properties in config folder.

bootstrap.servers=localhost:9094
compression.type=none
### SECURITY ######
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="demouser" password="secret";
ssl.truststore.location=<kafka-binary-dir>/config/truststore/kafka.truststore.jks
ssl.truststore.password=password

Produce with incorrect password: change the password in ssl-producer.properties to something else and then try to produce data to demo-topic

./bin/kafka-console-producer.sh --broker-list localhost:9094 --topic demo-topic --producer.config config/ssl-producer.properties
[2019-06-22 18:41:27,406] ERROR [Producer clientId=console-producer] Connection to node -1 (localhost/127.0.0.1:9094) failed authentication due to: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512 (org.apache.kafka.clients.NetworkClient)

Produce with correct password and with no produce permissions on topic demo-topic

./bin/kafka-console-producer.sh --broker-list localhost:9094 --topic demo-topic --producer.config config/ssl-producer.properties
>ERROR Error when sending message to topic demo-topic with key: null, value: 8 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [demo-topic]

Assign Producer permissions for demouser to demo-topic

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:demouser --producer --topic demo-topic
Adding ACLs for resource `Topic:LITERAL:demo-topic`:
User:demouser has Allow permission for operations: Create from hosts: *
User:demouser has Allow permission for operations: Describe from hosts: *
User:demouser has Allow permission for operations: Write from hosts: *
Current ACLs for resource `Topic:LITERAL:demo-topic`:
User:demouser has Allow permission for operations: Create from hosts: *
User:demouser has Allow permission for operations: Describe from hosts: *
User:demouser has Allow permission for operations: Write from hosts: *

Produce messages on demo-topic with correct password and permissions:

./bin/kafka-console-producer.sh --broker-list localhost:9094 --topic demo-topic --producer.config config/ssl-producer.properties
>message1
>message2
>message3
^C

We have now produced 3 messages to demo-topic. Let’s try to consume those messages from this topic by creating a consumer.

Create ssl-consumer.properties in config folder

bootstrap.servers=localhost:9094
# consumer group id
group.id=demo-consumer-group
### SECURITY ######
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="demouser" password="secret";
ssl.truststore.location=<kafka-binary-dir>/config/truststore/kafka.truststore.jks
ssl.truststore.password=password

Consume data from demo-topic with incorrect password in ssl-consumer.properties

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic demo-topic --from-beginning --consumer.config config/ssl-consumer.properties
[2019-06-22 18:46:41,983] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512
Processed a total of 0 messages

Consume data from demo-topic with correct password in ssl-consumer.properties but with no consume permissions

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic demo-topic --from-beginning --consumer.config config/ssl-consumer.properties
[2019-06-22 18:47:37,461] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: demo-consumer-group
Processed a total of 0 messages

Assign Consumer permissions for demouser on demo-topic

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:demouser --consumer --topic demo-topic --group demo-consumer-group
Adding ACLs for resource `Topic:LITERAL:demo-topic`:
User:demouser has Allow permission for operations: Describe from hosts: *
User:demouser has Allow permission for operations: Read from hosts: *
Adding ACLs for resource `Group:LITERAL:demo-consumer-group`:
User:demouser has Allow permission for operations: Read from hosts: *
Current ACLs for resource `Topic:LITERAL:demo-topic`:
User:demouser has Allow permission for operations: Create from hosts: *
User:demouser has Allow permission for operations: Describe from hosts: *
User:demouser has Allow permission for operations: Write from hosts: *
User:demouser has Allow permission for operations: Read from hosts: *
Current ACLs for resource `Group:LITERAL:demo-consumer-group`:
User:demouser has Allow permission for operations: Read from hosts: *

Consume messages from demo-topic with correct password and consume permissions:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic demo-topic --from-beginning --consumer.config config/ssl-consumer.properties
message1
message2
message3
^C

Summary
To summarize, we have seen how to encrypt Kafka cluster connections using SSL, authenticate using SCRAM, and authorize using Kafka's built-in SimpleAclAuthorizer class. In upcoming blogs, I will show you how to configure other options such as authentication using the OAuthBearer token.

If you find this blog helpful, be sure to give it a few claps, read more or follow me on LinkedIn


Engineering & Beyond

Creative engineers & strategists helping businesses scale their digital products.

Written by Vishwa Teja Vangari, Software Developer III
