Efficient geospatial analysis with Spark

Kari Dempsey
UK Hydrographic Office
Apr 9, 2019
Density mapping of AIS data showing shipping routes around the UK EEZ

Many organisations like to understand the movement of global shipping traffic. For us, it is helpful to know how ports are being used, or which regions are busiest. Automatic Identification System (AIS) data gives a global view of shipping movement: every ship over a certain size continuously emits a signal giving its position. This data is collected by satellites and terrestrial stations, and a single year of it can run to several terabytes.

To process the data I found Spark, in conjunction with some geospatial libraries, very useful. I did not find tonnes of resources about using Spark for geo-data, so I wanted to highlight some helpful code and approaches I figured out along the way.

Apache Spark is a fast and efficient tool for processing large data sets. I find it particularly useful because you can write your code once, starting small and scaling as required, rather than re-writing it when you want to move to larger data sets, as some tools force you to.

Apache Spark: https://spark.apache.org/
JTS: https://github.com/locationtech/jts

Spark can also be a useful tool for geospatial processing, particularly for large 2-D point data sets. Using Scala with Spark and the Java Topology Suite (JTS), a Java geometry library, works very well for this type of query.

The AIS system on a ship emits information on the ship's location at periodic intervals. Each row in the AIS data gives the latitude, longitude, time, speed and heading of a given ship. These individual points trace the routes of the ships; to understand how ships are moving around the world, we can cross-reference areas of interest with the ship tracking points. Example points are shown in the diagram as grey spots, and the ports/areas of interest are shown in red.

Representation of AIS points denoting ship location and polygons denoting the ports
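
In code, a single AIS row might be modelled as a small case class like the one below; the exact field names are my assumption for illustration, not the real schema.

// Hypothetical shape of one AIS row; real feeds vary in naming and precision
case class AisRow(mmsi: String,          // ship identifier
                  lat: Double,
                  lon: Double,
                  timestampMillis: Long,
                  speedKnots: Double,
                  heading: Double)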

The simple query underpinning all further analysis of how busy a port or region is boils down to determining whether a ship point is contained within an area, or polygon, of interest. To do this, we can use the JTS geometry types for both the individual ship points and the polygons.

Looping through each row of the data, you can use the latitude and longitude to create a JTS geometry object.

import com.vividsolutions.jts.geom.{Coordinate, GeometryFactory}

val gf = new GeometryFactory()
// lon and lat come from the current row of AIS data; JTS uses (x, y) = (lon, lat)
val AISpoint = gf.createPoint(new Coordinate(lon, lat))

Similarly, for the areas of interest, or polygons, you can load them as follows, using the well-known text (WKT) representation of geometry.

import com.vividsolutions.jts.io.WKTReader

val wktReader = new WKTReader()
// Example WKT string; in practice each polygon is loaded from the areas-of-interest data
val polygon = wktReader.read("POLYGON ((0 0, 4 0, 4 4, 0 4, 0 0))")

This code shows individual examples; in the full code, I put these calls inside a .map to load all the data (see the Spark sketch after the next snippet). Once you have both the points and the polygons as geometry types, we can use the JTS geometry method .contains on the polygon geometry to determine whether the point is within the polygon.

// true if the ship point lies inside this polygon
val pointInPolygon = polygon.contains(AISpoint)
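
To give a flavour of how this sits inside a Spark job, here is a minimal sketch of the .map mentioned above; the aisRows RDD and its (lon, lat) shape are assumptions for illustration, not the exact production code.

import org.apache.spark.rdd.RDD
import com.vividsolutions.jts.geom.{Coordinate, GeometryFactory, Point}

// Assumption: aisRows is an RDD[(Double, Double)] of (lon, lat) pairs
val gf = new GeometryFactory() // serialisable, so safe to reference in closures
val points: RDD[Point] = aisRows.map { case (lon, lat) =>
  gf.createPoint(new Coordinate(lon, lat))
}
// Count the AIS points falling inside a single polygon of interest
val shipsInArea = points.filter(p => polygon.contains(p)).count()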

However, when the number of points is in the billions and the number of polygons is in the thousands, and you check all of the points against all of the polygons, as in our case, this query is slow.

To make your Spark code as efficient as possible, it's worth understanding how the data moves around the cluster: what gets replicated to each executor and what must be distributed. Additionally, when doing geospatial queries, it's good to utilise geospatial-specific techniques.

To parallelise as much as possible, I created a hashmap to store all the polygons; it is small enough to be shipped to every executor, allowing the .contains check to be parallelised (the broadcast sketch below makes this explicit). This also means the large data set, the points in this case, is only passed over once, which is key to making it efficient.

import scala.collection.mutable
import com.vividsolutions.jts.geom.Geometry

// Map each polygon geometry to its identifier (e.g. a port name)
val coverageMap = new mutable.HashMap[Geometry, String]()
coverageMap.put(polygon, polygonId) // polygonId: the identifier loaded alongside the WKT
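
To make the "shipped to every executor" part explicit, one way is a Spark broadcast variable. This is a sketch assuming a SparkContext named sc and the points RDD from the earlier sketch.

// Broadcast the small polygon lookup so each executor holds one read-only copy
val coverageBroadcast = sc.broadcast(coverageMap)

// Tag each point with the id of a polygon that contains it, if any.
// Note: this still scans every polygon per point; the spatial index below fixes that.
val tagged = points.flatMap { p =>
  coverageBroadcast.value.collectFirst {
    case (geometry, id) if geometry.contains(p) => (id, p)
  }
}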

To make the query more efficient still, I loaded all of the polygons into an STRtree (a Sort-Tile-Recursive tree), a spatial index optimised for exactly this kind of query.

Background reading on STR trees: http://files.basex.org/publications/Seydi%20Gheranghiyeh%20[2015],%20Geospatial%20Processing%20in%20BaseX.pdf

This STRtree object allows us to efficiently query for nearby polygons before calling .contains, greatly reducing the number of polygons which need comparing with each point.

import com.vividsolutions.jts.index.strtree.STRtree

val coverageIndex = new STRtree()
coverageMap.foreach { case (geometry, _) =>
  coverageIndex.insert(geometry.getEnvelopeInternal, geometry)
}

val geometries = coverageIndex
  .query(AISpoint.getEnvelopeInternal)
  .toArray(new Array[Geometry](0)) // rough search on bounding boxes
if (geometries.nonEmpty) {
  val containingGeometries = geometries.filter(polygon => polygon.contains(AISpoint)) // exact search
}

As you can see, the index is queried first to find polygons near the point; only then is the exact .contains check carried out. This STRtree object is also small enough to be shipped to each executor, again allowing the work to be parallelised.
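
Putting the pieces together, an end-to-end sketch might look something like the following; the variable names and the final aggregation are my assumptions, not the exact production code.

// Build the index up front so it is broadcast ready-made, not rebuilt per executor
coverageIndex.build()
val indexBroadcast = sc.broadcast(coverageIndex)
val idsBroadcast = sc.broadcast(coverageMap)

val countsPerArea = points.flatMap { p =>
  val candidates = indexBroadcast.value
    .query(p.getEnvelopeInternal)   // rough search on bounding boxes
    .toArray(new Array[Geometry](0))
  candidates
    .filter(_.contains(p))          // exact search
    .map(geometry => (idsBroadcast.value(geometry), 1))
}.reduceByKey(_ + _)                // ship-point count per area id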

The query is now very efficient and allows analysis over years' worth of geospatial data with ease. I hope this was useful for those trying to use Spark on geospatial data; for more geospatial tools for Spark, and other geo big-data tools, check out LocationTech from the Eclipse Foundation.

Contact kari.dempsey@ukho.gov.uk
