Optimising Glue Scripts for Efficient Data Processing: Part 1

Boosting Performance of Glue jobs without complexity

Abhishek Saitwal
Globant
11 min readNov 6, 2023

--

Photo by Veri Ivanova on Unsplash

If you’re a data enthusiast exploring AWS Glue, you’ve probably witnessed its data-handling prowess. But did you know there are simple ways to make your Glue jobs faster and more efficient? This guide, designed for those already familiar with AWS Glue, offers straightforward strategies to supercharge your data processing without delving into complex technical details. Whether you’re dealing with sizable datasets or just aiming to optimize your existing Glue jobs, read on to discover how to unlock AWS Glue’s full potential effortlessly. While AWS Glue is a powerful tool for ETL operations, script optimization is crucial to ensure efficient data processing.

This comprehensive guide explores various optimization techniques with code snippets, empowering you to maximize the performance of your Glue jobs. Why bother with optimization? Well, it’s not just about speed; it’s also about saving costs and making your data processing more time-efficient. So, let’s dive in and make your AWS Glue experience even better!

Parallelize Tasks with ThreadPoolExecutor

A ThreadPoolExecutor is a class provided by Python's concurrent.futures module that allows you to create and manage a pool of worker threads for parallel execution of tasks. It's commonly used when you have a set of tasks that can be executed concurrently, and you want to take advantage of multiple threads to speed up the execution.

In the context of PySpark, you can use a ThreadPoolExecutor to parallelize certain operations or tasks not directly handled by Spark's distributed processing. PySpark itself already parallelizes data processing across multiple nodes in a cluster. Still, there may be some CPU-bound or synchronous tasks that can benefit from further parallelism within a single machine.

Here are some scenarios where you might consider using a ThreadPoolExecutor in PySpark:

  • Pre-processing or post-processing tasks: If you have data pre-processing or post-processing tasks that are CPU-bound and can be performed independently on each record, you can use a ThreadPoolExecutor to parallelize these tasks. For example, data transformation, feature extraction, or data validation can be parallelized.
  • External API or database calls: If your PySpark job needs to make multiple external API requests or database queries, these operations can be time-consuming and are typically I/O-bound. Using a ThreadPoolExecutor to parallelize these calls can significantly reduce the job execution time.
  • Parallelism within UDFs: In PySpark, you can define User-Defined Functions (UDFs) to operate on DataFrame columns. If your UDF involves CPU-bound operations, you can use a ThreadPoolExecutor within the UDF to parallelize the processing of each row.

Here’s a simplified example of how you can use a ThreadPoolExecutor in PySpark to parallelize a data transformation task:

from concurrent.futures import ThreadPoolExecutor

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session


s3_folders = ["s3://demo-src-bucket/orders/", "s3://demo-src-bucket/sales/"]

def read_df(s3_folder):
"""
Read JSON files in DynamicFrame
"""
dynamic_df = glueContext.create_dynamic_frame.from_options(
connection_type="s3",
format_options={"multiline": True},
format="json",
connection_options={
"paths": [s3_folder],
"recurse": True,
"useS3ListImplementation": True,
},
transformation_ctx="dynamic_df",
)
return dynamic_df


def add_new_cols(dynamic_df, header, table):
"""
Add new columns in source dynamic frame
"""
# Convert to spark dataframe
pyspark_df = dynamic_df.toDF()
# Iterate over dictionary and add columns to dataframe
for column_name, value in header.items():
pyspark_df = pyspark_df.withColumn(column_name, value)
# Converting dataframe back to Dynamic DataFrame
dynamic_df_final = DynamicFrame.fromDF(pyspark_df,
glueContext, "dynamic_df_final")
return dynamic_df_final

def write_df(df, s3_path, table):
"""
Write dynamic frame in S3 and catalog it.
"""
dynamic_df_write = glueContext.getSink(
format_options={"compression": "snappy", "useGlueParquetWriter": True},
path=s3_path,
connection_type="s3",
updateBehavior="UPDATE_IN_DATABASE",
partitionKeys=partitionKeys,
enableUpdateCatalog=True,
transformation_ctx="dynamic_df_write",
)
dynamic_df_write.setCatalogInfo(
catalogDatabase=database, catalogTableName=table
)
dynamic_df_write.setFormat("glueparquet")
dynamic_df_write.writeFrame(df.repartition(repartition_count))

