Apache Beam: a Unified API for Data Engineering

Souvik Ghosh
Nov 4 · 8 min read

Tons of non-tech organisations have recently surged in their interest in real-time and batch big data processing, especially banking, retail and insurance, and typically experts are flown in to help them understand their jungle of databases storing data from the ages.

What happens next will definitely NOT surprise you. There is already a very large ecosystem of big data processing tools and technologies, and depending on the kind of experts you have in your company, their opinions will vary. Add to that the surge of cloud computing, with these good ol’ giants moving towards some amazing cloud solutions like Azure, GCP or AWS (probably in that order), and you are looking at a cost surge, with many not understanding why such massive complexity is actually necessary. A great era for a résumé facelift!! We pour millions into such extensive initiatives to deliver some rather simplistic outcomes, “embarrassingly parallel” ones as one would say (love that phrase). Some things are so simple, and probably so fundamental in computer science, that you don’t need to bring out the big guns for the battle.

I continue to remind you, I am talking about non-tech corporations like banks, insurance companies and, to an extent, brick-and-mortar retail agencies. Why? Because I am a home-grown butterfly from one of them. 😃

What is Data engineering?

Just to give you a very naive (borderline stupid) explanation: in most such organisations, Data Engineers are developers who move data from one source to another target, typically providing transformed, very useful and, most of the time, clean datasets to Business Intelligence (BI) and, lately, Advanced Analytics, where there is an ever-growing need, or hunger, for more and more data.

The Problem Statement

In fact, what I continue to observe is that we somewhat push ourselves, or are pushed a bit by our experts, towards these big data ecosystems (Hadoop, Spark, Flink…) and we end up writing a spaghetti of code in a variety of languages (Python, Scala, Java, C#) trying to define what kind of datasets to prepare for our beloved Data Analysts and Data Scientists. Under the hood, these data pipelines end up being complicated, unorganised and frankly a bit messy. Some of them are sometimes so outrageous that you end up thinking: what has been going on here!! 🙀

Hiring good data engineers also becomes a bigger challenge: some of them are adept in the one technology you have pushed yourself into while, in the meantime, that technology starts to die (Hadoop is dying! Deal with it); a new breed of engineers comes up and all they know are streams, yet you are either behind or you don’t even realise what kind of race you are in.

There you go!! This is what I wanted to rant about. This is what led to my frustration, and I thought: well, I can use Twitter and talk about it in 280 characters, or I can write some code, get down to business and try to advocate how to untangle those wires. After some soul searching, I ended up finding an interesting toolkit, of course from the best minds at Google.


Apache Beam — the unifying force

So you have probably heard about Apache Beam if you are a GCP user, or have tinkered with the possibility of understanding what it does. I had the same curiosity, and ended up spending my weekend trying to understand it in more detail. I think I still need to dig in a lot more than just a weekend, but I needed a start.

I have been programming in Python for at least a couple of years now, so whenever I search for anything, my immediate afterthought is: does it have a Python SDK? Although Apache Beam does have a Python SDK, it is not as mature as I was hoping, so for the time being I have to revisit my frenemy, Java. One must not forget where they come from!! 😱

The Project

I wanted to do something simple, as I don’t want to spend my entire weekend (a bit of a long one) staring at my laptop. I decided to take up some tweets of none other than Señor Presidente El Donald Trump.

I am not trying to be political (I am not American, so I don’t care too much about their politics); it is just that there is so much information available about him online that it is so easy to download a collection of his tweets. 😆 So that’s what I did.

#thankyoukaggle — https://www.kaggle.com/kingburrito666/better-donald-trump-tweets

Now the next step was to build the Beam pipeline. But also: what am I supposed to do with these tweets?

I decided to simply do the following:

  • Read a CSV of DJT’s tweets
  • Remove error lines
  • Create a model from the tweets themselves
  • Split each tweet into words
  • Remove stop words
  • Count the most frequent words
  • Export the word counts as CSVs
  • Load them in Python (because), and create a cool visualisation (we’ll see about that some other day, perhaps)

Then make it standard and test it out with various runners, notably Flink (local) and Spark (local), because I am not sponsored to write this and I am not going to spend $$$ on setting up a cluster, etc.

Walkthrough

Let’s walk through the code, step by step.

Step 1

First the easy part. If you are using *nix/macOS, make sure you have the essentials:

$ java -version
$ brew update
$ brew install maven

Next up, I simply cloned the example from the Apache Beam website’s Java quickstart. Do the same.

Let’s now start with the pipeline.

Step 2

PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);

Pass the options on to create the pipeline. You have a variety of options to choose from, and you can extend PipelineOptions with your own; the values you set are passed in much like an args parser parses command-line arguments. These options are also where you decide which runner the pipeline should use.

Take a look at the capability matrix to see which runners are available and what you can do with each. It is still a work in progress.

Step 3

Next up, Let’s start by importing the csv of DJT’s tweets.

PCollection<String> tweetobj = p.apply("Read Tweets", TextIO.read().from("data/Donald-Tweets!.csv"));

PCollection — https://beam.apache.org/documentation/programming-guide/#pcollections

A PCollection is a potentially distributed, abstract, multi-element dataset.

Step 4

Our next step in the pipeline is to filter out the empty lines so we don’t have any issues later.

PCollection<String> tweetobj1 = tweetobj.apply("Remove empty lines", Filter.by((String line) -> !line.isEmpty()));

We use a lambda function here: it is applied to each element of tweetobj and keeps only the lines that are not empty.

Step 5

Next up, loading the data into my model. For simplicity, I have taken basically two fields: the year and the tweet text.

The reason for serialization here is that when the data is distributed across a cluster, Apache Beam moves it back and forth between different nodes by serialising it, so the model class must be serializable.
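The tweetModel class itself isn’t shown in the snippets; here is a minimal sketch of what such a POJO could look like, inferred from the constructor and getter/setter calls used in this pipeline (field names are my assumption):

```java
import java.io.Serializable;

// Hypothetical model class: Beam moves elements between workers by
// serialising them, so the POJO must implement Serializable.
public class tweetModel implements Serializable {
    private static final long serialVersionUID = 1L;

    private int year;
    private String tweet;

    public tweetModel(int year, String tweet) {
        this.year = year;
        this.tweet = tweet;
    }

    public int getYear() { return year; }
    public void setYear(int year) { this.year = year; }

    public String getTweet() { return tweet; }
    public void setTweet(String tweet) { this.tweet = tweet; }
}
```

In a larger project you would typically register a Coder (or use a schema) instead of relying on plain Java serialization, but Serializable is enough to get a toy pipeline running.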

PCollection<tweetModel> tweets = tweetobj1.apply("LoadTweetstoTable", // the transform name
    ParDo.of(new DoFn<String, tweetModel>() { // a DoFn as an anonymous inner class instance
        private static final long serialVersionUID = 1L;

        @ProcessElement
        public void processElement(@Element String line, OutputReceiver<tweetModel> out) {
            String[] strArr = line.split(",");
            tweetModel tm = new tweetModel(0, "");
            try {
                tm.setTweet(strArr[2]);
            } catch (ArrayIndexOutOfBoundsException e) {
                tm.setTweet("no tweet");
            }
            out.output(tm);
        }
    }));

Few important elements to note here:

We are creating a new PCollection of type tweetModel, meaning each tweet line is now wrapped in a tweetModel where we set the year and the tweet. I hope you get the idea: the most obvious next step from here would be to load these tweets into a database for further NLP by a data scientist, but perhaps we are interested in more preprocessing, because maybe we need the pipeline to iron out some more things before producing a final output.

Another thing to note here is ParDo (Parallel Do): quite simply, it does the processing in parallel, utilising the available cores or nodes depending on your runner (we will see about that later).

Step 6

Our next step is to create a set of tokens; here we will simply use whitespace-and-punctuation tokenisation.

PCollection<String[]> words = tweets.apply("Create Tokens", // the transform name
    ParDo.of(new DoFn<tweetModel, String[]>() { // a DoFn as an anonymous inner class instance
        private static final long serialVersionUID = 1L;

        @ProcessElement
        public void processElement(@Element tweetModel tweet, OutputReceiver<String[]> out) {
            String[] strArr = tweet.getTweet().split("[^\\p{L}]+");
            out.output(strArr);
        }
    }));

Quite simply, each tweet in the tweetModel PCollection gets converted into an array of words.
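The `[^\\p{L}]+` pattern splits on any run of characters that are not Unicode letters, so punctuation, digits and URL fragments all act as delimiters. A quick stand-alone check of that regex, outside of Beam (the sample text is mine):

```java
public class TokenDemo {
    // Same split as the DoFn above: break on any run of non-letter characters
    public static String[] tokenize(String tweet) {
        return tweet.split("[^\\p{L}]+");
    }

    public static void main(String[] args) {
        String[] tokens = tokenize("MAKE AMERICA GREAT AGAIN! https://t.co/abc123");
        // Punctuation, slashes and digits all split the text into letter-only tokens
        System.out.println(String.join("|", tokens));
        // → MAKE|AMERICA|GREAT|AGAIN|https|t|co|abc
    }
}
```

Note that a URL decomposes into junk tokens like "https", "t" and "co"; the stop-word step below is one place you could filter those out too.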

Step 7

Removal of stop words: as is typical of NLP pipelines, since we want to count words across all the tweets, we need to remove the English stop words, which do not carry much weight in a sentence.

PCollection<String> words1 = words.apply("Remove Stopwords", // the transform name
    ParDo.of(new DoFn<String[], String>() { // a DoFn as an anonymous inner class instance
        private static final long serialVersionUID = 1L;

        @ProcessElement
        public void processElement(@Element String[] tokens, OutputReceiver<String> out) {
            // Build the cleaned sentence locally, so state never leaks between elements
            StringBuilder ar = new StringBuilder();
            try {
                List<String> stopwords = Files.readAllLines(Paths.get("data/english_stopwords.txt"));
                for (String word : tokens) {
                    if (!stopwords.contains(word)) {
                        ar.append(" ").append(word);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            out.output(ar.toString());
        }
    }));

Step 8, Step 9…

Keeping the other parts short: the goal is to output the word counts, and that is achieved with the following code. I am going to skim over this section because it is mostly generic; what will be more interesting is the execution model.

PCollection<String> words2 = words1.apply(FlatMapElements.via(new SimpleFunction<String, List<String>>() {
    private static final long serialVersionUID = 1L;

    public List<String> apply(String line) {
        return Arrays.asList(line.split(" "));
    }
}));

PCollection<KV<String, Long>> count = words2.apply(Count.perElement());

count.apply(MapElements.into(TypeDescriptors.strings())
        .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
    // Apply a write transform, TextIO.Write, at the end of the pipeline.
    // TextIO.Write writes the contents of a PCollection (in this case, our PCollection of
    // formatted strings) to a series of text files.
    // By default, it will write to a set of files with names like wordcounts-00001-of-00005.
    .apply(TextIO.write().to("data/wordcounts").withSuffix(".csv"));
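Count.perElement() groups identical elements and emits one KV<word, count> pair per distinct word. Conceptually, on a single machine it behaves like this plain-Java sketch (no Beam involved; Beam just does the same thing distributed across workers):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountDemo {
    // Single-machine analogue of Beam's Count.perElement()
    public static Map<String, Long> countPerElement(List<String> words) {
        Map<String, Long> counts = new HashMap<>();
        for (String w : words) {
            counts.merge(w, 1L, Long::sum); // increment, starting from 1 on first sight
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countPerElement(Arrays.asList("great", "wall", "great"));
        // Formatted the same way as the MapElements step above: "word: count"
        counts.forEach((k, v) -> System.out.println(k + ": " + v));
    }
}
```

The distributed version differs in one important way: elements with the same key may live on different nodes, so Beam must shuffle them together before summing, which is why this step is usually the expensive one in a real cluster.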

Step 10

Running the pipeline simply means

p.run().waitUntilFinish();

That’s it. Now this generic Beam pipeline can run anywhere.

You can now run this code using various methods

An example to run it on the Direct Runner (for local testing purposes only):

mvn compile exec:java -Dexec.mainClass=tweetAnalysis.App \
-Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner

You can run the same in Spark local mode:

mvn compile exec:java -Dexec.mainClass=tweetAnalysis.App \
-Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner

Conclusion

I hope this gives you an idea of why it is important to create a more structured data pipeline: understandable and, most importantly, maintainable by a great team of Data Engineers, without jumping around between big data technologies too much. As a large non-tech organisation, our core métier is not to chase such technological advances, but rather to focus on the consistency, scalability and robustness of the workloads that have kept us running for hundreds of years.

You will find the code example here:

LinkedIn: https://www.linkedin.com/in/souvik-ghosh-aaa30470/

Stay Tuned

For what I will do with the dataset of word counts — let’s see in the next article, which I promise I will write. 😅

