
Training pipeline orchestration with Kubeflow pipelines


This article was written by Shihe Long, data engineer on the Veepee Personalization Team.

Abstract: As a data engineer at Veepee, I have spent the past months industrializing our machine learning training pipeline, from dataset generation to offline model evaluation. Thanks to Kubeflow, I managed to speed up the full training process, make it easier to reproduce, and scale it (hyperparameter tuning) at a low cost. Here is my journey!

In the article Learning to rank at Veepee we explained how we implemented a neural network model to rank the sales on our homepage. To train a new model with new features or fresh data, our data scientists need to launch a series of steps manually, from their local computer or on a shared on-premise machine. This approach has several drawbacks:

First of all, a lack of automation. Launching a new training is very time-consuming since most of the steps in the pipeline require switching to a different Python virtual environment or a different Docker image.

Second, poor reproducibility and reusability. A small change to the parameters of a single step often means repeating the whole process manually.

Last and most important, no scalability. The model training step needs a lot of resources (many CPUs and a lot of RAM) because our training dataset is very large. We can launch at most two trainings simultaneously on our on-premise machine. As a result, it is impossible to grid search for the best hyperparameters, which requires launching many trainings in parallel, and this limits our ability to optimize the performance of our models.

To tackle these challenges, we need a tool that allows us to fully automate the training process and run it in the cloud to scale. Kubeflow Pipelines comes as a natural choice since we have already worked with Kubernetes in the team and most of the projects used in the pipeline are already dockerized.

Decomposition of the training pipeline

As we can see from the figure below, the model training pipeline can be summarized in four main steps: creation of the train/test/eval dataset, exportation of the dataset to Google Cloud Storage, model training and monitoring, and model offline evaluation.

In some of the steps, the same task may need to be executed several times with different parameters, for example the creation of the train, validation and test datasets in the first step. In Kubeflow, repeated tasks can be templated using components. A component in Kubeflow is a reusable piece of code that performs one step of the pipeline.

There are mainly two ways to create a component:

  • Using ContainerOp if the step is launched by running a Docker image
  • Using func_to_container_op or create_component_from_func if the step is a simple Python function (a sketch of both follows below).
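
As a minimal sketch with the v1 Kubeflow SDK (the image name, component names and function body are hypothetical, not our actual components), the two styles look roughly like this:

```python
from kfp import dsl
from kfp.components import create_component_from_func

# Option 1: the step runs an existing Docker image.
def export_dataset_op(table_name: str) -> dsl.ContainerOp:
    return dsl.ContainerOp(
        name="export-dataset",
        image="eu.gcr.io/our-project/dataset-exporter:latest",  # hypothetical image
        arguments=["--table", table_name],
    )

# Option 2: the step is a plain Python function turned into a component.
def compute_default_values(feature_name: str) -> str:
    return f"default value for {feature_name}"

compute_default_values_op = create_component_from_func(
    compute_default_values, base_image="python:3.9"
)
```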

Often, steps in the pipeline must run one after another. A simple way to express this in Kubeflow is to declare the dependency with taskB.after(taskA), so that taskB waits for taskA to finish. A dedicated project hosts all our pipeline configurations, and for each step described above we create a Kubeflow component that can be reused in the pipeline definition, as in the sketch below.
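
A trimmed-down pipeline definition, reusing the hypothetical components from the previous sketch, could look like this:

```python
from kfp import dsl

@dsl.pipeline(name="ranking-training-pipeline")
def training_pipeline(table_name: str):
    # Reuse the hypothetical components sketched above.
    export_task = export_dataset_op(table_name)
    defaults_task = compute_default_values_op(feature_name="brand_affinity")

    # defaults_task must wait for export_task to finish before it starts.
    defaults_task.after(export_task)
```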

Creation of train/test/eval dataset

In this step, we create all the features that the model needs from the raw data available in Veepee's data lake. We use the power of BigQuery to do complex data transformations and aggregations; this step may take from 30 minutes to several hours depending on the size of the dataset. We also save all the feature names, together with their types and default values in the generated dataset, to a BigQuery table for later usage.
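
As an illustration of this step (the project, dataset, table names and the query below are hypothetical placeholders), a component can run the feature transformation with the BigQuery Python client and write the result to a destination table:

```python
from google.cloud import bigquery

client = bigquery.Client(project="veepee-data-project")  # hypothetical project id

# Hypothetical feature query; the real transformations join and aggregate
# several raw tables from the data lake.
feature_query = """
SELECT member_id, sale_id, COUNT(*) AS nb_past_orders
FROM `veepee-data-project.datalake.orders`
GROUP BY member_id, sale_id
"""

job_config = bigquery.QueryJobConfig(
    destination="veepee-data-project.training.train_dataset_20240101",  # hypothetical table
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
client.query(feature_query, job_config=job_config).result()  # wait for completion
```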

Exportation of the train/test/eval dataset

It would be too slow for our deep learning model, coded in Tensorflow, to load train samples directly from BigQuery. So we export them as Parquet files to Google Cloud Storage (GCS). This is quite a simple task, so we write it as a Python function using the Google Cloud Python SDK. This gives us more flexibility because we can easily customize the table name and the GCS path according to a predefined pattern. Kubeflow then takes care of the compilation into the pod manifest that can be run directly in Kubeflow pipelines.

Note that when creating a Kubeflow component from a Python function, any extra packages that need to be installed must be imported inside the Python function and declared in the packages_to_install argument of func_to_container_op, so that Kubeflow can correctly generate the compiled manifest.
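
A minimal sketch of such an export component (the bucket, table naming pattern and base image are hypothetical):

```python
from kfp.components import func_to_container_op

def export_table_to_gcs(table_name: str, gcs_prefix: str) -> str:
    """Export a BigQuery table to GCS as Parquet files and return the GCS path."""
    # Imports live inside the function so they are available in the
    # container generated by Kubeflow at runtime.
    from google.cloud import bigquery

    client = bigquery.Client()
    destination_uri = f"{gcs_prefix}/{table_name}/part-*.parquet"  # hypothetical pattern
    extract_config = bigquery.job.ExtractJobConfig(destination_format="PARQUET")
    client.extract_table(table_name, destination_uri, job_config=extract_config).result()
    return destination_uri

# Declare the extra package so the compiled manifest installs it.
export_table_op = func_to_container_op(
    export_table_to_gcs,
    base_image="python:3.9",
    packages_to_install=["google-cloud-bigquery"],
)
```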

Model Training and monitoring

Dataset Generator

Our machine learning model is written in Tensorflow and we use the Tensorflow Dataset API to feed the samples to the model. But a big problem is that our training dataset is huge (more than 1,000 small Parquet files, with a total size of around 300-500 GB). As the training workload runs in a shared Kubernetes cluster, it is complex to create a large volume, copy the files from GCS and attach it to the training pod. A simple workaround is to consume the dataset file by file using tf.data.Dataset.from_generator, so that during training we download only the partitions we need from GCS on the fly, instead of the whole dataset. According to GCP, as long as your Kubernetes cluster and your GCS bucket are in the same GCP region, there is no extra cost for transferring data between them.
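
A simplified sketch of this idea (the bucket name, prefix, feature columns and label column are hypothetical, and reading gs:// paths with pandas assumes gcsfs is installed):

```python
import pandas as pd
import tensorflow as tf
from google.cloud import storage

BUCKET = "our-training-bucket"                 # hypothetical bucket
PREFIX = "train_dataset/"                      # hypothetical export prefix
FEATURE_COLUMNS = ["feature_a", "feature_b"]   # hypothetical feature names

def sample_generator():
    client = storage.Client()
    # Stream the Parquet partitions one by one instead of copying the
    # whole 300-500 GB dataset onto the training pod.
    for blob in client.list_blobs(BUCKET, prefix=PREFIX):
        df = pd.read_parquet(f"gs://{BUCKET}/{blob.name}")
        for row in df.itertuples(index=False):
            yield [getattr(row, col) for col in FEATURE_COLUMNS], row.label

dataset = (
    tf.data.Dataset.from_generator(
        sample_generator,
        output_signature=(
            tf.TensorSpec(shape=(len(FEATURE_COLUMNS),), dtype=tf.float32),
            tf.TensorSpec(shape=(), dtype=tf.float32),
        ),
    )
    .batch(1024)
    .prefetch(tf.data.AUTOTUNE)
)
```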

Resource allocation

It is worth mentioning that, to speed up the training process, we may have to allocate a special type of VM instance to the training pod, for example a VM with GPUs or with many CPUs. In Kubeflow, we can specify the number of CPUs and the amount of memory we want for the training step as follows:
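
With the v1 SDK this is done through resource requests and limits on the task; the values below are only an example and train_op is the hypothetical train component:

```python
train_task = train_op(params)  # hypothetical train component invocation

# Request a large slice of an e2-standard-16 node and cap the usage.
train_task.set_cpu_request("14")
train_task.set_cpu_limit("16")
train_task.set_memory_request("56G")
train_task.set_memory_limit("64G")

# For GPU training, a GPU limit can be set in a similar way:
# train_task.set_gpu_limit(1)
```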

However, in some cases the training pod may fail to start: it stays pending because the cluster cannot schedule it (insufficient CPU or memory on every node).

This happens when all the machine types configured in the Kubernetes cluster are smaller than what a single training asks for, so the cluster cannot fulfill the request. The solution is to ask the cluster administrator to create a dedicated node pool with instance types tailored to our needs. Then we can configure Kubeflow to schedule the training pods only on these machines:
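
With the v1 SDK this is a node selector constraint on the train task; the node pool label below is an assumption about how the dedicated pool is labelled on GKE:

```python
# Schedule the training pod only on the dedicated node pool
# (label name and value depend on how the cluster is configured).
train_task.add_node_selector_constraint(
    "cloud.google.com/gke-nodepool", "training-node-pool"
)
```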

The configuration above might not be enough if you are working in a shared cluster: other workloads may also be scheduled by Kubernetes on your dedicated machines, so eventually there might not be enough resources left to fit the training process. To prevent this, we can ask the cluster admin to add a node taint to the dedicated machines and configure our train pod to tolerate this taint:
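
Assuming, for example, a taint with key dedicated and value training applied by the cluster administrator, the toleration can be added like this:

```python
from kubernetes.client import V1Toleration

# Tolerate the taint so the training pod is allowed on the dedicated nodes,
# while workloads without the toleration are kept away from them.
train_task.add_toleration(
    V1Toleration(
        key="dedicated",
        operator="Equal",
        value="training",
        effect="NoSchedule",
    )
)
```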

This way, the dedicated machines can only be used by the training process.

Metric monitoring

In our pipeline, the training process takes up to 5 days to finish, so it is extremely important to monitor some metrics of the model being trained in order to terminate early if anything goes wrong (bad hyperparameters, overfitting, underfitting). Another cool feature of Kubeflow is its built-in result visualization: it is easy to create a nice visualization if your model metrics fit one of the supported viewers. Below we give an example of how we create the TensorBoard visualization in our pipeline.
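
A minimal sketch with the v1 SDK, assuming the TensorFlow summaries are written to a GCS log directory: the component writes the mlpipeline-ui-metadata file that the Kubeflow UI reads to render a TensorBoard viewer.

```python
from kfp.components import OutputPath, create_component_from_func

def tensorboard_viewer(log_dir: str, mlpipeline_ui_metadata_path: OutputPath()):
    """Declare a TensorBoard viewer pointing at the given log directory."""
    import json

    metadata = {
        "outputs": [
            {
                "type": "tensorboard",
                "source": log_dir,  # e.g. gs://our-training-bucket/logs/run-1 (hypothetical)
            }
        ]
    }
    with open(mlpipeline_ui_metadata_path, "w") as metadata_file:
        json.dump(metadata, metadata_file)

tensorboard_viewer_op = create_component_from_func(tensorboard_viewer)
```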

Parallel training

Since the workload runs on Kubernetes and the train process is templated as a Kubeflow component, hyperparameter searching with parallel training is straightforward. We just need to pass different parameters to the train components and everything is good to go, for example:
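
As a sketch, a small hypothetical grid over learning rate and batch size simply creates one train task per combination (train_op and export_task being the hypothetical components from the earlier sketches):

```python
# Hypothetical hyperparameter grid; each combination becomes its own
# train task and Kubernetes runs them in parallel.
learning_rates = [0.001, 0.01]
batch_sizes = [512, 1024]

for learning_rate in learning_rates:
    for batch_size in batch_sizes:
        train_task = train_op(
            dataset_path=export_task.output,   # output of the export step
            learning_rate=learning_rate,
            batch_size=batch_size,
        )
        train_task.set_display_name(f"train-lr{learning_rate}-bs{batch_size}")
```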

Model Exportation

When the training is finished, we need to export the model to persistent storage (such as GCS), otherwise it might get deleted when the whole pipeline is removed. One solution is to ask the cluster administrator to create a persistent volume and attach it to the training pod. But as the number of trainings increases, the number of volumes increases too, and eventually it becomes difficult to know in which persistent volume a specific model is stored. A more elegant solution is to store the models directly in GCS, but unfortunately the Tensorflow library does not support saving models directly to a blob storage such as GCS or S3. As we are on GCP, a workaround is to use gcsfuse, which emulates a classical file system on top of GCS so we can mount the bucket like an ordinary disk. This is done by installing the gcsfuse utilities in the train Docker images.

However, to be able to run gcsfuse at runtime, we need to grant extra security privileges to the train pod, such as:
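
One way to do this with the v1 SDK (the exact privileges required depend on the cluster policy; privileged mode or the SYS_ADMIN capability is a common assumption for FUSE mounts):

```python
from kubernetes.client import V1Capabilities, V1SecurityContext

# gcsfuse needs access to FUSE inside the container; granting a privileged
# security context (or at least the SYS_ADMIN capability) is one way to allow it.
train_task.container.security_context = V1SecurityContext(
    privileged=True,
    capabilities=V1Capabilities(add=["SYS_ADMIN"]),
)
```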

Cost optimisation with GCP preemptible instances

The cost of training can be very high, especially when multiple trainings run in parallel. At the beginning, our training pods ran on standard e2-standard-16 VMs. With only 5 parallel trainings launched, and each training lasting about 5 days, the total cost for this single task would be around 300 dollars. To optimize the cost, we can use GCP preemptible VM instances, which offer a 60-91% discount compared to the price of standard VMs. However, preemptible instances might be stopped and reclaimed by GCP at any time, so we have to make sure that the training process can be resumed at any time.

In the section Model Exportation, we explained that we constantly save the models directly to GCS, so a model does not get lost when its machine is reclaimed by GCP. We also improved our generator to save the training status (such as the current epoch and the partitions already trained in that epoch) to GCS, so the training can resume from where it stopped. To use preemptible instances, we first need to ask our cluster administrator to back our dedicated node pool with preemptible instances. Then we need to tell Kubeflow to restart the training after it has been stopped, which is done by setting a retry policy on the train component:
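
A sketch with the v1 SDK (the number of retries is an arbitrary example):

```python
# Restart the training task automatically when its preemptible node is
# reclaimed; the generator then resumes from the status saved on GCS.
train_task.set_retry(num_retries=10, policy="Always")
```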

This way, we managed to reduce the training cost by a factor of 3 to 4.

Model offline evaluation

When the train process is finished, we use BigQuery ML to evaluate the best exported model against the test dataset and compute some interesting metrics, such as the average rank of the orders among all possible sales on the Veepee homepage. The metrics of each training are saved as a table under a specific dataset in BigQuery. We then use Google Data Studio to visualize them all (figure below). This allows us to easily compare the performance of different trained models.
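
As a rough sketch of this kind of evaluation (the dataset, model, table and column names are all hypothetical, and the exact metric query depends on the model's output schema), the exported SavedModel can be imported into BigQuery ML and scored with ML.PREDICT:

```python
from google.cloud import bigquery

client = bigquery.Client()

evaluation_script = """
-- Import the SavedModel exported to GCS as a BigQuery ML model.
CREATE OR REPLACE MODEL `personalization.ranking_model_v42`
OPTIONS (model_type='TENSORFLOW',
         model_path='gs://our-training-bucket/exports/v42/*');

-- Score the test dataset and compute the average rank of the ordered sales.
CREATE OR REPLACE TABLE `personalization.eval_metrics_v42` AS
SELECT AVG(rank_on_homepage) AS average_order_rank
FROM (
  SELECT
    is_order,
    ROW_NUMBER() OVER (PARTITION BY member_id
                       ORDER BY predicted_score DESC) AS rank_on_homepage
  FROM ML.PREDICT(MODEL `personalization.ranking_model_v42`,
                  TABLE `personalization.test_dataset`)
)
WHERE is_order;
"""

client.query(evaluation_script).result()  # run the multi-statement script
```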

In some cases, it is helpful to run a first offline evaluation as soon as a first model has been exported, without waiting for the end of the training. As the training is a very long and expensive process, this lets us spot potential issues and correct them at an early stage. The Kubeflow SDK allows us to execute a step only when some condition is met, so we created a Kubeflow component that periodically checks for the existence of a first exported model and triggers an early offline evaluation:
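
A sketch of this pattern with the v1 SDK, assuming hypothetical components: one that polls GCS and returns "true" once a SavedModel appears under the export path, and one that runs the offline evaluation.

```python
from kfp import dsl

# Hypothetical component that polls GCS until a first model is exported,
# then returns the string "true".
wait_for_first_export = wait_for_first_export_op(
    export_path="gs://our-training-bucket/exports/v42"
)

# Run the early offline evaluation only once a model actually exists.
with dsl.Condition(wait_for_first_export.output == "true"):
    early_evaluation = offline_evaluation_op(
        model_path="gs://our-training-bucket/exports/v42"
    )
```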

An example of the whole training pipeline with Kubeflow

Below, we show the graph representation of an example pipeline with 4 trainings launched in parallel, sharing the same train dataset and TensorBoard visualization.

Lessons learned

The Kubeflow API is, at this stage, not very stable, and the quality of the documentation is uneven. Our machine learning pipeline was coded with the v1 version of the Kubeflow SDK, but a new v2 is coming that is not backward compatible with v1. The installation of Kubeflow pipelines on GCP is quite easy via the AI Platform, but configuring all the GCP permissions and resources needed to run the pipeline correctly is not that straightforward. To make it work, our team needs to work closely with SRE and cloud engineers who have strong skills in GCP and Kubernetes. At the time of writing, Google has released a new product called Vertex AI in which all workloads run on a cluster managed by GCP, removing the need to manage your own cluster. For small teams who want their pipeline ready as soon as possible and lack support from SRE, it might be a good idea to start with Vertex AI Pipelines.

Conclusions

With Kubeflow pipelines, we managed to automate our training process, reducing the time needed to launch a full pipeline from half a day to less than a minute. We migrated the training workload from on-premise to the cloud, allowing us to run multiple parallel trainings to search for the best hyperparameters. The cost of running in the cloud is optimized by using preemptible instances. Last but not least, with the centralisation of all training experimentation configurations, pipelines are also more reproducible than before.