def main():
"""
Read JSON files from source S3 bucket, apply transformations and write into target S3 bucket.
"""
# 1. read data from source bucket
with ThreadPoolExecutor(max_workers=threads) as executor:
read_results =
{
s3_folder: executor.submit(read_df, s3_folder)
for s3_folder in s3_folders
}

# 2b. add new columns in flattened dynamic frame
with ThreadPoolExecutor(max_workers=threads) as executor:
add_col_results =
{
table: executor.submit(
add_new_cols, dynamic_df.result(), headers, table
)
for table, dynamic_df in read_results.items()
}
# 3. write dataframe
with ThreadPoolExecutor(max_workers=threads) as executor:
_ =
{
executor.submit(write_df, dynamic_df.result(), target_bucket, table)
for table, dynamic_df in add_col_results.items()
}

if __name__ == "__main__":
main()

In the provided code, we observe the data being read from two locations, columns being added, and then the data is written. Although the process involves distributed processing, it can further benefit from parallelization. By importing the ThreadPoolExecutor and utilizing executor.submit to submit tasks to threads, we enhance the parallel processing capabilities, optimizing the efficiency of reading, column addition, and writing operations across the distributed dataset.

The extent to which your job gets faster depends on various factors, including the nature of the tasks, the number of worker threads (controlled by max_workers), and the available CPU resources. Using a ThreadPoolExecutor can help improve performance for CPU-bound or I/O-bound tasks, but it won't necessarily speed up Spark's core data processing, which is already designed for distributed parallelism. You should carefully assess the bottlenecks in your specific PySpark job and determine which tasks can benefit from additional parallelism using a ThreadPoolExecutor.

Leverage Glue Functions vs. Spark DataFrame Operations

When working with AWS Glue, you can access functions and transformations specifically designed for ETL tasks. While Glue functions offer ease of use and are optimized for the Glue environment, you may occasionally encounter scenarios where Spark DataFrame operations provide a more straightforward solution. It’s essential to strike a balance between the two approaches to ensure your Glue jobs are both efficient and maintainable.

One optimization strategy involves minimizing the conversion between Glue DynamicFrames and Spark DataFrames. Converting between these two formats can introduce overhead and affect job performance. Therefore, it’s advisable to restructure your code in a way that limits the need for frequent conversions. Here’s a brief outline of how to do this:

  • Understand the Differences: Start by gaining a clear understanding of the differences between Glue DynamicFrames and Spark DataFrames. DynamicFrames are built on top of DataFrames but have some unique characteristics, such as schema flexibility.
  • Evaluate the Task: Before choosing your approach, evaluate the specific task at hand. Determine whether it can be accomplished efficiently using Glue functions or if a Spark DataFrame operation is more suitable.
  • Opt for Native Glue Functions: Whenever possible, opt for Glue-provided functions. These functions are optimized for performance within the Glue environment and can often outperform equivalent Spark operations.
  • Limit Conversions: Avoid unnecessary conversions between DynamicFrames and DataFrames. If you frequently switch between the two, consider refactoring your code to use one format consistently throughout your job.
  • Use Spark When Needed: In cases where Spark DataFrame operations provide a more straightforward or efficient solution, don’t hesitate to use them. Just be mindful of the potential conversion overhead.
  • Profile and Benchmark: Profile and benchmark your code to identify performance bottlenecks or areas where conversions can be reduced. This will help you make informed decisions about which approach to use.

By following these guidelines and carefully considering when to use Glue functions versus Spark DataFrame operations, you can optimize your AWS Glue jobs for efficiency and maintainability, ultimately achieving the best possible performance for your ETL tasks.

Efficient Data Writing: Minimize the Number of Output Files

In the realm of AWS Glue, the way you write data can significantly impact job performance. One crucial optimization strategy is to ensure that your output data is stored in a minimal number of files. This not only simplifies data management but also enhances efficiency during data retrieval and downstream processing.

