Apache Druid: Setup, Monitoring and Auto Scaling on Kubernetes
A detailed look at how to operate Apache Druid on Kubernetes
Prerequisite
This article assumes the existence of a Kubernetes cluster. Please follow the links below to set up your Kubernetes cluster.
- Enable Kubernetes on Docker-Desktop
- Install Minikube for local setup
- Setup a Kubernetes Cluster using Amazon Elastic Kubernetes Service (Amazon EKS)
- Setup a Kubernetes Cluster using Google Kubernetes Engine (GKE)
- Setup a Kubernetes Cluster using Azure Kubernetes Service (AKS)
For this post, I will be using a cluster running on GKE.
Setup Apache Druid
Druid is a column-oriented, open-source, distributed data store commonly used in business intelligence/OLAP applications to analyze high volumes of real-time and historical data. It can be seen as a mix of a search engine like Apache Solr or Elasticsearch, a time-series database (TSDB) like Prometheus or OpenTSDB, and an OLAP database.
Druid components
A Druid installation is made of multiple services designed to run on a distributed, cloud-friendly architecture.
- MiddleManager: Responsible for ingesting data, reading from external data sources and producing segments. This is the service we will focus on in this article as we try to scale it automatically with Kubernetes.
- Historical: Handles the storage of, and answers queries on, data that has already been ingested (i.e., historical data).
- Broker: Receives queries and forwards them to the right services (Historical or MiddleManager nodes) depending on where the data lives.
- Overlord: Deals with the assignment of ingestion tasks to the MiddleManager nodes.
- Coordinator: Deals with the balancing of segments across the cluster’s Historical nodes.
Why Kubernetes
Setting up Druid on Kubernetes has several advantages, some of which are:
- Auto-scaling of components independently
- No single point of failure
- Rolling upgrade
Install Apache Druid using Helm chart
Install Druid on Kubernetes using the Helm chart from https://github.com/helm/charts/tree/master/incubator/druid. The chart is being actively developed by open-source community members, and as of this writing it is the easiest way to get Druid running on Kubernetes.
Many parameters in the values file, such as replica counts, can be customized to the needs of the cluster. You can copy the values file, make changes, and use your own values file when installing.
For a default installation from the repo, use the following steps
$ helm repo add incubator https://kubernetes-charts-incubator.storage.googleapis.com
$ helm install druid incubator/druid
Check deployment status
$ kubectl get all
The output should look like this
Check the Router URL
On the Kubernetes cluster, port forward the Druid router service using this command and open http://127.0.0.1:8080 in the browser
$ kubectl port-forward svc/druid-router 8080:8888
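Before ingesting anything, it is worth confirming the router actually responds; Druid exposes a simple health endpoint for this. A quick check against the port-forward above (it prints a hint if the forward isn't running):

```shell
# /status/health returns "true" when the router process is healthy.
curl -s --max-time 3 http://127.0.0.1:8080/status/health \
  || echo "router not reachable - is the port-forward running?"
```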
Ingest data to Druid
Use the Wikipedia example data to load to Druid.
Setup Monitoring using Prometheus and Grafana
Install Prometheus and Grafana
Install Prometheus operator
$ helm install demo stable/prometheus-operator
Grafana
Port forward the Grafana service and open it in the browser
$ kubectl port-forward svc/demo-grafana 8081:80
Prometheus should already be configured as a data source, since the prometheus-operator chart sets this up.
Install Druid exporter
- Install Kubernetes deployment and service from https://github.com/opstree/druid-exporter
- In Grafana, load the dashboard https://grafana.com/grafana/dashboards/12155
You can see the metrics being collected by the exporter at the exporter endpoint and in the Grafana dashboard.
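To spot-check the exporter before opening Grafana, you can port-forward its service and curl the standard Prometheus /metrics endpoint. The service name and port below are assumptions; check kubectl get svc in your cluster:

```shell
# Port-forward the exporter service first (name/port are assumptions):
#   kubectl port-forward svc/druid-exporter 8082:8080 &
# Metrics exposed by the exporter carry a druid_ prefix.
curl -s --max-time 3 http://127.0.0.1:8082/metrics | grep '^druid_' \
  || echo "exporter not reachable - is the port-forward running?"
```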
Your Druid Grafana dashboard should look something like this…
Setup Apache Kafka
In addition to native files, data can also be loaded from Apache Kafka, AWS Kinesis, Apache Hadoop, and other sources. Let us look at the most interesting and practical use case: loading from Kafka.
Install Apache Kafka using Helm chart
I have used both https://github.com/helm/charts/tree/master/incubator/kafka, which is maintained by the Helm community, and https://github.com/confluentinc/cp-helm-charts in production. For this post, I will install using the chart from the Confluent repo.
On the Kubernetes cluster, install Kafka
$ helm repo add confluentinc https://confluentinc.github.io/cp-helm-charts/
$ helm repo update
$ helm install demo-kafka confluentinc/cp-helm-charts --version 0.5.0
Verify the installation
Create a Kafka client pod
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Pod
metadata:
  name: kafka-client
  namespace: default
spec:
  containers:
  - name: kafka-client
    image: confluentinc/cp-kafka:5.5.0
    command:
    - sh
    - -c
    - "exec tail -f /dev/null"
EOF
Create a Kafka topic called wikipedia-topic, to which we’ll send data.
$ kubectl exec -it kafka-client -- /bin/bash
$ kafka-topics --zookeeper demo-kafka-cp-zookeeper-headless:2181 --topic wikipedia-topic --create --partitions 1 --replication-factor 1 --if-not-exists
Verify the topic was created successfully
$ kubectl -n default exec kafka-client -- kafka-topics --zookeeper demo-kafka-cp-zookeeper:2181 --list
Kafka Grafana Dashboard
Add Kafka monitoring dashboard in Grafana.
Ingest data to Kafka
Submit a supervisor with the following JSON spec. In the JSON below, bootstrap.servers and topic are the important fields
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "wikipedia",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "time",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            "channel",
            "cityName",
            "comment",
            "countryIsoCode",
            "countryName",
            "isAnonymous",
            "isMinor",
            "isNew",
            "isRobot",
            "isUnpatrolled",
            "metroCode",
            "namespace",
            "page",
            "regionIsoCode",
            "regionName",
            "user",
            { "name": "added", "type": "long" },
            { "name": "deleted", "type": "long" },
            { "name": "delta", "type": "long" }
          ]
        }
      }
    },
    "metricsSpec": [],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "NONE",
      "rollup": false
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "reportParseExceptions": false
  },
  "ioConfig": {
    "topic": "wikipedia-topic",
    "replicas": 2,
    "taskDuration": "PT10M",
    "completionTimeout": "PT20M",
    "consumerProperties": {
      "bootstrap.servers": "demo-kafka-cp-kafka-headless:9092"
    }
  }
}
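The spec is submitted to the Overlord through the router's supervisor endpoint (/druid/indexer/v1/supervisor, per the Druid API). A sketch, assuming the spec is saved as wikipedia-kafka-supervisor.json and the router port-forward from earlier is still running; the command falls through with a message otherwise:

```shell
# POST the supervisor spec through the port-forwarded router.
curl -s --max-time 3 -X POST -H 'Content-Type: application/json' \
     -d @wikipedia-kafka-supervisor.json \
     http://127.0.0.1:8080/druid/indexer/v1/supervisor \
  || echo "router not reachable - is the port-forward running?"
```

A successful submission returns the supervisor id; the same spec can also be pasted into the Submit supervisor dialog in the Druid console.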
Ingest Data
Copy wikiticker-2015-09-12-sampled.json.gz from the Druid broker pod to the Kafka client pod. kubectl cp copies between your local machine and a pod, so a pod-to-pod copy takes two steps through the local filesystem:
$ kubectl cp <druid-broker-pod>:/opt/apache-druid-0.19.0/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz ./wikiticker-2015-09-12-sampled.json.gz
$ kubectl cp ./wikiticker-2015-09-12-sampled.json.gz kafka-client:/tmp/wikiticker-2015-09-12-sampled.json.gz
/opt/apache-druid-0.19.0/quickstart/tutorial $ ls
compaction-day-granularity.json rollup-index.json updates-init-index.json
compaction-init-index.json transform-data.json updates-overwrite-index.json
compaction-keep-granularity.json transform-index.json wikipedia-index-hadoop.json
deletion-disable-segments.json updates-append-index.json wikipedia-index.json
deletion-index.json updates-append-index2.json wikipedia-kafka-supervisor.json
deletion-kill.json updates-data.json wikipedia-top-pages-sql.json
hadoop updates-data2.json wikipedia-top-pages.json
retention-index.json updates-data3.json wikiticker-2015-09-12-sampled.json.gz
rollup-data.json updates-data4.json
Exec into the Kafka client pod, decompress the copied file, and run the console producer to ingest the data
$ kubectl exec -it kafka-client -- /bin/bash
$ gunzip wikiticker-2015-09-12-sampled.json.gz
$ kafka-console-producer --broker-list demo-kafka-cp-kafka-headless:9092 --topic wikipedia-topic < wikiticker-2015-09-12-sampled.json
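To confirm the messages landed on the topic, you can read a few records back with the console consumer. This is a sketch meant to run inside the kafka-client pod created earlier; elsewhere it just prints a hint:

```shell
# Read a few records back from the topic to verify the produce step;
# the Confluent CLI tools are available inside the kafka-client pod.
kafka-console-consumer --bootstrap-server demo-kafka-cp-kafka-headless:9092 \
    --topic wikipedia-topic --from-beginning --max-messages 3 \
  || echo "kafka-console-consumer not found here - run this inside the kafka-client pod"
```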
Query data
This document describes how to query the ingested data — https://druid.apache.org/docs//0.15.1-incubating/tutorials/tutorial-query.html
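For a quick programmatic check, the router also exposes Druid SQL at /druid/v2/sql (a documented Druid API). A sketch against the port-forward from earlier; it falls through with a message if the router is not reachable:

```shell
# Run a SQL query over the ingested datasource through the router;
# the table name "wikipedia" matches the dataSource in the supervisor spec.
curl -s --max-time 3 -X POST -H 'Content-Type: application/json' \
     -d '{"query":"SELECT page, COUNT(*) AS edits FROM wikipedia GROUP BY page ORDER BY edits DESC LIMIT 5"}' \
     http://127.0.0.1:8080/druid/v2/sql \
  || echo "router not reachable - is the port-forward running?"
```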
Auto Scaling of MiddleManager
In Part 2, we will see how to use the Kubernetes HPA (https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/) feature to autoscale Druid components based on ingestion load.