An evening playing with Apache Spark.
Today someone asked me about Spark, so I started to investigate a bit more and look behind the surface of our daily operation. I wish to thank him for sparking this kind of curiosity in me. I really like going deeper into things; it gives you gratification. This evening I started reading like crazy, and here are my findings from about 3 hours of reading. I will continue during the week and play with some Kaggle datasets.
Apache Spark is an analytical processing engine for large-scale, distributed data processing and machine learning applications. It ships with a large set of libraries for data analytics and machine learning.
Apache Spark works in a master/slave architecture. The master node is referred to as the “Driver” and the slave nodes are called “Workers”. When you run a Spark application, a Spark session creates a Spark context, which is the entry point to your application. Spark operations, i.e. your data transformations, are executed on the worker nodes, and the resources are managed by a cluster manager.
The cluster manager:
- knows where the workers are located and keeps their accounting information.
- when you submit a Spark application, determines how much RAM and CPU each worker node contributes to execute your program.
Spark Driver.
The Spark driver works with the cluster manager to decide which workers will run the processing logic.
The Spark driver creates a SparkSession, which creates a SparkContext, which then applies your Spark configuration to allocate the required APIs for your application. By APIs we mean the allocation of the required tools, and this is why the SparkSession, together with the SparkContext, is the main entry point of your Spark application.
Spark Unified Stack:
1. Spark Core: provides all the functionality to manage and run distributed applications, such as scheduling and fault tolerance.
2. Spark SQL: designed to work with database tables and structured file formats such as CSV, JSON, Avro, ORC, Parquet and Iceberg (see the sketch after this list).
3. Spark Structured Streaming: built to write applications for real-time stream processing; it offers high throughput and is fault tolerant.
4. Spark MLlib: provides many popular machine learning algorithms out of the box.
5. Spark GraphX: graph processing; it operates on data structures used to represent real-life networks, such as LinkedIn, which connects professional people together.
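To make the Spark SQL point concrete, here is a minimal sketch. The file name, view name and columns are made up for illustration; the calls themselves (read.json, createOrReplaceTempView, spark.sql) are standard DataFrame/SQL API.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("unified-stack-demo").getOrCreate()

# Read a structured file (JSON here) into a DataFrame; "people.json" is a hypothetical input.
people = spark.read.json("people.json")

# Register the DataFrame as a temporary view and query it with plain SQL.
people.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 30").show()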
Spark Jobs Concepts.
A Spark job is composed of one or more stages.
Stages are created depending on whether an operation can be performed serially or in parallel. Each stage can have one or more tasks.
Application
A user program built on Spark using its APIs. It consists of a driver program and executors on the cluster.
SparkSession
An object that provides a point of entry to interact with underlying Spark functionality and allows programming Spark with its APIs. In an interactive Spark shell, the Spark driver instantiates a SparkSession for you, while in a Spark application, you create a SparkSession object yourself.
Job
A parallel computation consisting of multiple tasks that gets spawned in response to a Spark action (e.g., save(), collect()).
Stage
Each job gets divided into smaller sets of tasks called stages that depend on each other.
Task
A single unit of work or execution that will be sent to a Spark executor.
Spark operations on distributed data can be classified into two types: transformations and actions.
Transformations, as the name suggests, transform a Spark DataFrame into a new DataFrame without altering the original data, giving it the property of immutability. Put another way, an operation such as select() or filter() will not change the original DataFrame; instead, it will return the transformed results of the operation as a new DataFrame. All transformations are evaluated lazily. That is, their results are not computed immediately, but they are recorded or remembered as a lineage.
A recorded lineage allows Spark, at a later time in its execution plan, to rearrange certain transformations, coalesce them, or optimize transformations into stages for more efficient execution. Lazy evaluation is Spark’s strategy for delaying execution until an action is invoked or data is “touched” (read from or written to disk). If you’re familiar with LINQ, it’s kind of the same mechanism but better implemented.
DataFrame API.
A DataFrame is the most common structured API and simply represents a table of data with rows and columns. The list that defines the columns and the types within those columns is called the schema. You can think of a DataFrame as a spreadsheet with named columns.
The core data structures are immutable, meaning they cannot be changed after they’re created. This might seem like a strange concept at first: if you cannot change it, how are you supposed to use it? To “change” a DataFrame, you need to instruct Spark how you would like to modify it to do what you want. These instructions are called transformations. Let’s perform a simple transformation to find all even numbers in our current DataFrame. So in Spark we just have pure functions that operate on DataFrames.
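A minimal sketch of that even-numbers example, assuming the spark session from the sketch above:

# A DataFrame with a single column "number" containing 0..999.
myRange = spark.range(1000).toDF("number")

# A transformation: nothing is computed yet, Spark only records the lineage.
divisBy2 = myRange.where("number % 2 = 0")

# Only an action triggers the computation.
print(divisBy2.count())  # 500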
So, to go from one data structure to another, you apply a transformation. Transformations are lazy, and we have two kinds of them:
- transformations with narrow dependencies (e.g. myRange.where). Narrow-dependency transformations are those for which each input partition contributes to only one output partition (a 1:1 transformation).
- transformations with wide dependencies. A wide-dependency transformation has input partitions contributing to many output partitions. You will often hear this referred to as a shuffle, whereby Spark exchanges partitions across the cluster. With narrow transformations, Spark automatically performs an operation called pipelining, meaning that if we specify multiple filters on a DataFrame, they are all performed in memory. The same cannot be said for shuffles: when we perform a shuffle, Spark writes the results to disk.
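Reusing myRange from the sketch above, a rough illustration of the two kinds of dependencies:

# Narrow dependency: chained filters are pipelined in memory,
# each input partition maps to exactly one output partition.
evens = myRange.where("number % 2 = 0").where("number > 100")

# Wide dependency: the groupBy needs a shuffle, so partitions are
# exchanged across the cluster (an Exchange node appears in the plan).
counts = myRange.groupBy("number").count()
counts.explain()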
Spark is lazy like any functional program. Lazy evaluation means that Spark will wait until the very last moment to execute the graph of computation instructions. In Spark, instead of modifying the data immediately when you express some operation, you build a plan of transformations that you would like to apply to your source data.
1. Build a plan of transformations.
2. Spark compiles the plan and performs optimizations so that the plan runs as efficiently as possible across the cluster.
We call this the logical transformation plan. Now we need to introduce the concept of actions. An action instructs Spark to compute a result from a series of transformations. There are three kinds of actions (one example of each follows the list):
- Actions to view data in the console.
- Actions to collect data to native objects in the respective language.
- Actions to write to output data services.
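Reusing divisBy2 from the earlier sketch, the three kinds of actions could look like this (the output path is made up):

# 1. View data in the console.
divisBy2.show(5)

# 2. Collect data to native Python objects (a list of Row objects).
rows = divisBy2.collect()

# 3. Write to an output data source.
divisBy2.write.mode("overwrite").csv("./even-numbers-output")

And here is the small script (flight.py) behind the output shown further below: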
import pyspark
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.master("local[1]") \
        .appName('SparkByExamples.com') \
        .getOrCreate()

    flightData2015 = spark \
        .read \
        .option("inferSchema", "true") \
        .option("header", "true") \
        .csv("./2015-summary.csv")

    # take is an action that triggers the transformation plan.
    # items = flightData2015.take(3)

    # explain() prints the physical plan without running the job.
    flightData2015.sort('count').explain()
azureuser@testingazure:~/playground/spark$ python flight.py
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/11/19 22:22:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform… using builtin-java classes where applicable
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#19 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#19 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=27]
+- FileScan csv [DEST_COUNTRY_NAME#17,ORIGIN_COUNTRY_NAME#18,count#19] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/azureuser/playground/spark/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>
Here we have a Spark plan; you read the explain output from bottom to top. The top is the end result and the bottom is the source of the data.
You will see:
- FileScan
- Exchange
- Sort
We need an action to kick off this plan:
1. csv file
2. data frame. (read)
3. data frame. (sort)
4. take
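For example, uncommenting the take(3) in the script above would actually execute the plan instead of only explaining it:

# take() is the action: it runs the scan and returns the first 3 rows
# to the driver as a list of Row objects.
items = flightData2015.take(3)
print(items)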
DataFrames and SQL.
Spark runs the same transformations in exactly the same way regardless of the language. The language (Python/Scala/R/SQL) is just a means to express transformations and actions. The transformations are compiled into a plan and optimized before being executed on the cluster, so there is no performance gain in using one language over another.
The execution plan of the last query is a DAG of transformations each resulting in a new immutable DataFrame, on which we call an action to generate a result.
from pyspark.sql.functions import desc
flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count") \
    .withColumnRenamed("sum(count)", "destination_total") \
    .sort(desc("destination_total")).limit(5).explain()
csv_file -> read -> groupBy -> sum -> rename column -> sort -> limit -> collect.
Architecture of Spark.
Spark driver: it controls the execution of a Spark application and maintains all of the state of the Spark cluster. It must interface with the cluster manager in order to get physical resources and launch executors. In our context we’ll call it the “emitter”.
Spark executors: the processes that perform the tasks assigned by the Spark driver. Executors are like workers: they take the tasks assigned by the driver, run them and report back their state and results. Each Spark application has its own separate executor processes.
The cluster manager: the Spark driver and the executors do not exist in a void, and this is where the cluster manager comes in. The cluster manager is responsible for maintaining the cluster of machines that run your Spark applications, and it has its own “driver” and “worker” abstractions.
So what happens when a Spark application is executed? The cluster manager maps executors onto the nodes of the cluster.
Execution Modes.
- cluster mode.
- client mode.
- local mode.
Cluster Mode.
- The cluster manager launches the driver process on a worker node inside the cluster, in addition to the executor processes. This means that the cluster manager is responsible for maintaining all Spark application-related processes.
Client mode
Client mode is nearly the same as cluster mode except that the Spark driver remains on the client machine that submitted the application. This means that the client machine is responsible for maintaining the Spark driver process and the cluster manager maintains the executor processes.
Client mode is typically run from so-called gateway machines (edge nodes), which host the driver while the cluster hosts the executors.
Local Mode. The whole application runs on a single machine, using threads to simulate parallel execution; ideal for learning Spark but not for production.
Lifecycle of a Spark application.
Assumptions: we are running Spark in cluster mode, with:
1. four nodes;
2. one node running the cluster manager's driver (master);
3. three worker nodes.
1. We submit the application through spark-submit.
2. The cluster manager accepts this job and places the Spark driver onto a node in the cluster.
3. Now the application is running in the cluster.
Launch.
4. The driver process that has been placed on a node begins running the user code. This code must include a SparkSession that initializes the Spark cluster.
The SparkSession will subsequently communicate with the cluster manager, asking it to launch Spark executor processes across the cluster. The number of executors and their relevant configurations are set by the user via the command-line arguments of the original spark-submit.
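For instance, a spark-submit for the flight.py script above might look like this; the resource numbers and the YARN master are just assumptions for illustration:

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 3 \
  --executor-cores 2 \
  --executor-memory 4g \
  flight.py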
Execution.
Spark goes about its merry way executing code. The driver and the workers communicate among themselves, executing code and moving data around. The driver schedules tasks onto each worker, and each worker responds with the status of those tasks and their success or failure.
Completion.
After a Spark application completes, the driver process exits with either success or failure. The cluster manager then shuts down the executors of that Spark application. At this point you can check the success or failure of the Spark application by asking the cluster manager for this information.
The Life Cycle of a Spark Application (inside Spark).
- The first step of any Spark application is creating a SparkSession.
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Word Count") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
After you have a SparkSession, you should be able to run your Spark code. From the SparkSession you can also access all of the low-level and legacy contexts and configurations.
SparkContext.
A SparkContext object within the SparkSession represents the connection to the Spark cluster.
Through a SparkContext you can create RDDs, accumulators and broadcast variables. Most of the time the SparkContext is accessed through the SparkSession, and there is no need to access it explicitly.
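A small sketch, assuming the spark session created above:

# The SparkContext behind the session.
sc = spark.sparkContext

# Low-level primitives created through the context.
rdd = sc.parallelize([1, 2, 3, 4])
counter = sc.accumulator(0)
lookup = sc.broadcast({"a": 1, "b": 2})

print(rdd.map(lambda x: x * 2).collect())  # [2, 4, 6, 8]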
After you’ve initialized your SparkSession it’s time to execute some code.
Now let's run a DataFrame job and see what happens. We are going to (see the sketch after this list):
1. create a simple DataFrame;
2. repartition it;
3. perform a value-by-value manipulation;
4. aggregate some values;
5. collect the final result.
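A minimal sketch of such a job; the numbers are arbitrary and I'm reusing the spark session from above:

df = spark.range(1, 1000000)            # 1. a simple DataFrame
df = df.repartition(5)                  # 2. repartition it (a wide transformation: shuffle)
df = df.selectExpr("id * 5 as id")      # 3. value-by-value manipulation (narrow)
total = df.selectExpr("sum(id)")        # 4. aggregate some values
print(total.collect())                  # 5. collect: the action that actually starts the job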
A Spark Job.
In general there should be one Spark job per action, and actions always return results. Each job breaks down into a series of stages, the number of which depends on how many shuffle operations need to take place.
Stages.
Stages in Spark represent groups of tasks that can be executed together to compute the same operation on multiple machines. In general, Spark will try to pack as much work as possible into the same stage, but the engine starts a new stage after operations called shuffles.
A shuffle is a physical repartitioning of the data (e.g. sorting a DataFrame, repartitioning it, a join, etc.); this kind of repartitioning requires coordination across executors to move data around. Spark starts a new stage after each shuffle and keeps track of the order the stages must run in to compute the final result.
A good rule of thumb is that the number of partitions should be larger than the number of executors on your cluster, potentially by several multiples depending on the workload.
An important thing to say: the entire stage is executed in parallel. The final step (the collect) aggregates those partitions individually, brings them all to a single partition and finally sends the result to the driver. Stages consist of tasks. Each task corresponds to a combination of a block of data and a set of transformations that run on a single executor. If there is one big partition in our dataset we will have one task; if there are 1,000 partitions we will have 1,000 tasks that can be executed in parallel.
A task is just a unit of computation applied to a unit of data (the partition). Partitioning your data into a greater number of partitions means that more of them can be executed in parallel. This is not a panacea, but it is a simple place to start optimizing.
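A quick way to see the relation between partitions and tasks; the numbers are arbitrary, and spark.sql.shuffle.partitions controls how many partitions a shuffle produces:

# Partitions produced by shuffle operations (default is 200).
spark.conf.set("spark.sql.shuffle.partitions", 50)

df = spark.range(1, 1000000).repartition(5)
print(df.rdd.getNumPartitions())        # 5 partitions -> 5 tasks in that stage

grouped = df.groupBy("id").count()      # the shuffle stage will produce 50 partitions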
Execution details
Spark does two things automatically:
1. it pipelines stages and tasks that can be done together (e.g. a map followed by another map);
2. for all shuffle operations, it writes the data to stable storage and can reuse it across multiple jobs (a kind of memoization).
Shuffle persistence.
Suppose Spark has to run an operation that moves data across nodes, such as a reduce-by-key operation: the engine can no longer perform pipelining, and instead it performs a cross-network shuffle.
Spark always executes shuffles by first having the “source” tasks write shuffle files to their local disks during the execution stage. Then the stage that does the grouping and reduction launches and runs tasks that fetch their corresponding records from each shuffle file and perform the computation.
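One hedged way to observe this: run the same grouped DataFrame twice and compare the two jobs in the Spark UI; the second run can often skip the shuffle stage because the shuffle files written by the first run are reused.

grouped = df.groupBy("id").count()
grouped.collect()   # first job: runs the shuffle and writes shuffle files to local disk
grouped.collect()   # second job: some stages may show up as "skipped" in the Spark UI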
That's enough for this evening. I guess I need to build a simple processing app to get more familiar with all this. I will play with hotel recommendations and keep you posted:
https://github.com/sathyaanurag/Expedia-Hotel-Recommendation-Spark