Spark on K8s — Send Spark job’s Metrics to DataDog Using Autodiscovery
Today, I will share how we send Spark job metrics on Kubernetes to DataDog, which can then be used to create monitors or dashboards for Spark applications.
Requirements
I would recommend you spend some time reading my previous blogs to understand Spark on K8s and DataDog:
- How to run a Spark job on Amazon EKS cluster.
- How to run a Spark job on Amazon EKS cluster with IRSA.
- Monitor Kubernetes clusters with DataDog.
- DataDog Autodiscovery and DogStatsD.
In this tutorial, I will use DataDog Autodiscovery feature to collect and send Spark metrics to DataDog.
Why use Autodiscovery?
DataDog supports 450+ integrations, including Spark, which can collect metrics for:
- Drivers and executors: RDD blocks, memory used, disk used, duration, etc.
- RDDs: partition count, memory used, and disk used.
- Tasks: number of tasks active, skipped, failed, and total.
- Job state: number of jobs active, completed, skipped, and failed.
Those metrics are easily collected by the DataDog Agent when Spark is installed on a host (EC2, VMs, etc.) or runs on an EMR, Mesos, YARN, or standalone cluster.
But when a Spark job is submitted and runs on Kubernetes, its behavior is different: the Spark driver and executors run as Kubernetes pods. Each Spark job exposes a Spark UI endpoint via the driver pod, which can be used to check the Spark application and its status.
The problem is that the IP address of the driver pod is randomly assigned by the Kubernetes network, so we cannot define a correct spark_url value in the Spark integration configuration file. Another problem is that when we have hundreds or thousands of Spark jobs running in a Kubernetes cluster, we cannot define all of the spark_url values in the DataDog Agent configuration file. That's why Autodiscovery is the best fit for Spark on K8s.
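For contrast, on a static host the Agent reads the integration config from a file with the driver address hardcoded. A sketch of such a config (the IP below is hypothetical, which is exactly the problem on K8s):

```yaml
# spark.d/conf.yaml on a static host -- unusable on K8s, because the
# driver pod IP below is reassigned on every job submission
instances:
  - spark_url: http://10.150.123.45:4040   # hypothetical driver pod IP
    spark_cluster_mode: spark_driver_mode
```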
Perform sample Spark job
In this post, I will re-use the Docker image, vitamingaugau/spark:spark-2.4.4-irsa, which was built in my previous blog.
First, make sure Spark integration has been enabled in DataDog:
Find the API control plane URL:
➜ ~ kubectl cluster-info
Kubernetes control plane is running at https://E3C<hidden>626.yl4.ap-southeast-1.eks.amazonaws.com
CoreDNS is running at https://E3C<hidden>626.yl4.ap-southeast-1.eks.amazonaws.com/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
To further debug and diagnose cluster problems, use 'kubectl cluster-info dump'.
Note down the Kubernetes control plane URL; it will be used for spark-submit.
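The control plane URL is only used to build the --master value for spark-submit, which expects the k8s:// scheme prepended to the API server URL. A quick sketch (the EKS endpoint below is a placeholder):

```shell
# placeholder endpoint; substitute the URL printed by `kubectl cluster-info`
K8S_CONTROL_PLANE="https://EXAMPLE.yl4.ap-southeast-1.eks.amazonaws.com"

# spark-submit talks to the Kubernetes API server via the k8s:// scheme
MASTER="k8s://${K8S_CONTROL_PLANE}:443"
echo "$MASTER"
```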
Create a file spark-pi.yaml as follows:
---
apiVersion: v1
kind: Namespace
metadata:
  name: spark-pi
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark-pi
  namespace: spark-pi
automountServiceAccountToken: true
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: spark-pi-role
  namespace: spark-pi
rules:
  - apiGroups: [""]
    resources: ["pods", "services", "configmaps"]
    verbs: ["get", "list", "watch", "create", "delete", "update", "patch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: spark-pi-role-binding
  namespace: spark-pi
subjects:
  - kind: ServiceAccount
    name: spark-pi
    namespace: spark-pi
roleRef:
  kind: Role
  name: spark-pi-role
  apiGroup: rbac.authorization.k8s.io
Then run kubectl apply -f spark-pi.yaml to create the namespace and RBAC resources for Spark.
Create a jump pod to run spark-submit:
➜ ~ cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Pod
metadata:
  labels:
    run: tmp
  name: tmp
  namespace: spark-pi
spec:
  containers:
  - image: vitamingaugau/spark:spark-2.4.4-irsa
    imagePullPolicy: Always
    name: tmp
    args:
    - sleep
    - "1000000"
    resources: {}
  serviceAccountName: spark-pi
EOF
Check out the sample spark.d/conf.yaml for all available configuration options. You will need to convert the YAML into a one-line JSON string to generate the Autodiscovery config to put into spark-submit. I created a sample spark-conf.yaml file for this test as follows (I will explain the values later):
init_config:
instances:
  - spark_url: http://%%host%%:4040
    spark_cluster_mode: spark_driver_mode
    cluster_name: spark-k8s
Then convert that file into JSON format:
➜ ~ pip3 install PyYAML
➜ ~ alias yaml2json="python3 -c 'import sys, yaml, json; y=yaml.load(sys.stdin.read(), Loader=yaml.FullLoader); print(json.dumps(y))'"
➜ ~ cat spark-conf.yaml | yaml2json
{"init_config": null, "instances": [{"spark_url": "http://%%host%%:4040", "spark_cluster_mode": "spark_driver_mode", "cluster_name": "spark-k8s"}]}
We will use the following --conf options as the Autodiscovery configuration for spark-submit. Remember to add escape characters (\) before the double quotes, and replace null with [{}].
--conf "spark.kubernetes.driver.annotation.ad.datadoghq.com/spark-kubernetes-driver.check_names=[\"spark\"]" \
--conf "spark.kubernetes.driver.annotation.ad.datadoghq.com/spark-kubernetes-driver.init_configs=[{}]" \
--conf "spark.kubernetes.driver.annotation.ad.datadoghq.com/spark-kubernetes-driver.instances=[{\"spark_url\": \"http://%%host%%:4040\", \"spark_cluster_mode\": \"spark_driver_mode\", \"cluster_name\": \"spark-k8s\"}]"
Exec into the jump pod with kubectl -n spark-pi exec -it tmp -- bash and perform spark-submit:
bash-4.4# export K8S_CONTROL_PLANE="https://E3C<hidden>626.yl4.ap-southeast-1.eks.amazonaws.com"
bash-4.4# /opt/spark/bin/spark-submit \
--master=k8s://$K8S_CONTROL_PLANE:443 \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.driver.pod.name=spark-pi-driver \
--conf spark.kubernetes.container.image=vitamingaugau/spark:spark-2.4.4-irsa \
--conf spark.kubernetes.namespace=spark-pi \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-pi \
--conf spark.kubernetes.authenticate.executor.serviceAccountName=spark-pi \
--conf spark.kubernetes.authenticate.submission.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
--conf spark.kubernetes.authenticate.submission.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
--conf "spark.kubernetes.driver.annotation.ad.datadoghq.com/spark-kubernetes-driver.check_names=[\"spark\"]" \
--conf "spark.kubernetes.driver.annotation.ad.datadoghq.com/spark-kubernetes-driver.init_configs=[{}]" \
--conf "spark.kubernetes.driver.annotation.ad.datadoghq.com/spark-kubernetes-driver.instances=[{\"spark_url\": \"http://%%host%%:4040\", \"spark_cluster_mode\": \"spark_driver_mode\", \"cluster_name\": \"spark-k8s\"}]" \
local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.4.jar 20000
While waiting for the job to run, open another terminal and verify the Spark driver pod:
➜ ~ kubectl -n spark-pi get pod spark-pi-driver -o yaml
apiVersion: v1
kind: Pod
metadata:
  annotations:
    ad.datadoghq.com/spark-kubernetes-driver.check_names: '["spark"]'
    ad.datadoghq.com/spark-kubernetes-driver.init_configs: '[{}]'
    ad.datadoghq.com/spark-kubernetes-driver.instances: '[{"spark_url": "http://%%host%%:4040",
      "spark_cluster_mode": "spark_driver_mode", "cluster_name": "spark-k8s"}]'
    kubernetes.io/psp: eks.privileged
  labels:
    spark-app-selector: spark-2c5bbb79941748be83d62c978e862d6e
    spark-role: driver
  name: spark-pi-driver
  namespace: spark-pi
  ...
spec:
  containers:
  - args:
    - driver
    - --properties-file
    - /opt/spark/conf/spark.properties
    - --class
    - org.apache.spark.examples.SparkPi
    - spark-internal
    - "20000"
    env:
    - name: SPARK_DRIVER_BIND_ADDRESS
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: status.podIP
    - name: SPARK_LOCAL_DIRS
      value: /var/data/spark-d89cc767-0f63-425c-88ef-44c7fe6d21d1
    - name: SPARK_CONF_DIR
      value: /opt/spark/conf
    image: vitamingaugau/spark:spark-2.4.4-irsa
    imagePullPolicy: IfNotPresent
    name: spark-kubernetes-driver
    ports:
    - containerPort: 7078
      name: driver-rpc-port
      protocol: TCP
    - containerPort: 7079
      name: blockmanager
      protocol: TCP
    - containerPort: 4040
      name: spark-ui
      protocol: TCP
  ...
  nodeName: ip-10-150-123-100.ap-southeast-1.compute.internal
We can see the Spark driver pod has these configs:
- The container name is spark-kubernetes-driver, hence the DataDog Autodiscovery Integration Template annotations follow the format ad.datadoghq.com/<container_name>.
- The spark-ui port is exposed on port 4040, so spark_url has the value http://%%host%%:4040, where %%host%% is transformed into the pod/container IP address by the DataDog Agent.
- The job runs in cluster mode, so spark_cluster_mode has the value spark_driver_mode.
- cluster_name is a tag that will be added to all collected metrics, with the value spark-k8s.
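The %%host%% substitution the Agent performs can be mimicked locally to see the final check URL. A sketch with a made-up pod IP:

```shell
# hypothetical pod IP; the Agent reads the real one from the pod status
POD_IP="10.150.123.45"

# the URL template taken from the pod annotation
TEMPLATE="http://%%host%%:4040"

# the Agent substitutes %%host%% before running the Spark check
SPARK_URL=$(echo "$TEMPLATE" | sed "s/%%host%%/$POD_IP/")
echo "$SPARK_URL"   # http://10.150.123.45:4040
```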
Now, we need to find the DataDog Agent pod running on the same node as spark-pi-driver to verify the integration checks:
➜ ~ kubectl get pods --all-namespaces -o wide --field-selector spec.nodeName=ip-10-150-123-100.ap-southeast-1.compute.internal | grep -E "spark-pi-driver|datadog"
addons datadog-p9fj6 1/1 Running 0 48d 10.150.123.67 ip-10-150-123-100.ap-southeast-1.compute.internal <none> <none>
spark-pi spark-pi-driver 1/1 Running 0 106s 10.150.123.45 ip-10-150-123-100.ap-southeast-1.compute.internal <none> <none>
Run this command to verify that Autodiscovery is working and the Agent has started collecting metrics:
➜ ~ kubectl -n addons exec -it datadog-p9fj6 -- agent check spark
...
=========
Collector
=========
Running Checks
==============
spark (1.15.0)
--------------
Instance ID: spark:a83f2f65603a7da0 [OK]
Configuration Source: kubelet:docker://e5848a0a8e2c99444ca8095a9eebd17f8a02d5dcf516abbf8aebd5f65fec92e1
Total Runs: 1
Metric Samples: Last Run: 0, Total: 0
Events: Last Run: 0, Total: 0
Service Checks: Last Run: 1, Total: 1
Average Execution Time : 8ms
Last Execution Date : 2021-05-16 16:01:06.000000 UTC
    Last Successful Execution Date : 2021-05-16 16:01:06.000000 UTC
2021-05-16 16:01:07 UTC | CORE | INFO | (pkg/collector/python/datadog_agent.go:122 in LogMessage) | spark:c6e4a79673c9e1fe | (spark.py:311) | Returning running apps {'spark-***************************863b5': ('Spark Pi', 'http://10.150.123.45:4040')}
=== Series ===
{
"series": [
{
"metric": "spark.executor.active_tasks",
"points": [
[
1621180867,
2
]
],
"tags": [
"app_name:Spark Pi",
"cluster_name:spark-k8s",
"docker_image:vitamingaugau/spark:spark-2.4.4-irsa",
"image_name:vitamingaugau/spark",
"image_tag:spark-2.4.4-irsa",
"kube_container_name:spark-kubernetes-driver",
"kube_namespace:spark-pi",
"kube_service:spark-pi-1621180711634-driver-svc",
"pod_name:spark-pi-driver",
"pod_phase:running",
"short_image:spark"
],
"host": "i-00200eb5d332e4263",
"type": "count",
"interval": 0,
"source_type_name": "System"
},
...
{
"metric": "spark.job.num_completed_tasks",
"points": [
[
1621180867,
13223
]
],
"tags": [
"app_name:Spark Pi",
"cluster_name:spark-k8s",
"docker_image:vitamingaugau/spark:spark-2.4.4-irsa",
"image_name:vitamingaugau/spark",
"image_tag:spark-2.4.4-irsa",
"job_id:0",
"kube_container_name:spark-kubernetes-driver",
"kube_namespace:spark-pi",
"kube_service:spark-pi-1621180711634-driver-svc",
"pod_name:spark-pi-driver",
"pod_phase:running",
"short_image:spark",
"stage_id:0",
"status:running"
],
"host": "i-00200eb5d332e4263",
"type": "count",
"interval": 0,
"source_type_name": "System"
},
...
{
"metric": "spark.executor.memory_used",
"points": [
[
1621180867,
1256
]
],
"tags": [
"app_name:Spark Pi",
"cluster_name:spark-k8s",
"docker_image:vitamingaugau/spark:spark-2.4.4-irsa",
"image_name:vitamingaugau/spark",
"image_tag:spark-2.4.4-irsa",
"kube_container_name:spark-kubernetes-driver",
"kube_namespace:spark-pi",
"kube_service:spark-pi-1621180711634-driver-svc",
"pod_name:spark-pi-driver",
"pod_phase:running",
"short_image:spark"
],
"host": "i-00200eb5d332e4263",
"type": "count",
"interval": 0,
"source_type_name": "System"
},
...
{
"metric": "spark.driver.memory_used",
"points": [
[
1621180867,
1256
]
],
"tags": [
"app_name:Spark Pi",
"cluster_name:spark-k8s",
"docker_image:vitamingaugau/spark:spark-2.4.4-irsa",
"image_name:vitamingaugau/spark",
"image_tag:spark-2.4.4-irsa",
"kube_container_name:spark-kubernetes-driver",
"kube_namespace:spark-pi",
"kube_service:spark-pi-1621180711634-driver-svc",
"pod_name:spark-pi-driver",
"pod_phase:running",
"short_image:spark"
],
"host": "i-00200eb5d332e4263",
"type": "count",
"interval": 0,
"source_type_name": "System"
},
...
=== Service Checks ===
[
{
"check": "spark.driver.can_connect",
"host_name": "i-00eb5d3e43",
"timestamp": 1621180867,
"status": 0,
"message": "Connection to Spark driver \"http://10.150.123.45:4040\" was successful",
"tags": [
"cluster_name:spark-k8s",
"docker_image:vitamingaugau/spark:spark-2.4.4-irsa",
"image_name:vitamingaugau/spark",
"image_tag:spark-2.4.4-irsa",
"kube_container_name:spark-kubernetes-driver",
"kube_namespace:spark-pi",
"kube_service:spark-pi-1621180711634-driver-svc",
"pod_name:spark-pi-driver",
"pod_phase:running",
"short_image:spark",
"url:http://10.150.123.45:4040"
]
},
{
"check": "spark.application_master.can_connect",
"host_name": "i-00eb5d3e43",
"timestamp": 1621180867,
"status": 0,
"message": "Connection to ApplicationMaster \"http://10.150.123.45:4040\" was successful",
"tags": [
"cluster_name:spark-k8s",
"docker_image:vitamingaugau/spark:spark-2.4.4-irsa",
"image_name:vitamingaugau/spark",
"image_tag:spark-2.4.4-irsa",
"kube_container_name:spark-kubernetes-driver",
"kube_namespace:spark-pi",
"kube_service:spark-pi-1621180711634-driver-svc",
"pod_name:spark-pi-driver",
"pod_phase:running",
"short_image:spark",
"url:http://10.150.123.45:4040"
]
}
]
=========
Collector
=========
Running Checks
==============
spark (1.15.0)
--------------
Instance ID: spark:c6e4a79673c9e1fe [OK]
Configuration Source: kubelet:docker://ea263fdb3ad10e8485c9c0ac92f4721652c75879eee1eafc858cffc569b6f8ee
Total Runs: 1
Metric Samples: Last Run: 50, Total: 50
Events: Last Run: 0, Total: 0
Service Checks: Last Run: 3, Total: 3
Average Execution Time : 71ms
Last Execution Date : 2021-05-16 16:01:07.000000 UTC
Last Successful Execution Date : 2021-05-16 16:01:07.000000 UTC
metadata:
version.major: 2
version.minor: 4
version.patch: 4
version.raw: 2.4.4
version.scheme: semver
Bingo, the DataDog Agent is able to detect the Autodiscovery config (pod annotations) in the spark-pi-driver pod. Now let's check the DataDog Metrics Explorer to find those metrics:
Check out this page to see all supported metrics of Spark integration.
Conclusion
In this post, we have learned how to define pod annotations for a Spark on K8s job using spark-submit, and how to configure DataDog Autodiscovery to collect Spark application metrics. These metrics are useful for debugging Spark jobs, and for building dashboards and monitors on top of them.
Hope you enjoy reading my blog, and feel free to leave comments or questions.
Check out my other tutorials in the Spark on K8s series: