Machine Learning in production (2) — large scale ML training

Keven Wang
9 min readJun 29, 2020

--

In a previous post (here), I shared some experience on how to introduce a structured Python project for machine learning as early as the first working notebook. By leveraging a powerful platform like Databricks, you can have pretty fast feedback loops. This approach works relatively well for a small scale machine learning project, but you will probably need something more for a large scale machine learning project. Before we start, I would like express my appreciation to Nicolas Seyvet spent hours to help me refine this story and also Jun Xie who contributed some of code examples in this blog.

First of all, what is large scale machine learning?

  1. You need to train not only single model, but hundreds or even thousands of models. For example in retail industry, to get most accurate result, with the same code base you can train different models for specific geographical locations, customer groups, type of products or even time periods, this could be thousands of different combination or even more.
  2. It takes considerable amount of time and compute resource to train a single model, like 6–24hours with hundreds of machines. This often happens for customer management related use cases. For example, recommendation systems for which in order to take full advantage of the machine learning algorithm potentially requires all transactions/events generated by all your customers. The volume of data is huge.

For this type of project, I would propose to introduce Machine Learning Orchestrator on which the remainder of this post will focus.

The expected benefits of such an orchestrator are:

  • Automation
  • Optimised (computation) resource usage
  • Operational friendly

On the other hand, when should NOT you consider machine learning orchestrator? If your project is still in its infancy and the team continuously refactors the code structure and pipeline, then you should probably avoid using a machine learning orchestrator. In such case, a more interactive development environment like Jupyter/Databricks notebook or the IDE solution proposed in the last story should definitely be considered.

Machine Learning Orchestrator

Orchestrator is similar to a workflow engine that can tie disparate tasks and procedures together to create reliable, flexible, and efficient end-to-end solution (quote from here).

Specifically for Machine Learning Orchestrator, here are a number of challange we are trying to solve:

  1. Utilising different types of computation resources. Picture below illustrates the various stages of a typical machine learning pipeline, the technology or tools involved in these steps. It is very clear that for each steps you will have different size of data and different computational paradigms, some step could be distributed computations, and others are iterative process. The Orchestrator will help to use the different types of computational platform in different steps, for example using Spark cluster(s) for distributed computation steps and virtual machine or even better Docker containers for single machine computation steps which are highly iterative.

2. Computation graph (training pipeline). As discussed above, you need to train different models for different scenarios, as the scenario represents a model for a specific geographical location, type of products, and/or time period. The result is a computation graph or let’s call it scenario set. As a goal, you want to leverage a number of spark clusters, a number of virtual machines or Docker containers to process all scenarios in parallel. And, equally important, you want to access those compute resources in an elastic way, to be able to start or scale out them when needed and shut or scale them down afterwards.

3. Dynamic computation graph. Consider that during development, it may be more efficient to get faster feedback. E.g. while running a pipeline as part of continuous integration for a feature branch, it may be preferable to run a single scenario only rather than all possible scenarios; To approve a pull request (PR), maybe the scenario set will contain 3–5 different representative scenarios; while for making a release, the scenario set of course should contain all possible scenarios. This introduces the need to dynamically configure (input parameters) your computation graph. Another use case for such dynamic computation graph is where a task requires a significant amount of computation resource that is not required on each run, then you can have some if/else condition to skip those steps.

4. Additional operational requirements:

  • If some task in the middle failed, instead of rerun complete pipeline, you want to be able to rerun/retry certain tasks
  • You want some extensive monitoring feature like a GUI based dashboard to visualize the graph
  • Collect the execution time on each tasks for future optimization; input/output of each step, so you can re-produce the step

To summarize, the orchestrator needs to:

  • Support computation graph or DAG (Directed acyclic graph) that can be parameterized
  • Dedicate different tasks/steps to different types of computation platform, like Databricks for Spark tasks and Docker or Kubernetes for single computer task
  • Provide rich operational features, like replay, dashboard, logging, monitoring etc.
  • Last but not least, testability, easy for debugging etc. Ideally, we can run it locally at laptop.

Apache Airflow as Machine Learning Orchestrator

Apache Airflow is one of the most popular choices of IT and data pipeline orchestration. From high level, there is not that big difference between machine learning orchestration and data pipeline orchestration. So I decided to give it a try.

There are plenty of online articles summarizing the advantages of Airflow, so I will instead focus on its challenges:

  • Airflow has typical master/slave architecture. Its master components like Airflow scheduler and web server are deployed as single instance which means single point of failure.
  • Airflow supports cluster deployment, but prior to 1.10.3, the cluster options were either dask or celery. Both are popular distributed computation platforms in Python, but offering only limited support for elastic deployments, favoring a static cluster.
  • There are strong application dependency between your application and infrastructure. If you want to introduce a new python library in your application/DAG, you have to install this library on every single node of Airflow cluster. This is a challenge for large scale deployment and development.

