Cloud Compute Orchestration with Covalent

Predicting the Ages of Crabs from Kaggle with Machine Learning on an AWS Cloud-Based Dask Cluster

Filip Boltuzic
Nov 15


Architecture of Covalent, Dask, and AWS (Image by Author)

Introduction

Navigating a maze of platforms, clusters, and other resources is a common challenge in the realm of multi-step workflows, especially in machine learning settings. For example, preprocessing and dataset creation might require CPU compute resources, while the training and inference of machine learning models demand GPU-capable machines. The challenge multiplies with large datasets that can’t be fully loaded into memory, making the case for memory-efficient solutions like Dask. Moreover, orchestrating these intricate workflows and managing required resources can be quite demanding.

Enter Covalent, an open-source orchestration tool that enables seamless management of workflows deployed on local or AWS resources. Covalent helps you shuttle your compute tasks across various resources, including Dask clusters, multiple clouds, on-premises compute (e.g. HPC clusters), and more, all while enjoying the benefits of an abstracted management system for tracking and reproducing experiments. See Covalent’s executor plugins for the full scope of supported resources (with many more on the way).

Covalent is suitable for a wide range of users, including machine learning practitioners, data scientists, and anyone interested in running heterogeneous computations from their local environment.

In this blog post, we will use an example problem to explain how to utilize a remote Dask cluster with Covalent. We will first describe how to set up, manage, and scale a Dask cluster on AWS. After this, we will use the Covalent SDK to create a workflow that runs computations on the Dask cluster in AWS, and then returns results back to a local machine for post-processing and visualization.

Covalent Highlights

Before we proceed, here’s a quick list of some applications for prospective Covalent users.

  1. Multi-cloud Experiments: Covalent allows you to run experiments across several different clouds and regions. This can be useful for circumventing availability issues across different vendors or geographic regions.
  2. Cloud Abstraction: Covalent simplifies complex cloud operations through a Pythonic SDK that uses executor objects at a task level.
  3. Asset and File Management: Covalent provides automatic file transfers, making cloud computing more accessible and hassle-free.
  4. Monitoring: Covalent offers workflow monitoring in the Covalent UI (see live demo here), with real-time status updates that give a transparent view of your experiment workflows.
  5. Caching: Covalent securely stores all intermediate data, ensuring efficiency and reproducibility in your operations.

Prerequisites

To complete the steps in this blog post, you will need to install and configure the following tools on your local machine:

  1. AWS CLI
  2. eksctl
  3. kubectl
  4. Helm
  5. Kaggle CLI (for downloading the data)

You should also have a working Python installation and, preferably, a Python virtual environment.

Example Problem: Crab Age Prediction

To demonstrate how to run a Covalent workflow with Dask, we will use the problem of crab age prediction (more details are available on the Kaggle challenge page). More specifically, we will adapt this gold-medal notebook so that it is executed using Covalent. Executing a notebook via Covalent can help offload computationally heavy processing to remote servers, and then seamlessly transfer results back to your local machine.

The aforementioned Kaggle solution uses ensembling (via Optuna) to combine several regression models. The regression models include various flavors of gradient boosting (the most popular approach to tabular data on Kaggle), bagging, feedforward neural networks, and support vector machines. To build the regression ensemble, we will use well-known Python libraries: scikit-learn (sklearn), optuna, lightgbm, catboost, and xgboost.

Photo by Felipe Portella on Unsplash

Workflow

To create an ensemble machine learning model for crab age prediction, we will create a simple Covalent workflow:

def workflow():
    crab_dataset = load_data(path=FILEPATH, use_sample=USE_SAMPLE)
    plot_data(crab_dataset)
    X_train, y_train, X_test = create_ml_datasets(crab_dataset)
    regression_result = kfold_and_evaluate(
        X_train, y_train, X_test
    )
    return regression_result

First, we download the data from Kaggle, which consists of multiple CSV files, and load it (load_data). Then we plot the data (plot_data) to get a better understanding of which features are important and how the synthetically generated datasets differ from the original dataset. Next, create_ml_datasets transforms the dataset into a numeric form that machine learning algorithms can consume. Finally, kfold_and_evaluate uses cross-validation to train and validate the individual regression algorithms and their ensembled combination, to evaluate how well we can predict the age of crabs from their features.

Covalent

Covalent is an open-source workflow orchestration tool designed for running distributed experiments on HPC clusters, quantum computers, GPU arrays, and remote compute services such as Dask clusters.

Developing with Covalent

The Covalent SDK is a Python framework built around three main elements:

  1. the electron decorator,
  2. executor objects, and
  3. the lattice decorator.

These elements let users define complex workflows in a lightweight way, with minimal, non-intrusive changes to their experimental code. We will next explain how to use the electron, executor, and lattice elements.

Electron

The electron element converts a function into a remotely executable task that Covalent can deploy to arbitrary resources. Decorating a Python function with @ct.electron is all it takes to turn it into an electron. Users specify the resources and constraints for each electron by passing an executor object to its electron decorator.

Executors

Covalent executors define the environment in which a task is executed. Each electron may be assigned a different executor, which means a single workflow can be executed across several different machines.

import covalent as ct

AWS_DASK_PORT = 8080
# create an executor to pass to electrons below; the address is the
# locally forwarded scheduler port of the CPU Dask cluster
aws_dask_executor = ct.executor.DaskExecutor(
    scheduler_address=f"tcp://127.0.0.1:{AWS_DASK_PORT}"
)

# local port forwarded to the GPU cluster's scheduler; it must differ from the
# ports used for the CPU cluster (8080 for the scheduler, 8081 for its dashboard)
AWS_GPU_DASK_PORT = 8082
aws_gpu_dask_executor = ct.executor.DaskExecutor(
    scheduler_address=f"tcp://127.0.0.1:{AWS_GPU_DASK_PORT}"
)

# use `@ct.electron` to make electrons from individual tasks
@ct.electron(executor=aws_dask_executor)
def load_data(path, use_sample):
    """function that loads data"""
    ...

@ct.electron(executor=aws_gpu_dask_executor)
def kfold_and_evaluate(X_train, y_train, X_test):
    """function that runs the training/validation loop"""
    ...

Lattice

The lattice decorator (@ct.lattice) converts a function composed of electrons into a manageable workflow. It does this by wrapping the function in a callable Lattice object. For example, you can create a lattice as follows:

import covalent as ct

@ct.lattice
def workflow(path):
    X_train, y_train, X_test = load_data(path, use_sample=False)
    return kfold_and_evaluate(X_train, y_train, X_test)

Once the workflow is defined, you run it by dispatching it. You can dispatch a lattice workflow with Covalent as follows:

ct.dispatch(workflow)("your/data/path")

Once the lattice is dispatched, the electrons load_data and kfold_and_evaluate are detected as tasks and asynchronously executed. You can track your workflow visually using the Covalent UI. For a closer look at Covalent, see covalent.xyz or the Covalent Documentation.
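To make the electron/executor/lattice/dispatch pattern concrete without a running Covalent server, here is a toy, pure-Python model of the idea. It is not how Covalent is implemented: the names toy_electron, toy_lattice, toy_dispatch, and toy_get_result are hypothetical, a local thread pool stands in for a remote executor, and unlike Covalent (which first builds a task graph from the lattice and persists results), the toy simply runs the workflow function in a background thread.

```python
import functools
import uuid
from concurrent.futures import Future, ThreadPoolExecutor

# Hypothetical "executors": a real Covalent executor would send the task to,
# e.g., a Dask cluster; here a local thread pool stands in for it.
EXECUTORS = {"local": ThreadPoolExecutor(max_workers=2)}
DISPATCHER = ThreadPoolExecutor(max_workers=1)  # runs whole workflows in the background
_RESULTS: dict = {}  # dispatch_id -> Future


def toy_electron(executor="local"):
    """Toy stand-in for @ct.electron: run the function on the named executor."""
    def wrap(fn):
        @functools.wraps(fn)
        def run_on_executor(*args, **kwargs):
            return EXECUTORS[executor].submit(fn, *args, **kwargs).result()
        run_on_executor.executor = executor
        return run_on_executor
    return wrap


def toy_lattice(fn):
    """Toy stand-in for @ct.lattice: just mark the function as a workflow."""
    fn.is_lattice = True
    return fn


def toy_dispatch(workflow):
    """Toy stand-in for ct.dispatch: start the workflow, return an id immediately."""
    def start(*args, **kwargs):
        dispatch_id = str(uuid.uuid4())
        _RESULTS[dispatch_id] = DISPATCHER.submit(workflow, *args, **kwargs)
        return dispatch_id
    return start


def toy_get_result(dispatch_id, wait=True):
    """Toy stand-in for ct.get_result: block (if wait=True) until the result is ready."""
    future: Future = _RESULTS[dispatch_id]
    if wait or future.done():
        return future.result()
    return None  # still running


@toy_electron(executor="local")
def add_two(x):
    return x + 2


@toy_lattice
def workflow(a):
    return add_two(a)


dispatch_id = toy_dispatch(workflow)(2)
result = toy_get_result(dispatch_id, wait=True)
print(dispatch_id, result)
```

With the real SDK, ct.dispatch also needs a running Covalent server, which you can start with covalent start.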

Next, we will introduce Dask and describe how to set up a remote Dask cluster in AWS.

Dask

Dask is a Python library for parallel computing that can scale Python code across multiple machines. With Covalent, we can easily leverage Dask to execute tasks on Dask clusters.

Setting up Dask in AWS

To set up Dask in AWS, we will use the AWS CLI, eksctl, kubectl, and Helm. First, we authenticate with our AWS account by running aws configure sso. Next, we create a Kubernetes cluster using Amazon Elastic Kubernetes Service (EKS):

CLUSTER=dask-cluster
eksctl create cluster --name $CLUSTER --set-kubeconfig-context

By passing in --set-kubeconfig-context, we instruct eksctl to update our ~/.kube/config file with the new cluster information so that we can connect to it via kubectl. For a more detailed cluster configuration, you can describe the cluster in a YAML file and run eksctl create cluster -f cluster.yaml. Under the hood, eksctl creates AWS CloudFormation stacks, which provision multiple resources such as the EKS cluster, a VPC, EC2 security groups, and others.

Alternatively, you can specify the instance types you want your cluster to have. For example, for GPU instances you can specify the node type:

CLUSTER_GPU=dask-cluster-gpu
eksctl create cluster --name $CLUSTER_GPU --node-type=p2.xlarge --set-kubeconfig-context

Once you have created the cluster, make sure you can connect to it by running kubectl get nodes.

The next step uses Helm to install Dask on the newly created cluster. It is extremely important that the remote cluster runs the same version of Dask as the local machine that executes Covalent.

Run the following command to determine the versions of Covalent, Dask, and distributed installed in your Python environment:

pip show covalent dask distributed | grep -B 1 "Version"
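If you prefer to run this check from Python, or want to compare the installed versions directly against the image tag you will put into helm_dask.yaml, a small helper can do it. This is a sketch: check_dask_image_tag and local_version are hypothetical helpers, the check assumes the image tag equals the dask/distributed version string (as with 2023.6.1 in the example below), and it cannot detect mismatches in other packages on the workers.

```python
from importlib.metadata import PackageNotFoundError, version


def local_version(package):
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


def check_dask_image_tag(image_tag, packages=("dask", "distributed"), versions=None):
    """Compare local package versions with the Dask image tag used in helm_dask.yaml.

    `versions` lets you pass versions explicitly (e.g. in tests); by default the
    installed versions are looked up. Returns {package: (local_version, matches_tag)}.
    """
    if versions is None:
        versions = {pkg: local_version(pkg) for pkg in packages}
    return {pkg: (versions.get(pkg), versions.get(pkg) == image_tag) for pkg in packages}


if __name__ == "__main__":
    tag = "2023.6.1"  # the tag you plan to use in helm_dask.yaml
    for pkg, (ver, ok) in check_dask_image_tag(tag).items():
        print(f"{pkg}: local={ver} image={tag} -> {'OK' if ok else 'MISMATCH'}")
```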

Once you have noted the Dask version (for example 2023.6.1), you can install Dask on the Kubernetes cluster using Helm. The Dask configuration is specified in a YAML file (see an example YAML file here). We will use the following configuration:

# helm_dask.yaml
scheduler:
  name: scheduler # Dask scheduler name.
  enabled: true # Enable/disable scheduler.
  image:
    repository: "ghcr.io/dask/dask" # Container image repository.
    tag: "2023.6.1" # Container image tag.
    pullPolicy: IfNotPresent # Container image pull policy.
  replicas: 1 # Number of schedulers (should always be 1).
  serviceType: "ClusterIP" # Scheduler service type. Set to `LoadBalancer` to expose outside of your cluster.
  servicePort: 8786 # Scheduler service internal port.
worker:
  name: worker # Dask worker name.
  image:
    repository: "ghcr.io/dask/dask" # Container image repository.
    tag: "2023.6.1" # Container image tag.
    pullPolicy: IfNotPresent # Container image pull policy.
    dask_worker: "dask-worker" # Dask worker command. E.g `dask-cuda-worker` for GPU worker.
  replicas: 3 # Number of workers.
  env:
    # cmaes is needed by Optuna's CmaEsSampler (used in OptunaWeights)
    - name: EXTRA_PIP_PACKAGES
      value: catboost category-encoders cmaes covalent lightgbm optuna scikit-learn scikit-lego xgboost pandas==1.5.3 --upgrade
jupyter:
  name: jupyter # Jupyter name.
  enabled: false # Enable/disable the bundled Jupyter notebook.

Make sure to specify the correct image tag; a list of possible tags is available on Docker here. Note that this Dask configuration does not support GPU nodes or GPU computation. For GPU computation, you can use the rapidsai (RAPIDS) Helm charts, which come with preinstalled CUDA drivers. RAPIDS uses a different set of Docker images, so make sure to check the available versions on the Docker website. It is also possible to run both CPU and GPU instances in the same cluster. After deciding on the YAML configuration, we are ready to install Dask on the EKS cluster via Helm:

helm repo add dask https://helm.dask.org
helm repo update
helm install -f helm_dask.yaml $CLUSTER dask/dask --wait --timeout 300s

You can check the status of the Kubernetes pods with kubectl get pods, both while the installation is running and after it completes. Once the installation is complete, you should see something like this:

NAME                            READY   STATUS    RESTARTS   AGE
dask-cluster-dask-scheduler-0   1/1     Running   0          2m
dask-cluster-dask-worker-0      1/1     Running   0          2m
dask-cluster-dask-worker-1      1/1     Running   0          2m
dask-cluster-dask-worker-2      1/1     Running   0          2m
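If you want to script this readiness check instead of eyeballing it, you can parse the tabular output of kubectl get pods. A minimal sketch, assuming the default column layout shown above (NAME, READY, STATUS, RESTARTS, AGE); parse_pods, all_pods_ready, and check_cluster are hypothetical helpers, and for anything more robust kubectl get pods -o json is the better input.

```python
import subprocess


def parse_pods(table):
    """Parse `kubectl get pods` text output into a list of dicts keyed by column name."""
    lines = [line for line in table.strip().splitlines() if line.strip()]
    header = lines[0].split()
    # Whitespace split; extra tokens (e.g. "1 (5m ago)" under RESTARTS) only
    # shift the trailing columns, so NAME, READY and STATUS stay correct.
    return [dict(zip(header, line.split())) for line in lines[1:]]


def all_pods_ready(table):
    """True if every pod is Running and all of its containers are ready (e.g. 1/1)."""
    pods = parse_pods(table)

    def ready(pod):
        done, total = pod["READY"].split("/")
        return pod["STATUS"] == "Running" and done == total

    return bool(pods) and all(ready(p) for p in pods)


def check_cluster():
    # Requires kubectl on PATH and a kubeconfig context pointing at the cluster.
    out = subprocess.run(
        ["kubectl", "get", "pods"], capture_output=True, text=True, check=True
    ).stdout
    return all_pods_ready(out)
```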

If you wish to change the number of workers (scale up or down), the scheduler port, or anything else, you can update the cluster configuration by running the command below after updating helm_dask.yaml.

helm upgrade -f helm_dask.yaml $CLUSTER dask/dask --wait --timeout 300s

After the installation is complete, you can test the connection to the remote Dask cluster. First, forward the Dask scheduler ports to your local machine (or expose them externally instead):

export DASK_SCHEDULER="127.0.0.1"
export DASK_SCHEDULER_UI_IP="127.0.0.1"
export DASK_SCHEDULER_PORT=8080
export DASK_SCHEDULER_UI_PORT=8081

kubectl port-forward --namespace default svc/dask-cluster-scheduler $DASK_SCHEDULER_PORT:8786 &
kubectl port-forward --namespace default svc/dask-cluster-scheduler $DASK_SCHEDULER_UI_PORT:80 &

If you want to run a blocking process and inspect whether connections from localhost reach the remote cluster, omit the trailing & from the kubectl port-forward commands. Once you can reach your remote Dask cluster via localhost, you can execute Dask jobs as if you were running them locally.
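Before creating a Dask Client, it can help to confirm that the forwarded scheduler port actually accepts connections. A minimal sketch using only the standard library; port_is_open is a hypothetical helper, it reuses the DASK_SCHEDULER and DASK_SCHEDULER_PORT variables exported above (falling back to the same defaults), and a successful TCP connection only shows that something is listening on the port, not that it is a Dask scheduler.

```python
import os
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    host = os.environ.get("DASK_SCHEDULER", "127.0.0.1")
    port = int(os.environ.get("DASK_SCHEDULER_PORT", "8080"))
    print(f"tcp://{host}:{port} reachable: {port_is_open(host, port)}")
```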

Covalent and Dask

Now, you can test connecting to the remote Dask cluster by opening a Python prompt and running the following:

from dask.distributed import Client

client = Client("tcp://localhost:8080")
client.submit(lambda x, y: x + y, 1, 5).result()

You should also verify that you can connect to the cluster via Covalent. You can use the following code to test whether Covalent can execute tasks remotely:

import covalent as ct

dask_executor = ct.executor.DaskExecutor(
    scheduler_address="tcp://127.0.0.1:8080"
)

@ct.electron(executor=dask_executor)
def my_custom_task(x, y):
    return x + y

@ct.lattice
def workflow(a):
    val_1 = my_custom_task(x=a, y=2)
    return val_1

dispatch_id = ct.dispatch(workflow)(2)
print("Dispatch id: ", dispatch_id)
result = ct.get_result(
    dispatch_id=dispatch_id, wait=True
)
print(result.result)

The dask_executor is assigned to my_custom_task, which means that the task will be executed on the Dask cluster. Transferring objects and all other bookkeeping is handled implicitly by Covalent. Dispatching workflows in Covalent is asynchronous: ct.dispatch returns a dispatch_id, and you can pass that dispatch_id to ct.get_result to fetch the job information and result.

Crab age prediction with Covalent

Now that we have a Dask cluster set up in AWS, we return to our machine learning problem and demonstrate how to use Covalent and Dask to build a crab age regression model. First, we define the data loading. We will download the data using the Kaggle CLI tool. The downloaded data contains two datasets:

  1. Synthetic dataset (generated using this notebook, license Apache 2.0)
  2. Original crab dataset (more details here, license CC0: Public Domain)

The synthetic dataset serves as data augmentation to be able to construct more robust crab age prediction regression models.

Without looking at the dataset, what are your priors like? What is the average crab age and how well could a regression model predict the crab age (in months)?

import os
import subprocess

import covalent as ct

# FILEPATH (the download folder) must be defined before this point;
# it is set in the settings block at the end of this post


def download_data_command(kaggle_type, name, kaggle_flag, filepath):
    # e.g. "kaggle competitions download -c <name> -p <filepath>"
    return " ".join(
        [
            "kaggle", kaggle_type, "download",
            kaggle_flag, name, "-p", filepath,
        ]
    )

def unzip_data_command(filename, filepath):
    return " ".join(
        ["unzip", "-o", os.path.join(filepath, filename), "-d", filepath]
    )

def cleanup(filepath):
    return " ".join(["rm", "-rf", filepath])

def execute_bash(commands):
    # run each shell command in order; check=True raises on the first failure
    for cmd in commands:
        subprocess.run(
            cmd, stdin=subprocess.DEVNULL, shell=True,
            capture_output=True, check=True, text=True,
        )

synthetic_dataset_download = [
    download_data_command(
        kaggle_type="competitions",
        name="playground-series-s3e16",
        kaggle_flag="-c",
        filepath=FILEPATH,
    ),
    unzip_data_command(
        filename="playground-series-s3e16.zip", filepath=FILEPATH
    ),
]
crab_dataset_download = [
    download_data_command(
        kaggle_type="datasets",
        name="sidhus/crab-age-prediction",
        kaggle_flag="-d",
        filepath=FILEPATH,
    ),
    unzip_data_command(filename="crab-age-prediction.zip", filepath=FILEPATH),
]
pre_command = synthetic_dataset_download + crab_dataset_download
post_command = [cleanup(FILEPATH)]

@ct.electron(
    deps_bash=ct.DepsBash(pre_command),
    call_after=ct.DepsCall(execute_bash, args=(post_command,)),
)
def load_data(path, use_sample=False):
    ...

Instead of running the commands directly, we provide them as dependencies of an electron (task) in our Covalent workflow, using Covalent's DepsBash and DepsCall functionality. DepsBash runs arbitrary bash commands before the electron executes, while DepsCall specifies Python functions to run before and/or after an electron. As a result, the load_data task depends on the data download, and the downloaded files are removed (via post_command) after the task finishes.
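The ordering is what matters here: the download must finish before load_data runs, and the cleanup must only happen afterwards. The toy decorator below mimics that ordering in plain Python so it can be run without Covalent. It is a sketch, not how DepsBash or DepsCall are implemented: with_hooks is hypothetical, a temporary file stands in for the Kaggle data, it assumes a POSIX shell, and it runs the "after" callable only when the task returns normally.

```python
import functools
import os
import shlex
import subprocess
import tempfile


def run_commands(commands):
    """Run shell commands in order, raising on the first failure (like execute_bash)."""
    for cmd in commands:
        subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True)


def with_hooks(before_commands=(), after=None):
    """Hypothetical decorator: run shell commands before a task and a callable after it."""
    def wrap(task):
        @functools.wraps(task)
        def run(*args, **kwargs):
            run_commands(before_commands)  # plays the role of deps_bash
            result = task(*args, **kwargs)
            if after is not None:
                after()  # plays the role of call_after
            return result
        return run
    return wrap


# Usage: a temporary file stands in for the downloaded Kaggle data.
workdir = tempfile.mkdtemp()
data_file = os.path.join(workdir, "data.txt")


@with_hooks(
    before_commands=[f"echo hello > {shlex.quote(data_file)}"],  # "download" step
    after=lambda: os.remove(data_file),  # "cleanup" step
)
def load(path):
    with open(path) as f:
        return f.read().strip()


print(load(data_file))            # hello
print(os.path.exists(data_file))  # False (removed after the task ran)
```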

import os
from dataclasses import dataclass
from typing import List

import covalent as ct
import pandas as pd

@dataclass
class CrabDataSet:
    df_train: pd.DataFrame
    df_test: pd.DataFrame
    original: pd.DataFrame
    num_cols: List[str]
    cat_cols: List[str]
    target_col: str

@ct.electron(
    deps_bash=ct.DepsBash(pre_command),
    call_after=ct.DepsCall(execute_bash, args=(post_command,)),
)
def load_data(path, use_sample=False):
    df_train = pd.read_csv(os.path.join(path, "train.csv"), index_col=[0])
    df_test = pd.read_csv(os.path.join(path, "test.csv"), index_col=[0])
    original = pd.read_csv(os.path.join(path, "CrabAgePrediction.csv"))

    # optionally subsample for quick test runs
    if use_sample:
        df_train = df_train.sample(n=1000)
        df_test = df_test.sample(n=500)
        original = original.sample(n=200)

    # flag synthetic (1) vs. original (0) rows
    df_train["is_generated"] = 1
    df_test["is_generated"] = 1
    original["is_generated"] = 0

    print(f"train shape: {df_train.shape}, test shape: {df_test.shape}")
    print(f"original shape: {original.shape}")

    num_cols = df_test.select_dtypes(include=["float64"]).columns.tolist()
    cat_cols = df_test.select_dtypes(include=["object"]).columns.tolist()

    return CrabDataSet(
        df_train=df_train, df_test=df_test, original=original,
        num_cols=num_cols, cat_cols=cat_cols, target_col="Age",
    )

We return a custom dataclass, CrabDataSet, which holds the dataset splits, the numerical columns (num_cols), the categorical columns (cat_cols), and the target variable we are trying to predict (target_col).

Next, we will plot the data to find out more about the distribution of the crab features and target variable (Age). The plots will be split based on dataset, hence there will be separate plots for training, test, and original dataset.

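import os

import matplotlib.pyplot as plt
import seaborn as sns

BASE_DIR = "."  # directory where the figures are saved (adjust as needed)

@ct.electron
def plot_data(crab_dataset: CrabDataSet, n_cols=3):
    df_train = crab_dataset.df_train
    df_test = crab_dataset.df_test
    original = crab_dataset.original
    target_col = crab_dataset.target_col

    # only numeric features and the target can be drawn as histograms
    # (categorical columns such as Sex are skipped)
    plot_cols = crab_dataset.num_cols + [target_col]
    n_rows = (len(plot_cols) - 1) // n_cols + 1

    fig, axes = plt.subplots(
        nrows=n_rows, ncols=n_cols, figsize=(18, 4 * n_rows)
    )
    axes = axes.flatten()

    for ax, var_name in zip(axes, plot_cols):
        # histplot with a KDE overlay replaces the deprecated sns.distplot
        sns.histplot(df_train[var_name], kde=True, stat="density", ax=ax, label="Train")
        # the test split has no target column
        if var_name != target_col:
            sns.histplot(df_test[var_name], kde=True, stat="density", ax=ax, label="Test")
        sns.histplot(original[var_name], kde=True, stat="density", ax=ax, label="Original")
        ax.set_title(f"{var_name} Distribution (Train vs Test)")
        ax.legend()

    plt.tight_layout()
    plt.savefig(os.path.join(BASE_DIR, "histograms.png"))
    plt.close(fig)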

Plots generated within a Covalent workflow can either be rendered (via plt.show) or saved as images; here we opt for the latter, using plt.savefig. The code above produces only the first of the figures below (the feature histograms). The plots should look like the following:

Crab data feature histograms (image by author)
Train data feature correlation heatmap (image by author)
Test data feature correlation heatmap (image by author)
Original data feature correlation heatmap (image by author)
Train data feature distributions (image by author)
Test data feature distributions (image by author)
Original data feature distributions (image by author)

After plotting, we will move on to machine learning and creating regression models. Before doing so, we will prepare the data such that it is suitable for machine learning. In other words, we will obtain numerical representations of crab data.

import pandas as pd
from category_encoders import OneHotEncoder
from sklearn.preprocessing import StandardScaler

def cat_encoder(X_train, X_test, cat_cols):
    # one-hot encode categorical columns; the encoder is fit on train only
    encoder = OneHotEncoder(cols=cat_cols)
    train_encoder = encoder.fit_transform(X_train[cat_cols]).astype(int)
    test_encoder = encoder.transform(X_test[cat_cols]).astype(int)
    X_train = pd.concat([X_train, train_encoder], axis=1)
    X_test = pd.concat([X_test, test_encoder], axis=1)
    X_train.drop(cat_cols, axis=1, inplace=True)
    X_test.drop(cat_cols, axis=1, inplace=True)
    encoder_cols = list(train_encoder.columns)
    return X_train, X_test, encoder_cols

@ct.electron(executor=aws_dask_executor)
def create_ml_datasets(crab_dataset):
    # merge synthetic train and original data into one training set
    train = pd.concat([crab_dataset.df_train, crab_dataset.original])
    test = crab_dataset.df_test.copy()

    X_train = train.drop([crab_dataset.target_col], axis=1).reset_index(
        drop=True
    )
    y_train = train[crab_dataset.target_col].reset_index(drop=True)
    X_test = test.reset_index(drop=True)

    # category encoding
    X_train, X_test, encoder_cols = cat_encoder(
        X_train, X_test, crab_dataset.cat_cols
    )
    # scaling: fit on train, apply the same transform to test
    sc = StandardScaler()
    X_train[crab_dataset.num_cols] = sc.fit_transform(
        X_train[crab_dataset.num_cols]
    )
    X_test[crab_dataset.num_cols] = sc.transform(X_test[crab_dataset.num_cols])

    # drop the column that flags whether a row is synthetic
    drop_cols = ["is_generated"]
    X_train.drop(drop_cols, axis=1, inplace=True)
    X_test.drop(drop_cols, axis=1, inplace=True)

    return X_train, y_train, X_test

The create_ml_datasets function constructs the training and test datasets. The synthetic training data and the original data are merged into one training dataset, whereas the synthetic test data is what we make predictions on. To transform categorical features such as Sex, we use cat_encoder, which one-hot encodes them; numerical features are standardized with StandardScaler. The original Kaggle solution also experiments with additional features, such as Length_to_Diameter_Ratio, Weight_to_Shell_Weight_Ratio, and Length_Minus_Height.
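The key detail in create_ml_datasets is that the encoder and scaler are fit on the training data only and then reused on the test data, so both splits get the same columns and the same scaling. The pure-Python sketch below shows that fit/transform split on a few made-up rows; fit_one_hot, one_hot, fit_standardizer, and standardize are illustrative stand-ins for category_encoders' OneHotEncoder and scikit-learn's StandardScaler, not their implementations (in this sketch, categories unseen in training simply become all zeros).

```python
from statistics import mean, pstdev


def fit_one_hot(values):
    """Learn the category list from the training column."""
    return sorted(set(values))


def one_hot(values, categories):
    """Encode each value as a 0/1 vector over the learned categories."""
    return [[int(v == c) for c in categories] for v in values]


def fit_standardizer(values):
    """Learn mean and population standard deviation from the training column."""
    mu, sigma = mean(values), pstdev(values)
    return mu, (sigma if sigma > 0 else 1.0)


def standardize(values, params):
    mu, sigma = params
    return [(v - mu) / sigma for v in values]


# Toy columns; the Sex categories mirror the crab data (F, I, M).
train_sex, test_sex = ["M", "F", "I", "M"], ["I", "F"]
train_len, test_len = [1.0, 2.0, 3.0, 4.0], [2.5]

cats = fit_one_hot(train_sex)         # fit on train only
params = fit_standardizer(train_len)  # fit on train only

print(cats)                           # ['F', 'I', 'M']
print(one_hot(test_sex, cats))        # [[0, 1, 0], [1, 0, 0]]
print(standardize(test_len, params))  # [0.0]
```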

Now that the dataset is prepared, we are ready to build the regression model. Based on the Kaggle notebook, we define a regression model ensemble class. The individual regression models come from popular libraries such as XGBoost, LightGBM, CatBoost, and scikit-learn, and their predictions are later combined into a single weighted ensemble.

import lightgbm as lgb
import xgboost as xgb
from catboost import CatBoostRegressor
from sklearn.ensemble import (
    GradientBoostingRegressor,
    HistGradientBoostingRegressor,
    RandomForestRegressor,
)
from sklearn.neural_network import MLPRegressor
from sklearn.svm import SVR

class EnsembleRegressor:
    def __init__(self, n_estimators=1000, device="cpu", random_state=0):
        self.n_estimators = n_estimators
        self.device = device
        self.random_state = random_state
        self.models = self._define_model()
        self.models_name = list(self.models.keys())
        self.len_models = len(self.models)

    def _define_model(self):
        # values taken directly from Kaggle notebook
        xgb_params = {
            "random_state": self.random_state,
        }
        if self.device == "gpu":
            # GPU settings for XGBoost 1.x; XGBoost >= 2.0 prefers device="cuda"
            xgb_params["tree_method"] = "gpu_hist"
            xgb_params["predictor"] = "gpu_predictor"

        # values taken directly from Kaggle notebook
        hist_params = {
            "max_iter": self.n_estimators,
            "random_state": self.random_state,
        }

        # values taken directly from Kaggle notebook
        gbd_params = {
            "n_estimators": self.n_estimators,
            "random_state": self.random_state,
        }

        # values taken directly from Kaggle notebook
        cat1_params = {
            "iterations": self.n_estimators,
            "task_type": self.device.upper(),  # "CPU" or "GPU"
            "random_state": self.random_state,
        }

        # values taken directly from Kaggle notebook
        lgb1_params = {
            "n_estimators": self.n_estimators,
            "random_state": self.random_state,
        }

        # values taken directly from Kaggle notebook
        models = {
            "xgb": xgb.XGBRegressor(**xgb_params),
            "hgb": HistGradientBoostingRegressor(**hist_params),
            "lgb": lgb.LGBMRegressor(**lgb1_params),
            "cat": CatBoostRegressor(**cat1_params),
            "SVR_rbf": SVR(kernel="rbf", gamma="auto"),
            "RandomForestRegressor": RandomForestRegressor(
                n_estimators=self.n_estimators,
                random_state=self.random_state,
                n_jobs=-1,
            ),
            "MLPRegressor": MLPRegressor(
                max_iter=500,
                early_stopping=True,
                n_iter_no_change=10,
                random_state=self.random_state,
            ),
            "GradientBoostingRegressor": GradientBoostingRegressor(
                **gbd_params
            ),
        }
        return models

Next, we define the Optuna-based object that searches for the best weighting of the regression models' predictions. Combining model predictions with these weights is our ensembling mechanism. To learn more about using Optuna for ensembling, check out this excellent Kaggle notebook.

from functools import partial

import numpy as np
import optuna
from sklearn.metrics import mean_absolute_error

class OptunaWeights:
    def __init__(self, random_state, n_trials=100):
        self.study = None
        self.weights = None  # set by fit(); one weight per model
        self.random_state = random_state
        self.n_trials = n_trials

    def _objective(self, trial, y_true, y_preds):
        # Define the weights for the predictions from each model
        weights = [
            trial.suggest_float(f"weight{n}", 1e-15, 1)
            for n in range(len(y_preds))
        ]

        # Calculate the weighted prediction
        weighted_pred = np.average(
            np.array(y_preds).T, axis=1, weights=weights
        )

        # Calculate the score for the weighted prediction
        score = mean_absolute_error(y_true, weighted_pred)
        return score

    def fit(self, y_true, y_preds):
        optuna.logging.set_verbosity(optuna.logging.ERROR)
        # CmaEsSampler requires the `cmaes` package to be installed
        sampler = optuna.samplers.CmaEsSampler(seed=self.random_state)
        pruner = optuna.pruners.HyperbandPruner()
        self.study = optuna.create_study(
            sampler=sampler,
            pruner=pruner,
            study_name="OptunaWeights",
            direction="minimize",
        )
        objective_partial = partial(
            self._objective, y_true=y_true, y_preds=y_preds
        )
        self.study.optimize(objective_partial, n_trials=self.n_trials)
        self.weights = [
            self.study.best_params[f"weight{n}"] for n in range(len(y_preds))
        ]

    def predict(self, y_preds):
        assert (
            self.weights is not None
        ), "OptunaWeights error, must be fitted before predict"
        weighted_pred = np.average(
            np.array(y_preds).T, axis=1, weights=self.weights
        )
        return weighted_pred

    def fit_predict(self, y_true, y_preds):
        self.fit(y_true, y_preds)
        return self.predict(y_preds)

    # The fitted weights are read from the `weights` attribute; a method with
    # the same name would be shadowed by that attribute, so none is defined.
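The objective that OptunaWeights minimizes is easy to reproduce: a weighted average of the models' predictions, scored by MAE. The sketch below does that in plain Python and, to stay dependency-free, swaps Optuna's CMA-ES sampler for plain random search over the weights. The function names and the two sets of toy predictions are made up for illustration.

```python
import random


def weighted_average(preds_per_model, weights):
    """Per-sample weighted mean of the models' predictions.

    Like np.average(..., weights=...), this divides by the sum of the weights,
    so the weights do not need to add up to 1.
    """
    total = sum(weights)
    return [
        sum(w * p for w, p in zip(weights, sample)) / total
        for sample in zip(*preds_per_model)
    ]


def mae(y_true, y_pred):
    """Mean absolute error."""
    return sum(abs(t - p) for t, p in zip(y_true, y_pred)) / len(y_true)


def random_search_weights(y_true, preds_per_model, n_trials=200, seed=42):
    """Stand-in for OptunaWeights.fit: random search instead of CMA-ES.

    Samples one weight per model from the same range as above (1e-15 to 1)
    and keeps the weights with the lowest MAE.
    """
    rng = random.Random(seed)
    best_weights, best_score = None, float("inf")
    for _ in range(n_trials):
        weights = [rng.uniform(1e-15, 1) for _ in preds_per_model]
        score = mae(y_true, weighted_average(preds_per_model, weights))
        if score < best_score:
            best_weights, best_score = weights, score
    return best_weights, best_score


# Toy out-of-fold predictions for two models (made-up numbers).
y_true = [10.0, 8.0, 12.0]
preds = [
    [10.5, 8.5, 11.5],  # model A: MAE 0.5 on its own
    [13.0, 5.0, 15.0],  # model B: MAE 3.0 on its own
]

weights, score = random_search_weights(y_true, preds)
print([round(w, 3) for w in weights], round(score, 3))
```

With these toy numbers the best achievable MAE is 2/7 ≈ 0.286 (reached when model A gets six times the weight of model B), because the two models' errors partly cancel, so the search should land close to that and well below model A's standalone MAE of 0.5.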

With the EnsembleRegressor and OptunaWeights classes defined, we use both to find how the models can best be combined. We define a training loop in which k-fold cross-validation is used as a model selection mechanism over the training data. For each fold, Optuna provides a set of weights that defines how much each individual regression model contributes to the final prediction.
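For readers less familiar with k-fold cross-validation, the sketch below shows how shuffled fold indices are produced, with every sample landing in exactly one validation fold. It is plain, unstratified k-fold written in pure Python (kfold_indices is a hypothetical helper), not the StratifiedKFold used in get_splits below, which additionally keeps the proportions of each integer age value similar across folds.

```python
import random


def kfold_indices(n_samples, n_splits=5, seed=42):
    """Yield (train_idx, val_idx) for shuffled, unstratified k-fold cross-validation."""
    indices = list(range(n_samples))
    random.Random(seed).shuffle(indices)
    # Like scikit-learn's KFold, the first n_samples % n_splits folds get one extra sample.
    fold_sizes = [
        n_samples // n_splits + (1 if i < n_samples % n_splits else 0)
        for i in range(n_splits)
    ]
    start = 0
    for size in fold_sizes:
        val_idx = indices[start:start + size]
        held_out = set(val_idx)
        train_idx = [i for i in indices if i not in held_out]
        yield train_idx, val_idx
        start += size


for train_idx, val_idx in kfold_indices(12, n_splits=5):
    print(len(train_idx), len(val_idx))
# 9 3
# 9 3
# 10 2
# 10 2
# 10 2
```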

from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List

import pandas as pd
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import StratifiedKFold

@dataclass
class DataFold:
    X_train: pd.DataFrame
    X_valid: pd.DataFrame
    y_train: pd.Series
    y_valid: pd.Series

@dataclass
class RegressionResult:
    ensemble_score: List[float]
    weights: List[List[float]]
    algorithm_scores: Dict[str, List[float]]
    model_set: Dict

def get_splits(X_train, y_train):
    # StratifiedKFold treats each (integer) age value as a class
    kf = StratifiedKFold(
        n_splits=KFOLD_N_SPLITS, random_state=RANDOM_SEED,
        shuffle=True,
    )
    splits = []
    for train_index, val_index in kf.split(X_train, y_train):
        splits.append((train_index, val_index))
    return splits

@ct.electron
def process_fold(
    datafold: DataFold, X_test, y_train
):
    # Get a set of regressor models
    models = EnsembleRegressor(
        N_ESTIMATORS, DEVICE, RANDOM_SEED
    ).models

    # Get the training data for this fold
    X_train_, y_train_ = datafold.X_train, datafold.y_train
    X_val, y_val = datafold.X_valid, datafold.y_valid

    oof_preds = []
    test_preds = []
    post_test_preds = []
    score = None

    # Loop over each base model and fit it to the training data,
    # evaluate on validation data, and store predictions
    algorithm_scores = defaultdict(list)
    for name, model in models.items():
        if ("xgb" in name) or ("lgb" in name) or ("cat" in name):
            early_stopping_rounds_ = (
                int(EARLY_STOPPING_ROUNDS * 2)
                if ("cat" not in name)
                else EARLY_STOPPING_ROUNDS
            )
            model.set_params(early_stopping_rounds=early_stopping_rounds_)
            fit_kwargs = {"eval_set": [(X_val, y_val)]}
            # LightGBM >= 4.0 no longer accepts `verbose` in fit()
            if "lgb" not in name:
                fit_kwargs["verbose"] = VERBOSE
            model.fit(X_train_, y_train_, **fit_kwargs)
        else:
            model.fit(X_train_, y_train_)

        y_val_pred = mattop_post_process(
            model.predict(X_val).reshape(-1), y_val
        )
        test_pred = model.predict(X_test).reshape(-1)
        post_test_pred = mattop_post_process(test_pred, y_train)

        score = mean_absolute_error(y_val, y_val_pred)
        algorithm_scores[name].append(score)

        oof_preds.append(y_val_pred)
        test_preds.append(test_pred)
        post_test_preds.append(post_test_pred)

    # Use Optuna to find the best ensemble weights
    optweights = OptunaWeights(
        random_state=RANDOM_SEED, n_trials=N_TRIALS
    )
    y_val_pred = optweights.fit_predict(y_val.values, oof_preds)
    score = mean_absolute_error(y_val, y_val_pred)

    return score, optweights.weights, algorithm_scores

@ct.electron(executor=aws_dask_executor)
def kfold_and_evaluate(X_train, y_train, X_test):
    ensemble_score, weights, algorithm_scores = [], [], []

    for train_index, val_index in get_splits(X_train, y_train):
        X_train_, X_val_ = X_train.iloc[train_index], X_train.iloc[val_index]
        y_train_, y_val_ = y_train.iloc[train_index], y_train.iloc[val_index]

        datafold = DataFold(
            X_train_, X_val_, y_train_, y_val_
        )

        fold_score, fold_weights, fold_algorithm_scores = process_fold(
            datafold, X_test, y_train
        )
        ensemble_score.append(fold_score)
        weights.append(fold_weights)
        algorithm_scores.append(fold_algorithm_scores)

    # collect each algorithm's per-fold scores under its name
    merged_scores = defaultdict(list)
    for alg in algorithm_scores[0]:
        merged_scores[alg].append(
            [scores[alg] for scores in algorithm_scores]
        )

    model_set = EnsembleRegressor(
        N_ESTIMATORS, DEVICE, RANDOM_SEED
    ).models

    return RegressionResult(
        ensemble_score, weights, merged_scores, model_set
    )
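process_fold calls a helper, mattop_post_process, that is not defined in this post; it is presumably carried over from the Kaggle notebook the solution is based on. The version below is only a sketch under an assumption: that it snaps each continuous prediction to the nearest value present in the reference targets it is given (crab ages are whole numbers). The tie-breaking rule here is an arbitrary choice, so check the original notebook before relying on this.

```python
import bisect


def mattop_post_process(preds, y_reference):
    """ASSUMED behaviour: snap each prediction to the closest value in y_reference.

    Ties are resolved toward the smaller target value.
    """
    targets = sorted(set(float(y) for y in y_reference))
    snapped = []
    for p in preds:
        i = bisect.bisect_left(targets, p)
        # candidates: the neighbours on either side of the insertion point
        candidates = targets[max(i - 1, 0):i + 1]
        snapped.append(min(candidates, key=lambda t: (abs(t - p), t)))
    return snapped


print(mattop_post_process([7.4, 9.6, 0.2, 30.0], [8, 10, 9, 1, 1]))
# [8.0, 10.0, 1.0, 10.0]
```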

Notice that in the specification of the kfold_and_evaluate electron, we explicitly set our Dask executor, aws_dask_executor. The idea is to execute the function on the remote Dask cluster, so we define the Dask executor based on the Dask cluster setup (the locally forwarded scheduler port):

# dask cluster and executor
aws_dask_executor = ct.executor.DaskExecutor(
scheduler_address="tcp://localhost:8080"
)

Finally, we evaluate the results of the ensemble regression model, defining the evaluation as follows:

import numpy as np

def print_evaluation(workflow_result):
    # Get the results from the regression
    algorithm_scores = workflow_result.result.algorithm_scores
    ensemble_score = workflow_result.result.ensemble_score
    weights = workflow_result.result.weights
    models = workflow_result.result.model_set

    # Print the mean and standard deviation of the MAE scores
    print("--- Mean MAE Scores---")
    for name, score in algorithm_scores.items():
        mean_score = np.mean(score)
        std_score = np.std(score)
        print(f"{name}: {mean_score:.5f} ± {std_score:.5f}")

    print()
    # Calculate the mean MAE score of the ensemble
    mean_score = np.mean(ensemble_score)
    std_score = np.std(ensemble_score)
    print(
        f"Mean Optuna Ensemble MAE {mean_score:.5f} "
        f"± {std_score:.5f}"
    )

    print()
    # Print the mean and standard deviation
    # of the ensemble weights for each model
    print("--- Optuna Weights---")
    mean_weights = np.mean(weights, axis=0)
    std_weights = np.std(weights, axis=0)
    for name, mean_weight, std_weight in zip(
        models.keys(), mean_weights, std_weights
    ):
        print(f"{name}: {mean_weight:.5f} ± {std_weight:.5f}")

    elapsed = workflow_result.end_time - workflow_result.start_time
    print(
        f"Workflow completed with {workflow_result.status}, "
        f"it took {elapsed}"
    )

To put it all together, we decorate the workflow function defined earlier with @ct.lattice. Running the workflow and inspecting the results in the Covalent UI should look like the animation below.

Dask workflow viewed in the Covalent UI (image by author)
@ct.lattice
def workflow():
    crab_dataset = load_data(path=FILEPATH, use_sample=USE_SAMPLE)
    plot_data(crab_dataset)
    X_train, y_train, X_test = create_ml_datasets(crab_dataset)
    regression_result = kfold_and_evaluate(
        X_train, y_train, X_test
    )
    return regression_result

Finally, we define hyperparameters and then dispatch the workflow to run everything:

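import random

import numpy as np

# Settings
# (in a single script, define these before any code that uses them at import
# time, e.g. the Kaggle download commands that reference FILEPATH)
RANDOM_SEED = 42
# folder to download to
FILEPATH = "data"
USE_SAMPLE = False

KFOLD_N_SPLITS = 5

N_ESTIMATORS = 500
EARLY_STOPPING_ROUNDS = 200

N_TRIALS = 200
VERBOSE = False
# "gpu" requires GPU workers (e.g. the RAPIDS Helm charts mentioned above);
# use "cpu" with the CPU-only helm_dask.yaml configuration
DEVICE = "gpu"

# fix random seed
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

if __name__ == '__main__':
    # requires a running Covalent server (`covalent start`)
    dispatch_id = ct.dispatch(workflow)()
    result = ct.get_result(
        dispatch_id=dispatch_id, wait=True
    )
    print_evaluation(result)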

The results should be close to the notebook results, with a mean absolute error (Mean Optuna Ensemble MAE) of around 1.35. This means that the predicted crab age will, on average, be off by less than one and a half months! From the Optuna weights we can see that XGBoost was the best-performing algorithm, both individually and as part of the ensemble.

--- Mean MAE Scores---
xgb: 1.34145 ± 0.00759
hgb: 1.36921 ± 0.01215
lgb: 1.36922 ± 0.01103
cat: 1.35485 ± 0.01433
SVR_rbf: 1.35944 ± 0.00965
RandomForestRegressor: 1.43984 ± 0.01175
MLPRegressor: 1.39422 ± 0.00865
GradientBoostingRegressor: 1.34979 ± 0.00621

Mean Optuna Ensemble MAE 1.34198 ± 0.00793

--- Optuna Weights---
xgb: 0.76843 ± 0.10807
hgb: 0.00330 ± 0.00286
lgb: 0.00384 ± 0.00296
cat: 0.00256 ± 0.00187
SVR_rbf: 0.00491 ± 0.00679
RandomForestRegressor: 0.00083 ± 0.00070
MLPRegressor: 0.00110 ± 0.00135
GradientBoostingRegressor: 0.11613 ± 0.21903
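Note that the mean Optuna weights above do not sum to 1. That is expected: np.average divides by the sum of the weights, so only their relative sizes matter. Normalizing the reported means gives a rough picture of each model's share. This is only approximate, since averaging the weights across folds and then normalizing is not the same as normalizing each fold's weights first.

```python
# Mean Optuna weights copied from the output above.
mean_weights = {
    "xgb": 0.76843,
    "hgb": 0.00330,
    "lgb": 0.00384,
    "cat": 0.00256,
    "SVR_rbf": 0.00491,
    "RandomForestRegressor": 0.00083,
    "MLPRegressor": 0.00110,
    "GradientBoostingRegressor": 0.11613,
}

total = sum(mean_weights.values())
shares = {name: w / total for name, w in mean_weights.items()}

print(f"sum of mean weights: {total:.5f}")  # 0.90110
for name, share in sorted(shares.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{name}: {share:.1%}")
```

By this rough measure, XGBoost and GradientBoostingRegressor together account for about 98% of the ensemble weight.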

Runtime analysis

We compared running the Kaggle solution with and without Covalent + Dask. Without Covalent and Dask, the workflow took 110 minutes, whereas Covalent + a remote Dask cluster on a p3.2xlarge AWS instance took 105 minutes with CPU as the selected device and 50 minutes with GPU as the device. XGBoost is able to take advantage of the GPU (EnsembleRegressor also switches CatBoost to GPU mode), hence the additional boost when training on a GPU instance.

Execution time across different setups; lower is better (image by author)
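Expressed as speedups relative to the 110-minute baseline, the runtimes quoted above work out as follows (simple arithmetic on the reported numbers):

```python
baseline_min = 110  # Kaggle solution without Covalent + Dask
runs_min = {
    "Covalent + Dask, CPU device": 105,
    "Covalent + Dask, GPU device": 50,
}

speedups = {setup: baseline_min / minutes for setup, minutes in runs_min.items()}
for setup, minutes in runs_min.items():
    print(f"{setup}: {speedups[setup]:.2f}x, {baseline_min - minutes} min saved")
# Covalent + Dask, CPU device: 1.05x, 5 min saved
# Covalent + Dask, GPU device: 2.20x, 60 min saved
```

In other words, running on the remote cluster with the CPU device changed little, and most of the gain came from the GPU run.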

Conclusion

After executing the Covalent workflow, it is easy to reproduce the same experiment, as all task results and associated metadata are cached in a database. Experimenting with different algorithms or hyperparameters should then be a matter of running the same workflow with updated constants. To clean up the created Dask Kubernetes cluster(s), you can run eksctl delete cluster --region=us-east-1 --name=$CLUSTER (adjusting the region to the one your cluster was created in).

In this blog post, we have showcased how Covalent can significantly improve the way you run experiments. If your code can already be run as a sequence of functions (resembling a functional coding style), you can easily decorate those functions as tasks (electrons), specify where each one runs via executors, connect the tasks into a workflow (lattice), and finally dispatch the workflow. If you wish to reuse steps of a dispatched workflow, we recommend redispatching, which should save you considerable time. Finally, to help you move seamlessly between cloud resources, we showed how to integrate a remote Dask cluster on AWS EKS to offload heavy compute.

We hope this post has demonstrated Covalent’s potential when paired with powerful resources like Dask. As a reminder, Covalent is free and open source. We encourage you to visit the Covalent Documentation for more information and many practical tutorials.


Filip Boltuzic

Data Scientist and Machine Learning Engineer with a PhD degree in Natural Language Processing. Interested in math, linux, programming, vim.