Custom metrics in Dataflow pipelines with Prometheus and StatsD

Sushil Kumar
Mar 6, 2020

Preface

At my current firm we rely heavily on Dataflow to run our data ingestion pipelines. We ingest millions of clickstream events from Pub/Sub and ship them to our warehouse in BigQuery. The events arrive as JSON payloads, which we parse before ingesting them into BigQuery.

We don't always receive well-formed JSON payloads, and we reject such messages during ingestion. We wanted to monitor the number of rejected messages and set up alerting on them. We were already using the Dataflow monitoring interface, which provides some vital pipeline metrics such as watermark and system lag.

The obvious next step was to use something that Dataflow natively supported and that could fit right into our monitoring and alerting infrastructure (we use Stackdriver, Prometheus and Grafana). We first tried to implement our rejection counters using Apache Beam metrics.

We soon ran into a shortcoming of Beam's counters: they are not namespaced, and we needed namespaced rejection counters so that every rejection could be attributed to a rejection reason.

What we were really missing were Prometheus-style labels on our metrics. We set out to find whether there was any way to use Prometheus with a fully managed service like Dataflow, and stumbled upon a post from Qubit on Google's blog. They suggested a push-based model that uses the StatsD exporter to get custom metrics into Prometheus, and that's what we did.

In this post I'm going to walk you through the steps to implement the solution described in that post. So let's jump right into it.

Table of Contents

  1. Setting up StatsD Exporter and Prometheus.
  2. Getting Dataflow to push events to StatsD.
  3. Tying it all together.

1. Setting up StatsD Exporter and Prometheus

We'll begin by spinning up a VM to run the StatsD Exporter and a Prometheus instance. In production it's better to run these on separate machines and to reuse an existing Prometheus instance if you already have one. We'll use Docker to bring these services up quickly.

The StatsD Exporter accepts StatsD-format metrics directly, so we don't need to run a separate StatsD server. If you already have a StatsD server running as part of your infrastructure, you can configure a repeater backend to forward metrics from the StatsD server to the StatsD Exporter.

We'll first create a custom Docker network so that all the containers can reach each other by hostname.

docker network create -d bridge metrics-bridge

StatsD Exporter

We'll start the StatsD Exporter with a Prometheus mapping configuration so that the exporter can translate StatsD-style metric names into Prometheus metrics and labels. Create statsd_mapping.yml as below.
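
A minimal mapping along these lines should work (a sketch based on the metric names used later in this post; adjust the match pattern and labels to your own naming):

mappings:
  - match: "statsd_dataflow.rejected.*.reason.*"
    name: "rejected_events"
    labels:
      step: "$1"
      reason: "$2"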

Let's start the container.

docker run -d --name prom-statsd-exporter \
--network metrics-bridge \
-p 9102:9102 \
-p 9125:9125 \
-p 9125:9125/udp \
-v $PWD/statsd_mapping.yml:/tmp/statsd_mapping.yml \
prom/statsd-exporter --statsd.mapping-config=/tmp/statsd_mapping.yml

Check the logs to verify that the service started correctly.

level=info ts=2020-03-01T10:06:53.917Z caller=main.go:172 msg="Accepting StatsD Traffic" udp=:9125 tcp=:9125 unixgram=
level=info ts=2020-03-01T10:06:53.917Z caller=main.go:173 msg="Accepting Prometheus Requests" addr=:9102

You'll see two separate endpoints: one where the exporter accepts StatsD traffic (port 9125) and another where it serves scrape requests from Prometheus (port 9102).

Prometheus

The final piece of our metrics stack is Prometheus. Create the prometheus.yml file.
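
A configuration along these lines should do (a sketch: the job name and target match the labels you'll see on the scraped metrics later in this post, and the host label is an assumption about where the statsd-gcp label comes from):

global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'statsd_metrics'
    static_configs:
      - targets: ['prom-statsd-exporter:9102']
        labels:
          host: 'statsd-gcp'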

Start the Prometheus container.

docker run -d --name prometheus \
--network metrics-bridge \
-p 9090:9090 \
-v $PWD/prometheus.yml:/tmp/prometheus.yml \
prom/prometheus --config.file=/tmp/prometheus.yml --web.listen-address '0.0.0.0:9090'

Check the logs to confirm that Prometheus is up and listening.

level=info ts=2020-03-01T11:01:52.078Z caller=main.go:630 msg="Server is ready to receive web requests."

If external access is enabled for your VM, you can reach the Prometheus web UI via the VM's external IP (or whatever mechanism you normally use to expose services externally).

http://<VM-EXTERNAL-IP>:9090/graph

Now let's test whether our metrics rig is working. We'll push some dummy metrics to the StatsD endpoint; they should be exported to Prometheus, and we can then view them in the Prometheus web UI.

echo -n 'statsd_dataflow.rejected.step_1.reason.missing_key:1|c' | nc -u -q0 localhost 9125
echo -n 'statsd_dataflow.rejected.step_1.reason.missing_key:1|c' | nc -u -q0 localhost 9125
echo -n 'statsd_dataflow.rejected.step_1.reason.missing_key:1|c' | nc -u -q0 localhost 9125
echo -n 'statsd_dataflow.rejected.step_1.reason.missing_key:1|c' | nc -u -q0 localhost 9125

The commands above push a counter named statsd_dataflow.rejected.step_1.reason.missing_key and increment it by 1 on each execution, so its final value should be 4. You can verify this from the Prometheus web UI.

You can also check the value from the command line by querying the StatsD Exporter's metrics endpoint.

curl localhost:9102/metrics

This should show you our new counter.
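
Assuming a mapping like the one sketched earlier, the exporter output should contain something along these lines (without a matching mapping the exporter would expose the counter under its raw, dot-flattened name instead):

# TYPE rejected_events counter
rejected_events{reason="missing_key",step="step_1"} 4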

Now that our metrics collection stack is working, let's implement the second part: configuring Dataflow to push metrics to StatsD.

2. Getting Dataflow to push events to StatsD

We'll use Datadog's StatsD Java client to push the metrics. You can find the complete pipeline setup and code in my GitHub repo; clone the repository and run it using Maven.

mvn compile exec:java -Dexec.mainClass=App -Dexec.args="--project=<YOUR-PROJECT-ID> --runner=DataflowRunner --statsDHost=<INTERNAL-IP-OF-YOUR-STATSD-HOST> --statsDPort=9125 --outputPath=gs://YOUR-BUCKET/dataflow-statsd-integration/valid_json"
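
The --statsDHost, --statsDPort and --outputPath flags are custom pipeline options. A minimal options interface along these lines would expose them (a sketch; the interface and method names are illustrative, not necessarily those used in the repository):

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;

public interface StatsDOptions extends DataflowPipelineOptions {

    @Description("Internal IP or hostname of the StatsD exporter")
    String getStatsDHost();
    void setStatsDHost(String value);

    @Description("UDP port the StatsD exporter is listening on")
    @Default.Integer(9125)
    Integer getStatsDPort();
    void setStatsDPort(Integer value);

    @Description("GCS path where valid JSON messages are written")
    String getOutputPath();
    void setOutputPath(String value);
}

The pipeline would then read these flags with PipelineOptionsFactory.fromArgs(args).withValidation().as(StatsDOptions.class).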

If you run into errors while launching the Dataflow job, refer to my earlier blog post to check that everything is configured correctly on your machine.

Let’s walk through the main configuration steps below.

Initializing the StatsD client in the DoFn's setup method:

@Setup
public void startup() {
    this.Statsd = new NonBlockingStatsDClient("statsd_dataflow", statsDHost, statsDPort);
}

Note the prefix parameter in the NonBlockingStatsDClient constructor. The prefix statsd_dataflow is prepended to every metric pushed by this client, so the final metric name becomes statsd_dataflow.<YOUR-METRIC-NAME>. That's why the match pattern in our statsd_mapping.yml file starts with statsd_dataflow.

Incrementing the counters in processElement:

this.Statsd.incrementCounter("rejected.mandatory_check.reason.missing_mandatory");
...
this.Statsd.incrementCounter("rejected.valid_check.reason.extra_keys");
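
Putting these pieces together, a DoFn along the following lines captures the overall shape (a condensed sketch, not the repository's code verbatim: the validation logic, field names and the org.json dependency are illustrative):

import com.timgroup.statsd.NonBlockingStatsDClient;
import com.timgroup.statsd.StatsDClient;
import org.apache.beam.sdk.transforms.DoFn;
import org.json.JSONObject;

public class ValidateJsonFn extends DoFn<String, String> {

    private final String statsDHost;
    private final int statsDPort;

    // The client wraps a UDP socket, so it must not be serialized with the DoFn;
    // keep it transient and create it per worker in @Setup.
    private transient StatsDClient statsd;

    public ValidateJsonFn(String statsDHost, int statsDPort) {
        this.statsDHost = statsDHost;
        this.statsDPort = statsDPort;
    }

    @Setup
    public void setup() {
        // "statsd_dataflow" is prepended to every metric name pushed by this client.
        this.statsd = new NonBlockingStatsDClient("statsd_dataflow", statsDHost, statsDPort);
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            JSONObject json = new JSONObject(c.element());
            if (!json.has("mandatory_field")) {  // illustrative mandatory-field check
                statsd.incrementCounter("rejected.mandatory_check.reason.missing_mandatory");
                return;
            }
            c.output(c.element());
        } catch (Exception e) {
            // Payload was not parseable as JSON at all.
            statsd.incrementCounter("rejected.mandatory_check.reason.invalid_json");
        }
    }

    @Teardown
    public void teardown() {
        if (statsd != null) {
            statsd.stop();
        }
    }
}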

3. Tying it all together.

The sample pipeline processes 3 JSON messages, 2 of which are invalid. Once you run the pipeline you should see the metrics appear in Prometheus.

According to our statsd_mapping.yml configuration, the StatsD counters rejected.mandatory_check.reason.missing_mandatory and rejected.valid_check.reason.extra_keys will be converted to the following:

rejected_events{host="statsd-gcp",instance="prom-statsd-exporter:9102",job="statsd_metrics",reason="extra_keys",step="valid_check"}
rejected_events{host="statsd-gcp",instance="prom-statsd-exporter:9102",job="statsd_metrics",reason="missing_mandatory",step="mandatory_check"}

You can verify this from the Prometheus web UI or the exporter's /metrics endpoint.

Rejected events metrics

Once you have your custom metrics in Prometheus, you can use Grafana to create charts and alerts on top of them.
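
For example, a PromQL expression like the one below (assuming the rejected_events metric name from the mapping sketched earlier) could back a Grafana panel or alert showing the rejection rate broken down by reason:

sum by (reason) (rate(rejected_events[5m]))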

And there you have it. A working implementation of custom Prometheus metrics in a Dataflow pipeline.

If you find any issues in the code or have any questions, please feel free to drop a comment below.

Till then, Happy Coding!
