“DATA SUPERHIGHWAY: THE DIRECT ROUTE FROM MSK CONNECT TO OPENSEARCH”

Streamlining MSK Connect to OpenSearch for Seamless Loading — Part B

Ram Ambadas Dandale
Ankercloud Engineering
6 min read · Mar 28, 2024


Part B — Seamlessly Managing Multi-Topic Data with a Single MSK Connector

MSK Connect to OpenSearch

Welcome back, dear readers! If you’ve followed us from Part A of this blog, thank you for continuing this journey with us. And if you’re new here, welcome! In Part A, we walked through the steps required to load data from MSK Connect into an OpenSearch index. We also discussed creating a custom plugin by combining two different connectors, as well as setting up the MSK connector configuration. Finally, we examined how to verify the data being loaded into the OpenSearch index.

Now, as we dive into Part B, we’ll look more deeply at how to load multi-topic data using a single MSK connector, and we’ll explore some additional MSK connector configurations that help manage the data efficiently. So, whether you’re continuing your journey from Part A or starting fresh with Part B, let’s explore together how to seamlessly manage multi-topic data with a single MSK connector and uncover valuable insights along the way.

In the previous part, we already covered creating roles and policies, building the custom MSK plugin, and creating an MSK connector. If you want to review that, you can check the previous blog here. Now, we will jump directly to the advanced section: creating an MSK connector that consumes multiple topics, along with some advanced connector configurations.

MSK Connector Configurations

connector.class=io.aiven.kafka.connect.opensearch.OpensearchSinkConnector
type.name=kafka-connect
tasks.max=5
bootstrap.servers=b-1.vpc-test-2.o4q88o.c6.kafka.ap-south-1.amazonaws.com:9094,b-2.vpc-test-2.o4q88o.c6.kafka.ap-south-1.amazonaws.com:9094,b-3.vpc-test-2.o4q88o.c6.kafka.ap-south-1.amazonaws.com:9094
topics=topic1-json,topic2-json,topic3-json
key.ignore=true
schema.ignore=true
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
connection.url=https://test-abcdefghijklmnopqrstuvw33xyz.ap-south-1.es.amazonaws.com:443
connection.username=test
connection.password=Test@123
transforms=Filter
transforms.Filter.type=io.aiven.kafka.connect.transforms.FilterByFieldValue$Value
transforms.Filter.field.name=EventId
transforms.Filter.field.value.pattern=^5000$
transforms.Filter.field.value.matches=true
data.stream.enabled=True
data.stream.prefix=pct
data.stream.timestamp.field=TimeStamp
max.buffered.records=600
errors.tolerance=all
errors.log.enable=True
errors.log.include.messages=True
errors.deadletterqueue.context.headers.enable=False
errors.deadletterqueue.topic.name=dlq-topic-json
behavior.on.version.conflict=ignore
behavior.on.null.values=ignore
behavior.on.malformed.documents=ignore
drop.invalid.message=False
index.write.method=upsert
batch.size=1000
flush.timeout.ms=50000
compact.map.entries=True
max.retries=3
connection.timeout.ms=3000

In the connector configuration above, we pass the list of topics from which data is loaded into OpenSearch. The topics for the OpenSearch sink connector are separated by commas, like this:

topics=topic1-json,topic2-json,topic3-json

OpenSearch Sink Connector Configuration Options

In the previous part of our blog, we covered the options up to transforms.Filter.field.value.matches=true. Now, let's walk through the advanced OpenSearch Sink Connector configuration options in the connector configuration, so you can replace the values with your own.

1). data.stream.enabled = True

Enables the use of data streams. If set to true, the connector writes to data streams instead of regular indices. The default is false.
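OpenSearch data streams are backed by an index template that declares the data_stream object. If your connector and OpenSearch versions do not create this template for you, you may need to create it up front; a minimal sketch, where the template name pct-template is illustrative and the endpoint and credentials are the placeholder values from the configuration above:

# "pct-template" is an example name; adjust the index pattern to match your data.stream.prefix
curl -u test:Test@123 -X PUT \
  "https://test-abcdefghijklmnopqrstuvw33xyz.ap-south-1.es.amazonaws.com:443/_index_template/pct-template" \
  -H "Content-Type: application/json" \
  -d '{"index_patterns": ["pct-*"], "data_stream": {}, "priority": 200}'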

2). data.stream.prefix = pct

Generic data stream name to write into. If set, it will be used to construct the final data stream name in the form of {data.stream.prefix}-{topic}. In this case, it’s set to “pct”.
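With the prefix “pct” and the three topics above, the connector writes into the data streams pct-topic1-json, pct-topic2-json, and pct-topic3-json. A quick way to confirm they exist, using the placeholder endpoint and credentials from the configuration above:

# Lists all data streams whose names start with the configured prefix
curl -u test:Test@123 "https://test-abcdefghijklmnopqrstuvw33xyz.ap-south-1.es.amazonaws.com:443/_data_stream/pct-*?pretty"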

3). data.stream.timestamp.field = TimeStamp

The Kafka record field to use as the timestamp for the @timestamp field in documents sent to a data stream. Here, it’s set to “TimeStamp”.
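For example, a hypothetical record value like the one below (the fields other than EventId and TimeStamp are purely illustrative) would have its TimeStamp value used to populate the @timestamp field of the document written to the data stream:

{"EventId": "5000", "TimeStamp": "2024-03-28T10:15:00.000Z", "EventName": "OrderCreated"}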

4). max.buffered.records = 600

The maximum number of records each task will buffer before blocking the acceptance of more records. This config can be used to limit the memory usage of each task. In this case, it’s set to 600 records.

5). errors.tolerance = all

Sets the tolerance level for errors during record conversion and transformation. “all” means all such errors are tolerated, so failing records are skipped (and sent to the dead-letter queue, if one is configured) instead of stopping the task.

6). errors.log.enable = True

Enables logging of errors. It’s set to True, indicating that error logging is enabled.

7). errors.log.include.messages = True

Specifies whether error messages should be included in the logs. True means error messages will be included.

8). errors.deadletterqueue.context.headers.enable = False

Enables or disables the inclusion of context headers in the dead-letter queue. Here, it’s set to False, indicating that context headers in the dead-letter queue are disabled.

9). errors.deadletterqueue.topic.name = dlq-topic-json

Specifies the name of the topic used for the dead-letter queue. In this case, it’s set to “dlq-topic-json”. Replace this with the name of the dead-letter queue topic in your own cluster that you want failed records routed to.
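To inspect the records that end up in the dead-letter queue, you can read the DLQ topic with the standard Kafka console consumer. A sketch, assuming a client.properties file configured for your cluster’s TLS listener (the file name is illustrative):

# Reads the dead-letter queue from the beginning so failed records can be inspected
bin/kafka-console-consumer.sh \
  --bootstrap-server b-1.vpc-test-2.o4q88o.c6.kafka.ap-south-1.amazonaws.com:9094 \
  --topic dlq-topic-json \
  --from-beginning \
  --consumer.config client.properties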

10). behavior.on.version.conflict = ignore

How to handle records that OpenSearch rejects due to document version conflicts. This can happen when offsets were not committed and/or records have to be reprocessed. Here, it’s set to “ignore”, indicating that conflicts will be ignored.

Valid options are:

  • ignore - ignore and keep the existing record
  • warn - log a warning message and keep the existing record
  • report - report to errant record reporter and keep the existing record
  • fail - fail the task.

11). behavior.on.null.values = ignore

How to handle records with a non-null key and a null value (i.e. Kafka tombstone records). Valid options are ignore (skip the record), delete (delete the corresponding document from the index), and fail (fail the task). Here, it’s set to “ignore”.

12). behavior.on.malformed.documents = ignore

How to handle records that OpenSearch rejects due to some malformation of the document itself, such as an index mapping conflict or a field name containing illegal characters.

Valid options are:

  • ignore - do not index the record
  • warn - log a warning message and do not index the record
  • report - report to errant record reporter and do not index the record
  • fail - fail the task.

13). drop.invalid.message = False

Whether to drop a Kafka message when it cannot be converted to an output message. Here, it’s set to False.

14). index.write.method = upsert

The method used to write data into the OpenSearch index. The default value is insert, which means a record with the same document ID replaces the existing document. With upsert, the connector creates a new document if one does not exist, or updates the existing document.

15). batch.size = 1000

The number of records to process as a batch when writing to OpenSearch. It’s set to 1000 records per batch.

16). flush.timeout.ms = 50000

The timeout in milliseconds to use for periodic flushing, and when waiting for buffer space to be made available by completed requests as records are added. If this timeout is exceeded the task will fail. Here, it’s set to 50,000 milliseconds (or 50 seconds).

17). compact.map.entries = True

Defines how map entries with string keys within record values should be written to JSON. When this is set to true, these entries are written compactly as "entryKey": "entryValue". Otherwise, map entries with string keys are written as a nested document {"key": "entryKey", "value": "entryValue"}. All map entries with non-string keys are always written as nested documents. Previously, this connector always wrote map entries as nested documents, so set this to false to use that older behavior.
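For instance, a hypothetical labels map with entries env=prod and team=data in the record value would be written in roughly one of these two shapes, depending on this setting:

With compact.map.entries = true:
"labels": {"env": "prod", "team": "data"}

With compact.map.entries = false:
"labels": [{"key": "env", "value": "prod"}, {"key": "team", "value": "data"}]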

18). max.retries = 3

The maximum number of retries that are allowed for failed indexing requests. If the retry attempts are exhausted the task will fail. The default value is 5.

19). connection.timeout.ms = 3000

How long to wait in milliseconds when establishing a connection to the OpenSearch server. The task fails if the client fails to connect to the server in this interval, and will need to be restarted. The default value is 3000 (3 seconds).
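Once the connector is running, you can quickly check that documents from each topic are arriving in their respective data streams, again using the placeholder endpoint and credentials from the configuration above:

# Fetches a single document from the data stream fed by topic1-json
curl -u test:Test@123 "https://test-abcdefghijklmnopqrstuvw33xyz.ap-south-1.es.amazonaws.com:443/pct-topic1-json/_search?size=1&pretty"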

Pros

  • Utilizing a single connector streamlines the data loading process, making it easier to manage and maintain.
  • With a single connector, there’s less overhead and potential for errors compared to managing multiple connectors.
  • By reducing the number of connectors, you may lower costs associated with resource allocation and management.
  • Loading data through a single connector ensures consistency in data ingestion methods and configurations.
  • The use of a single connector can simplify scalability efforts, as you only need to manage scaling for one integration point.

Cons

  • Using a single connector may limit flexibility in terms of data transformation and routing options compared to using multiple specialized connectors.
  • If the single connector becomes a bottleneck due to high data volume or complex data processing requirements, it could impact overall system performance.
  • Relying on a single connector means that any issues or downtime with that connector could potentially disrupt data loading for all topics.
  • While using fewer connectors can simplify management in some cases, it may also increase complexity if the single connector needs to handle diverse data sources or formats.
  • Concentrating all data loading tasks onto a single connector may increase the risk of overloading and resource contention, particularly during peak usage periods.

If you’re interested in delving deeper into similar topics, don’t hesitate to visit my profile for more insightful blogs. You can find it right here.

Conclusion

By leveraging a single MSK connector, users can simplify management processes while enhancing data ingestion capabilities. Despite potential limitations, such as reduced flexibility and dependency risks, the benefits of simplicity, scalability, and cost-effectiveness outweigh the drawbacks. With careful configuration and consideration of specific requirements, utilizing a single connector proves to be a valuable solution for seamlessly loading data into OpenSearch, empowering organizations to optimize their data management strategies and drive actionable insights.
