Large scale matrix multiplication with pyspark (or — how to match two large datasets of company names)

Ran Tavory
14 min read · Aug 2, 2016

Spark and pyspark have wonderful support for reliable distribution and parallelization of programs as well as support for many basic algebraic operations and machine learning algorithms.

In this post we describe the motivation and means of performing name-by-name matching of two large datasets of company names using Spark.

Motivation first

Our goal is to match two large sets of company names. We’re looking at two long lists of company names, list A and list B and we aim to match companies from A to companies from B.

Typically we would have something like this:

List A       | List B
-------------|-----------
GOOGLE INC.  | Google
MEDIUM.COM   | Medium Inc
Amazon labs  | Amazon
Google, inc  |
Yahoo        |
             | Microsoft

In this example our goal is to match both GOOGLE INC. and Google, inc (from list A) to Google (from list B); and to match MEDIUM.COM to Medium Inc; and Amazon labs to Amazon, etc…

Looking at this simple example, a few things stand out:

  • It’s possible for more than one company from A to match a single company from B; it’s a many-to-one relationship.
  • Company names are in most cases easy for humans to match (e.g. Amazon labs to Amazon) but are not as easy for computers to match (how does the computer know that “labs” in this case is insignificant and that “Amazon” is just short for “Amazon labs”?)
  • Not all companies from A have matches in B, and not all companies from B are matched from A. In our example Yahoo from list A is not matched to any other company on B and Microsoft from B is not matched to any company on A either.
  • Any element from list A should have at most one match in B. The opposite isn’t true: as noted, many companies from A could be matched to a single company in B.

First attempt — trivial match

At first we thought we’d try the simplest, most trivial solution to see how well it works, if for nothing else then at least to establish a baseline for future attempts. The simplest thing to do is a case-insensitive string equality test: simply match the strings from A to the strings from B.
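As a rough illustration of that baseline, here is a minimal sketch in plain Python. The function name `trivial_match` is made up for this example, and the two lists are the sample names from the table above.

```python
def trivial_match(list_a, list_b):
    """Match names from A to names from B by case-insensitive equality."""
    # Index B by its lowercased form so every lookup is a dictionary hit.
    b_by_key = {name.lower(): name for name in list_b}
    return {
        name: b_by_key[name.lower()]
        for name in list_a
        if name.lower() in b_by_key
    }


list_a = ["GOOGLE INC.", "MEDIUM.COM", "Amazon labs", "Google, inc", "Yahoo"]
list_b = ["Google", "Medium Inc", "Amazon", "Microsoft"]

matches = trivial_match(list_a, list_b)
print(matches)  # {} : none of the sample names are equal, even ignoring case
```

Several names from A can map to the same name in B, which fits the many-to-one relationship described earlier.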

Precision and Recall

There are two relevant measures to look at: Precision and Recall. Precision is “how many mistakes did we (not) make”, or in other words — given all matches, how many of them were indeed correct matches — Precise matches. So precision is about false positives.

Recall on the other hand is “how many matches should have been found, but were missed”. So recall is about the false negatives.

The first trivial attempt was, as expected, high on precision but low on recall. If we look at our short list of example companies it would match zero elements from A to B. That’s very poor recall, but 100% precision ;-). Of course in a real-world scenario it would match a bit more than zero, but it’s easy to see that due to the many small possible variations in company names recall would remain low.
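To make the two measures concrete, here is a small sketch that computes them from sets of (A, B) pairs. The `predicted` pairs are invented for illustration and are not results from our system. Note that with zero predicted matches precision is strictly speaking undefined, so this sketch returns None in that case.

```python
def precision_recall(predicted, expected):
    """Return (precision, recall) for two collections of (a_name, b_name) pairs."""
    predicted, expected = set(predicted), set(expected)
    true_positives = len(predicted & expected)
    # Precision: share of predicted matches that are correct.
    precision = true_positives / len(predicted) if predicted else None
    # Recall: share of correct matches that were actually found.
    recall = true_positives / len(expected) if expected else None
    return precision, recall


expected = {
    ("GOOGLE INC.", "Google"),
    ("Google, inc", "Google"),
    ("MEDIUM.COM", "Medium Inc"),
    ("Amazon labs", "Amazon"),
}
# Hypothetical output of some matcher: two correct pairs and one mistake.
predicted = {
    ("GOOGLE INC.", "Google"),
    ("Google, inc", "Google"),
    ("Yahoo", "Microsoft"),
}

precision, recall = precision_recall(predicted, expected)
print(precision, recall)  # 2 of 3 predictions are right, 2 of 4 matches found
```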

A simple improvement to that would be the removal of stop-words.

Stop Words

What are stop-words? Stop-words are words that get removed before processing by various NLP algorithms because they don’t add information; usually they just add noise. In plain English the stop-words are usually the “of”, “for”, “if” etc. of the language. These are very common words that get used a lot, but for many NLP and IR algorithms they don’t add information. They make for syntactically correct sentences and in many cases they affect the semantics, however at the level of many NLP processors, which don’t look at actual syntax, they are meaningless.

In our case the stop-words are not the “if”, “of” or “for” which are typical in English, but the “inc” and “llc” from the company extension. So our simple improvement is to just remove all these company extensions and try the simple string equality test once again.

This indeed helped. As you can see in our example, it matches “GOOGLE INC.” to “Google”, and with simple punctuation removal and correct tokenization we also match “Google, inc” to “Google”. But that still doesn’t match “Amazon labs” to “Amazon”, because “labs” is not a stop-word in the sense that it isn’t a common company extension. As it turns out, “Amazon labs” isn’t just a random example: many company names have variations like this that show up in one dataset but not in the other. Conclusion: we have to find a way to “look beyond that” and ignore the “labs” in “Amazon labs”.
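Here is a sketch of that improvement in plain Python, assuming a tiny illustrative stop-word list (“inc”, “llc”, “ltd”) and a simple tokenizer that lowercases and splits on anything that is not a letter or digit. The function names are made up for this example.

```python
import re

# Illustrative list only; a real list of company extensions would be longer.
COMPANY_STOPWORDS = frozenset(["inc", "llc", "ltd"])


def normalize(name):
    """Lowercase, split on punctuation/whitespace and drop company extensions."""
    tokens = re.findall(r"[a-z0-9]+", name.lower())
    return " ".join(t for t in tokens if t not in COMPANY_STOPWORDS)


def match_normalized(list_a, list_b):
    """Exact match on the normalized form of each name."""
    b_by_key = {normalize(name): name for name in list_b}
    return {
        name: b_by_key[normalize(name)]
        for name in list_a
        if normalize(name) in b_by_key
    }


list_a = ["GOOGLE INC.", "MEDIUM.COM", "Amazon labs", "Google, inc", "Yahoo"]
list_b = ["Google", "Medium Inc", "Amazon", "Microsoft"]

matches = match_normalized(list_a, list_b)
print(matches)  # both Google variants match; "Amazon labs" still does not
```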

Let us meet science.

The Science

What we’re looking at here is the problem of matching N documents from list A to M documents in list B, in many-to-one relationship. But our matching algorithm needs to be “smart”, in the sense that it needs to be able to distinguish between “important words” and “non important words”. We need to find a way to tell the computer that “labs” in “amazon labs” is insignificant but “amazon” is indeed significant. We would also trivially tokenize the names into smaller tokens by splitting by spaces, punctuation etc, so that “medium.com” would get broken up into “medium” and “com”.

Science to the rescue!

TF-IDF

To that end we use a common scheme in information retrieval theory called TF-IDF. TF-IDF stands for Term Frequency-Inverse Document Frequency. Term Frequency simply means “how many times this word appears in this document” (our documents are just company names, so they are very short “documents”). So in the case of “amazon labs” we have only two words in the document, “amazon” and “labs”, and their frequency is simply 1 and 1. (If, by the way, the name of the company just happened to be “amazon labs amazon” then the count would have been 2 for “amazon” and 1 for “labs”.) That’s what TF is all about, quite simple: count the frequency of terms in the document.

Inverse Document Frequency is the real deal. It looks at all “documents” (aka the corpus, all the company names) and tests how often a word such as “labs” appears across them. If the word “amazon” only appears in a single document, that means that “amazon” is a significant word, but if the word “labs” appears in many other documents (e.g. many companies use the word “labs” as part of their name) then the word “labs” is insignificant. The number of documents a word appears in is its document frequency, and IDF is the inverse of that: the more documents contain the word, the lower its IDF.

TF-IDF is the TF of the term multiplied by the term’s IDF (in other words, TF divided by the document frequency, usually on a log scale). It provides a good measurement of how important or how significant words are in the context of specific documents.

It’s easy to compute the TF-IDF matrix for a set of documents. There are ready-made libraries that do that, and we used scikit-learn’s implementation for that.

The TF-IDF matrix is a two dimensional matrix in which the rows represent documents (in our case — company names) and the columns represent unique tokens (or words). If we wanted to build the TF-IDF matrix of our little corpus from list A it’d look something like this (after the removal of stop-words, punctuation and lowercasing everything):

            | google | medium | com | yahoo | amazon | labs
------------|--------|--------|-----|-------|--------|-----
GOOGLE INC. | 1      | 0      | 0   | 0     | 0      | 0
MEDIUM.COM  | 0      | .77    | .63 | 0     | 0      | 0
Amazon labs | 0      | 0      | 0   | 0     | .7     | .7
Google, inc | 1      | 0      | 0   | 0     | 0      | 0
Yahoo       | 0      | 0      | 0   | 1     | 0      | 0
com         | 0      | 0      | 1   | 0     | 0      | 0

Here’s the code:

from sklearn.feature_extraction.text import TfidfVectorizer

vectorizer = TfidfVectorizer()
matrix = vectorizer.fit_transform(['GOOGLE', 'MEDIUM.COM', 'Amazon labs', 'Google', 'Yahoo', 'com'])

The matrix that was created is NxM where N = number of companies and M = number of unique tokens.

You’d notice that we added another (made up) company named “com”. We did that in order to demonstrate an important property of TF-IDF. We use TF-IDF in order to distinguish between significant and insignificant tokens in the documents. A significant token in a document is a token that not only appears in the document often, but that is also relatively rare in the entire corpus. If a term appears many times in the corpus then it becomes less significant for this specific document. We added the made up company “com” so that “Medium” within “Medium.com” becomes more significant. (You’d notice that the weight of “Medium” is .77 while the weight of “com” is .63; that’s due to the appearance of “com” in another document, hence its IDF is lower.)

Of course in a real-world situation you’d have dozens or hundreds of company names with the token “com” or “labs”, so you’d see a substantial difference between “Medium” and “com” within the name Medium.com.
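For readers who want to see where .77 and .63 come from, here is a plain-Python sketch of the calculation. It assumes the smoothed IDF formula that scikit-learn documents as its default, idf = ln((1 + n) / (1 + df)) + 1, followed by scaling each row to unit length. The tokenizer is a simplification and the function names are made up for this example.

```python
import math
import re
from collections import Counter


def tokenize(name):
    # Simplified tokenizer: lowercase, split on non-alphanumerics.
    return re.findall(r"[a-z0-9]+", name.lower())


def tfidf(corpus):
    """Return one {token: weight} dict per document, each scaled to unit length."""
    docs = [Counter(tokenize(name)) for name in corpus]
    n_docs = len(docs)
    # Document frequency: in how many documents does each token appear?
    doc_freq = Counter(token for doc in docs for token in doc)
    # Smoothed inverse document frequency (assumed to mirror sklearn's default).
    idf = {t: math.log((1 + n_docs) / (1 + df)) + 1 for t, df in doc_freq.items()}
    rows = []
    for doc in docs:
        weights = {t: count * idf[t] for t, count in doc.items()}
        norm = math.sqrt(sum(w * w for w in weights.values()))
        rows.append({t: w / norm for t, w in weights.items()} if norm else {})
    return rows


corpus = ["GOOGLE", "MEDIUM.COM", "Amazon labs", "Google", "Yahoo", "com"]
rows = tfidf(corpus)
print({t: round(w, 2) for t, w in rows[1].items()})  # {'medium': 0.77, 'com': 0.63}
```

“com” appears in two documents and “medium” in only one, so “com” gets the lower IDF and therefore the lower weight inside “MEDIUM.COM”.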

Cosine Similarity

The next step after calculating the TF-IDF matrix for both sides (both lists A and B of companies) is to multiply the matrices.

Multiplying the matrices gives us an interesting measure called cosine similarity. Cosine similarity is the cosine of the angle between two vectors; for TF-IDF vectors, whose weights are never negative, it ranges between 0 and 1. A value of 1 indicates identical elements and a value of 0 indicates completely different elements. A plain matrix product is enough here because sklearn normalizes every TF-IDF row to unit length by default, so the dot product of two rows is exactly their cosine. Multiplying the matrices provides the cosine similarity between every element in list A and every element in list B. As a matter of fact, we multiply A by B.T (B.transpose) so that the dimensions fit. The interesting thing about cosine similarity between TF-IDF matrices is that the result is a matrix of similarities between every element in A and every element in B, while taking into account the significance of tokens in the names. Usually a result of > .8 means a valid match.

Luckily the python package sklearn provides a simple cosine_similarity function that accepts two matrices and results in the cosine similarity of these two. Here’s some demo code:

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

vectorizer = TfidfVectorizer()
a = vectorizer.fit_transform(['aa', 'bb', 'aa bb', 'aa aa bb'])
# Refitting on b only lines up here because both lists share the vocabulary {aa, bb}.
b = vectorizer.fit_transform(['aa', 'bb'])
cosimilarities = cosine_similarity(a, b)

The result is a matrix of similarities between every element in a and every element in b.

         | aa  | bb
---------|-----|----
aa       | 1   | 0
bb       | 0   | 1
aa bb    | .7  | .7
aa aa bb | .89 | .44

As expected, the word “aa” from a is very similar to the word “aa” from b (note the 1). You can also see that “aa bb” is equally similar to both “aa” and “bb”, and that also makes sense. And finally you’d notice that “aa aa bb” bears higher similarity to “aa” than it does to “bb”. This all makes sense.

Let us sum up the science here. First we take two lists of documents and for each list we compute its TF-IDF matrix*. Then we multiply the two matrices to come up with their cosine similarity, which is a matrix that describes the similarity between every document in A and every document in B.
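The same numbers can be reproduced without any library. The sketch below computes cosine similarity directly on token counts. That shortcut is an assumption that only holds for this toy example, where “aa” and “bb” are equally common and so their IDF weights are equal and cancel out.

```python
import math
from collections import Counter


def cosine(u, v):
    """Cosine similarity between two sparse vectors given as {token: weight}."""
    dot = sum(weight * v.get(token, 0.0) for token, weight in u.items())
    norm_u = math.sqrt(sum(w * w for w in u.values()))
    norm_v = math.sqrt(sum(w * w for w in v.values()))
    return dot / (norm_u * norm_v) if norm_u and norm_v else 0.0


a = [Counter(name.split()) for name in ["aa", "bb", "aa bb", "aa aa bb"]]
b = [Counter(name.split()) for name in ["aa", "bb"]]

# One row per element of a, one column per element of b.
similarities = [[cosine(u, v) for v in b] for u in a]
for name, row in zip(["aa", "bb", "aa bb", "aa aa bb"], similarities):
    print(name, [round(s, 2) for s in row])
```

The last row comes out as roughly .89 and .45; the table above shows the second value truncated to .44.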

Second Attempt — numpy matrix multiplication

After having seen the science, we want to try this in practice. The next attempt is to load all real companies from list A and load all real companies from list B and multiply the heck out of them using the function cosine_similarity.

Easier said than done.

At small scale this simply works, and it works very nicely, for example with a few thousand companies on each side. However with our dataset, in which each list holds anywhere from a couple of hundred thousand names up to a few million, this becomes challenging.

Simply calculating the TF-IDF is feasible, even with such large datasets, on a single host (my laptop runs that easily, in a few seconds). However, multiplying the matrices is where the real challenge is.

Local multiplication doesn’t scale

Let’s assume we have 1M (10⁶) names in each list. Each matrix would then be on the order of 10⁶ x 10⁶ (the number of unique tokens is of the same order as the number of companies, because names tend to be unique). Holding a dense 10⁶ x 10⁶ matrix in memory means 10¹² floats. At 8 bytes per 64-bit float that is 8×10¹² bytes, or roughly 8TB (eight terabytes) of RAM. Can we keep an 8TB matrix in memory? Well, of course not, at least not on my shabby laptop.

But there’s a trick. We don’t have to keep it all in memory. Keep in mind that although the matrix is 1M by 1M, in practice it is mostly filled with zeros. So why should we bother maintaining all these zeros in memory? We can instead use a sparse representation of the matrix: rather than keeping the two-dimensional matrix in memory as arrays of arrays, we only keep track of the coordinates of the non-zero elements and assume that all the rest are zeros. That’s great for keeping a low memory footprint as well as for executing fast matrix multiplication operations. And as a matter of fact, this is exactly what sklearn already does. Keeping the matrices as sparse matrices means that we only have to allocate around 1M floats (values), plus 2M integers (indices of non-zero elements); all in all it’s about 24MB, which is quite easy.

But multiplying the two matrices, even though they are sparse, would mean at the very least 10¹² operations (if we’re smart with the zeros). Now that is a bit harder. And although numpy (which lies underneath sklearn) is very good at such fast math, this thing is a bit challenging even for numpy.

We tried that: simply multiplying these two matrices. It works well for small enough matrices, but at some size (much smaller than what we want) it started to fail and run out of memory. Now, we could have worked around this by splitting one of the matrices into smaller chunks, running a number of multiplications one after the other and then combining the results. But this reminded us that we already know a system that does exactly that; it’s called Spark.
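The back-of-the-envelope arithmetic is easy to check. The sketch below assumes 8-byte (64-bit) floats for the values and 8-byte integers for the indices; both sizes are assumptions of this example, chosen because they reproduce the 24MB sparse figure.

```python
n_names = 10**6          # rows: one per company name
n_tokens = 10**6         # columns: one per unique token
bytes_per_value = 8      # assumption: 64-bit floats
bytes_per_index = 8      # assumption: 64-bit integer indices

# Dense: every cell is stored, zeros included.
dense_bytes = n_names * n_tokens * bytes_per_value

# Sparse: roughly 1M non-zero values plus two indices (row, column) for each.
nonzeros = 10**6
sparse_bytes = nonzeros * bytes_per_value + 2 * nonzeros * bytes_per_index

print(dense_bytes / 10**12, "TB dense")    # 8.0 TB dense
print(sparse_bytes / 10**6, "MB sparse")   # 24.0 MB sparse
```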

Third Attempt — Spark matrix multiplication

Spark is great for highly parallelized memory intensive computations and lo and behold, it has a BlockMatrix data type which implements a multiply operation. Looks like exactly what we were looking for! OK, so we create the TF-IDF matrices and convert them to Spark’s BlockMatrix and run a.multiply(b.transpose()), which is more or less what cosine_similarity does.

# Pseudocode...
a_mat = tfidf_vect.fit_transform([..., ..., ...])
b_mat = tfidf_vect.fit_transform([..., ..., ...])
a_block_mat = create_block_matrix(a_mat)
b_block_mat_tr = create_block_matrix(b_mat.transpose())
cosimilarities = a_block_mat.multiply(b_block_mat_tr)

This seems easy enough, and it really is. But there’s a “but” of course… this thing, although simple and correct from the math point of view, doesn’t scale. We’re able to multiply large matrices, but not as large as we would like to. We tried playing with the block sizes etc., but alas, for large enough inputs it fails with either out-of-memory errors or just long runs that never end (hours and hours).

What’s the problem? Can’t Spark scale?

Of course Spark can scale. But you have to use it wisely, stupid... The problem with BlockMatrix is that in order to implement the multiply operation Spark converts the sparse blocks of the matrix into dense (sub)matrices. And although most of our matrix is zeros, Spark would still convert all these zeros into a dense representation, which would either consume too much memory or, if we keep the blocks small, result in too many operations, repartitions etc. and run forever.

Spark does support sparse matrices, but these matrices do not implement the multiply (aka dot) operation, and the only distributed matrix that does implement the multiply operation as of the time of writing is the BlockMatrix, which as noted, converts the sparse representation into dense representation before multiplying them. We should note that there had been discussions in the spark community about ways to implement distributed sparse matrix multiplication, however as noted — at the time of this writing, this was not implemented yet.

BlockMatrix.multiply() failed. What’s next?

Fourth Attempt — and the winner is…

Our fourth and last attempt was successful. The idea is to mix and match Spark with numpy. Our tests show that numpy is able to multiply a smaller matrix with a larger matrix, so if we take just a small chunk of matrix A and multiply that by matrix B, that would work and numpy would not explode. And if you recall your algebra lessons, multiplying matrices can be done vector-by-vector so algebraic-wise that would still be correct. The idea is to split just one of the matrices into smaller pieces, and then have each Spark worker run the multiply on its chunk, and then return just the conclusion, e.g. the conclusion might be that the name at A[13] matches the name at B[21] etc.
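The algebra behind this is easy to verify on a toy example. The sketch below uses small dense lists of integers and made-up helper names; it only demonstrates that multiplying row-chunks of A by B and stacking the results gives the same answer as one big multiplication.

```python
def matmul_transposed(rows_a, rows_b):
    """Return A x B^T: the dot product of every row of A with every row of B."""
    return [
        [sum(x * y for x, y in zip(row_a, row_b)) for row_b in rows_b]
        for row_a in rows_a
    ]


def chunked(rows, rows_per_chunk):
    """Yield (start_index, chunk) pairs covering all rows in order."""
    for start in range(0, len(rows), rows_per_chunk):
        yield start, rows[start:start + rows_per_chunk]


A = [[1, 0, 2], [0, 3, 0], [4, 0, 0], [0, 5, 6], [7, 0, 8]]
B = [[1, 1, 0], [0, 2, 3]]

full = matmul_transposed(A, B)

# Multiply two rows of A at a time and stack the partial results.
pieces = []
for start, chunk in chunked(A, 2):
    pieces.extend(matmul_transposed(chunk, B))

print(pieces == full)  # True
```

Because each chunk is independent of the others, the chunks can be handed to different workers.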

Broadcast and Parallelize to the rescue

Spark has two useful capabilities: broadcast and parallelize. Broadcast simply broadcasts the exact same data to all the workers. We use broadcast to send matrix B to all workers so that all workers have the complete B matrix. Parallelize chunks the data into partitions and sends each partition to a different worker. We use parallelize to send chunks of A to the workers so that each worker has all of B but just a small chunk of A.

Here’s the general outline:

  1. Calculate TF-IDF matrices on the driver.
  2. Parallelize matrix A; Broadcast matrix B
  3. Each worker now flatMaps its chunk of work by multiplying its chunk of matrix A with the entire matrix B. So if a worker operates on A[0:99] then it would multiply these hundred rows and return the result of, say A[13] matches a name found in B[21]. Multiplication is done using numpy.
  4. The driver would collect back all the results from the different workers and match the indices (A[13] and B[21]) to the actual names in the original dataset — and we’re done!
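The outline above can be simulated locally without a cluster. In this sketch plain Python lists stand in for Spark: the `chunks` list plays the role of the parallelized A, `b_rows` plays the role of the broadcast B, and the list comprehension at the end plays the role of flatMap plus collect. The weights are hand-picked for illustration, not real TF-IDF output.

```python
def best_matches(chunk_start, chunk_rows, b_rows, threshold=0.8):
    """What one worker would do: score its rows of A against all of B."""
    for offset, row_a in enumerate(chunk_rows):
        # Dot product of this row with every row of B (rows are ~unit length).
        scores = [
            sum(w * row_b.get(token, 0.0) for token, w in row_a.items())
            for row_b in b_rows
        ]
        target = max(range(len(scores)), key=scores.__getitem__)
        if scores[target] >= threshold:
            # Report global indices so the driver can look up the names.
            yield chunk_start + offset, target, scores[target]


# Sparse rows as {token: weight}; values are made up for this example.
a_rows = [
    {"google": 1.0},
    {"medium": 0.77, "com": 0.63},
    {"amazon": 0.9, "labs": 0.44},
    {"yahoo": 1.0},
]
b_rows = [{"google": 1.0}, {"medium": 1.0}, {"amazon": 1.0}, {"microsoft": 1.0}]

# "Parallelize" A into chunks of two rows; every chunk sees all of B.
chunks = [(start, a_rows[start:start + 2]) for start in range(0, len(a_rows), 2)]
matches = [m for start, rows in chunks for m in best_matches(start, rows, b_rows)]
print(matches)  # [(0, 0, 1.0), (2, 2, 0.9)]
```

Only the small (source index, target index, similarity) tuples travel back to the driver, never the similarity matrix itself.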

This method works very well, and as a matter of fact when it ran for the first time it was such a nice surprise that we thought it just didn’t work (but it did…). Compared to the previous methods, which either ran for hours (and didn’t finish) or ran out of memory, this method was able to complete its computation on the order of just a few minutes. Of course it depends on the size of the data and the size of the Spark cluster, but all in all it performed really well.

Right now the only bottleneck is the driver calculating TF-IDF matrices and on that front we still have tons of elbow space because this calculation is still quite easy for sklearn. (side-note: Spark also implements distributed TF-IDF calculation but we didn’t have to use it).

Here’s pseudocode to illustrate our solution:

from pyspark import SparkContext
from scipy.sparse import csr_matrix
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

sc = SparkContext.getOrCreate()


def find_matches_in_submatrix(sources, targets, inputs_start_index,
                              threshold=.8):
    # Similarity of every row in this chunk of A to every row of B.
    cosimilarities = cosine_similarity(sources, targets)
    for i, cosimilarity in enumerate(cosimilarities):
        cosimilarity = cosimilarity.flatten()
        # Find the best match by using argsort()[-1]
        target_index = cosimilarity.argsort()[-1]
        source_index = inputs_start_index + i
        similarity = cosimilarity[target_index]
        if similarity >= threshold:
            yield (source_index, target_index, similarity)


def broadcast_matrix(mat):
    # Broadcast the three CSR arrays, then rebuild the matrix from them.
    bcast = sc.broadcast((mat.data, mat.indices, mat.indptr))
    (data, indices, indptr) = bcast.value
    bcast_mat = csr_matrix((data, indices, indptr), shape=mat.shape)
    return bcast_mat


def parallelize_matrix(scipy_mat, rows_per_chunk=100):
    # Split the matrix into row chunks; each chunk travels as
    # (start index, CSR arrays, shape).
    [rows, cols] = scipy_mat.shape
    i = 0
    submatrices = []
    while i < rows:
        current_chunk_size = min(rows_per_chunk, rows - i)
        submat = scipy_mat[i:i + current_chunk_size]
        submatrices.append((i,
                            (submat.data, submat.indices, submat.indptr),
                            (current_chunk_size, cols)))
        i += current_chunk_size
    return sc.parallelize(submatrices)


# these would realistically get read from files or dataframes.
a = ['google inc', 'medium.com']  # ... and many more
b = ['google', 'microsoft']  # ... and many more
stopwords = ['ltd']  # ... plus the other company extensions
vect = CountVectorizer(stop_words=stopwords)
# this can be done with less memory overhead by using a generator
vocabulary = vect.fit(a + b).vocabulary_
tfidf_vect = TfidfVectorizer(stop_words=stopwords,
                             vocabulary=vocabulary)
a_mat = tfidf_vect.fit_transform(a)
b_mat = tfidf_vect.fit_transform(b)

a_mat_para = parallelize_matrix(a_mat, rows_per_chunk=100)
b_mat_dist = broadcast_matrix(b_mat)  # B is broadcast whole, A is chunked
matches = a_mat_para.flatMap(
    lambda submatrix:
    find_matches_in_submatrix(csr_matrix(submatrix[1],
                                         shape=submatrix[2]),
                              b_mat_dist,
                              submatrix[0])).collect()

You’d notice that after the broadcast and parallelize, we re-assemble the matrix into a scipy csr_matrix, which is what it originated from. So what we basically do is serialize the matrices over the wire and then reassemble them on the other side, on the workers. The serialization is efficient since we only need to send the non-zero elements of the sparse matrix. So for a matrix with 1M non-zero elements we only send about 1M floats, along with 2M ints, which is definitely in the comfort zone of Spark.
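To show what actually travels over the wire, here is a plain-Python sketch of the CSR layout: `data` holds the non-zero values, `indices` holds the column of each value, and `indptr` marks where each row starts and ends inside those two arrays. The helpers `to_csr` and `from_csr` are made up for this illustration; in the real code scipy does this work.

```python
def to_csr(dense_rows):
    """Flatten a dense matrix into the three CSR arrays."""
    data, indices, indptr = [], [], [0]
    for row in dense_rows:
        for col, value in enumerate(row):
            if value != 0:
                data.append(value)
                indices.append(col)
        # Row r occupies data[indptr[r]:indptr[r + 1]].
        indptr.append(len(data))
    return data, indices, indptr


def from_csr(data, indices, indptr, n_cols):
    """Rebuild the dense matrix, as a receiver would after deserializing."""
    rows = []
    for r in range(len(indptr) - 1):
        row = [0] * n_cols
        for k in range(indptr[r], indptr[r + 1]):
            row[indices[k]] = data[k]
        rows.append(row)
    return rows


dense = [[0, 0, 5], [0, 0, 0], [7, 0, 9]]
data, indices, indptr = to_csr(dense)
print(data, indices, indptr)  # [5, 7, 9] [2, 0, 2] [0, 1, 1, 3]
print(from_csr(data, indices, indptr, n_cols=3) == dense)  # True
```

The empty middle row costs only one extra entry in `indptr`, which is why an almost-empty matrix serializes so cheaply.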

Conclusion

We describe a method for finding similarity between two lists of strings A and B which describe company names. We used TF-IDF and cosine-similarity as a similarity factor.

Next we show various attempts for scalable implementation of matrix multiplication using spark, and the winning method which combines numpy matrix multiplication along with spark’s broadcast and parallelize capabilities.

* One fine point to mention: the vocabulary of both matrices must be the same. In other words the number of columns in both matrices must be equal and the columns must be in exactly the same order, i.e. each column represents a term and the order of the columns must be exactly the same between matrix A and matrix B. This is easily done by first calculating the vocabulary and only then calculating the TF-IDF, as in the following example:

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics.pairwise import cosine_similarity

a = ['google inc', 'medium.com']
b = ['google', 'microsoft']
# A plain list, which sklearn accepts for stop_words.
company_name_stopwords = ['ltd', 'llc', 'inc']
vect = CountVectorizer(stop_words=company_name_stopwords)
# Learn one vocabulary from both lists so the columns line up.
vocabulary = vect.fit(a + b).vocabulary_
tfidf_vect = TfidfVectorizer(stop_words=company_name_stopwords,
                             vocabulary=vocabulary)
a_mat = tfidf_vect.fit_transform(a)
b_mat = tfidf_vect.fit_transform(b)
cosimilarities = cosine_similarity(a_mat, b_mat)
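To see why the shared vocabulary matters, here is a plain-Python sketch of what the vocabulary is: a mapping from token to column index. The helper names and the simplified tokenizer are assumptions of this example and do not mirror sklearn’s internals.

```python
import re


def tokenize(name):
    return re.findall(r"[a-z0-9]+", name.lower())


def build_vocabulary(names, stopwords=frozenset()):
    """Map every non-stop-word token to a fixed column index."""
    tokens = {t for name in names for t in tokenize(name)} - set(stopwords)
    return {token: column for column, token in enumerate(sorted(tokens))}


def count_row(name, vocabulary):
    """Token counts for one name, laid out in the vocabulary's column order."""
    row = [0] * len(vocabulary)
    for token in tokenize(name):
        if token in vocabulary:
            row[vocabulary[token]] += 1
    return row


a = ['google inc', 'medium.com']
b = ['google', 'microsoft']
stopwords = frozenset(['ltd', 'llc', 'inc'])

# One vocabulary learned from both lists, so column 1 means "google" everywhere.
vocabulary = build_vocabulary(a + b, stopwords)
print(vocabulary)  # {'com': 0, 'google': 1, 'medium': 2, 'microsoft': 3}
print(count_row(a[0], vocabulary), count_row(b[0], vocabulary))
```

Had each list built its own vocabulary, “google” could land in a different column on each side and the products would compare unrelated tokens.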

Ran Tavory

The voice @reversim, head of Data Science at AppsFlyer