Spark vs Hive behavior over “collect_set”

Mithlesh Vishwakarma
Globant
Jul 20, 2023

Introduction

When working with big data in distributed environments like Spark and Hive, it is not uncommon to come across situations where the same query produces different results. This can be due to various factors, such as differences in the execution engines, optimisation techniques, or even the underlying data format.

One such scenario where the results differ is when using the collect_set function in Spark and Hive. In this blog post, we’ll take a deep dive into the differences between collect_set in Spark and Hive and explore the reasons behind these differences.

What is collect_set in Spark and Hive?

collect_set is a built-in aggregate function in both Spark and Hive that returns the set of unique elements from a column. In Spark, collect_set is part of the DataFrame API and can be used from Spark SQL or PySpark. In Hive, it is used in HiveQL SELECT statements.

Why does collect_set produce different results in Spark and Hive?

The main reason collect_set produces different results in Spark and Hive is the order of elements. Spark makes no guarantee about the order in which elements are collected, so the same query can return the same elements in a different order across runs or engines. Hive, in practice, preserves the order in which elements are first encountered. The resulting sets contain the same elements either way; it is the ordering, and therefore any string built from that ordering, that differs.
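The two behaviors can be sketched in plain Python. This is an analogy for the ordering semantics, not the actual Spark or Hive internals, and the function names are hypothetical:

```python
def collect_set_unordered(values):
    """Dedupe with no order guarantee (Spark-like behavior)."""
    return set(values)

def collect_set_ordered(values):
    """Dedupe preserving first-seen order (Hive-like behavior in practice).

    dict.fromkeys keeps insertion order, which is guaranteed in Python 3.7+.
    """
    return list(dict.fromkeys(values))

rows = ["TM", "ENT", "TM"]
print(collect_set_ordered(rows))            # ['TM', 'ENT'] -- insertion order kept
print(sorted(collect_set_unordered(rows)))  # ['ENT', 'TM'] -- raw set order is undefined,
                                            # shown sorted for a stable printout
```

Both functions return the same elements; only the ordering differs, which is exactly what makes the concatenated string differ between engines.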

Use-Case

Let’s consider a scenario where we have a table input_data containing information about user preferences. The table has columns user_id, domain_name, paid_or_free, r_num, and idx_period. We want to group the data by user_id and paid_or_free columns, collect the distinct domain_name values, and concatenate them using a colon separator.

WITH input_data AS (
SELECT '00024' AS user_id, 'TM' AS domain_name, 'PAID' AS paid_or_free, 2 AS r_num, '2022-12' AS idx_period
UNION ALL
SELECT '00024' AS user_id, 'ENT' AS domain_name, 'PAID' AS paid_or_free, 1 AS r_num, '2022-12' AS idx_period
)
SELECT
user_id, paid_or_free,
concat_ws(':' , collect_set(domain_name)) as domain_switch
FROM input_data
GROUP BY
user_id,
paid_or_free

The expected output of this query is:

user_id | paid_or_free | domain_switch
--------|--------------|--------------
00024   | PAID         | TM:ENT

In Hive, the output of this query matches the expected output. However, in Spark, the output is different:

user_id | paid_or_free | domain_switch
--------|--------------|--------------
00024   | PAID         | ENT:TM

As we can see, the order of the domain_name values in the domain_switch column is reversed in Spark compared to Hive. This happens because collect_set does not guarantee the order in which elements are collected: Hive's default behavior preserves the order of the elements, whereas Spark makes no such guarantee.

To ensure consistent behavior across both Spark and Hive, we can sort the elements before concatenating them using the sort_array function, as shown below:

WITH input_data AS (
SELECT '00024' AS user_id, 'TM' AS domain_name, 'PAID' AS paid_or_free, 2 AS r_num, '2022-12' AS idx_period
UNION ALL
SELECT '00024' AS user_id, 'ENT' AS domain_name, 'PAID' AS paid_or_free, 1 AS r_num, '2022-12' AS idx_period
)
SELECT
user_id, paid_or_free,
concat_ws(':' , sort_array(collect_set(domain_name))) as domain_switch
FROM input_data
GROUP BY
user_id,
paid_or_free
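The effect of the sort_array fix can be sketched in plain Python (an illustration of the idea; the helper name is hypothetical): sorting the deduplicated values makes the concatenation deterministic regardless of collection order.

```python
def domain_switch(domains):
    """Dedupe, sort, and join with ':' -- mirrors
    concat_ws(':', sort_array(collect_set(...)))."""
    return ":".join(sorted(set(domains)))

print(domain_switch(["TM", "ENT"]))  # ENT:TM
print(domain_switch(["ENT", "TM"]))  # ENT:TM -- same result either way
```

Note that the sorted result is ENT:TM, not the TM:ENT that Hive happened to produce: the fix buys consistency across engines, not Hive's insertion order.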

The above solution is applicable only when you are able to modify the query in both environments, Spark and Hive.

Here is another proposed solution that generates the same result in Spark as in Hive, using the r_num column to fix the ordering explicitly:

WITH input_data AS (
SELECT '00024' AS user_id, 'TM' AS domain_name, 'PAID' AS paid_or_free, 2 AS r_num, '2022-12' AS idx_period
UNION ALL
SELECT '00024' AS user_id, 'ENT' AS domain_name, 'PAID' AS paid_or_free, 1 AS r_num, '2022-12' AS idx_period
)
SELECT
user_id, paid_or_free,
IF(first_domain_name IS NOT NULL AND second_domain_name IS NOT NULL,
   concat(first_domain_name, ':', second_domain_name),
   domain_name) AS domain_switch
FROM (
  SELECT
    user_id, paid_or_free, idx_period,
    concat_ws(':', collect_set(domain_name)) AS domain_name,
    max(first_domain_name) AS first_domain_name,
    max(second_domain_name) AS second_domain_name
  FROM (
    SELECT *, NULL AS second_domain_name, domain_name AS first_domain_name
    FROM input_data WHERE r_num = 2
    UNION ALL
    SELECT *, domain_name AS second_domain_name, NULL AS first_domain_name
    FROM input_data WHERE r_num = 1
  ) A
  GROUP BY user_id, paid_or_free, idx_period
) B

This proposed solution is validated only for the case where a group has at most two domain_name values; developers can extend it to handle more domains.

user_id | paid_or_free | domain_switch
--------|--------------|--------------
00024   | PAID         | TM:ENT
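The logic of the r_num trick above can be sketched in plain Python (an illustration of the query's idea, assuming at most two distinct domains per group, as in the post; the helper name is hypothetical):

```python
def domain_switch(rows):
    """rows: list of (domain_name, r_num) tuples for one group.
    Tag r_num = 2 as the first domain and r_num = 1 as the second,
    then concatenate in that fixed order."""
    first = next((d for d, r in rows if r == 2), None)
    second = next((d for d, r in rows if r == 1), None)
    if first is not None and second is not None:
        return f"{first}:{second}"
    # Fall back to whichever single domain is present.
    return first if first is not None else second

print(domain_switch([("TM", 2), ("ENT", 1)]))  # TM:ENT
```

Because the ordering is driven by r_num rather than by collect_set's internal order, the result is the same no matter which engine runs the query.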

Possible cause of the differences

One possible cause is differences in the underlying execution engines and data structures used by Spark and Hive: Spark operates on in-memory Resilient Distributed Datasets (RDDs) and DataFrames, while Hive compiles queries to MapReduce or Tez jobs over data stored in HDFS.

These differences may affect the way collect_set operates, particularly when dealing with complex data types or nested structures.

Conclusion

Collecting unique elements from a dataset is a common task in data processing, and the collect_set function is used for this purpose in both Spark and Hive. However, because the two engines order the collected elements differently, collect_set can produce different results in Spark and Hive. In this blog, we explored the reasons behind these differences and provided some approaches for handling them. As data analysts, it's important to be aware of these differences and choose the appropriate approach for our specific use case.

I am a data enthusiast working at a Data and AI company, with experience in Python, Java, and Spark.