Extracting ZIP codes from longitude and latitude in PySpark

Mostafa Majidpour
Jun 1, 2020 · 6 min read


Given a pair of (longitude, latitude), how could one find the corresponding US ZIP code?

If you just need the working code, jump to the end of the post (Final Take); if you would like to experience the journey as well, read along!

Motivation

This could emerge as a problem in feature engineering for an ML algorithm: you intend to use ZIP codes as a categorical feature while the only data you have in each row is longitude and latitude. It could also come up when ZIP code is the key in your other data sources, e.g. you have demographic data (income, household size, …) for each ZIP code in one dataset, while the geo data in the other dataset is in the form of (longitude, latitude) and you need to find the corresponding ZIP codes.

We are going to attack the problem in a PySpark environment since 1) we have large datasets and want to use a Big Data framework such as Apache Spark, and 2) we are using Python to take advantage of its whole host of scientific computing packages.

Packages in Python

There are a handful of packages in Python to which one could submit (longitude, latitude) and get back the US ZIP code; uszipcode, Geopy, and googlemaps, to name a few.

We picked uszipcode, mainly because its database is downloaded when the package is initialized, so it does not involve calling an API each time a request is submitted. googlemaps, for example, requires a Google Maps API key and limits the rate of requests.

One row at a time

Assuming you have installed uszipcode and are inside a PySpark session (Python 2.7), imagine we have a Spark dataframe with three columns: id (of the row), latitude, and longitude:
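(The original embedded snippet is not reproduced here; below is a minimal sketch of such a dataframe, with made-up coordinates for illustration.)

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Toy stand-in for the real dataframe; the coordinates are made up
# (roughly Los Angeles, New York, and Chicago).
df = spark.createDataFrame(
    [(1, 34.0522, -118.2437),
     (2, 40.7128, -74.0060),
     (3, 41.8781, -87.6298)],
    ["id", "latitude", "longitude"],
)
df.show()
```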

The actual dataframe of course has a lot more than three rows! In order to establish a baseline, we convert 1,000 rows of the Spark dataframe to a Pandas dataframe and loop through the rows:
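(The original snippet is missing here; the sketch below shows the idea, assuming uszipcode's SearchEngine / by_coordinates API, whose names and defaults may differ across package versions.)

```python
import time
from uszipcode import SearchEngine  # API names may differ across uszipcode versions

search = SearchEngine()  # downloads the ZIP code database on first use

def get_zip(lat, lng):
    """Return the closest ZIP code within a 5-mile radius, or 'bad' on failure."""
    try:
        res = search.by_coordinates(lat, lng, radius=5, returns=1)
        return res[0].zipcode if res else "bad"
    except Exception:
        return "bad"

# Baseline: 1,000 rows pulled to the driver as a Pandas dataframe.
sample_pdf = df.limit(1000).toPandas()

start = time.time()
sample_pdf["zipcode"] = [
    get_zip(row.latitude, row.longitude) for row in sample_pdf.itertuples()
]
print("Elapsed: %.1f seconds" % (time.time() - start))
```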

For 1,000 rows, this approach takes about 11.8 seconds on an AWS m4.xlarge machine. Although this is fine for a few thousand rows, for a billion rows (the order we are working with) it would take about 137 days (11.8 sec*10**6/3600/24).

PySpark UDFs

The obvious drawback of the previous approach is the lack of distributed computation. The problem clearly falls into the embarrassingly parallel category, as each row is independent of all the other rows and could be sent to a separate executor to find its ZIP code. Spark is equipped with UDFs (User Defined Functions) to address this situation.

Take 1

A first attempt looks like this:
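(A sketch of the missing snippet, under the same uszipcode API assumptions as above: the search object is created on the driver and referenced inside the UDF.)

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from uszipcode import SearchEngine

search = SearchEngine()  # created on the driver, outside the UDF

def get_zip_udf1(lat, lng):
    try:
        res = search.by_coordinates(lat, lng, radius=5, returns=1)
        return res[0].zipcode if res else "bad"
    except Exception:
        return "bad"

get_zip_udf1 = udf(get_zip_udf1, StringType())

# Spark has to serialize the closure, including `search`, and fails:
df.withColumn("zipcode", get_zip_udf1("latitude", "longitude")).show()
```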

However, this errors out:

cPickle.PicklingError: Could not serialize object: TypeError: can't pickle wrapper_descriptor objects

The reason is that search() initializes the database file (i.e. it checks whether it has already been downloaded and downloads it otherwise), but the resulting search object cannot be serialized because it is created outside get_zip_udf1.

Take 2

Let’s put the line that initializes search() inside the UDF.
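(Again a sketch of the missing snippet; the only change from Take 1 is where search() is created.)

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def get_zip_udf2(lat, lng):
    from uszipcode import SearchEngine
    try:
        search = SearchEngine()  # now created on the executor, per call
        res = search.by_coordinates(lat, lng, radius=5, returns=1)
        return res[0].zipcode if res else "bad"
    except Exception:
        return "bad"

get_zip_udf2 = udf(get_zip_udf2, StringType())
df.withColumn("zipcode", get_zip_udf2("latitude", "longitude")).show()
```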

While this runs without error, the strange output should raise doubts: all ZIP codes come back as "bad", which shows every row hit the exception. Looking into the error in the try block, it seems each executor wants to download the database file but cannot, because of permission issues on the EC2 machines:

OSError: [Errno 13] Permission denied: '/home/.uszipcode'

Fortunately, we can provide the download path for the database file:
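(Sketch only: the constructor argument used below for the download path, db_file_dir, is an assumption and depends on the uszipcode version you have installed.)

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def get_zip_udf3(lat, lng):
    from uszipcode import SearchEngine
    try:
        # db_file_dir is an assumed argument name for the download path;
        # check the uszipcode version you have installed.
        search = SearchEngine(db_file_dir="/tmp/uszipcode")
        res = search.by_coordinates(lat, lng, radius=5, returns=1)
        return res[0].zipcode if res else "bad"
    except Exception:
        return "bad"

get_zip_udf3 = udf(get_zip_udf3, StringType())
```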

This is not a good approach, as the search object gets initialized on each and every call. Besides, it does not work: the try block returns empty results, possibly because all the cores are using the same file all the time.

If PySpark had functions similar to addFile() for directories, one could perhaps send the database directory to all the Spark executors.

Another, more efficient approach was suggested here; however, it is not trivial to translate it from Scala to Python. (Translating with the common syntax equivalents did not work.)

Take 3

Disappointed with PySpark’s capabilities, we looked into parallelization packages in Python and picked joblib due to its wide use. Here is the code:
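(The original gist is not shown; below is a sketch with joblib, reusing the 1,000-row Pandas sample and the same uszipcode assumptions as before.)

```python
from joblib import Parallel, delayed

def get_zip_joblib(lat, lng):
    from uszipcode import SearchEngine
    try:
        search = SearchEngine()  # every worker opens the same database file
        res = search.by_coordinates(lat, lng, radius=5, returns=1)
        return res[0].zipcode if res else "bad"
    except Exception:
        return "bad"

# n_jobs=-1 uses all available cores on the driver machine.
sample_pdf["zipcode"] = Parallel(n_jobs=-1)(
    delayed(get_zip_joblib)(row.latitude, row.longitude)
    for row in sample_pdf.itertuples()
)
```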

Interestingly, this setup took 33 seconds for 1,000 rows, which is worse than the plain for loop. We tried different parameters of the Parallel() function, but they did not seem to change the timing much. We speculate this is also because all the cores are reading the same database file.

Take 4

A lightbulb goes off! The database file is relatively small. Can we use broadcast and send the database file to all the nodes, and at the same time write the logic for extracting the ZIP code from the database file ourselves? After all, what we are trying to do is embarrassingly parallel and involves finding the closest row of the smaller dataset to each row of the larger dataset.

Let’s extract the logic and data from uszipcode and use PySpark’s capabilities more efficiently.

1. Extracting the database file and broadcasting it:
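(A sketch: the database path ~/.uszipcode/simple_db.sqlite, the table name simple_zipcode, and the column names below are assumptions about the "simple" uszipcode database and may differ across versions.)

```python
import os
import sqlite3
import pandas as pd

# Load the ZIP code table (path/table/column names are assumptions;
# adjust them to your uszipcode installation).
db_path = os.path.expanduser("~/.uszipcode/simple_db.sqlite")
with sqlite3.connect(db_path) as conn:
    zip_pdf = pd.read_sql_query(
        "SELECT zipcode, lat, lng, "
        "bounds_west, bounds_east, bounds_north, bounds_south "
        "FROM simple_zipcode WHERE lat IS NOT NULL",
        conn,
    )

# Broadcast the small ZIP code table to every executor.
broadcast_zip = spark.sparkContext.broadcast(zip_pdf)
```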

2. Using the logic in uszipcode inside the PySpark UDF:
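(A sketch of the radius-based lookup: candidates are ZIP codes whose centers fall inside a rough box of `radius` miles around the point, and the closest center wins. The helper names below are ours, not uszipcode's.)

```python
import math
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def haversine(lat1, lng1, lat2, lng2):
    """Great-circle distance in miles."""
    lat1, lng1, lat2, lng2 = map(math.radians, [lat1, lng1, lat2, lng2])
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lng2 - lng1) / 2) ** 2)
    return 3959.0 * 2 * math.asin(math.sqrt(a))

def make_zip_udf(radius):
    def get_zip(lat, lng):
        try:
            pdf = broadcast_zip.value
            # Candidates: ZIP codes whose centers fall inside a rough box of
            # `radius` miles around the point (1 degree ~ 69 miles).
            deg = radius / 69.0
            cand = pdf[pdf.lat.between(lat - deg, lat + deg)
                       & pdf.lng.between(lng - deg, lng + deg)]
            if len(cand) == 0:
                return "bad"
            dists = [haversine(lat, lng, r.lat, r.lng) for r in cand.itertuples()]
            return cand.iloc[dists.index(min(dists))].zipcode
        except Exception:
            return "bad"
    return udf(get_zip, StringType())

zip_udf_small = make_zip_udf(5)
```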

Note that the search returns the ZIP codes within the specified radius. What we care about is the closest ZIP code to the point, thus we calculate the distances in the try block and return the closest one.

There is an issue, however: the radius (5 in the code above) needs to be picked correctly. For example, with the selection above (radius = 5), if a ZIP code has a radius of more than 10 miles, nothing will be returned; on the other hand, in dense areas several ZIP codes will be returned, which makes the distance-calculation loop more costly.

One way to overcome this would be to run the UDF with a smaller radius first and then, for the rows with a bad ZIP code, run it again with a larger radius:
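(A sketch of the two-pass idea; the larger radius of 20 miles is illustrative.)

```python
from pyspark.sql.functions import col, when

zip_udf_large = make_zip_udf(20)  # illustrative fallback radius

# First pass with the small radius; second pass only where the first failed.
first_pass = df.withColumn("zipcode", zip_udf_small("latitude", "longitude"))
final_df = first_pass.withColumn(
    "zipcode",
    when(col("zipcode") == "bad",
         zip_udf_large("latitude", "longitude")).otherwise(col("zipcode")),
)
```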

For a billion rows, this takes about 620 minutes (515 + 105) on four m4.16xlarge core nodes with an m4.xlarge master node. The improvement is from about 137 days to about 10 hours.

After checking some rows in final_df, it turns out a small percentage of the rows did not receive the correct ZIP code. The investigation led us to cases similar to the one depicted below: the green dot represents the input coordinates. The radius is set in a way that makes two ZIP codes (the two blue squares) candidates. After calculating the distances, the green dot will be assigned the ZIP code corresponding to the smaller square; however, the correct ZIP code for the green dot is the larger square.

Two ZIP code areas. Orange dots are the centers of the ZIP codes and the green dot represents the input coordinates.

Take 5

One way to remedy the situation above is to make sure the smaller square never shows up in the pool of candidate ZIP codes for the green dot. Fortunately, the database file has boundaries for each ZIP code in the four directions of North, West, South, and East. If we find candidate ZIP codes based on these boundaries, 1) we have eliminated wrong-ZIP-code situations similar to the one pictured above, and 2) there is no need to set a radius parameter for picking candidate ZIP codes.

Two birds with one stone! We’ll apply only one UDF, and for each row the candidate pool is limited by definition. Here is the modified UDF:
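(A sketch of the bounds-based lookup, reusing the broadcast table and the haversine helper from Take 4; the bounds_* column names are assumptions about the uszipcode database.)

```python
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def get_zip_by_bounds(lat, lng):
    try:
        pdf = broadcast_zip.value
        # Candidates: ZIP codes whose bounding box contains the point;
        # no radius parameter is needed.
        cand = pdf[(pdf.bounds_south <= lat) & (pdf.bounds_north >= lat)
                   & (pdf.bounds_west <= lng) & (pdf.bounds_east >= lng)]
        if len(cand) == 0:
            return "bad"
        # If several boxes overlap the point, pick the closest center.
        dists = [haversine(lat, lng, r.lat, r.lng) for r in cand.itertuples()]
        return cand.iloc[dists.index(min(dists))].zipcode
    except Exception:
        return "bad"

zip_udf_bounds = udf(get_zip_by_bounds, StringType())
final_df = df.withColumn("zipcode", zip_udf_bounds("latitude", "longitude"))
```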

Now this looks much better! From 137 days to about 9 hours, which could easily be reduced further by adding more machines if need be.

Take 6

What we have done so far was with plain Spark UDFs; however, PySpark also offers a more recent variant, Pandas UDFs, which are typically faster.

The modified code for the Pandas UDF:
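(A sketch wrapping the same bounds-based lookup in a scalar Pandas UDF; the exact decorator usage depends on your Spark version.)

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

@pandas_udf(StringType())
def zip_pandas_udf(lat_s, lng_s):
    # Operates on a batch (pandas Series) of rows at a time and applies the
    # same bounds-based lookup to each (lat, lng) pair.
    return pd.Series(
        [get_zip_by_bounds(lat, lng) for lat, lng in zip(lat_s, lng_s)]
    )

final_df = df.withColumn("zipcode", zip_pandas_udf("latitude", "longitude"))
```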

However, we did not observe a decrease in execution time (we actually even saw a slight increase), so we did not pursue Pandas UDFs this time.

Final Take

In this post, we used data from the uszipcode package to find the ZIP code of a point given its latitude and longitude. However, we changed the package’s original logic for finding each ZIP code and also modified the execution to take advantage of PySpark’s distributed computation framework. The piece of code below covers everything you need:
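(The original gist is not reproduced here; the sketch below stitches the pieces from the takes above together, with the same caveats: the database path, table name, and column names are assumptions about the uszipcode "simple" database, and `df` is your dataframe with latitude and longitude columns.)

```python
import math
import os
import sqlite3

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.getOrCreate()

# 1. Load the uszipcode database (path/table/column names are assumptions
#    about the "simple" database; adjust to your installation) and broadcast it.
db_path = os.path.expanduser("~/.uszipcode/simple_db.sqlite")
with sqlite3.connect(db_path) as conn:
    zip_pdf = pd.read_sql_query(
        "SELECT zipcode, lat, lng, "
        "bounds_west, bounds_east, bounds_north, bounds_south "
        "FROM simple_zipcode WHERE lat IS NOT NULL",
        conn,
    )
broadcast_zip = spark.sparkContext.broadcast(zip_pdf)

def haversine(lat1, lng1, lat2, lng2):
    """Great-circle distance in miles."""
    lat1, lng1, lat2, lng2 = map(math.radians, [lat1, lng1, lat2, lng2])
    a = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lng2 - lng1) / 2) ** 2)
    return 3959.0 * 2 * math.asin(math.sqrt(a))

# 2. Bounds-based lookup: candidates are the ZIP codes whose bounding box
#    contains the point; the closest center wins.
def get_zip_by_bounds(lat, lng):
    try:
        pdf = broadcast_zip.value
        cand = pdf[(pdf.bounds_south <= lat) & (pdf.bounds_north >= lat)
                   & (pdf.bounds_west <= lng) & (pdf.bounds_east >= lng)]
        if len(cand) == 0:
            return "bad"
        dists = [haversine(lat, lng, r.lat, r.lng) for r in cand.itertuples()]
        return cand.iloc[dists.index(min(dists))].zipcode
    except Exception:
        return "bad"

zip_udf = udf(get_zip_by_bounds, StringType())

# 3. Apply to a dataframe with `latitude` and `longitude` columns.
final_df = df.withColumn("zipcode", zip_udf("latitude", "longitude"))
```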
