Using AWS IAM for client authentication

Naresh Maharaj
Aerospike Developer Blog
10 min read · Dec 20, 2023

In this blog post, we detail how to create an Amazon Managed Streaming for Apache Kafka (Amazon MSK) cluster that uses AWS Identity and Access Management (AWS IAM) roles and policies to authenticate client access. We also establish an Aerospike Database cluster, insert sample messages into the database, and observe in real time how these messages are streamed to Amazon MSK using Aerospike’s Kafka Source Connector. Below we provide a step-by-step guide to implementing this process.

image

AWS MSK Kafka

In this section, you will set up a simple three-node Kafka cluster.

Visit the AWS console and select MSK service.

image

Create a new cluster by selecting Create Cluster > Quick Create.

image

Select the Provisioned cluster type and the kafka.t3.small instance type.

image

Set the EBS storage per broker to 10 GB.

image

NOTE: Take note of the VPC, subnets, and security group ID, as you will require these details later in the article.

The next step is the critical step where you will create the AWS IAM policy and roles. This setup ensures that the Aerospike Database authenticates using AWS IAM to write data to MSK.

From the AWS Console, select the AWS IAM service.

image

To create a new AWS IAM policy, copy the following JSON and paste it in the JSON tab. Replace region:Account-ID with your own region and AWS account ID.

image

Save the policy and name it msk-tutorial-policy.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster"
            ],
            "Resource": [
                "arn:aws:kafka:region:Account-ID:cluster/MSKTutorialCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:WriteData",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                "arn:aws:kafka:region:Account-ID:topic/MSKTutorialCluster/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                "arn:aws:kafka:region:Account-ID:group/MSKTutorialCluster/*"
            ]
        }
    ]
}
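The placeholder substitution can also be scripted. The sketch below is illustrative only: the helper name msk_arn and the sample region and account ID are assumptions, not values from this tutorial's account. It prints the three resource ARNs that the policy expects for a cluster named MSKTutorialCluster.

```shell
#!/usr/bin/env bash
# msk_arn is a hypothetical helper that builds one MSK resource ARN
# for the tutorial cluster. Region and account ID below are sample values.
msk_arn() {
  local region="$1" account_id="$2" kind="$3"   # kind: cluster | topic | group
  printf 'arn:aws:kafka:%s:%s:%s/MSKTutorialCluster/*\n' "$region" "$account_id" "$kind"
}

# Print the three ARNs used in the policy's Resource entries.
for kind in cluster topic group; do
  msk_arn "us-east-1" "123456789012" "$kind"
done
```

Paste the printed values over the matching Resource lines before saving the policy.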

Create the IAM role.

image

Under Common Use Cases, select EC2 and then Next.

image

Under Permissions, select the policy named msk-tutorial-policy and then Next.

image

Give the role a name like msk-tutorial-role and click the Create Role button.

Kafka client machine

Next, create a client machine to install the Kafka tools necessary to access our MSK cluster.

Create a new EC2 instance of type t2.micro.

image

Use the default AMI: Amazon Linux 2023.

image

The AMI may differ depending on your region.

Create a key-pair if required. I am using an already existing key-pair.

image

Under Advanced Options > IAM instance profile, select the IAM role created earlier.

image
  1. Launch the instance.
  2. Under instances launched, choose the instance you just created. Click the Security tab and note the security group associated with this instance, for example sg-0914e6271c97ae4c9 (launch-wizard-1).
  3. Navigate to the VPC section and select Security Groups from the left-hand menu. Locate the security group associated with the MSK cluster, such as sg-e5f51dfb, and choose Edit Inbound Rules.
  4. Create a new rule to allow all traffic from the security group of the new EC2 instance.
image

Kafka topics

With the Kafka cluster and Kafka client machine in place, test the setup: connect to the MSK cluster, create a topic, produce and consume sample messages, and confirm that everything works as expected.

From the MSK Cluster, note the Kafka version being used. This example uses 2.8.1.

From the Kafka client machine, install Java 11+.

sudo yum -y install java-11

Download Apache Kafka using wget, then extract the archive using tar.

wget https://archive.apache.org/dist/kafka/2.8.1/kafka_2.12-2.8.1.tgz
tar -xzf kafka_2.12-2.8.1.tgz

To use IAM, you will need the MSK IAM Auth jar file. Download the jar to the Kafka libs folder you just extracted.

cd kafka_2.12-2.8.1/libs/
wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar
cd ../bin/

Create a file called client.properties to use when authenticating to MSK. It will define the SASL mechanism to use and reference the Java class file that will handle your IAM callbacks.

cat <<EOF> client.properties
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF
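A quick sanity check on the generated file can catch a missing line before any Kafka tool is run. This is a minimal sketch: check_client_properties is a hypothetical helper, and it only confirms that the four keys shown above are present, not that their values are correct.

```shell
#!/usr/bin/env bash
# check_client_properties is a hypothetical helper: it reports any of the four
# settings from this tutorial that are missing from a properties file.
check_client_properties() {
  local file="$1" key missing=0
  for key in security.protocol sasl.mechanism sasl.jaas.config \
             sasl.client.callback.handler.class; do
    if ! grep -q "^${key}=" "$file"; then
      echo "missing: ${key}"
      missing=1
    fi
  done
  return "$missing"
}

# Usage: run from the directory that holds client.properties.
if [ -f client.properties ]; then
  check_client_properties client.properties && echo "client.properties looks complete"
fi
```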

Creating topics

Go to the AWS Console and view the MSK Cluster Client Information. There will be three endpoints to choose from, but you only require one.

For example, choose:

b-2.msktutorialcluster.450050.c11.kafka.us-east-1.amazonaws.com:9098

image
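When copying an endpoint, it can help to confirm that it uses the IAM listener port, 9098, as the endpoints in this article do. MSK exposes other ports for other authentication modes. The helper name uses_iam_port below is an assumption made for illustration.

```shell
#!/usr/bin/env bash
# uses_iam_port is a hypothetical helper: it succeeds only when every broker in
# a comma-separated bootstrap string ends in port 9098.
uses_iam_port() {
  [ -n "$1" ] || return 1
  local broker
  local IFS=','
  for broker in $1; do
    # ${broker##*:} strips everything up to the last colon, leaving the port.
    [ "${broker##*:}" = "9098" ] || return 1
  done
}

uses_iam_port "b-2.msktutorialcluster.450050.c11.kafka.us-east-1.amazonaws.com:9098" \
  && echo "endpoint uses the IAM port"
```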

From the kafka/bin folder, run the command to create a topic. Let's call it aerospike-airforce-1.

export BootstrapServerString="b-2.msktutorialcluster.450050.c11.kafka.us-east-1.amazonaws.com:9098"
./kafka-topics.sh --create --bootstrap-server $BootstrapServerString --command-config client.properties --replication-factor 3 --partitions 1 --topic aerospike-airforce-1

Listing topics

To list the topics, use the following command. Notice our latest topic, called aerospike-airforce-1, just showed up.

./kafka-topics.sh --bootstrap-server $BootstrapServerString --command-config client.properties --list

MSKTutorialTopic
__amazon_msk_canary
__consumer_offsets
aerospike
aerospike-airforce-1
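In a script, the listing can be checked for an exact topic name rather than read by eye. The sketch below assumes a hypothetical helper, topic_exists, that reads the listing on standard input; the sample input reuses topic names from the listing above.

```shell
#!/usr/bin/env bash
# topic_exists is a hypothetical helper: it reads a topic listing on stdin and
# succeeds only if one line matches the given name exactly.
topic_exists() {
  grep -qxF -- "$1"
}

# Usage against the live cluster (the same list command, piped into the helper):
#   ./kafka-topics.sh --bootstrap-server "$BootstrapServerString" \
#     --command-config client.properties --list | topic_exists aerospike-airforce-1

# Offline demonstration with a canned listing.
printf '%s\n' __consumer_offsets aerospike aerospike-airforce-1 \
  | topic_exists aerospike-airforce-1 && echo "topic found"
```

Matching the whole line matters here because aerospike is a prefix of aerospike-airforce-1; a plain substring match would report the shorter topic as present even if only the longer one existed.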

Producer and consumer

This section is closer to a Kafka 101 exercise than a simple Hello World, but it is essential to test the configuration by sending and receiving messages on the Kafka topic before proceeding.

Produce some messages by opening a new window and running the following Kafka producer command. Type three or four messages, pressing the Return key after each one.

./kafka-console-producer.sh --broker-list $BootstrapServerString --producer.config client.properties --topic aerospike-airforce-1
>Instrument Check
>Pre flight checks confirmed
>Ready for takeoff
>Full throttle, flaps

You’re now ready to start a client consumer application. Open a new window and run the consumer. You should now see the same messages you published earlier.

./kafka-console-consumer.sh --bootstrap-server $BootstrapServerString --consumer.config client.properties --topic aerospike-airforce-1 --from-beginning
Instrument Check
Pre flight checks confirmed
Ready for takeoff
Full throttle, flaps

Database source

Let’s review your achievements thus far. You’ve established a 3-node Kafka cluster in AWS utilizing MSK, incorporating IAM roles and permissions. Additionally, you have successfully created topics and demonstrated the production and consumption of messages using the IAM credentials established during the setup.

The next phase involves installing the Aerospike Database, inserting messages, and configuring a simple XDR component. XDR (Cross Datacenter Replication) transmits data from the Aerospike Database to the Aerospike Kafka Source Connector, which then forwards the messages to Amazon MSK.

Create the Aerospike Database

Start by creating a new EC2 instance. For this demo, you can use a CentOS 8 compatible Linux distribution such as Rocky Linux 8.

Rocky 8 AMI: ami-043ceee68871e0bb5 (us-east-1)

image

Select the instance type as t2.medium.

image

Add an extra volume for the Aerospike data storage layer. A single EBS volume is all that is required for now.

image

Launch the instance and connect to the host using ssh. If you have an Aerospike license feature file, upload it to the instance.

Install the Aerospike Database server

Run the following to install the Aerospike Database Server.

export VER="6.1.0.2"
sudo yum install java python3 openssl-devel wget git gcc maven bind-utils sysstat nc -y
wget -O aerospike-tools.tgz 'https://www.aerospike.com/download/tools/latest/artifact/el8'
tar -xvf aerospike-tools.tgz
cd aerospike-tools_*
sudo ./dep-check
sudo ./asinstall
wget -O aerospike.tgz https://enterprise.aerospike.com/enterprise/download/server/$VER/artifact/el8
tar -xvf aerospike.tgz
cd aerospike-server-enterprise-$VER-el8
sudo ./asinstall
sudo mkdir -p /var/log/aerospike/
sudo systemctl enable aerospike

Confirm the storage disk for Aerospike.

lsblk
NAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINT
xvda 202:0 0 10G 0 disk
└─xvda1 202:1 0 10G 0 part /
xvdb 202:16 0 10G 0 disk <<----------------- This one!

Once you have identified the data disk, replace the Aerospike configuration file at /etc/aerospike/aerospike.conf with the configuration listed below, changing the following lines:

  • Under heartbeat.address, enter your instance's internal 172.x.x.x address.
  • For xdr.dc.node-address-port, enter the Kafka client machine's address followed by port 8080.

Aerospike Database configuration file for use with systemd

service {
    # paxos-single-replica-limit 1 # Number of nodes where the replica count is automatically
    proto-fd-max 15000
    service-threads 10
    feature-key-file /etc/aerospike/features.conf
    node-id A1
    cluster-name CLA
}

logging {
    file /var/log/aerospike/aerospike.log {
        context any info
    }
}

# public and private addresses
network {
    service {
        address any
        port 3000
    }

    heartbeat {
        mode mesh
        address 172.31.94.201
        port 3002 # Heartbeat port for this node.
        interval 150 # controls how often to send a heartbeat packet
        timeout 10 # number of intervals after which a node is considered to be missing
    }

    fabric {
        port 3001
    }

    info {
        port 3003
    }
}

namespace test {
    replication-factor 2
    memory-size 40G
    default-ttl 0
    index-type shmem
    high-water-disk-pct 50
    high-water-memory-pct 60
    stop-writes-pct 90
    nsup-period 0

    storage-engine device {
        device /dev/xvdb
        data-in-memory false
        write-block-size 128K
        min-avail-pct 5
    }
}

xdr {
    # Change notification XDR block that round-robins between two connector nodes
    dc aerospike-kafka-source {
        connector true
        node-address-port 172.31.58.190 8080
        namespace test {
        }
    }
}
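The two address substitutions in this configuration can be scripted with sed. This is a sketch under stated assumptions: patch_aerospike_conf is a hypothetical helper, it expects the heartbeat address to be a 172.x.x.x value as in the listing above, and it writes the patched file to standard output rather than editing in place.

```shell
#!/usr/bin/env bash
# patch_aerospike_conf is a hypothetical helper. It rewrites the heartbeat
# address and the XDR connector address found in the configuration file.
# Arguments: config file, this node's private IP, Kafka client machine IP.
patch_aerospike_conf() {
  local conf="$1" node_ip="$2" connector_ip="$3"
  # First expression: only "address 172..." lines change, so "address any" is kept.
  # Second expression: the connector address is replaced and port 8080 is kept.
  sed -e "s/^\([[:space:]]*address\) 172\.[0-9.]*$/\1 ${node_ip}/" \
      -e "s/^\([[:space:]]*node-address-port\) .*$/\1 ${connector_ip} 8080/" \
      "$conf"
}

# Usage (sample IPs are placeholders for your own instances):
#   patch_aerospike_conf aerospike.conf 172.31.10.5 172.31.20.6 > aerospike.conf.new
#   sudo cp aerospike.conf.new /etc/aerospike/aerospike.conf
```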

Start the Aerospike service

Copy the license feature file to the aerospike configuration directory.

sudo cp features.conf /etc/aerospike/

Start the Aerospike server and check the logs to ensure there are no errors.

sudo systemctl start aerospike
sudo systemctl status aerospike

Aerospike Kafka Source Connector

The flow of data from Aerospike Database Enterprise Edition to Apache Kafka relies on the Aerospike Kafka source (outbound) connector. The connector subscribes to change notifications, converts each notification into a message, and sends it to a Kafka topic.

Return to the EC2 instance you created earlier, where the Kafka client is configured, and install the Aerospike Kafka Source Connector. This is your outbound connector for sending data from Aerospike to MSK.

sudo yum install java # install an 11+ JDK
wget https://enterprise.aerospike.com/artifacts/enterprise/aerospike-kafka-outbound/5.0.1/aerospike-kafka-outbound-5.0.1-1.noarch.rpm
sudo rpm -i aerospike-kafka-outbound-5.0.1-1.noarch.rpm
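Keeping the connector version in a single variable ensures that the file you download is the file you install. The sketch below only prints the two commands. CONNECTOR_VER and the helper names are illustrative, and the URL layout is copied from the download command above; it is an assumption that the same layout holds for other versions.

```shell
#!/usr/bin/env bash
# Keep the connector version in one place so download and install agree.
# CONNECTOR_VER, connector_rpm and connector_url are hypothetical names.
CONNECTOR_VER="5.0.1"

# File name of the RPM for a given version.
connector_rpm() {
  printf 'aerospike-kafka-outbound-%s-1.noarch.rpm\n' "$1"
}

# Download URL for a given version (layout assumed from the command above).
connector_url() {
  printf 'https://enterprise.aerospike.com/artifacts/enterprise/aerospike-kafka-outbound/%s/%s\n' \
    "$1" "$(connector_rpm "$1")"
}

# Print the commands instead of running them, so they can be reviewed first.
echo "wget $(connector_url "$CONNECTOR_VER")"
echo "sudo rpm -i $(connector_rpm "$CONNECTOR_VER")"
```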

Configure the outbound connector

The terms “outbound” and “source connector” are used interchangeably in this article.

  1. Locate the following file on the Kafka client box: /etc/aerospike-kafka-outbound/aerospike-kafka-outbound.yml.
  2. Under producer-props.bootstrap.servers, replace the broker address with one of the node addresses of your MSK Kafka cluster.
  3. Then add the following contents to the file with the changes that have been outlined.
# Change the configuration for your use case.
#
# Refer to https://www.aerospike.com/docs/connectors/enterprise/kafka/outbound/configuration/index.html
# for details.

# The connector's listening ports, TLS, and network interface.
service:
  port: 8080

# Format of the Kafka destination message.
format:
  mode: flat-json
  metadata-key: metadata

# Aerospike record routing to a Kafka destination.
routing:
  mode: static
  destination: aerospike

# Kafka producer initialization properties.
producer-props:
  bootstrap.servers:
    - b-3.msktutorialcluster.450050.c11.kafka.us-east-1.amazonaws.com:9098
  ssl.truststore.location: /etc/aerospike-kafka-outbound/kafka.client.truststore.jks
  ssl.truststore.password: changeit
  security.protocol: SASL_SSL
  sasl.mechanism: AWS_MSK_IAM
  sasl.jaas.config: software.amazon.msk.auth.iam.IAMLoginModule required awsProfileName=default;
  sasl.client.callback.handler.class: software.amazon.msk.auth.iam.IAMClientCallbackHandler

# The logging properties.
logging:
  file: /var/log/aerospike-kafka-outbound/aerospike-kafka-outbound.log
  enable-console-logging: true
  levels:
    root: debug
    record-parser: debug
    server: debug
    com.aerospike.connect: debug
  ticker-interval: 3600

Create the CA certificate trust store for use in the Kafka Outbound Connector config. You can see the SSL trust store location referenced in the file above as ssl.truststore.location.

sudo cp /usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts /etc/aerospike-kafka-outbound/kafka.client.truststore.jks
sudo chmod 755 /etc/aerospike-kafka-outbound/kafka.client.truststore.jks

Finally, make the AWS IAM Kafka Auth Jar file available to the Aerospike Outbound Kafka Connector. This is the same jar file that you downloaded and added to the kafka/libs folder.

sudo cp kafka_2.12-2.8.1/libs/aws-msk-iam-auth-1.1.1-all.jar /opt/aerospike-kafka-outbound/lib/aws-msk-iam-auth-1.1.1-all.jar

Start the service.

sudo systemctl enable aerospike-kafka-outbound
sudo systemctl start aerospike-kafka-outbound

Send data from Aerospike to Kafka

Open a separate window so you can list all messages on the Aerospike Kafka topic. Start by adding one of the private endpoint bootstrap servers as an environment variable for ease of use.

export BootstrapServerString="b-3.msktutorialcluster.450050.c11.kafka.us-east-1.amazonaws.com:9098"

Run the consumer client as follows:

./kafka-console-consumer.sh --bootstrap-server $BootstrapServerString --consumer.config client.properties --topic aerospike --from-beginning

In a new window, start AQL, the Aerospike command line client which connects to your Aerospike Database.

aql -U auser -P a-secret-pwd

Insert some data

insert into test (pk, a) values(400, "Your winning lottery ticket awaits you")

Check that the message appears in the Kafka consumer window:

{"metadata":{"namespace":"test","userKey":400,"digest":"W7eGav2hKfOU00xx7mnOPYa2uCo=","msg":"write","gen":1,"lut":1681488437767,"exp":0},"a":"Your winning lottery ticket awaits you"}
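To check messages like this one in a script, individual fields can be pulled out of the flat JSON. The sketch below uses sed only to stay dependency-free; msg_type and user_key are hypothetical helpers that assume single-line messages with an integer userKey, and a real pipeline would more likely use a proper JSON parser.

```shell
#!/usr/bin/env bash
# msg_type and user_key are hypothetical helpers that read one flat-json
# message per line on stdin and print a single metadata field.
msg_type() { sed -n 's/.*"msg":"\([^"]*\)".*/\1/p'; }
user_key() { sed -n 's/.*"userKey":\([0-9][0-9]*\).*/\1/p'; }

# The sample is the message shown above.
sample='{"metadata":{"namespace":"test","userKey":400,"digest":"W7eGav2hKfOU00xx7mnOPYa2uCo=","msg":"write","gen":1,"lut":1681488437767,"exp":0},"a":"Your winning lottery ticket awaits you"}'

printf '%s\n' "$sample" | msg_type   # prints: write
printf '%s\n' "$sample" | user_key   # prints: 400
```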

Conclusion

You’ve just discovered how straightforward it is to transmit data from Aerospike to AWS MSK Kafka while ensuring client authentication through AWS IAM permissions! From establishing an Aerospike Database from scratch to configuring the AWS MSK Kafka cluster and employing the Aerospike Outbound Kafka Connector, you’ve effortlessly constructed a real-time streaming data pipeline. Congratulations on this accomplishment!

Share your experience! Your feedback is important to us. Join our Aerospike community!
