Testing Apache Beam Pipelines with Confidence

Oskar Wickström · Published in Täckblog · Jan 28, 2020

This article shows how to gain confidence in the correctness of Apache Beam pipelines using property-based testing (PBT). We’ll first look at the system under test (SUT), a simple Apache Beam pipeline, and then define a few testing facilities before we write a property test and find a bug.

We’ll use Kotlin and the jqwik testing framework. There’s nothing tying PBT and Apache Beam to these technology choices. You can perform similar testing with, say, Python and Hypothesis.

Problems in deployed Apache Beam pipelines can be hard to troubleshoot. Find them early! Image credit: https://pixabay.com/photos/water-pipe-leaking-water-drip-880975/

Introduction

Lately at work, we’ve been using Apache Beam and Google Cloud Dataflow to process high-volume data streams for near-real-time analytics. Working with global business data, our pipelines have grown increasingly complicated, processing historical and recent data from multiple time zones.

Suppose one has no way of testing pipelines other than running them on Dataflow. One’s work day might then follow a pattern like this:

  1. Add a new feature
  2. Deploy
  3. Wait
  4. Sift through output, try to verify that it’s correct
  5. Find a problem
  6. Add logging
  7. Deploy
  8. Wait again
  9. Find and fix the bug
  10. Deploy
  11. Yay, it works!
  12. Wake up the next day and find that it crashed miserably at midnight

I’m sure you can relate to this experience if you’ve worked with Apache Beam and streaming data. Luckily, we have ways of testing our pipelines, as described in the project documentation.

With complicated and time-dependent pipelines, an example-based test suite can be cumbersome to implement and hard to maintain. Moreover, it’s hard to achieve good coverage with examples, as we’re limited to the combinations of inputs we can think of and write down. To find bugs early while developing, with less effort and bias, we can use PBT.

If you’re unfamiliar with PBT and want an introduction, you can check out my take, or see the resources linked in the jqwik documentation. In short, PBT is about testing properties of programs across a wide variety of generated inputs. Many PBT frameworks, including jqwik, can generate test cases and shrink failing ones down to minimal counterexamples.
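As a tiny taste (a sketch unrelated to Beam, with an illustrative class name), here’s what a jqwik property looks like: reversing a list twice should give back the original list. jqwik generates the lists and shrinks any failure to a minimal counterexample.

class ListProperties {
    @Property
    fun `reversing a list twice yields the original`(
        @ForAll xs: List<Int>
    ) {
        // jqwik generates many lists; if the assertion ever fails,
        // it shrinks the list to a minimal counterexample.
        assertThat(xs.reversed().reversed()).isEqualTo(xs)
    }
}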

Running Example: Units Sold

The SUT in this article is an e-commerce application that counts how many units of each stock-keeping unit (SKU) are sold per day. It’s not an actual production system, but a small example of what one might use Apache Beam for.

In our pipeline, both input and output elements are of type UnitsSold:

data class UnitsSold(
    val sku: String,
    val count: Int,
    val lastSoldAt: DateTime
) : java.io.Serializable

For simplicity, we use java.io.Serializable and the non-deterministic SerializableCoder. This is something you should avoid in a real system.

The transform that we’ll test, TotalUnitsSold, aggregates the sum of count, grouped by sku and whatever time windowing we configure. In our tests, we’ll calculate the sales count per calendar day (UTC). We’re intentionally not looking at the implementation, only the signature:

class TotalUnitsSold :
    PTransform<PCollection<UnitsSold>, PCollection<UnitsSold>>()

Note that the output elements, the aggregates, are of the same type. The lastSoldAt value is the latest date/time of the aggregated input elements.
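To make that concrete, here’s a hypothetical example of the intended behaviour (the values are made up for illustration):

// Two sales of SKU "a" on the same calendar day (UTC)...
val input = listOf(
    UnitsSold("a", 2, DateTime(2020, 1, 1, 8, 0, UTC)),
    UnitsSold("a", 3, DateTime(2020, 1, 1, 17, 0, UTC))
)
// ...should aggregate into a single element: counts summed, and the
// latest lastSoldAt of the inputs kept.
val expected = UnitsSold("a", 5, DateTime(2020, 1, 1, 17, 0, UTC))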

Testing Pipelines with Properties

As mentioned before, we’ll use jqwik and PBT to test our pipeline. We won’t, however, test the entire pipeline with actual sources and sinks. Instead, we structure our application logic into transforms that we can test deterministically. If we need non-determinism inside the transforms, e.g. to perform HTTP requests or database queries, we can inject dependencies into the transforms to make deterministic testing possible.

Before we start writing our property test, we need to define some testing facilities. The Input<T> class represents either an input element of type T received by the pipeline, or the passing of some amount of processing time.

sealed class Input<out T>

data class ElementReceived<T>(val value: T) : Input<T>()

data class ProcessingTimePassed<T>(val duration: Duration) : Input<T>()

We define reusable generators for testing pipelines in the PipelineGenerators interface:

interface PipelineGenerators {
    @Provide
    fun <T> processingTimePassed(): Arbitrary<Input<T>> {
        return Arbitraries.longs()
            .between(1, 1000)
            .map { n ->
                ProcessingTimePassed<Nothing>(standardSeconds(n))
            }
    }

    @Provide
    fun dateTimes(): Arbitrary<DateTime> {
        val min = DateTime(2000, 1, 1, 0, 0, 0, UTC)
        val max = DateTime(2010, 12, 31, 23, 59, 59, UTC)
        return Arbitraries.longs()
            .between(min.millis, max.millis)
            .map { millis -> DateTime(millis, UTC) }
    }
}

We’re defining them as default implementations in an interface because jqwik uses annotated methods and matching names to select generators for property tests. Using interfaces, we can inherit generators (methods annotated with @Provide) from multiple interfaces. Furthermore, we can override specific basic generators upon which more complicated generators rely.
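For example, a test class can pull in the generators by implementing the interface and override one of them (a sketch; the narrowed one-week range is made up):

class TotalUnitsSoldTest : PipelineGenerators {
    // Overriding the inherited dateTimes generator narrows the range for
    // every generator built on top of it, such as inputs() further down.
    @Provide
    override fun dateTimes(): Arbitrary<DateTime> {
        val min = DateTime(2000, 1, 1, 0, 0, 0, UTC)
        return Arbitraries.longs()
            .between(min.millis, min.plusDays(7).millis)
            .map { millis -> DateTime(millis, UTC) }
    }
}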

Next, given a generated list of inputs, we need a way of converting it to a TestStream:

inline fun <reified T : java.io.Serializable> List<Input<T>>.toTestStream(
    getEventTime: (T) -> DateTime
): TestStream<T> {
    val coder = SerializableCoder.of(T::class.java)
    return this.fold(TestStream.create(coder)) { stream, input ->
        when (input) {
            is ElementReceived<T> -> {
                stream.addElements(
                    TimestampedValue.of(
                        input.value,
                        getEventTime(input.value).toInstant()
                    )
                )
            }
            is ProcessingTimePassed -> {
                stream.advanceProcessingTime(input.duration)
            }
        }
    }.advanceWatermarkToInfinity()
}

Another thing we need is a way to extract the input elements (values of type T) from a list of inputs:

inline fun <reified T> List<Input<T>>.values(): List<T> =
    this.flatMap { input ->
        when (input) {
            is ElementReceived<T> -> listOf(input.value)
            is ProcessingTimePassed -> emptyList()
        }
    }
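As a small illustrative example (u1 and u2 stand for arbitrary UnitsSold values):

// Processing-time entries carry no elements, so only u1 and u2 survive.
val inputs: List<Input<UnitsSold>> = listOf(
    ElementReceived(u1),
    ProcessingTimePassed(standardSeconds(30)),
    ElementReceived(u2)
)
inputs.values() // == listOf(u1, u2)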

In order to run our transform with a TestStream, we need a TestPipeline. We define a helper for running an action with a TestPipeline passed as an argument:

fun withTestPipeline(action: (TestPipeline) -> Unit) {
    val p: TestPipeline = TestPipeline
        .create()
        .enableAbandonedNodeEnforcement(false)
    action(p)
    p.run()
}

Finally, when we run the TestPipeline, we get back a collection of output elements. Using withElements, we can apply them to a consumer function:

fun <T> PCollection<T>.withElements(f: (Iterable<T>) -> Unit) {
    PAssert.that(this).satisfies { elements ->
        f(elements)
        null // satisfies expects a function returning Void, sigh
    }
}

In our tests, we’ll use withElements to perform assertions based on the output elements.

The Property Test

Enough ceremony, let’s write our property test! First, we need a generator for lists of UnitsSold inputs:

@Provide
fun inputs(): Arbitrary<List<Input<UnitsSold>>> {
    val unitsSold = Combinators.combine(
        Arbitraries.strings()
            .withCharRange('a', 'f')
            .ofLength(1),
        Arbitraries.integers().between(0, 10),
        dateTimes()
    ).`as`(::UnitsSold)
    val elementsReceived: Arbitrary<Input<UnitsSold>> =
        unitsSold.map { i -> ElementReceived(i) }
    return Arbitraries.oneOf(
        processingTimePassed(),
        elementsReceived
    ).list().ofMinSize(1).ofMaxSize(20)
}

We constrain the list of inputs to a maximum of 20 elements. Testing pipelines is quite slow, so we try to keep down the number of elements, while still allowing the generator to produce interesting cases.

As a test oracle, we’ll use a model, which is a simplified functional version of the Beam pipeline:

object Model {
    fun totalsPerSku(
        inputs: List<Input<UnitsSold>>
    ): List<UnitsSold> {
        return inputs.values()
            .groupBy {
                Pair(it.sku, it.lastSoldAt.withMillisOfDay(0))
            }
            .flatMap { (group, unitsSold) ->
                val (sku, _) = group
                if (unitsSold.isEmpty()) {
                    emptyList()
                } else {
                    listOf(UnitsSold(
                        sku,
                        unitsSold.map(UnitsSold::count).sum(),
                        unitsSold.map(UnitsSold::lastSoldAt).max()!!
                    ))
                }
            }
    }
}

Our property test is configured to try 100 random examples, also for performance reasons. It accepts a list of inputs from our generator, and runs with a test pipeline:

@Property(tries = 100)
fun `accumulates units sold and the latest event time`(
    @ForAll("inputs") inputs: List<Input<UnitsSold>>
) = withTestPipeline { p ->
    // Defined below...
}

We use daily windows, with early triggers firing on an hourly basis:

val hourlyWindows = Window.into<UnitsSold>(CalendarWindows.days(1))
    .triggering(AfterWatermark
        .pastEndOfWindow()
        .withEarlyFirings(
            AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardHours(1)))
        .withLateFirings(Never.ever()))
    .discardingFiredPanes()
    .withAllowedLateness(Duration.standardDays(1))

We create our test stream from the generated inputs, using lastSoldAt as event time, and apply our windowing and our transform:

val output = p.apply(inputs.toTestStream(UnitsSold::lastSoldAt))
    .apply(hourlyWindows)
    .apply(TotalUnitsSold())

Finally, we calculate the expected results based on the model, and check that they’re equal:

val perSku = Model.totalsPerSku(inputs).toSet()
output.withElements { unitsSold ->
    assertThat(unitsSold.toSet()).isEqualTo(perSku)
}

Results

This is a simple example, but when running the tests we find a real bug:

Expecting:
<[UnitsSold(sku=a, count=0, lastSoldAt=2000-01-01T00:00:00.000Z), null]>
to be equal to:
<[UnitsSold(sku=a, count=0, lastSoldAt=2000-01-01T00:00:00.000Z)]>
but was not.

Looking at the minimal sample (formatted for ease of reading), we try to figure out the cause of this mismatch:

sample = [
[
ElementReceived(
value=UnitsSold(
sku=a,
count=0,
lastSoldAt=2000-01-01T00:00:00.000Z
)
),
ProcessingTimePassed(duration=PT913S),
ProcessingTimePassed(duration=PT24S),
ProcessingTimePassed(duration=PT446S),
ProcessingTimePassed(duration=PT289S),
ProcessingTimePassed(duration=PT136S),
ElementReceived(
value=UnitsSold(
sku=a,
count=0,
lastSoldAt=2000-01-01T00:00:00.000Z
)
),
ProcessingTimePassed(duration=PT10S),
ProcessingTimePassed(duration=PT39S),
ElementReceived(
value=UnitsSold(
sku=a,
count=0,
lastSoldAt=2000-01-01T00:00:00.000Z
)
),
ElementReceived(
value=UnitsSold(
sku=a,
count=0,
lastSoldAt=2000-01-01T00:00:00.000Z
)
),
ProcessingTimePassed(duration=PT766S),
ElementReceived(
value=UnitsSold(
sku=a,
count=0,
lastSoldAt=2000-01-01T00:00:00.000Z
)
),
ProcessingTimePassed(duration=PT19S),
ProcessingTimePassed(duration=PT959S)
]
]

Wow, that’s a lot of input values! How is that minimal? Well, if we sum the seconds of processing time passed, 913+24+446+289+136+10+39+766+19+959, we get 3601: one hour and one second. This is obscure indeed, but it shows that when a trigger fires and there are no elements to reduce into a single sum in that pane, the output is null.

If we filter out nulls in our transform and run the tests again, all tests pass.
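The transform’s internals aren’t shown here, but the shape of the fix is to guard its output. A minimal sketch, assuming the aggregation can emit a null for an empty early pane (the exact placement depends on the implementation):

// Hypothetical final step inside TotalUnitsSold: drop null aggregates
// produced by early-firing panes that contained no elements.
.apply(ParDo.of(object : DoFn<UnitsSold?, UnitsSold>() {
    @ProcessElement
    fun process(ctx: ProcessContext) {
        ctx.element()?.let { ctx.output(it) }
    }
}))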

This is not just a bug I happened to find while writing this article; it’s the exact same bug we’ve hit at work after deploying jobs to Dataflow.

Summary

We’ve seen a simple Apache Beam pipeline and learned how to test it using PBT. We found a bug in the transform by generating test cases and shrinking them to smaller, yet failing, test cases. Before I wrap up, I’d like to leave you with some parting thoughts and tips for further reading.

Speed, Size, and Count

The balance between test speed, sample size, and number of samples is tricky when testing pipelines using properties. In development, we want quick feedback, but in CI or in nightly builds we might allow for test runs to take longer. We could override the jqwik configuration for different test environments.
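For example, reasonably recent jqwik versions read configuration from a junit-platform.properties file on the test classpath, so a CI build could ship a variant with a bigger budget (values illustrative):

# src/test/resources/junit-platform.properties (CI variant)
# Raises the default number of tries; properties that set tries explicitly,
# like ours above with @Property(tries = 100), are unaffected.
jqwik.tries.default = 1000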

To increase coverage, even with a small number of samples, we can use statistics to improve our generators. I wrote a longer tutorial about coverage and PBT called Time Travelling and Fixing Bugs with Property-Based Testing.
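As a sketch, jqwik’s Statistics API lets us label each generated case, e.g. by how many distinct calendar days it spans, so we can see whether multi-day windows are exercised at all:

@Property(tries = 100)
fun `generated inputs span multiple days`(
    @ForAll("inputs") inputs: List<Input<UnitsSold>>
) {
    // Collect the number of distinct sale days per case; jqwik prints
    // the distribution after the run.
    val distinctDays = inputs.values()
        .map { it.lastSoldAt.withMillisOfDay(0) }
        .distinct()
        .size
    Statistics.collect(distinctDays)
}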

Using Models

Using a model is a double-edged sword. While it can precisely define the expected behaviour of our system, it often leads to duplicating a lot of the logic in the SUT.

Notably, these tests do not advance the watermark. This is because our model doesn’t specify what happens when data arrives late. We could expand our model to cover such scenarios, but it would make our test code a lot more complicated. We’d essentially reimplement some of Apache Beam in our model.
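If we did want to exercise late data, a first step might be a hypothetical third Input case that advances the event-time watermark mid-stream (a sketch only; the model would also need rules for late elements):

// Hypothetical extension: advance the watermark explicitly, so elements
// received afterwards with older event times arrive late.
data class WatermarkAdvanced<T>(val to: DateTime) : Input<T>()

// ...with a corresponding branch in toTestStream:
// is WatermarkAdvanced -> stream.advanceWatermarkTo(input.to.toInstant())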

Another approach would be to not use a model, and try to define properties that specify the system more abstractly. See Choosing properties for property-based testing for more inspiration.

Takeaway

Testing pipelines allows you to gain confidence in your pipeline code before deploying. Use property-based testing to find corner cases you can’t come up with, or that are too cumbersome to write manually.

Happy deploying!
