Scalable Sparse Matrix Multiplication in Apache Spark
Our team often deals with sparse matrices. In recommendation systems, the user/item rating matrix, which shows how much a user likes an item (e.g., a movie), is typically very sparse. Instead of a user/movie rating matrix, we often look at user/host login matrices to find out which logins are unusual.
The plot above shows a typical user/host login matrix. Each login from a user to a host is shown. Interesting patterns emerge; for example, you can see that some user accounts visit almost all 3500 hosts, so they might be scripts automating an otherwise manual job. The plot may make the matrix look dense, but its sparsity is about 99.7%!
The problem
A common task in algorithms for recommendation engines is calculating the matrix product of two sparse matrices. Let us call the operands of the multiplication the left matrix and the right matrix. The product matrix itself will be sparse as well. When these matrices are big, we cannot afford to store any of them in a dense format. They probably would not even fit into memory.
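To get a feel for the difference, here is a rough back-of-the-envelope sketch in plain Scala. The dimensions are hypothetical: one million users is an assumption made only for illustration, while the 3500 hosts and the 0.3% density echo the login matrix described earlier. The byte counts ignore JVM object overhead.

```scala
// Hypothetical dimensions, chosen only for illustration.
val rows = 1000000L          // assumed number of users
val cols = 3500L             // number of hosts
val nonZeroPerMille = 3L     // 99.7% sparsity: about 3 non-zero cells per 1000

val cells = rows * cols
// Dense storage: one 8-byte Double per cell, zero or not.
val denseBytes = cells * 8L
// Coordinate storage: two 8-byte Long indices plus one 8-byte Double
// per non-zero cell.
val nonZeros = cells * nonZeroPerMille / 1000L
val coordinateBytes = nonZeros * (8L + 8L + 8L)

println(s"dense: ${denseBytes / 1000000L} MB, coordinate: ${coordinateBytes / 1000000L} MB")
```

Under these assumptions the dense layout needs about 28 GB, while the coordinate layout needs roughly 252 MB.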
For one of our anomaly detection algorithms, we needed to implement a scalable sparse matrix-matrix multiplication in Apache Spark. We first turned to the documentation of Spark MLlib to learn what is already implemented in Spark. Below, we first summarize our findings, then present a solution for sparse matrix multiplication that worked better for us than the functions currently implemented in Spark.
Disclaimer: in the following part of this post, the cited source and API documentation reflect the state of Apache Spark as of November 2016, i.e., v2.0.2. As Spark rapidly evolves, the summary and findings might become outdated in the future.
Matrix multiplication implementations in Spark
In Spark MLlib, there are two main types of matrices: local matrices are stored on a single machine, while distributed matrices are stored on multiple machines. As of now, there are two implementations of local matrices, DenseMatrix and SparseMatrix, and four implementations of distributed matrices: CoordinateMatrix, IndexedRowMatrix, RowMatrix and BlockMatrix. We wanted to store our matrix in a distributed way, so we looked at those possibilities.
According to the documentation, a CoordinateMatrix should be used to store very sparse matrices with huge dimensions, which seemed to be our use case. A CoordinateMatrix is built up from MatrixEntry instances (actually from an RDD[MatrixEntry]), each MatrixEntry describing a non-zero element in the matrix with its coordinates and value. Unfortunately for us, matrix multiplication is not yet a supported operation of the CoordinateMatrix class.
You can think of an IndexedRowMatrix as a collection of IndexedRow instances, where an IndexedRow is a tuple of a row index and a Vector. The Vector can be a sparse vector. Thus, an IndexedRowMatrix can also work as a sparse representation of a matrix. RowMatrix is similar to IndexedRowMatrix, but a row in such a matrix is only a Vector without the row index. A RowMatrix can be multiplied by another matrix, and a CoordinateMatrix can be converted into a RowMatrix with the toRowMatrix method. You might think that this solves sparse matrix multiplication, but only until you find in the source of RowMatrix.scala that the right matrix must be a DenseMatrix, so the two matrices in the operation cannot both be sparse:
def multiply(B: Matrix): RowMatrix = {
  val n = numCols().toInt
  val k = B.numCols
  require(n == B.numRows, s"Dimension mismatch: $n vs ${B.numRows}")
  require(B.isInstanceOf[DenseMatrix],
    s"Only support dense matrix at this time but found ${B.getClass.getName}.")
  ...
}
The case of BlockMatrix is similar. It has a multiply method; however, its Scala API docs reveal that if the right matrix is a SparseMatrix, it will be converted to a DenseMatrix. We tried it, but it worked miserably for us, as the dense matrix could not fit into memory.
We have seen that distributed sparse matrix multiplication is not yet solved in Spark. To give you the full picture, let us mention that in local matrix multiplication, again, the right matrix must be dense.
A naive solution
After a minute of disappointment, it occurred to us that a naive implementation of the multiplication of two sparse matrices should not be hard to realize. We recalled having studied how to multiply matrices in the MapReduce way in the book Mining of Massive Datasets by Leskovec, Rajaraman and Ullman. In the following, we build on the ideas and notations from Section 2.3.9 of said book; it is worth taking a look there.
Let us call the left matrix M, with entries mᵢⱼ in row i and column j. Likewise, the right matrix is N, with entries nⱼₖ in row j and column k. The number of columns of M must equal the number of rows of N. The product matrix P has entries pᵢₖ, where pᵢₖ = ∑ⱼ mᵢⱼ nⱼₖ.
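As a small worked example (the matrices are made up for illustration), take a 2×3 left matrix and a 3×2 right matrix with three non-zero entries each:

```latex
\[
M = \begin{pmatrix} 1 & 0 & 2 \\ 0 & 3 & 0 \end{pmatrix}, \qquad
N = \begin{pmatrix} 0 & 4 \\ 5 & 0 \\ 0 & 6 \end{pmatrix}
\]
\[
p_{12} = m_{11} n_{12} + m_{12} n_{22} + m_{13} n_{32}
       = 1 \cdot 4 + 0 \cdot 0 + 2 \cdot 6 = 16
\]
\[
p_{21} = m_{21} n_{11} + m_{22} n_{21} + m_{23} n_{31}
       = 0 \cdot 0 + 3 \cdot 5 + 0 \cdot 0 = 15
\]
\[
P = M N = \begin{pmatrix} 0 & 16 \\ 15 & 0 \end{pmatrix}
\]
```

Only the terms in which both factors are non-zero contribute to the sums (three of them here), which is exactly what a sparse implementation can exploit.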
Our CoordinateMatrix implementation of the algorithm works like this:
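import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

def coordinateMatrixMultiply(leftMatrix: CoordinateMatrix,
                             rightMatrix: CoordinateMatrix): CoordinateMatrix = {
  // Key the entries of the left matrix by their column index j.
  val M_ = leftMatrix.entries
    .map({ case MatrixEntry(i, j, v) => (j, (i, v)) })
  // Key the entries of the right matrix by their row index j.
  val N_ = rightMatrix.entries
    .map({ case MatrixEntry(j, k, w) => (j, (k, w)) })

  // Join on j, multiply the paired values, then sum the products per (i, k).
  val productEntries = M_
    .join(N_)
    .map({ case (_, ((i, v), (k, w))) => ((i, k), (v * w)) })
    .reduceByKey(_ + _)
    .map({ case ((i, k), sum) => MatrixEntry(i, k, sum) })

  new CoordinateMatrix(productEntries)
}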
First, we arrange the entries of both matrices into pairs to form key/value RDDs. For the left matrix, the key is the column index; for the right matrix, it is the row index. Both are denoted by j. The value of the key/value pair is itself a tuple comprising the other coordinate and the value of the current matrix entry.
The key of the join is j, so the entries with the same j will stick together (that is, entries of matrix M from column j and entries of matrix N from row j). The elements with the same j will be multiplied together (see the formula of the matrix product and the first map after the join), then the products with the same coordinates in P will be summed together (reduceByKey).
In the end, we just map what we calculated to obtain MatrixEntry elements so that we can make a CoordinateMatrix from them.
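To trace these steps on a tiny input without a Spark cluster, the same join, multiply and sum logic can be sketched on plain Scala collections. This is only an illustration under assumptions: Entry is a hypothetical stand-in for MatrixEntry, localSparseMultiply is a name invented here, and the input values are made up. It can be pasted into the Scala REPL or run as a script.

```scala
// Hypothetical stand-in for Spark's MatrixEntry, so the sketch runs without Spark.
case class Entry(i: Long, j: Long, value: Double)

// Hypothetical helper mirroring the join / map / reduceByKey steps locally.
def localSparseMultiply(left: Seq[Entry],
                        right: Seq[Entry]): Map[(Long, Long), Double] = {
  // Group the right matrix by its row index, which plays the role of the join key j.
  val rightByRow: Map[Long, Seq[Entry]] = right.groupBy(_.i)

  // "join" + first "map": pair every left entry in column j with every
  // right entry in row j, and multiply their values.
  val partialProducts: Seq[((Long, Long), Double)] = for {
    m <- left
    n <- rightByRow.getOrElse(m.j, Seq.empty)
  } yield ((m.i, n.j), m.value * n.value)

  // "reduceByKey": sum the partial products that share the same (i, k) coordinates.
  partialProducts
    .groupBy { case (coords, _) => coords }
    .map { case (coords, products) => coords -> products.map(_._2).sum }
}

// Made-up 2x3 and 3x2 matrices with zero-based indices, three non-zero entries each.
val m = Seq(Entry(0, 0, 1.0), Entry(0, 2, 2.0), Entry(1, 1, 3.0))
val n = Seq(Entry(0, 1, 4.0), Entry(1, 0, 5.0), Entry(2, 1, 6.0))
val p = localSparseMultiply(m, n)
```

Here p holds two entries, (0, 1) -> 16.0 and (1, 0) -> 15.0. Coordinates whose sum has no non-zero term never appear in the result, so the product stays in sparse form throughout.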
This solution might be naive, but so far it works for our use case. At least none of the matrices are converted into their dense representations, which makes the multiplication more scalable than what the API currently provides. Obviously, its limits should be analyzed further. If you have any insights about this, we would be curious to hear them.
Conclusion
While building a recommendation system, we faced the problem of large sparse matrix multiplication with Apache Spark. Since Spark MLlib did not contain a suitable solution for us, we created our own implementation. Hopefully, this blog post will help you on your way when dealing with large sparse matrices in Spark.
Originally published at www.balabit.com on December 1, 2016 by Árpád Fülöp.