Portable Pipelines with Apache Beam

Dennis de Weerdt
Published in DataPebbles
Dec 26, 2020 · 7 min read

There are lots of use cases for data processing and analytics pipelines, and nearly as many frameworks to choose from. Apache Spark is probably the de facto standard these days, but it is far from the only option. In this article, we'll take a look at Apache Flink, one such alternative, and more importantly at Apache Beam, which means you no longer have to worry about picking the right framework up front.

First off, Flink. At first glance, Spark and Flink seem very similar (as do some of the other frameworks). Flink is the newer of the two, though, and its main distinguishing feature is that it is based on stream processing rather than batch processing. It can still emulate batch processing if required, but fundamentally it works with streams. This is in sharp contrast to Spark, which uses batches and emulates streams with so-called micro-batches. Spark Structured Streaming does now also have a continuous processing mode, but in the most recent version at the time of writing (Spark 3.0.1) it remains experimental and, unlike Flink, does not offer exactly-once guarantees.

Choosing a framework can be pretty difficult, then. Fortunately, Apache Beam comes to the rescue. Beam is not a data processing framework itself, but rather a unified programming model for data processing tasks. It ships with a number of runners capable of executing tasks using Spark, Flink, Google Cloud Dataflow, and a variety of other tools. That means you can implement your ETL logic once and run it on any of the supported frameworks, changing only some command line parameters.

Initial Steps

Let's take a look at how to use Beam, then. I'll once again be using the dataset of Amazon Book Reviews (obtained here) which I've also used in previous posts. We'll be doing some basic data cleansing and aggregation on that data in the Beam model, and then executing it on both Spark and Flink, to see both how easily that switch is made and how the two frameworks compare in terms of performance.

All code used for this demo can be found in a public git repo.

The first step is to create the Beam pipeline. This is easily done by forwarding the command line arguments to Beam's PipelineOptionsFactory and creating the Pipeline from the resulting options.

We now need to apply a sequence of transformations to this pipeline, and then execute them. Note that while for this simple application a single linear sequence of transformations is sufficient, Beam allows you to use a more generic graph structure as well, with multiple inputs and outputs as required.

The core data structure in Beam is the PCollection (short for parallel collection), which can be compared to a Spark DataFrame. Transforms are implemented as subclasses of the PTransform class, each of which fundamentally represents either a function from one PCollection to another or an I/O operation.
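As a rough mental model only, a transform can be pictured as a function from one collection to another, and a pipeline as a composition of such functions. The sketch below is plain Java, not Beam's API: ordinary lists stand in for PCollections, `java.util.function.Function` stands in for PTransform, and the name `TransformModel` is made up for illustration.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Conceptual model only: plain Java lists stand in for PCollections and
// java.util.function.Function stands in for PTransform. None of this is Beam API.
final class TransformModel {

    // A "transform" that keeps only non-blank strings.
    static final Function<List<String>, List<String>> DROP_BLANKS =
            input -> input.stream()
                    .filter(s -> !s.trim().isEmpty())
                    .collect(Collectors.toList());

    // A "transform" that changes the element type, from String to Integer.
    static final Function<List<String>, List<Integer>> LENGTHS =
            input -> input.stream()
                    .map(String::length)
                    .collect(Collectors.toList());

    // Chaining two transforms yields another function from collection to collection.
    static final Function<List<String>, List<Integer>> PIPELINE =
            DROP_BLANKS.andThen(LENGTHS);

    public static void main(String[] args) {
        System.out.println(PIPELINE.apply(List.of("beam", " ", "flink")));
    }
}
```

The real thing differs in important ways (PCollections are distributed and possibly unbounded, and nothing runs until the pipeline is executed), but the shape of "collection in, collection out" carries over.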

Although you can use any serializable type in a PCollection, for this example we'll use the Row type provided by Beam, which is a generic map-like data structure with a schema, similar to a Spark Row or an Avro GenericRecord. The reason for this is that Beam provides a suite of useful transformations on Rows, which means you don't have to reinvent the wheel.

Loading & Cleaning

The first step is to load the data. It's stored in a local Parquet file. Beam's FileIO class is a very flexible tool for reading files from a multitude of sources and in a multitude of formats. Support for both file systems and formats can be added if it is not already there, so loading a locally stored file is no problem at all. The core of it is this expression, which returns a PTransform that does the job: "ParquetIO.read(<schema>).from(<file pattern>)". The only tricky bit is that ParquetIO uses Avro schemas and GenericRecords instead of the Beam equivalents. Fortunately, it also has utilities to convert between the two.

The process is simply to read the Parquet metadata, use it to construct the schemas, and then load the data. Finally, the records need to be converted to Rows using a conversion generated by the AvroUtils class, which is part of Beam.

Now it's time to clean the data up a bit, as there are some corrupt records in there and we don't need all the columns. First, we select just the columns we are interested in, and drop the rest.

Next, filter out the corrupt data. Specifically, the star_rating field should be a number between 1 and 5, but that is not always the case. We are also only interested in reviews which have at least one "helpfulness" vote, so we check that as well.
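To make the rule concrete, here is a minimal plain-Java sketch of the check. It is not the Beam code from the repo: the class and method names are hypothetical, and I'm assuming that "at least one helpfulness vote" means a total vote count of at least 1.

```java
// Plain-Java sketch of the validity rule; not Beam API and not the repo's code.
final class ReviewFilter {

    // star_rating arrives as a string and may be corrupt, so parse defensively.
    static boolean hasValidRating(String starRating) {
        if (starRating == null) {
            return false;
        }
        try {
            int rating = Integer.parseInt(starRating.trim());
            return rating >= 1 && rating <= 5;
        } catch (NumberFormatException e) {
            return false; // not a whole number at all
        }
    }

    // Assumption: "at least one helpfulness vote" means totalVotes >= 1.
    static boolean isUsable(String starRating, long totalVotes) {
        return hasValidRating(starRating) && totalVotes >= 1;
    }

    public static void main(String[] args) {
        System.out.println(isUsable("4", 12));          // valid rating, has votes
        System.out.println(isUsable("2015-03-01", 12)); // corrupt rating
        System.out.println(isUsable("5", 0));           // no votes
    }
}
```

In the actual pipeline the same predicate would be applied per Row by a filtering transform.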

We now want to do some calculations with those ratings. However, the star ratings are given as strings, not numbers, so we need to convert that field first. The implementation is a bit too large to post as a screenshot, but you can find it in the git repo linked previously as the CastTransform class.
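The idea behind the cast is simple even if the Beam implementation is not. The following is a plain-Java sketch of the concept only, using a map as a stand-in for a Row. It is not the CastTransform from the repo, and `CastSketch`, `castToInt`, and the sample values are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of a field cast. The repo's CastTransform works on Beam Rows
// and is not reproduced here.
final class CastSketch {

    // Returns a copy of the row with the named field parsed from String to Integer.
    // Assumes the field is present and has already been validated as numeric.
    static Map<String, Object> castToInt(Map<String, Object> row, String field) {
        Map<String, Object> result = new LinkedHashMap<>(row); // keeps column order
        result.put(field, Integer.valueOf(((String) row.get(field)).trim()));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("product_id", "book-1"); // made-up sample value
        row.put("star_rating", "4");
        System.out.println(castToInt(row, "star_rating"));
    }
}
```

Returning a new map rather than mutating the input mirrors how transforms produce new collections instead of modifying their input.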

SqlTransform

Now we are going to compute a "helpfulness ratio" for each review, which will be defined simply as the number of "helpful" votes on that review, divided by the total number of votes. This could be implemented in a standard Beam DoFn, but I thought it would be nice to take a look at a different feature: the SqlTransform. As the name implies, this allows you to specify your transform logic in SQL, which is pretty nifty.

Note the use of the special value "PCOLLECTION" as the table name in the FROM clause in the query. That value is used when you have a single PCollection as input, as is the case here. You can also create an SqlTransform over multiple inputs, so you could for example perform a join operation in SQL syntax. In that case, you need to provide a name for each PCollection, and then refer to them by those names in the SQL query.
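For illustration, a query of this kind might look like the string in the sketch below. Treat it as an assumption rather than the query from the repo: the column names `helpful_votes` and `total_votes` are my guesses at the field names. The `ratio` method shows the same arithmetic in plain Java, including the detail that one operand has to be converted to a floating-point type first, since dividing two integers would truncate the result.

```java
// Sketch only: the query text and column names are assumptions, not the repo's code.
final class HelpfulnessRatio {

    // Hypothetical single-input query; PCOLLECTION is the table name for that input.
    static final String QUERY =
            "SELECT product_id, star_rating, helpful_votes, total_votes, "
            + "CAST(helpful_votes AS DOUBLE) / total_votes AS helpfulness_ratio "
            + "FROM PCOLLECTION";

    // The same calculation in plain Java. The cast avoids integer division.
    static double ratio(long helpfulVotes, long totalVotes) {
        if (totalVotes <= 0) {
            throw new IllegalArgumentException("totalVotes must be positive");
        }
        return (double) helpfulVotes / totalVotes;
    }

    public static void main(String[] args) {
        System.out.println(QUERY);
        System.out.println(ratio(3, 4));
    }
}
```

Because reviews without votes were filtered out earlier, the division by the total number of votes is safe at this point in the pipeline.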

Aggregation

It's now finally time to aggregate these numbers. We're going to calculate, for each product, the mean score and review helpfulness, as well as the total number of votes and helpful votes.
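Before looking at how Beam expresses this, here is what the aggregation computes, as a plain-Java sketch. It does not use Beam; the class names, field names, and sample reviews are all made up for illustration, and I'm assuming the mean helpfulness is the average of the per-review ratios.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of the per-product aggregation; not Beam API.
final class ProductAggregation {

    // One cleaned review. Field names are assumptions modelled on the text.
    static final class Review {
        final String productId;
        final int starRating;
        final long helpfulVotes;
        final long totalVotes;

        Review(String productId, int starRating, long helpfulVotes, long totalVotes) {
            this.productId = productId;
            this.starRating = starRating;
            this.helpfulVotes = helpfulVotes;
            this.totalVotes = totalVotes;
        }

        double helpfulnessRatio() {
            return (double) helpfulVotes / totalVotes;
        }
    }

    // Running totals for a single product.
    static final class Summary {
        long reviewCount;
        long ratingSum;
        double ratioSum;
        long totalVotes;
        long helpfulVotes;

        void add(Review r) {
            reviewCount++;
            ratingSum += r.starRating;
            ratioSum += r.helpfulnessRatio();
            totalVotes += r.totalVotes;
            helpfulVotes += r.helpfulVotes;
        }

        double meanRating() {
            return (double) ratingSum / reviewCount;
        }

        // Assumption: the mean of the per-review ratios, not helpfulVotes / totalVotes.
        double meanHelpfulness() {
            return ratioSum / reviewCount;
        }
    }

    // Groups reviews by product id and folds each group into a Summary.
    static Map<String, Summary> aggregate(List<Review> reviews) {
        Map<String, Summary> byProduct = new TreeMap<>();
        for (Review r : reviews) {
            byProduct.computeIfAbsent(r.productId, id -> new Summary()).add(r);
        }
        return byProduct;
    }

    public static void main(String[] args) {
        List<Review> reviews = List.of(
                new Review("book-1", 5, 1, 2),
                new Review("book-1", 3, 3, 4),
                new Review("book-2", 4, 2, 2));
        aggregate(reviews).forEach((id, s) -> System.out.println(
                id + " meanRating=" + s.meanRating()
                        + " meanHelpfulness=" + s.meanHelpfulness()
                        + " totalVotes=" + s.totalVotes
                        + " helpfulVotes=" + s.helpfulVotes));
    }
}
```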

Specifying this computation is pretty intuitive, using the Group class and a chain of ".aggregateField" calls. The only downside, at least for our purposes, is that Group doesn't return a 'flat' row, but rather a row containing two nested rows: one with the "key" columns (just product_id, in our case), and the other with the "value" columns. The FlattenKVRows transformation, which you can also find in the repo, is a quick and dirty way of unnesting those structures into a plain row.
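The unnesting itself is easy to picture. Here is a plain-Java sketch of the idea with maps standing in for Rows. It is not the FlattenKVRows code from the repo; `FlattenSketch` and the column names other than product_id are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java sketch of unnesting a grouped result; not the repo's FlattenKVRows.
final class FlattenSketch {

    // Copies the columns of the nested "key" and "value" rows into one flat row.
    // Assumes no column name appears in both nested rows.
    static Map<String, Object> flatten(Map<String, Map<String, Object>> grouped) {
        Map<String, Object> flat = new LinkedHashMap<>(); // key columns come first
        flat.putAll(grouped.get("key"));
        flat.putAll(grouped.get("value"));
        return flat;
    }

    public static void main(String[] args) {
        Map<String, Object> key = new LinkedHashMap<>();
        key.put("product_id", "book-1");
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("mean_rating", 4.0);
        value.put("total_votes", 6L);

        Map<String, Map<String, Object>> grouped = new LinkedHashMap<>();
        grouped.put("key", key);
        grouped.put("value", value);

        System.out.println(flatten(grouped));
    }
}
```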

The last task is to write the output to a file, which is basically the same as reading it but in reverse. Surprise, surprise, I know. Putting it all together, we get this chain of operations:

Now, the only thing left to do is run the pipeline.

Results

By default, Beam will run the pipeline using its internal DirectRunner. This is very slow, however, and only suitable for very small workloads. To get any real performance, you should specify one of the many available runners to use. I ran this job locally with a couple of different ones. All you have to do to switch from one to another is make sure you include the runner as a runtime dependency, and change the command line parameters accordingly. The difference in performance between them is quite stark:

This is just a single benchmark, of course, and hardly definitive. But it does show the crucial benefit of using Beam: if this job had been implemented directly in Spark, it would have to be rewritten from scratch if we wanted to switch to Flink instead. With the job written in Beam, all it takes is changing a few command line parameters.

Overall, Beam seems like an excellent choice if you need the flexibility it offers in your projects. However, if you already know which framework suits your use case best, or are locked in due to external factors, there probably isn't much point in using Beam. While 1-to-1 performance comparisons between, for example, a Spark job generated by Beam and one implemented directly in Spark are very difficult, it stands to reason that the 'native' job would be more efficient. In addition, I will definitely be taking a closer look at Flink in the future. It has flown a little under my radar so far, but from what I've seen it looks very promising.
