Why and How we use Pangeo at CNES
This post shares the reasons that led us, the Computing Center team at CNES (the French space agency), to adopt Pangeo and provide it to our users. It also shows how the platform has been deployed on our cluster, and what we’re doing with this software ecosystem.
First contact with Dask
I’m originally an all-around software developer who specialized in data engineering and distributed processing. I’ve been in the CNES High Performance Computing (HPC) team for three years as a scientific computing expert, and I have been leading this team for a few weeks. I began my journey into the distributed computing world with Hadoop and MapReduce algorithms applied to astronomy data. Later on I discovered and used Apache Spark, and a couple of years after that I was hired to work in the CNES HPC team.
There, I encountered a lot of different use cases on our platform: software orchestrating several batch jobs, big processing campaigns with big job arrays, a few MPI/OpenMP optimized computations, and some codes using mpi4py as a workload manager. Most of our cluster occupation was, and still is, heavy High Throughput Computing (HTC) pipelines that either need a lot of CPU time or are I/O bound. Obviously there is some synchronization and data manipulation to do, but most of it doesn’t require the sophistication of MPI.
So I came and said “Hey, why don’t you use Spark? It would really simplify the way you are writing your workflows!” I installed it, made some demonstrations, participated in Research and Technology projects, but it never really took off. Then, at a Big Data conference in London, I attended a talk on Dask by Matthew Rocklin. It looked really appealing, and some words especially struck me: science, scipy, numpy, pandas… I did not understand at the time that what really prevented Spark from taking off for us is that it is quite close to a Business Intelligence tool at first, and that it is developed in Scala. What is now pushing Dask/Pangeo up, by contrast, is that Dask was born in a scientific ecosystem around Python, much closer to what CNES people are used to.
Tried Dask and met Pangeo
I decided to try Dask to compute daily analyses of our IBM Spectrum Scale (formerly GPFS) usage: things like the number of created files, their volume, and so on, sorted by user, date, project… With some GPFS magic, we were able to extract between 100 and 200 GB of CSV files per day giving stats on the 500 million files on our system. I was able to quickly analyze them using dask-dataframe and a LocalCluster on one of our compute nodes (24 cores, 128 GB of memory). It was a real pleasure to just install a conda environment and be able to do easy distributed computing, with only a few lines of code to express the analysis we needed. Dask was really winning me over, but this was still a Spark-like usage. I needed some more science.
I’m not sure how I came across the Pangeo website. Maybe when looking for a good way to analyze lots of NetCDF files, maybe some colleague pointed me to it. I immediately felt the community was trying to solve a problem we were facing with our data and associated scientific computations, and that we could not solve easily using other technologies like Spark or MPI. There were already cool examples of complex computations. This was at the very beginning of the Pangeo cloud platform, and Joe Hamman and others were working on HPC deployment, with a lot of documentation and a Python tool that later became dask-jobqueue. That’s when I decided to engage in the community, in order to work on the HPC integration and be able to come up with real science use cases for the users of our cluster.
Pangeo deployment at CNES
The Pangeo platform is made of three main parts: a distributed computing infrastructure with Dask distributed available on top of it; a JupyterHub, or some other way to launch Jupyter, as the user interface; and scientific data with associated use cases that can benefit from Xarray or another specialized library.
Distributed computing infrastructure
Let’s start with the distributed computing, as it is the easiest: we already had an HPC cluster scheduled by PBS Pro, so we only needed a conda environment with Dask and dask-jobqueue in it. And yep, that was it: in a few minutes, we had our base infrastructure. If you’re interested, our cluster is a modestly sized one, with currently about 350 servers, 8,000 cores and 6.2 PB of storage. We’ll soon upgrade it to reach 12,000 cores and 8 PB of storage, with a bandwidth of 100 GB/s for reads and writes.
As described on the Pangeo website, Jupyter notebooks were already available for advanced users. But the need for qsub, ssh tunneling, and other tricks made it complicated. We thus decided to go with JupyterHub, the right way of providing notebooks to our users. Except for some internal IT processes that can take some time, it was really easy to put in place. It’s basically made of:
- A virtual machine with direct access to our cluster: plugged into the same LDAP directory, and with PBS client commands available on it (qsub, qstat, qdel).
- A local Python environment with JupyterHub and related packages to actually launch the hub, and a default shared Python environment in which to start the user notebook server.
- The right JupyterHub configuration for our HPC case: batchspawner, which starts user notebooks by submitting jobs on the cluster (we just had to patch it to make it PBS Pro compatible), and wrapspawner, which lets us offer several configurations depending on user needs and our cluster queues.
Wrap all this with the right level of automation using Ansible, some additional wrapping around systemd, a layer of security, and you’re all set!
Scientific data and use cases
Fortunately, we have quite a few scientific datasets available on our computing platform. They are generally the output of some HPC simulations, or data produced by CNES satellites or our partners (mostly Earth observation data, like the data from the Sentinel satellites). They are not always in a friendly format for distributed processing, but that’s another story.
I will talk more about the actual cases below. What’s important here is that it can take some time to really demonstrate Pangeo’s potential. You need to go beyond basic Dask/Xarray examples, and the really cool, more advanced ones are not always a perfect fit either. As my team and I are generally not field or domain experts, we needed to work with scientists to show that real science adapted to their needs could be done with Pangeo.
From job arrays to Dask
Job arrays use
Our first use case doesn’t even use Xarray or the like. It is based on what I call “parametrized simulation”, where basically:
- You have one compute method or routine, that takes between a few seconds and several hours to run on a given set of parameters.
- You need to run it on several thousand or even millions of parameter sets.
- You need to gather the results, save them, and do some statistics/profiling on them.
Examples of this kind of workload are Monte Carlo simulations in classical HPC or grid search cross-validation in machine learning. The usual HPC way to do this is with job arrays. Job arrays are made to easily run a high number of almost identical jobs, with a few variations based on their index. This is done in three steps:
- Build your set of parameters, and save it to a file.
- Write a job array PBS script file, and submit it to your batch scheduler. Every single job will take its corresponding line from the parameters file.
- Gather all results, one way or another.
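The first two steps can be sketched as follows, seen from the Python side of a sub-job. The file name and helper functions are hypothetical; the only scheduler-specific piece is the PBS_ARRAY_INDEX environment variable, which PBS Pro sets for each sub-job of an array submitted with something like qsub -J 1-3.

```python
import os
import tempfile


def write_parameters(path, parameter_sets):
    """Step 1: save one parameter set per line."""
    with open(path, "w") as f:
        for params in parameter_sets:
            f.write(" ".join(str(p) for p in params) + "\n")


def parameters_for_this_job(path, index):
    """Step 2: a sub-job picks the line matching its 1-based array index."""
    with open(path) as f:
        for line_number, line in enumerate(f, start=1):
            if line_number == index:
                return [float(x) for x in line.split()]
    raise IndexError(f"no parameter line for array index {index}")


# Hypothetical parameters file, written to a temporary directory for the sketch
workdir = tempfile.mkdtemp()
params_file = os.path.join(workdir, "params.txt")
write_parameters(params_file, [(0.1, 10), (0.2, 20), (0.3, 30)])

# Default to index 1 so that the sketch also runs outside of the scheduler
index = int(os.environ.get("PBS_ARRAY_INDEX", "1"))
my_params = parameters_for_this_job(params_file, index)
```

Each sub-job then has to write its own result somewhere, which is where the third step and the tiny-files problem come from.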
In my experience, even something as simple as that often leads to poor design or bad use of our cluster: hard-to-read boilerplate code to chain all the steps (with a combination of shell, Python and other languages); a batch scheduler overwhelmed by short jobs, or added complexity to adjust the number of computations per job; and a file system DDoS when writing and then reading several million tiny files containing the results of every computation.
And with Dask?
No fancy use of elegant collections here: we just need Dask’s low-level API, either Delayed or Futures (okay, we may need Bag to optimize a bit). Pick the one that best fits your need. What we do with it:
- Build the set of parameters, and keep it in memory.
- Start a Dask cluster with dask-jobqueue, connect a client to it.
- Just use client.map, call delayed in a for loop, or make a bag of the parameters, and compute over all of them.
- Gather all the results into client memory if it can fit, or just do some post analysis and reduction, or just write results from the Dask workers directly.
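The four steps above can be sketched with the Futures API as follows. The simulate function is a hypothetical stand-in for the real compute routine, and the PBSCluster settings shown in comments are illustrative values, not a recommended configuration. An in-process client is used so that the sketch runs anywhere.

```python
from dask.distributed import Client


def simulate(params):
    """Hypothetical stand-in for the real compute routine."""
    a, b = params
    return a * b


# Step 1: build the set of parameters and keep it in memory
parameters = [(a, b) for a in range(10) for b in range(3)]

# Step 2: start a Dask cluster and connect a client to it.
# On an HPC cluster this would be something like (illustrative values):
#   from dask_jobqueue import PBSCluster
#   cluster = PBSCluster(cores=24, memory="120GB", walltime="04:00:00")
#   cluster.scale(jobs=10)
#   client = Client(cluster)
client = Client(processes=False, dashboard_address=None)

# Step 3: one task per parameter set
futures = client.map(simulate, parameters)

# Step 4: reduce on the cluster and bring back only the final number
total = client.submit(sum, futures).result()

client.close()
```

If the individual results fit in client memory, client.gather(futures) would return them all as a list instead.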
So what do we gain by using Dask?
- A single, easy-to-maintain Python file with far fewer lines than the original program!
- No impact on the HPC cluster scheduling system: task scheduling is done by Dask, and the cluster job queuing system just provides resources for a long period of time.
- No impact on the distributed storage. Everything is done in memory with network exchange if possible, or else Dask provides nicely optimized IO methods like to_csv.
- The ability to directly analyze the results in the same script in a distributed way.
Dask provides some more advantages: easy error handling with task retries, the ability to work on results as they are computed, adaptive reservation to optimize your cluster use… So clearly, this is a big win here, and if we add that you could move to the cloud by changing one line of code, some eyes pop out.
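As a small illustration of two of these features, the sketch below asks for task retries and consumes results in completion order. The simulate function is again a hypothetical stand-in, and the client runs in-process so that the example works without a cluster.

```python
from dask.distributed import Client, as_completed


def simulate(x):
    """Hypothetical stand-in for one simulation run."""
    return x * x


client = Client(processes=False, dashboard_address=None)

# retries=2 lets the scheduler rerun a task up to twice if it raises an error
futures = client.map(simulate, range(20), retries=2)

# Work on the results as they are computed instead of waiting for all of them
running_total = 0
for future, result in as_completed(futures, with_results=True):
    running_total += result

client.close()

# With dask-jobqueue, adaptive reservation is requested on the cluster object,
# for example: cluster.adapt(minimum_jobs=1, maximum_jobs=20)
```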
Multi temporal analysis of scientific data
One important thing modern satellites provide, the Sentinel constellation for example, is a high revisit frequency (the same location every 5 days, for instance). This enables new or more efficient algorithms, as long as you’re able to take advantage of it. To do so, you often represent the data as a so-called datacube. For satellite imagery, a datacube is made of several dimensions: latitude, longitude, spectral bands, and time. To manipulate these datacubes, array databases like Rasdaman or Open Data Cube were developed, but they often need a data ingestion step and a client/server setup that make them hard to adopt.
Xarray, a main component of Pangeo, provides data structures adapted to the NetCDF format, which is multidimensional by nature. It thus provides an in-memory representation, along with input and output methods, to build a datacube, and it can scale this datacube across hundreds of compute nodes with Dask. It’s actually the library used by the Australian Open Data Cube to build their system. With a few lines of code, we can use the xarray.open_rasterio method to read multiple spectral bands of Sentinel data at multiple dates, and build a datacube out of them.
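A minimal sketch of the idea, using synthetic data so that it runs without any imagery: load_scene is a hypothetical stand-in for reading one acquisition from disk (with xarray.open_rasterio at the time of writing; to my knowledge recent Xarray releases have dropped that function in favor of the rioxarray package). The band names and array sizes are made up.

```python
import numpy as np
import pandas as pd
import xarray as xr


def load_scene(date):
    """Hypothetical stand-in for reading one acquisition from disk.

    Returns a (band, y, x) array filled with random reflectances.
    """
    rng = np.random.default_rng(date.toordinal())
    return xr.DataArray(
        rng.random((2, 4, 5)),
        dims=("band", "y", "x"),
        coords={"band": ["red", "nir"]},
    )


# One acquisition every 5 days
dates = pd.date_range("2019-01-01", periods=6, freq="5D")

# Stack the per-date scenes along a new "time" dimension to get the datacube
cube = xr.concat(
    [load_scene(date) for date in dates],
    dim=pd.Index(dates, name="time"),
)

# Back the cube with Dask, one chunk per date, so that it can be processed in parallel
cube = cube.chunk({"time": 1})
```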
What to do with the data once you’ve built your temporal hyper cube?
- The hello world example for satellite imagery is the computation of the Normalized Difference Vegetation Index (NDVI), which gives an idea of the quantity of vegetation in a given pixel.
- But you can do other temporal statistics: how many days is a pixel or area free of clouds over a year? How have coastal areas or Amazonian forests evolved? And so on.
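Both kinds of computation are short to express with Xarray. The sketch below uses a tiny hand-made cube (two dates, one row, two pixels); the variable names red, nir and cloudy are assumptions for the example. NDVI is computed as (NIR - Red) / (NIR + Red).

```python
import numpy as np
import xarray as xr

dims = ("time", "y", "x")

# Tiny synthetic cube: 2 dates, 1 row, 2 pixels
cube = xr.Dataset({
    "red": (dims, np.array([[[0.1, 0.2]], [[0.3, 0.1]]])),
    "nir": (dims, np.array([[[0.5, 0.2]], [[0.3, 0.3]]])),
    "cloudy": (dims, np.array([[[False, True]], [[False, False]]])),
})

# NDVI for every pixel and every date
ndvi = (cube.nir - cube.red) / (cube.nir + cube.red)

# Temporal mean of NDVI, ignoring cloudy observations (masked values are skipped)
mean_ndvi = ndvi.where(~cube.cloudy).mean("time")

# Number of cloud-free dates per pixel
clear_dates = (~cube.cloudy).sum("time")
```

The same expressions work unchanged on a Dask-backed cube, in which case nothing is computed until the result is requested.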
Outside of satellite imagery, Ifremer has used Pangeo on our cluster to analyze the outputs of a big ocean simulation (MITgcm LLC4320). One typical computation on this simulation is the time-frequency kinetic energy spectrum, which basically builds a big datacube over the globe and reorganizes the data so as to hold the full temporal axis of a given location in memory, in order to perform the scientific computation on it. Xarray helps to do this in very few lines of code, and by using the Zarr file format they were able to speed up data loading by a factor of ten compared to NetCDF.
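The reorganization step can be illustrated on a toy example. This is not Ifremer’s code: the field is random, the sizes are tiny, and a plain FFT power spectrum stands in for the real kinetic energy computation. The point is the rechunking, from one chunk per time step to chunks that hold the full time axis for a small spatial tile.

```python
import dask.array as da
import xarray as xr

# Synthetic velocity field stored one time step per chunk, as model output often is
u = xr.DataArray(
    da.random.random((64, 8, 8), chunks=(1, 8, 8)),
    dims=("time", "y", "x"),
)

# Rechunk: the whole time axis in each chunk, split in space instead
u_by_location = u.chunk({"time": -1, "y": 4, "x": 4})

# The FFT along time needs that axis in a single chunk, hence the rechunking
spectrum = da.fft.rfft(u_by_location.data, axis=0)

# Power per frequency, averaged over space
power = (abs(spectrum) ** 2).mean(axis=(1, 2)).compute()

# The rechunked data could then be saved for fast reloading, for example with
# u_by_location.to_dataset(name="u").to_zarr("u.zarr")  (hypothetical path)
```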
So where are we now
We have had a Pangeo platform on our cluster for almost a year now. A lot of users have adopted the Jupyter notebook interface, and rightly so, with or without the rest of the Pangeo stack. A good number have started working with Dask for varied needs. Some oceanographers, like the Ifremer scientists, are working with Xarray on NetCDF-like data. Yes, Pangeo is a wonderful answer to a lot of our needs.
What we are really working on right now is identifying the level of integration we can hope for between Pangeo and our more complex workflows, especially with satellite imagery (so beyond just NDVI). Will we be able to integrate our computations in high-level libraries such as Xarray (which would be the ideal in my opinion), or will we have to just use Dask as a low-level task manager? To answer this properly, I think that Xarray needs to grow its support for this kind of data, and hopefully some of us will be able to help make this happen!