Importing data from PostgreSQL with Spark and comparing joins between Parquet, Hive and ORC

Giacomo Veneri · Published in digitalindustry · Feb 22, 2019

Originally published at jugsi.blogspot.com.

I have a small application managing 200 nodes over the internet, backed by a database with two important tables:

  • action (the command table) and
  • the log of each action (the log table)

For every action I receive one or more replies from the different nodes. Unfortunately, I have to prune my PostgreSQL database every 3 hours to keep the log table below 60,000 rows and avoid degrading performance. As a result, I lose my precious logs every 3 hours.

PostgreSQL partitioning would be a viable solution, but I preferred a different approach. To keep my logs I decided to use Spark + Hadoop HDFS, and I tested ORC vs Hive vs Parquet.

  • Why ORC? ORC has good compression, so it should be well suited for archiving
  • Why Hive? Hive tables are natively supported by Spark
  • Why Parquet? Parquet is regarded as one of the best HDFS-based columnar storage formats

Environment

My QA environment is small: a three-node Hadoop cluster with Spark running on YARN.
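
The snippets below are meant to be pasted into an interactive spark-shell session; for the JDBC import to work, the PostgreSQL driver has to be on the classpath. A minimal sketch of how the shell could be launched (the driver version and YARN options here are just an illustration, not my exact setup):

spark-shell --master yarn --deploy-mode client --packages org.postgresql:postgresql:42.2.5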

Importing data

Here is my simple code to import the data from PostgreSQL from scratch:

// read the two tables from PostgreSQL over JDBC (spark-shell provides the SparkSession as spark)
val log = spark.read.format("jdbc").option("url","jdbc:postgresql://mycorporate.com:5432/mydb").option("driver","org.postgresql.Driver").option("dbtable","log").option("user","usr").option("password","pwd").load()
// persist the log table as Parquet, as a Hive table, and as ORC (zlib-compressed; ORC does not support gzip)
log.write.format("parquet").save("log.parquet")
log.write.option("path", "/data/home/hive").saveAsTable("log")
log.write.format("orc").option("compression","zlib").save("log.orc")
val command = spark.read.format("jdbc").option("url","jdbc:postgresql://mycorporate.com:5432/mydb").option("driver","org.postgresql.Driver").option("dbtable","command").option("user","usr").option("password","pwd").load()
// persist the command table in the same three formats
command.write.option("path", "/data/home/hive").saveAsTable("command")
command.write.format("parquet").save("command.parquet")
command.write.format("orc").option("compression","zlib").save("command.orc")

I imported the data into a Spark DataFrame, then wrote it back out to Hive, ORC and Parquet. The partition key is the command id (a UUID).
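
The import code above writes the files unpartitioned; if I wanted the stored data physically laid out by that key, a minimal sketch would look like this (the commandid column name matches the join below, the output path is just illustrative):

// hypothetical variant of the import step: partition the stored log files by command id
log.write.partitionBy("commandid").format("parquet").save("log_by_command.parquet")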

I imported 60,000 rows from log and 3,200 rows from command.

Then I tested a simple join and an export of the result, partitioned by node:

//PARQUET
// time the load: read the Parquet files back and prepare the two DataFrames
var time = System.currentTimeMillis()
val r_log_o = spark.read.parquet("log.parquet")
val r_log = r_log_o.drop(r_log_o.col("id"))   // drop log.id to avoid clashing with command.id
val r_command_o = spark.read.parquet("command.parquet")
val r_command = r_command_o.withColumnRenamed("timestamp", "c_timestamp")
System.currentTimeMillis() - time
//JOIN
// time the join of log (status > 3) with command on the command id
time = System.currentTimeMillis()
val r_joined = r_log.filter($"status">3).join(r_command, r_command.col("id") === r_log.col("commandid"))
System.currentTimeMillis() - time
// time the export, one CSV directory partition per node
time = System.currentTimeMillis()
r_joined.write.partitionBy("nodename").csv("/data/home/out/joined_parquet.csv")
System.currentTimeMillis()-time
//HIVE
var time = System.currentTimeMillis()
val r_log_o = spark.sql("SELECT * FROM log")
val r_log = r_log_o.drop(r_log_o.col("id"))
val r_command_o = spark.sql("SELECT * FROM command")
val r_command = r_command_o.withColumnRenamed("timestamp", "c_timestamp")
System.currentTimeMillis() - time
//JOIN
time = System.currentTimeMillis()
val r_joined = r_log.filter($"status">3).join(r_command, r_command.col("id") === r_log.col("commandid"))
System.currentTimeMillis() - time
time = System.currentTimeMillis()
r_joined.write.partitionBy("nodename").csv("/data/home/out/joined_hive.csv")
System.currentTimeMillis()-time
//ORC
// time the load, as for the other two formats
var time = System.currentTimeMillis()
val r_log_o = spark.read.format("orc").load("log.orc")
val r_log = r_log_o.drop(r_log_o.col("id"))
val r_command_o = spark.read.format("orc").load("command.orc")
val r_command = r_command_o.withColumnRenamed("timestamp", "c_timestamp")
System.currentTimeMillis() - time
//JOIN
time = System.currentTimeMillis()
val r_joined = r_log.filter($"status">3).join(r_command, r_command.col("id") === r_log.col("commandid"))
System.currentTimeMillis() - time
time = System.currentTimeMillis()
r_joined.write.partitionBy("nodename").csv("/data/home/out/joined_orc.csv")
System.currentTimeMillis()-time

Results:

Load times:

  • Parquet: 1.6s
  • Hive: 1.9s
  • ORC: 1.5s

I repeated the test 10 times with different amounts of data.
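
To average the repetitions, a small helper along these lines can be pasted into the same spark-shell session (the timeIt name and the overwrite mode are illustrative additions, not part of the original test):

// sketch of a helper that runs a block several times and returns the average wall-clock time in ms
def timeIt[T](runs: Int)(block: => T): Double = {
  val times = (1 to runs).map { _ =>
    val start = System.currentTimeMillis()
    block
    System.currentTimeMillis() - start
  }
  times.sum.toDouble / runs
}
// e.g. average the Parquet join + export over 10 runs (overwrite so the output path can be reused)
timeIt(10) { r_joined.write.mode("overwrite").partitionBy("nodename").csv("/data/home/out/joined_parquet.csv") }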

Join times were similar across the three formats: 800–900 ms.

In the end: no big difference between Parquet, Hive and ORC for my use case.
