Experimenting with PySpark to Match Large Data Sources
By Zhihuang Chen
Civis provides a solution called Identity Resolution, which unifies data from different sources and resolves duplicate identities. The core feature of this solution is a probabilistic matching algorithm. Given two data sources, for each person in the first data source, the algorithm finds the people who have the highest possibility of being the same person in the second data source.
Civis has a Python implementation of this algorithm, but we wanted to experiment with Apache Spark to see if it helps solve two issues with our current implementation.
- Speed: When matching 5 million rows against 5 million rows on a medium size instance, our current implementation can take over 24 hours.
- Reduce Infrastructure Complexity: Our current implementation has a limitation on the number of rows that can be matched. This limitation requires us to utilize an external key-value datastore. Many of our users are matching their data against large data assets with upwards of 200 million rows. By eliminating the external key-value datastore we not only reduce infrastructure overhead but also reduce costs.
Apache Spark provides an easy-to-use API and high performance computing for large-scale data processing by exploiting in-memory computing. Using Spark, we can write and maintain less code because Spark provides a handful of easy-to-use data ETL functions. Due to Spark’s distributed nature, we can also dramatically increase the size of our input file without relying on an external database. There is no limit on the size of file because we can just increase the number of instances in our Spark cluster.
The matching algorithm in Civis consists of two parts, blocking and ranking. During blocking, we find all the possible matches for a person in the first source. During ranking, we score every possible match result from blocking.
We have implemented a simplified version of the matching algorithm using Spark RDD API and DataSet API in Scala. The results of this experiment are pretty assuring. With a cluster of one master node and four medium memory optimized worker nodes, matching 30,000 rows (randomly selected from 200 million rows) to the same 200 million rows only takes about 27 minutes. Both files are stored in S3, so the runtime includes the IO of downloading the data. The accuracy of the implementation was verified by ensuring that input rows were matched back to their source with high frequency.
However, if we want to fully implement the probabilistic matching system, Python is a better choice because of its scientific computing packages.
This summer, as a software engineering intern, I investigated the performance of implementing this simplified matching algorithm using PySpark. I first tried using PySpark RDD. I converted each ‘Row’ in the input data source to a ‘Person’ object and manipulated the data based on the ‘Person’ object. This implementation failed because it ran out of memory when matching 30,000 rows to 200 million rows. Even though I increased the memory to 370GB, PySpark may have been serializing every object in Python and producing large amounts of intermediate data. Therefore, I tried using the ‘Row’ object, which is a built-in object in Spark. I hoped that PySpark would not serialize this built-in object; however, this experiment ran out of memory too.
In addition to running out of memory, the RDD implementation was also pretty slow. When matching 30,000 rows to 200 million rows, the job ran for about 90 minutes before running out of memory. The issue may have been caused by the small disk size instead of insufficient memory space. We only had a 10GB disk for each instance and, when the job reaches the disk capacity, the memory cannot swap with the disk. Adding a larger disk to allow the machine to swap out its memory could have solved this problem, but due to the slowness of this implementation, it was not worth exploring.
Finally, I gave up on PySpark RDD and moved on to the DataFrame API. The DataFrame API is faster and uses less memory than the RDD API because it’s just a python interface to calling java methods on java data frames via py4j. The results from this experiment are exciting! Using the same cluster as the Scala implementation test, matching 30,000 rows to 200 million rows only took 18 minutes 36 seconds.
I also tried matching the same 30,000 rows to the pre-computed 200 million rows data, which is the data that was processed and then stored into the S3. This shortens the process of blocking, but depends more on IO because the pre-computed data is about 5 times larger than the original one. The runtime of this implementation is also promising. Below is the runtime comparison between Scala DataSet and PySpark DataFrame:
I expected the PySpark implementation to be slower than the previous Scala implementation. However, I believe it was faster because the Scala program used a lower version of Spark and their implementations are slightly different.
Finally, we needed to make sure larger matching jobs could be run in a timely fashion with Spark. I matched a 10 million row dataset to itself, taking 9 minutes and 8 seconds to finish, which is pretty fast. But, when matching 10 million rows to the pre-computed base file, the cluster runs out of memory. I added 4 more worker nodes for a total of 8 worker nodes and the job then finished in 45 minutes and 19 seconds.
On the whole, Spark is new and practical technology. Based on my implementation in PySpark using DataFrames, Spark has the ability to make up for the shortcomings of the Python implementation. We can now match large data sources within a short time. It also prevents us from storing data in a key-value database and saves a lot of money for us and our customers.
Thanks to Walt Askew for giving me a lot of guidance on Spark and the optimization of my code. Also, thanks to my mentor Rachel Connolly and my manager Brian Nichols for giving valuable advice.