Spatial Partitioned RDD using KD Tree in Spark

Arjun Sunil Kumar
Distributed Systems Engineering
8 min readApr 27, 2022

Today, we plan to cover a use case of the KD tree in the Big Data Framework.

Reference Publications:

The content I am presenting here is taken from

Terminology:

Big Data refers to data sets that are too large to be processed by just one machine.

Spark: is a batch framework for processing big data.

RDD: Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel.

Map: Map is an operation similar to map()in Java. Here we will use the map() to pass a Lambda function (operator) to all the RDD.

FlatMap : FlatMap is similar to map(), except that it returns one list, merging all the RDDs after the map operation is performed.

Filter: Query all the RDD to fetch items that match the condition passed as an argument.

Sampling: A technique used to get a sample (say 5%) from the whole dataset to get a meaningful insight into the whole data.

Data Dimensions: Data can have multiple dimensions. Let's say, Employee having the feature/ property age, salary, height, and weight. Here dimensions are age, salary height, and weight.

Geo-Spatial Data or Spatial Data: Any data with its primary focus on Geography for its dimension, can be seen as Geo-Spatial Data. Say: Bar { lat, lng, name, seatcout} has its primary key, revolving around latitude and longitude ie location. (TechTarget definition: any type of data that directly or indirectly references a specific geographical area or location.)

GeoJson: GeoJSON is an open standard format designed for representing simple geographical features, along with their non-spatial attributes. It is based on the JSON format. The features include points, line strings, polygons, and multi-part collections of these types.

Problem Statement:

We have been processing (Range query, Point lookup, etc) Geo-Spatial data using Spark in the past. But, we were not using the benefits of the ordering that the Goe-Spatial nature imposes on the data set. Let's say if we had to find Pubs in a rectangular query window, we typically query all the Geo-Spatial entries and this would take linear time.

In the sequential data structure space, we have trees like the KD tree, Quadtree, etc, which can help us find this result optimally for Geo-Spatial data entries. Can we expand the same concepts to distributed systems?

Solutioning

Phase 1 of Incremental Solution ( First KD tree)

Spark uses HashPartioner, to split the whole data into smaller chunks called RDD. The Hashpartioner groups the data, in random order and puts a portion of the items into an RDD, and repeats this until the whole data is read.

Hash Partitioned RDD

Can we take the Geo-Spatial nature of data into consideration here?

For this,

  • we let spark do its initial RDD creation, using HashPartioner.
  • After that, we do a re-shuffling of data within RDDs. We group data, into RDDs using spatial proximity. For the reshuffling,

1. We collects samples from the whole data set.

2. Then we create a KD tree on that sample. All the sample items, will be located at a leaf node and they will have an incrementing id.

3. We map all the Goe-Spatial items residing in the RDD based on the proximity to the KD tree leaf node, and allocate that leaf node’s id to the GeoSpatial Item.

4. We re-partition the data using (Spark Custom partitioner — similar to group by on JavaRDDPair<Key, GeoSpatialEntry>) the key, such that Geo-Spatial Item with the same key fall in the same RDD.

Spatially partitioned RDD

For Example, if we have data on Pubs in California, Arizona, and NYC. Initially, we might have 2 RDD (build using Hash Partitioner) which contains Geo-Spatial data in any order.

After completion of our custom Geo-Spatial Partitioning, we create 3 new RDD, which will have

  • all the Californian pubs in one RDD,
  • NYC pub in another RDD,
  • and Arizona pub in another RDD.

Here we have grouped them by cities.

https://jiayuasu.github.io/files/talk/jia-geospark-apachecon19.pdf

Now what?

To do a range query in Spark, we use the filter function. It is similar to mapPartiton, which would query each RDD and do sequential iterations on the RDD items. That would still query all the items in the RDD iteratively, and we don’t see much of a difference here.

Can we do better?

Side Note:

If the master could store a Range Bloom Filter or KD Tree and a Pointer to the corresponding the RDD, then we could further optimize by sending query to only to those RDDs rather than broadcast.

Phase 2 of Incremental Solution (Second KD Tree)

During our exposure to distributed computing (CS 6380),

we learned about Gamma Synchronizer, which uses Beta Synchronizer and Alpha Synchronizer. We are thinking of something similar happening here as well.

