Sparking off at another thing to learn: Apache Spark

Following the Hadooping series, what is Apache Spark?

Photo credit: Jez Timms

What?

It is another “cluster computing” thing.

Why?

Well, MapReduce was beginning to creak. It was a bit slow. Work had to be split into batches that couldn’t have any dependencies between them (batch processing). Coding MapReduce tasks could be confusing. And because we like making new shiny things?

Spark is the new shiny thing that answers those problems.

How?

Hadoop with MapReduce uses a lot of storage locations: intermediate results go out to disk and nodes have to read the data back in from those locations. Spark keeps the data in the RAM/memory of the nodes instead.

That is the headline: MapReduce but more, easier and faster.

What is it actually?

Well, here are two informative images.

This one is trying to say that Spark is database agnostic and supports a variety of processing techniques.
This is a better one, showing that it is storage agnostic, can work with a variety of management tools (YARN, Mesos and its own built-in one) and can then be interacted with through its libraries.

That is its architecture: it is a layer you add on top of your storage, and that layer gives you better control.

A large part of why Spark is supposedly so good is speed, and part of that is put down to it being written in the Scala coding language. Scala is object oriented. Everything is an object. This is held up as the crown jewel of programming. I am not sure why, but it runs better, faster, harder, stronger and more.

Resilience

A large feature of Spark is how it organises a Resilient Distributed Dataset (RDD). This plays a similar role to the Hadoop Distributed File System (HDFS), but HDFS is saved to disk (onto a hard drive). That is good because if the power goes out you still have the data.

Spark’s RDD is built on RAM. If the power goes out, you lose it. The clever part is that you can rebuild it to be exactly the same, because the data’s building instructions are saved. These instructions are called DAGs, or Directed Acyclic Graphs, and they are the reason we can be confident using it. Think of them as instruction booklets.

How is it safe? How does it run?

The instructions/DAGs are saved, and the underlying data still lives somewhere durable, so anything held only in RAM can be rebuilt.

Any operational change (“do this to the data”) is added to the DAG which is securely saved — this is called a Transformation as your command has transformed the DAG but not the data.

When it comes time to run the commands (“make all the changes I instructed now”), the data is processed according to the DAG — this is called an Action.

If any node fails we can redo all the actions because we still have the DAG — we just have to start again.

So how do we make an RDD?

Well there are 3 ways:

  1. If the data is already in an RDD, you can transform it (filter the older RDD to get a new one).
  2. If the data is already within Spark (a plain collection in your program), you can distribute it (known as parallelize because it spreads the data out so it can be worked on in parallel).
  3. If the data is in something else like HDFS, you can reference it (essentially copying it over but keeping a note of where it has come from).

What if I have more data than I do RAM?

Yeah this can happen. Spark is most efficient when everything is on RAM but that can’t happen all the time. In this case, you will have to pick from a variety of options:

From the Documentation: RDD Persistence
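
To make that concrete, here is a minimal PySpark sketch (assuming an existing Spark Context called sc and made-up file names) of picking a storage level so data that does not fit in RAM can spill to disk instead of being recomputed:

from pyspark import StorageLevel

# "bigData.txt" and "smallData.txt" are made-up files; sc is assumed to exist.
bigData = sc.textFile("bigData.txt")

# Keep what fits in RAM and spill the rest to disk.
bigData.persist(StorageLevel.MEMORY_AND_DISK)

# .cache() is shorthand for persisting at the default memory-only level.
smallData = sc.textFile("smallData.txt")
smallData.cache()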

That is all I know here… please say if you know more!

Working with Spark

First, know that sc stands for “Spark Context”, not Scala. For example, we can check the Spark version we are running with sc.version. If we want to parallelize some data already within Spark, we call sc.parallelize(input).

You need to import a Spark library to be able to use sc in the Java/Scala/Python script you submit to Spark. In Scala, for example:

import org.apache.spark.SparkContext

Note that Python (like Scala and Java) is 0 indexed (it starts counting from zero). As such, if you want the first value of an array you need the 0th element, not the 1st.

How to load data

If it is already an RDD:

val newData = oldData.filter(conditionsForFiltering)

If it is already in Spark:

val data = sc.parallelize(nameOfData)

If the data is on your local machine it is as simple as:

val data = sc.textFile("nameOfData.txt")

Transformations, Actions and DAGs.

A note on the above: transformation operations do not get run on the data straight away, they just change the DAG. parallelize is one of these lazy operations and hence needs an Action before anything actually happens.

This process of staging all your operations first (i.e. making the changes to the DAG) and then calling the Actions to run everything the DAG describes is known as Lazy Evaluation. Despite the name, it is actually quite efficient. The name comes from the idea of a lazy person doing the work only when it is demanded, as opposed to when it was first requested.

To see the DAG of a particular RDD (the group of data that will have operations performed on it), we can call the .toDebugString method. This outputs the list of steps that will be applied to the RDD it is called on, and should be read in reverse (bottom-up) order.
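
As a small hedged sketch in PySpark (sc is assumed to already exist and the variable names are made up), nothing runs until the Action at the end:

# These two lines only add steps to the DAG; no data is touched yet.
numbers = sc.parallelize(range(10))
evens = numbers.filter(lambda x: x % 2 == 0)

# Inspect the lineage/DAG (read it bottom-up). Still no computation.
print(evens.toDebugString())

# Only this Action makes Spark actually do the work.
print(evens.collect())  # [0, 2, 4, 6, 8]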

Examples of Transformations:

map(func) - go through the data and apply func to every element
filter(func) - keep only the elements that pass the func condition
flatMap(func) - like map, but flattens/destroys nesting in the results
join(otherData) - matches key/id values between two datasets and pairs up their values
reduceByKey(func) - similar to the above but reduces/aggregates the amount of data you have per key (i.e. by adding numbers to a total)

Examples of Actions:

collect() - send all the results back 
count() - count the number of elements
first() - return the first element
take(n) - return the first n elements
foreach(func) - do this func for each element
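
To make those lists concrete, here is a small PySpark sketch (sc assumed to exist, data made up) that chains a couple of Transformations and then fires a few Actions:

lines = sc.parallelize(["spark is fast", "spark is lazy", "hadoop is older"])

words = lines.flatMap(lambda line: line.split(" "))  # Transformation
sparks = words.filter(lambda word: word == "spark")  # Transformation

print(words.count())   # Action: 9
print(words.take(3))   # Action: ['spark', 'is', 'fast']
print(sparks.count())  # Action: 2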

A note on generating RDDs.

If you spread your transformations across multiple lines you may be creating multiple RDDs. For example, the single chained line and the three-step version below produce the same result, but the second names an RDD at every step:

// one chained expression: only the final RDD gets a name
val manipulatedData = data.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)

// the same work split into steps: each val is its own RDD
val words = data.flatMap(line => line.split(" "))
val countedWords = words.map(word => (word,1))
val totalWords = countedWords.reduceByKey(_+_)

This can be beneficial or not. If you need each step of the data, or need to test each step, then it is good. If not, it can cost you, as intermediate results may end up being “saved out” (cached) more often than necessary.
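
If you do keep an intermediate RDD because you use it more than once, caching it is the usual trick. A hedged PySpark sketch, assuming data is an existing RDD of lines like the one above:

# words is reused twice below, so cache it rather than re-splitting the input.
words = data.flatMap(lambda line: line.split(" "))
words.cache()

totalWords = words.count()                              # first use
longWords = words.filter(lambda w: len(w) > 5).count()  # reuses the cached RDD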

Actual Coding

How do I filter for a particular word?

output = logFile.filter(lambda line: "INFO" in line)
output.take(10)
output.count()

So filter for the word "INFO", show me the first 10 records and then give me the count.

So for me, the lambda function and the in need explaining. Lambda functions are a Python shortcut for writing a function without formally defining it. You could write functions that have names, but if they aren’t going to be used anywhere else, why bother? Instead, don’t name them and keep them as anonymous functions:

def double(x):
    return x * 2

sc.parallelize([1, 2, 3, 4]).map(double).collect()           # this outputs [2, 4, 6, 8]
sc.parallelize([1, 2, 3, 4]).map(lambda x: x * 2).collect()  # so does this

So in our example with filter, we are saying: call each bit of data a line, and for every line, keep it if the word "INFO" is in it.

Here is info for Lambda functions and here is info for in.

How do I do a word count?

To do a word count we need to map. What I mean by this is I need to:

  1. label every word with a value of one throughout all the text (this seems an obvious step to a human but a machine doesn’t know the value of words): Hello (1), world (1), some (1), text (1), Hello (1)
  2. then it condenses the words, putting any identical words together: Hello (1,1), world (1), some (1), text (1)
  3. then it reduces the identical words that are together to get a total: Hello (2), world (1), some (1), text (1)

These steps are an illustration. Here is the actual code:

readmeCount = readmeData.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)

The \ just lets us continue onto another line visually; for the computer it is all joined into one long line. Once again lambda is used: first to split the data into a list of words, then to put a value of 1 next to each word, and then to add those values up. The last call confuses me a little. My initial thought was that it would try to add word and 1, which would not help! What it is actually doing (because the people who made it were trying to simplify things) is keeping a running total. For example, let’s say we have mapped and found a lot of Hello: Hello(1,1,1,1,1,1,1,1,1). The function adds the values one by one. So:

Hello(a,b,1,1,1,1,1,1)
a+b = 2
Hello(2,1,1,1,1,1,1)
#then we loop
Hello(a,b,1,1,1,1,1)
a+b = 3
...
Hello(8,1)
Hello(9)

Finally there is no value left for b to be assigned, so it stops. Here is a more detailed explanation.
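
A plain-Python analogue of that running total (no Spark needed), using the same lambda a, b: a + b:

from functools import reduce

ones = [1] * 9                            # the nine 1s behind Hello(1,1,1,1,1,1,1,1,1)
total = reduce(lambda a, b: a + b, ones)
print(total)                              # 9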

Let’s use Spark

Define things

You need to import Spark, then define details about the app you are running, and then define things like sc. Check some examples in Spark’s examples directory. I am lost.
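
For what it is worth, here is a minimal PySpark sketch of that setup. The app name and master URL are placeholders, not anything official:

from pyspark import SparkConf, SparkContext

# Describe the app: a name (shows up in the Spark UI) and where the cluster
# manager is ("local[*]" just means run on this machine with all cores).
conf = SparkConf().setAppName("MyFirstSparkApp").setMaster("local[*]")

# sc is the Spark Context that everything else hangs off.
sc = SparkContext(conf=conf)

data = sc.parallelize([1, 2, 3, 4])
print(data.count())  # 4

sc.stop()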

Give it functions

There are three ways to allow every node in the cluster to run functions:

  1. Keep all functions anonymous / only write lambda functions — this is fine if your functions aren’t reused.
  2. Create a shared set of functions the nodes can access. This is done by creating an object which contains every function’s name and definition. When the nodes execute the code, they reference the object and then the name of the function they need to run.
  3. Only send each task the function it needs from that set, no more (so you aren’t shipping the whole object around).
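
In PySpark terms, here is a hedged sketch of those three options (data is an assumed existing RDD of strings; the names are made up):

# 1. Anonymous: fine for one-off logic.
doubled = data.map(lambda x: x + x)

# 2. A named, module-level function: easy to reuse and test.
def clean(record):
    return record.strip().lower()

cleaned = data.map(clean)

# 3. Careful with methods on big objects: referencing self inside the map
#    would ship the whole object to every node. Copy out just what you need.
class Tagger(object):
    def __init__(self, prefix):
        self.prefix = prefix
    def tag_all(self, rdd):
        prefix = self.prefix  # local copy: only this string gets sent out
        return rdd.map(lambda x: prefix + x)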

Spark Libraries

Spark SQL — Allows SQL, HiveQL or Scala queries to be written and executed.

Spark Streaming — Allows streaming of live data, from various sources like Kafka, Flume, HDFS and Twitter! It can also push streaming data to databases and HDFS. The way it works is essentially data comes in and is turned into blocks which are then sent off, as soon as they are full, to be processed.

MLlib — It is a Machine Learning library with a variety of common applications and models.

GraphX — For graph processing (networks of nodes and edges, not graphics).
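
To give a flavour of the first of those libraries, here is a hedged Spark SQL sketch. Newer versions of Spark use a SparkSession as the entry point rather than sc, and the table and column names here are made up:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SqlSketch").getOrCreate()

# A tiny made-up DataFrame, registered as a temporary view so that it can
# be queried with plain SQL.
people = spark.createDataFrame([("Ada", 36), ("Grace", 45)], ["name", "age"])
people.createOrReplaceTempView("people")

spark.sql("SELECT name FROM people WHERE age > 40").show()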

Spark Processing Architecture

https://spark.apache.org/docs/latest/cluster-overview.html

Spark always has a driver (where the jobs / queries live) which communicates with the cluster manager to get resources. Executors are then started on the worker nodes; they complete the tasks, keep their working data in a cache, and return results to the driver.