Real-time Data Processing for Monitoring and Reporting

A practical use case of spark structured streaming

Photo credit: geralt

In this post I will explore how we designed and implemented our new realtime monitoring system for Walmart’s in house AB testing platform, Expo. The system processes billions of records and generates millions of metrics per day. These metrics help experimenters monitor their experiments and how they impact the Walmart site or app.

In my previous post I gave a small example of how AB testing works. Before pushing each new feature, we expose the new feature to a subset of users. Based on the feedback given from Expo, we either turn on the feature to all users or withdraw the new feature and keep the site as it is. Each experiment has its own objectives. Expo provides a lot of metrics and experimenters can decide which one is more important for them. This can include click through rate, revenue or anything that the user wants to be able to measure.


Background

One of the systems we developed in WalmartLabs is Expo which is our experimentation platform for A/B testing. Expo manages A/B test setup and execution.There are multiple systems that support Expo to accomplish A/B testing and help analysts and experimenters to arrive at a conclusion. One of them is a reporting system that analyzes the data nightly in a batch manner. Another is a monitoring system which runs continuously to give a realtime snapshot of how the experiment is currently working. We will focus on the monitoring system in this article.

We already had an existing monitoring system which is explained in this post by my colleague. The job was doing well but we had a performance issue during peak traffic. After evaluating our options to refactor the code or rewrite the job with structured streaming, we decided to go with the latter which gives us a better performance and state management.

A dummy scenario to continue discussion!

Now for the rest of this post let’s assume a simplified example as a reference for the rest of article.

Let’s say we need to change the color of a blue button to red on the first page of walmart.com and we would like to see how this change affects revenue per visit or conversion rate which is a really important metric in e-commerce. To do so, we create an experiment <exp> with two variations in Expo. In <var1> , which is the control version currently running in production, we have blue color button. In <var2>the color is red and it’s our desired feature that we would like to eventually push to production. Now when a user visits home page, if he gets assigned to the experiment, he will either see blue color button (<var1>) or red color button( <var2>) during the time that experiment is running.

In our example walmart.com is called tenant and has uswm identifier. Expo, and monitoring, support other tenants, such as the Walmart mobile app or Walmart groceries. home_page and item_page are two sensors for visiting those pages that we call measurement point.

The monitoring system generates many different metrics for each experiment. Metrics are defined by keys, such as uswm.visit.fm.<var1>.item_page.beta. This metric shows the number of unique visits which is qualified for the experiment and visited the item page. The only parts of the naming that might be vague are fm and beta part.

fm is abbreviation for factor matched meaning a user only contributes to the value of the metric if he is assigned to <var1> and he is exposed to the treatment. We also call this “qualified” in our system. In our example, the button color change on the home page is the treatment. So a user will increase the example metric if 1) he is assigned to the <var1> 2) he has visited the home page 3) after home page visit(step 2), at some point he visits item page.

If the second step out of three top conditions is not met, the user will just contribute to uswm.visit.assigned.<var1>.item_page.beta metric.

It is possible for the fm criteria to be very complicated. For example, the condition could be that the user is visiting the site on a mobile device and chose a specific payment type at checkout. These conditions are specified in the experiment configuration.

The beta means each sessionId can only contribute to the metric once. We have other metrics without beta showing that if a user goes to the item page multiple times it will increase the metric value multiple times.


Overall Architecture

Image 1: Components involved in monitoring system

The above image shows major components used in our monitoring platform. With this setup we can focus on the bottlenecks and scale up each part horizontally and independent of each other. In the below subsections I will describe how each part works.

Kafka

When a user hits the page or takes different actions inside a page, logs are generated. These logs are either server logs or beacons being emitted by javascript). Each log has a sessionId that represents a user. Not all the records are useful for generating metrics so we forward the useful logs to their own topic. The job connects to the topic and picks the records one by one and extracts the metrics out of them.

Monitoring Job

Each instance of the job is a Spark Structured Streaming app that triggers at each minute, reads the data from Kafka and sinks them in KairosDB.

To guard against failure, we run 2 mirror instances of the job in separate data centers and let them write the result to our KairosDB independently. Each job tag their metrics by data center. When a failure happens, the system is able to switch to the other data center metrics automatically by changing the tag’s name and UI will serve the correct data.

KairosDB

KairosDB is the Sink for our streaming job. It is a time series database backed by Cassandra. It not only keeps our monitoring metrics but also keeps track of health of the monitoring system. Along with Expo metrics, the monitoring system also populates internal health metrics, such as size of memory consumed, number of record processed and so many others. We keep dashboards on these metrics in Grafana to be able to debug issues more easily.

KairosDB is capable of aggregating data based on different tags. Each metric is tagged with different information like type of device and log in state. We were able to generate all metrics and send all combinations to KairosDB but we decided to take the tagging strategy. It allowed us to distributed the complexity between different part of the system instead of integrating all of them in one location that makes it harder to scale computationally. UI services can query data as granular/coarse as they want by combining different tags.


Overall Implementation

Image 2: High level view of what job does in each batch

