Production Deployment of Random Forest Price Prediction ML Pipeline with Spark Kubernetes Operator and MinIO Object Storage

MLOps

(λx.x)eranga
Effectz.AI
14 min readAug 6, 2023

--

Background

In this blog post, we will explore the process of running production-ready tea price prediction AI model using Apache Spark. The price prediction model, built with Random Forest Algorithm using Apache Spark Scala library. We have trained the model on a Spark cluster deployed through the Spark Kubernetes Operator on Helm. The Spark job will reads data from the MinIO object storage S3 API and save the generated models back to the S3 API. Further, we have explored various optimizations and feature engineering techniques that have been implemented to enhance the training process and model accuracy.

To delve into the details, we provide the source codes and deployment information in a GitLab repository. So, feel free to clone the repository and and walk through the steps of building, deploying and optimizing the tea price prediction model with Spark.

Architecture

The end-to-end architecture of the system is depicted in the figure below. As mentioned about we have used Random Forest algorithm through the Apache Spark ML library, for building the tea price prediction model. The CSV records containing the tea price data are stored in the MinIO object storage, which provides access through its S3 API. The Spark job reads this data from the MinIO object storage S3 API. We run the Spark job 3 node Spark cluster, which powered by the Spark Kubernetes Operator. The operator itself is deployed and managed through Helm, a widely adopted package manager for Kubernetes. As the Spark job completes the training process, the resulting trained model is saved back into the MinIO object storage, ensuring easy access and future utilization. This system utilizes Apache Spark, MinIO, Kubernetes (through the Spark Kubernetes Operator), and Helm to create an efficient and scalable Spark-ML pipeline for tea price prediction. The architecture seamlessly integrates these technologies to ensure a smooth and reliable workflow. The main steps involved in building and deploying the Spark-ML pipeline are as follows.

1. Setup Kubernetes Cluster

We have established a 3-node Kubernetes cluster on AWS EC2 instances, with each node equipped with 4 VCPUs and 16 GB of memory. The cluster comprises one control-plane node responsible for managing cluster-wide operations and two worker nodes handling the execution of tasks and workloads. Following is the cluster setup.

❯❯ kubectl get nodes
NAME STATUS ROLES AGE VERSION
ip-172-31-1-30 Ready control-plane 57d v1.27.2
ip-172-31-4-133 Ready <none> 54d v1.27.2
ip-172-31-4-171 Ready <none> 54d v1.27.2

2. Setup Spark Kubernetes Operator

After setting up the Kubernetes cluster, we proceeded to deploy the Spark Kubernetes Operator using the Helm chart. The Spark Kubernetes Operator offers an official Helm chart that simplifies the installation and configuration process. Following is the way to install helm chart.

# install helm with asdf
❯❯ asdf plugin add helm
❯❯ asdf list all helm
❯❯ asdf install helm 3.12.0
❯❯ asdf global helm 3.12.0

# add spark-operator helm repo
❯❯ helm repo add spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator

# list helm repos
❯❯ helm repo list
NAME URL
spark-operator https://googlecloudplatform.github.io/spark-on-k8s-operator

# install kubernets operator on custom namespace named spark-operator
❯❯ helm install tea3 spark-operator/spark-operator --namespace spark-operator --version "1.1.26" --create-namespac

# view installed operator
❯❯ helm list -n spark-operator
NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION
tea3 spark-operator 1 2023-07-10 03:52:55.964156359 +0000 UTC deployed spark-operator-1.1.26 v1beta2-1.3.8-3.1.1

# uninstall operator
❯❯ helm uninstall tea3 -n spark-operator

3. Setup MinIO Object Storage

MinIO is a robust and scalable distributed object storage server developed in Go(Golang). It serves as an excellent solution for storing vast amounts of unstructured data, such as photos, videos, container/VM images, log files, archives, and more. One of the key advantages of MinIO is its compatibility with Amazon S3, allowing it to use the same API interface. Consequently, applications designed to interact with Amazon S3 can seamlessly integrate with MinIO, making it a versatile choice for data storage.

