Using tail recursion to read a dynamic list of JDBC tables in Spark

Alexander Lopatin
3 min read · Jul 23, 2022


Sometimes you don’t know in advance exactly how many sources your DataFrame will use.

For text file formats this is not a problem. You can just tell Spark to load JSON from a folder and it will read all of the JSON files in that folder, no matter how many there are.
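For example, assuming a SparkSession named spark is already in scope and the folder path is just a placeholder:

```scala
// Spark picks up every JSON file under the folder; the path is illustrative.
val eventsDF = spark.read.json("/data/events/")
```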

But what about JDBC?

For example, you can start with only one table:
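A minimal sketch of such a read (again assuming a SparkSession named spark; the URL, credentials and table name are placeholders):

```scala
// Reading a single Postgres table over JDBC; all connection values are placeholders.
val citizensDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "public.city1_citizens")
  .option("user", "user")
  .option("password", "password")
  .option("driver", "org.postgresql.Driver")
  .load()
```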

But then you may find that you need to add data from another table to the same DataFrame. For example, there can be different corporate systems that gather similar data: a mobile app, a CRM, a web site. Each system writes its own data to its own table.

At the beginning of the Spark project you plan to connect only one system to your DataFrame, but you know that more sources will be needed. Of course, you don’t want to re-deploy your project every time you need to add one more table.

To solve this problem, I used tail recursion. My idea is to use a configuration library to specify a list of sources and feed this list into a tail-recursive function.

Let’s do some code.

I will use TypeSafe Config as a configuration library.

We need to create an application.conf file in our resources folder:
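Something along these lines (the source keys, table names and connection settings below are placeholders; the repository linked at the end has the actual file):

```hocon
jdbc {
  url = "jdbc:postgresql://localhost:5432/mydb"
  user = "user"
  password = "password"
}

sources {
  city1 = "public.city1_citizens"
  city2 = "public.city2_citizens"
  city3 = "public.city3_citizens"
}
```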

Here, I declare 3 sources in the sources section.

Let’s write an implicit class to get our config as a Map:
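A sketch of how this helper can look (the names Helpers, ConfigHelper and getMap follow the article’s description; the exact implementation in the repository may differ slightly):

```scala
import com.typesafe.config.Config
import scala.jdk.CollectionConverters._ // for Scala 2.12 use scala.collection.JavaConverters._

object Helpers {
  implicit class ConfigHelper(config: Config) {
    // Turns a flat config section such as `sources { ... }` into a Map of key -> value.
    def getMap(path: String): Map[String, String] =
      config.getConfig(path)
        .entrySet().asScala
        .map(entry => entry.getKey -> entry.getValue.unwrapped().toString)
        .toMap
  }
}
```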

I want to be able to use my getMap function as a method of the Config type. To do this, I create an object Helpers and put an implicit class ConfigHelper in it. Inside ConfigHelper I define my getMap method. Now I can call getMap on any Config value.

We have written the code for getting the list of sources from our configuration file. It’s time to write our tailrec function.
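Below is a sketch of that function, reconstructed from the description that follows. The wrapping Reader object, the connection options and the exact signatures are my assumptions; the repository linked at the end contains the author’s version.

```scala
import scala.annotation.tailrec
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.StructType

object Reader {

  // Entry point: builds an empty accumulator DataFrame and starts the recursion.
  def unionDF(sources: Map[String, String], schema: StructType)(implicit spark: SparkSession): DataFrame = {
    val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
    union(sources.iterator, emptyDF)
  }

  // Tail-recursive step: read the next table, union it into the accumulator, recurse.
  @tailrec
  private def union(sources: Iterator[(String, String)], acc: DataFrame)(implicit spark: SparkSession): DataFrame =
    if (sources.hasNext) {
      val (_, table) = sources.next()
      union(sources, acc.union(readTableDF(table)))
    } else acc

  // Reads a single JDBC table. Connection options are placeholders; in a real job
  // you would also configure partitioning options here.
  private def readTableDF(table: String)(implicit spark: SparkSession): DataFrame =
    spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost:5432/mydb")
      .option("dbtable", table)
      .option("user", "user")
      .option("password", "password")
      .option("driver", "org.postgresql.Driver")
      .load()
}
```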

I have three functions here:

  1. unionDF is the entry point for our tailrec. We can do some preparation here before calling the tailrec function. unionDF accepts two parameters: the sources and the schema of our future DataFrame. It creates an empty DataFrame as the starting point for our accumulator and calls another function, simply named union.
  2. union goes through an iterator of (String, String) tuples. While the iterator has a next value, it calls readTableDF, unions the result with the accumulator and calls itself. When the tuples run out, the function returns the accumulated result.
  3. readTableDF loads a JDBC table into a Spark DataFrame. I recommend always using partitioning to parallelize read and write operations; in this example I have omitted it.

So, we have everything ready to read our sources and change the list of them through configuration files, without touching the project code.

I generated some test data with an online data generator and loaded it into my Postgres database.

Let’s assume that we have some information about the citizens of several cities. The information for each city is stored in a separate table. I generated info about three cities.

Earlier, you saw the sources section in the application.conf file. Now I can add or remove any number of sources in this config file, and my Spark pipeline will load all of these tables from JDBC without any need to edit the Spark code.

Here is the code of my main function:
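A sketch of what the main function can look like (the schema columns and the final aggregation are illustrative assumptions; the repository contains the author’s actual version):

```scala
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import Helpers._

object Main extends App {
  implicit val spark: SparkSession = SparkSession.builder()
    .appName("tailrec-spark-dynamic-sources-jdbc")
    .master("local[*]")
    .getOrCreate()

  // Read the list of sources from application.conf using the getMap helper.
  val sources: Map[String, String] = ConfigFactory.load().getMap("sources")

  // Hypothetical schema for the citizen tables.
  val schema = StructType(Seq(
    StructField("name", StringType),
    StructField("age", IntegerType)
  ))

  // Union all configured JDBC tables into one DataFrame.
  val citizensDF = Reader.unionDF(sources, schema)

  // Illustrative calculation on the unioned data.
  citizensDF.groupBy("age").count().show()
}
```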

I added some logic and did some calculations with my data. Running the program prints the result for all three sources combined.

By simply adding a new source to my application.conf, the pipeline picks up the extra table and the result changes accordingly:
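For example, a fourth entry (a hypothetical table name) is all it takes:

```hocon
sources {
  city1 = "public.city1_citizens"
  city2 = "public.city2_citizens"
  city3 = "public.city3_citizens"
  city4 = "public.city4_citizens"  # new source, no code change required
}
```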

Here you can find the full code of the example project from this article: https://github.com/aalopatin/tailrec-spark-dynamic-sources-jdbc.git

When I started learning Scala, I was skeptical about the language. I wanted to code in Java, but my job required me to use Scala.

But now, things like tailrec, which I really like, have started to change my mind about Scala and functional programming. I can say that the functional programming style is a good fit for Big Data and Spark tasks.
