Dart River-Te Awa Whakatipu, New Zealand shot by Alistair MacKenzie from unsplash.

Importing Data into Neo4j using RxJS

Adam Cowley
Neo4j Developer Blog
14 min read · Sep 23, 2020

This post has been inspired by a call I had with an old client in which we discussed ways to improve the efficiency of their imports into Neo4j. The client heavily uses Reactive programming to deal with streams of data into their application. The data is then split into new streams, which are transformed and then inserted into Neo4j.

This inspired me to take a closer look at RxJS, and to see where it might be useful for importing data into Neo4j.

What is Reactive Programming?

There are many out there who will no doubt describe Reactive programming in better terms than me. But to me, Reactive programming is a declarative paradigm in which streams (or observables) of data are manipulated asynchronously using a “pipe” of one or more operators. There are implementations for Java, JavaScript and .NET amongst others, but being a {java|type}script guy I’ll be focussing on RxJS.

Usage of Reactive programming and RxJS seems to be growing fast. It is supported by a large number of frameworks including NestJS, which I am a huge fan of, and is a major component of Angular.

If we take a look at the example in the RxJS repository we can get a flavour of what we are in for:

import { range } from "rxjs";
import { map, filter } from "rxjs/operators";
range(1, 200)
.pipe(
filter(x => x % 2 === 1),
map(x => x + x)
)
.subscribe(x => console.log(x));

The RxJS library exports a number of observable creation methods (in this case range) which return an observable. The observable itself produces a stream of results which can be manipulated using operators.

In the example above, a stream of numbers between 1 and 200 is produced by the range function. Those numbers are sent through a pipe of operators:

  1. filter - Similar to filtering an array, the filter operator accepts a function that should return true or false. If the function returns false for an item, that item is omitted from the stream - in this case x % 2 === 1 is only true for odd numbers, so even numbers are omitted.
  2. map - Similar to the map function on an array, each item will be transformed - in this case the number is doubled.

The stream is then subscribed to using the subscribe method, where the callback function logs the value. The output would look something like this:

2
6
10
14
etc. etc.
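
For comparison, the same filter-then-map logic can be sketched over a plain array, with no RxJS involved. The rangeArray helper is hypothetical and only mirrors the (start, count) arguments of range. The difference is that the array version is built eagerly and all at once, whereas the observable emits values over time to whoever subscribes.

```typescript
// Hypothetical helper: builds [start, start + 1, ...] containing `count` items,
// mirroring the (start, count) arguments of RxJS's range().
const rangeArray = (start: number, count: number): number[] => {
  const values: number[] = [];
  for (let i = 0; i < count; i++) {
    values.push(start + i);
  }
  return values;
};

const doubledOdds: number[] = rangeArray(1, 200)
  .filter(x => x % 2 === 1) // keep odd numbers only
  .map(x => x + x);         // double each remaining number

console.log(doubledOdds.slice(0, 4)); // first four values: 2, 6, 10, 14
console.log(doubledOdds.length);      // 100 odd numbers between 1 and 200
```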

RxJS for Neo4j

The Neo4j drivers have supported Reactive programming for a while now. The idea remains the same — you create a driver, session, then process the results.

Take a basic example where we create a new node using a regular session and consume the results:

import neo4j from 'neo4j-driver'

const driver = neo4j.driver('bolt://localhost:7687', neo4j.auth.basic('neo4j', 'neo'))
// CREATE is a write, so the session needs WRITE access
const session = driver.session({ defaultAccessMode: neo4j.session.WRITE })

session.run(`CREATE (p:Person {name: $name}) RETURN p`, { name: 'Adam' })
  // Manipulate the results
  .then(res => res.records.map(row => row.get('p')))
  .then(res => {
    // Close the session
    session.close()
    return res
  })
  .then(output => console.log(output))

The reactive version needs only a few minor changes: instead of calling then on a promise, we call the records method to get an observable, and pass operators such as map to pipe to manipulate the results.

import neo4j from 'neo4j-driver'
import { concat, map } from 'rxjs/operators'

const driver = neo4j.driver('bolt://localhost:7687', neo4j.auth.basic('neo4j', 'neo'))
// Call .rxSession() rather than .session()
const session = driver.rxSession({ defaultAccessMode: neo4j.session.WRITE })

session.run(`CREATE (p:Person {name: $name}) RETURN p`, { name: 'Adam' })
  // The records function returns an Observable which produces a stream of rows
  .records()
  // This stream can then be manipulated
  .pipe(
    // Close the session once all rows have been emitted
    concat(session.close()),
    // Manipulate the results
    map(row => row.get('p'))
  )
  .subscribe(output => console.log(output))

A working example: SkyShard

For this example, I’ll revisit a previous blog post for the fictional client SkyShard. SkyShard decided it was appropriate to split their data across many shards in order to improve read and write performance. The data is sharded by month with a two-day overlap, because the business rules dictate that a multi-hop flight can arrive at its destination up to two days after the initial departure.

I will also be upgrading the code to TypeScript to take advantage of strong typing.

If you are interested in the reasons behind the decision, head back and read the previous post. Otherwise, let’s write some code…

Extract, Transform, Load

In the post I built a simple ETL process using Node.js to read rows from a CSV file, partition the rows based on the month, and then insert the data into the correct Neo4j Fabric database. This worked by processing the lines from a CSV file one-by-one, then appending them to an array based on their date.

The code does the job but is far from perfect. The entire file needs to be processed upfront before any data can be inserted into Neo4j — and with over 320k rows in the file, this can take some time.

We could improve the process by using RxJS to filter and manipulate the data as it is produced. To do this, we can:

  1. Produce an observable which produces a stream of rows.
  2. Modify each row to prepare it for insert into Neo4j.
  3. Use the flight's departure date to extract the flights for a particular month (e.g. January 2020).
  4. Insert the data into the correct shard.

Producing an observable

If we take a look at the old code, we can see that the CSV file is read using the csv-parser library and processed inside a Promise. The promise only resolves when the end event is emitted, which means the whole file needs to be processed before the next step begins.

const csv = require('csv-parser')
const fs = require('fs')

// ...

return new Promise((resolve, reject) => {
  fs.createReadStream(file)
    .pipe(csv())
    .on('data', async row => {
      // Process row and add to correct array
    })
    .on('end', () => {
      // Resolve the Promise
      resolve(results)
    })
})

The fs.createReadStream method returns a readable stream, which is also an Event Emitter. It is piped (using Node's own stream pipe, not RxJS yet) into the csv function, which converts each line from a Buffer into an Object that represents the row.

RxJS exports a fromEvent function that creates an observable which emits a new item each time the specified event is emitted by an EventEmitter. A commonly used example in Ultimate Courses's RxJS Course is to listen for clicks on a DOM element:

import { fromEvent } from 'rxjs'

fromEvent(document.getElementById('my-button'), 'click')

We can do the same thing with our fs.createReadStream(file) call:

import csv from 'csv-parser'
import fs from 'fs'
import { fromEvent, Observable } from 'rxjs'

// ...

const readStreamEventEmitter = fs.createReadStream(file).pipe(csv())

// Observable will emit when the entire file has been read
// (Node.js readable streams signal this with the 'end' event)
const readCsvFinished$: Observable<void> = fromEvent(readStreamEventEmitter, 'end')
// Stream of objects representing CSV rows
const flights$: Observable<object> = fromEvent(readStreamEventEmitter, 'data')

Now when the data event is emitted, a new value will be sent to any subscriber of the flights$ stream.

NOTE: I’ve added a $ suffix to the flights$ variable to denote that it is an Rx observable.

Modify each item ready for import

Currently, the data isn’t in a format that we would like. To fix this, we will pipe the observable through a map operator. In order to standardise the output, we can use a TypeScript interface to provide some guarantees about the payload:

interface Flight {
origin: string;
destination: string;
departsAt: Date;
price?: number;
}

This will ensure that any Flight object passed through to any subscriber will have the minimal information we need to perform an update to Neo4j.

Next, we can use the map operator imported from rxjs/operators to convert the row into an object that corresponds to the Flight interface.

import { map, share } from 'rxjs/operators'

// ...

const flights$: Observable<Flight> = fromEvent(readStreamEventEmitter, 'data')
  .pipe(
    // Convert the raw row into a Flight
    map((row: any) => {
      const departsAt = new Date(row.time_hour)

      return <Flight> {
        origin: row.origin,
        destination: row.dest,
        departsAt,
      }
    }),
    // 'multicast' the values amongst subscribers
    // without creating a new observable
    share()
  )
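
The conversion inside map can also be pulled out into a plain function, which makes it easy to test without a stream. The following is only a sketch: toFlight and CsvRow are hypothetical names, the origin, dest and time_hour columns are the ones used above, and the price column is an assumption based on the optional price field of the interface.

```typescript
// Same shape as the Flight interface above.
interface Flight {
  origin: string;
  destination: string;
  departsAt: Date;
  price?: number;
}

// Assumption: every CSV cell arrives as a string, keyed by its column header.
type CsvRow = Record<string, string | undefined>;

// Hypothetical helper: converts one raw CSV row into a Flight, throwing if a
// required column is missing or the departure time cannot be parsed.
function toFlight(row: CsvRow): Flight {
  const { origin: from, dest, time_hour: timeHour, price } = row;
  if (!from || !dest || !timeHour) {
    throw new Error('Row is missing origin, dest or time_hour');
  }
  const departsAt = new Date(timeHour);
  if (isNaN(departsAt.getTime())) {
    throw new Error(`Invalid departure time: ${timeHour}`);
  }
  const flight: Flight = { origin: from, destination: dest, departsAt };
  // `price` is an assumed optional column; only set it when it is numeric
  const parsedPrice = price === undefined ? NaN : parseFloat(price);
  if (!isNaN(parsedPrice)) {
    flight.price = parsedPrice;
  }
  return flight;
}

const sample = toFlight({ origin: 'EWR', dest: 'IAH', time_hour: '2013-01-01T05:00:00Z' });
console.log(sample.destination, sample.departsAt.toISOString());
```

With a helper like this, the operator in the pipe could shrink to map(toFlight), and malformed rows would surface as errors on the stream instead of as invalid dates further down the pipe.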

Filter on departure date

Now that the observable will emit the correct values, we need to partition the data based on the shard. We can create a function that accepts a flights observable, filters it to an arbitrary date range, and eventually sends a write query to Neo4j.

The first step is to pipe the results through a filter operator:

import { filter } from 'rxjs/operators'

// ...

const writeToShard$ = (flights$: Observable<Flight>, start: Date, end: Date, shard: string) => {
  return flights$.pipe(
    // Filter Results by Date
    filter((flight: Flight) => flight.departsAt >= start && flight.departsAt < end)
  )
}

This will ensure that any flights passed to subsequent operators will be within a particular date range. This function provides an easy way to insert data into the appropriate shard. For example, we can create streams for January, February, and March 2020 with the two-day overlap into the next month.

const jan$ = writeToShard$(flights$, new Date('2020-01-01'), new Date('2020-02-02'), 'january2020')
const feb$ = writeToShard$(flights$, new Date('2020-02-01'), new Date('2020-03-02'), 'february2020')
const mar$ = writeToShard$(flights$, new Date('2020-03-01'), new Date('2020-04-02'), 'march2020')
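
To check which shards a given departure would land in, the same start-inclusive, end-exclusive comparison can be sketched as a plain function. ShardWindow and shardsFor are hypothetical names used only for this illustration; the dates are the ones used above.

```typescript
// A shard name and the half-open date window [start, end) that it covers.
interface ShardWindow {
  shard: string;
  start: Date;
  end: Date;
}

// The same three windows as the streams above
const shardWindows: ShardWindow[] = [
  { shard: 'january2020', start: new Date('2020-01-01'), end: new Date('2020-02-02') },
  { shard: 'february2020', start: new Date('2020-02-01'), end: new Date('2020-03-02') },
  { shard: 'march2020', start: new Date('2020-03-01'), end: new Date('2020-04-02') },
];

// Hypothetical helper: names of every shard whose window contains the departure.
const shardsFor = (departsAt: Date, candidates: ShardWindow[]): string[] =>
  candidates
    .filter(w => departsAt.getTime() >= w.start.getTime() && departsAt.getTime() < w.end.getTime())
    .map(w => w.shard);

console.log(shardsFor(new Date('2020-01-15T09:30:00Z'), shardWindows)); // january2020 only
console.log(shardsFor(new Date('2020-02-01T09:30:00Z'), shardWindows)); // january2020 and february2020
console.log(shardsFor(new Date('2020-02-02T09:30:00Z'), shardWindows)); // february2020 only
```

Because the end date is exclusive, a window ending on 2 February only picks up departures on 1 February from the following month. If departures on the 2nd should also be written to the earlier shard, the end date would need to move out by a day.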

Insert the records into Neo4j

Now that we have a stream with the correct data we can start to insert the data into Neo4j. The original import used an async function to insert batches of 1000:

while ( data.length ) {
const batch = data.splice(0, batch_size)
await session.run(query, { batch })
}

We can create these batches using a buffer operator. There are a few options to choose from, but in this case the bufferCount or bufferTime operators may be applicable. bufferCount will collect the emitted values up to a certain number and emit the values as an array. bufferTime will collect the emitted values until the provided interval has passed.

In this case, bufferCount makes the most sense, but if you were working with streams of real-time data, it may make more sense to insert a new batch every few seconds or every minute.

import { filter, bufferCount } from 'rxjs/operators'

// ...

const writeToShard$ = (flights$: Observable<Flight>, start: Date, end: Date, shard: string) => {
  return flights$.pipe(
    // Filter Results by Date
    filter((flight: Flight) => flight.departsAt >= start && flight.departsAt < end),
    // Collect batches of 1000
    bufferCount(1000)
    // On the resulting batches, insert into Neo4j
    // ...
  )
}

The bufferCount operator will convert a stream of individual flights into an array of flights (or Flight[] in TypeScript terms).
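
As a rough illustration of the batch sizes to expect, here is a plain-array analogue of bufferCount. The inBatches helper is hypothetical and works on a finite array, not a stream: it produces full batches and then one final, smaller batch for whatever is left over.

```typescript
// Hypothetical helper mirroring bufferCount(size) for a finite array:
// full batches of `size`, then one final partial batch if any items remain.
function inBatches<T>(items: T[], size: number): T[][] {
  if (size < 1) {
    throw new Error('size must be at least 1');
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// 2,500 stand-in rows
const rowNumbers: number[] = [];
for (let i = 1; i <= 2500; i++) {
  rowNumbers.push(i);
}

const batchSizes = inBatches(rowNumbers, 1000).map(b => b.length);
console.log(batchSizes); // batch sizes: 1000, 1000, 500
```

On a real stream, bufferCount can only emit that trailing partial batch once the source observable completes, so an observable that never completes would leave the last few rows sitting in the buffer.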

We can create a function to accept an array of flights and insert them into Neo4j using the RxSession. We’re not interested in the individual records, but instead the outcome of the query — namely the number of rows written and the time taken to insert the data. So we can create a new interface to represent this:

interface Summary {
shard: string;
writes: number;
availableAfter: number;
}

Now we can use the Neo4j driver to create a new session and run the query. Because we’re not interested in any rows, just the summary, we can call .consume() instead of .records().

This will return an observable which emits the ResultSummary for the query. This can be used to get the update statistics for the query.

Once the query result has been consumed, and the ResultSummary has been safely produced, we can use the concat operator to close the session and release any resources that it is currently holding. The concat operator subscribes to session.close() as soon as the query observable completes, so the session is closed before the stream as a whole completes.

// Import dependencies
import neo4j, { ResultSummary } from 'neo4j-driver'
import { Observable } from 'rxjs'
import { concat, map } from 'rxjs/operators'

// Create one instance of the driver per application
const driver = neo4j.driver('bolt://localhost:7687', neo4j.auth.basic('neo4j', 'neo'))

// ...

const writeFlightsToNeo4j = (batch: Flight[], shard: string): Observable<Summary> => {
  const cypher = `
    USE fabric.${shard}
    UNWIND $batch AS row

    MERGE (origin:Airport {code: row.origin})
    MERGE (destination:Airport {code: row.destination})

    MERGE (f:Flight {id: row.year +'-'+ row.month +'-'+ row.day +'--'+ row.flight})
    SET f.price = coalesce(toFloat(row.price), f.price)

    MERGE (f)-[:ORIGIN]->(origin)
    MERGE (f)-[:DESTINATION]->(destination)
  `

  const session = driver.rxSession({ defaultAccessMode: neo4j.session.WRITE })

  return session.run(cypher, { batch })
    .consume()
    .pipe(
      // Close the Session
      concat(session.close()),
      // Convert the neo4j ResultSummary into our Summary
      map((result: ResultSummary) => (<Summary> {
        shard,
        writes: result.counters.updates().nodesCreated,
        availableAfter: result.resultAvailableAfter.toNumber()
      }))
    )
}

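To use this function within the stream, we can use the switchMap operator. For each batch, switchMap subscribes to the observable returned by the function (in this case an observable which emits a Summary) and passes its values downstream. If a new batch arrives while the previous inner observable is still running, switchMap unsubscribes from the previous one and switches to the new one; concatMap and mergeMap are the usual alternatives when every inner observable must run to completion.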

import { filter, bufferCount, switchMap } from 'rxjs/operators'

// ...

const writeToShard$ = (flights$: Observable<Flight>, start: Date, end: Date, shard: string) => {
  return flights$.pipe(
    // Filter Results by Date
    filter((flight: Flight) => flight.departsAt >= start && flight.departsAt < end),
    // Collect batches of 1000
    bufferCount(1000),
    // On the resulting batches, insert into Neo4j
    switchMap(batch => writeFlightsToNeo4j(batch, shard))
  )
}

Subscribing to the subject

Nothing will happen to this stream until it has at least one subscriber. We could call subscribe on each method call, but instead we can use the merge function exported by rxjs to combine the three observables into a single observable. This will mean we only need to create a single subscriber through which all results will be emitted.

import { merge } from 'rxjs'

const jan$ = writeToShard$(flights$, new Date('2020-01-01'), new Date('2020-02-02'), 'january2020')
const feb$ = writeToShard$(flights$, new Date('2020-02-01'), new Date('2020-03-02'), 'february2020')
const mar$ = writeToShard$(flights$, new Date('2020-03-01'), new Date('2020-04-02'), 'march2020')

merge(jan$, feb$, mar$)
  .subscribe({
    next: (summary: Summary) => console.log(
      'wrote', summary.writes,
      'records to ', summary.shard,
      'in', summary.availableAfter, 'ms'
    ),
    error: (error: Error) => console.error(error),
    complete: () => console.log('Completed import')
  })

Completing the stream

Running this code at the moment will produce a stream of results, but the complete function will not be called, and the process will not exit. This is because we need to explicitly complete the original stream. You may have noticed earlier that I created readCsvFinished$, which listens for the end event, but I hadn't used it:

// Observable will emit when the entire file has been read
const readCsvFinished$: Observable<void> = fromEvent(readStreamEventEmitter, 'end')
// Stream of objects representing CSV rows
const flights$: Observable<object> = fromEvent(readStreamEventEmitter, 'data')

We can use this observable to explicitly complete flights$, which in turn completes every subscriber, closes the stream, and releases all resources. To do this, we can use the takeUntil operator. This instructs the observable to emit results until another observable emits a value. In this case, when the end event is emitted by the file reader, the file has finished being read.

import { map, share, takeUntil } from 'rxjs/operators'

const flights$: Observable<Flight> = fromEvent(readStreamEventEmitter, 'data')
  .pipe(
    // Convert the raw row into a Flight
    map(/* ... */),
    // Emit values until the file reader emits the "end" event
    takeUntil(readCsvFinished$),
    // 'multicast' the values amongst subscribers
    // without creating a new observable
    share()
  )

Deadlocks and retrying

The benefit to this approach is that the rows are processed and batched as they are received which means that the overall import will be quicker. However, this may lead to what are known as deadlock errors in Neo4j.

When two competing transactions attempt to modify the same record, the first will acquire a lock on the node until the transaction has been either committed or rolled back.

If two transactions each end up waiting for a lock that the other one holds, neither can make progress. Neo4j detects this cycle, throws a Neo.TransientError.Transaction.DeadlockDetected error, and rolls the affected transaction back.

You may see something similar to the following error message in your neo4j.log or debug.log:

2020-09-15 10:38:09.150+0000 ERROR Client triggered an unexpected error [Neo.DatabaseError.Statement.RemoteExecutionFailed]: Remote execution failed with code Neo.TransientError.Transaction.DeadlockDetected and message 'ForsetiClient[21] can't acquire ExclusiveLock{owner=ForsetiClient[23]} on NODE(2183), because holders of that lock are waiting for ForsetiClient[21].

This is referred to as a transient error because it is temporary: the lock will only be held for a short time. There is retry logic built into the drivers for transient errors, and a Maximum Transaction Retry Time can be configured, but there may be cases where one or more of these errors leak into the application.

This is particularly applicable in this use case because of our simple data model:

(:Airport)<-[:ORIGIN]-(:Flight)-[:DESTINATION]->(:Airport)

When creating a relationship, a transaction will acquire a lock on the nodes at both ends of the relationship before creating the relationship. With busy airports like London Heathrow (LHR) or Frankfurt (FRA), these nodes are likely to be locked by many transactions.

You may be able to avoid this problem by further filtering results to segregate airports within a batch to avoid concurrent transactions locking the same node, or limit the application to sending one write query at a time — but that is beyond the scope of this article.

A rudimentary way to handle these errors would be to use the retry operator. When the source observable errors, retry resubscribes to it, up to a predefined number of times, before the error is passed through to subscribers.

Think of a mobile device with an intermittent signal: you may want to try an HTTP request a few times before forcing the user to re-submit a form.

In our case, the error may be resolved in a few ms, so we could just try to run the query again:

import { concat, map, retry } from 'rxjs/operators'

const writeFlightsToNeo4j = (batch: Flight[], shard: string): Observable<Summary> => {
  // ...
  return session.run(cypher, { batch })
    .consume()
    .pipe(
      // Close the Session
      concat(session.close()),
      // Convert the neo4j ResultSummary into our Summary
      map(/* ... */),
      // Retry 3 times before giving up
      retry(3)
    )
}
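
To make the counting explicit, the sketch below is a synchronous analogue of retry(3) in plain TypeScript: one initial attempt, followed by up to three further attempts. Unlike the bare retry(3) above, which retries on any error, this version only retries errors that look transient. The withRetries and isTransient helpers and the CodedError shape are hypothetical, and it assumes the error exposes a code string like the one in the log message above.

```typescript
// Assumption: transient Neo4j errors carry a `code` starting with this prefix,
// as in Neo.TransientError.Transaction.DeadlockDetected.
const TRANSIENT_PREFIX = 'Neo.TransientError';

interface CodedError {
  code?: string;
  message: string;
}

const isTransient = (error: CodedError): boolean =>
  typeof error.code === 'string' && error.code.indexOf(TRANSIENT_PREFIX) === 0;

// Hypothetical synchronous analogue of retry(n): one initial attempt plus up to
// `retries` further attempts, but only when the failure looks transient.
function withRetries<T>(work: () => T, retries: number): T {
  let attempt = 0;
  while (true) {
    try {
      return work();
    } catch (error) {
      const coded = error as CodedError;
      if (attempt >= retries || !isTransient(coded)) {
        throw error;
      }
      attempt++;
    }
  }
}

// Simulated write that hits a deadlock twice before succeeding
let calls = 0;
const outcome = withRetries(() => {
  calls++;
  if (calls < 3) {
    throw { code: 'Neo.TransientError.Transaction.DeadlockDetected', message: 'deadlock' };
  }
  return 'written';
}, 3);

console.log(outcome, 'after', calls, 'attempts'); // written after 3 attempts
```

Filtering on the error code avoids re-running a batch that failed for a permanent reason, such as a syntax error in the query, where retrying would only repeat the failure.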

Bonus: Extracting airports

To demonstrate how easy it can be to manipulate data, you can also create an observable that produces a list of airports with a few lines of code.

The pluck operator will take a value from an emitted object (in this case the destination), and the distinct operator will ensure that only distinct values are emitted.

import { bufferCount, distinct, pluck, take } from 'rxjs/operators'

flights$.pipe(
  pluck('destination'),
  distinct(),
  bufferCount(50),
  take(2)
).subscribe(batch => console.log(batch))

This will produce two distinct batches of 50 airports:

[
'EWR', 'IAH', 'LGA', 'JFK', 'MIA', 'BQN',
'ATL', 'ORD', 'FLL', 'IAD', 'MCO', 'PBI',
'TPA', 'LAX', 'SFO', 'DFW', 'BOS', 'LAS',
'MSP', 'DTW', 'RSW', 'SJU', 'PHX', 'BWI',
'CLT', 'BUF', 'DEN', 'SNA', 'MSY', 'SLC',
'XNA', 'MKE', 'SEA', 'ROC', 'SYR', 'SRQ',
'RDU', 'CMH', 'JAX', 'CHS', 'MEM', 'PIT',
'SAN', 'DCA', 'CLE', 'STL', 'MYR', 'JAC',
'MDW', 'HNL'
]
[
'BNA', 'AUS', 'BTV', 'PHL', 'STT', 'EGE',
'AVL', 'PWM', 'IND', 'SAV', 'CAK', 'HOU',
'LGB', 'DAY', 'ALB', 'BDL', 'MHT', 'MSN',
'GSO', 'CVG', 'BUR', 'RIC', 'GSP', 'GRR',
'MCI', 'ORF', 'SAT', 'SDF', 'PDX', 'SJC',
'OMA', 'CRW', 'OAK', 'SMF', 'TYS', 'PVD',
'DSM', 'PSE', 'TUL', 'BHM', 'OKC', 'CAE',
'HDN', 'BZN', 'MTJ', 'EYW', 'PSP', 'ACK',
'BGR', 'ABQ'
]
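
For readers who want to see what that pipe does step by step, here is a rough plain-array equivalent. The distinctDestinationBatches helper and the FlightLike type are hypothetical, and the sample data is made up; the function mirrors pluck, distinct, bufferCount and take in a single loop.

```typescript
interface FlightLike {
  destination: string;
}

// Hypothetical helper: distinct destination codes in first-seen order, split
// into batches of `size`, stopping once `maxBatches` batches have been built.
function distinctDestinationBatches(flights: FlightLike[], size: number, maxBatches: number): string[][] {
  const seen: string[] = [];
  const batches: string[][] = [];
  let current: string[] = [];
  for (let i = 0; i < flights.length && batches.length < maxBatches; i++) {
    const code = flights[i].destination; // pluck('destination')
    if (seen.indexOf(code) !== -1) {
      continue;                          // distinct()
    }
    seen.push(code);
    current.push(code);
    if (current.length === size) {       // bufferCount(size)
      batches.push(current);
      current = [];
    }
  }
  // Like bufferCount on completion, flush a final partial batch if there is room
  if (current.length > 0 && batches.length < maxBatches) {
    batches.push(current);
  }
  return batches;
}

// Made-up sample data
const sampleFlights: FlightLike[] = [
  { destination: 'IAH' }, { destination: 'MIA' }, { destination: 'IAH' },
  { destination: 'ATL' }, { destination: 'ORD' }, { destination: 'MIA' },
  { destination: 'LAX' },
];

console.log(distinctDestinationBatches(sampleFlights, 2, 2)); // [IAH, MIA] then [ATL, ORD]
```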

Conclusion

Reactive programming is a new paradigm for me, but I found it a fun way to manipulate streams of data. One benefit to this approach is that many streams can be combined into a single pipeline — you can consume events from many sources and update the database in real time.

We can also merge events provided from different sources into a single stream — we can manage real-time streams coming in from other data sources. For example, a web-socket connection from another micro-service or events provided by message queues like Kafka or RabbitMQ.

The huge number of operators available means that you can take those streams and do some pretty powerful things to the data.

On the downside, there are a lot of operators to remember and those operators aren’t always logically named. They can be hard to find if you don’t know specifically what to google.

I spent a long time searching for batching operators before someone pointed me in the direction of buffer - which, in my mind, meant something completely different.

The RxJS documentation is also scattered, which can be frustrating but I’m sure that will be fixed over time.

The Learn RxJS website that I have linked to in this post is a good place to look if you know the operator name. Personally, I also learned the basics from Ultimate Courses, but I’m sure there are also many free resources around too.

A full code example is available on GitHub in the rx-etl folder. If you have any comments, questions, or spot anything that could be improved, feel free to reach out. I'm @adamcowley or you can raise an issue or open a PR in the repository.

For more content like this, join us on the Neo4j Twitch Channel.