An essential aspect that sets MinIO apart from Amazon S3 is its ability to be deployed in a private cloud environment, managed by system administrators. This autonomy provides greater control and security for organizations that wish to maintain their own object storage infrastructure. In our specific use case, we employed MinIO as the object storage solution to store the tea price dataset in CSV format, along with the trained models. To facilitate the setup of MinIO, we utilized Docker Compose, which streamlines the process of creating and managing the MinIO service.

version: '3'

services:
minio:
image: quay.io/minio/minio
ports:
- 9000:9000
- 9090:9090
environment:
- MINIO_ROOT_USER=rahasak_minio_user
- MINIO_ROOT_PASSWORD=rahasak_minio_password
command: server /data --console-address ":9090"
# deploy minio
❯❯ docker-compose up -d minio

# minio docker
❯❯ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
412d03727b3d quay.io/minio/minio "/usr/bin/docker-ent…" 8 weeks ago Up 3 weeks 0.0.0.0:9000->9000/tcp, :::9000->9000/tcp, 0.0.0.0:9090->9090/tcp, :::9090->9090/tcp minio-minio-1

Once MinIO is deployed, we can access its web UI using the <host>:9090 port. Within the web interface, we have created a storage bucket, in this case, named rahasak and then proceed to upload the tea price data CSV file, which we've named tea3.csv. With the CSV file successfully stored in the rahasak bucket, the Spark job can now easily read the data using the MinIO S3 API. The input path for the Spark job to access the CSV data would be specified as s3a://rahasak/tea3.csv.

4. Random Forest Model Implementation

The tea price prediction Random Forest model is implemented using the Apache Spark-ML Scala library. Below is the build.sbt file of the Scala project, which specifies the required dependencies for Spark and Hadoop to interact with the MinIO object storage S3 API. Ensuring compatibility between the versions of Spark and Hadoop is crucial to avoid runtime errors while running the Spark job on our Kubernetes-based Spark cluster.

name := "tea3"

version := "1.0"

scalaVersion := "2.12.10"

libraryDependencies ++= {
Seq(
"org.apache.spark" %% "spark-core" % "3.0.0",
"org.apache.spark" %% "spark-sql" % "3.0.0",
"org.apache.spark" %% "spark-mllib" % "3.0.0",
"org.slf4j" % "slf4j-api" % "1.7.5",
"ch.qos.logback" % "logback-classic" % "1.0.9",
"org.apache.hadoop" % "hadoop-client" % "2.7.4",
"org.apache.hadoop" % "hadoop-aws" % "2.7.4",
"com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.10.0",
)
}

excludeDependencies ++= Seq(
ExclusionRule("org.apache.avro","avro-tools")
)

assemblyMergeStrategy in assembly := {
case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
case PathList("META-INF", xs@_*) => MergeStrategy.discard
case "application.conf" => MergeStrategy.concat
case x => MergeStrategy.first
}

Below is the structure of the CSV dataset that served as the foundation for building the Random Forest model. Please note that, we have anonymized the dataset due to privacy reasons. Throughout the model development process, we applied various feature engineering techniques and performance tuning approaches to enhance the model’s accuracy with this dataset.

+-----+--------+---+------+-----+-----+-----+-----+-----+-----+----+----+-------+-----+--------+----+----+-----+
|grade|category| rp|weight|st_01|ad_01|st_02|ad_02|st_03|ad_03| rmk|lrmk| status|price|outstype|date|week|month|
+-----+--------+---+------+-----+-----+-----+-----+-----+-----+----+----+-------+-----+--------+----+----+-----+
| A| B| No| 55| X1| X14| X5| X0| X1| X12| X20| X2| Sold| 1020| X3| 14| 50| 12|
| A| B| No| 55| X1| null| X2| null| null| null|null| X4| Sold| 1000| X3| 14| 50| 12|
| C| B| No| 55| X5| X18| null| null| X29| null| X6| X2| Sold| 1040| X3| 14| 50| 12|
| C| B|Yes| 52| X5| null| null| X30| null| null| X6| X4| Sold| 1140| X3| 14| 50| 12|
| C| B| No| 55| X5| null| X4| X9| X1| X10| X0| X2| Sold| 1000| X3| 14| 50| 12|
| D| E| No| 40| X7| X8| null| null| X10| null|null|null| Sold| 1750| X10| 13| 50| 12|
| B| E|Yes| 50| X10| null| null| null| X0| null|null|null| Sold| 1500| X10| 13| 50| 12|
| E| E| No| 36| X11| X12| null| null| null| X9|null|null|OutSold| 2100| X3| 13| 50| 12|
| D| E| No| 40| X13| X14| null| X7| null| null| X15| X16| Sold| 1080| X3| 13| 50| 12|
+-----+--------+---+------+-----+-----+-----+-----+-----+-----+----+----+-------+-----+--------+----+----+-----+

