Apache Beam: Deep Dive Series, Episode 1

Prag Tyagi
towardsdataanalytics
10 min read · Apr 25, 2021

Apache Beam, one of Apache's newer open source projects, is a unified programming model for expressing efficient and portable big data pipelines. How?

i) Unified: one API to process both batch and streaming data

Batch + Stream → Beam

ii) Portable: a Beam pipeline, once written in any of the supported languages, can run on any of the supported execution frameworks, such as Spark, Flink, Apex, or Google Cloud Dataflow.

It was started in 2016 and has since become a top-level Apache project.

Apache Beam Co-existence with other Big Data Components

It was developed by Google. Usually Google publishes white papers on its distributed data engineering systems, and others build on them; that is how Hadoop came into existence. This time, however, Google went ahead and developed a full-fledged, engine-agnostic component itself, and that is how Apache Beam came into existence.

Apache Beam Architecture

Forgive my poor PowerPoint skills. The intention here is to show how this engine-agnostic component works. There is a Runner API, which lets Beam code run on the chosen execution engine, such as Spark or Flink. There is also a Function API, which gives you the flexibility to code in any of the languages supported by Beam.

Now the introduction is complete. Let's get under the skin of Beam.

Apache Beam’s Basic terminologies

Pipeline: A pipeline encapsulates the entire data processing task from start to finish. This includes reading input data, transforming that data, and writing output data.

PCollection: A PCollection is roughly equivalent to an RDD in Spark. It represents a distributed data set that our Beam pipeline operates on.

PCollection = Bounded + Unbounded

PTransform: A PTransform represents a data processing operation, i.e. a step in our pipeline. Examples: ParDo, Filter, Flatten, Combine, etc.

Now let's see how a PCollection works.

PCollection characteristics

i) Immutability: PCollections are immutable. Applying a transformation to a PCollection creates another PCollection.

ii) Element type: The elements in a PCollection may be of any type, but all of them must be of the same type.

iii) Operation type: A PCollection does not support fine-grained operations. We cannot apply a transformation to only some specific elements of a PCollection.

iv) Timestamps: Each element in a PCollection has an associated timestamp.

Unbounded PCollections: the source assigns the timestamps.

Bounded PCollections: every element is set to the same timestamp.

From here on, let's mix hands-on demos with concepts.

All the demos were prepared on the Google Colab platform. The beauty of this platform is that you only need a web browser. No infrastructure is required, and you are good to go.

Every Unix command must be prefixed with ! and you are good to go. The command below uploads a file from the local system to Google Colab.

Important steps in an Apache Beam workflow

i) Create a pipeline and give it a name

ii) Initiate a PCollection by reading data from a source

iii) Apply PTransforms to the data

iv) Write the processed PCollection to a sink

v) Run the pipeline

Steps in Apache Beam workflow

Beam has many read transforms for creating a PCollection from a source. Since it supports multiple source systems, there is a separate read transform for each source. Let's get into the details.

ReadFromText: Parses a text file as newline-delimited elements, i.e. it reads the file line by line, and every line becomes a single element in the PCollection.

Parameters:

i) Mandatory parameter: file_pattern (str)

ii) Other parameters:

min_bundle_size (int): bundles are nothing but partitions

compression_type (str): the compression format of the input files

strip_trailing_newlines (boolean)

validate (boolean)

skip_header_lines (int): useful for skipping header lines in flat files

ReadFromAvro(): Used to read Avro files

i) Mandatory parameter: file_pattern (str)

ii) Other parameters:

min_bundle_size (int): bundles are nothing but partitions

validate (boolean)

use_fastavro (boolean)

ReadFromPubSub(): Used to read messages from the Google Cloud Pub/Sub service.

i) topic (str)

ii) subscription (str)

iii) id_label (str)

iv) with_attributes (boolean)

v) timestamp_attribute (str)

Source systems for read transformations:

i) Apache Kafka

ii) AWS Kinesis

iii) JMS

iv) MQTT

v) GCP Pub/Sub

OK, enough theory. Let's start with the hands-on part.

Before that, let's look at our data set. We will use department data and location data as sample data for the demos.

Demo 1

We will print a sample list of numbers.

The call that runs the pipeline must start at the beginning of the line. Python is indentation-sensitive, so any change in indentation will result in an error.

Demo 2

Now let's handle dictionary data in Apache Beam.

Now how about writing data back to a sink after the transformation? Let's look at that.

Different types of write transformations:

1) WriteToText: Writes each element of the PCollection as a single line in the output file.

Parameters: file_path_prefix (str), file_name_suffix (str), num_shards (int), append_trailing_newlines (boolean), coder, compression_type (str), header (str)

2) WriteToAvro: Writes each element of the PCollection to an Avro file.

Parameters:

file_path_prefix (str), file_name_suffix (str), num_shards (int), schema, codec, compression_type (str), use_fastavro (boolean), mime_type

3) WriteToParquet: Writes each element of the PCollection to a Parquet file.

