By: Gustavo Torres, Vincent Ketelaars, and Chris Said
Originally published at Opendoor.com:
Why we use S3Guard with S3 as a filesystem for Spark | Opendoor
Your tables are missing rows, and your jobs are failing mysteriously. It feels like some malign entity is at work. Here’s one solution…
After some diligent debugging, your team has figured out that the problems originated from interactions with S3, the ubiquitous storage service from Amazon. S3 is an object store, and it allows you to easily upload and download massive amounts of data.
We’ll discuss an important challenge with distributed system consistency when using S3 as a filesystem. But before we get to that, let’s start with why we chose S3 over other alternatives like HDFS.
Why use S3 as a filesystem?
- Fully managed
S3 is fully managed by AWS and has extremely high uptime. If S3 goes down, chances are that plenty of other businesses are going down with it!
It’s much cheaper than hosting your own HDFS, Cassandra, or Kudu cluster.
- Developer Experience
Good API support for uploading and downloading.
Need a bit more space? No problem!
These are compelling reasons to use S3 as a filesystem. However, if you are considering making the switch, it’s important to keep in mind a key limitation of S3 when it comes to consistency.
What is consistency?
A distributed filesystem is consistent if actions made on one node are immediately visible on every other node. That is, a consistent distributed filesystem acts as if it were running in a single process. For example, HDFS is consistent in this sense: it guarantees that if one node writes a file, another node can discover that file immediately afterward.
What level of consistency does S3 provide?
Unlike HDFS, S3 provides read-after-write consistency for new objects and only eventual consistency for everything else. Read-after-write consistency means that after you have created a new object, subsequent requests for that key will return the object. Eventual consistency applies in all other scenarios, meaning that your updates will only eventually become visible to every reader. Until then, a request could return either the old state or the new one!
These scenarios include overwriting an existing key, deleting a key, and listing the keys under a prefix. There is even a scenario where, if you request a key before you create it, reads that follow the write will also be only eventually consistent.
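The rules above can be made concrete with a toy model. The sketch below is an in-memory stand-in, not the S3 API: the class `EventuallyConsistentStore`, its `propagate` method, and the key `report.csv` are all made up for illustration. It assumes the simplest possible behavior, where a new key is visible at once and an overwrite stays invisible until propagation happens.

```python
class EventuallyConsistentStore:
    """Toy in-memory model of the consistency rules described above.

    Hypothetical illustration only; this is not the real S3 API.
    """

    def __init__(self):
        self._visible = {}  # what readers currently see
        self._pending = {}  # overwrites that have not propagated yet

    def put(self, key, value):
        if key in self._visible:
            # Overwrite: eventually consistent, readers keep seeing the old value.
            self._pending[key] = value
        else:
            # New key: read-after-write, visible immediately.
            self._visible[key] = value

    def get(self, key):
        return self._visible.get(key)

    def propagate(self):
        """Simulate the passage of time by applying all pending overwrites."""
        self._visible.update(self._pending)
        self._pending.clear()


store = EventuallyConsistentStore()
store.put("report.csv", "v1")
first_read = store.get("report.csv")   # new key: the write is visible at once

store.put("report.csv", "v2")
stale_read = store.get("report.csv")   # overwrite: still the old value in this model

store.propagate()
fresh_read = store.get("report.csv")   # after propagation the new value shows up
```

In the real service a stale read was a possibility rather than a certainty; the model makes it deterministic so the effect is easy to see.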
S3 can cause missing rows
When we started to use S3 as a filesystem for Spark jobs, we occasionally noticed issues with our results. Further investigation into intermediate artifacts revealed missing rows in our data.
So how did this happen?
Some of the intermediate results for our Spark jobs are very large, and splitting up those objects makes it easier to deal with them. The diagram below shows Task 1 successfully writing two split files to S3 in the same directory.
But when Task 2 requires the output of Task 1 and asks for a list of all the files, the returned list contains only a.txt. As a result, Task 2 continues with only half of the intended data set. If, however, Task 2 had waited a little longer, S3 would have propagated the changes, and a subsequent list call would have returned both files, i.e. the entire data set.
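The failure mode is easy to reproduce in a toy model. The sketch below is not the S3 API: `LaggingListingStore`, `propagate`, and `read_all_rows` are hypothetical names, and `b.txt` is an assumed name for the second split file. It assumes that objects can be fetched by key right away while the listing index catches up later.

```python
class LaggingListingStore:
    """Toy model: objects are readable by key at once, but the listing lags."""

    def __init__(self):
        self._objects = {}
        self._listed = set()  # keys that list_keys can already see
        self._unlisted = []   # keys written but not yet in the listing index

    def put(self, key, rows):
        self._objects[key] = rows
        self._unlisted.append(key)

    def get(self, key):
        return self._objects[key]

    def list_keys(self, prefix):
        return sorted(k for k in self._listed if k.startswith(prefix))

    def propagate(self, count=None):
        """Make `count` pending keys (default: all of them) visible to list_keys."""
        n = len(self._unlisted) if count is None else count
        for key in self._unlisted[:n]:
            self._listed.add(key)
        del self._unlisted[:n]


def read_all_rows(store, prefix):
    """What Task 2 does: list the directory, then read every file it found."""
    rows = []
    for key in store.list_keys(prefix):
        rows.extend(store.get(key))
    return rows


store = LaggingListingStore()
# Task 1 writes two split files.
store.put("out/a.txt", ["row1", "row2"])
store.put("out/b.txt", ["row3", "row4"])  # hypothetical second file name
store.propagate(1)  # only a.txt has reached the listing index so far

early_rows = read_all_rows(store, "out/")  # Task 2 runs too early: half the data
store.propagate()
late_rows = read_all_rows(store, "out/")   # after propagation: the full data set
```

Note that nothing fails in the early run. Task 2 simply gets fewer rows, which is why the problem shows up as silently missing data rather than as an error.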
S3Guard as a solution
To deal with this, Hadoop provides a tool called S3Guard, which uses DynamoDB as a consistent metadata layer that all read requests consult before hitting the S3 API. It takes advantage of the fact that S3 provides read-after-write consistency when you know the name of the file you are looking for.
The only downside of using S3Guard is that all writers and readers must go through this metadata layer. Otherwise, the DynamoDB table goes out of sync with the contents of S3.
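The idea behind the metadata layer can be sketched in a few lines. This is a toy model in the spirit of S3Guard, not its implementation: `LaggingStore` and `GuardedStore` are hypothetical classes, a Python set stands in for the DynamoDB table, and `b.txt` and `c.txt` are assumed file names. The real tool does considerably more bookkeeping.

```python
class LaggingStore:
    """Toy object store: GET by key works at once, LIST misses recent keys."""

    def __init__(self):
        self.objects = {}
        self.listed = set()

    def put(self, key, rows):
        self.objects[key] = rows  # deliberately not added to self.listed

    def get(self, key):
        return self.objects[key]

    def list_keys(self, prefix):
        return sorted(k for k in self.listed if k.startswith(prefix))


class GuardedStore:
    """Toy metadata layer; a set stands in for the DynamoDB table."""

    def __init__(self, store):
        self.store = store
        self.metadata = set()  # consistent record of keys written via the guard

    def put(self, key, rows):
        self.store.put(key, rows)
        self.metadata.add(key)

    def list_keys(self, prefix):
        # Listings come from the consistent metadata, not from the lagging store.
        return sorted(k for k in self.metadata if k.startswith(prefix))

    def read_all(self, prefix):
        rows = []
        for key in self.list_keys(prefix):
            rows.extend(self.store.get(key))  # GET by known key is consistent
        return rows


raw = LaggingStore()
guarded = GuardedStore(raw)
guarded.put("out/a.txt", ["row1"])
guarded.put("out/b.txt", ["row2"])

raw_listing = raw.list_keys("out/")      # the raw listing has not caught up
guarded_rows = guarded.read_all("out/")  # the guarded reader sees both files

raw.put("out/c.txt", ["row3"])           # a writer that bypasses the guard
after_bypass = guarded.list_keys("out/") # c.txt is unknown to the metadata layer
```

The last two lines show the downside in miniature: a write that skips the metadata layer is invisible to every guarded reader.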
Be careful when implementing this, because S3Guard requires the s3a protocol, which only Hadoop understands. You will get errors if you try it with an AWS native interface!
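As a rough sketch of what enabling it might look like, the snippet below builds `spark-submit` arguments from Hadoop properties. The property names are the ones documented for the S3A connector in Hadoop 3.x and are worth checking against the documentation for your Hadoop version. The table name, the region, and the helper functions `spark_submit_conf_args` and `uses_s3a` are assumptions made for illustration, not our production setup.

```python
# Hadoop properties that switch the S3A connector to a DynamoDB metadata store.
S3GUARD_HADOOP_CONF = {
    "fs.s3a.metadatastore.impl":
        "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore",
    "fs.s3a.s3guard.ddb.table": "my-s3guard-table",  # hypothetical table name
    "fs.s3a.s3guard.ddb.region": "us-west-2",        # hypothetical region
    "fs.s3a.s3guard.ddb.table.create": "true",       # create the table if missing
}


def spark_submit_conf_args(hadoop_conf):
    """Turn Hadoop properties into spark-submit flags.

    Spark copies any `spark.hadoop.*` setting into the Hadoop configuration.
    """
    args = []
    for key, value in hadoop_conf.items():
        args += ["--conf", f"spark.hadoop.{key}={value}"]
    return args


def uses_s3a(path):
    """Only s3a:// paths go through the S3A connector, and thus through S3Guard."""
    return path.startswith("s3a://")


conf_args = spark_submit_conf_args(S3GUARD_HADOOP_CONF)
guarded_path = uses_s3a("s3a://my-bucket/output/")   # hypothetical bucket
unguarded_path = uses_s3a("s3://my-bucket/output/")
```

A check like `uses_s3a` is a cheap way to catch a job that reads or writes through a different scheme and would therefore bypass the metadata layer.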
S3Guard successfully solved our problems with missing rows, which we were able to confirm by verifying that the number of rows written matched the number of rows we expected.
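A check of this kind can be very small. The sketch below uses plain Python stand-ins: `verify_row_count` is a hypothetical helper, and the dictionaries represent the split files a downstream task managed to read. In a Spark job the two numbers would more likely come from `DataFrame.count()` before writing and after reading back.

```python
def verify_row_count(expected_rows, parts):
    """Compare the expected row count with the rows found across split files.

    `parts` maps a file name to the list of rows read from that file.
    Raises ValueError on a mismatch so the job fails loudly, not silently.
    """
    actual_rows = sum(len(rows) for rows in parts.values())
    if actual_rows != expected_rows:
        raise ValueError(f"expected {expected_rows} rows, found {actual_rows}")
    return actual_rows


# All split files were found: the check passes.
complete = {"a.txt": ["row1", "row2"], "b.txt": ["row3", "row4"]}
verified = verify_row_count(4, complete)

# One split file was missing from the listing: the check fails.
partial = {"a.txt": ["row1", "row2"]}
try:
    verify_row_count(4, partial)
    mismatch_detected = False
except ValueError:
    mismatch_detected = True
```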