From Batching to Streaming — Real-Time Session Metrics Using Samza (Part 1)

David Yu
Engineers @ Optimizely
4 min read · Oct 17, 2016

Our mission at Optimizely is to help decision makers turn data into action. This requires us to move data with speed and reliability. We track billions of user events, such as page views, clicks and custom events, on a daily basis. Providing our customers with immediate access to key business insights about their users has always been our top priority. Because of this, we are constantly innovating on our data ingestion pipeline.

In this two-part series, we will describe how we transformed our data ingestion pipeline from batching to streaming to provide our customers with real-time session metrics.

Motivations

Unification. Previously, we maintained two data stores for different use cases — HBase for computing Experimentation metrics and Druid for calculating Personalization results. These two systems were developed with distinct requirements in mind:

Experimentation

  • Delayed event ingestion ok
  • Query latency in seconds
  • Visitor level metrics

Personalization

  • Instant event ingestion
  • Query latency in subseconds
  • Session level metrics

As our business requirements evolved, however, things quickly became difficult to scale. Maintaining a Druid + HBase Lambda architecture (see below) to satisfy these business needs became a technical burden for the engineering team. We needed a solution that reduces backend complexity and increases development productivity. More importantly, a unified counting infrastructure creates a generic platform for many of our future product needs.

Consistency. As mentioned above, the two counting infrastructures provide different metrics and computational guarantees. For example, Experimentation results show you the number of visitors who visited your landing page, whereas Personalization shows you the number of sessions instead. We want to bring consistent metrics to our customers and support both types of statistics across our products.

Real-time results. Our session-based results are computed using MapReduce (MR) jobs, which can be delayed by hours after the events are received. A real-time solution will provide our customers with a more up-to-date view of their data.

Druid + HBase

In our earlier posts, we introduced our backend ingestion pipeline and how we use Druid and MR to store transactional stats based on user sessions. The biggest benefit we get from Druid is low-latency results at query time. However, it does come with its own set of drawbacks. For example, since segment files are immutable, it is impossible to incrementally update the indexes. As a result, we are forced to reprocess user events within a given time window if we need to fix certain data issues, such as out-of-order events. In addition, we had difficulty scaling the number of dimensions and dimension cardinality, and queries spanning long periods of time became expensive.

On the other hand, we also use HBase for our visitor-based computation. We write each event into an HBase cell, which gives us maximum flexibility in terms of the kinds of queries we can support. When a customer needs to find out “how many unique visitors have triggered an add-to-cart conversion”, for example, we do a scan over the range of data for that experiment. Since events are pushed into HBase (through Kafka) in near real-time, the data generally reflects the current state of the world. However, our current table schema does not aggregate any of the metadata associated with each event. This metadata includes a generic set of information, such as browser types and geolocation details, as well as customer-specific tags used for customized data segmentation. The redundancy of this data prevents us from supporting a large number of custom segmentations, as it increases our storage cost and query scan time.
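For a concrete sense of what such a per-event scan looks like, here is a rough sketch using the standard HBase Java client. The events table name, the column family `e`, the qualifiers, and the experiment-prefixed row-key layout are illustrative assumptions, not our actual schema:

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical sketch: count unique visitors that triggered an "add_to_cart"
// conversion for one experiment, assuming one event per cell and row keys
// prefixed by the experiment id.
public class UniqueVisitorScan {

    public static long countUniqueVisitors(Connection conn, String experimentId) throws IOException {
        Set<String> visitors = new HashSet<>();
        try (Table events = conn.getTable(TableName.valueOf("events"))) {
            Scan scan = new Scan();
            // Restrict the scan to the key range of this experiment.
            scan.setStartRow(Bytes.toBytes(experimentId));
            scan.setStopRow(Bytes.toBytes(experimentId + "~")); // "~" sorts after the id characters
            try (ResultScanner scanner = events.getScanner(scan)) {
                for (Result row : scanner) {
                    byte[] type = row.getValue(Bytes.toBytes("e"), Bytes.toBytes("type"));
                    byte[] visitorId = row.getValue(Bytes.toBytes("e"), Bytes.toBytes("visitor_id"));
                    if (type != null && visitorId != null && "add_to_cart".equals(Bytes.toString(type))) {
                        visitors.add(Bytes.toString(visitorId));
                    }
                }
            }
        }
        return visitors.size();
    }
}
```

Because every raw event is its own cell, a scan like this has to read (and repeatedly skip over) all of the duplicated metadata described above, which is exactly the cost we set out to remove.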

SessionDB

Since it became difficult to optimize our Druid indexes, we decided to retire the Druid route and focus on improving our HBase data representation. Pre-aggregating events and compacting away redundant information became the most obvious next step. This was when we turned to Samza for help.

Samza is a perfect fit for our needs thanks to its seamless integration with Kafka — our distributed message queue. We will get to the details of how this real-time aggregation works in part two. But at a very high level, Samza continuously bundles events into sessions and periodically streams out snapshots of the pending sessions into HBase. With this approach, each HBase cell becomes a consolidated view of a group of events.
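To give a feel for the shape of such a job, here is a minimal sketch using Samza’s low-level task API. The `session-snapshots` output stream, the changelog-backed `pending-sessions` store, the window-based flush, and the `Event`/`Session` types are all illustrative assumptions; part two describes how our actual aggregation works.

```java
import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;

// Hypothetical sketch of the session roll-up described above. Assumes the
// input stream is deserialized into Event objects by a configured serde.
public class SessionRollupTask implements StreamTask, WindowableTask, InitableTask {

    private static final SystemStream SNAPSHOTS = new SystemStream("kafka", "session-snapshots");

    // Local, changelog-backed state holding the sessions still being built up.
    private KeyValueStore<String, Session> pendingSessions;

    @Override
    @SuppressWarnings("unchecked")
    public void init(Config config, TaskContext context) {
        pendingSessions = (KeyValueStore<String, Session>) context.getStore("pending-sessions");
    }

    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
        Event event = (Event) envelope.getMessage();
        Session session = pendingSessions.get(event.sessionId);
        if (session == null) {
            session = new Session(event.sessionId, event.visitorId);
        }
        session.merge(event); // bundle the event into its session
        pendingSessions.put(event.sessionId, session);
    }

    // Called on a fixed interval (task.window.ms): stream out snapshots of the
    // pending sessions so a downstream writer can upsert them into HBase cells.
    @Override
    public void window(MessageCollector collector, TaskCoordinator coordinator) {
        KeyValueIterator<String, Session> it = pendingSessions.all();
        try {
            while (it.hasNext()) {
                Entry<String, Session> entry = it.next();
                collector.send(new OutgoingMessageEnvelope(SNAPSHOTS, entry.getKey(), entry.getValue()));
            }
        } finally {
            it.close();
        }
    }

    // Minimal illustrative types standing in for the real event and session models.
    public static class Event {
        public String sessionId;
        public String visitorId;
        public long revenue;
    }

    public static class Session {
        public final String sessionId;
        public final String visitorId;
        public long eventCount;
        public long revenue;

        public Session(String sessionId, String visitorId) {
            this.sessionId = sessionId;
            this.visitorId = visitorId;
        }

        public void merge(Event e) {
            eventCount += 1;
            revenue += e.revenue;
        }
    }
}
```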

There are several advantages to this approach. The first is that our core logic for computing various statistics stays largely the same. Since the majority of our base calculations are summations (an oversimplification, of course), adding a bunch of ones together is equivalent to summing a list of cumulative values.
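As a trivial illustration of that equivalence, with made-up numbers:

```java
import java.util.Arrays;

// Illustrative only: summing a 1 per raw event gives the same total as summing
// the pre-aggregated per-session event counts.
public class SummationEquivalence {
    public static void main(String[] args) {
        long[] perEventOnes = {1, 1, 1, 1, 1, 1}; // six raw events, one count each
        long[] perSessionCounts = {4, 2};         // the same six events, rolled into two sessions

        System.out.println(Arrays.stream(perEventOnes).sum());      // 6
        System.out.println(Arrays.stream(perSessionCounts).sum());  // 6
    }
}
```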

The second benefit is that, with session-level information immediately available, we can start querying session metrics right off of HBase and answer questions such as “what is the average revenue generated per user session” in real time! This newly created HBase schema is unsurprisingly named SessionDB, which became the basis of our backend unification.
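A rough sketch of what such a query could look like against the session-level rows, again with the HBase Java client. The table name, the column family `s`, and the `revenue` qualifier are assumptions for illustration, not the actual SessionDB schema:

```java
import java.io.IOException;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical sketch: average revenue per session over a key range, where
// each row already holds a consolidated per-session rollup.
public class AvgRevenuePerSession {

    public static double averageRevenue(Connection conn, byte[] startRow, byte[] stopRow) throws IOException {
        long sessions = 0;
        double revenue = 0.0;
        try (Table sessionDb = conn.getTable(TableName.valueOf("session_db"))) {
            Scan scan = new Scan();
            scan.setStartRow(startRow); // e.g. the key range for one project or experiment
            scan.setStopRow(stopRow);
            try (ResultScanner scanner = sessionDb.getScanner(scan)) {
                for (Result session : scanner) {
                    byte[] value = session.getValue(Bytes.toBytes("s"), Bytes.toBytes("revenue"));
                    if (value != null) {
                        revenue += Bytes.toDouble(value); // already a per-session total
                        sessions++;
                    }
                }
            }
        }
        return sessions == 0 ? 0.0 : revenue / sessions;
    }
}
```

The scan now touches one row per session instead of one cell per event, which is what makes this kind of question answerable in real time.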

Last but not least, the HBase storage requirement is drastically reduced and queries run much faster. By aggregating session-level metadata, we no longer have to replicate information, such as browser types, locations and user dimensions, across each cell. The graph below shows the average query latency (x-axis) for different numbers of user dimensions (y-axis). With an average of 10 events per session, for example, the median query latency drops to 5 ms as opposed to 40+ ms (the yellow line).

In the next post, we will focus on how we leverage Samza to roll up events into sessions before persisting them into HBase.

We’re hiring!
