RayOnSpark: Running Emerging AI Applications on Big Data Clusters with Ray and Analytics Zoo

Jason Dai
riselab
Published in
4 min readJul 29, 2019

Zhichao Li (zhichao.li@intel.com), Jason Dai (jason.dai@intel.com)

AI has evolved significantly in recent years. In order to gain insight and make decisions based on massive amounts of data, we need to embrace advanced and emerging AI technologies such as Deep Learning, Reinforcement Learning (RL), AutoML, etc.

Ray* (https://github.com/ray-project/ray) is a distributed framework for emerging AI applications open-sourced by UC Berkeley RISELab (https://rise.cs.berkeley.edu). It implements a unified interface, distributed scheduler, and distributed and fault-tolerant store to address the new and demanding systems requirements for advanced AI technologies. Ray allows users to easily and efficiently to run many emerging AI applications, such as deep reinforcement learning using RLlib, scalable hyperparameter search using Ray Tune, automatic program synthesis using AutoPandas, etc.

In this post, we will introduce RayOnSpark, a feature recently added to Analytic Zoo (an end-to-end data analytics + AI platform open sourced at https://github.com/intel-analytics/analytics-zoo). RayOnSpark allows users to directly run Ray programs on Apache Hadoop*/YARN, so that users can easily try various emerging AI applications on their existing Big Data clusters in a distributed fashion. In addition, instead of running big data applications and AI applications on two separate systems, which often introduces expensive data transfer and long end-to-end learning latency, RayOnSpark allows Ray applications to seamlessly integrate into Apache Spark* data processing pipeline and directly run on in-memory Spark RDDs or DataFrames.

In the following sections, we will focus on how to run Ray cluster and programs using PySpark on top of Hadoop/YARN (see Figure 1 below). Note that although this post only shows how to run Ray on the YARN cluster, the same logic can be applied to both Kubernetes* and Apache Mesos*.

To illustrate the intended RayOnSpark workflow, we will use a simple Ray example that gathers the server’s IP with Actor and run that example on the YARN cluster.

  • Follow the link below to install Anaconda*
https://docs.conda.io/projects/conda/en/latest/user-guide/install/index.html
  • Create a virtual environment named “zoo” (or any other name) as follows:
conda create -n zoo python=3.6
source activate zoo
  • Install the Spark, Analytics Zoo, Jupyter*, and Ray into Conda* environment.
source activate zoo
pip install analytics-zoo==0.6.0.dev6 (or above version)
pip install pyspark==2.4.3
pip install ray
conda install jupyter
  • Install the Java* environment.
conda install -c anaconda openjdk=8.0.152 

The Java environment setting is required by Spark. You can skip this if the environment variable JAVA_HOME has already been set up with JDK8.

  • Search and remember the path to the Hadoop configuration folder, which is needed to initialize Spark on YARN later. The hierarchy for the folder looks like:
  • Launch Jupyter notebook
jupyter notebook
  • In the notebook, launch SparkContext on YARN by simply calling the “init_spark_on_yarn” Python method provided by Analytics Zoo:

NOTE: One challenge the user is facing with PySpark on YARN is to prepare the python environment on each node in the cluster without modifying the cluster. You might think of using rsync to move the the dependencies from the driver to cluster manually, but it takes time and is error-prone. Also, you may not have the ssh permission in the production environment. Here we address this issue by leveraging conda-pack and YARN distributed cache, so as to help the user automatically bundle and distribute the Python dependencies across the cluster.

  • Launch Ray cluster using PySpark on YARN.

In RayOnSpark, we first create a SparkContext which will be responsible for launching Ray process across the underlying cluster (i.e YARN containers) via “ray start”. For each Spark executor, a “Ray Manager” (see Figure 2 below) is created to manage the Ray processes; it will automatically close or re-launch the processes when failure happens or when the program exits.

RayContext” is the entry point to trigger the deployment of Ray cluster. Here is the logic under the hood once you invoke “ray_ctx.init()”:
(1) A Ray “driver” will be launched on the local node.
(2) A single Ray “master” with Redis* process will be launched on one Spark executor.
(3) For each of the remaining Spark executors, a “slave” Raylet will be launched.
(4) The Ray master and Raylet processes will be configured to use the number of cores as specified by the “executor_cores” parameter.

  • After that, we will write some trivial code to test if the Ray cluster has been successfully launched or not. For instance, the following code would create Actors to gather IP from the allocated YARN containers.

With the RayOnSpark support in Analytics Zoo, the user just needs to add three extra lines of Python code at the top of the Ray program (as follows),

sc = init_spark_on_yarn( … )
ray_ctx = RayContext(sc=sc, … )
ray_ctx.init( … )

so as to run new AI applications built on top of Ray directly in existing Hadoop/YARN clusters, which can then be seamlessly integrated into the Spark data processing pipeline. As the first use case, we are currently working on an implementation of AutoML support for time series prediction (including automatic feature generation, model selection, and hyper-parameter tuning) using RayOnSpark.

*Other names and brands may be claimed as the property of others

--

--

Jason Dai
riselab

Intel Fellow and Chief Architect of Big Data AI