The presented implementation of the Random Forest model follows a systematic process to predict tea prices. Initially, data is accessed using the MinIO S3 API, and feature engineering techniques are applied to cleanse the dataset. Different encoding approaches, such as MeanEncoding for st_01, st_02, ad_01, and ad_02, and OneHotEncoding for grade, category, rp, rmk, lrmk, and outstype are employed to adequately represent categorical features.

The subsequent steps involve feature encoding and assembling, ensuring a seamless integration of categorical and numerical features for the model. Leveraging the RandomForestRegressor from the Spark-ML library, the model is trained with this well-prepared dataset. To optimize its performance, hyperparameter tuning is conducted using CrossValidator. This entails exploring various hyperparameter combinations, including NumTrees, MaxDepth, and MaxBins. The resulting model is evaluated through k-fold cross-validation, providing a robust evaluation that promotes generalization to new data.

Upon model training, it is applied to the test data to generate predictions. The model’s performance is evaluated using essential metrics, including Root Mean Squared Error (RMSE), R-squared (r²), Mean Absolute Error (MAE), and Mean Squared Error (MSE). These metrics offer valuable insights into the model’s predictive capabilities on the test dataset. Ultimately, to ensure the model’s reusability and accessibility, the trained model is securely saved in the MinIO S3 bucket for future application and analysis.

package com.rahasak.tea3

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.{MinMaxScaler, OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.ml.regression.RandomForestRegressor
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object Tea3RandomForest extends App {

// context for spark
private val spark = SparkSession.builder
//.master("local[*]")
.appName("lambda")
.config("spark.shuffle.file.buffer", "1m") // Set the shuffle file buffer size
.getOrCreate()

// configuration for minio s3 storage api
// fs.s3a.access.key is the MINIO_ROOT_USER
// fs.s3a.secret.key is the MINIO_ROOT_PASSWORD
// please set these credentials and configurations through kubernets secrets and configmaps
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "rahasak_minio_user")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "rahasak_minio_password")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "http://192.168.1.36.:9000")

import spark.implicits._

// read csv data to data frame
private val dataPath = "s3a://rahasak/tea3.csv"
private val tea3Df = spark.read.format("csv")
.option("header", value = true)
.option("delimiter", ",")
.option("mode", "DROPMALFORMED")
.option("inferSchema", value = true)
.load(dataPath)
.drop("subElevation")
.drop("c21")
.drop("c22")
.drop("year")
.drop("sellingEndTime")
.cache()
tea3Df.show()

// find empty data rates
private val missingRowCount1 = tea3Df.filter(col("st_02").isNull || col("st_02").isNaN).count()
private val missingRowCount2 = tea3Df.filter(col("st_03").isNull || col("st_03").isNaN).count()
private val missingRowCount3 = tea3Df.filter(col("ad_02").isNull || col("ad_02").isNaN).count()
private val missingRowCount4 = tea3Df.filter(col("ad_03").isNull || col("ad_03").isNaN).count()
println(s"missing st_02 $missingRowCount1")
println(s"missing st_03 $missingRowCount2")
println(s"missing ad_02 $missingRowCount3")
println(s"missing ad_03 $missingRowCount4")

// prefill empty data
private val tea3DfPreFilled = tea3Df
.filter($"status" === "Sold")
.na.fill("Normal_01", Seq("st_01"))
.na.fill("Normal_02", Seq("st_02"))
.na.fill("Normal_03", Seq("st_03"))
.na.fill("Normal_01", Seq("ad_01"))
.na.fill("Normal_02", Seq("ad_02"))
.na.fill("Normal_03", Seq("ad_03"))
.na.fill("NormalR", Seq("rmk"))
.na.fill("NormalLR", Seq("lrmk"))
//.na.drop()
tea3DfPreFilled.show(20)

