An Actor-based User Similarity Recommender

An Experiment with the Akka Toolkit

For many of us, what we think of as big data is often not that much data in reality. When planning the design and implementation of a large-scale data processing system (one meant to churn through what is still a lot of data), some developers come to realise that big data systems such as Hadoop are not always the best solution. Sometimes it's better, and easier, to build an alternative yourself. This is not meant as a low blow at the influential libraries and tools for enterprise data crunching; they do their job well, but that doesn't mean they should be used for every job.

Recently, I became interested in a personal project. My day job deals with a lot of data, but not at a level I would deem big data, at least not yet. I realised that a subset of the data we collect had potential for a common and desirable use case: user recommendations. In particular, I wanted to explore computing user similarities from preferential data, such as ratings, to provide users with personalised recommendations. I was especially fond of computing user similarities because of two multi-purpose uses:

I have previous experience with Hadoop and, with respect to recommender systems, Mahout too. But those are heavyweight tools compared to the scale of my project, and I wasn't sure whether this project would even work or had potential for future use. From past experience, setting up a Hadoop cluster, even a pseudo-distributed one, is still daunting. No, I just wanted to do a relatively small experiment. More to the point, there was a particular framework I wanted to experiment with.

Before I get into the meat and potatoes of my project, I should mention that I have looked into tools and projects beyond Hadoop, such as Spark, Cascading, Scalding, and Storm. They do a good job of abstracting away some of the work in big map-reduce-like jobs, but I still felt they were too big for my project. I had something else in mind.

Lately, I've come to rely on a very popular framework for a few tasks. Akka, an actor-based framework, is used for implementing Scala (and Java) systems that can robustly handle numerous and/or large tasks, with traits such as concurrency and parallelism, asynchronicity, fault tolerance, and remote distribution (clustering!), just to name a few. With an understanding of the basics of recommender systems and algorithms, I wanted to see whether Akka was suitable for this job too. I wasn't assuming Akka would be better than, or even equivalent in performance to, the now de facto set of big data frameworks. Even if it couldn't handle the same scale of processing as the big data crew, I felt it could at least be scalable enough for my task. Implementation experiments would prove or disprove this idea.

Based on some of the actor patterns I had recently become comfortable with, an implementation was clear in my mind. Even more so, I felt the implementation could be minimal in comparison to some of the setups I've seen for big data frameworks.

So let’s see what I came up with…

An Actor-based User Similarity System

The Environment

Short and sweet: I had a database with a whole bunch of user identifiers, item identifiers, and ratings in the range of 1 to 10. Actually, I only had some ratings. Some of the preferential data was merely binary; the user had, at some point in time, simply signified that they 'liked' the item. Before getting started, I decided to fill these gap ratings with a 5, as in 5 out of 10 stars. Being an experiment, I would see what kind of results that would yield.
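To make the gap-filling concrete, here is a minimal sketch of that step. The names (RawRating, fillGapRating) are hypothetical and not taken from the project's code.

```scala
object GapRatings {
  // A raw preference row; a binary 'like' arrives with no explicit score.
  case class RawRating(userId: Long, itemId: Long, rating: Option[Int])

  // Treat a bare 'like' as a 5 out of 10 stars, as described above.
  def fillGapRating(r: RawRating): (Long, Long, Int) =
    (r.userId, r.itemId, r.rating.getOrElse(5))
}
```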

The System

Now, I'm not going to spend this article giving an introduction to Akka. Mainly because I don't want to :-) Hopefully, what I present here gives enough of a glimpse of what Akka and the actor model can do for you as a developer. If I do happen to pique your interest, I recommend the book "Akka Concurrency" by Derek Wyatt for an introduction to the framework and model. For the purpose of this article, it's enough to understand that actor-based frameworks work primarily on the concept of message passing, where actors communicate signals and information between each other to get tasks done.
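As a taste of the model, here is a minimal, self-contained message-passing example. It is not code from the recommender, just two hypothetical actors telling each other messages.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}

case object Ping
case object Pong

// Replies to any Ping it receives.
class Ponger extends Actor {
  def receive = {
    case Ping => sender() ! Pong
  }
}

// Sends a Ping on start-up and reacts to the reply.
class Pinger(ponger: ActorRef) extends Actor {
  ponger ! Ping
  def receive = {
    case Pong => println("received a reply")
  }
}

object MessagePassingDemo extends App {
  val system = ActorSystem("demo")
  val ponger = system.actorOf(Props[Ponger], "ponger")
  system.actorOf(Props(classOf[Pinger], ponger), "pinger")
  // the actor system keeps running until it is explicitly terminated
}
```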

The Design

The top-down design of a similarity job

For my design, the system can be abstractly broken down into three distinct modules. Along with being the primary components of the entire system, they also represent the basis of a similarity job, which the diagram above illustrates.

At the top, or start, of a similarity job is the data streamer. This module is responsible for synchronously pulling the data from the database. "Stream" is the important keyword here: each piece of data should not be stored locally in memory for long-term use. That would be a waste of resources, since the raw data is abundant and is actually needed elsewhere. The data streamer quickly transmits (or messages) each piece of data along to an actual consumer, the vectorization module, for processing. Vectorization is the process of arranging the raw data into vectors, and is a requirement for most recommender system algorithms. In general, vectorization performs the necessary grouping of preferential data (ratings) for each user.
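Concretely, vectorization here boils down to grouping raw (user, item, rating) rows into one rating vector per user, something along these lines (the names are illustrative, not the project's):

```scala
object Vectorization {
  // a raw row as it comes off the stream
  case class RawRow(userId: Long, itemId: Long, rating: Int)

  // a user vector: itemId -> rating for a single user
  type UserVector = Map[Long, Int]

  // group the streamed rows by user to build the vectors
  def vectorize(rows: Seq[RawRow]): Map[Long, UserVector] =
    rows.groupBy(_.userId).map { case (userId, userRows) =>
      userId -> userRows.map(r => r.itemId -> r.rating).toMap
    }
}
```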

To handle the expected deluge of data, the vectorization module actually consists of a number of distributed worker nodes (that is, actors) that individually collect the raw data. This is important, as vectorization can be time consuming. Each worker node stores user vectors, that is, user-to-item-to-rating tuples, locally in memory. For each similarity job, consideration and experimentation will reveal the best distribution of worker nodes to allocate workloads efficiently.

Once the data streamer has no more data to stream, a message signals the workers to start pushing their collected user vectors to the next module in the workflow, the computation module. This module similarly contains a number of worker nodes (more actors) that perform the actual similarity computations. For each user vector a worker node holds, a similarity computation is made against every other user vector held by every other worker node; ideally each pair is compared at least once and at most once, that is, exactly once. This is where the brunt of the recommender's work is performed, and the distribution of worker nodes proves its use here too.

The algorithmic approach I went with is straightforward, even trivial, but the parallelism gained from distributing the work across nodes makes it efficient enough.

The Distributed Algorithm

Via message passing, each worker node will, individually and in sequence, become the distributor. A distributor is responsible for sending its user vectors to every other worker node. When a worker node receives a user vector from the distributor, it compares that user vector against all the user vectors it has stored locally. After each similarity computation, the received user vector is discarded. This is repeated for each user vector the distributor sends. Eventually, the distributor signals to all the worker nodes that it has no more user vectors. Because the distributor node's user vectors are no longer needed at this point (they have already been compared against every other user vector sitting on the other worker nodes), the distributor can shut itself down. Another worker node then becomes the distributor.

But what about computing user similarities for the user vectors held by the distributor itself? The trick is that when the distributor broadcasts its user vectors, it also sends them to itself. The distributor is still a worker node: it accepts messages from itself and performs the same action as every other worker node, computing user similarities.
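To make the rotation of distributors concrete, here is a sketch of the message protocol in actor form. The message and class names are hypothetical, and the real project wires this coordination up through its director actors.

```scala
import akka.actor.{Actor, ActorRef}

case class CompareVector(userId: Long, vector: Map[Long, Int]) // one broadcast user vector
case object VectorsExhausted                                   // the distributor has nothing left
case class BecomeDistributor(allWorkers: Seq[ActorRef])        // a director's instruction

class SimilarityWorker extends Actor {

  // user vectors collected during the vectorization phase
  var localVectors = Map.empty[Long, Map[Long, Int]]

  def receive = {

    // it's this node's turn to be the distributor: broadcast every local
    // vector to every worker (including this one), then signal completion
    case BecomeDistributor(workers) =>
      for ((userId, vector) <- localVectors; worker <- workers)
        worker ! CompareVector(userId, vector)
      workers.foreach(_ ! VectorsExhausted)

    // a vector arrives (possibly our own): compare it against every vector
    // held locally, then simply drop it
    case CompareVector(userId, vector) =>
      for ((localUserId, localVector) <- localVectors if localUserId != userId)
        computeSimilarity(userId, vector, localUserId, localVector)

    // the current distributor is finished; a director would now shut that
    // node down and promote the next worker to distributor
    case VectorsExhausted => ()
  }

  // placeholder: the real computation is delegated to a similarity algorithm
  def computeSimilarity(u1: Long, v1: Map[Long, Int], u2: Long, v2: Map[Long, Int]): Unit = ()
}
```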

That's it! After the last distributor finishes up (which would be the second-last worker in the queue of workers), the job can be considered complete.

Implementation

With the ideology that actors should be responsible for a minimal amount of work, Akka actors are usually programmed to perform a small number of simple tasks in reaction to the messages they receive. The Akka actor API declares one important method to override, receive, which can be thought of as the switchboard that defines the actions an actor performs when communicated with. Based on my design above, the receive methods for my modules and worker nodes are for the most part quite trivial.

When a similarity job is initiated, a request is made to acquire a data streamer, a vectorization node, and a computation node. For the purposes of this discussion, we will assume these actors already exist locally in a single virtual machine. I will first give an overview of my data streaming actor:

https://gist.github.com/kdrakon/01301eb66b16da2c6e4b

When a job is started, the data streamer, UserContentStreamer, is allocated for the job when it receives a RequestStreamer message (a Scala case class here). When the job is ready to start, the streamer is sent a StartStream message, which also indicates a subset of the data in the database (via the sourceId value). Hidden from my code snippet is the implementation defined by the userContentDatabase object (line 21). That object is injected into my actor via the ApplicationModule trait and basically wraps up a bunch of implementation code (JDBC connection pooling with C3P0 and LINQ-style access with Slick). What it returns is a Scala Stream of RawUserContentDto case class objects created from the database entries. These are in turn streamed to awaiting vectorization worker nodes in the upcoming UserContentVectorizer actor. In case you are wondering, "!" is Scala and Akka's syntax for sending (or telling) a message to another actor. In this case, all the streaming data is sent to a single actor, one I've purposely neglected in my design specification until now.
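Since the gist isn't reproduced inline here, the following is a hedged sketch of the streamer's shape based on the description above. The database access is stubbed out, and the exact message fields (including the StreamComplete signal) are assumptions rather than the original code.

```scala
import akka.actor.{Actor, ActorRef}

case class RequestStreamer(output: ActorRef)
case class StartStream(sourceId: Long)
case class RawUserContentDto(userId: Long, itemId: Long, rating: Int)
case object StreamComplete

class UserContentStreamer extends Actor {

  // stand-in for the injected userContentDatabase object; the real one wraps
  // Slick queries behind a C3P0 connection pool
  def userContentDatabase(sourceId: Long): Stream[RawUserContentDto] = Stream.empty

  var output: Option[ActorRef] = None

  def receive = {
    // the streamer is claimed for a job and told where its consumer lives
    case RequestStreamer(out) => output = Some(out)

    // stream each row straight on to the consumer; nothing is retained
    // locally beyond the element currently being sent
    case StartStream(sourceId) =>
      for (out <- output) {
        userContentDatabase(sourceId).foreach(row => out ! row)
        out ! StreamComplete
      }
  }
}
```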

Director actors are actor implementations hidden within my three-module design. These actors are responsible for the management and organisation of worker nodes. In the case of data streaming, the output actor (line 19) is the director for the UserContentVectorizer node, which was the actor that initially sent the StartStream message. This director routes user vectors to the appropriate worker nodes using Akka's very useful consistent hashing router configuration. This configuration ensures that user vectors for a particular user always go to the same worker node (i.e. user grouping), which means user vectors are collected properly, without duplicate vectors for the same user existing elsewhere across the worker nodes.
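Before the actual snippet, here is a sketch of how such a director might wire up Akka's consistent-hashing router (reusing the hypothetical message types from the streamer sketch; only the UserVectorizationReducer name comes from the project). Whatever key the hashMapping returns is what the router hashes on, which is exactly the "user grouping" described above.

```scala
import akka.actor.{Actor, Props}
import akka.routing.{Broadcast, ConsistentHashingPool}
import akka.routing.ConsistentHashingRouter.ConsistentHashMapping

// A hypothetical director: it routes each streamed row by its userId so that
// one user's ratings always land on the same vectorization worker.
class VectorizationDirector extends Actor {

  // extract the routing key from each message
  def hashMapping: ConsistentHashMapping = {
    case RawUserContentDto(userId, _, _) => userId
  }

  // a pool of worker actors behind a consistent-hashing router
  val workers = context.actorOf(
    ConsistentHashingPool(nrOfInstances = 25, hashMapping = hashMapping)
      .props(Props[UserVectorizationReducer]),
    name = "vectorization-workers")

  def receive = {
    case row: RawUserContentDto => workers forward row
    // tell every worker (not just one routee) that the stream has ended
    case StreamComplete         => workers ! Broadcast(StreamComplete)
  }
}

// stand-in for the worker actor described in the article
class UserVectorizationReducer extends Actor {
  def receive = { case _ => () }
}
```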

https://gist.github.com/kdrakon/ad5f15163884cc5753d8

Here’s a brief description of the important bits from the vectorization phase:

This code snippet also includes the code for the vectorization worker nodes defined by the UserVectorizationReducer actor.

The beauty of this is the parallelism gained from the Akka configuration of the worker nodes. Because the actors are designed not to share any mutable data, they are free to work independently and concurrently on only the data they receive. Furthermore, with respect to the "Java world", this is all done without the need to work directly with threads, locks, mutexes, synchronization blocks, etc. (of course, nothing is stopping you from using them if you need them). Beneficially, the actor configuration itself is easily abstracted away from the code and defined in an easy-to-read configuration file (line 28).
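To illustrate what that externalised configuration might look like, here is a sketch. The deployment path, system name, and pool size are assumptions, and in practice the HOCON block would live in application.conf rather than being parsed inline.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object ConfiguredRouting extends App {

  // the router type and pool size live in configuration, not in code
  val routerConfig = ConfigFactory.parseString("""
    akka.actor.deployment {
      /vectorization-director/vectorization-workers {
        router = consistent-hashing-pool
        nr-of-instances = 25
      }
    }
  """)

  val system = ActorSystem("similarity-system", routerConfig.withFallback(ConfigFactory.load()))

  // inside the director, the pool is then created from configuration:
  //   context.actorOf(akka.routing.FromConfig.props(Props[UserVectorizationReducer]),
  //                   "vectorization-workers")
}
```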

That's it for vectorization. In ~100 lines, we have two classes that can potentially do a heavy load of work.

The final node in my design is the one responsible for taking those UserContentRatingVectors and performing the similarity computations. To get straight to the point, I haven't included the node and director actor code in the next snippet; in general, that part of the code is quite similar to the UserContentVectorizer. Once this actor, named UserSimilarityWorkerNode, is acquired, it creates a director that manages and organises its similarity worker nodes.

https://gist.github.com/kdrakon/aa694efd85e9c1b0bbcd

For computing the user similarities, I'm using the LensKit Recommender Toolkit. The Scala object UserSimilarityAlgorithm (line 56) initially used a Pearson correlation algorithm, but given the sparsity of my test data, the cosine similarity algorithm seemed better suited.
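The project delegates the measure to LensKit, but to show what the cosine measure computes over two users' sparse rating vectors, here is a plain Scala equivalent (an illustration of the math, not the LensKit API):

```scala
object CosineSimilaritySketch {

  // cosine similarity between two sparse rating vectors (itemId -> rating)
  def cosine(a: Map[Long, Double], b: Map[Long, Double]): Double = {
    // dot product over the items both users have rated
    val dot = a.collect { case (item, ra) if b.contains(item) => ra * b(item) }.sum
    val normA = math.sqrt(a.values.map(r => r * r).sum)
    val normB = math.sqrt(b.values.map(r => r * r).sum)
    if (normA == 0.0 || normB == 0.0) 0.0 else dot / (normA * normB)
  }
}
```

One common argument for this choice is that with very few co-rated items, the per-user mean centring in Pearson correlation has little to work with, whereas the cosine measure tends to degrade more gracefully on sparse data.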

Once everything was wired up, I basically had the following system implementation:

I've definitely glossed over certain aspects, including the initiator of the similarity job itself, but those implementations are even more straightforward than what I've shown so far. Hopefully, the somewhat short snippets make the concept clear enough.

Initial Results

This being a prototype, I kept my expectations in check. However, I was happy to see that my system did its task quite well. With the following inputs and configuration, I was able to compute ~54 million user similarities in ~15 minutes:

I feel confident the system could do even better. With a modern CPU and some additional memory, a reasonable number of additional worker nodes could be allocated to possibly get even faster results.

Some Napkin Math

I decided to do some quick math to estimate the system's capacity for even more work. As a forewarning, these estimates should be taken with a grain of salt.

Time performance with 25 similarity worker nodes
Time performance with 5000 similarity worker nodes

If we simplify this problem to just pairing users, an input of n user vectors equates to n choose 2 = n(n-1)/2 pairwise comparisons, so the amount of work that needs to be performed is in the time complexity class of O(n^2).
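As a quick sanity check, and assuming every pair is compared exactly once, the pair count can be evaluated directly. The figure of roughly 10,400 user vectors below is my own back-of-envelope inference from the ~54 million similarities of the initial test, not a number from that test itself.

```scala
object NapkinMath extends App {
  // number of user pairs for n user vectors: n choose 2 = n(n - 1) / 2
  def pairs(n: Long): Long = n * (n - 1) / 2

  println(pairs(10400L))    // ~54.1 million, roughly the ~54 million similarities above
  println(pairs(1000000L))  // ~500 billion pairs for one million user vectors
}
```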

Based on these characteristics and the hardware specification of my initial experiment, the estimates above can be used as a performance guide. Using the time performance from that initial test, I adjusted the figures to estimate how they would vary with the number of similarity worker nodes. For example, I estimate that if it were possible to allocate 5000 worker nodes, computing similarities for up to 1 million user vectors would take approximately 300 minutes (5 hours). This assumes the same average number of ratings per user.

It's worth mentioning again that I would expect much better results on more dedicated hardware (e.g. cloud-based servers such as Amazon EC2 or Heroku). With better hardware, I would also expect a single server to be capable of locally supporting worker nodes in the hundreds or thousands. In that case, allocating a total of 5000 worker nodes or more would be feasible with around ten remotely connected servers (i.e. clustering, something else I was able to experiment with).

These estimates ignore other external factors, such as remote network communication latency, meaning further tests are definitely a requirement. The experimentation continues!

Further Work

Considering that the entire system was written in ten Scala files (ten classes, a couple of Scala objects, and a few Scala case classes) and is executed directly from a single JAR, the deployment footprint is tiny. Of all the characteristics gained from using Akka, this is one I favour most. Furthermore, the maintenance burden of the project is minimal, something any backend software engineer desires.

Akka clustering is a feature I was able to play with in this project after some initial prototyping. In my performance estimates I mentioned the ability to distribute workers horizontally across other computers in a cluster for even faster computation. Enabling the clustering feature was as simple as modifying some configuration (again, specified externally from the code) and changing a couple of lines of code. In short, my application JAR was now able to start up, detect the cluster's seed nodes, and start allocating remote CPU time and memory for worker nodes. I plan to start testing the performance of a similarity job across various cluster configurations soon.
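For a sense of what that configuration change looks like, here is a minimal sketch using Akka's classic cluster settings. The system name, host, and ports are placeholders rather than the project's actual values, and in practice this block would sit in application.conf.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object ClusteredNode extends App {

  // switch the actor provider to the cluster provider and point the node at
  // the cluster's seed nodes; everything else in the application stays the same
  val clusterConfig = ConfigFactory.parseString("""
    akka {
      actor.provider = "akka.cluster.ClusterActorRefProvider"
      remote.netty.tcp {
        hostname = "127.0.0.1"
        port     = 2551
      }
      cluster.seed-nodes = [
        "akka.tcp://similarity-system@127.0.0.1:2551",
        "akka.tcp://similarity-system@127.0.0.1:2552"
      ]
    }
  """)

  // the same application JAR started with this config joins (or forms) the cluster
  val system = ActorSystem("similarity-system", clusterConfig.withFallback(ConfigFactory.load()))
}
```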

As this project moves along, there are a few other points to consider:

After these points, a bit of code cleanup, and bug fixes, I plan to share my prototype code publicly.

I hope this write-up was interesting enough for you to consider using Akka or a similar actor-model framework in some of your future projects. Even more, I hope it gave some insight into weighing your options when building data-processing-intensive software. Exploration and experimentation, not silver bullets.
