Uploading JSON data to Neo4j

Jason Koo
7 min read · Dec 15, 2023


Creatively mixed images from lexica.art and creative commons graphics

My earlier Mock Data Generator project can quickly generate intertwined fake data, but importing that data into a Neo4j database required a few extra steps. Pushing the data directly from the app would be much better.

So today I’m going to walk through how to create a Python function to do exactly that: upload .json data to a running Neo4j instance.

There are 3 main parts to this:

  1. An opinionated .json schema that the source data needs to conform to.
  2. A conversion step that reformats the data for the official Neo4j Python driver.
  3. An upload of the converted data via the driver.

Part 1 — A Schema

The dictionary schema we’ll build around is the same one used by the Mock Data Generator. Internally, this data was created before being exported as a series of .csv files. The following is a sample of that schema:

// Example
{
  "nodes": {
    "Person": [
      {
        "_uid": "abc123",
        "first name": "John",
        "last name": "Wick"
      },
      {
        "_uid": "dcf456",
        "first name": "Bowery",
        "last name": "King"
      }
    ]
  },
  "relationships": {
    "FRIENDS_WITH": [
      {
        "_uid": "rel789",
        "_from_uid": "abc123",
        "_to_uid": "dcf456",
        "since": 1997
      }
    ]
  }
}

nodes and relationships are the top-level keys, each containing nested dictionary values. Within each, every key is a unique Node label or Relationship type, and its value is a list of property dictionaries, one per record.

A graphy way of looking at the above data.

There are some drawbacks to this schema, mainly that every Node and Relationship record requires a uniquely identifying string. Assigning an id like this to records is good practice but not a strict requirement for Neo4j databases.

The positive tradeoff is a very compact schema, and the next process becomes simpler with the assumption that this type of property is always present.
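Before converting anything, it can help to sanity-check incoming data against this schema. Here is a minimal sketch (the `summarize` helper is mine, not part of the article's code) that walks the two top-level keys and counts the records under each label and type:

```python
# Sketch: count records per Node label and Relationship type
# for data following the schema above.
def summarize(data: dict) -> dict:
    counts = {}
    # Each key under "nodes" is a Node label mapped to a list of property dicts
    for label, records in data.get("nodes", {}).items():
        counts[label] = len(records)
    # Each key under "relationships" is a Relationship type, same shape
    for rel_type, records in data.get("relationships", {}).items():
        counts[rel_type] = len(records)
    return counts

sample = {
    "nodes": {"Person": [{"_uid": "abc123"}, {"_uid": "dcf456"}]},
    "relationships": {"FRIENDS_WITH": [{"_from_uid": "abc123", "_to_uid": "dcf456"}]},
}
# summarize(sample) -> {"Person": 2, "FRIENDS_WITH": 1}
```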

Part 2 — Converting to Cypher

The official Neo4j Python driver will be used for the final upload. To use the driver, the data will need to be reformatted into valid Cypher statements. Additionally, a dictionary of parameters must be passed to the driver with the query to protect against Cypher injection.

In your Python IDE or from the command line in your application folder, add the driver with your dependency manager of choice. The following example uses pip.

pip install neo4j

Next, create a file to contain all the code for this function, ie neo4j_uploader.py. In it add the Neo4j import statement and create an initial wrapper for running arbitrary Cypher queries. This function will be called at the end.

from neo4j import GraphDatabase

# Part 3 - Python Driver
def execute_query(
    uri: str,
    username: str,
    password: str,
    query: str,
    params: dict,
    database: str = "neo4j"):
    with GraphDatabase.driver(uri, auth=(username, password)) as driver:
        records, summary, keys = driver.execute_query(query, params, database_=database)
        return summary

The driver’s execute_query requires a valid Neo4j uri, username, and password to interface with any cloud or locally based database. These credentials are made available when you first create a new Neo4j database.

The function returns a tuple of records, a ResultSummary object, and record keys. Because only write queries will be used, we can ignore the returned records and keys, and just focus on the summary object to optionally verify the upload status.
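If you do want to verify the write, the summary's `counters` attribute (a `SummaryCounters` object in the official driver) reports what was created. A small, hypothetical helper:

```python
# Hypothetical convenience (not part of the article's uploader):
# turn a ResultSummary into a human-readable report of what was written.
# nodes_created and relationships_created are attributes of the official
# driver's SummaryCounters object.
def upload_report(summary) -> str:
    c = summary.counters
    return f"{c.nodes_created} nodes, {c.relationships_created} relationships created"
```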

Because the syntax for creating Nodes and Relationships is different, we’ll process them separately, then combine them before running the execute_query function.

Part 2a — Converting Nodes

The basic syntax for creating a node in Cypher looks like:

CREATE (n:Person { _uid: "abc123", first_name: "John", last_name: "Wick"})

A basic option is to create a function that loops through all our node records and creates a parameterized version of this for every record. Node labels can’t be assigned through parameters, so labels would have to be added directly in the query string like so:

def convert_nodes(
    nodes: dict
) -> (str, dict):

    # Using basic cypher
    query = ""
    params = {}
    for node_label, node_records in nodes.items():
        for node_record in node_records:
            uid = node_record["_uid"]
            query += f"""CREATE (:{node_label} ${uid})\n"""
            params[uid] = node_record

    return (query, params)

Note the query string is constructed with parameterized placeholders prefixed with a $. This placeholder value is also added as a key in the params dictionary for the driver function to reference later.
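To make the behavior concrete, here is what this loop produces for the two sample Person records from Part 1 (illustrative only, reusing the sample `_uid`s):

```python
# Rendered output of the loop approach for the Part 1 sample data.
nodes = {
    "Person": [
        {"_uid": "abc123", "first name": "John", "last name": "Wick"},
        {"_uid": "dcf456", "first name": "Bowery", "last name": "King"},
    ]
}
query = ""
params = {}
for node_label, node_records in nodes.items():
    for node_record in node_records:
        uid = node_record["_uid"]
        query += f"CREATE (:{node_label} ${uid})\n"
        params[uid] = node_record

# query is now:
# CREATE (:Person $abc123)
# CREATE (:Person $dcf456)
# and params maps "abc123" and "dcf456" to their full property dictionaries.
```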

This will work for a small number of nodes like in our example, but per the official driver performance recommendations guide, this is inefficient for larger record sets.

The recommended option is to create an UNWIND statement and pass it the list of record dictionaries to process. We’ll use this method later for generating Relationships. But for Nodes there is a third option using the APOC library (which is included with nearly all Neo4j deployments).

This is the simplest option, as our dictionary of Node property data is already formatted for the library’s apoc.create.nodes() procedure.

So the above loop option can be replaced with:

def convert_nodes(
    nodes: dict
) -> (str, dict):

    # Using APOC
    query = ""
    for node_label in nodes.keys():
        query += f"""CALL apoc.create.nodes(["{node_label}"], ${node_label});"""

    return query, nodes
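For the sample data, the generated query is a single APOC call per label, with the `nodes` dictionary itself returned as the parameters, so `$Person` resolves to the full list of Person property dictionaries:

```cypher
CALL apoc.create.nodes(["Person"], $Person);
```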

Part 2b — Converting Relationships

Relationship construction has a different set of requirements, and unfortunately there is no APOC procedure for quickly creating multiple relationships from a list of dictionaries.

There is, however, an apoc.create.relationship procedure for creating single relationships. Paired with the UNWIND statement mentioned earlier, we can transform our list of relationship records into a valid batch Cypher write command.

def convert_relationships(
    relationships: dict
) -> (str, dict):

    rel_record_list = []
    params = {}

    for rel_type, rel_records in relationships.items():
        for record in rel_records:

            # Get the relationship's _uid to distinguish it from other params
            record_key = record.get('_uid')
            params[record_key] = record

            # Get the source and target node _uids
            from_node_uid = record.get('_from_uid')
            to_node_uid = record.get('_to_uid')

            # Create a list of values and parameter keys which will be used to construct the relationship
            item = f"['{rel_type}', '{from_node_uid}', '{to_node_uid}', ${record_key}]"
            rel_record_list.append(item)

    # Combine all the lists into a master list
    composite_rel_records_list = ",".join(rel_record_list)

    # Create a single query to process all the Relationship records
    query = f"""WITH [{composite_rel_records_list}] AS rel_data
UNWIND rel_data AS relationship
MATCH (n {{`_uid`:relationship[1]}})
MATCH (n2 {{`_uid`:relationship[2]}})
CALL apoc.create.relationship(n, relationship[0], relationship[3], n2) YIELD rel
RETURN rel
"""

    return query, params

Essentially, for every Relationship record we build a string list containing the Relationship type, the source node _uid, the target node _uid, and a parameter placeholder for the Relationship properties.

Each of these lists is then assembled into a composite list the UNWIND statement will run through, similar to a for loop.

As each record is processed, a Relationship is created between the source and target Nodes. The YIELD and RETURN statements are needed for the larger statement to be valid, but they effectively do nothing for our purposes.
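Put together, and assuming the sample FRIENDS_WITH record carries a hypothetical `_uid` of `rel789`, the function renders a statement like:

```cypher
WITH [['FRIENDS_WITH', 'abc123', 'dcf456', $rel789]] AS rel_data
UNWIND rel_data AS relationship
MATCH (n {`_uid`:relationship[1]})
MATCH (n2 {`_uid`:relationship[2]})
CALL apoc.create.relationship(n, relationship[0], relationship[3], n2) YIELD rel
RETURN rel
```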

Part 3 — Uploading the data

Finally, add an entry point function to tie everything together.

import json

# Part 1 - Upload
def upload(
    uri: str,
    username: str,
    password: str,
    data: str
):

    if isinstance(data, str):
        try:
            data = json.loads(data)
        except Exception as e:
            raise Exception(f'Error converting data to json: {e}')

    # Convert the nodes and relationships into cypher queries and params
    query, params = convert_nodes(data['nodes'])
    rel_query, rel_params = convert_relationships(data['relationships'])

    # Aggregate them into a single query and params dictionary
    query += rel_query
    params.update(rel_params)

    # Upload
    execute_query(uri, username, password, query, params)

Permitting the data argument to be either a .json string or a dictionary is purely an optional convenience.

By stacking the Nodes query first, all the Nodes will have been created before any Relationship statement references them.

If all the _uid values are indeed unique, then all the parameterized values will match up with their placeholders within the final query string containing both the Node and Relationship statements.
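A lightweight pre-flight check for that uniqueness assumption might look like the following sketch (the `find_duplicate_uids` helper is hypothetical, not part of the uploader):

```python
# Sketch: scan a schema-shaped dictionary for _uid values that appear
# more than once, which would cause parameter collisions in the final query.
def find_duplicate_uids(data: dict) -> list:
    seen = set()
    duplicates = []
    for section in ("nodes", "relationships"):
        for records in data.get(section, {}).values():
            for record in records:
                uid = record.get("_uid")
                if uid in seen:
                    duplicates.append(uid)
                seen.add(uid)
    return duplicates
```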

The final neo4j_uploader.py file should now look like:

from neo4j import GraphDatabase
import json

# Part 3 - Python Driver
def execute_query(
    uri: str,
    username: str,
    password: str,
    query: str,
    params: dict,
    database: str = "neo4j"):
    with GraphDatabase.driver(uri, auth=(username, password)) as driver:
        records, summary, keys = driver.execute_query(query, params, database_=database)
        return summary


# Part 2 - Conversion
def convert_nodes(
    nodes: dict
) -> (str, dict):

    # Using APOC
    query = ""
    for node_label in nodes.keys():
        query += f"""CALL apoc.create.nodes(["{node_label}"], ${node_label});"""

    return query, nodes


def convert_relationships(
    relationships: dict
) -> (str, dict):

    rel_record_list = []
    params = {}

    for rel_type, rel_records in relationships.items():
        for record in rel_records:

            # Get the relationship's _uid to distinguish it from other params
            record_key = record.get('_uid')
            params[record_key] = record

            # Get the source and target node _uids
            from_node_uid = record.get('_from_uid')
            to_node_uid = record.get('_to_uid')

            # Create a list of values and parameter keys which will be used to construct the relationship
            item = f"['{rel_type}', '{from_node_uid}', '{to_node_uid}', ${record_key}]"
            rel_record_list.append(item)

    # Combine all the lists into a master list
    composite_rel_records_list = ",".join(rel_record_list)

    # Create a single query to process all the Relationship records
    query = f"""WITH [{composite_rel_records_list}] AS rel_data
UNWIND rel_data AS relationship
MATCH (n {{`_uid`:relationship[1]}})
MATCH (n2 {{`_uid`:relationship[2]}})
CALL apoc.create.relationship(n, relationship[0], relationship[3], n2) YIELD rel
RETURN rel
"""

    return query, params


# Part 1 - Upload
def upload(
    uri: str,
    username: str,
    password: str,
    data: str
):

    if isinstance(data, str):
        try:
            data = json.loads(data)
        except Exception as e:
            raise Exception(f'Error converting data to json: {e}')

    # Convert the nodes and relationships into cypher queries and params
    query, params = convert_nodes(data['nodes'])
    rel_query, rel_params = convert_relationships(data['relationships'])

    # Aggregate them into a single query and params dictionary
    query += rel_query
    params.update(rel_params)

    # Upload
    execute_query(uri, username, password, query, params)
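A quick usage sketch (the URI and credentials are placeholders for your own database's values; the actual `upload` call is commented out since it needs a running Neo4j instance):

```python
import json

# Sample data matching the Part 1 schema.
sample_json = """
{
  "nodes": {
    "Person": [
      {"_uid": "abc123", "first name": "John", "last name": "Wick"},
      {"_uid": "dcf456", "first name": "Bowery", "last name": "King"}
    ]
  },
  "relationships": {
    "FRIENDS_WITH": [
      {"_uid": "rel789", "_from_uid": "abc123", "_to_uid": "dcf456", "since": 1997}
    ]
  }
}
"""

# The upload function accepts either this raw string or the parsed dict.
data = json.loads(sample_json)

# With a running instance (placeholder credentials):
# upload("neo4j://localhost:7687", "neo4j", "<password>", sample_json)
```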

Pushing Forward

Hopefully this helps you construct a Neo4j upload process around your own data schemas.

If you’re interested in a more feature rich version of this code that also accepts an alternate schema, check out the repo: https://github.com/jalakoo/neo4j-uploader

Or if you already have data in a compatible format and just want this code as a package, it’s also available on PyPI.


Jason Koo

Developer Advocate at Neo4j, technophile and former iOS Developer.