Calculating user profiles in real time

Design patterns for stateful stream processing systems

--

Knowing your users’ preferences and behavior can help to optimize the user journey by improving search relevancy and presenting relevant products. One way to leverage those preferences is to calculate a passive user profile based on the user’s interactions such as ad views and searches.

At mobile.de, Germany’s largest online vehicle marketplace, the user profile contains preferences for vehicle attributes such as color, price, and mileage. These are represented as likelihood distributions that allow us to infer, for example, how likely a user is to view or “favorite” cars with certain features. The user profile is calculated and constantly updated to reflect the user’s actions along the journey. There are significant challenges in designing such systems, and I’d like to share some of the possible tradeoffs we encountered.

In a nutshell, a system that updates the user profile in real time is actually a stateful stream processing system in which the state is the user profile, the state key could be the user ID, and the state update operation can be as simple as incrementing a counter or updating an average or variance value.
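
To make this concrete, here is a minimal sketch of such a state and its update operations (the names are illustrative, not our production code), assuming a simple view counter per color plus a running mean and variance maintained with Welford’s online algorithm:

```scala
// Illustrative sketch of the stream state and its update operation.
// State key: user ID; state value: the user profile below.
final case class UserProfile(
    viewsByColor: Map[String, Long] = Map.empty, // e.g. "blue" -> 3 views
    priceStats: RunningStats = RunningStats()    // running stats of viewed prices
) {
  def recordView(color: String, price: Double): UserProfile =
    UserProfile(
      viewsByColor.updated(color, viewsByColor.getOrElse(color, 0L) + 1L),
      priceStats.add(price)
    )
}

// Welford's online algorithm: updates mean and variance one event at a
// time, without keeping the full history of observations.
final case class RunningStats(n: Long = 0L, mean: Double = 0.0, m2: Double = 0.0) {
  def add(x: Double): RunningStats = {
    val n1    = n + 1
    val delta = x - mean
    val mean1 = mean + delta / n1
    RunningStats(n1, mean1, m2 + delta * (x - mean1))
  }
  def variance: Double = if (n > 1) m2 / (n - 1) else 0.0
}
```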

One of the most important choices in the design of such a system — and the main focus of this article — is how to store the stream state.

There are basically two options: local storage, and external storage that is globally available to all workers. Most stream processing frameworks are designed to process streams of events in a distributed fashion by starting multiple workers (executors) on multiple nodes. With local storage, each worker processes a subset of the events and stores its fraction of the stream state in an embedded data store that can only be accessed by that worker.

Typical choices for local storage are a simple in-memory key-value data structure such as a dictionary, or an embedded key-value store. The stream state can then be queried by exposing a REST endpoint on each worker.
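
As a rough sketch of this setup (structure and names are again illustrative), a worker could keep its slice of the state in a concurrent in-memory map, and the worker’s REST endpoint would simply delegate lookups to it:

```scala
import scala.collection.concurrent.TrieMap

// Each worker owns a fraction of the state in an embedded, in-memory
// key-value structure: user ID -> (attribute value -> view count).
object LocalStateWorker {
  private val state = TrieMap.empty[String, Map[String, Long]]

  // Called for every event this worker is assigned.
  def onAdView(userId: String, color: String): Unit = {
    val profile = state.getOrElse(userId, Map.empty[String, Long])
    state.put(userId, profile.updated(color, profile.getOrElse(color, 0L) + 1L))
  }

  // The worker's REST endpoint delegates here; it can only answer for
  // the subset of users this worker owns.
  def lookup(userId: String): Option[Map[String, Long]] = state.get(userId)
}
```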

The main benefits of local storage for state management are reduced complexity, as there’s no need to maintain an external data store, and extremely low latencies when updating and querying the state. Since the data is available locally, the network bottlenecks typical of external storage are avoided.

On the other hand, scaling out local storage to fit a growing state can be quite challenging, as local storage is bound by the available space on the node “hosting” the worker. Recovering from failures can be quite slow with large states, and in some cases recovery might even produce duplicates (for example, by re-reading messages from Kafka whose offsets were not committed before the failure).

One more important point to consider is whether the state must always be queryable, even in the case of a worker failure. If that is one of the main business requirements, it is better to avoid the local storage option, since the failure of any specific worker would make a subset of the state unavailable.

The second option for storing the state is external storage; popular choices here are Cassandra and HBase. External storage has the advantages of greater resilience to failures, high availability, and a more robust recovery mechanism. Additionally, it can be scaled out much more easily to handle increasing request rates (querying the state) and growing state sizes. Furthermore, guaranteeing that the state is always queryable can be achieved by simply deploying a REST service on top of the external storage: even if some of the stream processing workers are down, the state is still queryable via the REST service.

On the other hand, updating and querying stream state in external storage introduces higher latencies, mostly resulting from network round trips, and opens the door to race condition scenarios. Both are the price of the external storage being accessible by all workers at the same time.

Race conditions in a distributed system can easily lead to an incorrect state as illustrated in the following example:

Suppose worker-1 is processing an ad view event of user X with a blue car, and at exactly the same time worker-2 is processing another ad view event, also of user X but with a red car. The current state (user profile) in the external storage for user X is {color-blue: 2 views}. The sequence of operations performed by each worker is: read the current state of user X from the external storage, apply the state update operation locally (increment the counter), and write the new state back to the external storage.

Eventually worker-1 tries to write {color-blue: 3 views}, whereas worker-2 wants to write {color-blue: 2 views, color-red: 1 view}.

Which of the writes wins isn’t deterministic, and either way the final state for user X would be incorrect. How to escape such race condition scenarios is an interesting topic, but largely beyond the scope of this article.
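
Still, it is worth seeing where exactly the race lives: the read-modify-write cycle is not atomic. A sketch, assuming a simplified interface for the external store:

```scala
// Simplified interface for the shared external storage (illustrative only).
trait ExternalStore {
  def read(userId: String): Map[String, Long]                 // attribute -> views
  def write(userId: String, profile: Map[String, Long]): Unit
}

object RacyWorker {
  // Both workers run this concurrently for user X. The read-modify-write
  // cycle is not atomic, so the slower write silently overwrites the other.
  def handleAdView(store: ExternalStore, userId: String, color: String): Unit = {
    val current = store.read(userId)  // both read {color-blue -> 2}
    val updated = current.updated(color, current.getOrElse(color, 0L) + 1L)
    store.write(userId, updated)      // last write wins; one update is lost
  }
}
```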

Two possible workarounds are worth mentioning: either design the stream processing system so that all events of a specific user are always processed sequentially by a single worker, or design the system as a stateless one by persisting only the state delta changes to the external storage.
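
Both workarounds can be sketched briefly, again with assumed, illustrative interfaces. The first routes events by hashing the state key so that one user’s events always land on the same worker (this is how Kafka-style key partitioning behaves); the second persists only commutative increments, which can be applied in any order (Cassandra’s counter columns, for instance, support this kind of update):

```scala
object Workarounds {
  // Workaround 1: partition events by user ID so that a single worker
  // always processes all events of a given user sequentially.
  def partitionFor(userId: String, numPartitions: Int): Int =
    (userId.hashCode & Int.MaxValue) % numPartitions

  // Workaround 2: keep the processor stateless and persist only deltas.
  // Increments commute, so concurrent writers cannot corrupt the state.
  def persistDelta(store: CounterStore, userId: String, color: String): Unit =
    store.increment(s"$userId:$color", by = 1L)
}

// Assumed delta-oriented storage interface (Cassandra counter columns
// offer this kind of increment operation).
trait CounterStore {
  def increment(key: String, by: Long): Unit
}
```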

At mobile.de, we calculate user profiles on a daily basis and store each daily user profile for a period of 60 days. The main motivation behind that decision is to let the various stakeholders decide how many days of the user profile they need, and to let them apply different decay functions across the daily profiles in order to boost more recent data. We generate approximately 2 million user profiles per day and each profile has 20 dimensions (profile dimensions are based on car attributes such as price and color). To get a specific profile dimension, the following key combination, which is also the state key, is used: (user ID, date, dimension).
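
The shape of that key, and one possible decay over the daily profiles, could look like the following sketch (the exponential decay is just an illustrative choice, not necessarily the one a stakeholder would pick):

```scala
import java.time.LocalDate
import java.time.temporal.ChronoUnit

// One state entry per (user ID, calendar day, profile dimension).
final case class ProfileKey(userId: String, date: LocalDate, dimension: String)

object Decay {
  // Combine the daily values of one dimension, boosting recent days;
  // halfLifeDays controls how quickly older days lose weight.
  def decayed(daily: Seq[(LocalDate, Double)], today: LocalDate, halfLifeDays: Double): Double =
    daily.map { case (day, value) =>
      val ageInDays = ChronoUnit.DAYS.between(day, today).toDouble
      value * math.pow(0.5, ageInDays / halfLifeDays)
    }.sum
}
```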

2M (daily profiles) × 20 (profile dimensions) × 60 (days) = 2.4 billion keys, which is quite a big state. The requirements that we had for the user-profile product were quite clear:

  • The user profile should always be served, even if the stream processing system that updates the profiles is down. We prefer to serve an older user profile rather than no profile at all.
  • The serving layer must be able to scale with the increase in request rate.
  • State storage must be able to scale and grow with the increase in state size.
  • Additional CPU-intensive calculations and transformations are required as one step before serving the final user profile result.
  • The user profile needs to be updated “just-in-time,” meaning some latency is tolerable: it is completely acceptable if the profile is updated with a small delay (for example, 30 seconds after the corresponding user interaction event was triggered).

Those requirements convinced us to store the stream state in external storage. We selected Apache Cassandra as the external storage and Akka HTTP to implement the REST service on top of it.
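
A minimal sketch of such a serving layer with Akka HTTP (the route and the placeholder response are illustrative; the real service performs the Cassandra lookup and the CPU-intensive post-processing mentioned above):

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._

object ProfileService extends App {
  implicit val system: ActorSystem = ActorSystem("profile-service")

  // GET /profiles/<userId> keeps serving even when the stream processing
  // workers are down, because it reads from the external storage directly.
  val route =
    path("profiles" / Segment) { userId =>
      get {
        // The Cassandra lookup and the CPU-intensive post-processing of
        // the profile would happen here; a placeholder keeps this runnable.
        complete(s"profile for user $userId")
      }
    }

  Http().newServerAt("localhost", 8080).bind(route)
}
```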

So in summary, deciding how to store the stream state is one of the most important choices in the design of a stateful stream processing system. The right choice can only be made after a thoughtful review of the technical pros and cons of each option and, more importantly, after deeply analysing and understanding the business requirements.
