Spark job on k8s with s3 log

Pongthep Vijite
DAMAGeek
Published in
7 min readMay 16, 2021

หมายเหตุ ผู้อ่านสามารถดู table of contents ของ Data Engineering from Noob to Newbie ได้ที่ http://bit.ly/2P7isEw

บทความนี้เราจะมาดูการ setup spark job และ spark history server บน k8s โดยทั้ง 2 services จะเชื่อมกันผ่าน log ที่อยู่ใน aws s3 (แต่ spark history server ขอต่อในบทความหน้านะครับเพื่อไม่ให้ยาวไป)

เริ่มแรกเราจะมาทำการติดตั้ง spark-k8s-operator บน k8s กันก่อน ซึ่ง operator ตัวนี้เป็น custom resource ที่ google สร้างขึ้น โดยการติดตั้งเราสามารถเลือกได้ว่าติดตั้งใช้งานแบบธรมดาหรือจะเป็นแบบเปิดการใช้งาน admission webhooks ซึ่งที่เราจะได้มาจากการเปิด feature ดังกล่าวคือ สามารถ mount volumes และใช้งาน pod affinity ได้ โดยในบทความนี้เราจะเลือกตั้งตั้งแบบหลังเพราะน่าจะมีประโยชน์ในการใช้งานจริงมากกว่า

