Setup Spark job and history server on K8s with GCS log

Burasakorn Sabyeying
Mils’ Blog
Published in
6 min readNov 6, 2021

Kubernetes series:
1. Setup Spark job and history server on K8s with GCS log
2. Setup Airflow on Kubernetes with remote logging to GCS
3. Create external database with PVC for Airflow Kubernetes

บทความนี้อ้างอิงจาก การสร้าง spark job และ history server บน Kubernetes จากพี่ปอ วิธีพี่ปอจะใช้ log ไปที่ s3 แต่ของเราจะเปลี่ยนเป็น Google Cloud Storage แทน ดังนั้นเราจะขอ skip การอธิบาย step ในช่วงแรกๆ แต่เน้นช่วงการเชื่อมไป GCS แทน

ไปตามอ่านกันได้จ้า

- Spark job on k8s with s3 log
- Spark history server on k8s with s3 log

ไม่พูดพร่ำทำเพลง เราจะเริ่ม setup ทุกอย่างจากตัวอย่างของ GCP spark-on-k8s-operator
โดยเราจะ clone ลงมาก่อน

git clone git@github.com:GoogleCloudPlatform/spark-on-k8s-operator.git

และเราจะสร้าง namespace ที่ชื่อว่า spark-operator

kubectl create namespace spark-operator

คราวนี้ เราจะสร้าง spark operator จากตัวไฟล์ value.yml เบื้องต้นที่ GCP provide มาค่ะ โดยเราจะใช้ tag version ที่เรากำลังใช้อยู่คือ v1beta2–1.2.3–3.1.1 ค่ะ

ไปที่ path ของ value.yml

cd spark-on-k8s-operator/charts/spark-operator-chart#values.yamleplicaCount: 1image:# -- Image repositoryrepository: gcr.io/spark-operator/spark-operator# -- Image pull policypullPolicy: IfNotPresent# -- Overrides the image tag whose default is the chart appVersion.tag: "v1beta2-1.2.3-3.1.1" #<<-- แก้เป็น version ที่เราใช้

แล้ว build ด้วย helm

helm install spark-ops spark-operator/spark-operator — namespace spark-operator — values values.yaml

เนื่องจากเรา install ไปแล้วจึงใช้คำสั่ง upgrade — install แทน
และถ้าหากเรารัน get pods ดู จะเห็น pod ตามนี้

Step ถัดไปเราจะสร้าง service account ในการ submit job ที่ชื่อ ‘sparkoperator’ (อย่าจำสลับกะชื่อ namespace นะ)

โดยจะใช้ไฟล์ spark-operator-install/manifest/spark-operator-rbac.yaml และจะแก้ namespace ใช้มาใช้ที่เราสร้างไว้

```spark-operator-rbac.yamlapiVersion: v1
kind: Namespace
metadata:
name: spark-operator
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: sparkoperator
namespace: spark-operator #<<-- เปลี่ยนมา namespace ของเรา
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: sparkoperator
namespace: spark-operator
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["*"]
- apiGroups: [""]
resources: ["configmaps"] #<<--- เพิ่ม configmaps
verbs: ["*"]
- apiGroups: [""]
resources: ["services", "secrets"]
verbs: ["create", "get", "delete"]
- apiGroups: ["extensions"]
resources: ["ingresses"]
verbs: ["create", "get", "delete"]
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get"]
- apiGroups: [""]
resources: ["persistentvolumeclaims"] #<<--- เพิ่ม PVC
verbs: ["*"]
- apiGroups: [""]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: sparkoperator
namespace: spark-operator #<<--- เปลี่ยนมา namespace ของเรา
subjects:
..
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
labels:
rbac.authorization.k8s.io/aggregate-to-admin: "true"
name: sparkoperator-aggregate-to-admin
namespace: spark-operator #<<--- เปลี่ยนมา namespace ของเรา
..

พอแก้ไขเสร็จ เราจะ apply ได้เลย

kubectl apply -f spark-operator-rbac.yaml 

เมื่อเราลอง list ดู service account เราควรจะเห็นชื่อ sparkoperator ขึ้นมา

kubectl get serviceaccounts -n spark-operator

Step ถัดไปคือเราจะสร้าง Spark Application แล้ววว

โดยเราจะเข้าใช้ไฟล์ spark-pi.py เป็นหลัก

ซึ่งขั้นตอนนี้จะเป็นเริ่มซับซ้อนละ เพราะเราจะเริ่มนำ log ไป upload ใส่ GCS เพื่อที่ว่า log จะไม่หายหลังจาก job รันเสร็จ (และจะให้ histiory server ไป link กะ log ในนั้นใน step ถัดๆไป)

โดยเราจะเริ่มจากการสร้าง docker image ที่มีการ config เพื่อไปหา GCS ที่เราเซ็ตก่อนเลย

โดยไฟล์ Dockerfile เราจะมีใช้ lib หลักๆคือ

  • guava-23.0
  • gcs-connector-latest-hadoop2
  • ส่วน spark-bigquery-latest_2.12 อันนี้แถมมา เผื่อมีใช้ต่อ BigQuery ในอนาคต

ทั้งหมดเป็น jars file ที่เราจะ copy เข้า $SPARK_HOME/jars

``` Dockerfile
FROM gcr.io/spark-operator/spark:v3.1.1
# Switch to user root so we can add additional jars and configuration files.
USER root
# Setup dependencies for Google Cloud Storage access.
RUN rm $SPARK_HOME/jars/guava-14.0.1.jar
ADD https://repo1.maven.org/maven2/com/google/guava/guava/23.0/guava-23.0.jar $SPARK_HOME/jars
RUN chmod 644 $SPARK_HOME/jars/guava-23.0.jar
# Add the connector jar needed to access Google Cloud Storage using the Hadoop FileSystem API.
ADD https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-latest-hadoop2.jar $SPARK_HOME/jars
RUN chmod 644 $SPARK_HOME/jars/gcs-connector-latest-hadoop2.jar
ADD https://storage.googleapis.com/spark-lib/bigquery/spark-bigquery-latest_2.12.jar $SPARK_HOME/jars
RUN chmod 644 $SPARK_HOME/jars/spark-bigquery-latest_2.12.jar
COPY conf/core-site.xml /opt/hadoop/conf/core-site.xml
COPY conf/spark-env.sh $SPARK_HOME/conf/spark-env.sh
ENTRYPOINT [ "/opt/entrypoint.sh" ]

