Pyspark Fundamentals — Theory

ksshravan
7 min readMay 28, 2024

--

This article contains notes for Pyspark based on Manish Kumar YT tutorial. The following topics have been covered

  • What is Spark and why is it needed?
  • Resilient Distributed Dataset (RDD)
  • Transformations and Actions (include wide and narrow dependency)
  • Repartition and Coalesce

What is Spark and why Spark?

Spark is a unified computing engine and set of libraries for parallel data processing on computer cluster.

Lets dive into definition in more detail

  • Unified: Data analyst, data scientist as well as data engineer can use the same platform for analysis, transformation and modeling hence the word unified
  • Computing engine: Spark does not store data, it only does compute (to store, saprk provides connectors with various data sources like HDFS, etc). Compute means performing a set of tasks like addition, multiplication, etc
  • Parallel data processing: Parallel processing means breaking down a big task into multiple smaller tasks and running each of them on a separate processor. For example instead of searching 1 million records using 1 processor, 1 processor searches the first 5 lakh records and second process searches the second 5 lakk records simultaneously
  • Computer cluster: A collection of computers, where one computer acts as master and other computers act as slaves

Spark was built to work with Big Data. Big Data is characterised by 3 Vs

  • Volume: amount of data (5GB, 5TB )
  • Velocity: speed of data (1sec, 1 hour)
  • Variety: forms of data (structured, unstructured, semi-structured_

Data is not classified as big data just because it is 5GB or 5TB. The velocity and variety also matters. So if data is generated at 5GB/sec and data includes both structured and unstructured, then we can call it big data.

Before Data Lake the paradigm used was ETL. But due to more and more of unstructured data being generated with huge velocity and volume, data lakes are used which use ELT paradigm. In ELT data is dumped first and then transformed later

Monolithic vs Distributed: Monolithic libraries are designed to run on a single machine whereas distributed libraries are designed to run on multiple machines. Hence monolithic libraries rely on vertical scaling (increasing the capacity of a single machine by adding more CPU, RAM, or storage to the existing machine), whereas distributed libraries rely on horizontal scaling (adding more machines to a system and distributing the load across multiple nodes)

Pandas is primarily a monolithic library. It is designed to run on a single machine and in a single process, handling data that fits into the memory of that machine. This means that it is not inherently designed for distributed computing, where data and computation are spread across multiple machines or processes.

Key points about Pandas:

  • Single-Node Execution: Pandas operates on data stored in memory on a single machine. This makes it highly efficient for datasets that fit into the available RAM of a single machine.
  • In-Memory Processing: All data operations in Pandas are performed in memory, which provides fast access and manipulation of data but limits the size of datasets to the available RAM.
  • Thread Safety: Pandas is not inherently thread-safe, meaning that concurrent operations can lead to data corruption or inconsistent states. It is designed to be used in a single-threaded manner for the most part.

For parallel/distributed processing, other libraries like Dask, Modin or Pyspark can be used.

Resilient Distributed Dataset

  • RDD is a collections of elements split across the nodes of the cluster. Each split (i.e. group of elements) is called a partition
  • Partitions: A partition in spark is an atomic chunk of data (logical division of data) stored on a node in the cluster. Partitions are basic units of parallelism in Apache Spark. RDD is a collection of partitions

Transformations and Actions (including wide and narrow dependency)

  • Spark, or more specifically RDD supports 2 kinds of operations: transformations and actions.
  • Transformations: functions that return another dataset/RDD from existing dataset/RDD after performing some transformation
  • Actions: any operation that does not return an dataset/RDD. Evaluation is executed when an action is taken. Actions trigger the scheduler, which build a directed acyclic graph (DAG) as a plan of execution
  • Since Spark follows lazy evaluation, no transformation is actually computed until an action is encountered. When an action is encountered, Spark creates a new job.
  • Transformations are of 2 type:
    - wide dependency
    - narrow dependency
  • Narrow dependency transformation: Operation in which each row can be operated on independently. A narrow transformation is the one that only requires a single partition from the source to compute all elements of one partition of the output (hence each input partition will contribute to only one output partition). Includes transformations like map() , filter(), union(),
  • Wide dependency transformation: Operation in which multiple rows need to worked on to get the output and each row cannot be worked independently. Even for a single partition of the output, it requires access to the entire content of one of the source rdds. Includes transformations like groupByKey(), join(), repartition()
  • Shuffling: Redistributing or reorganizing data across the partitions of a distributed dataset. For example, to group by country, we want all the rows belong to specific country to be in the same partition. But if that is not the case, rows from other partitions which belonging to that country are moved to this partition, which is called shuffling

Note: Add the practical example using employee data shown by Manish Kumar to show difference bw wide and narrow dependency to notes

Spark Architecture

  • A spark application is made up of driver and multiple worker nodes. Each worker node (which is a machine) hosts 1 or more executors. Each executor runs multiple tasks. The driver maintains information of the Spark application whereas the executors are responsible for carrying out the work the driver assigns to them. Task is the smallest unit of computation executed by executors.
  • A “core” in Spark (in configuration like — executor-cores) is nothing but a thread and refers to how many tasks each executor will run concurrently (1 core = 1 task i.e. each core can run only 1 task at a time)
  • Each task can work on only 1 partition at a time
  • To get the number of worker executors we can use code below
sc = spark._jsc.sc()
num_workers = len([executor.host()\
for executor in sc.statusTracker().getExecutorInfos()]) - 1
  • The value of defaultParallelism is derived based on the cluster manager being used:
    - Standalone or Mesos : total number of cores in cluster = no. of executors * cores per executor
    - YARN, Kubernetes : total number of executors * 2
  • Having multiple workers on a single machine is only relevant in standalone mode. Otherwise one worker means one machine. Running multiple workers on a single machine is not a common practice and is generally avoided in production environments due to the potential for resource contention and complexity in management
  • Spark parallelizes at two levels. One is the splitting the work among executors. The other is the slot/core. Each executor has a number of cores/slots. Each slot or core can be assigned a task.

References (for this section)

  1. https://stackoverflow.com/questions/57875141/the-difference-between-one-executor-per-core-vs-one-executor-with-multiple-core
  2. https://stackoverflow.com/questions/24696777/what-is-the-relationship-between-workers-worker-instances-and-executors
  3. https://stackoverflow.com/questions/29955133/how-to-allocate-more-executors-per-worker-in-standalone-cluster-mode
  4. https://stackoverflow.com/questions/24622108/apache-spark-the-number-of-cores-vs-the-number-of-executors
  5. https://stackoverflow.com/questions/37737110/how-is-task-distributed-in-spark
  6. https://stackoverflow.com/questions/39381041/determining-optimal-number-of-spark-partitions-based-on-workers-cores-and-dataf

Partitioning

  • To get number of partitions we use
num_partitions = df.rdd.getNumPartitions()
  • To get the distribution of data across different partitions use anyone of the 2 approaches (prefer the first one)
## Approach 1

from pyspark.sql.functions import F

df.groupBy(F.spark_partition_id()).count().show()


## Approach 2 (use only if size of data is small)

df.rdd.mapPartitionsByIndex(lambda index, it: [(index, sum(1 for _ in it))]).collect()

## Write the lambda function as a separate function for more clarity
  • To see the data within each partition use:
df.rdd.glom().collect()
  • Some important Spark configurations which determine the number of partitions are are:
    - spark.sparkContext.defaultParallelism
    - spark.sql.files.maxPartitionBytes (only used when data is huge)
    - spark.sql.shuffle.partitions
#
  • Typical advice is to have number of partitions (when you are repartitioning) just below a multiple of your total cores, for example, if you have 16 cores, maybe 47, 79, 127 and similar numbers just under a multiple of 16 are good to use. The reason for this is you want to make sure all cores are working (as little time as possible do you have resources idle, waiting for others to finish). but you leave a little extra to allow for speculative execution (spark may decide to run the same task twice if it is running slowly to see if it will go faster on a second try).

Repartition and Coalesce

  • Repartition creates new equal sized partitions whereas coalesce combines smaller partitions to create a larger partition

Spark can run a single concurrent task for every partition of an RDD, up to the total number of cores in the cluster

References

  1. https://stackoverflow.com/questions/33831561/pyspark-repartition-vs-partitionby

Interview Questions

  1. What is transformation and how many types of transformation do we have?
  2. What happens when we do group by and join in transformation?
  3. How are jobs created in spark?
  4. What is repartitioning and coalesce in Spark? Which one would you prefer and why?

Doubts

  1. What is the difference between following terms : node, core, executor, driver, data node ?

References

  1. https://community.databricks.com/t5/data-engineering/what-is-difference-between-action-and-transformation-in-spark/td-p/26612
  2. https://stackoverflow.com/questions/70643086/why-is-union-a-narrow-transformation-and-intersection-is-a-wide-transformati
  3. https://data-flair.training/blogs/dag-in-apache-spark/
  4. https://www.linkedin.com/pulse/apache-spark-basics-101-shuffling-transformations-shanoj-kumar-v
  5. https://www.linkedin.com/pulse/just-enough-spark-core-concepts-revisited-deepak-rajak
  6. https://stackoverflow.com/questions/46992060/spark-understanding-partitioning-cores
  7. https://docs.databricks.com/en/optimizations/spark-ui-guide/index.html
  8. https://stackoverflow.com/questions/58751144/list-all-files-in-a-folder-sitting-in-a-data-lake

--

--