Integrating Kafka with Coherence
Coherence is a data grid that provides high-speed, high-volume processing of in-memory data. In some cases that data needs to be persisted, already exists in external frameworks or systems, or needs to be sent to external data sources.
Apache Kafka is a popular streaming framework that is bi-directional, data-agnostic and key/value based. It is a solution of choice for a number of data streaming problems, and a great messaging and integration platform.
Sometimes, you need both.
This article gives an overview of our Kafka integration, which consists of two main parts:
- A Kafka Connect Sink connector: a plugin that can be installed into Kafka Connect as a separate process, which consumes Kafka records and saves them as entries in a Coherence `NamedMap`.
- A Kafka Entry Store: a Coherence `CacheStore` implementation that sits in a Coherence cluster; it is invoked every time an operation is performed on a `NamedCache` and forwards the entry from Coherence to Kafka. It runs alongside the service where the corresponding backing map resides, on the node owning the corresponding partition.
Note:
You may notice that we are not using a "Source" type of Kafka Connect plugin to propagate data from Coherence to Kafka. The reason is that using the Coherence `CacheStore` for this purpose is simpler, more efficient, and more scalable: publishing of new or modified entries to Kafka is performed immediately upon entry modification, on the cluster member that owns the entry, and across all cluster members in parallel. A Kafka Connect Source plugin, on the other hand, would require us to query for changes across the Coherence cluster, which is not nearly as efficient or scalable, and introduces a significant bottleneck into the architecture.
Getting Started
For the examples below, you can download the artifacts directly from GitHub and/or Maven Central.
The files needed are the Sink Connector plugin archive and the `coherence-kafka-core` and Coherence JARs referenced in the examples below.
The project source code is located in the Coherence Kafka repository on GitHub.
Sink Connector
Without further ado, let’s set up our Kafka Sink Connector to import data from a Kafka topic into Coherence. This is best run on Linux or MacOS, but can easily be adapted for Windows.
We will set up a development Kafka deployment along with a Connect instance that will run the Sink as a plugin.
Download open source Kafka and install it on your machine; this is as simple as running `tar xvzf` on the downloaded archive.
Start Zookeeper and Kafka
From the location where Kafka is installed, run each of the commands below in its own shell.
Start Zookeeper
The default properties in `config/zookeeper.properties` can be used:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Start Kafka
Again, with default properties for a standalone server:
$ bin/kafka-server-start.sh config/server.properties
Configure and start Kafka Connect
We need to add the location of the plugin zip file to the `plugin.path` property in the Connect configuration. For this example we use a standalone deployment (`config/connect-standalone.properties`), but in a cluster environment `connect-distributed.properties` should be used instead.
plugin.path=/path/to/coherence-sink-connector
Where `/path/to/coherence-sink-connector` is the location into which the Sink Connector was downloaded.
After that, the plugin configuration itself can be added. For this, edit a properties file (`coherence-sink.properties`, for example) and insert the following content:
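As an illustration only, a minimal `coherence-sink.properties` might look like the sketch below. The standard Kafka Connect keys (`name`, `connector.class`, `topics`, the converters) are real Connect properties, but the connector class name shown is an assumption; check the connector's own documentation for the exact values.

```properties
# Sketch of coherence-sink.properties -- the connector.class value below is
# an assumption, not taken from this article; verify it against the plugin docs.
name=coherence-sink
connector.class=com.oracle.coherence.kafka.connect.CoherenceSinkConnector
topics=MyTopic
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
```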
The properties are:
Once configured, the Connect instance can be started:
$ bin/connect-standalone.sh \
config/connect-standalone.properties \
coherence-sink.properties
By default, you will see log messages showing a Coherence member server starting inside the Connect instance. If you see these messages, the Connect instance has started correctly and has deployed and started the Coherence Sink connector.
This member will be a full-fledged Coherence cluster member that can be configured in the same way as any Coherence member: you can add operational override and cache config files to the classpath of the Connect instance, and they will be used by Coherence.
For now let’s use the default ones.
Start a Kafka Producer and generate messages
From the Kafka installation directory, run the following command:
$ bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic MyTopic \
--property "parse.key=true" \
--property "key.separator=:"
This producer will then display a prompt, and you can start generating messages in the form `key:value`. For example:
>key1:value1
>abc:def
The created messages will be published to the topic `MyTopic`, as defined above.
Start a Coherence Console for testing
By now the Coherence Sink plugin will have picked up the messages and stored them as map entries. Now let’s verify that by connecting a Coherence Console to the member running in Connect, and checking that a map exists and contains the entries.
All that is needed on the classpath is the main Coherence JAR, `coherence-21.12.jar`.
At the prompt, enter the commands:
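For instance, assuming the standard Coherence console commands, a session verifying the entries might look like this (a sketch; the exact output format will vary):

```
cache MyTopic
list
```

The `cache` command connects to the map named after the topic, and `list` should print the entries produced earlier, such as `key1 = value1` and `abc = def`.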
The Kafka messages have been stored in a map with the same name as the topic, which is the default.
Coherence Kafka Sink also supports some advanced capabilities not shown here that are worth mentioning:
- Topic-to-map name mapping: allows you to use a different name for the map than for the topic.
- Pass-through data transfer: when the data in Kafka is already in a Coherence serialized format, this avoids incurring the cost of deserializing from Kafka and serializing back into Coherence; this is useful when Kafka is used as a data store.
- More complex serialization use cases: any supported Kafka key and value deserializers can be used when reading messages from a topic, and any Coherence serializer can be used when writing data to Coherence. For example, you could deserialize a Kafka message from Avro or JSON into a POJO, and then store that POJO in Coherence using POF. You could also use POF within Kafka itself, as both `KafkaPofSerializer` and `KafkaPofDeserializer` classes are provided with Coherence Kafka.
While the example above creates a two-member cluster with both members storage-enabled, it is advisable to configure the member residing in Kafka Connect as either a storage-disabled member or, preferably, an Extend client, in order to separate functions and avoid having the Coherence server services interfere with the consumption of Kafka topics.
Kafka Entry Store
The Kafka Entry Store performs the reverse of the above: Coherence entries are automatically sent to a Kafka topic using a plain Kafka `Producer`.
In this example, a single member can be brought up to demonstrate publishing messages into a Kafka topic.
Start Zookeeper and Kafka
From the location where Kafka is installed, run each of the commands below in its own shell.
Start Zookeeper
The default properties in `config/zookeeper.properties` can be used:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Start Kafka
Again, with default properties for a standalone server:
$ bin/kafka-server-start.sh config/server.properties
Start a Kafka consumer listening on topic foo
In its own shell, from the Kafka install directory, run:
$ bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic foo \
    --property print.key=true
This will block and wait for messages to be produced.
Create a Coherence cache config
In this cache config, add a `cachestore-scheme` which will associate the Kafka entry store with the configured Coherence map or cache:
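The following is a sketch of what such a cache config might look like. Only the `topic-name` and `bootstrap-servers` elements are described in this article; the namespace handler class and the scheme names are assumptions to make the example self-contained, and should be checked against the Coherence Kafka documentation.

```xml
<!-- Sketch only: the kafka namespace handler class and scheme names below
     are assumptions; topic-name and bootstrap-servers are the elements
     discussed in this article. -->
<cache-config xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
              xmlns:kafka="class://com.oracle.coherence.kafka.KafkaNamespaceHandler">
  <caching-scheme-mapping>
    <cache-mapping>
      <cache-name>foo</cache-name>
      <scheme-name>kafka-scheme</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>
  <caching-schemes>
    <distributed-scheme>
      <scheme-name>kafka-scheme</scheme-name>
      <backing-map-scheme>
        <read-write-backing-map-scheme>
          <internal-cache-scheme>
            <local-scheme/>
          </internal-cache-scheme>
          <!-- The cachestore-scheme wires the Kafka entry store to this cache -->
          <cachestore-scheme>
            <kafka:publisher>
              <kafka:topic-name>foo</kafka:topic-name>
              <kafka:bootstrap-servers>localhost:9092</kafka:bootstrap-servers>
            </kafka:publisher>
          </cachestore-scheme>
        </read-write-backing-map-scheme>
      </backing-map-scheme>
      <autostart>true</autostart>
    </distributed-scheme>
  </caching-schemes>
</cache-config>
```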
Save it as `coherence-cache-config.xml`, and make sure it is on the class path for the command in the next step.
Note: The `topic-name` property is only understood by this entry store extension; the rest of the properties are passed directly to the underlying Kafka producer, after changing dashes into dots. For example, the value of the `bootstrap-servers` XML element is passed as the configuration property `bootstrap.servers` to the Kafka `Producer`. This allows using different versions of Kafka, which may employ different properties, and also allows passing the Kafka configuration directly without having to handle each property individually.
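The dash-to-dot conversion described above is purely mechanical. As a small illustration (this helper is hypothetical, not the actual Coherence Kafka code):

```java
// Hypothetical helper illustrating how an XML element name such as
// "bootstrap-servers" maps onto the Kafka producer property
// "bootstrap.servers": every dash becomes a dot.
public class KafkaPropertyName {
    static String toProducerProperty(String xmlElementName) {
        return xmlElementName.replace('-', '.');
    }

    public static void main(String[] args) {
        System.out.println(toProducerProperty("bootstrap-servers")); // bootstrap.servers
        System.out.println(toProducerProperty("value-serializer"));  // value.serializer
    }
}
```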
Start a Coherence console
Notice the class path, which references:
- `.` (the current directory), which is where the `coherence-cache-config.xml` file resides; use a different directory if you saved that file elsewhere;
- `coherence-kafka-core-1.0.0.jar`, the JAR that contains the `KafkaEntryStore` implementation and configuration support for it;
- `coherence-21.12.jar`, the main Coherence JAR, for Console functionality or starting a cluster member;
- the Kafka libraries.
At the prompt, enter the following commands:
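Using the map name `foo` from the cache config, the session would look something like this (standard Coherence console commands; a sketch):

```
cache foo
put aaa bbb
```

The `put` triggers the Kafka entry store, which publishes the new entry to the `foo` topic.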
Switch to the shell where the Kafka Consumer is running, and see the message being consumed:
aaa bbb
Conclusion
In these simple examples we have seen bi-directional integration of Coherence with Kafka, using a Sink Connector to import data from Kafka into Coherence, and a custom entry store implementation to publish changes from Coherence to Kafka.
The entry store approach is simple in its design, and makes use of Coherence's `NonBlockingEntryStore`, which performs all Kafka operations on their own threads, in a non-blocking fashion, for more efficient processing.
In the other direction, the Sink Connector needs to poll and this is best achieved using a separate process which can be tuned on its own: you can add more workers or more task threads.
Both integrations allow you to use any serialization format you want with both Kafka and Coherence, providing for ultimate flexibility in data representation, and if you want ultimate efficiency and prefer to avoid serialization altogether, a pass-through mode is supported as well.
We hope you’ll enjoy using the Coherence Kafka integration when you need it, and as always, we welcome any feedback and suggestions you may have.