เราสามารถติดตั้ง spark-k8s-operator ได้ผ่านทาง helm (https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/docs/quick-start-guide.md#installation) แต่ส่วนตัวหลังจากลองแล้วกลับพบว่ามีปัญหาไม่สามารถเปิดการใช้งาน webhook ได้ เดาว่าใน helm ใช้ image v1beta2–1.2.1–3.0.0 (gcr.io/spark-operator/spark-operator) แต่ตัวใหม่สุดที่เราจะใช้กันคือ v1beta2–1.2.3–3.1.1

gcr.io/spark-operator/spark-operator

ผมเลยเลือกที่จะติดตั้งผ่าน template จาก git เพื่อที่เราจะแก้ไขบ้างค่าการติดตั้งได้อย่างอิสระ

git clone https://github.com/GoogleCloudPlatform/spark-on-k8s-operator.gitcd spark-on-k8s-operator

สร้าง namespace ที่ชื่อ spark-operator เนื่องจาก default แล้ว operator นี้ถูก set มาให้ทำงานที่ namespace นี้ เพื่อลดความซับซ้อนเราจึงจำเป็นต้องการมันขึ้นมา

kubectl create namespace spark-operator

เราจะมาทำการ install spark-operator ผ่านทางการ config file ใน charts/spark-operator-chart/values.yaml กัน แต่ก่อนอื่นเรามา backup default กันไว้ก่อน

cd charts/spark-operator-chart
cp values.yaml values-default.yaml
vi values.yaml

จากนั้นเริ่มด้วยการเปลี่ยน image version

image:
# -- Image repository
repository: gcr.io/spark-operator/spark-operator
# -- Image pull policy
pullPolicy: IfNotPresent
# -- Overrides the image tag whose default is the chart appVersion.
tag: "v1beta2-1.2.3-3.1.1" #<-- เปลี่ยนเป็น version

เปิดการใช้งาน webhook

webhook:
# -- Enable webhook server
enable: true

ถ้าเราจะให้ spark job ทำงานใน namespace อื่น ที่ไม่ใช่ namespace ที่ติดตั้ง spark-operator เราสามารถกำหนดค่าเพิ่มได้

sparkJobNamespace: ""  #<-- เราจะใช้แค่ namespace เดียว

หลังจากนั้นก็ทำการติดตั้งผ่านทาง helm install

helm install spark-ops spark-operator/spark-operator --namespace spark-operator --values values.yaml

ถ้าทุกอย่างสมบูรณ์เราจำเห็น pod ดังรูปใน namespace spark-operator

pod หลังติดตั้ง spark-operator

ขั้นตอนต่อไปจะเป็นการสร้าง service account ไว้สำหรับ submit spark job โดย file rbac จะอยู่ที่ manifest/spark-rbac.yaml โดยเราต้องทำการแก้ไข namespace ให้เป็น spark-operator เพราะเราจะทำการให้ spark app ทำงานใน namespace นั้น

#
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
apiVersion: v1
kind: ServiceAccount
metadata:
name: spark
namespace: spark-operator #<-- หลังจากเปลี่ยน ns แล้ว
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: spark-operator #<-- หลังจากเปลี่ยน ns แล้ว
name: spark-role
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["*"]
- apiGroups: [""]
resources: ["services"]
verbs: ["*"]
- apiGroups: [""] #<-- เพิ่มสิทธ์ให้สามารถใช้งาน configmaps ได้
resources: ["configmaps"]
verbs: ["*"]
- apiGroups: [""]
resources: ["persistentvolumeclaims"] #<-- เพิ่มสิทธ์ให้สามารถใช้งาน pvc ได้
verbs: ["*"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: spark-role-binding
namespace: spark-operator #<-- หลังจากเปลี่ยน ns แล้ว
subjects:
- kind: ServiceAccount
name: spark
namespace: spark-operator #<-- หลังจากเปลี่ยน ns แล้ว
roleRef:
kind: Role
name: spark-role
apiGroup: rbac.authorization.k8s.io

จากนั้นก็ทำการสร้าง service account

kubectl apply -f manifest/spark-rbac.yaml

ถ้าทุกอย่างสมบูรณ์เราจำเห็น service account ชื่อ spark ดังรูปใน namespace spark-operator

service account ชื่อ spark หลังติดตั้ง spark-rbac.yaml

ถึงตรงนี้เราก็พร้อมสำหรับการ submit spark job กันแล้ว โดยใน examples folder จะมี spark app ตัวอย่างที่พร้อมใช้งาน โดย app ที่เราจะทำการทดสอบคือ spark-pi.yaml แต่ก่อนอื่นเราต้องทำการแก้ไข namespace ของ app กันก่อนโดยเปลี่ยนจาก default เป็น spark-operator

#
# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-pi
namespace: spark-operator #<-- หลังจากเปลี่ยน ns
spec:
type: Scala
mode: cluster
image: "gcr.io/spark-operator/spark:v3.1.1"
imagePullPolicy: Always
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
sparkVersion: "3.1.1"
restartPolicy:
type: Never
volumes:
- name: "test-volume"
hostPath:
path: "/tmp"
type: Directory
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
labels:
version: 3.1.1
serviceAccount: spark
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"
executor:
cores: 1
instances: 1
memory: "512m"
labels:
version: 3.1.1
volumeMounts:
- name: "test-volume"
mountPath: "/tmp"

จากนั้นทำการ submit โดยใช้คำสั่ง

kubectl apply -f spark-pi.yaml

หลังจาก submit แล้วเราพบว่าใน namespace spark-operator จะมี 2 pods ใหม่เกิดขึ้น ตัวนึงจะเป็น spark driver อีกตัวจะเป็น spark executor

spark job is running

หลังจาก spark job ทำงานเสร็จเรียบร้อยแล้วตัว spark executor จะถูก terminate และ spark driver จะเข้าสู่สถานะ completed ซึ่ง container ภายในจะถูก terminated

spark job is finished

จากที่เราเห็นหลังจาก spark job ทำงานเสร็จแล้ว เราจะไม่สามารถเข้าไปดู spark ui ของ spark app ได้เลย ทำได้เพียงเรียกดู log การทำงานโดยใช้คำสั่ง

kubectl logs spark-pi-driver -n spark-operator

วิธีที่เราจะสามารถเรียกดู spark ui ของ spark app ที่ทำงานจบไปแล้วนั้น ทำได้โดยการใช้ spark history server ที่ point ไปยัง localtion ของ log ที่เดียวกัน ซึ่งใน k8s ถ้าเราต้องการสร้าง shared storage เพื่อให้ app หลายๆตัวสามารถ mount volume มายังที่เดียวกันได้เราสามารถทำได้โดยการสร้าง NFS แต่ถ้าใช้งาน cloud อย่าง aws ก็จะมี service อย่าง aws efs มาให้ใช้งาน

โชคดีที่ spark lib เพิ่มทางเลือกให้เราโดยการรองรับการเขียน log ไปยัง s3 โดยไม่ต้องมีการ mount volume เพิ่มเติมแต่อย่างใด สิ่งที่ต้องทำก็แค่เพิ่มเติม lib บ้างตัวและ config บางอย่างเข้าไปตอนที่เรา submit app เท่านั้นเอง โดยในบทความนี้จะใช้ docker image ที่ผม build ไว้ ซึ่งเป็นการ build จาก git ของ spark (git clone ไว้เมื่อวันที่ 30/04/2021) ตามบทความ spark on k8s

มาเริ่มจากการจัดเตรียม libs ที่จำเป็นสำหรับการเขียน logs ไปยัง s3 ซึ่งมีอยู่ด้วยกัน ดังนี้

หมายเหตุ เหตุผลที่เลือก hadoop-aws version 3.2.2 เพื่อให้ตรงกับ version ของ hadoop-* ที่อยู่ใน jars folder ของ docker image

hadoop-* version ใน jars folder

จากนั้น download libs ทั้งหมดไว้ใน folder jars แล้ว ให้ทำการสร้าง Dockerfile

FROM pongthep/spark-k8s-s3:1.0.1COPY jars /opt/spark/jarsENTRYPOINT [ "/opt/entrypoint.sh" ]

และทำการ build docker image และ push ไปยัง docker hub

docker build -t <docker hub>:<version> .docker push <docker hub>:<version>

ขั้นตอนถัดไปเราจะมา spark app ที่ใช้งาน docker image ที่เราสร้างไปโดยผมจะใช้ชื่อ file ว่า sparkpi-s3.yaml

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-pi
namespace: spark-operator
spec:
type: Scala
mode: cluster
image: "pongthep/spark-k8s-s3:2.0.0" #<-- เปลี่ยนเป็น docker hub และ version ที่ท่าน build ไป
imagePullPolicy: Always
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.2.0-SNAPSHOT.jar"
sparkVersion: "3.1.1"
restartPolicy:
type: Never
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
serviceAccount: spark
executor:
cores: 1
instances: 1
memory: "512m"
sparkConf:
"spark.eventLog.enabled": "true"
"spark.eventLog.dir": "s3a://dev.coeffest.com/spark-log/" #<-- s3 location ที่ท่านจะเก็บ log
"spark.hadoop.fs.s3a.access.key": "<access>"
"spark.hadoop.fs.s3a.secret.key": "<secret>"
"spark.driver.extraJavaOptions": "-Divy.cache.dir=/tmp -Divy.home=/tmp"

จากนั้นสั่ง run โดยใช้คำสั่ง

kubectl apply -f sparkpi-s3.yaml

เมื่อ spark ทำงานเรียบร้อยเราจะพบกับ log ใน s3 location ที่เราตั้งค่าไว้

spark log บน s3

สำหรับท่านใดจำเป็นต้อง assume role บน aws ให้ทำการ config เพิ่มในส่วนของ sparkConf ดังนี้

sparkConf:
"spark.eventLog.enabled": "true"
"spark.eventLog.dir": "s3a://dev.coeffest.com/spark-log/" #<-- s3 location ที่ท่านจะเก็บ log
"spark.hadoop.fs.s3a.access.key": "<access>"
"spark.hadoop.fs.s3a.secret.key": "<secret>"
"spark.hadoop.fs.s3a.credentialsType": "AssumeRole"
"spark.hadoop.fs.s3a.aws.credentials.provider": "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider"
"spark.hadoop.fs.s3a.assumed.role.arn": "<role.arn>"
"spark.hadoop.fs.s3a.assumed.role.credentials.provider": "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
"spark.driver.extraJavaOptions": "-Divy.cache.dir=/tmp -Divy.home=/tmp"

เพื่อเพิ่มความปลอดภัยในการใช้งาน aws access.key และ secret.key เราควรจะเก็บข้อมูลเหล่านี้ไว้ใน k8s secret ด้วยการเข้ารหัส base64

apiVersion: v1
kind: Secret
metadata:
name: aws-s3-secret
namespace: spark-operator
type: Opaque
data:
access: <base64-encoded>
secret: <base64-encoded>

จากนั้นเราจะผ่านค่าของ k8s secret ด้วยการสร้าง env var ภายใน driver และ executor และลบการกำหนดแบบ hardcode ที่เราได้ทำไป

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: spark-pi
namespace: spark-operator
spec:
type: Scala
mode: cluster
image: "pongthep/spark-k8s-s3:2.0.0"
imagePullPolicy: Always
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.2.0-SNAPSHOT.jar"
sparkVersion: "3.1.1"
restartPolicy:
type: Never
driver:
cores: 1
coreLimit: "1200m"
memory: "512m"
serviceAccount: spark
envSecretKeyRefs:
AWS_ACCESS_KEY_ID: #<-- ชื่อของ env var
name: aws-s3-secret #<-- ชื่อของ k8s secret
key: access #<-- ชื่อ key ใน k8s secret
AWS_SECRET_ACCESS_KEY:
name: aws-s3-secret
key: secret
executor:
cores: 1
instances: 1
memory: "512m"
envSecretKeyRefs:
AWS_ACCESS_KEY_ID:
name: aws-s3-secret
key: access
AWS_SECRET_ACCESS_KEY:
name: aws-s3-secret
key: secret
sparkConf:
"spark.eventLog.enabled": "true"
"spark.eventLog.dir": "s3a://dev.coeffest.com/spark-log/"
"spark.driver.extraJavaOptions": "-Divy.cache.dir=/tmp -Divy.home=/tmp"

จากการที่เรา pack external lib ที่ใช้สำหรับเชื่อมต่อ aws s3 ไว้ใน docker image จนสามารถเขียน log ไปยัง s3 ได้แล้วนั้น เรายังสามารถเรียก spark app ที่อยู่ในรูปแบบของ jar file ได้จาก s3 อีกด้วย ซึ่งวิธีนี้จะทำให้เวลาเราต้องการแก้ไข spark app เราไม่ต้องทำการ build docker image ใหม่ทุกครั้ง เราจะ build image ใหม่เฉพาะเมื่อมีการเพิ่ม external lib เข้าไปใน folder jars ของ spark เท่านั้น

mainApplicationFile: "s3a://dev.coeffest.com/spark-job/spark-examples_s3.jar"

สุดท้ายนี้ถ้าท่านใดมีข้อสงสัยหรือคำชี้แนะใดๆสามารถฝากข้อความได้ที่ https://www.facebook.com/coeffest/ นะครับ ขอบคุณมากครับที่ติดตาม

สำหรับคนที่สนใจ Data Engineer สามารถเข้ามาแชร์ข้อมูลกันที่ได้ที่ https://www.facebook.com/groups/369157183664760/

อ้างอิง:

--

--