Testing in Apache Beam Part 1: Batch

Anton Sitkovets
Published in The Startup
Oct 22, 2019 · 5 min read

The source code referenced in this article can be found here: https://github.com/papaizaa/apache-beam-examples/tree/master/src/main/java/com/papaizaa/batch_example

Introduction

Apache Beam is a unified model for defining both batch and streaming data pipelines. You can use Beam to write Extract, Transform, and Load (ETL) tasks that process large data sets across many machines. Its SDK and use cases are very similar to those of Apache Spark and Hadoop.

But one place where Beam is lacking is its documentation: it is very difficult to find examples of how to effectively write end-to-end code for all the features outlined in the Beam Programming Guide. An even bigger hole is the lack of examples showing how to write unit tests in Beam using its PAssert module. In this series I hope to share my experience and insights into writing unit tests in Beam for both batch and streaming pipelines.

Example Main Pipeline

For this example, we are going to run a batch pipeline on data coming from BigQuery. We have two BigQuery tables: BookSales and GrocerySales. Both are partitioned by a date_time column and contain UserID and Price columns.

Every week we want to run a batch pipeline to aggregate the total sales for each user between books and groceries.

For this tutorial, we will ignore the Beam SDK’s built-in aggregation functions and do everything by hand to illustrate how to join two tables and how to write unit tests for the resulting ParDos. Fundamentally, we want to read from multiple BigQuery tables, join the data, apply a transformation, and write the results back to BigQuery. The pseudocode looks like this:

bookDataForWeek := ReadBookBigQueryTable() -> ParseTheTableEntry() -> GroupByUserID()
groceryDataForWeek := ReadGroceryBigQueryTable() -> ParseTheTableEntry() -> GroupByUserID()
totalPurchasedPerUserPerWeek := JoinByUserID(bookDataForWeek, groceryDataForWeek) -> SumAllPurchases() -> WriteToBigQueryTable()

You can follow along with the example here: https://github.com/papaizaa/apache-beam-examples/tree/master/src/main/java/com/papaizaa/batch_example

Now let’s translate this pseudocode into actual Beam code.

com.papaizaa.batch_example.BatchExampleMain.java
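In case the embedded gist does not render, here is a condensed sketch of what the main class might look like. The table names, the date handling, and the write disposition below are illustrative assumptions; the full class is in the linked repository.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.DateTime;

public class BatchExampleMain {

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Illustrative week boundaries; the real pipeline derives these from its options.
    DateTime dateStart = new DateTime().minusDays(7);
    DateTime dateEnd = new DateTime();

    // Side inputs: build the DateTime objects once and broadcast them to every JoinEvents call.
    PCollectionView<DateTime> dateStartView = pipeline
        .apply("CreateStart", Create.of(dateStart).withCoder(SerializableCoder.of(DateTime.class)))
        .apply("StartView", View.<DateTime>asSingleton());
    PCollectionView<DateTime> dateEndView = pipeline
        .apply("CreateEnd", Create.of(dateEnd).withCoder(SerializableCoder.of(DateTime.class)))
        .apply("EndView", View.<DateTime>asSingleton());

    // One TupleTag per PCollection that feeds the CoGroupByKey.
    final TupleTag<Iterable<Double>> booksTag = new TupleTag<>();
    final TupleTag<Iterable<Double>> groceriesTag = new TupleTag<>();

    // Read, parse and group each table by UserID. In the real pipeline the reads are
    // restricted to the week's partition rather than scanning the whole table.
    PCollection<KV<String, Iterable<Double>>> books = pipeline
        .apply("ReadBooks", BigQueryIO.readTableRows().from("my-project:sales.BookSales"))
        .apply("ParseBooks", ParDo.of(new ParseTableData()))
        .apply("GroupBooksByUser", GroupByKey.<String, Double>create());
    PCollection<KV<String, Iterable<Double>>> groceries = pipeline
        .apply("ReadGroceries", BigQueryIO.readTableRows().from("my-project:sales.GrocerySales"))
        .apply("ParseGroceries", ParDo.of(new ParseTableData()))
        .apply("GroupGroceriesByUser", GroupByKey.<String, Double>create());

    // Join by UserID, sum the purchases and write the result back to BigQuery.
    KeyedPCollectionTuple.of(booksTag, books)
        .and(groceriesTag, groceries)
        .apply("JoinByUserID", CoGroupByKey.create())
        .apply("SumAllPurchases", ParDo.of(
                new JoinEvents(booksTag, groceriesTag, dateStartView, dateEndView))
            .withSideInputs(dateStartView, dateEndView))
        .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
            .to("my-project:sales.TotalSalesPerUser")
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    pipeline.run();
  }
}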

When it comes to unit testing Beam code, the key things to test are the ParDo functions as well as their interactions from one ParDo to the next. Therefore, we will take a look at the ParDo functions and then start writing some tests.

com.papaizaa.batch_example.ParseTableData.java
com.papaizaa.batch_example.JoinEvents.java
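For readers who cannot see the embedded gists, here is roughly what ParseTableData does; a sketch of JoinEvents follows the notes below. The column names and the string-based parsing of Price are assumptions made for illustration.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

public class ParseTableData extends DoFn<TableRow, KV<String, Double>> {

  @ProcessElement
  public void processElement(ProcessContext c) {
    TableRow row = c.element();
    Object userId = row.get("UserID");
    Object price = row.get("Price");

    // BigQuery columns are nullable; returning early drops the row from all later steps.
    if (userId == null || price == null) {
      return;
    }

    // Depending on how the table is read, numeric values may arrive as String or Double,
    // so parse from the string form rather than casting directly.
    c.output(KV.of(userId.toString(), Double.parseDouble(price.toString())));
  }
}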

Some things to note:

  1. In ParseTableData, we first check whether the column lookup on the TableRow object returns anything. BigQuery column entries are nullable, and if the column value is null in the table, the TableRow object will give you a null object. Hence we check whether the value is null and, if it is not, cast the return of element.get to the expected type.
  2. In our case, if either UserID or Price is null, we return early. This is a way of filtering data out before it is passed on to the next steps in the pipeline.
  3. In JoinEvents, the constructor takes a TupleTag object for each input. A TupleTag should be defined for every PCollection that you feed into the CoGroupByKey operation; in our case there are two, one for book sales and one for grocery sales. CoGroupByKey returns a PCollection<KV<K, CoGbkResult>>, which groups values from all the input PCollections by their common keys. You can access a specific collection in a CoGbkResult object by using the TupleTag that you supplied in the ParDo’s constructor (see the JoinEvents sketch after this list).
  4. With CoGroupByKey operations it is possible to have situations where there is an element for a given key under only one of the supplied TupleTags. For instance, a user might have bought groceries that week but not books. When retrieving the iterable of books in the ParDo, we can call c.element().getValue().getOnly(booksTag, null), which means that if there is no value for that TupleTag, we default to a null object.
  5. In BatchExampleMain, you may notice the usage of the PCollectionView object. This is a technique for passing constants or small data structures that fit in memory to the rest of a pipeline. Such a parameter is called a side input, and it is passed to every call of the ParDo. It is a useful way to make an object available throughout the pipeline while creating it only once at the start of main. In our case we made dateStart and dateEnd side inputs so that we would not have to create the DateTime objects on every call to JoinEvents. To pass a side input into a ParDo, you must add the PCollectionView as a parameter to the ParDo’s constructor and call the withSideInputs function on the ParDo declaration.
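Putting notes 3 to 5 together, a sketch of what JoinEvents might look like is below. The output column names and the use of Iterable<Double> values (one grouped iterable per user per table) are assumptions based on the description above, not a copy of the repository code.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.DateTime;

public class JoinEvents extends DoFn<KV<String, CoGbkResult>, TableRow> {

  private final TupleTag<Iterable<Double>> booksTag;
  private final TupleTag<Iterable<Double>> groceriesTag;
  private final PCollectionView<DateTime> dateStartView;
  private final PCollectionView<DateTime> dateEndView;

  public JoinEvents(TupleTag<Iterable<Double>> booksTag,
                    TupleTag<Iterable<Double>> groceriesTag,
                    PCollectionView<DateTime> dateStartView,
                    PCollectionView<DateTime> dateEndView) {
    this.booksTag = booksTag;
    this.groceriesTag = groceriesTag;
    this.dateStartView = dateStartView;
    this.dateEndView = dateEndView;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Side inputs are read through the ProcessContext instead of being recreated per element.
    DateTime dateStart = c.sideInput(dateStartView);
    DateTime dateEnd = c.sideInput(dateEndView);

    // A user may appear in only one of the two inputs, so default the missing side to null.
    Iterable<Double> bookPrices = c.element().getValue().getOnly(booksTag, null);
    Iterable<Double> groceryPrices = c.element().getValue().getOnly(groceriesTag, null);

    double total = 0.0;
    if (bookPrices != null) {
      for (double price : bookPrices) {
        total += price;
      }
    }
    if (groceryPrices != null) {
      for (double price : groceryPrices) {
        total += price;
      }
    }

    c.output(new TableRow()
        .set("UserID", c.element().getKey())
        .set("TotalSales", total)
        .set("WeekStart", dateStart.toString())
        .set("WeekEnd", dateEnd.toString()));
  }
}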

Unit Testing in Apache Beam

Testing Beam pipelines is very similar to typical unit testing, but instead of relying on JUnit assertions alone you will have to get used to Beam’s own assertion class, PAssert. As in any unit test, you generate a set of inputs and check whether your expected output matches the actual output of the code under test.

Some things you may notice in this test:

  1. We define an object of type TestPipeline at the top of the class. This is the test equivalent of the standard Pipeline class used throughout the main code. An important thing to remember is that the testPipeline should be declared as a field of the test class annotated with @Rule, as shown below, and you should reuse the same object for all tests in the class.
@Rule
public final transient TestPipeline testPipeline = TestPipeline.create();

2. At the end of each test, the run() function is called on the testPipeline object. Beam won’t execute the specified steps until the run function is called: the steps you define with “apply” calls form an execution plan and are only executed when explicitly told to, which is known as lazy execution. The same requirement applies to Beam unit tests.

3. The output of a pipeline is evaluated with the containsInAnyOrder function. Unlike typical unit testing, where you can assert on an exact, ordered result, Beam does not guarantee the order in which elements are processed. Hence, we cannot presume to know the order of the output. PAssert only asserts on what is in the output, not on its order.

4. When we define the input to the pipeline, we also need to make sure to convert the input object into a PCollection. Hence the first step in your test pipeline should always be a call to the Create transform, since Beam steps can only be applied to PCollection objects.

int x = 4;
PCollection<Integer> xPcollection = testPipeline.apply(Create.of(x));
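To round out this toy example, the remaining steps would apply a transform, assert on the result with PAssert, and run the pipeline. The doubling transform here is purely illustrative and is not part of the example project; MapElements, TypeDescriptors and PAssert come from the Beam SDK (org.apache.beam.sdk.transforms, org.apache.beam.sdk.values and org.apache.beam.sdk.testing).

// Apply an illustrative transform to the PCollection created above.
PCollection<Integer> doubled = xPcollection.apply(
    MapElements.into(TypeDescriptors.integers()).via((Integer v) -> v * 2));

// Order is not guaranteed, so assert on contents only.
PAssert.that(doubled).containsInAnyOrder(8);

// Nothing runs until run() is called (lazy execution).
testPipeline.run();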

Now let’s take a look at the JoinEventsTest class.

5. As discussed above, we need to turn our startDate and endDate into side inputs to the JoinEvents ParDo by wrapping them as PCollectionView instances. We also need to create TupleTag objects for books and groceries to use with the CoGroupByKey transformation; a sketch of this setup follows.
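A rough sketch of that setup inside a test method might look like this (the dates and the explicit coder choice are illustrative assumptions, not necessarily what JoinEventsTest does):

// Wrap the week boundaries as singleton side inputs built on the test pipeline.
PCollectionView<DateTime> dateStartView = testPipeline
    .apply("CreateStart", Create.of(new DateTime(2019, 10, 14, 0, 0))
        .withCoder(SerializableCoder.of(DateTime.class)))
    .apply("StartView", View.<DateTime>asSingleton());

PCollectionView<DateTime> dateEndView = testPipeline
    .apply("CreateEnd", Create.of(new DateTime(2019, 10, 21, 0, 0))
        .withCoder(SerializableCoder.of(DateTime.class)))
    .apply("EndView", View.<DateTime>asSingleton());

// The same tags are passed to both the CoGroupByKey inputs and the JoinEvents constructor.
TupleTag<Iterable<Double>> booksTag = new TupleTag<>();
TupleTag<Iterable<Double>> groceriesTag = new TupleTag<>();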

This test is a good example of how easily you can test your pipeline end to end in Beam. Here we have tested all of the pipeline’s functionality except the BigQuery read and write, which we abstract away by assuming those services always work.

For more details on testing in Apache Beam, see the official guide: https://beam.apache.org/documentation/pipelines/test-your-pipeline/

Look out for Part 2 of this series, which will focus on testing streaming pipelines in Beam.