Airflow marry with Kubernetes

Kubernetes becomes a default choice when facing scalability or dependency management challenges with infrastructure. Fortunately, Airflow introduced Kubernetes support as part of release 1.10.3. After some PoC, I am quite satisfied with it.

You can read more information about Kubernetes support at Airflow website: here. Airflow introduced Kubernetes executor alongside with dask and celery executor. As a result:

  • Each task in your DAG is scheduled as a single container in Kubernetes cluster.
  • Elasticity is solved since most major Kubernetes providers enable cluster autoscaling(up/down).
  • Since each task becomes a single container pod and you can even specify the docker image to instantiate, so you can isolate application dependencies. The task/image/container is now decoupled from the infrastructure.
  • Finally, you can also specify the docker container size per task in your DAG, so you can tailor the resources of the container for each task. Some task can take 1 CPU and 1G RAM while another would ask for 32 CPU and 8 GB RAM. This not only optimizes the resource usage but also provides the possibility to run some really CPU or memory intensive tasks.

Besides running Airflow tasks in Docker/Kubernetes, both AirFlow web server and schedulers can be ran as PoDs within Kubernetes, in this way their availability can be delegated to Kubernetes. Also you can deploy Airflow MetaDB with cloud database solution and store Airflow logs/DAGs files with cloud storage solution. All this greatly reduces maintenance efforts.

If you want to try it out, there is ready to use Helm chart at Airflow Github project , check here.

If you are looking for more production ready setup, here is another deployment example based on Azure.

  • Azure Kubernetes Service (AKS). AKS supports auto scaling by node pool. When a pod is scheduled but cluster is out of resource, a scale set will dynamically add one single VM to the node pool that automatically joins the cluster. Note that when a VM is idling for 10 minutes then it is recycled. So when there is no DAG/pipeline running, you only need single VM for Airflow web server and Airflow scheduler, additional VMs will only be added upon the DAG being triggered.
  • Azure File share (AFS). AFS can be provisioned as persistent volume in AKS to store Airflow logs and DAG files. Even better, you can mount AFS via SMB as a drive yo your local computer and modify or check DAG files or logs.
  • Azure container registry (ACR). ACR can be provisioned as container registry for AKS.
  • Azure PostgreSQL service to host Airflow MetaDB.

In addition, Airflow comes with Databricks operator, so you can define a task in your DAG which dedicate a Spark job to an external Databricks cluster, the Spark job can read source data from Azure Datalake, then dump the result back to Azure Datalake, so other python tasks in the same DAG can consume output data from Spark job easily.

Dependency management

There are different ways to define a task in Airflow, check here. For example you can define a bash command task with BashOperator, interact with different Google Cloud services with Google Cloud Operators. For Python application, the most convenient way is to usePythonOperator to execute Python callables, check detail here.

One challenge in this approach is dependency management. When upload a new DAG file, Airflow scheduler/webserver will go through entire DAG and make sure all the dependencies (import libs) are available in the local python interpreter. For example, one task defined as PythonOperatoruses pandas to read data, even if this task is using KubernetesExector and will be running in a Docker container, you still need install pandas on Airflow Scheduler/webserver. This is a problem if you want to use Airflow as a shared environment.

Is there some better way to handle it? Yes. Like other dynamic languages, you can import a function by specifying the package, module and function name, then call this function. This is thanks to importlib package in Python. Here is the sample below. In this way, you only need to install pandas in my_app/py_ml_image and keep Airflow Scheduler image as simple as possible.

See one toy example below, a DAG with two tasks both using PythonOperator . By calling actual Python function with call_func , you can hide all task specific dependencies within each task (in their Docker image). And even better, you don’t need to import each tasks into your DAG, this enables you to run the same DAG with code base from different git branches.

Optimise the resource usage

When creating a task in Airflow, you can provide extra configuration parameters specific to the selected executor. In the case of the Kubernetes executor, there are plenty of configuration parameters, let’s have a closer look.

  • You can specify the Docker image which will be used to run the task.
  • You can specify CPU and memory resource request/limit
  • You can mount persistent volume, this is a great way to transform/share data between tasks.

Other Machine Learning Orchestrator option

Besides Apache Airflow, there are other options for machine learning orchestrators, like Kubeflow which gets lots of attention in past one year. In my opinion, Kubeflow pipeline has similar capabilities as Airflow, but Kubeflow also covers other perspectives, like model management and model serving. In addition, Kubeflow has more native integration with Kubernetes by leveraging Argo Workflow.

Stay tuned for my coming blogs about Kubeflow and entire machine learning lifecycle management.

Again, if you are interested with how to continue a machine learning project after first notebook, pls take a look at the blog post in this series, here.

--

--