Apache Druid: Setup, Monitoring and Auto Scaling on Kubernetes

A detailed look at how to operate Apache Druid on Kubernetes

Arunkumar Eli
Jul 26, 2020

Prerequisite

This article assumes you already have a Kubernetes cluster. If you do not, follow the official documentation for your Kubernetes platform to set one up.

For this post, I will be using a cluster running on GKE.

Setup Apache Druid

Druid is a column-oriented, open-source, distributed data store commonly used in business intelligence/OLAP applications to analyze high volumes of real-time and historical data. It can be seen as a mix of a search engine like Apache Solr or Elasticsearch, a time-series database (TSDB) like Prometheus or OpenTSDB, and an OLAP database.

Druid components

A Druid installation is made of multiple services designed to run on a distributed, cloud-friendly architecture.

  • MiddleManager: Responsible for ingesting data, reading from external data sources and writing it into segments. This is the service we will focus on automatically scaling with Kubernetes in this article.
  • Historical: Handles storage of, and queries on, data that has already been ingested (i.e., historical data).
  • Broker: Receives both ingestion and query requests and forwards them to the right service (Historical or MiddleManager).
  • Overlord: Handles the assignment of ingestion tasks to the MiddleManager nodes.
  • Coordinator: Handles the balancing of segments across the cluster’s Historical nodes.

Why Kubernetes

Setting up Druid on Kubernetes has several advantages. Some of them are:

  • Auto-scaling of components independently
  • No single point of failure
  • Rolling upgrades

Install Apache Druid using Helm chart

Install Druid on Kubernetes using the Helm chart from https://github.com/helm/charts/tree/master/incubator/druid. This chart is still being actively developed by open-source community members, but as of this writing it is the easiest way to get Druid running on Kubernetes.

Many parameters in the values file, such as replica counts, can be customized to the needs of your cluster. You can copy the values file, make changes, and use your own values file when installing.

For a default installation from the repo, use the following steps:

$ helm repo add incubator https://kubernetes-charts-incubator.storage.googleapis.com
$ helm install druid incubator/druid
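
If you want to override defaults without editing the whole values file, you can pass your own values file instead. This is only a sketch; the key names historical.replicaCount and middleManager.replicaCount are assumptions here, so confirm them against the chart’s values.yaml.

# my-values.yaml (key names assumed; verify against the chart's values.yaml)
historical:
  replicaCount: 2
middleManager:
  replicaCount: 2

$ helm install druid incubator/druid -f my-values.yaml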

Check deployment status

$ kubectl get all

The output should show all the Druid pods, services, Deployments, and StatefulSets created by the chart, with the pods in a Running state.

Check the Router URL

On the Kubernetes cluster, port forward the Druid router service using this command and open http://127.0.0.1:8080 in the browser

$ kubectl port-forward svc/druid-router 8080:8888

Ingest data to Druid

Use the Wikipedia example data bundled with Druid to load a sample dataset.
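
You can do this through the data loader in the Druid console, or, as a sketch, submit the bundled batch ingestion spec through the router API. This assumes the port-forward from the previous step is still running and that you have a local copy of quickstart/tutorial/wikipedia-index.json (for example, copied out of a Druid pod with kubectl cp).

$ curl -X POST -H 'Content-Type: application/json' \
    -d @wikipedia-index.json \
    http://127.0.0.1:8080/druid/indexer/v1/task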

Setup Monitoring using Prometheus and Grafana

Install Prometheus and Grafana

Install Prometheus operator

$ helm install demo stable/prometheus-operator
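
The command above assumes the stable chart repository is already configured; if it is not, add it first. The URL below was the stable repository location at the time of writing.

$ helm repo add stable https://kubernetes-charts.storage.googleapis.com
$ helm repo update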

Grafana

Port forward the Grafana service and open Grafana in the browser:

$ kubectl port-forward svc/demo-grafana 8081:80

With the prometheus-operator chart, Prometheus should already be configured as a data source in Grafana.

Install Druid exporter

Once the exporter is running, you can see the metrics it collects at the exporter’s metrics endpoint and in the Grafana dashboard.
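
For the exporter to have anything to scrape, Druid itself needs to emit metrics over HTTP. Below is a minimal sketch of the common runtime properties, assuming the exporter is reachable at a service named druid-exporter on port 8080 with a /druid receiver path; adjust the URL to wherever and however your exporter actually listens.

# Druid common runtime properties (sketch; the exporter URL is an assumption)
druid.monitoring.monitors=["org.apache.druid.java.util.metrics.JvmMonitor"]
druid.emitter=http
druid.emitter.http.recipientBaseUrl=http://druid-exporter.default.svc.cluster.local:8080/druid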

Your Druid Grafana dashboard should look something like this…

Setup Apache Kafka

In addition to ingesting native batch files, data can also be loaded from Apache Kafka, AWS Kinesis, Apache Hadoop, and other sources. Let us look at the most interesting and practical use case, which is loading from Kafka.

Install Apache Kafka using Helm chart

I have used both https://github.com/helm/charts/tree/master/incubator/kafka (maintained by the Helm community) and https://github.com/confluentinc/cp-helm-charts in production. For this post, I will install using the chart from the Confluent repo.

On the Kubernetes cluster, install Kafka:

$ helm repo add confluentinc https://confluentinc.github.io/cp-helm-charts/
$ helm repo update
$ helm install demo-kafka confluentinc/cp-helm-charts --version 0.5.0

Verify the installation

Create a Kafka client pod

cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Pod
metadata:
  name: kafka-client
  namespace: default
spec:
  containers:
  - name: kafka-client
    image: confluentinc/cp-kafka:5.5.0
    command:
    - sh
    - -c
    - "exec tail -f /dev/null"
EOF

Create a Kafka topic called wikipedia-topic, to which we’ll send data.

$ kubectl exec -it kafka-client -- /bin/bash
$ kafka-topics --zookeeper demo-kafka-cp-zookeeper-headless:2181 --topic wikipedia-topic --create --partitions 1 --replication-factor 1 --if-not-exists

Verify the topic was created successfully

$ kubectl -n default exec kafka-client -- kafka-topics --zookeeper demo-kafka-cp-zookeeper:2181 --list
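
You can also describe the topic to see its partition and replication details, for example:

$ kubectl -n default exec kafka-client -- kafka-topics --zookeeper demo-kafka-cp-zookeeper:2181 --describe --topic wikipedia-topic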

Kafka Grafana Dashboard

Add a Kafka monitoring dashboard in Grafana.

Ingest data to Kafka

Submit a Kafka supervisor with the following JSON spec. In the JSON below, bootstrap.servers and topic are the important fields.

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "wikipedia",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "time",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            "channel",
            "cityName",
            "comment",
            "countryIsoCode",
            "countryName",
            "isAnonymous",
            "isMinor",
            "isNew",
            "isRobot",
            "isUnpatrolled",
            "metroCode",
            "namespace",
            "page",
            "regionIsoCode",
            "regionName",
            "user",
            { "name": "added", "type": "long" },
            { "name": "deleted", "type": "long" },
            { "name": "delta", "type": "long" }
          ]
        }
      }
    },
    "metricsSpec": [],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "DAY",
      "queryGranularity": "NONE",
      "rollup": false
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "reportParseExceptions": false
  },
  "ioConfig": {
    "topic": "wikipedia-topic",
    "replicas": 2,
    "taskDuration": "PT10M",
    "completionTimeout": "PT20M",
    "consumerProperties": {
      "bootstrap.servers": "demo-kafka-cp-kafka-headless:9092"
    }
  }
}
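
Assuming you saved the spec above as wikipedia-kafka-supervisor.json and still have the router port-forward running, one way to submit it is through the supervisor API (the Druid console’s “Submit supervisor” dialog works just as well):

$ curl -X POST -H 'Content-Type: application/json' \
    -d @wikipedia-kafka-supervisor.json \
    http://127.0.0.1:8080/druid/indexer/v1/supervisor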

Ingest Data

Copy wikiticker-2015-09-12-sampled.json.gz from the Druid broker pod to the Kafka client pod. You can use kubectl cp for this; since kubectl cp copies between a pod and your local filesystem, copy the file to your machine first and then into the Kafka client pod.

/opt/apache-druid-0.19.0/quickstart/tutorial $ ls
compaction-day-granularity.json rollup-index.json updates-init-index.json
compaction-init-index.json transform-data.json updates-overwrite-index.json
compaction-keep-granularity.json transform-index.json wikipedia-index-hadoop.json
deletion-disable-segments.json updates-append-index.json wikipedia-index.json
deletion-index.json updates-append-index2.json wikipedia-kafka-supervisor.json
deletion-kill.json updates-data.json wikipedia-top-pages-sql.json
hadoop updates-data2.json wikipedia-top-pages.json
retention-index.json updates-data3.json wikiticker-2015-09-12-sampled.json.gz
rollup-data.json updates-data4.json
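
As a sketch, assuming the broker pod is named druid-broker-xxxxxxxxxx-xxxxx (look up the real name with kubectl get pods) and that gunzip is available in the cp-kafka image, the copy could look like this:

$ kubectl cp default/druid-broker-xxxxxxxxxx-xxxxx:/opt/apache-druid-0.19.0/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz ./wikiticker-2015-09-12-sampled.json.gz
$ kubectl cp ./wikiticker-2015-09-12-sampled.json.gz default/kafka-client:/tmp/wikiticker-2015-09-12-sampled.json.gz
$ kubectl exec kafka-client -- gunzip /tmp/wikiticker-2015-09-12-sampled.json.gz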

Exec into the Kafka client pod and run this command to ingest the data:

$ kafka-console-producer --broker-list demo-kafka-cp-kafka-headless:9092 --topic wikipedia-topic < {PATH_TO_DRUID}/quickstart/tutorial/wikiticker-2015-09-12-sampled.json

Query data

You can refer to this document to learn how to query the data: https://druid.apache.org/docs//0.15.1-incubating/tutorials/tutorial-query.html
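
As a quick check, with the router port-forward still running you can also issue a Druid SQL query against the wikipedia datasource over the HTTP API (a minimal sketch):

$ curl -X POST -H 'Content-Type: application/json' \
    -d '{"query": "SELECT page, COUNT(*) AS edits FROM wikipedia GROUP BY page ORDER BY edits DESC LIMIT 10"}' \
    http://127.0.0.1:8080/druid/v2/sql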

Auto Scaling of MiddleManager

In Part 2, we will see how to use the Kubernetes HPA (https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/) feature to autoscale the Druid MiddleManager component based on ingestion load.
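
As a preview, here is a minimal CPU-based HPA sketch. It assumes the chart created a StatefulSet named druid-middle-manager (verify with kubectl get statefulsets); Part 2 will look at scaling on ingestion-related metrics rather than raw CPU.

apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
  name: druid-middle-manager
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: StatefulSet
    name: druid-middle-manager   # assumed name; check your cluster
  minReplicas: 1
  maxReplicas: 5
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70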
