PySpark Performance Improvement: Part 2
Apart from built-in functionality, the performance of a system depends on its execution path. In Part 1 we discussed how an SQL query describes the end result rather than the instructions to compute it. In this part we look at how understanding your data helps performance.
Businesses produce many kinds of data. A supply chain business, for example, has data on Inventory, Sales, Orders, Delivery, Transportation, Billing, Commodity and so on. The data is collated from many sources, such as Salesforce, SAP or third-party sources that aggregate data from systems ranging from warehouse management to order creation.
Most often, the middleware or service that integrates the data sources and preprocesses the data for reporting applies various logic to transform and validate it. This includes cross-references between datasets, such as verifying that a commodity exists in master data before it is processed and sent for inventory reporting.
Developers use operations such as join, filter, flatMap, aggregate, groupBy and more to perform the transformations and validations, either through built-in functions or as an SQL query.
Execution pattern for PySpark
Student = spark.read.format('csv').option('header',True).load('filepath_student')
Class = spark.read.format('csv').option('header',True).load('filepath_school')
#region QuerySample
Class.createOrReplaceTempView('Class')
intermediateDF = spark.sql("""
SELECT *
FROM
Class
WHERE
totalStudents >= 100
""")
intermediateDF.createOrReplaceTempView('intermediateDf')
Student.createOrReplaceTempView('Student')
finalDF = spark.sql("""
SELECT s.*
FROM
intermediateDf i
INNER JOIN
Student s
ON
s.ClassId = i.Id
""")
#endregion
finalDF.show()
Spark is lazily evaluated. Transformation logic can be written in any number of sequential steps; Spark reads all the instructions and builds an optimized execution plan before running anything. This is where performance problems can begin, because the plan that runs is not necessarily the sequence of steps as written.
The program above does exactly this: it displays the details of students who belong to a class with at least 100 students. Only the effective result is displayed. In a PySpark notebook, put the QuerySample region in a separate cell from the other lines of the program and observe that the cell finishes within seconds, even if the source data contains millions of records. This is lazy evaluation.
PySpark executes eagerly (the opposite of lazily) when an action is triggered. Actions are functions that require immediate computation, for example count(), show(), collect(), write and, in notebook environments such as Databricks, display().
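The difference between a transformation and an action can be modelled in a few lines of plain Python. This is only a toy analogy, not the Spark API: the LazyFrame class and the big_class helper are hypothetical names made up for illustration, and unlike Spark the toy does not optimize the recorded plan.

```python
class LazyFrame:
    """Toy stand-in for a DataFrame: transformations are recorded, not run."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan  # tuple of (kind, function) steps

    def filter(self, predicate):
        # Transformation: returns a new object with a longer plan, no work done.
        return LazyFrame(self._rows, self._plan + (("filter", predicate),))

    def select(self, mapper):
        # Transformation: also only recorded.
        return LazyFrame(self._rows, self._plan + (("select", mapper),))

    def collect(self):
        # Action: only now is the recorded plan applied to the data.
        out = self._rows
        for kind, fn in self._plan:
            if kind == "filter":
                out = [r for r in out if fn(r)]
            else:
                out = [fn(r) for r in out]
        return list(out)


calls = []  # records every time a row is actually inspected


def big_class(row):
    calls.append(row["Id"])
    return row["totalStudents"] >= 100


classes = LazyFrame([
    {"Id": 1, "totalStudents": 120},
    {"Id": 2, "totalStudents": 40},
    {"Id": 3, "totalStudents": 100},
])

pending = classes.filter(big_class).select(lambda r: r["Id"])
calls_before_action = len(calls)  # nothing has been evaluated yet
result = pending.collect()        # the action triggers the whole plan
calls_after_action = len(calls)
```

Building the pipeline touches no rows; only collect() does. That is why a notebook cell containing only transformations returns almost instantly.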
How to optimize query execution based on the data
1. Environment optimization
Spark applications use distributed and parallel computing to achieve higher throughput. Work is spread across nodes and executors, which may run in containers (for example Docker on Kubernetes) behind the scenes, abstracted from the user.
Developers can configure the scale at which operations are executed based on data volume, number of sources, the operations involved and, most importantly, the latency the business can accept.
Having too many nodes does not help and only incurs the cost of owning the processing power, whilst too few can throttle the operation.
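As a rough sketch of what configuring the scale can look like, the snippet below assembles a spark-submit command line from a dictionary of settings. The configuration keys are standard Spark properties, but every value, the to_submit_args helper and the job.py file name are assumptions chosen purely for illustration; suitable numbers depend on your data volume and latency requirements.

```python
# Hypothetical values for illustration only; tune them to your own workload.
settings = {
    "spark.executor.instances": "4",       # how many executors to request
    "spark.executor.cores": "4",           # parallel tasks per executor
    "spark.executor.memory": "8g",         # heap size per executor
    "spark.sql.shuffle.partitions": "64",  # partitions produced by shuffles
}


def to_submit_args(conf, app="job.py"):
    """Turn a settings dict into spark-submit arguments (app name is made up)."""
    args = ["spark-submit"]
    for key, value in sorted(conf.items()):
        args += ["--conf", f"{key}={value}"]
    args.append(app)
    return args


command = " ".join(to_submit_args(settings))
print(command)
```

Keeping the settings in one place makes it easier to compare runs at different scales before committing to a cluster size.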
2. Repartition
As mentioned in the previous part of the series, Spark works by splitting the data into partitions and processing them in a distributed system. Repartitioning can help distribute the data across nodes for parallel processing. Data is shuffled, that is moved across nodes, when many source files have to be joined or aggregated in parallel.
For large datasets, partitioning is the norm. Since a shuffle requires data movement, the optimization is to reduce shuffle read/write so that resources are spent on executing the program.
Small datasets usually do not need to be repartitioned.
By default Spark uses 200 partitions after a shuffle (the spark.sql.shuffle.partitions setting). Use coalesce() or repartition() to control the number of partitions.
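To make the idea concrete, here is a plain-Python sketch of hash partitioning by a key and of merging partitions afterwards. It is an analogy rather than Spark's implementation: the function names mirror repartition and coalesce only for readability, zlib.crc32 stands in for whatever hash Spark uses internally, and the sample rows are invented.

```python
import zlib


def partition_id(key, num_partitions):
    # Stable hash so the same key always maps to the same partition.
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def repartition(rows, key, num_partitions):
    # Every row is re-assigned by its key: this is the "shuffle".
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[partition_id(row[key], num_partitions)].append(row)
    return parts


def coalesce(parts, num_partitions):
    # Merge existing partitions into fewer ones without re-hashing any row.
    merged = [[] for _ in range(num_partitions)]
    for i, part in enumerate(parts):
        merged[i % num_partitions].extend(part)
    return merged


locations = ["IN", "US", "DE", "IN", "US", "IN", "FR", "DE"]
shipments = [{"ID": i, "loc": loc} for i, loc in enumerate(locations)]

by_loc = repartition(shipments, "loc", 4)  # rows with equal loc end up together
fewer = coalesce(by_loc, 2)                # fewer partitions, whole partitions merged
```

Because rows with the same key land in the same partition, a later join or aggregation on that key can work partition by partition. Merging partitions is cheaper than re-hashing, which is why coalesce is the usual choice when only reducing the partition count.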
3. Broadcasting
Smaller datasets need not be partitioned, as partitioning and data movement might take more time than the operations on the data themselves.
from pyspark.sql.functions import broadcast
country = broadcast(spark.read.format('csv').option('header',True).load('filepath_ctry'))
Shipment = spark.read.format('csv').option('header',True).load('filepath_shipment')
country.createOrReplaceTempView('country')
Shipment.createOrReplaceTempView('Shipment')
finalDF = spark.sql("""
SELECT s.*
FROM
country c
INNER JOIN
Shipment s
ON
s.Location = c.country
""")
broadcast() is a function that tells Spark to send a full copy of the data to every node instead of partitioning it. Example: a list of countries is much smaller than the shipment details. To count shipments at predefined destinations we join the two datasets, broadcasting country and leaving the Shipment data partitioned.
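What a broadcast join does conceptually can be sketched in plain Python. This is a simplified model, not Spark's code: broadcast_join is a hypothetical helper, the rows are invented, and it assumes the keys on the small side are unique.

```python
def broadcast_join(small_rows, small_key, large_partitions, large_key):
    # "Broadcast": build one lookup table from the small side.
    lookup = {row[small_key]: row for row in small_rows}
    joined = []
    for part in large_partitions:
        local_lookup = dict(lookup)  # each worker holds its own full copy
        # The large partition is joined where it already lives; its rows never move.
        joined.append([row for row in part if row[large_key] in local_lookup])
    return joined


country = [{"country": "IN"}, {"country": "US"}]
shipment_partitions = [
    [{"ID": 1, "Location": "IN"}, {"ID": 2, "Location": "FR"}],
    [{"ID": 3, "Location": "US"}, {"ID": 4, "Location": "IN"}],
]

result = broadcast_join(country, "country", shipment_partitions, "Location")
matched_ids = [row["ID"] for part in result for row in part]
```

Only the small table is copied around. The large table keeps its partitions, which is where the saving in shuffle read/write comes from.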
4. Bucketing
Bucketing is a concept also known from Machine Learning, where it is used to preprocess data for training, yet it can be used to preprocess our big data as well.
Here we treat bucketing as a technique rather than a single function call (Spark also offers DataFrameWriter.bucketBy for tables it persists). We can achieve it with repartition and filter. The concept is to group together rows that are alike, based on attributes that can significantly partition the data.
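The grouping idea can be illustrated with a small plain-Python sketch. It is not the optimization this series will present and not Spark code: bucket_by and bucketed_join are hypothetical helpers, and the rows and the choice of loc as the bucketing attribute are assumptions for illustration.

```python
from collections import defaultdict


def bucket_by(rows, key):
    # Group rows that share the same value of the bucketing attribute.
    buckets = defaultdict(list)
    for row in rows:
        buckets[row[key]].append(row)
    return buckets


def bucketed_join(deliveries, shipments):
    d_buckets = bucket_by(deliveries, "loc")
    s_buckets = bucket_by(shipments, "loc")
    joined, comparisons = [], 0
    # Only buckets that share a loc are compared with each other.
    for loc in sorted(d_buckets.keys() & s_buckets.keys()):
        for d in d_buckets[loc]:
            for s in s_buckets[loc]:
                comparisons += 1
                if d["ShipmentId"] == s["ID"]:
                    joined.append((d, s))
    return joined, comparisons


deliveries = [
    {"ShipmentId": 1, "loc": "IN"}, {"ShipmentId": 2, "loc": "US"},
    {"ShipmentId": 3, "loc": "IN"}, {"ShipmentId": 9, "loc": "DE"},
]
shipments = [
    {"ID": 1, "loc": "IN"}, {"ID": 2, "loc": "US"},
    {"ID": 3, "loc": "IN"}, {"ID": 4, "loc": "FR"},
]

joined, comparisons = bucketed_join(deliveries, shipments)
naive_comparisons = len(deliveries) * len(shipments)  # every row against every row
matched_ids = sorted(s["ID"] for _, s in joined)
```

On these eight rows the bucketed version compares far fewer pairs than the naive all-against-all approach, and the gap grows with the number of distinct bucket values.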
In the broadcasting example, the Shipment data can run to millions of transactions. Our new task is to find the delivery details for the corresponding shipments. This requires far more resources, because the join matches millions of records against another source with millions of records.
Delivery = spark.read.format('csv').option('header',True).load('filepath_dlry')
Shipment = spark.read.format('csv').option('header',True).load('filepath_shipment')
Delivery.createOrReplaceTempView('Delivery')
Shipment.createOrReplaceTempView('Shipment')
finalDF = spark.sql("""
SELECT *
FROM
Delivery d
INNER JOIN
Shipment s
ON
d.ShipmentId = s.ID AND
d.loc = s.loc
""")
Depending on data volume and cluster size, the above program can take anywhere from 10 hours to 10 days.
Optimizing it will be discussed in detail later. Until the next post in the series, the task is left to the reader as an exercise.
Happy Reading! Happy coding!