// cast data types
private val tea3DfCast = tea3DfPreFilled
.withColumn("weight", col("weight").cast("double"))
.withColumn("date", col("date").cast("int"))
.withColumn("week", col("week").cast("int"))
.withColumn("month", col("month").cast("int"))
.withColumn("price", col("price").cast("double"))
.withColumn("id", monotonically_increasing_id())

// find frequencies of st_01
private val st1MeanPurchasedPrice = tea3DfCast.groupBy("st_01")
.agg(mean("price").alias("st_01_mean_purchased_price"))
st1MeanPurchasedPrice.show(20)
private val st1Freq = tea3DfCast.groupBy("st_01").count()
st1Freq.show(20)
private val st1JoinedData = st1Freq.join(st1MeanPurchasedPrice, Seq("st_01"))
st1JoinedData.show(20)
private val st1EncodedData = tea3DfCast.join(st1JoinedData, Seq("st_01"), "left")
st1EncodedData.show()

// find frequencies of st_02
private val st2MeanPurchasedPrice = tea3DfCast.groupBy("st_02")
.agg(mean("price").alias("st_02_mean_purchased_price"))
st2MeanPurchasedPrice.show(20)
private val st2Freq = tea3DfCast.groupBy("st_02").count()
st2Freq.show(20)
private val st2JoinedData = st2Freq.join(st2MeanPurchasedPrice, Seq("st_02"))
st2JoinedData.show(20)
private val st2EncodedData = st1EncodedData.join(st2JoinedData, Seq("st_02"), "left")
st2EncodedData.show()

// find frequency of adjective_01
private val ad1MeanPurchasedPrice = tea3DfCast.groupBy("ad_01")
.agg(mean("price").alias("ad_01_mean_purchased_price"))
ad1MeanPurchasedPrice.show(20)
private val ad1Freq = tea3DfCast.groupBy("ad_01").count()
ad1Freq.show(20)
private val ad1JoinedData = ad1Freq.join(ad1MeanPurchasedPrice, Seq("ad_01"))
ad1JoinedData.show(20)
private val ad1EncodedData = st2EncodedData.join(ad1JoinedData, Seq("ad_01"), "left")
ad1EncodedData.show()

// find frequency of adjective_02
private val ad2MeanPurchasedPrice = tea3DfCast.groupBy("ad_02")
.agg(mean("price").alias("ad_02_mean_purchased_price"))
ad2MeanPurchasedPrice.show(20)
private val ad2Freq = tea3DfCast.groupBy("ad_02").count()
ad2Freq.show(20)
private val ad2JoinedData = ad2Freq.join(ad2MeanPurchasedPrice, Seq("ad_02"))
ad2JoinedData.show(20)
private val ad2EncodedData = ad1EncodedData.join(ad2JoinedData, Seq("ad_02"), "left")
ad2EncodedData.show()

// price to label
private val labelDf = ad2EncodedData.withColumnRenamed("price", "label")
.orderBy("id")
.na.drop()
labelDf.printSchema()
labelDf.show(20)

// indexers to convert grade, category, rp, rmk, lrmk, outstype to integers
private val gradeIndexer = new StringIndexer().setInputCol("grade").setOutputCol("gradeIndexer").setHandleInvalid("skip")
private val categoryIndexer = new StringIndexer().setInputCol("category").setOutputCol("categoryIndexer").setHandleInvalid("skip")
private val rpIndexer = new StringIndexer().setInputCol("rp").setOutputCol("rpIndexer").setHandleInvalid("skip")
private val rmkIndexer = new StringIndexer().setInputCol("rmk").setOutputCol("rmkIndexer").setHandleInvalid("skip")
private val lrmkIndexer = new StringIndexer().setInputCol("lrmk").setOutputCol("lrmkIndexer").setHandleInvalid("skip")
private val outstypIndexer = new StringIndexer().setInputCol("outstype").setOutputCol("outstypeIndexer").setHandleInvalid("skip")