When writing data in your Glue jobs, consider employing the repartition and coalesce functions to achieve this objective. These functions allow you to control data distribution across partitions and minimize the number of output files generated.

  • Repartition: Use the repartition function to specify the number of output partitions you desire explicitly. By carefully selecting the partition count, you can control the size of each resulting file.
from pyspark.sql import SparkSession
# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()
# Assume df is your DataFrame
df = ...
# Use repartition before writing to control the number of partitions
df_repartitioned = df.repartition(10) # Set the desired number of partitions
# Write the repartitioned DataFrame to a file system
df_repartitioned.write.format("parquet").mode("overwrite").save("/path/to/output")

In this example, repartition(10) is used to set the number of partitions to 10. The number 10 is arbitrary and should be adjusted based on the characteristics of your data and the available resources. The goal is to find a balance that maximizes parallelism without causing excessive overhead.

It’s important to note that repartitioning can be a costly operation, as it involves shuffling the data across the cluster. Therefore, it’s generally more efficient to perform repartitioning before any expensive operations and persist the result if necessary.

The actual number of output files will depend on the number of partitions in the DataFrame and the nature of the data format you are writing (e.g., Parquet, Avro, JSON). For example, writing a DataFrame with 10 partitions in Parquet format could result in 10 Parquet files, one for each partition.

  • Coalesce: Alternatively, the coalesce function enables you to collapse existing partitions into a smaller number. This is particularly useful when your data has been excessively partitioned, resulting in numerous small files.

Here’s a simplified example of how to use these functions within a Glue job:

Write the code to show the coalesce and repartion
from pyspark.sql import SparkSession
# Create a Spark session
spark = SparkSession.builder.appName("example").getOrCreate()
# Assume df is your DataFrame
df = ...
# Use coalesce before writing to control the number of output files
df.coalesce(numPartitions).write.format("parquet").mode("overwrite").save("/path/to/output")
  • df: The DataFrame on which you want to perform the coalesce operation.
  • numPartitions: The target number of partitions after the coalesce operation.

The purpose of coalesce is to minimize data movement when reducing the number of partitions. It does this by merging adjacent partitions together, which can be more efficient than the repartition function in certain scenarios.

If you want to determine the optimal number of partitions dynamically based on the data, you might need to analyze the characteristics of your data and adjust the numPartitions parameter accordingly. The ideal number of partitions depends on factors like the size of your data, the available resources, and the operations you plan to perform on the RDD or DataFrame.

Keep in mind that setting the number of partitions too low might lead to inefficient parallelism while setting it too high might result in increased overhead due to managing a large number of partitions. Experimentation and profiling with your specific dataset and use case are often necessary to find the optimal number of partitions for your Spark application.

By strategically utilizing these functions, you can control the granularity of your output data files and ensure that your Glue job writes data efficiently, resulting in a manageable number of output files well-suited for downstream processing and analytics.

Optimize Processing Power: Choose the Right Configuration

