Use cases for Java ML libraries in conjunction with Spring, Docker, Spark, Rapids, and CUDA

Alexpit
Jul 27, 2022

Introduction

This article explores how to use nVidia GPUs with CUDA in Docker containers for distributed training of machine learning models on multiple machines. Its purpose is to show how the Apache Spark big data tool can run in Docker containers, together with the Rapids GPU computing accelerator on nVidia CUDA devices, using the DJL, Spark ML, and XGBoost libraries, in a Spring Boot application on Java 8 (a Rapids requirement), on several machines running Windows 10 Pro, to solve the problem of training machine learning models in a distributed system. The same containers can later be used in Kubernetes.

An important condition, from which all the painful decisions below follow, is that everything is done on Windows 10 Pro. Later in the article, using WSL2 (Windows Subsystem for Linux) is covered, but one important requirement comes first: the latest Rapids versions do not work with video cards based on the Pascal architecture, so to run the examples in a Windows environment you need an nVidia card based on the Turing architecture (1600/2000 series) or newer. Pascal cards will work under a Linux OS; it is recommended to run the examples below on Ubuntu 20.04 (Debian 10 will certainly work too), but nothing newer, which is a Rapids requirement.

Another important condition is that all examples are implemented in Java. In the Spark world (including Spark ML), Scala is the more common choice. I don't know Scala and have no particular desire to learn it, but I do want to learn Spark and ML. Considering that Scala and Java are equals in the Spark ecosystem, unlike Python, that there are plenty of Java ML libraries usable together with Spark, and taking my existing Java experience into account, the decision to try implementing several examples was not long in coming.

The code in this article was tested on Windows 10 Pro with a GeForce RTX 2060; some screenshots were taken while setting up a second machine with a GeForce 1650. Anticipating the reader's question of why not do everything on that same Ubuntu 20.04, I answer: a) anyone can do that, you try it on Windows; b) no technical possibility (the cloud is not an option: GPU machines are expensive).

The target launch scheme is shown in the figure below:

That is, there are 2 to n nodes, each with 1 to m GPU devices and a Docker runtime with a Spark Worker container from which the GPUs are accessible.

Hardware and software layers are described by the diagram:

Docker allows you to run many containers with different applications:

This is suitable for distributed training of ML models in the Apache Spark infrastructure: this article shows an example of running a standalone Apache Spark cluster with one Master node, two Worker nodes on different machines, and a Spring Boot Java 8 application using the DJL, Spark ML, and XGBoost libraries in a separate container (spoiler: not everything worked, and it will not work without a Linux machine).

The possibility of using embedded nVidia devices for IoT is also interesting.

All the code below is available in the GitLab repository.

Preparing the Environment

All of the following steps are performed on Windows 10 Pro. It is important that the edition is no lower than Pro and the build is 19043.1263 (21H1) or higher.

WSL, Docker and CUDA will be installed as part of this article.

Recommended WSL kernel version: 5.10.16.3+;

Docker 19.03+.
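Once WSL is installed (see the next section), the kernel version can be checked from PowerShell:

wsl uname -r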

It is recommended to install Windows Terminal to open many terminal tabs: PowerShell, cmd, Ubuntu.

nVidia driver, CUDA
Make sure your nVidia CUDA version is at least 11.7. The driver that contains this version is version 516.40 at the time of writing.

You can check the driver and CUDA versions by opening PowerShell (better to open it as administrator right away, since that will be required for later steps) and executing the command

nvidia-smi

WSL — Windows Subsystem for Linux
In order to use the GPU in Docker containers, you need to install software from nVidia (see below), which requires WSL2 to be installed.

If WSL is not installed on the user’s PC, then you can install it with the command below:

wsl --install

If WSL is already installed, it is better to upgrade it to the latest version and check that the distribution runs under WSL version 2.
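A minimal sketch of the upgrade, assuming a Windows build where the --update flag is available:

wsl --update
wsl --shutdown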

Reboot required. After restarting, Ubuntu for Windows will be installed in a separate window

At the end of the installation, you can check the WSL version in Powershell

wsl -l -v

If the distribution's WSL version is 1, you should convert it:

wsl --set-version Ubuntu-20.04 2

Docker Desktop
Install Docker Desktop if it is not installed yet; if it is, updating is recommended.

When installing on a system without Docker Desktop, the installer offered me the following settings:

I left both checkboxes enabled. A reboot is required at the end of the installation. In the Settings you need to make sure that the checkbox “Use WSL 2 based engine” is activated.

Apply & Restart.

You can check the health of Docker with the command

docker run -d -p 5000:5000 --restart=always --name registry registry:2

A local docker registry will be installed, which will be useful in future work.

docker ps 
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
745b50d66906 registry:2 "/entrypoint.sh /etc…" 2 minutes ago Up 2 minutes 0.0.0.0:5000->5000/tcp registry
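Images tagged with the localhost:5000/ prefix, as the builds later in this article are, can then be pushed into this registry:

docker push localhost:5000/cuda-jdk8:v1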

CUDA Support for WSL2
It is critical to have CUDA running in docker containers, for which nVidia has a solution.

Run the following steps in PowerShell as an administrator:

wsl
sudo -i
apt-key del 7fa2af80
wget https://developer.download.nvidia.com/compute/cuda/repos/wsl-ubuntu/x86_64/cuda-wsl-ubuntu.pin
mv cuda-wsl-ubuntu.pin /etc/apt/preferences.d/cuda-repository-pin-600
wget https://developer.download.nvidia.com/compute/cuda/11.7.0/local_installers/cuda-repo-wsl-ubuntu-11-7-local_11.7.0-1_amd64.deb
# see the output of the previous command
cp /var/cuda-repo-wsl-ubuntu-11-7-local/cuda-B81839D3-keyring.gpg /usr/share/keyrings/
apt-get update
apt-get -y install cuda

Now you need to check the operation of the nVidia test container with the benchmark flag in a separate PowerShell window:

docker run --gpus all nvcr.io/nvidia/k8s/cuda-sample:nbody nbody -gpu -benchmark

If the output contains something similar, all steps are completed correctly and you can continue working.

If errors occur, it is recommended that you refer to the nVidia documentation pages here and here for resolution.

Preparing images and running containers

Recall that Rapids requires Java 8 and nothing higher. The next steps, preparing all the necessary Docker images and later the applications themselves, are performed with this requirement in mind.

Base Image for Applications and Spark Workers

Initially, the most basic image is needed. Below is a listing of the Dockerfile.

You need to use an Ubuntu 20.04 base image with CUDA 11.7.0 from the nVidia image repository. The available Ubuntu 22.04 image did not work due to compatibility issues between the system components required by the application software.

FROM nvcr.io/nvidia/cuda:11.7.0-devel-ubuntu20.04

ENV LANG='en_US.UTF-8' LANGUAGE='en_US:en' LC_ALL='en_US.UTF-8'

ARG DEBIAN_FRONTEND=noninteractive

RUN apt-get update && apt install -y bash tini libc6 libpam-modules libnss3 procps nano iputils-ping net-tools

RUN apt-get update && \
    apt-get install -y openjdk-8-jdk && \
    apt-get install -y ant && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/* && \
    rm -rf /var/cache/oracle-jdk8-installer;

# Fix certificate issues
RUN apt-get update && \
    apt-get install -y ca-certificates-java && \
    apt-get clean && \
    update-ca-certificates -f && \
    rm -rf /var/lib/apt/lists/* && \
    rm -rf /var/cache/oracle-jdk8-installer;

# Setup JAVA_HOME, this is useful for docker commandline
ENV JAVA_HOME /usr/lib/jvm/java-8-openjdk-amd64/
RUN export JAVA_HOME

CMD ["tail", "-f", "/dev/null"]

The JDK used in the image is openjdk8, which is Rapids compliant and has no issues with the Oracle JDK license agreement. A set of applications for debugging is also installed in the image.

The CMD instruction is optional, but useful for debugging.

It is worth noting that the nVidia base image marked “devel” is used initially: testing took place on it to rule out errors caused by missing components.

There is also a leaner “runtime” image, whose difference from “devel” is the absence of nvcc.

The image builds with the command:

docker build -f Dockerfile-cuda-java8 -t localhost:5000/cuda-jdk8:v1 .

Note that my local machine runs a container with a Docker image registry: when working with a local Kubernetes cluster, it is convenient to reference my images as localhost:5000/... in manifests and pull them without external registries.

Run container command:

docker run --gpus all --name=cuda-jdk8 -it -d localhost:5000/cuda-jdk8:v1

Note: the important flag is “--gpus”, which is passed the value “all”. With this flag, all GPU resources of the local machine are available to the container.

You can check the health of the base image by running two commands in the container:
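To get a shell inside the running container first:

docker exec -it cuda-jdk8 bash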

$ nvidia-smi
Sun Jul 10 13:58:20 2022
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 515.48.07 Driver Version: 516.40 CUDA Version: 11.7 |
|-------------------------------+----------------------+----------------------+
| GPU Name Persistence-M| Bus-Id Disp.A | Volatile Uncorr. ECC |
| Fan Temp Perf Pwr:Usage/Cap| Memory-Usage | GPU-Util Compute M. |
| | | MIG M. |
|===============================+======================+======================|
| 0 NVIDIA GeForce ... On | 00000000:0B:00.0 On | N/A |
| 0% 42C P8 20W / 250W | 1241MiB / 11264MiB | 4% Default |
| | | N/A |
+-------------------------------+----------------------+----------------------+
+-----------------------------------------------------------------------------+
| Processes: |
| GPU GI CI PID Type Process name GPU Memory |
| ID ID Usage |
|=============================================================================|
| No running processes found |
+-----------------------------------------------------------------------------+
$ nvcc --version
nvcc: NVIDIA (R) Cuda compiler driver
Copyright (c) 2005-2022 NVIDIA Corporation
Built on Tue_May__3_18:49:52_PDT_2022
Cuda compilation tools, release 11.7, V11.7.64
Build cuda_11.7.r11.7/compiler.31294372_0

When using the “runtime” base image, nvcc --version will throw an error, because nvcc is not present in that image.

If there is no similar output from any of the commands, you should return to the previous sections and check that all steps were completed correctly.

Spark Worker Image
The next step is preparing the Spark Worker image.

It should be noted that this article runs the Spark cluster in standalone mode, without a resource manager. The Spark Master runs in a local VM (I already had one set up there to test Spark with Cassandra for another task not covered in this article), and the Spark Worker in a Docker container connects to it. The usefulness of this test is that:

a) it verifies that the GPU works in containers;
b) it leaves behind a standalone-cluster example and a Docker image usable for a Kubernetes cluster.
It should also be noted that a Spark image can also be used to run a container with a Spark Master.
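For reference, a minimal sketch of starting such a standalone master with Spark's bundled script (host and port match the configs used below):

./sbin/start-master.sh --host 192.168.5.129 --port 7077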

You can read a brief overview of all the ways to run Spark, in local and standalone modes and on Kubernetes, here; read about the differences between the Yarn and Mesos managers here; or study the question yourself.

Preparing

You need to download the Spark archive from the official website. Due to compatibility issues in my hardware and software configuration, I had to use version 3.2.1, although version 3.3.0 was already available at the time of testing (and of writing this article).

Unzip the contents to the spark directory (or use the prepared examples from the repository).

After unpacking the archive, the spark directory should look like this:

Rapids resources

Except for the rapids directory: you need to create it yourself and upload the *.jar files into it from the Rapids website. At the time of this writing, release 22.06.0 is available, which combines into one the two files shown in the screenshot. At the time of testing, though, the latest version was 22.04.0.

At first I wanted to write that I would leave this point unchanged; however, when testing the Spring service on an nVidia 1080 Ti before publishing the article, I tried 22.06.0 after all. It still did not work on the 1080 Ti, but the newer version printed a debug message from which I learned that the new Rapids versions will not work in conjunction with Pascal and WSL2. The reader can use either of the versions mentioned; 22.06.0 remains in the example repository.

The getGpusResources.sh script is needed to detect GPU resources:

#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This script is a basic example script to get resource information about NVIDIA GPUs.
# It assumes the drivers are properly installed and the nvidia-smi command is available.
# It is not guaranteed to work on all setups so please test and customize as needed
# for your environment. It can be passed into SPARK via the config
# spark.{driver/executor}.resource.gpu.discoveryScript to allow the driver or executor to discover
# the GPUs it was allocated. It assumes you are running within an isolated container where the
# GPUs are allocated exclusively to that driver or executor.
# It outputs a JSON formatted string that is expected by the
# spark.{driver/executor}.resource.gpu.discoveryScript config.
#
# Example output: {"name": "gpu", "addresses":["0","1","2","3","4","5","6","7"]}
ADDRS=`nvidia-smi --query-gpu=index --format=csv,noheader | sed -e ':a' -e 'N' -e'$!ba' -e 's/\n/","/g'`
echo {\"name\": \"gpu\", \"addresses\":[\"$ADDRS\"]}

Datasets
Another directory is datasets. It stores the *.csv and *.parquet files that will later be used by the applications as training and validation datasets. You can take them here.

Spark config files
Let’s go through all the directories in which you need to make changes.

All working configs are presented in the example repository.

conf directory
The directory contains config templates. You can enable each one by copying the template within the same directory and removing the “.template” suffix from the filename:
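For example:

cd spark/conf
cp spark-defaults.conf.template spark-defaults.conf
cp spark-env.sh.template spark-env.sh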

Thus, the spark-defaults.conf file is edited:

spark.master                              spark://192.168.5.129:7077
spark.executor.memory                     2g
spark.executor.cores                      4
spark.worker.resource.gpu.amount          1
spark.worker.resource.gpu.discoveryScript /opt/sparkRapidsPlugin/getGpusResources.sh
spark.executorEnv.NCCL_DEBUG              INFO

spark-env.sh:

#!/usr/bin/env bash

# Options for the daemons used in the standalone deploy mode
SPARK_MASTER_HOST="192.168.5.129"
SPARK_MASTER_PORT="7077"
SPARK_WORKER_OPTS="-Dspark.worker.resource.gpu.amount=1 -Dspark.worker.resource.gpu.discoveryScript=/opt/sparkRapidsPlugin/getGpusResources.sh -Dspark.rapids.memory.pinnedPool.size=2G -Dspark.executor.resource.gpu.amount=1 -Dspark.executorEnv.NCCL_DEBUG=INFO"

Docker

According to the Spark documentation, the next step should be to run the script to build the Docker images:

$ ./bin/docker-image-tool.sh -r <repo> -t my-tag build

The prepared Dockerfile needs to be changed to look like this:

# changes in docker-build command
ARG java_image_tag=11-jre-slim
FROM ${java_image_tag}
ARG spark_uid=1001
ARG UID_GID=1001
ENV UID=${UID_GID}
ENV GID=${UID_GID}
ENV SPARK_RAPIDS_DIR=/opt/sparkRapidsPlugin
ENV SPARK_RAPIDS_PLUGIN_JAR=${SPARK_RAPIDS_DIR}/rapids-4-spark_2.12-22.06.0.jar
# old
#ENV SPARK_CUDF_JAR=${SPARK_RAPIDS_DIR}/cudf-22.04.0-cuda11.jar
#ENV SPARK_RAPIDS_PLUGIN_JAR=${SPARK_RAPIDS_DIR}/rapids-4-spark_2.12-22.04.0.jar
RUN set -ex && \
    sed -i 's/http:\/\/deb.\(.*\)/https:\/\/deb.\1/g' /etc/apt/sources.list && \
    apt-get update && \
    ln -s /lib /lib64 && \
    apt install -y bash tini libc6 libpam-modules libnss3 procps nano iputils-ping net-tools iptables sudo \
        wget software-properties-common build-essential libnss3-dev zlib1g-dev libgdbm-dev libncurses5-dev \
        libssl-dev libffi-dev libreadline-dev libsqlite3-dev libbz2-dev python3 && \
    mkdir -p /opt/spark && \
    mkdir -p /opt/spark/examples && \
    mkdir -p /opt/spark/conf && \
    mkdir -p /opt/spark/work-dir && \
    mkdir -p /opt/sparkRapidsPlugin && \
    touch /opt/spark/RELEASE && \
    rm /bin/sh && \
    ln -sv /bin/bash /bin/sh && \
    echo "auth required pam_wheel.so use_uid" >> /etc/pam.d/su && \
    chgrp root /etc/passwd && chmod ug+rw /etc/passwd
RUN apt-get install libnccl2 libnccl-dev -y --allow-change-held-packages && rm -rf /var/cache/apt/*
COPY jars /opt/spark/jars
COPY rapidsNew /opt/sparkRapidsPlugin
# old
#COPY rapids /opt/sparkRapidsPlugin
COPY bin /opt/spark/bin
COPY sbin /opt/spark/sbin
COPY conf /opt/spark/conf
COPY kubernetes/dockerfiles/spark/entrypoint.sh /opt/
COPY kubernetes/dockerfiles/spark/decom.sh /opt/
COPY kubernetes/tests /opt/spark/tests
COPY data /opt/spark/data
COPY datasets /opt/spark/
ENV SPARK_HOME /opt/spark
WORKDIR /opt/spark/work-dir
RUN chmod g+w /opt/spark/work-dir
RUN chmod a+x /opt/decom.sh
# USER
RUN groupadd --gid $UID appuser && useradd --uid $UID --gid appuser --shell /bin/bash --create-home appuser
RUN mkdir /var/logs && chown -R appuser:appuser /var/logs
RUN mkdir /opt/spark/logs && chown -R appuser:appuser /opt/spark/
RUN chown -R appuser:appuser /tmp
RUN ls -lah /home/appuser
RUN touch /home/appuser/.bashrc
RUN echo -e '\
export SPARK_HOME=/opt/spark\n\
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin\
' > /home/appuser/.bashrc
RUN chown -R appuser:appuser /home/appuser
# Specify the user that the actual main process will run as
USER ${spark_uid}
EXPOSE 4040
EXPOSE 8081
ENTRYPOINT [ "/opt/entrypoint.sh" ]

entrypoint.sh:

#!/bin/bash

SPARK_DRIVER_BIND_ADDRESS=192.168.5.129:7077 # spark master address
NCCL_DEBUG=INFO

source ~/.bashrc

start-worker.sh spark://$SPARK_DRIVER_BIND_ADDRESS
tail -f /dev/null

SPARK_DRIVER_BIND_ADDRESS is the address of the Spark Master; in my case, the address of the local virtual machine. To debug possible problems with the nccl library, the debug level is set to INFO. The start-worker.sh spark://$SPARK_DRIVER_BIND_ADDRESS command starts the worker and connects it to the master.

The kubernetes/dockerfiles/Dockerfile sources and the contents of the same directory can be found in the sample repository.

Building the image and running the container:

cd spark
docker build -f kubernetes/dockerfiles/spark/Dockerfile --build-arg java_image_tag=localhost:5000/cuda-jdk8:v1 -t localhost:5000/cuda-jdk8-spark:v1 .
docker run --memory="6g" --cpus="4" --gpus all --name=cuda-jdk8-spark -p 8081:8081 -it -d localhost:5000/cuda-jdk8-spark:v1

You should make sure the correct version of the nccl library is installed in the image. To check, exec into the created Spark Worker container and run:

$ dpkg -l | grep nccl
ii libnccl-dev 2.12.12-1+cuda11.7 amd64 NVIDIA Collective Communication Library (NCCL) Development Files
ii libnccl2 2.12.12-1+cuda11.7 amd64 NVIDIA Collective Communication Library (NCCL) Runtime

At the time of writing and testing, the correct version is 2.12.12-1+cuda11.7. With lower versions there may be problems starting XGBoost tasks, because nccl cannot find the network device, which is virtual inside the Docker container.

We check the availability of the worker by opening its web UI at localhost:8081 (per the docker run command above):

We see that in addition to Cores and Memory, the GPU resource is available. There is one device on my local machine, and its id is indicated in the array as “0”.

We check the web UI of the master (the address of my local virtual machine is http://192.168.5.129:8080/):

The Spark Worker running in the container appeared in the Workers list. You can move on to developing the application.

Development and launch of the application

This section walks through an example of a simple web service that also acts as the Spark Driver. The application has four HTTP GET endpoints: one example for each of the available libraries (DJL, Spark ML, XGBoost) plus a basic Rapids GPU test.

Application skeleton

The application uses Spring Boot with the spring-boot-starter-web dependency, and the JDK is OpenJDK 8 (keeping the Rapids requirement in mind). I created the project as I wrote this article, so the end result should work for the reader too, provided the preliminary steps described above have been completed.

Project structure:

The pom.xml file can be viewed in the repository; I will focus on one important point. Getting XGBoost to run on Windows with WSL2 in a Docker container required a detailed investigation, documented in the GitHub issue.

At the moment, the XGBoost version used in this example has no release, so it is not available in Maven Central. To pull the library in, you need to add a repository with SNAPSHOT versions to pom.xml:

<repositories>
    <repository>
        <id>xgboost4j-snapshot-repo</id>
        <name>XGBoost4J Snapshot Repo</name>
        <url>https://s3-us-west-2.amazonaws.com/xgboost-maven-repo/snapshot/</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>
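The GPU artifacts themselves can then be declared as dependencies; a sketch using the artifact names that appear later in this article:

<dependencies>
    <dependency>
        <groupId>ml.dmlc</groupId>
        <artifactId>xgboost4j-gpu_2.12</artifactId>
        <version>2.0.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>ml.dmlc</groupId>
        <artifactId>xgboost4j-spark-gpu_2.12</artifactId>
        <version>2.0.0-SNAPSHOT</version>
    </dependency>
</dependencies>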

There may be a problem accessing the repository. In my case, it was solved by a VPN and by manually installing the xgboost4j-gpu_2.12-2.0.0-SNAPSHOT.jar and xgboost4j-spark-gpu_2.12-2.0.0-SNAPSHOT.jar files into the local .m2 repository:

Also, these jars need to be placed in the jars directory of the project (see the screenshot above). These *.jar files are passed to the Spark Executors as dependencies required to run the driver code. The list of such files is described in SparkConfiguration:

package com.mlwebservice.config;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetAddress;
import java.net.UnknownHostException;

@Configuration
public class SparkConfiguration {
    @Value("${spring.application.name}")
    private String appName;

    @Value("${spark.masterHost}")
    private String masterHost;

    @Bean
    public JavaSparkContext javaSparkContext() throws UnknownHostException {
        String host = InetAddress.getLocalHost().getHostAddress();
        SparkConf sparkConf = new SparkConf(true)
                .setAppName(appName)
                .setMaster("spark://" + masterHost)
                .setJars(new String[]{
                        "service.jar",
                        "jars/config-1.4.1.jar",
                        "jars/rapids-4-spark_2.12-22.06.0.jar",
                        "jars/spark-nlp_2.12-3.4.1.jar",
                        "jars/xgboost4j-gpu_2.12-2.0.0-SNAPSHOT.jar",
                        "jars/xgboost4j-spark-gpu_2.12-2.0.0-SNAPSHOT.jar"})
                // Spark settings
                .set("spark.worker.cleanup.enabled", "true")
                // executors
                .set("spark.executor.cores", "4")
                .set("spark.executor.memory", "2g")
                .set("spark.executor.resource.gpu.amount", "1")
                .set("spark.executorEnv.NCCL_DEBUG", "INFO")
                .set("spark.task.resource.gpu.amount", "1")
                // driver
                .set("spark.ui.enabled", "true")
                .set("spark.ui.port", "4040")
                .set("spark.driver.host", host)
                .set("spark.driver.bindAddress", host)
                .set("spark.driver.blockManager.port", "45029")
                .set("spark.driver.port", "33139")
                .set("spark.port.maxRetries", "16")
                .set("spark.driver.maxResultSize", "2g")
                .set("spark.executor.heartbeatInterval", "200000")
                .set("spark.network.timeout", "300000")
                // rapids
                .set("spark.rapids.memory.gpu.pooling.enabled", "false")
                .set("spark.rapids.memory.gpu.minAllocFraction", "0.0001")
                .set("spark.rapids.memory.gpu.reserve", "2")
                .set("spark.rapids.sql.enabled", "true")
                .set("spark.sql.adaptive.enabled", "false")
                .set("spark.rapids.sql.explain", "ALL")
                .set("spark.rapids.sql.hasNans", "false")
                .set("spark.rapids.sql.csv.read.float.enabled", "true")
                .set("spark.rapids.sql.castFloatToString.enabled", "true")
                .set("spark.rapids.sql.csv.read.double.enabled", "true")
                .set("spark.rapids.sql.castDoubleToString.enabled", "true")
                .set("spark.rapids.sql.exec.CollectLimitExec", "true")
                .set("spark.locality.wait", "0s")
                .set("spark.sql.files.maxPartitionBytes", "512m")
                .set("spark.plugins", "com.nvidia.spark.SQLPlugin")
                .set("spark.driver.extraClassPath", "/opt/sparkRapidsPlugin/rapids-4-spark_2.12-22.06.0.jar");
        return new JavaSparkContext(sparkConf);
    }

    @Bean
    public SparkSession sparkSession(JavaSparkContext context) {
        return SparkSession.builder()
                .master("spark://" + masterHost)
                .appName(appName)
                .config(context.getConf())
                .config("spark.executorEnv.NCCL_DEBUG", "INFO")
                .getOrCreate();
    }
}

There are a lot of Spark configuration options, you can read more about them on the Configuration — Spark 3.3.0 Documentation page.

The controller is as simple as possible: it wires in three services, each implementing one or two methods of its library. Note that this controller is merely a launch tool for the corresponding examples, written for speed and to show that several technologies can be combined in one Spring application; it does not pretend to be fit for a production environment. A real application would need at least proper HTTP verbs, error handlers, informative DTOs, asynchronous operations, message brokers for data streams, reactive code, websockets, and so on.

package com.mlwebservice.controller;

import ai.djl.translate.TranslateException;
import com.mlwebservice.service.DJLService;
import com.mlwebservice.service.RapidsService;
import com.mlwebservice.service.SparkMLService;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;

@RestController
@RequestMapping("/")
@RequiredArgsConstructor
public class MLController {
    private final DJLService djlService;
    private final SparkMLService sparkMLService;
    private final RapidsService rapidsService;

    @GetMapping("/djl")
    public ResponseEntity<?> djl() {
        try {
            djlService.mlWork();
        } catch (TranslateException | IOException e) {
            return ResponseEntity.status(500).body(e.getMessage());
        }
        return ResponseEntity.ok().build();
    }

    @GetMapping("/forest")
    public ResponseEntity<?> sparkML() {
        sparkMLService.randomForestTest();
        return ResponseEntity.ok().build();
    }

    @GetMapping("/gpu_test")
    public ResponseEntity<?> rapidsGpuTest() {
        rapidsService.testRapids();
        return ResponseEntity.ok().build();
    }

    @GetMapping("/xgboost")
    public ResponseEntity<?> rapidsXGBoost() {
        rapidsService.xgBoost();
        return ResponseEntity.ok().build();
    }
}

Deep Java Library — DJL

The first library in line is DJL. It is a convenient machine learning library for Java, whose signature feature is the Model Zoo: it lets you obtain a ready-made model matching the parameters you describe from the list of available models. You can also create your own model, save it to disk, and load it for further use.
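As a sketch of what a Model Zoo lookup looks like (an image-classification example; the filter values are illustrative assumptions, not something used in this project):

import ai.djl.Application;
import ai.djl.modality.Classifications;
import ai.djl.modality.cv.Image;
import ai.djl.repository.zoo.Criteria;
import ai.djl.repository.zoo.ZooModel;

public class ZooLookupExample {
    // Describe the desired model; the Model Zoo resolves a matching artifact.
    public static ZooModel<Image, Classifications> loadClassifier() throws Exception {
        Criteria<Image, Classifications> criteria = Criteria.builder()
                .setTypes(Image.class, Classifications.class)        // input/output types
                .optApplication(Application.CV.IMAGE_CLASSIFICATION) // task to search for
                .optFilter("backbone", "resnet50")                   // illustrative filter
                .build();
        return criteria.loadModel();
    }
}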

This example implements a linear regression model. Unfortunately, due to this model's architectural features it is rather difficult to parallelize its training, though certain engines, such as PyTorch, probably solve this for specific cases. At least, I did not come across parallelized training of a linear regression model on Spark, and I could not quickly come up with an implementation myself. However, there is a well-known example of using Spark together with DJL to classify images with a Model Zoo model, such as this article.

The linear regression model is implemented following the articles 3.2. Linear Regression Implementation from Scratch — Dive into Deep Learning 0.1.0 documentation and 3.3. Concise Implementation of Linear Regression — Dive into Deep Learning 0.1.0 documentation, and lives in DJLService.

For debugging purposes, the application's main method logs calls to several methods that make it easy to spot an incorrect application configuration. With a correct configuration, a log like the following should appear:

2022-07-18 19:38:45.346  INFO 1 --- [           main] c.mlwebservice.MLWebServiceApplication   : Initializing DJL lib...
2022-07-18 19:38:45.349 INFO 1 --- [ main] c.mlwebservice.MLWebServiceApplication : CPU: cpu()
2022-07-18 19:38:45.349 INFO 1 --- [ main] c.mlwebservice.MLWebServiceApplication : GPU: gpu(0)
2022-07-18 19:38:45.439 INFO 1 --- [ main] c.mlwebservice.MLWebServiceApplication : CUDA available: true
2022-07-18 19:38:45.440 INFO 1 --- [ main] c.mlwebservice.MLWebServiceApplication : CUDA GPU count: 1
OpenJDK 64-Bit Server VM warning: You have loaded library /root/.djl.ai/pytorch/1.11.0-20220510-cu113-linux-x86_64/libtorch_cpu.so which might have disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
2022-07-18 19:38:45.739 INFO 1 --- [ main] ai.djl.pytorch.engine.PtEngine : Number of inter-op threads is 8
2022-07-18 19:38:45.740 INFO 1 --- [ main] ai.djl.pytorch.engine.PtEngine : Number of intra-op threads is 8
2022-07-18 19:38:45.740 INFO 1 --- [ main] c.mlwebservice.MLWebServiceApplication : GPU count: 1
2022-07-18 19:38:45.741 INFO 1 --- [ main] c.mlwebservice.MLWebServiceApplication : Engine: PyTorch:1.11.0, capabilities: [
CUDA,
CUDNN,
OPENMP,
MKL,
MKLDNN,
]
PyTorch Library: /root/.djl.ai/pytorch/1.11.0-20220510-cu113-linux-x86_64

DJL Service code:

package com.mlwebservice.service;

import ai.djl.Model;
import ai.djl.metric.Metrics;
import ai.djl.ndarray.NDArray;
import ai.djl.ndarray.NDManager;
import ai.djl.ndarray.types.Shape;
import ai.djl.nn.Block;
import ai.djl.nn.ParameterList;
import ai.djl.nn.SequentialBlock;
import ai.djl.nn.core.Linear;
import ai.djl.training.DefaultTrainingConfig;
import ai.djl.training.EasyTrain;
import ai.djl.training.Trainer;
import ai.djl.training.dataset.ArrayDataset;
import ai.djl.training.dataset.Batch;
import ai.djl.training.listener.TrainingListener;
import ai.djl.training.loss.Loss;
import ai.djl.training.optimizer.Optimizer;
import ai.djl.training.tracker.Tracker;
import ai.djl.translate.TranslateException;
import com.mlwebservice.model.DataPoints;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;

@Slf4j
@Service
public class DJLService {

    public void mlWork() throws TranslateException, IOException {
        // Generating the dataset
        NDManager manager = NDManager.newBaseManager();
        NDArray trueW = manager.create(new float[]{2, -3.4f});
        float trueB = 4.2f;
        DataPoints dp = DataPoints.syntheticData(manager, trueW, trueB, 1000);
        NDArray features = dp.getX();
        NDArray labels = dp.getY();

        // Reading the dataset
        int batchSize = 10;
        ArrayDataset dataset = loadArray(features, labels, batchSize, false);

        // Mini test
        Batch testBatch = dataset.getData(manager).iterator().next();
        NDArray X = testBatch.getData().head();
        NDArray y = testBatch.getLabels().head();
        log.info("X = {}", X);
        log.info("y = {}", y);
        testBatch.close();

        // Defining the model
        Model model = Model.newInstance("lin-reg");
        SequentialBlock net = new SequentialBlock();
        Linear linearBlock = Linear.builder().optBias(true).setUnits(1).build();
        net.add(linearBlock);
        model.setBlock(net);

        // Defining the loss function
        Loss l2loss = Loss.l2Loss();

        // Defining the optimization algorithm
        Tracker lrt = Tracker.fixed(0.03f);
        Optimizer sgd = Optimizer.sgd().setLearningRateTracker(lrt).build();

        // Instantiate configuration and trainer
        DefaultTrainingConfig config = new DefaultTrainingConfig(l2loss)
                .optOptimizer(sgd) // Optimizer (loss function)
                .optDevices(manager.getEngine().getDevices(1)) // single GPU
                // .addTrainingListeners(TrainingListener.Defaults.logging()); // Logging
                .addTrainingListeners(TrainingListener.Defaults.basic()); // Without logging, to increase speed

        Trainer trainer = model.newTrainer(config);
        log.info("Trainer devices: {}", Arrays.toString(trainer.getDevices()));

        // Initializing model parameters
        // First axis is batch size - won't impact parameter initialization
        // Second axis is the input size
        trainer.initialize(new Shape(batchSize, 2));

        // Metrics
        Metrics metrics = new Metrics();
        trainer.setMetrics(metrics);

        // Training
        int numEpochs = 30;
        long startTime = System.currentTimeMillis();
        for (int epoch = 1; epoch <= numEpochs; epoch++) {
            // Iterate over the dataset
            for (Batch batch : trainer.iterateDataset(dataset)) {
                // Update loss and evaluator
                EasyTrain.trainBatch(trainer, batch);
                // Update parameters
                trainer.step();
                batch.close();
            }
            // Reset training and validation evaluators at the end of the epoch
            trainer.notifyListeners(listener -> listener.onEpoch(trainer));
        }

        Block layer = model.getBlock();
        ParameterList params = layer.getParameters();
        NDArray wParam = params.valueAt(0).getArray();
        NDArray bParam = params.valueAt(1).getArray();
        long endTime = System.currentTimeMillis();

        float[] w = trueW.sub(wParam.reshape(trueW.getShape())).toFloatArray();
        log.info("Error in estimating w: [{} {}]", w[0], w[1]);
        log.info("Error in estimating b: {}", trueB - bParam.getFloat());
        log.info("Training time: " + (endTime - startTime) + " ms");

        // Save the model to a file for later use
        Path modelDir = Paths.get("models/lin-reg");
        Path savedDir = Files.createDirectories(modelDir);
        model.setProperty("Epoch", Integer.toString(numEpochs)); // save epochs trained as metadata
        model.save(modelDir, "lin-reg");
        log.info("Model saved in " + savedDir.toAbsolutePath());
    }

    // Wrap features and labels into an ArrayDataset with the given batching
    public ArrayDataset loadArray(NDArray features, NDArray labels, int batchSize, boolean shuffle) {
        return new ArrayDataset.Builder()
                .setData(features) // set the features
                .optLabels(labels) // set the labels
                .setSampling(batchSize, shuffle) // set the batch size and random sampling
                .build();
    }
}

DataPoints Model:

package com.mlwebservice.model;

import ai.djl.ndarray.NDArray;
import ai.djl.ndarray.NDManager;
import ai.djl.ndarray.types.DataType;
import ai.djl.ndarray.types.Shape;

public class DataPoints {
    private final NDArray x;
    private final NDArray y;

    public DataPoints(NDArray x, NDArray y) {
        this.x = x;
        this.y = y;
    }

    public NDArray getX() {
        return x;
    }

    public NDArray getY() {
        return y;
    }

    // Generate y = X w + b + noise
    public static DataPoints syntheticData(NDManager manager, NDArray w, float b, int numExamples) {
        NDArray x = manager.randomNormal(new Shape(numExamples, w.size()));
        NDArray y = x.matMul(w).add(b);
        // Add noise
        y = y.add(manager.randomNormal(0, 0.01f, y.getShape(), DataType.FLOAT32));
        return new DataPoints(x, y);
    }
}

Execution result:

2022-07-18 20:29:27.461  INFO 1 --- [nio-9090-exec-1] com.mlwebservice.service.DJLService      : X = ND: (10, 2) gpu(0) float32
[[ 0.7017, -0.7652],
[ 2.495 , -0.3341],
[-2.175 , -0.452 ],
[ 1.1075, 0.8347],
[-1.8369, -0.7469],
[ 0.5647, 2.1323],
[-0.2754, 0.3807],
[ 0.2902, 1.5136],
[-0.5902, 0.6777],
[ 0.4059, -1.0304],
]
2022-07-18 20:29:27.473 INFO 1 --- [nio-9090-exec-1] com.mlwebservice.service.DJLService : y = ND: (10) gpu(0) float32
[ 8.1976, 10.324 , 1.3922, 3.5564, 3.0556, -1.9248, 2.3501, -0.361 , 0.7023, 8.4904]
2022-07-18 20:29:27.491 INFO 1 --- [nio-9090-exec-1] com.mlwebservice.service.DJLService : Trainer devices: [gpu(0)]
2022-07-18 20:29:34.665 INFO 1 --- [nio-9090-exec-1] com.mlwebservice.service.DJLService : Error in estimating w: [-4.7445297E-5 -1.2493134E-4]
2022-07-18 20:29:34.670 INFO 1 --- [nio-9090-exec-1] com.mlwebservice.service.DJLService : Error in estimating b: 1.9073486E-4
2022-07-18 20:29:34.670 INFO 1 --- [nio-9090-exec-1] com.mlwebservice.service.DJLService : Training time: 7112 ms
2022-07-18 20:29:34.676 INFO 1 --- [nio-9090-exec-1] com.mlwebservice.service.DJLService : Model saved in /usr/src/app/models/lin-reg

Spark ML

There is excellent beginner documentation from nVidia on working with Spark ML, using the Random Forest model as an example. Given the specifics of this model, training can be parallelized across several executors, after which either the average value is used (for regression problems) or majority voting (for classification problems). You can read more in the Spark documentation here, and also see Spark ML code examples in the documentation.

For this example you will need datasets for training and validation; you can take them from here or use the repository code. Note that this section does not simply rewrite the Spark ML example from the nVidia article; rather, it implements the task from the nVidia XGBoost article, but using Random Forest from Spark ML. The datasets are copied in by the Dockerfile, and the paths to them are hardcoded in the service (acceptable for an example).

Note: the Spark ML article says that only XGBoost supports GPU acceleration. It may well be that this documentation is outdated (as suggested in one of the GitHub issues), since the Rapids documentation now points to a repository with at least one more example, for the Principal Component Analysis (PCA) algorithm.

Service code:

package com.mlwebservice.service;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.evaluation.RegressionEvaluator;
import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.ml.feature.VectorAssembler;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.regression.RandomForestRegressor;
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.springframework.stereotype.Service;

import static org.apache.spark.sql.functions.col;

@Slf4j
@Service
@RequiredArgsConstructor
public class SparkMLService {
    private final SparkSession session;

    public void randomForestTest() {
        String trainPath = "/opt/spark/train/train.parquet";
        // test
        String evalPath = "/opt/spark/eval/eval.parquet";
        Dataset<Row> tdf = session.read().parquet(trainPath);
        Dataset<Row> edf = session.read().parquet(evalPath);

        String labelName = "fare_amount";
        String[] featureColumns = {"passenger_count", "trip_distance", "pickup_longitude", "pickup_latitude", "rate_code",
                "dropoff_longitude", "dropoff_latitude", "hour", "day_of_week", "is_weekend", "h_distance"};

        VectorAssembler assembler = new VectorAssembler()
                .setInputCols(featureColumns)
                .setOutputCol("rawfeatures");
        StandardScaler standardScaler = new StandardScaler()
                .setInputCol("rawfeatures")
                .setOutputCol("features")
                .setWithStd(true);
        RandomForestRegressor regressor = new RandomForestRegressor()
                .setLabelCol(labelName)
                .setFeaturesCol("features");

        Pipeline pipeline = new Pipeline().setStages(new PipelineStage[]{assembler, standardScaler, regressor});

        ParamMap[] paramGrid = new ParamGridBuilder()
                .addGrid(regressor.maxBins(), new int[]{100, 200})
                .addGrid(regressor.maxDepth(), new int[]{2, 7, 10})
                .addGrid(regressor.numTrees(), new int[]{5, 20})
                .build();

        RegressionEvaluator evaluator = new RegressionEvaluator()
                .setLabelCol(labelName)
                .setPredictionCol("prediction")
                .setMetricName("rmse");

        CrossValidator crossvalidator = new CrossValidator()
                .setEstimator(pipeline)
                .setEvaluator(evaluator)
                .setEstimatorParamMaps(paramGrid)
                .setNumFolds(3);

        CrossValidatorModel pipelineModel = crossvalidator.fit(tdf);

        ParamMap[] bestEstimatorParamMap = pipelineModel.getEstimatorParamMaps();
        log.info("best params map {}", bestEstimatorParamMap);

        Dataset<Row> predictions = pipelineModel.transform(edf);
        Dataset<Row> result = predictions.withColumn("error", col("prediction").minus(col(labelName)));
        result.select(labelName, "prediction", "error").show();
        result.describe(labelName, "prediction", "error").show();

        RegressionEvaluator maevaluator = new RegressionEvaluator()
                .setLabelCol(labelName)
                .setMetricName("mae");
        log.info("mae evaluation: {}", maevaluator.evaluate(predictions));

        RegressionEvaluator rmsevaluator = new RegressionEvaluator()
                .setLabelCol(labelName)
                .setMetricName("rmse");
        log.info("rmse evaluation: {}", rmsevaluator.evaluate(predictions));
    }
}

Rapids and XGBoost

The final example implements the example from the nVidia XGBoost article, which uses Spark and Rapids together. It is the most interesting one, as it delivers genuinely better computational speed than Spark ML Random Forest.

In addition, the first example in the Rapids documentation is a join of two dataframes of 10 million numbers. That example is also implemented in the test method of RapidsService:

@Slf4j
@Service
@RequiredArgsConstructor
public class RapidsService {
    private final SparkSession session;

    public void testRapids() {
        int capacity = 1000000;
        List<LongValue> list = new ArrayList<>(capacity);
        for (long i = 1; i < (capacity + 1); i++) {
            list.add(new LongValue(i));
        }
        Dataset<Row> df = session.createDataFrame(list, LongValue.class);
        Dataset<Row> df2 = session.createDataFrame(list, LongValue.class);
        long result = df.select(col("value").as("a"))
                .join(df2.select(col("value").as("b")), col("a").equalTo(col("b"))).count();
        log.info("count result {}", result);
    }
}

@Data
@AllArgsConstructor
public class LongValue implements Serializable {
    private static final long serialVersionUID = 1L;
    private Long value;
}

The example differs slightly from its Scala source, but the computation still runs on the GPU. The DAG is shown in the screenshot below:

As for XGBoost, the example is taken from the nVidia article, and the datasets are the same as for Random Forest Spark ML; you can read about XGBoost itself here.

Implementation of XGBoost regressor:

package com.mlwebservice.service;

import com.mlwebservice.model.LongValue;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import ml.dmlc.xgboost4j.scala.spark.XGBoostRegressionModel;
import ml.dmlc.xgboost4j.scala.spark.XGBoostRegressor;
import org.apache.spark.ml.PredictionModel;
import org.apache.spark.ml.linalg.Vector;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.springframework.stereotype.Service;
import scala.collection.immutable.HashMap;
import scala.collection.immutable.Map;

import java.util.ArrayList;
import java.util.List;

import static org.apache.spark.sql.functions.col;

@Slf4j
@Service
@RequiredArgsConstructor
public class RapidsService {
    private final SparkSession session;

    public void xgBoost() {
        String trainPath = "/opt/spark/train/train.parquet";
        // test
        String evalPath = "/opt/spark/eval/eval.parquet";
        Dataset<Row> tdf = session.read().parquet(trainPath);
        Dataset<Row> edf = session.read().parquet(evalPath);

        String labelName = "fare_amount";
        String[] featureColumns = {"passenger_count", "trip_distance", "pickup_longitude", "pickup_latitude", "rate_code",
                "dropoff_longitude", "dropoff_latitude", "hour", "day_of_week", "is_weekend", "h_distance"};

        Map<String, Object> map = new HashMap<>();
        map = map.updated("learning_rate", 0.05);
        map = map.updated("max_depth", 8);
        map = map.updated("subsample", 0.8);
        map = map.updated("gamma", 1);
        map = map.updated("num_round", 500);
        map = map.updated("tree_method", "gpu_hist");
        map = map.updated("num_workers", 1);

        XGBoostRegressor regressor = new XGBoostRegressor(map);
        regressor.setLabelCol(labelName);
        regressor.setFeaturesCol(featureColumns);

        PredictionModel<Vector, XGBoostRegressionModel> model = regressor.fit(tdf);
        Dataset<Row> predictions = model.transform(edf);
        Dataset<Row> result = predictions.withColumn("error", col("prediction").minus(col(labelName)));
        result.select(labelName, "prediction", "error").show();
        result.describe(labelName, "prediction", "error").show();
    }
}

Launch application

It’s time to launch the application. To do this, you need to build a Docker image using the Dockerfile:

#FROM adoptopenjdk/openjdk8:ubuntu-jre-nightly
FROM localhost:5000/cuda-jdk8:v1
WORKDIR /usr/src/app
ARG JAR_FILE
ARG UID_GID=1001
ENV UID=${UID_GID}
ENV GID=${UID_GID}
RUN mkdir -p jars
COPY jars jars
ENV PYTHONUNBUFFERED=1
RUN apt-get update && apt install -y python-is-python3 wget curl ca-certificates bash libgomp1 && \
    rm -rf /var/cache/apt/*
RUN mkdir -p /opt/spark/
COPY spark /opt/spark
COPY ${JAR_FILE} service.jar
RUN groupadd --gid $UID appuser && useradd --uid $UID --gid appuser --shell /bin/bash --create-home appuser
RUN chown -R appuser:appuser /home/appuser && chown -R appuser:appuser /usr/src/app
EXPOSE 4040
EXPOSE 9090
USER $UID
CMD ["java", "-jar", "service.jar"]

Pay attention to the first and second lines. If you want to run the DJL logic in your application, you need to use the base image created earlier for Spark: it contains the system software required to work with nVidia graphics cards. Since the engine specified in the dependencies (PyTorch, MXNet, etc.) is downloaded at startup, you need an Internet connection and a little extra time for the first start of the service. One option is to mount a volume into the container once and, after the first launch, add a few COPY directives to the Dockerfile, so that the necessary files are copied into the image rather than downloaded from the Internet.
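A sketch of the volume option, assuming the DJL cache path seen in the log above (/root/.djl.ai; if the container runs as appuser, the cache lands in /home/appuser/.djl.ai instead):

docker run --gpus all -v djl-cache:/root/.djl.ai -p 9090:9090 --name=ML -it -d localhost:5000/ml:1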

If DJL is not used in the service, the “heavy” base image makes no sense, and you can use a lighter image containing JRE8, such as the one commented out on the first line.

The launch commands are moved to the build.sh script:

#!/bin/bash
mvn clean install -DskipTests=true
docker rmi localhost:5000/ml:1
docker build -f Dockerfile --build-arg JAR_FILE=target/service.jar -t localhost:5000/ml:1 .
docker run --gpus all -p 9090:9090 -p 4040:4040 -p 33139-33155:33139-33155 -p 45029-45045:45029-45045 --name=ML -it -d localhost:5000/ml:1

After a while, the container should start, the engines and everything necessary for DJL should be initialized, and the service should appear in the list of running applications in the Web UI Spark Master:

The Web UI Spark Worker should have an Executor for the specified application:

The Web UI of the service should also become available:

According to the controller, there are 4 GET methods available that run the required example:

http://localhost:9090/djl

http://localhost:9090/forest

http://localhost:9090/gpu_test

http://localhost:9090/xgboost
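For example, with curl:

curl http://localhost:9090/gpu_test
curl http://localhost:9090/xgboost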

The DJL execution log was shown in the corresponding section above; it is less interesting than the Spark jobs.

The first run of logic that executes on Spark takes a little longer than subsequent runs: the executor needs some warm-up. The gpu_test method can be used for that.

In the job details, we see that it took a little over 8 seconds, which is indeed quite long. Subsequent calls to this method ran twice as fast (except the second one, where vmmem let me down by allocating 25GB of RAM to WSL at that moment):

Execution result:

2022-07-19 14:05:13.856  INFO 1 --- [nio-9090-exec-4] o.a.spark.ml.tree.impl.RandomForest      :   init: 1.62514E-4
total: 3.192210057
findBestSplits: 3.17661902
chooseSplits: 3.166410779

2022-07-19 14:05:13.864 INFO 1 --- [nio-9090-exec-4] com.mlwebservice.service.SparkMLService : best params map {
rfr_dc03cc8c5712-maxBins: 100,
rfr_dc03cc8c5712-maxDepth: 2,
rfr_dc03cc8c5712-numTrees: 5
}
+------------------+------------------+--------------------+
| fare_amount| prediction| error|
+------------------+------------------+--------------------+
| 11.4|12.422369509009028| 1.0223695090090281|
| 7.4| 7.289954038707909|-0.11004596129209165|
| 5.0| 4.601351052403492| -0.3986489475965076|
| 8.5| 8.773609129887804| 0.27360912988780406|
| 7.4| 7.351427584678662|-0.04857241532133827|
| 3.8| 4.509977888929194| 0.7099778889291946|
| 5.4|6.1300686499042305| 0.7300686499042301|
| 7.4| 5.310782694363023| -2.0892173056369776|
| 5.3| 6.281121521712063| 0.9811215217120628|
| 4.1| 4.320442646467865| 0.22044264646786527|
| 4.2| 4.358399833924078| 0.15839983392407753|
| 23.0| 21.84539235607258| -1.1546076439274202|
| 6.2| 4.800643228448342| -1.3993567715516582|
| 12.6|13.513431604134931| 0.9134316041349315|
| 7.8| 7.289324492912175| -0.510675507087825|
| 11.0| 12.14859211003076| 1.1485921100307603|
| 24.2| 19.82343367802233| -4.37656632197767|
| 10.6| 9.87204611828728| -0.72795388171272|
| 18.6|19.290663393934967| 0.6906633939349653|
|11.800000000000002|12.322340133504676| 0.5223401335046738|
+------------------+------------------+--------------------+
only showing top 20 rows
+-------+-----------------+------------------+--------------------+
|summary| fare_amount| prediction| error|
+-------+-----------------+------------------+--------------------+
| count| 3000| 3000| 3000|
| mean|9.536166666666665| 9.535967764479922|-1.98902186749770...|
| stddev|6.952558857268078|6.4554477337337675| 1.9208959387344227|
| min| 2.5|3.9593080769885773| -69.80275612138105|
| max| 110.0|53.803333333333356| 12.055956289978678|
+-------+-----------------+------------------+--------------------+
mae evaluation: 0.8626064049871519
rmse evaluation: 1.9205757730272761

Random forest did a lot of Spark Jobs that ran from 14:04:02 to 14:05:15 (73 seconds).

XGBoost on the same dataset ran as Spark jobs 433–436, which took ~16 seconds.

Execution result:

+------------------+------------------+--------------------+
| fare_amount| prediction| error|
+------------------+------------------+--------------------+
| 11.4|11.298457145690918|-0.10154285430908239|
| 7.4| 7.516303539276123| 0.11630353927612269|
| 5.0| 5.16908597946167| 0.16908597946166992|
| 8.5| 9.045893669128418| 0.545893669128418|
| 7.4| 7.355461597442627| -0.0445384025573734|
| 3.8| 4.012299060821533| 0.21229906082153338|
| 5.4| 5.95053768157959| 0.5505376815795895|
| 7.4| 5.841796875| -1.5582031250000004|
| 5.3| 6.106812000274658| 0.8068120002746584|
| 4.1| 4.191019058227539| 0.09101905822753942|
| 4.2|3.9211881160736084| -0.2788118839263918|
| 23.0| 22.72040557861328|-0.27959442138671875|
| 6.2| 4.528580665588379| -1.6714193344116213|
| 12.6| 13.0178804397583| 0.41788043975830114|
| 7.8| 7.767493724822998|-0.03250627517700...|
| 11.0|11.349909782409668| 0.34990978240966797|
| 24.2| 23.78424072265625| -0.4157592773437493|
| 10.6|10.418869972229004|-0.18113002777099574|
| 18.6| 19.02918243408203| 0.42918243408202983|
|11.800000000000002|11.934724807739258| 0.13472480773925533|
+------------------+------------------+--------------------+
+-------+-----------------+------------------+-------------------+
|summary| fare_amount| prediction| error|
+-------+-----------------+------------------+-------------------+
| count| 3000| 3000| 3000|
| mean|9.536166666666665| 9.538236152251562|0.00206948558489451|
| stddev|6.952558857268078|6.8646934667359885| 0.6205967386209823|
| min| 2.5|1.9244213104248047| -4.911700439453128|
| max| 110.0|106.85425567626953| 2.949781894683838|
+-------+-----------------+------------------+-------------------+

It is in this example that we can see XGBoost doing the job faster and better, judging by the error values.

But not everything is so good

Expanding the spoiler from the first part of the article: not everything actually worked. On two machines with Docker Desktop, it was not possible to launch the target scheme, because containers on different machines could not reach each other. network=host did not give the desired result; neither did routes, an nginx proxy, or configuring iptables inside the containers.

The problem could be solved with Docker Swarm, but for the cluster to work correctly you still need at least one Linux machine acting as the master. Naturally, I tried a scheme with the master on a virtual machine: I set up routes directing traffic from the second physical node to a port on the first, and on the first node routed that port to the VM, but I ran into problems receiving response packets from the master, among several others.

You could also try to roll out Kubernetes, but I decided to stop there, because:

a) a standalone Spark cluster in containers is, frankly, nonsense and a priori overhead, since the point of a standalone cluster is to serve a small number of nodes under constant load. In that case Docker is not needed, and it is better to install on a clean OS;

b) if Kubernetes, then you need to understand that it is meant for floating loads and optimizing the use of computing resources, and it is better to use the Spark Kubernetes Operator; I have no experience with it yet, and it is probably the topic of a future article;

c) “That’s it, stop, it remains only to roll out the kuber on Windows, stop suffering garbage” — I heard in my head, and I stopped :)

However, the result made me happy anyway: the XGBoost job got as far as the dataset-synchronization step before returning the final result, where it stalled, which I managed to capture in the screenshot.

Outcome

I believe that the purpose of this article has been achieved. All three libraries turned out to be functional, the service is written in Java, launched as a Spring Web Service, tasks on the GPU are executed in Docker containers.

What’s next and what can be improved? Naturally, there are several areas of work:

  1. Tuning Spark. At a minimum, it would be nice to enable the Kryo serializer (see the sketch after this list); with Rapids 22.06.0 it never worked for me. Beyond Kryo, there are many Spark configuration options that together have a considerable impact on performance.
  2. Running a Spark Standalone cluster on bare metal and native Ubuntu 20.04.
  3. Running a service in Kubernetes paired with Spark Kubernetes Operator. Probably, the launch guide and the results will be the topic of a separate article.
  4. Further R&D in ML and Spark.
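A minimal sketch of point 1, enabling Kryo in the SparkConf built above (standard Spark settings; whether they get along with Rapids 22.06.0 is exactly what never worked for me):

.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// optionally fail fast on unregistered classes while tuning:
// .set("spark.kryo.registrationRequired", "true")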

Links and sources

Common interesting articles

Accelerating Spark 3.0 and XGBoost End-to-End Training and Hyperparameter Tuning

Accelerating Deep Learning on the JVM with Apache Spark and NVIDIA GPUs

How Amazon retail systems run machine learning predictions with Apache Spark using Deep Java Library

How Netflix uses Deep Java Library (DJL) for distributed deep learning inference in real-time

Adopting machine learning in your microservices with DJL (Deep Java Library) and Spring Boot

Getting Started with RAPIDS Accelerator with on premise cluster or local mode

Accelerating Apache Spark 3.0 with GPUs and RAPIDS

Leverage deep learning in Scala with GPU on Spark 3.0

nVidia documentation

nVidia docker containers documentation

CUDA on WSL User Guide

How to install CUDA Toolkit on Ubuntu 18.04 LTS — Performatune

WSL 2 GPU Support for Docker Desktop on NVIDIA GPUs — Docker

nVidia Docker images

nVidia Rapids documentation

Get Started — RAPIDS Docs

On-Prem

On-Prem — Example Join Operation

nVidia ML documentation

Predictive Analytics Tutorial with Spark ML | NVIDIA

What’s New in Deep Learning & Artificial Intelligence from NVIDIA

Spark ML library documentation:

Classification and regression — Spark 3.3.0 Documentation

Ensembles — RDD-based API — Spark 3.3.0 Documentation

DJL

Main — Deep Java Library

Examples

Troubleshooting — Deep Java Library

Deep Learning with Spark in Deep Java Library in 10 minutes

Deep Java Library(DJL) — a Deep Learning Toolkit for Java Developers

5.5. GPUs — Dive into Deep Learning 0.1.0 documentation

DJL dependency management — Deep Java Library

3.2. Linear Regression Implementation from Scratch — Dive into Deep Learning 0.1.0 documentation

3.3. Concise Implementation of Linear Regression

XGBoost Java library

GitHub — dmlc/xgboost: Scalable, Portable and Distributed Gradient Boosting (GBDT, GBRT or GBM) Library, for Python, R, Java, Scala, C++ and more. Runs on single machine, Hadoop, Spark, Dask, Flink and DataFlow

xgboost/jvm-packages/xgboost4j-example at master · dmlc/xgboost

xgboost/SparkMLlibPipeline.scala at master · dmlc/xgboost

A Full Integration of XGBoost and Apache Spark

XGBoost4J-Spark-GPU Tutorial (version 1.6.1+) — xgboost 2.0.0-dev documentation

XGBoost4J-Spark-GPU Tutorial (version 1.6.1+) — xgboost 1.6.1 documentation

xgboost4j_2.12 1.6.1 API

spark-rapids-examples/kubernetes-scala.md at branch-22.06 · NVIDIA/spark-rapids-examples

spark-rapids-examples/Taxi.scala at branch-22.06 · NVIDIA/spark-rapids-examples

For debugging

How to extract best parameters from a CrossValidatorModel

Use shared library that uses glibc on AlpineLinux
