Terraform: create an MSK work machine

John Zen
3 min read · Jul 31, 2024

--

I'd like to create an EC2 machine to run Kafka commands against an MSK cluster.

I follow this document.

TL;DR

Create an EC2 instance

  • in the same VPC as the target MSK cluster (or anywhere with network reachability to it), in a private subnet
  • with an instance profile that allows access to the MSK cluster (reference)
  • with the Kafka client software installed by cloud-init (reference)

On the target MSK cluster, allow inbound traffic from this EC2 instance's security group so the work machine can reach the brokers.

Verify by running a Kafka command.

Steps

I describe the steps in reverse order, because once the machine is created, the information you need most often is the verification commands.

Verify

# Get the cluster ARN. Assumes ${cluster_name} is already set in this shell.
# --cluster-name-filter only matches a name *prefix*, so pick the entry whose
# name is an exact match instead of blindly taking the first result.
# --output text prints the raw value (no JSON quotes to strip); it prints
# "None" when no cluster matches.
cluster_arn=$(aws kafka list-clusters --region ap-southeast-2 \
  --cluster-name-filter "${cluster_name}" \
  --query "ClusterInfoList[?ClusterName=='${cluster_name}'].ClusterArn | [0]" \
  --output text)

# Get the bootstrap broker string for IAM (SASL/IAM) authentication.
bootstrap_brokers_str=$(aws kafka get-bootstrap-brokers --region ap-southeast-2 \
  --cluster-arn "${cluster_arn}" \
  --query 'BootstrapBrokerStringSaslIam' \
  --output text)

kafka-topics.sh --list \
  --bootstrap-server "${bootstrap_brokers_str}" \
  --command-config /opt/kafka/kafka_2.13-3.5.1/config/client-config.properties

# client-config.properties is pre-set in cloud-init

The cloud-init output log is at /var/log/cloud-init-output.log.

The Kafka client software is installed in /opt/kafka.

Allow traffic from the EC2 instance on the MSK cluster

# Look up the work machine's security group by name ("my-msk-work" is the
# name given to the security group created alongside the EC2 instance).
data "aws_security_group" "my-msk-work" {
  name = "my-msk-work"
}

locals {
  ...
  my-msk-cluster = {
    security_group_ingress = [
      {
        description = "Broker Access"
        # 9098: MSK broker port for IAM-authenticated (SASL/IAM) clients.
        from_port = "9098"
        to_port   = "9098"
        protocol  = "tcp"
        cidr_blocks = [
          ...
        ]
        security_groups = [
          # Must reference the data source declared above; the previous
          # reference (msk-test-work) pointed at an undeclared data source.
          data.aws_security_group.my-msk-work.id,
          ...
        ]
        ...
      }
    ]
  }

  ...
}

Create the EC2 instance

The user data below is a cloud-config translation of the linked installation steps.

#cloud-config
# Rendered by Terraform templatefile(); ${msk_version} is substituted before
# cloud-init sees this file.

# The Kafka client tools run on Java; wget downloads the Kafka and IAM-auth artifacts.
# NOTE(review): confirm "java-11" resolves on the chosen AMI (on AL2023 the
# Corretto package is java-11-amazon-corretto-headless).
packages:
  - java-11
  - wget
package_update: true
package_upgrade: true

write_files:
  # Create a shell script that cloud-init runs once, from its scripts-per-once stage.
  - path: /var/lib/cloud/scripts/per-once/10_install.sh
    permissions: "0755"
    content: |
      #!/bin/bash
      # Stop on the first failure so a broken download shows up in
      # /var/log/cloud-init-output.log instead of a half-installed client.
      set -euo pipefail
      echo $(date) ': Scripts per once' >> /var/tmp/my-log.txt
      mkdir -p /opt/kafka
      cd /opt/kafka
      wget https://archive.apache.org/dist/kafka/${msk_version}/kafka_2.13-${msk_version}.tgz

      tar -xzf kafka_2.13-${msk_version}.tgz

      # Put the MSK IAM auth jar in libs/ so the Kafka CLI scripts load it.
      cd kafka_2.13-${msk_version}/libs
      wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar

  # Client settings for IAM authentication, passed to the CLI via --command-config.
  - path: /opt/kafka/kafka_2.13-${msk_version}/config/client-config.properties
    permissions: "0644"
    content: |
      # Sets up TLS for encryption and SASL for authN.
      security.protocol = SASL_SSL

      # Identifies the SASL mechanism to use.
      sasl.mechanism = AWS_MSK_IAM

      # Binds SASL client implementation.
      sasl.jaas.config = software.amazon.msk.auth.iam.IAMLoginModule required;

      # Encapsulates constructing a SigV4 signature based on extracted credentials.
      # The SASL client bound by "sasl.jaas.config" invokes this class.
      sasl.client.callback.handler.class = software.amazon.msk.auth.iam.IAMClientCallbackHandler

  # Put the Kafka CLI on PATH for login shells. CLASSPATH points at the exact jar
  # downloaded above (same directory and version).
  - path: "/etc/profile"
    append: true
    content: |
      export PATH=$PATH:/opt/kafka/kafka_2.13-${msk_version}/bin
      export CLASSPATH=/opt/kafka/kafka_2.13-${msk_version}/libs/aws-msk-iam-auth-1.1.1-all.jar

