Faster extract and load of ETL jobs in Apache Spark

Sairaman Kumar
6 min read · Jun 21, 2020


If you have been working with Apache Spark and have looked at the Spark UI or the Spark history server, you would know that the time taken to read and write data is usually far greater than the time spent transforming it. Let us try a few small changes (some are quite complex 😅) to the code that improve the performance drastically.

Almost all database drivers have the settings mentioned below. For the sake of example, let us consider a simple products table with the following schema.

The table has three columns, as seen in the ER diagram. The product_id column is an auto-increment key that is always unique. The product_category column is used as the DB partition. There can be other columns, which we can safely ignore for this post.
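For readers who cannot see the diagram, here is a rough sketch of the schema as a Scala case class; only product_id and product_category come from the description above, the third field is just a placeholder for the columns we ignore.

case class Product(
  product_id: Long,          // auto-increment primary key, always unique and never null
  product_category: String,  // also used as the DB partition column
  product_name: String       // placeholder for the other columns we safely ignore
)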

Basic read and write in Spark

So let us start with the task of reading from a table and writing to another table without doing any transformation. The code for that would look like this:

import java.util.Properties

import org.apache.spark.sql.SaveMode

val readUrl = "jdbc:mysql://127.0.0.1:3306/sourcedb"
val readProperties = new Properties()
readProperties.put("user", "user")
readProperties.put("password", "passw0rD")

val writeUrl = "jdbc:mysql://127.0.0.1:3307/destinationdb"
val writeProperties = new Properties()
writeProperties.put("user", "user")
writeProperties.put("password", "pa$$w0rD")

spark.read.jdbc(readUrl, "products", readProperties)
  .write.mode(SaveMode.Append)
  .jdbc(writeUrl, "products", writeProperties)

Yes, as you might expect, we can do extract and load in a single line of code in Spark. The problem with the above code is that the number of tasks created for the write defaults to one when no partitioning is specified, and the read also happens from that same task. What this means is that Spark is not doing what it is intended to do: its parallelism vanishes and everything runs on a single thread. Let's try some optimizations that might help.
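You can confirm this yourself before tuning anything. A minimal sketch, assuming the same readUrl and readProperties from the snippet above: the number of partitions of the resulting DataFrame tells you how many read tasks Spark will schedule.

val df = spark.read.jdbc(readUrl, "products", readProperties)
// With no partitioning options, this prints 1: a single task does the entire read.
println(df.rdd.getNumPartitions)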

Use an auto-increment key as the partition column

You might think that setting numPartitions for the read would help. Yes, that's the same parameter I'm going to suggest, but the key thing to note is the column selected. You also need to set the lowerBound and upperBound of the partitionColumn for numPartitions to take effect. Let us look into the official Spark documentation to understand what these different parameters mean.

Descriptions of partitionColumn, lowerBound, upperBound and numPartitions, taken from the official Spark documentation.

Please note that lowerBound, upperBound and numPartitions are used to create strides that become the conditions each task uses while reading. All the values lower than lowerBound are read together in the first task, and similarly all the values greater than upperBound are read together in the last task; the bounds decide the stride, they do not filter rows out.
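To make the stride idea concrete, here is a rough sketch of how the range is split, using the bounds from the snippets later in this post (the exact arithmetic lives inside Spark's JDBC partitioning logic and differs slightly at the edges):

val lowerBound    = 1L
val upperBound    = 3000000L
val numPartitions = 30
val stride = (upperBound - lowerBound) / numPartitions // roughly 100,000 ids per task

// Task 1  : product_id <  lowerBound + stride           (plus everything below lowerBound, including NULLs)
// Task 2  : lowerBound + stride <= product_id < lowerBound + 2 * stride
// ...
// Task 30 : product_id >= lowerBound + 29 * stride      (plus everything above upperBound)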

Let’s take the same products table here. There are two possible candidates for partitionColumn and let us see how they differ.

  • product_category — This column might look like an ideal candidate for the read partitionColumn since the database is also partitioned on the same column. It would be an ideal candidate if the data were spread equally across all the product_category values. Often, though, the data is skewed heavily towards one or two categories. What really happens because of this data skew is that Spark's parallelism again drops, and the read from one partition often runs for hours while the others complete in minutes. Below is one such example taken from our test database, where the time taken by the largest task was as high as 15 times that of the smallest.
  • product_id — At first this column might not look ideal, but the advantage it has over the other columns is that it is unique and never null. There are a lot of articles out there that suggest creating a salted column for this purpose, but you can simply use an existing primary key if one exists. In that case you can set lowerBound to the minimum of the primary key and upperBound to its maximum, and Spark will divide the range into equal strides. These values can generally be retrieved with a simple JDBC query and passed on to the Spark job (see the sketch after the stage summary below).
Stage summary where we read 42 million records in 60 parallel tasks

As you can see from the above image, the time difference between the fastest and slowest tasks is pretty small, which is exactly what we want. This also ensures that we don't struggle with straggler tasks that keep the job running forever.
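As mentioned above, the bounds can usually be fetched with a plain JDBC query before kicking off the Spark read. A minimal sketch, assuming the same readUrl and readProperties as before; the query itself is illustrative, not part of the original job.

import java.sql.DriverManager

// Fetch min and max of the primary key so lowerBound/upperBound always match the data.
val conn = DriverManager.getConnection(readUrl, readProperties)
val rs = conn.createStatement()
  .executeQuery("SELECT MIN(product_id), MAX(product_id) FROM products")
rs.next()
val minId = rs.getLong(1) // use as lowerBound
val maxId = rs.getLong(2) // use as upperBound
conn.close()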

The code after this change will look like the following snippet:

spark.read
  .jdbc(readUrl, "products", columnName = "product_id",
    lowerBound = 1L, upperBound = 3000000L, numPartitions = 30,
    connectionProperties = readProperties)
  .write.mode(SaveMode.Append)
  .jdbc(writeUrl, "products", writeProperties)

Now, after these changes, we have successfully increased the number of tasks and also managed to run those tasks in parallel without worrying about stragglers.

Using the fetchsize option in Spark read

After adding the partition column our job ran much faster than before, but we still had a problem. A job would take around an hour to complete on Spark on Kubernetes, yet it completed in less than two minutes when we ran the same Spark job from the database server in standalone mode. We were puzzled by this behavior.

Finally, while going through the Spark documentation, we found an option named fetchsize that can be modified. We started to look at what exactly this parameter is and what its default value is. It turns out the default is driver-specific; for the IBM DB2 driver it is 32, so you might need to look into your driver settings to find the default value for yours.

Task summary of the same stage with different fetchsize

As you can see from the above image, reading from the same source with all other parameters kept the same became about 7 times faster just by increasing the fetchsize. You might wonder why not set it to a very high value; the problem with that approach is that all the data fetched in a single JDBC call is held in heap space. So it is quite possible to fill the JVM heap and get an out-of-memory exception.

spark.read.option("fetchsize", "10000").jdbc( readUrl, "products", "product_id", lowerBound=1, upperBound=3000000, numPartitions=30, readProperties ).write.mode(SaveMode.Append).option("batchsize","100000").jdbc( writeUrl,"products",writeProperties)

Similar to fetchsize for read, there is a batchsize for write which can be tuned in the same way.

Repartition data in Spark just before writing

Whatever processing you do in Spark, you will eventually need to write the data back into some sort of storage system. That storage system can be object storage in the cloud, network-attached storage, or a database system.

Just before you write the data, try to repartition it. This way you create equal-sized tasks for the executors to work on, and it is quite likely they will complete all the tasks in a particular stage at roughly the same time while consuming the same amount of resources.
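A quick way to sanity-check the balance before the expensive write is to count the rows per partition. This is a sketch: the DataFrame name df and the factor of 60 are illustrative, not part of the original job.

import org.apache.spark.sql.functions.spark_partition_id

val balanced = df.repartition(60)
// Roughly equal counts per partition means roughly equal-sized write tasks.
balanced.groupBy(spark_partition_id().alias("partition")).count().show(60)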

Writing the data after repartitioning created tasks that are almost equal in size, and all of them completed in almost the same time.

The Spark code to read and write after all the optimizations will look like the code snippet below.

spark.read
  .option("fetchsize", "10000")
  .jdbc(readUrl, "products", columnName = "product_id",
    lowerBound = 1L, upperBound = 3000000L, numPartitions = 30,
    connectionProperties = readProperties)
  .repartition(60)
  .write.mode(SaveMode.Append)
  .option("batchsize", "100000")
  .jdbc(writeUrl, "products", writeProperties)

Also note that all the Spark jobs ran on a Kubernetes cluster in a cloud environment hosted across different geographical zones within a single region. So your mileage may vary based on the network communication in your cluster and other factors. Feel free to comment with any changes or improvements.

