A New, Official Dask API for XGBoost

Jiaming Yuan
RAPIDS AI
Feb 21, 2020
Update: There's a new blog post diving into the latest features and updates included in XGBoost release 1.4.

Training gradient boosted trees in a distributed environment has often been a challenging task. Different distributed frameworks all provide varying user interfaces, and the overhead of syncing data between worker nodes can become significant for wide datasets or deep trees. To provide an efficient distributed training solution over multiple GPUs, XGBoost relies on CUDA and Dask. This post will provide a deep dive into the updated Dask API that simplifies the process of training XGBoost models.

To use earlier versions of XGBoost with Dask, developers had to build on an external project called dask_xgboost. Starting with the RAPIDS 0.11 release, XGBoost adopted a new native Dask interface, simplifying the API and making integration easier. The strength of Dask lies in its flexibility and the seamless integration with existing Python scientific computing libraries. Dask builds on popular libraries including NumPy, pandas, and RAPIDS cuDF as its data manipulation backend. Along with its basic usage with XGBoost, we will also touch on the subject of accelerating ETL using cuDF in place of Pandas.

A Brief Introduction to Dask and XGBoost

XGBoost is a gradient boosting library with a focus on tree models. Building on top of the rabit and NCCL libraries, it includes integrated support for distributed training. During distributed training, XGBoost trains a model with a subset of data on each worker. This is enabled by syncing the histogram of gradients for each tree node across the workers. XGBoost by itself can’t perform feature engineering or allocate workers. These features require dedicated libraries and higher-level distributed computing frameworks, such as Dask.

Dask is a Python-based distributed computing framework that provides an interface resembling popular Python scientific libraries and integrates with CUDA libraries. Dask splits up a big dataframe into multiple partitions:

These partitions are further scattered across individual workers. Typically, on a CUDA platform, each NVIDIA GPU is treated as a standalone worker. For example, if you have 2 GPUs, the above 4 partitions might be distributed into:

Figure 2. Example of Dask partitions across GPU workers

Later, these partitions in individual GPUs are fed into XGBoost’s own data container called DMatrix, which we will introduce in the next section.
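This partitioning is ordinary Dask behavior rather than anything XGBoost-specific. As a minimal sketch (the toy data and the partition count below are made up for illustration), a Dask DataFrame acts as a single proxy over several concrete partitions:

import pandas as pd
from dask import dataframe as dd

# A small in-memory pandas DataFrame split into 4 Dask partitions;
# the data itself is a made-up stand-in for a real dataset.
df = pd.DataFrame({'label': [0, 1, 0, 1] * 25,
                   'feature-01': range(100)})
dask_df = dd.from_pandas(df, npartitions=4)

print(dask_df.npartitions)                    # 4 partitions behind one proxy
print(dask_df.map_partitions(len).compute())  # rows held in each partition

On a dask-cuda cluster, each of these partitions would live on one of the GPU workers.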

Basic Usage

In XGBoost, there are two related interfaces for integrating with Dask: one is functional, and the other is stateful and modeled after Scikit-Learn. To give you a taste of how xgboost-dask works, here is a simple example using the functional interface:

from dask.distributed import Client
from dask_cuda import LocalCUDACluster
from dask import dataframe as dd
import xgboost as xgb


def main(client):
    # We use HIGGS as the dataset for demonstration. To obtain it, download:
    # https://archive.ics.uci.edu/ml/machine-learning-databases/00280/HIGGS.csv.gz
    fname = 'HIGGS.csv'
    colnames = ['label'] + ['feature-%02d' % i for i in range(1, 29)]
    # By default, dask dataframe uses pandas as its data handling backend.
    dask_df = dd.read_csv(fname, header=None, names=colnames)
    y = dask_df['label']
    X = dask_df[dask_df.columns.difference(['label'])]
    # DaskDMatrix acts like a normal DMatrix and works as a proxy for the
    # local DMatrix objects scattered around the workers.
    dtrain = xgb.dask.DaskDMatrix(client, X, y)
    # Use the train method from xgboost.dask instead of xgboost. This
    # distributed version of train returns a dictionary containing the
    # resulting booster and the evaluation history obtained from the
    # evaluation metrics.
    output = xgb.dask.train(client,
                            # Use the GPU training algorithm
                            {'tree_method': 'gpu_hist'},
                            dtrain,
                            num_boost_round=100,
                            evals=[(dtrain, 'train')])
    booster = output['booster']  # booster is the trained model
    history = output['history']  # A dictionary containing evaluation results
    # Save the model to a file.
    booster.save_model('xgboost-model')
    print('Training evaluation history:', history)

There is a lot happening here. Let's break it down and see how Dask and XGBoost interact with each other. First, read_csv from Dask is called to load the dataset, which creates a distributed DataFrame object with multiple partitions scattered to individual workers. This dataframe acts as a proxy to those concrete partitions, as shown in the introduction. Then we split the data into predictor and target, using them to create a DaskDMatrix defined by XGBoost. DaskDMatrix is also a proxy; it holds references to partitions of the DataFrame for creating concrete DMatrix objects. After these data preparation steps, XGBoost takes over and starts training with the gpu_hist algorithm. For an introduction to the GPU tree boosting algorithm, see https://devblogs.nvidia.com/gradient-boosting-decision-trees-xgboost-cuda/ .

Figure 3. Data preparation using Dask DataFrame objects to create a DaskDMatrix defined by XGBoost

The interface is modeled after the single-node XGBoost training API, with one change: the evaluation result is returned as part of a dictionary instead of through an output parameter. For instance, with the single-node training API, you need to create an empty dictionary evaluation_result and pass it to the XGBoost train function like this:

evaluation_result = {}
booster = xgb.train(parameters, dtrain,
                    evals=[(dtrain, 'train')],
                    evals_result=evaluation_result)
# After training, `evaluation_result` will contain the evaluation
# history, and the return value is a trained model instead of a
# dictionary.

Using xgb.dask.train, XGBoost handles all the setup for internal communication and the syncing of gradient statistics between workers through rabit and NCCL:

Figure 4. XGBoost training and communication

So far we haven't explained where the client comes from. Here is the remaining setup code for the example:

if __name__ == '__main__':
    # `LocalCUDACluster` is used for assigning GPUs to XGBoost
    # processes. Here `n_workers` represents the number of GPUs,
    # since we use one GPU per worker process.
    with LocalCUDACluster(n_workers=2) as cluster:
        with Client(cluster) as client:
            main(client)

Inside this snippet, users first launch a cluster of two GPU workers on the local machine by initializing a LocalCUDACluster object. By default, if n_workers is not specified, Dask will use all GPUs on the local machine. Then the example connects it to a Client object, which is passed into XGBoost's API. The client object acts as a handle to the launched cluster. This snippet can scale to run on virtually any machine or cluster. For example, we can test or tune this snippet on our laptop first, then launch a GPU cluster on the cloud and connect the client to it for training on a real dataset. Once connected, all the computation will be carried out on the remote GPU cluster.
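The same main function also works against a remote cluster. Below is a minimal sketch, assuming a Dask scheduler with dask-cuda workers is already running somewhere and reusing main from the example above (the address is a placeholder):

from dask.distributed import Client

if __name__ == '__main__':
    # Connect to an already-running remote scheduler instead of starting
    # a local cluster; replace the placeholder address with your own.
    with Client('tcp://scheduler-address:8786') as client:
        main(client)  # the training code itself is unchanged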

Persisting and Reloading Models

Training alone is only half the story; what users really want is the final output model. You can save the trained model and load it back for prediction or to continue the previous training. In the above example we called:

booster.save_model('xgboost-model')

The booster returned by the xgboost.dask.train method is the same as the one returned by xgboost.train, so you can do anything with it just as in single-node mode. For example, you can load it back by calling:

booster = xgb.Booster(model_file='xgboost-model')
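The loaded booster can also be used to continue training. Below is a sketch reusing the client, dtrain, and parameters from the basic-usage example; the existing model is passed through the xgb_model keyword argument, which the single-node xgboost.train accepts and which the Dask train function forwards, although exact support may depend on your XGBoost version:

# Load the model saved earlier and continue boosting from it.
booster = xgb.Booster(model_file='xgboost-model')
output = xgb.dask.train(client,
                        {'tree_method': 'gpu_hist'},
                        dtrain,
                        num_boost_round=50,
                        evals=[(dtrain, 'train')],
                        xgb_model=booster)  # start from the loaded model
booster = output['booster']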

Inference

The new Dask interface also defines two prediction functions, predict and inplace_predict. In this post we will look at the predict function specifically. After obtaining the trained model booster, inference can be done via:

# Set to use GPU for inference.
booster.set_param({'predictor': 'gpu_predictor'})
# dtrain is the DaskDMatrix defined above.
prediction = xgb.dask.predict(client, booster, dtrain)

or equivalently:

booster.set_param({'predictor': 'gpu_predictor'})
# where X is a dask DataFrame or dask Array.
prediction = xgb.dask.predict(client, booster, X)

If you input a Dask DataFrame directly, the prediction result will be a Dask Series with "prediction" as the Series name. Unlike training, prediction doesn't require syncing between workers, so XGBoost runs prediction on each partition or chunk of the Dask input data without concatenation. This has performance implications: if the partition or chunk size of the input data is too small, XGBoost will have to run prediction many times, which can hurt performance.
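Like other Dask collections, the returned prediction is lazy until it is evaluated. The short sketch below is plain Dask usage rather than anything specific to XGBoost:

# `prediction` comes from xgb.dask.predict above and is a lazy Dask
# collection; pull it into local memory, or keep it on the cluster.
local_result = prediction.compute()        # concrete values on the client
distributed_result = prediction.persist()  # materialized, but stays distributed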

Integration with cuDF

Recent versions of XGBoost have built-in support for building a DMatrix from a GPU-resident cuDF dataframe, without the need to first copy data to CPU memory. Since Dask also supports using cuDF as a dataframe backend, the integration comes naturally. Starting from the above code, you can simply switch dd.read_csv to dask_cudf.read_csv:

import dask_cudf
import xgboost as xgb


def main(client):
    fname = 'HIGGS.csv'
    colnames = ["label"] + ["feature-%02d" % i for i in range(1, 29)]
    # Here we use `dask_cudf` instead of `dask.dataframe`.
    dask_df = dask_cudf.read_csv(fname, header=None, names=colnames)
    y = dask_df["label"]
    X = dask_df[dask_df.columns.difference(["label"])]
    dtrain = xgb.dask.DaskDMatrix(client, X, y)
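Because the data now lives in GPU memory, any further preprocessing also runs on the GPUs. As a purely hypothetical illustration (the fill value and derived feature below are made up), an extra ETL step could be slotted in right after read_csv and before the DaskDMatrix is built, continuing from the dask_df above:

    # Hypothetical GPU-side preprocessing: fill missing values and add a
    # derived feature; both operations execute on the GPU workers.
    dask_df = dask_df.fillna(0)
    dask_df["feature-ratio"] = dask_df["feature-01"] / (dask_df["feature-02"] + 1)

    y = dask_df["label"]
    X = dask_df[dask_df.columns.difference(["label"])]
    dtrain = xgb.dask.DaskDMatrix(client, X, y)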

From there we are able to accelerate the whole pipeline with distributed GPU computing! Easy right?

Performance

The performance of XGBoost training on the Higgs dataset with multiple GPUs is shown below. The benchmark was performed on NVIDIA DGX-1 servers with 8 V100 GPUs and two 20-core Xeon E5-2698 v4 CPUs. Due to the cost of syncing gradients over the Ethernet link, multi-node training is somewhat slower than running on a single host with high-speed NVLINK connections between GPUs.

Figure 5. Training time on the Higgs dataset, with the 20-thread CPU hist tree method as a baseline.

Limitations

Currently, on a single node, XGBoost's Scikit-Learn interface performs label encoding automatically when using XGBClassifier, but this feature is not yet implemented in the Dask interface. In the future, XGBoost will provide deeper integration with dask-ml, so users can define customized metrics without manual syncing and reuse functionality such as label encoding. If you have a particular missing feature in mind, feel free to open an issue at https://github.com/dmlc/xgboost/issues.
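Until that lands, one possible workaround is to encode labels manually before training. Below is a minimal sketch, assuming a pandas-backed Dask DataFrame with string labels in a hypothetical 'class' column (HIGGS labels are already numeric, so this is only needed for datasets with non-numeric labels):

# Hypothetical workaround: map string labels to integers by hand.
# The 'class' column and its values are made up for illustration.
label_map = {'background': 0, 'signal': 1}
y = dask_df['class'].map(label_map, meta=('class', 'int64'))
dtrain = xgb.dask.DaskDMatrix(client, X, y)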

Summary

Accelerating and scaling XGBoost GPU training is easy with the native support for Dask. With both libraries integrating with cuDF, we can even scale up the whole data processing pipeline. The key idea is to use dask_cudf in place of dask.dataframe and to call functions in xgboost.dask instead of xgboost.

For more examples, see

  1. https://xgboost.readthedocs.io/en/latest/tutorials/dask.html
  2. https://github.com/dmlc/xgboost/tree/master/demo/dask

Please share your experience with us!