The whole idea is to create Fat and Short trees for Beta Synchronizer (BFS tree — Cluster) and run Alphase Synchronizer (Broadcast) on top of these clusters.

Synchronizer Gamma

Coming back to the KD tree, what if we can build a KD tree for the items in the new RDD that we have created. Since we have grouped them based on spatial proximity, the new KD tree that we build here would be short and fat (dense).

Spatially Partitioned + Spataily Indexed RDD

Now, if do a range query, we can use mapPartition() function to broadcast our query window, and fetch items, which fit in the query region, simply by checking the trees in each RDD.

Result

This significantly reduces the range query time.

  • Previously we were scanning all the items in RDD in parallel.
  • Now also, we would be broadcasting to every RDD for the information. But within the RDD, we don’t need linear iteration. We only take logarithmic time due to the KD tree on RDD elements.

We have skipped some of the nitty-gritty details to make this analysis simpler.

Why Tree for Spatial Indexing?

Tree help in reducing query time by a factor of height (logarithmic). Short and fat trees are ideal for faster query response.

Why KD Tree for Spatial Indexing?

For single dimension range queries, we can choose a segment tree, Fenwick tree, etc.

When you have multiple dimensions, we tend to choose trees that are developed for handling them.

From Quora:

https://qr.ae/pv2et3

We prefer the KD tree when our dimensions are less.

Sampling on Big Data

We are using Spark sample() in Spark to get the required sample data from the whole dataset (ie covering all RDDs). The sample here gives a good approximation for organizing data to have shorted fat trees.

How sampling is done?

Sampling is a bit complex to cover here. We are using epsilon approximation to get the sample size.

Boundary Envelop?

A boundary Envelop is a rectangular region covering all the points. This envelope is then split with the KD tree constructed on the sample set, into smaller envelopes. The union of smaller envelopes form the Boundary Envelop.

Possible Trees for Spatial Re-Partitioning

1. KD Tree:

Each level is a dimension parameter, there by helping us organize points based on the dimension property.

http://groups.csail.mit.edu/graphics/classes/6.838/S98/meetings/m13/kd.html

2. kd B-Tree:

Here each child has a list of blocks instead of just one block in the case of the KD tree. This helps reduce the height of the tree, thereby improving the write throughput.

https://commons.wikimedia.org/wiki/File:KDB-tree.svg

Possible Trees for Spatial Indexing within RDD

https://en.wikipedia.org/wiki/Spatial_database#Spatial_index

1. Kd-Tree

In Kd-trees, the data is split into two regions based on some data analysis (e.g. the median of some coordinate)

https://www.geeksforgeeks.org/k-dimensional-tree/

2. Quad-Tree

In Quad-Tree, data is split into 2^d equal-sized regions.

http://blog.notdot.net/2009/11/Damn-Cool-Algorithms-Spatial-indexing-with-Quadtrees-and-Hilbert-Curves

Benchmarking:

We ran a benchmark comparing Orthogonal Range query on

  1. Raw RDD: default Spark implementation
  2. KD Tree (for Partitioning) & KD Tree (secondary indexing): based on our reference paper
  3. KDB Tree(for Partitioning) + QuadTree(secondary indexing)

The results were a bit surprising.

We thought that KDB Tree + Quad Tree would result in faster response time.

KDB trees are fat and short, and resulted in fewer RDD partitions. In our case, KD tree had 32 partitions (# of sample points) and KDB tree had 2 partitions (leaf nodes). So the number of map-reduce operations are less for KDB tree than KD tree.

But, instead, we got the below results

The orange bar, i.e., KD Tree + KD Tree based implementation resulted in a faster query response.

Test Configuration:

Source Code:

Github

Wrapping Up:

We have implemented spatial partitioning to repartition the data across RDD for creating a dense index tree with RDD. Inside the RDD, we have chosen to have the KD tree for indexing the RDD dataset. Our benchmark results are skewed as we haven’t run this on a distributed environment with proper big data load. That is an action item for future scope. But we see a huge potential with this approach to reduce the range query search time, closest neighbor queries, etc.

Credits:

--

--

Arjun Sunil Kumar
Distributed Systems Engineering

Writes on Database Kernel, Distributed Systems, Cloud Technology, Data Engineering & SDE Paradigm. github.com/arjunsk