Big data using HDInsight : A journey in the zoo ecosystem

Elephants family : https://www.flickr.com/photos/75729488@N03/7234330010

History

Big data is one of those buzzwords that sounds old-fashioned now, but it’s more relevant than ever before. Back when Google was established, the search page mentioned that the whole index was about 25 million pages.

Business Insider : Here’s What Google Looked Like The First Day It Launched In 1998

At that time, 25 million pages was pretty huge but still might be manageable with some decent hardware and architecture. The internet kept growing at exponential rates, and there was a need to handle orders of magnitude more data because traditional approaches simply could not scale. Data had to be transferred, stored and processed, which are the main pillars of any big data architecture. Google invented proprietary tools for storage (Google File System) and processing (MapReduce) and released papers describing them; those papers eventually became the basis of what is now known as Hadoop, with HDFS (Hadoop Distributed File System) as the storage layer and MapReduce as the computation model.

Hadoop is an ecosystem for doing big data and it is based on the simple idea of divide and conquer. To store mountains of data, you need a cluster of commodity hardware machines with a distributed file system designed for scale and inherently fault tolerant. To process the data, you can use the same cluster to distribute the processing across its nodes, mainly by moving the processing code to the nodes hosting the data instead of moving the data across the network to where the code lives. Hadoop also provides the foundation for many tools built on top of it to handle specific big data requirements.

Hadoop ecosystem : http://blog.newtechways.com/2017/10/apache-hadoop-ecosystem.html

As it is pretty hard to create a Hadoop cluster from scratch and wire up all the components together, several companies specialize in providing fully packaged Hadoop distributions (similar to the Linux distros idea) and also add extra functionality wrapping the plain Hadoop features for ease of use and management. As far as I know, the main players in this field are Hortonworks, Cloudera and MapR. Microsoft has partnered with Hortonworks to provide an Azure PaaS Hadoop distribution branded as HDInsight. Other major cloud providers like AWS and Google also have similar offerings.

The major benefits of doing Hadoop in the cloud are:

  • Increased speed for both development and production operations
  • Reduced administration complexity
  • Run the cluster only when processing is needed; since data is decoupled from the cluster, this means dramatic cost savings.
  • As with most PaaS cloud offerings, cluster size can be tuned easily to handle the required load.

But wait, why do we even need a big data solution? Well, many businesses and solutions nowadays generate huge amounts of data that can be analyzed to provide critical business value to the organisation. Even midsize and small businesses can still reap some benefits from a big data powered feature. I know of many clients asking for historical log analysis and others who produce massive log/audit data stored in a SQL database, which causes them performance issues. For example, the below screenshot shows a 200 GB database where the size excluding logs and the audit table is merely 20 GB. It would be nice if audit/log data could be moved to a different storage medium and still be available for analysis and inspection if needed. This is a common big data use case.

In this article, we will go through a journey in the big data world to get familiar with the main building blocks and how/when to use them. The tools covered will be:

  • MapReduce
  • Pig
  • Hive (coming soon…)
  • Storm
  • HBase
  • Spark
  • Azure Event Hubs and Power BI will be used as well, although they are not Hadoop members

Preamble

First, I would like to stress a few points:

  1. There will be many practical examples; I believe in learning by doing. You cannot learn to drive by only watching car races.
  2. Although the examples will be on HDInsight, they are still Hadoop and can be applied very easily to other cloud flavors. Many companies like Microsoft, AWS and Google provide other tools that can be used in a big data solution, but you will be coupled to their cloud. For example, Microsoft has a hosted database called Cosmos DB that can replace HBase in Hadoop, but if you pick it you need to stick with Microsoft.
  3. A good sample dataset will be used to really get the feel of big data scenarios. It will not be petabytes, but something in the range of 160 GB to allow quick tinkering and a semi-practical use case.

The prerequisites are numerous but most of them are free:

  • Azure subscription with enough credit, say $100. You can use the free trial subscription, although I am not completely sure it will allow a decent cluster size.
  • Power BI Pro subscription (or free trial of Pro tier).
  • Visual Studio 2017 (community edition will work)
  • IntelliJ (with JDK 8).
  • Any git GUI or CLI client.
  • SSH tool; I use Git Bash but PuTTY or similar tools will be fine as well.
  • I use a Windows machine, but worst case you can have a virtual machine in Azure and still be fine as long as you can remote into it.
  • Azure Storage Explorer, it works on Windows/Mac/Linux. https://azure.microsoft.com/en-gb/features/storage-explorer/
  • Some knowledge of Azure would be very useful
  • PowerShell. It can be installed on Linux/Mac nowadays, but maybe not all the modules will be available there, so you might still need a Windows VM. It will still be mandatory for Visual Studio 😃

One important note: with any cloud artifact that consumes credit, it is highly recommended to delete it once you are done with it. Generally speaking, storage is cheap but compute is not. So, for example, it’s not a big deal to keep an Azure blob storage account for a while, but virtual machines and Hadoop clusters should be dropped as soon as they are not needed. Otherwise, your whole Azure credit could be drained in a single day or, worse, you will get a stupidly expensive bill by the end of the month.

Big data : attributes and architectures

Before digging further, I cannot help but repeat the major attributes a solution should have to be called a big data thing:

  1. Volume : This is relative but it has to be more than a few GBs of data
  2. Velocity : Think Twitter, web click stream, IoT
  3. Variety: All data formats like CSV, relational, audio, images
  4. Veracity: Data could be messy, noisy, bursty or inaccurate

Any big data solution could be composed of one or more of the following workloads.

  • Data storage
  • Data transfer
  • Data processing (batch, streaming, near real-time analysis)

We will touch on storage and processing in this post, but transfer will not be covered. One of the famous architectures in the big data space is the Lambda architecture. It allows two branches of processing: the cold path, where data is stored in its entirety for cumulative batch processing, and the hot path, where data is streamed and analysed as it arrives to extract insights and patterns. We will see an example of the hot path in the Storm example, and in a later update to the post the cold path will be included using HBase & Hive.

Sample Dataset

There are heaps of sample datasets on the internet, both small and large. I have picked one from AWS Public Datasets. The dataset is called GDELT (Global Database of Events, Language, and Tone). The below is a snippet from the GDELT project home page describing it.

Supported by Google Jigsaw, the GDELT Project monitors the world’s broadcast, print, and web news from nearly every corner of every country in over 100 languages and identifies the people, locations, organizations, themes, sources, emotions, counts, quotes, images and events driving our global society every second of every day, creating a free open platform for computing on the entire world.

In a nutshell, it’s a gigantic store of tab-delimited files containing all published news (not the contents but metadata) in the world since 1979. News could be sourced from TV, newspapers or the internet, you name it. The data size up till now is around 160 GB and the schema is documented here. Our first task, should we accept it, is to download this dataset and load it into an Azure blob storage account for further analysis and processing. Depending on your internet connection, it might be better to do this task from an Azure virtual machine and delete the VM once done. First we need to create a storage account with specs similar to the below and note its name and primary access key. Also a blob container named gdelt should be created inside this storage account to store the GDELT files.

Also, we need to stick with a single Azure data center for all resources to speed up data transfer and other things that will come later. Once the account is created, create a Windows Server 2016 Azure virtual machine in the same region (another name for data center) and RDP into that VM. Download this PowerShell script locally and update it with your storage account name and key.

The script starts by installing some Azure modules that will be needed later as well. Next, run this PowerShell script from a PowerShell window or open it using ISE (the integrated scripting environment on Windows) and run it from there. The script simply prepares a list of URLs of all the (tab-delimited) GDELT files on AWS. They are publicly available, so no need to worry about AWS accounts. Then the script downloads them one by one locally, uploads them to the Azure storage account and deletes the local copies. Once complete, you can open your Azure storage account using Azure Storage Explorer and verify the files there.
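To give a feel for what the linked script does (the script itself is the source of truth), the download-and-upload loop boils down to something like the following sketch using the classic Azure/AzureRM cmdlets; the variable names and blob names here are illustrative only.

# Illustrative sketch only; the linked script is the real implementation.
$ctx = New-AzureStorageContext -StorageAccountName "<your account name>" -StorageAccountKey "<your account key>"

foreach ($url in $fileUrls) {                                            # $fileUrls = list of GDELT file URLs on AWS
    $localFile = Join-Path $env:TEMP (Split-Path $url -Leaf)
    Invoke-WebRequest -Uri $url -OutFile $localFile                      # download one file locally
    Set-AzureStorageBlobContent -File $localFile -Container "gdelt" `
        -Blob (Split-Path $url -Leaf) -Context $ctx -Force | Out-Null    # upload it to the gdelt container
    Remove-Item $localFile                                               # delete the local copy
}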

Skimming through the data dictionary of GDELT, it’s mainly a listing of news items including some encoded details about the actors involved in each item, plus some extra attributes like location and the relative impact of the event on its actor(s). We will not do advanced machine learning on this dataset, so that we can focus on the basic ideas of big data. Basic analytics like grouping and counting per country will be enough for our purpose.

MapReduce on plain Hadoop cluster

Our first experiment will be simple: just create a new cluster using the Azure portal and do some basic MapReduce stuff using the provided example JARs. In subsequent examples, we will script cluster creation, which is mainly important for CI/CD scenarios and for operational purposes. Let’s open the Azure portal and click the link to create a new resource.

From data and analytics section, we will select HDInsight.

Then a unique cluster name, admin (web & SSH) credentials, resource group name and location should be provided. The same region as the storage account used to store our files should be used.

Before clicking Next, we need to hit the Cluster Type link to specify what type of HDInsight cluster we want to create. We need to pick Hadoop on the Linux operating system, choose the latest version, then click Select and then Next in the Basics blade to move to the storage section.

From the screenshot above, it is quite apparent that you cannot have a cluster with Storm and HBase together, for example. The decision to segregate clusters into those types is mainly to keep each cluster focused on a single big data problem and not mix different workloads on the same cluster, which may cause performance issues. If the solution requires Spark & HBase, for example, then two clusters have to be created. The good thing about this design is that you need cluster power for processing only and not for storage; once processing is done the cluster can be torn down and the data will still be kept in blob storage or Azure Data Lake. It seems to be a wise decision from Microsoft because:

  1. Keeping data in HDFS is pricier than using Blob storage for the same data because the cost of a compute cluster is higher than a Blob storage container.
  2. It is safe to delete a cluster without losing any data with Blob storage; whereas HDFS is destroyed along with the cluster.
  3. HDFS scale-out requires adding new nodes, even if you don’t need more computation power, which is an overhead; whereas Azure Blob storage provides elastic scaling capabilities.

Let’s move on to the storage blade, where we need to specify what type of storage we will use and its specs. HDInsight can work with blob storage or Data Lake storage as the HDFS layer. In our case we will opt to create a new storage account and provide a container name for the HDFS home directory.

In other scenarios, and especially with scripted and repeatable creation of clusters, an existing storage account could be used to process existing data. Meaning a cluster could be created (then torn down) many times, linked to the same storage account, thus allowing the data to be kept all the time and the cluster to be created only when needed. Also, a cluster could be created with a new storage account and allowed to link to and use existing one(s). We will use this scenario later, but for now just click the Next button to land on the Summary section, which gives a high level summary of the cluster to be created.

Cost per hour is displayed along with the hardware specs of the cluster in case a different cluster size is required. There is a link in this blade to download an ARM template that can be used to automate creation of full environments including other Azure artifacts as well. Now, we need to bite the bullet and create the cluster, which will need around 15 to 20 minutes to be ready for usage. Once the cluster is created, you can navigate to its home page, which provides lots of details about it including links to the cluster dashboard and SSH access.

Clicking the Cluster Dashboard link will show the Ambari window after authentication using the credentials provided during cluster creation. Ambari is a tool used to view, monitor and configure Hadoop clusters.

Cluster storage can be inspected from Azure Storage explorer where lots of Hadoop system/sample files will be shown. The same container could be used to store data that will be processed on this cluster.

In the big data world, the first program to be tried is Word Count, and this is sort of a religion so we cannot deviate :). We will have fun with the GDELT dataset later, but for now let’s stick with word count, which is a simple small program to count the occurrences of each word in a text document(s). I have grabbed the transcript of the 2003 US State of the Union speech by George W. Bush and uploaded it into a new folder called data using Storage Explorer.

Now, using the portal, we need to SSH into the cluster; the portal will provide the needed details as long as you remember the SSH (admin) credentials entered when the cluster was created.

Once an SSH session is established, we can verify the cluster version details as follows.

YARN is the cluster resource negotiator/manager introduced in Hadoop v2, and it provides many benefits over the original MapReduce model of running distributed jobs.

Anyway, the cluster is up and running and we would like to know the common words of that State of the Union speech. The Hadoop cluster comes with an example JAR containing lots of sample big data programs, and one of them is word count. Enter the following shell command to trigger a MapReduce job of word count with the State of the Union speech as input:

yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-mapreduce-examples.jar wordcount /data/state-of-union-2003.txt /data/state-of-union-2003-wordcount

Small disclaimer: MapReduce jobs are designed to run against gigantic files, where a file is divided on HDFS into 128 MB blocks, but it is still fine to try them on smaller things first.
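Out of curiosity, what does such a job look like in code? The map and reduce pair behind a word count is conceptually similar to the minimal sketch below. This is not the exact source of the example JAR, just an illustration of the programming model, and the job driver is omitted.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountSketch {

    // Map phase: runs on the nodes holding the data blocks and emits (word, 1) pairs.
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            for (String token : value.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    // Reduce phase: receives all the 1s emitted for a given word and sums them up.
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
}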

Once the job is complete, a new folder will be created as below containing two files. One of them is just a marker that the job has completed successfully and the other one is the job’s real outcome.

You can notice the outcome file name is suffixed with some sort of a counter. If we had a real big data job, there would probably be multiple files in the outcome folder, suffixed by the index of the task (reducer) that generated them. Anyway, download this file and open it in Excel, as it is just a tab-separated file. Ordering the data by word count descending and skipping the first few English stop words, we can simply guess what was on Mr. Bush’s mind at that moment. 🔫

Congrats, you are a big data expert now. Don’t underestimate this example; it is a sample of the ideas that allowed Google to index the whole internet and become the giant we know today. I will leave it to you to do the same analysis for Mr. Trump 😃. Repeating myself again: once done with the cluster, please delete it, otherwise you will be risking your Azure credit.

Side note: Databricks is a company providing Spark clusters on Azure with a feature to tear down the cluster if it has no jobs running for a certain period of time, like one or two hours. Hopefully, this cool feature will be incorporated into HDInsight soon. Databricks has another cool feature of auto-scaling the Spark cluster (up or down) depending on the load.

Pig : The ETL tool

Our next animal in the zoo is Pig. To make a long story short, MapReduce is very complex, and the word count example we have seen is roughly 61 lines of Java code. Just as almost all web developers do not implement web applications using plain HTML+JS but use libraries like Angular+Bootstrap, it would be very counter-productive for many people to use just the MapReduce model for most big data tasks. I am not saying MapReduce is dead or obsolete, but it is simply not the tool for everyone and many other alternatives do exist. So, to make it easy for data engineers to do ETL on Hadoop, Pig was invented.

http://bigdatadimension.com/brief-overview-about-pig/

Pig is a scripting tool allowing quick and easy extract, transform and load of data on Hadoop clusters. The language used is called Pig Latin and it can be used from a shell called the Grunt shell or run from script files for unattended execution. The task we will do in Pig is to summarize the GDELT data and generate a CSV report showing the number of news entries per country per year for all files across the time span from 1979 till 2018. This could allow us to show the trend over time of which countries are grabbing global attention. This is not strictly an ETL task but good enough to generate an outcome that can be analyzed in a tool like Excel or Power BI.

Let’s create a Hadoop cluster first to use for the above analysis using Pig. The cluster will be created using a PowerShell script this time, and it will have its own new storage account but will also be linked with the original storage account used to store the GDELT files. So, if you still have that Azure VM used to download the GDELT data you can use it, as it will have the Azure PowerShell modules installed; otherwise you can install them manually:

Install-Module PowerShellGet -Force
Set-PSRepository -Name PSGallery -InstallationPolicy Trusted
Install-Module AzureRM -AllowClobber
Install-Module Azure

Download this PowerShell script and open it to update it with your GDELT storage account name and key. The script defines a config object that is used in the New-AzureRmHDInsightCluster command to link the new cluster with an additional storage account.

After updating the storage account details and running the script, a new Hadoop cluster will be created; this time we have picked more powerful worker nodes, as we will process the whole GDELT dataset. SSH into the new cluster using the credentials found in the PowerShell file and run pig --version to confirm Pig is available.

Now download this Pig script locally and edit it to replace the highlighted text below with your storage account name.

Before executing the script, let’s explain it from a high level perspective:

  1. It defines a relation to load all data in a certain folder using a provided schema. The schema can be obtained from the GDELT data dictionary, and I have marked all fields as strings except for the year of the news event. The location scheme wasbs is HDInsight’s way of referencing a storage account as if it were an HDFS location. You can read more about it here. Please notice the asterisk in that URL, which shows that a single load operation can handle all files in a directory.
  2. The next few lines after loading all GDELT files simply project the country and year columns, apply a GROUP BY operation and then dump the outcome into another container in the GDELT storage account. A stripped-down sketch of such a script is shown below.
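The real summarize.pig linked above declares the full GDELT schema by name; in the sketch below the columns are referenced by position instead ($3 for the year and $51 for the action country code, as per the data dictionary), and <youraccount> is a placeholder for your storage account.

-- Illustrative sketch only; the linked summarize.pig is the real script.
events    = LOAD 'wasbs://gdelt@<youraccount>.blob.core.windows.net/*' USING PigStorage('\t');

-- keep only the action country code and the year
projected = FOREACH events GENERATE $51 AS country, (int)$3 AS year;

-- PARALLEL 1 forces a single reducer, hence the single output file mentioned later
grouped   = GROUP projected BY (country, year) PARALLEL 1;
counts    = FOREACH grouped GENERATE FLATTEN(group) AS (country, year), COUNT(projected) AS events_count;

STORE counts INTO 'wasbs://count-over-time@<youraccount>.blob.core.windows.net/' USING PigStorage(',');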

To execute this Pig script, in the SSH window run nano ~/summarize.pig, paste the contents of the updated Pig script and save the file. Then run pig ~/summarize.pig

By default, Pig will be executed using a modern execution engine called Tez instead of using MapReduce.

This can be verified from the first few console lines after submitting the Pig job for execution.

As this operation will run for a few minutes, you can track the progress from the Tez View in the cluster dashboard. As shown below, the whole job took 5 minutes to process 1934 files with a total size of around 160 GB on a 4-node cluster.

That’s amazing 🎉

Let’s see the output and verify it. In Azure storage explorer, there will be a new container created called count-over-time that has a single outcome file as below.

The outcome is in a single file because I have instructed Pig to aggregate the output using a single reducer, to make it easy for me to analyze in Excel. Generally this is a bad practice as it limits the throughput and parallelism of the execution, but for this case it should be fine, especially as it is just the final group-by outcome, which is pretty small. Also, the GDELT files are tab separated while the outcome from Pig is configured to be comma separated. Downloading and opening this file in Excel will look like below, where the columns are country code, year and count of news events per country-year combination.

Using some Excel + Power BI magic, we can come up with the below chart, which shows news count over time for the top 10 countries with the most news events overall (the US is excluded because a flood of news events comes from the US every year and it would just make the rest of the chart unreadable).

There was a UK spike in 2016 and it’s likely related to the Brexit referendum 😉. I have included the Power BI file containing the above chart in the GitHub repo of this blog. I guess it’s enough grunting for now, but one last thing to mention here: most of the scenarios in HDInsight can be fully automated using PowerShell/SDK/REST APIs, even creating or deleting the clusters. So it is fairly easy to inject such tasks to run automatically based on a certain schedule or to be triggered by user actions. Outcomes can be easily read/consumed by any type of application that can access blob storage, and so on. The outcomes don’t have to be used in reports/dashboards only; they can be integrated into mobile/web applications as needed.

Storm : Get ready for the hurricane

In many cases, analyzing historical data is not the main use case in the solution. It might be more important to analyze/process data while it is being generated or transferred. Think about Twitter trending hashtags or fraud detection. We cannot just wait and do the analysis after the fact; it has to happen online on the current stream of data. If you remember the lambda architecture briefly mentioned before, we are talking about the hot path now. For such use cases, Apache Storm can be used to process fast/large streams of data as they happen and do whatever processing or analysis is needed. Storm works on an element-by-element basis on the stream, compared to the mini-batch approach used by other tools like Spark Streaming. This allows very granular control of what can be done, plus the ability to customize the processing by calling into other APIs/databases/etc. Storm also allows guaranteed delivery, especially when the streaming source supports message acknowledgement. The architecture of Storm is based on a distributed group of spouts and bolts that do stream data reading and processing respectively. The design of how many spouts and bolts are included and how they connect to each other is called a Storm topology, and it is driven by the requirements and the expected load.

http://storm.apache.org/

Most of the examples you will find online on Storm or Spark Streaming involve either Twitter or web server log analysis. I would like to deviate a bit and use the same GDELT dataset we have. It’s not very well known that we can simulate streaming by grabbing some old historical data and dumping it into a message queuing application like Kafka or Azure Event Hubs. Once the data is stored in the queuing system, it can be consumed from Storm, for example, as if it were a live stream. Queuing systems play a pivotal role in such architectures because they decouple the stream producer from the stream consumer. So if you need to shut down your cluster and recreate it, no messages will be lost, compared to the case where the cluster is connected directly to the data stream. This is the most complex example in this post as it has many moving parts:

  1. Custom console application to inject GDELT data into Azure Event Hubs.
  2. Power BI streaming dataset and dashboard used to render final outcome.
  3. Storm cluster used to analyze GDELT on the fly as if we are doing a fast forward in time and grouping+counting events as they happen.

Conceptually, Storm will continuously read messages from Azure Event Hubs, calculate an in-memory summary of how many news items there are per country and, finally, every few seconds push the summaries to a Power BI streaming dataset via its REST API.

Let’s face the storm.

1. Inject GDELT data into Azure Event Hubs

I will try to be brief starting here, as including screenshots of every step would be overkill.

  • Create an Azure Event Hubs namespace called gdeltns using the Standard pricing tier, in the same region as the GDELT storage account, with 10 throughput units and auto-inflate disabled. BTW, for some types of Azure resources the name of the resource should be unique, so you cannot use the same name as above if it is already taken by another resource. I will just implicitly assume that you will invent your own names and adjust accordingly. I am only referencing the names I’ve used because they may appear in my screenshots, or just to connect the pieces of the puzzle together so readers can understand what’s happening.
  • Inside the Event Hubs namespace, create an event hub named gdelt with 8 partitions.
  • Select the gdelt event hub and create a new shared access policy named admin with manage, send and listen permissions. We will use this policy for both sending to and receiving from the event hub for the sake of simplicity. Make a note of the name, primary key and primary connection string of this shared access policy as we will need them later.
  • Create an Azure VM to run the console app or use your own computer if you have a decent internet connection. You have to have .NET 4.7.1 installed to run this console application.
  • Clone the repo and open the solution file FeedEventHub.sln in folder Storm\Event Hub Feeder using Visual studio 2017.
  • Before running the console app, you need to plug in your event hub connection string. This tool downloads a single CSV file (the biggest one) to feed into Event Hubs, but it can be extended to inject the whole dataset if needed. Also, there is a Storm package (spout) to read directly from blob storage (HDFS) in case you don’t want to add an extra moving part like Azure Event Hubs or Kafka.
  • The tool also batches multiple news event lines into a single message, so an individual message in the event hub could be something like 100 lines of news items from a certain CSV file. It’s pretty common in streaming solutions to batch multiple small messages into one, especially if they are submitted from devices/phones/sensors with unreliable internet connections. So instead of trying to send one message at a time as it happens, a group of them are batched and sent when there is a connection.
  • Restore nuget packages by right clicking the solution name and select the restore option, build the solution and run it locally or copy the contents of bin\debug folder inside an Azure VM and run the FeedEventHub.exe file from there.
  • In the gdeltns namespace page, the ingestion of messages will be clear from the monitoring charts

Now we have lots of messages in the event hub waiting to be consumed. Next we will configure the rendering part of this solution using Power BI.

Configure Power BI Streaming dataset

I will assume you have a Power BI Pro subscription; if you don’t, you can register for a free trial. A Power BI streaming dataset is simply a schema made of a bunch of fields plus a REST endpoint that can be used to continuously push data into this dataset and use it in a live dashboard.

  • Create a new streaming dataset and select the API option, give it a name and define the schema as below.
  • Click Create and make a note of the Push URL that will appear on screen as we will use it in Storm topology.
  • Use a tool like Postman or Advanced REST Client to test posting some sample payload to this dataset (a sample is shown after this list).
  • Create a new empty dashboard in the same workspace and add a new streaming tile to this dashboard.
  • Connect this tile to GDELT dataset and configure the tile as follows.
  • Give the tile a name and description and click Apply.
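For the test mentioned in the list above, the Push URL accepts a plain JSON array of rows whose field names must match the dataset schema. Assuming the schema has a text field for the country and a numeric field for the count (the same shape the Storm topology will push later), a sample body would look something like this:

[
  { "Country": "United States", "Count": 12345 },
  { "Country": "Iraq", "Count": 6789 }
]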

Now we have a Power BI live dashboard ready to receive online summaries of GDELT events as they happen. So, assuming the Storm part we will build shortly works as expected, this dashboard will be updated in real time with live statistics every few seconds.

Create Storm cluster

Following the same pattern, a Storm cluster can be created with PowerShell using this script. Once the cluster is created we will need to SSH into it and grab some JAR files. We can write Storm spouts from scratch to consume data from source systems, but for many common systems ready-made spouts are available as packages. Unfortunately, the Azure Event Hubs spout is implemented but not available on Maven (AFAIK). So, we need to SSH into the cluster first and find the location of those JARs before grabbing a local copy.

In another Git Bash window, run the below command to download a copy of Event Hubs spout JAR including its dependencies.

scp sshadmin@stormforgdelt-ssh.azurehdinsight.net:/usr/hdp/2.6.2.25-1/storm/contrib/storm-eventhubs/storm-eventhubs-1.1.0.2.6.2.25-1-jar-with-dependencies.jar storm-eventhubs-1.1.0.2.6.2.25-1-jar-with-dependencies.jar

A JAR file named storm-eventhubs-1.1.0.2.6.2.25-1-jar-with-dependencies.jar will be downloaded locally and we will use it in subsequent steps. Please note that the file names above are as per the latest HDInsight version at the time of writing, so if you are running this sample against a more recent Storm cluster you will probably get a different file name (the version number will have changed). Also, you might need to change the cluster name part of the command if you created the cluster with a different name.

Unless this post is extremely popular and many people are trying the examples live concurrently, you can probably use the same cluster names as in my scripts/screenshots, as no one will be keeping a cluster up and running for too long; otherwise, their credit would be drained 😉

Compile and deploy Storm topology

Starting with a disclaimer: I am not a Java developer, so if the steps do not make much sense, can be simplified or the source code is rubbish, please adapt to best practices. Anyway, we will need IntelliJ community edition installed to open the source code of the Storm project.

Clone the repo, check out the tag fix-storm-bottleneck and open the project Storm\Topolgy using IntelliJ. You need to switch to that commit because this copy of the Storm project does not have the HBase part plugged in yet. Trying to build this project, you will probably get the below error, as the Event Hubs spout JAR is not available on your machine.

Module dependencies should be updated to include the JAR we downloaded in the previous step; all other dependencies should be downloaded from Maven. Now the project should compile successfully.

The other dependencies needed are storm-core, jackson-databind (JSON serialization) and commons-csv, which is used to parse a CSV file for country code-to-name mapping.

Before explaining the source code, config values should be updated in the Config.properties file. They are the event hub user name (policy name), password (primary key), hub name and hub namespace. Also, the Power BI streaming URL should be provided. Effectively, we are defining the source of data and the final destination.

Explaining the topology: it’s a simple topology of a group of spouts (Event Hub spouts) reading messages from Event Hubs and submitting them to a parser (summarizer) bolt that groups news events per country. The summarizer bolt generates summary tuples every few seconds to be globally aggregated by a final single bolt that submits the top 10 countries with their counts to the Power BI REST endpoint.
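Stripped of error handling and config plumbing, the wiring of such a topology looks roughly like the sketch below. The real SummariserTopology class in the repo is the reference; the Power BI bolt class name, parallelism figures and worker count here are illustrative only.

// Rough wiring sketch; the actual SummariserTopology class in the repo is the reference.
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.eventhubs.spout.EventHubSpout;
import org.apache.storm.eventhubs.spout.EventHubSpoutConfig;
import org.apache.storm.topology.TopologyBuilder;

public class TopologySketch {
    public static void main(String[] args) throws Exception {
        // Values normally read from Config.properties: policy name, primary key, namespace, hub name, partition count.
        EventHubSpoutConfig spoutConfig = new EventHubSpoutConfig(
                "admin", "<primary key>", "gdeltns", "gdelt", 8);

        TopologyBuilder builder = new TopologyBuilder();
        // One spout executor per Event Hub partition, feeding the per-country summarizer bolts.
        builder.setSpout("event-hub-spout", new EventHubSpout(spoutConfig), 8);
        builder.setBolt("summarizer-bolt", new SummariseBolt(), 8)
               .shuffleGrouping("event-hub-spout");
        // A single global bolt aggregates the partial summaries and pushes the top 10 to Power BI.
        builder.setBolt("powerbi-bolt", new PowerBiBolt(), 1)
               .globalGrouping("summarizer-bolt");

        Config config = new Config();
        config.setNumWorkers(4); // spread executors across several worker processes
        StormSubmitter.submitTopology("GDELTTopology", config, builder.createTopology());
    }
}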

There is no custom code for the spout itself as we use the provided Event Hub spout, but the bolt has the following logic (simplified code):

1- Summarizer Bolt

public void execute(Tuple tuple, BasicOutputCollector outputCollector) {
    if (isTickTuple(tuple)) {
        // on every timer tick, emit the accumulated per-country counts and reset
        outputCollector.emit(new Values(dataMap));
        dataMap.clear();
        return;
    }

    String value = tuple.getString(0);
    String[] lines = value.split("\\r?\\n");
    summarizeLines(lines);
}

private void summarizeLines(String[] lines) {
    for (int i = 0; i < lines.length; i++) {
        String[] values = lines[i].split("\\t");

        String countryCode = values[51]; // action country code position

        // use isEmpty() rather than ==, which would compare string references
        if (countryCode == null || countryCode.trim().isEmpty())
            countryCode = values[37]; // actor 1 country code position

        if (dataMap.containsKey(countryCode))
            dataMap.put(countryCode, dataMap.get(countryCode) + 1);
        else
            dataMap.put(countryCode, 1);
    }
}

With every event hub message received, we split it based on the newline delimiter and then split every line using the tab character. Then a hash map of country codes is incremented by 1 for every news item.

With every tick tuple (a system-generated timer tuple) received, we just take the local map of summaries and emit it as a new tuple that will be consumed by the final global (single) summarize-and-submit bolt.
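Tick tuples themselves are not magic: the bolt asks Storm to deliver one every N seconds and then checks the tuple’s source to tell ticks apart from real data. A minimal sketch of that plumbing (the 5-second frequency is just an example; the real bolts use the same idea) looks like this:

// How a bolt requests and detects tick tuples (sketch; the frequency value is illustrative).
import java.util.HashMap;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.tuple.Tuple;

public class TickSupport {
    // Returned from the bolt's getComponentConfiguration(): ask Storm for a tick every 5 seconds.
    public static Map<String, Object> tickEveryFiveSeconds() {
        Map<String, Object> conf = new HashMap<>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 5);
        return conf;
    }

    // True when the tuple came from Storm's internal system component on the tick stream.
    public static boolean isTickTuple(Tuple tuple) {
        return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
                && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
    }
}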

2: Power BI Bolt

public void execute(Tuple tuple, BasicOutputCollector outputCollector) {
    if (isTickTuple(tuple)) {
        try {
            // on every timer tick, push the current global summary to Power BI
            submitSummaryToPowerBi(globalMap);
        } catch (IOException e) {
            logger.error("cannot submit summary to Power BI: {}", e);
        }
        return;
    }

    Map<String, Integer> localMap = (Map<String, Integer>) tuple.getValueByField("gdeltDataSummary");
    accumulateEntries(localMap);
}

private void accumulateEntries(Map<String, Integer> localMap) {
    for (Map.Entry<String, Integer> entry : localMap.entrySet()) {
        String key = entry.getKey();
        if (globalMap.containsKey(key)) {
            globalMap.put(key, globalMap.get(key) + entry.getValue());
        } else {
            globalMap.put(key, entry.getValue());
        }
    }
}

public void submitSummaryToPowerBi(Map<String, Integer> map) throws IOException {
    HttpHelper helper = new HttpHelper();
    ObjectMapper mapper = new ObjectMapper();

    List<PowerBiPayload> countrySummary = GetTop10Countries(map);
    String stringData = mapper.writeValueAsString(countrySummary);
    logger.info("Submitting to PowerBI: {}", stringData);
    int responseCode = helper.post(ConfigValues.powerBiPushUrl, stringData);
    logger.info("Response from PowerBI: {}", responseCode);
}

For the global Power BI bolt, if the tuple received is a summary hashmap from the previous bolt, we take that map and accumulate it into a global map owned by this Power BI bolt. Another tick tuple (system-generated timer tuple) is also configured such that when it is triggered, we just prepare a JSON payload of the top 10 countries in the global hashmap and POST it to Power BI. Simple, huh.
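The GetTop10Countries helper referenced in the code above is nothing fancy: conceptually it sorts the accumulated map by count and keeps the first ten entries, along the lines of the sketch below (the PowerBiPayload field names are assumptions, not copied from the repo).

// Sketch only; requires java.util.List, java.util.Map and java.util.stream.Collectors.
private List<PowerBiPayload> GetTop10Countries(Map<String, Integer> map) {
    return map.entrySet().stream()
            .sorted(Map.Entry.<String, Integer>comparingByValue().reversed()) // largest counts first
            .limit(10)
            .map(e -> {
                PowerBiPayload row = new PowerBiPayload();
                row.Country = e.getKey();   // assumed field names on PowerBiPayload
                row.Count = e.getValue();
                return row;
            })
            .collect(Collectors.toList());
}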

Before compiling, the project artifact should be recreated to include the current module source code and the Event Hub spout JAR. Open the Project Structure dialog, remove the current artifact and hit Apply. Then create a new one as below:

Then select the main class and the location of the manifest, and tick Include in project build.

Still in the new artifact section, clear all other dependencies and keep only the ones in the following screenshot, which are the module output, commons-csv, jackson-databind and the Event Hubs spout JAR.

Compile and build the project from the Build menu; do not use the Maven package option. The current project can be tested locally, but we will just try it on the cluster directly. The source code has a hard-coded flag that can be tweaked to run locally or on a real cluster. Currently, it is set to boolean runLocal = false; but double check it just in case. Based on the current project settings, the compiled JAR will be located in the out folder, not target.

Next, this JAR file should be uploaded using SCP to the home folder of the sshadmin user on the created Storm cluster:

scp StormyHub.jar sshadmin@stormforgdelt-ssh.azurehdinsight.net:StormyHub.jar

Next, in the SSH window of the Storm cluster, run the below to deploy the topology and start it on Storm. The command simply instructs Storm to create and run a topology based on our provided JAR, given the name of the main entry point class that will create and submit the topology, and finally gives the new topology a name in Storm.

storm jar ./StormyHub.jar com.yousry.SummariserTopology GDELTTopology

If things work smoothly, the topology will be submitted and the Power BI dashboard will refresh every 5 seconds or so, which is pretty cool 😃. I am testing with the 2003 data file and, as expected, you will find the USA, Iraq, Russia and Israel dominating the news.

From the cluster dashboard in Azure, you can open the Storm admin UI and investigate the status of the currently deployed topology.

The topology is running fine and it seems it has capacity for even more load. The latency of the Power BI bolt is a bit higher than the summarizer bolt because it makes a network call to the Power BI backend service. Other than that, there are no errors or failures.

Phew, that was intense, let’s move on with another section of the zoo.

HBase : The database of the zoo

https://worldvectorlogo.com/logo/hbase

Well, if you are still awake and not cursing me, then you deserve to see the next animal in the zoo. This time it’s not a land animal, just to make things more exciting. Let’s see what the mighty orca can provide here.

In the world of big data, traditional relational databases will not be fit to handle the pressure. There will be requirements for high throughput, high availability and much more flexibility in what can go into the database. We cannot simply stick everything into CSV or raw text files; we need a database that can handle workloads like those of Google, Facebook and Twitter. We are familiar with relational databases like SQL Server, Oracle, MySQL etc. and maybe with document databases like MongoDB.

https://app.pluralsight.com/library/courses/real-world-big-data-microsoft-azure/table-of-contents

HBase is a column-oriented non-relational database that allows storage of and quick access to massive amounts of data. It is based on the Google Bigtable design and integrates well into the Hadoop ecosystem. In a nutshell, rows have two major components: row keys and column families. A row key is our normal primary key, which is unique across the whole table. Column families are groups of columns that go under a certain name/category, like personalInfo or posts for example. We can have multiple columns within the same family, and the schema is not statically defined in advance but defined with insert/update operations. The only things defined in advance are the table name and the column family names. This type of design provides flexibility and high throughput for problems like storing Facebook-like data and so on. One major difference here is that we do not have indexes (they don’t make sense in HBase’s case). So designing what to put in the row key should be done with great care, because it will be very critical for read operations. Normally, the row key will be a concatenation of a few pieces of information to facilitate querying for a single record or a group of records, while being uniform enough to help distribute table data evenly across the many servers holding it. In contrast to databases like SQL Server, data is spread across multiple nodes and the system remains usable even when some nodes go down.

For our example with HBase, we will extend the Storm example and store a projected set of columns in HBase, in addition to the live dashboard we feed to Power BI. Effectively, this covers the cold path of the lambda architecture, which is mainly about keeping the whole dataset (or important parts of it) for later analysis or processing. So if we want some global summaries or need to find a piece of information, we can rest assured we have a database backing us.

As usual, we will start by creating a lovely HBase cluster using this PowerShell script. Once the cluster is created, we will need to access the HBase shell to create our gdelt table and define its column families. So, first SSH into the cluster and then run the below in the bash shell.

hbase shell
create 'gdelt', 'time', 'actor1', 'actor2', 'geo', 'attributes'
list
count "gdelt"

The first line simply opens the HBase shell, which is used to do DDL-like operations; it can also be used to do some DML, but that is not its main use case. The second line creates a table called gdelt and defines the column families in this table. The next line lists all the tables in the default database and should simply show the new table there, and the last line counts the rows in the table. This stuff could be automated using APIs/SDKs, but let’s simply do it manually here.
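To get a feel for the row key and column family model before wiring up Storm, you can insert and read a cell manually from the same shell; the row key format below is purely illustrative and is not the one used by the topology.

put 'gdelt', '20030320-IRQ-000001', 'geo:country', 'Iraq'
put 'gdelt', '20030320-IRQ-000001', 'attributes:goldsteinScale', '-10.0'
get 'gdelt', '20030320-IRQ-000001'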

Next, we will use the same Storm topology discussed in the previous section, but this time (the master branch) it has some extra functionality to submit a few columns to HBase in addition to preparing the summary for Power BI. HBase supports DDL/DML operations via a REST API. There are other options like the Java API or even a packaged HBase bolt that comes as part of the Storm code base, but I guess the REST API is the easiest option, especially as it can be used from any tech stack. Please note that the REST API comes with some latency, as the other options can run inside or very near to the cluster. In our case we will be calling HBase from a Storm cluster hosted in the same Azure region, and Azure has pretty good internal networks, so there is no problem with this approach.

To give you a glimpse of using the REST API to insert data, the below is the request URL & payload (for a local Hortonworks sandbox cluster) used to insert a new record with a single cell.

The URL contains the cluster REST endpoint address plus the table name; ignore the 2 in the URL because it’s irrelevant. The payload is a JSON blob containing the row key and cell value(s), but everything is base64 encoded; HBase does not have data types and everything goes into byte arrays. The request is authenticated using basic HTTP authentication with the admin credentials (or SSH credentials, but I use the same for both) used to create the cluster.
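The screenshot of that request is not reproduced here, but such a call looks roughly like the following (host, port and table name are placeholders; the base64 strings are simply the encoded forms of the values shown decoded just below):

POST http://<sandbox-host>:8080/<table>/2
Content-Type: application/json
Accept: application/json
Authorization: Basic <base64 of username:password>

{
  "Row": [
    {
      "key": "Mg==",
      "Cell": [
        { "column": "ZmFtaWx5Ok51bUNoaWxkcmVu", "$": "NQ==" }
      ]
    }
  ]
}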

Just to make things clear, if we base64 decode the above payload we will get:

{
  "Row": [
    {
      "key": "2",
      "Cell": [
        {
          "column": "family:NumChildren",
          "$": "5"
        }
      ]
    }
  ]
}

It’s pretty clear now that we are inserting (or updating, if the row key exists) a record with a row key of 2 and setting the column family:NumChildren to 5. It’s also apparent that the Row element in this JSON payload is an array, so it’s good for batching many records to be inserted in the same network call. This is what we will be doing in our Storm topology.

In SummariseBolt, all the news items of the current event hub message (tuple) are split line by line and parsed to generate the count-per-country summary and an array of records to be submitted for storage in HBase:

for (int i = 0; i < lines.length; i++) {
    String[] values = lines[i].split("\\t");

    String countryCode = values[51]; // action country code position
    // use isEmpty() rather than ==, which would compare string references
    if (countryCode == null || countryCode.trim().isEmpty())
        countryCode = values[37]; // actor 1 country code position

    if (dataMap.containsKey(countryCode))
        dataMap.put(countryCode, dataMap.get(countryCode) + 1);
    else
        dataMap.put(countryCode, 1);

    prepareHBaseRecord(values, countryCode);
}
try {
    hbaseHelper.SaveBatch(records);
    logger.info("Yay, saved " + records.size() + " hbase records");
}
catch (Exception e) {
    logger.error("cannot save hbase records: {}", e);
}
finally {
    // just ignoring HBase save failures for this simplistic implementation
    records.clear();
}
..........
private void prepareHBaseRecord(String[] values, String countryCode) {
    HBaseRecord record = new HBaseRecord();
    record.Country = countryNameMapper.GetCountryName(countryCode);
    String fullDay = values[1].trim(); // in format 19790101
    record.Day = Integer.parseInt(fullDay.substring(6, 8));
    record.Month = Integer.parseInt(fullDay.substring(4, 6));
    record.Year = Integer.parseInt(fullDay.substring(0, 4));
    record.GoldsteinScale = Float.parseFloat(values[30]);
    record.Actor1Name = values[6];
    record.Actor2Name = values[16];

    records.add(record);
}

With every tuple processed in this Storm bolt, all lines are parsed into HBase records and sent to a helper class that uses the REST API to serialise them into the JSON format we saw before and POST them to HBase. The source code of this helper class is not particularly important as it’s mainly plumbing. One thing to note is the HTTP authentication part:

String credentials =  "Basic " + Base64.encodeBase64String((userName + ":" + password ).getBytes());

Armed with an understanding of the extra HBase logic added to our Storm topology, you will just need to adjust the config file with the cluster name and credentials.

Now we will just follow the same process we used with the Storm example: build, package and deploy the JAR file as a running Storm topology. I forgot to mention that we need two clusters here, one for Storm and one for HBase; the PowerShell scripts to create both were already listed before. Also, the JAR file mentioned above is to be uploaded to the Storm cluster, not to the HBase cluster.

Depending on your Azure subscription, you might need to adjust the number of worker nodes per cluster, as there will be a limit on the number of cores that can be used in a single region (at least for my subscription, which comes with MSDN benefits).

After waiting a couple of minutes for the Storm topology to be up and running, you can verify from the HBase shell (on the HBase cluster) that some records have been inserted.

scan "gdelt" , {'LIMIT' => 2}

There will be one line per cell, i.e. per column-family column per record. It’s very obvious the above screenshot has 14 lines for the two rows of saved GDELT data.

In my initial implementation, I had lots of out-of-memory exceptions due to the usage of the GSON library. I have replaced it with another library called jackson-databind and also did the following:

  • Fixed the number of worker processes in the source code, as it was configured to be 1 by mistake
  • Increased cluster node count from 2 to 3 plus using a worker node with double memory size (28GB instead of 14GB)
  • Made sure Event Hub has enough throughput to feed my Storm topology

Still, I am not getting the performance I need, I guess due to network latency of the HBase calls. So, the solution could be to batch more HBase records for insertion in the same call. This has to be tested against the maximum payload size accepted by the HBase REST API.

Also, in the next update, I will add a Hive section to query this GDELT data stored in HBase to complete the full picture of the lambda architecture.

For now, we will move on to the final part of this journey, Spark.

Spark: Chuck Norris of big data applications

Unfortunately, the logo for Spark is not an animal, so I picked the most suitable meme to represent what Spark is supposed to do. Well, many big data problems could be solved using Spark. I am just assuming the workflow will simply be: try Spark first and then use something else if you don’t get acceptable results. In this final exercise, we will do two things with Spark.

  1. Convert GDELT’s 160 GB of flat text files into a compressed columnar format (Parquet). This could be considered an ETL task.
  2. Run an interactive query against the Parquet data and compare it to the Pig query we did before. This is a simple analytics task.

Spark is a radical improvement over plain old MapReduce (without YARN/Tez) as it provides:

  • An in-memory execution engine instead of multiple MapReduce jobs with heaps of consecutive HDFS read/write operations
  • High level abstractions in Scala/Java/R/Python to facilitate data processing and rapid development
  • Many built-in libraries for SQL/Streaming/Graph/ML functionality
  • Very active community and rich vendor support

But why should we convert our massive dataset to another format, namely Parquet? Mainly for two reasons:

  1. Columnar formats are much better for BI & big data workloads as most of the queries will touch just a few fields (columns).
  2. Parquet stores data compressed (by default) thus reducing storage costs if this is a big concern and also speeds up loading data (there is still some overhead of decompression but for complex queries this would be negligible).

Following the same pattern, a Spark cluster can be created using this PowerShell script. Please remember to adjust the script to suit your resource settings. Once the cluster is created, we can write Spark jobs using Jupyter or Zeppelin notebooks. This is the approach that will be used by data scientists or data engineers. Once an acceptable solution is ready in a notebook, it can be handed over to a developer to include it in a Scala application for operationalization. HDInsight also provides other tools like PowerShell and SDKs to submit Spark jobs for automated execution.

I will use Jupyter notebook and create a new Spark notebook.

In the first cell, the config parameters of the session are updated to make use of all cores and memory within the cluster, then the Spark version is printed to instantiate a Spark application. The config parameters are tweaked based on the hardware specs of the current cluster (4 nodes, 32 cores, 56 GB memory per node). You can start with the default settings and see if a custom config provides any improvements.

Then we will define the schema of the GDELT tab-delimited files, because saving to Parquet will include the data plus its schema (a self-contained format). The whole notebook is shared on GitHub for reference.

In the next cell, we will read GDELT using the schema configured above and write it into another container using the Parquet format.
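The shared notebook is the reference for these cells, but their core boils down to something like the following Scala sketch (the schema is truncated here, with all columns as strings just like in the notebook, and <youraccount> stands for the GDELT storage account):

// Sketch of the conversion cell; the notebook on GitHub declares the full GDELT schema.
import org.apache.spark.sql.types._

val gdeltSchema = StructType(Seq(
  StructField("GlobalEventId", StringType),
  StructField("Day", StringType),
  StructField("MonthYear", StringType),
  StructField("Year", StringType)
  // ... remaining GDELT columns omitted in this sketch ...
))

val gdelt = spark.read
  .option("sep", "\t")
  .schema(gdeltSchema)
  .csv("wasbs://gdelt@<youraccount>.blob.core.windows.net/")

gdelt.write
  .mode("overwrite")
  .parquet("wasbs://gdelt-parquet@<youraccount>.blob.core.windows.net/")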

The Spark UI link found in the output of the first cell of this notebook navigates to the Spark UI page, where the progress of the job can be monitored.

Once the job is done, switching to Azure Storage Explorer we can see a new container named gdelt-parquet appearing with heaps of files inside it. They still represent a single dataset that can be queried later by providing just the URL of the container itself. Also, the data is compressed using a codec called Snappy; there are other options like gzip and no compression as well.

I guess there are ways to limit the number of files generated, but let’s stick with this implementation. By the way, the whole job executed in 20 minutes on a 4-node cluster similar to the one used for Pig.

If that’s too long for the conversion, a larger cluster could be used, as it will simply be deleted after completion. One thing to note here: the conversion job converts the text files into columnar format and includes all columns. Also, I was very lazy with the schema and defined everything as strings. Most likely, better results could be obtained if integer columns were defined as numbers, especially if they have a small range. Currently, the 160 GB of flat files is compressed into 23 GB of Parquet content (you can get that number from Storage Explorer folder statistics).

Now, the same query we did using Pig could be written as follows:
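The exact cell is in the shared notebook; the sketch below assumes the country and year columns were named ActionGeo_CountryCode and Year when the schema was defined, and <youraccount> is again a placeholder.

// Read the Parquet output, project just the two columns we need, then group and count.
val parquetEvents = spark.read
  .parquet("wasbs://gdelt-parquet@<youraccount>.blob.core.windows.net/")

parquetEvents
  .select("ActionGeo_CountryCode", "Year")
  .groupBy("ActionGeo_CountryCode", "Year")
  .count()
  .show()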

Notice that the data is read from the Parquet folder, followed by a projection (to make use of the columnar optimization) and then a group-by operation. The result is shown on screen but can also be dumped into an output file.

The whole thing took roughly 1.3 minutes, around 4 times faster than Pig. That’s pretty cool: summarizing 160 GB in 1.3 minutes on a 4-node cluster.

Also Spark has caching features that add tangible speed improvements for operations on the same dataset or for iterative data processing jobs like machine learning algorithms.

For multiple queries and calculations on the same dataset, converting it to Parquet (or a similar optimized format) will pay off the extra overhead of the conversion operation, which is supposed to be done only once. For new data that is to be added to the same dataset, Parquet supports an append mode which should be very fast, although at a certain point in time files have to be merged together to decrease fragmentation.

We have just scratched the surface of what can be done with Spark, so if you plan to invest time in a big data tool, it’s better to start with Spark.

Please remember to tear down all your Azure resources 😉.

Summary

That would be the end of our big data journey on Azure. We have seen a bunch of tools, and hopefully you’ve now got some idea about them and where to use them. The emphasis was on open source big data that can be applied with any cloud provider rather than custom proprietary tools. I hope it was a cool journey with one or two things learned. Till the next adventure 🐘

Change Log:

23–04–2018 : Fixed Storm topology to handle the problem of a single bottleneck bolt.

25–04–2018: Added HBase section.

29–04–2018: Improvements to Storm topology: replace JSON library and lots of refactoring.