Here in the Met Office we produce a lot of data, and we produce it fast. At the time of writing we are creating and archiving approximately 200TB of data a day. What’s more, much of this data becomes stale quickly. For example, we re-run our high-resolution UK weather model every hour. If you have not successfully loaded, parsed, processed and analysed that data within an hour then your conclusions are going to be out of date. This is the problem of “high momentum” data, data that is both big and changes quickly.
At the Informatics Lab we’ve implemented a proposed specification change to Zarr (a new cloud optimised file format) that lets us prepend, append and “roll” these large, fast-moving datasets in an efficient, concurrent and safe manner. This makes it possible to work with data sets that aren’t just big but also fast moving.
What’s wrong with NetCDF?
Currently much of the data processing in the Met Office uses NetCDF files as the data storage format. Though it has served the Met Office well, we don’t think the current version of NetCDF (and its associated tools) is suitable for highly concurrent, object-store backed access, as is typical in a cloud computing setting. For a fuller argument on why, see Matthew Rocklin’s article, HDF in the Cloud, but in short:
- Accessing specific data and even metadata requires many small reads over a high latency network making data access slow.
- Usually we want to work with many (100s, 1000s or more) NetCDF files at once, greatly exacerbating the problem above.
- Most (all?) feature rich libraries to work with HDF use a C library that only works on POSIX file systems, not with object stores (therefore requiring sub-optimal workarounds).
For these reasons (and more) we are looking at other data formats that work better for our highly concurrent, object store backed workflow. Zarr is the one we are most excited about currently.
The problem with Zarr
For all its pros, perhaps the biggest challenge with Zarr for us is its handling of ‘moving’ datasets. To illustrate this, imagine an S3 backed dataset of the numbers 1 to 10 stored as a 1D Zarr array with a chunk size of one. This would create 11 objects in S3: the .zarray object with dataset attributes and the chunks 0 to 9, which would contain the data 1 to 10. This is illustrated below:
If you wish to grow this array, say add the number 11, this is a relatively simple operation requiring changing one object (.zarray, which records the array shape) and creating one new object (10). This is illustrated below (green additions, blue changes):
However, if you wish to prepend to the original array, by adding the number 0, then every single object needs rewriting and a new object created. That looks like this:
In this trivial example that is no big deal but when working with big datasets with thousands or millions of significantly sized chunks it becomes infeasible.
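To make the difference in cost concrete, here is a minimal sketch that models the 1D, chunk-size-one array as a plain Python dictionary and counts which objects would have to be written for an append versus a prepend. It does not use the Zarr library; `build_store` and `changed_keys` are hypothetical helpers invented for this illustration.

```python
import json


def build_store(values):
    """Model a 1D Zarr-like array with a chunk size of one as a key-value store."""
    store = {".zarray": json.dumps({"shape": [len(values)], "chunks": [1]})}
    for index, value in enumerate(values):
        store[str(index)] = value  # one object per chunk, keyed by chunk index
    return store


def changed_keys(before, after):
    """Keys that must be created or rewritten to get from `before` to `after`."""
    return sorted(key for key in after if before.get(key) != after[key])


original = build_store(list(range(1, 11)))   # the numbers 1 to 10
appended = build_store(list(range(1, 12)))   # 11 added at the end
prepended = build_store(list(range(0, 11)))  # 0 added at the start

append_cost = changed_keys(original, appended)
prepend_cost = changed_keys(original, prepended)

print("append touches", len(append_cost), "objects:", append_cost)
print("prepend touches", len(prepend_cost), "objects")
```

Appending touches only the metadata object and the new chunk, while prepending touches the metadata, all ten existing chunks and one new chunk.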
Our real-world example
Let’s start with a simplified version of the problem we actually want to solve. Our simple example is a rolling dataset containing the dates of the next 4 days. If today is 01/01/19 then our dataset looks like this:
Now pretend we’ve waited 24 hours and it’s now 02/01/19. Again our data set should represent the next 4 days and so our updated dataset now looks like this (again changes in blue):
Notice that we’ve had to modify all objects (except .zattrs). However, in information terms what has happened is that we’ve deleted 01/01/19 and created 05/01/19. The dates 02/01/19 to 04/01/19 should not have needed to change.
Again, no big deal in this trivial example but in our real-world scenario we are uploading 7TB a day to AWS Earth and wish to expose this as a 7-day rolling dataset. The underlying weather models update every 1 to 6 hours so we’d want to “roll” on this frequency. If all objects need re-writing approximately every 3 hours we’re looking at:
7 day archive * 7TB per day * 24 hours per day / 3 hours update frequency = ~400TB write per day.
That’s a lot, especially when you consider that as far as the information content is concerned you’ve only added 7TB. Even if (big if) the bulk can be considered moves rather than writes it’s still millions of object operations. With S3’s eventually consistent nature it’s almost inevitable that errors would occur such as object name collisions or accessing a mix of pre and post move objects when reading the data.
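The write-volume estimate above can be reproduced in a few lines. This is only the arithmetic from the text, using the same round figures; the variable names are ours.

```python
archive_days = 7            # length of the rolling window
tb_per_day = 7              # new data uploaded per day
hours_per_day = 24
update_interval_hours = 3   # approximate frequency of a "roll"

# Every roll rewrites the whole archive if all objects have to shift.
archive_size_tb = archive_days * tb_per_day
rolls_per_day = hours_per_day / update_interval_hours
tb_written_per_day = archive_size_tb * rolls_per_day

# How many times more data is written than is actually new.
write_amplification = tb_written_per_day / tb_per_day

print(f"{tb_written_per_day:.0f} TB written per day")
print(f"{write_amplification:.0f}x the new information content")
```

The exact figure is 392TB, which the text rounds to ~400TB, or 56 times the 7TB of genuinely new data.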
This example demonstrates a) this use case is not currently feasible and b) this use case is not purely academic. The good news is that a) a solution has already been suggested and b) we’ve implemented a workaround that can be used in the meantime.
A solution, as suggested in zarr-developers/zarr issue #267, is to add an extra bit of metadata that defines the origin of the Zarr array. At the moment it is baked into the specification (2.2 at the time of writing) that the origin is at 0 along each dimension. This proposal suggests that the position of the origin is set in the .zarray object. So for our example of the 4 day rolling dataset on day 2 (02/01/19) the dataset would look like this:
Now instead of ‘shifting’ all the data along one object (1 into 0, etc) we’ve created a new object at 4 and changed our origin from implicitly at 0 to explicitly at 1. We have also deleted (red) the object we no longer need (0). This is a much neater solution to our problem of rolling data. By accepting negative indexes (such as -1) this approach also allows datasets to be extended by prepending chunks.
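As a rough sketch of the idea (not the wording of the proposal itself), a non-zero origin just means adding the origin to the logical chunk index before looking up the object. The helpers `chunk_key` and `read` below are hypothetical, and the store is a plain dictionary standing in for S3.

```python
def chunk_key(logical_chunk, origin):
    """Map a logical chunk index (always starting at 0) to a stored object key."""
    return str(logical_chunk + origin)


def read(store, origin, length):
    """Read `length` chunks starting from the array's origin."""
    return [store.get(chunk_key(i, origin)) for i in range(length)]


# Day 1: four chunks, origin implicitly at 0.
store = {"0": "01/01/19", "1": "02/01/19", "2": "03/01/19", "3": "04/01/19"}
origin = 0

# Roll forward one day: one new object, one metadata change, one delete.
store["4"] = "05/01/19"
origin = 1
del store["0"]
rolled = read(store, origin, 4)

# Prepending: a negative origin lets a new chunk sit "before" chunk 0.
numbers = {str(i): i + 1 for i in range(10)}  # chunks 0 to 9 hold 1 to 10
numbers["-1"] = 0
prepended = read(numbers, -1, 11)

print(rolled)
print(prepended)
```

In both cases none of the existing chunk objects are rewritten; only the origin and the objects at the edges change.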
If this seems like a good idea to you I’d encourage you to 👍 the feature request and perhaps add an example of how you’d like to use it. Since this feature requires a specification change there is a reasonably high threshold we need to cross so please go share your thoughts on zarr-developers/zarr issue #267.
A workaround for here and now
Rather than fork Zarr and create our own version with this specification change, we’ve implemented a workaround that amounts to the same thing. To do this we’ve leveraged the fact that Zarr works on top of an array store, which is anything with a key-value interface such as S3 or a Python dictionary. By creating our own array store that understands that chunks are indexed and can apply an offset to that index, we’ve effectively implemented the non-zero origin considered above. We’ve used the .zattrs element of the Zarr spec to put in our custom offset metadata. Here is an illustration of our system based on our rolling 4 day example above:
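For readers who prefer code to pictures, below is a minimal sketch of what such an offset-aware store could look like. It is not our actual implementation (see the source linked later for that): the class name `OffsetStore` is hypothetical, it assumes the offset lives under an `offset` key in .zattrs, and it only shifts the first dimension of the chunk key.

```python
import json
from collections.abc import MutableMapping


class OffsetStore(MutableMapping):
    """Hypothetical key-value store that shifts chunk indices by an offset."""

    def __init__(self, inner):
        self.inner = inner  # any key-value mapping, e.g. a dict or an S3 map

    def _offset(self):
        # Assumption: the offset is stored as {"offset": n} in .zattrs.
        return json.loads(self.inner.get(".zattrs", "{}")).get("offset", 0)

    @staticmethod
    def _shift(key, by):
        head, sep, tail = key.partition(".")
        if not head.lstrip("-").isdigit():
            return key  # metadata such as .zarray or .zattrs is left alone
        return f"{int(head) + by}{sep}{tail}"

    def __getitem__(self, key):
        return self.inner[self._shift(key, self._offset())]

    def __setitem__(self, key, value):
        self.inner[self._shift(key, self._offset())] = value

    def __delitem__(self, key):
        del self.inner[self._shift(key, self._offset())]

    def __iter__(self):
        offset = self._offset()
        return (self._shift(key, -offset) for key in self.inner)

    def __len__(self):
        return len(self.inner)


# The rolled dataset on 02/01/19: objects 1 to 4 exist, offset is 1.
inner = {
    ".zattrs": json.dumps({"offset": 1}),
    "1": "02/01/19", "2": "03/01/19", "3": "04/01/19", "4": "05/01/19",
}
store = OffsetStore(inner)
first, last = store["0"], store["3"]
print(first, last, sorted(store))
```

Anything reading through the wrapper still sees chunks numbered from 0, while the underlying objects keep the keys they were first written with.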
Offloading responsibility for deleting
In our real-world AWS Earth scenario we want to be deleting a few hundred thousand objects a day. Whilst not an insurmountable amount to monitor, track, delete and generally administer, it is an overhead we would rather not have.
We can avoid this burden by offloading it onto S3. To achieve this we are using the S3 lifecycle tools to automatically delete objects over seven days old. However, the complication in this is that whilst we wish the Zarr chunks (where the actual data is stored) to expire after seven days, we don’t want other objects such as .zarray objects to expire, as this would break the Zarr specification, resulting in unreadable data. To work around this we use S3 tags to mark chunk objects (and only chunk objects) with type=temp_chunk. We can then set a rule on our bucket to remove these objects after seven days.
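A sketch of how the tagging and the expiry rule might be set up with boto3 is shown below. The function names, the rule ID and the bucket and key arguments are placeholders of ours; the functions expect a boto3 S3 client (for example `boto3.client("s3")`) to be passed in, so nothing here talks to AWS until you call them.

```python
CHUNK_TAG = "type=temp_chunk"


def expiry_rule(days=7):
    """Lifecycle rule that expires only objects tagged type=temp_chunk."""
    return {
        "ID": "expire-temp-chunks",  # placeholder rule name
        "Filter": {"Tag": {"Key": "type", "Value": "temp_chunk"}},
        "Status": "Enabled",
        "Expiration": {"Days": days},
    }


def apply_expiry_rule(s3_client, bucket, days=7):
    """Attach the rule to the bucket. Note this replaces any existing rules."""
    s3_client.put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration={"Rules": [expiry_rule(days)]},
    )


def put_chunk(s3_client, bucket, key, data):
    """Upload a chunk object, tagged so the lifecycle rule will expire it."""
    s3_client.put_object(Bucket=bucket, Key=key, Body=data, Tagging=CHUNK_TAG)


def put_metadata(s3_client, bucket, key, data):
    """Upload .zarray / .zattrs untagged, so they never expire."""
    s3_client.put_object(Bucket=bucket, Key=key, Body=data)
```

Because the rule filters on the tag rather than on a key prefix, the metadata objects can live alongside the chunks without being caught by the expiry.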
Demo and source
If you would like to give it a go, including simple demos of prepending and rolling as discussed above, check out the interactive binder demo. If you are interested in the code take a look at the source at GitHub.
Concurrency in write
The examples above talk mainly about reading the data but to achieve our goal we need to be writing that data too. Given the number of objects coming from AWS Earth (~100,000 per day) and the number of chunks we want to output (1–10 times as many) these write operations will need to occur concurrently. The beauty of the system above is that as long as we make sensible decisions about chunking then safe concurrent write comes for free.
The considerations around chunking are simple enough. We must ensure that any chunk maps to one and only one input. In other words: processing any combination of inputs simultaneously will never result in writing to any one chunk more than once. This means that chunks can be smaller than the input files (one input → many chunks) but never larger (many inputs → one chunk). Depending on your input stream this could be an issue but it’s a concession we are willing to make in our case.
With the above caveat, you can freely make the data edits in any order and always have a valid dataset. The ideal ordering is:
- You write any new incoming chunk(s)
- Update the .zattrs with a new offset
- Delete any old chunks
In this scenario you get a successfully “rolled” Zarr that has no missing data. This is the case even between any of the steps, such as before step 2 but after step 1.
Due to the eventually consistent nature of S3 you may not be able to control the ordering of these events, but this is OK. If steps happen out of sequence you end up with a valid Zarr with null/missing data (probably just temporarily). For example, if step 3 happens first you have missing data at the beginning of your dataset, and if step 2 happens before step 1 you have missing data at the end. These are perfectly legitimate situations.
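The effect of the ordering can be checked with a small simulation. This is a sketch only: a dictionary stands in for S3, the three functions correspond to the three steps above, and a missing chunk is read back as `None`.

```python
import itertools
import json

LENGTH = 4  # the rolling 4 day dataset


def start():
    """The dataset as it stands on 01/01/19, with an offset of 0."""
    return {
        ".zattrs": json.dumps({"offset": 0}),
        "0": "01/01/19", "1": "02/01/19", "2": "03/01/19", "3": "04/01/19",
    }


def read(store):
    """Read the logical array; chunks that do not exist come back as None."""
    offset = json.loads(store[".zattrs"])["offset"]
    return [store.get(str(i + offset)) for i in range(LENGTH)]


def write_new(store):       # step 1
    store["4"] = "05/01/19"


def update_offset(store):   # step 2
    store[".zattrs"] = json.dumps({"offset": 1})


def delete_old(store):      # step 3
    store.pop("0", None)


def missing_after_each_step(steps):
    """Apply the steps in order and count missing chunks after each one."""
    store = start()
    counts = []
    for step in steps:
        step(store)
        counts.append(read(store).count(None))
    return counts


ideal = missing_after_each_step([write_new, update_offset, delete_old])
delete_first = missing_after_each_step([delete_old, write_new, update_offset])
offset_first = missing_after_each_step([update_offset, write_new, delete_old])

all_orders = itertools.permutations([write_new, update_offset, delete_old])
final_gaps = [missing_after_each_step(order)[-1] for order in all_orders]

print(ideal, delete_first, offset_first, final_gaps)
```

In the ideal order there is never a gap; in the other orders a single chunk is briefly missing, and every ordering converges on a complete dataset once all three steps have landed.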
The first limitation of note is that this system only works for adding, removing or shifting by whole chunks at a time. In our rolling 4 day dataset example if we had a chunk size of two we could only ‘roll’ every other day. There is a good suggestion from @alimanfoo on how to resolve this with another complementary addition to the spec.
The most significant limitation for us, and one we will be blogging on more in the future, is metadata handling. Our datasets are accompanied by reams of metadata such as the coordinate grid the data is on, the height range of the data, and so on. The solution above discards a lot of this information to provide an efficient, scalable, parallel write dataset. More on how we are dealing with this in the future.
Watch this space…
We are excited about this small addition to the Zarr spec and the workaround, and believe it makes ‘live’ Zarr datasets a practical reality. That said, this work is just the beginning so watch this space…
A follow-up post “How to (and not to) handle metadata in high momentum datasets” has now been published discussing some of the metadata challenges alluded to above.