[GSoC] [Librehealth] Apache Spark with Cassandra For FHIR Analytics

During the past two weeks, I have been working hard to get FHIR analytics working with Apache Spark and Cassandra.

What is Cassandra?

Apache Cassandra is a free and open-source distributed wide-column store NoSQL database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure. Cassandra offers robust support for clusters spanning multiple datacenters, with asynchronous masterless replication allowing low-latency operations for all clients [1]. Cassandra is popular for its distributed nature, and LibreHealth decided to use it as the datastore for FHIR because of its scalable architecture.

Cassandra Query Language (CQL)

Like many other databases, Cassandra supports an SQL-like query language. CQL is a simple interface for accessing Cassandra, as an alternative to the traditional Structured Query Language (SQL). CQL adds an abstraction layer that hides implementation details of the storage structure and provides native syntaxes for collections and other common encodings. Language drivers are available for Java (JDBC), Python (DBAPI2), Node.js (Helenus), Go (gocql) and C++. In Cassandra, a database is identified as a keyspace and a table as a column family. The following queries demonstrate the basic functionality of CQL.

CREATE KEYSPACE school
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };

USE school;

CREATE COLUMNFAMILY students (id text, name text, mobileNo text, PRIMARY KEY(id));

INSERT INTO students (id, name, mobileNo) VALUES ('1', 'Doe', '123456');

SELECT * FROM students;

Datastax Java Driver for Apache Cassandra

To access a Cassandra database, a driver is required. DataStax provides a Java driver that supports accessing Cassandra both natively and through CQL.

Datastax Spark Cassandra Connector

DataStax also provides the Spark Cassandra Connector, which allows users to connect to Cassandra from Spark. The connector can expose Cassandra tables as Spark RDDs, write Spark RDDs to Cassandra tables, and execute arbitrary CQL queries from your Spark applications.

I have used these libraries to connect my application to Spark and Cassandra. LibreHealth is moving to adopt FHIR, and Cassandra will be the underlying storage provider. In my application I loaded the Google FHIR dataset into Cassandra, for demonstration purposes only. After that, the data is loaded into Spark RDDs using the Spark Cassandra Connector.

JavaRDD<Patient> patientRDD = javaFunctions(sc).cassandraTable("librehealth", "patient")
        .map((Function<CassandraRow, Patient>) cassandraRow -> {
            // Each Cassandra row stores the raw FHIR JSON in the "value" column.
            FhirContext fhirCtx = FhirContext.forDstu3();
            IParser parser = fhirCtx.newJsonParser();
            String patientStr = cassandraRow.getString("value");
            return parser.parseResource(Patient.class, patientStr);
        });

JavaRDD<Observation> observationRDD = javaFunctions(sc).cassandraTable("librehealth", "observation")
        .map((Function<CassandraRow, Observation>) cassandraRow -> {
            // Same pattern as above: parse the stored FHIR JSON into an Observation.
            FhirContext fhirCtx = FhirContext.forDstu3();
            IParser parser = fhirCtx.newJsonParser();
            String observationStr = cassandraRow.getString("value");
            return parser.parseResource(Observation.class, observationStr);
        });

After loading the data into Spark RDDs, I used Bunsen encoders to convert them into datasets, which allows users to run Spark SQL on top of the FHIR data.

FhirEncoders encoders = FhirEncoders.forStu3().getOrCreate();
Dataset<Patient> patientDataset = sparkSession.createDataset(patientRDD.rdd(), encoders.of(Patient.class));
Dataset<Observation> observationDataset = sparkSession.createDataset(observationRDD.rdd(), encoders.of(Observation.class));

Bunsen Encoders?

Bunsen encodes FHIR data in Apache Spark by generating Spark Encoders from the FHIR resource definitions. The encoders convert the FHIR data model into a Spark schema on a field-by-field, type-by-type basis. At runtime, they automatically generate byte code that serialises FHIR objects as Spark Datasets, which are an efficient binary representation that can be analysed at petabyte scale. Those datasets can then be saved or manipulated like any other Spark data.
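The field-by-field mapping that such encoders perform can be illustrated with a toy example in plain Java (no Spark or Bunsen dependencies; the classes below are hypothetical stand-ins, not Bunsen's actual implementation). A nested object is flattened into named columns, much like the schema shown below:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FlattenSketch {
    // Toy stand-in for a nested FHIR element such as Observation.valueQuantity.
    static class Quantity {
        double value;
        String unit;
        Quantity(double value, String unit) { this.value = value; this.unit = unit; }
    }

    // Toy stand-in for a (heavily simplified) FHIR Observation resource.
    static class SimpleObservation {
        String id;
        String status;
        Quantity valueQuantity;
        SimpleObservation(String id, String status, Quantity q) {
            this.id = id; this.status = status; this.valueQuantity = q;
        }
    }

    // Flatten nested fields into dot-separated column names, mimicking
    // how an encoder derives a flat, typed schema from a resource.
    static Map<String, Object> flatten(SimpleObservation o) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", o.id);
        row.put("status", o.status);
        row.put("valueQuantity.value", o.valueQuantity.value);
        row.put("valueQuantity.unit", o.valueQuantity.unit);
        return row;
    }

    public static void main(String[] args) {
        SimpleObservation obs =
            new SimpleObservation("obs-1", "final", new Quantity(120.0, "mm[Hg]"));
        System.out.println(flatten(obs));
    }
}
```

In the real encoders this flattening is done for every field of the FHIR resource definition and compiled to bytecode at runtime, which is what makes the resulting datasets queryable with Spark SQL (e.g. `valueQuantity.value` as a column path).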

Ex : FHIR Observation representation after conversion using Bunsen encoders (abridged Spark printSchema output)

root
|-- id: string (nullable = true)
|-- meta: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- versionId: string (nullable = true)
<snip>
|-- status: string (nullable = true)
|-- category: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: string (nullable = true)
| | |-- coding: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- id: string (nullable = true)
| | | | |-- system: string (nullable = true)
| | | | |-- version: string (nullable = true)
| | | | |-- code: string (nullable = true)
| | | | |-- display: string (nullable = true)
| | | | |-- userSelected: boolean (nullable = true)
| | |-- text: string (nullable = true)
|-- code: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- coding: array (nullable = true)
| | |-- element: struct (containsNull = true)
<snip>
|-- valueDateTime: string (nullable = true)
|-- valueQuantity: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- value: decimal(12,4) (nullable = true)
| |-- comparator: string (nullable = true)
| |-- unit: string (nullable = true)
| |-- system: string (nullable = true)
| |-- code: string (nullable = true)
|-- valueRatio: struct (nullable = true)
| |-- id: string (nullable = true)
| |-- numerator: struct (nullable = true)

Finally, loading the FHIR data from Cassandra into Spark allowed me to run complex queries against it.

Sample Query


SELECT * FROM patient INNER JOIN observation
  WHERE observation.subject.reference = patient.id AND gender = 'male'
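To make the semantics of that join concrete, here is a plain-Java sketch of the same filter-and-join logic over in-memory lists (no Spark required; the classes and field names are illustrative, not part of the actual application):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class JoinSketch {
    // Minimal stand-ins for the patient and observation rows in the query.
    static class PatientRow {
        String id; String gender;
        PatientRow(String id, String gender) { this.id = id; this.gender = gender; }
    }
    static class ObservationRow {
        String id; String subjectReference; // corresponds to observation.subject.reference
        ObservationRow(String id, String subjectReference) {
            this.id = id; this.subjectReference = subjectReference;
        }
    }

    // Same logic as the Spark SQL query: inner join on
    // observation.subject.reference = patient.id, filtered to gender = 'male'.
    static List<String> observationsForMalePatients(List<PatientRow> patients,
                                                    List<ObservationRow> observations) {
        List<String> matches = new ArrayList<>();
        for (PatientRow p : patients) {
            if (!"male".equals(p.gender)) continue;
            for (ObservationRow o : observations) {
                if (o.subjectReference.equals(p.id)) {
                    matches.add(o.id);
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<PatientRow> patients = Arrays.asList(
            new PatientRow("p1", "male"), new PatientRow("p2", "female"));
        List<ObservationRow> observations = Arrays.asList(
            new ObservationRow("o1", "p1"), new ObservationRow("o2", "p2"));
        System.out.println(observationsForMalePatients(patients, observations));
    }
}
```

One caveat worth noting: in FHIR, `subject.reference` values are often of the form `Patient/<id>`, so depending on how the data was loaded, the reference may need its resource-type prefix stripped before it matches `patient.id`.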

References

[1] https://en.wikipedia.org/wiki/Apache_Cassandra