Streaming Recommendations

Here we’re going to tell you how you can use stream-oriented data processing to boost the quality of your recommendations and cut the response time of your entire recommendations system fivefold. We’ll be talking about one of our clients, the Rutube video streaming service.

First, a few words about Rutube itself and why it needs recommendations at all. To begin with, at the moment this article is being written, our recommendations system stores data on 51.75 million users and 1.26 million items, i.e. videos. Obviously, no user could watch all of those videos in the foreseeable future, and that’s where the recommendations system comes to their aid. Secondly, Rutube makes money from the advertisements it shows. One of the company’s key business metrics is the time users spend on the site, and Rutube does everything it can to maximize it. The recommendations system helps achieve that. And third is a peculiarity of Rutube itself: part of the content consists of commercial videos from copyright holders. For instance, TNT has its own channel where it regularly publishes new episodes of its shows. When a new episode of Dom 2 or Tantsy comes out, people flock to watch it; such a video is “viral” by nature. A recommendations system comes in handy for tracking these videos and showing them to other users.

Background

At E-Contenta, we built the first version of the recommendations system for Rutube back in May 2015. It worked as follows: item-based collaborative filtering that recalculated recommendations on every n-th event, where n is a number from the Fibonacci series. That is, every view of a video was treated as an event, and if the sequential number of that event belonged to the Fibonacci series, we recomputed the similarity of that video to the other videos. It was implemented on the following stack: Tornado + Celery (with RabbitMQ as the broker) + MongoDB. The data came into Tornado, went from there through the Celery publisher into RabbitMQ, and from there the Celery consumer processed it and wrote the results to MongoDB. We have to respond to user recommendation requests within 1 second (the SLA).
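As a rough sketch of that trigger logic (the function and variable names here are ours, not the production code):

def is_fibonacci(n):
    """Return True if n belongs to the Fibonacci series 1, 2, 3, 5, 8, 13, ..."""
    a, b = 1, 2
    while b < n:
        a, b = b, a + b
    return n == 1 or n == b

def on_view_event(video_id, view_counts, recalculate_similarities):
    """Old scheme: recompute this video's similarities only when its
    total number of view events hits a Fibonacci number."""
    view_counts[video_id] = view_counts.get(video_id, 0) + 1
    if is_fibonacci(view_counts[video_id]):
        recalculate_similarities(video_id)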

Issues

From the beginning, the system worked okay: we took in the stream of events, processed them, and calculated recommendations. However, pretty soon the following problems began to reveal themselves.

Firstly, there was a technical issue. Recalculating recommendations for trending videos bogged down the whole system: the huge number of events pulled from Mongo ate up all the RAM, and processing those events “clogged up” the engines. In situations like these we could neither serve the already calculated recommendations nor compute new ones, violating the SLA.

Second was a business issue. Because we recalculated recommendations only on the i-th event, where i belongs to the Fibonacci series, we couldn’t catch a viral video right away. And when we finally got to it, we had to perform a full recalculation each time, which took too much time and resources, while the number of events just kept growing.

Thirdly, there was an algorithmic issue known as the implicit feedback problem. Any recommendations system is built on ratings, but all we had were video views. In other words, the rating scale we used was essentially binary: 1 if the user watched the video, 0 if they did not.

The Solution

The obvious solution was a transition to full real-time processing, but that called for an algorithm that was inexpensive to run and, at the same time, effective. Our main source of inspiration was an article published by Tencent. We took the algorithm described there as the foundation of our new recommendations engine.

Algorithm: Personal Recommendations

The new algorithm is classic item-based collaborative filtering, with the cosine of the angle between item vectors used as the similarity measure:
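In LaTeX notation (with r_{u,i} denoting the rating user u gave video i, summed over all users u), the cosine similarity in question is the standard:

\mathrm{sim}(i, j) = \frac{\sum_{u} r_{u,i}\, r_{u,j}}{\sqrt{\sum_{u} r_{u,i}^{2}}\; \sqrt{\sum_{u} r_{u,j}^{2}}}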

If you look at the formula closely, you’ll notice it consists of three sums: the sum of the products of the ratings in the numerator and two sums of squared ratings in the denominator. Replacing them with simpler terms, we get the following:
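In shorthand (the symbols S_{ij}, N_{i}, N_{j} are our notation):

S_{ij} = \sum_{u} r_{u,i}\, r_{u,j}, \qquad N_{i} = \sum_{u} r_{u,i}^{2}, \qquad N_{j} = \sum_{u} r_{u,j}^{2}

\mathrm{sim}(i, j) = \frac{S_{ij}}{\sqrt{N_{i}}\,\sqrt{N_{j}}}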

Now let’s think: what happens when we recalculate recommendations after receiving a new event (a new rating)? That’s right! We simply increase our sums by the corresponding values. Thus the new similarity measure can be represented in the following form:
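In the same notation, when a new rating of video i arrives from a user u who has already rated video j:

S_{ij} \leftarrow S_{ij} + r_{u,i}\, r_{u,j}, \qquad N_{i} \leftarrow N_{i} + r_{u,i}^{2}

\mathrm{sim}_{\text{new}}(i, j) = \frac{S_{ij}}{\sqrt{N_{i}}\,\sqrt{N_{j}}}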

From here follows a very important conclusion: we don’t have to recalculate all the sums every time. All we have to do is store them and increment them with every new value. This principle is called incremental updating. Thanks to it, the computational load of recalculating recommendations drops substantially, and it becomes possible to use such an algorithm in real-time processing.
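A minimal sketch of incremental updating in Python (a plain dict stands in for the key-value store; the names and storage layout here are ours):

from collections import defaultdict
from math import sqrt

# Persistent state: co-rating sums and per-item sums of squared ratings.
pair_sums = defaultdict(float)   # S_ij: sum of co-ratings for the pair (i, j)
item_sums = defaultdict(float)   # N_i:  sum of squared ratings for item i

def process_rating(user_ratings, item_i, rating):
    """Incrementally refresh sim(item_i, j) for every item j that the same
    user has already rated, then remember the new rating."""
    item_sums[item_i] += rating ** 2                      # N_i  += r_ui^2
    sims = {}
    for item_j, r_j in user_ratings.items():
        pair_sums[(item_i, item_j)] += rating * r_j       # S_ij += r_ui * r_uj
        denom = sqrt(item_sums[item_i]) * sqrt(item_sums[item_j])
        sims[item_j] = pair_sums[(item_i, item_j)] / denom if denom else 0.0
    user_ratings[item_i] = rating
    return sims

Only the pairs a new rating actually touches are updated, so the cost of processing one event no longer depends on the total number of accumulated ratings.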

The same article also offered a solution to the implicit feedback problem: assign a rating to each specific action the user may take while watching a video. For example, give separate ratings for starting a video, watching it to the middle or to the end, pausing it, rewinding it, and so on. Here’s how we spread out the weights:

ACTION_WEIGHTS = {
    "thirdQuartile": 0.75,
    "complete": 1.0,
    "firstQuartile": 0.25,
    "exitFullscreen": 0.1,
    "fullscreen": 0.1,
    "midpoint": 0.5,
    "resume": 0.2,
    "rewind": 0.2,
    "pause": 0.05,
    "start": 0.2
}

Several different actions by a user on the same video can thus add up to the maximum rating.
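For illustration, aggregating those weights into a single rating might look like this (capping at 1.0 is our assumption):

def implicit_rating(actions, weights=ACTION_WEIGHTS):
    """Fold the player events for one (user, video) pair into a rating:
    sum the action weights and cap at the maximum rating of 1.0 (assumed cap)."""
    return min(1.0, sum(weights.get(action, 0.0) for action in actions))

# For example: implicit_rating(["start", "firstQuartile", "midpoint", "pause"]) == 1.0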

Algorithm: Trending Videos

Now we also take trending videos into account. Each video has a counter that is incremented by 1 when an event related to it occurs. The videos with the highest counters form the Top 20. On each new event we check whether the video is already in the Top 20; if it is, its counter is incremented, and if not, the counters of all the videos in the Top 20 are decreased by 1. This gives us a fairly dynamic list with which we can easily track viral videos. We keep a list like this for each region; in other words, we recommend different trending videos to users in different regions.
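A rough sketch of that per-region logic (evicting counters that reach zero is our assumption):

TOP_SIZE = 20

def update_trending(top, video_id):
    """Maintain the Top 20 trending videos for one region.
    `top` maps video_id -> counter."""
    if video_id in top:
        top[video_id] += 1                 # already trending: bump its counter
    elif len(top) < TOP_SIZE:
        top[video_id] = 1                  # free slot: just add the video
    else:
        for vid in list(top):              # otherwise decay every counter in the Top 20
            top[vid] -= 1
            if top[vid] <= 0:
                del top[vid]               # assumed: zeroed counters free a slot
    return top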

When a user requests recommendations, the resulting list is composed of 85% personal recommendations and 15% trending videos.

Technologies

We’ve got the algorithm sorted out; now we need to choose the technologies to implement it. If we generalize all the cases that are solved with stream processing, the technology stack looks more or less like the following:

So we send all the events to a message broker, process them concurrently in a stream processing engine, and store the results in a key-value database, from which we read them when a recommendation request arrives. All that’s left is to pick a specific option for each component.

We already had RabbitMQ and were quite happy with it. However, most streaming engines support Kafka out of the box. We weren’t opposed to switching, especially since Kafka has lately gained serious ground, comes well recommended, and is generally considered a fairly universal and reliable tool that, once mastered, can find many different uses.

We examined all three options for the streaming engine, but introduced one constraint right away: all the code must be written in Python. Every member of our team knows it well, all the libraries we need exist for Python, and the ones that don’t we have written ourselves. For that reason Samza was discarded immediately, since it only supports Java. At first we wanted to try Storm, but after digging through its documentation we realized that part of the topology would have to be written in Java and only the bolt-processing code could be in Python. Only later did we discover streamparse, a wonderful Python wrapper for Storm; we recommend it to anyone who needs Storm entirely in Python! In the end we settled on Spark. First, we were already familiar with it. Second, it fully supports Python. And third, the pace of its development is genuinely impressive.

We made our database choice right away. Even though we have experience with each of them, we do have a favorite: Aerospike. It reveals its full potential on servers with SSD disks, bypassing the Linux file system and writing directly to the SSD in separate blocks. That is how it achieves throughput of 1 million TPS per node with a response time under 1 ms. We have used it many times and know quite well how to prepare it (although there isn’t that much to prepare at all :)).
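For illustration, a write and a read with the official aerospike Python client look roughly like this (the host, namespace, set, and bin names below are placeholders):

import aerospike

# Connect to the cluster (placeholder address).
client = aerospike.client({"hosts": [("127.0.0.1", 3000)]}).connect()

# Keys are (namespace, set, primary key); bins are an ordinary dict.
key = ("recs", "similar_items", "video_42")
client.put(key, {"top_similar": ["video_7", "video_19"], "updated": 1455000000})

(key, meta, bins) = client.get(key)
print(bins["top_similar"])

client.close()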

As a result, our stack looks like this:

A few more comments on choosing between Storm and Spark, in case it comes in handy for someone.

Spark Streaming isn’t quite a real-time engine but rather a near-real-time one, since it uses mini-batching: the incoming stream is split into mini-batches of a configured number of seconds and processed in parallel. For those familiar with Spark’s concepts, a mini-batch is nothing other than an RDD. Storm is truly real-time: events are processed as soon as they occur (although Storm can be configured for mini-batching as well, using Trident). To sum up, Storm gives lower latency and Spark gives higher throughput. If you have a hard latency limit, go with Storm, for example in security applications; otherwise Spark will be more than enough. We have now been running it in production for over a month, and with proper tuning the cluster works like clockwork. There is also an opinion that Spark is more fault-tolerant, but in our view that needs a qualification: Spark is more fault-tolerant out of the box, while with enough desire and know-how Storm can be brought up to the same level.
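For what it’s worth, the skeleton of such a mini-batched pipeline with the Spark 1.x Python API looks roughly like this (topic name and broker address are placeholders, and the actual per-batch logic is omitted):

import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="streaming-recs")
ssc = StreamingContext(sc, 5)                        # 5-second mini-batches; each batch is an RDD

events = KafkaUtils.createDirectStream(
    ssc, ["player_events"], {"metadata.broker.list": "kafka:9092"})

ratings = events.map(lambda kv: json.loads(kv[1]))   # Kafka message value -> event dict
# ...update the similarity sums and trending counters here, write results to the key-value store...

ssc.start()
ssc.awaitTermination()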

The Results

And the main thing: did all of this make any sense? It did. Let’s start with the quality of the recommendations. We use online quality metrics that measure whether a video a user watched had actually appeared in the list we recommended and, if so, at what rank. The distribution looks as follows.
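(Conceptually, the check behind these metrics can be sketched as follows; the function name and bucket boundaries are ours, chosen to mirror the numbers quoted below.)

def rank_bucket(recommended, watched_video_id):
    """For a video the user actually watched, report where it stood in the
    list we had recommended to that user: top 1, top 100, or not recommended."""
    if watched_video_id not in recommended:
        return "not recommended"
    rank = recommended.index(watched_video_id) + 1
    if rank == 1:
        return "top 1"
    return "top 100" if rank <= 100 else "below top 100"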

For personal recommendations only (item-item CF):

For trending items only:

For hybrid recommendations (85% item-item CF and 15% trending):

It is the last variant that we use in production, even though it doesn’t show the highest metric values. In other words, 10% of the videos users went on to watch were the very video we had recommended at the top rank! About 36% fall within our Top 100. If those numbers don’t mean anything to you, look at the distribution for trending items: if we recommended a user nothing but trending videos, we would guess only 0.1% of all the videos they watched. Which isn’t bad in itself when compared with a random guess, where the probability of hitting any particular video is 1/(1.26 million).

For a better visual representation, a graph of our metrics is provided below:

The stretch on 12.02 from 14:00 to 15:30, with the highest proportion of videos that weren’t guessed (red), is precisely the test of recommendations from trending items only. Before it comes item-item CF, and after it the hybrid model. As you can see, thanks to its real-time nature our recommendations system quickly adapts to a new configuration. By the way, when launched from scratch, the system reaches the figures above in about half an hour.

Thanks to the switch from MongoDB to Aerospike, the average response time to a user recommendation request dropped. The moment of the transition from the old engine to the new one is shown below:

As you can see, the response time fell roughly fivefold, which is nice both for us and for Rutube itself, because we no longer have to worry about the SLA.

As far as the hardware is concerned, we deployed our new (Cloudera) cluster on three machines, which is enough for us for the time being. The entire system is fault-tolerant, with an uptime of 99.9%. On top of that, there is scalability: at any moment we can plug in new machines if the load on the cluster surges.

What’s Next

Right now we’re interested in two sets of parameters: the weights of the events (starting a video, watching it to the middle, pausing, etc.) and the share of trending videos in the recommendation output. Both sets of values were chosen by expert judgement; in other words, we went with our gut. We would like to determine the most effective values in both cases, most likely using the Metric Optimization Engine (MOE), a framework for tuning system parameters based on A/B testing. As soon as we get some noteworthy results, we will definitely write about them.

If you have any questions about this case or about stream processing in general, don’t hesitate to ask them in the comments section and we’ll try to answer them all.