เราจะสร้าง folder conf เอาไว้เก็บ setting ของ hadoop ด้วย
คือไฟล์ core-site.xml และ spark-env.sh

```core-site.xml
<?xml version="1.0" ?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.gs.impl</name>
<value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
</property>
<property>
<name>fs.AbstractFileSystem.gs.impl</name>
<value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
</property>
<property>
<name>fs.gs.project.id</name>
<value>${gs.project.id}</value>
</property>
</configuration>
``spark-env.shexport HADOOP_CONF_DIR="/opt/hadoop/conf"
export HADOOP_OPTS="$HADOOP_OPTS -Dgs.project.id=<project_name_ของเรา>"

ซึ่งมาถึงตรงนี้ อาจจะงงว่า structure folder เราหน้าตาประมาณนี้
จริงๆเราเอาของที่จำเป็นออกมาจาก folder ที่ clone ไว้ (spark-on-k8s-operator)

conf ต้องอยู่ path เดียวกับ Dockerfile น้า

เมื่อเรามี Dockerfile กับ conf folder แล้ว คราวนี้ก็ build image ได้เลย โดยเราจะตั้งชื่อว่า spark-k8s-gcs

docker build -t <dockerhub_user>/spark-k8s-gcs:v3 .
docker login
docker push <dockerhub_user>/spark-k8s-gcs:v3

หลังจากเรา push image ไป docker hub แล้ว ยังไม่จบแค่นี้
เราจะสร้าง K8s secret เพื่อเก็บค่า credential json key เพื่อ authen ว่าจะเข้าไปใช้ resource ใน bucket ของเราได้

โดยจะมี 2 data ข้างในคือตัว path (ตั้งชื่อ key ว่า path) กะ ค่าที่เป็น json file จริงๆ

kubectl create -n spark-operator secret generic google-sa-key \
--from-file <secret_json_filename>.json \
--from-literal=path=/cred/<secret_json_filename>.json

/cred/<secret_json_filename>.json คือ path ที่ mount ไฟล์เข้าไปข้างใน pod แล้ว

คราวนี้เราถึงค่อยแก้ spark-pi.yaml ของเราตามนี้

```spark-pi.yaml
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-pi
namespace: spark-operator
spec:
type: Scala
mode: cluster
image: "mesodiar/spark-k8s-gcs:v3" ##<---- image ที่เราสร้างตะกี้
imagePullPolicy: Always
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
sparkVersion: "3.1.1"
restartPolicy:
type: Never
volumes:
- name: "test-volume"
hostPath:
path: "/tmp"
type: Directory
- name: gcp-cred
secret:
secretName: google-sa-key ##<---- เราจะ mount volume โดยจะใช้ secret ชื่อว่า gcp-cred
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 3.1.1
serviceAccount: sparkoperator
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"

