Open sourcing cluster-pack

Fabian Höring
Mar 10, 2020 · 5 min read

Packaging in Python is hard. It is particularly hard when code needs to run in a distributed computing environment, where it is difficult to know what runs where and which parts of the code are available on each node.

In a related blog post, “Packaging code with PEX — a PySpark example”, we explained different ways of shipping code to the cluster and how to run it with PySpark. That post focuses on standard Python tooling like virtual environments, pip & pex, but the ideas are generic and also apply to Anaconda environments, other applications (like Dask) and other compute clusters (like Mesos or Kubernetes).

Today we introduce cluster-pack, a new tool that generalizes all those ideas and use cases. cluster-pack is a library on top of either pex or conda-pack whose goal is to make your production & development Python code easily available on any cluster.

The first examples use Skein (a simple library for deploying applications on Apache YARN) and PySpark with HDFS storage. TensorFlow is supported via tf-yarn (see this blog post for more details), which already uses cluster-pack. We intend to add more examples for other applications (like Dask, Ray) and S3 storage.

Cluster-pack is available on GitHub: https://github.com/criteo/cluster-pack.

Features

Here is a brief description of the features that cluster-pack brings.

Ships a package with all the dependencies from your current virtual environment or your conda environment

For conda, it uses conda-pack to zip your current conda environment; for pex, it inspects your current virtual environment and generates a self-contained pex file. Both archives are uploaded to the distributed storage. cluster-pack supports both HDFS & S3 as distributed storage via the PyArrow FileSystem.
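To give a feeling for what this automates, here is a rough sketch of the two underlying packaging steps (illustrative only; the real logic lives inside cluster-pack, and the output file names are placeholders):

import subprocess
import conda_pack

def build_conda_archive(output="environment.tar.gz"):
    # conda-pack archives the currently active conda environment
    conda_pack.pack(output=output)
    return output

def build_pex_archive(requirements, output="environment.pex"):
    # pex builds a self-contained, executable zip file from the given
    # requirements, e.g. build_pex_archive(["numpy", "skein"])
    subprocess.check_call(["pex", *requirements, "-o", output])
    return output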

Storing metadata for an environment

In addition to uploading the environment as a zip file, cluster-pack also creates a metadata file with all the uploaded package versions. This is useful not only to know which environments exist and to reuse them (currently only conda provides this feature locally via ‘conda list’); it also enables caching, so that only the packages whose dependencies changed are updated.
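To illustrate the caching idea, here is a minimal sketch; the metadata file name and format are hypothetical and not cluster-pack’s actual layout:

import json
import subprocess

def current_packages():
    # "pip freeze" lists the package==version pairs of the active virtual env
    out = subprocess.check_output(["pip", "freeze"], text=True)
    return sorted(line for line in out.splitlines() if line)

def needs_reupload(metadata_path="environment.json"):
    # Re-upload only if the dependency list changed since the last upload
    try:
        with open(metadata_path) as f:
            uploaded = json.load(f)
    except FileNotFoundError:
        return True
    return uploaded != current_packages()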

Dev mode

When using virtual environments with pip, cluster-pack takes advantage of pip’s editable install mode: all editable requirements are uploaded separately on every run, making local changes directly visible on the cluster without regenerating the whole package with all its dependencies. In our use case, where we have large packages like PySpark (200 MB) and TensorFlow (400 MB), this is a huge improvement for developing our libraries. It even works when we need to change multiple libraries at the same time: we simply switch them to editable mode, make our changes, and they are immediately taken into account in our job on the cluster.
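A minimal sketch of this workflow, assuming a local checkout of the library you are iterating on (the path is a placeholder):

# In the current virtual environment, install your library in editable mode:
#   pip install -e /path/to/my_lib
import cluster_pack

# Editable requirements are detected and uploaded separately from the big
# self-contained package, so local changes are picked up on the next run.
package_path, _ = cluster_pack.upload_env()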

Interactive (Jupyter notebook) mode

Code written interactively (in the Spark shell, Jupyter notebooks, …) is not part of an installed package and therefore can’t simply be included in the zip file. In general, this case is handled by pickling (serializing) the Python functions, sending them to the workers and unpickling them on the remote side. cluster-pack uses cloudpickle to provide this feature and make interactive code directly available on the cluster.
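A small illustration of the pickling mechanism with cloudpickle (cluster-pack does this for you when you use the config helpers shown below):

import cloudpickle

def defined_in_the_notebook(x):
    return x * x

payload = cloudpickle.dumps(defined_in_the_notebook)  # serialize the interactive function
# ... ship `payload` to the worker, e.g. as a file next to the package ...
restored = cloudpickle.loads(payload)                 # deserialize on the remote side
assert restored(4) == 16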

We are aware that pickling is brittle in a dynamically typed language like Python, and even cloudpickle fails in some cases. That’s why cluster-pack also ships the code from the current package, making pickling unnecessary (described in the self-shipping project example). You can just work on the current project in an IDE and all project files will be uploaded automatically.

Provides config helpers to directly use the uploaded zip file inside your application

After the package has been uploaded to distributed storage, we need to download it and invoke the Python interpreter. For conda this means unzipping the whole archive and setting some environment variables; for pex it means invoking the self-contained zip file. This is in general application-specific: every application provides its own way to invoke entry points and to upload files. We currently provide config builders for skein and PySpark to do this easily, and intend to provide one for Dask as well.
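For PySpark, for instance, the config builder roughly automates the setup described in the PEX blog post mentioned above; a hand-written sketch of that setup could look like this (the HDFS path and file name are placeholders):

from pyspark.sql import SparkSession

archive = "hdfs:///user/alice/envs/myproject.pex"  # hypothetical upload location

spark = (SparkSession.builder
         # ship the self-contained pex file to every executor's working directory
         .config("spark.yarn.dist.files", archive)
         # make the executors use the pex file as their Python interpreter
         .config("spark.executorEnv.PYSPARK_PYTHON", "./myproject.pex")
         .getOrCreate())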

Launching jobs from jobs from jobs …

Imagine a situation where you launch a job on the cluster that itself launches other jobs. This is great for offloading heavy local work to the cluster. What does this mean for the zipped environment? What happens to the pickled functions and the uploaded editable projects? cluster-pack takes care of this and forwards everything to the next job.
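As a sketch, a job submitted with cluster-pack could itself reuse the same APIs shown in the walk-through below to launch a nested job; resources and names here are illustrative:

import cluster_pack
import skein
from cluster_pack.skein import skein_config_builder

def nested_job():
    print("running on a worker, launched from another worker")

def parent_job():
    # Runs on the cluster; cluster-pack forwards the already uploaded
    # environment, pickled functions and editable projects to the nested job.
    package_path, _ = cluster_pack.upload_env()
    skein_config = skein_config_builder.build_with_func(
        func=nested_job, package_path=package_path)
    with skein.Client() as client:
        service = skein.Service(
            resources=skein.model.Resources("1 GiB", 1),
            files=skein_config.files,
            script=skein_config.script)
        client.submit(skein.ApplicationSpec(services={"service": service}))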

Walk-through for an interactive skein example

Here is an interactive example using Skein and HDFS storage with a virtual environment. You can also execute it directly in a Jupyter notebook.

1. Set up a virtual environment and install the dependencies

$ cd examples/interactive-mode
$ python3 -m venv venv
$ . venv/bin/activate
$ pip install cluster-pack numpy skein
$ python

2. Define the workload to execute remotely

import numpy as np

def compute_intersection():
    a = np.random.random_integers(0, 100, 100)
    b = np.random.random_integers(0, 100, 100)
    print("Computed intersection of two arrays:")
    print(np.intersect1d(a, b))

3. Upload current virtual environment to the distributed storage (HDFS in this case)

import cluster_pack
package_path, _ = cluster_pack.upload_env()

The installed dependencies numpy & skein will be automatically included in the package. We also ship the pickled function compute_intersection.

4. Call the skein config helper to get the config that easily executes this function on the cluster

from cluster_pack.skein import skein_config_builder
skein_config = skein_config_builder.build_with_func(
    func=compute_intersection,
    package_path=package_path
)

5. Submit a simple skein application

import skein
with skein.Client() as client:
    service = skein.Service(
        resources=skein.model.Resources("1 GiB", 1),
        files=skein_config.files,
        script=skein_config.script
    )
    spec = skein.ApplicationSpec(services={"service": service})
    app_id = client.submit(spec)

Ideas for future developments

The first examples work with skein and PySpark on Hadoop and HDFS storage. We intend to add more examples for other applications (like Dask, Ray) and S3 storage.

We could also provide examples of how to use it on other compute clusters like Kubernetes or Mesos, or with a manual cluster setup (by providing a cluster spec and using ssh to connect to each node).

The current use cases mostly cover working with a local virtual env or conda environment. We could also directly create the package from a requirements file (for virtual envs) or a conda yaml file (for conda). This would enable many more use cases. For example, we could have a unified tool (pex & conda compatible) to run projects in MLflow; currently, only conda yaml files work.
