Mongo-Spark Connector Deep Dive: Projection Pushdown

Bridging the gap between MongoDB and Apache® Spark data models

Yerachmiel Feltzman
Zencity Engineering
10 min read · May 12, 2022


At Zencity Data Platform, we have deployed some interesting pipelines to read MongoDB documents from their collections into Apache® Spark DataFrames and enable analytics and integration within our big batch processes. However, peculiarities and differences in data model representation demand extra attention to bridge the gap between the two systems. We have done that successfully and learned a lot along the way. Let me take you with me on one of our learning journeys.

Converting from MongoDB documents to Apache® Spark DataFrames requires extra attention

What are all of these words?

MongoDB is a popular NoSQL database that stores data records as BSON documents (a binary representation of JSON-like documents). Documents are grouped into collections, and collections are grouped into a database. Although the comparison to an RDBMS is not perfect, a document resembles a row in a table, and a collection resembles a table. You can check the MongoDB docs for exact definitions.

Spark is likely the most popular Big Data computation engine nowadays. Spark has three ways of representing data: RDDs, Datasets, and DataFrames. The one we mostly use at Zencity is DataFrames, as it is more intuitive, covers the majority of use cases, and is really powerful. A DataFrame is a collection of Rows (split “vertically” into Columns), and it acts and feels mostly like a “normal” table. For times when we need lower-level implementation and/or modeling, we consider having some Datasets and RDDs around as well.

As you might have noticed, converting a MongoDB collection into a Spark DataFrame is not the simplest job to do, because of the different ways the two systems represent data. To bridge this gap, the MongoDB team has released the Mongo-Spark connector. What a lifesaver! Using this connector, we have been able to build some interesting pipelines reading MongoDB collections into Spark within the Zencity Data Platform.

How did this task start off?

Some time ago, I was tasked with upgrading a Spark job that was running in production from Spark 3.0.1 to 3.1.2. I went into this task thinking I would deliver it in a couple of hours. It was clearly a case of changing versions, running some tests, maybe fixing some API changes, running tests again, and done. Or so I thought.

Instead, I found myself wrapping up the work only a few days later, and that was after opening a ticket with the Mongo-Spark connector developers and proposing a pull request to demonstrate a simple solution to the problem I faced.

In the end, those unexpected detours became the source of some important learning for me. I learned a lot about Mongo-Spark connector implementation, the importance of pushdown operations, and some low-level things that only those developing Spark connectors are used to considering. Great, I’ll take it!

Most importantly, I learned that if you encounter a problem when using someone else’s code (even if it was written by a famous and well-established platform team), don’t be afraid to jump in and examine the code. Actually, from my experience, diving into production-grade code is an extremely good practice that will help you learn a lot about design decisions, think “out-of-your-box”, and see new libraries and code techniques. I’ll walk you through what that looked like from my end with the Mongo-Spark connector.

How did a few hours turn into a few days?

Spark 3.1.x added a new check to some data sources: they now throw an org.apache.spark.sql.AnalysisException when they detect duplicate names in nested structures. The exception looks something like the following:

org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema in read

Why does this happen? Let’s explain: MongoDB is schemaless by design. This means that you can have totally different documents in the same collection.

MongoDB’s flexibility goes so far that you can even have duplicated fields in your documents, as stated in this documentation (for further reading, check out this community thread).

You might be thinking, “No one would ever do that! It doesn’t make any sense to have duplicated fields in documents.” And you are totally right. But sometimes errors happen. We are all human, and it is common to have legacy fields that no one ever intends to use again lying around in some old objects in the collection. Since those fields are legacy, application developers don’t need to use them again, and MongoDB doesn’t object to them, so there is (almost) no point in looking for duplicated fields in legacy nested objects in order to clean the collection.

Now, let’s compare this super schemaless flexibility of MongoDB collections to Spark DataFrames.

As I said, I wanted to read MongoDB data into Spark. If duplicated fields can happen in MongoDB, and I want to read from it using the Spark DataFrame API, Spark would have to allow duplicated fields as well, or at least have a way to deal with them without crashing.

But…it doesn’t!

Spark will throw an exception whenever it reads from the kinds of sources stated in the migration guide and finds a duplicated field. And, from Spark 3.1.x onwards, it will even object to duplicated fields in nested objects.
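To make the difference between the two checks concrete, here is a small toy model in plain Scala. It is only a sketch and not Spark's implementation: the Field, Leaf, Struct, and DuplicateCheck names, as well as the example schema, are made up for illustration. It shows how a check that looks only at top-level columns accepts a schema that a check descending into nested structures rejects.

```scala
// Toy schema model (hypothetical, not Spark's StructType):
// a field is either a leaf or a struct with child fields.
sealed trait Field { def name: String }
final case class Leaf(name: String) extends Field
final case class Struct(name: String, children: Seq[Field]) extends Field

object DuplicateCheck {
  // Names that occur more than once within one list of sibling fields.
  private def duplicatesAmong(fields: Seq[Field]): Seq[String] =
    fields.groupBy(_.name).collect { case (n, fs) if fs.size > 1 => n }.toSeq.sorted

  // A check that only looks at top-level columns.
  def topLevelOnly(schema: Seq[Field]): Seq[String] = duplicatesAmong(schema)

  // A check that also descends into nested structs and reports dotted paths.
  def nested(schema: Seq[Field], prefix: String = ""): Seq[String] = {
    val here = duplicatesAmong(schema).map(prefix + _)
    val below = schema.flatMap {
      case Struct(n, children) => nested(children, s"$prefix$n.")
      case _: Leaf             => Seq.empty
    }
    here ++ below
  }

  // Example schema: the duplicated "zip" field lives inside a nested object.
  val example: Seq[Field] = Seq(
    Leaf("_id"),
    Struct("address", Seq(Leaf("city"), Leaf("zip"), Leaf("zip")))
  )
}
```

With this example schema, the top-level check finds nothing to complain about, while the nested check reports the path address.zip. That mirrors why my job kept working on Spark 3.0.1 and broke on 3.1.2.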

In the MongoDB collection I was reading from, we happened to have some legacy fields like these. I knew they were “garbage-in”, so I dropped them when I wrote the code for the first time (way before I was tasked to upgrade to Spark 3.1.2) to exclude them from further processing.

This approach worked for Spark 3.0.1 because the duplication was in nested fields, but when I started upgrading the jobs to Spark 3.1.2, it suddenly stopped working. This new nested analysis doesn’t allow me to read data from any MongoDB collection that has duplicated fields in nested objects, even before I have time to drop the ‘problematic’ fields.

Pushing down projection to MongoDB as a solution

Projection (select and drop fields) and predicate (filters) pushdown is actually a simple concept, as explained here:

- If you issue a query in one place to run against a lot of data that’s in another place, you could spawn a lot of network traffic, which could be slow and costly. However …

- … if you can “push down” parts of the query to where the data is stored, and thus filter out most of the data, then you can greatly reduce network traffic.

As stated in the answer above, pushdown is mostly used to reduce network traffic. It is also used to save computational resources of the reader system. In my case, Spark would push down some operations to be run in MongoDB, which, unlike Spark, is optimized for this kind of work. For example, consider a DataFrame read from MongoDB with a filter applied to it.

Taking a look under the hood in the example above, Spark would optimize the query and then push down the filter operation to MongoDB. As a result, these key resources are saved:

  • Network: less data flowing from MongoDB’s servers to Spark’s cluster
  • Memory: less data for Spark to hold
  • CPU: less data for Spark to process

In addition, the three factors above might together reduce running times for the Spark job (this might also reduce costs, although faster running times do not always translate to lower costs).
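The idea can be sketched without Spark or MongoDB at all. The following plain Scala toy uses a hypothetical RemoteCollection class as a stand-in for the remote data store and counts how many documents leave the "server" with and without pushdown. All names and data here are invented for illustration.

```scala
object PushdownSketch {
  final case class Doc(city: String, population: Int)

  // Stand-in for a remote collection; counts documents sent to the reader.
  final class RemoteCollection(docs: Seq[Doc]) {
    var transferred: Int = 0

    // Without pushdown: every document crosses the network.
    def scanAll(): Seq[Doc] = {
      transferred += docs.size
      docs
    }

    // With pushdown: the predicate runs at the source, only matches are sent.
    def scanWhere(predicate: Doc => Boolean): Seq[Doc] = {
      val matches = docs.filter(predicate)
      transferred += matches.size
      matches
    }
  }

  val data: Seq[Doc] =
    Seq(Doc("A", 10), Doc("B", 2000), Doc("C", 30), Doc("D", 5000))
  val isLarge: Doc => Boolean = _.population > 1000

  // Returns (documents transferred without pushdown, with pushdown).
  def compare(): (Int, Int) = {
    val noPushdown = new RemoteCollection(data)
    val filteredInReader = noPushdown.scanAll().filter(isLarge)

    val withPushdown = new RemoteCollection(data)
    val filteredAtSource = withPushdown.scanWhere(isLarge)

    // Both strategies must produce the same result set.
    require(filteredInReader == filteredAtSource)
    (noPushdown.transferred, withPushdown.transferred)
  }
}
```

Both strategies return the same two documents, but the reader receives four documents in the first case and only two in the second. The larger the collection and the more selective the filter, the bigger that gap becomes.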

However, in our case, network traffic, memory, and CPU were not the issue. The data could not be processed in Spark at all because of the new exception.

To solve this challenge, I needed to find a way to avoid the new exception. I was looking for a way to say to MongoDB: “Hey, please just send me the ‘good’ columns, not the legacy columns! Otherwise, my Spark application will implode before I have time to do anything.”

Essentially, I was looking for a way to push down the projection to MongoDB while using the DataFrame API from the beginning.

Thanks to the great work Mongo-Spark developers had already done, I had the beginnings of a solution with their out-of-the-box predicate pushdown. Just add df.filter(some-predicate) to your code, and whenever possible, it will effortlessly push it down for you.

However, df.drop(some-column) was still not being pushed down. If it were, the problematic columns would not have gotten into Spark, and Spark would not have thrown an exception.

Understanding the connector code

To solve this piece, let’s take a closer look at what’s happening in the code.

What’s happening in the background when we use the DataFrame reader API with MongoDB as the source, as in spark.read.format("com.mongodb.spark.sql.DefaultSource").option(…).load()?

Perhaps you noticed that the com.mongodb.spark.sql.DefaultSource in the spark.read.format method resembles a full class reference? It’s a path to the Mongo-Spark connector class called when the Spark `.load()` method is called. This DefaultSource class is a custom Spark source implementation that relies on DataSourceRegister and other Spark traits to make it work with MongoDB.

In practice, the Spark DataFrame reader uses the com.mongodb.spark.sql.DefaultSource#constructRelation method. What it does is:

  1. Build an RDD[BsonDocument]
  2. Infer the schema (when not provided, as in my use case)
  3. And, from both, build a MongoRelation.

MongoRelation is a class that extends some traits Spark provides to help build sources. Some of those traits have methods that developers can override to build pushdowns. For example, MongoRelation implements the buildScan method, from the PrunedFilteredScan trait. It should permit the Mongo-Spark connector to build pushdowns when Spark scans and builds the plan for each DataFrame read.

As promised in the documentation, Mongo-Spark developers actually added filter pushdown to the code:

To summarize where we stand, we’ve learned that when using the DataFrame `.load()` method with MongoDB as the source (`com.mongodb.spark.sql.DefaultSource`), the object MongoSpark is called to create an `RDD[BsonDocument]`, the schema is inferred, and a DataFrame is returned to us.

Using the MongoSpark object

The MongoSpark object is the companion object of the case class with the same name, defined in the same file.

MongoSpark’s case class has the methods responsible for transforming collections from MongoDB into Spark data objects. We are interested in the .toDF method since we want to work with DataFrames.

On the other hand, the MongoSpark companion object has load() helper methods to perform this transformation without accessing the MongoSpark case class builder. Those load() methods are just another way to read data from MongoDB into Spark. As we have already seen, when we call

spark
  .read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option(…)
  .load()

it actually calls one of the MongoSpark.load() methods. This possibility is indeed documented.

My first instinct was that I could use some of the load() methods to more granularly define the way the Mongo-Spark connector would read my collection from MongoDB.

Well, I was wrong!

When you check the signatures of the methods in the API docs, you’ll see that none of the load() methods gives us the option to push a column drop down to MongoDB.

So, I decided to go back to the drawing board and see what those load() methods do. As it turns out, it’s actually pretty simple: they build a MongoSpark case class. As we have seen, this is the class responsible for converting a MongoDB collection into Spark data objects. Take the following MongoSpark.load(…) method as an example:

It’s pretty straightforward stuff, yet it still doesn’t offer the option to add a pushdown!

But, wait! What is this builder() we see there? Could that be a possible clue?

You bet! The builder method MongoSpark.Builder#pipeline is exactly what we are looking for! It simply adds a pipeline to the MongoSpark case class:

Combining MongoDB pipeline with toDF() as the final solution

A MongoDB pipeline is a built-in feature in MongoDB for building multi-stage data transformations. There is one particular built-in pipeline stage that is most relevant to us: $project. The $project stage can be used to choose which fields will be passed to the next stage. It seems to be exactly what we need: a way to tell MongoDB to exclude a set of fields from the collection’s documents when reading them into Spark.
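To give a feel for what excluding fields by path means, here is a toy model in plain Scala. It is not MongoDB's implementation and it only handles nested sub-documents (not arrays); the ProjectExclusionSketch object, the Doc alias, and the field names are all hypothetical. A document is modeled as a nested Map, and each excluded path uses the same dot notation as a $project stage.

```scala
object ProjectExclusionSketch {
  // A document modeled as a nested map (toy stand-in for a BSON document).
  type Doc = Map[String, Any]

  // Remove one dotted path (for example "address.legacyZip") from a document.
  def exclude(doc: Doc, path: String): Doc = path.split("\\.", 2) match {
    case Array(field) => doc - field
    case Array(head, rest) =>
      doc.get(head) match {
        case Some(child: Map[_, _]) =>
          doc.updated(head, exclude(child.asInstanceOf[Doc], rest))
        case _ => doc // path does not lead into a sub-document: nothing to remove
      }
  }

  // Apply several exclusions in turn, like { $project: { a: 0, "b.c": 0 } }.
  def excludeAll(doc: Doc, paths: Seq[String]): Doc =
    paths.foldLeft(doc)(exclude)
}
```

For instance, excluding "address.legacyZip" from a document with an address sub-document removes only that nested field and leaves the rest of the document untouched. The key point is that in the real system this work happens inside MongoDB, before any data reaches Spark.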

Let’s use it!

In the end, the solution I arrived at was essentially leveraging exactly what MongoSpark.load() methods already do, but with a little customization. That last detail is what let me push a projection pipeline down to MongoDB in order to exclude problematic fields.
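A minimal sketch of that idea could look like the following. It assumes the 3.0.x connector API discussed in this post (MongoSpark.builder(), its sparkSession, readConfig, and pipeline methods, and toDF()) plus the MongoDB Java driver's Aggregates and Projections helpers. The MongoProjectionPushdown object and its method names are made up for illustration, so treat this as an outline rather than the exact code that went to production.

```scala
import com.mongodb.client.model.{Aggregates, Projections}
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.bson.conversions.Bson
import scala.collection.JavaConverters._

// Hypothetical helper object; the names are illustrative only.
object MongoProjectionPushdown {

  // Build a one-stage pipeline: { $project: { <field>: 0, ... } }.
  // Nested fields can be addressed with dot notation ("parent.child").
  def excludeFieldsPipeline(fields: Seq[String]): Seq[Bson] =
    Seq(Aggregates.project(Projections.exclude(fields.asJava)))

  // Same chain the load() helpers use, plus the extra pipeline step,
  // so the excluded fields never leave MongoDB.
  def loadWithout(
      spark: SparkSession,
      readConfig: ReadConfig,
      fields: Seq[String]
  ): DataFrame =
    MongoSpark
      .builder()
      .sparkSession(spark)
      .readConfig(readConfig)
      .pipeline(excludeFieldsPipeline(fields))
      .build()
      .toDF() // schema is inferred from the already-projected documents
}
```

Calling loadWithout with a ReadConfig for the collection and the paths of the legacy fields should then return a DataFrame whose inferred schema no longer contains the duplicated fields, since they are filtered out on the MongoDB side.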

This solved the problem. :)

Some technical notes

We could have solved the problem by specifying the schema upfront. But, we needed dynamic schema evolution for our pipeline.

The Mongo-Spark connector offers projection pushdown for static schema reads, but this isn’t always an option. Let’s be honest: MongoDB collections can grow wildly deep (nested). And even if we know the schema upfront, we may want to permit the schema to evolve dynamically for business-related reasons.

The specific Spark jobs we needed to upgrade are consumers of upstream systems, so we could not enforce data deletion to clean the fields. And even if we could, as I said, the effort it would require from the application development teams would not be worth it.

Giving back to the community

Have you noticed that adding a MongoSpark.load() method with an option to add pushdown MongoDB pipelines is pretty simple? The MongoSpark.Builder#pipeline() method is already there, ready to be used, but none of the MongoSpark.load() methods actually uses it! The ‘hard’ implementation is done, but the ‘easy’ use of it is not!

Following the recommendation of our VP R&D and my team’s Tech Lead, I decided to contribute back to the community by opening a ticket with the Mongo-Spark developers¹, alerting them to the new Spark 3.1.x exception and proposing a pull request to exemplify a possible simple solution.

Now, coming back to the conclusion we stated earlier:

if you encounter a problem when using someone else’s code (even if it was written by a famous and well-established platform team), don’t be afraid to jump in and examine the code. Actually, from my experience, diving into production-grade code is an extremely good practice that will help you learn a lot about design decisions, think “out-of-your-box”, and see new libraries and code techniques.

Happy pipeline!

____________

  1. UPDATE:

Mongo-Spark connector developers acknowledged the absence of automatic pipeline projection pushdown but rejected the ticket, based on their own priorities, which is perfectly understandable. Nonetheless, now that you know that it doesn't happen automatically in the DataFrame API, you can deal with it manually. Awareness is a key virtue in software engineering.
