Getting started with Apache Beam

Davide Anastasia
Jun 6, 2018

After almost 5 years on the AWS platform, and a lot of Elastic MapReduce (EMR), I have recently started working on GCP. It is a good chance to look at serverless architectures, and to learn and use Google Dataflow. Of course, this means learning a new API: Apache Beam.

This post will tell you how I moved from Apache Spark to Apache Beam, and how I mapped some of my knowledge into the new framework.

Update (2019/09/07): please have a look at the follow-up article on (Custom) Sessionization in Beam

Base concepts

The first problem I had was learning the new terminology, so below is a quick glossary of base terms of the Apache Beam API:

  • PCollection: a collection of elements of a type T (this is the closest thing to an RDD in Spark)
  • PTransform: transforms an input X to output Y (usually, PCollection<T1> to PCollection<T2>)
  • ParDo: an element-wise transform in Apache Beam
  • DoFn: operation performed by a ParDo, usually user-specified

These elementary abstractions are enough to get started with Apache Beam: the rest is a specialisation of these concepts. For instance, I/O is a special kind of PTransform: a read takes no real input (its input type is PBegin) and a write produces no real output (its output type is PDone).

The beauty of Apache Beam is its clear distinction between processing time and event time, which allows the framework to “reason” about event ordering, windows and sessions with a very simple API. I thought I could solve these problems with Apache Spark easily enough, but reading this article really opened my eyes. Even though Beam is a fantastic tool for stream processing, I will focus my first efforts on a batch job (and maybe leave streaming for another article).

My first pipeline

Writing a pipeline in Apache Beam is quite simple. Apache Beam, thanks to its internal modelling, is then capable of running the pipeline on a multitude of frameworks, including Google Dataflow and Apache Flink (the runner with the best support in the open source space). A full compatibility matrix is available here.

As a first example I could explain the basics of the classic WordCount, but you can find a very good description of it in the official documentation, which I strongly suggest you read.

For this reason, I decided to build another classic: an Inverted Index.

Inverted Index

The idea behind the inverted index is quite simple: given a set of documents, we build a mapping between each available word (or at least, the most interesting ones) and the list of documents and lines where it can be found.
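
To make the idea concrete, here is a minimal plain-Java sketch that builds such a mapping in memory, without Beam. The class name `InvertedIndexSketch`, the `document:line` location format, the 0-based line numbering and the lowercase tokenisation are all assumptions made for illustration, not the code from the repository.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;

class InvertedIndexSketch {

    // Builds word -> list of "document:line" locations.
    // Input: document name -> lines of that document (an in-memory stand-in for files).
    static Map<String, List<String>> build(Map<String, List<String>> documents) {
        Map<String, List<String>> index = new TreeMap<>();
        for (Map.Entry<String, List<String>> doc : documents.entrySet()) {
            List<String> lines = doc.getValue();
            for (int lineId = 0; lineId < lines.size(); lineId++) {
                String location = doc.getKey() + ":" + lineId; // assumed location format
                for (String word : lines.get(lineId).toLowerCase(Locale.ROOT).split("[^a-z0-9]+")) {
                    if (word.isEmpty()) {
                        continue; // split can yield an empty leading token
                    }
                    index.computeIfAbsent(word, w -> new ArrayList<>()).add(location);
                }
            }
        }
        return index;
    }

    public static void main(String[] args) {
        Map<String, List<String>> docs = new LinkedHashMap<>();
        docs.put("a.txt", Arrays.asList("hello world", "hello beam"));
        docs.put("b.txt", Arrays.asList("beam me up"));
        System.out.println(build(docs));
    }
}
```

The rest of the article splits this single loop into separate pipeline stages so that a runner can distribute the work.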

Implementation

To begin with, you need to set up the skeleton of the job, which is fairly standard boilerplate:

[Code listing: Basic skeleton of an Apache Beam pipeline]

Input

To build an inverted index, you need some additional metadata alongside the input data. In this very simple example, the inputs are text files, but the standard TextIO only returns the content of the input dataset (a behaviour that is enough for most use cases), so I had to use the slightly lower-level FileIO API to read the filename and the line ID as well as the content.

The code on the driver looks like:

[Code listing: FileIO usage in the driver code]

…but more interestingly, the code for the ReadFileFn looks like:

[Code listing: ReadFileFn implementation. Better logging would be welcome… :-)]

It’s particularly interesting to note that FileIO.readMatches() produces FileIO.ReadableFile elements, which give us a lower-level hook on the actual resource. We read that resource line by line, keeping the filename and the row ID (in the Metadata class) alongside each line.
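
As a rough, Beam-free sketch of what such a reader has to do, the snippet below reads a source line by line and attaches the filename and a row ID to every line. The `Metadata` and `Row` types here are hypothetical stand-ins (the article's own Metadata class may look different), and numbering rows from 0 is an assumption. In the real DoFn the `Reader` would presumably be built from the channel returned by `ReadableFile.open()`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

class LineReaderSketch {

    // Hypothetical stand-in for the article's Metadata class: the origin of one line.
    static final class Metadata {
        final String filename;
        final long rowId;

        Metadata(String filename, long rowId) {
            this.filename = filename;
            this.rowId = rowId;
        }
    }

    // One line of text together with its origin (hypothetical helper type).
    static final class Row {
        final Metadata metadata;
        final String text;

        Row(Metadata metadata, String text) {
            this.metadata = metadata;
            this.text = text;
        }
    }

    // Reads the source line by line, numbering rows from 0 (an assumption).
    static List<Row> readRows(String filename, Reader source) {
        List<Row> rows = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(source)) {
            long rowId = 0;
            String line;
            while ((line = reader.readLine()) != null) {
                rows.add(new Row(new Metadata(filename, rowId), line));
                rowId++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return rows;
    }

    public static void main(String[] args) {
        for (Row row : readRows("a.txt", new StringReader("hello world\nhello beam\n"))) {
            System.out.println(row.metadata.filename + ":" + row.metadata.rowId + " -> " + row.text);
        }
    }
}
```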

From rows to words

The next stage of the pipeline is a simple word splitting: for each word in each line, we emit a key/value that contains the source Metadata object alongside the word:
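
The splitting logic itself can be sketched in plain Java as follows. This is not the repository's DoFn: the location is simplified to a `String`, and the lowercasing and the `[^a-z0-9]+` separator pattern are assumptions chosen for illustration.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

class WordSplitSketch {

    // Emits one (location, word) pair per word in the line.
    // The location string stands in for the Metadata object (a simplification).
    static List<Map.Entry<String, String>> splitLine(String location, String line) {
        List<Map.Entry<String, String>> pairs = new ArrayList<>();
        for (String word : line.toLowerCase(Locale.ROOT).split("[^a-z0-9]+")) {
            if (!word.isEmpty()) { // skip the empty token produced by leading separators
                pairs.add(new SimpleImmutableEntry<>(location, word));
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        System.out.println(splitLine("a.txt:0", "Hello, Beam world"));
    }
}
```

Note that one input line yields many output pairs, and an empty line yields none.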

A further stage then removes so-called stop words. It is a very simple filter, and I will use it below to explain how to unit test each DoFn in a Beam pipeline. After that, another stage swaps the key and the value of the KV object currently travelling through the pipeline.
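
A minimal plain-Java sketch of these two stages, under stated assumptions: the stop-word list below is a hypothetical handful of English words rather than the list used in the repository, and the pairs are (location, word) entries with the location simplified to a `String`.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class StopWordSketch {

    // Hypothetical stop-word list, for illustration only.
    static final Set<String> STOP_WORDS =
            new HashSet<>(Arrays.asList("a", "an", "and", "of", "the", "to"));

    // Filter stage: keeps the (location, word) pairs whose word is not a stop word.
    static List<Map.Entry<String, String>> removeStopWords(List<Map.Entry<String, String>> pairs) {
        List<Map.Entry<String, String>> kept = new ArrayList<>();
        for (Map.Entry<String, String> pair : pairs) {
            if (!STOP_WORDS.contains(pair.getValue())) {
                kept.add(pair);
            }
        }
        return kept;
    }

    // Swap stage: turns (location, word) into (word, location).
    static List<Map.Entry<String, String>> invert(List<Map.Entry<String, String>> pairs) {
        List<Map.Entry<String, String>> inverted = new ArrayList<>();
        for (Map.Entry<String, String> pair : pairs) {
            inverted.add(new SimpleImmutableEntry<>(pair.getValue(), pair.getKey()));
        }
        return inverted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> pairs = new ArrayList<>();
        pairs.add(new SimpleImmutableEntry<>("a.txt:0", "the"));
        pairs.add(new SimpleImmutableEntry<>("a.txt:0", "beam"));
        System.out.println(invert(removeStopWords(pairs)));
    }
}
```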

Building the inverted index

After some I/O and some preprocessing, we are ready to build our inverted index. In order to be as efficient as possible, I decided to use a custom CombineFn, which in many ways resembles the aggregate/aggregateByKey of the Spark API. In order to build a CombineFn, we need three types (input, accumulator and output) and four functions (createAccumulator, addInput, mergeAccumulators and extractOutput). In my particular case, accumulator and output are the same type: an Index class (which I have defined as a Map).

The accumulator helps the runner (in our case, the local runner) to pre-aggregate the data on the current node before sending the intermediate results to a single location and performing the final computation. This kind of pattern is common in MapReduce (combiner + reducer) and in Spark (aggregate within a partition before performing a shuffle).
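
The pre-aggregation pattern can be sketched without Beam by writing the four functions as plain static methods. The method names mirror those of Beam's CombineFn, but everything else is an assumption: the input is taken to be a (word, location) pair, and the accumulator a sorted map from word to a sorted set of locations, which may differ from the Index class in the repository.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

class IndexCombinerSketch {

    // Convenience factory for a (word, location) input pair (hypothetical helper).
    static Map.Entry<String, String> pair(String word, String location) {
        return new SimpleImmutableEntry<>(word, location);
    }

    // 1. Start with an empty index.
    static Map<String, SortedSet<String>> createAccumulator() {
        return new TreeMap<>();
    }

    // 2. Fold one input pair into a local accumulator.
    static Map<String, SortedSet<String>> addInput(
            Map<String, SortedSet<String>> accumulator, Map.Entry<String, String> input) {
        accumulator.computeIfAbsent(input.getKey(), word -> new TreeSet<>()).add(input.getValue());
        return accumulator;
    }

    // 3. Merge the partial indexes built on different nodes.
    static Map<String, SortedSet<String>> mergeAccumulators(
            Iterable<Map<String, SortedSet<String>>> accumulators) {
        Map<String, SortedSet<String>> merged = createAccumulator();
        for (Map<String, SortedSet<String>> accumulator : accumulators) {
            for (Map.Entry<String, SortedSet<String>> entry : accumulator.entrySet()) {
                merged.computeIfAbsent(entry.getKey(), word -> new TreeSet<>())
                        .addAll(entry.getValue());
            }
        }
        return merged;
    }

    // 4. Accumulator and output have the same shape, so this is the identity.
    static Map<String, SortedSet<String>> extractOutput(Map<String, SortedSet<String>> accumulator) {
        return accumulator;
    }

    public static void main(String[] args) {
        // Two accumulators stand in for two nodes pre-aggregating their own data.
        Map<String, SortedSet<String>> nodeA = createAccumulator();
        addInput(nodeA, pair("beam", "a.txt:1"));
        addInput(nodeA, pair("hello", "a.txt:0"));
        Map<String, SortedSet<String>> nodeB = createAccumulator();
        addInput(nodeB, pair("beam", "b.txt:0"));
        System.out.println(extractOutput(mergeAccumulators(Arrays.asList(nodeA, nodeB))));
    }
}
```

Because merging is associative and order-independent here, the result is the same however the runner chooses to partition the input.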

Output

The final part of the pipeline simply formats the index as text and writes it to a file.

Unit test

How can we ensure the correctness of our Beam pipeline? The Beam API provides a large set of helper classes, as well as a local runner, for writing unit tests.

In this case, I am going to describe how to set up a unit test for the StopWordRemoveFn class. The idea is very simple: we instantiate our DoFn class, feed it some test data and check the result.

[Code listing: Unit Test using DoFnTester (deprecated)]

However, DoFnTester is deprecated in the 2.4.0 API, and the suggested method is to use TestPipeline, as below:

[Code listing: Unit Test using TestPipeline (currently suggested method)]

Other types of test can be written to check the interaction between different stages of the pipeline, as well as to perform end-to-end (E2E) testing.

What have I learnt?

I think the major lesson learnt from writing DoFns is that there is no distinction between map, filter and flatMap: this is quite a strong difference compared to Apache Spark. However, it actually simplifies your reasoning about data processing blocks, which are now identified only by the fact that they have an input and an output.
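
This point can be illustrated with a tiny plain-Java model. The `ElementFn` interface and the `applyToEach` helper below are hypothetical miniatures invented for this sketch, not Beam classes: a function receives one element and may emit zero, one or many outputs, which is enough to express all three Spark-style operations.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class ElementWiseSketch {

    // Hypothetical miniature of a DoFn: one element in, any number of elements out.
    interface ElementFn<I, O> {
        void process(I element, Consumer<O> out);
    }

    // Hypothetical miniature of ParDo: applies the function to every element.
    static <I, O> List<O> applyToEach(List<I> input, ElementFn<I, O> fn) {
        List<O> output = new ArrayList<>();
        for (I element : input) {
            fn.process(element, output::add);
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello beam", "", "hello world");

        // "map": exactly one output per input.
        ElementFn<String, Integer> mapLike = (line, out) -> out.accept(line.length());

        // "filter": zero or one output per input.
        ElementFn<String, String> filterLike = (line, out) -> {
            if (!line.isEmpty()) {
                out.accept(line);
            }
        };

        // "flatMap": any number of outputs per input.
        ElementFn<String, String> flatMapLike = (line, out) -> {
            for (String word : line.split(" ")) {
                if (!word.isEmpty()) {
                    out.accept(word);
                }
            }
        };

        System.out.println(applyToEach(lines, mapLike));
        System.out.println(applyToEach(lines, filterLike));
        System.out.println(applyToEach(lines, flatMapLike));
    }
}
```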

Of course, I have used the official Java API here, but the Scio API (a Scala adapter), which was inspired by Apache Spark, is also very interesting.

Source Code

All the source code referenced in this article can be found at https://github.com/davideanastasia/apache-beam-getting-started

What next?

I am not sure how much interest this article will attract, but I would welcome your feedback on what to write next.

Some hints:

Let me know somehow… :-)

Davide Anastasia

Head of Data at Audigent. Interested in Data Science, Data Engineering and High Performance Computing