Enhancing Cluster Performance for Memory-Intensive Workloads in Spark with Databricks

Epsilon India
Epsilon Engineering Blog
5 min read · Jan 31, 2024

Vivek. R

Prelude

Big data engineering and data analytics are intricate processes that involve ingesting large data sets with Apache Spark from a variety of sources, such as data lakes, document stores, relational databases, and streaming sources like Kafka and Flink.

Databricks is a cloud-based platform for data and AI solutions that works with Apache Spark and other open-source tools. It gives data scientists and engineers tailored options for use cases such as data lakehouses, ETL, data governance, data sharing, and orchestration.

However, we encountered an unusual scenario: ingesting a large volume of gzip-compressed, pipe-delimited files. The load comprised 1008 gzip files stored in an S3 bucket, each approximately 256MB in size and containing 1400 columns, for a total of around 244M records. This was a departure from the usual scenarios, where the default configurations are typically sufficient.
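A quick back-of-the-envelope calculation makes the scale of this input concrete. The sketch below uses only the figures quoted above; the variable names are illustrative. Because gzip is not a splittable format, Spark reads each file with a single task, so the file count is also the initial number of input partitions.

```python
# Figures quoted in the article (sizes and record count are approximate).
NUM_FILES = 1008
FILE_SIZE_MB = 256
NUM_COLUMNS = 1400
TOTAL_RECORDS = 244_000_000

compressed_gb = NUM_FILES * FILE_SIZE_MB / 1024   # total compressed input
records_per_file = TOTAL_RECORDS / NUM_FILES      # average rows per gzip file
total_cells = TOTAL_RECORDS * NUM_COLUMNS         # individual column values

# gzip cannot be split, so one file maps to one read task.
initial_partitions = NUM_FILES

print(f"compressed input  : {compressed_gb:.0f} GB")
print(f"records per file  : {records_per_file:,.0f}")
print(f"column values     : {total_cells:,}")
print(f"initial partitions: {initial_partitions}")
```

The uncompressed footprint is not stated in the article, so it is left out here; it will be larger than the compressed size, which is part of why the load is memory-intensive.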

Design

The data load process begins with a request sent to an API via an API gateway. The request specifies the location of the data source in the S3 bucket, the type of operation (put, patch, or delete), and the target object, and it is then published to Kafka. This triggers the event-lambda, which identifies the load and queues a Spark job in Databricks. Databricks allocates the necessary cluster configuration, tags the job, ingests the payload from S3, performs the transformations, and loads the result into a Delta Lake table.

Figure 1: Data load Architecture

The expected SLA for the data load was around 6 to 8 hours, including a repartition and vacuum for better read throughput. The cluster associated with the job ran on r6g.xlarge instances (4 vCPUs, 32GB of memory) with autoscaling enabled (minimum: 5, maximum: 15), and an additional 100GB of Elastic Block Store (EBS) volumes was attached to allow for shuffle spill. However, the job ran for more than 28 hours with the cluster scaled to its maximum and eventually failed with an out-of-memory (OOM) error.

Given the importance of a successful data load, the cluster configuration was increased to r6g.16xlarge, but we observed similar slowness. The Spark UI showed a large amount of memory spilling to disk, with the EBS (gp2) storage allocation close to 80%. Recognizing the need for dedicated, fast local NVMe storage and faster node-to-node communication, we decided to move to r5dn.16xlarge (d: dedicated local NVMe storage, n: higher network bandwidth).

With the upgraded cluster, the job was queued again. There were significant improvements, and the job completed in 16 hours. That was still well over the expected SLA, and cost became a concern because of the larger cluster configuration.

Continued Challenges

With cost now a factor, a thorough analysis of the logs was needed to identify the root cause of these persistent issues.

Upon further investigation, we discovered that the Spark configuration for the data loader was using only 1024GB of memory out of the 8.19TB available. The issue stemmed from the Spark job template submitted by Databricks, which set one executor per node with a memory cap of 64GB to ensure stability. This approach proved problematic, particularly with larger clusters within Databricks.
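The gap between allocated and available memory follows directly from the template's settings. The short sketch below reproduces the two figures, assuming the 16-node cluster with 512GB per node that the article describes later; the constant names are illustrative.

```python
NODES = 16               # cluster size given later in the article
NODE_MEMORY_GB = 512     # memory per node given later in the article
EXECUTORS_PER_NODE = 1   # set by the job template
EXECUTOR_CAP_GB = 64     # per-executor memory cap set by the job template

total_gb = NODES * NODE_MEMORY_GB                        # memory in the cluster
used_gb = NODES * EXECUTORS_PER_NODE * EXECUTOR_CAP_GB   # memory Spark could use
utilization = used_gb / total_gb

print(f"{used_gb} GB of {total_gb} GB usable by Spark ({utilization:.1%})")
```

In other words, under these assumptions roughly seven eighths of the memory being paid for was never available to the job.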

Figure 2: Underutilized resources in a cluster

The problem encountered is commonly referred to as the "fat executor" problem. With only one executor allocated per node, the available cores and memory were underutilized. This not only resulted in slower execution across all nodes, but also increased the overall cost due to longer processing times. Additionally, allocating 64 tasks to a single executor with 64 cores per node caused out-of-memory errors and timeouts, further hindering job performance.
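One way to see why the fat executor ran out of memory is to divide the executor's memory by the number of tasks it runs at once. The sketch below is a deliberate simplification: it ignores Spark's reserved memory, memory fractions, and off-heap overhead, and the function name is hypothetical. The second call uses the 5-core, 46GB executor sizing described in the next section.

```python
def memory_per_task_gb(executor_memory_gb: float, executor_cores: int) -> float:
    """Rough memory share for each task running concurrently in one executor."""
    return executor_memory_gb / executor_cores


# Fat executor: 64GB cap shared by 64 concurrent tasks (one per core).
fat = memory_per_task_gb(64, 64)

# Balanced executor from the next section: 46GB shared by 5 concurrent tasks.
balanced = memory_per_task_gb(46, 5)

print(f"fat executor     : {fat:.1f} GB per task")
print(f"balanced executor: {balanced:.1f} GB per task")
```

With 1400 columns per record, a share of about 1GB per task leaves little headroom, which is consistent with the out-of-memory errors described above.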

Balanced Executor Configuration

The Spark cluster was given a custom configuration, with each Spark executor allocated 5 cores and 46GB of memory. This resulted in approximately 10 executors per node, effectively using 460GB of the 512GB of memory on each node. With these adjustments, overall resource utilization improved significantly, particularly for memory, reaching a total of 7.4TB in a 16-node cluster.

"spark.executor.memory":"43g",
"spark.executor.cores":5,
"spark.executor.instances":12,
"spark.sql.shuffle.partitions":2000,
"spark.driver.maxResultSize":"4g",
"spark.network.timeout":"800s",
"spark.executor.heartbeatInterval":"600s",
"spark.shuffle.io.maxRetries":"10"

Distribution of the executors and memory utilization across the nodes in a cluster

Cluster’s resource utilization
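When timeouts are raised like this, it is worth checking that the settings remain consistent with each other. Spark expects `spark.executor.heartbeatInterval` to be lower than `spark.network.timeout`. The sketch below holds the settings listed above in a Python dictionary and runs that check; `to_seconds` is a hypothetical helper that handles only a few common time suffixes, not Spark's full syntax.

```python
import json
import re

# Settings as listed in the article.
spark_conf = {
    "spark.executor.memory": "43g",
    "spark.executor.cores": 5,
    "spark.executor.instances": 12,
    "spark.sql.shuffle.partitions": 2000,
    "spark.driver.maxResultSize": "4g",
    "spark.network.timeout": "800s",
    "spark.executor.heartbeatInterval": "600s",
    "spark.shuffle.io.maxRetries": "10",
}

_UNIT_SECONDS = {"ms": 0.001, "s": 1, "m": 60, "min": 60, "h": 3600}


def to_seconds(value: str) -> float:
    """Convert a duration such as '800s' to seconds (hypothetical helper)."""
    match = re.fullmatch(r"(\d+)(ms|s|min|m|h)", value.strip())
    if match is None:
        raise ValueError(f"unsupported duration: {value!r}")
    number, unit = match.groups()
    return int(number) * _UNIT_SECONDS[unit]


heartbeat = to_seconds(spark_conf["spark.executor.heartbeatInterval"])
timeout = to_seconds(spark_conf["spark.network.timeout"])
heartbeat_ok = heartbeat < timeout

print(f"heartbeat {heartbeat:.0f}s < network timeout {timeout:.0f}s: {heartbeat_ok}")
print(json.dumps(spark_conf, indent=2))
```

The check passes for the values above, although a 600s heartbeat sits fairly close to the 800s timeout.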

With the custom configuration, out-of-memory errors and timeouts were reduced considerably. The custom configuration also enabled the Spark worker to allocate a greater number of tasks to nodes, maximizing the achievable parallelism in the cluster. The data load job took 1 hour 40 minutes to complete.
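The gain in memory and parallelism can be estimated from the numbers in this section. The sketch below is illustrative arithmetic only: it uses the approximate per-node executor count from the prose (not `spark.executor.instances`) and assumes 64 cores per node, as mentioned in the fat executor discussion.

```python
import math

NODES = 16
NODE_CORES = 64
NODE_MEMORY_GB = 512
EXECUTORS_PER_NODE = 10    # approximate figure from the article
EXECUTOR_CORES = 5
EXECUTOR_MEMORY_GB = 46
SHUFFLE_PARTITIONS = 2000  # spark.sql.shuffle.partitions

memory_per_node_gb = EXECUTORS_PER_NODE * EXECUTOR_MEMORY_GB
cluster_memory_gb = NODES * memory_per_node_gb
cores_used_per_node = EXECUTORS_PER_NODE * EXECUTOR_CORES

# Each executor core is one task slot.
task_slots = NODES * cores_used_per_node

# Number of scheduling waves needed to run one full shuffle stage.
shuffle_waves = math.ceil(SHUFFLE_PARTITIONS / task_slots)

print(f"memory per node : {memory_per_node_gb} GB of {NODE_MEMORY_GB} GB")
print(f"cluster memory  : {cluster_memory_gb} GB")
print(f"cores per node  : {cores_used_per_node} of {NODE_CORES}")
print(f"task slots      : {task_slots}")
print(f"shuffle waves   : {shuffle_waves}")
```

Under these assumptions the cluster exposes 800 task slots, so a 2000-partition shuffle stage completes in about three waves, and the 7360GB of executor memory matches the 7.4TB figure quoted above.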

Conclusion

The use of r5d.16xlarge instances with optimized Spark configurations yielded the following results:

1. A load-based Spark configuration enabled us to complete the data load in 1 hour and 30 minutes.

2. The cost per data load of 250GB (1008 gzip files at 256MB per file) came down by 80% with a mix of spot and on-demand instances.

3. Using the dedicated storage on the instances reduced latency when data spilled from memory to disk.

4. Disabling autoscaling reduced latency in node procurement and prevented executor termination due to spot instances. This avoids frequent rebalancing of executor nodes during a large data load.

During development, it is always advisable to keep watch over the CPU and memory utilization of each node and to choose node characteristics accordingly (for example, on AWS, r5d adds dedicated NVMe storage, and r5dn adds dedicated storage plus faster networking). Databricks offers a variety of node types tailored to different kinds of load: ML clusters, the r5 and r6g series for memory-intensive, Delta-accelerated loads, and the c series for compute-optimized loads. Based on these observations, Spark can be tuned to work efficiently, saving debugging time and reducing the associated cost overruns.
