Step-by-Step Guide to deploy a Kafka Cluster with AWS MSK and Terraform

A Comprehensive Walkthrough to Deploying and Managing Kafka Clusters with Amazon MSK and Terraform

Ana Escobar
datamindedbe

--

This is the second article of the series “Building a managed streaming data pipeline”. If you haven’t read the first one, I leave the link here for you to have a look:

In my first article I showed you how to use Amazon EMR Serverless to run your PySpark job managing the infrastructure in Terraform. In this case, I’ll show you how to use Amazon MSK to deploy your Kafka Cluster using also Terraform. Finally, the next article will be about putting all the pieces together and building the streaming data pipeline using both technologies.

Basic knowledge and some experience with Apache Kafka is recommended to fully understand all the configurations provided in the Terraform files.

Getting started

What do you need? First, you need to have all these things installed:

You also need to have an AWS account. All the necessary links are provided in this article.

Amazon MSK — Managed Streaming for Apache Kafka

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed service that enables you to build and run applications that use Apache Kafka to process streaming data.

It is an excellent solution for when you need to capture and process streams of events as they happen.

Amazon MSK offers Apache Kafka as a service, removing all operational complexities. You can use the AWS Console to get your Kafka clusters up an running but, it’s often a better idea to automate the lifecycle of your clusters using an infrastructure-as-code tool such as Terraform.

Managing your infrastructure with Terraform

We’ll put all our terraform files under a folder called terraform for better organisation.

config.tf

terraform {
required_providers {
aws = {
source = "hashicorp/aws"
version = "~> 5.0"
}
}
}

# Configure the AWS Provider
provider "aws" {
region = var.region
}

variables.tf

# This variable defines the AWS Region.
variable "region" {
description = "region to use for AWS resources"
type = string
default = "eu-west-1"
}

variable "global_prefix" {
type = string
default = "demo-msk"
}

variable "private_cidr_blocks" {
type = list(string)
default = [
"10.0.1.0/24",
"10.0.2.0/24",
"10.0.3.0/24",
]
}

variable "cidr_blocks_bastion_host" {
type = list(string)
default = ["10.0.4.0/24"]
}

data.tf

# Get the AccountId
data "aws_caller_identity" "current" {}

data "aws_availability_zones" "available" {
state = "available"
}

data "aws_ami" "amazon_linux_2023" {
most_recent = true
owners = ["amazon"]
filter {
name = "owner-alias"
values = ["amazon"]
}
filter {
name = "name"
values = ["al2023-ami-*"]
}
filter {
name = "architecture"
values = ["x86_64"]
}
}

main.tf

################################################################################
# Cluster
################################################################################
resource "aws_kms_key" "kafka_kms_key" {
description = "Key for Apache Kafka"
}

resource "aws_cloudwatch_log_group" "kafka_log_group" {
name = "kafka_broker_logs"
}

resource "aws_msk_configuration" "kafka_config" {
kafka_versions = ["3.4.0"]
name = "${var.global_prefix}-config"
server_properties = <<EOF
auto.create.topics.enable = true
delete.topic.enable = true
EOF
}

resource "aws_msk_cluster" "kafka" {
cluster_name = var.global_prefix
kafka_version = "3.4.0"
number_of_broker_nodes = length(data.aws_availability_zones.available.names)
broker_node_group_info {
instance_type = "kafka.m5.large" # default value
storage_info {
ebs_storage_info {
volume_size = 1000
}
}
client_subnets = [aws_subnet.private_subnet[0].id,
aws_subnet.private_subnet[1].id,
aws_subnet.private_subnet[2].id]
security_groups = [aws_security_group.kafka.id]
}
encryption_info {
encryption_in_transit {
client_broker = "PLAINTEXT"
}
encryption_at_rest_kms_key_arn = aws_kms_key.kafka_kms_key.arn
}
configuration_info {
arn = aws_msk_configuration.kafka_config.arn
revision = aws_msk_configuration.kafka_config.latest_revision
}
open_monitoring {
prometheus {
jmx_exporter {
enabled_in_broker = true
}
node_exporter {
enabled_in_broker = true
}
}
}
logging_info {
broker_logs {
cloudwatch_logs {
enabled = true
log_group = aws_cloudwatch_log_group.kafka_log_group.name
}
}
}
}

