“Data Superhighway: The Direct Route from MSK Connect to OpenSearch”

Streamlining MSK Connect to OpenSearch for Seamless Loading — Part A

Ram Ambadas Dandale
Ankercloud Engineering
11 min read · Feb 22, 2024


Part A — Simplifying Data Loading with MSK Connector for OpenSearch

MSK Connect to OpenSearch

In this blog, we’re diving into the exciting world of data pipelines, specifically how to craft a direct, super-efficient route from MSK Connect to an OpenSearch cluster. We’re going to be using some powerful tools here: the Aiven MSK Connector for OpenSearch and the Aiven Transformation Connector. Together, they’ll help us build a streamlined pipeline that smoothly moves data from a Kafka topic to an OpenSearch index.

Now, we’re all about keeping things simple and straightforward here. So, we’ll be guiding you through the entire setup process step by step, ensuring you understand how this pipeline works. But let’s make one thing clear: this blog won’t cover the basics, like setting up your MSK or OpenSearch clusters, creating Kafka topics, or setting up OpenSearch indices. We’re diving straight into the data pipeline!

Creating IAM Role

We need to create an IAM role for MSK Connect and attach it to the connector. This role grants MSK Connect the permissions it needs to access AWS resources. Once the role is in place, the connector will connect to the OpenSearch cluster while loading data and store the data in the index.

MSK Connector Role

Inline Policy

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:DescribeCluster"
      ],
      "Resource": [
        "arn:aws:kafka:region-name:account-id:cluster/*"
      ],
      "Effect": "Allow"
    },
    {
      "Action": [
        "kafka-cluster:WriteData",
        "kafka-cluster:ReadData",
        "kafka-cluster:DescribeTopic",
        "kafka-cluster:DescribeTopicDynamicConfiguration",
        "kafka-cluster:CreateTopic"
      ],
      "Resource": [
        "arn:aws:kafka:region-name:account-id:topic/*"
      ],
      "Effect": "Allow"
    },
    {
      "Action": [
        "kafka-cluster:AlterGroup",
        "kafka-cluster:DescribeGroup"
      ],
      "Resource": [
        "arn:aws:kafka:region-name:account-id:group/*"
      ],
      "Effect": "Allow"
    },
    {
      "Action": [
        "kafkaconnect:*",
        "ec2:DescribeSubnets",
        "ec2:DescribeVpcs",
        "ec2:DescribeSecurityGroups",
        "logs:CreateLogDelivery",
        "logs:GetLogDelivery",
        "logs:DeleteLogDelivery",
        "logs:ListLogDeliveries",
        "logs:PutResourcePolicy",
        "logs:DescribeResourcePolicies",
        "logs:DescribeLogGroups"
      ],
      "Resource": "*",
      "Effect": "Allow"
    }
  ]
}
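
In addition to the inline policy, the role's trust relationship must allow MSK Connect to assume it. A minimal trust policy, assuming the standard kafkaconnect.amazonaws.com service principal, looks like this:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kafkaconnect.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}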

Creating MSK Connectors

To implement this, we need to follow a series of steps:

  • Download the required plugins.
  • Create the customized plugin.
  • Upload the plugin to S3.
  • Create the MSK custom plugin.
  • Create the worker configuration.
  • Create the MSK connector.

Let’s get started with our implementation according to the above steps.

1. Downloading required plugins

Here, we are going to use the Aiven OpenSearch connector and the Aiven transformation connector, both of which are available on GitHub.

Aiven OpenSearch connector

First, we will explore the Aiven OpenSearch connector.

Aiven OpenSearch connector for Kafka

In the above image, you can see the “Releases” section marked in red, with version 3.1.1. Once we click on it, we will see a zip file named “opensearch-connector-for-apache-kafka-3.1.1.zip”, which we need to download.

Aiven Transformation connector

Let’s now examine the Aiven Transformation connector.

Aiven Transformation connector for Kafka

In the above image, you can see the “Releases” section marked in red, with version 1.5.0. Once we click on it, we will see a zip file named “transforms-for-apache-kafka-connect-1.5.0.zip”, which we need to download.

2. Creating the customized plugin

After downloading and extracting both connectors, go to the “transforms-for-apache-kafka-connect-1.5.0/transforms-for-apache-kafka-connect-1.5.0” folder. Here, you will find two .jar files. Copy “transforms-for-apache-kafka-connect-1.5.0.jar” into the “opensearch-connector-for-apache-kafka-3.1.1/opensearch-connector-for-apache-kafka-3.1.1” folder. You don’t need to copy “slf4j-api-1.7.36.jar” as it is already present in the “opensearch-connector-for-apache-kafka-3.1.1” folder. Once this is done, zip the “opensearch-connector-for-apache-kafka-3.1.1” folder, and our customized plugin is created.

Copying transforms-for-apache-kafka-connect-1.5.0.jar into opensearch-connector-for-apache-kafka-3.1.1
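
For reference, the same packaging can be done from a terminal. This is a minimal sketch assuming both release archives were downloaded into the current directory and extract into the doubled folder layout described above; adjust the paths if yours differ:

# Extract both release archives
unzip opensearch-connector-for-apache-kafka-3.1.1.zip
unzip transforms-for-apache-kafka-connect-1.5.0.zip

# Copy the transforms jar into the OpenSearch connector folder
cp transforms-for-apache-kafka-connect-1.5.0/transforms-for-apache-kafka-connect-1.5.0/transforms-for-apache-kafka-connect-1.5.0.jar \
   opensearch-connector-for-apache-kafka-3.1.1/opensearch-connector-for-apache-kafka-3.1.1/

# Re-zip the folder to produce the customized plugin
# (remove the original download first so zip creates a fresh archive instead of updating it)
rm opensearch-connector-for-apache-kafka-3.1.1.zip
zip -r opensearch-connector-for-apache-kafka-3.1.1.zip opensearch-connector-for-apache-kafka-3.1.1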

3. Uploading the plugin to S3

We have created an S3 bucket named “msk-custom-plugins,” and we have uploaded our zip file “opensearch-connector-for-apache-kafka-3.1.1.zip” into it.

Plugin uploaded to S3 bucket
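
If you prefer the command line, the same upload can be done with the AWS CLI, using the bucket created above:

aws s3 cp opensearch-connector-for-apache-kafka-3.1.1.zip s3://msk-custom-plugins/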

4. Creating the MSK custom plugin

Now, we are going to create an MSK custom plugin. To do this, we need to go to the MSK console and navigate to the custom plugin section. Here, we click the “Create Custom Plugin” button, add the required details, and finally click the “Create” button.

Creating MSK custom plugin
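
The console steps above can also be scripted. This is a rough sketch with the AWS CLI; the plugin name is illustrative, and the bucket ARN and file key must match the object uploaded in the previous step:

aws kafkaconnect create-custom-plugin \
  --name opensearch-sink-plugin \
  --content-type ZIP \
  --location 's3Location={bucketArn=arn:aws:s3:::msk-custom-plugins,fileKey=opensearch-connector-for-apache-kafka-3.1.1.zip}'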

5. Creating the worker configuration

First, we need to specify the worker configuration and set the offset storage topic; a sample configuration is shown after the screenshot below. In offset.storage.topic = __amazon_msk_connect_offsets_<topic-name>_json, replace <topic-name> with your actual topic name.

Creating worker configurations
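
For reference, a worker configuration for this setup might look like the following. This is a minimal sketch for the topic-json topic used later in this post; adjust the converters and the topic name to match your data:

key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
offset.storage.topic=__amazon_msk_connect_offsets_topic-json_json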

6. Creating the MSK connector

First, we need to choose the custom plugin we created from the dropdown list.

Selection of MSK custom plugin

Here, we need to provide a suitable name for the connector and a description. We can also choose between Kafka cluster types, such as an MSK cluster or a self-managed Apache Kafka cluster.

Connector Name and Description

Here, we need to select IAM as the authentication method for the MSK cluster. Then we need to add the connector configuration in the provided area. A connector configuration is a set of key-value mappings.

Adding Connector Configuration

For the connector capacity, we will use the provisioned type with the default values: an MCU count per worker of 1 and a single worker. This gives a total connector capacity of 1 MCU, 4 GB of memory, and network bandwidth of up to 10 Gbps.

MSK Connector Capacity

For the worker configuration, we can use the MSK default configuration or a custom one. Here, we will choose the custom configuration we created in the previous step.

Selecting worker configuration

For the IAM role, we will select the role we created earlier from the dropdown list.

Selecting IAM Role

For security configurations, we have chosen the default configurations.

Default Security Configurations

For log storage, we are going to use Amazon CloudWatch Logs, and we have created a separate log group named “msk-connector-log-group.” So here, we are going to add the log group ARN.

MSK Cloudwatch Logs

Lastly, we can review and create the connector. Creation takes approximately 10 to 15 minutes, and once the connector is up and running its status changes to Running.

Created MSK connector
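
Once created, the connector state can also be checked from the AWS CLI instead of the console, for example:

# List connectors along with their current state (e.g. CREATING, RUNNING)
aws kafkaconnect list-connectors --query 'connectors[].[connectorName,connectorState]' --output table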

Let’s see what connector configuration we need to add so that our data will be directly migrated from the Kafka topic to the OpenSearch index.

connector.class=io.aiven.kafka.connect.opensearch.OpensearchSinkConnector
type.name=kafka-connect
tasks.max=5
bootstrap.servers=b-1.vpc-test-2.o4q88o.c6.kafka.ap-south-1.amazonaws.com:9094,b-2.vpc-test-2.o4q88o.c6.kafka.ap-south-1.amazonaws.com:9094,b-3.vpc-test-2.o4q88o.c6.kafka.ap-south-1.amazonaws.com:9094
topics=topic-json
key.ignore=true
schema.ignore=true
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
connection.url=https://test-abcdefghijklmnopqrstuvw33xyz.ap-south-1.es.amazonaws.com:443
connection.username=test
connection.password=Test@123
transforms=Filter
transforms.Filter.type=io.aiven.kafka.connect.transforms.FilterByFieldValue$Value
transforms.Filter.field.name=EventId
transforms.Filter.field.value.pattern=^5000$
transforms.Filter.field.value.matches=true

Now, let’s evaluate the individual options in the connector configuration so you can replace them with your values.

1). connector.class = io.aiven.kafka.connect.opensearch.OpensearchSinkConnector

The connector.class specifies the class name of the connector to be used, in this case io.aiven.kafka.connect.opensearch.OpensearchSinkConnector, which is a connector for integrating Kafka with OpenSearch.

2). type.name = kafka-connect

The type.name property specifies the OpenSearch type name used when indexing documents from Kafka messages; with recent OpenSearch versions, which no longer use mapping types, it is mostly a compatibility setting.

3). tasks.max = 5

The tasks.max property specifies the maximum number of tasks the connector may run to process Kafka data in parallel. Here it is set to 5, allowing up to 5 tasks to consume topic partitions concurrently. The default is 1.

4). bootstrap.servers = b-1.vpc-test-2.o4q88o.c6.kafka.ap-south-1.amazonaws.com:9094,b-2.vpc-test-2.o4q88o.c6.kafka.ap-south-1.amazonaws.com:9094,b-3.vpc-test-2.o4q88o.c6.kafka.ap-south-1.amazonaws.com:9094

The bootstrap.servers property defines the list of host/port pairs that the client will use to establish an initial connection to the Kafka cluster. In this case, it points to a list of Amazon MSK cluster brokers in the AP South 1 region.

5). topics = topic-json

The topics property specifies the Kafka topic(s) from which the connector will consume messages. The topics are listed as a comma-separated list.

6). key.ignore = true

The key.ignore property is a boolean flag that determines whether the connector should ignore the message key. If set to true, the document ID is generated from the message’s topic, partition, and offset, and the message key is not used. If set to false, the message key is used as the OpenSearch document ID.

7). schema.ignore = true

The schema.ignore property, when set to true, instructs the connector to not attempt to infer a schema for the data. This is useful when the value schema is null, as in this case, allowing the connector to push the data to OpenSearch without schema inference.

8). key.converter.schemas.enable = false

The key.converter.schemas.enable=false property instructs the connector not to attempt to read a schema for the data in the key. This is useful when the key data doesn’t have a schema, allowing the connector to set it to null.

9). value.converter.schemas.enable = false

The value.converter.schemas.enable=false property instructs the connector not to attempt to read a schema for the data in the value. This is useful when the value data doesn’t have a schema, allowing the connector to set it to null.

10). key.converter = org.apache.kafka.connect.storage.StringConverter

The key.converter property specifies the converter to use for the message key, which is org.apache.kafka.connect.storage.StringConverter. This indicates that the message key is expected to be a string.

11). value.converter = org.apache.kafka.connect.json.JsonConverter

The value.converter property specifies the converter to use for the message value, which in this case is org.apache.kafka.connect.json.JsonConverter. This indicates that the message value is expected to be in plain JSON format without a schema.

12). connection.url = https://test-abcdefghijklmnopqrstuvw33xyz.ap-south-1.es.amazonaws.com:443
connection.username = test
connection.password = Test@123

The connection.url, connection.username, and connection.password properties are parameters required to establish a connection with the OpenSearch cluster. connection.url specifies the URL of the OpenSearch cluster, while connection.username and connection.password are the credentials used for authentication.

13). transforms = Filter

The transforms property registers a transformation chain; here we add a single transformation named Filter. It filters records based either on a specific field or on the whole value, matched against an expected value or a regex pattern. Only string, numeric, and boolean types are considered for matching; other types are ignored.

14). transforms.Filter.type = io.aiven.kafka.connect.transforms.FilterByFieldValue$Value

The transforms.Filter.type property specifies the type of transformation that will be applied to the Kafka messages. In this case, it is io.aiven.kafka.connect.transforms.FilterByFieldValue$Value, indicating that the transformation filters messages based on a specific field’s value.

15). transforms.Filter.field.name = EventId

The transforms.Filter.field.name property specifies the name of the field used for filtering the Kafka messages. In this case, the field name is EventId. If field.name is empty, the whole value is considered for filtering.

16). transforms.Filter.field.value.pattern = ^5000$

The transforms.Filter.field.value.pattern property specifies the regex pattern used for filtering Kafka messages based on the value of the EventId field. In this case, the pattern is ^5000$, meaning it will match messages where the EventId field exactly equals 5000. Either field.value or field.value.pattern must be defined to apply the filter.

17). transforms.Filter.field.value.matches = true

The transforms.Filter.field.value.matches property specifies whether records whose field value matches the expected value or pattern are kept (true) or filtered out (false). Here it is set to true, so only records whose EventId matches the pattern ^5000$ are passed on to OpenSearch.
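
To make the filter behavior concrete, here is a hypothetical pair of messages on topic-json (the EventName field is made up for illustration). With the configuration above, the first record would be indexed into OpenSearch and the second would be dropped:

{"EventId": "5000", "EventName": "order_created", "TimeStamp": "2024-02-22T10:15:00Z"}
{"EventId": "4999", "EventName": "order_cancelled", "TimeStamp": "2024-02-22T10:16:00Z"}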

In addition to these, there are many other options that we can add for connector configuration. For more information, please refer to the official Aiven documentation.

Checking the data in the OpenSearch Index

We have three options to verify that data is being loaded into the OpenSearch index: the OpenSearch dashboard, CloudWatch logs, and querying from a bastion host.

1). OpenSearch Dashboard

Since we created the OpenSearch cluster in a private subnet of the VPC, we cannot use the dashboard directly. To do so, we would need to enable SAML authentication or Amazon Cognito authentication, which increases cost. To enable it, we edit the security configuration from the Actions menu. Once this is done, we will be able to see the data in the dashboard under the index, check the number of documents in the index, and query the data.

Authentication for OpenSearch Dashboard

2). CloudWatch Logs

While creating the MSK connector, we already created a CloudWatch log group named “msk-connector-log-group”, which receives real-time logs as data is loaded into OpenSearch. In those logs, we can see the tasks running on each worker along with their offset values. Unlike the other two options, we will not see the actual data here; the logs only confirm that data is being loaded into OpenSearch.

msk-connector-log-group
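
With AWS CLI v2, the same log group can be followed from a terminal, which is a quick way to confirm the connector tasks are consuming:

aws logs tail msk-connector-log-group --follow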

3). Querying from a Bastion Host

We can choose this option when we don’t want to use the OpenSearch dashboard GUI. We need to create a bastion host in a public subnet of the same VPC so that we can run queries against the cluster from it.

Query to run on the bastion host to list the indices in the OpenSearch cluster

curl -s -k -X GET "https://test-abcdefghijklmnopqrstuvw33xyz.ap-south-1.es.amazonaws.com/_cat/indices?pretty" -u test:Test@123

Query to run on the bastion host to fetch the first 10 documents

curl -X GET "https://test-abcdefghijklmnopqrstuvw33xyz.ap-south-1.es.amazonaws.com/.ds-sample-topic-json-000001/_search?pretty" -H 'Content-Type: application/json' -u test:Test@123

Query to run on the bastion host to fetch the last document added to the index

curl -X GET "https://test-abcdefghijklmnopqrstuvw33xyz.ap-south-1.es.amazonaws.com/.ds-sample-topic-json-000001/_search?pretty" -H 'Content-Type: application/json' -u test:Test@123 -d'
{
  "size": 1,
  "sort": [
    {
      "TimeStamp": {
        "order": "desc"
      }
    }
  ]
}'

If you’re interested in creating an OpenSearch serverless cluster for loading data from Kafka to OpenSearch, I’ve written a separate blog on that topic. You can check it out here.

Conclusion

We have successfully reviewed the steps required to load data from a Kafka topic into an OpenSearch index using MSK Connect. Additionally, we have discussed the creation of a custom plugin by combining two different connectors, as well as the setup of the MSK connector configuration. Finally, we have examined how to verify the data being loaded into the OpenSearch index. These steps allow for the direct loading of data into OpenSearch, enhancing data management capabilities.
