An Actor-based User Similarity Recommender
An Experiment with the Akka Toolkit
For many individuals, what may be thought of as big data is likely not that much data in reality. When properly planning the design and implementation of large-scale data processing systems (that is, those meant to churn through what can still be considered a lot of data), some developers realise the following point: big data systems such as Hadoop are not always the best solution. Sometimes it’s better, and easier, to build an alternative solution yourself. This is of course not meant as a low blow at influential software libraries and tools for enterprise data crunching. They do their job well, but that doesn't imply they should be used for all jobs.
Recently, I became interested in a personal project; my day job deals with a lot of data, but not at the quantitative level that I would deem it big data…at least not yet. I realised a subset of the data obtained had the potential for a common and desirable use case—user recommendations. In particular, I wanted to explore the possibility of computing user similarities based on preferential data—such as ratings—to provide users with personalised recommendations. I was especially fond of the idea of computing user similarities because of two multi-purpose uses:
- it could be used for user clustering and/or neighbourhood creation for the purpose of giving users the opportunity to discover possible ‘friends’, and
- it could be used to find recommendations and to estimate preferential data for users (e.g. Netflix’s personalised star-ratings guesses).
I have previous experience with Hadoop and, with respect to recommender systems, Mahout too. But those are heavy-weight tools when compared to the scale of my project. I wasn't sure if this project would work or if it even had potential for future use. Based on past experience, the task of setting up a Hadoop cluster, even a pseudo-distributed one, was still daunting. No, I just wanted to do a relatively small experiment. Moreover, I had a particular framework I wanted to experiment with.
Before I get into the meat & potatoes of discussing my project, I should mention I have looked into tools and projects beyond Hadoop — such as Spark, Cascading, Scalding, and Storm. I agree they do a good job of abstracting some of the work in big “map-reduce” like jobs, but I still felt they were too big for my project. I had something else in mind.
Lately, I've come to rely on a very popular framework for a few tasks. Akka — an actor-based framework— is used for implementing Scala (and Java) based systems that can robustly handle numerous and/or large tasks with traits such as concurrency & parallelism, asynchronicity, fault-tolerance, and remote distribution (clustering!) — just to name a few. With an understanding of some of the basics of recommender systems and algorithms, I wanted to see if Akka was suitable for the job too. I wasn't making any assumptions of Akka being better or even equivalent in performance to the now de facto set of big data frameworks. Even if it possibly couldn’t handle the same scale of processing as the big data crew, I at least felt it could be scalable enough for my task. Implementation experiments would prove or disprove this idea.
Based on some of the actor patterns I had recently become comfortable with, an implementation was clear in my mind. Even more so, I felt that the implementation could be minimal in comparison to some of the setups I've seen for some big data frameworks.
So let’s see what I came up with…
An Actor-based User Similarity System
Short and sweet — I had a database with a whole bunch of user identifiers, item identifiers, and ratings in the range of 1 to 10. Actually, I had some ratings. Some of the preferential data I had was only binary — the user at some point in time had only signified they ‘liked’ the item. Before getting started, I decided to fill these gap ratings with a 5 — as in 5 out of 10 stars. Being an experiment, I would see what kind of results that would yield.
Now, I'm not going to spend this article giving an introduction to Akka. Mainly because I don’t want to :-) Hopefully, I can present enough glimpses of what Akka and the actor model can do for you as a developer. If I do happen to pique your interest, I recommend the book “Akka Concurrency” by Derek Wyatt for an introduction to the framework and model. For the purpose of this article, it’s enough to understand that actor-based frameworks work primarily on the concept of message passing, where actors communicate signals and information between each other to get tasks done.
For my design, the system can be abstractly broken down into three succinct modules. Along with being the primary components of the entire system, they also represent the basis of a similarity job, which is illustrated on the left.
At the top or start of a similarity job is the data streamer. This module is responsible for synchronously pulling the data from the database. “Stream” is an important keyword here. This distinction is made to indicate that each piece of data should not be stored here locally in memory for long-term usage. This would be a waste of resources since the raw data will be abundant and is actually needed elsewhere. The data streamer quickly transmits (or messages) data along to an actual consumer — the vectorization module — for processing. Vectorization is the process of arranging the raw data into vectors and is a requirement for most recommender system algorithms. In general, vectorization will perform the necessary grouping of preferential data (ratings) for each user.
To handle the expected deluge of data, the vectorization module actually consists of a number of distributed worker nodes (that is, actors) to individually collect all the raw data. This is important as the process of vectorization can be time consuming. Each worker node stores user vectors — that is “user-to-item-to-rating” tuples — locally in memory. For each similarity job, consideration and experimentation will lead to discovering the best distribution of worker nodes to efficiently allocate workloads.
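To make the vectorization step concrete, here is a minimal sketch in plain Scala (no Akka) of what a single worker does with the rows it receives. The `RawRating` case class and the `vectorize` function are hypothetical stand-ins, not the project's actual types; the only details taken from the text are the "user-to-item-to-rating" shape and the default rating of 5 for a bare 'like'.

```scala
object VectorizationSketch {
  // Hypothetical stand-in for a raw database row.
  // rating is None when the user only 'liked' the item.
  final case class RawRating(userId: Long, itemId: Long, rating: Option[Int])

  // The gap-filling value described earlier: 5 out of 10 stars.
  val DefaultRating = 5

  // Folds a stream of raw rows into user vectors: userId -> (itemId -> rating).
  // Taking an Iterator means rows are consumed one at a time, not held in bulk.
  def vectorize(rows: Iterator[RawRating]): Map[Long, Map[Long, Double]] =
    rows.foldLeft(Map.empty[Long, Map[Long, Double]]) { (acc, row) =>
      val value  = row.rating.getOrElse(DefaultRating).toDouble
      val vector = acc.getOrElse(row.userId, Map.empty[Long, Double])
      acc.updated(row.userId, vector.updated(row.itemId, value))
    }
}
```

In the actor version, each worker would hold such a map as private state and update it one message at a time.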
Once the data streamer has no more data to stream, a message is sent to signal the workers to start pushing their collected user vectors to the next module in the workflow, the computation module. This module similarly contains a number of worker nodes (more actors) that perform the actual similarity computations. For each user vector a worker node has, a similarity computation will be made against every other user vector held by every other worker node. Ideally, each pair is compared exactly once (at least once and at most once). This is where the brunt of the recommender's work is performed, and the distribution of the worker nodes proves its use here too.
The algorithmic approach I went with is straightforward, even trivial, but the parallelism gained from spreading the work across distributed nodes makes it efficient.
The Distributed Algorithm
Via message passing, each worker node will in turn become the distributor, one node at a time. A distributor is responsible for sending its user vectors to every other worker node. When a worker node receives a user vector from the distributor, it compares that user vector against all the user vectors it has stored locally. After the similarity computations, the received user vector is discarded. This is performed for each user vector the distributor sends. Eventually, the distributor will signal to all the worker nodes that it has no more user vectors. Because a distributor node's user vectors are no longer needed at this point (they have already been compared for similarity to every other user vector sitting on the other worker nodes), the distributor can shut itself down. Another worker node then becomes the distributor. But what about computing user similarities for the user vectors at the distributor? The trick to that is in the distributor; when the distributor broadcasts its user vectors, it also sends them to itself. This is because the distributor is still considered a worker node. It accepts messages from itself, but does the same action as every other worker node: it computes user similarities.
That’s it! After the last distributor finishes up, which would be the second-to-last worker in a queue of workers, the job can be considered complete.
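The distribution scheme can be simulated without any actors to see which comparisons it produces. The sketch below is an illustration under assumptions, not the project's code: workers are modelled as plain lists of user ids, message passing is replaced by nested loops, and every worker takes a turn as distributor (including the final one) so that pairs living on the same worker are covered as well.

```scala
object DistributorSketch {
  // Each inner Vector is the set of user ids held by one worker.
  // Returns every (sentUser, localUser) comparison, in the order performed.
  def run(workers: Vector[Vector[Long]]): Vector[(Long, Long)] = {
    val comparisons = Vector.newBuilder[(Long, Long)]
    // Workers become the distributor one at a time, in queue order.
    for (d <- workers.indices) {
      // Earlier distributors have shut down; the current one broadcasts
      // to itself and to every worker still waiting in the queue.
      val receivers = workers.drop(d)
      for (sent <- workers(d); receiver <- receivers; local <- receiver)
        comparisons += ((sent, local))
    }
    comparisons.result()
  }
}
```

For three workers holding users `(1, 2)`, `(3)` and `(4, 5)`, this performs 17 comparisons covering all 10 distinct user pairs; the extra 7 are the self-comparisons and mirrored pairs that arise when a distributor messages itself.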
With the ideology that actors should be responsible for a very minimal amount of work, Akka actors are usually programmed to perform a relatively small number of simple tasks in reaction to messages they send and receive. The Akka actor API declares one important method to override — receive — which can be thought of as the switchboard control that defines the actions an actor should perform when communicated with. Based on my design above, the receive methods for my modules and worker nodes are for the most part quite trivial.
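As a rough illustration of the "switchboard" idea, the sketch below models receive as a Scala partial function over incoming messages. In Akka classic, `Receive` is an alias for `PartialFunction[Any, Unit]`; here it returns a `String` only so the effect is easy to observe without running an actor system. The messages `StartJob` and `StopJob` are hypothetical names invented for this example.

```scala
object ReceiveSketch {
  // Hypothetical messages, invented for illustration only.
  final case class StartJob(sourceId: Long)
  case object StopJob

  // Akka's Receive is PartialFunction[Any, Unit]; String is used here
  // purely so the chosen branch is visible to the caller.
  type Receive = PartialFunction[Any, String]

  // The switchboard: one case per message the actor understands.
  val receive: Receive = {
    case StartJob(sourceId) => s"starting job for source $sourceId"
    case StopJob            => "stopping"
  }

  // Messages without a matching case fall through to a default,
  // much like Akka's handling of unhandled messages.
  def deliver(message: Any): String =
    receive.applyOrElse(message, (_: Any) => "unhandled")
}
```

Calling `ReceiveSketch.deliver(ReceiveSketch.StartJob(7L))` picks the first branch, while any unknown message yields `"unhandled"`.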
When a similarity job is initiated, a request for acquisition of a data streamer, vectorization node, and computation node is made. For all intents and purposes for this discussion, we will assume these actors exist already locally in a virtual machine. I will first give an overview of my data streaming actor:
When a job is started, the data streamer, UserContentStreamer, is allocated for the job when it receives a RequestStreamer message (which is a Scala case class here). When the job is ready to start, the streamer is then sent a StartStream message, which also indicates a subset of the data in the database (i.e. with the sourceId value). Hidden from my code snippet is the implementation defined by the userContentDatabase object (line 21). That object is injected into my actor via the ApplicationModule trait and basically wraps up a bunch of implementation code (i.e. JDBC connection pooling with C3P0 and LINQ-style access with Slick). What it returns is a Scala Stream of RawUserContentDto case class objects created from the database entries. These are in turn streamed to awaiting vectorisation worker nodes in the upcoming UserContentVectorizer actor. In case you are wondering, the “!” is Scala and Akka’s syntax for sending (or telling) a message to some other actor. In this case, all the streaming data is sent to a single actor, which I've purposely neglected in my previous design specification, until now.
Director Actors are actor implementations hidden within my three module design. These actors are responsible for the management and organisation of worker nodes. In the case of data streaming, the output actor (line 19) is the director for the UserContentVectorizer node, which was the actor that initially sent the StartStream message. This director routes user vectors to appropriate worker nodes using Akka’s very useful consistent hashing actor routing configuration. This configuration ensures that user vectors for particular users always go to the same worker node (i.e. user grouping). This means that user vectors are collected properly without duplicate vectors for the same user existing somewhere else in the distribution of worker nodes.
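The routing idea can be sketched as a small hash ring in plain Scala. This is not Akka's implementation (Akka provides consistent hashing routers out of the box); it is a simplified illustration, with `Ring`, `workerFor` and the worker names being hypothetical, of why the same user id always lands on the same worker.

```scala
import scala.collection.immutable.TreeMap
import scala.util.hashing.MurmurHash3

object ConsistentHashSketch {
  // A ring of hash positions, each owned by a worker. Several virtual
  // positions per worker spread the load more evenly.
  final class Ring(workers: Seq[String], virtualNodes: Int = 100) {
    private val ring: TreeMap[Int, String] = TreeMap(
      (for (w <- workers; i <- 0 until virtualNodes)
        yield MurmurHash3.stringHash(s"$w#$i") -> w): _*
    )

    // A user id is routed to the first ring position at or after its hash,
    // wrapping around to the start of the ring if there is none.
    def workerFor(userId: Long): String = {
      val hash = MurmurHash3.stringHash(userId.toString)
      val it   = ring.iteratorFrom(hash)
      if (it.hasNext) it.next()._2 else ring.head._2
    }
  }
}
```

Because the route depends only on the user id and the set of workers, all ratings for one user accumulate on a single worker. A side benefit of the ring layout is that removing a worker only re-routes the users that worker owned.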
Here’s a brief description of the important bits from the vectorization phase:
- The UserContentVectorizer (UCV), which is synonymous with my vectorisation node in the design, is also acquired for a similarity job. (lines 8-11)
- A useful feature of Akka actors is their ability to hotswap their behaviour defined by receive. On line 10, the UCV changes its state to working (lines 15-23). This is important as it allows the UCV to perform the vectorization without interruption from additional acquisition requests (however, line 16 shows a trick to allow the UCV to stash future jobs it might want to work on later).
- When the UCV is ready to start, it receives a message (line 17) indicating to it the data streamer it should work with. At this point in time, the UCV does two important things: it creates the worker nodes (lines 27-28) and creates the director (line 32) as an anonymous actor. As previously explained, the director is the actor that is responsible for routing the vectors from the UserContentStreamer to the worker nodes (lines 40-44).
- When the stream is completed, the UCV will tell each worker node to return their vectors back to the similarity job-level (where the UCV was allocated) and shut themselves down (lines 45-48).
This code snippet also includes the code for the vectorization worker nodes defined by the UserVectorizationReducer actor.
- This actor is simple: it gets RawUserContentDto objects (line 65) and retains a localized map of vectors (lines 79-102 & lines 108-109). This encapsulates the logic described earlier (i.e. filling the ratings gaps).
- Finally, when the ReturnUserVectors message is received, the actor simply pipes them to the calling job node (lines 69-77).
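The behaviour swap and stashing mentioned in the bullets above can be imitated in plain Scala. In Akka classic this is done with `context.become` and the `Stash` trait (`stash()` and `unstashAll()`); the sketch below only mimics the idea with a mutable behaviour reference and a queue. The `Acquire` and `StreamCompleted` messages and the `Vectorizer` class are hypothetical simplifications, not the project's actual code.

```scala
import scala.collection.mutable

object HotswapSketch {
  sealed trait Msg
  final case class Acquire(jobId: String) extends Msg
  case object StreamCompleted extends Msg

  type Receive = PartialFunction[Msg, Unit]

  final class Vectorizer {
    private val stash = mutable.Queue.empty[Msg]
    var currentJob: Option[String] = None

    // Idle: accept a job and swap to the working behaviour.
    private def idle: Receive = {
      case Acquire(jobId) =>
        currentJob = Some(jobId)
        behaviour = working
      case StreamCompleted => () // nothing in progress, ignore
    }

    // Working: set new acquisition requests aside instead of being interrupted.
    private def working: Receive = {
      case request: Acquire => stash.enqueue(request)
      case StreamCompleted =>
        currentJob = None
        behaviour = idle
        // Replay everything that was stashed, oldest first.
        stash.dequeueAll(_ => true).foreach(receive)
    }

    private var behaviour: Receive = idle

    def receive(msg: Msg): Unit = behaviour(msg)
  }
}
```

A second `Acquire` that arrives while a job is running is parked, then picked up as soon as the current stream completes.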
The beauty of this is the parallelism gained from the Akka configuration of the worker nodes. Because the actors are designed to not share any mutable data, they are free to work independently and concurrently with only the data they receive. Furthermore, with respect to “Java-world”, this is all done without the need to work directly with threads, locks, mutexes, synchronization blocks, etc. (of course, nothing is stopping you from using them if you need them). Beneficially, the actor configuration itself is easily abstracted away from the code and defined in an easy-to-read configuration file (line 28).
That’s it for vectorisation. In ~100 lines, we have two classes that can potentially do a heavy load of work.
The final node in my design is the one responsible for taking those UserContentRatingVector objects and performing the similarity computations. To get straight to the point, I haven’t included the node and director actor code in the next snippet. In general, that part of the code is quite similar to the UserContentVectorizer. Once this actor (named UserSimilarityWorkerNode) is acquired, it creates a director that
- creates its own set of worker nodes for computing similarities,
- evenly and randomly distributes the user vectors from the vectorisation worker nodes to its own set of worker nodes,
- queues all its worker nodes to sequentially become distributors,
- tells the first and subsequent distributors to start distribution to sibling worker nodes (and itself), and
- waits for the last distributor to complete before telling the parent similarity job node that the job is done.
For computing the user similarities, I'm using the LensKit Recommender Toolkit. The implementation in the Scala object UserSimilarityAlgorithm (line 56) was initially implemented using a Pearson Correlation algorithm, but based on the sparsity of my test data, it seemed that the Cosine Similarity algorithm was better suited.
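For readers unfamiliar with the measure, here is a minimal, self-contained sketch of cosine similarity over sparse rating vectors. It is not LensKit's API or implementation, just the textbook formula; the `RatingVector` alias and the function name are assumptions made for this example.

```scala
object SimilaritySketch {
  // Sparse user vector: itemId -> rating. Unrated items are simply absent.
  type RatingVector = Map[Long, Double]

  // Cosine similarity: dot product over the product of the vector lengths.
  // Only items rated by both users contribute to the dot product.
  def cosine(a: RatingVector, b: RatingVector): Double = {
    val dot = a.iterator.collect {
      case (item, rating) if b.contains(item) => rating * b(item)
    }.sum
    val normA = math.sqrt(a.valuesIterator.map(r => r * r).sum)
    val normB = math.sqrt(b.valuesIterator.map(r => r * r).sum)
    // Guard against empty or all-zero vectors.
    if (normA == 0.0 || normB == 0.0) 0.0 else dot / (normA * normB)
  }
}
```

Two users with no items in common score 0, and the score is still defined when they share only a single item, which is one reason this measure is often considered more forgiving than Pearson correlation on sparse data.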
Once everything was wired up, I basically had the following system implementation:
I've definitely glossed over certain aspects — including the initiator of the similarity job itself — but those implementations are even more straightforward than what I've shown so far. Hopefully with the somewhat short snippets I've shown, the concept is clear enough.
Being a prototypical application, I kept my expectations in check. However, I was happy to see that my system did its task quite well. With the following inputs and configuration, I was able to compute ~54 million user similarities in ~15 minutes:
- 15,654 user vectors with an average number of 20 ratings per user
- 5 vectorization worker nodes
- 25 similarity worker nodes
- Computation on a Samsung Series 9 ultrabook (dual-core CPU with speed-stepping around 2.0GHz, with 4GB memory)
I feel confident the system could do even better. With a modern CPU and some additional memory, a reasonable number of additional worker nodes could be allocated to possibly get even faster results.
Some Napkin Math
I decided to do some quick math to estimate the capacity to do even more work. As a forewarning, the accuracy of the estimates should be taken with a grain of salt.
If we simplify this problem to just pairing users, an input of n user vectors yields n choose 2, that is n(n-1)/2, user pairs to compare (and as many similarity values to store). The amount of work that needs to be performed is therefore in the time complexity class of O(n^2).
Based on these characteristics and the hardware specifications of my initial experiment, the estimates to the left can be used as a performance guide. Using the time performance from that initial test, I rejigged the value to estimate the effect of varying the number of similarity worker nodes. For example, I estimate that if it were possible to allocate 5000 worker nodes, computing similarities for up to 1 million user vectors would take approximately 300 minutes (5 hours). This assumes the same average number of ratings per user.
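One way to arrive at an estimate of that magnitude is sketched below. It assumes (and this is an assumption of the sketch, not a measured result) that run time grows in proportion to the number of user pairs and shrinks in proportion to the number of worker nodes, scaled from the baseline run of 15,654 user vectors, 25 similarity workers and about 15 minutes.

```scala
object NapkinMath {
  // Number of unordered user pairs: n choose 2.
  def pairs(n: Long): Long = n * (n - 1) / 2

  // Baseline figures taken from the experiment described above.
  val baselineUsers   = 15654L
  val baselineWorkers = 25
  val baselineMinutes = 15.0

  // Assumes time is proportional to pairs and inversely proportional to
  // workers. Ignores hardware limits, network latency and rating density.
  def estimateMinutes(users: Long, workers: Int): Double =
    baselineMinutes *
      (pairs(users).toDouble / pairs(baselineUsers)) *
      (baselineWorkers.toDouble / workers)
}
```

Under those assumptions, `NapkinMath.estimateMinutes(1000000L, 5000)` comes out at a little over 300 minutes.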
It’s worth mentioning again that I would expect much better results on more dedicated hardware (e.g. cloud based servers such as Amazon EC2 or Heroku). With better hardware, I would also expect single servers to be capable of locally supporting worker nodes in the hundreds or thousands. In this case, allocating a total of 5000 worker nodes or more would be feasible with around ten remotely connected servers (i.e. clustering, something else I was able to experiment with).
These estimates ignore other external factors, such as remote network communication latency, meaning further tests are definitely a requirement. The experimentation continues!
Considering the fact that the entire system was written in ten Scala files (ten classes, a couple of Scala objects, and a few Scala case classes) and is executed directly from a single JAR, the footprint for deployment is tiny. Of all the characteristics gained from using Akka, this is one of the top ones I favour. Furthermore, the maintenance burden of this project is therefore minimal, something desired by any backend software engineer.
Akka clustering is a feature I was able to play with in this project after some initial prototyping. To get even faster computation, I previously mentioned in my performance estimates the ability to distribute workers horizontally across other computers in a cluster. Enabling the clustering feature was as simple as modifying some configuration (again, specified externally from the code) and changing a couple of lines of code. In short, my application JAR was now able to start up, join a cluster through its seed nodes, and start allocating remote CPU time and memory for worker nodes. I plan to start testing performance of a similarity job for various cluster configurations soon.
As this project moves along, there are a few other points to consider:
- I completely neglected the fact that the distributor performs redundant similarity computations when it sends itself user vectors. This is a waste of resources, but not an overly difficult problem to solve with further development.
- I purposely neglected discussion on the output of the similarities and their access outside of the system. I'm considering doing some research on the best option for quick output of the similarities to some data store. A relational database should work fine, but I might consider other options since it gives me an excuse to play around with some newer stuff.
- As for access to the similarities, it doesn't make sense to make this available from within this particular system since it should only focus on one task: to compute similarities. Whatever data store I decide to go with should provide sufficient accessibility to the similarity output. However, I'm considering implementing a REST API in a small external system using Spray, another library I've come to rely on a lot lately. This could make for a more preferable consumption of the similarities.
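On the first point, one possible way to avoid the redundant work is sketched below in plain Scala, again with workers modelled as lists of user ids rather than actors. It is only an idea under assumptions, not something taken from the prototype: when the distributor messages itself, a vector is compared only against the local vectors that come after it, so no pair is repeated and no vector is compared with itself.

```scala
object DedupSketch {
  // Each inner Vector is the set of user ids held by one worker.
  // Returns every (sentUser, localUser) comparison performed.
  def run(workers: Vector[Vector[Long]]): Vector[(Long, Long)] =
    for {
      d                  <- workers.indices.toVector
      (sent, i)          <- workers(d).zipWithIndex
      (receiver, offset) <- workers.drop(d).zipWithIndex
      // offset == 0 means the distributor is receiving its own vector:
      // only compare against local vectors positioned after it.
      local <- if (offset == 0) receiver.drop(i + 1) else receiver
    } yield (sent, local)
}
```

With workers holding users `(1, 2)`, `(3)` and `(4, 5)`, this performs exactly 10 comparisons, one per distinct pair.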
After these points, a bit of code cleanup, and bug fixes, I plan to share my prototype code publicly.
I hope this write-up was interesting enough for you to consider using Akka or a similar actor-model framework in some of your future projects. Even more so, I hope it gave some insight into considering your options when building data-processing-intensive software. Exploration and experimentation, not silver bullets.