SparkRSVD open-sourced by Criteo for large scale recommendation engines
Recommendation engines are at the heart of many advertising platforms, enabling to recommend to a user products they might like, based on their previous interaction with other products. Several techniques exist to build such recommendation engine (Koren09). One of them, called collaborative filtering, is based on analyzing all the interaction data between the users and the products (e.g. ratings, buys or page visits) to identify relationships between users, and also between products, so as to identify product categories a user might be interested in. One way to do it is to use latent factors, which are traits that characterizes both users and products. To create them automatically, as in latent factor analysis, one might use matrix factorization. This technique was popularized with the Netflix prize (Bennett07). The interaction matrix, where for instances rows represent users and columns represent products, and whose entries are filled with a number when an interaction has taken place (it might be a 1 when it is an uncountable interaction, such as a film view or page visit, or a number when an explicit rating is given by the user), can be factorized using different methods: Bayesian Personalized Ranking (Rendle09), Weighted Alternated Least Squares (Hu08) or more recent Variational Autoencoders (Liang18) just to name a few.
The problem with many state-of-the-art collaborative filtering approaches is that they are made and tested for relatively small datasets — Netflix (463k items and 18k users), Million Songs Dataset (571k items and 41k users), etc. While in the case of web-scale recommendations we have to make recommendations for ~10⁹ users and ~10⁸ items. There are some existing scalable approaches that are worth mentioning: PinSage (or older Pixie) or Giraph-based matrix factorization. But we could not use either of them: the former does not have an open-source implementation that we could use and the latter is a de-facto custom distribution framework that would be hard to maintain. We needed a solution that would be scalable, light on the cluster and running on a general-purpose distributed framework with high community support. In order to achieve that, we leveraged the last decade of advances in randomized methods (for algorithms, refer to Li16), not so well known approach to collaborative filtering — PureSVD (Carmona12) and built it on top of Apache Spark. Recently, we open-sourced our implementation (Spark-RSVD), a library which allows to compute approximate SVD factorization of large scale sparse matrices. To the best of our knowledge, this is the first library on Spark which implements randomized SVD at scale.
The randomized-SVD algorithm
The randomized SVD algorithm is an iterative algorithm used to find an approximate truncated singular value decomposition (for a review on full singular value decomposition, see (Golub12)). It is truncated because not all the singular vectors are computed (i.e. only the first ones with the biggest corresponding singular value) and approximated because they are computed through an iterative process. Typically, the number of singular vectors to be computed is very small compared to the matrix size, so that it produces a very low-rank approximation.
The randomized-SVD algorithm can be split into two phases:
- The iterative phase aims at finding a good approximation of the range of the matrix with a very low dimensional subspace.
- The second phase uses this approximate image space to compute the singular vectors.
For a thorough presentation of randomized algorithm, please refer to (Halko11).
The first phase starts with (k + p) random vectors, where k is the number of singular vectors to be computed, and p is the oversampling parameter. The last p approximate singular vectors are solely computed to increase the accuracy of the first k ones. This random matrix, called Ω, is then multiplied by the matrix A (the matrix to decompose) to create the matrix Y. This matrix Y, because of the randomness of Ω, is already a good approximation of the range of A. An intuitive way of understanding this is that this random matrix excites all the eigenmodes of the matrix A, and only the dominant ones stand out. Thus, Y spans a subspace which approximates the one spanned by the k + p first singular vectors of A. To increase the precision of the approximation, an iterative refinement of this method consists in multiplying q times the vector Y by the transpose of A and the by A (multiplying by the transpose matrix brings back the result to the row space of A, so that the ensuing multiplication is valid and gives a result in the image space of A). To increase the numerical accuracy, the columns of Y are orthonormalized through a QR decomposition after each multiplication by A (or its transpose).
Now, we take the QR decomposition of Y, more precisely the Q factor, so as to orthonormalize the columns of Y. The matrix Q now approximates the column space of A. The quality of the approximation can be measured through a matrix norm (see (Halko11) for more details).
The second part of the algorithm is easy, now that the the Q matrix is found, and does not entail further approximation. See the figure below for the full algorithm.
Spark data structures
In this section, we will focus on how we designed data structures specific to linear algebra that provide fast operations when run on Spark.
In Spark-SVD we have two kind of elementary data structures on which the computation are based:
- Big rectangular sparse matrices: these matrices correspond for instance to the initial matrix to decompose, and their size is in the order of magnitude of 100 million rows/columns.
To be represented in Spark, this matrix is first split into a grid of blocks, each block being a square sparse matrix of “blocksize” size (we currently use a “blocksize” of 50 000).
Then, we are partitioning the matrix into Spark partitions by grouping adjacent blocks into a single partition. The number of “row of blocks” and “column of blocks” per partition can be configured, and will impact the matrix decomposition time.
- “Tall-and-skinny” matrices. These matrices correspond to the embedding result for instance, so they are very tall (up to 100 million rows) but have only a few hundreds columns. They are also dense.
Here, we have kept the split of the matrix in blocks of “blocksize” length. We then end up with a stack of blocks, where we regroup adjacent blocks in a given partition. Again, the number of “row of blocks” per partition can be configured. Note that the partitioning scheme is consistent between the sparse matrix and the tall-and-skinny matrix for easy matrix-vector computation.
The most computationally intensive operation of the Spark-SVD algorithm is the multiplication of the rectangular sparse matrix with a tall-and-skinny matrix. Here is how this multiplication can be done in Spark using the previous data-structure:
- First, it is possible to split the problem into smaller multiplications, and stack the results. Indeed, a multiplication of a m-by-n matrix by a n-by-p matrix can be split into a several multiplications of l-by-n matrix by n-by-p matrix, where l < m, and stacking the resulting l-by-p matrices. We use this idea to split the sparse matrix into several matrices with fewer lines, iteratively doing each multiplication, then stacking the results. The horizontal matrices are stored as separate RDDs, as shown in the figure below
- By correctly choosing the number of blocks per partition for the tall-and-skinny matrix and for the sparse matrix, we can end up in a situation where each partition of the sparse-matrix exactly requires one partition of the tall-and-skinny matrix in order to do the multiplication. With this setup, we can then call the zipPartitions() Spark method on both RDDs, and then do local multiplications.
- Each partition will then contain a partially-summed resulting matrix. We then just need to sum all of them across all partitions to get the final resulting matrix.
Spark memory issues
During development of Spark-SVD, we faced several scalability issues while running the library on our Hadoop cluster.
Indeed, due to how we chose to distribute the matrix-vector multiplication phase, the best speed will be attained when we increase the width of the matrix partition as much as possible, so that most of the “sum” phase is executed on a single partition, minimizing the amount of cross-partition communication required.
As a result, we have configured Spark-SVD such that the total amount of Matrix and Vector data that ends up on a given partition is pretty close to the total amount of memory available for a task.
However, using such big partition size (close to 2 GB) and having only a few big objects per partition triggered lots of Out Of Memory errors and other transport errors in the internal Spark stack during the set-up of the matrix-multiplication tasks. After some time spent debugging we managed to uncover some behaviors of the Spark library that we were not aware of and that explained these issues:
- Persisted (“cached”) RDDs will behave differently with respect to maximum memory usage depending on whether they are persisted as “serialized” or not. Indeed, storing an RDD as “deserialized” in memory and sending it over network to another partition will require that this RDD is serialized on the original partition, sent over network, and then deserialized on arrival. In this case, the original partition will see its memory footprint increase as each item will be temporarily stored in both its deserialized and serialized versions. This may not be important if the RDD has lots of small objects, but if a partition contains a few big objects (close to 1GB each) then the temporary memory increase can definitely trigger Out Of Memory errors
- Likewise, within the Spark library, the RDD content from the moment where it is received by the network to the moment where it is usable by the end-user is going through several different transformations. Unfortunately, some of these transformations require extra-memory to be allocated.
This happens for instance once in the Netty stack (Netty is the library that handles Spark networking), where some operations could be optimized so they don’t require any extra memory. We made a pull request for this optimization in https://github.com/netty/netty/pull/8176 .
The same kind of “over memory allocation” can also be found in Spark, where several intermediate buffers are created / allocated to transfer data, which isn’t really required. We made a pull request for this in https://github.com/apache/spark/pull/22024 . In the meantime, these extra-memory allocations can be avoided by asking Spark to store incoming RDD data to disk, then read it again from disk (spark.maxRemoteBlockSizeFetchToMem). This is slower but this also lowers the peak memory used for a task.
Just before releasing SparkRSVD on Github, we discovered that Dask has an implementation of RSVD. Dask is a computing framework written in Python for parallel processing. It integrates smoothly in the numerical Python ecosystem (i.e. NumPy, SciPy, scikit-learn, Pandas, etc). It tries to tackle the same issues as Spark, though with some subtle differences.
Dask supports sparse matrices inside its Distributed Array structure, though it is based on a fork of SciPy sparse matrix structure (as the standard SciPy sparse matrix does not implement an ndarray interface, but rather a matrix interface, see scipy PR8162 for an extended discussion).
Dask arrays are distributed N-dimensional arrays, split into chunks which are NumPy array (or in our case, sparse array). Thus, the elaborate data structure that was devised for SparkRSVD is not needed in Dask. For our tests, the chunks of the Dask array had nearly the same size of the partitions in Spark-RSVD.
Description of the benchmark
To enable rapid testing of randomized SVD on Dask, we chose to generate a random matrix instead of reading entries from a Parquet file and populating the sparse matrix with them. Our first test was done on a square matrix of dimension 30 million, with a sparsity of 1e-5, distributed on 60 executors, 5 v-cores each and 38 Gb. The number of computed approximate singular vectors is 100. Only one power iteration was made.
The results are presented in the following table:
When using 60 executors, the results do not look extraordinarily positive for our library. It might be linked to the use of accelerated libraries of SciPy and NumPy by Dask. For this benchmark, 60 executors is actually quite a lot, as the chunks (or the Spark partitions) are 1 million by 1 million. Thus, the blocks of the tall and skinny matrix must be duplicated on the executors.
What if we used only 30 executors ?
The Dask implementation never finishes (or more precisely, crashes), whereas Spark-RSVD completes the task in 38 min (notice the good strong scalability when using twice as many executors and having a job nearly twice as fast).
To conclude this benchmark, one last number is one from one of our production dataset of size roughly 100 million time 100 million. SparkRSVD used on it needs roughly 2 hours 40 minutes to complete the algorithm to compute 100 singular vectors with one power iteration, on 100 executors. We were never able to scale Dask on datasets this large. This proves the usefulness of our implementation of the randomized SVD algorithm in SparkRSVD.
Bennett07: Bennett, J., & Lanning, S. (2007, August). The netflix prize. In Proceedings of KDD cup and workshop (Vol. 2007, p. 35).
Carmona12: Carmona, G.V. and Heindl, E., 2012. Performance of Recommender Algorithms on Top-N Recommendation Tasks.
Golub12: Golub, G. H., & Van Loan, C. F. (2012). Matrix computations (Vol. 3). JHU Press
Koren09: Koren, Y., Bell, R., & Volinsky, C. (2009). Matrix factorization techniques for recommender systems. Computer, (8), 30–37.
Halko11: Halko, N., Martinsson, P. G., & Tropp, J. A. (2011). Finding structure with randomness: Probabilistic algorithms for constructing approximate matrix decompositions. SIAM review, 53(2), 217–288.
Hu08: Hu, Y., Koren, Y. and Volinsky, C., 2008, December. Collaborative filtering for implicit feedback datasets. In Data Mining, 2008. ICDM’08. Eighth IEEE International Conference on (pp. 263–272). Ieee.
Li16: Li, H., Kluger, Y. and Tygert, M., 2016. Randomized algorithms for distributed computation of principal component analysis and singular value decomposition. Advances in Computational Mathematics, pp.1–22.
Lian18: Liang, D., Krishnan, R.G., Hoffman, M.D. and Jebara, T., 2018. Variational Autoencoders for Collaborative Filtering. arXiv preprint arXiv:1802.05814.
Rendle09: Rendle, S., Freudenthaler, C., Gantner, Z. and Schmidt-Thieme, L., 2009, June. BPR: Bayesian personalized ranking from implicit feedback. In Proceedings of the twenty-fifth conference on uncertainty in artificial intelligence (pp. 452–461). AUAI Press.
Aloïs Bissuel (machine learning engineer), Vincent Grosbois (machine learning engineer) and Ivan Lobov (researcher), working at Criteo AI Lab