executor:
cores: 1
instances: 1
memory: "512m"
labels:
version: 3.1.1
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"

sparkConf:
"spark.eventLog.enabled": "true"
"spark.eventLog.dir": "gs://xxx-spark-log" ##<---- gcs bucket ที่เราจะใช้
"spark.driver.extraJavaOptions": "-Divy.cache.dir=/tmp -Divy.home=/tmp"
"spark.kubernetes.driver.secrets.google-sa-key": "/cred"
"spark.kubernetes.executor.secrets.google-sa-key": "/cred"
"spark.kubernetes.driverEnv.GOOGLE_APPLICATION_CREDENTIALS": "/cred/<secret_json_filename>.json" <<-- env ที่ driver ต้องใช้
"spark.executorEnv.GOOGLE_APPLICATION_CREDENTIALS": "/cred/<secret_json_filename>.json" <<-- env ที่ executor ต้องใช้

อธิบายเพิ่ม:

  • GOOGLE_APPLICATION_CREDENTIALS จะเป็น environment variable ที่นำค่ามาจาก secret ตัว key ที่ชื่อ path โดยจะระบุทั้ง 2 pod ตัว driver และ executor เลย

จบแล้วเราก็ create Spark App ได้

kubectl apply -f spark-pi.yaml

เมื่อจบ step นี้ เราก็จะเห็นว่ามี log ได้ถูก upload ไปยัง GCS ของเราแล้ว ><

bucket ของเรา

Step สุดท้าย คือการเราจะสร้าง history server ที่เป็น Web UI โดยอย่างที่กล่าวไปข้างต้นคือ เราจะให้ history server ของเรา link ไปยัง GCS เหมือนกัน

รอบนี้เราจะสร้างไฟล์ชื่อ spark-history.yaml ขึ้นมา ซึ่งไฟล์นี้จะเป็นการสร้าง deployment และ service ขึ้นมา

```spark-history.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: spark-history-server
namespace: spark-operator
spec:
selector:
matchLabels:
app: spark-history-server
replicas: 1
template:
metadata:
name: spark-history-server
labels:
app: spark-history-server
spec:
volumes:
- name: "test-volume"
hostPath:
path: "/tmp"
type: Directory
- name: gcp-cred
secret:
secretName: google-sa-key #<--- ใช้ secret ที่เราสร้าง
containers:
- name: spark-history-server
image: mesodiar/spark-k8s-gcs:v3
resources:
requests:
memory: "512Mi"
cpu: "100m"
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"
- name: "gcp-cred"
mountPath: "/cred" #<--- volume mount เหมือนตะกี้
env:
- name: GOOGLE_APPLICATION_CREDENTIALS
valueFrom:
secretKeyRef:
name: google-sa-key
key: path #<--- env var เหมือนตะกี้
command:
- /opt/spark/bin/spark-class
- -Dspark.eventLog.enabled=true
- -Dspark.hadoop.google.cloud.auth.service.account.json.keyfile=/cred/<secret_json_filename>.json #<---- path/to/secret
- -Dspark.history.fs.logDirectory=gs://xxx-spark-log/ #<---- gcs bucket
- -Dspark.eventLog.dir=gs://xxx-spark-log/ #<---- gcs bucket
- -Divy.cache.dir=/tmp
- -Divy.home=/tmp
- org.apache.spark.deploy.history.HistoryServer
ports:
- name: http
protocol: TCP
containerPort: 18080 #<---- port ของ history server
readinessProbe:
timeoutSeconds: 4
httpGet:
path: /
port: http
livenessProbe:
timeoutSeconds: 4
httpGet:
path: /
port: http
---
apiVersion: v1
kind: Service
metadata:
name: spark-history-service
namespace: spark-operator
spec:
selector:
app: spark-history-server
ports:
- protocol: TCP
port: 80
targetPort: 18080

แล้วเราก็ apply

kubectl apply -f spark-history.yaml

Make sure ว่าทุกอย่างรันได้ปกติ

เพื่อที่จะเข้า UI ได้ เราต้อง port-forward มาด้วย format นี้

kubectl port-forward <history_pod_name> <host_port>:<container_port> -n spark-operator

kubectl port-forward spark-history-server-65d9698f67-59xjd 8002:18080 -n spark-operator

ผลลัพท์เมื่อเรารัน web UI ได้ จะเป็นดังรูปด้านล่าง

Other refs:

--

--

Burasakorn Sabyeying
Mils’ Blog

Data engineer at CJ Express. Women Techmakers Ambassador. GDG Cloud Bangkok team. Moved to Mesodiar.com