Observability Using Abstracted IO

Written By: James Novino

I recently published a post about how the Order Management System (OMS) started using abstractions to deal with IO. These abstractions enable us to wrap metrics and logging around all our IO interactions to have consistent monitoring/instrumentation. This post is intended to go into a little more detail about why and how we deal with metrics/monitoring on the Order Management System (OMS).

Philosophy

Monitoring tends to be a reasonably opinionated topic, especially amongst software engineers. On the OMS team we view monitoring as having two distinct purposes:

  • KPIs (Key Performance Indicators): These are business-level metrics that we use to monitor system health, e.g., Orders Created, Shipments, Cart Checkout to Order History, etc.
  • Technical: These are metrics/dashboards that are intended to provide telemetry information that is valuable for engineers.

All our monitoring is built using one or more of these concepts:

  • Logs — Used for debugging, troubleshooting, errors, etc. We utilize Splunk heavily for this sort of capability. Note: Not going to talk much about these in this article.
  • Events — A raw measurement or instrumentation that has additional attributes. For example, workflow started, order created, service started, etc.
  • Metrics — An aggregation of events. For example, a counter sums event occurrences, a latency histogram buckets completion-event latencies, etc.
  • Tracing — A trace is the story of a transaction or workflow as it propagates through a (potentially distributed) system. Traces are constructed through a series of Events.

Note: Tracing is accomplished by propagating headers/metadata between transactions; that implementation is not discussed in this post. Our distributed tracing follows the OpenTracing specification, which makes it compatible with external tooling for generating spans.

Motivation

With traditional static deployments, where we deploy only a handful of services to a single server, monitoring is relatively straightforward: the health of the system can be determined by instrumenting the underlying server/host. However, with the rise of highly elastic container-based platforms like HashiCorp Nomad and Kubernetes, the system becomes more distributed, more dynamic, and more complex, which makes instrumentation and monitoring challenging.

Historically, the way we would monitor system health was by correlating logs through Splunk from multiple services and hosts. We would derive metrics from structured Splunk logs, but this had some issues:

  • It is challenging to track and diagnose issues through distributed systems, because logs need to be correlated across multiple services.
  • The required number of logs to derive all the required metrics was very large and continued to grow which put pressure on the entire Splunk cluster.
  • Logging to Splunk from some environments (web apps) is not always reliable, especially with high log volume.
  • Log concatenation/truncation under high log volume leads to incorrect monitoring/alerting.
  • Splunk retention is finite mainly due to cost implications. (Other metrics solutions provide some novel approaches to archiving data to cold storage)

In our ideal world, Splunk logs would be used only for errors and troubleshooting, mainly by engineers. For everything else, we would utilize dedicated monitoring/alerting platforms like New Relic, Azure App Insights, Prometheus or some other metrics platform.

The OMS.Infrastructure library provides users with abstractions to deal with IO, Service Boilerplate, and Consumption, some of which have been detailed in earlier posts.

Having our services handle external interactions through a common interface allowed us to inject metrics and monitoring capability quite easily. To give consumers a robust metrics/monitoring solution, users needed the ability to enable and configure multiple metrics sinks without having to instrument any of their external interactions. Our library also needed to provide some additional interfaces to allow users to produce their own custom metrics. We determined that any generic monitoring and alerting design would need to support both structured events and metrics, so we needed to define what each would be used for.

We determined that any generic alerting/monitoring design needed to meet the following requirements:

  • A single event/metric interface that supports all our current metrics solutions; this event/metric must be extensible enough to support any future metric solutions. The library needs to support converting custom events into a metric. Note: This is done by buffering custom events and then aggregating/averaging the value before pushing the metric.
  • The ability to configure multiple metrics sinks without having to write any metric code into their source code. Clients should not have to add any custom instrumentation around external interactions.
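
The buffering mentioned in the first requirement can be sketched roughly as follows. This is only an illustration under assumptions, not the OMS.Infrastructure implementation: the trimmed-down Event and AggregatedMetric records and the aggregate function are hypothetical names invented for this example.

```fsharp
// Hypothetical, trimmed-down shapes used only for this sketch.
type Event = { group : string; label : string; value : int64 }
type AggregatedMetric = { group : string; label : string; average : float; count : int }

// Collapse a buffer of events into one averaged metric per (group, label) pair.
let aggregate (buffer : Event list) : AggregatedMetric list =
    buffer
    |> List.groupBy (fun e -> e.group, e.label)
    |> List.map (fun ((group, label), events) ->
        { AggregatedMetric.group = group
          label = label
          average = events |> List.averageBy (fun e -> float e.value)
          count = List.length events })

// Usage: three buffered latency events become two metrics.
let buffered : Event list =
    [ { Event.group = "ExternalInteractions"; label = "SQL"; value = 10L }
      { Event.group = "ExternalInteractions"; label = "SQL"; value = 30L }
      { Event.group = "ExternalInteractions"; label = "Kafka"; value = 5L } ]

let metrics = aggregate buffered
```

Averaging is only one possible aggregation; a sink could equally sum counters or keep a histogram, depending on what the backing store expects.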

Based on these requirements, we came up with a design that has two modules in the OMS.Infrastructure library: a Metrics module and an additional Statistics module. I’ll go into more detail about what each of these contains later.

Knowing that we needed to support multiple metrics stores, we needed to design a Metric and a CustomEvent that were generic enough to be adapted for future use cases and extensible enough that OMS.Infrastructure users would not have to make modifications when event shapes changed. We came up with the following definitions:

type Metric = {
    group : string
    category : Category
    label : string
    value : int64
    correlationIds : string list
    tag : string
}

type CustomEvent = {
    [<Newtonsoft.Json.JsonProperty("eventType")>]
    collection : string
    group : string
    category : string
    label : string
    value : int64 // in milliseconds
    correlationId : string
    correlationId2 : string
    correlationId3 : string
    timestamp : int64 // unix timestamp
    tag : string
    service : string
    // environmental tags
    host : string
    env : string
}

These two types were designed to represent all of our current use cases for metrics/events. Note: For this post, I will use the terms metrics and events interchangeably, as Metrics can be converted into Events. The conversion is done by the backing-store-specific implementation; for instance, with New Relic we buffer events into a sequence that is aggregated into metrics before being pushed. Some examples of common events:

External Interaction Latency

    collection : OMS | MSP | OMO
    group : ExternalInteractions
    category : Latency
    label : Infrastructure Label (SQL, Cosmos DB, Kafka, etc.)
    value : latency (ms)
    correlationId : ...
    correlationId2 : ...
    correlationId3 : ...
    timestamp : UnixTimestamp
    tag : Read/Write
    service : Host Process
    host : Host Name
    env : Environment (Dev | QA | Prod)

Process Times for Decode, Handle, Interpret

    collection : OMS | MSP | OMO
    group : Microservice
    category : Latency
    label : microservice name
    value : latency (ms)
    correlationId : ...
    correlationId2 : ...
    correlationId3 : ...
    timestamp : UnixTimestamp
    tag : decode | handle | interpret | total
    service : Host Process
    host : Host Name
    env : Environment (Dev | QA | Prod)

Service Restarts

    collection : OMS | MSP | OMO
    group : Microservice | API
    category : Counter
    label : Service Name
    value : 1 // for each restart
    correlationId : ...
    correlationId2 : ...
    correlationId3 : ...
    timestamp : UnixTimestamp
    tag : Empty
    service : Host Process
    host : Host Name
    env : Environment (Dev | QA | Prod)

Note: MSP (Member Services Portal), OMO (Order Management Operations)
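
To make the relationship between the two record types concrete, here is a hypothetical conversion from a Metric to a CustomEvent. The post does not show the Category type or the real conversion, so the Category union, the toCustomEvent function, the choice of seconds for the timestamp, and the way correlation ids are spread over the three flat fields are all assumptions made for this sketch. The JSON attribute is omitted so the example has no package dependency.

```fsharp
open System

// Assumed shape: the post does not show the Category type.
type Category = Latency | Counter

type Metric =
    { group : string
      category : Category
      label : string
      value : int64
      correlationIds : string list
      tag : string }

type CustomEvent =
    { collection : string
      group : string
      category : string
      label : string
      value : int64
      correlationId : string
      correlationId2 : string
      correlationId3 : string
      timestamp : int64
      tag : string
      service : string
      host : string
      env : string }

// Hypothetical conversion: spread up to three correlation ids across the flat fields.
let toCustomEvent (collection : string) (service : string) (env : string) (m : Metric) : CustomEvent =
    let idAt i = m.correlationIds |> List.tryItem i |> Option.defaultValue ""
    { CustomEvent.collection = collection
      group = m.group
      category = string m.category
      label = m.label
      value = m.value
      correlationId = idAt 0
      correlationId2 = idAt 1
      correlationId3 = idAt 2
      timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds() // assumed to be seconds
      tag = m.tag
      service = service
      host = Environment.MachineName
      env = env }

// Usage: a "Service Restarts" metric turned into an event.
let restartMetric : Metric =
    { Metric.group = "Microservice"
      category = Counter
      label = "OrderService"
      value = 1L
      correlationIds = [ "order-123" ]
      tag = "" }

let restartEvent = restartMetric |> toCustomEvent "OMS" "OrderService" "Dev"
```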

The examples above represent only a small sample of the different types of Metrics that we need in order to have system visibility and monitoring. To have a single metric module that provides all the required interfaces without leaking any details of the underlying implementation, two writer interfaces were created:

type MetricWriter = Metric -> Async<unit>
type EventWriter = CustomEvent -> Async<unit>

All of the underlying infrastructure implementations for writing metrics need to adhere to the above signatures in order to integrate with the common interfaces provided by the Metrics module.
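
As an illustration of what adhering to these signatures means, the sketch below defines two toy writers. Neither is part of OMS.Infrastructure: consoleWriter and inMemoryWriter are hypothetical, and the Metric record is trimmed to three fields to keep the example short.

```fsharp
open System.Collections.Concurrent

// Trimmed-down Metric for the sketch; the real record has more fields.
type Metric = { group : string; label : string; value : int64 }
type MetricWriter = Metric -> Async<unit>

// A writer that prints to stdout, handy for local debugging.
let consoleWriter : MetricWriter =
    fun m -> async { printfn "%s/%s = %d" m.group m.label m.value }

// A writer that collects metrics in memory, e.g. for unit tests.
let inMemoryWriter (store : ConcurrentQueue<Metric>) : MetricWriter =
    fun m -> async { store.Enqueue m }

// Usage: any function with the right shape can be used as a writer.
let store = ConcurrentQueue<Metric>()
let writer = inMemoryWriter store

writer { group = "Microservice"; label = "OrderService"; value = 1L }
|> Async.RunSynchronously
```

Because a writer is just a function, a new sink can be added without touching the code that produces the metrics.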

Integration with Instrumentation Systems

The Metrics module currently supports multiple sink systems:

  1. New Relic
  2. Azure App Insights
  3. Prometheus
  4. Splunk
  5. SQL

Each of these systems was chosen to handle specific use cases:

  • New Relic — General dashboarding. New Relic Insights (which we use because we are pushing custom metrics/events) is great for dashboarding, as it’s fast and NRQL is powerful enough to provide all the required dashboards.
  • Splunk — Best used for general logging, troubleshooting, debugging, etc. Splunk is powerful but not always best suited to overview dashboards.
  • Azure App Insights — This is powerful tooling similar to New Relic (APM) that allows for monitoring of the full stack. Azure App Insights has a cost associated with each instance so we needed to ensure that consumers were aware of this requirement before enabling it.
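  • Prometheus — Prometheus is a new metrics platform that is intended for metrics only. The challenge with Prometheus is that metric cardinality plays an important role: every distinct combination of label values becomes its own time series, so high-cardinality values such as correlation IDs are a poor fit for labels. As such, it’s not a silver bullet for all our monitoring needs.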
  • SQL — Was integrated so that we could run some more powerful queries for some very specific use cases that dealt with time-variant data that were very challenging to deal with in Splunk or New Relic. For SQL, an interesting thing to note is that metrics are not kept forever. Our design allows us to specify a query which in one case adds the metrics rows and in the other case removes the rows. This means no additional cleanup is needed and anything else that remains indicates an issue.

Metrics Module

The metrics module contains several interfaces for consumers or helper functions to utilize:

  • recordMetric : writer:MetricWriter → metric:Metric → Async<unit>
  • recordEvent : writer:EventWriter → customEvent:CustomEvent → Async<unit>
  • recordMetricAll : metric:Metric → Async<unit>
  • recordEventAll : event:CustomEvent → Async<unit>
  • recordMetricsAll : metrics:Metric list → Async<unit>
  • recordEventsAll : events:CustomEvent list → Async<unit>

The interfaces above push either a Metric or a CustomEvent into any/all configured metric sinks. All the consumer needs to do is create either the Metric or the CustomEvent, and the metrics interface handles calling the corresponding backing-store implementation.
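
One way such a fan-out could work is sketched below. This is an assumption-laden illustration rather than the library's code: the writers registry, the register helper, and the trimmed CustomEvent are hypothetical, and only the function names recordEvent, recordEventAll, and recordEventsAll come from the list above.

```fsharp
open System.Collections.Concurrent

type CustomEvent = { label : string; value : int64 } // trimmed for the sketch
type EventWriter = CustomEvent -> Async<unit>

// Hypothetical registry keyed by sink name.
let writers = ConcurrentDictionary<string, EventWriter>()

let register (name : string) (writer : EventWriter) : unit =
    writers.[name] <- writer

// Record one event with a specific writer.
let recordEvent (writer : EventWriter) (customEvent : CustomEvent) : Async<unit> =
    writer customEvent

// Record one event with every registered writer.
let recordEventAll (customEvent : CustomEvent) : Async<unit> =
    writers.Values
    |> Seq.map (fun w -> recordEvent w customEvent)
    |> Async.Parallel
    |> Async.Ignore

// Record a batch of events with every registered writer.
let recordEventsAll (events : CustomEvent list) : Async<unit> =
    events
    |> List.map recordEventAll
    |> Async.Parallel
    |> Async.Ignore

// Usage: two toy sinks both receive the same event.
let seen = ConcurrentQueue<string>()
register "sinkA" (fun e -> async { seen.Enqueue("A:" + e.label) })
register "sinkB" (fun e -> async { seen.Enqueue("B:" + e.label) })

recordEventAll { label = "WorkflowStarted"; value = 1L }
|> Async.RunSynchronously
```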

The client configures the writers through a series of helper functions:

  • initialize — Creates the writer and places it into the Concurrent Dictionary for use in the Metrics Module
  • Configuration.setConfigObject — Sets the Metrics configuration in the global Configuration store. Note: The configuration module is not discussed in this article but is essentially a large configuration store that provides the ability for users to configure the behavior of the OMS.Infrastructure library.

Clients would configure their metrics sinks by setting the OMS.Infrastructure configuration for metrics in their program startup:

let configureMetrics (additionalMetrics : MetricsConfiguration[]) =
    Configuration.setConfigObject
        Configuration.InfrastructureConfiguration.Metrics
        additionalMetrics

// Keep only the enabled sinks, then register them.
// (Option.ofBool is a helper that is not shown in this post.)
[|
    Option.ofBool splunk splunkMetricsEnabled
    Option.ofBool newRelic newRelicMetricsEnabled
    Option.ofBool generic genericMetricsEnabled
|]
|> Array.choose id
|> configureMetrics

There are two specific configurations, for Splunk and New Relic, and then the third is the generic stream, which supports all our other supporting infrastructure, i.e., SQL, Prometheus, etc. Note: The reason there are specific configurations for Splunk and New Relic is that these two stores were implemented first, before the introduction of the generic interface; this is technical debt that we carry but intend to fix soon. Once that is done, all the metrics configurations will go through the generic interface.

In the above example we are configuring three different metrics sinks, but for each of these sinks we only want to emit specific metrics. We can do this by setting a filter field in the configuration type, i.e., splunkMetricsEnabled, newRelicMetricsEnabled, genericMetricsEnabled. The filter field allows us to specify a predicate MetricsConfig.filter:CustomEvent → bool which controls which metrics/events are published. An example of this filter is shown below:

let stream =
    {
        MetricsConfig.env = Hosting.environment
        eventGroup = ""
        key = None
        system = "OMS"
        filter = fun ce ->
            match ce.label with
            | "WorkflowStarted"
            | "WorkflowCompleted"
            | "WorkflowFailed"
            | "StepEvaluated"
            | "TaskDeferred" -> true
            | _ -> false
    } |> fun m -> MetricsConfiguration.Generic(genericMetricsDefinition, m)

The filter above is saying that only events/metrics with a label equal to “WorkflowStarted”, “WorkflowCompleted”, etc. should be pushed to the metric sink. If I wanted to enable all metrics to be pushed I could do something like this:

filter = fun _ -> true
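
How a sink might apply such a predicate before writing can be sketched as a small wrapper around a writer. The withFilter function, the trimmed CustomEvent, and the in-memory sink are hypothetical and exist only for this example; the post does not show where the library evaluates the filter.

```fsharp
open System.Collections.Concurrent

type CustomEvent = { label : string; value : int64 } // trimmed for the sketch
type EventWriter = CustomEvent -> Async<unit>

// Wrap a writer so it only receives events accepted by the sink's filter.
let withFilter (filter : CustomEvent -> bool) (writer : EventWriter) : EventWriter =
    fun ce -> if filter ce then writer ce else async.Return ()

// Same idea as the match expression above, expressed as a set lookup.
let workflowLabels = set [ "WorkflowStarted"; "WorkflowCompleted"; "WorkflowFailed" ]
let workflowOnly (ce : CustomEvent) = Set.contains ce.label workflowLabels

// Usage: only the workflow event reaches the sink.
let written = ConcurrentQueue<CustomEvent>()
let sink : EventWriter = fun ce -> async { written.Enqueue ce }
let filteredSink = sink |> withFilter workflowOnly

[ { label = "WorkflowStarted"; value = 1L }
  { label = "CacheHit"; value = 1L } ]
|> List.iter (filteredSink >> Async.RunSynchronously)
```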

Setting the configurations above in our service ensures that all our external interactions are appropriately instrumented, because we wrap metrics functionality around all our IO, e.g.:

let readWithSetRetries retryCount (sd : StreamDefinition) =
    read sd
    |> Async.retryBackoff retryCount (transientExceptions sd) Backoff.DefaultExponentialBoundedRandomized
    |> Async.Catch
    |> Helpers.logException log sd.identifier "Read"
    |> Metrics.Latency.external sd.writeLabel "Read"

let readWithRetries (sd : StreamDefinition) =
    readWithSetRetries 10 sd

The Metrics.Latency.external wrapper is simply a helper function used for logging external interaction latencies.
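
The body of that helper is not reproduced here, so the following is only a guess at its general shape: a function that times an async computation and hands the elapsed milliseconds to a recorder. The names timeExternal and LatencyRecorder are hypothetical, and the real helper presumably builds a CustomEvent and records it with the configured writers instead of calling a recorder passed in as an argument.

```fsharp
open System.Collections.Concurrent
open System.Diagnostics

// Hypothetical recorder signature: label -> tag -> elapsed milliseconds.
type LatencyRecorder = string -> string -> int64 -> Async<unit>

// Time an async computation and record how long it took, whether it succeeds or fails.
let timeExternal (record : LatencyRecorder) (label : string) (tag : string) (work : Async<'a>) : Async<'a> =
    async {
        let stopwatch = Stopwatch.StartNew()
        let! result = Async.Catch work
        stopwatch.Stop()
        do! record label tag stopwatch.ElapsedMilliseconds
        match result with
        | Choice1Of2 value -> return value
        | Choice2Of2 ex -> return raise ex // rethrow after the latency was recorded
    }

// Usage: collect recorded latencies in memory instead of pushing them to a sink.
let recorded = ConcurrentQueue<string * string * int64>()

let recorder : LatencyRecorder =
    fun label tag ms -> async { recorded.Enqueue((label, tag, ms)) }

let readResult =
    async {
        do! Async.Sleep 20
        return 42
    }
    |> timeExternal recorder "SQL" "Read"
    |> Async.RunSynchronously
```

Taking the computation as the last argument is what lets the wrapper sit at the end of a pipeline, as in readWithSetRetries above.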

In addition to the helper functions shown above, there is a similar Counter module that is used for instrumenting count-based metrics. The combination of configuration by clients and abstracted metrics interfaces allows us to have a robust mechanism for pushing metrics through OMS.Infrastructure.

Statistics Module

The Statistics module is similar to the Inputs/Outputs modules, which were discussed in Abstracting IO using F#. The Statistics module is an interface that utilizes the underlying infrastructure implementations to support publishing aggregated metrics.

The implementation of the Statistics module is very similar to the Inputs/Outputs modules discussed in Abstracting IO using F#: it matches on the Stream Definition’s schema, pulls metrics from the underlying backing-store implementation, and then pushes them to all configured metrics sinks. The Statistics module contains a single interface, publish: sd:StreamDefinition → Async<unit>, which allows consumers to publish statistics/metrics on a regular cadence for selected infrastructure.

For example, when passed a Service Bus Stream Definition, the Statistics module publishes distinct metrics for the counts of Active Messages, Scheduled Messages, and DeadLettered Messages.
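
The Service Bus case can be sketched as follows. Everything here is a stand-in: the post does not show StreamDefinition, the backing-store clients, or the metric names, so the union cases, QueueCounts, toMetrics, and the fetchCounts and push parameters are hypothetical. The real publish takes only a StreamDefinition; the extra parameters exist so the sketch can run without any infrastructure.

```fsharp
open System.Collections.Concurrent

// Hypothetical stand-ins for the real stream definition and backing-store reading.
type StreamDefinition =
    | ServiceBus of queueName : string
    | Other of name : string

type QueueCounts = { active : int64; scheduled : int64; deadLettered : int64 }

// Turn one reading into (label, value) pairs, one per published metric.
let toMetrics (queueName : string) (counts : QueueCounts) : (string * int64) list =
    [ queueName + ".ActiveMessages", counts.active
      queueName + ".ScheduledMessages", counts.scheduled
      queueName + ".DeadLetteredMessages", counts.deadLettered ]

// Match on the stream definition, pull counts, and push each metric.
let publish
    (fetchCounts : string -> Async<QueueCounts>)
    (push : string * int64 -> Async<unit>)
    (sd : StreamDefinition)
    : Async<unit> =
    async {
        match sd with
        | ServiceBus queueName ->
            let! counts = fetchCounts queueName
            for metric in toMetrics queueName counts do
                do! push metric
        | Other _ ->
            // Nothing to publish for definitions this sketch does not know about.
            return ()
    }

// Usage with a stubbed backing store and an in-memory sink.
let pushed = ConcurrentQueue<string * int64>()
let stubCounts _ = async { return { active = 7L; scheduled = 2L; deadLettered = 1L } }
let stubPush metric = async { pushed.Enqueue metric }

publish stubCounts stubPush (ServiceBus "orders") |> Async.RunSynchronously
```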

Note: Statistics is perhaps not the best name for the module, as it’s not pulling any statistics; it’s mainly an abstracted interface to push Metrics/CustomEvents from backing stores.

The typical use case is for clients to create a service that is responsible for fetching/publishing metrics for selected pieces of infrastructure. This can be accomplished by doing something like this:

let rec start () =
    AsyncSeq.intervalMs (intervalInMs)
    |> AsyncSeq.iterAsync (fun _ -> async {
        return!
            sds
            |> Array.map (fun sd -> async {
                do! Statistics.publish sd })
            |> Async.ParallelThrottled 16
            |> Async.Ignore
    })

We primarily use this to augment the infrastructure metrics that are published to run more advanced analytics. We also use this service to aggregate and publish some custom system health metrics. These health metrics are important, as they allow us to build some very advanced alerting and monitoring. Our service is affectionately named TheOneThatChecksTheOtherServices and publishes metrics for infrastructure as well as some additional system health metrics.

Conclusion

The ability to abstract over individual metric sinks has provided us with a lot of value in recent history as we have been evaluating/migrating between different metrics solutions. This abstraction allows us to test the capabilities of each metric solution with little to no changes to the actual services, as the metrics are entirely controlled through configuration/stream definitions. The abstractions/interfaces discussed above simplify our client code, since each client doesn’t need to worry about handling metrics for interactions with things like external infrastructure. However, what this post doesn’t talk about is how we aggregate all the information across multiple metrics solutions to provide a comprehensive monitoring/alerting system. That is the topic of a future post about how we deal with monitoring/alerting at Jet.

If you like the challenges of building distributed systems and are interested in solving complex problems, check out our job openings.


Acknowledgments

Thanks to Krishna Vangapandu, Leo Gorodinski and all of the other Jet engineers for the comments, edits, and suggestions.