[GSoC][LibreHealth] Working with Bunsen for FHIR Analytics using Spark

During this Google Summer of Code, I’m working on FHIR analytic capabilities using Apache Spark. I’m researching Bunsen, which is built on top of Apache Spark and provides FHIR analytic capabilities via Java and Python APIs.

Bunsen currently provides the functionality to load FHIR Bundles into Spark, which allows users to run Spark SQL queries or use the underlying Java/Python APIs on the loaded data.

When loading data into Spark, Bunsen maps each FHIR resource to a Java object structure using the HAPI FHIR library. For example, if Observations are loaded into the system, users can query the data with Spark SQL in the following manner.

spark.sql("""select subject.reference, effectiveDateTime, valueQuantity.value from observations where in_valueset(code, 'heart_rate') limit 5""").show()
+---------------+-----------------+-------+
| reference|effectiveDateTime| value|
+---------------+-----------------+-------+
|Patient/9995679| 2006-12-27|54.0000|
|Patient/9995679| 2007-04-18|60.0000|
+---------------+-----------------+-------+
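Since the dataset behind that query isn’t shown here, the following is a small, Spark-free Java stand-in for the logic the SQL expresses: keep observations whose code belongs to a “heart_rate” value set, cap the results at five, and project the reference, date, and value fields. All names and the code set are illustrative, not Bunsen’s actual implementation.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class HeartRateQuery {

    // Toy stand-in for one row of the 'observations' table.
    public static class Obs {
        public final String reference;
        public final String effectiveDateTime;
        public final String code;
        public final double value;

        public Obs(String reference, String effectiveDateTime, String code, double value) {
            this.reference = reference;
            this.effectiveDateTime = effectiveDateTime;
            this.code = code;
            this.value = value;
        }
    }

    // Stand-in for in_valueset(code, 'heart_rate'): simple membership in a code set.
    // "8867-4" is the LOINC heart-rate code; the set contents are an assumption.
    static final Set<String> HEART_RATE_CODES = Set.of("8867-4");

    // Mirrors: select reference, effectiveDateTime, value ... where in_valueset(...) limit 5
    public static List<String> query(List<Obs> observations) {
        return observations.stream()
                .filter(o -> HEART_RATE_CODES.contains(o.code))
                .limit(5)
                .map(o -> o.reference + " " + o.effectiveDateTime + " " + o.value)
                .collect(Collectors.toList());
    }
}
```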

Bunsen also provides a rich Java API for FHIR analytics, which it makes easier through its FhirEncoders. With encoders, users can analyze FHIR data via the Java API in the following manner.

FhirEncoders encoders = FhirEncoders.forStu3().getOrCreate();

List<Condition> conditionList = // A list of org.hl7.fhir.dstu3.model.Condition objects.

Dataset<Condition> conditions = spark.createDataset(conditionList,
    encoders.of(Condition.class));

// Query for conditions based on arbitrary Spark SQL expressions
Dataset<Condition> activeConditions = conditions
    .where("clinicalStatus == 'active' and verificationStatus == 'confirmed'");

// Count the query results
long activeConditionCount = activeConditions.count();

// Convert the results back into a list of org.hl7.fhir.dstu3.model.Condition objects.
List<Condition> retrievedConditions = activeConditions.collectAsList();

Bunsen also allows users to load data from JSON or XML representations using Spark map functions.

// Created as a static field to avoid creation costs on each invocation.
private static final FhirContext ctx = FhirContext.forDstu3();

// <snip>

FhirEncoders encoders = FhirEncoders.forStu3().getOrCreate();

Dataset<String> conditionJsons = // A Dataset of FHIR conditions in JSON form.

Dataset<Condition> conditions = conditionJsons.map(
    (MapFunction<String, Condition>) conditionString -> {
      return (Condition) ctx.newJsonParser().parseResource(conditionString);
    },
    encoders.of(Condition.class));

// Arbitrary queries or further transformations to the conditions Dataset go here.

Currently I’m researching integrating Bunsen with Cassandra by loading data from a Cassandra database. DataStax provides the Spark Cassandra Connector, which allows users to load data directly from a Cassandra database into Spark structures. The following is a sample of the Java API provided by the DataStax Spark Cassandra Connector for loading a Cassandra table into a Spark RDD.

JavaRDD<SampleBean> cassandraRdd = CassandraJavaUtil.javaFunctions(sc)
    .cassandraTable("simple_ks", "simple_cf", mapRowTo(SampleBean.class))
    .select("value");
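For row-to-bean mapping, the connector instantiates the target class through its no-arg constructor and populates it via setters that match the selected column names. A minimal bean for the snippet above might look like this; the `value` property name mirrors the selected column and is an assumption about the `simple_cf` schema, as is the String type.

```java
// A minimal JavaBean for the connector to populate. The property name
// 'value' matches the selected column; its type is assumed here.
public class SampleBean {
    private String value;

    public SampleBean() {}  // no-arg constructor required by the row-to-bean mapper

    public String getValue() { return value; }
    public void setValue(String value) { this.value = value; }
}
```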

I’m researching further into integrating Bunsen to load data from Cassandra. Also, according to my research so far, Bunsen accepts FHIR Bundles, but I’m looking into the capability of loading data into Spark via Bunsen using individual FHIR resources themselves.
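As a rough, Spark-free illustration of the pipeline I have in mind (Cassandra rows holding FHIR JSON, mapped row by row into resource objects), here is a toy sketch. The hand-rolled field extraction merely stands in for HAPI’s `ctx.newJsonParser().parseResource`, and every name here is illustrative rather than a real Bunsen or connector API.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class CassandraFhirSketch {

    // Toy stand-in for a parsed FHIR resource (real code would use HAPI models).
    public static class Resource {
        public final String resourceType;
        public Resource(String resourceType) { this.resourceType = resourceType; }
    }

    private static final Pattern TYPE =
            Pattern.compile("\"resourceType\"\\s*:\\s*\"([^\"]+)\"");

    // Stand-in for the per-row map step: FHIR JSON string -> resource object.
    public static Resource parse(String json) {
        Matcher m = TYPE.matcher(json);
        return new Resource(m.find() ? m.group(1) : "Unknown");
    }

    // Stand-in for mapping a collection of JSON rows read from Cassandra;
    // in the real pipeline this would be a Spark map over a Dataset<String>.
    public static List<Resource> mapRows(List<String> jsonRows) {
        return jsonRows.stream()
                .map(CassandraFhirSketch::parse)
                .collect(Collectors.toList());
    }
}
```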

It’s very interesting to learn about these technologies.