Increasing Apache Spark read performance for JDBC connections

Antony Neu
Mercedes-Benz Tech Innovation
5 min read · Apr 26, 2022

Apache Spark has established itself as one of the major frameworks for the distributed processing of big data in enterprise data lakes. To fit the needs of analytical use cases, data in these lakes is typically stored in columnar file formats, e.g. Parquet or ORC. Since many business applications rely on SQL-based databases for transactions, the data first needs to be ingested into the data lake from these databases.

Spark offers built-in capabilities to read data from SQL databases via JDBC. However, the default settings can lead to long-running processes or out-of-memory exceptions. This article describes how to fix these issues and tune performance.

Limit data by using a subquery

A simple yet effective way to increase JDBC read performance is to limit the amount of data being fetched. Instead of reading an entire table, specify a query that selects only the columns and rows you actually need, e.g. with the following PySpark code:
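The snippet below is a minimal sketch of this approach; the orders table, the selected columns and the connection settings are placeholders that need to be adapted to your database:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

jdbc_url = "jdbc:postgresql://db-host:5432/shop"   # placeholder connection URL
connection_properties = {
    "user": "reader",                              # placeholder credentials
    "password": "secret",
    "driver": "org.postgresql.Driver",
}

# Pass a subquery (in parentheses, with an alias) instead of a full table name,
# so only the required columns and rows are fetched from the database.
df = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT order_id, amount, created_at FROM orders WHERE created_at >= DATE '2022-01-01') AS orders_subset",
    properties=connection_properties,
)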

You should avoid the query option on the data source as this is not compatible with the partitioning options presented in the next section.

Use partitioning on the JDBC data source

By default, Spark will store the data read from the JDBC connection in a single partition. As a consequence, only one executor in the cluster is used for the reading process. To increase the number of nodes reading in parallel, the data needs to be partitioned by passing all of the following four options:

  • partitionColumn determines which table column is used to split the data into partitions. The data type of the partitioning column needs to be NUMERIC, DATE or TIMESTAMP.
  • numPartitions sets the desired number of partitions.
  • lowerBound and upperBound are used to calculate the partition boundaries.

Under the hood, Spark will generate a SQL query for each partition with an individual filter on the partitioning column. The diagram below illustrates how data is divided into four partitions using the options above:

This shows why the column cannot be of any other data type, as the SQL queries are created based on offsets from lowerBound and upperBound before reading the actual data.
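For example, with partitionColumn id, lowerBound 0, upperBound 100 and numPartitions 4, the per-partition filters look roughly like the following (the exact SQL text depends on the Spark version and JDBC dialect):

# Stride = (upperBound - lowerBound) / numPartitions = (100 - 0) / 4 = 25
partition_filters = [
    "id < 25 OR id IS NULL",   # first partition also receives all rows below lowerBound
    "id >= 25 AND id < 50",
    "id >= 50 AND id < 75",
    "id >= 75",                # last partition also receives all rows above upperBound
]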

It is important to note that lowerBound and upperBound do not filter rows: all data is read whether you use partitioning or not. As shown above, the first and last partitions also receive all rows below the lower or above the upper boundary if the bounds do not match the actual extent of the data. Since this can skew partition sizes and hurt performance, the lower and upper bound should be close to the actual values present in the partitioning column.

PySpark implementation

The following PySpark code shows how you can first determine the minimum and maximum values of a column and then use these values for partitioning. The example assumes a PostgreSQL database as the data source.
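A possible sketch, reusing the hypothetical orders table and connection settings from the first example (table, column and the partition count are placeholders):

# 1) Determine the current minimum and maximum of the partitioning column
#    with a small aggregate query that is pushed down to PostgreSQL.
bounds = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT min(order_id) AS min_id, max(order_id) AS max_id FROM orders) AS bounds",
    properties=connection_properties,
).collect()[0]

# 2) Read the full table in parallel using the computed bounds.
orders = spark.read.jdbc(
    url=jdbc_url,
    table="orders",
    column="order_id",            # partition column: must be numeric, date or timestamp
    lowerBound=bounds["min_id"],
    upperBound=bounds["max_id"],
    numPartitions=32,
    properties=connection_properties,
)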

Partitioning is most effective when it is performed on an indexed column and the rows are distributed evenly across the partitions. Keep in mind that partitioning increases the number of queries running in parallel against the database, which can be as high as the number of tasks your cluster can run concurrently (i.e. its total executor cores) unless restricted by configuration.

Partitioning tables without suitable columns

If the source table does not have a column with a suitable data type, you have two options:

  • Depending on the database, you can use a generic row or block number to partition the table. In PostgreSQL, the column ctid can be used for this purpose, as shown in the first sketch after this list.
  • Use a hash function on a column and convert the hash to a numeric value. PostgreSQL offers the function hashtext, which computes a hash value for a string column. Combined with the modulo operator, this yields the desired number of partitions, and no pre-computation of minimum and maximum values is necessary. The second sketch after this list shows this for 64 partitions.
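Both sketches below reuse the SparkSession and connection settings from the earlier examples; the table name orders, the column order_reference, the expressions and the partition counts are illustrative assumptions and should be validated against your own database.

# Option 1 (ctid): derive a numeric block number from PostgreSQL's ctid and
# partition on it. One possible formulation; the block number is extracted
# by casting ctid via text to a point and taking its first coordinate.
max_block_id = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT max((ctid::text::point)[0]::bigint) AS max_block FROM orders) AS b",
    properties=connection_properties,
).collect()[0]["max_block"]

df_blocks = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT *, (ctid::text::point)[0]::bigint AS block_id FROM orders) AS orders_with_blocks",
    column="block_id",
    lowerBound=0,
    upperBound=max_block_id,
    numPartitions=32,
    properties=connection_properties,
)

# Option 2 (hashtext): hash a string column and reduce it to 64 buckets with
# the modulo operator. The bucket number is bounded by construction, so no
# min/max pre-computation is required.
df_hashed = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT *, abs(hashtext(order_reference)) % 64 AS partition_key FROM orders) AS orders_hashed",
    column="partition_key",
    lowerBound=0,
    upperBound=64,
    numPartitions=64,
    properties=connection_properties,
)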

The second method can have a significant impact on database performance and needs to be tested against the specific database: there is a trade-off between the benefit of reading in a distributed manner and the cost of computing the hash.

Optimizing performance

Performance issues can be investigated by checking either the Spark Web UI or your cluster metrics. For the first stage, the Spark UI shows the number of tasks, which equals the number of JDBC partitions. As a rule of thumb, the number of tasks is optimal when it matches the number of CPU cores in your cluster. The cluster metrics will give you an idea of whether the data is balanced equally across the cluster.

If you are still seeing out-of-memory errors on your workers although you are using partitioning, increase the number of partitions beyond the number of available executors. This results in smaller partitions with lower memory demand.

In some cases this does not help because the data is skewed on the partitioning column: one resulting partition is significantly larger than the others, which again leads to memory problems. To solve this, use a second column for the oversized partitions. In one read, partition by the original column and exclude the large value ranges with a WHERE clause; in a second read, select only the excluded data and partition it by the second column.
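A minimal sketch of this pattern, assuming a hypothetical skewed range of 1000 to 2000 on the customer_id column and order_id as the second partitioning column (names and bounds are placeholders, reusing the connection settings from above):

# Placeholder bounds; precompute them as shown in the min/max example.
min_customer_id, max_customer_id = 1, 1_000_000
min_order_id, max_order_id = 1, 50_000_000

# Read the well-behaved value ranges partitioned by the original column.
regular = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT * FROM orders WHERE customer_id NOT BETWEEN 1000 AND 2000) AS regular_orders",
    column="customer_id",
    lowerBound=min_customer_id,
    upperBound=max_customer_id,
    numPartitions=32,
    properties=connection_properties,
)

# Read the skewed range separately, partitioned by a second column.
skewed = spark.read.jdbc(
    url=jdbc_url,
    table="(SELECT * FROM orders WHERE customer_id BETWEEN 1000 AND 2000) AS skewed_orders",
    column="order_id",
    lowerBound=min_order_id,
    upperBound=max_order_id,
    numPartitions=32,
    properties=connection_properties,
)

orders = regular.unionByName(skewed)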

Performance impact

All in all, partitioning can significantly boost your ingestion processes by keeping the required worker memory low and enabling parallel reads.

The following metrics were taken from two Databricks clusters writing 30 million rows from a PostgreSQL database to a Delta table. The first cluster contains only a single node and does not use partitioning, while the second cluster partitions the data into 32 partitions and uses auto-scaling to scale from 4 to 8 nodes.

Single-node cluster without partitioning
Multi-node cluster with partitioning

On the single-node cluster without partitioning, the ingestion took over an hour, while the multi-node cluster with partitioning reduced the execution time to under 10 minutes.

The partitioned job outperforms the other job by using less memory per worker and achieving higher network throughput. When reading even larger datasets, the non-partitioned approach did not scale and led to out-of-memory exceptions. Ergo, the partitioned approach is the only viable option for very large datasets.
