GPU-based workloads as a part of Airflow DAGs

At Fandom, we’re currently working on A/B testing various approaches to content recommendations for our readers. One of the most popular algorithms for generating recommendations from users’ behavior is Collaborative Filtering, so among other strategies, we decided to give it a try.

All our data-related workloads (importing data from external providers, marketing efficiency analysis, and reporting) are now controlled by Airflow DAGs running on Amazon EMR clusters. Fortunately, Apache Spark, one of the best-known data processing frameworks, has built-in support for Collaborative Filtering, so our first choice was to implement the recommendation model with Spark.

It quickly turned out that Fandom’s scale is challenging: on weekly data with 25 million unique users and 8.4 million unique items (views of Fandom articles), calculating a satisfying multi-iteration model with Spark takes ages due to data shuffling between cluster nodes. What is more, the method was very cost-ineffective, as a lot of new machines needed to be started. We felt we needed a change of perspective…

GPU to the rescue!

While browsing the Internet for Collaborative Filtering implementations working at a comparable scale, we found a great Python library: implicit by Ben Frederickson. The API is simple and just right, the provided example matches our needs and, most importantly in our case, the author focuses on optimizing model building and response times by providing code that runs on the GPU using Nvidia CUDA.
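For context, feeding such a library means mapping raw user and item IDs to contiguous matrix indices first. A minimal sketch of that preprocessing step; the commented-out implicit calls follow its documented API but are untested assumptions here, not our production code:

```python
def to_indices(events):
    """Map raw (user_id, item_id, count) events to contiguous integer
    indices, as sparse collaborative filtering libraries expect."""
    users, items = {}, {}
    rows, cols, vals = [], [], []
    for user, item, count in events:
        rows.append(users.setdefault(user, len(users)))
        cols.append(items.setdefault(item, len(items)))
        vals.append(count)
    return rows, cols, vals, users, items

# Hypothetical usage with the implicit library (GPU-backed ALS):
#
#   import implicit, scipy.sparse as sp
#   rows, cols, vals, users, items = to_indices(events)
#   mat = sp.csr_matrix((vals, (rows, cols)))
#   model = implicit.als.AlternatingLeastSquares(factors=64, use_gpu=True)
#   model.fit(mat.T)  # implicit 0.x expects an item-user matrix
```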

While the implicit library worked great in the manual proof-of-concept phase, it has two drawbacks compared to the classic Apache Spark approach:

  • a plain Python script can’t quickly read and process data stored in the columnar ORC format, while Spark does it in a very efficient, distributed way
  • there is no “standard” way to tell Airflow “please schedule this task on some GPU-backed cluster”, while in Spark we use the Hadoop YARN scheduler as a job queue.

While the first issue can be addressed easily with a three-step Airflow DAG, we were looking at how to address the second one. Basically, we imagined a flow where Airflow submits a job to a queue in the cloud and picks up the results from S3.

The in-cloud part had to meet a few requirements:

  • be cost-efficient: start GPU nodes only when there are jobs to execute
  • act as scheduler: when there are multiple job requests, queue them
  • be able to access input data on S3 and upload results there as well
  • integrate as an Airflow operator: if not, we could fall back to the classic BashOperator, but this would be suboptimal.

Airflow meets AWS Batch

The first idea was: “hey, somebody must have already addressed this problem”, so we went to the operators directory on Airflow’s GitHub and found the AWSBatchOperator there. AWS Batch is a managed service in which you create the following objects:

  • job definition: description of the job you’d like to run remotely — docker image, command, resource limits, etc.,
  • job queue: queue that forwards jobs to the computing nodes based on their requirements and priority,
  • compute environment: cluster recipe that spawns nodes when there are jobs waiting on the queue and removes them when there are no waiting tasks.
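These three objects come together in a single SubmitJob API call. A hedged sketch of the request payload, with field names following the boto3 `submit_job` API (the argument values are illustrative):

```python
def batch_submit_request(name, definition, queue, command):
    """Assemble an AWS Batch SubmitJob request tying the three
    objects together; field names follow the boto3 submit_job API."""
    return {
        "jobName": name,
        "jobDefinition": definition,  # which image, command and limits to use
        "jobQueue": queue,            # where the job waits for a node
        "containerOverrides": {"command": command},
    }

# Hypothetical usage:
#   boto3.client("batch").submit_job(
#       **batch_submit_request("articles", "collaborative-filtering",
#                              "gpu-cluster-queue", ["nvidia-smi"]))
```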

From the perspective of our needs, it looked like just the right solution! Just a bit too perfect ;-) So where is the trick?

AWS Batch is not quite into GPU…

While Amazon is putting a lot of effort into making Elastic Container Service GPU-friendly, and even supports GPU-based scheduling out of the box on certain instance flavors, these features are not available in AWS Batch. Setting up the compute environment as a bunch of P2- or P3-flavored nodes does let you run the Nvidia Docker runtime via SSH, but the AWS Batch job submission API lacks this option.

There is an official AWS blog post on how to create a custom AMI image for a GPU cluster, but it lacks information on resource allocation and on how to prepare and run a custom Docker image. It also seems a bit outdated.

However, we were highly motivated to try this approach! The next sections describe how we overcame all these difficulties to create a working Airflow pipeline.

1. AMI image for running ECS containers

To create a working AMI, it’s good to start with the Deep Learning Base AMI. Start a new GPU node with this image (we used g3.4xlarge), then log in to the node and:

1. Install the ECS task runner: sudo yum install -y ecs-init

2. Register the Docker nvidia runtime and set it as the default one:
sudo tee /etc/docker/daemon.json <<EOF
{
  "runtimes": {
    "nvidia": {
      "path": "/usr/bin/nvidia-container-runtime",
      "runtimeArgs": []
    }
  },
  "default-runtime": "nvidia"
}
EOF

3. Restart docker: sudo service docker restart

4. Verify that runtime is properly configured:

sudo docker run --rm nvidia/cuda:9.0-base nvidia-smi

5. Create an AMI image from the AWS console using the official manual and create a new AWS Batch compute environment with this image. When asked for max CPUs, multiply the number of CPU cores (not GPU cores or GPU units!) of your selected EC2 flavor by the desired parallelism level.

6. Create a job queue in AWS Batch that directs all jobs to the just-created compute environment.
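Steps 5 and 6 can also be scripted. A hedged sketch of the compute environment request, with field names following the boto3 `create_compute_environment` API; the environment name, subnets and role ARN are placeholders you would substitute:

```python
def gpu_compute_environment(name, ami_id, max_vcpus, subnets, instance_role):
    """Build a boto3 CreateComputeEnvironment request for the custom
    GPU AMI; minvCpus=0 lets the cluster scale to zero when idle."""
    return {
        "computeEnvironmentName": name,
        "type": "MANAGED",
        "computeResources": {
            "type": "EC2",
            "instanceTypes": ["g3.4xlarge"],
            "imageId": ami_id,       # the AMI built in steps 1-5
            "minvCpus": 0,           # no nodes when the queue is empty
            "maxvCpus": max_vcpus,   # cores per node * parallelism level
            "subnets": subnets,
            "instanceRole": instance_role,
        },
    }

# Hypothetical usage:
#   boto3.client("batch").create_compute_environment(
#       **gpu_compute_environment("gpu-cluster", "ami-12345678", 48,
#                                 ["subnet-abc"], "arn:aws:iam::...:role/ecs"))
```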

2. Docker image with Nvidia CUDA support

When your AMI is ready, it’s time to prepare a Docker image that the containers will be created from. AWS Batch can’t deploy tasks directly onto an AMI, which is why this additional virtualization layer is needed.

For the implicit library mentioned before, we created an image using this Dockerfile:

FROM nvidia/cuda:9.0-devel
RUN apt update && \
    apt install -y python3-pip && \
    pip3 install pip==9.0.3 pybind11 && \
    pip3 install awscli implicit nmslib pandas

Please note that the base image is the devel one (not base, which we used to test the AMI instance): this one provides the header files required by the libraries installed via pip.

Images you want to launch from AWS Batch need to reside in a public DockerHub repository or in your private ECR. We decided to use a private one.

Then go into AWS Batch and create a job definition:

  • the container image should match the just-created image,
  • the job role needs access to S3, at least to the buckets used to transfer data to and from the container,
  • the command can be as simple as nvidia-smi; we’re going to override it anyway,
  • the number of vCPUs and the memory limit need to be the fraction of node capacity that corresponds to one of the available GPU cards. For example: if you decided to use g3.16xlarge nodes (4 GPUs, 64 CPUs, 488 GB of RAM) and your job requires only one GPU unit, set the limits to 16 vCPUs and 120 GB of memory. This way, AWS Batch will not schedule more than 4 jobs on each node.
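The arithmetic behind the last bullet can be captured in a small helper; this is just a sketch of the rule of thumb above, with a couple of gigabytes of headroom left for the OS and the ECS agent:

```python
def per_gpu_limits(node_vcpus, node_mem_gib, node_gpus, headroom_gib=2):
    """Split a node's capacity evenly across its GPUs so that AWS Batch
    packs at most one job per GPU card."""
    vcpus = node_vcpus // node_gpus
    mem_gib = node_mem_gib // node_gpus - headroom_gib
    return vcpus, mem_gib

# g3.16xlarge: 64 vCPUs, 488 GiB RAM, 4 GPUs -> (16, 120), as in the text
```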

3. Schedulable python job

Write a Python script that reads data from the local filesystem and writes results back to the filesystem as well (in our case the directories are named input and output). This approach makes it easy to write integration tests and helps with development. The Docker image contains the awscli tools, so the integration with S3 can live outside the script.

Finally, put the script on S3 so it can be updated without touching the base image.
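A hypothetical skeleton of such a script; the filenames and CSV format are illustrative, and the model-fitting part is elided since only the local input/output contract matters here:

```python
import csv
import pathlib

def run(in_dir="input", out_dir="output"):
    """Read every CSV from in_dir, process it, write results to out_dir.
    S3 transfer in both directions is handled outside by the awscli tools."""
    out = pathlib.Path(out_dir)
    out.mkdir(exist_ok=True)
    rows = []
    for path in sorted(pathlib.Path(in_dir).glob("*.csv")):
        with path.open(newline="") as f:
            rows.extend(csv.reader(f))
    # ... fit the collaborative filtering model here (e.g. with implicit) ...
    with (out / "recommendations.csv").open("w", newline="") as f:
        csv.writer(f).writerows(rows)
    return len(rows)

if __name__ == "__main__":
    print(f"processed {run()} rows")
```

Because the script only sees local directories, an integration test just needs to lay out a few files in input/ and inspect output/, with no cloud access involved.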

4. Airflow task definition

Airflow glues all the parts together. It uses the AWSBatchOperator to submit a job based on the definition created earlier.

from airflow.contrib.operators.awsbatch_operator import AWSBatchOperator

cf_commands = [
  'aws s3 sync s3://data-bucket/input-data input/',
  'aws s3 cp s3://bin-bucket/reco_wiki_articles_cf_step2.py job.py',
  'mkdir output',
  'python3 job.py',
  'aws s3 rm --recursive s3://data-bucket/output-data',
  'aws s3 sync output/ s3://data-bucket/output-data'
]
cf_step_2 = AWSBatchOperator(
  task_id='cf_calculate',
  job_name='articles',
  job_definition='collaborative-filtering',
  job_queue='gpu-cluster-queue',
  overrides={'command': ['bash', '-c', ' && '.join(cf_commands)]},
  dag=dag
)

The Airflow task is responsible for setting the input data location, the application code and the requested output location. This gives us the flexibility we need to:

  • run multiple environments (testing, staging, production) that do not mix data or application code
  • change the application code without rebuilding the Docker image.
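One way to keep environments separate is to derive every S3 location from the environment name. A sketch under that assumption; the bucket naming scheme is illustrative, not our exact layout (the step-2 script name comes from the task above):

```python
def s3_locations(env, app="reco_wiki_articles_cf_step2"):
    """Hypothetical per-environment S3 layout keeping testing, staging
    and production data and code apart."""
    return {
        "input":  f"s3://data-bucket-{env}/input-data",
        "output": f"s3://data-bucket-{env}/output-data",
        "script": f"s3://bin-bucket-{env}/{app}.py",
    }
```

The Airflow task for each environment then builds its command list from these paths, so the same job definition and queue serve all environments.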

Summary

The described method works perfectly fine in our environment, even with the final multi-level virtualization pattern (AWS Batch -> Node -> Container -> Application).

AWS Batch provides the required level of flexibility: it runs GPU nodes only when there are jobs waiting in the queue, and it is not limited to Airflow as a job submitter. Keeping the application code outside the Docker container lets us simplify the deployment pipeline to just synchronizing a git repository with the S3 bucket, with no need to rebuild images.

From the maintenance perspective, the AWSBatchOperator waits for the job to complete by default and benefits from Airflow capabilities like retries, notifications, and monitoring (job logs are available via AWS CloudWatch).

It’s worth mentioning that the AWS Batch artifacts prepared for this pipeline (the job definition, queue and compute environment) can also be used outside Airflow by data scientists who want to test new models.