CLUSTER BY and CLUSTERED BY in Spark SQL

Nitin Mathur
3 min read · Apr 21, 2018


What is CLUSTER BY?

CLUSTER BY is a Spark SQL clause used to repartition the data before writing it back to disk. Please note that the number of partitions depends on the Spark parameter “spark.sql.shuffle.partitions”, which defines the number of shuffle partitions. To understand more about CLUSTER BY, please refer to this article.
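
As a minimal sketch (the countries table and cntry_id column below are only illustrative at this point), the query below shuffles its rows into 10 partitions by the hash of cntry_id and sorts each partition on that column:

set spark.sql.shuffle.partitions=10;
SELECT cntry_id FROM countries CLUSTER BY cntry_id;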

What is CLUSTERED BY (Bucketing)?

Bucketing is primarily a Hive concept and is used to hash-partition the data into a fixed number of buckets when it is written to disk. To understand more about bucketing and CLUSTERED BY, please refer to this article.
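
As an illustrative sketch only (page_views and its columns are made up), a bucketed table fixes the bucket column and bucket count in the DDL; the optional SORTED BY additionally sorts the data within each bucket:

DROP TABLE IF EXISTS page_views;
CREATE TABLE `page_views` (
  `user_id` BIGINT,
  `page` STRING)
USING PARQUET
CLUSTERED BY (user_id) SORTED BY (user_id) INTO 10 BUCKETS;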

Note:

  • CLUSTER BY and CLUSTERED BY can be used together in Spark SQL.
  • CLUSTER BY (which is DISTRIBUTE BY and SORT BY in tandem; see the sketch after this list) decides how many files are created as the output of the operation, while CLUSTERED BY decides how many buckets are created and how the processed data is finally stored on disk.
  • CLUSTER BY is part of the Spark SQL query, while CLUSTERED BY is part of the table DDL.
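
As referenced in the second note above, a CLUSTER BY query can be rewritten with DISTRIBUTE BY and SORT BY; the two queries below (illustrative names again) should produce the same partitioning and per-partition ordering:

SELECT cntry_id FROM countries CLUSTER BY cntry_id;
-- is equivalent to
SELECT cntry_id FROM countries DISTRIBUTE BY cntry_id SORT BY cntry_id;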

Let's take a look at the following cases to understand how CLUSTER BY and CLUSTERED BY work together in Spark SQL.

CASE 1: REGULAR TABLE (NO CLUSTERED BY) + CLUSTER BY:

Consider a regular table with no bucketing. Let's load data into this table using CLUSTER BY.

DDL:

DROP TABLE IF EXISTS test_data;
CREATE TABLE `test_data` (
  `user_id` BIGINT)
USING PARQUET
OPTIONS (
  path '/user/test_data/'
);

Inserting Data with CLUSTER BY:

set spark.sql.shuffle.partitions=10;
INSERT OVERWRITE TABLE test_data
select cntry_id from countries CLUSTER BY cntry_id;

We can see below that 10 data files plus one _SUCCESS file were created as the output of the operation:

spark-sql> !hdfs dfs -ls /user/test_data/;
Found 11 items
_SUCCESS
part-00000-d0fcf821.snappy.parquet
part-00001-d0fcf821.snappy.parquet
part-00002-d0fcf821.snappy.parquet
part-00003-d0fcf821.snappy.parquet
part-00004-d0fcf821.snappy.parquet
part-00005-d0fcf821.snappy.parquet
part-00006-d0fcf821.snappy.parquet
part-00007-d0fcf821.snappy.parquet
part-00008-d0fcf821.snappy.parquet
part-00009-d0fcf821.snappy.parquet

Observations:

  • Number of files = the spark.sql.shuffle.partitions value when CLUSTER BY is used.
  • The data is distributed across the files by the hash of the CLUSTER BY column (cntry_id in this case); a rough way to verify this is sketched after this list.
  • The output file name format is part-<partition-no.>-<alphanumeric>.<compression format>.<storage format>. Example: part-00009-d0fcf821.snappy.parquet
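
One rough way to check the distribution is to tag each row with its shuffle partition id and count rows per partition. This is only a sketch: it relies on the built-in spark_partition_id() function and assumes adaptive query execution is not coalescing the shuffle partitions:

set spark.sql.shuffle.partitions=10;
SELECT pid, count(*) AS rows_per_partition
FROM (
  SELECT spark_partition_id() AS pid
  FROM (SELECT cntry_id FROM countries CLUSTER BY cntry_id) clustered
) tagged
GROUP BY pid;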

CASE 2: CLUSTERED BY (Bucketed) Table + NO CLUSTER BY:

DDL:

DROP TABLE IF EXISTS test_data;
CREATE TABLE `test_data` (
  `user_id` BIGINT)
USING PARQUET
OPTIONS (
  path '/user/test_data/'
)
CLUSTERED BY (user_id) INTO 10 BUCKETS;

If you redefine the table as CLUSTERED BY (bucketed) while the underlying data was not written as bucketed (please note that the path value is kept the same), you get the following error when trying to query the table:

select * from test_data;

18/04/18 14:58:03 ERROR SparkSQLDriver: Failed in [select * from test_data]
java.lang.RuntimeException: Invalid bucket file part-00000-d0fcf821.snappy.parquet
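
The bucket spec that the table declares can be checked with DESCRIBE EXTENDED (the exact output layout varies by Spark version); look for the Num Buckets and Bucket Columns entries in the detailed table information:

DESCRIBE EXTENDED test_data;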

Now let us load the data into the bucketed table with no CLUSTER BY in the SQL.

INSERT OVERWRITE TABLE test_data
select cntry_id from countries;

We see that, due to the low volume of data, only one shuffle partition was produced. However, its output was split into 10 files, one per bucket.

spark-sql> !hdfs dfs -ls /user/hadoop_user/test_data/;
Found 11 items
_SUCCESS
part-00000-59e09855_00000.snappy.parquet
part-00000-59e09855_00001.snappy.parquet
part-00000-59e09855_00002.snappy.parquet
part-00000-59e09855_00003.snappy.parquet
part-00000-59e09855_00004.snappy.parquet
part-00000-59e09855_00005.snappy.parquet
part-00000-59e09855_00006.snappy.parquet
part-00000-59e09855_00007.snappy.parquet
part-00000-59e09855_00008.snappy.parquet
part-00000-59e09855_00009.snappy.parquet

Observations:

  • Number of shuffle partitions = 1 (not spark.sql.shuffle.partitions), since CLUSTER BY is not specified.
  • The output of that single partition is split into 10 bucket files.
  • The output file name format is part-<partition-no.>-<alphanumeric>_<bucket-no.>.<compression format>.<storage format>. Example: part-00000-59e09855_00009.snappy.parquet

To summarize:

1. Both CLUSTER BY and CLUSTERED BY use the same column values:

Number of partitions (CLUSTER BY) < number of buckets: We will have at least as many files as the number of buckets. As seen above, one file is divided into 10 buckets.

Number of partitions (CLUSTER BY) > number of buckets: The number of files will not change, but multiple files will be mapped to the same bucket.

Number of partitions (CLUSTER BY) = number of buckets: The ideal situation. Each file created will map to exactly one bucket, 1:1. A sketch of this setup follows.
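
A minimal sketch of this ideal case, reusing the 10-bucket table from CASE 2 and matching the shuffle-partition count to the bucket count before the insert:

set spark.sql.shuffle.partitions=10;
INSERT OVERWRITE TABLE test_data
select cntry_id from countries CLUSTER BY cntry_id;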

2. CLUSTER BY and CLUSTERED BY use different column values:

The number of partitions set for CLUSTER BY decides how many partitions are created in memory, distributed by the hash of the combination of values of the CLUSTER BY columns. While writing these partitions to disk, the data is redistributed again, this time by the hash of the combination of values of the CLUSTERED BY columns. The number of files may change again, but the number of buckets remains constant. A hypothetical sketch of this case follows.
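
In the sketch below (events_bucketed, event_date and the events source table are made up here), the table is bucketed on user_id, but the insert clusters by event_date, so rows are first shuffled into 4 partitions by hash(event_date) and then re-split by hash(user_id) while being written:

DROP TABLE IF EXISTS events_bucketed;
CREATE TABLE `events_bucketed` (
  `user_id` BIGINT,
  `event_date` STRING)
USING PARQUET
CLUSTERED BY (user_id) INTO 10 BUCKETS;

set spark.sql.shuffle.partitions=4;
INSERT OVERWRITE TABLE events_bucketed
select user_id, event_date from events CLUSTER BY event_date;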
