“DATA SUPERHIGHWAY: THE DIRECT ROUTE FROM MSK CONNECT TO OPENSEARCH”
Streamlining MSK Connect to OpenSearch for Seamless Loading — Part B
Part B — Seamlessly Managing Multi-Topic Data with Single MSK Connector
Welcome back, dear readers! If you've followed us from Part A of this blog, thank you for continuing this journey with us. And if you're new here, welcome! In Part A, we walked through the steps required to load data from MSK Connect into an OpenSearch index. We also built a custom plugin by combining two different connectors, set up the MSK connector configuration, and verified the data being loaded into the OpenSearch index.
Now, in Part B, we'll look at how to load multi-topic data using a single MSK connector, and we'll explore some additional connector configurations that help manage the data efficiently. So, whether you're continuing from Part A or starting fresh with Part B, let's explore together how to manage multi-topic data with a single MSK connector.
In the previous part, we covered creating roles and policies, customizing MSK plugins, and creating an MSK connector. If you want to review that, you can check the previous blog here. Now, we will jump directly to the advanced section: creating an MSK connector for multi-topic data, along with some advanced connector configurations.
MSK Connector Configurations
connector.class=io.aiven.kafka.connect.opensearch.OpensearchSinkConnector
type.name=kafka-connect
tasks.max=5
bootstrap.servers=b-1.vpc-test-2.o4q88o.c6.kafka.ap-south-1.amazonaws.com:9094,b-2.vpc-test-2.o4q88o.c6.kafka.ap-south-1.amazonaws.com:9094,b-3.vpc-test-2.o4q88o.c6.kafka.ap-south-1.amazonaws.com:9094
topics=topic1-json,topic2-json,topic3-json
key.ignore=true
schema.ignore=true
key.converter.schemas.enable=false
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
connection.url=https://test-abcdefghijklmnopqrstuvw33xyz.ap-south-1.es.amazonaws.com:443
connection.username=test
connection.password=Test@123
transforms=Filter
transforms.Filter.type=io.aiven.kafka.connect.transforms.FilterByFieldValue$Value
transforms.Filter.field.name=EventId
transforms.Filter.field.value.pattern=^5000$
transforms.Filter.field.value.matches=true
data.stream.enabled=True
data.stream.prefix=pct
data.stream.timestamp.field=TimeStamp
max.buffered.records=600
errors.tolerance=all
errors.log.enable=True
errors.log.include.messages=True
errors.deadletterqueue.context.headers.enable=False
errors.deadletterqueue.topic.name=dlq-topic-json
behavior.on.version.conflict=ignore
behavior.on.null.values=ignore
behavior.on.malformed.documents=ignore
drop.invalid.message=False
index.write.method=upsert
batch.size=1000
flush.timeout.ms=50000
compact.map.entries=True
max.retries=3
connection.timeout.ms=3000
In the connector configuration above, we pass a list of topics whose data is loaded into OpenSearch. For the OpenSearch sink connector, the topics are separated by commas, like this:
topics=topic1-json,topic2-json,topic3-json
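Before deploying, it can help to sanity-check the comma-separated value, since a stray space or a duplicated topic is easy to miss in a long list. The snippet below is only a minimal sketch of such a check; the helper name `parse_topics` is hypothetical and is not part of MSK Connect or the connector.

```python
from __future__ import annotations


def parse_topics(value: str) -> list[str]:
    """Split a comma-separated `topics` value into a clean list of topic names."""
    # Trim whitespace around each name and skip empty entries (e.g. a trailing comma).
    topics = [t.strip() for t in value.split(",") if t.strip()]
    # Reject duplicates so the same topic is not listed twice by mistake.
    duplicates = sorted({t for t in topics if topics.count(t) > 1})
    if duplicates:
        raise ValueError(f"duplicate topics: {duplicates}")
    return topics


print(parse_topics("topic1-json,topic2-json,topic3-json"))
```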
OpenSearch Sink Connector Configuration Options
In the previous part of our blog, we covered the options up to transforms.Filter.field.value.matches=true. Now, let's go through the advanced OpenSearch Sink Connector configuration options used in the connector configuration, so you can replace them with your own values.
1). data.stream.enabled = True
Enables the use of data streams. If set to true, the connector writes to data streams instead of regular indices. The default is false.
2). data.stream.prefix = pct
Generic data stream name to write into. If set, it will be used to construct the final data stream name in the form of {data.stream.prefix}-{topic}. In this case, it’s set to “pct”.
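To make the naming rule concrete, here is a small sketch that applies the {data.stream.prefix}-{topic} form to the three topics from our configuration. The function name `data_stream_name` is just an illustrative helper, not something the connector exposes.

```python
def data_stream_name(prefix: str, topic: str) -> str:
    """Build the data stream name in the form {data.stream.prefix}-{topic}."""
    return f"{prefix}-{topic}"


for topic in ["topic1-json", "topic2-json", "topic3-json"]:
    print(data_stream_name("pct", topic))
# pct-topic1-json
# pct-topic2-json
# pct-topic3-json
```

So with a single connector and three topics, we should expect three separate data streams in OpenSearch, one per topic.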
3). data.stream.timestamp.field = TimeStamp
The Kafka record field to use as the timestamp for the @timestamp field in documents sent to a data stream. Here, it's set to "TimeStamp".
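Conceptually, this means the value of the configured record field ends up in @timestamp. The sketch below only illustrates that idea on a plain dictionary; it is not the connector's code, the sample event is made up, and keeping the original TimeStamp field in the document is an assumption of this sketch.

```python
from __future__ import annotations


def with_timestamp(record: dict, timestamp_field: str = "TimeStamp") -> dict:
    """Return a copy of the record with @timestamp taken from the given field."""
    doc = dict(record)  # copy so the input record is left untouched
    # Assumption: the original field is kept alongside @timestamp.
    doc["@timestamp"] = record[timestamp_field]
    return doc


# Hypothetical event; the field names other than TimeStamp are illustrative.
event = {"EventId": 5000, "TimeStamp": "2024-05-01T10:15:30Z", "message": "login"}
print(with_timestamp(event))
```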
4). max.buffered.records = 600
The maximum number of records each task will buffer before blocking the acceptance of more records. This config can be used to limit the memory usage for each task. In this case, it's set to 600 records.
5). errors.tolerance = all
Sets the tolerance level for errors. “all” indicates tolerance for all errors.
6). errors.log.enable = True
Enables logging of errors. It’s set to True, indicating that error logging is enabled.
7). errors.log.include.messages = True
Specifies whether error messages should be included in the logs. True means error messages will be included.
8). errors.deadletterqueue.context.headers.enable = False
Enables or disables the inclusion of context headers in the dead-letter queue. Here, it’s set to False, indicating that context headers in the dead-letter queue are disabled.
9). errors.deadletterqueue.topic.name = dlq-topic-json
Specifies the name of the topic used for the dead-letter queue. In this case, it's set to "dlq-topic-json". Replace the "topic" part of this name with your actual topic name.
10). behavior.on.version.conflict = ignore
How to handle records that OpenSearch rejects due to document version conflicts. This may happen when offsets were not committed and/or records have to be reprocessed. Here, it's set to "ignore", indicating that conflicts will be ignored.
Valid options are:
- ignore - ignore and keep the existing record
- warn - log a warning message and keep the existing record
- report - report to errant record reporter and keep the existing record
- fail - fail the task
11). behavior.on.null.values = ignore
How to handle records with a non-null key and a null value (i.e. Kafka tombstone records). Valid options are ignore, delete, and fail.
12). behavior.on.malformed.documents = ignore
How to handle records that OpenSearch rejects due to some malformation of the document itself, such as an index mapping conflict or a field name containing illegal characters.
Valid options are:
- ignore - do not index the record
- warn - log a warning message and do not index the record
- report - report to errant record reporter and do not index the record
- fail - fail the task
13). drop.invalid.message = False
Whether to drop a Kafka message when it cannot be converted to an output message. Here, it’s set to False.
14). index.write.method = upsert
The method used to write data into the OpenSearch index. The default value is insert, which means that a record with the same document ID will be replaced. With upsert, the connector creates a new document if one does not exist, or updates the existing document.
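The difference between the two write methods is easiest to see on a toy example. The sketch below models an index as a plain Python dictionary; it is not OpenSearch or connector code, the sample documents are made up, and the shallow merge used for upsert is a simplification of how a partial update behaves.

```python
from __future__ import annotations


def insert(index: dict, doc_id: str, doc: dict) -> None:
    """insert: the stored document is replaced as a whole."""
    index[doc_id] = dict(doc)


def upsert(index: dict, doc_id: str, doc: dict) -> None:
    """upsert: create the document if missing, otherwise update its fields."""
    # Simplification: a shallow merge of top-level fields only.
    index[doc_id] = {**index.get(doc_id, {}), **doc}


existing = {"EventId": 5000, "status": "open", "owner": "alice"}
update = {"EventId": 5000, "status": "closed"}

replaced = {"doc-1": dict(existing)}
insert(replaced, "doc-1", update)
print(replaced["doc-1"])  # {'EventId': 5000, 'status': 'closed'}

merged = {"doc-1": dict(existing)}
upsert(merged, "doc-1", update)
print(merged["doc-1"])  # {'EventId': 5000, 'status': 'closed', 'owner': 'alice'}
```

In other words, with insert the owner field is lost because the whole document is overwritten, while with upsert it survives because only the incoming fields are updated.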
15). batch.size = 1000
The number of records to process as a batch when writing to OpenSearch. It’s set to 1000 records per batch.
16). flush.timeout.ms = 50000
The timeout in milliseconds to use for periodic flushing, and when waiting for buffer space to be made available by completed requests as records are added. If this timeout is exceeded the task will fail. Here, it’s set to 50,000 milliseconds (or 50 seconds).
17). compact.map.entries = True
Defines how map entries with string keys within record values should be written to JSON. When this is set to true, these entries are written compactly as "entryKey": "entryValue". Otherwise, map entries with string keys are written as a nested document {"key": "entryKey", "value": "entryValue"}. All map entries with non-string keys are always written as nested documents. Earlier versions of this connector always wrote map entries as nested documents, so set this to false to use that older behavior.
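The two output shapes can be compared side by side with a short sketch. This is only an illustration of the rule described above, not the connector's implementation; the helper `render_map` and the sample map are hypothetical, and representing several nested entries as a list of key/value objects is an assumption of this sketch.

```python
from __future__ import annotations

import json


def render_map(entries: dict, compact: bool) -> dict | list:
    """Render a map either compactly or as nested key/value documents."""
    # Compact form is only used when every key is a string.
    if compact and all(isinstance(k, str) for k in entries):
        return dict(entries)
    # Otherwise each entry becomes a nested {"key": ..., "value": ...} document.
    return [{"key": k, "value": v} for k, v in entries.items()]


labels = {"region": "ap-south-1", "tier": "gold"}
print(json.dumps(render_map(labels, compact=True)))
# {"region": "ap-south-1", "tier": "gold"}
print(json.dumps(render_map(labels, compact=False)))
# [{"key": "region", "value": "ap-south-1"}, {"key": "tier", "value": "gold"}]
```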
18). max.retries = 3
The maximum number of retries that are allowed for failed indexing requests. If the retry attempts are exhausted, the task will fail. The default value is 5; here, it's set to 3.
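To picture what "retries exhausted" means, here is a generic retry loop written as a sketch. It is not the connector's code: the function `index_with_retries`, the `send` callable, and the `backoff_s` parameter are all hypothetical names used only to show that max.retries=3 allows one initial attempt plus up to three retries before giving up.

```python
import time


def index_with_retries(send, max_retries: int = 3, backoff_s: float = 0.0):
    """Call send(); on failure retry up to max_retries times, then re-raise."""
    retries_used = 0
    while True:
        try:
            return send()
        except ConnectionError:
            if retries_used >= max_retries:
                raise  # retries exhausted: this is where a task would fail
            retries_used += 1
            # Wait a little longer after each failed attempt (illustrative backoff).
            time.sleep(backoff_s * retries_used)


calls = {"count": 0}


def flaky_send():
    """Hypothetical request that fails twice and then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "indexed"


print(index_with_retries(flaky_send), "after", calls["count"], "attempts")
# indexed after 3 attempts
```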
19). connection.timeout.ms = 3000
How long to wait in milliseconds when establishing a connection to the OpenSearch server. The task fails if the client fails to connect to the server in this interval, and will need to be restarted. Here, it's set to 3000 milliseconds (3 seconds).
Pros
- Utilizing a single connector streamlines the data loading process, making it easier to manage and maintain.
- With a single connector, there’s less overhead and potential for errors compared to managing multiple connectors.
- By reducing the number of connectors, you may lower costs associated with resource allocation and management.
- Loading data through a single connector ensures consistency in data ingestion methods and configurations.
- The use of a single connector can simplify scalability efforts, as you only need to manage scaling for one integration point.
Cons
- Using a single connector may limit flexibility in terms of data transformation and routing options compared to using multiple specialized connectors.
- If the single connector becomes a bottleneck due to high data volume or complex data processing requirements, it could impact overall system performance.
- Relying on a single connector means that any issues or downtime with that connector could potentially disrupt data loading for all topics.
- While using fewer connectors can simplify management in some cases, it may also increase complexity if the single connector needs to handle diverse data sources or formats.
- Concentrating all data loading tasks onto a single connector may increase the risk of overloading and resource contention, particularly during peak usage periods.
Conclusion
By leveraging a single MSK connector, users can simplify management while loading data from multiple topics. Despite potential limitations, such as reduced flexibility and dependency risks, the benefits of simplicity, scalability, and cost-effectiveness often outweigh the drawbacks. With careful configuration and consideration of specific requirements, a single connector can be a valuable solution for loading data into OpenSearch, helping organizations optimize their data management strategies.