by Ryan Abernathey and Tom Augspurger
TLDR: this post describes a new python library called rechunker, which performs efficient on-disk rechunking of chunked array storage formats. Rechunker allows you to write code like this:
from rechunker import rechunk
target_chunks = (100, 10, 1)
max_mem = "2GB"
plan = rechunk(source_array, target_chunks, max_mem,
…and have the operation parallelized over any number of Dask workers.
Chunked arrays are a key part of the modern scientific software stack in fields such as geospatial analytics and bioinformatics. Chunked arrays take a large multidimensional array dataset, such as an image captured over many timesteps, and split it up into many “chunks” — smaller arrays which can comfortably fit in memory. These chunks can form the basis of parallel algorithms that can make data science workflows go a lot faster.
Chunked arrays are implemented in both parallel computing frameworks — such as Dask and NumpyWren — and as an on-disk storage format. Some storage formats that support chunked arrays include HDF5, TileDB, Zarr, and Cloud Optimized Geotiff. When these chunked array storage formats are paired with the above computing frameworks, excellent scaling performance can be achieved.
However, chunked arrays workflows can fail hard when the chunks are not aligned with the desired analysis method. A great example can be found in this post from a user on the Pangeo forum:
Best practices to go from 1000s of netcdf files to analyses on a HPC cluster?
What? 8759 netcdf files totalling 17TB of HYCOM ocean model (u,v) velocity data at two depth levels (and bottom…
Geospatial satellite data is often produced as a global map once per day, creating a natural chunk structure (e.g. one file per day). But what happens if you want to do a timeseries analysis at each point in space? This analysis can’t be parallelized over chunks. Many array-based workflows get suck on similar problems.
One existing solution is to use Dask’s rechunk function to create a new chunk structure lazily, on the fly, in memory. This works great for some problems. For others, particularly those involving a full rechunk (every source chunk goes into every target chunk), Dask’s algorithm can run out of memory, or produce an unmanageably large number of tasks. (More details can be found in the post linked above.)
To address this problem, we created a new package that aims to solve this specific problem in an optimal way: rechunker.
The Rechunker Algorithm
Rechunker takes an input chunked array (or group of arrays) stored in a persistent storage device (such as a filesystem or a cloud storage bucket) and writes out an array (or group of arrays) with the same data, but different chunking scheme, to a new location. Along the way, it may create a temporary, intermediate copy of the array in persistent storage. The reliance on persistent storage is a key difference between Rechunker and Dask’s rechunk function.
Figuring out the most efficient way to do this was a fun computer science problem to solve. Via our Discourse forum, many people contributed to the discussion and shared different ideas they had implemented in the past. We identified a couple of key requirements for Rechunker’s algorithm:
- Respect memory limits. Rechunker’s algorithm guarantees that worker processes will not exceed a user-specified memory threshold.
- Minimize the number of required tasks. Specifically, for N source chunks and M target chunks, the number of tasks is always less than N + M.
- Be embarrassingly parallel. The task graph should be as simple as possible, to make it easy to execute using different task scheduling frameworks. This also means avoiding write locks, which are complex to manage, and inter-worker communication.
These considerations led to the creation of an algorithm we call Push-Pull-Consolidated.
Given a specific memory constraint (e.g., my worker processes have 4GB of available memory), this algorithm figures out whether an intermediate array needs to be created, and, if so, what chunk structure it needs to have. Rechunker groups together both reads and writes in an optimal way, so as to minimize the total number of tasks while respecting the specified memory constraint.
The Rechunker Package
Today we released Rechunker v0.1. You can get it on GitHub, Pip and (coming soon) Conda.
Rechunker is a Python package which enables efficient and scalable manipulation of the chunk structure of chunked array…
To install it, just run
pip install rechunker
To get started with rechunker, we recommend going through the tutorial.
Rechunker Tutorial - Rechunker 0.0.1+17.g4bdb8ae documentation
Here we load one of xarray's tutorial datasets and write it to Zarr. This is not actually a big dataset, so rechunker…
We hope that using rechunker is intuitive and simple. Rechunker should work on any platform where you can install and run Dask and Zarr, from your laptop to a massive supercomputer. Via the various filesystem-spec packages, rechunker can read / write data from standard POSIX filesystems, all flavors of cloud object storage, and basically any storage service you can think of.
A Realistic Cloud Example
In the tutorial, we show an example using the sea-surface height dataset from the Pangeo Cloud Catalog. Opening a Zarr dataset from Google Cloud Storage looks like this:
# a zarr group lives here
url = 'gs://pangeo-cmems-duacs'
gcs = gcsfs.GCSFileSystem(requester_pays=True)
source_store = gcs.get_mapper(url)
group = zarr.open_consolidated(source_store, mode='r')
source_array = group['sla']
source_array with Dask looks like this, a classic data cube:
The data are chunked along the first axis (which happens to correspond to time). If we want to parallelize some calculation over the time axis, such as calculating the spatial mean at each time, this chunking scheme is ideal. But if we want to do something else, like analyze the timeseries at each point in space, we are out of luck. We need a new chunking scheme. Rechunker to the rescue!
from rechunker import rechunk
max_mem = "1GB"
target_chunks = (8901, 72, 72)
# you must have write access to this location
store_tmp = gcs.get_mapper("gs://scratch-bucket/temp.zarr")
store_target = gcs.get_mapper("gs://scratch-bucket/target.zarr")
r = rechunk(source_array, target_chunks, max_mem,
This rechunking operations transforms the chunks from
(5, 720, 1440) to
(8901, 72, 72). The chunks are about the same size, but they have a very different orientation with respect to the full array. To perform the rechunking, we call
Using a dask cluster of 20 workers, we are able to execute this rechunking operation in a few minutes. Most importantly, the plan is guaranteed to work with any number of workers (including just one, i.e. in serial) or memory constraints. For rechunker, robustness and performance go hand-in-hand. This is what the dask graph for the rechunking operation looks like:
We see that there are basically two main groups of tasks: reading the source / writing the intermediate data, and reading the intermediate / writing the target. Each of these is embarassingly parallel.
Rechunker in its current state supports rechunking Zarr arrays using Dask as the execution framework. However, we would like to make it more flexible both in terms of formats and in terms of task schedulers. We hope to eventually support
- Incremental rechunking (update, we got a PR for this feature as I was writing this post)
- Other chunked array formats such as TileDB. HDF5 may even be possible, although more difficult.
- More flexible scheduling. Could we use other task execution frameworks, such as AWS Lambda, Apache Beam, or Prefect to execute these operations?
Inevitably, as people start using rechunker, new use cases will surface new bugs and suggest new features to implement. We look forward to working with the community on these, while trying to keep rechunker as simple and lean as possible!
If you’re struggling with chunk challenges, please try out rechunker and let us know what we can improve!