Oscars 2015 on Twitter: Analyzing data with Google Cloud Dataflow

Luca Albertalli
4 min readFeb 25, 2015

This is the second post of a 3-part series we wrote after analyzing the Oscars 2015 with data from Twitter. In the first post, my friend, Megan, tells you what we learned, and in the final post, Emanuele will explain how we setup a PubSub system to collect the data. In this post I will show you how to analyze the data using Google Cloud Dataflow.

First check out the result of our analysis on Oscars 2015 on Twitter

The 2015 Oscars had just finished, and we were interested in learning what was hot and what flopped at the Oscars. We decided to look at Twitter to learn what was going on, and as technologists, we picked only the best technology available today for Data Analysis, Google Cloud Dataflow.

Our plan was to provide a graphical representation of the relevant hashtags and mentions collected during the Oscars. To do so we need to consolidate the raw data in order to create the graphs, and we need to analyze the raw data to find the relevant entities to display. The first task is done easily with Dataflow so let’s start with it; we will then build on this code to get the analysis step.

(Google Cloud Dataflow is still in it’s alpha stage but you can apply to have access on this page. Once you have an account, the setup process and the tutorials are pretty helpful to learn how to use it).

Here’s what we did with Dataflow.

Summarizing the data.

The first task required loading the raw data we had saved in BigQuery, extracting the feature we want to analyze (mentions and hashtags), doing a count over a fixed window and then saving it on a file. Let’s take a look at the code:

Pipeline pipeline = Pipeline.create(options);pipeline
.apply(BigQueryIO.Read.from(“tablename”))
.apply(ParDo.of(new ExtractEntities()))
.apply(Window.<String>into(
FixedWindows.of(Duration.standardSeconds(5))
.apply(Count.<String>perElement())
.apply(ParDo.of(new TransformForPrint()))
.apply(TextIO.Write.to(“filename”).withoutSharding());
pipeline.run();

As you can see, the code is pretty straightforward. We create the processing pipeline, instruct it to read the data from the database, group it in processing windows, and do a count and save the results on file. The only code that needs a little bit more explanation are the two “ParDo”. They are essentially “glue code” to extract the mentions to each single tweet and to format the Key-Value pair we get from the count in strings suitable to be stored in a file.

But now we have a problem. Let’s suppose we want to filter out some data, e.g. we want remove retweets or take only the tweets in a given timeframe. The BigQueryIO component seems not support filtering the query, so we should find a way to filter the data. We achieve this adding this single line between the data read and the data extraction:

    ...
.apply(BigQueryIO.Read.from(“tablename”))
.apply(Filter.by(new TweetFilter(...)))
.apply(ParDo.of(new ExtractEntities()))
...

where TweetFilter is a class that implements the filtering we want (More on that later).

Analyzing the data.

The second step was to analyze the data in search of meaningful mentions and hashtags to present and we need a tool to assist us. To build this tool we can start from the first pipeline and modify it. First of all, we wanted to group the data in bigger sliding window to better summarize it and we want to find the top 10 for each window. The code looks like this:

Pipeline pipeline = Pipeline.create(options);pipeline
.apply(BigQueryIO.Read.from(“tablename”))
.apply(Filter.by(new TweetFilter(...)))
.apply(ParDo.of(new ExtractEntities()))
.apply(Window.<String>into(
SlidingWindows.of(Duration.standardMinutes(30))
.every(Duration.standardMinutes(5))))

.apply(Count.<String>perElement())
.apply(Top.<KV<String,Long>,
KV.OrderByValue<String,Long>>of(
10,new KV.OrderByValue<String,Long>()))

.apply(ParDo.of(new TransformForPrint()))
.apply(TextIO.Write.to(“filename”).withoutSharding());
pipeline.run();

The code to take the top 10 appears a little complex but let clean it from the Java templates and we get:

.apply(Top.<...>of(10,new KV.OrderByValue<...>()))

that is much easier to read.

To complete our analysis, we decided to split mentions and hashtags to get the top 10 for each type (Note that the code for calculating the top 10 need to be changed as well.) We also decided to have a filter to remove common hashtags (e.g. #oscars2015). This is the (redacted) final code:

Pipeline pipeline = Pipeline.create(options);pipeline
.apply(BigQueryIO.Read.from(“tablename”))
.apply(Filter.by(new TweetFilter(...)))
.apply(ParDo.of(new ExtractEntities()))
.apply(Filter.by(new FilterIn("...",false)))
.apply(Window.<String>into(
SlidingWindows.of(Duration.standardMinutes(30))
.every(Duration.standardMinutes(5))))
.apply(Count.<String>perElement())
.apply(ParDo.of(new SplitMentionHash()))
.apply(Top.<String,KV<String,Long>,
KV.OrderByValue<String,Long>>perKey
(
10,new KV.OrderByValue<String,Long>()))
.apply(ParDo.of(new TransformForPrint()))
.apply(TextIO.Write.to(“filename”).withoutSharding());
pipeline.run();

Running the code.

It’s finally time to run the code that you can find on Github; this code has little final retouching to make it more useful. First of all we wanted to be able to pass the parameters from the command lines to quickly change them without recompiling everything. Second, we modified the first filter of the pipeline to make it more powerful: it then became possible to include or exclude tweets based on the words they contain.

The Google Console while the code is running

At long last, we could then run the code and check our results saved in Google Cloud Store. We simply compiled and created the bundle with Maven, then ran the code with Java while the Google Runtime worked its magic.

To analyze the 2M tweets we collected during the Oscar our code took less than 5 minutes including the time to optimize the code and launch a new server on which to run it.

You can find more information on the code on this Github repo. If you enjoyed this post, please share it on Facebook and Twitter!

--

--

Luca Albertalli

Product Leader, Experimenter, Entrepreneur, and Startup Advisor. I often play with code but more often with ideas!