How Hulu Uses InfluxDB and Kafka to Scale to Over 1 Million Metrics a Second
By Samir Jafferali, Senior Systems Engineer
As Hulu continues to grow, time series data has become a critical part of how we monitor various pieces of information over a period of time. This can be as simple as machine performance metrics or data about our applications themselves. Due to the amount of data we have, creating an architecture that can handle our growth in a redundant, supportable and scalable way is crucial.
Why is Time Series Data Important?
Time series data allows us to evaluate trends in order to identify issues and act against them.
The below graph was recently used to identify a memory leak affecting a version of an application running out of a specific datacenter.
Initially, each development team had their own solution for time series data. This was inconsistent and wasteful of resources since most teams had similar needs. To solve for this, we built our original time series data pipeline that provided a time series database for teams across our engineering organization.
This pipeline was based on Graphite and stored data to OpenTSDB. At peak, the throughput for this Graphite cluster was 1.4 million metrics per second. While maintaining this pipeline, we experienced a number of issues that only became more prevalent as we continued to scale.
Many of these issues originated from providing the pipeline as a shared service for all development teams at Hulu. Others were inherent issues with scalability due to the tremendous throughput required and cardinality we needed to support.
Challenges Encountered with Initial Pipeline
Quite often and unintentionally, users would send unique data in their metric namespaces, such as a timestamp or another unique identifier. Examples of such metrics include:
stats.application.dc1.1557471341.count 1 1557471341
stats.application.dc1.1557471345.count 1 1557471345
stats.application.dc1.1557471346.count 1 1557471346
This resulted in an explosion of cardinality in our namespaces, which affected ingestion speed and caused stability issues for the entire pipeline. Though it was possible to block these, it was very difficult to do so since it required adding a conditional that matched the problematic metric without affecting legitimate metrics.
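To make the cardinality problem concrete, here is a minimal Python sketch that counts distinct series in the sample lines above, before and after masking timestamp-like path segments. The rule that a segment of 9 to 10 digits is an epoch timestamp is an assumption made for illustration; it is not how the original pipeline detected these metrics.

```python
import re

# Assumption for illustration: a path segment of 9-10 digits is an epoch timestamp.
EPOCH_SEGMENT = re.compile(r"^\d{9,10}$")


def series_name(line: str) -> str:
    """Return the metric path from a Graphite plaintext line: '<path> <value> <timestamp>'."""
    return line.split()[0]


def normalize(path: str) -> str:
    """Replace epoch-like segments with a placeholder so equivalent series collapse."""
    return ".".join(
        "<ts>" if EPOCH_SEGMENT.match(segment) else segment
        for segment in path.split(".")
    )


lines = [
    "stats.application.dc1.1557471341.count 1 1557471341",
    "stats.application.dc1.1557471345.count 1 1557471345",
    "stats.application.dc1.1557471346.count 1 1557471346",
]

raw_series = {series_name(line) for line in lines}
normalized_series = {normalize(name) for name in raw_series}

print(len(raw_series))         # 3: every data point created a new series
print(len(normalized_series))  # 1: a single series once the timestamp segment is masked
```

Every data point creates a brand new series, so the number of series grows with time instead of with the number of things being measured.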
Due to our throughput, we were limited in the complexity and number of rules we could create, since every rule had to be evaluated against every metric received. Rule changes also had to be rolled out across the many nodes involved in processing our metric data. Because of this, we rarely added conditionals to block problematic metrics and instead asked the owners of the offending application to stop sending them. Unfortunately, the time this took caused data loss in most cases.
When retrieving data, many users would inadvertently run long or resource-intensive queries, which would eventually cause the backend serving them to time out and go offline. There were no limits available to stop this behavior, and this also affected the stability of Graphite as a whole.
There was also no accountability for the metrics that were sent. Since the format was not standardized, finding out which service was sending a specific metric required much guesswork and was very difficult when we actually needed to do this.
Lastly, due to our setup, every metric was sent to only one of our two datacenters. In the event of a failure, the metrics in that entire datacenter would be inaccessible. Additionally, since we had a single unified interface for retrieving these metrics, we prioritized one datacenter over the other. This was problematic because if a user sent a metric to the first-priority datacenter but later decided to use the other, their new metrics would not be accessible since the namespace already existed in the first one, leading to much confusion.
InfluxDB — Initial Architecture
To address these issues, we decided to re-architect our stats pipeline from scratch based on InfluxDB. For our first attempt, we created two clusters, one in each of our two primary datacenters (DC1 and DC2). Both of these clusters would contain the same data.
A metric relay cluster was built on top of this, with one such layer in each datacenter. All metrics would be sent to a relay cluster, whose sole purpose was to push every metric received to both of our InfluxDB clusters. This allows all metrics to be retrievable from either datacenter, eliminating the metric availability issues that we previously experienced with our Graphite architecture.
On this metric relay layer, we also implemented a required tag, which is an identifier unique to every application at Hulu. This allowed us to easily trace back where each metric originated from. Any metrics that did not have this required tag were dropped on this layer completely.
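The required-tag check can be sketched in a few lines of Python. This is an illustration only: the tag name `app_id` is hypothetical (the actual tag is not named here), and the parser is deliberately simplified, assuming InfluxDB line protocol points with no escaped spaces or commas in the measurement or tags.

```python
from typing import Dict, Iterable, List

REQUIRED_TAG = "app_id"  # hypothetical tag name, chosen for illustration


def parse_tags(line: str) -> Dict[str, str]:
    """Extract the tag set from a line-protocol point.

    Simplification: assumes no escaped spaces or commas in the measurement or tags.
    """
    head = line.split(" ", 1)[0]   # "measurement,tag1=value1,tag2=value2"
    parts = head.split(",")[1:]    # drop the measurement name
    tags: Dict[str, str] = {}
    for part in parts:
        key, sep, value = part.partition("=")
        if sep and key and value:
            tags[key] = value
    return tags


def filter_points(lines: Iterable[str]) -> List[str]:
    """Keep only points that carry the required tag; everything else is dropped."""
    return [line for line in lines if REQUIRED_TAG in parse_tags(line)]


points = [
    "cpu,app_id=player,host=a usage=0.5 1557471341000000000",
    "cpu,host=a usage=0.5 1557471341000000000",
]
print(filter_points(points))  # only the first point survives
```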
All machines at Hulu run the Telegraf daemon (https://github.com/influxdata/telegraf). We have this daemon configured to report all machine stats and also listen for metrics on localhost. We encourage all developers to send metrics to localhost, since we have Telegraf configured to automatically add standard tags to every metric it receives. These tags include the originating datacenter, machine name, and machine ID.
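From an application's point of view, sending a metric to the local daemon might look like the Python sketch below. The transport and port are assumptions: it presumes Telegraf is listening for line protocol over UDP on 127.0.0.1:8094, which is not stated above, and the tag name `app_id` is hypothetical. Only float fields are handled, and values are not escaped.

```python
import socket
from typing import Dict

# Assumption: a local Telegraf listener accepting line protocol over UDP on this port.
TELEGRAF_ADDR = ("127.0.0.1", 8094)


def format_point(measurement: str, tags: Dict[str, str], fields: Dict[str, float]) -> str:
    """Build a line-protocol point without a timestamp (the receiver assigns one).

    Simplification: float fields only, and no escaping of special characters.
    """
    tag_part = "".join(f",{key}={value}" for key, value in sorted(tags.items()))
    field_part = ",".join(f"{key}={float(value)}" for key, value in sorted(fields.items()))
    return f"{measurement}{tag_part} {field_part}"


def send_point(line: str) -> None:
    """Send one point to the local daemon; UDP is fire-and-forget."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(line.encode("utf-8") + b"\n", TELEGRAF_ADDR)


line = format_point("requests", {"app_id": "player"}, {"latency_ms": 12.5})
print(line)
# send_point(line)  # uncomment on a machine with a listener configured as assumed
```

The application only supplies its own tag; datacenter, machine name, and machine ID would be added by the daemon.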
This setup worked very well. We tested throughput past two million metrics per second without any issues. However, we quickly ran into an issue that caused us to re-evaluate the current setup.
Specifically, one of our clusters was unavailable for a period of time. During that window, metrics were pushed only to the cluster in the datacenter that was still online and were then discarded without ever being copied to the other cluster. Once the problematic cluster was available again, there was a discrepancy in the data that required manual effort to resynchronize the clusters. We realized we needed a way to handle these types of issues more gracefully and with less manual intervention.
InfluxDB — Altered Architecture
To solve this issue, we added two new layers to the design: a Kafka queue in each datacenter and an InfluxDB writer layer.
Metrics are still sent to the original relay cluster, but from there they are no longer routed directly to our InfluxDB clusters. Instead, they are sent to a local Kafka queue in the datacenter where they are received.
The InfluxDB writer layer was created in each datacenter where an InfluxDB cluster resides. This layer’s sole purpose is to connect to every Kafka queue in all of our datacenters and write the metrics it reads to its local InfluxDB cluster. In the event of a failure of one of the InfluxDB clusters, the “writers” in that datacenter would stop writing to that cluster, while the writers in the other datacenter would continue ingesting metrics into theirs. Once the problematic cluster came back online, the “writers” in that datacenter would pick up where they left off, writing to their own cluster. Once they caught up, both clusters would be in a consistent state again.
This design also allows us to completely disable huge portions of the infrastructure (even entire datacenters) and route their traffic to another datacenter with absolutely no end-user impact.
This design solved many of our issues. However, there were two lingering issues that it did not solve.
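The catch-up behavior comes from each writer tracking its own position in every queue and advancing it only after a successful write. The Python sketch below simulates this with in-memory stand-ins; the `Queue`, `Cluster`, and `Writer` classes are hypothetical and do not use a real Kafka or InfluxDB client.

```python
from typing import Dict, List


class Queue:
    """Stand-in for a Kafka queue in one datacenter: an append-only log."""

    def __init__(self) -> None:
        self.log: List[str] = []

    def append(self, point: str) -> None:
        self.log.append(point)


class Cluster:
    """Stand-in for an InfluxDB cluster that can be taken offline."""

    def __init__(self) -> None:
        self.online = True
        self.points: List[str] = []

    def write(self, batch: List[str]) -> None:
        if not self.online:
            raise ConnectionError("cluster unavailable")
        self.points.extend(batch)


class Writer:
    """Reads every queue and writes to its local cluster, keeping one offset per queue."""

    def __init__(self, queues: Dict[str, Queue], cluster: Cluster) -> None:
        self.queues = queues
        self.cluster = cluster
        self.offsets = {name: 0 for name in queues}

    def run_once(self) -> None:
        for name, queue in self.queues.items():
            batch = queue.log[self.offsets[name]:]
            if not batch:
                continue
            try:
                self.cluster.write(batch)
            except ConnectionError:
                continue  # offset is not advanced, so the batch is retried later
            self.offsets[name] += len(batch)


queues = {"dc1": Queue(), "dc2": Queue()}
influx_dc1, influx_dc2 = Cluster(), Cluster()
writer_dc1, writer_dc2 = Writer(queues, influx_dc1), Writer(queues, influx_dc2)

influx_dc2.online = False                       # DC2's cluster goes down
queues["dc1"].append("cpu,app_id=player usage=0.5")
queues["dc2"].append("cpu,app_id=search usage=0.7")
writer_dc1.run_once()
writer_dc2.run_once()
print(len(influx_dc1.points), len(influx_dc2.points))  # DC1 is current, DC2 is behind

influx_dc2.online = True                        # DC2's cluster recovers
writer_dc2.run_once()
print(sorted(influx_dc1.points) == sorted(influx_dc2.points))  # clusters agree again
```

Because the queue retains the data while a cluster is down, no manual resynchronization is needed once it returns.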
The issues we experienced with users running long or problematic queries were still present. Users would sometimes run a very long query (generally by accident), which would cause performance degradation for other users. To address this, we created a new micro service, Influx Imposer.
This application logs into both of our InfluxDB clusters and checks the running queries every minute. If a query exceeds certain thresholds (such as running for more than 60 seconds) or is a dangerous or resource-intensive query, it is killed. We also implemented logging of the killed queries for debugging purposes and have seen much improved stability after implementation.
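The core decision such a service makes can be sketched as follows. InfluxQL provides `SHOW QUERIES` and `KILL QUERY <qid>`; the sketch assumes the output of `SHOW QUERIES` has already been fetched into dictionaries with `qid`, `query`, and `duration` keys, where the duration is a Go-style string such as `1m5s`. The row shape and function names are illustrative assumptions, and the check for "dangerous" queries is left out because its criteria are not described here.

```python
import re
from typing import Dict, List

MAX_SECONDS = 60.0  # the example threshold mentioned above

_UNIT_SECONDS = {
    "h": 3600.0, "m": 60.0, "s": 1.0,
    "ms": 1e-3, "us": 1e-6, "µs": 1e-6, "ns": 1e-9,
}
_TOKEN = re.compile(r"(\d+(?:\.\d+)?)(ns|us|µs|ms|s|m|h)")


def parse_duration(text: str) -> float:
    """Convert a Go-style duration string (e.g. '1m5s', '72µs') to seconds."""
    tokens = _TOKEN.findall(text)
    if not tokens:
        raise ValueError(f"unrecognized duration: {text!r}")
    return sum(float(number) * _UNIT_SECONDS[unit] for number, unit in tokens)


def kill_statements(rows: List[Dict[str, str]]) -> List[str]:
    """Return one KILL QUERY statement per query running longer than the threshold."""
    statements = []
    for row in rows:
        if row["query"].strip().upper().startswith("SHOW QUERIES"):
            continue  # never kill the monitoring query itself
        if parse_duration(row["duration"]) > MAX_SECONDS:
            statements.append(f"KILL QUERY {int(row['qid'])}")
    return statements


rows = [
    {"qid": "37", "query": "SELECT mean(usage) FROM cpu", "duration": "1m5s"},
    {"qid": "38", "query": "SHOW QUERIES", "duration": "72µs"},
    {"qid": "39", "query": "SELECT last(usage) FROM cpu", "duration": "3s"},
]
print(kill_statements(rows))
```

A real service would run this on a schedule against each cluster, execute the returned statements, and log what it killed.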
Blocking/Filtering “Bad” Metrics
Users could potentially still send metrics with timestamps in the namespace, or some unique data that would cause a very large spike in the cardinality of our dataset. In order to solve this specific issue, we created another micro service, called Barricade.
Barricade is a database-backed API that contains a dynamic blacklist. Any time there is a need to blacklist a specific metric, whether by tag, measurement, or virtually any other piece of information, it is added to this database. Once there, the change is propagated to the local configurations on all of the InfluxDB writer machines. These machines continuously poll Barricade for their updated blacklist rules. If a change is detected, the “writer” regenerates its local blacklist configuration. This is done at the InfluxDB writer layer because changes on this layer do not interrupt metric ingestion, so adding a new blacklist entry has no impact on users.
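The writer side of this polling loop can be sketched in Python as below. The rule format (an optional `measurement` plus a `tags` dictionary that must all match) and the `Blacklist` class are assumptions made for illustration; Barricade's actual API and rule schema are not described here. Fetching the rules over HTTP is omitted, so `refresh` simply receives the list that a poll would return.

```python
import hashlib
import json
from typing import Any, Dict, List


def fingerprint(rules: List[Dict[str, Any]]) -> str:
    """Stable hash of a rule set, used to detect changes between polls."""
    payload = json.dumps(rules, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


class Blacklist:
    """Local copy of the blacklist held by one writer (hypothetical structure)."""

    def __init__(self) -> None:
        self.rules: List[Dict[str, Any]] = []
        self._fingerprint = fingerprint(self.rules)

    def refresh(self, fetched: List[Dict[str, Any]]) -> bool:
        """Replace the local rules if the fetched set differs; return True on change."""
        new_fingerprint = fingerprint(fetched)
        if new_fingerprint == self._fingerprint:
            return False
        self.rules = fetched
        self._fingerprint = new_fingerprint
        return True

    def blocks(self, measurement: str, tags: Dict[str, str]) -> bool:
        """Return True if any rule matches the point's measurement and tags."""
        for rule in self.rules:
            rule_measurement = rule.get("measurement")
            rule_tags = rule.get("tags", {})
            if rule_measurement is None and not rule_tags:
                continue  # ignore empty rules rather than block everything
            if rule_measurement is not None and rule_measurement != measurement:
                continue
            if all(tags.get(key) == value for key, value in rule_tags.items()):
                return True
        return False


blacklist = Blacklist()
polled = [{"measurement": "requests", "tags": {"app_id": "player"}}]
print(blacklist.refresh(polled))   # first poll: rules changed
print(blacklist.refresh(polled))   # second poll: nothing to regenerate
print(blacklist.blocks("requests", {"app_id": "player", "host": "a"}))
```

Comparing fingerprints keeps the common case cheap: most polls find no change, so the writer keeps ingesting with its existing rules.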
Time series data is and will continue to be a crucial part of Hulu’s ability to evaluate trends and react to them. We were able to address all of the issues in our previous pipeline and are now transitioning all users off our legacy platform. As of writing this, our new stats pipeline processes over a million metrics a second and is growing by the day.