From Batching to Streaming — Real Time Session Metrics Using Samza (Part 2)

In part one of this series, we introduced the motivations behind transforming our batch processing into real-time event streaming. In this post, we will continue with more details of how Samza is used to drive this aggregation process.

Background

Session aggregation (AKA sessionization) was not the first stream processing use case at Optimizely. We have been applying stream processing for various ETL tasks, such as:

  • Data enrichment (backfilling missing metadata for events)
  • Event stream repartition (for various downstream use cases)
  • Real-time experiment metrics (count number of events received for each experiment)

There are several production-ready stream processing frameworks available today, including Samza, Spark, Storm and Flink. We chose Samza for several reasons. First, Samza allows you to create a processing topology by chaining together Kafka topics, which offers high isolation. As we encourage our engineers to explore different processing ideas, this pluggability minimizes the ripple effect on our overall data pipeline. Second, Samza integrates easily with Kafka and YARN, both of which we use heavily. In addition, Samza has very low latency. It provides a simple programming model and an easy-to-use state management framework, all of which fit many of our needs well.

What is Apache Samza?

At a very high level, a Samza job consumes a stream of immutable events from one or more Kafka topics. It then performs some computation on these events, one at a time, and writes the outputs to one or more downstream Kafka topics.

One notable characteristic of Samza jobs is that they are highly composable. To help you visualize what Samza jobs are capable of, you can think of them as a set of real-time MapReduce (MR) jobs chained together by Kafka topics. The above diagram shows how a job can have multiple input streams and how several jobs are connected to form a complex workflow.
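To make the chaining idea concrete, here is a minimal Python sketch rather than actual Samza code. Plain lists stand in for Kafka topics, and run_job, enrich and only_clicks are hypothetical names invented for this illustration.

```python
def run_job(process, input_topic, output_topic):
    """Feed each input event to `process` and append whatever it emits."""
    for event in input_topic:
        output_topic.extend(process(event))


def enrich(event):
    # Hypothetical enrichment job: backfill a missing field with a default.
    return [{**event, "browser": event.get("browser", "unknown")}]


def only_clicks(event):
    # Hypothetical filter job: forward click events and drop everything else.
    return [event] if event["type"] == "click" else []


# Lists stand in for Kafka topics; the output of one job is the input of the next.
raw_events = [
    {"type": "click", "browser": "firefox"},
    {"type": "view"},
    {"type": "click"},
]
enriched_events = []
click_events = []

run_job(enrich, raw_events, enriched_events)
run_job(only_clicks, enriched_events, click_events)
```

Because the two jobs only share a topic, either one could be replaced or redeployed without touching the other, which is the isolation property described earlier.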

Samza is scalable, fault-tolerant and stateful. We will briefly touch on each of these three aspects, since our sessionization job takes advantage of all of them to some extent.

Scalability. Just like MR jobs, Samza jobs are highly concurrent and scalable. The difference is that Samza parallelism is partition-oriented rather than file-oriented. A job is divided into multiple tasks. By analogy with MR, a Samza task is roughly equivalent to a mapper plus a reducer. A task “maps” events by funneling them into designated Kafka topic partitions. At the same time, all events from an input stream partition go to a single task, mimicking the behavior of a reducer.

One important aspect to keep in mind is that an input topic partition is statically assigned to a task instance. The number of task instances is therefore directly tied to the maximum partition count across the input topics (illustrated in an example below). The first time you submit a job, this mapping is created and persisted into a Kafka topic for fault tolerance. This coupling helps simplify Samza’s state management model (which we’ll cover later), since each task only needs to manage its own state.
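The relationship between partitions and tasks can be sketched in a few lines of Python. This sketch assumes that partitions sharing the same partition id across input topics are grouped into one task; the topic names and the function assign_partitions_to_tasks are hypothetical and exist only for illustration.

```python
from collections import defaultdict


def assign_partitions_to_tasks(topic_partition_counts):
    """Group input partitions by partition id, giving one task per id.

    `topic_partition_counts` maps a topic name to its partition count.
    The result maps a task number to the (topic, partition) pairs it owns.
    """
    tasks = defaultdict(list)
    for topic, count in sorted(topic_partition_counts.items()):
        for partition in range(count):
            tasks[partition].append((topic, partition))
    return dict(tasks)


# Hypothetical job reading a 4-partition topic and a 2-partition topic.
assignment = assign_partitions_to_tasks({"events": 4, "metadata": 2})
task_count = len(assignment)
```

With these inputs the job ends up with four tasks, matching the larger of the two partition counts. Tasks 0 and 1 each read one partition from both topics, while tasks 2 and 3 read only from the larger topic.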

With a fixed number of tasks, how can we scale out? A Samza job scales out by increasing the number of running containers. When running on a YARN cluster, currently the only distributed environment that Samza supports, each task instance executes in a container, a single unit of computational resource. Depending on how much computational power a task demands, you can run every task within a single container, or you can create as many containers as tasks, giving each task the maximum power. The diagram above demonstrates this kind of elasticity.
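As a rough illustration of this elasticity, the sketch below spreads a fixed set of tasks over a configurable number of containers. The round-robin placement and the function name assign_tasks_to_containers are assumptions made for the example, not a description of how Samza itself places tasks.

```python
def assign_tasks_to_containers(task_ids, container_count):
    """Spread tasks over containers round-robin (an assumed placement policy)."""
    if container_count < 1:
        raise ValueError("at least one container is required")
    # More containers than tasks would leave some idle, so cap the count.
    container_count = min(container_count, len(task_ids))
    containers = [[] for _ in range(container_count)]
    for index, task_id in enumerate(task_ids):
        containers[index % container_count].append(task_id)
    return containers


tasks = [0, 1, 2, 3]
single = assign_tasks_to_containers(tasks, 1)   # everything in one container
paired = assign_tasks_to_containers(tasks, 2)   # two tasks per container
spread = assign_tasks_to_containers(tasks, 8)   # capped at one task per container
```

The task count never changes here; only the amount of resource behind each task does.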

Fault-tolerance. Samza promises that if a container dies for whatever reason, the tasks within that container will be able to resume from where they left off once a new container launches. To do this correctly, Samza periodically checkpoints the offsets up to which the tasks have consumed. It also logs the changes made to its internal state stores. With all of this information logged to Kafka, a job can recover by consuming and replaying this stream of changes to reconstruct its internal state.
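The offset checkpointing described here can be simulated with a small Python sketch. A list stands in for an input partition and a dict stands in for the checkpoint topic; run_task, checkpoint_every and crash_at are hypothetical names used only to drive the simulation.

```python
def run_task(log, checkpoints, task_name, handle, checkpoint_every=3, crash_at=None):
    """Consume `log` from the last checkpoint, checkpointing periodically.

    `crash_at` simulates a container dying just before that offset is read.
    """
    offset = checkpoints.get(task_name, 0)
    while offset < len(log):
        if offset == crash_at:
            raise RuntimeError("container died")
        handle(log[offset])
        offset += 1
        if offset % checkpoint_every == 0:
            checkpoints[task_name] = offset
    checkpoints[task_name] = offset


log = list("abcdefgh")   # stands in for one input partition
checkpoints = {}         # stands in for the checkpoint topic
seen = []

try:
    run_task(log, checkpoints, "task-0", seen.append, crash_at=5)
except RuntimeError:
    pass  # the container is gone; only the checkpoint survives
offset_after_crash = checkpoints["task-0"]

# A new container starts and resumes from the last checkpoint.
run_task(log, checkpoints, "task-0", seen.append)
```

In this run the last checkpoint before the crash is offset 3, so the events at offsets 3 and 4 are handled a second time after the restart. Messages consumed after the most recent checkpoint are replayed, which is worth keeping in mind when the processing logic is not idempotent.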

Statefulness. Most stream processing needs to store some kind of state, whether it is the number of events for a given user if you are generating user metrics or, in our case, all the events triggered within a given session. To make stream processing truly real-time, it is impractical to store this state anywhere other than local to the task. Out of the box, Samza provides RocksDB as an efficient KV store (like an embedded HBase) to track state too large to fit into local memory. As mentioned above, this store is also recoverable after a task failure, thanks to the changes it persists in a changelog topic.
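The idea of a local store backed by a changelog can be sketched as follows. This is a toy model in Python: a dict stands in for RocksDB, a list stands in for the changelog topic, and the class ChangelogBackedStore is a hypothetical name, not part of Samza.

```python
class ChangelogBackedStore:
    """Toy local KV store that logs every change so it can be rebuilt."""

    def __init__(self, changelog):
        self.changelog = changelog  # stands in for the changelog topic
        self.data = {}              # stands in for the local RocksDB store

    def put(self, key, value):
        self.data[key] = value
        self.changelog.append((key, value))

    def delete(self, key):
        self.data.pop(key, None)
        self.changelog.append((key, None))  # None marks a deletion

    def get(self, key):
        return self.data.get(key)

    @classmethod
    def restore(cls, changelog):
        """Rebuild local state by replaying the changelog from the start."""
        store = cls(changelog)
        for key, value in list(changelog):
            if value is None:
                store.data.pop(key, None)
            else:
                store.data[key] = value
        return store


changelog = []
store = ChangelogBackedStore(changelog)
store.put("session-1", 1)
store.put("session-1", 2)
store.put("session-2", 5)
store.delete("session-2")

# Simulate losing the local store and rebuilding it from the changelog.
recovered = ChangelogBackedStore.restore(changelog)
```

Replay time grows with the length of the changelog, not with the size of the final state: four entries are replayed here to rebuild a single key.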

With all of the basic concepts of Samza in mind, we can now move on to introduce how we leverage this tool to continuously roll up events into sessions.

Implementation

A session is simply a series of user events triggered in relatively quick succession. To support different types of session demarcation, we ask our upstream clients to assign a session id to each event. Typically, events are assigned the same session id if they are created less than 30 minutes apart.

The main Samza event loop for aggregating events then becomes quite straightforward. The job maintains a per-task KV store using the session ids as keys. Upon receiving an event, the job does a lookup in the KV store. It creates a new session if the session id does not exist and updates the existing session otherwise. The update is where most of the consolidation happens. The common set of metadata (e.g. IP address, location information, browser version, etc.) is aggregated at the top level, whereas event-specific fields (e.g. event type, add-to-cart dollar amount, etc.) are preserved as a list.
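The lookup-then-update step can be sketched in Python as below. This is a simplified illustration, not the production job: a dict stands in for the per-task KV store, and the field names in COMMON_FIELDS and in the sample events are assumptions chosen to mirror the examples in the text.

```python
# Assumed names for the metadata shared by all events in a session.
COMMON_FIELDS = ("ip", "location", "browser")


def update_session(store, event):
    """Create or update the session that `event` belongs to."""
    session_id = event["session_id"]
    session = store.get(session_id)
    if session is None:
        session = {"session_id": session_id, "events": []}
    # Common metadata is kept once, at the top level of the session.
    for field in COMMON_FIELDS:
        if field in event:
            session[field] = event[field]
    # Event-specific fields are preserved as one entry in the event list.
    specific = {
        key: value
        for key, value in event.items()
        if key != "session_id" and key not in COMMON_FIELDS
    }
    session["events"].append(specific)
    store[session_id] = session
    return session


store = {}
update_session(store, {"session_id": "s1", "ip": "10.0.0.1",
                       "browser": "firefox", "type": "view"})
update_session(store, {"session_id": "s1", "ip": "10.0.0.1",
                       "browser": "firefox", "type": "add_to_cart", "amount": 25})
```

After the two calls the store holds a single session whose metadata appears once and whose event list has two entries.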

As introduced in the previous post, each HBase cell now stores a session. To keep overwriting the cells with the latest copies of the sessions, we need to continuously flush snapshots of the KV store into a Kafka topic.

One important question we asked early in our design was: how frequently should we snapshot the ongoing sessions to keep HBase up to date? We could do this as soon as a session is updated after processing an event. The benefit of doing so is that it minimizes session update latency and produces truly real-time results. The downside, however, is that it also creates a large number of session updates, which significantly increases our Kafka load. Instead, we leveraged the windowing feature of Samza and batched one minute's worth of session updates before flushing them to HBase. If a user triggers 10 click events within 60 seconds, for example, we write out a single session snapshot with 10 events at the end of the minute, instead of 10 session snapshots. This greatly reduced the amount of data sent over the wire.
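The trade-off can be made concrete with a small Python sketch that counts how many snapshots each strategy would write. The function names and the (timestamp, session id) event shape are hypothetical, and the model assumes one snapshot per session per window in which that session received at least one event.

```python
def snapshots_per_event(events):
    """Flush after every event: one snapshot per event."""
    return len(events)


def snapshots_per_window(events, window_seconds=60):
    """Flush once per window: one snapshot per (window, session) pair."""
    flushed = {
        (timestamp // window_seconds, session_id)
        for timestamp, session_id in events
    }
    return len(flushed)


# Ten clicks from the same session, five seconds apart (0s through 45s).
clicks = [(index * 5, "session-a") for index in range(10)]

eager = snapshots_per_event(clicks)
batched = snapshots_per_window(clicks)
```

For the ten clicks above, flushing per event writes ten snapshots while flushing per window writes one, at the cost of up to a minute of extra latency before an update becomes visible.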

Challenges and lessons

As straightforward as the sessionization logic sounds, we did encounter various challenges during its development.

Iterating through RocksDB affects job throughput. As mentioned above, Samza windowing needs to traverse the list of all pending sessions when writing out the session snapshots. Because each Samza process is single-threaded (future Samza releases will introduce multi-threaded processing), event processing is blocked until windowing finishes. This means that, in extreme cases, if windowing takes longer than the configured windowing interval, the event loop will not have a chance to process any actual events. When that happens, you will observe a high window-ns and a process-envelopes count of zero. To prevent this, you have to make windowing run as efficiently as possible, and you will probably want to avoid frequent full scans of RocksDB. In our case, we achieved this by keeping a stripped-down version of the KV store in memory just for this purpose. Traversing a plain old hash table is, after all, a lot faster than traversing the on-disk KV store.
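One way to picture the in-memory companion structure is the Python sketch below. What exactly the stripped-down copy contains is not spelled out above, so this example assumes it maps each session id to its last update time. The class IndexedSessionStore and its methods are hypothetical, and a dict stands in for RocksDB.

```python
class IndexedSessionStore:
    """Toy store with a small in-memory index next to the full KV store."""

    def __init__(self):
        self.store = {}       # stands in for the on-disk RocksDB store
        self.pending = {}     # in-memory index: session id -> last update time
        self.store_reads = 0  # counts reads that would hit the on-disk store

    def put(self, session_id, session, now):
        self.store[session_id] = session
        self.pending[session_id] = now

    def snapshot_updated_since(self, since):
        """Return sessions updated at or after `since` without a full scan."""
        snapshots = []
        for session_id, updated_at in self.pending.items():
            if updated_at >= since:
                self.store_reads += 1
                snapshots.append((session_id, self.store[session_id]))
        return snapshots


sessions = IndexedSessionStore()
for number in range(1000):
    sessions.put(f"s{number}", {"events": ["view"]}, now=0)
for number in (1, 2, 3):
    sessions.put(f"s{number}", {"events": ["view", "click"]}, now=100)

recent = sessions.snapshot_updated_since(60)
```

The window step still walks the whole index, but that walk stays in memory; only the three recently updated sessions are fetched from the full store.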

Be careful with Kafka log compaction. The log compaction feature allows Kafka to selectively remove log entries instead of simply truncating the tail of each partition. A log-compacted topic retains only the last known value of each key. If that value is null, Kafka removes the key after a configured retention time. First of all, if you are using an older version of Kafka (e.g. 0.8.2), make sure that log.cleaner.enable is set to true. Otherwise, some of the Samza topics, such as checkpoints and changelogs, will not be compacted. We discovered this when, during a redeployment, the job took hours to resume processing. It turned out that it needed to consume TBs of uncompacted changelogs to materialize each KV store.

Another lesson we learned about log compaction is that you should not produce to a log-compacted topic if no deletion takes place. The gotcha here is that log.retention.bytes does not apply to log-compacted topics at all. We learned this the hard way when our output topic grew unbounded and exhausted our Kafka disk space.
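The compaction semantics behind both lessons can be modeled with a short Python sketch. It is a simplification: the function compact is a hypothetical name, a list of (key, value) pairs stands in for a topic partition, and keys whose last value is null are dropped immediately rather than after a retention period.

```python
def compact(log):
    """Keep only the latest entry per key and drop keys whose latest value is None."""
    latest = {}
    for offset, (key, value) in enumerate(log):
        latest[key] = (offset, value)
    # Preserve the original ordering of the surviving entries.
    survivors = sorted(latest.items(), key=lambda item: item[1][0])
    return [(key, value) for key, (_, value) in survivors if value is not None]


log = [
    ("s1", "a"),
    ("s2", "b"),
    ("s1", "c"),   # supersedes the first s1 entry
    ("s2", None),  # null value: s2 is eligible for removal
    ("s3", "d"),
]
compacted = compact(log)

# Without any null values, one entry per distinct key always survives.
never_deleted = compact([(f"s{number}", "x") for number in range(1000)])
```

The second call shows why a compacted topic that never receives deletions keeps growing: compaction bounds the log by the number of distinct keys, so an ever-increasing set of session ids means an ever-increasing topic.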

Use SSDs for RocksDB. During our development, we noticed that, once in a while, a subset of the tasks would have a hard time keeping up with the rate at which events were received. Further investigation revealed somewhat longer process-ns and window-ns times on these tasks. It turned out that these tasks had slower interactions with the underlying RocksDB stores because of the type of disks configured on the YARN instances. The containers of these tasks were deployed on Node Managers that used EBS drives. Since RocksDB is highly optimized to run fast compactions on fast storage such as SSD or RAM, using EBS significantly reduced RocksDB IO performance.

We are hiring!