Writing Into Dynamic Partitions Using Spark

Hive has this wonderful feature of partitioning — a way of dividing a table into related parts based on the values of certain columns. Using partitions it’s easy to query a portion of data. Hive optimizes the data load operations based on the partitions.

Writing data into partitions is very easy. You have two options:

  • Static Partitioning — You provide the list of partitions you want to write the data into.
  • Dynamic Partitioning — You provide a column whose values become the values of the partitions. In this case Hive creates as many partitions as unique values of the column provided.

Here’s a code snippet that writes into static and dynamic partitions:

CREATE EXTERNAL TABLE IF NOT EXISTS stats (
ad STRING,
impressions INT,
clicks INT
) PARTITIONED BY (country STRING, year INT, month INT, day INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n';
-- Specify static partitions
INSERT OVERWRITE TABLE stats
PARTITION(country = 'US', year = 2017, month = 3, day = 1)
SELECT ad, SUM(impressions), SUM(clicks)
FROM impression_logs
WHERE log_day = 1
GROUP BY ad;
-- Load data into partitions dynamically
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;
INSERT OVERWRITE TABLE stats 
PARTITION(country = 'US', year = 2017, month = 3, day)
SELECT ad, SUM(impressions), SUM(clicks), log_day
FROM impression_logs
GROUP BY ad;

Did you notice the difference between the two? In the second INSERT query, the value for day hasn’t been specified in the PARTITIONS construct. Instead, the value for the day partition column comes from log_day column of the impression_logs tables. While the first query only writes into 1 partition, the second query can write into as many partitions as the days in the impression_logs table.

The OVERWRITE keyword tells Hive to delete the contents of the partitions into which data is being inserted. This is the key! Hive only deletes data for the partitions it’s going to write into. All the other partitions remain intact.

This feature of Hive allows us to develop applications using AWS EMR + S3 and Hive to write partitioned outputs and run backfills/updates on selective partitions. We create ephemeral EMR clusters and load the partitions needed by a job in each run. Partitioning the data by date also allows us to run multiple parallel jobs (in separate EMR clusters), processing data for different dates at the same time. Hive optimizes the disk reads to only load the partitions requested by a query.

And this is where Spark differs massively in implementation! When a Spark programmer talks about partitions, they always mean the partitions into which a Dataset is divided across the cluster. They talk about repartitioning the data or to coalesce the data before writing to disk.

One of the biggest problems I faced when working on a new project with Spark was the organization of the output data into buckets (Hive partitions) such that individual (or a collective group of these buckets) may be overwritten by a batch Spark job. The challenge was that, even though Spark provides an API to write into a format similar to Hive partitions, it either OVERWRITEs all partitions or appends to the partitions. Spark doesn’t natively support the same behavior as Hive. In the OVERWRITE mode, Spark deletes all the partitions, even the ones it would not have written into.

Here’s the Spark code that writes the data into partitions:

impressionsDF
.write.mode("overwrite")
.partitionBy("country", "year", "month", "day")
.json("s3://output_bucket/stats")

If we run the job to backfill the data for just 1 day, then Spark will delete the data for all other days if we were to run the above code.

After searching around a lot for solutions and trying out various options, I finally settled on using Hive context to write the partitioned data. With Spark-2.1.0 the Spark community has fixed a few bugs related to dynamic partition inserts:

  • SPARK-18183 — INSERT OVERWRITE TABLE ... PARTITION will overwrite the entire Datasource table instead of just the specified partition
  • SPARK-18185 — Should fix INSERT OVERWRITE TABLE of Datasource tables with dynamic partitions

So, if you are using Spark 2.1.0 and want to write into partitions dynamically without deleting the others, you can implement the below solution.

The idea is to register the dataset as a table and then use spark.sql() to run the INSERT query.

// Create SparkSession with Hive dynamic partitioning enabled
val spark: SparkSession =
SparkSession
.builder()
.appName("StatsAnalyzer")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.getOrCreate()
// Register the dataframe as a Hive table
impressionsDF.createOrReplaceTempView("impressions_dataframe")
// Create the output Hive table
spark.sql(
s"""
|CREATE EXTERNAL TABLE stats (
| ad STRING,
| impressions INT,
| clicks INT
|) PARTITIONED BY (country STRING, year INT, month INT, day INT)
|ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n'
""".stripMargin
)
// Write the data into disk as Hive partitions
spark.sql(
s"""
|INSERT OVERWRITE TABLE stats
|PARTITION(country = 'US', year = 2017, month = 3, day)
|SELECT ad, SUM(impressions), SUM(clicks), day
|FROM impressions_dataframe
|GROUP BY ad
""".stripMargin
)

Spark now writes data partitioned just as Hive would — which means only the partitions that are touched by the INSERT query get overwritten and the others are not touched.

I hope Spark adds this functionality natively, but until then this is the best solution I have.