Apache Beam fundamentals

Brachi Packter
4 min read · Aug 13, 2020


After almost two years of working with Apache Beam, processing terabytes of data per day and millions of events per minute, I've listed here some terms and techniques I learned along the way.

Runtime exceptions- prevent infinite retries: input elements that cause consistent exceptions will be retried forever, which can lead to a huge lag, of minutes and even hours, until the error is fixed.

Infinite retries can be handled in different ways:
1. Catch and log: catch any runtime exception and recover from it; you can log the errors and add an alert when the number of error logs crosses some threshold.

2. Dead-letter queue: if you read data from PubSub, you can configure a dead-letter queue, retry interval, and retry attempts, and then PubSub itself will stop sending events that cause exceptions and will deliver them to the dead-letter queue instead.

That being said, you will still want to throw exceptions on transient errors; there is a good chance they will be resolved after a couple of retries.

Let’s have a look at BigQueryIO, the Beam library for BigQuery operations. It has a nice implementation for exception handling: when inserting rows into BigQuery you can define the retry policy: alwaysRetry, retryTransientErrors, or neverRetry.
If you choose retryTransientErrors, you can get the rows that failed consistently and handle them: log them, send them to a dead-letter queue, or similar.

See the example below:
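A minimal sketch, assuming rows is a PCollection<TableRow>; the table name and the failure-handling DoFn are placeholders:

WriteResult writeResult = rows.apply("WriteToBigQuery",
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")                       // placeholder table
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

// Rows that BigQuery rejected and that will not be retried end up here -
// log them or push them to a dead-letter queue
writeResult.getFailedInserts()
    .apply("HandleFailedRows", ParDo.of(new LogFailedRowFn()));     // placeholder DoFn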

Counters & Distributions- use for deeper insights: you can add any kind of measure for any action you perform in your pipeline.
Counters are used for values that only grow (increment only); a distribution captures a value and shows you how it is distributed over time.
For example:
For a step that calls Redis, we can add the latency as a distribution.
If there are invalid elements, we can add a counter that counts how many invalid elements we have.
Any counter/distribution can be shown in metrics and alerts (Stackdriver).
Here is how you create a Counter:
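A short sketch (MyDoFn and the metric name are placeholders):

// Declared as a field of the DoFn
private final Counter invalidEventsCounter =
    Metrics.counter(MyDoFn.class, "invalid-events");

// Inside @ProcessElement, whenever an invalid element shows up:
invalidEventsCounter.inc();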

Here is how you create a Distribution:
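Again a sketch, with placeholder names:

// Declared as a field of the DoFn
private final Distribution redisLatencyMs =
    Metrics.distribution(MyDoFn.class, "redis-latency-ms");

// Inside @ProcessElement, around the Redis call:
long start = System.currentTimeMillis();
// ... call Redis here ...
redisLatencyMs.update(System.currentTimeMillis() - start);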

Integration tests- a must! In my case I prefer to run the tests with the Dataflow runner, let them run for 3–4 minutes, and then examine the counters and distributions. For example, if I have a counter for invalid data, the test will assert that it equals zero, and also assert that the counter for valid elements equals the size of the test input, meaning no invalid data was received.
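A hedged sketch of such an assertion, reusing the invalid-events counter from above inside a JUnit test:

@Test
public void invalidElementsCounterStaysZero() throws Exception {
  // run the pipeline for a few minutes on the Dataflow runner
  PipelineResult result = pipeline.run();
  result.waitUntilFinish(Duration.standardMinutes(4));

  // query the counter we registered in the DoFn
  MetricQueryResults metrics = result.metrics().queryMetrics(
      MetricsFilter.builder()
          .addNameFilter(MetricNameFilter.named(MyDoFn.class, "invalid-events"))
          .build());

  long invalidElements = 0;
  for (MetricResult<Long> counter : metrics.getCounters()) {
    invalidElements += counter.getAttempted();
  }
  assertEquals(0, invalidElements);

  result.cancel();
}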

Wall time- catch the killers: in Dataflow you can see the total wall time of every step. Find the steps with the highest wall time and try to reduce it, and use a profiler for deeper tuning. For more information follow this article, or in short just add --profilingAgentConfiguration='{ \"APICurated\": true }' and then find the profiler graph in Stackdriver.

Back pressure- to slow down: if you pull records and the process can't handle them in time, you can apply some "back pressure" and stop pulling for a while until you catch up with the spike.

For example, take a job that writes data to AWS Kinesis. Kinesis has a per-shard capacity limit, and when the job hit that limit I applied back pressure to slow down the Kinesis write operations (Thread.sleep(1000), multiplying the sleep time by 2 on every retry).
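Roughly like this (writeBatchToKinesis and the retry limit are placeholders):

// Hypothetical helper that backs off when Kinesis throttles the writes
private void writeWithBackpressure(List<PutRecordsRequestEntry> records)
    throws InterruptedException {
  long backoffMillis = 1000;
  for (int attempt = 0; attempt < 5; attempt++) {
    try {
      writeBatchToKinesis(records);   // placeholder for the actual Kinesis call
      return;
    } catch (ProvisionedThroughputExceededException e) {
      Thread.sleep(backoffMillis);    // slow down before retrying
      backoffMillis *= 2;             // double the wait for the next retry
    }
  }
  throw new RuntimeException("Kinesis is still throttling after 5 attempts");
}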

Singleton pattern- don't mess up your code: use Beam pipeline options to create a single instance of a service, and get that instance from the options whenever you need it. The getter/setter pair lives on your options interface (called MyPipelineOptions here just for the example):
public interface MyPipelineOptions extends PipelineOptions {
  @JsonIgnore
  @Default.InstanceFactory(RedisClientFactory.class)
  RedissonClient getRedissonClient();
  void setRedissonClient(RedissonClient value);
}
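The factory itself implements DefaultValueFactory; a hedged sketch with a placeholder Redis address:

public class RedisClientFactory implements DefaultValueFactory<RedissonClient> {
  @Override
  public RedissonClient create(PipelineOptions options) {
    // Created lazily on first access and then cached on the options,
    // so everything that reads it from the options shares one client
    Config config = new Config();
    config.useSingleServer().setAddress("redis://localhost:6379");  // placeholder address
    return Redisson.create(config);
  }
}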
I wrote a post describing it.

Windows- don't let late arrivals be dropped silently: windowing in Beam is complex, and I'm not going to explain here the kinds of windows and how to use them. Just make sure you don't drop late arrivals; in Dataflow you have out-of-the-box metrics for it.

Use these APIs to configure it:
* .triggering(...) together with .withLateFirings(...)
* .withAllowedLateness(...)
It is better explained with a code example. In the snippet below we create a 1-minute window; elements that arrive after the window has already triggered fire an extra (late) pane 10 minutes later, and elements are accepted for up to 1 hour of lateness. Elements that arrive after this allowed lateness will be dropped, and the metrics mentioned above will count them.
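A sketch of that configuration (MyEvent is a placeholder element type):

input.apply("Window", Window.<MyEvent>into(FixedWindows.of(Duration.standardMinutes(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(10))))   // late pane 10 minutes after a late element shows up
    .withAllowedLateness(Duration.standardHours(1))        // beyond 1 hour, elements are dropped
    .accumulatingFiredPanes());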

Beam SQL- to simplify: pretty well known; with SQL queries you can perform joins, group-bys, aggregation functions and more, and it makes the code much more readable and maintainable. From their docs: https://beam.apache.org/documentation/dsls/sql/walkthrough/
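A tiny sketch, assuming events is a PCollection<Row> whose schema has a user_id field:

PCollection<Row> countsPerUser = events.apply("CountPerUser",
    SqlTransform.query(
        "SELECT user_id, COUNT(*) AS cnt FROM PCOLLECTION GROUP BY user_id"));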

Side input- for external data: as the name says, side; it is used for side actions you need on top of your pipeline data, for example enrichment, external dynamic configuration and more.
Here I set a 10-minute ticker (it is just a scheduler; the Beam API can be complex sometimes…)
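Something like this, using GenerateSequence as the scheduler:

PCollection<Long> ticks = pipeline.apply("Every10Minutes",
    GenerateSequence.from(0).withRate(1, Duration.standardMinutes(10)));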

And now I apply the side action on that ticks collection: in this case I want to read some config file from Google Storage. Later I can get this side input inside the DoFn and read the needed configuration.
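A hedged sketch of both parts; readConfigFile and MyEvent are placeholders:

PCollectionView<String> configView = ticks
    .apply("WindowTicks", Window.<Long>into(FixedWindows.of(Duration.standardMinutes(10))))
    .apply("ReadConfig", ParDo.of(new DoFn<Long, String>() {
      @ProcessElement
      public void process(ProcessContext c) {
        c.output(readConfigFile());               // placeholder: fetch the file from Google Storage
      }
    }))
    .apply("AsSideInput", View.asSingleton());

// Later, in the main flow, hand the view to the DoFn that needs it:
mainInput.apply("Enrich", ParDo.of(new DoFn<MyEvent, MyEvent>() {
      @ProcessElement
      public void process(ProcessContext c) {
        String config = c.sideInput(configView);  // the freshest configuration
        c.output(c.element());
      }
    }).withSideInputs(configView));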

Tags- use for forks: when an element needs to be output to different paths depending on its type, for example invalid data and valid data.
Or when the same element needs to be sent to multiple paths.

Making the split:
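A sketch of the split (MyEvent, the tag names and the isValid check are placeholders):

final TupleTag<MyEvent> validTag = new TupleTag<MyEvent>() {};
final TupleTag<MyEvent> invalidTag = new TupleTag<MyEvent>() {};

PCollectionTuple results = events.apply("SplitValidInvalid",
    ParDo.of(new DoFn<MyEvent, MyEvent>() {
      @ProcessElement
      public void process(ProcessContext c) {
        if (isValid(c.element())) {                 // placeholder validation
          c.output(c.element());                    // main output: valid elements
        } else {
          c.output(invalidTag, c.element());        // tagged output: invalid elements
        }
      }
    }).withOutputTags(validTag, TupleTagList.of(invalidTag)));

PCollection<MyEvent> valid = results.get(validTag);
PCollection<MyEvent> invalid = results.get(invalidTag);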

Transformers- to aggregate several steps together: basically you can look at it as a sub-flow, used to make the flow graph simpler.
For example, BigQueryIO.writeTableRows() is a transformer; it does a lot of things inside, but they are all encapsulated in that single step.
To create your own transformer, extend PTransform, implement its single method expand, and add all the sub-steps you need there, as in the sketch below.
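A hedged sketch (ValidateFn and EnrichFn are hypothetical DoFns):

public class CleanAndEnrich extends PTransform<PCollection<MyEvent>, PCollection<MyEvent>> {
  @Override
  public PCollection<MyEvent> expand(PCollection<MyEvent> input) {
    // All sub-steps are grouped under a single node in the flow graph
    return input
        .apply("Validate", ParDo.of(new ValidateFn()))
        .apply("Enrich", ParDo.of(new EnrichFn()));
  }
}

// Usage:
events.apply("CleanAndEnrich", new CleanAndEnrich());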

Do you have more? Just leave me a comment.
