Importing data from PostgreSQL with Spark and comparing joins between Parquet, Hive, and ORC
Originally published at jugsi.blogspot.com.
I have a little application managing 200 nodes over the internet, with a little database containing two important tables:
- action (command) and
- log of actions (log)
For every action I receive one or more replies from different nodes. Unfortunately, I have to prune my PostgreSQL db every 3 hours to keep the log table below 60,000 rows and avoid degrading performance. As a result, I lose my precious logs every 3 hours.
PostgreSQL partitioning would have been a valid solution, but I preferred a different approach: to keep my logs I decided to use Spark + Hadoop HDFS, and I tested ORC vs Hive vs Parquet.
- Why ORC? ORC compresses well, which should make it a good fit for archiving
- Why Hive? Hive is natively supported by Spark
- Why Parquet? Parquet is regarded as one of the best HDFS-based storage formats
Environment
My QA environment is small: three Hadoop nodes running Spark over YARN.
Importing data
Here is my simple code to import from PostgreSQL from scratch:
val log = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://mycorporate.com:5432/mydb")
  .option("driver", "org.postgresql.Driver")
  .option("dbtable", "log")
  .option("user", "usr")
  .option("password", "pwd")
  .load()
log.write.format("parquet").save("log.parquet")
log.write.option("path", "/data/home/hive").saveAsTable("log")
log.write.format("orc").option("orc.compression", "gzip").save("log.orc")

val command = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://mycorporate.com:5432/mydb")
  .option("driver", "org.postgresql.Driver")
  .option("dbtable", "command")
  .option("user", "usr")
  .option("password", "pwd")
  .load()
command.write.option("path", "/data/home/hive").saveAsTable("command")
command.write.format("parquet").save("command.parquet")
command.write.format("orc").option("orc.compression", "gzip").save("command.orc")
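As a side note, the JDBC reads above run on a single connection. Spark's JDBC source can split the read across several connections with its built-in partitioning options; a sketch, where the column name "id" and the bounds are my assumptions (any numeric, date, or timestamp column with a known range works):

```scala
// Sketch (not in the original post): parallel JDBC read.
// "id", the bounds, and numPartitions are illustrative assumptions.
val logParallel = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://mycorporate.com:5432/mydb")
  .option("driver", "org.postgresql.Driver")
  .option("dbtable", "log")
  .option("user", "usr")
  .option("password", "pwd")
  .option("partitionColumn", "id")  // must be numeric, date, or timestamp
  .option("lowerBound", "0")
  .option("upperBound", "60000")
  .option("numPartitions", "4")     // 4 concurrent JDBC connections
  .load()
```

For 60,000 rows this hardly matters, but it keeps the import time flat as the table grows.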
I imported the data into a Spark DataFrame and then wrote it out to Hive, ORC and Parquet. The partition key is the command id (UUID).
I imported 60,000 rows from log and 3,200 rows from command.
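A write keyed on the command id could be sketched like this (an illustration; the output path is a made-up name, and the import code above saves the files unpartitioned):

```scala
// Sketch (not in the original post): writing the log data partitioned
// by command id, producing one commandid=<uuid> directory per command.
log.write.partitionBy("commandid").format("parquet").save("log_by_command.parquet")
```

Note that partitioning by a high-cardinality UUID column creates one directory per command, which is fine for a few thousand commands but scales poorly beyond that.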
Then I tested a simple join and an export of the result, partitioned by node:
//PARQUET
var time = System.currentTimeMillis()
val r_log_o = spark.read.parquet("log.parquet")
val r_log = r_log_o.drop(r_log_o.col("id"))
val r_command_o = spark.read.parquet("command.parquet")
val r_command = r_command_o.withColumnRenamed("timestamp", "c_timestamp")
System.currentTimeMillis() - time

//JOIN
time = System.currentTimeMillis()
val r_joined = r_log.filter($"status" > 3).join(r_command, r_command.col("id") === r_log.col("commandid"))
System.currentTimeMillis() - time

time = System.currentTimeMillis()
r_joined.write.partitionBy("nodename").csv("/data/home/out/joined_parquet.csv")
System.currentTimeMillis() - time

//HIVE
var time = System.currentTimeMillis()
val r_log_o = spark.sql("SELECT * FROM log")
val r_log = r_log_o.drop(r_log_o.col("id"))
val r_command_o = spark.sql("SELECT * FROM command")
val r_command = r_command_o.withColumnRenamed("timestamp", "c_timestamp")
System.currentTimeMillis() - time

//JOIN
time = System.currentTimeMillis()
val r_joined = r_log.filter($"status" > 3).join(r_command, r_command.col("id") === r_log.col("commandid"))
System.currentTimeMillis() - time

time = System.currentTimeMillis()
r_joined.write.partitionBy("nodename").csv("/data/home/out/joined_hive.csv")
System.currentTimeMillis() - time

//ORC
var time = System.currentTimeMillis()
val r_log_o = spark.read.format("orc").load("log.orc")
val r_log = r_log_o.drop(r_log_o.col("id"))
val r_command_o = spark.read.format("orc").load("command.orc")
val r_command = r_command_o.withColumnRenamed("timestamp", "c_timestamp")
System.currentTimeMillis() - time

//JOIN
time = System.currentTimeMillis()
val r_joined = r_log.filter($"status" > 3).join(r_command, r_command.col("id") === r_log.col("commandid"))
System.currentTimeMillis() - time

time = System.currentTimeMillis()
r_joined.write.partitionBy("nodename").csv("/data/home/out/joined_orc.csv")
System.currentTimeMillis() - time
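The repeated System.currentTimeMillis() bookkeeping above could be wrapped in a small helper; a sketch (not from the original post):

```scala
// Times a block of code, prints the elapsed milliseconds, and
// returns the block's result unchanged.
def timed[T](label: String)(block: => T): T = {
  val start = System.currentTimeMillis()
  val result = block
  println(s"$label took ${System.currentTimeMillis() - start} ms")
  result
}
```

Each measured step then becomes a one-liner, e.g. `val r_joined = timed("join") { r_log.filter($"status" > 3).join(r_command, r_command.col("id") === r_log.col("commandid")) }`.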
Results:
Time to load the data:
- Parquet: 1.6s
- Hive: 1.9s
- ORC: 1.5s
I repeated the test 10 times with different amounts of data.
Join times were similar across the three formats: 800–900 ms.
In the end: no big difference between Parquet, Hive and ORC for my use case.
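One practical upside of the partitionBy("nodename") exports above: Spark discovers the resulting nodename=&lt;value&gt; directories automatically when reading back. A sketch (the node name "node42" is a made-up example):

```scala
// Read everything back; the nodename column is reconstructed from
// the partition directories created by partitionBy("nodename").
val joined = spark.read.csv("/data/home/out/joined_parquet.csv")

// Filtering on the partition column prunes the other directories,
// so only one node's files are scanned.
val oneNode = joined.filter($"nodename" === "node42")
```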