A New Neo4j Integration with Apache Kafka

In the past year we have received a lot of requests for an integration of Neo4j with Apache Kafka and other streaming data solutions. So a few weeks ago, with the help of our Italian partner LARUS (especially Andrea Santurbano) and our colleague Stefan Armbruster, we started work on a first integration.

Today we want to make this available in a first release under an Apache License for you to try out and test. It works with Neo4j from 3.4.x and Kafka from 0.10.x.

Our integration is meant to be deployed as a Neo4j plugin; it is not yet available as a Kafka Connect connector.

The plugin has three modes of operation:

  • via a user defined Procedure to send individual payloads to a topic,
  • as a Producer, publishing Change Events from Neo4j, and
  • as a Consumer consuming Events from Kafka using templated Cypher statements.

Streams Procedure

To use the procedure you have to add the Kafka server config. Additionally, we require an explicit flag to start up the integration: streams.procedures.enabled=true

Then you can use the CALL streams.publish(topic, payload) procedure to send arbitrary data: scalar values as well as nodes, relationships, paths, maps, or lists. You can find more details in the documentation.

For example:

CALL streams.publish('my-topic', 'Hello World from Neo4j!')

The message retrieved by a consumer then looks like this:

{"payload":"Hello World from Neo4j!"}
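You are not limited to strings. Here is a minimal sketch of publishing a node (the Person label and name property are illustrative and assume such a node exists in your graph):

MATCH (p:Person {name: 'Anne Marie'})
CALL streams.publish('my-topic', p)
RETURN p.name

The node, including its labels and properties, is then serialized into the message payload.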

Kafka Producer

Inspired by earlier ideas and our discussions with the Debezium team, we first focused on making change events from Neo4j available in Kafka, so that other systems can consume them and, for example, update downstream systems.

You can configure which information is published to which topic by providing patterns:

streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>

For example:

streams.source.topic.nodes.master-data=\
Person{*};Address{street, zip, city};Profile{-description}
streams.source.topic.relationships.master-data=\
LIVES_AT{since};HAS_PROFILE

Here {*} publishes all properties, an explicit list such as {street, zip, city} publishes only those properties, and a minus sign as in {-description} excludes a property.

Once your Kafka endpoints are installed and configured, the plugin is automatically up and running after Neo4j starts. Each transaction communicates its changes to the plugin through Neo4j's transaction event listener.

We expose creation, updates, and deletes of nodes and relationships, with before-and-after information. Some auditing metadata is already available, and we will add more. The events are sent to Kafka asynchronously, so they should not affect the transaction commit path.

The Event structure was inspired by the Debezium format and looks, for example, like this:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": ["Person"],
      "properties": {
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org",
        "first_name": "Anne Marie"
      }
    }
  }
}

More examples can be found in the producer documentation. Currently, we expose Events as binary JSON but Avro is on the roadmap.

Kafka Consumer

Another important feature is the ability to consume events from Kafka.

Initially, we considered a generic consumer with a fixed projection of events onto nodes and relationships, but we felt this fell short of real-world needs. Instead, we decided to let users provide custom Cypher statements per topic that turn events into arbitrary graph structures.

That way you can decide for yourself what to do with a complex Kafka event, and which parts of it to use for which purpose.

Besides your Kafka connection information, you just add entries like this to your Neo4j config:

streams.sink.topic.cypher.<TOPIC>=<CYPHER_QUERY>

For example, for an event like this:

{
  "id": 42,
  "properties": {
    "title": "Answer to anything",
    "description": "It depends."
  }
}

you can use a configuration like this:

streams.sink.topic.cypher.my-topic=\
MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties

Under the hood, the consumer takes a batch of events and passes it to the Cypher statement as a $batch parameter. We prefix your statement with an UNWIND, so each individual entry is available to it under the event identifier.

So the final statement executed by Neo4j would look like this:

UNWIND $batch AS event
MERGE (n:Label {id: event.id})
ON CREATE SET n += event.properties
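The same mechanism handles more complex events too. A sketch for a hypothetical event that carries a nested author object (the topic name, labels, and fields are made up for illustration):

streams.sink.topic.cypher.articles=\
MERGE (a:Article {id: event.id}) \
SET a.title = event.title \
MERGE (u:User {name: event.author.name}) \
MERGE (u)-[:WROTE]->(a)

This turns each event into an article node connected to its author.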

You can control the batch sizes through your Kafka consumer configuration for the topic.
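For instance, max.poll.records is the standard Kafka consumer setting that caps how many records a single poll returns. Assuming the plugin passes kafka.-prefixed properties through to the Kafka consumer (as it does for the connection settings), you could set:

kafka.max.poll.records=500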

For more information, see the Streams Consumer documentation.

Give us Your Feedback

Grab the release JAR from GitHub and put it into your $NEO4J_HOME/plugins.

Then, depending on which feature you want to use, you need to add the configuration to $NEO4J_HOME/conf/neo4j.conf. You can find more details in the project’s README.

In Neo4j Desktop, open the plugins folder via the drop-down button in the “Manage” view and add the config in the “Settings” tab.

kafka.zookeeper.connect=localhost:2181
kafka.bootstrap.servers=localhost:9092
# and
streams.procedures.enabled=true
# or
streams.source.enabled=true
streams.source.topic.nodes.<topic-name>=<PATTERN>
# or
streams.sink.enabled=true
streams.sink.topic.cypher.<topic-name>=<CYPHER-QUERY>

You can also run the setup with Docker Compose.
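A minimal sketch of such a Compose file follows; the image names, versions, and settings are illustrative, and you would still need to mount the plugin JAR and your Neo4j config:

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  neo4j:
    image: neo4j:3.4
    ports:
      - "7474:7474"
      - "7687:7687"
    volumes:
      - ./plugins:/plugins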

It would be very helpful if you could test the producer and consumer parts of our integration in more real-world Kafka and Neo4j settings than we can provide, and fill out our feedback survey.

If you run into any issues or have thoughts about improving our work, please raise a GitHub issue.

The existing features are also covered in the documentation. If you have suggestions on how to improve it or the getting started experience, please let us know.

A Kafka Connect Neo4j Connector

After a really good discussion with the folks from Confluent, we agreed to extend the scope of the current extension toward a fully supported and verified connector for the Kafka Connect framework. So stay tuned for that.

Happy Streaming,

Michael