Parameters:

file_path_prefix (str), file_name_suffix (str), num_shards (int), codec, mime_type, schema, row_group_buffer_size, record_batch_size

4) WriteToPubSub: Writes each element of the PCollection to the Google Cloud Pub/Sub service.

Parameters:

topic (str), with_attributes (boolean), id_label (str), timestamp_attribute (str)

Hands-on resumed…

Demo 3

Here I am reading data from a sample file, dept_data.txt, and writing the same data to a new file in my data folder.

After that, I use a Unix shell command to print the result. Since I have not specified the number of bundles (i.e. partitions), Beam splits the output into a number of bundles of its own choosing. I then fetch the values from the first bundle.

Demo 4

Now, using a user-defined function and calling it in the Map API, I get the following result, where each element is split on ','. Here we get one output element for each input element.

Demo 5

Now let's see how this behaves with FlatMap. For one input element we get multiple output elements.

Demo 6

Now let's apply a filter transformation to get the records of employees from the accounts department only. Here I have created a custom UDF to filter the data and passed it to a Beam PCollection, so that we get records for the accounts department only.

Demo 7

Now the use case is that I need to find the attendance of employees from the accounts department. Here I have applied a lambda function that attaches the number 1 to each element, so that I can then perform a reduce-by-key operation. People who have worked with Spark RDDs will find this a familiar pre-aggregation step.

I hope this gives you the feel of Spark RDD aggregation operations.

Demo 8

Now I will aggregate the data from the scenario above using CombinePerKey.

Demo 9

We can see that the code above has grown large, with a lot of UDFs. So let's remove these UDFs and look at the equivalent implementation using lambda functions. Life becomes easier and your code more readable. Ideally, try to use lambda functions as much as possible.

Demo 10

What if I want to drop the run() call and have the pipeline execute from a single block declared at the top? Let's see. In that case we need to change the pipeline declaration to use the with syntax. One thing that may look strange: with this style, the PCollection declarations have to be indented inside the with block (by two places in my code).

Demo 11

Now let's add classifier comments (labels) to our code. Classifiers are nothing but a syntactic way of attaching your comments to the steps in Beam code. They make your code more readable and easier to troubleshoot in case of any issues.

So this is the kind of pipeline we have built so far.

Now what if we want to calculate the attendance of employees from multiple departments at the same time? For such a scenario we need branched pipelines.

Demo 12

Here we have branched the pipeline and created parallel workflows within the same pipeline. In real-world scenarios this is how Beam pipelines are written: multiple parallel workflows, each revolving around a specific PCollection.

Demo 13

Now let's write their consolidated output to one single file instead of writing it to different files. This avoids the small-file problem from a Beam perspective. To achieve this, we need the Flatten API. It accepts a tuple of PCollections and returns their merged output.

ParDo Transform

ParDo transform: This is the general-purpose transform that Map, FlatMap and Filter are built on. It can be used to perform any transformation on the elements of a PCollection. By default it can return multiple outputs for a single input element. If you want one output element for each input element, wrap the parsed data in a list. Let's see how this all works.

Demo 14

As we can see, ParDo relies on a DoFn class object to do its work.

Demo 15

We can also pass a lambda into ParDo. Let's see how. Notice that here I have not written any custom class to create the DoFn object for ParDo. If you don't, ParDo uses an internal DoFn object, as in this lambda implementation.

Demo 16

Now let's use ParDo in our previous case of branched PCollections, where we find the attendance of employees in the accounts department.

Demo 17

Composite transformation: What issue do you see in the code below?

If you look closely, for both the accounts and the HR PCollections we repeat the same code for combining, filtering and then publishing the results. This can be avoided if we have common functionality that both PCollections can call. That is where composite transformations come into play. How do they make your life easier? Let's find out.

What we have done here is create a class MyTransform that inherits from Beam's PTransform class. PTransform is the parent class of all transformations in Apache Beam. In our class we have to override the expand method. This method takes the input PCollection from the respective branch, and in it you list all the transformations that you need to apply to that input. All the repetitive transformations can go here. Once that is done, you can use a MyTransform object in any PCollection's chain of transformations, just as is done for the accounts and HR PCollections.

CoGroupByKey

A relational join of two or more key/value PCollections. It accepts a dictionary of key/value PCollections and outputs a single PCollection containing one key/value tuple for each key in the input PCollections.

Demo 18

From the output you can see that the result comes back as a dictionary. Beam is still evolving, and more optimizations are coming via the open source community, so stay tuned for better data handling in joins. For now, if we need meaningful data from this step onward, we have to parse this dictionary data further.

With this we have come to the end of Episode 1. This is going to be a 5–6 episode series, so stay tuned for the next episode…

Post credit: In Episode 2 we will work through a case study on detecting bank fraud in real time using Apache Beam.

Prag Tyagi
towardsdataanalytics

Senior leader and a technologist having 14+ years of experience in Data Analytics. Passionate to share new concepts and learning in Data Analytics domain.