How to ingest data into Neo4j from a Kafka stream

Andrea Santurbano
Feb 15, 2019 · 6 min read

This article is the second part of the Leveraging Neo4j Streams series (Part 1 is here). I’ll show how to bring Neo4j into your Apache Kafka flow by using the Sink module of the Neo4j Streams project in combination with Apache Spark’s Structured Streaming APIs.

Photo by Hendrik Cornelissen on Unsplash

To show how to integrate them, simplify the integration, and let you test the whole project yourself, I’ll use Apache Zeppelin, a notebook runner that lets you interact natively with Neo4j.

The result

Leveraging Neo4j Streams

The Neo4j Streams project is composed of three main pillars:

  • The Change Data Capture module, which allows you to stream database changes over Kafka topics
  • The Sink (the subject of this article), which allows you to consume data streams from Kafka topics
  • A set of procedures that allow you to produce/consume data to/from Kafka topics

The Neo4j Streams Sink

This module allows Neo4j to consume data from a Kafka topic, and it does so in a “smart” way: it lets you define your own custom Cypher queries for handling the incoming events. All you need to do is write something like this in your neo4j.conf:
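The general shape of the configuration, with placeholders for your topic name and query:

```properties
streams.sink.enabled=true
streams.sink.topic.cypher.<TOPIC_NAME>=<CYPHER_QUERY>
```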

So if you define a query just like this:
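For instance, an illustrative definition (the label and property names here are placeholders, not necessarily the article’s original example):

```properties
streams.sink.topic.cypher.my-topic=MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties
```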

And for events like this:
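Suppose the events arriving on the topic look like this (illustrative payloads):

```json
{"id": 1, "properties": {"name": "Alice"}}
{"id": 2, "properties": {"name": "Bob"}}
```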

Under the hood the Sink module will execute a query like this:
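A sketch of the query the Sink builds, wrapping the user-defined template in an UNWIND over the event batch:

```cypher
UNWIND {batch} AS event
MERGE (n:Label {id: event.id})
  ON CREATE SET n += event.properties
```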

The batch parameter is a set of Kafka events gathered by the Sink and processed in a single transaction in order to maximize execution efficiency.

So continuing with the example above, a possible full representation could be:
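Continuing the illustrative example, the query together with its batch parameter could look like:

```cypher
// parameter:
// {batch: [{id: 1, properties: {name: "Alice"}},
//          {id: 2, properties: {name: "Bob"}}]}
UNWIND {batch} AS event
MERGE (n:Label {id: event.id})
  ON CREATE SET n += event.properties
```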

This gives developers the power to define their own business rules, because you can choose to update, add to, remove, or adapt your graph data based on the events you receive.

A simple use case: Ingest data from Open Data APIs

Imagine your data pipeline needs to read data from an Open Data API, enrich it with some other internal source, and finally persist it into Neo4j. In this case, the best solution is Apache Spark, which lets you manage different data sources through the same Dataset abstraction.

Set-Up the Environment

In the following GitHub repo, you’ll find all the code necessary to replicate what I’m presenting in this article. All you need to get started is Docker; then you can simply spin up the stack by entering the directory and executing the following command from the terminal:
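Assuming a standard docker-compose.yml at the repository root, this is the usual command:

```shell
docker-compose up -d
```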

This will start up the whole environment that comprises:

  • Neo4j + Neo4j Streams module + APOC procedures
  • Apache Kafka
  • Apache Spark
  • Apache Zeppelin
The whole architecture based on Docker containers

By going to Apache Zeppelin at http://localhost:8080, you’ll find, in the directory Medium/Part 2, a notebook called “From Open Data to Sink”, which is the subject of this article.

The Open Data API

We’ll choose the Italian Ministry of Health dataset of Pharmacy stores.

Define the Sink Query

If you go into the docker-compose.yml file you’ll find a new property that corresponds to the Sink query definition:
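An illustrative shape for that property (the node labels, relationship type, and event fields here are assumptions based on the data model described in this article, not the exact query from the repo):

```yaml
environment:
  NEO4J_streams_sink_topic_cypher_pharma: "
    MERGE (p:Pharmacy {id: event.id, name: event.name})
    MERGE (c:City {name: event.city})
    MERGE (p)-[:IS_LOCATED_IN]->(c)"
```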

The NEO4J_streams_sink_topic_cypher_pharma property specifies that all the data coming from the topic named pharma will be consumed using the corresponding Cypher query.

The graph model that results from the query above is:

Our data model

The Notebook — From Open Data to Sink

The first step is to download the CSV from the Open Data Portal and load it into a Spark DataFrame:

```scala
// The URL of the source CSV (the original link was elided; replace the placeholder)
val url = new java.net.URL("<open-data-csv-url>")
val localFilePath = s"/zeppelin/spark-warehouse/${url.getPath.split("/").last}"
// Download the file locally, honoring the dataset's ISO-8859-1 encoding
val src = scala.io.Source.fromURL(url, "ISO-8859-1")
val out = new java.io.PrintWriter(localFilePath)
out.write(src.mkString)
out.close()
// Load the semicolon-delimited CSV into a DataFrame
val csvDF = (spark.read
  .option("delimiter", ";")
  .option("header", "true")
  .csv(localFilePath))
```

Now let’s explore the structure of the csvDF:
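A minimal way to inspect the inferred schema, using the standard Spark API:

```scala
// Prints the column names and types that Spark inferred from the CSV header
csvDF.printSchema()
```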

We want to focus on two fields:

  • CODICEIDENTIFICATIVOFARMACIA: it “should” be the unique identifier given by the Italian Ministry of Health to a Pharmacy Store
  • DATAFINEVALIDITA: it indicates if the Pharmacy Store is still active (if it has no value it is active, otherwise it is closed)

We now save the DataFrame into a Spark temp view called OPEN_DATA:
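This is a one-liner with the standard Dataset API:

```scala
// Register the DataFrame so it can be queried with Spark SQL
csvDF.createOrReplaceTempView("OPEN_DATA")
```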

Let’s now overwrite the OPEN_DATA temp view by filtering the dataset for valid records and renaming some fields:
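A sketch of that step via Spark SQL; the aliases and the extra source column names are assumptions (only CODICEIDENTIFICATIVOFARMACIA and DATAFINEVALIDITA are confirmed by the field descriptions above):

```scala
spark.sql("""
  SELECT CODICEIDENTIFICATIVOFARMACIA AS id,   -- confirmed field
         DESCRIZIONEFARMACIA          AS name, -- assumed field name
         DESCRIZIONECOMUNE            AS city  -- assumed field name
  FROM OPEN_DATA
  WHERE DATAFINEVALIDITA IS NULL              -- keep only active stores
""").createOrReplaceTempView("OPEN_DATA")
```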

Let’s now create the OPEN_DATA_KAFKA_STAGE temp view that must contain two columns:

  • VALUE: JSON that represents the data that we want to send to the Kafka topic
  • KEY: a key that identifies the row
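A sketch of that step, using Spark SQL’s struct and to_json helpers over the columns of the previous view:

```scala
spark.sql("""
  SELECT id AS KEY,
         to_json(struct(*)) AS VALUE  -- serialize the whole row as JSON
  FROM OPEN_DATA
""").createOrReplaceTempView("OPEN_DATA_KAFKA_STAGE")
```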

You may notice that this is exactly the minimum requirement for a ProducerRecord:
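For reference, the Kafka client’s ProducerRecord constructor takes a topic, an optional key, and a value:

```scala
// org.apache.kafka.clients.producer.ProducerRecord
val record = new ProducerRecord[String, String]("pharma", key, value)
```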

Let’s now send the data to the pharma topic via Spark:
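A sketch of the batch write through the Spark-to-Kafka connector (the broker address is an assumption based on the Docker stack):

```scala
spark.table("OPEN_DATA_KAFKA_STAGE")
  .selectExpr("CAST(KEY AS STRING) AS key", "CAST(VALUE AS STRING) AS value")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092") // assumed broker host:port
  .option("topic", "pharma")
  .save()
```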

The data streamed to the pharma topic by the Spark job will now be consumed by the Neo4j Streams Sink module, thanks to the Cypher template that we defined at the beginning of the article.

In the final paragraph of the notebook, we can explore the ingested data. In the following video we explore all the pharmacy stores located in Turin:

Explore the data just ingested

Wrapping up

In this second article (please check out the first one if you haven’t already), we have seen how to use the Sink module to transform Apache Kafka events into arbitrary graph structures, and how simply this can be done with the Apache Spark APIs.

In Part 3 we’ll discover how to use the Streams procedures to produce/consume data directly via Cypher queries, so please stay tuned!

If you have already tested the Neo4j-Streams module or tested it via this notebook please fill out our feedback survey.

If you run into any issues or have thoughts about improving our work, please raise a GitHub issue.
