Creating a Data Pipeline with Apache Beam

Photo by Yiran Ding on Unsplash
This is part 3 of a four-part series that explores the creation of a data pipeline. So far, we created a data source with the Twitter Streaming API in part 1, and stored the collected tweets in Cloud BigQuery by sending them through Cloud PubSub in part 2. Now, we’ll learn about Apache Beam and how to use it to make this flow of data more interesting.

All the code we’ll write in this tutorial can be found on my GitHub. Check out the pub-to-big branch if you want to get the code.

Why Beam?

Beam is used by Spotify! I first found out about Beam through their blog, and if a big data shop like Spotify uses a technology, there must be something to it. Beam also grew out of Google’s Cloud Dataflow SDK.

Beam is built around the “unified model” that Google developed for Dataflow, which lets the same pipeline code switch between batch and streaming mode with only minor changes.

In other words, Beam lets a pipeline process data as it arrives in real time or data that is already stored, and switch between these two types of input with relative ease.

The fundamental feature of Beam is that it supports data parallelism. In the world of big data, there are many applications that are considered “embarrassingly parallel”, where an operation can be chunked into many small sub-pieces that do the same thing.

For example, labeling each number in a collection as blue or red (depending on whether it’s even or odd) can be done by first dividing the collection and sending each smaller group of numbers to a different processing unit. Each processing unit then independently computes the colors of its own sub-group, in parallel. At the end, the processing units (or nodes) assemble the solution by communicating their results.
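To make the divide-and-combine idea concrete, here’s a minimal plain-Java sketch (no Beam involved) that colors numbers on a small thread pool. The class name ParallelColors and the even → blue, odd → red mapping are assumptions for illustration; Beam and its runners do this kind of splitting across machines rather than threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelColors {

    // Assumed rule for this sketch: even numbers are "blue", odd numbers are "red".
    static String color(int n) {
        return n % 2 == 0 ? "blue" : "red";
    }

    // Split the numbers into chunks, color each chunk on its own worker thread,
    // then gather the partial results back in their original order.
    static List<String> colorAll(List<Integer> numbers, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            int chunkSize = Math.max(1, (numbers.size() + workers - 1) / workers);
            List<Future<List<String>>> partials = new ArrayList<>();
            for (int start = 0; start < numbers.size(); start += chunkSize) {
                List<Integer> chunk =
                        numbers.subList(start, Math.min(start + chunkSize, numbers.size()));
                // Each "processing unit" colors its own sub-group independently.
                partials.add(pool.submit(() -> {
                    List<String> colors = new ArrayList<>();
                    for (int n : chunk) {
                        colors.add(color(n));
                    }
                    return colors;
                }));
            }
            // "Communicate" the results: concatenate every chunk's output.
            List<String> result = new ArrayList<>();
            for (Future<List<String>> partial : partials) {
                result.addAll(partial.get());
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A worker failed", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, coloring 1 through 5 with two workers splits the list into two chunks and returns red, blue, red, blue, red.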

This division of work and communication between nodes is essentially what Apache Beam handles for us, together with the runner that executes the pipeline!

Dividing the work like this is necessary because once the data reaches hundreds of millions of records and the computation is more complex than the simple “if-else” in our example, a single node can’t handle the entire workload on its own.

So, architectures will employ horizontal scaling, where you have more nodes working together to solve a problem. This is what Apache Beam allows us to do — solve problems. 😉

Our Problem

Currently, our flow of information works like this: we interact with the Twitter Streaming API until a certain number of tweets is collected, and then we use the Google Cloud Platform to store them.

We could have simply stored the tweets on our computer; in fact, in part 1 all we did with the tweets was print them to standard output. However, by using Google Cloud PubSub, we can route this stream of tweets to different destinations, not just Cloud BigQuery like we did in part 2.

In this post, we’ll send the tweets to Apache Beam and run some interesting computations on each tweet.

We want to find out which is the happiest country on Earth. This has been our objective from the start — can we use Twitter to answer this common question?

So, with our pipeline, we’ll run sentiment analysis on every tweet, group the tweets by country, find the average sentiment in the group, and store these results to Cloud BigQuery.

We’re basically sandwiching Apache Beam between what we did in parts 1 and 2. 🍔

Getting Started

I bootstrapped the pipeline using Beam’s “word-count example”. The starter project is very useful because it sets up the pom.xml for you, and further down that page there are commands for running the pipeline with different runners.

The Dataflow and Direct runners are the ones we’ll be using. Basically, they run the pipeline on the Google Cloud Platform or on your own computer, respectively.

Add these dependencies to the pom.xml:

<!-- Adds a dependency on the Stanford CoreNLP library -->
<dependency>
    <groupId>edu.stanford.nlp</groupId>
    <artifactId>stanford-corenlp</artifactId>
    <version>3.8.0</version>
</dependency>
<!-- Adds the pre-trained CoreNLP models, including the sentiment model -->
<dependency>
    <groupId>edu.stanford.nlp</groupId>
    <artifactId>stanford-corenlp</artifactId>
    <version>3.8.0</version>
    <classifier>models</classifier>
</dependency>
<!-- Adds a dependency on Apache Commons Math3 -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-math3</artifactId>
    <version>3.6.1</version>
</dependency>
<!-- Adds a dependency on Google's JSON encoder/decoder (Gson) -->
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.8.2</version>
</dependency>

We’ll be using Stanford’s CoreNLP library for our sentiment analysis.

Authentication

The Google Cloud Platform is a McLaren, and we need the keys if we want to get in and feel the luxurious power.

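So, here’s what we’ll do: follow the instructions in the “Obtaining and providing service account credentials manually” section of Google Cloud’s authentication guide. In short, you create a service account key as a JSON file and point the GOOGLE_APPLICATION_CREDENTIALS environment variable at it.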

Pipeline Config

Also, let’s change the code’s pipeline options to this:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;

public class HappinessPipeline {

    // Each getter/setter pair becomes a command-line flag; this one is --topic
    public interface Options extends PipelineOptions {
        @Description("Pub/Sub topic to read tweets from")
        @Validation.Required
        String getTopic();

        void setTopic(String value);
    }

    public static void main(String[] args) {
        // Parse the flags into Options, failing fast if --topic is missing
        Options options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(Options.class);
        Pipeline pipeline = Pipeline.create(options);

        pipeline.run().waitUntilFinish();
    }
}

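The naming convention allows us to set flags in the terminal when the pipeline is run: Beam derives each flag’s name from the getter, so getTopic() and setTopic() define a --topic flag. We can then read the value through the PipelineOptions. Let’s see this in action.

First, let’s read the pipeline’s input from our PubSub topic. Beam creates a subscription to that topic for us.

Pipeline pipeline = Pipeline.create(options);
// Read each PubSub message payload as a UTF-8 string
// (requires: import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;)
pipeline.apply(PubsubIO.readStrings().fromTopic(options.getTopic()));
pipeline.run().waitUntilFinish();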

Notice the “options.getTopic()” line. We’ll provide this value through the terminal with the following command:

$ mvn compile exec:java -Dexec.mainClass=HappinessPipeline -Dexec.args="--topic=PATH/TO/YOUR/TOPIC" -Pdirect-runner

By the way, I show you how to set up a PubSub topic in part 2.

If you set up your credentials with the Google Cloud Platform (GCP) correctly then you should see this kind of message:

WARNING: Created subscription... Note this subscription WILL NOT
be deleted when the pipeline terminates

If you don’t see this message then make sure you followed the previous steps correctly.

Tweet Input

Now it’s time to get those tweets we were publishing to Cloud PubSub.

This is the code that streams and publishes the tweets in case you don’t have it.

And this is the code that triggers the action.

There are some configurations required, which I go over in my previous posts. The setup basically involves creating a Twitter developer account so calls to their API can be authenticated, and setting the necessary environment variables.

If that’s all configured, then we can execute the above file to start sending tweets to the topic with $ python twitter_publisher.py

PubSub will connect our pipeline to this publisher. By setting the pipeline input to the same topic to which we publish our tweets, we’re essentially subscribing to the topic. And we set this topic to the pipeline with the “--topic=…” flag we passed through the terminal with the command above.

Let’s keep going! We’ve passed the configuration hurdle. 💥


Analyzing Tweets

When we execute both the twitter_publisher and our pipeline, we should start receiving tweets into our pipeline. We’ll make sure by logging them to stdout. Once we ensure we actually have tweets, we can start doing cool things with the data. 👽

Beam Paradigm

Reading Beam’s documentation would be useful, but this is a quick rundown of how programming with Beam works.

A PCollection is the data structure on which every computation runs. It represents the data in the pipeline: an immutable collection of elements that can be spread across many nodes, either bounded (a fixed data set, like a file) or unbounded (a continuous stream, like our PubSub topic).

When a computation is executed, it takes an input PCollection and produces an output PCollection. The element type may change or stay the same, but the input is never modified; the computation always produces a new PCollection. This step is known as a Transform. We’ll see several different Transforms throughout the implementation of this pipeline.

From start to finish, we need to know what our input and output is going to be, and most importantly, what Transforms we’ll need to reach the data state that satisfies our output requirements.
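As a rough in-memory analogy (not Beam’s API), a Transform can be pictured as a function from one collection to a brand-new one. In this sketch, the hypothetical TransformSketch.apply only reads its input, and the element type may change along the way, e.g. from String to Integer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

class TransformSketch {

    // Applies fn to every element and returns a NEW, unmodifiable list.
    // The input list is only read, never changed; A and B may be different types.
    static <A, B> List<B> apply(List<A> input, Function<? super A, ? extends B> fn) {
        List<B> output = new ArrayList<>(input.size());
        for (A element : input) {
            output.add(fn.apply(element));
        }
        return Collections.unmodifiableList(output);
    }
}
```

Calling TransformSketch.apply(words, String::length) turns a List<String> into a List<Integer> of word lengths while words stays exactly as it was. A real PCollection differs in that it is distributed and may be unbounded, so it can’t be materialized as a list like this.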

First Transform!

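Our first transform is going to be simple. PubSub messages carry their data as raw bytes, so our publisher sends each batch of tweets as an encoded payload. PubsubIO.readStrings() turns each payload back into a String, and we need to split that string into separate tweets.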

We’ll print each one out to the console.

I’m going to use Beam’s ParDo. ParDo is Beam’s “staple” parallel transform that:

  • Filters out data
  • Formats or type converts input data
  • Extracts portions of each data element
  • Performs computations on each data element
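Here’s a plain-Java sketch of a ParDo-style function doing several of these jobs at once: it filters out malformed lines, converts the rest into tweets, and emits zero or more outputs for a single input element, the way a DoFn<String, TweetEntity> can call c.output() once per tweet. The payload format (one tweet per line, location<TAB>text) and the Tweet stand-in class are assumptions for illustration only; the real publisher’s payload may differ (the Gson dependency suggests JSON).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ExtractTweetsSketch {

    // Minimal stand-in for the pipeline's TweetEntity (location + text only).
    static final class Tweet {
        final String location;
        final String text;

        Tweet(String location, String text) {
            this.location = location;
            this.text = text;
        }
    }

    // One input element (a whole batch) can produce zero or more outputs.
    // Assumed format for this sketch: one tweet per line, "location<TAB>text".
    static void processElement(String batch, Consumer<Tweet> output) {
        for (String line : batch.split("\n")) {
            String[] parts = line.split("\t", 2);
            if (parts.length < 2 || parts[1].trim().isEmpty()) {
                continue; // filter: skip malformed lines and tweets without text
            }
            // convert: trim the fields and wrap them in a Tweet, then emit it
            output.accept(new Tweet(parts[0].trim(), parts[1].trim()));
        }
    }

    // Runs processElement over every batch and collects everything it emitted.
    static List<Tweet> run(List<String> batches) {
        List<Tweet> results = new ArrayList<>();
        for (String batch : batches) {
            processElement(batch, results::add);
        }
        return results;
    }
}
```

Passing the Consumer in, rather than returning a list, mirrors how a DoFn hands each result to the runner as soon as it is produced.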

We’ll use ParDo to convert data from a byte stream to individual tweet entities.

Remember a PCollection represents multiple data elements. Initially, one data element in our PCollection is actually a batch of tweets since that’s what we sent to PubSub. We access the whole batch with c.element(), and after converting the data into something we can iterate over, we build our output PCollection with the c.output() call.

By outputting something of type TweetEntity, our PCollection went from PCollection<String> → PCollection<TweetEntity>.

Before we run this code, we need two more things. First, we need to define the TweetEntity data type.

static class TweetEntity {
    private String text;
    private String location;
    private double sentiment;

    TweetEntity() {
        this.sentiment = 0;
    }

    TweetEntity(String t, String l, double s) {
        this.text = t;
        this.location = l;
        this.sentiment = s;
    }

    String getText() {
        return text;
    }

    void setText(String text) {
        this.text = text;
    }

    String getLocation() {
        return location;
    }

    void setLocation(String location) {
        this.location = location;
    }

    double getSentiment() {
        return sentiment;
    }

    void setSentiment(double sentiment) {
        this.sentiment = sentiment;
    }
}

And to execute this transform, we need to call it from our pipeline like this:

Pipeline pipeline = Pipeline.create(options);
pipeline.apply(PubsubIO.readStrings().fromTopic(options.getTopic()))
.apply(ParDo.of(new ExtractTweets()));
pipeline.run().waitUntilFinish();

When you run the pipeline again, you should see an extended version of this error.

[Error] An exception occurred while executing the Java class. Unable to return a default Coder.

This error message gives us valuable insight into how Coders work. Essentially, data passed between nodes travels as bytes, so Beam needs to know how to encode and decode every element. By assigning a Coder to each data type, we ensure Beam always knows what to do with every piece of data as it passes from node to node. Beam can infer Coders for standard types like String, but it can’t infer one for our custom TweetEntity class, hence the error.
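To see what a Coder actually does, here’s a hand-written encode/decode pair for the three TweetEntity fields using only java.io. It is not AvroCoder, just a sketch of the contract every Coder must honor: the bytes produced by encode must turn back into an equivalent object in decode. It assumes text and location are never null, because DataOutputStream.writeUTF throws on null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class TweetCoderSketch {

    // The three fields of TweetEntity, as a plain value holder.
    static final class Tweet {
        final String text;
        final String location;
        final double sentiment;

        Tweet(String text, String location, double sentiment) {
            this.text = text;
            this.location = location;
            this.sentiment = sentiment;
        }
    }

    // encode: turn the object into bytes that can travel between workers.
    static byte[] encode(Tweet tweet) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(tweet.text);      // 2-byte length prefix + UTF-8 bytes
            out.writeUTF(tweet.location);
            out.writeDouble(tweet.sentiment); // 8 bytes
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // decode: read the fields back in exactly the order encode wrote them.
    static Tweet decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return new Tweet(in.readUTF(), in.readUTF(), in.readDouble());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Encoding a tweet with text “Great day”, location “Lisbon” and sentiment 3.0 yields 27 bytes (2 + 9 and 2 + 6 for the two length-prefixed strings, plus 8 for the double), and decoding those bytes restores all three fields.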

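There’s a simple way to assign one: annotate TweetEntity with @DefaultCoder so that Beam uses AvroCoder, which serializes the class’s fields through Avro reflection. Avro reflection needs a no-argument constructor, like the one TweetEntity already has.

import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;

@DefaultCoder(AvroCoder.class)
static class TweetEntity {
    private String text;
    private String location;
    private double sentiment;
    ...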

With the above change, your code should now run. Then, with the pipeline running, use the twitter_publisher to send tweets that our pipeline can consume and print to the console.

Sentiment Analysis

Now that we’re getting tweets into the pipeline, and they’re formatted correctly, we can run some more interesting computations on them.

We want to know how positive or negative a tweet is, and natural language processing can help us achieve this.

One of the things natural language processing allows us to do is measure the sentiment of individual words or complete sentences. This is called sentiment analysis.

We’ll use Stanford’s CoreNLP library to help us calculate each tweet’s sentiment. I won’t go into the specifics of the code since that’s outside the scope of this post, so just paste the code below into a new file called SentimentAnalyzer.java.

In our pipeline class, add a static field that initializes the SentimentAnalyzer:

public class HappinessPipeline {
private static final SentimentAnalyzer ANALYZER = new SentimentAnalyzer();
...

Define another ParDo transform that uses the ANALYZER to calculate a tweet’s sentiment.
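A GetSentiment-style transform boils down to per-element logic like the plain-Java sketch below. SentimentScorer is a hypothetical interface standing in for SentimentAnalyzer, whose methods aren’t shown in this post, and ToyScorer is a deliberately naive word-list scorer that exists only to make the sketch runnable; it is not how CoreNLP scores text. One detail worth copying: Beam doesn’t allow a DoFn to modify its input element (the Direct runner checks for this), so the sketch builds a new tweet instead of calling a setter on the incoming one.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

class GetSentimentSketch {

    // Hypothetical stand-in for the SentimentAnalyzer used in the pipeline.
    interface SentimentScorer {
        double score(String text);
    }

    // Minimal immutable stand-in for TweetEntity.
    static final class Tweet {
        final String text;
        final String location;
        final double sentiment;

        Tweet(String text, String location, double sentiment) {
            this.text = text;
            this.location = location;
            this.sentiment = sentiment;
        }
    }

    // Per-element logic: score the text and return a NEW tweet carrying the score.
    // The input is left untouched, since a Beam DoFn must not modify its input element.
    static Tweet withSentiment(Tweet input, SentimentScorer scorer) {
        return new Tweet(input.text, input.location, scorer.score(input.text));
    }

    // Deliberately naive word-list scorer so the sketch runs without CoreNLP.
    // CoreNLP instead applies a trained model to each sentence's parse tree.
    static final class ToyScorer implements SentimentScorer {
        private static final Set<String> POSITIVE = new HashSet<>(Arrays.asList("happy", "great", "love"));
        private static final Set<String> NEGATIVE = new HashSet<>(Arrays.asList("sad", "awful", "hate"));

        @Override
        public double score(String text) {
            double total = 0;
            // +1 for each positive word, -1 for each negative word
            for (String word : text.toLowerCase(Locale.ROOT).split("\\W+")) {
                if (POSITIVE.contains(word)) {
                    total += 1;
                } else if (NEGATIVE.contains(word)) {
                    total -= 1;
                }
            }
            return total;
        }
    }
}
```

Because the scorer is passed in, swapping the toy scorer for a wrapper around a real analyzer doesn’t change the transform’s logic.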

And we’ll execute this transform the same way we did the first one.

Pipeline pipeline = Pipeline.create(options);
pipeline.apply(PubsubIO.readStrings().fromTopic(options.getTopic()))
        .apply(ParDo.of(new ExtractTweets()))
        .apply(ParDo.of(new GetSentiment()));
pipeline.run().waitUntilFinish();

Tweet Buckets

We want to group our tweets by where they were written from.

There are a few pieces we need for this algorithm to work. It’s not as simple as setting the location to what the tweet’s author set as their location because people can set their location to whatever they want.

Of course, we won’t be able to handle all these cases, but we want to be able to scan these strings and decide what country the tweet’s location is describing.

We’ll need a lookup table to compare each tweet’s location string against real values.

Lookup Table with Cloud Datastore

We’re going to try out a new storage technology: Cloud Datastore. We’ll use Datastore because our data is very small (the look-up table is only 1.87 MB), and we won’t be performing any heavy analytics, just simple lookups as we search for a matching string.

We could have used Cloud SQL as it’s also used for smaller data sets, but our data is just a simple document (i.e. not relational), so we don’t need the full power of SQL queries.

Before I describe the process of creating this document on Datastore, let’s learn some quick terminology. Datastore doesn’t use tables and rows: what a relational database calls a table is a Kind in Datastore, and a row is an Entity.

  • Table → Kind
  • Row → Entity

Easy. Now let’s create this look-up table (aka Kind)!

We’ll implement a client application that reads from a file stored in a Cloud Storage bucket and stores each row in the file to a Datastore entity.

First, go to this link to download the data set we’ll use for our look-up table.

Save the .csv file to a Cloud Storage bucket on the GCP.

Once you have that set-up, use this client application to pass the data from the bucket to Datastore.

Make sure to set the same authentication credentials we had previously configured. Then, just use the run command I describe in the code’s comments.

If everything runs as expected, there should be a “world_city” kind in Datastore with one entity per row of the CSV file.
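The heart of that client is turning each CSV row into a lookup record. Below is a plain-Java sketch that loads the rows into an in-memory city → country map instead of Datastore entities. The class name CityTableLoader, the column layout (a header row, then the city in the first column and the country in the second) and the absence of quoted commas are all assumptions; check the real file’s header before relying on them.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

class CityTableLoader {

    // Loads CSV rows into a lower-cased city -> country map.
    // Assumptions: a header row comes first, column 0 is the city, column 1 is
    // the country, and no field contains a quoted comma.
    static Map<String, String> load(Reader csv) {
        Map<String, String> cityToCountry = new HashMap<>();
        try (BufferedReader reader = new BufferedReader(csv)) {
            reader.readLine(); // skip the header row
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split(",");
                if (fields.length < 2) {
                    continue; // skip blank or malformed rows
                }
                String city = fields[0].trim().toLowerCase(Locale.ROOT);
                // Keep the first country seen for a city name; duplicates such as
                // "Paris" (France) vs "Paris" (United States) would need a tie-breaker.
                cityToCountry.putIfAbsent(city, fields[1].trim());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return cityToCountry;
    }
}
```

Lower-casing the keys at load time means later lookups can ignore how each tweet’s author capitalized their location.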

Awesome. The major piece of the puzzle is in place.

Where are you from?

Now we get to do a very interesting transform. This is possibly the most involved transform, but it’s also the most rewarding. As mentioned before, we need to map each tweet to its country of origin. There are a lot of edge cases that we can consider, but we’re going to focus on two simple ones.

The two cases are: the location contains a city name or a country name. Also, both might be present, in which case we’ll assume there’s a comma to separate the two values.

So here’s the code that handles opening a Datastore connection, building and running each query, and returning a country name as soon as one of the queries produces a result, since that result should be the country that matched the token.

The client is pretty low-level, so that’s why the code is so long. But by using this template you can extend and tweak the code as you wish. There are many more cases that can be considered.
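Stripped of the Datastore plumbing, the two cases boil down to something like this plain-Java sketch, with in-memory maps standing in for the queries. CountryResolver and its inputs are hypothetical, and checking country tokens before city tokens is an assumed design choice, made because city names are more often ambiguous (“Paris” is also a city in Texas).

```java
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

class CountryResolver {

    // In-memory stand-ins for the Datastore lookups (keys are lower-cased):
    // country name -> canonical name, and city name -> country name.
    private final Map<String, String> countries;
    private final Map<String, String> cityToCountry;

    CountryResolver(Map<String, String> countries, Map<String, String> cityToCountry) {
        this.countries = countries;
        this.cityToCountry = cityToCountry;
    }

    // Covers the two cases from the text: a city or a country name, or both
    // separated by a comma (e.g. "Lisbon, Portugal").
    Optional<String> resolve(String location) {
        if (location == null) {
            return Optional.empty();
        }
        String[] tokens = location.split(",");
        // First pass: an explicit country name wins.
        for (String token : tokens) {
            String country = countries.get(normalize(token));
            if (country != null) {
                return Optional.of(country);
            }
        }
        // Second pass: fall back to the city -> country table.
        for (String token : tokens) {
            String country = cityToCountry.get(normalize(token));
            if (country != null) {
                return Optional.of(country);
            }
        }
        return Optional.empty(); // free-form locations we can't map
    }

    private static String normalize(String token) {
        return token.trim().toLowerCase(Locale.ROOT);
    }
}
```

With Portugal and France in the country map and Lisbon and Paris in the city map, “Lisbon, Portugal” resolves through the country token, “paris” through the city table, and a location like “Somewhere over the rainbow” resolves to nothing, so that tweet could simply be dropped.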

For now though, let’s add this transform to the pipeline and continue.

Pipeline pipeline = Pipeline.create(options);
pipeline.apply(PubsubIO.readStrings().fromTopic(options.getTopic()))
        .apply(ParDo.of(new ExtractTweets()))
        .apply(ParDo.of(new GetSentiment()))
        .apply(MapElements.via(new MapTweetsByCountry()));
pipeline.run().waitUntilFinish();

Last Transforms

We’re almost at the end — we only have two more transforms to do! The first one is very simple.

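We need to calculate the average sentiment of tweets from each country. Because MapTweetsByCountry keys each sentiment score by its country, Mean.perKey() can compute a separate average for each key.

Pipeline pipeline = Pipeline.create(options);
pipeline.apply(PubsubIO.readStrings().fromTopic(options.getTopic()))
        .apply(ParDo.of(new ExtractTweets()))
        .apply(ParDo.of(new GetSentiment()))
        .apply(MapElements.via(new MapTweetsByCountry()))
        // PubSub is unbounded, and Beam won't group an unbounded PCollection in the
        // global window without a trigger, so average over fixed one-minute windows
        // (Window and FixedWindows come from org.apache.beam.sdk.transforms.windowing,
        // Duration from org.joda.time)
        .apply(Window.<KV<String, Double>>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(Mean.<String, Double>perKey());
pipeline.run().waitUntilFinish();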
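A per-key average parallelizes well because it can be computed from sums and counts, and those can be merged. This plain-Java sketch (a hypothetical PerKeyMean class, not Beam’s Mean implementation) has each worker fold its share of (country, sentiment) pairs into per-key accumulators, then merges the partial results and divides only at the end. Beam’s combining transforms follow the same accumulate-then-merge idea.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PerKeyMean {

    // Running (sum, count) for one key. Averages can't be merged directly,
    // but sums and counts can, which is what lets the work be split up.
    static final class Accumulator {
        double sum;
        long count;

        void add(double value) {
            sum += value;
            count++;
        }

        void merge(Accumulator other) {
            sum += other.sum;
            count += other.count;
        }

        double mean() {
            return sum / count;
        }
    }

    // One worker's partial result: fold its (country, sentiment) pairs per key.
    static Map<String, Accumulator> accumulate(List<? extends Map.Entry<String, Double>> pairs) {
        Map<String, Accumulator> partial = new HashMap<>();
        for (Map.Entry<String, Double> pair : pairs) {
            partial.computeIfAbsent(pair.getKey(), k -> new Accumulator()).add(pair.getValue());
        }
        return partial;
    }

    // Merge the workers' partial results, then extract each key's mean.
    static Map<String, Double> meanPerKey(List<Map<String, Accumulator>> partials) {
        Map<String, Accumulator> merged = new HashMap<>();
        for (Map<String, Accumulator> partial : partials) {
            for (Map.Entry<String, Accumulator> e : partial.entrySet()) {
                merged.computeIfAbsent(e.getKey(), k -> new Accumulator()).merge(e.getValue());
            }
        }
        Map<String, Double> means = new HashMap<>();
        for (Map.Entry<String, Accumulator> e : merged.entrySet()) {
            means.put(e.getKey(), e.getValue().mean());
        }
        return means;
    }
}
```

Two workers holding Portugal=4.0, France=1.0 and Portugal=2.0 produce means of 3.0 for Portugal and 1.0 for France. Averaging the workers’ averages instead would be wrong whenever they saw different numbers of tweets, which is why the counts travel with the sums.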

Our data is ready to be written to a table on BigQuery!

First, let’s create a schema so Beam knows how to place the data into our table.

The values of the column constants must exactly match the column names in BigQuery. We’ll write to an existing table, so create the BigQuery table if you haven’t already.

I showed you how to create this table in part 2.
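One way to keep the schema and the rows in sync is to derive both from the same constants. The sketch below uses plain Java maps and arrays rather than BigQuery’s TableSchema and TableRow classes, and the column names country and sentiment are hypothetical; use whatever names your part 2 table actually has.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TableRowSketch {

    // Hypothetical column names: they must match the BigQuery table exactly.
    static final String COUNTRY_COLUMN = "country";
    static final String SENTIMENT_COLUMN = "sentiment";

    // Schema as (column name, BigQuery type) pairs, built from the constants...
    static final List<String[]> SCHEMA = Arrays.asList(
            new String[] {COUNTRY_COLUMN, "STRING"},
            new String[] {SENTIMENT_COLUMN, "FLOAT"});

    // ...and each output row keyed by the same constants, so the two can't drift apart.
    static Map<String, Object> toRow(String country, double averageSentiment) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put(COUNTRY_COLUMN, country);
        row.put(SENTIMENT_COLUMN, averageSentiment);
        return Collections.unmodifiableMap(row);
    }
}
```

In the real pipeline, the same constants would feed TableFieldSchema.setName(...) when building the schema and TableRow.set(...) inside TweetDataToTableRow.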

Let’s call the actual transform that writes the pipeline’s output.

...
.apply(MapElements.via(new TweetDataToTableRow()))
.apply(BigQueryIO.writeTableRows()
.to("PROJECT-NAME:DATASET.TABLE-NAME")
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

We’re done!

I hope you enjoyed the ride, and nice job making it to the end.

What we saw today was just scratching the surface. There is so much more to be learned with Beam.

Pipelines can be tested, windowed, triggered, and combined with many different types of inputs and outputs. The possibilities are only limited by your creativity!

Thank you so much for reading. I hope you learned something valuable, and I’ll see you in the next one. ✌️