Spark concepts

Gughapriyaa Elango
16 min read · Dec 13, 2023


Spark is a big data processing framework that stores data in a distributed way and processes it in parallel across a cluster. It can process data up to 100 times faster than MapReduce because the work is done in memory, as opposed to on disk in Hadoop. Spark is often used for real-time workloads, it supports both batch and real-time processing, and applications need fewer lines of code because they can be written in Scala or Python using Spark's APIs.

Explain the spark-submit command and its configurations:

spark-submit --master spark://<IP-address>:port \
--deploy-mode cluster \
--conf spark.sql.crossJoin.enabled=true \
--files oci://file1.json \
--class org.apache.spark.examples.SparkPi \
--jars oci://file2.jar <path_to>/main_application_with-dependencies.jar 1000

spark-submit \
--class com.example.MySparkApp \
--master yarn \
--deploy-mode client \
--executor-memory 2G \
--num-executors 4 \
my-spark-app.jar input.txt output

--master yarn: Specifies the cluster manager to use (the master URL). In this case we are using YARN. Other options include "local" for running Spark locally, "mesos", or a spark:// URL for a Spark standalone cluster.

Standalone mode: the built-in cluster manager that ships with Spark (the default). It manages the CPUs of the nodes and coordinates Spark jobs. Scheduling is FIFO, and by default each application tries to use all the nodes.

Mesos: another cluster manager. Mesos is a general-purpose resource manager and was not originally designed for Hadoop, whereas YARN was built for the Hadoop ecosystem. Mesos offers more fine-grained resource allocation, allowing multiple applications to share cluster resources efficiently, and it has better resource management. YARN's allocation is more coarse-grained: resources are allocated in predefined containers, which can lead to less efficient management. Mesos is also more scalable, while YARN has certain scaling limitations.

--class: Specifies the main class of your Spark application, i.e. the entry point of your Spark program. We can specify the main class here if we don't specify it in the JAR file.

--deploy-mode: cluster or client. "Client" mode means the driver runs on the machine that submits the job; "cluster" mode means the driver runs on one of the cluster nodes.

--executor-memory 2G: Sets the memory allocated to each executor in the cluster, in this case 2 gigabytes per executor.

--num-executors 4: Specifies the number of executors to launch in the cluster.

JAR file: the JAR file contains the Spark application code.

--conf: Lets us pass Spark configuration properties as key-value pairs, for example the app name, executor memory, spark.cores.max (limit on the maximum CPU cores allocated), spark.driver.memory (memory allocated to the driver program), spark.sql.shuffle.partitions (the number of partitions used when shuffling data), spark.serializer (the serializer used for data serialization), and spark.default.parallelism (the default parallelism level for RDD operations).
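As a rough sketch (the property names are the standard Spark keys listed above; every value is just a placeholder), the same settings can also be built programmatically with SparkConf instead of repeated --conf flags:

from pyspark import SparkConf

# All values below are illustrative placeholders, not recommendations
conf = (SparkConf()
        .setAppName("MySparkApp")
        .set("spark.executor.memory", "2g")
        .set("spark.cores.max", "8")
        .set("spark.driver.memory", "2g")
        .set("spark.sql.shuffle.partitions", "200")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.default.parallelism", "8"))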

MEMORY -

Typical sizing parameters (on instance sizes such as 2xlarge, 4xlarge, etc.):

driver memory

executor memory

number of cores

number of executors

Load mode: incremental

Serializer: which serializer to use. KryoSerializer is a Java-based serialization framework used to serialize and deserialize objects; it is fast, space-efficient, compatible with many data types, and serializes objects into a binary format.
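A minimal sketch of switching to Kryo (the config keys are standard Spark properties; the registered class name is hypothetical):

from pyspark import SparkConf

conf = (SparkConf()
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        # Optional: pre-register application classes (hypothetical class name)
        .set("spark.kryo.classesToRegister", "com.example.MyRecord"))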

PARTITIONS -

number of initial mappers: the initial number of map tasks created when processing data in a distributed manner; it depends on how many records we want to process in parallel.

shuffle partitions: used during the shuffle phase of an application or job. A shuffle redistributes and reorganises data across partitions, and it typically occurs after transformations that need data from other partitions. The shuffle partition setting specifies the number of partitions.

parallelism: specifies the default parallelism. It is controlled by the number of partitions and determines how many tasks run in parallel.

input data: the input to the job, or the source table with its schema and primary key.

Any utility files are submitted as zip files.
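For example (utils.zip is a hypothetical file name), a utility zip is usually passed with --py-files on spark-submit, or shipped from inside the application:

from pyspark import SparkContext

sc = SparkContext("local", "UtilsExample")
# Equivalent at submit time: spark-submit --py-files utils.zip my-spark-app.py
sc.addPyFile("utils.zip")  # ships utils.zip to every executor so tasks can import from it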

output data: the output directory where the result is stored, or the output table with its schema and primary key.

JDBC connection to create external Hive tables

authentication usernames and passwords for databases

Why is Spark immutable?

Immutability makes it easy to run tasks and transformations in parallel across multiple processes.

Concurrency

Data integrity

Lazy evaluation

Caching of intermediate data

Why is Spark in-memory?

To achieve speed in computation and low latency.

How is Spark better than Hadoop?

In-memory processing

Real-time processing

Fewer lines of code thanks to the APIs

More secure due to support for authentication

What are the features of Spark?

Fault tolerant: Spark's RDDs can recover from failures and handle large workloads.

Fast processing: RDD operations are much faster than disk-based MapReduce.

In-memory computing: data is stored in RAM, giving quicker access to it.

Better analytics: a unified analytics engine that also covers machine learning.

Flexible: supports multiple languages and provides APIs, for example Python and SQL.

What are the components of Apache Spark? The 5 Spark components:

Spark Core: the engine for parallel and distributed processing of large data sets. Spark does not have its own storage; it relies on HDFS or other storage such as NoSQL databases. The functionalities supported by Spark Core include:

Memory management

Fault recovery

Scheduling and managing different jobs, monitoring the cluster, task dispatching

Interacting with storage systems

Spark Core is built around RDDs. These are immutable, fault-tolerant, distributed collections of objects that are processed in parallel. Physically, an RDD is stored in the JVM (Java Virtual Machine) of the driver, and it refers to the data in storage such as HDFS, or to data held in the cache.

Explain RDDs: the SparkContext creates an RDD, initially without any data in the memory of the nodes. Each step of transformation adds to a DAG, a series of steps that runs when something needs to be executed. RDDs support transformations and actions; with a transformation we only specify the logic. Because of Spark's lazy evaluation, an action must be called on the RDD for the transformations to run: only when an action is triggered does data get loaded into the RDD, and until then the RDD is empty or in its previous state. The action loads data into the RDD and execution of the DAG starts from the beginning.
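A small sketch of lazy evaluation (data.txt is a placeholder path): nothing is read from disk until the action on the last line runs.

from pyspark import SparkContext

sc = SparkContext("local", "LazyEvalExample")

lines = sc.textFile("data.txt")                 # transformation: nothing is read yet
errors = lines.filter(lambda l: "ERROR" in l)   # transformation: only the logic is recorded in the DAG
print(errors.count())                           # action: triggers reading the file and running the filter

sc.stop()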

The RDD object lives in the JVM of the driver and refers to the data in storage such as HDFS. It contains metadata:

partitions and partitioner: how the data is divided into partitions; by default the partitioner is HashPartitioner

dependencies: the parent RDDs of this RDD in the lineage graph

computation: the function used to compute a child RDD from its parent RDD

preferred locations: where it is best to place the computation for each partition

What is the SparkContext? It is the entry point of a Spark application into the Spark cluster and the heart of the application. To use Spark we need to create a SparkContext instance, which is responsible for coordinating and managing resources in the cluster. The context communicates with a cluster manager such as YARN to get resources for the application we are running. Here we can pass a SparkConf to specify the resources we need: the serializer, spark.cores.max, and so on. The SparkContext creates RDDs; initializing an RDD creates an empty dataset in the memory of a node, without any data.

sc = SparkContext(conf=conf)
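A self-contained version of the line above, assuming a local run with placeholder settings:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("MySparkApp").setMaster("local[*]")
sc = SparkContext(conf=conf)

print(sc.parallelize(range(10)).sum())  # quick sanity check that the context works

sc.stop()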

Spark SQL -

The Spark SQL framework is used for structured data processing; it allows us to work with DataFrames through PySpark and Spark SQL.

Spark Streaming -

Spark Streaming is an extension of the Spark API for live data streams. Data ingested from sources such as Kafka is processed using transformations in Spark Streaming, and the results are pushed to dashboards.

The input data stream goes into Spark Streaming, where it is broken down into smaller micro-batches that are processed by the Spark engine batch by batch.

With a streaming data source such as Kafka, Spark Streaming can also use the machine learning library, MLlib, to train models on live data and apply the trained models.

Or the data can go through structured processing using PySpark or Spark SQL.
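A hedged sketch of reading a Kafka stream with Structured Streaming (the broker address and topic name are placeholders, and the Kafka connector package has to be available on the cluster):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StreamingExample").getOrCreate()

# Read a live stream from Kafka (placeholder broker and topic)
events = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")
          .option("subscribe", "events")
          .load())

# Write the raw records to the console in micro-batches
query = (events.selectExpr("CAST(value AS STRING)")
         .writeStream
         .format("console")
         .start())

query.awaitTermination()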

Watch this video: https://www.youtube.com/watch?v=UuRhEmqqhRM

Spark MLlib -

GraphX -

Transformation process and the 2 types of transformations -

When an RDD gets transformed, a new RDD gets created. Spark keeps track of transformations using the DAG and the lineage graph. Due to lazy evaluation, data does not get loaded into the RDD until an action is called.

Wide and narrow transformations -

Narrow transformations are when the data in a partition is transformed into a single output partition, so there is little or no data movement between partitions. Operations such as select, filter, and cast are narrow transformations; they do not require shuffling.

EXAMPLES: map, flatMap

Wide transformations are when data comes from different partitions and gets written to different partitions: with joins, groupBy, or distinct, data moves between partitions. Shuffling occurs here; data is regrouped across partitions when records are needed from other partitions and consolidated into new partitions, which may live on another executor.
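A small DataFrame sketch contrasting the two (the data is made up):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TransformationsExample").getOrCreate()

df = spark.createDataFrame([("a", 1), ("b", 2), ("a", 3)], ["key", "value"])

narrow = df.filter(df.value > 1)        # narrow: each partition is filtered in place, no shuffle
wide = df.groupBy("key").sum("value")   # wide: rows with the same key must be shuffled together

wide.show()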

RDD Actions -

We apply an action when we need to materialize the result and trigger the transformations.

EXAMPLES: reduce, count
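For instance (a toy RDD), the map below does nothing until the actions run:

from pyspark import SparkContext

sc = SparkContext("local", "ActionsExample")

nums = sc.parallelize([1, 2, 3, 4, 5])
doubled = nums.map(lambda x: x * 2)         # transformation: only recorded
print(doubled.reduce(lambda a, b: a + b))   # action: prints 30
print(doubled.count())                      # action: prints 5

sc.stop()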

How does the DAG work with RDDs?

RDD objects are created from their parents, and every step of transformation adds to the DAG. The DAGScheduler splits the graph into stages of tasks. The task scheduler then schedules and launches the tasks via the cluster manager on the worker nodes, which execute them.

Unlike Hadoop, where the user breaks the work down into stages, Spark builds the DAG itself. In the DAG, a node represents a partition of the data and an edge represents the transformation applied to it.

Explain the Spark architecture: driver program and executors -

Master node and executor node/worker node/slave node

The master node hosts the driver program, which contains the SparkContext, the entry point. It interacts with the cluster manager to get resources. The resource manager makes requests to the node managers for containers with the required resources, and the work is distributed to the worker nodes, each of which contains executors, cache, and tasks. The resource manager also starts an application master to run the Spark application; the application master sits in one container and then uses other containers and their resources to execute the tasks.

The DRIVER contains the SparkContext, DAG scheduler, task scheduler, and block manager.

Cluster manager: resource allocation.

Worker node: the executor runs the application tasks and returns results to the SparkContext. The lifetime of an executor is the same as the lifetime of the Spark application.

The driver and executors can exist on one machine or on different machines depending on the deploy mode.

YARN cluster manager -

Resource manager: manages the resources of all applications in the system; it contains the scheduler and the application manager.

Node manager: hosts the application master and the containers. An application requires many containers for its resources, and the node manager monitors these containers and their resource usage, reporting back to the resource manager.

Also provides authentication and authorization for security purposes.

Modes of execution: cluster, client, local -

Cluster mode: the most common way. When a JAR or script is submitted for execution, the driver program is launched on a worker node in the cluster. That worker then hosts both the driver and executors, which means the cluster is in charge of all Spark application processes.

Client mode: the same as cluster mode except that the driver runs on the client machine that submitted the Spark application, so the client machine maintains and is in charge of the Spark application.

Local mode: the entire Spark application runs on a single machine; good for testing and debugging, not for production. Parallelism is achieved using threads on that single machine.

What do you mean by spark execution plan?

A query language statement (SQL, Spark SQL, DataFrame operations, etc.) is translated into a set of optimized logical and physical operations by an execution plan. It is a series of actions that will be carried out from the SQL (or Spark SQL) statement to the DAG (Directed Acyclic Graph), which is then sent to the Spark executors.

What is executor memory in spark?

For a Spark executor, every Spark application has the same fixed heap size and fixed number of cores. The heap size is controlled by the spark.executor.memory property (the --executor-memory flag), which is also known as the Spark executor memory. Each worker node will have one executor for each Spark application. The executor memory is a measure of how much memory the application will use from the worker node.

Which language is faster? Native Spark is implemented in Scala.

For tasks that involve complex data transformations, distributed processing, and performance-critical workloads, Scala is generally expected to provide better performance due to its closer integration with Spark and the JVM.

For interactive data exploration and rapid development, Python (PySpark) can be more user-friendly, even if it may not be as performant as Scala for large-scale distributed data processing tasks.

Decorator program

What is AQE (Adaptive Query Execution)?

In the latest Spark versions it is enabled by default. AQE is an optimisation done at runtime: the number of partitions and the join strategy are adjusted optimally at run time, based on actual data statistics and dynamic runtime conditions. AQE allows Spark to dynamically adapt query execution plans to the data's characteristics and distribution, which enables Spark to make more informed decisions about how to optimize the execution of a query. During query execution, AQE monitors progress and adjusts the query plan as necessary. This helps optimize complex queries, especially when there are variations in data distribution and sizes.
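The relevant settings are standard Spark SQL properties (shown here being set explicitly; in recent Spark versions they default to true):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AQEExample").getOrCreate()

spark.conf.set("spark.sql.adaptive.enabled", "true")                     # turn AQE on
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  # merge small shuffle partitions at runtime
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")            # split skewed partitions during joins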

OPTIMISATION IN SPARK — IMPORTANT

How will you optimise a Spark Job?

Data skew handling / broadcast join: an optimization technique used when one of the dataframes (or tables) involved in the join is small enough to fit in memory on each worker node. Instead of distributing the smaller dataframe across the cluster, it is broadcast to all worker nodes, where the larger dataframe is already partitioned. The join is then performed locally on each worker node: each node has a copy of the small dataframe and a partition of the large dataframe, allowing for efficient local joins.
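In PySpark the broadcast hint looks like this (the dataframes are placeholders):

from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.appName("BroadcastJoinExample").getOrCreate()

large_df = spark.createDataFrame([(1, "x"), (2, "y"), (3, "z")], ["id", "payload"])
small_df = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "label"])

# Hint Spark to ship small_df to every executor instead of shuffling both sides
joined = large_df.join(broadcast(small_df), "id")
joined.show()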

Caching/persisting: caching stores the results of an operation or a portion of a dataset in memory for faster access in subsequent computations. When you cache data, it remains in memory on the worker nodes, reducing the need to recompute or re-read the data from disk. By caching we get faster data access, less need to reshuffle, and cheaper repeated complex transformations.

Persisting gives more control over how the data is stored: we can provide a custom storage level such as memory-only, memory-and-disk, or memory-only-ser.

Shuffle partitions: relevant when we perform shuffling operations such as joins and aggregations, where transformations need data from multiple partitions across the cluster. Shuffle partitions refer to the number of partitions the shuffled data is distributed into, which can impact the performance of the Spark application. Tuning this gives a balanced workload and efficient use of CPU resources.

Both over-partitioning and under-partitioning can hurt performance, so shuffle management matters.

If you’re looking for a simple way to cache data in memory with the default storage level, cache() is a convenient choice. However, if you need more control over the storage level or want to persist data in a specific way, you can use persist() and specify the desired storage level.
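A short sketch of both (the dataframe is just a placeholder):

from pyspark import StorageLevel
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("CacheExample").getOrCreate()
df = spark.range(1_000_000)

df.cache()      # default storage level
df.count()      # the first action materializes the cache

df.unpersist()
df.persist(StorageLevel.MEMORY_AND_DISK)  # explicit storage level via persist()
df.count()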

Filter before shuffling / filter pushdown / predicate pushdown: the idea is to apply filter operations before performing shuffle-heavy transformations such as joins or aggregations. This reduces the amount of data that needs to be shuffled, reduces CPU usage, and gives faster execution.
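For example (made-up dataframes), filtering the larger table before the join means far less data is shuffled:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("FilterPushdownExample").getOrCreate()

orders = spark.createDataFrame(
    [(1, "2023-01-01", 10.0), (2, "2022-06-01", 5.0)],
    ["customer_id", "order_date", "amount"])
customers = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["customer_id", "name"])

# Filter first, then join: only the recent orders are shuffled
recent = orders.filter(orders.order_date >= "2023-01-01")
recent.join(customers, "customer_id").show()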

Column pruning — select only the columns which are important for computation to avoid excessive workload.

Use the right file format: Parquet suits query retrieval (columnar reads), Avro suits ETL (row-oriented writes), and ORC is commonly used with Hive.

AQE: Spark dynamically adapts the query execution plan according to runtime data statistics.

Optimize UDFs: prefer built-in functions where possible and keep UDF code efficient, since UDFs are opaque to the optimizer.

Catalyst Optimiser: the query optimizer used by Spark SQL.

When does Spark face out of memory issues?

Driver OOM: the most common cause is calling collect() on a dataframe. Spark pulls all the data from the nodes onto the driver, which can be huge and exceed the driver's memory capacity. Another cause is a broadcast join: a big file is split across different nodes and a smaller file is broadcast to them, but if the "small" file is not actually small enough to fit on a single machine, memory runs out.
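A sketch of safer alternatives to collect() (the dataframe and output path are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DriverOOMExample").getOrCreate()
df = spark.range(100_000_000)

# rows = df.collect()            # risky: pulls every row onto the driver
sample = df.take(20)             # safer: only a handful of rows reach the driver
df.write.mode("overwrite").parquet("/tmp/output")  # or write results out instead of collecting them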

Executor OOM: executors run inside containers managed by the YARN node manager, and these can also run out of memory.

A node is assigned some amount of memory; within that node an executor container runs, and within that container there is dedicated memory for the YARN memory overhead plus the executor memory, which is itself divided into storage memory and execution memory.

YARN memory overhead: if this runs out, it needs to be increased. The overhead is the off-heap part: the strings we create, Spark internal objects, and PySpark objects are stored in the YARN memory overhead. By default it is roughly 10% of the executor memory. If YARN kills the container, that is a sign the memory overhead needs to be increased.
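The corresponding knob is spark.executor.memoryOverhead (a standard Spark-on-YARN property; the values below are only examples):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("OverheadExample")
         .config("spark.executor.memory", "19g")          # on-heap executor memory
         .config("spark.executor.memoryOverhead", "2g")   # extra off-heap room inside the YARN container
         .getOrCreate())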

Executor OOM can also be due to high concurrency: one machine runs many executors, and each executor has several cores, usually 4 to 5 cores per executor. If we assign cores without thinking about the machine's capacity, each concurrent task gets only a small share of the total memory, and tasks that need more than that share push the executor over its limit and cause executor OOM.

Executor OOM due to a big partition: one partition is very big on one machine while the other machines have smaller partitions. The machine with the bigger partition has to handle a larger chunk of data, with larger metadata, which can cause OOM. We should make sure the workload is balanced and divide oversized partitions into smaller parts.

How will you choose the number of executors and the memory for each executor?

We need to make assumptions:

Let's assume:

We have 6 machines in our cluster.

Each machine has 16 cores, so 16 * 6 = 96 cores across all machines.

64GB of RAM per machine.

We have several strategies we could employ:

1. Very small executors:

1 core and 4GB of RAM per executor: that is 16 executors per machine, so 96 executors will start, each with 4GB. This is a bad approach; we are not using the parallel processing power of multiple cores within a single JVM, and we have left no memory headroom for overheads.

2. Very big executors:

1 executor per machine: the executor takes all the memory and all 16 cores on that machine, so there are 6 executors with 64GB of RAM each. All 16 cores will try to read their own partitions from HDFS at once, causing HDFS I/O congestion, with cores competing with each other for resources and throughput; reads and writes will be slow. We also have to leave some memory for the OS, YARN overhead, and buffers, otherwise the whole machine slows down.

3. A better way to choose executors:

Each machine has 16 cores; we leave 1 core per machine for the OS and Hadoop daemons, which leaves 15 usable cores. Running 5 cores (threads) per executor gives us 3 executors per machine.

Memory available per machine: 63GB (after leaving about 1GB for the OS).

Available per executor: 63 / 3 = 21GB.

YARN memory overhead: ~2GB.

Per-executor memory: 21 - 2 = 19GB.
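The same arithmetic as a quick sketch, using the assumptions above (reserving one executor slot for the YARN application master, noted in a comment, is a common extra step not covered here):

# Worked example of the executor sizing above
machines = 6
cores_per_machine = 16
ram_per_machine_gb = 64

usable_cores = cores_per_machine - 1                          # leave 1 core for the OS -> 15
cores_per_executor = 5
executors_per_machine = usable_cores // cores_per_executor    # 3

usable_ram_gb = ram_per_machine_gb - 1                        # leave ~1GB for the OS -> 63
ram_per_executor_gb = usable_ram_gb // executors_per_machine  # 21
overhead_gb = 2                                               # YARN memory overhead
executor_memory_gb = ram_per_executor_gb - overhead_gb        # 19

total_executors = machines * executors_per_machine            # 18 (some setups keep one slot free for the YARN AM)

print(f"--num-executors {total_executors} "
      f"--executor-cores {cores_per_executor} "
      f"--executor-memory {executor_memory_gb}G")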

Avro Vs Parquet:

What is predicate pushdown and columnar projection in parquet?

What is the default partitioning algorithm used by Spark, MapReduce, and bucketing?

The most commonly used partitioning strategy in Spark is hash partitioning, where data is partitioned based on the hash value of a specified key or set of keys. This ensures that data with the same key(s) goes to the same partition. Hash partitioning is the default in Spark for operations like groupByKey and reduceByKey.

hash(key) mod number_of_partitions (Spark partitioning)

hash(key) mod number_of_reducers (MapReduce)

hash(key) mod number_of_buckets (bucketing)

If I run a query with 2 partitions, we get 2 output files: part-0000 and part-0001.

If it is bucketed, we get 2 bucket files.

For example, if hash(key) = 23 and there are 2 partitions, then 23 mod 2 = 1, which denotes the partition the record goes to, so this record goes to the 0001 file. With 2 partitions, all keys with odd hashes go to one file and all keys with even hashes go to the other. This is how records get split internally by the hash partitioning algorithm; if we want to customise it, we have to write a custom partitioner.
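A toy illustration of the formula (Python's built-in hash stands in for the partitioner's real hash function):

num_partitions = 2
records = [("apple", 1), ("banana", 2), ("cherry", 3), ("apple", 4)]

for key, value in records:
    partition = hash(key) % num_partitions   # hash(key) mod number_of_partitions
    print(f"key={key!r} value={value} -> part-{partition:04d}")

# Records with the same key always hash to the same partition file.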

In addition to hash partitioning, Spark also supports range-based partitioning, where data is partitioned based on the specified ranges of values. Range partitioning is often used when performing range-based operations like sorting or filtering.

Where does Spark put its output files?

You have a Spark job that is taking longer than expected to complete. What steps would you take to identify and troubleshoot performance bottlenecks?

You have a Spark cluster with limited resources. How would you allocate resources and configure the cluster for optimal performance?

I was asked to write code to read data from Delta Lake (an S3 bucket) and run an upsert: update the data if it already exists based on the primary key, and insert it if it does not.

Questions on Spark optimisations included skewed join, broadcast join, CBO, and repartition vs. coalesce.

IMPORTANT — Repartition Vs Coalesce:

When do you use repartition vs. coalesce in terms of performance?

Repartition is used to shuffle data or increase the number of partitions; coalesce is used to decrease the number of partitions.

If I want to increase the number of output tasks I will use repartition, and if I want to reduce the partitions to 1 I will use coalesce.

Repartition performs a full shuffle. Coalesce avoids a full shuffle; it is not that no data movement happens at all, but the shuffling is greatly reduced because existing partitions are merged.
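A quick sketch (the partition counts are arbitrary):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RepartitionExample").getOrCreate()

df = spark.range(1000)
print(df.rdd.getNumPartitions())

more = df.repartition(8)   # full shuffle; can increase or rebalance partitions
fewer = more.coalesce(1)   # merges existing partitions with minimal shuffling; only decreases the count

print(more.rdd.getNumPartitions(), fewer.rdd.getNumPartitions())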

If I have 2 blocks in HDFS and I write a Spark job to count words, how many input tasks are created? How many input tasks would be created for MapReduce?

How many output tasks will get created?

Assuming no repartition, how many tasks?

I have 2 blocks in HDFS and I write a Spark program:

Number of blocks = number of input tasks (in Spark and in MapReduce) = 2

By default, if we have not used repartition or coalesce, then the number of input tasks = the number of output tasks.

output tasks = 2

We will have part-0000 and part-0001

If I use repartition(3), the output will be 3 files.

If I use coalesce(1) to reduce to one partition, the output will be 1 file.

Write a Spark word count program using PySpark:

#Python version:

from pyspark import SparkContext

sc = SparkContext("local", "WordCountApp")

text = sc.textFile("data.txt")

counts = (text.flatMap(lambda line: line.split(" "))
              .map(lambda word: (word, 1))
              .reduceByKey(lambda a, b: a + b))

result = counts.collect()

for (word, count) in result:
    print(f"{word}: {count}")

sc.stop()

Write Spark code to join two tables or files using DataFrames, not with SQL:

df = df1.join(df2, df1.id == df2.id, "inner")
df.show(truncate=False)

EXPLAIN HOW YOU DID SPARK PERFORMANCE TUNING IN YOUR PROJECT

Spark Tungsten & Catalyst Optimiser.

Spark monitoring & Spark performance management.

What is Avro file format & what is its significance in delta tables?

Difference between Presto vs. Spark underlying architecture.

Can Presto work with Near Real-Time Data ( Streaming Data Source)?

What do I think about Data uncertainty?

What is the issue when you work with small files using Spark?

What is the Akka timeout? What is Akka?

Spark job logs, Spark UI, YARN application logs.

What is heap memory and how do you calculate heap memory for spark jobs?

How do you do the initial load and subsequent loads? How do they differ, and how do you manage memory between the initial and subsequent loads?