// encoder
private val inCols = Array("gradeIndexer", "categoryIndexer", "rpIndexer", "rmkIndexer", "lrmkIndexer", "outstypeIndexer")
private val outCols = Array("gradeIndexerEnc", "categoryIndexerEnc", "rpIndexerEnc", "rmkIndexerEnc", "lrmkIndexerEnc", "outstypeIndexerEnc")
private val encoder = new OneHotEncoder()
.setInputCols(inCols)
.setOutputCols(outCols)

// vector assembler for numerical features
private val numericalCols = Array("weight", "st_01_mean_purchased_price", "st_02_mean_purchased_price", "ad_01_mean_purchased_price", "ad_02_mean_purchased_price", "date", "week", "month")
private val assembler = new VectorAssembler()
.setInputCols(numericalCols)
.setOutputCol("numerical_features")
private val scaler = new MinMaxScaler().setInputCol("numerical_features").setOutputCol("scaled_features")

// vector assembler with feature column
private val featureCols = outCols ++ Array("scaled_features")
private val featureAssembler = new VectorAssembler()
.setInputCols(featureCols)
.setOutputCol("features")

// build model
private val model = new RandomForestRegressor()
.setLabelCol("label")
.setFeaturesCol("features")
private val stages = Array(gradeIndexer, categoryIndexer, rpIndexer, rmkIndexer, lrmkIndexer, outstypIndexer, encoder, assembler, scaler, featureAssembler, model)
private val pipeline = new Pipeline().setStages(stages)
private val NumTrees = Seq(5, 10, 15)
private val MaxBins = Seq(23, 27, 30)
private val numFolds = 10
private val MaxDepth: Seq[Int] = Seq(20)
private val paramGrid = new ParamGridBuilder()
.addGrid(model.numTrees, NumTrees)
.addGrid(model.maxDepth, MaxDepth)
.addGrid(model.maxBins, MaxBins)
.build()

// split data set training(70%) and test(30%)
private val seed = 5043
private val Array(trainingData, testData) = labelDf.randomSplit(Array(0.7, 0.3), seed)

// cross validate model
private val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(new RegressionEvaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(numFolds)
private val cvModel = cv.fit(trainingData)

// prediction with test data
private val predictions = cvModel.transform(testData)
predictions.printSchema()
predictions.show(10)
predictions.select("label", "prediction").show(200)

// prediction error percentage
private val errorData = predictions.withColumn("errorPercentage", (col("prediction") - col("label")) / col("label") * 100)
errorData.show(200)

// evaluate model
// root mean squared error
private val eval1 = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("rmse")
private val rmse = eval1.evaluate(predictions)
println("root mean squared error (RMSE) on test data = " + rmse)

// r-squared
private val eval2 = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("r2")
private val r2 = eval2.evaluate(predictions)
println("r-squared (r^2) on test data = " + r2)

private val accuracyPercentage = r2 * 100
println(s"model accuracy = $accuracyPercentage%")

// mean absolute error
private val eval3 = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("mae")
private val mae = eval3.evaluate(predictions)
println("mean absolute error (MAE) on test data = " + mae)

// mean squared error
private val eval4 = new RegressionEvaluator()
.setLabelCol("label")
.setPredictionCol("prediction")
.setMetricName("mse")
private val mse = eval4.evaluate(predictions)
println("mean squared error (MSE) on test data = " + mse)

// save model in minio s3 bucket
private val modelPath = "s3a://rahasak/tea3-model"
cvModel.save(modelPath)

// load saved model for test
val cvModelLoaded = CrossValidatorModel.load(modelPath)
}

5. Dockerize Spark Job

After successfully implementing the Spark job for the tea price prediction model, we proceeded to build a JAR file from the Scala project and seamlessly integrated it into a Docker image. The process of creating the JAR file was facilitated by the sbt-assembly plugin, which efficiently packages all the necessary dependencies. The configuration for the sbt-assembly plugin is defined in the project/assembly.sbt file, ensuring smooth and consistent build processes for the JAR file.

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.1.0")

