MapR-DB Spark Connector Performance Tests

In a previous post, we described some of the issues with the official MapR-DB Connector for Apache Spark. We also introduced an alternative way to overcome those issues: implementing our own connector.

We have pushed our new connector to Maven Central, so it can be downloaded and used with ease. For Maven:

<dependency>
    <groupId>com.github.anicolaspp</groupId>
    <artifactId>maprdbconnector_2.11</artifactId>
    <version>1.0.2</version>
</dependency>

or, with sbt:

libraryDependencies += "com.github.anicolaspp" % "maprdbconnector_2.11" % "1.0.2"
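For quick experiments, the same artifact can also be pulled directly into the Spark shell with Spark's standard --packages option:

spark-shell --packages com.github.anicolaspp:maprdbconnector_2.11:1.0.2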

The corresponding GitHub repository is here, and we welcome improvements and pull requests so we can take advantage of all the features MapR-DB offers.

One of the questions we had during the implementation of our connector was how fast it is compared to the official connector.

In order to see how these two pieces of software compare performance-wise, we created a MapR-DB table and set it up as described below. Remember that the main idea of our connector is to gain performance by using the secondary indexes defined on the MapR-DB table, a feature the official connector does not currently use.

Setup

A MapR-DB JSON table at the path /user/mapr/tables/from_parquet with two string fields:

  • _id
  • payload

The table has 100,000,000 rows, that is, 100M records, and there is a secondary index called payload_idx on the payload field.
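An index like this can be created with maprcli; a sketch of the command for the table above (option names may vary slightly across MapR versions, so check the documentation for your release):

maprcli table index add -path /user/mapr/tables/from_parquet -index payload_idx -indexedfields payload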

The average payload size can be calculated as follows.

import org.apache.spark.sql.functions.{col, length, mean}

// loadFromMapRDB is added to the session by the connector's implicits.
val df = spark.loadFromMapRDB("/user/mapr/tables/from_parquet")

df.agg(mean(length(col("payload")))).show

+--------------------+
|avg(length(payload))|
+--------------------+
| 264.84391354843916|
+--------------------+

The table information shows that the table occupies 32.11 GB of logical size and 28.79 GB of space on disk, roughly consistent with 100M rows averaging about 265 bytes of payload each (around 26.5 GB of payload data alone).

maprcli table info -path /user/mapr/tables/from_parquet -json

{
  "timestamp":1551951201329,
  "timeofday":"2019-03-07 01:33:21.329 GMT-0800 AM",
  "status":"OK",
  "total":10,
  "data":[
    {
      "path":"/user/mapr/tables/from_parquet",
      "numregions":10,
      "totallogicalsize":34481758208,
      "totalphysicalsize":30907727872,
      "totalcopypendingsize":0,
      "totalrows":99999999,
      "totalnumberofspills":14840,
      "totalnumberofsegments":14841,
      "autosplit":true,
      "bulkload":false,
      "tabletype":"json",
      "regionsizemb":4096,
      "hasindex":true,
      ...
      "uuid":"a7095a04-da06-0101-cdde-05394c805c00"
    }
  ]
}

The payload_idx index information is the following. It is interesting to notice the index size: at about 35 GB (totalSize below), the index is even bigger than the table itself.

maprcli table index list -path /user/mapr/tables/from_parquet -json

{
  "timestamp":1551951454409,
  "timeofday":"2019-03-07 01:37:34.409 GMT-0800 AM",
  "status":"OK",
  "total":1,
  "data":[
    {
      "cluster":"nico.cluster",
      "type":"maprdb.si",
      "indexFid":"2242.45.131206",
      "indexName":"payload_idx",
      "hashed":false,
      "indexState":"REPLICA_STATE_REPLICATING",
      "idx":1,
      "indexedFields":"payload:ASC",
      "isUptodate":true,
      "minPendingTS":0,
      "maxPendingTS":0,
      "bytesPending":0,
      "putsPending":0,
      "bucketsPending":0,
      "copyTableCompletionPercentage":100,
      "numTablets":32,
      "numRows":96824224,
      "totalSize":37746925568
    }
  ]
}

Now, let’s run some queries.

Querying
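The snippets below reference two values that the post does not define: a schema for the table and a small time helper used to measure each query. Neither comes from either connector's API, so here is a minimal sketch of both, assuming the two-string-field layout described above and wall-clock timing in milliseconds (which matches the TIME output shown with each query). Note that the queries define f1 and f2 with def rather than val so that time(...) actually re-runs the query when it measures it.

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Schema matching the table layout from the setup section: two string fields.
val schema = StructType(Seq(
  StructField("_id", StringType, nullable = false),
  StructField("payload", StringType, nullable = true)
))

// Hypothetical timing helper: evaluates the given expression (by name),
// prints the elapsed wall-clock time in milliseconds, and returns the result.
def time[A](block: => A): A = {
  val start = System.currentTimeMillis()
  val result = block
  println(s"TIME: ${System.currentTimeMillis() - start}")
  result
}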

Using the Official MapR-DB Connector

  • EQUAL
def f1 = com.mapr.db.spark.sql.SparkSessionFunctions(sparkSession)
  .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
  .filter("payload = '67-3485-8-3299-11113-118111126100-10-702283-6189-73-90-1125649-3-34668459-30734-8-342345-616665-108105-105-88311-60-59-46-105-100-10265-39-67-111-125-8559-28-22-10233-84-119-127-29951751100-116-202488-90-8966-69105-8310234-6811842-49-39-13-103-73-9866-119116-8-721130-98-50'")
  .count()

println(s"RESULT: ${time(f1)}")

TIME: 11012
RESULT: 1
  • LIKE
def f1 = com.mapr.db.spark.sql.SparkSessionFunctions(sparkSession)
  .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
  .filter("payload like '67-3485%'")
  .select("_id")
  .count()

println(s"RESULT: ${time(f1)}")

TIME: 12011
RESULT: 7
  • LESS THAN
def f1 = com.mapr.db.spark.sql.SparkSessionFunctions(sparkSession)
  .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
  .filter("payload <= '67-3485-8-3299-11113-118111126100-10-702283-6189-73-90-1125649-3-34668459-30734-8-342345-616665-108105-105-88311-60-59-46-105-100-10265-39-67-111-125-8559-28-22-10233-84-119-127-29951751100-116-202488-90-8966-69105-8310234-6811842-49-39-13-103-73-9866-119116-8-721130-98-50'")
  .select("_id")
  .count()

println(s"RESULT: ${time(f1)}")

TIME: 752273 (around 12.5 minutes)
RESULT: 85975334
  • OR LIKE
def f1 = com.mapr.db.spark.sql.SparkSessionFunctions(sparkSession)
  .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
  .filter("payload like '67-3485-8-3299%' or payload like '3485-8-3299%'")
  .select("_id")
  .count()

println(s"RESULT: ${time(f1)}")

TIME: 162803
RESULT: 9
  • AND LIKE
def f1 = com.mapr.db.spark.sql.SparkSessionFunctions(sparkSession)
  .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
  .filter("payload like '67-3485-8-3299%' and payload like '3485-8-3299%'")
  .select("_id")
  .count()

println(s"RESULT: ${time(f1)}")

TIME: 21837
RESULT: 0
  • AND
def f1 = com.mapr.db.spark.sql.SparkSessionFunctions(sparkSession)
  .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
  .filter("payload <= '67-3485-8-3299' and payload >= '3485-8-3299'")
  .select("_id")
  .count()

println(s"RESULT: ${time(f1)}")

TIME: 363296
RESULT: 13819336

Using the Independent MapRDBConnector

  • EQUAL
def f2 = sparkSession
  .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
  .filter("payload = '67-3485-8-3299-11113-118111126100-10-702283-6189-73-90-1125649-3-34668459-30734-8-342345-616665-108105-105-88311-60-59-46-105-100-10265-39-67-111-125-8559-28-22-10233-84-119-127-29951751100-116-202488-90-8966-69105-8310234-6811842-49-39-13-103-73-9866-119116-8-721130-98-50'")
  .count()

println(s"RESULT: ${time(f2)}")

TIME: 889
RESULT: 1
  • LIKE
def f2 = sparkSession
  .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
  .filter("payload like '67-3485%'")
  .count()

println(s"RESULT: ${time(f2)}")

TIME: 878
RESULT: 70
  • LESS THAN
def f2 = sparkSession
  .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
  .filter("payload <= '67-3485-8-3299-11113-118111126100-10-702283-6189-73-90-1125649-3-34668459-30734-8-342345-616665-108105-105-88311-60-59-46-105-100-10265-39-67-111-125-8559-28-22-10233-84-119-127-29951751100-116-202488-90-8966-69105-8310234-6811842-49-39-13-103-73-9866-119116-8-721130-98-50'")
  .select("_id")
  .count()

println(s"RESULT: ${time(f2)}")

TIME: 688387 (around 11.4 minutes)
RESULT: 85975334
  • OR LIKE
def f2 = sparkSession
  .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
  .filter("payload like '67-3485-8-3299%' or payload like '3485-8-3299%'")
  .select("_id")
  .count()

println(s"RESULT: ${time(f2)}")

TIME: 231398
RESULT: 9
  • AND LIKE
def f2 = sparkSession
  .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
  .filter("payload like '67-3485-8-3299%' and payload like '3485-8-3299%'")
  .select("_id")
  .count()

println(s"RESULT: ${time(f2)}")

TIME: 31257
RESULT: 0
  • AND
def f2 = sparkSession
  .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
  .filter("payload <= '67-3485-8-3299' and payload >= '3485-8-3299'")
  .select("_id")
  .count()

println(s"RESULT: ${time(f2)}")

TIME: 280387
RESULT: 13819336

We can summarize the results in the following table (all times in milliseconds; lower is better, since these are query running times):

Query       Official connector   Independent connector
EQUAL       11012                889
LIKE        12011                878
LESS THAN   752273               688387
OR LIKE     162803               231398
AND LIKE    21837                31257
AND         363296               280387

As we can see, our connector beats the official one in most of the queries. These results come from the fact that our connector uses MapR-DB secondary indexes as much as it can.

There are certain situations where using secondary indexes is not feasible; in those cases, the queries issued to MapR-DB are similar to the ones issued by the official connector, and our running times fall in the same range as the official connector's.

On the other hand, when secondary indexes are a good fit, our connector is clearly faster while keeping the same functionality from an API perspective.
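A practical way to check whether a given filter is being pushed down to the data source in either connector is to inspect Spark's physical plan, where pushed predicates typically appear in the scan node (for example, as PushedFilters for data source scans). A minimal sketch:

// Print the logical and physical plans instead of running the query; the
// scan node shows which predicates were pushed down to the data source,
// which is where secondary index usage can make a difference.
sparkSession
  .loadFromMapRDB("/user/mapr/tables/from_parquet", schema)
  .filter("payload like '67-3485%'")
  .explain(true)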