Streaming Graph Loading with Neo4j and APOC Triggers
In preparation for GraphConnect coming up, and an idea I have for the hackathon there, I recently needed to look into how to do triggers in Neo4j.
In this article, we’ll cover what triggers are, how you can use them to do streaming data loading, and how they work under the covers.
What’s a Trigger?
If you haven’t worked with triggers before, a trigger is a database mechanism for running some action whenever an event happens. Triggers make the database react to events rather than passively accept data. That makes them a good fit for streaming data, since a stream is just a series of events arriving over time.
Triggers need two pieces:
- A trigger condition (which event should the trigger fire on?)
- A trigger action (what to do when the trigger fires?)
Triggers are available in Neo4j through Awesome Procedures on Cypher (APOC), and you can find the documentation on them here. Let’s jump in.
My Use Case: Streaming Data Loading
I use APOC triggers from time to time for data loading. Imagine you have records flowing into Neo4j over time that represent friend relationships in a social network:
CREATE (:FriendRecord { p1: "David", p2: "Mark" });
CREATE (:FriendRecord { p1: "Mark", p2: "Susan" });
CREATE (:FriendRecord { p1: "Bob", p2: "Susan" });
These might be coming from an external application, or a messaging queue like Kafka.
If this were bulk data, people might batch these into CSV and write some LOAD CSV code, which loads the data to a proper graph structure from the start. That won’t work for a streaming setup though, where the data arrival is continuous.
It’s also important to separate the loading (in a trivial format) from the reformatting, because if the data is coming from some other app across Kafka, that app doesn’t know or care what your graph model is.
So imagine your database is getting a constant stream of these “FriendRecord” nodes, but what you really want is a proper graph like this:
(:Person { name: "David" })-[:FRIENDS]->(:Person { name: "Mark" })
That’s our use case for triggers — streaming graph loading and transformation! Let’s get going.
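To make the target concrete, here is a sketch of the same reshaping written as a plain query that you run by hand. It uses only the labels and properties from the examples above, and it is not a trigger yet.

```cypher
// One-off version of the transformation: processes whatever FriendRecord
// nodes exist at the moment the query runs.
MATCH (r:FriendRecord)
MERGE (p1:Person { name: r.p1 })
MERGE (p2:Person { name: r.p2 })
MERGE (p1)-[:FRIENDS]->(p2)
// Remove the staging node once its content lives in the graph proper.
DETACH DELETE r;
```

The drawback is that someone has to re-run this every time new records arrive, which is exactly the chore a trigger takes away.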
APOC Setup
First, you have to install APOC into the Neo4j database you’re using. To use triggers, APOC then requires that you set:
apoc.trigger.enabled=true
in your neo4j.conf, so don’t forget to do that.
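After restarting the database, a quick sanity check can save some head-scratching later. This is a minimal sketch, and the exact behaviour when triggers are disabled may differ between APOC versions.

```cypher
// Returns the installed APOC version; the call fails if APOC is not installed.
RETURN apoc.version() AS apocVersion;

// Lists the registered triggers; an empty result is expected on a fresh setup.
CALL apoc.trigger.list();
```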
Simple Trigger Example
Here’s the trigger we will be working with. This defines both the trigger condition, and the trigger action.
CALL apoc.trigger.add('loadFriendRecords',
"UNWIND apoc.trigger.nodesByLabel({assignedLabels}, 'FriendRecord') AS node
MERGE (p1:Person { name: node.p1 })
MERGE (p2:Person { name: node.p2 })
MERGE (p1)-[:FRIENDS]->(p2)
DETACH DELETE node",
{ phase: 'after' });
Let’s go through the arguments. The first argument “loadFriendRecords” is the name of the trigger. We can later use this name to ask for the status of the trigger or delete it if we need to. The second argument is a Cypher query that is the trigger condition and action. That’s where all of the meat here is:
- UNWIND apoc.trigger.nodesByLabel portion fetches nodes with the label “FriendRecord”. We’ll explain down below in depth how this works.
- The {assignedLabels} part means the trigger reacts when a label gets assigned to a node, which includes the moment a node is first created with that label.
- UNWIND’ing that list, now our query has all of the recently created FriendRecord nodes. The rest of the query just specifies what to do with them. We’ll merge 2 new Person nodes, connect them, and delete the original FriendRecord node, which is what gets us from our incoming records to our target graph format.
Finally, the third argument { phase: 'after' } allows you to control when your action fires. Your choices are “before”, “after”, or “rollback”, which correspond to the lifecycle stages of a transaction in Neo4j. Because my trigger wants to modify data, the phase must be “after”. (Trying to run that delete statement before the transaction had even committed wouldn’t work.) If you needed to take some action before a new FriendRecord came in, then you’d want “before”. And if you wanted to do something when creating a FriendRecord failed, you’d use “rollback”.
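Since the trigger name is the handle for managing it later, here is a sketch of the housekeeping calls. These procedures exist in the APOC releases that provide apoc.trigger.add, but newer APOC releases have reorganized the trigger procedures, so check the documentation for your version.

```cypher
// Show registered triggers, including whether each one is paused.
CALL apoc.trigger.list() YIELD name, query, paused
RETURN name, query, paused;

// Temporarily stop the trigger, then switch it back on, without deleting it.
CALL apoc.trigger.pause('loadFriendRecords');
CALL apoc.trigger.resume('loadFriendRecords');

// Delete the trigger by name.
CALL apoc.trigger.remove('loadFriendRecords');
```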
How does it work?
The core Neo4j database includes a facility for “Transaction Event Handlers”. Neo4j plugins can create transaction event handlers and be called whenever the database commits a transaction. This basic facility lets any piece of software observe what’s happening in the data as it flows by.
APOC Grabs Transactions
When you enable triggers in APOC, this causes APOC to register a transaction event handler with the database and start listening to all of those changes. Without any triggers, this listener does nothing at all.
When you register a trigger, it comes with a “selector”. That’s the {assignedLabels} part. Every trigger also has a “phase” as we described above.
Triggers Execute Against Transactions
So APOC maintains a list of triggers, each with a selector, phase, and Cypher query. And it has a transaction event handler. The rest is simple: every time a transaction comes into Neo4j, APOC runs the relevant triggers.
Remember each transaction has three possible phases: before, after, and rollback. APOC gets each of these events, figures out which triggers want to execute in that phase, and then runs the Cypher code you specified. Just before running the query, it passes in a special set of parameters. Those parameters give details about the transaction being executed, such as which nodes were added and removed, and so on.
So the Cypher query that executes has pretty much full access to the data in the transaction!
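As a sketch of what those parameters make possible, a trigger can walk over every node created in a transaction through the createdNodes parameter. The trigger name and the ts property below are illustrative assumptions. The query uses the same {name} parameter syntax as the rest of this article, and newer Neo4j versions would write $createdNodes instead.

```cypher
// Hypothetical trigger: stamp every newly created node with a timestamp.
// 'stampCreatedNodes' and the 'ts' property are made-up names for illustration.
CALL apoc.trigger.add('stampCreatedNodes',
"UNWIND {createdNodes} AS n
SET n.ts = timestamp()",
{ phase: 'before' });
```

The “before” phase is used here on the assumption that the property should be written as part of the same transaction that created the node.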
Filtering out relevant data
The last piece to understand is this part in our original trigger:
apoc.trigger.nodesByLabel({assignedLabels}, 'FriendRecord')
This is at the heart of the trigger, because it defines when we want our code to execute: whenever a node is assigned the FriendRecord label.
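The first argument is a query parameter, not a map literal. It can look confusing to beginners because {assignedLabels} resembles a map with a key and no value, but the curly braces here are Cypher’s older syntax for referring to a parameter (newer Neo4j versions write it as $assignedLabels). APOC supplies the value when the trigger runs: a map from each label name to the nodes that were assigned that label in the transaction. You can pass assignedLabels, removedLabels, or removedNodeProperties here, depending on which kind of change you want to react to.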
In the last step, we explained that when a trigger executes it gets passed a lot of the transaction state. With that state, this function then can quite simply pull out only the nodes that got assigned a “FriendRecord” label. That becomes the array passed to UNWIND, and the rest is your code.
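To see the whole thing end to end, you can push one record through and look at what comes out. This is a sketch that assumes the loadFriendRecords trigger from earlier is installed. Run each statement as its own query.

```cypher
// Create one incoming record; the trigger should fire when this transaction commits.
CREATE (:FriendRecord { p1: "David", p2: "Mark" });

// Afterwards, look at the reshaped graph.
MATCH (a:Person)-[:FRIENDS]->(b:Person)
RETURN a.name AS person, b.name AS friend;

// The staging nodes should be gone if the trigger ran.
MATCH (r:FriendRecord)
RETURN count(r) AS remaining;
```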
Conclusions
We’re only covering a small part of what triggers can do here, to give you a taste, specific to this use case. Above, we talked about different lifecycle phases (before/after/rollback) and also different selector types (assigned node properties, assigned relationship properties, deleted node labels, etc).
These can be combined quite flexibly to implement various kinds of business logic in Neo4j, data quality cleanups, and also dependent transactions. We’d love to hear how other people are using them at the Neo4j community forum, so please drop by and let us know.
Have a look for yourself
Because APOC is open source, you can of course always have a look for yourself at how it’s done.