Spark Concurrent JDBC Data Reads

Gabriel Pintoiu
8 min read · Jan 26, 2023

Have you ever gone through the process of implementing Spark in your project, determining the optimal number of shuffle partitions, memory allocation for driver and executor instances, number of executor cores and all of that fun stuff just to read data from a JDBC source like this?

import os

jdbcDF = spark.read \
.format("jdbc") \
.option("driver", "org.postgresql.Driver") \
.option("url", "jdbc:postgresql:dbserver") \
.option("user", os.environ['user']) \
.option("password", os.environ['pass']) \
.option("query", query) \
.load()

For an engine built for large-scale distributed data processing, Spark’s documentation on JDBC data sources doesn’t say much about why the snippet above is not necessarily the best way to read data from a relational database. That’s why I’m writing this article (and also because I’ve seen Spark queries written like this in projects I was part of).

What’s the problem with the code above? Well…it’s very slow :) If you execute an SQL query in Spark like in the above example, you will use only one thread (you can see in the Spark UI that only one task runs while executing the code) and the created dataframe will have one partition.

Most of us use Spark in ETL/ELT pipelines for parallel processing, so reading data from a JDBC source using only one thread is probably not what we want. Not to mention that we will have to repartition the dataframe after executing the query because the data is now stored in a single partition. So how do we actually read data concurrently in Spark?

# pass the original query to dbtable as a subquery
final_query = f'({query}) as q'

# depends on your use case: how many executor cores you have in the cluster,
# what you plan to do with the dataframe downstream, the data size, etc.
partitionsCount = 100

# I'm hardcoding the date values here, but I'll show below how to identify
# these values at run time if you don't know them upfront
min_date = '2020-01-01'
max_date = '2022-12-31'

jdbcDF = spark.read \
.format("jdbc") \
.option("driver", "org.postgresql.Driver") \
.option("url", "jdbc:postgresql:dbserver") \
.option("user", os.environ['user']) \
.option("password", os.environ['pass']) \
.option("dbtable", final_query) \
.option("numPartitions", partitionsCount) \
.option("partitionColumn", "date") \
.option("lowerBound", f"{min_date}") \
.option("upperBound", f"{max_date}") \
.load()

You can see that the first options are identical, but the last few didn’t appear in the original spark.read() command. Let’s go through each of them:

  1. dbtable:
    Based on the official documentation from Spark, if we want to read data concurrently and partition it based on our needs, we can no longer use the query option and must pass our SQL query through the dbtable option instead. The only difference between these two options (other than the name) is that the query must now be enclosed in parentheses and have an alias — basically pass the original query as a subquery.
  2. numPartitions:
    The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing. (source) I have mine set to 100 in the EMR cluster, but a good practice is to set the number of partitions to between 1 and 4 times the number of executor cores. It’s not a number set in stone though, so always test different values to see what works for you.
  3. partitionColumn:
    This is probably the most important parameter and the one that will make the biggest difference in the run time of your queries. A few things to keep in mind:
    - the column in question must be numeric, date, or timestamp
    - for the best possible performance, the column values must be as evenly distributed as possible and have high cardinality. We want to avoid skewed columns; I’ll expand on this below.
    - if you have multiple columns with the above properties, select the indexed column.
  4. lowerBound, upperBound:
    Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in the table. So all rows in the table will be partitioned and returned. (source) The picture and the short sketch below show how Spark generates each partition’s WHERE clause. This is why you don’t want a low-cardinality column as your partition column; I’ll give an example below in case it doesn’t make sense yet.
Photo by luminousmen on luminousmen.com
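
To make the stride logic concrete, here is a rough sketch of how those per-partition WHERE clauses get built for a numeric column. This is only an approximation of the behaviour described above, not Spark’s actual code (which also handles dates, timestamps, and overflow), but the shape of the output is the part that matters:

# Rough approximation of how Spark turns lowerBound/upperBound/numPartitions
# into one WHERE clause per partition (illustration only, not Spark's source)
def partition_predicates(column, lower, upper, num_partitions):
    stride = (upper - lower) // num_partitions
    predicates = []
    for i in range(num_partitions):
        if i == 0:
            # no lower limit: also catches everything below lowerBound, plus NULLs
            predicates.append(f"{column} < {lower + stride} OR {column} IS NULL")
        elif i == num_partitions - 1:
            # no upper limit: also catches everything at or above the last boundary
            predicates.append(f"{column} >= {lower + i * stride}")
        else:
            predicates.append(
                f"{column} >= {lower + i * stride} AND {column} < {lower + (i + 1) * stride}"
            )
    return predicates

for p in partition_predicates("id", 0, 1000, 4):
    print(p)
# id < 250 OR id IS NULL
# id >= 250 AND id < 500
# id >= 500 AND id < 750
# id >= 750

The detail that matters for the rest of the article: the first and last clauses are open-ended, so every value below lowerBound lands in the first partition and every value at or above the last boundary lands in the last one.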

Low cardinality, high cardinality and why less is not always more

Imagine that the column you want to use for partitioning has only 0 and 1 values — an extreme example indeed, but a common scenario in Data Engineering where boolean values True and False are converted to numeric values.

Because you have only 0s and 1s, you use 0 as lowerBound and 1 as upperBound. Based on how the partitions are generated (see the picture and sketch above), all the rows with the value 0 will be pushed to the first partition (the WHERE clause of the first partition is the only one where 0 is not filtered out) and all the rows with the value 1 will be pushed to the last partition (the WHERE clause of the last partition is the only one where 1 is not filtered out).

So, regardless of how many partitions you create, only two of them will be populated and only two executor cores will do all the work; the others will sit idle if there’s no other job scheduled for them. Because of that, the performance of your query will take a hit.

Data Skewness

Let’s imagine we have a dataset with 20M rows and 30 partitions, with lowerBound and upperBound set to 2020-01-01 and 2022-12-31. For the purpose of this article, I’ll use a Spark Session with 30 executor cores (meaning we can run up to 30 tasks at the same time). Keep in mind that Spark assigns one task per partition, so each partition will be processed by one executor core.

Each year’s worth of data will be divided into 10 partitions. Say the data is skewed: 2020 and 2021 hold 1M rows each, while 2022 holds the remaining 18M. Since 1 task is assigned to 1 partition, each task assigned to 2020 or 2021 will be responsible for processing 100k rows, but each task assigned to 2022 will have to process 1.8M rows. That’s a 1700% increase in the data that a single task must process.
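
In numbers, under that assumed 1M / 1M / 18M split:

# per-task row counts under the assumed yearly split (illustration only)
rows_per_year = {"2020": 1_000_000, "2021": 1_000_000, "2022": 18_000_000}
partitions_per_year = 10  # 30 partitions spread evenly across the 3 years

for year, rows in rows_per_year.items():
    print(year, rows // partitions_per_year, "rows per task")
# 2020 100000 rows per task
# 2021 100000 rows per task
# 2022 1800000 rows per task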

The result is that the first 20 tasks will finish processing their assigned partitions in no time, and after that the 20 executor cores assigned to the completed tasks will sit idle (if there’s no other job where they can be used) until the last 10 tasks finish processing 1700% more records. This is a waste of resources and an increase in operating costs, especially if you run Spark on an EMR cluster, a Glue job, or Databricks.

How do we fix that? The simplest solution: use a different column for partitioning. But if it’s critical to use this column, we can use two queries and two dataframes instead of one. You will still have issues with skewed data downstream (a recurring issue in parallel computational systems) but we tackle one issue at a time.

The first query will load the first two years (the 30 executor cores will be divided between 2020 and 2021), then the second query will load the final year (so all 30 executor cores will now be used for 2022). Because of that, a given task associated with 2022 will now be responsible for processing 600k rows instead of 1.8M rows. That’s a 66% decrease in the rows that must be processed by a single executor core versus the original implementation, and on top of that, we now use all the executor cores from our Spark Session instead of having them sit idle.
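
Here’s a minimal sketch of that two-query idea. I’m assuming the same connection options as in the earlier snippets, a partition column literally named date, and the 2020-2022 bounds from this example; adapt all of those to your own data:

# wrap the original query so the date filter works regardless of whether the
# original query already has a WHERE clause of its own
query_2020_2021 = f"(SELECT * FROM ({query}) t WHERE date < '2022-01-01') as q"
query_2022 = f"(SELECT * FROM ({query}) t WHERE date >= '2022-01-01') as q"

def read_partitioned(dbtable, lower, upper):
    # helper for this sketch; connection options copied from the snippets above
    return spark.read \
        .format("jdbc") \
        .option("driver", "org.postgresql.Driver") \
        .option("url", "jdbc:postgresql:dbserver") \
        .option("user", os.environ['user']) \
        .option("password", os.environ['pass']) \
        .option("dbtable", dbtable) \
        .option("numPartitions", 30) \
        .option("partitionColumn", "date") \
        .option("lowerBound", lower) \
        .option("upperBound", upper) \
        .load()

# 30 cores split between 2020 and 2021, then all 30 cores work on 2022
df_2020_2021 = read_partitioned(query_2020_2021, "2020-01-01", "2021-12-31")
df_2022 = read_partitioned(query_2022, "2022-01-01", "2022-12-31")

jdbcDF = df_2020_2021.unionByName(df_2022)

Note that each subquery does its own date filtering; the bounds only control the stride, so without the WHERE clauses both reads would return the full table.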

What if I don’t have any numeric, date, or timestamp column, or the column has skewed data?

Not a problem. For a Data Engineer, generating a numeric column with a CTE and the row_number() window function should be a piece of cake. Some overhead will be introduced as the new column is generated, but even with this overhead, the final query will execute way faster using the newly created column than it would trying to load all the data into one partition with one executor core.

WITH cte AS
(
    SELECT f1, f2, f3
    FROM table
)
SELECT *, row_number() OVER (ORDER BY f1) AS rn FROM cte

In Python the above syntax would look like this:

original_query = "SELECT f1, f2, f3 FROM table"

final_query = f'(with cte as ({original_query}) select *, row_number() over (order by f1) as rn from cte) as q'
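
To actually read with the generated column, something along these lines should do. The count lookup, the cnt alias, and the t subquery alias are my additions, reusing the connection options from the earlier snippets and the same helper-query trick as the min/max example in the next section:

# rn goes from 1 to the total row count, so grab the count with a small query first
count_query = f"SELECT count(*) AS cnt FROM ({original_query}) t"

row_count = spark.read \
.format("jdbc") \
.option("driver", "org.postgresql.Driver") \
.option("url", "jdbc:postgresql:dbserver") \
.option("user", os.environ['user']) \
.option("password", os.environ['pass']) \
.option("query", count_query) \
.load() \
.first()["cnt"]

jdbcDF = spark.read \
.format("jdbc") \
.option("driver", "org.postgresql.Driver") \
.option("url", "jdbc:postgresql:dbserver") \
.option("user", os.environ['user']) \
.option("password", os.environ['pass']) \
.option("dbtable", final_query) \
.option("numPartitions", partitionsCount) \
.option("partitionColumn", "rn") \
.option("lowerBound", 1) \
.option("upperBound", row_count) \
.load()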

What if I don’t know the lower and upper bounds of the column upfront?

We execute two queries instead of one:
- the first query finds the min and max values of the column we will use as the partitionColumn
- the second query receives those results as its lowerBound and upperBound arguments.

query_min_max = f"SELECT min(date) AS min_date, max(date) AS max_date FROM ({query}) q"
final_query = f"({query}) as q"

df_min_max = spark.read \
.format("jdbc") \
.option("driver", "org.postgresql.Driver") \
.option("url", "jdbc:postgresql:dbserver") \
.option("user", os.environ['user']) \
.option("password", os.environ['pass']) \
.option("query", query_min_max) \
.load()

bounds = df_min_max.first()
min_date = bounds["min_date"]
max_date = bounds["max_date"]

jdbcDF = spark.read \
.format("jdbc") \
.option("driver", "org.postgresql.Driver") \
.option("url", "jdbc:postgresql:dbserver") \
.option("user", os.environ['user']) \
.option("password", os.environ['pass']) \
.option("dbtable", final_query) \
.option("numPartitions", partitionsCount) \
.option("partitionColumn", "date") \
.option("lowerBound", f"{min}") \
.option("upperBound", f"{max}") \
.load()

Conclusions

  • If you go through the process of implementing Spark in your application, try to use it to the fullest and read data concurrently. Passing a SQL query to spark.read() as shown in Spark’s JDBC documentation will push all the data into one partition and use only one executor core, regardless of how many cores you have set up in your Spark Session.
  • If you use Spark today to read data from JDBC sources using the single-threaded approach, try to refactor your queries and improve their performance; you might even get a promotion while you’re at it :)
  • At the end of the day, time is money. If you’re worried about the effort of rewriting your queries, I can speak from experience: some of the Spark queries I’ve refactored using the techniques described above now run >90% faster than before. You can imagine what a positive reception this improvement had in my team.

Thank you for reading this article! I hope you found it informative and helpful. If so, please consider sharing it with other colleagues who may benefit from it.

Gabriel