As mentioned above, the monitoring job is a Spark structured streaming job written in scala running on a minute micro-batch. It includes two stateful phases: one for sessionization and the other consumes the sessionization output for metric aggregation. Structured streaming made the stateful operation part easier to implement by having some higher order methods like flatMapGroupsWithState. The developer needs to provide a function as an argument passed to flatMapGroupsWithState that gets triggered each time we have a data for a key which for example in sessionization is sessionId. The function makes an abstraction and provides some useful parameters for the developer to update the current state. For each key, it provides all data associated to the key for that time range and also gives the developer access to the previous state. The only thing a developer needs to do is focus on the core problem of how keep the state, what metrics he needs to emit for each phase and how to correct the metrics if the event is out of order and how to merge the previous state with current state in the system. The key for the sessionization is a sessionId string and for the metric aggregation phase is (metricName, minute bucket) tuple.

Sessionization

In the sessionization part, we aggregate the events for each session in a data structure named Session. You can consider each event as an action the user took. We don’t need to keep the entire data and we just need a summary of information in order to generate the metrics. The key for this phase is sessionId. A session is a case class in scala that keeps track of information that we need to generate the new metric and correct the misfired metrics.

  1. sessionId: The id for the session
  2. session property: numerical property of the session like last time the session has activity and etc.
  3. markers: A data structure that keeps track of the events which are important; for example keeps the time the user was exposed to the variation for the first time.
  4. counts: A data structure that keeps track of sessions’ actions; for example it’s aware of at which time and for how many times the user visited the home page.

We generate all intermediate metrics with only these four data structures. Some metrics can be directly calculated from markers like number of assigned users to a variation while others needs to be processed by making a Cartesian product of data in markers and counts data structures.

One example of output metric of this phase can be:

(uswm.visit.fm.123#desktop#in#dc1.home_page.beta.1525356180000,1)

As you can see the tags and minute bucket timestamp are serialized in the metric name. This is the output of one session and the value for this metric name needs to be aggregated across all sessions. Making tags as granular as possible gives us this opportunity to reduce the space and time complexity in our real time monitoring and pass the responsibility of calculating values for different tag combinations to KairosDB.

Since we populate the metrics for each minute bucket, we need to have access to this information for each metric emitted. That’s why we append the batch time to each metric.

Handling out-of-order events

Another thing needs to be considered in real time system is late data or out of order data. For example, let’s say event1 and event2 happened at batch time 10 and 14 respectively but event2 arrives earlier into the monitoring job because of a failure in one of the Kafka brokers. If we don’t consider this out of order event, the factor match for this session is considered happened at time 14 while the right value is 10 and this gives us a wrong result. To correct the wrong metrics we need to calculate what metrics has been emitted between times 10 and 14 and correct them by emitting the negative value for those metrics.This resolves the problem but makes computation and space very inefficient if we do it naively and keep track of every state and out of order event. The problem can be solved by using watermark concept introduced in spark structured streaming. You might be interested to read about it here in detail. In general it’s a mechanism that tracks the current event time and attempts to clean up old state and ignores the very late events.

Metric Aggregation phase

The input in this phase is all the metrics coming from sessionization phase. Remember in sessionization we only keep track of metrics that belong to each session. In this phase we aggregate them across all sessions. The key for the stateful operation in this phase is (metric name, minute bucket) tuple. Based on this key, the system just adds the sum of each metric across all sessions.

The final metrics going to Kairos will be something like this:

{ metricName: ‘uswm.visit.fm.123.home_page.beta’, 
tags=[‘dc1’,’in’,’desktop’],
timestamp=1525356180000,
count=10000,ttl=’30 days’}

Other metrics

Spark has an interface that allows developers to have access to the information about the status of streaming query. We used the interface StreamingQueryListener to populate information about the monitoring job in KairosDB. These metrics basically are for monitoring the monitoring system!


Lessons Learned

There are a lot of experiments and challenges that we learned during development and I would like to share some of the important ones:

  • Test and Validate: For every feature that we added to the system we extensively wrote multiple unit and integration tests and completed it with validation against HDFS to make sure that everything works fine. The only thing that we didn’t want to get undermined was the credibility of the job. Since the system was complicated and adding a new feature might introduce a bug, the tests were really help us save the integrity of the system.
    We also ran the job for a few days on a non production cluster after developing (each) feature(s) to see how the new features impact the performance of the system. This let us test the boundaries and weaknesses of the application and let us focus on real problems.
  • Embrace functional programming: We love functional programming here in WalmartLabs and for this specific project we intentionally embraced it. Here are some outcomes:
    - It made our development much faster
    - It reduced the complexity of the system for us and made the code easier to understand
    - Immutability of the objects allowed us to debug the code easier
    - It reduced null pointer exception to zero. We haven’t seen any NPE after we deployed the monitoring job in production.
  • prototyping is the key for uncertainties: If you doubt about functional and even non functional aspects of your system the best way to prove the concept is abstracting the use case and making a small prototype of it. It’s really important to realize the fact that spending more time on prototyping and design at the very beginning of the project gives you a better chance of a happy ending project than a final implementation that doesn’t satisfy the requirements and service level agreements.
  • A chain is only as strong as its weakest link: When you design a multi tenant system ensure about scalability of each major component in your design and that it can be easily scaled up horizontally during peak traffic. We thought about our design multiple times and flushed it out overtime to make sure it can sustain the peak traffic of Walmart.

The current monitoring job supports 8 different tenants of Walmart and we are expecting to onboard more tenants. That means every single request of these tenants, after being filtered, goes through this system. The current normal input rate to the application is 30k to 60k entries per second and it generates 1 to 3 million metrics per minute.