Geospatial Operations at Scale with Dask and Geopandas

May 2016 New York City Taxi Dropoffs Heatmap by Taxi Zone.

In this post, I will give a motivating example of a spatial join, and then describe how to perform spatial joins at scale with GeoPandas and Dask.

Note: To condense and simplify this post for Medium, I removed interactive graphics and most code. You can view the full post including code and interactive graphics on my blog, or the original Jupyter notebook on nbviewer.


Part 1: A Gentle Introduction to the Spatial Join

One problem I came across when analyzing the New York City Taxi Dataset, is that from 2009 to June 2016, both the starting and stopping locations of taxi trips were given as longitude and latitude points. After July 2016, to provide a degree of anonymity when releasing data to the public, the Taxi and Limousine Commission (TLC) only provides the starting and ending “taxi zones” of a trip, and a shapefile that specifies the boundaries, available here. To get a continuous dataset, I needed an efficient way to convert a latitude and longitude coordinate pair into a “taxi zone”. Let’s load the shapefile in Geopandas, and set the coordinate system to ‘epsg:4326’, which is latitude and longitude coordinates. Here are the first few rows.

The first few rows of the dataset.

We see that the geometry column consists of polygons (from Shapely) that have vertices defined by longitude and latitude points. Let’s plot in order of ascending LocationID.

Taxi Zones chloropleth colored by taxi zone ID

This is a familiar map of New York, with 262 taxi districts shown, colored by the id of the taxi district. I see that LocationID is not in any particular geographic order. I have added a random point (-73.966˚E, 40.78˚N) in magenta, which happens to fall in the middle of Central Park. Assigning a point as within a taxi zone is something humans can do easily, but on a computer it requires solving the point in polygon problem. Luckily the Shapely library provides an easy interface to such geometric operations in Python. But, point in polygon is computationally expensive, and using the Shapely library on 2.4 billion (latitude, longitude) pairs to assign taxi zones as in the NYC Taxi Dataset would take a modern single core cpu about four years. To speed this up, we calculate the bounding boxes for each taxi zone, which looks like:

Bounding boxes for each Taxi Zone

Now, given a (longitude, latitude) coordinate pair, bounding boxes that contain that pair can be efficiently calculated with an R-tree. You can find an excellent introduction to R-trees here. Only the polygons (taxi zones) that have bounding boxes that contain the coordinate pair need to be examined, and then the point in Polygon is solved for those (hopefully) few taxi zones. This reduces computation by a factor of about 100–1000. This process, assigning coordinate pairs to taxi zones is one example of a spatial join. Geopandas provides a nice interface to efficient spatial joins in Python, and it takes care of calculating bounding boxes and R-trees for you, as this snippet which performs a left spatial join shows.

import geopandas as gpd
from shapely.geometry import Point
df = gpd.read_file('taxi_zones.shp').to_crs({'init': 'epsg:4326'})
df = df.drop(['Shape_Area', 'Shape_Leng', 'OBJECTID'], axis=1)
gpd.sjoin(gpd.GeoDataFrame(crs={'init': 'epsg:4326'},
geometry=[Point(-73.966, 40.78)]),
df, how='left', op='within')

This code does the merge for a single point (drawn in magenta) on maps above, and correctly identifies it in Central Park

The spatial join for the magenta point in the figures above

Part 2 : Spatial Joins at scale using Dask

In my NYC transit project, I download and process the entire 200GB Taxi dataset. Here I load up a single file from the taxi dataset (May 2016) into Dask, and show the first few rows and a few columns. The file is a bit large at 1.8GB, and Dask chooses to divide up the dataframe into 30 partitions for efficient calculations. Each partition is a pandas DataFrame, and dask takes care of all the logic to view the combination as a single DataFrame. Here are a few columns.

The first few rows of time and location data in May 2016

So each trip has pickup and dropoff (longitude, latitude) coordinate pairs. Just to give you a feel for the data, I plot the start and end locations of the first trip, ending in the East Village. Driving directions come with a great deal of additional complexity, so here I just plot an arrow, as the crow flies. A spatial join identifies the taxi zones as Clinton East and East Village.

The first trip in May 2016 — As the crow flies

So, Dask DataFrames are just collections of Pandas DataFrames, and I know how to perform a spatial join on a Pandas DataFrame. Let’s take advantage of Dask’s map_partitions function to do a spatial join with the taxi zones on every partition. Here is the function to do the spatial join, given a Pandas DataFrame, and the names of the longitude, latitude, and taxizone id columns. Code is directly linked here.

At Scale

Using the map_partitions function, I apply the spatial join to each of the Pandas DataFrames that make up the Dask DataFrame. For simplicity, I just call the function twice, once for pickup locations, and once for dropoff locations. To assist dask in determining schema of returned data, we specify it as a column of floats (allowing for NaN values).

trips['pickup_taxizone_id'] = trips.map_partitions(
assign_taxi_zones, "pickup_longitude", "pickup_latitude",
"pickup_taxizone_id", meta=('pickup_taxizone_id', np.float64))
trips['dropoff_taxizone_id'] = trips.map_partitions(
assign_taxi_zones, "dropoff_longitude", "dropoff_latitude",
"dropoff_taxizone_id", meta=('dropoff_taxizone_id', np.float64))
trips[['pickup_taxizone_id', 'dropoff_taxizone_id']].head()

At this point, the trips Dask DataFrame will have valid taxizone_id information. Lets save this data to Parquet, which is a columnar format that is well supported in Dask and Apache Spark. This prevents Dask from recalculating the spatial join (which is very expensive) every time an operation on the trips DataFrame is required.

trips.to_parquet('trips_2016-05.parquet', has_nulls=True,
object_encoding='json', compression="SNAPPY")
trips = dd.read_parquet('trips_2016-05.parquet',
columns=['pickup_taxizone_id', 'dropoff_taxizone_id'])

To close this post out, I’ll produce a heatmap of taxi dropoff locations, aggregated by taxi zone using Dask. Unsurprisingly (for New Yorkers at least) the vast majority of taxi dropoffs tend to be in Downtown and Midtown Manhattan. I will analyze this dataset further in future posts.

May 2016 New York City Taxi Dropoffs by Taxi Zone.

Summary

In this post I described the process of a Spatial Join, and doing the Spatial Join at scale on a cluster using Dask and Pandas. I glossed over some details that are important for the full New York Taxi Dataset, but my full code is available here on Github. In future posts, I will analyze this data more thoroughly, and possibly look into releasing the processed data as a parquet file for others to analyze.


A side note on spatial join performance

The spatial join as written above with GeoPandas, using the New York Taxi Dataset, can assign taxi zones to approxmately 40 million taxi trips per hour on a 4 GHz 4-core i5 system. A lot of the code that supports this join is some amalgamation of Python and wrapped C code.

This is approxmately a factor of two slower than performing the same spatial join in highly optimized PostGIS C/C++ code. However, PostGIS does not efficiently use multiple cores (at least without multiple spatial joins running simultaneously), and more importantly, the network and serialization overhead of a round trip to the PostgreSQL database puts PostgreSQL/PostGIS at roughly the same speed as the GeoPandas implementation I describe in this post, with vastly more moving parts to break.

Basically, Python is actually pretty fast for these kinds of data structure operations.