Published in Geek Culture

Spark on K8s — Send Spark job’s Metrics to DataDog Using Autodiscovery

Today, I will share how we send metrics from Spark jobs running on Kubernetes to DataDog, where they can be used to build monitors and dashboards for Spark applications.

Requirements

I recommend spending some time reading my previous posts on Spark on K8s and DataDog before continuing.

In this tutorial, I will use the DataDog Autodiscovery feature to collect Spark metrics and send them to DataDog.

Why use Autodiscovery?

DataDog supports 450+ integrations, including Spark. The Spark integration collects metrics for:

  • Drivers and executors: RDD blocks, memory used, disk used, duration, etc.
  • RDDs: partition count, memory used, and disk used.
  • Tasks: number of tasks active, skipped, failed, and total.
  • Job state: number of jobs active, completed, skipped, and failed.

The DataDog Agent collects those metrics easily when Spark is installed on a host (EC2, VMs, etc.) or runs on EMR, Mesos, YARN, or a Standalone cluster.

But when Spark jobs are submitted to and run on Kubernetes, the behavior is different: the Spark driver and executors run as Kubernetes pods. Each Spark job exposes a Spark UI endpoint on its driver pod, which can be used to check the Spark application and its status.

Spark driver UI — 1
Spark driver UI — 2
Spark driver UI — 3

The first problem is that Kubernetes assigns the driver pod's IP address dynamically, so we cannot hard-code the correct spark_url value in the Spark integration configuration file. The second problem is scale: when hundreds or thousands of Spark jobs are running in the Kubernetes cluster, we cannot list every spark_url in the DataDog Agent configuration file. That is why Autodiscovery is the best fit for Spark on K8s.

Run a sample Spark job

In this post, I will re-use the Docker image, vitamingaugau/spark:spark-2.4.4-irsa, which was built in my previous blog.

First, make sure Spark integration has been enabled in DataDog:

Screenshot taken in the DataDog integrations console

Find the API control plane URL:

➜  ~ kubectl cluster-info
Kubernetes control plane is running at https://E3C<hidden>626.yl4.ap-southeast-1.eks.amazonaws.com
CoreDNS is running at https://E3C<hidden>626.yl4.ap-southeast-1.eks.amazonaws.com/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
To further debug and diagnose cluster problems, use 'kubectl cluster-info dump'.

Note down the Kubernetes control plane URL; it will be used for spark-submit.
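If you prefer to script this step, the URL can be pulled out of the command output. The following is only a minimal Python sketch: the function name control_plane_url is hypothetical, and the pattern assumes the output wording shown above (older kubectl versions say "Kubernetes master" instead, which the pattern also accepts).

```python
import re

# kubectl may colorize its output when writing to a terminal; strip ANSI codes first.
ANSI_ESCAPE = re.compile(r"\x1b\[[0-9;]*m")
# Assumption: the line reads "... control plane is running at <url>"
# (or "... master is running at <url>" on older kubectl versions).
URL_LINE = re.compile(r"Kubernetes (?:control plane|master) is running at (\S+)")


def control_plane_url(cluster_info: str) -> str:
    """Return the control plane URL found in `kubectl cluster-info` output."""
    for line in cluster_info.splitlines():
        match = URL_LINE.search(ANSI_ESCAPE.sub("", line))
        if match:
            return match.group(1)
    raise ValueError("control plane URL not found in cluster-info output")


# Example with a placeholder URL (not a real cluster):
sample_output = (
    "Kubernetes control plane is running at https://example.invalid\n"
    "CoreDNS is running at https://example.invalid/api/v1/namespaces/kube-system\n"
)
print(control_plane_url(sample_output))
```

In practice you would feed it the text captured from kubectl cluster-info instead of the sample string.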

Create the file spark-pi.yaml as follows:

---
apiVersion: v1
kind: Namespace
metadata:
  name: spark-pi
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark-pi
  namespace: spark-pi
automountServiceAccountToken: true
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: spark-pi-role
  namespace: spark-pi
rules:
- apiGroups: [""]
  resources: ["pods", "services", "configmaps"]
  verbs: ["get", "list", "watch", "create", "delete", "update", "patch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: spark-pi-role-binding
  namespace: spark-pi
subjects:
- kind: ServiceAccount
  name: spark-pi
  namespace: spark-pi
roleRef:
  kind: Role
  name: spark-pi-role
  apiGroup: rbac.authorization.k8s.io

Then run kubectl apply -f spark-pi.yaml to create the namespace and RBAC resources for Spark.

Create a jump pod to run spark-submit:

➜  ~ cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Pod
metadata:
  labels:
    run: tmp
  name: tmp
  namespace: spark-pi
spec:
  containers:
  - image: vitamingaugau/spark:spark-2.4.4-irsa
    imagePullPolicy: Always
    name: tmp
    args:
    - sleep
    - "1000000"
    resources: {}
  serviceAccountName: spark-pi
EOF

Check out the sample spark.d/conf.yaml for all available configuration options. To generate the Autodiscovery config for spark-submit, you need to convert the YAML into one-line JSON. I created a sample spark-conf.yaml file for this test as follows (I will explain the values later):

init_config:
instances:
  - spark_url: http://%%host%%:4040
    spark_cluster_mode: spark_driver_mode
    cluster_name: spark-k8s

Then convert that file into JSON format:

➜  ~ pip3 install PyYAML
➜ ~ alias yaml2json="python3 -c 'import sys, yaml, json; y=yaml.load(sys.stdin.read(), Loader=yaml.FullLoader); print(json.dumps(y))'"
➜ ~ cat spark-conf.yaml | yaml2json
{"init_config": null, "instances": [{"spark_url": "http://%%host%%:4040", "spark_cluster_mode": "spark_driver_mode", "cluster_name": "spark-k8s"}]}

We will pass the following --conf options to spark-submit as the Autodiscovery configuration. Remember to escape each double quote (") with a backslash (\), and to replace null with [{}] for init_configs.

    --conf "spark.kubernetes.driver.annotation.ad.datadoghq.com/spark-kubernetes-driver.check_names=[\"spark\"]" \
--conf "spark.kubernetes.driver.annotation.ad.datadoghq.com/spark-kubernetes-driver.init_configs=[{}]" \
--conf "spark.kubernetes.driver.annotation.ad.datadoghq.com/spark-kubernetes-driver.instances=[{\"spark_url\": \"http://%%host%%:4040\", \"spark_cluster_mode\": \"spark_driver_mode\", \"cluster_name\": \"spark-k8s\"}]"
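Escaping the quotes by hand is error-prone, so the three options above can also be generated. The following Python sketch is one possible way to do it, not part of Spark or DataDog tooling: the helper names autodiscovery_confs and as_shell are hypothetical, and it wraps each value in single quotes with shlex.quote instead of backslash-escaping the double quotes.

```python
import json
import shlex

# Spark copies "spark.kubernetes.driver.annotation.<name>=<value>" onto the driver pod
# as the annotation <name>: <value>.
ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation.ad.datadoghq.com"


def autodiscovery_confs(container_name, check_name, instances, init_config=None):
    """Return the three key=value strings for the Autodiscovery annotations."""
    base = f"{ANNOTATION_PREFIX}/{container_name}"
    values = {
        "check_names": [check_name],
        # An empty init_config (null in the YAML) becomes an empty JSON object.
        "init_configs": [init_config or {}],
        "instances": instances,
    }
    return [f"{base}.{key}={json.dumps(value)}" for key, value in values.items()]


def as_shell(confs):
    """Render the strings as shell-quoted --conf flags, one per line."""
    return " \\\n".join(f"--conf {shlex.quote(conf)}" for conf in confs)


confs = autodiscovery_confs(
    container_name="spark-kubernetes-driver",
    check_name="spark",
    instances=[{
        "spark_url": "http://%%host%%:4040",
        "spark_cluster_mode": "spark_driver_mode",
        "cluster_name": "spark-k8s",
    }],
)
print(as_shell(confs))
```

The printed flags can be pasted into the spark-submit command in place of the hand-escaped versions; the annotation values that reach the driver pod are the same JSON strings.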

Exec into the jump pod with kubectl -n spark-pi exec -it tmp -- bash and run spark-submit:

bash-4.4# export K8S_CONTROL_PLANE="https://E3C<hidden>626.yl4.ap-southeast-1.eks.amazonaws.com"
bash-4.4# /opt/spark/bin/spark-submit \
--master=k8s://$K8S_CONTROL_PLANE:443 \
--deploy-mode cluster \
--name spark-pi \
--class org.apache.spark.examples.SparkPi \
--conf spark.kubernetes.driver.pod.name=spark-pi-driver \
--conf spark.kubernetes.container.image=vitamingaugau/spark:spark-2.4.4-irsa \
--conf spark.kubernetes.namespace=spark-pi \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-pi \
--conf spark.kubernetes.authenticate.executor.serviceAccountName=spark-pi \
--conf spark.kubernetes.authenticate.submission.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
--conf spark.kubernetes.authenticate.submission.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
--conf "spark.kubernetes.driver.annotation.ad.datadoghq.com/spark-kubernetes-driver.check_names=[\"spark\"]" \
--conf "spark.kubernetes.driver.annotation.ad.datadoghq.com/spark-kubernetes-driver.init_configs=[{}]" \
--conf "spark.kubernetes.driver.annotation.ad.datadoghq.com/spark-kubernetes-driver.instances=[{\"spark_url\": \"http://%%host%%:4040\", \"spark_cluster_mode\": \"spark_driver_mode\", \"cluster_name\": \"spark-k8s\"}]" \
local:///opt/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.4.4.jar 20000

While the job is running, open another terminal and inspect the Spark driver pod:

➜  ~ kubectl -n spark-pi get pod spark-pi-driver -o yaml
apiVersion: v1
kind: Pod
metadata:
  annotations:
    ad.datadoghq.com/spark-kubernetes-driver.check_names: '["spark"]'
    ad.datadoghq.com/spark-kubernetes-driver.init_configs: '[{}]'
    ad.datadoghq.com/spark-kubernetes-driver.instances: '[{"spark_url": "http://%%host%%:4040",
      "spark_cluster_mode": "spark_driver_mode", "cluster_name": "spark-k8s"}]'
    kubernetes.io/psp: eks.privileged
  labels:
    spark-app-selector: spark-2c5bbb79941748be83d62c978e862d6e
    spark-role: driver
  name: spark-pi-driver
  namespace: spark-pi
  ...
spec:
  containers:
  - args:
    - driver
    - --properties-file
    - /opt/spark/conf/spark.properties
    - --class
    - org.apache.spark.examples.SparkPi
    - spark-internal
    - "20000"
    env:
    - name: SPARK_DRIVER_BIND_ADDRESS
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: status.podIP
    - name: SPARK_LOCAL_DIRS
      value: /var/data/spark-d89cc767-0f63-425c-88ef-44c7fe6d21d1
    - name: SPARK_CONF_DIR
      value: /opt/spark/conf
    image: vitamingaugau/spark:spark-2.4.4-irsa
    imagePullPolicy: IfNotPresent
    name: spark-kubernetes-driver
    ports:
    - containerPort: 7078
      name: driver-rpc-port
      protocol: TCP
    - containerPort: 7079
      name: blockmanager
      protocol: TCP
    - containerPort: 4040
      name: spark-ui
      protocol: TCP
  ...
  nodeName: ip-10-150-123-100.ap-southeast-1.compute.internal

We can see that the Spark driver pod has the following configuration:

  • The container name is spark-kubernetes-driver, so the DataDog Autodiscovery Integration Template annotations use the prefix ad.datadoghq.com/<container_name>, that is, ad.datadoghq.com/spark-kubernetes-driver.
  • The spark-ui port is exposed on port 4040, so spark_url is set to http://%%host%%:4040, where the DataDog Agent replaces %%host%% with the IP address of the pod/container.
  • The job runs in cluster mode on Kubernetes and the Agent queries the driver UI directly, so spark_cluster_mode is set to spark_driver_mode.
  • cluster_name is a tag, with value spark-k8s, that is added to all collected metrics.
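The first two points can be checked mechanically. The sketch below is a simplified Python illustration, not the Agent's actual implementation: the function names are hypothetical, the input is assumed to be the pod as a parsed dict (for example from kubectl get pod -o json), and the %%host%% substitution is a plain string replace that only mimics what the Agent does.

```python
import json

AD_PREFIX = "ad.datadoghq.com/"


def autodiscovery_containers(annotations):
    """Container names referenced by ad.datadoghq.com/<container_name>.<key> annotations."""
    return {
        key[len(AD_PREFIX):].rsplit(".", 1)[0]
        for key in annotations
        if key.startswith(AD_PREFIX)
    }


def unmatched_containers(pod):
    """Names used in the annotations that match no container in the pod spec."""
    annotations = pod["metadata"].get("annotations", {})
    declared = {container["name"] for container in pod["spec"]["containers"]}
    return autodiscovery_containers(annotations) - declared


def resolve_instances(pod, container_name):
    """Illustrative only: substitute %%host%% with the pod IP and parse the JSON."""
    raw = pod["metadata"]["annotations"][f"{AD_PREFIX}{container_name}.instances"]
    return json.loads(raw.replace("%%host%%", pod["status"]["podIP"]))


# Trimmed-down driver pod, using the values shown in this post.
driver_pod = {
    "metadata": {
        "annotations": {
            "ad.datadoghq.com/spark-kubernetes-driver.check_names": '["spark"]',
            "ad.datadoghq.com/spark-kubernetes-driver.init_configs": "[{}]",
            "ad.datadoghq.com/spark-kubernetes-driver.instances": (
                '[{"spark_url": "http://%%host%%:4040", '
                '"spark_cluster_mode": "spark_driver_mode", "cluster_name": "spark-k8s"}]'
            ),
            "kubernetes.io/psp": "eks.privileged",
        }
    },
    "spec": {"containers": [{"name": "spark-kubernetes-driver"}]},
    "status": {"podIP": "10.150.123.45"},
}

print(unmatched_containers(driver_pod))  # an empty set means the names line up
print(resolve_instances(driver_pod, "spark-kubernetes-driver")[0]["spark_url"])
```

A non-empty result from unmatched_containers would point to a typo in the container name used in the --conf keys, which is a common reason for a check not being scheduled.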

Now, we need to find the DataDog Agent pod running on the same node as spark-pi-driver to verify the integration checks:

➜  ~ kubectl get pods --all-namespaces -o wide --field-selector spec.nodeName=ip-10-150-123-100.ap-southeast-1.compute.internal | grep -E "spark-pi-driver|datadog"
addons datadog-p9fj6 1/1 Running 0 48d 10.150.123.67 ip-10-150-123-100.ap-southeast-1.compute.internal <none> <none>
spark-pi spark-pi-driver 1/1 Running 0 106s 10.150.123.45 ip-10-150-123-100.ap-southeast-1.compute.internal <none> <none>
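The same lookup can be done programmatically instead of with grep. This is a rough Python sketch under a few assumptions: the input is the parsed output of kubectl get pods --all-namespaces -o json, the function name agent_pod_for is hypothetical, and Agent pods are recognized by a "datadog" name prefix, as in this cluster.

```python
def agent_pod_for(pods, target_namespace, target_name, agent_prefix="datadog"):
    """Return (namespace, name) of the Agent pod on the same node as the target pod."""
    items = pods["items"]
    # Find the node that hosts the target pod (None if the pod is missing or unscheduled).
    node = next(
        (
            pod["spec"].get("nodeName")
            for pod in items
            if pod["metadata"]["namespace"] == target_namespace
            and pod["metadata"]["name"] == target_name
        ),
        None,
    )
    if node is None:
        return None
    for pod in items:
        if pod["spec"].get("nodeName") == node and pod["metadata"]["name"].startswith(agent_prefix):
            return pod["metadata"]["namespace"], pod["metadata"]["name"]
    return None


# Sample data mirroring the listing above, plus a hypothetical Agent pod on another node.
NODE = "ip-10-150-123-100.ap-southeast-1.compute.internal"
sample_pods = {
    "items": [
        {"metadata": {"namespace": "addons", "name": "datadog-other"},
         "spec": {"nodeName": "some-other-node"}},
        {"metadata": {"namespace": "addons", "name": "datadog-p9fj6"},
         "spec": {"nodeName": NODE}},
        {"metadata": {"namespace": "spark-pi", "name": "spark-pi-driver"},
         "spec": {"nodeName": NODE}},
    ]
}
print(agent_pod_for(sample_pods, "spark-pi", "spark-pi-driver"))
```

The returned namespace and pod name are the ones to pass to kubectl exec when running the Agent check.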

Run this command to verify that Autodiscovery is working and that the Agent has started collecting metrics:

➜  ~ kubectl -n addons exec -it datadog-p9fj6 -- agent check spark
...
=========
Collector
=========
Running Checks
==============
spark (1.15.0)
--------------
Instance ID: spark:a83f2f65603a7da0 [OK]
Configuration Source: kubelet:docker://e5848a0a8e2c99444ca8095a9eebd17f8a02d5dcf516abbf8aebd5f65fec92e1
Total Runs: 1
Metric Samples: Last Run: 0, Total: 0
Events: Last Run: 0, Total: 0
Service Checks: Last Run: 1, Total: 1
Average Execution Time : 8ms
Last Execution Date : 2021-05-16 16:01:06.000000 UTC
Last Successful Execution Date : 2021-05-16 16:01:06.000000 UTC
2021-05-16 16:01:07 UTC | CORE | INFO | (pkg/collector/python/datadog_agent.go:122 in LogMessage) | spark:c6e4a79673c9e1fe | (spark.py:311) | Returning running apps {'spark-***************************863b5': ('Spark Pi', 'http://10.150.123.45:4040')}
=== Series ===
{
"series": [
{
"metric": "spark.executor.active_tasks",
"points": [
[
1621180867,
2
]
],
"tags": [
"app_name:Spark Pi",
"cluster_name:spark-k8s",
"docker_image:vitamingaugau/spark:spark-2.4.4-irsa",
"image_name:vitamingaugau/spark",
"image_tag:spark-2.4.4-irsa",
"kube_container_name:spark-kubernetes-driver",
"kube_namespace:spark-pi",
"kube_service:spark-pi-1621180711634-driver-svc",
"pod_name:spark-pi-driver",
"pod_phase:running",
"short_image:spark"
],
"host": "i-00200eb5d332e4263",
"type": "count",
"interval": 0,
"source_type_name": "System"
},
...
{
"metric": "spark.job.num_completed_tasks",
"points": [
[
1621180867,
13223
]
],
"tags": [
"app_name:Spark Pi",
"cluster_name:spark-k8s",
"docker_image:vitamingaugau/spark:spark-2.4.4-irsa",
"image_name:vitamingaugau/spark",
"image_tag:spark-2.4.4-irsa",
"job_id:0",
"kube_container_name:spark-kubernetes-driver",
"kube_namespace:spark-pi",
"kube_service:spark-pi-1621180711634-driver-svc",
"pod_name:spark-pi-driver",
"pod_phase:running",
"short_image:spark",
"stage_id:0",
"status:running"
],
"host": "i-00200eb5d332e4263",
"type": "count",
"interval": 0,
"source_type_name": "System"
},
...
{
"metric": "spark.executor.memory_used",
"points": [
[
1621180867,
1256
]
],
"tags": [
"app_name:Spark Pi",
"cluster_name:spark-k8s",
"docker_image:vitamingaugau/spark:spark-2.4.4-irsa",
"image_name:vitamingaugau/spark",
"image_tag:spark-2.4.4-irsa",
"kube_container_name:spark-kubernetes-driver",
"kube_namespace:spark-pi",
"kube_service:spark-pi-1621180711634-driver-svc",
"pod_name:spark-pi-driver",
"pod_phase:running",
"short_image:spark"
],
"host": "i-00200eb5d332e4263",
"type": "count",
"interval": 0,
"source_type_name": "System"
},
...
{
"metric": "spark.driver.memory_used",
"points": [
[
1621180867,
1256
]
],
"tags": [
"app_name:Spark Pi",
"cluster_name:spark-k8s",
"docker_image:vitamingaugau/spark:spark-2.4.4-irsa",
"image_name:vitamingaugau/spark",
"image_tag:spark-2.4.4-irsa",
"kube_container_name:spark-kubernetes-driver",
"kube_namespace:spark-pi",
"kube_service:spark-pi-1621180711634-driver-svc",
"pod_name:spark-pi-driver",
"pod_phase:running",
"short_image:spark"
],
"host": "i-00200eb5d332e4263",
"type": "count",
"interval": 0,
"source_type_name": "System"
},
...
=== Service Checks ===
[
{
"check": "spark.driver.can_connect",
"host_name": "i-00eb5d3e43",
"timestamp": 1621180867,
"status": 0,
"message": "Connection to Spark driver \"http://10.150.123.45:4040\" was successful",
"tags": [
"cluster_name:spark-k8s",
"docker_image:vitamingaugau/spark:spark-2.4.4-irsa",
"image_name:vitamingaugau/spark",
"image_tag:spark-2.4.4-irsa",
"kube_container_name:spark-kubernetes-driver",
"kube_namespace:spark-pi",
"kube_service:spark-pi-1621180711634-driver-svc",
"pod_name:spark-pi-driver",
"pod_phase:running",
"short_image:spark",
"url:http://10.150.123.45:4040"
]
},
{
"check": "spark.application_master.can_connect",
"host_name": "i-00eb5d3e43",
"timestamp": 1621180867,
"status": 0,
"message": "Connection to ApplicationMaster \"http://10.150.123.45:4040\" was successful",
"tags": [
"cluster_name:spark-k8s",
"docker_image:vitamingaugau/spark:spark-2.4.4-irsa",
"image_name:vitamingaugau/spark",
"image_tag:spark-2.4.4-irsa",
"kube_container_name:spark-kubernetes-driver",
"kube_namespace:spark-pi",
"kube_service:spark-pi-1621180711634-driver-svc",
"pod_name:spark-pi-driver",
"pod_phase:running",
"short_image:spark",
"url:http://10.150.123.45:4040"
]
}
]
=========
Collector
=========
Running Checks
==============
spark (1.15.0)
--------------
Instance ID: spark:c6e4a79673c9e1fe [OK]
Configuration Source: kubelet:docker://ea263fdb3ad10e8485c9c0ac92f4721652c75879eee1eafc858cffc569b6f8ee
Total Runs: 1
Metric Samples: Last Run: 50, Total: 50
Events: Last Run: 0, Total: 0
Service Checks: Last Run: 3, Total: 3
Average Execution Time : 71ms
Last Execution Date : 2021-05-16 16:01:07.000000 UTC
Last Successful Execution Date : 2021-05-16 16:01:07.000000 UTC
metadata:
version.major: 2
version.minor: 4
version.patch: 4
version.raw: 2.4.4
version.scheme: semver

Bingo, the DataDog Agent is able to detect the Autodiscovery config (pod annotations) on the spark-pi-driver pod. Now let’s find those metrics in the DataDog Metrics Explorer:

Spark metrics in DataDog

Check out this page to see all supported metrics of Spark integration.
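The "=== Series ===" section of the Agent check output is long, so a small helper can make it easier to skim. The following Python sketch assumes the payload has already been copied out and parsed into a dict with the same shape as shown earlier; the function name latest_values is hypothetical.

```python
def latest_values(payload, required_tag=None):
    """Map each metric name to the value of its most recent point.

    If several series share a metric name (for example with different tags),
    the last one in the payload wins.
    """
    result = {}
    for series in payload["series"]:
        if required_tag and required_tag not in series.get("tags", []):
            continue
        # Each point is [timestamp, value]; max() picks the latest timestamp.
        _, value = max(series["points"])
        result[series["metric"]] = value
    return result


# Two series taken from the check output above (tags shortened).
sample_payload = {
    "series": [
        {
            "metric": "spark.executor.active_tasks",
            "points": [[1621180867, 2]],
            "tags": ["app_name:Spark Pi", "cluster_name:spark-k8s"],
        },
        {
            "metric": "spark.job.num_completed_tasks",
            "points": [[1621180867, 13223]],
            "tags": ["app_name:Spark Pi", "cluster_name:spark-k8s", "job_id:0"],
        },
    ]
}
print(latest_values(sample_payload))
print(latest_values(sample_payload, required_tag="job_id:0"))
```

Filtering on a tag such as cluster_name:spark-k8s is a quick way to confirm that the tag from the Autodiscovery config is attached to every metric.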

Conclusion

In this post, we have learned how to define pod annotations for a Spark on K8s job using spark-submit, and how to configure DataDog Autodiscovery to collect Spark application metrics. These metrics are useful for debugging Spark jobs and for building dashboards and monitors.

I hope you enjoyed reading this post. Feel free to leave comments or questions.

Check out my other tutorials in the Spark on K8s series.
