Hadoop!

And the family — isn't it fun?

Omar Elgabry
OmarElgabry's Blog
25 min read · Nov 18, 2018



This article is not a technical deep dive. It’s rather a quick glimpse over what tools are available and what they can accomplish.

Having a clear idea of all the solutions in the market will definitely help in shaping our decisions when facing a problem.

The complexity of the Hadoop ecosystem comes from the many tools that can be used, not from using Hadoop itself.

So, explaining the core ecosystem, categorizing these tools, and highlighting the major trade-offs will make it a lot easier to understand and anticipate.

Hadoop

Apache Hadoop is an open source software platform for distributed storage and distributed processing of very large data sets on computer clusters built from commodity hardware — Apache Hortonworks

To break it down, it has three main points:

  1. A software platform. It runs on a cluster of nodes (servers).
  2. A distributed storage. It's a filesystem that stores files across multiple hard drives, though we can visualize them all as a single file system. It keeps copies of the data and recovers automatically.
  3. A distributed processing. All processing (like aggregation of data) is distributed across all nodes in parallel.

Hadoop was developed at Yahoo! to support distribution for Nutch, a search engine project. The data storage is inspired by the Google File System (GFS), and the data processing by Google's MapReduce.

— What is Hadoop used for?

It’s for batch processing, but some solutions built on top of Hadoop provide interactive querying (CRUD applications), and we’ll go through them.

Batch processing means processing a batch of separate jobs (tasks) in the background, without user interaction triggering them, such as calculating a user's monthly bill.

The scaling and distribution of storage and work is horizontal.

With vertical scaling, we would still face issues like disk seek times (one big disk vs. many smaller disks), hardware failures (one machine vs. data copied across different machines), and processing times (one machine vs. many working in parallel).

Core Hadoop

— What’s part of Hadoop itself?

HDFS, YARN, and MapReduce. These ship with Hadoop itself. Everything else integrates with them.

HDFS

It's a file system; the data storage layer of Hadoop. Much like Amazon S3 and Google Cloud Storage.

HDFS can handle large files that are broken up and distributed across the machines in the cluster. It does this by breaking the files into blocks. And so, no more dealing with a huge number of small files.

We can access the distributed data in parallel (say, for processing). It stores copies of each block on different machines, so the machines talk to each other for replication.

Diving a little into its architecture.

First, there is a name node that keeps track of where each block lives, and data nodes that hold the blocks.

When reading …

  1. Our application uses a client library that talks to the name node, asking for a specific piece of data (a file).
  2. The name node replies with the data node(s) that hold it.
  3. The client library talks to those data nodes and returns the result to us.

When writing …

  1. The client library informs the name node about the write operation.
  2. The name node replies with where to store the file, and a new entry for that file is created.
  3. The client library goes and saves the file data at the given location (node X).
  4. Node X then talks to other machines for replication, and sends an acknowledgment back to the client library that the data has been stored and replicated.
  5. The client library forwards that acknowledgment to the name node, confirming the data has been stored and where it was replicated.
  6. The name node updates its tracking metadata.
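To get a feel for the client library's role, here is a minimal sketch using the third-party Python hdfs (WebHDFS) client; the host, user, paths, and sample data are made up for illustration, and the name node/data node conversation described above happens behind these calls.

from hdfs import InsecureClient

# Talk to the name node over WebHDFS (host, port, and user are made up).
client = InsecureClient('http://namenode:9870', user='hadoop')

# Write: the client asks the name node where to put the blocks,
# then streams the data to the chosen data nodes.
client.write('/user/hadoop/movies.data', data=b'1,31,2.5\n1,1029,3.0\n', overwrite=True)

# Read: the client asks the name node which data nodes hold the blocks,
# then reads from them directly.
with client.read('/user/hadoop/movies.data') as reader:
    print(reader.read())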

The name node is obviously a single point of failure. Still, it's important to have a single name node, as multiple ones might not agree on where data should be stored. To soften the problem, a backup name node can take over when the main one fails.

YARN

YARN: Yet Another Resource Negotiator. The resource (CPU, memory, disk, network) management layer of Hadoop.

It splits the work (in this case resource management, job scheduling, and monitoring) across the nodes, taking care of the 'processing' side of the data.

Resource management used to be baked into MapReduce, but it was split out into YARN, allowing data processing tools for graph processing, interactive processing, stream processing, and batch processing to run on top of it and process the stored data. We'll see examples next.

An abstract view of its architecture.

  1. The client talks to YARN, and YARN then spins up a node running an application master.
  2. YARN allocates resources to the running applications, and passes incoming jobs to the application master to be executed.
  3. The application master talks to the other (worker) nodes, which run containers that execute the job, and which talk to HDFS to get the data to be processed and to save the result back.
  4. The application master can also ask YARN for more resources, track their status, and monitor the progress.
  5. Each node has its own node manager, which tracks what that node is doing and constantly reports back to YARN with the node's status.

Mesos

It's an alternative to YARN, yet still different. It's used by Twitter, is not directly associated with Hadoop, and has a broader scope.

— Mesos Vs YARN

  • Mesos manages resources across the whole data center, not just Hadoop. It's not only about the data, but also web servers, CPU, etc.
  • With YARN, you give it a job and it figures out how to process it. With Mesos, you give it a job, it replies with the available resources, and then we decide whether to accept or reject them.
  • YARN is optimized for long, analytical jobs, while Mesos is more general purpose; it can handle short or long jobs.

— When to use Mesos?

To manage everything running on top of your cluster, not only Hadoop. We can also use both: YARN manages the nodes for Hadoop, while Mesos manages the nodes for web servers and everything else.

MapReduce

It's the data processing layer of Hadoop. It consists of Map (run a function on nodes in parallel) and Reduce (aggregate the results from all nodes).

It distributes the processing by dividing the data among partitions (nodes). A mapper function is applied to each partition, and a reducer function aggregates the mappers' results.

Let's take an example. The task here is to get the count of every word across a massive amount of text data (a code sketch follows the steps below). A MapReduce job is composed of three steps:

  1. Mapper. Given the text data, transform each input into a key-value pair. In this case, the key is the word, and the value is just 1 (to be aggregated later).
  2. Shuffle & Sort. MapReduce automatically groups identical keys and sorts them. This step happens right before every reduce function. So, we end up with the word as the key, and an array of 1s as the value.
  3. Reduce. Given the list of values for each word, aggregate. So, for every key (word), we get the count (value=[1,1,1,…]). The reducer gets called once for each unique key.
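As a quick illustration of these three steps, here is a minimal word count sketch, assuming the mrjob Python library (the same style used in the movie example later on):

from mrjob.job import MRJob

class MRWordCount(MRJob):
    # Mapper: for each word in the line, emit (word, 1)
    def mapper(self, _, line):
        for word in line.split():
            yield word.lower(), 1

    # Reducer: for each unique word, sum up all the 1s
    def reducer(self, word, counts):
        yield word, sum(counts)

if __name__ == '__main__':
    MRWordCount.run()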

— How does MapReduce distribute the processing?

Remember YARN? If so, then this should be familiar, since MapReduce runs on top of it.

So, it talks to YARN saying, "Hey, I want to process this job". And the same steps explained above kick off. The only difference here is that the containers (workers) are running map/reduce tasks.

— Failures?
The application master restarts a worker (preferably on a different machine) when it fails. YARN restarts the application master if the application master itself fails.

— How to write MapReduce jobs in code?

MapReduce jobs can be written in Java or Python (the examples below assume the mrjob Python library). An example is to rank movies by their popularity, and return them in sorted order.

# Given a line "userId,movieId,rating", transform it to (movieId => rating)
def mapper(self, _, line):
    (userId, movieId, rating) = line.split(',')
    yield movieId, float(rating)

# Shuffle and sort. Merge similar movieIds and sort by movieId.
# Remember that similar keys can be distributed across different
# mappers, on different nodes.

# For every movieId => [7,5,6,4,6,2,...], aggregate.
# The result is, for every movieId, the average of its ratings.
# But, return the average as the key, and the movieId as the value.
def reducer(self, movieId, ratings):
    # "ratings" is an iterator, not the actual array.
    # We can iterate over the values one by one. Why?
    # To avoid loading the whole array into memory.
    total, count = 0.0, 0
    for rating in ratings:
        total += rating
        count += 1
    yield total / count, movieId

Now, we can add another step, with another reducer, to sort the result by the average rating (the key). The input of this step is the output of the preceding one: (average rating → movieId).

Before each reducer, the "shuffle and sort" step kicks in, even if there is no mapper, and groups similar averages together.

def reducer_sort_movies(self, avgRating, movies):
    for movie in movies:
        yield movie, avgRating
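To chain the two steps together, an mrjob job would declare them explicitly. A minimal sketch, assuming the mapper, reducer, and reducer_sort_movies defined above live in this class:

from mrjob.job import MRJob
from mrjob.step import MRStep

class MRMoviesByPopularity(MRJob):
    def steps(self):
        return [
            # Step 1: (movieId, rating) -> (avgRating, movieId)
            MRStep(mapper=self.mapper, reducer=self.reducer),
            # Step 2: shuffle & sort groups by avgRating, then the
            # reducer flips the pairs back to (movieId, avgRating)
            MRStep(reducer=self.reducer_sort_movies),
        ]

if __name__ == '__main__':
    MRMoviesByPopularity.run()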

While MapReduce might solve some of our problems, it's not easy to force every task into map/reduce functions. That's why other solutions, like Spark with its SQL-like queries, are becoming more popular.

Tez

It runs on top of YARN. It’s an alternative to MapReduce. In fact, we can configure some of the tools that run on MapReduce by default like Hive and Pig to run on Tez instead.

— So, What’s good about it?

MapReduce vs Tez — source

Tez tries to analyze the relationships between the steps and the independent paths, and to figure out the most optimal path for executing the queries.

It tries to run the steps in parallel instead of in sequence as in MapReduce, since it has a complete view of the job before running it. It also optimizes resource usage and minimizes moving data around.

This is what the "Directed Acyclic Graph (DAG)" is about: accelerating the jobs.

Spark

It can sit on top of YARN, and is an alternative to MapReduce, allowing us to use a programming language like Python, Java, or Scala to write queries. It uses the idea of map/reduce, but not the exact implementation.

Besides, it provides many functionalities for ML, data mining, and streaming. And it uses a DAG, same as Tez.

— How does it work?

It's based on the RDD: an object that represents the dataset that's actually distributed across the nodes in the cluster, and that has a set of built-in methods.

So, we just deal with the dataset as a single object, even though it's distributed.

Here is an example in code to demonstrate. The problem is to get the count of how many times each movie was rated.

from pyspark import SparkConf, SparkContext

# Create the Spark context object
conf = SparkConf().setAppName("MovieRatingsCount")
sc = SparkContext(conf=conf)

# The mapper transforms a line "userId,movieId,rating" into (movieId, 1)
def mapper(line):
    (userId, movieId, rating) = line.split(',')
    return (movieId, 1)

# 1. Load the data from a source like HDFS. It returns an RDD.
lines = sc.textFile("hdfs:///user/.../movies.data")
# 2. Map. Transform each line to (movieId, 1) using the mapper function.
movieRatings = lines.map(mapper)
# 3. Reduce. Given each movieId => [1, 1, ...], count the ratings.
# The values are passed as an accumulated & a current value. Why?
# So the list can be broken into pieces and reduced in parallel.
movieCount = movieRatings.reduceByKey(lambda acc, curr: acc + curr)
# We can also apply functions like sort (here, by the count).
sortedMovies = movieCount.sortBy(lambda m: m[1])

Spark won't kick off until we call one of the "action" functions (reduce, collect, etc.). That way, it can determine the fastest way to perform the action and get the result.
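For instance, nothing in the snippet above has actually run yet; calling an action such as collect() is what triggers the whole chain:

# collect() is an action: it triggers the whole chain of transformations
# and brings the (movieId, count) results back to the driver program.
for (movieId, count) in sortedMovies.collect():
    print(movieId, count)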

— DataFrames

With Spark 2.0, DataFrames became the way to go. DataFrames extend RDDs: a DataFrame contains Row objects, where each column has a name and a data type (instead of just tuples), and we can run SQL queries against it. An example for the same scenario as above.

# mapper returns a Row object: (movieId=INT, rating=1)
movieRatings = lines.map(mapper)
# Convert the RDD to a DataFrame ('spark' is a SparkSession)
movieDataset = spark.createDataFrame(movieRatings)
# Group by movieId and count the ratings (this adds a "count" column)
movieCount = movieDataset.groupBy("movieId").count()
# Order by the number of ratings
sortedMovies = movieCount.orderBy("count")

As you noticed, we can perform the same task with less and simpler code. It's also more efficient, since the data has structure. DataFrames can also be used with the ML and streaming capabilities built on top of Spark.

A common example is to predict movie recommendations for a user.

from pyspark.ml.recommendation import ALS

# mapper returns a Row object: (userId=INT, movieId=INT, rating=FLOAT)
userRatings = lines.map(mapper)
# Convert the RDD to a DataFrame
userDataset = spark.createDataFrame(userRatings)
# Train the model (ALS: Alternating Least Squares)
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
model = als.fit(userDataset)
# Predict ratings for 'movies': a DataFrame of (userId, movieId) pairs to score
recommendations = model.transform(movies)

Pig

It runs on top of MapReduce and provides an interface for writing SQL-like scripts in a procedural language called 'Pig Latin', instead of writing Java or Python.

It lets us do what we can do with MapReduce (write map/reduce jobs) in a simpler way. We can define custom user functions, and we can still plug in our own mappers and reducers. Also, it can run on its own, without Hadoop.

Using the movie count example from above, a Pig script looks like:

-- Load the data from HDFS. A schema is applied on data load
-- (PigStorage(',') assumes comma-separated data).
-- The result is a relation: a series of tuples.
ratings = LOAD '/user/.../movies.data' USING PigStorage(',') AS (movieId:int, rating:int);
-- Group by movieId. Returns tuples where the first value is the
-- movieId, and the second, called a 'bag', holds the matching rows.
-- Under the hood, Pig uses mappers and reducers to get the result.
ratingsByMovie = GROUP ratings BY movieId;
-- Count the ratings of each movie.
-- 'ratings' gives access to the bag of each group (movieId).
movieCount = FOREACH ratingsByMovie GENERATE group AS movieId, COUNT(ratings.rating) AS countRating;
-- Order by the number of ratings
sortedMovies = ORDER movieCount BY countRating DESC;

There is a lot more to Pig: other commands, connections to different data sources, user-defined functions, and more!

Hive

It solves almost the same problem as Pig. It takes SQL queries (HiveQL), as if the data were stored in a relational database (in fact, it's not).

Hive then executes the SQL on multiple nodes, processes it in parallel, and returns the final result. It translates the SQL queries into mappers and reducers on top of MapReduce.

And, same as Pig, it's easier than writing the map and reduce functions in code.

It's not optimized for interactive queries. It's for analytical queries (running in the background).

But with SQL, it's limited. There is no way to chain methods like in Spark or Pig. There are no update, insert, or delete operations. In fact, there are no physical tables at all (only logical ones). Under the hood, the data is denormalized.

Here is how it works …

It creates a schema that gets applied to the data when it's being read. In a traditional relational database, the schema is applied when the data is loaded.

So, it takes unstructured data and applies a schema on read. It doesn't copy the data. Loading the data moves it into the Hive directory (which is still in HDFS), without reformatting it.

It then creates "views": logical tables that are the result of a query. This data is now owned by Hive; if we drop a table in Hive, the data is gone!

But we can still have external tables, which point to existing data given its location in HDFS. That data is not in the Hive directory, and dropping the table only drops the schema.

CREATE VIEW IF NOT EXISTS movieCount AS
SELECT movieId, COUNT(rating) AS countRating
FROM movieRatings
GROUP BY movieId
ORDER BY countRating DESC;
SELECT * FROM movieCount LIMIT 10;

External Data Storage

We can use and integrate external data stores, relational and non-relational databases, to expose the data to a transactional platform (like interactive web applications).

We can import and export data directly to and from HDFS, and the data can be transformed by Spark or MapReduce along the way.

MySQL

MySQL is a monolithic (not distributed) relational database. It's good for interactive queries (web-based apps).

With MySQL, we can import and export data using Sqoop. Sqoop kicks off a bunch of mappers that populate the data into HDFS (or back into MySQL).

  • We can import or export with the option to use Hive directly.
  • We can also specify the number of mappers to run.
  • We can do incremental imports based on the last value of a column (a timestamp), so as to keep the data in HDFS in sync.

If all you need is to answer queries very fast using a key (with denormalized data), then a NoSQL database might be a good candidate. NoSQL is better when you favor speed and scaling over consistency.

HBase

A NoSQL database that sits on top of HDFS. It's an implementation of Google's Bigtable.

There is no query language, only CRUD operations via an API. It partitions the data automatically, re-partitions, and auto-scales as the data grows.

A quick peek at the overall architecture and data model.

The master nodes know where the data is stored and how it is partitioned, but they're not meant to be queried. We talk to the partition servers (slaves) directly. Each partition holds the data for a key range of, say, 1–100, 101–200, etc.

  • It consists of a collection of tables. Each table has rows, each identified by a row key, and the keys are stored in lexicographical order.
  • Each row consists of column families (sets of related columns).
  • Each cell may have different versions with timestamps.

It’s schema-less. It can store sparse data; not all rows have to have the same columns. If data doesn’t exist at a column, it’s not stored. This also allows us to dynamically add columns.

CustomerName: {                      // column family in a row
  'FN': 1383859182496: 'John',       // column name, version, value
  'LN': 1383859182858: 'Smith'
},
ContactInfo: {                       // another column family in the same row
  'EA': 1383859183030: 'john.smith@domain.com',
  'SA': 1383859183073: '5th Avenue'
}

The format of the data does matter, and it depends on how we query the data. So, if you'll query the movies given a user, then the structure should look like:

# userId is the key, and movies is a column family
userId,
movies: {
  movieId1: rating,
  movieId2: rating,
  ...
}
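As a rough illustration of "CRUD via an API", here is a minimal sketch using the third-party happybase Python client (which goes through the HBase Thrift gateway); the host, table name, and row data are made up for illustration.

import happybase

# Connect through the HBase Thrift gateway (host is made up).
connection = happybase.Connection('hbase-thrift-host')
table = connection.table('user_ratings')

# Put: the row key is the userId; columns live in the 'movies' column family.
table.put(b'user42', {b'movies:movieId1': b'5', b'movies:movieId2': b'3'})

# Get: fetch the whole row (all movie ratings for this user) by its key.
print(table.row(b'user42'))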

Furthermore, Spark, Hive, and Pig can be integrated with HBase. We can run queries and write the results back to HBase.

Cassandra

A distributed NoSQL database with no single point of failure (no master node). It has a query language, CQL, similar to SQL but limited in what it can do.

The architecture is a ring of nodes connected to each other, with no master node. They communicate to replicate data and to know who has what data.

A keyspace is like a database in a relational database. It has denormalized tables (no joins), partitioned based on the primary key.

The client application talks to a node, asking for data given its key. The node replies with where to find the data, since the nodes talk to each other and know where each piece of data is stored and replicated.
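A minimal sketch of that interaction using the DataStax cassandra-driver for Python; the node address, keyspace, and table are made up for illustration.

from cassandra.cluster import Cluster

# Connect to any node in the ring; the driver discovers the rest.
cluster = Cluster(['cassandra-node-1'])
session = cluster.connect('movies_keyspace')

# Query by the partition key; the contacted node routes the request
# to the replicas that own this key.
rows = session.execute(
    "SELECT movieid, rating FROM user_ratings WHERE userid = %s", [42])
for row in rows:
    print(row.movieid, row.rating)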

Cassandra has good integration with other tools like Spark. So, we can:

  1. Read data stored in Cassandra for analytics.
  2. Write the result data back to Cassandra.

With Spark, we can read and write tables as DataFrames.

We can even set up another replica ring that is integrated with Hadoop and runs Spark or Pig for analytics, while the main ring serves the web interactions (CRUD operations).

MongoDB

It's a document-based model (JSON-like format). MongoDB has a single master. It's schema-less; each document has a unique id, and we can build indexes on keys for fast lookups.

Its architecture is master-slave (M-S). Writes go to the master and are replicated to the slaves. When the master goes down, one of the slaves takes over. Replicas are for availability, not for scaling.

Sharding is done based on a unique key of the document. The data is split among M-S setups; each one handles data in a key range of, say, 1–100, 101–200, etc. MongoDB does the rebalancing automatically to ensure the data stays evenly distributed.

The application code talks to 'mongos' (the query router), which talks to the master servers and the config servers.
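A minimal sketch using the pymongo client; the host, database, and collection names are made up for illustration.

from pymongo import MongoClient

# Connect through mongos (the query router) in a sharded setup.
client = MongoClient('mongodb://mongos-host:27017')
db = client['moviedb']

# Documents are schema-less, JSON-like, and get a unique _id.
db.ratings.insert_one({'userId': 42, 'movieId': 101, 'rating': 4.5})

# An index on userId makes lookups by that key fast.
db.ratings.create_index('userId')
print(db.ratings.find_one({'userId': 42}))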

There are some quirks with MongoDB.

  • We need an odd number of servers to agree on the primary (master) server.
  • Application code is tied to the servers and needs to know about them.
  • Auto-balancing may fail.
  • We must have 3 config servers. If one is down, so is the whole database.

And some interesting facts about MongoDB …

  • It has built-in aggregation, map/reduce code, and file system (like HDFS).
  • It can integrate with Hadoop, Spark, and most languages. So, Spark may forward the map/reduce code to MongoDB; no need to use MapReduce.

Choosing your database

While there might not be a bulletproof, straightforward solution, there are some points to consider when making a decision.

The most important point to highlight is explained by the CAP theorem.

  • Consistency. Data is consistent among databases (partitions & replicas). It's OK to experience some latency; consistency is what matters.
  • Availability. Data is always available. We always get a result, even if it's not consistent yet (out of sync); consistency is settled later (in the background). This is called "eventually consistent".
  • Partition-tolerance. Data can be partitioned.

For example, Cassandra prefers availability over consistency. Though, it can also enforce consistency by requiring a number of nodes to agree on the same answer when reading.

HBase (and MongoDB) prefer consistency. Each has a master node, which is a single point of failure, though it can have replicas.

MySQL. It’s hard to partition but provides good availability and consistency.

Another aspect is which components you need, and how well they integrate with each other. And, do you really need Hadoop in the first place?

— Using Hadoop + Spark Vs External databases

  • Hadoop: if you want to run analytical jobs in the background, whose results won't be exposed to users.
  • External database: if you want to run queries in real time and expose the results to users.
  • Both: the analytical data is computed by Hadoop (in the background) and pushed to the database occasionally.

Query Engines

There are a handful of solutions for querying the data across the whole cluster, or from a certain database. Hive and Pig can also be candidates, but they are part of the Hadoop ecosystem.

Drill

With Apache Drill, we can write SQL queries (real SQL, not SQL-like as in Hive) that run across NoSQL databases and file data sources like HBase, Cassandra, Amazon S3, etc.

We can join data from different sources and tie them together in one SQL query.

It's fast, but it's still not truly relational under the hood; it works with the data as if it were JSON. Data from different sources gets exposed as databases and tables.

So, one can write a query to get the number of movies rated by each user, combining data from Hive and MongoDB.

SELECT u.name, COUNT(*) AS moviesRated 
FROM hive.movies m
JOIN mongo.users u
ON m.user_id = u.user_id
GROUP BY u.user_id, u.name

Phoenix

Apache Phoenix: the SQL solution for HBase. It sits on top of it.

It supports transactions (ACID), secondary indexes, and user-defined functions. It's fast, low-latency, and good for both analytical and transactional applications. But still, we're not dealing with a relational database.

— Why not use HBase directly?

Because SQL can be easier. And we can still optimize HBase, for example by keeping a denormalized table just for answering our queries.

Phoenix talks to the HBase API, which talks to the HBase servers. Each HBase server has a Phoenix co-processor attached to it.

Other tools like Hive, Pig, Spark, and MapReduce can be integrated with it. For example, Pig can be used to access Phoenix, which in turn accesses HBase.

A simple SQL query using Phoenix.

SELECT * FROM users;

Presto

Presto is an open-source SQL query engine, created by Facebook.

It's similar to Drill: SQL syntax, runs across different sources, and is optimized for analytical queries. Though, Facebook uses it for interactive queries on a massive amount of data!

It can connect to local files, Cassandra (unlike Drill), Hive, MySQL, Kafka, Redis, etc.

ZooKeeper

It keeps track of the configuration of the cluster, of shared state to ensure synchronization (like replicated data), of which nodes are up and down, of electing a master node, of the naming service, etc.

Many systems use it to keep track of things: what's the master server, which servers are up, and so on. It has an API, and the information is stored in a small distributed file system.

In HBase, ZooKeeper watches the watcher (the HBase master node). It tells clients which master node is running; if that master fails, it determines which master to talk to next, and it ensures no two master nodes are live at a given time. It's a tool that lets us recover from failures.

If a worker is down, ZooKeeper keeps the list of all tasks, so when a new worker comes up, we know where to continue from and can re-distribute the work.

The way it works: ZooKeeper itself runs as a list (ensemble) of servers, otherwise it would be a single point of failure.

The client must know about these servers, and talk to them evenly to spread out the load. The ZooKeeper ensemble replicates data across all ZooKeeper servers.

The client can specify the minimum number of ZooKeeper servers that must agree on the returned information, to make sure it's valid.
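To make this concrete, here is a minimal sketch using the kazoo Python client; the ensemble hosts and znode paths are made up for illustration. Ephemeral znodes are what make "which workers are up" tracking automatic.

from kazoo.client import KazooClient

# Connect to the ZooKeeper ensemble (hosts are made up).
zk = KazooClient(hosts='zk1:2181,zk2:2181,zk3:2181')
zk.start()

# An ephemeral znode disappears automatically if this worker dies,
# which is how other components notice the failure.
zk.create('/workers/worker-1', b'alive', ephemeral=True, makepath=True)

# Watch the master znode to react when a new master is elected.
@zk.DataWatch('/master')
def on_master_change(data, stat):
    print('current master:', data)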

Oozie

It orchestrates the running and scheduling of Hadoop jobs. For example, if you have a task that involves jobs running across different tools (Hive, Spark, HBase).

It has the notion of a workflow: chaining a list of jobs (actions), possibly from different technologies.

A common workflow is to copy data from an external data source to Hadoop, run analytical queries on the data, then write the result back.

The 'fork' node starts parallel actions, while 'join' collects the results from these actions and passes them to the next action. And it uses a Directed Acyclic Graph (DAG) to run jobs in parallel where possible.

The Oozie coordinator schedules execution of the workflow periodically, defines how frequently it runs, and can wait for input (data) before starting.

An Oozie bundle is a collection of coordinators. For example, each coordinator processes log data and does a different analysis (like understanding customer behavior). The bundle is useful because we can start and stop all of them at once.

Zeppelin

Apache Zeppelin. A notebook interface to our data in the cluster, much like IPython Notebook.

We can interactively run scripts against the data. We can take notes and share them with other people.

It has good integration with Spark. It can run Spark code and SQL queries against Spark SQL. Results can be visualized in charts. It can also be integrated with Cassandra, Hive, HBase, etc.

Streaming Data

So far we've assumed the data is already sitting on the cluster, but we haven't said how to get the data into the cluster for analysis as it comes in.

For sure we can have a database in between, but we can also format and analyze the data as it comes in.

Streaming is when data is continuously sent from the source to be processed; for example, log entries, sensor data, stock data, etc.

There are two problems here:

  1. How can we get data from different sources? Kafka, Flume.
  2. How do we process it when it gets there? Spark Streaming, Storm, Flink.

Kafka

A general-purpose publish / subscribe messaging system. It’s not just for Hadoop.

It gets incoming messages (data) from publishers (sources), and publishes them to a stream of data called a 'topic' (like logs). Then, subscribers can listen to these topics and get the data as it's being published.

Kafka stores the incoming data for a while, just in case any subscriber fails to pick it up. When the subscriber starts again, it continues from where it left off.

The architecture has a box at the center; that's the Kafka cluster. It has processes and data, all distributed across servers.

To get the whole picture:

  • Producers: our applications that push data coming from sources into Kafka.
  • Consumers: any applications that receive data from Kafka. Consumers listening to the same topic can be grouped so that incoming data is distributed among them (see the sketch after this list).
  • Connectors: databases that can publish data to, and receive data from, Kafka.
  • Stream Processors: re-structure or format the incoming data, and send it back to Kafka.
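Putting the pieces together, here is a minimal sketch using the kafka-python client; the broker address, topic name, and messages are made up for illustration.

from kafka import KafkaProducer, KafkaConsumer

# A producer pushes log lines into the 'logs' topic.
producer = KafkaProducer(bootstrap_servers='kafka-broker:9092')
producer.send('logs', b'GET /movies/42 200')
producer.flush()

# A consumer in the 'analytics' group picks messages up from where it
# left off; Kafka keeps the data around for a while in between.
consumer = KafkaConsumer('logs',
                         bootstrap_servers='kafka-broker:9092',
                         group_id='analytics',
                         auto_offset_reset='earliest')
for message in consumer:
    print(message.value)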

Flume

It was built with Hadoop in mind, unlike Kafka, which is general purpose. It was originally built to aggregate the logs coming from different web servers and send them to Hadoop.

Its architecture is called an 'Agent', and it consists of:

  • Source: listens to, say, a web server. It can format the data or add logic about where to send it.
  • Channel: determines how the data is transferred from the source to the sink (via memory or file). Memory is faster, while a file gives persistence to recover from failures.
  • Sink: connects to wherever the data will go. A sink can connect to only one channel. Flume has built-in sinks to write data to HDFS and HBase.

Unlike Kafka, once the sink receives the data from the channel, the data gets deleted from the channel. So, it's hard to have multiple sinks listening to the same data at different rates.

Think of Kafka and Flume as a buffer between the source and destination. Most often they are used with other tools for analyzing data before saving it.

Spark Streaming

With Spark core, we can analyze batch of data (say, daily). With Spark Streaming, we analyze data in real time as it comes in.

A high-level overview of how it works.

  1. As the data comes in from the source to the 'receivers', a batch (chunk) of this data is created for every interval of time.
  2. The DStream creates an RDD for each batch interval. We can transform or play around with that RDD before sending it to other systems, so we are just writing Spark code. A function can be applied to every batch.
  3. The batches are then sent to other systems, whoever is listening.

Now, we might need to apply a function over a longer period of time, instead of on every batch. This is called 'windowing'.

For example, the batch interval is 1 second, but we maintain the results of the batches within the last 1 hour, and the window (the 1-hour interval) slides forward as time goes on. We can set this function to be applied every, say, 10 minutes over the last 1 hour of data.

An example could be: take the logs as they come in, count the top requested URLs over every window interval, and save them to the database.
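A minimal sketch of that example with Spark Streaming's Python API; the socket source, host/port, checkpoint path, and the extract_url helper are assumptions for illustration (sc is the SparkContext from earlier).

from pyspark.streaming import StreamingContext

# 1-second batches; 'sc' is the SparkContext from earlier.
ssc = StreamingContext(sc, 1)
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")   # needed for windowed state

# Receive log lines from a (made-up) socket source.
lines = ssc.socketTextStream("log-source-host", 9999)

# Count requests per URL over a sliding 1-hour window, updated every 10 minutes.
# extract_url is an assumed helper that pulls the URL out of a log line.
urlCounts = (lines.map(lambda line: (extract_url(line), 1))
                  .reduceByKeyAndWindow(lambda a, b: a + b,   # add new batches
                                        lambda a, b: a - b,   # subtract old ones
                                        windowDuration=3600,
                                        slideDuration=600))
urlCounts.pprint()

ssc.start()
ssc.awaitTermination()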

— DataFrames

As Spark core moves towards DataFrames, so does Spark Streaming.

Incoming data is stored in DataFrames, instead of as individual RDDs in a DStream. A DataFrame is like a table of rows, and so has structure, with new rows getting appended as data arrives.

It's easier to code, since the code for streaming data isn't much different from the code for non-streaming data. And it has better performance for SQL-like queries, and better integration with Spark's ML APIs.
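A rough sketch of this DataFrame (Structured Streaming) style, using a socket source and console sink just for illustration; the host and port are made up.

# 'spark' is a SparkSession. A socket source is used just for illustration.
lines = (spark.readStream
              .format("socket")
              .option("host", "log-source-host")
              .option("port", 9999)
              .load())

# The same DataFrame operations as in batch code; in practice we would
# first parse the URL out of each line instead of grouping raw lines.
urlCounts = lines.groupBy("value").count()

# Continuously write the running counts (to the console, for demo purposes).
query = (urlCounts.writeStream
                  .outputMode("complete")
                  .format("console")
                  .start())
query.awaitTermination()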

Storm

Apache Storm. Another solution to process and analyze real time data as it comes in. It’s built on its own, but can run on top of YARN.

It works on events (each sample of data is an event, handled in real time), not on batches of data as in Spark Streaming.

The way it works might sound familiar by now. It has:

  • Spouts: the sources of data; can be an application, Kafka, etc.
  • Tuples: the stream consists of tuples, each a single piece of data.
  • Bolts: receive, transform, aggregate, and send the data to a database or a file, as it comes in.
  • Nimbus: keeps track of the jobs and of what's happening (and has a backup for recovery).
  • Supervisors: the worker nodes, where all the processing takes place.

We can have a topology with different spouts and bolts connected to each other. For example, a spout that passes log data through two bolts, one after another: one to format the logs, and another to count similar URLs.

— Storm vs Spark Streaming
Spark has more advantages, like ML, and it's easy to write Spark code in many languages. Storm applications are usually written in Java, but Storm is more real-time, as it deals with events, not batches.

Flink

It's similar to, yet might be faster than, Storm, and it's the newest of them.

Flink does real-time streaming on events, and also batch processing. It uses snapshots, so it can survive failures and ensure an event is processed exactly once, in the correct order.

Flink supports Scala, has its own libraries for things like ML (same as Spark), and has a UI showing the running work.

On top of the Flink runtime engine:

  • The DataStream API for real-time data. It supports libraries for SQL queries (as the data comes in).
  • The DataSet API for batch data. It supports libraries for ML, graph processing, and SQL queries. Same as Spark.

At the bottom, it can run on its own cluster, on YARN (Hadoop), on AWS or Google Cloud, or even locally.

Designing Real-World Systems

This last section is all about examples of real world problems, and what solutions we can use.

But before diving deeper, we need to answer the question: what are the main aspects we should look at when we get the requirements?

Again, there is no bulletproof, straightforward solution, and trade-offs are always there. But, as with choosing the database, there are some points to consider when making a decision.

First. Start from what the end result needs to be, from what’s needed, not from what tools are available.

Figure out the common queries: analytical, many small transactional queries, or both? Availability? Consistency? Determine the demand for each.

Second. How quickly does the data need to be processed?

Every day or every minute? Scheduled jobs using Oozie with Hive, Pig, Spark, etc. Real time? Storm, Spark Streaming, or Flink with Kafka or Flume.

Third. Keep it simple. Use existing solutions if possible.

Reddit: Get the top links for the last hour.

Given that there are many users, it must be fast. Consistency is not that important. It has to be updated hourly; not really real time.

The solutions.

  • Database: Cassandra, or Amazon DynamoDB, to store the top links.
  • Kafka or Flume: to stream the data as it comes in.
  • Spark Streaming: to analyze the data over a window interval of 1 hour.

The workflow:

  1. A Reddit service: whenever a user submits a new link, it sends that transaction to Flume.
  2. Flume streams the links to Spark Streaming, which will …
  3. Compute the top links over a 1-hour window, and send them to Cassandra.
  4. The result is stored in Cassandra, which can be exposed to the web application.

Trade-offs.
Instead of streaming the data, run jobs using Spark every, say, 1 hour: load HDFS with the data, run the analytical queries, and store the result back in Cassandra. Use Sqoop to export the data from Cassandra into Spark (HDFS).

IMDB: Recommend movies for a user.

Given that availability is a concern, and consistency less important.

The recommendations should be in real-time. So, If a user liked a movie, immediately the user should see the movies related.

The solutions.

It's overwhelming to compute recommendations every time a user gives a rating. So, we should compute the recommendations based on existing users' behavior periodically. The result of this algorithm, let's call it 'Boltzman', is not likely to change frequently.

And based on our current understanding of the user's behavior, we can predict the recommendations for all the movies for a given user, then sort them, and filter out the ones the user has already seen.

  • Database: Cassandra, or any NoSQL database like HBase, to store the user ratings (coming from the stream) and Boltzman (generated periodically).
  • Spark ML (or Flink) and Oozie, to run every few hours (infrequently) and re-compute the user movie recommendations.
  • Flume and Spark Streaming, for streaming and processing the real-time user behavior.

The workflow:

  1. The web server sends user ratings to Flume, which passes them to Spark Streaming, which then saves the result into HBase.
  2. A recommendation service returns the user's recommendations, given the user's ratings and Boltzman from HBase.
  3. Every, say, 1 day, Oozie kicks off a Spark ML job to re-compute the movie recommendations (Boltzman) and save them to HBase.

Insights: Daily insights into the traffic coming to our website.

We need to know how many users there are, their geo-locations, the total number of requests, etc. It only runs once a day, based on the previous day's activity. It's used only internally; not exposed to users.

The workflow.

  1. The web server sends the user logs to Kafka, and then to Spark Streaming, which will …
  2. Determine the answers to all the queries across a 1-hour interval.
  3. Then store the data in Hive (which uses HDFS).
  4. Every day, Oozie kicks off a query to run on Hive.

If we are already storing the logs in an existing database, maybe we can export the data using Sqoop, and there may be no need to use Hive at all.

Thank you for reading! If you enjoyed it, please clap 👏 for it.
