Event Stream Analytics at Walmart with Druid

Walmart routinely deals with some of the most complex engineering problems in the world and employs the latest technology to ensure more than 11,000 physical stores and numerous online sites run at peak efficiency. At Walmart Labs, our engineering teams are often called upon to evaluate, benchmark, and make decisions on the latest tools to use. One of the more interesting challenges we faced recently was to provide high performance analytics for our streaming data.

Streams Everywhere

Many data sets at Walmart Labs are generated by our digital business, and are naturally modeled as streams of events. These event streams range from server logs and application metrics to product purchases. Our goal is to make it easy for the right people across our organization to access this data, analyze it, and make decisions in as short a time as possible.

Anyone who has built data stacks for high volume, low latency workloads knows that one technology is rarely sufficient for a complete solution. Event streams at scale pose numerous challenges across data delivery, data processing, and querying. An entire data stack is often required, but thankfully, there are great technologies in the open source community to help solve these problems.

We leverage Apache Kafka to deliver events from where they are created to downstream systems where they can be processed or queried. Kafka has great properties that allow us to decouple event producers from event consumers and ensure our events are always delivered.
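The property that enables this decoupling can be illustrated with a small sketch: a topic behaves like an append-only log, and each consumer tracks its own read offset independently. This is a toy model, not the Kafka API; class and method names here are purely illustrative.

```python
# Illustrative model of a Kafka-style topic: an append-only log with
# per-consumer offsets. Producers append; consumers read at their own
# pace without affecting each other or the producers.
class Topic:
    def __init__(self):
        self.log = []       # events are appended, never overwritten
        self.offsets = {}   # each consumer remembers its own position

    def produce(self, event):
        self.log.append(event)

    def consume(self, consumer):
        offset = self.offsets.get(consumer, 0)
        batch = self.log[offset:]
        self.offsets[consumer] = len(self.log)
        return batch


topic = Topic()
topic.produce({"attribute": "foo", "price": 3})
topic.produce({"attribute": "foo", "price": 4})
batch = topic.consume("enricher")  # both events, in order
```

Because the log is durable and offsets are per-consumer, a slow or newly added consumer can still replay every event from the beginning, which is what lets downstream processing evolve without touching the producers.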

The next stage of our data pipeline enriches raw data, and we accomplish this using streaming frameworks such as Apache Storm, in conjunction with various in-house tools. Apache Storm is a stream processing system designed to transform and enrich data. Together with Trident, Storm can provide exactly-once loads to our downstream analytics engine. In addition, given that real world data can be incredibly noisy, we developed a custom log scraper that filters out any information that isn’t going to be useful for our analytics system.
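The shape of this enrich-and-filter stage can be sketched in a few lines. The field names and filtering rule below are hypothetical, not our actual schema, but the pattern (drop noise first, then attach derived fields) is the one described above.

```python
# Hypothetical sketch of an enrichment/filtering stage. Real
# deployments would run logic like this inside a Storm bolt.
def useful(event):
    # Filter out records that would only add noise to analytics,
    # e.g. events missing the metric we aggregate on.
    return event.get("price") is not None


def enrich(event):
    # Attach a derived field (illustrative only).
    event = dict(event)
    event["price_cents"] = int(event["price"] * 100)
    return event


def process(stream):
    return [enrich(e) for e in stream if useful(e)]
```

Keeping the filter ahead of enrichment means the noisy records never cost any downstream compute or storage.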

Low Latency Queries

Our first attempt to provide low latency analytics was to leverage the Hadoop ecosystem, namely Hive, and then Presto. The problem we faced with both of these SQL-on-Hadoop solutions was that queries could take hours to complete, which significantly impacted our ability to make rapid decisions. Although our data was arriving in real time, our queries quickly became a bottleneck in our decision-making cycle as our data volumes grew. We realized that the workflow we were aiming to optimize was one where we could look at our event streams (both real-time and historical events) and slice and dice the data to look at specific subsections, determine trends, find root causes, and take actions accordingly. We needed an OLAP engine.

After some searching, we found the Druid open source project. Druid is an OLAP engine that is highly optimized for low latency data ingestion (streaming ingestion), as well as extremely fast aggregations. It integrates natively with Kafka and Storm, so it was relatively easy for us to get started.

Druid’s architecture is unique; it is a search engine and a column database merged into one. Druid fully indexes all data, similar to a search engine, but instead of indexing data in search indexes, Druid stores data in immutable columns, similar to columnar databases. The column storage format enables fast aggregations, but additionally, inverted indexes are created for string values (like in a search engine). These inverted indexes allow the engine to quickly prune out unnecessary data for a query so that the engine can scan exactly what it needs to complete the query.
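The pruning that inverted indexes enable is easy to demonstrate. The sketch below is a simplification of the idea, not Druid’s actual segment format: a string column maps each value to the list of row ids containing it, so an aggregation over a filtered value touches only the matching rows instead of scanning the whole column.

```python
from collections import defaultdict

# Simplified inverted index over a string column: value -> row ids.
def build_index(column):
    index = defaultdict(list)
    for row_id, value in enumerate(column):
        index[value].append(row_id)
    return index


attribute = ["foo", "bar", "foo", "baz"]
prices = [3, 4, 5, 6]

index = build_index(attribute)
# To sum "price" where attribute == "foo", scan only rows 0 and 2
# instead of all four rows.
matching = index["foo"]
total = sum(prices[i] for i in matching)
```

On a real segment the row-id lists are compressed bitmaps, but the effect is the same: the engine scans exactly what the query needs.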

Druid also does some neat optimizations at ingestion time. For example, Druid can pre-aggregate records as they are being ingested.

Let’s say we had 2 events:

{"timestamp":"2017-01-01T12:10:01.005Z", "attribute":"foo", "price":3}

{"timestamp":"2017-01-01T12:10:01.007Z", "attribute":"foo", "price":4}

We don’t care about the precise millisecond at which each event occurred, but we do care about the aggregate price of all events over some span of time. Because the two events share the same “attribute” value “foo”, we can combine these two records if we truncate their timestamps (in this example, to the second).

The combined event is then:

{"timestamp":"2017-01-01T12:10:01Z", "attribute":"foo", "price":7}

Druid provides this rollup capability out of the box and in practice, enables us to save significant storage space compared to the raw data size.
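The rollup described above amounts to truncating the timestamp and summing the metric for rows that then share the same dimension values. A minimal sketch of that computation (our own illustration, not Druid code):

```python
from collections import defaultdict

def truncate_to_second(ts):
    # "2017-01-01T12:10:01.005Z" -> "2017-01-01T12:10:01Z"
    return ts.split(".")[0] + "Z" if "." in ts else ts


def rollup(events):
    # Group by (truncated timestamp, attribute) and sum the metric.
    combined = defaultdict(int)
    for e in events:
        key = (truncate_to_second(e["timestamp"]), e["attribute"])
        combined[key] += e["price"]
    return [
        {"timestamp": ts, "attribute": attr, "price": price}
        for (ts, attr), price in combined.items()
    ]


events = [
    {"timestamp": "2017-01-01T12:10:01.005Z", "attribute": "foo", "price": 3},
    {"timestamp": "2017-01-01T12:10:01.007Z", "attribute": "foo", "price": 4},
]
# rollup(events) yields the single combined row from the example above
```

The storage savings come directly from this collapsing: the more events share dimension values within the truncation window, the fewer rows Druid has to store and scan.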

After we switched to Druid, our query latencies dropped to sub-second in most cases, and in general, the project fulfilled most of our requirements. Today, our cluster ingests over 1 billion events per day (roughly 2TB of raw data), and Druid has scaled quite well for us.

Do try this at home!

If you have streaming data workloads similar to ours, we suggest you check out some of the projects we mentioned. They could be of great help as you think about how to architect your data stack.