Photo credit: katkaZV

Performance Monitoring with Riemann and Clojure

The eReceipts and Grocery Data Services team at @WalmartLabs is responsible for building, maintaining and operating critical back-end data services on a Clojure-based technology stack. Walmart’s store and online customers depend upon our services to provide a smooth, reliable and quick shopping experience, and we have made uptime and performance bedrock considerations of every system we’ve built. Our team of engineers leans very heavily on Riemann to monitor the internal state of our systems and to help us find, diagnose and potentially escalate or intervene in degraded performance situations before they turn into customer-impacting outages.

What is Riemann?

Riemann is a low-latency event stream processing system, and as such its core domain is in terms of discrete events and event streams. This approach differs from some conventional monitoring systems in that servers and static infrastructure are emergent concerns, as opposed to fundamental concerns, which makes it ideally suited to a dynamic, highly distributed environment.

How does it work?

Riemann receives events in the form of protocol buffers via UDP or TCP. In our case, these events are structured log entries from our applications or ad-hoc events generated by other active monitoring systems. Once these events are received, Riemann passes them into the event “stream” which is part of configuration and is responsible for filtering, aggregating and handling them.

Riemann configuration is actually just a Clojure program, the majority of which consists of event streams. Event streams are configured using the Riemann streams function, which expects to be passed one-argument functions that will be invoked for every received event. Riemann documentation speaks of this in terms of a “flow” of events that emit from the top-level streams function.

For example, here is a very minimal Riemann configuration:

(defn my-event-handler
[event]
(println “Handling event:” event))
(streams my-event-handler)

Since configuration is just code and event streams are just functions, Riemann is quite literally infinitely flexible. It provides a rich library of composable functions which comprise a stream-processing DSL and enable you build elaborate monitoring scenarios without necessarily possessing a comprehensive knowledge of Clojure. Philosophically, this makes Riemann more like a framework or library than conventional monitoring software. Events are just arbitrary structs (with the exception of a few standard, idiomatic fields), so the semantics of an event are almost entirely contingent upon the implementation of streams and not the data represented in the event itself. This means that Riemann itself won’t do anything with the event; it’s completely up to the stream configuration to handle it.

As a more complex example, if you wanted to create a Riemann configuration that watches for error events and sends an email about them at most once every sixty seconds you could do something like this:

(def email (mailer {:host “mail.relay”
:from “riemann@example.com”}))
(streams
(where (and (= (:service event) “my-service”)
(= (:level event) “ERROR”))
(rollup 1 60
(email “ops@example.com”))))

In this example, where, rollup and mailer are all provided by the Riemann stream DSL. where is a Clojure macro that returns a function which will filter out events according to the provided logical predicates and then pass those events on down to the rollup function, which returns a function that will accumulate events and pass them down to the stream created by email at most once every sixty seconds. email is a function that returns a function which will format one or more events into an email notification and send it to the provided address.

Admittedly this is a lot of “functions returning functions that return functions”, but in the end we’ve only really had to think about two fundamental concepts: events and streams, or in Clojure terms, maps and functions. This kind of simplicity and flexibility is one of the most compelling things about Riemann, and it has allowed us to build out sophisticated monitoring within the constraints of our existing architecture.

Performance Metrics in Riemann

An important part of the initial activity in designing any business-critical system is thoroughly understanding performance and uptime requirements, which ultimately must be defined in terms that can be measured. Often these requirements take the form of objectives, such as “the 99th percentile of response times for a web service should be less than 100 milliseconds”, or “95% of all HTTP requests should be successful.” To measure our performance against these kinds of objectives, we’ve built a declarative performance threshold DSL within our Riemann configuration. It allows us to establish thresholds through configuration and to source metrics directly from events already flowing through Riemann.

In our DSL, we consider two different fundamental types of performance metrics: percentiles and histograms. Percentiles apply to numeric measured values (e.g. response times) and can be defined at any arbitrary point, whereas histograms apply to discrete values (e.g. HTTP response status codes) and take the form of percentage or counter thresholds. Both histograms and percentiles are defined across an interval, which is the number of seconds within the discrete time window in which events will be considered together.

As an example, given events that look something like this:

{:service "query-service response"
:metric 85
:status-code “200”}

We represent performance objective thresholds in a separate EDN configuration file:

{:percentiles {"query-service response" {0.99 {:<= 100 
:interval 600}}}
:histograms {"query-service response"
{:status-code {"^2[0–9]{2}$" {:>= 0.95
:interval 600
:type :percentage}
".*" {:> 0
:interval 300
:type :count}}}}}

In this snippet, we are tracking 3 different performance thresholds:

  1. The 99th percentile of response times should be less-than-or-equal-to 100 over an interval of 10 minutes. The value used for comparison will be associated with the :metric key on the event (which is a Riemann idiom for events associated with a number).
  2. 95% of all response events with a :status-code should have a status code of 2XX over an interval of 10 minutes.
  3. The count of all response events with a :status-code should be greater than zero over a period of 5 minutes.

Our Riemann configuration transforms the above threshold configuration map into a set of streams that filter relevant events, accumulate values and compare them with the expected threshold. When the state of a given threshold changes from acceptable to unacceptable, or back again, then the stream will pass the change event along to notification streams which send or clear alerts in our external alerting systems.

Our configuration generates these streams from EDN threshold configuration files, but a static (and admittedly naive) version of one of the above thresholds might look like this:

(percentiles 600    ;; interval
[0.99] ;; percentile
(smap (fn [event]
(assoc
event
:state (if (<= (:metric event) 100)
"ok"
"violation")))
(changed :state {:init "ok"}
(where (= (:state event) "ok")
notify-restored
(else
notify-violated)))))

In this example, we’re using four functions/macros provided by the Riemann DSL:

  1. percentiles — Aggregates events over 600 second intervals, and passes the event with the :metric value at the 99th percentile rank into the next stream, which in this case is smap.
  2. smap — This is a streaming map, which means it maps the function provided as the first argument over every event before passing it to the next stream. In this case the next stream is changed.
    The mapped function assigns a value to the event :state property, which is either "ok" or "violation", conditional on if the :metric value of the event is less than or equal to 100. Since this stream is beneath the percentile stream, it will only receive events that represent the 99th percentile metric value of any event processed by percentile.
  3. changed — This stream filters out every event except for those events where the specified property (in this case :state) has changed from the last event it saw. The {:init “ok"} argument is just an option map that initializes the state of the changed stream. This allows us to only send notifications when the performance metric threshold is crossed in one direction or the other, and prevents a notification from being sent for the first event if the state is "ok".
  4. where— This stream simply calls the appropriate notify-* stream based on the state, which in our contrived example are assumed to be functions defined somewhere else in the configuration.

Summary

In the end, we like Riemann because it stays out of the way and allows us to focus our energy on deciding what to track instead of how to track it. The biggest challenge then becomes identifying the metrics, thresholds and intervals that reflect stable and nominal operating values to ensure the purity, and therefore serviceability, of signals that result from threshold violations. Our team has defined thresholds for our systems such that they represent strong indicators for system health, and we’ve integrated them with our active alerting system to automatically manage incidents and notify on-call engineers when these values fall in and out of range. This has given us a framework with which to smoothly automate our error detection heuristics without requiring heavy investments in conventional monitoring integrations, and it has liberated us from the vicious cycle of checking dashboards and being notified of problems from upstream or external sources.