Integrating Kafka with Coherence

Maurice Gamanho
Published in Oracle Coherence
Jan 18, 2022

Coherence is a data grid that provides high-speed, high-volume processing of in-memory data at rest. In some cases, that data needs to be persisted, already lives in external frameworks or systems, or needs to be sent on to external systems.

Apache Kafka is a popular streaming platform that is bi-directional, data-agnostic, and key/value based. It is a solution of choice for many data streaming problems, and a great messaging and integration platform.

Sometimes, you need both.

This article gives an overview of our Kafka integration, which consists of two main parts:

  • A Kafka Connect Sink connector: a plugin installed into Kafka Connect, which runs as a separate process; it consumes Kafka records and saves them as entries in a Coherence NamedMap.
    (Diagram: Sink Connector)
  • A Kafka Entry Store: a Coherence CacheStore implementation that runs inside the Coherence cluster; it is invoked every time an operation is performed on a NamedCache and forwards the entry from Coherence to Kafka. It runs alongside the service where the corresponding backing map resides, on the node owning the corresponding partition.
    (Diagram: Kafka Entry Store)

Note:

You may notice that we are not using a “Source” type of Kafka Connect plugin to propagate data from Coherence to Kafka. The reason is that using the Coherence CacheStore for this purpose is simpler, more efficient, and more scalable: new or modified entries are published to Kafka immediately upon modification, on the cluster member that owns the entry, and across all cluster members in parallel.

A Kafka Connect Source plugin, on the other hand, would require querying for changes across the Coherence cluster, which is nowhere near as efficient or scalable, and introduces a significant bottleneck into the architecture.

Getting Started

For the examples below, you can download the artifacts directly from GitHub and/or Maven Central.

The files needed are:

  • the Coherence Kafka Sink Connector plugin archive, for the Sink Connector example;
  • coherence-kafka-core-1.0.0.jar, for the Kafka Entry Store example;
  • coherence-21.12.jar, the main Coherence JAR.

The project source code is located in the Coherence Kafka repository on GitHub.

Sink Connector

Without further ado, let’s set up our Kafka Sink Connector to import data from a Kafka topic into Coherence. This is best run on Linux or macOS, but can easily be adapted for Windows.

We will set up a development Kafka deployment along with a Connect instance that will run the Sink as a plugin.

Download the open-source Apache Kafka distribution and extract it on your machine; this is as simple as running tar xvzf on the downloaded archive:
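
For example (the version number here is only illustrative; use whichever Kafka release you downloaded):

$ tar xvzf kafka_2.13-3.0.0.tgz
$ cd kafka_2.13-3.0.0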

Start Zookeeper and Kafka

From the location where Kafka is installed, run the commands below each in their own shell.

Start Zookeeper

The default properties, in config/zookeeper.properties, can be used:

$ bin/zookeeper-server-start.sh config/zookeeper.properties

Start Kafka

Again, with default properties for a standalone server:

$ bin/kafka-server-start.sh config/server.properties

Configure and start Kafka Connect

We need to add the location of the plugin zip file to the plugin.path property in the Connect configuration. For this example we use a standalone deployment (config/connect-standalone.properties), but in a clustered environment config/connect-distributed.properties should be used instead.

plugin.path=/path/to/coherence-sink-connector

Where /path/to/coherence-sink-connector is the directory into which the Sink Connector plugin was downloaded.

After that, the plugin configuration itself can be added. For this, edit a properties file (coherence-sink.properties, for example), and insert the following content:
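
The original post embeds the exact configuration; as a rough sketch, a Kafka Connect sink configuration generally looks like the following, where the connector class (and any Coherence-specific keys) are assumptions to be checked against the Coherence Kafka repository:

# Sketch only: connector.class below is a placeholder; see the Coherence Kafka
# repository for the exact class name and the Coherence-specific properties.
name=coherence-sink
connector.class=com.oracle.coherence.kafka.connect.CoherenceSinkConnector
topics=MyTopic
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter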

The properties identify the connector class to load, the Kafka topic (or topics) to consume from, and how record keys and values are converted.

Once configured, the Connect instance can be started:

$ bin/connect-standalone.sh \
config/connect-standalone.properties \
coherence-sink.properties

By default, you will see log messages showing a Coherence member server starting inside the Connect instance.

If you see these messages, the Connect instance has started correctly, and has deployed and started the Coherence Sink connector.

This member is a full-fledged Coherence cluster member that can be configured in the same way as any other Coherence member: you can add operational override and cache config files to the classpath of the Connect instance, and they will be used by Coherence.

For now let’s use the default ones.

Start a Kafka Producer and generate messages

From the Kafka installation directory, run the following command:

$ bin/kafka-console-producer.sh \
--broker-list localhost:9092 \
--topic MyTopic \
--property "parse.key=true" \
--property "key.separator=:"

This producer will then display a prompt and you can start generating messages in the form key:value. For example:

>key1:value1
>abc:def

The created messages will be published to the topic MyTopic, as defined above.

Start a Coherence Console for testing

By now, the Coherence Sink plugin will have picked up the messages and stored them as map entries. Let’s verify this by connecting a Coherence console to the cluster containing the member running in Connect, and checking that a map exists and contains the entries.

All that is needed on the classpath is the main Coherence JAR, coherence-21.12.jar.
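
One common way to start the console (the exact main class can vary between Coherence releases; this is a sketch):

$ java -cp coherence-21.12.jar com.tangosol.net.CacheFactory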

At the prompt, enter the commands:
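
The original post shows the full session; roughly, the following console commands select the map named after the topic and list its contents:

cache MyTopic
list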

The Kafka messages have been stored in a map with the same name as the topic, which is the default.

The Coherence Kafka Sink also supports some advanced capabilities, not shown here, that are worth mentioning:

  • Topic to map name mapping
    Allows you to use a different name for the map and the topic.
  • Pass-through data transfer
    When the data in Kafka is already in a Coherence serialized format, this avoids the cost of deserializing from Kafka and re-serializing into Coherence; this is useful when Kafka is used as a data store.
  • More complex serialization use-cases
    Any supported Kafka key and value serializers can be used when reading messages from a topic, and any Coherence serializer can be used when writing data to Coherence. For example, you could deserialize a Kafka message from Avro or JSON into a POJO, and then store that POJO in Coherence using POF. You could also use POF within Kafka itself, as both KafkaPofSerializer and KafkaPofDeserializer classes are provided within Coherence Kafka (see the sketch after this list).
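
As a rough illustration of that last point (the package name below is an assumption; check the Coherence Kafka repository for the actual class names), a plain Kafka producer could be pointed at the POF serializer through the standard producer serializer properties:

# the package name is a placeholder; the class itself is provided by Coherence Kafka
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=com.oracle.coherence.kafka.pof.KafkaPofSerializer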

While the example above creates a two-member cluster with both members being storage-enabled, it is advisable to configure the member residing in Kafka Connect as either a storage-disabled member or, preferably, an Extend client, in order to separate functions and avoid having the Coherence server services interfere with the consumption of Kafka topics.
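
For example, one simple way to disable storage on the Connect member is to pass the standard Coherence system property to the Connect JVM through KAFKA_OPTS (a sketch; an operational override file on the Connect classpath achieves the same result):

$ KAFKA_OPTS="-Dcoherence.distributed.localstorage=false" \
bin/connect-standalone.sh \
config/connect-standalone.properties \
coherence-sink.properties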

Kafka Entry Store

The Kafka Entry Store performs the reverse of the above: Coherence entries are automatically sent to a Kafka topic, using a plain Kafka Producer.

In this example, a single-member cluster is brought up to demonstrate publishing messages into a Kafka topic.

Start Zookeeper and Kafka

From the location where Kafka is installed, run the commands below each in their own shell.

Start Zookeeper

The default properties, in config/zookeeper.properties, can be used:

$ bin/zookeeper-server-start.sh config/zookeeper.properties

Start Kafka

Again, with default properties for a standalone server:

$ bin/kafka-server-start.sh config/server.properties

Start a Kafka consumer listening on topic foo

In its own shell, from the Kafka install directory, run:

$ bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic foo \
--property print.key=true

This will block and wait for messages to be produced.

Create a Coherence cache config

In this cache config, add a cachestore-scheme which will associate the Kafka entry store with the configured Coherence map or cache:
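
The original post shows the full configuration file; the sketch below only illustrates its shape. The standard read-write-backing-map-scheme/cachestore-scheme structure is how Coherence plugs in a store; the Kafka namespace handler class and element names shown here are assumptions and should be checked against the Coherence Kafka repository:

<?xml version="1.0"?>
<!-- Sketch only: the kafka namespace handler class and element names are placeholders -->
<cache-config xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
              xmlns:kafka="class://com.oracle.coherence.kafka.KafkaNamespaceHandler">
  <caching-scheme-mapping>
    <cache-mapping>
      <cache-name>foo</cache-name>
      <scheme-name>kafka-scheme</scheme-name>
    </cache-mapping>
  </caching-scheme-mapping>
  <caching-schemes>
    <distributed-scheme>
      <scheme-name>kafka-scheme</scheme-name>
      <backing-map-scheme>
        <read-write-backing-map-scheme>
          <internal-cache-scheme>
            <local-scheme/>
          </internal-cache-scheme>
          <cachestore-scheme>
            <!-- topic-name is interpreted by the Kafka entry store;
                 the remaining elements are passed through to the Kafka producer -->
            <kafka:producer>
              <kafka:topic-name>foo</kafka:topic-name>
              <kafka:bootstrap-servers>localhost:9092</kafka:bootstrap-servers>
            </kafka:producer>
          </cachestore-scheme>
        </read-write-backing-map-scheme>
      </backing-map-scheme>
      <autostart>true</autostart>
    </distributed-scheme>
  </caching-schemes>
</cache-config>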

Save it as coherence-cache-config.xml, and make sure it is on the class path for the command in the next step.

Note: The topic-name property is only understood by this entry store extension; the rest of the properties are passed directly to the underlying Kafka producer, after changing dashes into dots. For example, the value within the bootstrap-servers XML element will be passed to the Kafka producer as the configuration property bootstrap.servers.

This allows for using different versions of Kafka which may employ different properties, and also allows directly passing the Kafka configuration without having to handle each property individually.

Start a Coherence console

The console is started with a class path that references the following (a sketch of the command follows the list):

  1. . (current directory), which is where the coherence-cache-config.xml file resides; use a different directory if you saved that file elsewhere;
  2. coherence-kafka-core-1.0.0.jar, which is the JAR that contains the KafkaEntryStore implementation and configuration support for it;
  3. coherence-21.12.jar, the main Coherence JAR, for Console functionality or starting a cluster member;
  4. the Kafka libraries.
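
Such a command might look like this (the console main class and the location of the Kafka libraries are assumptions; adjust them to your installation):

$ java -cp ".:coherence-kafka-core-1.0.0.jar:coherence-21.12.jar:/path/to/kafka/libs/*" \
com.tangosol.net.CacheFactory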

At the prompt, enter the following commands:
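
Based on the output shown below, the session boils down to selecting the cache backed by the Kafka entry store and putting an entry into it, for example:

cache foo
put aaa bbb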

Switch to the shell where the Kafka Consumer is running, and see the message being consumed:

aaa bbb

Conclusion

In these simple examples, we have seen bi-directional integration of Coherence with Kafka, using a Sink Connector to import data from Kafka into Coherence, and a custom entry store implementation to publish changes from Coherence to Kafka.

The entry store approach is simple in its design, and makes use of Coherence’s NonBlockingEntryStore, which performs all Kafka operations on their own threads, in a non-blocking fashion, for more efficient processing.

In the other direction, the Sink Connector needs to poll, and this is best done in a separate process that can be tuned on its own: you can add more workers or more tasks.

Both integrations allow you to use any serialization format you want with both Kafka and Coherence, providing great flexibility in data representation; and if you prefer to avoid serialization overhead altogether, a pass-through mode is supported as well.

We hope you’ll enjoy using the Coherence Kafka integration when you need it, and, as always, we welcome any feedback and suggestions you may have.
