Getting started with RAPIDS on AWS ECS using Dask Cloud Provider
ECS can run containers in two modes: the first uses Elastic Compute Cloud (EC2) instances, which you must manage yourself; the second uses a serverless platform called Fargate. We will cover both methods in this article, but it is worth noting that GPUs are not available on Fargate.
Dask Cloud Provider
There are tools for launching Dask clusters on a variety of platforms from batch job schedulers like PBS and Slurm to cloud orchestration tools like Kubernetes. The newest addition to the cluster manager family is dask-cloudprovider which aims to allow people to confidently create Dask clusters on various public cloud providers without having any prior knowledge about that specific cloud platform. The first provider that has been implemented is AWS ECS and so we will use dask-cloudprovider to get our cluster up and running today.
Setting up AWS credentials
In order to launch a Dask cluster on ECS we will need some credentials which will allow us to interact with the AWS API. If you are setting up your AWS account for the first time you will need to generate yourself an access key and secret key. If someone else manages your AWS account you will need these things to be provided for you.
You can provide these tokens to dask-cloudprovider in a variety of ways, but the easiest is to set up your local environment using the AWS command line tools.
$ pip install awscli
$ aws configure
You will be asked to provide your access key, secret key, default region and preferred output format. The region is given by its short ID and you should probably choose your nearest region to get the best performance; for me in the UK it will be `eu-west-2`. Be sure to check the region table to ensure that ECS and Fargate are available in the region you select.
The `aws configure` command will create some files in your home `.aws` directory containing these credentials, and dask-cloudprovider will pick these up automatically, so you don’t need to worry about them again.
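If you want to confirm that the credentials were written before moving on, a small check like the one below can help. It is only a sketch: it assumes the conventional `credentials` file inside the `.aws` directory and a profile named `default`, and the helper name `has_aws_profile` is made up for this example.

```python
import configparser
from pathlib import Path


def has_aws_profile(credentials_path, profile="default"):
    """Return True if the credentials file defines both keys for `profile`."""
    parser = configparser.ConfigParser()
    # ConfigParser.read silently skips files that do not exist.
    parser.read(credentials_path)
    if not parser.has_section(profile):
        return False
    section = parser[profile]
    return "aws_access_key_id" in section and "aws_secret_access_key" in section


if __name__ == "__main__":
    # Assumed location of the file written by `aws configure`.
    print(has_aws_profile(Path.home() / ".aws" / "credentials"))
```

This only checks that the keys are present; it does not verify that AWS accepts them.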
To install dask-cloudprovider you will need to run the following:
$ pip install dask-cloudprovider
Let’s get started by creating the most basic of ECS clusters, a `FargateCluster`. In order to launch tasks on AWS Fargate a variety of resources are required, but the `FargateCluster` class takes care of all of this for you. To create a default cluster you can run the following:
>>> from dask_cloudprovider import FargateCluster
>>> cluster = FargateCluster(n_workers=1)
>>> cluster
FargateCluster('tcp://<public ip>:8786', workers=1)
Instantiating this class may take a few minutes as many AWS resources are being created for you behind the scenes. The two main resources to be aware of are the Fargate tasks for the scheduler and one worker that we requested. These two tasks are the only chargeable resources that are created for you* and cost a few cents per hour. Fargate tasks are billed in per-second intervals and if you do not provide any work to the cluster for the default timeout of 5 minutes the tasks will exit automatically, so you don’t need to worry about unused resources being left running on your account and running up a bill.
Now that we have our cluster we can increase and decrease the number of workers using the `scale` method.
>>> cluster.scale(2) # Sets the total number of workers to 2
This call is asynchronous and workers will be added once AWS has made them available.
You can also view the dashboard by following the address provided by the `dashboard_link` attribute. Fargate will automatically assign a public IP address to the scheduler task.
To use our cluster we will need to connect to it with a distributed client.
>>> from dask.distributed import Client
>>> client = Client(cluster)
Now that we have connected to our cluster let’s test it out with a quick Dask array operation.
>>> import dask.array as da
>>> arr = da.random.random((1000, 1000, 10000), chunks=(100, 100, 1000))
>>> arr = arr.mean().persist()
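To get a feel for how much work this hands to the cluster, the arithmetic behind that array can be worked out by hand. The sketch below uses only the shape and chunk sizes from the example and assumes 8-byte `float64` values, which is what `da.random.random` produces.

```python
import math

shape = (1000, 1000, 10000)
chunks = (100, 100, 1000)
itemsize = 8  # bytes per float64 value

# Number of chunks along each axis, multiplied together.
n_chunks = math.prod(math.ceil(s / c) for s, c in zip(shape, chunks))
chunk_bytes = math.prod(chunks) * itemsize
total_bytes = math.prod(shape) * itemsize

print(n_chunks)           # 1000 chunks
print(chunk_bytes / 1e6)  # 80.0 MB per chunk
print(total_bytes / 1e9)  # 80.0 GB for the whole array
```

The full array would be far too large to hold on a small worker, but only the mean is persisted, so the workers can generate and reduce the chunks a few at a time.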
Customizing your cluster
When creating the `FargateCluster` above we left everything as default values except for `n_workers` (which defaults to `0`). However, you can fully customise your deployment if you have some knowledge of AWS; see the documentation for full information on the options available.
For instance, if you do not want your scheduler and dashboard to be accessible from anywhere, you can provide custom security groups to lock them down to your specific IP address. You can also change how long logs are retained, set the CPU and memory allowances of your scheduler and workers, and so on.
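As a rough illustration of those options, the sketch below collects them into a dictionary of keyword arguments. The parameter names are the ones I understand dask-cloudprovider to use, so please check them against the documentation for your installed version. The helper name, the security group ID and the sizes are placeholders chosen for this example.

```python
def locked_down_options(security_group_id, log_retention_days=7):
    """Build keyword arguments for a more restrictive FargateCluster."""
    return {
        "n_workers": 2,
        # Use an existing security group instead of the default one.
        "security_groups": [security_group_id],
        # Number of days to keep the CloudWatch logs.
        "cloudwatch_logs_default_retention": log_retention_days,
        # CPU is given in units of 1/1024 of a vCPU, memory in MB.
        "scheduler_cpu": 1024,
        "scheduler_mem": 4096,
        "worker_cpu": 2048,
        "worker_mem": 8192,
    }


options = locked_down_options("sg-0123456789abcdef0")  # placeholder ID
print(options)
```

Passing these on with `FargateCluster(**options)` would then create the cluster, along with the billable resources described earlier.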
We want to be able to run RAPIDS on our cluster and for that we will need GPUs. Fargate does not have support for GPUs so we will need to create our own EC2 cluster with GPU instances and tell dask-cloudprovider to use that instead.
The dask-cloudprovider library also includes an `ECSCluster` class, which is a less opinionated version of `FargateCluster`. It is not designed to work out of the box with default values and requires some configuration, but not a huge amount.
We discuss the necessary steps below.
Creating the EC2 cluster
Let’s start by creating our EC2 instances. First you need to log into the AWS console and visit the ECS dashboard.
From here visit the “Clusters” section on the left-hand side and hit “Create Cluster”.
We want to select an EC2 Linux + Networking cluster as we want to be able to specify our networking options, then click next.
Give your cluster a name, for our demo we will call it `rapids-cluster`.
We will need to change the instance type to one which supports GPUs. I’ve selected `p3.2xlarge`, which comes with one NVIDIA V100 GPU. Notice that this automatically changes your AMI (the operating system image AWS will use for the instance) to one with GPU drivers installed. If you want to use your own AMI, be sure that it has NVIDIA Docker, the ECS container agent and up-to-date NVIDIA drivers. This can be non-trivial, however, and AWS recommend you use the default AMIs.
I’ve also increased the default disk size just in case we need a few different Docker images.
In the networking section I recommend you select the default VPC and all the subnets available in that VPC. If you are using a new AWS account this will already exist and be the only VPC available. If this account has been provided to you and has multiple VPCs, please refer to your account owner.
All the other settings are fine to leave as default and you are good to hit “create”. You will see your cluster launching and once all the sections have gone green we can switch back to our Python session. The EC2 instance that will power the cluster will still be launching but that’s fine.
Creating the Dask cluster
In order to create our `dask_cloudprovider.ECSCluster` object we will need the Amazon Resource Name (ARN) for the cluster we just created. The format for this will be `arn:aws:ecs:<region>:<account number>:cluster/<cluster name>`. If you don’t know your account number you can query the API using the CLI tool to get the list of ECS cluster ARNs active on your account.
$ aws ecs list-clusters
"arn:aws:ecs:<region>:<account number>:cluster/<cluster name>"
Now we can create our `ECSCluster` object in our Python session and provide it the ARN of the ECS cluster that we want to use. We are also going to specify that each Dask worker should be given one GPU.
Lastly we are going to specify that the scheduler should be run in Fargate mode. We are doing this to ensure the scheduler is assigned a public IP address. If you are running your Python code from an instance within the same VPC as the cluster then you do not need to do that.
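>>> from dask_cloudprovider import ECSCluster
>>> cluster = ECSCluster(
    cluster_arn="<my cluster arn>",
    n_workers=1, worker_gpu=1,  # one worker with one GPU
    fargate_scheduler=True,  # scheduler on Fargate for a public IP
)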
Once the Dask cluster object has been created in Python we should now be able to visit the ECS cluster dashboard to see our GPU cluster running with two active tasks (the scheduler and worker).
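If you would rather assemble or sanity-check the cluster ARN in code, the format described earlier in this section can be sketched as follows. The helper names are made up for this example, the account number is a placeholder, and the sketch assumes the standard `aws` partition.

```python
def build_cluster_arn(region, account_number, cluster_name):
    """Assemble an ECS cluster ARN from its three variable parts."""
    return f"arn:aws:ecs:{region}:{account_number}:cluster/{cluster_name}"


def parse_cluster_arn(arn):
    """Split an ECS cluster ARN into region, account number and name."""
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[:3] != ["arn", "aws", "ecs"]:
        raise ValueError(f"not an ECS ARN: {arn!r}")
    resource_type, _, name = parts[5].partition("/")
    if resource_type != "cluster" or not name:
        raise ValueError(f"not a cluster ARN: {arn!r}")
    return {
        "region": parts[3],
        "account_number": parts[4],
        "cluster_name": name,
    }


arn = build_cluster_arn("eu-west-2", "123456789012", "rapids-cluster")
print(arn)
print(parse_cluster_arn(arn))
```

Parsing the ARN before passing it to `ECSCluster` is a cheap way to catch a mistyped region or cluster name before any AWS calls are made.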
Testing out RAPIDS
As before we need to create a distributed client for our cluster.
>>> from dask.distributed import Client
>>> client = Client(cluster)
Now let’s load the Dask sample timeseries dataset, cast it to a cudf dataframe and perform a GPU accelerated operation to test that everything works nicely.
>>> import dask, cudf, dask_cudf
>>> ddf = dask.datasets.timeseries()
>>> gdf = ddf.map_partitions(cudf.from_pandas)
>>> gdf.groupby('name').id.count().compute().head()
Name: id, dtype: int64
Great! Now that you have a working RAPIDS cluster on AWS ECS you can get stuck into the awesome example notebooks to learn more.
As you created the EC2 cluster and we didn’t configure any autoscaling policies, this GPU EC2 instance will just hang around forever. Unlike the Fargate tasks, you will be billed for this instance hourly regardless of whether an `ECSCluster` is making use of it or not. Therefore it is your responsibility to shut it down when you’re done with it.
You have two options here. You can scale the ECS cluster down to zero instances or delete the cluster altogether. If you plan on using the cluster again I recommend scaling down to zero instances.
To be safe you may also want to automate this. Visit the EC2 dashboard, find the “Auto Scaling Groups” section in the left menu, select the autoscaling group that was created for your cluster, select the “Scheduled Actions” tab and then hit “Create Scheduled Action”.
Give the action a name like “shut things down in case I forget” and set the minimum and desired number of instances to zero. Give it a recurrence of “Every Day” and set the time to some time after you know you will be done working. This will save you from accidentally leaving instances running when you are done using them.
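The same scheduled action can also be created from Python with boto3 rather than through the console. The sketch below is one way to do it, assuming you know the name of the autoscaling group that was created for your cluster. The helper names and the default hour are choices made for this example, and the recurrence is a cron expression that AWS interprets in UTC unless told otherwise.

```python
def nightly_shutdown_action(group_name, hour_utc=19):
    """Keyword arguments for a daily scale-to-zero scheduled action."""
    if not 0 <= hour_utc <= 23:
        raise ValueError("hour_utc must be between 0 and 23")
    return {
        "AutoScalingGroupName": group_name,
        "ScheduledActionName": "shut-things-down-in-case-i-forget",
        # Cron fields: minute, hour, day of month, month, day of week.
        "Recurrence": f"0 {hour_utc} * * *",
        "MinSize": 0,
        "DesiredCapacity": 0,
    }


def schedule_shutdown(group_name, hour_utc=19):
    """Register the scheduled action with AWS (requires boto3 and credentials)."""
    # Imported lazily so the helper above works without boto3 installed.
    import boto3

    client = boto3.client("autoscaling")
    client.put_scheduled_update_group_action(
        **nightly_shutdown_action(group_name, hour_utc)
    )


print(nightly_shutdown_action("<my autoscaling group>"))
```

Calling `schedule_shutdown("<my autoscaling group>")` would register the action. Building the arguments separately makes it easy to inspect them before anything is changed on your account.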
Dask Cloud Provider at Capital One
At Capital One, we use Dask to solve complex data problems when laptops or single machines are no match for the job. These problems are often challenging because the data are either too large to fit into memory on a single machine or located in a highly secured cloud environment. In either case, we need an easy way to spin up ephemeral Dask clusters to work with large datasets where they reside. In other words, we need to bring scalable compute capabilities to the data.
Institutional AWS cloud environments are usually heavily restricted compared to the rest of the world. Many features and services that AWS provides are not allowed, under evaluation, or heavily controlled. ECS is an approved and widely used service that is perfect for launching Dask clusters in our restricted environment. Fargate is not yet approved so we have to make do with ECS on EC2.
To deploy Dask on ECS with Cloud Provider, we first need an ECS cluster that is compliant with the institutional cloud governance policies. To make this easier, we use Terraform to create the cluster, an autoscaling group, a Route 53 entry, and an ECS service for launching a JupyterLab task. The JupyterLab task is run using the default Docker image specified by Dask Cloud Provider, and from it the user can create Dask clusters. The `ECSCluster` manager class makes it very easy to configure a Dask cluster running on ECS.
We have also been using the GPU capabilities with RAPIDS. We ran into challenges deploying RAPIDS on ECS nodes with GPUs using Dask Cloud Provider because we do not have access to the Amazon ECS GPU-optimized AMI. These AMIs have not been vetted by our Cyber Security team. To work around this, we had to build our own, which is not a trivial task unless you know the ins and outs of ECS and NVIDIA drivers. We were able to run our tests on single-node multi-GPU clusters, but were less successful on multi-node multi-GPU clusters.
Dask Cloud Provider allows us to quickly create distributed Dask clusters on ECS in our institutional environment. It provides a simple, easy-to-use interface with enough low-level control that we can customize it to work within our constraints.
In this post we covered how to get started with RAPIDS and Dask on AWS ECS with Dask Cloud Provider. We started with a serverless Dask cluster running on Fargate and then moved on to a GPU powered RAPIDS cluster backed by EC2.
Hopefully this gave you some insight into how straightforward it is to get up and running with RAPIDS and scale out to giant datasets.
— — — —
* If you are not eligible for the free tier you are also billed a small amount for the logs generated by these tasks, which will appear as CloudWatch Logs under the billing dashboard. These logs will automatically be deleted after 30 days, which means this is a one-time charge.