Terraform code

The instance IAM profile is created with the permissions described in the linked documentation.

To run kafka-reassign-partitions.sh, you also need to add:

        {
          "Effect": "Allow",
          "Action": ["kafka-cluster:AlterCluster"],
          "Resource": ["arn:aws:kafka:ap-southeast-2:1234567890:cluster/*"]
        },
locals {
  msk-work = {
    # Used as the security group name and the instance's Name tag.
    name = "my-msk-work"

    description = "msk client machine to connect to my-msk-cluster"

    # Existing EC2 key pair used for SSH.
    key_name = "my_key_name"

    # Name pattern used to search for the AMI; the most recent match is used.
    ami_search_str = "al2023-ami-2023.*-kernel-6.1-x86_64"

    # Existing instance profile; the role of the same name has permission to the MSK cluster.
    iam_instance_profile = "kafka-connect"

    instance_type = "t3.large"

    # CIDR used to look up the VPC the security group is created in.
    vpc_cidr = "10.20.0.0/16"
    # CIDR of the private subnet where the EC2 instance resides.
    subnet_cidr = "10.20.30.0/24"

    # Allow SSH access from the jumphost.
    sg_ingress_rules = [
      {
        description = "ssh from jumphost",
        from_port   = 22,
        to_port     = 22,
        cidrs       = [module.constants.cidrs.jumphost.internal]
      }
    ]

    # Variables for the userdata.yaml template (e.g. msk_version). The
    # aws_instance's templatefile() call must reference this exact key name.
    userdata_vars = {
      msk_version = "3.5.1"
    }
  }

  ...
}
# Most recent AMI whose name matches local.msk-work.ami_search_str.
data "aws_ami" "this" {
  most_recent = true

  # Only consider Amazon-published images. Without an owner filter, any public
  # AMI whose name matches the pattern (including third-party look-alikes) is
  # eligible, and the newest one wins.
  owners = ["amazon"]

  filter {
    name   = "name"
    values = [local.msk-work.ami_search_str]
  }
}

# Pre-existing instance profile attached to the work machine, looked up by name.
data "aws_iam_instance_profile" "this" {
  name = local.msk-work.iam_instance_profile
}

# VPC in which the work machine's security group is created, looked up by CIDR.
data "aws_vpc" "this" {
  cidr_block = local.msk-work.vpc_cidr
}

# Private subnet for the work machine, looked up by CIDR. The lookup is scoped to
# the VPC above so a same-CIDR subnet in another VPC cannot make it ambiguous.
data "aws_subnet" "this" {
  vpc_id     = data.aws_vpc.this.id
  cidr_block = local.msk-work.subnet_cidr
}

# Security group attached to the work machine; the MSK cluster's ingress rule
# allows traffic from this group.
resource "aws_security_group" "this" {
  name        = local.msk-work.name
  description = "Security group for ${local.msk-work.name} ec2 instance"
  vpc_id      = data.aws_vpc.this.id

  # One ingress rule per entry in local.msk-work.sg_ingress_rules. Each entry may
  # set `cidrs` and/or `security_groups`; `protocol` is optional and defaults to
  # "tcp", so existing entries keep their current behavior.
  dynamic "ingress" {
    for_each = local.msk-work.sg_ingress_rules
    content {
      description = ingress.value.description
      from_port   = ingress.value.from_port
      to_port     = ingress.value.to_port
      protocol    = try(ingress.value.protocol, "tcp")

      cidr_blocks     = try(ingress.value.cidrs, null)
      security_groups = try(ingress.value.security_groups, null)
    }
  }

  # Unrestricted outbound traffic (IPv4 and IPv6), e.g. for package installs and
  # downloads during cloud-init and for reaching the MSK brokers.
  egress {
    from_port        = 0
    to_port          = 0
    protocol         = "-1"
    cidr_blocks      = ["0.0.0.0/0"]
    ipv6_cidr_blocks = ["::/0"]
  }

  tags = local.tags
}

# The MSK work machine: an EC2 instance in the private subnet, carrying the
# instance profile that grants MSK access and bootstrapped by cloud-init.
resource "aws_instance" "this" {
  instance_type = local.msk-work.instance_type
  ami           = data.aws_ami.this.id
  key_name      = local.msk-work.key_name

  subnet_id              = data.aws_subnet.this.id
  vpc_security_group_ids = [aws_security_group.this.id]
  iam_instance_profile   = data.aws_iam_instance_profile.this.name

  # Render the cloud-config template. The path is resolved relative to this
  # module (not the directory terraform is run from), and the variables come from
  # local.msk-work.userdata_vars. The previous key name, user_data_vars, does not exist.
  user_data = templatefile("${path.module}/userdata.yaml", local.msk-work.userdata_vars)

  tags = merge(
    local.tags,
    {
      Name = local.msk-work.name
    }
  )
}

--

--