EXPEDIA GROUP TECHNOLOGY — DATA

Can Hive Union Distinct Really Cause Data Loss?

Story of a 4-month-long investigation into data loss issues with Hive

Abhimanyu Gupta
Expedia Group Technology

An abstract photo with a large question mark in the middle
Photo by Emily Morter on Unsplash

The short answer is no, but there could be other things wrong with your query! Read on to find out what they turned out to be in our case.

As software engineers, it is quite usual for us to hit bugs in our applications when we run them in a production environment. In spite of our best efforts and the modern tools available for automated software testing, there are always scenarios that happen only in prod. These bugs can be trivial or can require extensive investigation, but rarely do these investigations go on for more than 4 weeks.

Recently, we, the Expedia Groupᵀᴹ Data Platform Team, hit one such bug which took us 4 months of extensive investigation to find the root cause. I decided to chronicle our journey in the hopes that our learnings (and we had lots of them) will help a tired soul some day to solve their missing data mystery. If you want to skip the buildup and go straight to the solution, skip down to our Eureka Moment!

In the big data world, bugs are usually a bit harder to locate given the abstract nature of the data platforms on which these applications run and the large amount of data that they process. For instance, Hive SQL queries are internally converted into MapReduce jobs by Hive and then executed on a Hadoop cluster. As a developer, you just trust the underlying platforms like Hive and Hadoop to work as expected as long as your business logic is fine.

Let me introduce Cloverleaf at this point. Cloverleaf is our internal, Hive-based, data reformatting and repartitioning tool. It reads data from source Hive table partitions, merges them with the existing partitions from a target Hive table, converts the merged data to Parquet format, and finally writes it back in the partition structure requested by the user. Cloverleaf runs on a shared auto-scaling Amazon EMR cluster where we run multiple Hive jobs at a time.

So, in January this year, when a customer reported that the Hive table generated and managed by Cloverleaf lost data, our first reaction was to check all other Cloverleaf-managed tables to verify that our core logic was correct. We found that the row counts in all of our other tables matched between source and target. This meant that Cloverleaf was working as expected for all other tables.

We then discovered that the problematic table had been reprocessed a few times in the past in order to propagate schema changes. We thought something might have gone wrong with this reprocessing, so we quickly reprocessed the problematic partitions using Cloverleaf and it fixed the row counts for those partitions. We assumed that a manual reprocess had been the cause of the errors.

Double partition issue

Meanwhile, users were reporting another weird data corruption issue in their Hive tables that looked something like this:

Error opening Hive split s3://<TABLE_BASE_PATH>/<UNIQUE_SNAPSHOT_ID>/local_date=2019-12-03/brand=expedia/brand=expedia_$folder$ (offset=0, length=0):s3://<TABLE_BASE_PATH>/<UNIQUE_SNAPSHOT_ID>/local_date=2019-12-03/brand=expedia/brand=expedia_$folder$ is not a valid Parquet File

Notice the repeated brand=expedia/brand=expedia_$folder$ at the end of the path above. This table is partitioned on local_date and brand, but Hive on EMR created this double partition folder, along with a _$folder$ file in that partition, on S3. This happened unpredictably, in random output partitions. The actual data for that partition was stored in the lowest leaf folder.

On further inspection, we found that this was also happening in other tables. We looked in the Hive metastore logs, AWS S3 access logs and also involved AWS premium support but because it was difficult to reproduce, we could not find the root cause.

And then the data loss issue was back!

After a couple of weeks, the data loss issue described above was reported again in the same table. This time we were sure that the table had not had any manual interventions and that something was really wrong. Note that there were still other tables that were just fine and there was no data loss occurring for those. This triggered our investigation.

Sleuthing starts!

Photo of a hunting dog in a wooded landscape
Photo by Kevin Noble on Unsplash

Fix double partition issue

At first, we tried to find some correlation between the two issues that we had. We found a few partitions which were impacted by the double partition issue and had data loss. Given that we didn’t know the root cause behind the double-partition issue, we added a feature to Cloverleaf to fix the partition by moving data one level up and deleting the extra level brand=expedia_$folder$. At the very least, we expected this would solve one issue for us.
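
The repair amounts to a rewrite of object keys. The sketch below is only an illustration of that idea, not Cloverleaf's actual code: the function name collapse_double_partition is hypothetical, and it assumes the extra level is always an exact repeat of the preceding partition segment.

```python
from typing import Optional

FOLDER_MARKER = "_$folder$"  # suffix of the empty marker objects seen on S3


def collapse_double_partition(key: str) -> Optional[str]:
    """Return the repaired key, or None when the key should be deleted.

    Hypothetical helper: assumes the extra level is an exact repeat of the
    preceding partition segment (e.g. brand=expedia/brand=expedia).
    """
    if key.endswith(FOLDER_MARKER):
        parts = key[: -len(FOLDER_MARKER)].split("/")
        # The marker of the duplicated level is dropped; other markers stay.
        if len(parts) >= 2 and parts[-1] == parts[-2]:
            return None
        return key

    repaired = []
    for part in key.split("/"):
        # Skip a partition segment that repeats the one just before it.
        if repaired and "=" in part and part == repaired[-1]:
            continue
        repaired.append(part)
    return "/".join(repaired)
```

A data file under brand=expedia/brand=expedia/ maps to the same file name one level up, while keys that were never affected come back unchanged.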

Added row count validation

We then also added a row count validation step to Cloverleaf that uses Hive’s table statistics. Cloverleaf always writes the restated data to a new S3 location to provide snapshot isolation. The validation we added would fail a particular Cloverleaf run if the row count of the restated partition was less than that of the existing partition. We hoped this would help us identify data loss issues right as they happened, instead of having to dig around in the logs days or weeks later. Because the validation would fail the run, it would also prevent new partitions with data loss from being merged into our production data sets. We’d rather have old, correct data than new, incorrect data!
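
The rule itself is small. Here is a minimal sketch of it, assuming the per-partition row counts have already been read from table statistics into dictionaries; the names validate_row_counts and RowCountValidationError are made up for this example and are not Cloverleaf's API.

```python
class RowCountValidationError(Exception):
    """Raised when a restated partition has fewer rows than the one it replaces."""


def validate_row_counts(existing: dict, restated: dict) -> None:
    """Compare per-partition row counts (partition name -> numRows).

    Both dicts are assumed to come from table statistics. Partitions that
    are new, with no existing counterpart, are always accepted.
    """
    shrunk = {
        partition: (existing[partition], count)
        for partition, count in restated.items()
        if partition in existing and count < existing[partition]
    }
    if shrunk:
        # Failing here keeps the old snapshot in place.
        raise RowCountValidationError(f"row count dropped: {shrunk}")
```

Since the restated partition is the existing data merged with new source data, a lower count than before is the signal that rows went missing.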

The result was not what we expected. There were row count validation failures even in those tables where the overall row counts were fine.

Fixing Hive stats

On further investigation, we found that even after setting the Hive property:

set hive.stats.autogather=true;

Cloverleaf’s usage of an INSERT OVERWRITE query was not producing correct statistics (numRows) for the output partitions. That meant the majority of the row count validation failures that we were seeing were false positives. We decided to add an explicit COMPUTE STATISTICS step after our INSERT OVERWRITE query to set the correct stats on the output partitions.

ANALYZE TABLE <table_name> PARTITION (part_col='<value>') COMPUTE STATISTICS;

Once we realized that all of our existing partition statistics were wrong, we manually updated the statistics on all of our existing tables.

With all these fixes in place, we now had to wait for the actual row count validation failures to happen again so we could debug our real issue.

Real failures and panic

After a few days, we saw our first “actual” row count validation failure, and then another. Over the next two days, validation failures were observed in all of the larger datasets that had recently been onboarded. This was really alarming, as we could no longer ensure data reliability for our customers. Our number one priority was correcting the data, so we launched a massive team effort to reprocess the data in the affected partitions. This seemed to solve the issues and gave us some breathing space to continue our investigations. An additional benefit was that the next time a data loss issue happened, it would be on the latest data, and we would be able to save a copy of it to continue our investigation.

Data is not just missing, we have duplicates too!

Over the next few weeks, we tried all kinds of things to reproduce the issue in non-production environments, without any success, and we were really running out of options. Meanwhile, the reprocessing of partitions that we were doing to keep the ship afloat had become a huge operational overhead for us.

Another baffling observation was that there were also duplicate rows in those partitions. So, Cloverleaf was not just missing some data but was also adding duplicate rows.

From observing the situations leading to the data loss we identified two common threads:

  • The issue happened only in the tables that contained large amounts of data, e.g. 100 GB per partition.
  • The issue seemed more likely to occur when the EMR cluster was running a large number of parallel jobs.

Set up a testing environment in production

I mentioned at the beginning that there are some scenarios which only happen in production and we believed this was one such scenario. So we set up a separate EMR cluster in production to run Cloverleaf in parallel but write the output to scratch tables. This would then give us something using production data that we could experiment with, without impacting the real production data.

At its core Cloverleaf executes the following Hive query:

INSERT OVERWRITE TABLE 
TEMP_TARGET_TABLE
PARTITION (`target_part_col_1`,`target_part_col_2`)
SELECT * FROM TARGET_TABLE
UNION DISTINCT
SELECT * FROM SOURCE_TABLE
DISTRIBUTE BY PART_COL_1, PART_COL_2, cast(round(rand()*10) as INT);

We took this query and wrote scripts to execute it multiple times with large numbers of source partitions and compared row counts after every run. Within hours, we started seeing the row count validation failures. This was the first time we were able to reproduce the issue. We checked the data and were able to pinpoint missing and duplicate rows. We concluded:

  • There was nothing unusual about those rows. So, the issue wasn’t in the source data.
  • There were no double partitions on S3 in the output table. So, the two issues, double partition and data loss, were not connected.
  • There were a large number of killed reducers in the MapReduce jobs that produced incorrect data. The jobs where the row count was correct had fewer killed reducers, so it looked like the issue was being caused by high load.
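
Pinpointing the affected rows comes down to comparing the output with what a UNION DISTINCT should produce. The following sketch shows one way to do that on small in-memory samples; diff_rows is a hypothetical helper and rows are assumed to be hashable tuples, so this is not the script we ran against the full tables.

```python
from collections import Counter


def diff_rows(target_rows, source_rows, output_rows):
    """Return (missing, duplicated) rows of a UNION DISTINCT result.

    Rows are assumed to be hashable tuples. The expected output is every
    distinct row across both inputs, each appearing exactly once.
    """
    expected = set(target_rows) | set(source_rows)
    seen = Counter(output_rows)
    missing = sorted(expected - set(seen))
    duplicated = sorted(row for row, n in seen.items() if n > 1)
    return missing, duplicated
```

A plain row count can hide the problem when losses and duplicates cancel out, which is why comparing the rows themselves is more reliable than comparing totals.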

The big clue

Multiple pairs of hands holding a treasure map
Photo by N. on Unsplash

With the information that we had so far, we listed potential problem areas:

  • Writing to Parquet format.
  • Writing data to HDFS first and then moving it to S3. We had disabled Hive’s blob store optimization flag (set hive.blobstore.optimizations.enabled=false) as we were processing data on EMR’s HDFS first before moving it to S3 using distcp.
  • Writing data to a Hive table.
  • A bug in Hive’s “Union Distinct” clause (highly unlikely).

Then we used our testing script to set up these tests individually. We found that:

  • The issue happened even if Cloverleaf writes data in ORC format.
  • Changing the blob store optimization flag had no effect.
  • Writing data to just HDFS did not solve the issue.
  • The issue did not happen when the data was not written at all.

The last point means that the Cloverleaf query was executed without the INSERT OVERWRITE clause and the row count validation was run as part of the query itself.

SELECT PART_COL_1, PART_COL_2, count(*)
FROM
(SELECT * FROM TARGET_TABLE
UNION DISTINCT
SELECT * FROM SOURCE_TABLE) x
GROUP BY PART_COL_1, PART_COL_2;

This was a big clue! Now we knew that the issue was being caused when Hive writes data to the table.

Eureka moment!

Image of children playing with a ball in a field with trees
Photo by Robert Collins on Unsplash

Armed with this information, we looked back on our query.

INSERT OVERWRITE TABLE 
TEMP_TARGET_TABLE
PARTITION (`target_part_col_1`,`target_part_col_2`)

SELECT * FROM TARGET_TABLE
UNION DISTINCT
SELECT * FROM SOURCE_TABLE
DISTRIBUTE BY PART_COL_1, PART_COL_2, cast(round(rand()*10) as INT);

We knew our issue was hidden somewhere in the parts of the above query that write the data, rather than in the UNION DISTINCT select itself. To control the number of output files in each partition, we had introduced the DISTRIBUTE BY clause with cast(round(rand()*10) as INT). All it was supposed to do was distribute rows randomly to (in this case) 10 reducers. The number 10 was generated dynamically by Cloverleaf based on the data volume of a particular table.

It seemed quite straightforward but we were really clutching at straws here. So, we started to search for some alternative approaches to control file sizes and stumbled upon this StackOverflow answer. And Bingo! Once you understand how Hadoop handles restarting killed reducers, you will quickly realize that this non-determinism introduced by rand() was causing all of our data loss/data duplication issues.

Let me elaborate on it a bit more. When a container (in this case, the reducer writing the output data) is restarted because it was running on a node experiencing memory or network issues, Hadoop will try to distribute the data again to a newly launched reducer, and therein lies the problem. The random number generated for each row, which determines which reducer will handle that row, won’t necessarily be the same the next time it is calculated. So, a row that is already present in another reducer’s output might get distributed again to a new reducer (causing data duplication), or a row might not get any reducer because its designated reducer has already finished (causing data loss).
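
The effect can be reproduced with a toy model. The sketch below is a deliberate simplification and not Hadoop's actual retry protocol: it assumes that some reducers have already consumed the first random assignment of rows when the assignment is recomputed, and that the remaining reducers read the recomputed one. The names assign and run_job, and the seeds, are invented for this example.

```python
import random


def assign(rows, rng):
    """Give every row a bucket the way round(rand()*10) would (0 through 10)."""
    return {row: round(rng.random() * 10) for row in rows}


def run_job(rows, finished_buckets, seed_first=1, seed_retry=2):
    """Simulate one shuffle in which the distribution is recomputed midway.

    Reducers in `finished_buckets` consumed the first assignment; all other
    reducers read the recomputed one. Returns the rows that were written.
    """
    first = assign(rows, random.Random(seed_first))
    retry = assign(rows, random.Random(seed_retry))
    written = [r for r in rows if first[r] in finished_buckets]
    written += [r for r in rows if retry[r] not in finished_buckets]
    return written


rows = list(range(1000))
written = run_job(rows, finished_buckets={0, 1, 2, 3, 4})
print("lost:", len(set(rows) - set(written)))
print("duplicated:", len(written) - len(set(written)))
```

A row is lost when its first bucket had not finished but its recomputed bucket had, and it is duplicated in the opposite case. Passing the same value for seed_first and seed_retry models a repeatable key, and in that case every row is written exactly once.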

Now, we only needed to replace the rand() function with a deterministic, evenly distributed key. That wasn’t difficult, as we could use Hive’s hash function and apply it on all columns in the data to get a fairly even distribution of data, like so:

abs(hash(source_col_1, source_col_2, source_col_3,...))%10
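
To see why this helps, here is a small sketch of a key that depends only on the row's own values. It is an analogy rather than Hive's implementation: zlib.crc32 stands in for Hive's hash function, and the bucket helper and the sample rows are made up for the example.

```python
import zlib


def bucket(row, buckets=10):
    """Map a row to a bucket using only the row's own column values.

    zlib.crc32 stands in for Hive's hash() here. Python's built-in hash()
    is avoided because it is salted per process for strings.
    """
    payload = "\x1f".join(str(col) for col in row).encode("utf-8")
    return zlib.crc32(payload) % buckets


rows = [(i, f"user{i}", "expedia") for i in range(1000)]
first_attempt = [bucket(row) for row in rows]
second_attempt = [bucket(row) for row in rows]
print("same assignment on retry:", first_attempt == second_attempt)
```

Because the bucket is a pure function of the row, recomputing the distribution after a restart sends every row to the same reducer as before, so nothing is lost or duplicated.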

And why did the issue only happen under high load? Well, that’s when nodes are more likely to run out of memory and reducers are pre-empted. We tested these changes extensively under heavy load before rolling them out. We haven’t observed any data loss in Cloverleaf-managed Hive tables since then.

Conclusion

As painful as this was, it was also a good learning experience. Looking back, this bug was so difficult to locate because it was only happening in our production environment and we were constantly on-boarding new tables while debugging. We were fairly new to the auto-scaling EMR cluster setup and had to spend a lot of time tuning our Hive queries and EMR configuration to get that right balance between performance and stability. As always in retrospect, creating the parallel test setup in production early on in the investigation would have made debugging a lot faster. Also, next time we see something wrong with data, we would think twice before blaming Hive for it.

I hope you enjoyed reading about our journey. It was an enlightening voyage of discovery for us. Thanks to Patrick Duin, Jay Green-Stevens & Max Jacobs, who are part of Expedia Group’s Data Platform team and helped track down this nasty one.