################################################################################
# General
################################################################################

resource "aws_vpc" "default" {
cidr_block = "10.0.0.0/16"
enable_dns_hostnames = true
}

resource "aws_internet_gateway" "default" {
vpc_id = aws_vpc.default.id
}

resource "aws_eip" "default" {
depends_on = [aws_internet_gateway.default]
domain = "vpc"
}

resource "aws_route" "default" {
route_table_id = aws_vpc.default.main_route_table_id
destination_cidr_block = "0.0.0.0/0"
gateway_id = aws_internet_gateway.default.id
}

resource "aws_route_table" "private_route_table" {
vpc_id = aws_vpc.default.id
}

resource "aws_route_table_association" "private_subnet_association" {
count = length(data.aws_availability_zones.available.names)
subnet_id = element(aws_subnet.private_subnet.*.id, count.index)
route_table_id = aws_route_table.private_route_table.id
}

################################################################################
# Subnets
################################################################################

resource "aws_subnet" "private_subnet" {
count = length(var.private_cidr_blocks)
vpc_id = aws_vpc.default.id
cidr_block = element(var.private_cidr_blocks, count.index)
map_public_ip_on_launch = false
availability_zone = data.aws_availability_zones.available.names[count.index]
}

resource "aws_subnet" "bastion_host_subnet" {
vpc_id = aws_vpc.default.id
cidr_block = var.cidr_blocks_bastion_host[0]
map_public_ip_on_launch = true
availability_zone = data.aws_availability_zones.available.names[0]
}

################################################################################
# Security groups
################################################################################

