Spark and Neo4j Integration
In this article, I will take you through the core concepts of Neo4j and how we can leverage Spark with Neo4j in a real-world scenario. We will look at how to connect Spark to Neo4j, how to query Neo4j effectively with its query language, Cypher, and how to optimize Neo4j queries.
By the end, you will be familiar with what Neo4j is, when and why it is used, its core concepts, how it works, and how it connects with Spark. The target audience for this article is data analysts, engineers, and other data enthusiasts who want to explore graph databases.
The prerequisites are basic knowledge of SQL and of a programming language, preferably Python, Scala, or Java.
Brief descriptions
Neo4j is a graph database management system. It follows the property graph model to store and manage its data. It is highly scalable, schema-free (NoSQL), and supports ACID transactions.
Neo4j provides a powerful declarative language known as Cypher. Cypher is a human-friendly query language that uses ASCII art to represent visual graph patterns for finding or updating data in Neo4j.
Apache Spark is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.
Cypher is the language used to query the Neo4j database. It is declarative: you describe the pattern of nodes and relationships you want to match, and Neo4j works out how to traverse the graph to find it.
Now that we have a brief introduction to the above terminology, let's dive deeper and understand the relationship between Neo4j and Cypher.
Neo4j and Cypher
Below is an example of a simple Cypher query and its output. The query matches nodes with the Movie label, keeps only movies released after 2000, and returns a single result.
MATCH (m:Movie) WHERE m.released > 2000 RETURN m LIMIT 1
The colored circles are nodes. Nodes hold a set of properties, which is how data is stored in them; the properties can be seen in the right-side panel. The lines connecting nodes are relationships (which can also hold data!). Clicking the grey part expands a node to reveal the other nodes connected to it. SQL developers can think of a node label as roughly analogous to a relational table, with each node as a row and its properties as the columns.
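To make the property graph model more concrete, here is a minimal plain-Python sketch of nodes, relationships, and properties. It is only an illustration and does not use Neo4j; the class names, the helper function, and the sample data are assumptions made for this example.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    """A node: one or more labels plus a dictionary of properties."""
    labels: set
    properties: dict = field(default_factory=dict)


@dataclass
class Relationship:
    """A directed, typed connection between two nodes; it can hold properties too."""
    start: Node
    rel_type: str
    end: Node
    properties: dict = field(default_factory=dict)


# Hypothetical sample data, loosely modelled on the Movie example above.
matrix = Node({"Movie"}, {"title": "The Matrix", "released": 1999})
cloud_atlas = Node({"Movie"}, {"title": "Cloud Atlas", "released": 2012})
tom = Node({"Person"}, {"name": "Tom Hanks"})
acted_in = Relationship(tom, "ACTED_IN", cloud_atlas, {"roles": ["Zachry"]})

nodes = [matrix, cloud_atlas, tom]


def movies_released_after(all_nodes, year):
    """Plain-Python equivalent of: MATCH (m:Movie) WHERE m.released > year RETURN m."""
    return [
        n for n in all_nodes
        if "Movie" in n.labels and n.properties.get("released", 0) > year
    ]


recent = movies_released_after(nodes, 2000)
print([m.properties["title"] for m in recent])
```

In Neo4j itself, the filtering loop is replaced by the Cypher pattern, and the database decides how to find the matching nodes.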
Now that we understand the bits and pieces of Neo4j, let's talk about using the Neo4j Connector for Apache Spark.
Neo4j Connector for Apache Spark
Neo4j Connector for Apache Spark provides easy, bi-directional access between Neo4j graph datasets and many other data sources (including relational databases and semi-structured and unstructured NoSQL repositories), transforming data from tables to graphs and back as needed. The new connector is available at no cost and is fully supported for Neo4j customers. The image below gives a pictorial overview of how the Neo4j Connector helps when connecting to Spark.
Neo4j Connector for Apache Spark allows you to read from and write to Neo4j databases. It is easy to use and can also be highly customized.
The Neo4j Spark Connector uses the binary Bolt protocol to transfer data to and from a Neo4j server. It offers Spark APIs for RDD, DataFrame, GraphX, and GraphFrames, so you can choose how you want to use and process your Neo4j graph data in Apache Spark.
You can find more information and details in the connector's official documentation.
Now that you have some knowledge of how the connector works, let's set it up to use Spark and Neo4j together.
Setting up the connector
neo4j-connector-apache-spark_2.12: To connect to Spark, we need neo4j-connector-apache-spark_2.12-4.0.1_for_spark_3.jar.
You can download the connector JAR from the Neo4j Connector page or the GitHub releases page. If you are using Maven, add the dependency below.
<dependencies>
  <!-- list of dependencies -->
  <dependency>
    <groupId>org.neo4j</groupId>
    <artifactId>neo4j-connector-apache-spark_${scala.version}</artifactId>
    <version>4.0.1_for_spark_${spark.version}</version>
  </dependency>
</dependencies>
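If you are not using Maven, the same dependency can be written as a single group:artifact:version coordinate. The sketch below only assembles that string from the values used in this section; the function name is hypothetical, and it assumes the artifact is published under the same group and artifact IDs as in the dependency above.

```python
def connector_coordinate(scala_version: str, connector_version: str, spark_major: str) -> str:
    """Build the group:artifact:version coordinate matching the Maven dependency above."""
    artifact = f"neo4j-connector-apache-spark_{scala_version}"
    version = f"{connector_version}_for_spark_{spark_major}"
    return f"org.neo4j:{artifact}:{version}"


coordinate = connector_coordinate("2.12", "4.0.1", "3")
print(coordinate)

# The coordinate could then be handed to Spark, for example:
#   spark-submit --packages <coordinate> my_job.py   (my_job.py is a placeholder)
```

The Scala version in the artifact name has to match the Scala version your Spark distribution was built with, which is why it is kept as a parameter here.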
We have set up the connector; let's now figure out how to configure it.
Configuring the connector
For configuring the connector, we need the following:
- Neo4j-URL
- Username and password
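from pyspark.sql import SparkSession
# NEO4j_URL, NEO4j_USERNAME, and NEO4j_PASS hold the connection details
spark = (SparkSession.builder
    .config("spark.neo4j.bolt.url", NEO4j_URL)
    .config("spark.neo4j.bolt.user", NEO4j_USERNAME)
    .config("spark.neo4j.bolt.password", NEO4j_PASS)
    .getOrCreate())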
Below are the basic steps for loading data with the connector.
- Create org.neo4j.spark.Neo4j(sc).
- Set cypher(query, [params]), nodes(query, [params]), or rels(query, [params]) as a direct query, or use pattern("Label1", Seq("REL"), "Label2") or pattern(("Label1", "prop1"), ("REL", "prop"), ("Label2", "prop2")).
- Optionally define partitions(n), batch(size), and rows(count) for parallelism (see the example below).
- Choose which data type to load:
  - loadRowRdd, loadNodeRdds, loadRelRdd, loadRdd[T]
  - loadDataFrame, loadDataFrame(schema)
  - loadGraph[VD,ED]
  - loadGraphFrame[VD,ED]
Below is an example of the above steps.
import org.neo4j.spark._
val neo = Neo4j(sc)
val rdd = neo.cypher("MATCH (n:Person) RETURN id(n) as id ").loadRowRdd
rdd.count
// inferred schema
rdd.first.schema.fieldNames
// => ["id"]
rdd.first.schema("id")
// => StructField(id,LongType,true)
neo.cypher("MATCH (n:Person) RETURN id(n)").loadRdd[Long].mean
// => res30: Double = 236696.5
neo.cypher("MATCH (n:Person) WHERE n.id <= {maxId} RETURN n.id").param("maxId", 10).loadRowRdd.count
// => res34: Long = 10
In the above code, we have used the loadRowRdd and loadRdd methods.
Similar operations are available for DataFrames and GraphX. More examples and details can be found in the docs of the GitHub repository.
Now that the connector is set up, let's work on reading from and writing to Neo4j through Spark.
Reading from Neo4j
Below is sample code for reading data from Neo4j through Spark.
In the configuration below, we set the data source format and the connection URL, and provide a username and password. (In a real project, we would not hard-code the username and password like this but load them in a secure way; they are inlined here only to keep the example easy to follow.) Finally, the last option specifies the Person label of the Neo4j database as the data to read.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// `spark` is an existing SparkSession
Dataset<Row> ds = spark
    .read()
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://localhost:7687")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "password")
    .option("labels", "Person")
    .load();
ds.show();
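One way to keep credentials out of the code is to read them from environment variables and build the connector options from those. The following is a minimal Python sketch of that idea, not a prescribed approach: the function name and the environment variable names (NEO4J_URL, NEO4J_USERNAME, NEO4J_PASSWORD) are assumptions for this example, and the "query" option is assumed to be the connector's option for reading with a Cypher query instead of a label.

```python
import os


def neo4j_read_options(labels=None, query=None, env=os.environ):
    """Return connector options for a read, with credentials taken from the environment."""
    if (labels is None) == (query is None):
        raise ValueError("Provide exactly one of 'labels' or 'query'")
    options = {
        "url": env.get("NEO4J_URL", "bolt://localhost:7687"),
        "authentication.basic.username": env["NEO4J_USERNAME"],
        "authentication.basic.password": env["NEO4J_PASSWORD"],
    }
    if labels is not None:
        options["labels"] = labels
    else:
        options["query"] = query
    return options


# Demo with a stand-in environment so the snippet runs without a real setup.
demo_env = {"NEO4J_USERNAME": "neo4j", "NEO4J_PASSWORD": "not-a-real-secret"}
read_options = neo4j_read_options(labels="Person", env=demo_env)
print(sorted(read_options))

# With a live SparkSession named `spark`, the dictionary could be applied as:
#   df = spark.read.format("org.neo4j.spark.DataSource").options(**read_options).load()
```

In a production job, the environment variables would typically be populated by a secret manager or the cluster's own configuration rather than set by hand.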
Writing to Neo4j
Writing to your graph database works similarly. In the following example, we will update our original data to include names for the first two nodes.
In the configuration code below, we create a simple Spark DataFrame, which we will write into the Neo4j database. We configure the connection URL and authentication type, and provide a username and password. (In a real project, we would not hard-code the username and password like this but load them in a secure way; they are inlined here only to keep the example easy to follow.)
Finally, we write to the Person label, using the id property as the node key.
df = spark.createDataFrame([(1, "John"), (2, "Thomas")], ["id", "name"])
(df.write.format("org.neo4j.spark.DataSource")
    .option("url", "bolt://XXX.XXX.XXX.XXX:7687")
    .option("authentication.type", "basic")
    .option("authentication.basic.username", "neo4j")
    .option("authentication.basic.password", "password")
    .option("labels", ":Person")
    .option("node.keys", "id")
    .mode("Overwrite")
    .save())
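It can help to picture what the Overwrite mode combined with node.keys means on the Neo4j side: nodes are matched on the key properties and created only if they do not exist yet, which corresponds to a Cypher MERGE. The sketch below builds a statement of that shape in plain Python. It is an approximation for illustration only; the function name is hypothetical, and the exact statement the connector generates may differ.

```python
def overwrite_cypher(labels: str, node_keys: list) -> str:
    """Approximate the MERGE statement implied by Overwrite mode with node.keys."""
    label_part = ":" + labels.lstrip(":")
    key_part = ", ".join(f"{key}: event.keys.{key}" for key in node_keys)
    return (
        "UNWIND $events AS event\n"
        f"MERGE (n{label_part} {{{key_part}}})\n"
        "SET n += event.properties"
    )


print(overwrite_cypher(":Person", ["id"]))
```

Read this way, the write above updates the name of an existing Person node when its id is already present and creates a new node otherwise.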
Optimization with Neo4j
Whenever we run Cypher queries through Spark, we can tune them for better performance. The following techniques help.
- Profiling Queries
One of the most useful things you can do is to prefix your query with PROFILE or EXPLAIN: both give you a query plan that points out the expensive operations. EXPLAIN shows the plan without running the query, while PROFILE runs the query and reports what each operator actually did.
In the above example, you can see that the profile exposes the plan chosen by the query planner. It gives us information about memory and keeps track of how many rows pass through each operator.
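As a small illustration of the two keywords, the helper below prefixes an existing Cypher query with EXPLAIN (plan only, the query is not executed) or PROFILE (the query is executed and measured). The function name is hypothetical and the helper only builds the query string; it does not talk to Neo4j.

```python
def with_plan(query: str, execute: bool = False) -> str:
    """Prefix a Cypher query with PROFILE (runs it) or EXPLAIN (plan only)."""
    keyword = "PROFILE" if execute else "EXPLAIN"
    return f"{keyword} {query.strip()}"


query = "MATCH (m:Movie) WHERE m.released > 2000 RETURN m LIMIT 1"
print(with_plan(query))
print(with_plan(query, execute=True))
```

Starting with EXPLAIN is the safer default for expensive queries, since it never touches the data.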
- Use of index
Like other database systems (Oracle, MySQL), Neo4j has the concept of an index, which makes searches for related data more efficient. Indexes come at a cost, though: they need extra storage and slow down writes, so deciding what to index and what not to index is an important and often non-trivial task.
Indexes are most often used for MATCH and OPTIONAL MATCH clauses that combine a label/relationship type predicate with a property predicate.
Therefore, knowing what kind of predicates can be solved by the different indexes is essential. Now let's discuss the types of indexes.
- Lookup indexes
A named NODE label lookup index for all nodes with one or more labels can be created.
Syntax:
CREATE LOOKUP INDEX index_name FOR (n) ON EACH labels(n)
Example:
In the below code, we are creating a node label lookup index where node_label_lookup_index is the name of the index.
CREATE LOOKUP INDEX node_label_lookup_index FOR (n) ON EACH labels(n)
A named RELATIONSHIP type lookup index for all relationships with any relationship type can be created.
Syntax:
CREATE LOOKUP INDEX index_name FOR ()-[r]-() ON EACH type(r)
Example:
CREATE LOOKUP INDEX rel_type_lookup_index FOR ()-[r]-() ON EACH type(r)
Note that a relationship type lookup index can only be created once and that the index name must be unique. Only one relationship type lookup index can exist at a time.
- Range indexes
A named range index on a single property can be created for all relationships with a particular relationship type.
Syntax:
CREATE INDEX index_name FOR ()-[r:TYPE]-() ON (r.property)
Example:
Below is an example of creating a range index where rel_range_index_name is the name of the index. We are using the since property of the KNOWS relationship.
CREATE INDEX rel_range_index_name FOR ()-[r:KNOWS]-() ON (r.since)
Syntax:
The full syntax for a range index on relationships, on either a single property or several properties (composite), is shown below. The index provider can be specified using the OPTIONS clause.
CREATE [RANGE] INDEX [index_name] [IF NOT EXISTS]
FOR ()-"["r:TYPE_NAME"]"-()
ON (r.propertyName_1[,
r.propertyName_2,
…
r.propertyName_n])
[OPTIONS "{" option: value[, ...] "}"]
Example:
The OPTIONS clause is used to create a range index with a specific index provider. Only one valid value exists for the index provider, range-1.0, which is also the default. Below is an example in which we create a range index named range_index.
CREATE INDEX range_index
FOR ()-[r:TYPE]-() ON (r.prop1)
OPTIONS {
indexProvider: 'range-1.0'
}
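When several indexes need to be created from a script, the statements can be generated from the syntax above. The following Python sketch builds a relationship range index statement for one or more properties; the function name, the composite_idx index, and the weight property are hypothetical, and the helper performs no escaping, so it should only be fed trusted identifiers.

```python
def create_rel_range_index(name, rel_type, properties, if_not_exists=True):
    """Build a CREATE RANGE INDEX statement for a relationship type."""
    if not properties:
        raise ValueError("At least one property is required")
    guard = " IF NOT EXISTS" if if_not_exists else ""
    props = ", ".join(f"r.{prop}" for prop in properties)
    return f"CREATE RANGE INDEX {name}{guard} FOR ()-[r:{rel_type}]-() ON ({props})"


# Single-property index, matching the KNOWS/since example above.
print(create_rel_range_index("rel_range_index_name", "KNOWS", ["since"]))
# Composite index over two properties (hypothetical names).
print(create_rel_range_index("composite_idx", "KNOWS", ["since", "weight"], if_not_exists=False))
```

Using IF NOT EXISTS by default makes the generated statements safe to re-run, for example as part of a deployment script.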
There are many more types of indexes, which are described in the Neo4j documentation on indexes.
Conclusion
You are all done. We now understand how Spark integrates with Neo4j, have seen how to configure the connector for reading from and writing to Neo4j, and have looked at some of the important optimization techniques.
Further Reading
For more query examples and syntax overview, take a deep dive into the official Neo4j Connector for Apache Spark documentation, or check the quick guides on reading and writing from/to Neo4j. Additional examples can be found in the Zeppelin notebook example repository. The connector also supports Cypher queries, allowing you to re-use existing queries from Neo4j Desktop / Web applications. Examples of Cypher queries can be found on the official Cypher page.