Spark Geospatial Analysis with B23 Data Platform

Courtney Whalen is a Senior Data Scientist with B23 working on the B23 Data Platform.

As a member of the B23 Data Platform development and data science team, I’ve been excited to help launch new, secure features that allow data scientists to process data more effectively and quickly than was previously possible. We launched B23 Data Platform in early 2016 as a data science orchestration platform. B23 Data Platform is both a marketplace for data and a marketplace of big data Stacks that can be provisioned in minutes. Most automated provisioning tools are just a “blank canvas,” but with B23 Data Platform you have access to both data sets and secure Stacks in the cloud. Using B23's EasyIngest capability, data scientists are only a few mouse clicks and several minutes away from analyzing data in a securely provisioned Stack.

Recently, I had the opportunity to work on a project that highlights the capabilities of B23 Data Platform: geospatial analysis using Apache Spark. The project used a large dataset of network-based location data in geographic coordinates. Specifically, this dataset contained over a billion rows of latitude and longitude coordinates with timestamps, spanning a multi-year period. The challenge was to figure out how many of these “locations” map to certain points of interest (“POI”) each day, starting from this initial, raw dataset. I was able to complete this geospatial enrichment task in the following five steps:

1. Acquire POI data

2. Determine how to join the data sets

3. Transform the data

4. Geo-fence the POI data

5. Run spatial join

[Figure: Location Data]

Acquire POI Data

My first step was to pull a second dataset containing geospatial data for a particular POI. I used POI data that contains the locations of 1000+ restaurant chains in North America. In one case, I downloaded the data for the restaurant chain Chipotle, which had 2,076 locations as of June 13, 2016.

[Figure: POI Data]

Determine How to Join Datasets

Now that I had acquired my datasets, I needed a plan to join them. The first dataset contains about 6.5 million records per day. Some quick math showed that pairing each of these location points with each of the 2,076 Chipotle locations would produce roughly 13.5 billion candidate pairs per day (6.5 million × 2,076). A Cartesian join was quickly ruled out.

I quickly discovered a library called SpatialSpark that would suit my purposes. As its name suggests, SpatialSpark uses Spark as its underlying engine. The algorithm iterates over the larger dataset and efficiently queries the smaller dataset, which it converts to an R-tree and keeps in memory.[1]
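To make the idea of "iterate over the large dataset, query a small in-memory index" concrete, here is a minimal sketch in plain Scala. It is not SpatialSpark's implementation: a uniform grid stands in for the R-tree, and every name in it (GridJoinSketch, Pt, buildIndex, join) is hypothetical. It assumes the points are already in a planar coordinate system measured in meters.

```scala
// Illustrative only: a uniform grid index replaces SpatialSpark's R-tree.
object GridJoinSketch {
  // A point with an id, in projected (planar) coordinates measured in meters.
  final case class Pt(id: Long, x: Double, y: Double)

  // Grid cell that contains the coordinate (x, y).
  def cellOf(x: Double, y: Double, cellSize: Double): (Long, Long) =
    (math.floor(x / cellSize).toLong, math.floor(y / cellSize).toLong)

  // Bucket the small dataset by grid cell; this map is the in-memory index.
  def buildIndex(small: Seq[Pt], cellSize: Double): Map[(Long, Long), Seq[Pt]] =
    small.groupBy(p => cellOf(p.x, p.y, cellSize))

  // Stream over the large dataset and compare each point only against
  // index entries in its own cell and the eight neighbouring cells.
  def join(large: Iterator[Pt],
           index: Map[(Long, Long), Seq[Pt]],
           cellSize: Double,
           radius: Double): Iterator[(Long, Long)] = {
    // With cellSize >= radius, every match lies within the 3x3 block of cells.
    require(cellSize >= radius, "cellSize must be at least the search radius")
    large.flatMap { p =>
      val (cx, cy) = cellOf(p.x, p.y, cellSize)
      for {
        dx <- -1L to 1L
        dy <- -1L to 1L
        q  <- index.getOrElse((cx + dx, cy + dy), Seq.empty)
        if math.hypot(p.x - q.x, p.y - q.y) <= radius
      } yield (p.id, q.id)
    }
  }
}
```

Each large-dataset point is compared with a handful of nearby candidates instead of all 2,076 locations. In a Spark job, an index like this would typically be broadcast to the executors and the lookup applied partition by partition.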

Transform the Data

Before I could actually test out the SpatialSpark library, I needed to perform a few data transformations. SpatialSpark makes use of the JTS Topology Suite, which provides a geometry model and a number of geometric functions[2], so each latitude/longitude point in my datasets had to be converted to a geometry object.

Each point also needed to be projected onto a new coordinate system, because geographic coordinates describe positions on a spherical earth. When you represent this sphere on a flat surface, it becomes distorted. The lines of longitude, which are supposed to meet at the earth’s poles, run parallel to each other when drawn on a flat surface and therefore never meet. Because of this, the farther you move from the equator, the more stretched and distorted areas become.[3]
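The size of this distortion is easy to estimate. The sketch below is plain Scala, assumes a spherical earth with a mean radius of 6,371 km (an approximation chosen for illustration), and uses hypothetical names. It computes how many meters one degree of longitude covers at a given latitude.

```scala
object LongitudeShrinkSketch {
  // Mean earth radius in meters; a spherical approximation assumed for this sketch.
  val EarthRadiusM = 6371000.0

  // Ground length, in meters, of one degree of longitude at the given latitude.
  def metersPerDegreeLongitude(latitudeDeg: Double): Double =
    math.toRadians(1.0) * EarthRadiusM * math.cos(math.toRadians(latitudeDeg))

  def main(args: Array[String]): Unit =
    for (lat <- Seq(0.0, 33.56, 45.0, 60.0))
      println(f"latitude $lat%5.2f: ${metersPerDegreeLongitude(lat)}%.0f m per degree of longitude")
}
```

Under this approximation a degree of longitude spans about 111 km at the equator and exactly half of that at 60° latitude, while a degree of latitude stays roughly constant. Treating raw degrees as planar units would therefore turn a circular geo-fence into an ellipse.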

To fix this issue, I used the GeoTools library to project each latitude/longitude point onto a coordinate reference system (“CRS”) that corrects for the scale change. There are several CRS projections for the United States that were constructed to preserve different map properties (area, shape, direction, etc.).[4] I landed on a particular CRS, called ESRI:102005, that attempts to preserve the distance between objects.

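import com.vividsolutions.jts.geom.{Coordinate, Geometry, GeometryFactory} // org.locationtech.jts.geom in JTS 1.15+
import org.geotools.geometry.jts.JTS
import org.geotools.referencing.CRS
import org.geotools.referencing.crs.DefaultGeographicCRS
import org.opengis.referencing.crs.CoordinateReferenceSystem
import org.opengis.referencing.operation.MathTransform

def transformGeom(longitude: Double, latitude: Double): Geometry = {
  // WKT definition of the target CRS (ESRI:102005, USA Contiguous Equidistant Conic)
  val ESRI_102005_WKT: String = "PROJCS[\"USA_Contiguous_Equidistant_Conic\",GEOGCS[\"GCS_North_American_1983\",DATUM[\"North_American_Datum_1983\",SPHEROID[\"GRS_1980\",6378137,298.257222101]],PRIMEM[\"Greenwich\",0],UNIT[\"Degree\",0.017453292519943295]],PROJECTION[\"Equidistant_Conic\"],PARAMETER[\"False_Easting\",0],PARAMETER[\"False_Northing\",0],PARAMETER[\"Longitude_Of_Center\",-96],PARAMETER[\"Standard_Parallel_1\",33],PARAMETER[\"Standard_Parallel_2\",45],PARAMETER[\"Latitude_Of_Center\",39],UNIT[\"Meter\",1],AUTHORITY[\"EPSG\",\"102005\"]]"
  val targetCRS: CoordinateReferenceSystem = CRS.parseWKT(ESRI_102005_WKT)
  // build the WGS84 point directly (x = longitude, y = latitude)
  val sourceGeom: Geometry = new GeometryFactory().createPoint(new Coordinate(longitude, latitude))
  // project the point from WGS84 onto the target CRS (lenient = true)
  val mathTransform: MathTransform = CRS.findMathTransform(DefaultGeographicCRS.WGS84, targetCRS, true)
  JTS.transform(sourceGeom, mathTransform)
}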

Geo-Fence the POI Data

My next step was to turn each projected Chipotle location into a small polygon, so that I could set an arbitrary boundary around each restaurant. If a person’s location fell within that boundary, I counted that person as having been at that particular Chipotle. I created polygons with a radius of 4 meters around each Chipotle location. Because the projected CRS is defined in meters, the buffer distance can be given directly in meters.

def bufferGeom(geom: Geometry, distance: Double): Geometry = {
  // create polygon around Geometry object
  geom.buffer(distance)
}
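For intuition about what a buffered point looks like, here is a library-free sketch in plain Scala. It approximates the circle with a regular polygon and tests containment by ray casting. The names (GeoFenceSketch, circlePolygon, contains) are hypothetical, and the choice of 32 segments is an assumption meant to resemble JTS's default of 8 segments per quadrant; this is not how JTS is implemented.

```scala
object GeoFenceSketch {
  // (x, y) in projected coordinates, measured in meters.
  type P = (Double, Double)

  // Regular polygon with `segments` vertices approximating a circle around `center`.
  def circlePolygon(center: P, radius: Double, segments: Int = 32): Vector[P] =
    Vector.tabulate(segments) { i =>
      val angle = 2 * math.Pi * i / segments
      (center._1 + radius * math.cos(angle), center._2 + radius * math.sin(angle))
    }

  // Ray-casting point-in-polygon test (even-odd rule).
  def contains(polygon: Vector[P], p: P): Boolean = {
    val n = polygon.length
    val crossings = (0 until n).count { i =>
      val (x1, y1) = polygon(i)
      val (x2, y2) = polygon((i + 1) % n)
      // Count edges that cross the horizontal ray running from p to the right.
      ((y1 > p._2) != (y2 > p._2)) &&
        (p._1 < (x2 - x1) * (p._2 - y1) / (y2 - y1) + x1)
    }
    crossings % 2 == 1
  }
}
```

With a 4-meter fence around the origin, a point at (1, 2) falls inside and a point at (6, 1) falls outside. In the actual pipeline, JTS's buffer and the Intersects predicate do this work.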

Run Spatial Join

Now that my data was prepared, I was ready to kick off the join process. I transformed each of my datasets, and then called SpatialSpark’s join method to see which points intersected with a Chipotle polygon for a particular day. The whole process ran in less than a minute, and the results were quite accurate!

B23 can easily help you set up the environment to run this analysis. B23 Data Platform can ingest your data, store that data securely in the cloud, and provision a Spark stack in fifteen minutes or less.

import org.apache.spark.rdd.RDD
import spatialspark.join.BroadcastSpatialJoin
import spatialspark.operator.SpatialOperator

// columns, as implied by the calls below: 0 = id, 1 = latitude, 2 = longitude
// RDD with pairs (id, point) for a person's location
val personRdd: RDD[(Long, Geometry)] = personDf.rdd.map { row => (row.getLong(0), transformGeom(row.getDouble(2), row.getDouble(1))) }
// RDD with pairs (id, polygon) for a Chipotle's location
val chipotleRdd: RDD[(Long, Geometry)] = chipotleDf.rdd.map { row => (row.getInt(0).toLong, bufferGeom(transformGeom(row.getDouble(2), row.getDouble(1)), 4.0)) }
// RDD with pairs (id, id) of intersecting person/Chipotle locations
val joinedRdd: RDD[(Long, Long)] = BroadcastSpatialJoin(sc, personRdd, chipotleRdd, SpatialOperator.Intersects, 0.0)

For the first result, the (latitude, longitude) of chipotle_id=714 is (33.562836, -84.324768) and the (latitude, longitude) of location_id=4338942 is (33.5628122, -84.3247918).
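One way to sanity-check this pair is to compute the great-circle distance between the two coordinates. The sketch below uses the haversine formula on a spherical earth (mean radius 6,371 km, an approximation chosen for illustration), so it is independent of the projection used in the join. The object and function names are hypothetical.

```scala
object HaversineCheck {
  // Mean earth radius in meters; a spherical approximation assumed for this sketch.
  val EarthRadiusM = 6371000.0

  // Great-circle distance in meters between two (latitude, longitude) points in degrees.
  def haversineMeters(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
    val dLat = math.toRadians(lat2 - lat1)
    val dLon = math.toRadians(lon2 - lon1)
    val a = math.pow(math.sin(dLat / 2), 2) +
      math.cos(math.toRadians(lat1)) * math.cos(math.toRadians(lat2)) *
        math.pow(math.sin(dLon / 2), 2)
    2 * EarthRadiusM * math.asin(math.sqrt(a))
  }

  def main(args: Array[String]): Unit = {
    // chipotle_id=714 versus location_id=4338942, coordinates as quoted in the text
    val d = haversineMeters(33.562836, -84.324768, 33.5628122, -84.3247918)
    println(f"distance: $d%.2f m")
  }
}
```

Under these assumptions the two points are roughly 3.4 meters apart, which is consistent with the location falling inside the 4-meter geo-fence.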

Summary

Apache Spark is quickly becoming the de facto tool for processing large data sets at scale. At B23, we believe elastic cloud infrastructures are the perfect companion for processing large amounts of data. Taking the commodity “heavy lifting” out of launching scalable big data solutions like Spark, and doing it securely and in minutes, is a key differentiator for us.

Joining billions of rows of data in a timely manner is a tough challenge in any context. Using a third-party library called SpatialSpark, I was able to join a very large network-based location data set with a POI data set to derive the intersection of people and locations of hundreds of restaurants in North America in minutes.

This processing resulted in a data product that allowed us to track users’ “foot traffic” in 1000+ specific restaurants in North America on a daily basis over a prolonged period of time. Try it for yourself at B23 Data Platform.

[1] http://getindata.com/blog/post/geospatial-analytics-on-hadoop/

[2] https://en.wikipedia.org/wiki/JTS_Topology_Suite

[3] https://docs.qgis.org/2.6/en/docs/training_manual/vector_analysis/reproject_transform.html

[4] https://en.wikipedia.org/wiki/Map_projection