resource "aws_security_group" "kafka" {
name = "${var.global_prefix}-kafka"
vpc_id = aws_vpc.default.id
ingress {
from_port = 0
to_port = 9092
protocol = "TCP"
cidr_blocks = var.private_cidr_blocks
}
ingress {
from_port = 0
to_port = 9092
protocol = "TCP"
cidr_blocks = var.cidr_blocks_bastion_host
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
}

resource "aws_security_group" "bastion_host" {
name = "${var.global_prefix}-bastion-host"
vpc_id = aws_vpc.default.id
ingress {
from_port = 22
to_port = 22
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
}

resource "tls_private_key" "private_key" {
algorithm = "RSA"
rsa_bits = 4096
}

resource "aws_key_pair" "private_key" {
key_name = var.global_prefix
public_key = tls_private_key.private_key.public_key_openssh
}

resource "local_file" "private_key" {
content = tls_private_key.private_key.private_key_pem
filename = "cert.pem"
}

resource "null_resource" "private_key_permissions" {
depends_on = [local_file.private_key]
provisioner "local-exec" {
command = "chmod 600 cert.pem"
interpreter = ["bash", "-c"]
on_failure = continue
}
}

################################################################################
# Client Machine (EC2)
################################################################################

resource "aws_instance" "bastion_host" {
depends_on = [aws_msk_cluster.kafka]
ami = data.aws_ami.amazon_linux_2023.id
instance_type = "t2.micro"
key_name = aws_key_pair.private_key.key_name
subnet_id = aws_subnet.bastion_host_subnet.id
vpc_security_group_ids = [aws_security_group.bastion_host.id]
user_data = templatefile("bastion.tftpl", {
bootstrap_server_1 = split(",", aws_msk_cluster.kafka.bootstrap_brokers)[0]
bootstrap_server_2 = split(",", aws_msk_cluster.kafka.bootstrap_brokers)[1]
bootstrap_server_3 = split(",", aws_msk_cluster.kafka.bootstrap_brokers)[2]
})
root_block_device {
volume_type = "gp2"
volume_size = 100
}
}

bastion.tftpl

#!/bin/bash

yum update -y
yum install java-1.8.0 -y
yum install java-17-amazon-corretto-devel.x86_64 -y
yum install wget -y
wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz
tar -xzf kafka_2.13-3.4.0.tgz
rm kafka_2.13-3.4.0.tgz

cat > /home/ec2-user/bootstrap-servers <<- "EOF"
${bootstrap_server_1}
${bootstrap_server_2}
${bootstrap_server_3}
EOF

echo "PATH=$PATH:/bin:/usr/local/bin:/usr/bin:/kafka_2.13-3.4.0/bin" >> /home/ec2-user/.bash_profile

source ~/.bash_profile

outputs.tf

################################################################################
# Client Machine (EC2 instance)
################################################################################
output "execute_this_to_access_the_bastion_host" {
value = "ssh ec2-user@${aws_instance.bastion_host.public_ip} -i cert.pem"
}

Overview of resources

The variable global_prefix is used to prefix all the resources that will be created. This is useful if you want to identify the resources created for this specific use case in a fast way.

You can change the value of this variable in the variables.tf file.

We are also customising our Kafka Cluster options. This means we will change the parameters that are going to be written into the server.properties file of each broker of the cluster. This can be found under the resource "aws_msk_configuration" "kafka_config" in the serfver_properties field. Basically we are telling with the configuration provided that we want to enable automatic topic creation in the Kafka cluster. This means that whenever developers write and read data from topics, they will be created automatically in the cluster. Similarly, we are enabling the deletion of topics, so the cluster won't reject any commands issued to delete topics.

Having these configurations are great because it allows you to have better control of your clusters, as they are managed separately from the cluster. You can share the same configuration with different Kafka clusters, or have each cluster with their own configuration.

Based on resource "aws_msk_cluster" "kafka", our cluster will have 3 nodes, each one using the kafka.m5.large instance type. We configured each broker to use an AWS EBS volume, with 1TB of capacity. This cluster will run in private subnets and use a custom security group. Encryption in transit and at rest was enabled. For at rest, we used the custom KMS key.

Networking

This cluster will run in private subnets. This is important because Kafka is a persistent layer for applications and microservices; and just like you would do with any other data store, it is a best practice to isolate the resource in private subnets. For this reason, you will need to create three subnets in a given VPC, associate a valid CIDR block for each, and map them to availability zones.

Finally, you need to create a security group for the Kafka cluster. This is required because you want to allow ingress traffic to the cluster over the exposed port 9092, and this traffic needs to be enabled for all private subnets.

Monitoring using Amazon CloudWatch is also configured in the Terraform files.

Deployment

Locate yourself under the terraform directory and run the following commands:

aws configure  # enter your AWS credentials

terraform init
terraform plan # this is optional, it will show you what will be deployed - check that 23 resources will be created
terraform apply

It will take around 35 minutes to complete the deployment. You will see the following output:

Apply complete! Resources: 23 added, 0 changed, 0 destroyed.

execute_this_to_access_the_bastion_host = "ssh ec2-user@x.xxx.xxx.xxx -i cert.pem"

You probably have noticed as well a new file under your terraform folder, cert.pem. This is the private key that you need to use to access the bastion host. You can use the command provided in the output to access the bastion host.

Now, check on your AWS Account that everything has been created. Connect to you EC2 instance with the output command.

Test your Kafka Cluster

Once you are connected to the bastion host, you can test the Kafka cluster by running the following commands:

# Check your bootstrap servers
more bootstrap-servers

# Create a topic
kafka-topics.sh --bootstrap-server <your-bootstrap-server> --create --topic test --partitions 6 --replication-factor 3

# Check the correct creation of your topic
kafka-topics.sh --list --bootstrap-server <your-bootstrap-server>

The following GitHub repository was taken as reference to build this project.

Next steps

You could use GitHub Actions in combination with Terraform Cloud to automate your workflow, so that in every push to main terraform checks if your infrastructure has any changes and applies them automatically without you having to type terraform apply in your terminal and approving the changes.

Do you like this content?

Subscribe to my medium page and be the first to get notified whenever I publish a new one!

Follow me on LinkedIn for daily insights about Software & Data Engineering 🫰🏻

--

--

Ana Escobar
datamindedbe

Galician 🖖🏼 | Data enthusiast, passionate about Event-Streaming platforms | ana-escobar.com