Tips for using JDBC in Apache Spark SQL

Radek Strnad
4 min read · Oct 8, 2017


Using Spark SQL together with JDBC data sources is great for fast prototyping on existing datasets. It is also handy when the results of a computation need to integrate with legacy systems. However, not everything is simple and straightforward: Spark has several quirks and limitations that you should be aware of when dealing with JDBC.

Disclaimer: This article is based on Apache Spark 2.2.0 and your experience may vary.

1. No update operations

When writing data to a table, you can either:

  • Rewrite the whole table (SaveMode.Overwrite). Depending on the database driver, some tables are emptied with the TRUNCATE TABLE command, while others are dropped and completely recreated, losing indices and foreign keys in the process. See https://issues.apache.org/jira/browse/SPARK-16463
  • Append data to the existing table without conflicting with primary keys / indexes (SaveMode.Append)
  • Ignore any conflict (even an existing table) and skip writing (SaveMode.Ignore)
  • Create a table with the data, or throw an error if the table already exists (SaveMode.ErrorIfExists)
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

ds.write
  .mode(SaveMode.Append)
  .option("numPartitions", 8)
  .option(JDBCOptions.JDBC_DRIVER_CLASS, "org.postgresql.Driver")
  .jdbc(url, dbTable, connectionProperties)

If you must update just a few records in the table, you should consider either loading the whole table and writing it back with Overwrite mode, or writing to a temporary table and chaining a trigger that performs an upsert into the original one (a sketch follows below).
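For illustration, here is a minimal sketch of the temporary-table variant, assuming a Dataset named updates and a staging table called pets_staging (both names are made up); the trigger that performs the upsert into the original table has to be defined on the database side and is not shown:

import org.apache.spark.sql.SaveMode

// Overwrite the staging table on every run; a database-side trigger (or a scheduled
// MERGE / INSERT ... ON CONFLICT) then upserts these rows into the real table.
updates.write
  .mode(SaveMode.Overwrite)
  .jdbc(url, "pets_staging", connectionProperties)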

This makes it quite inconvenient for Spark to coexist with other systems that use the same tables, and you should keep it in mind when designing your application.

2. Auto increment primary keys

If you overwrite or append the table data and your DB driver supports TRUNCATE TABLE, everything works out of the box. All you need to do is omit the auto-increment primary key from your Dataset[_].

Things get more complicated when tables with foreign key constraints are involved. In this case the IDs have to be generated before writing to the database. Luckily, Spark has a function that generates monotonically increasing, unique 64-bit numbers.

import org.apache.spark.sql.functions.monotonically_increasing_id
df.withColumn("id", monotonically_increasing_id())

The generated ID, however, is consecutive only within a single data partition, meaning the IDs can be literally all over the place: they can collide with data inserted into the table in the future, or they can limit the number of records that can still be safely saved with an auto-increment counter. There is a way to get a truly monotonic, increasing, unique and consecutive sequence of numbers across partitions, in exchange for a performance penalty, which is outside the scope of this article.
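For completeness, here is a minimal sketch of one such approach, assuming a DataFrame df, an active spark session and a made-up startId offset (for example the current maximum ID in the target table plus one); zipWithIndex triggers an extra job over the data, which is where the performance penalty comes from:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical offset, e.g. the current MAX(id) in the target table plus one
val startId: Long = 1000L

// zipWithIndex assigns a consecutive, 0-based index across all partitions
val rowsWithId = df.rdd.zipWithIndex.map { case (row, idx) =>
  Row.fromSeq((startId + idx) +: row.toSeq)
}

val schema = StructType(StructField("id", LongType, nullable = false) +: df.schema.fields)
val dfWithIds = spark.createDataFrame(rowsWithId, schema)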

3. Limits are not pushed down to JDBC

As you may know, the Spark SQL engine optimizes the amount of data read from the database by pushing down filter restrictions, column selection, etc. Naturally, you would expect that running ds.take(10) pushes a LIMIT 10 query down to the database. That's not the case: some predicate push-downs are not implemented yet, and Spark reads the whole table before internally taking only the first 10 records. In fact, only simple conditions are pushed down. You can track the progress at https://issues.apache.org/jira/browse/SPARK-10899. This limitation is especially painful with large datasets.

As always, there is a workaround: specify the SQL query directly instead of letting Spark work it out.

spark.read.format("jdbc")
  .option("url", url)
  .option("dbtable", "(SELECT * FROM pets LIMIT 10) AS t")
  .option("user", user)
  .option("password", password)
  .load()

4. Parallel read / write

Spark is a massively parallel computation system that can run on many nodes, processing hundreds of partitions at a time. Traditional SQL databases, unfortunately, are not. The level of parallelism for reads and writes is controlled by appending the following option to a read / write action: .option("numPartitions", parallelismLevel). The specified number controls the maximal number of concurrent JDBC connections. By default, reads go into a single partition, which usually doesn't fully utilize your SQL database. Writes, on the other hand, default to the number of partitions of your output dataset, which can hammer your database and hurt performance. Careful selection of numPartitions is a must.

Fine tuning adds another variable to the equation: available node memory. If numPartitions is lower than the number of partitions in the output dataset, Spark runs coalesce on those partitions; the sum of their sizes can potentially be bigger than the memory of a single node, resulting in a node failure. A sketch of an explicit write setup follows below.
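As a minimal sketch (the value 8 and the names ds, url, dbTable and connectionProperties are only illustrative), repartitioning explicitly before the write keeps partition sizes predictable instead of relying on the implicit coalesce:

import org.apache.spark.sql.SaveMode

ds.repartition(8)                        // match the write parallelism explicitly
  .write
  .mode(SaveMode.Append)
  .option("numPartitions", 8)            // at most 8 concurrent JDBC connections
  .jdbc(url, dbTable, connectionProperties)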

5. Partitioning per column values

Sometimes you might want the data read over JDBC to be partitioned by a certain column. In the previous tip you learned how to control the number of partitions. If you add the following extra parameters (you have to add all of them), Spark will partition the data by the desired numeric column:

  • partitionColumn — name of a numeric column of the table in question
  • lowerBound — lower bound used to compute the partition stride; it does not filter rows, values below it simply end up in the first partition
  • upperBound — upper bound used to compute the partition stride; values above it end up in the last partition
spark.read.format("jdbc")
  .option("url", url)
  .option("dbtable", "pets")
  .option("user", user)
  .option("password", password)
  .option("numPartitions", 10)
  .option("partitionColumn", "owner_id")
  .option("lowerBound", 1)
  .option("upperBound", 10000)
  .load()

This will result in parallel queries like:

SELECT * FROM pets WHERE owner_id >= 1 and owner_id < 1000
SELECT * FROM pets WHERE owner_id >= 1000 and owner_id < 2000
SELECT * FROM pets WHERE owner_id >= 2000 and owner_id < 3000
...

Be careful when combining this tip with the subquery trick from tip #3. It might result in queries like:

SELECT * FROM (SELECT * FROM pets LIMIT 100) WHERE owner_id >= 1000 and owner_id < 2000

6. Timezone shifts

The last but not least tip is based on my observation of timestamps being shifted by my local timezone offset when reading from PostgreSQL. I didn't dig deep into this one, so I don't know exactly whether it's caused by PostgreSQL, the JDBC driver or Spark. Maybe someone will shed some light in the comments. However, if you run into a similar problem, default to the UTC timezone by adding the following JVM parameter:

-Duser.timezone=UTC
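If you submit your application with spark-submit, one way to apply it to both the driver and the executors (assuming those properties carry no other JVM options in your setup) is:

spark-submit \
  --conf "spark.driver.extraJavaOptions=-Duser.timezone=UTC" \
  --conf "spark.executor.extraJavaOptions=-Duser.timezone=UTC" \
  ...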
