How to Configure OAuth2 Authentication for Apache Kafka Cluster using Okta

Vishwa Teja Vangari
Egen Engineering & Beyond
11 min read · Sep 10, 2019

OAuth2 Authentication using OAUTHBEARER mechanism

For background, I encourage readers to read my previous blog, Securing Kafka Cluster using SASL, ACL and SSL, which covers the different ways of configuring authentication mechanisms for an Apache Kafka cluster.

Securing Apache Kafka Cluster using Okta Auth Server

In this blog, we will go over the configuration and supporting classes required to set up authentication using OAUTHBEARER and authorization using SimpleAclAuthorizer. We will use a single sign-on OAuth server to authenticate Kafka brokers and clients (producers and consumers).

In today’s software-centric world, most organizations are moving towards a single sign-on OAuth server for authenticating all of their users, applications, databases and other services by creating service accounts. There are a number of readily available single sign-on authentication providers like Okta, OneLogin, Auth0, OpenID. Because building and operating an organization’s own auth server takes considerable team effort and time, most organizations adopt readily available cloud auth servers like Okta, which are highly available, secure and feature-rich. Okta offers many features, so this blog focuses only on the essentials needed to integrate authentication for a Kafka cluster and its clients using the Okta OAuth server.

Authentication using OAUTHBEARER mechanism:

We will have to implement two classes that connect to the external OAuth2 server to generate, introspect/validate and renew tokens for Kafka brokers and clients (producers and consumers).

OAuthAuthenticateLoginCallbackHandler
OAuthAuthenticateValidatorCallbackHandler

Both classes implement org.apache.kafka.common.security.auth.AuthenticateCallbackHandler to configure and handle callback events.

OAuthAuthenticateLoginCallbackHandler is used by clients and brokers to authenticate and to generate a token using the OAuth server.

OAuthAuthenticateValidatorCallbackHandler is used on the broker side to validate the generated token using OAuth token introspection. It is wired in through the server callback handler setting, so it runs whenever a client or another broker presents a token.
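To make the validation step more concrete, here is a minimal shell sketch of what a token introspection call looks like from the command line. It assumes the Okta default authorization server path used in this blog and the standard OAuth2 introspection response, which carries an active flag. The function names introspect_token and is_token_active are hypothetical helpers for illustration and are not part of the kafka-oauth2 repo.

```shell
#!/usr/bin/env bash
# Sketch only: helper names are hypothetical, not taken from the kafka-oauth2 repo.

# POST a token to the introspection endpoint.
# $1 = auth server host, $2 = base64 of clientId:clientsecret, $3 = token
introspect_token() {
  curl -s -X POST "https://$1/oauth2/default/v1/introspect" \
    -H 'Content-Type: application/x-www-form-urlencoded' \
    -H "Authorization: Basic $2" \
    --data-urlencode "token=$3" \
    --data-urlencode "token_type_hint=access_token"
}

# Succeeds (exit status 0) only when the response reports "active": true.
is_token_active() {
  printf '%s' "$1" | grep -Eq '"active"[[:space:]]*:[[:space:]]*true'
}

# Usage (not executed here):
# response="$(introspect_token "<auth-server-url>" "<encoded-clientId:clientsecret>" "$TOKEN")"
# is_token_active "$response" && echo "token is valid"
```

A validator would typically reject the connection whenever the active flag is missing or false, which is what the second helper mimics.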

You can find the implementation of these classes and their helper classes in the kafka-oauth2 Github repo. The classes require the properties below to make calls to the appropriate auth server.

OAUTH_WITH_SSL=true
OAUTH_LOGIN_SERVER=<oauth-server-url>
OAUTH_LOGIN_ENDPOINT='/oauth2/default/v1/token'
OAUTH_LOGIN_GRANT_TYPE=client_credentials
OAUTH_LOGIN_SCOPE=kafka
OAUTH_AUTHORIZATION='Basic <base64clientId:clientsecret>'
OAUTH_INTROSPECT_SERVER=<oauth-server-url>
OAUTH_INTROSPECT_ENDPOINT='/oauth2/default/v1/introspect'
OAUTH_INTROSPECT_AUTHORIZATION='Basic <base64clientId:clientsecret>'

Let’s integrate these classes with the Kafka libraries to configure Kafka OAUTHBEARER authentication. To do that, we have to build a jar file from these classes. After cloning the kafka-oauth2 Github repo, navigate to the repo directory in a terminal and build the jar using the command ./gradlew clean build. Make sure kafka-oauth2-0.0.1.jar is generated in the folder <oauth2-repo-dir>/kafka-oauth2/build/libs/.

Authentication & Authorization Flow

Setting up Kafka Cluster:

We will set up the Kafka cluster locally using the Apache Kafka binary, which can be downloaded from the open source Apache Kafka Downloads page.

Note: all the commands below are executed from the downloaded <kafka-binary-dir>/ in a terminal.

Now copy the generated jar file kafka-oauth2-0.0.1.jar from <oauth2-project-dir>/kafka-oauth2/build/libs/ into the downloaded Kafka binary <kafka-binary-dir>/libs/

cp <oauth2-project-dir>/build/libs/kafka-oauth2-0.0.1.jar <kafka-binary-dir>/libs

Now that the new OAuth classes are in the Kafka libs folder, they will be on the Kafka classpath when Kafka starts and can be used for authentication and token validation by brokers and clients.

Setting up Okta OAuth Server:

Okta offers a free developer account for its single sign-on OAuth server. The free account is easy to set up and allows about 1000 requests, which is good enough for us to learn and test the integration with a Kafka cluster. Let’s go ahead and sign up for an Okta developer account, then follow the steps on the Okta site to verify your email address and activate your account.

Default OAuth Server

Once your account is activated, log in to Okta. Navigate to the API menu and open Authorization Servers. There should be a default auth server already created; make sure it is Active.

Adding Kafka Scope

Click on the default Authorization Server, and then open the Scopes tab to add a new scope. Click on Add Scope and fill out the fields Name: kafka and Description: kafka scope description. Check only Include in public metadata, leave Set as a default scope unchecked, and click on Create to add the new scope.

Default Auth Server Scopes

Make sure newly added kafka scope shows up in the scopes list.

By now, we have Okta Authorization server and kafka scope created, now let’s move onto creating new applications as kafkabroker, kafkaproducerapp and kafkaconsumerapp. We would use these three applications for Kafka brokers, Kafka producer app and Kafka consumer apps authentication.

To create new apps, navigate to the Applications menu and click on Add Application.

Select Platform as Service (Machine to Machine), specify the name as kafkabroker in the settings options, and click on Done. This generates the broker app with a clientId and client secret, which can be used for Kafka brokers.

Repeat the same steps to create the two other apps:

Select Platform as Service (Machine to Machine), specify the name as kafkaproducerapp in the settings options, and click on Done. This generates the Kafka producer app with a clientId and client secret; we will use these credentials for the producer app.

Select Platform as Service (Machine to Machine), specify the name as kafkaconsumerapp in the settings options, and click on Done. This generates the Kafka consumer app with a clientId and client secret; we will use these credentials for the consumer app.

Kafka Apps (broker, producer, consumer)

Ensure that all three machine to machine applications (kafkabroker, kafkaproducerapp, kafkaconsumerapp) are created.

We can now use the above applications to authenticate with the auth server by passing in the clientId and client secret. Let’s make sure we can successfully authenticate and get a token back for all three apps. In the curl request below, replace auth-server-url with the URL of the default auth server in your Okta account, and retrieve the clientId and client secret of each app from the Applications menu. The clientId and client secret are sent in the Authorization header, so they have to be base64 encoded in the form <clientId>:<clientsecret>. Use an online base64 encoding tool to encode the value, copy the encoded value into the curl request below, and test it for all three apps.

curl -i -H 'Content-Type: application/x-www-form-urlencoded' -X POST 'https://<auth-server-url>/oauth2/default/v1/token' -d 'grant_type=client_credentials&scope=kafka' -H 'Authorization: Basic <encoded-clientId:clientsecret>'
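The encoding and the token request can also be scripted, which avoids pasting credentials into a third-party site. The sketch below is one possible way to do it in shell; the helper names are hypothetical, and the response is assumed to be the usual JSON document with an access_token field.

```shell
#!/usr/bin/env bash
# Sketch only: helper names are hypothetical.

# Build the Basic Authorization header value from clientId ($1) and clientsecret ($2).
# tr strips the line wrapping that some base64 implementations add to long values.
basic_auth_header() {
  printf 'Basic %s' "$(printf '%s:%s' "$1" "$2" | base64 | tr -d '\n')"
}

# Request a token with the client_credentials grant and the kafka scope.
# $1 = auth server host, $2 = clientId, $3 = clientsecret
request_token() {
  curl -s -X POST "https://$1/oauth2/default/v1/token" \
    -H 'Content-Type: application/x-www-form-urlencoded' \
    -H "Authorization: $(basic_auth_header "$2" "$3")" \
    -d 'grant_type=client_credentials&scope=kafka'
}

# Pull the access_token field out of the JSON response ($1) without extra tools.
extract_access_token() {
  printf '%s' "$1" |
    sed -n 's/.*"access_token"[[:space:]]*:[[:space:]]*"\([^"]*\)".*/\1/p'
}

# Usage (not executed here):
# response="$(request_token "<auth-server-url>" "<clientId>" "<clientsecret>")"
# extract_access_token "$response"
```

An empty result from extract_access_token usually means the request was rejected, so it is worth printing the raw response in that case.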

Once a token comes back from the Okta auth server for all three apps, we know that authentication works and can proceed with integrating OAuth for Kafka brokers and clients. For inter-broker authentication, we will use the kafkabroker application credentials, with its client id as the Kafka super user.

Setting up and integrating OAuth for Kafka Cluster:

Start Zookeeper

./bin/zookeeper-server-start.sh config/zookeeper.properties

Kafka Setup

Append below security properties to existing Kafka server.properties file in config folder.

########## SECURITY using OAUTHBEARER authentication ###############
sasl.enabled.mechanisms=OAUTHBEARER
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
security.inter.broker.protocol=SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://localhost:9093
advertised.listeners=SASL_PLAINTEXT://localhost:9093
#Authorizer for ACL
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:<brokerapp-clientId>;
################ OAuth Classes #####################
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required OAUTH_LOGIN_SERVER=<auth-server-url> OAUTH_LOGIN_ENDPOINT='/oauth2/default/v1/token' OAUTH_LOGIN_GRANT_TYPE=client_credentials OAUTH_LOGIN_SCOPE=broker.kafka OAUTH_AUTHORIZATION='Basic <encoded-clientId:clientsecret>' OAUTH_INTROSPECT_SERVER=<auth-server-url> OAUTH_INTROSPECT_ENDPOINT='/oauth2/default/v1/introspect' OAUTH_INTROSPECT_AUTHORIZATION='Basic <encoded-clientId:clientsecret>';
listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class=com.oauth2.security.oauthbearer.OAuthAuthenticateLoginCallbackHandler
listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=com.oauth2.security.oauthbearer.OAuthAuthenticateValidatorCallbackHandler
########## SECURITY using OAUTHBEARER authentication ###############

Create kafka_server_jaas.conf file in config folder

KafkaServer {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
LoginStringClaim_sub="<brokerapp-clientId>";
};

Start Kafka after setting all the auth properties and kafka_server_jaas conf file in KAFKA_OPTS as below:

export KAFKA_OPTS="-Djava.security.auth.login.config=<kafka-binary-dir>/config/kafka_server_jaas.conf -DOAUTH_WITH_SSL=true -DOAUTH_LOGIN_SERVER=<OAuth-server-url> -DOAUTH_LOGIN_ENDPOINT=/oauth2/default/v1/token -DOAUTH_LOGIN_GRANT_TYPE=client_credentials -DOAUTH_LOGIN_SCOPE=kafka -DOAUTH_INTROSPECT_SERVER=<OAuth-server-url> -DOAUTH_INTROSPECT_ENDPOINT=/oauth2/default/v1/introspect -DOAUTH_AUTHORIZATION=Basic%20<encoded-clientId:clientsecret> -DOAUTH_INTROSPECT_AUTHORIZATION=Basic%20<encoded-clientId:clientsecret>"
./bin/kafka-server-start.sh ./config/server.properties

Note: To avoid issues with spaces in Java arguments while starting Kafka, we use %20 in place of the space, as in Basic%20<encoded-clientId:clientsecret>, for both Authorization properties. The two authentication callback handler classes take care of converting it back.
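The %20 substitution can be pictured with a small shell sketch: the space in the header value is replaced before the value is passed as a -D argument, and restored before the header is sent. The helper names are hypothetical and only mirror what the note describes; the actual conversion happens inside the Java callback handler classes.

```shell
#!/usr/bin/env bash
# Sketch only: hypothetical helpers that mirror the %20 convention from the note.

# Replace spaces with %20 so the value survives as a single -D argument.
encode_space() {
  printf '%s' "$1" | sed 's/ /%20/g'
}

# Restore the spaces before the value is used as an HTTP header.
decode_space() {
  printf '%s' "$1" | sed 's/%20/ /g'
}

# Usage:
# encode_space "Basic <encoded-clientId:clientsecret>"
```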

Looking at the Kafka server logs during startup, we can see that a token is generated by OAuthAuthenticateLoginCallbackHandler by connecting to the Okta OAuth server, and that the generated token is then introspected (validated) by OAuthAuthenticateValidatorCallbackHandler.

TOKEN GENERATION LOGS:
[2019-09-09 19:56:09,354] INFO Retrieved token.. (com.oauth2.security.oauthbearer.OAuthAuthenticateLoginCallbackHandler)
[2019-09-09 19:56:09,355] INFO Successfully logged in. (org.apache.kafka.common.security.authenticator.AbstractLogin)
TOKEN INTROSPECTION LOGS:
[2019-09-09 19:56:10,256] INFO Trying to introspected (com.oauth2.security.oauthbearer.OAuthAuthenticateValidatorCallbackHandler)
[2019-09-09 19:56:10,257] INFO Validated! token.. (com.oauth2.security.oauthbearer.OAuthAuthenticateValidatorCallbackHandler)

Now that the Kafka server has started and is ready to accept connections, we can configure clients (producers and consumers) to connect to the Kafka cluster on localhost:9093 using the OAuth2 authentication mechanism. Let’s start with producerapp to produce some data on a Kafka topic. At this point we have not yet configured any ACLs for producerapp, nor checked whether its OAuth authentication with Kafka works.

sasl-oauth2-producerapp-config.properties

Create the sasl-oauth2-producerapp-config.properties file, filling in the appropriate <auth-server-url> and <authorization header>. Let’s move on and check whether we can authenticate and create a topic.
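As a rough guide, a client properties file for OAUTHBEARER could look like the output of the sketch below. The property keys are standard Kafka client settings and the handler class name comes from this blog. Passing the OAUTH_* values as JAAS options mirrors the broker configuration shown earlier and is an assumption about how the client side is wired, so check it against the kafka-oauth2 repo. The helper name write_client_config is hypothetical.

```shell
#!/usr/bin/env bash
# Sketch only: write_client_config is a hypothetical helper; the OAUTH_* JAAS
# options are an assumption modelled on the broker settings in this blog.
# $1 = output file, $2 = auth server URL, $3 = base64 of clientId:clientsecret
write_client_config() {
  cat > "$1" <<EOF
security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.oauth2.security.oauthbearer.OAuthAuthenticateLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required OAUTH_LOGIN_SERVER="$2" OAUTH_LOGIN_ENDPOINT="/oauth2/default/v1/token" OAUTH_LOGIN_GRANT_TYPE="client_credentials" OAUTH_LOGIN_SCOPE="kafka" OAUTH_AUTHORIZATION="Basic $3";
EOF
}

# Usage (not executed here):
# write_client_config ./config/sasl-oauth2-producerapp-config.properties \
#   "<auth-server-url>" "<encoded-clientId:clientsecret>"
```

Keeping the credentials as function arguments makes it easy to generate a deliberately wrong file for the negative tests that follow.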

Creating a Topic: Let’s first test out authentication by creating a topic using incorrect producerapp authentication details.

./bin/kafka-topics.sh --create --bootstrap-server localhost:9093 --command-config ./config/sasl-oauth2-producerapp-config.properties --replication-factor 1 --partitions 1 --topic oauth2-demo-topic
[2019-09-09 21:46:42,021] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9093) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue. (org.apache.kafka.clients.NetworkClient)

As expected, authentication fails because of the invalid credentials. Now let’s create the topic using the correct producerapp authentication details, but still with no Create ACL permission. We should receive an authorization error, as the Create permission is not assigned to the producer app clientId.

./bin/kafka-topics.sh --create --bootstrap-server localhost:9093 --command-config ./config/sasl-oauth2-producerapp-config.properties --replication-factor 1 --partitions 1 --topic oauth2-demo-topic
Error while executing topic command : org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]
[2019-09-09 21:52:27,701] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [Topic authorization failed.]

Add Create and Describe ACL to producerapp clientId and then create Topic.

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:<producerappclientId> --operation Create --operation Describe --topic oauth2-demo-topic
Adding ACLs for resource `Topic:LITERAL:oauth2-demo-topic`:
User:0oa1b4s67kEWR3f1k357 has Allow permission for operations: Create from hosts: *
User:0oa1b4s67kEWR3f1k357 has Allow permission for operations: Describe from hosts: *
Current ACLs for resource `Topic:LITERAL:oauth2-demo-topic`:
User:0oa1b4s67kEWR3f1k357 has Allow permission for operations: Create from hosts: *
User:0oa1b4s67kEWR3f1k357 has Allow permission for operations: Describe from hosts: *
CREATING TOPIC:
./bin/kafka-topics.sh --create --bootstrap-server localhost:9093 --command-config ./config/sasl-oauth2-producerapp-config.properties --replication-factor 1 --partitions 1 --topic oauth2-demo-topic
Topic oauth2-demo-topic created

Produce with incorrect authentication: Modify the authentication details in sasl-oauth2-producerapp-config.properties to something incorrect, and then try to produce data on oauth2-demo-topic.

./bin/kafka-console-producer.sh --broker-list localhost:9093 --topic oauth2-demo-topic --producer.config ./config/sasl-oauth2-producerapp-config.properties
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: java.lang.IllegalArgumentException: Null token returned from server

As expected Null token is returned from Auth server, so authentication failed.

Produce with correct authentication details and with no produce ACL on topic oauth2-demo-topic

./bin/kafka-console-producer.sh --broker-list localhost:9093 --topic oauth2-demo-topic --producer.config ./config/sasl-oauth2-producerapp-config.properties
>OAuth Test Message1
>[2019-09-09 22:06:07,635] ERROR Error when sending message to topic oauth2-demo-topic with key: null, value: 19 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [oauth2-demo-topic]

Add Producer ACL for producerapp clientId on oauth2-demo-topic

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:<producerappclientId> --producer --topic oauth2-demo-topic
Adding ACLs for resource `Topic:LITERAL:oauth2-demo-topic`:
User:0oa1b4s67kEWR3f1k357 has Allow permission for operations: Create from hosts: *
User:0oa1b4s67kEWR3f1k357 has Allow permission for operations: Write from hosts: *
User:0oa1b4s67kEWR3f1k357 has Allow permission for operations: Describe from hosts: *
Current ACLs for resource `Topic:LITERAL:oauth2-demo-topic`:
User:0oa1b4s67kEWR3f1k357 has Allow permission for operations: Create from hosts: *
User:0oa1b4s67kEWR3f1k357 has Allow permission for operations: Describe from hosts: *
User:0oa1b4s67kEWR3f1k357 has Allow permission for operations: Write from hosts: *
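Before producing, it can be handy to confirm that an ACL is really in place. The sketch below checks the text printed by kafka-acls.sh with the --list option for a given principal and operation. It assumes the output format shown in this blog, which may differ in other Kafka versions, and has_allow_acl is a hypothetical helper name.

```shell
#!/usr/bin/env bash
# Sketch only: has_allow_acl is a hypothetical helper that matches the ACL
# output format shown in this blog.

# Succeeds when the listing contains an Allow entry for the principal and operation.
# $1 = output of kafka-acls.sh --list, $2 = principal, $3 = operation
has_allow_acl() {
  printf '%s\n' "$1" |
    grep -Fq "$2 has Allow permission for operations: $3 from hosts:"
}

# Usage against the local cluster from this blog (not executed here):
# acls="$(./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic oauth2-demo-topic)"
# has_allow_acl "$acls" "User:<producerappclientId>" Write && echo "Write ACL present"
```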

Produce messages on oauth2-demo-topic with correct authentication details and after assigning producer ACL:

./bin/kafka-console-producer.sh --broker-list localhost:9093 --topic oauth2-demo-topic --producer.config ./config/sasl-oauth2-producerapp-config.properties
>OAuth Test Message1
>OAuth Test Message2
>OAuth Test Message3
>^C

We can now successfully authenticate with the Okta OAuth server and produce messages on oauth2-demo-topic after assigning the appropriate ACLs.

Now that data has been produced onto oauth2-demo-topic using producerapp, let’s check whether we can consume that data using consumerapp.

sasl-oauth2-consumerapp-config.properties

As with the producer app, create the sasl-oauth2-consumerapp-config.properties file, filling in the appropriate <auth-server-url> and <consumer-authorization header>, and then check whether we can authenticate and consume messages from oauth2-demo-topic.

Consume with incorrect authentication: Modify the authentication details in sasl-oauth2-consumerapp-config.properties to something incorrect, and then try to consume data from oauth2-demo-topic.

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic oauth2-demo-topic --from-beginning --consumer.config ./config/sasl-oauth2-consumerapp-config.properties
Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: java.lang.IllegalArgumentException: Null token returned from server

Consume messages with correct authentication details and with no consume ACL on topic oauth2-demo-topic

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic oauth2-demo-topic --from-beginning --consumer.config ./config/sasl-oauth2-consumerapp-config.properties
[2019-09-09 22:29:29,837] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: console-consumer-30703
Processed a total of 0 messages

Add Consumer ACL and group for consumerapp clientId on oauth2-demo-topic

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:0oa1b4sd0opbPgc0X357 --topic oauth2-demo-topic --consumer --group oauth2-consumer-group
Adding ACLs for resource `Topic:LITERAL:oauth2-demo-topic`:
User:0oa1b4sd0opbPgc0X357 has Allow permission for operations: Read from hosts: *
User:0oa1b4sd0opbPgc0X357 has Allow permission for operations: Describe from hosts: *
Adding ACLs for resource `Group:LITERAL:oauth2-consumer-group`:
User:0oa1b4sd0opbPgc0X357 has Allow permission for operations: Read from hosts: *
Current ACLs for resource `Topic:LITERAL:oauth2-demo-topic`:
User:0oa1b4s67kEWR3f1k357 has Allow permission for operations: Create from hosts: *
User:0oa1b4sd0opbPgc0X357 has Allow permission for operations: Read from hosts: *
User:0oa1b4s67kEWR3f1k357 has Allow permission for operations: Describe from hosts: *
User:0oa1b4s67kEWR3f1k357 has Allow permission for operations: Write from hosts: *
User:0oa1b4sd0opbPgc0X357 has Allow permission for operations: Describe from hosts: *
Current ACLs for resource `Group:LITERAL:oauth2-consumer-group`:
User:0oa1b4sd0opbPgc0X357 has Allow permission for operations: Read from hosts: *

Consume messages from oauth2-demo-topic with correct authentication details and after assigning Consumer and group ACL:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic oauth2-demo-topic --from-beginning --consumer.config ./config/sasl-oauth2-consumerapp-config.properties --group oauth2-consumer-group
OAuth Test Message1
OAuth Test Message2
OAuth Test Message3
^CProcessed a total of 3 messages

ConsumerApp (console-consumer) can now successfully authenticate with the Okta OAuth server and consume messages from oauth2-demo-topic after the appropriate consume and group ACLs are assigned.

Summary

In this blog, we have seen how to set up an Okta developer account with an Okta auth server and three client apps (broker, producer and consumer), and how to use those app credentials within Kafka applications. We demonstrated authentication against the Okta auth server using console-producer and console-consumer, and then used each app’s clientId to configure the produce, consume, create and describe ACLs using SimpleAclAuthorizer, with ZooKeeper as the ACL store. We have also seen how to set up a producer and a consumer to produce and consume messages from the Kafka cluster with proper authentication and authorization.