In the quest for peak performance with AWS Glue, understanding how to fine-tune your job’s processing power is key. You have two primary options: enhancing the processing power of a single machine or adding more executors to expedite processing. However, it’s essential to tread carefully, considering both efficiency and cost-effectiveness in your approach.

  • Increasing Single Machine Processing Power: AWS Glue offers various machine types with CPU and memory configurations. When aiming to boost the processing power of a single machine, you can select a machine type that best aligns with your job’s demands. These machine types range from small instances for light workloads to larger ones with substantial CPU and memory resources. The choice depends on your specific job requirements.
  • Adding Executors for Parallelism: Alternatively, you can ramp up processing speed by adding more executors to your job configuration. This approach capitalizes on parallelism, enabling multiple tasks to run concurrently. However, it’s crucial to exercise caution here, as each additional executor incurs costs. Finding the right balance between performance gains and cost efficiency is paramount.AWS Glue jobs that utilize Spark incur a cost per hour of run, set at $0.44. Consider a scenario where a specific job runs for 4 hours with 10 executors. The cost for this execution can be calculated by multiplying the run time (4 hours) by the number of executors (10), and then multiplying the result by the cost per hour ($0.44). Using the formula: Cost=Run Time × Number of Executors × Cost per Hour, the cost for this would be 4 hours × 10 executors × $0.44 per hour = $17.60. Now, let’s consider another instance where the same job runs for 1 hour 30 min but with more executors. Applying the same formula, the cost for this would be 1.5 hours × 20 executors × $0.44 per hour = $13.20. This comparison demonstrates how the cost varies based on the runtime and the number of executors used in AWS Glue Spark jobs; for more details, check AWS Glue Costing here.
  • Checking Executor Limits: AWS Glue has predefined maximum limits for the number of executors you can allocate based on your Glue job’s configuration and instance type. It’s advisable to check these limits to ensure your job operates within the prescribed bounds. Exceeding these limits may lead to unexpected behavior or increased costs.
  • Configuring Executors: To adjust the number of executors for your job, you can access the AWS Glue job configuration settings. While it’s beyond the scope of this article to provide a detailed step-by-step guide, AWS Glue’s user interface offers straightforward options for configuring executors. Additionally, you may consider enabling auto-scaling to adjust the number of executors based on the workload automatically.

To visualize these settings and make informed adjustments, it’s helpful to refer to the AWS Glue console, where you can view and fine-tune your job’s configuration. By striking the right balance between processing power and cost efficiency, you can ensure that your AWS Glue jobs run optimally for your specific data processing requirements.

Maximize Efficiency with mapPartitions()

Another valuable technique at your disposal within the AWS Glue environment is the mapPartitions() transformation. This transformation allows you to apply a function to each partition of an RDD (Resilient Distributed Dataset), offering a unique advantage for specific tasks. Particularly useful when handling connection initialization or heavy computations, the mapPartitions() transformation shines in scenarios where applying the same operation to every individual row would be inefficient.

It’s worth noting that the mapPartitions() transformation is considered a narrow transformation. This means that it does not involve shuffling data between partitions, making it an ideal choice for tasks that don't require data shuffling, such as those involving connection setup or resource-intensive computations.

Here’s a practical example of how to employ the mapPartitions() transformation:

def myFunc(iterator):
for row in iterator:
# Do something with the row
rdd.mapPartitions(myFunc)

In this example, the myFunc() function is invoked once for each partition of the RDD. The flexibility of this function allows you to tailor it to your specific needs, whether that involves initializing a connection, performing computationally intensive operations, or other tasks that benefit from partition-level processing.

By leveraging the mapPartitions() transformation, you can effectively optimize your AWS Glue jobs, particularly when faced with tasks that require efficient handling of partition-level operations without the need for data shuffling.

The relationship between the number of Data Processing Units (DPUs) and the optimal number of partitions for parallel processing in AWS Glue can be expressed using the formula (Worker − 1) × 2. This formula suggests that the optimal number of partitions for parallel processing is calculated by subtracting 1 from the number of DPUs (Workers) and then multiplying the result by 2.

The formula (Worker − 1) × 2 provides guidance on achieving an effective balance between the available DPUs and the number of partitions for optimal parallelism in AWS Glue. The aim is to ensure that each DPU is utilized efficiently and that parallel processing capabilities are maximized.

For further details and a comprehensive understanding of monitoring, debugging, and optimizing capacity in AWS Glue, you can refer to the official AWS Glue documentation here. The documentation provides valuable insights into capacity management to enhance the performance of your AWS Glue jobs.

Summary

Optimizing AWS Glue scripts is a critical step in achieving efficient data processing for your organization’s big data needs. By implementing the strategies and techniques outlined in this guide, you can significantly enhance the performance of your Glue jobs, whether you’re working with streaming data from AWS Kinesis Firehose or any other data source. Remember that each optimization should be carefully considered in the context of your specific use case to achieve the best results.

This is just the start of my blog. In the next part, I’ll share some easy techniques to make Glue jobs work even better. With these simple tricks, a Glue job can run faster and save costs.

References

MapPartitions

ThreadPoolExecutor

Spark Optimization

AWS Glue Best Practices

--

--