After adding the sbt-assembly plugin to our Scala project, building the JAR file for the Spark job becomes a straightforward process. With a simple command, we can initiate the JAR file creation, which will generate the file inside the designated target directory. This JAR file will encapsulate all the necessary dependencies and configurations required to execute the Spark job seamlessly.

# build jar file
❯❯ sbt assembly

# jar file locates in target directory
❯❯ ls target/scala-2.12/tea3-assembly-1.0.jar
target/scala-2.12/tea3-assembly-1.0.jar

Following the successful creation of the custom JAR file, the next step in the process involved building a Docker image. For this purpose, we utilized the base image gcr.io/spark-operator/spark:v3.1.1, which serves as the foundation for the Spark Operator. This base image ensures that our Docker image includes all the necessary components and configurations to seamlessly interact with Apache Spark. By integrating our custom JAR file into this Docker image, we created a self-contained and executable container that incorporates both the Spark job and its dependencies. This Docker image enables us to deploy and manage the Spark job efficiently within our Kubernetes environment, providing a scalable and streamlined solution for tea price prediction using the Random Forest AI model with Apache Spark.

FROM gcr.io/spark-operator/spark:v3.1.1

WORKDIR /app
ADD tea3-assembly-1.0.jar tea3.jar

ENTRYPOINT ["/opt/entrypoint.sh"]
docker build --tag erangaeb/tea3:1.16 .

docker push erangaeb/tea3:1.16

6. Deploy Spark Job

The Spark Kubernetes Operator offers a custom Kubernetes Kind called SparkApplication, which allows us to seamlessly deploy and manage Spark jobs within the Kubernetes environment. Below is the configuration of the SparkApplication(spark-job.yaml) that we used for our tea price prediction Random Forest model. During the deployment process, we made several performance improvements to ensure optimal execution in the production environment.

To optimize resource allocation, we carefully set memory limits for the driver and executor components. Specifically, the driver’s memory limit was set to 12GB, with 1 core allocated to it. As for the executor, we allocated 14GB of memory with 4 cores. These memory and core configurations were fine-tuned to strike the right balance between performance and resource utilization. By allocating sufficient resources to the driver and executor, we aimed to prevent resource contention and potential performance bottlenecks during the model’s execution.

Additionally, we optimized the SparkApplication by setting appropriate resource requests and limits for CPU and memory, ensuring that the application had the necessary resources to run efficiently. By accurately estimating resource requirements, we prevent underutilization and overutilization of resources, thereby enhancing the overall stability and performance of the Spark job.

Furthermore, we employed efficient data shuffling techniques and caching mechanisms, reducing data transfer overhead and optimizing data access. This step helped improve the overall execution time of the Spark job, particularly when processing large volumes of data.

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-job
namespace: spark-operator
spec:
type: Scala
mode: cluster
image: "erangaeb/tea3:1.17"
imagePullPolicy: Always
mainClass: com.rahasak.tea3.Tea3RandomForest
mainApplicationFile: "local:///app/tea3.jar"
sparkVersion: "3.1.1"
restartPolicy:
type: Never
sparkConf:
"spark.ui.port": "4041"
dynamicAllocation:
enabled: true
driver:
cores: 1
memory: "12g"
labels:
version: 3.1.1
serviceAccount: tea3-spark
volumeMounts:
- name: "data-volume"
mountPath: "/mnt/data"
executor:
cores: 2
memory: "14g"
instances: 4
labels:
version: 3.1.1
volumeMounts:
- name: "data-volume"
mountPath: "/mnt/data"
volumes:
- name: "data-volume"
persistentVolumeClaim:
claimName: rahasak-pvc

Below is the procedure to deploy the SparkApplication on a Kubernetes cluster, which will initiate the deployment of the Spark job in the spark-job-driver pod and create a service for the driver.

# deploy spark job
❯❯ kubectl apply -f spark-job.yaml

# pods
❯❯ kubectl get pods -n spark-operator
NAME READY STATUS RESTARTS AGE
spark-job-driver 1/1 Running 0 29h
tea3-spark-operator-7645968868-8t62f 1/1 Running 0 24d

