Processing COVID data through Google Dataflow in Scala
In this article I will walk you through the processing of some Dutch COVID-19 data using Google Dataflow and Apache Beam via Spotify’s Scio Scala library and a dash of Twitter’s Algebird. (Bake at 200 degrees for 20 minutes)
Why this combo? Because I wanted to learn more about Dataflow, because from a quick comparison I much prefer the Scala API over the Java or Python API for Beam, and because COVID numbers are of course a very current topic, relatable for many people.
We will take a look at the COVID data as published by the RIVM (which is like the US' CDC). The data contains the number of cases per day and per municipality, as well as the number of hospital admissions and deaths. Our goal is to calculate the 7-day running average of those statistics per municipality. As a bonus, there are some plots at the end of the article.
CoronaWatchNL graciously collects this data from various sources and makes it available on GitHub with daily updates. This is of course a great initiative, enabling data scientists, developers and other newly-minted amateur epidemiologists to analyse and interpret the data independently, contributing to transparency about the ongoing epidemic.
A sample of rows for one municipality on one day looks like this:
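(An illustrative reconstruction; the column names and values are assumptions based on the description below, see the CoronaWatchNL repository for the actual file:)

Datum,Gemeentenaam,Gemeentecode,Provincienaam,Provinciecode,Type,Aantal
2020-10-01,Amsterdam,363,Noord-Holland,27,Totaal,273
2020-10-01,Amsterdam,363,Noord-Holland,27,Ziekenhuisopname,3
2020-10-01,Amsterdam,363,Noord-Holland,27,Overleden,1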
As you can see the data is denormalized, with the three statistics per municipality (and their totals): positive test cases (Totaal), hospital admissions (Ziekenhuisopname) and deaths (Overleden).
A bit about Dataflow and Apache Beam
Google Dataflow is a cloud-based data processing service supporting massively parallel execution. It's powered by Google's Compute Engine and Cloud Storage, and you can program it using Apache Beam. Apache Beam offers input from e.g. Bigtable, Kafka, BigQuery, files and many more sources.
We can process practically infinitely large data sets with Beam’s computational model and Dataflow’s resource scalability. Of course in this example case we’re just practicing our skills with a small dataset.
Reading the CSV data
Let’s load the data into Dataflow. Because a Dataflow pipeline runs on Google Cloud, we first need to upload the data to Google Storage:
gsutil cp RIVM_NL_municipal.csv gs://your-storage-bucket/
The data is in CSV format, which we can load with Scio. We have to define the columns of the CSV file and how to decode a row into our internal data model, a Scala case class. Here is the model for the input data:
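A minimal sketch of that model (the field names are my assumption; see the repository for the real definition):

import java.time.LocalDate

case class RivmDataRow(
  datum: LocalDate,
  gemeentenaam: String,
  provincienaam: String,
  `type`: String, // one of "Totaal", "Ziekenhuisopname", "Overleden"
  aantal: Int
)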
Defining a CSV decoder for a RivmDataRow is easy. Decoders for primitive types like String and Int are already available; we just have to specify the CSV header fields and the datatype to convert a whole row to. Scio will then infer the types of the individual fields and find the right decoder for them. We're omitting the Gemeentecode and Provinciecode fields.
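A sketch of such a decoder, assuming Scio's CSV support from scio-extra (which is backed by kantan.csv) and header names matching the file; it relies on the LocalDate cell decoder defined below:

import kantan.csv._

implicit val rowDecoder: HeaderDecoder[RivmDataRow] =
  HeaderDecoder.decoder("Datum", "Gemeentenaam", "Provincienaam", "Type", "Aantal")(RivmDataRow.apply _)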
We also need to define the decoder for LocalDate, as that is not baked in, but it can be built from the String decoder:
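A minimal sketch, assuming the dates in the file are in ISO format (yyyy-MM-dd) so LocalDate.parse can handle them; DecodeResult turns a parse failure into a decoding error:

import java.time.LocalDate
import kantan.csv._

implicit val localDateDecoder: CellDecoder[LocalDate] =
  CellDecoder.from(s => DecodeResult(LocalDate.parse(s)))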
All Dataflow operations start with a ScioContext, which we can create from the command-line arguments. From this context we can read the CSV file to get an SCollection[RivmDataRow].
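Roughly like this (csvFile is the CSV reader from scio-extra; the input path is assumed to be passed as a command-line argument):

import com.spotify.scio._
import com.spotify.scio.extra.csv._
import com.spotify.scio.values.SCollection

// cmdlineArgs is the Array[String] passed to main
val (sc, args) = ContextAndArgs(cmdlineArgs)

val rows: SCollection[RivmDataRow] = sc.csvFile[RivmDataRow](args("input"))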
Normalizing the data
To combine the three figures per date and municipality into one object, we will transform the data to a datatype MunicipalityData that we can easily combine with other MunicipalityData values, and then use reduceByKey to efficiently combine them into one. It will be useful later on to separate the statistical values from the 'key' data, in a Counts datatype.
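A sketch of these two datatypes (field names are assumptions; Counts uses Doubles here so that the same type can later also hold averages and standard deviations):

import java.time.LocalDate

case class Counts(positiveTests: Double, hospitalAdmissions: Double, deaths: Double) {
  def +(other: Counts): Counts =
    Counts(
      positiveTests + other.positiveTests,
      hospitalAdmissions + other.hospitalAdmissions,
      deaths + other.deaths
    )
}

object Counts {
  // Put the value of a single denormalized row into the right slot
  def fromRow(r: RivmDataRow): Counts = r.`type` match {
    case "Totaal"           => Counts(r.aantal, 0, 0)
    case "Ziekenhuisopname" => Counts(0, r.aantal, 0)
    case "Overleden"        => Counts(0, 0, r.aantal)
  }
}

case class MunicipalityData(datum: LocalDate, municipality: String, counts: Counts) {
  // Two records for the same date and municipality combine by summing their counts
  def add(other: MunicipalityData): MunicipalityData = copy(counts = counts + other.counts)
}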
Using the Scio API, we first group our inputs by the date + municipality, transform to the new datatype and then combine using add.
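In code that could look like this (a sketch built on the model above):

// Group by (date, municipality) and reduce the three rows per key into one object
val municipalityData: SCollection[MunicipalityData] =
  rows
    .map(r => MunicipalityData(r.datum, r.gemeentenaam, Counts.fromRow(r)))
    .keyBy(d => (d.datum, d.municipality))
    .reduceByKey(_ add _)
    .values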
This will give us data like:
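(Illustrative, with made-up numbers and the field names from the sketch above:)

MunicipalityData(2020-10-01,Amsterdam,Counts(273.0,3.0,1.0))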
Moving averages
To calculate a 7-day moving average, we make use of Beam’s windowing operations. Windows define how to group our data along the time axis so that we can apply a calculation on that group. Windows can operate on the actual time we’re processing the data or on the event time, which is a timestamp in the data itself. For our purposes we will timestamp using the date field.
...
.timestampBy(r => dateToInstant(r.datum))
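A possible implementation of the dateToInstant helper used above (an assumption; the repo may differ): it converts the LocalDate to a Joda Instant at UTC midnight, since Beam timestamps are Joda Instants.

import java.time.{LocalDate, ZoneOffset}
import org.joda.time.Instant

def dateToInstant(date: LocalDate): Instant =
  new Instant(date.atStartOfDay(ZoneOffset.UTC).toInstant.toEpochMilli)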
We then define a sliding window with a size of 7 days and a period of one day. So we get windows from October 1 to October 7, October 2 to October 8, and so on.
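In Scio this is roughly a single call, with Joda Durations (org.joda.time.Duration) for the window size and period:

.withSlidingWindows(size = Duration.standardDays(7), period = Duration.standardDays(1))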
There is a dash of magic here: windowing the data adds a grouping level, in addition to the explicit grouping we do by the municipality field. We still only need to reduce the data once.
This is, however, quite a buggy version of a running average: it will produce incorrect results at the start of our time series, where we don't have 7 days of data yet.
A better average
Using Dataflow is most efficient when computations are expressed as map-reduce style operations. These can be executed largely in parallel on many workers and threads, without having to reshuffle data between workers. The reduce operations have to be associative and commutative. Averaging numbers is commutative but not associative, unless we express the average value in a smart way: as the pair of the number of values and the mean value:
case class AveragedValue(count: Long, mean: Double)
We can combine two of these values by taking the weighted sum of the means. This is implemented in the algebird library that comes bundled with scio. We can create an Aggregator for averaged values and use it with aggregateByKey.
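The weighted combination looks like this for the AveragedValue defined above (a sketch of what algebird's built-in AveragedValue, where the mean field is called value, and its ready-made AveragedValue.aggregator do for us; that aggregator can be passed straight to Scio's aggregateByKey):

def combine(a: AveragedValue, b: AveragedValue): AveragedValue = {
  val total = a.count + b.count
  AveragedValue(total, (a.count * a.mean + b.count * b.mean) / total)
}

This combination is both associative and commutative, so it is safe to use as a reduce operation.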
We want to average all three COVID statistics: tests, hospital admissions and deaths. We have these in our Counts datatype. algebird allows us to compose Aggregators, so that we only need to make a single pass over our data and we keep our computation efficient and parallelizable. We can define an average aggregator for Counts by composing three average value aggregators.
With composePrepare and andThenPresent we map the input and output of our aggregator from and to a Counts.
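A sketch of that composition (the actual code in the repo may be structured differently): each built-in averager is prepared with the Counts field it should read, the three are joined so they run in a single pass, and the resulting means are presented as a Counts again.

import com.twitter.algebird.AveragedValue

val countsAverager =
  AveragedValue.aggregator.composePrepare[Counts](_.positiveTests)
    .join(AveragedValue.aggregator.composePrepare[Counts](_.hospitalAdmissions))
    .join(AveragedValue.aggregator.composePrepare[Counts](_.deaths))
    .andThenPresent { case ((tests, admissions), deaths) => Counts(tests, admissions, deaths) }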
There is also a commutative and associative datatype to calculate the standard deviation, called Moments. We will use that and combine all of it into one composite aggregator that will operate on our 7-day windows. Because we want to track the municipality name and date as well, let's make a new datatype to track all of our calculation results for a single municipality and a 7-day window:
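A sketch of such a datatype (field names are my assumption):

import java.time.LocalDate

case class CovidStatistics(
  municipality: String,
  date: LocalDate, // the last day of the 7-day window
  average: Counts, // 7-day averages of the three statistics
  stdDev: Counts   // 7-day standard deviations of the three statistics
)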
Our total aggregator will take MunicipalityData and aggregate it into a CovidStatistics object:
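A sketch of how such a composite aggregator could be put together (assumptions: Moments.numericAggregator for the standard deviations, and maxBy on the date to carry the key data through the window; the repo may do this differently):

import java.time.LocalDate
import com.twitter.algebird.{Aggregator, Moments}

implicit val localDateOrdering: Ordering[LocalDate] = Ordering.by[LocalDate, Long](_.toEpochDay)

// Standard deviations of the three statistics, again in a single pass
val countsStdDev =
  Moments.numericAggregator[Double].composePrepare[Counts](_.positiveTests)
    .join(Moments.numericAggregator[Double].composePrepare[Counts](_.hospitalAdmissions))
    .join(Moments.numericAggregator[Double].composePrepare[Counts](_.deaths))
    .andThenPresent { case ((t, h), d) => Counts(t.stddev, h.stddev, d.stddev) }

// Keep the latest record for the key data and run both Counts aggregators in the same pass
val statsAggregator =
  Aggregator
    .maxBy[MunicipalityData, LocalDate](_.datum)
    .join(countsAverager.composePrepare[MunicipalityData](_.counts))
    .join(countsStdDev.composePrepare[MunicipalityData](_.counts))
    .andThenPresent { case ((last, averages), stdDevs) =>
      CovidStatistics(last.municipality, last.datum, averages, stdDevs)
    }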
The total pipeline now looks like this:
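Pieced together from the sketches above, it could look roughly like this (saveAsCsvFile is scio-extra's CSV writer and needs an implicit HeaderEncoder[CovidStatistics] in scope):

import org.joda.time.Duration

sc.csvFile[RivmDataRow](args("input"))
  .map(r => MunicipalityData(r.datum, r.gemeentenaam, Counts.fromRow(r)))
  .keyBy(d => (d.datum, d.municipality))
  .reduceByKey(_ add _)
  .values
  .timestampBy(d => dateToInstant(d.datum))
  .withSlidingWindows(size = Duration.standardDays(7), period = Duration.standardDays(1))
  .keyBy(_.municipality)
  .aggregateByKey(statsAggregator)
  .values
  .saveAsCsvFile(args("output"))

sc.run()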
As the last step we write the output as CSV. This uses a HeaderEncoder for the CovidStatistics (see the full code on GitHub). Dataflow will produce a CSV file for every window, like so:
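(Illustrative, with made-up numbers and column names matching the CovidStatistics sketch above; the actual header comes from the HeaderEncoder in the repo:)

Municipality,Date,AvgPositiveTests,AvgHospitalAdmissions,AvgDeaths,StdDevPositiveTests,StdDevHospitalAdmissions,StdDevDeaths
Amsterdam,2020-10-07,265.3,3.1,0.9,31.2,1.4,0.7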
Running the program
Now we just have to run the program from the command line and collect our output CSV files:
sbt "runMain nl.vroste.dataflow_scio_covid.Main
  --project=my-project-id --runner=DataflowRunner --zone=us-central1-a --region=us-central1
  --input=gs://mybucket/RIVM_NL_municipal.csv
  --output=gs://mybucket/scio1/wordcount-3.csv"
Results
Below is a plot made with RStudio using our output data, showing the daily new cases and their 7-day average + standard deviation for some of The Netherlands’ larger municipalities. Note that from about September onwards, the 7-day average is linear on the logarithmic scale, clearly showing an exponential growth in the number of cases.
Final Thoughts
For processing a simple 13 MB CSV file like this, using Apache Beam and Dataflow adds a lot of coordination overhead for transferring files and coordinating worker nodes. But of course we are just practicing here, and the computation model scales to data that is orders of magnitude larger, where the overhead becomes negligible.
The scio API is really nice compared to the verbose Java and Python APIs. It is very much like Apache Spark's API, and feels quite familiar when you have used the Scala collections API.
It took some time to get the windowing settings just right, but things work as expected in the end.
The full code including the R notebook to produce outputs is available at https://github.com/svroonland/dataflow-scio-covid