This is the second post in a two-part series on large-scale, distributed training of TensorFlow models using Kubeflow. In Part 1, we saw how to implement an efficient input pipeline for training a recurrent autoencoder using TFRecord and tf.data.Dataset API. In this post, we will continue from where we left off and discuss Kubeflow, the framework we will use for distributing our computational workloads across the cluster nodes. We will also provide the necessary implementation details and an illustrative visualization of how epoch run times scale with the number of GPUs.
This is the second in a two-part series. You can find Part 1 here.
TensorFlow takes care of all the painful details of workload distribution with its recommended MultiWorkerMirroredStrategy. At a high level: a replica of the model is created on every worker and then each batch of input data with a global batch size is split and distributed such that each worker gets a single batch with a batch_size_per_worker number of instances. Weight updates happen synchronously after every iteration.
This data parallelism combined with the ability of GPUs to execute intensive array computations at high speeds and TensorFlow’s efficient communication protocols between the GPUs is what makes this approach scalable. It can’t get any easier to apply this strategy to our model — we can define the model setup and graph operations within the scope of this strategy and TensorFlow takes it from there.
In the above code snippet, assemble(..) is analogous to a method that creates all the layers of the neural network and setup(..) would be a method that creates all the graph operations using those layers. Note that we’re calling these functions within the scope of the distribution strategy to let TensorFlow know that these are the operations that need to be distributed.
These are the TensorFlow essentials. Now let’s focus on another key component: Kubeflow.
Kubeflow is a machine learning deployment toolkit that automates a lot of the requirements for making portable and scalable ML deployments. We will use Kubeflow’s tf-operator to create and manage a TFJob, a custom Kubernetes resource. The TFJob custom resource is designed to run distributed training jobs in Kubernetes. In what follows we will outline the steps to install and deploy Kubeflow on an operational cluster with private nodes. We will assume kubectl is installed and enabled.
Creating a StorageClass
For this Kubeflow deployment, we need to create a default StorageClass with a dynamic volume provisioner enabled. Within the cluster context a StorageClass manifest needs to be executed; here is a template manifest (a manifest is simply a yaml file for creating a Kubernetes resource).
Next, run the following to create a storage class with dynamic provisioning enabled:
kubectl create -f <manifest>.yaml
Remember to set the cluster context appropriately using
kubectl config use-context <cluster> before executing this command.
Preparing the Node Pool and Configuring Firewall Rules
If the cluster does not meet the requirements for Kubeflow, create a new node pool to satisfy these requirements. It is straightforward to create or delete node pools with terraform but that is beyond the scope of this post.
If the cluster contains private nodes, as in our case, there will be issues with the installation. Specifically, the installation will crash when webhook related resources are being installed. To prevent this, we need to create an ingress firewall rule from the cluster master’s CIDR to the VMs on ports 6443 and 443. Creating this firewall rule using the Google cloud console is straightforward. For AWS users, the rule should be created under appropriate security group(s).
Installing and Deploying Kubeflow
Next, follow the instructions in “Prepare your environment” and “Setup and deploy Kubeflow” sections in this installation guide. After the installation, if we run
kubectl -n kubeflow get pods
we should see something like this:
We can see that our Kubeflow pods are up and running on our cluster. Note that this deployment is a one time process and in the future, these pods will be automatically created and deleted based on node availability.
Creating a TFJob
Now that we have Kubeflow installed and deployed on our cluster, we can use the tf-operator to create and run a TFJob, follow the steps below.
- Assuming the availability of sufficient compute resources to run a training job, we can simply specify the number of cluster workers to create, the docker image every worker needs to pull and the training script that needs to be executed by the workers, all inside a training manifest. See the example manifest above for clarity.
- We can also specify resource limits (such as allowable CPU and memory requests per worker pod) and also include nodeSelector configurations so that the “tensorflow” pods that run the training script always end up getting assigned to a specific node pool (e.g. a pool with GPUs). You must be careful to make sure that the workers have sufficient memory and CPU resources to run the job or you might end up seeing Insufficient cpu and Insufficient memory issues in the pod logs. See here for more details about the TFJob resource.
- Our <script_to_be_executed>.py, referred to in line 19 in the example training manifest above, executes `load_tf_records(..)` (explained in Part I of this series) and plugs in the returned tf.data.Dataset into the `fit(..)` function of the model whose graph operations are set up within the scope of the MultiWorkerMirroredStrategy.
Here, we have assumed that the reader has some level of familiarity with Docker.
Connecting the Pieces Together
So far, we have talked about all the different components of this workflow: data pipelines with TFRecord and tf.data.Dataset, the MultiWorkerMirroredStrategy, Kubernetes’ TFJob and Kubeflow’s tf-operator. But how do our model and the cluster workers interact? How does TensorFlow know the specification of our cluster and the availability of workers? How does everything we’ve seen so far fit together? This interplay between these different components is the last bit of information we need to understand.
When we initialize a TFJob, Google’s cloud AI platform automatically sets up a TF_CONFIG environment variable on every worker node in the cluster that executes the training job. It contains information such as the number of workers and task index for every worker, and essentially just points to the cluster specification. TensorFlow’s MultiWorkerMirroredStrategy uses this specification to understand worker roles.
Now, we have a reasonably good understanding of everything we need to run large-scale, distributed training of TensorFlow models. To execute the job, simply run
kubectl -n kubeflow create -f <path_to_training_spec>.yaml
and the distributed training routine will begin. You can look at the logs by running
kubectl -n kubeflow logs <worker-id>
We trained our recurrent autoencoder on a training dataset with 2,235,385 instances where each instance is an array of shape (36, 10); with 36 timesteps and 10 dimensions per timestep. Note that this example is only for demonstration purposes and that the sample dataset is not that large. Our model had an encoder with two layers with [128, 32] GRU units. The decoder had a reversed architecture. Each layer was followed by BatchNormalization and Dropout layers with the exception of the final decoder layer. The final states of every layer in the encoder were merged layer-wise to create input states for the decoder. We set batch_size_per_worker to 1024 and experimented with 2, 4, 6 and 8 GPUs (one GPU per worker). We trained this model on our cluster with n1-highmem-4 nodes, each having an NVIDIA-Tesla K-80 GPU. Below we visualize how epoch run times scale with the number of GPUs.
Running a large-scale, distributed model training involves a lot of moving parts and can be very overwhelming. We hope that this article provides clarity on the essential components of this workflow, and makes it easier to implement efficient input data pipelines for large volumes of data and to effortlessly scale computational workloads involved with model training using data parallelism. Visit When Machines Learn to find more technical guides for the working data scientist.