# services
❯❯ kubectl get svc -n spark-operator
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
spark-job-e0f93889b622b368-driver-svc ClusterIP None <none> 7078/TCP,7079/TCP,4041/TCP 29h
spark-job-ui-svc ClusterIP 10.109.224.10 <none> 4041/TCP

7. Access Spark Web UI

The Spark job provides a web UI that allows users to easily monitor the status and configuration of Spark jobs. This web UI can be accessed through the 4041 port of the spark-job-ui-svc. To enable access to the Spark web UI running on the AWS Spark cluster from your local machine, you have performed Kubernetes port-forwarding and SSH local forwarding.

# kubenets port forwarding inside kube cluster(in aws server)
kubectl port-forward svc/spark-job-ui-svc -n spark-operator 4041:4041

# ssh port forwarding on local machine
# then browse http://localhost:4041/ in local machine to access the spark web ui
❯❯ ssh -i tea3.pem -L 4041:localhost:4041 ubuntu@3.144.224.237

8. Known Issues

If use java-17 with spark, it will raise following error, set JDK version to java-11 in project-settings to resolve this issue. This thread discuss more information about this scenrio.

Exception in thread "main" java.lang.IllegalAccessError: class org.apache.spark.storage.StorageUtils$ (in unnamed module @0x52b1beb6) cannot access class sun.nio.ch.DirectBuffer (in module java.base) because module java.base does not export sun.nio.ch to unnamed module @0x52b1beb6
at org.apache.spark.storage.StorageUtils$.<clinit>(StorageUtils.scala:213)
at org.apache.spark.storage.BlockManagerMasterEndpoint.<init>(BlockManagerMasterEndpoint.scala:114)
at org.apache.spark.SparkEnv$.$anonfun$create$9(SparkEnv.scala:353)
at org.apache.spark.SparkEnv$.registerOrLookupEndpoint$1(SparkEnv.scala:290)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:339)
at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:194)
at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:279)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:464)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2714)
at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:953)
at scala.Option.getOrElse(Option.scala:201)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:947)
at com.rahasak.sparkapp.Tea3$.delayedEndpoint$com$rahasak$sparkapp$Tea3$1(Tea3.scala:17)
at com.rahasak.sparkapp.Tea3$delayedInit$body.apply(Tea3.scala:12)
at scala.Function0.apply$mcV$sp(Function0.scala:39)
at scala.Function0.apply$mcV$sp$(Function0.scala:39)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
at scala.App.$anonfun$main$1(App.scala:76)
at scala.App.$anonfun$main$1$adapted(App.scala:76)
at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
at scala.collection.AbstractIterable.foreach(Iterable.scala:926)
at scala.App.main(App.scala:76)
at scala.App.main$(App.scala:74)
at com.rahasak.sparkapp.Tea3$.main(Tea3.scala:12)
at com.rahasak.sparkapp.Tea3.main(Tea3.scala)

Reference

  1. https://blog.min.io/spark-minio-kubernetes/
  2. https://blog.cellenza.com/en/data/using-spark-with-kubernetes-k8s/
  3. https://googlecloudplatform.github.io/spark-on-k8s-operator/docs/quick-start-guide.html#running-the-examples
  4. https://suchit-g.medium.com/run-spark-job-on-kubernetes-using-spark-on-k8s-operator-3d08d05e8198
  5. https://suchit-g.medium.com/package-spark-scala-code-and-deploy-it-on-kubernetes-using-spark-on-k8s-operator-3e295292b672
  6. https://prateekdubey.com/blog/spark-k8s/
  7. https://kudo.dev/docs/runbooks/spark/submission.html
  8. https://granulate.io/blog/introduction-to-understanding-apache-spark-performance/
  9. https://aws.amazon.com/blogs/containers/optimizing-spark-performance-on-kubernetes/
  10. https://www.lightbend.com/blog/how-to-manage-monitor-spark-on-kubernetes-introduction-spark-submit-kubernetes-operator
  11. https://spot.io/blog/setting-up-managing-monitoring-spark-on-kubernetes/
  12. https://googlecloudplatform.github.io/spark-on-k8s-operator/docs/quick-start-guide.html

--

--