Speeding up reading from JDBC through Spark

Alexander Lopatin
9 min read · Sep 2, 2023


Reading data from JDBC sources with Spark can sometimes be really challenging. When your data is stored in a Hadoop cluster and your Spark applications run on YARN in the same cluster, the data doesn't have to travel over the network to reach its destination. When you read from JDBC, however, you have to think carefully about read speed and use every opportunity to improve it. Here I'm going to show you a few tricks you can use to speed up reading from your JDBC sources.

I will run my experiments on Greenplum, an MPP database that performs parallel computation on its own. But because we use Apache Spark, we can take advantage of Greenplum's parallelism only when reading data from or writing data to the database.

There are different Spark connectors for Greenplum developed by a few companies, but the theme of this article is how to speed up reading from any database through a JDBC driver, so I won't use or discuss the special connectors. Because Greenplum is based on the open-source Postgres database, we can use the Postgres JDBC driver.
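
If you manage dependencies with sbt, adding the driver can look something like this (the version number here is only an example; pick the latest one available):

// build.sbt — the driver version is an assumption, use whatever is current
libraryDependencies += "org.postgresql" % "postgresql" % "42.6.0"

Alternatively, the driver jar can be passed to spark-submit with the --jars option.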

Source table

We will use a table of clients. The whole table contains about 59 million rows and is about 1.5 GB in size.

We don't need all rows from this table; we will select only the rows that were active in a given month. For simplicity, let's use this query to select data from our table:

SELECT
    *
FROM
    schema.client
WHERE
    date('2023-08-01') < dt_to
    AND date('2023-08-31') >= dt_from;

This query returns about 23 million rows.

First try

So, we know our source. Our goal is to read the data from the Greenplum table and save it into a Hive table.

For this, we write a simple Spark application.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

object Main extends App {

  val spark =
    SparkSession
      .builder()
      .enableHiveSupport()
      .getOrCreate()

  spark.sparkContext.setLogLevel("WARN")

  import spark.implicits._

  val connectionOptions = Map(
    "url" -> "jdbc:postgresql://localhost/dwh",
    "driver" -> "org.postgresql.Driver",
    "user" -> "user",
    "password" -> "password"
  )

  val dataDF =
    spark
      .read
      .format("jdbc")
      .options(connectionOptions)
      .option("dbtable", "schema.client")
      .load()
      .where(lit("2023-08-01") < $"dt_to" && lit("2023-08-31") >= $"dt_from")

  val dataDFFinal =
    dataDF
      .withColumn("loading_dt_part", lit("20230831"))
      .cache()

  dataDFFinal
    .write
    .mode("overwrite")
    .insertInto("schema.client")
}

Here, we use org.postgresql.Driver to read data through the Postgres JDBC driver in Spark. This application reads and writes the whole result of the query in about 1 to 1.5 hours.

DAG of read/write JDBC table by Spark

What's the problem with this application? As you can see in the screenshot above, the whole work was done by a single task. But Spark is all about parallel computation. There are settings that allow us to read from JDBC in several tasks. Let's look at them.

Second try

We need to make a few changes.

val partitionOptions = Map(
  "partitionColumn" -> "id_client",
  "lowerBound" -> "0",
  "upperBound" -> "240112247",
  "numPartitions" -> "5"
)

val dataDF =
  spark
    .read
    .format("jdbc")
    .options(connectionOptions)
    .option("dbtable", "schema.client")
    .options(partitionOptions)
    .load()
    .where(lit("2023-08-01") < $"dt_to" && lit("2023-08-31") >= $"dt_from")

val dataDFFinal =
  dataDF
    .withColumn("loading_dt_part", lit("20230831"))
    .cache()

Here we declare partitionOptions to tell Spark to read the data from JDBC in several tasks.

It's worth mentioning that you need to choose the values for lowerBound and upperBound carefully.

How does Spark partition reads from JDBC?

Spark takes the settings you provide and generates several queries.

Let’s assume that you provide Spark with these partition options:

val partitionOptions = Map(
  "partitionColumn" -> "id_client",
  "lowerBound" -> "0",
  "upperBound" -> "10",
  "numPartitions" -> "5"
)

In this case, Spark creates queries with the following conditions:

SELECT * FROM schema.client WHERE id_client < 2 or id_client is null;
SELECT * FROM schema.client WHERE id_client >= 2 AND id_client < 4;
SELECT * FROM schema.client WHERE id_client >= 4 AND id_client < 6;
SELECT * FROM schema.client WHERE id_client >= 6 AND id_client < 8;
SELECT * FROM schema.client WHERE id_client >= 8;

Spark computes the stride as (upperBound - lowerBound) / numPartitions = (10 - 0) / 5 = 2 and generates the bounds for each partition: 2, 4, 6, 8.
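
To make this concrete, here is a minimal Scala sketch of that logic (an illustration only, not Spark's actual implementation) that reproduces the five conditions listed above:

// Minimal sketch of how the partition predicates are built (illustrative only).
def partitionPredicates(column: String, lower: Long, upper: Long, numPartitions: Int): Seq[String] = {
  val stride = (upper - lower) / numPartitions          // (10 - 0) / 5 = 2
  (0 until numPartitions).map { i =>
    val lo = lower + i * stride
    val hi = lo + stride
    if (i == 0) s"$column < $hi OR $column IS NULL"      // first partition also picks up NULLs
    else if (i == numPartitions - 1) s"$column >= $lo"   // last partition is open-ended
    else s"$column >= $lo AND $column < $hi"
  }
}

// partitionPredicates("id_client", 0, 10, 5) produces the five WHERE conditions shown above.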

Also, you need to be careful with the number of partitions you use for reading from a JDBC source, because your administrator may limit the number of simultaneous connections per user. In that case you will get an error in your Spark application, and the application will fail.

Too many connections

In my case, the administrator limits the number of connections per user to 10, I already had other connections open in DBeaver, and I wanted another 10 connections from the Spark application at the same time.

How do we find proper values for lowerBound and upperBound?
We can just execute a simple query before starting our read from JDBC.

val dataDF =
  spark
    .read
    .format("jdbc")
    .options(connectionOptions)
    .option("query", "SELECT min(id_client) AS min_id, max(id_client) AS max_id FROM schema.client")
    .load()

Then we just need to collect the result of this query and use it in the partition options for the partitioned read.
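
For example, something like this (a sketch; I assume id_client is a bigint, so getLong works — adjust to your column type):

// Collect the single row with the min and max ids and reuse them as bounds.
// Assumes id_client is a bigint; adjust getLong if your column type differs.
val bounds = dataDF.head()
val minId = bounds.getLong(0)
val maxId = bounds.getLong(1)

val partitionOptions = Map(
  "partitionColumn" -> "id_client",
  "lowerBound" -> minId.toString,
  "upperBound" -> maxId.toString,
  "numPartitions" -> "5"
)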

So let's come back to our second attempt to read data from the Greenplum table. This time, the whole table was read in about 7 to 7.5 minutes.

You might be wondering why increasing the number of tasks to 5 resulted in a speed increase of more than 5 times. I noticed that the more data we try to read in one task, the slower the read becomes. For example, if we need to read 20 million rows with one task, Spark may read the first 5 million rows in 5 minutes, the second 5 million rows in 15 minutes, and spend another 40 minutes on the last 10 million rows. These numbers are theoretical, just an example to illustrate what I observed when I tried to read the whole table with a single task.

So, we got a significant increase in reading speed. Awesome, right? What if I told you that we can speed up our reading even more?

Third try

What's the problem with our previous attempt?

Let's look at the distribution of ids in our table. For this, we need to execute the following query:

SELECT
    floor(id_client / 48022450) * 48022450 AS interval_start,
    floor(id_client / 48022450) * 48022450 + 48022450 - 1 AS interval_end,
    count(*)
FROM
    schema.client
WHERE
    date('2023-08-01') <= dt_to
    AND date('2023-08-31') >= dt_from
GROUP BY
    floor(id_client / 48022450)
ORDER BY
    floor(id_client / 48022450)

The result of this query for my table is:

As we figured out earlier, Spark creates a few queries with different conditions to select data from JDBC in parallel. What will we get in our situation? Spark will read one big partition and four small ones. We get a bottleneck here: the whole application will work at the speed of reading the one big partition.

How can we resolve this problem? By finding or creating a column with an even distribution.

Let's try to find such a column. Because Greenplum is an MPP database, it stores data in several segments and has to distribute the data across those segments.

For each row stored in Greenplum we can find out which segment stores it: the gp_segment_id attribute provides this information to the user.

Let's check the distribution by this attribute:

SELECT
    floor(gp_segment_id / 8) * 8 AS interval_start,
    floor(gp_segment_id / 8) * 8 + 8 - 1 AS interval_end,
    count(*)
FROM
    schema.client
WHERE
    date('2023-08-01') <= dt_to
    AND date('2023-08-31') >= dt_from
GROUP BY
    floor(gp_segment_id / 8)
ORDER BY
    floor(gp_segment_id / 8)

Much better, right?

Let's just change partitionColumn and upperBound in our application.

val partitionOptions = Map(
  "partitionColumn" -> "gp_segment_id",
  "lowerBound" -> "0",
  "upperBound" -> "40",
  "numPartitions" -> "5"
)

But in this case I got an error:

User class threw exception: org.apache.spark.sql.AnalysisException: user-defined partition column gp_segment_id not found in the JDBC relation

Hmm, that's disappointing. The problem is that gp_segment_id doesn't exist as a column in our table; it is generated automatically by Greenplum when we send a query to the Greenplum master.

So, we have to help Spark get this column: we have to make it so that Spark knows about the column after Greenplum generates it for us. My first thought was to create a view with the gp_segment_id column. It worked great, until I realized that I might not have permission to create views in the source system.

I tried this:

val dataDF =
  spark
    .read
    .format("jdbc")
    .options(connectionOptions)
    .option("query", "SELECT gp_segment_id, * FROM schema.client")
    .options(partitionOptions)
    .load()
    .where(lit("2023-08-01") < $"dt_to" && lit("2023-08-31") >= $"dt_from")

And got another error:

User class threw exception: java.lang.IllegalArgumentException: requirement failed: Options 'query' and 'partitionColumn' can not be specified together. Please define the query using `dbtable` option instead and make sure to qualify the partition columns using the supplied subquery alias to resolve any ambiguity.
Example:
spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("dbtable", "(select c1, c2 from t1) as subq")
.option("partitionColumn", "c1") .option("lowerBound", "1")
.option("upperBound", "100")
.option("numPartitions", "3")
.load()

Spark even gives an example of how we should do it. Importantly, we have to provide an alias for our query in the dbtable option; without an alias, queries in dbtable don't work.

My final version was:

val partitionOptions = Map(
  "dbtable" -> "(SELECT gp_segment_id, * FROM schema.client) as client",
  "partitionColumn" -> "gp_segment_id",
  "lowerBound" -> "0",
  "upperBound" -> "10",
  "numPartitions" -> "5"
)

We must not forget to drop the new gp_segment_id column before saving the table into Hive.
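
Something like this should do it (a sketch based on the dataDFFinal definition from the first version of the application):

// Drop the helper column before writing, otherwise the schema won't match the Hive table.
val dataDFFinal =
  dataDF
    .drop("gp_segment_id")
    .withColumn("loading_dt_part", lit("20230831"))
    .cache()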

This time, it took about 2.5 to 3 minutes to read the table.

Much better. I think this is a really great result. Do you remember that we started at 1.5 hours?

You might say: we are very happy for you, but we don't have a gp_segment_id column because we work with regular Postgres.

Then let's just create a new column that helps us get a better distribution. I thought about how Greenplum distributes rows across segments: it uses hash functions. Why can't we do the same?

First, let's check the distribution of the hashed ids.

SELECT
    floor(abs(hashtext(CAST(id_client AS TEXT))) / 429496724) * 429496724 AS interval_start,
    floor(abs(hashtext(CAST(id_client AS TEXT))) / 429496724) * 429496724 + 429496724 - 1 AS interval_end,
    count(*)
FROM
    schema.client
WHERE
    date('2023-08-01') <= dt_to
    AND date('2023-08-31') >= dt_from
GROUP BY
    floor(abs(hashtext(CAST(id_client AS TEXT))) / 429496724)
ORDER BY
    floor(abs(hashtext(CAST(id_client AS TEXT))) / 429496724)

Looks great!

Here's the version of the code for this situation:

val partitionOptions = Map(
  "dbtable" -> "(SELECT abs(hashtext(CAST(id_client AS TEXT))) AS id_client_hash, * FROM schema.client) as client",
  "partitionColumn" -> "id_client_hash",
  "lowerBound" -> "0",
  "upperBound" -> "214748368",
  "numPartitions" -> "5"
)

And the result:

We're still in the saddle. Cool! The execution time is not much worse than when we used gp_segment_id.

Of course, if we use a regular relational database and don't have the advantages of an MPP database like Greenplum, we may get worse results than I got with Greenplum. But all of these recommendations apply to any JDBC source. By finding or creating the right, evenly distributed column, we can speed up our work with JDBC sources in a Spark application. The main bottleneck here is the data transfer rate over the network or between different storage systems; our goal is to find a way to divide the data into smaller batches so that we can read them in parallel.

