Mongo-Spark Connector Deep Dive: JSON Serialization

Understanding JSON serialization specifications and the importance of explicitly configuring them

Yerachmiel Feltzman
Zencity Engineering
Jun 27, 2022


Hi! I’m going to take you on a journey that had its challenges yet ultimately became a great example of why I do this work and the learning that can come from cracking others’ open-source code and challenging my own assumptions. Let’s start.

From the Zen of Python:

Explicit is better than implicit.

I found this statement to be so true in my latest journey with the Mongo-Spark connector!

Before I continue on to discuss some Mongo-Spark intricacies, check out the first article in the series to get caught up and learn more about the challenges of handling MongoDB's and Spark's different data models.

In the present article, we will explore what we learned when we decided to run a CDC backfilling job as a separate Spark batch job, using the Mongo-Spark connector, to replicate Debezium’s Kafka-Connector snapshot phase.

Background: A Mongo CDC streaming pipeline

One of Zencity’s most interesting data pipelines is a CDC (change data capture) pipeline ingesting data from MongoDB to our Data Lake.

This CDC pipeline ensures that every change that happens in the tracked MongoDB collection is replicated to the Data Lake. In steady state, the pipeline doesn't re-read all the data from MongoDB but only processes changes as they happen. To be more specific, in its first phase (known as the "snapshot phase") the pipeline reads the whole collection to capture its current state, and only in its second phase (known as "the CDC phase") does it consume the changes.

The (simplified) ingestion part of this pipeline is mainly composed of two parts:

  1. A Debezium Kafka-Connect connector, listening to MongoDB oplog changes and producing them to Kafka
  2. A Spark Streaming job, consuming those Debezium-MongoDB CDC records from Kafka, parsing them, and writing them downstream
(In the future, we intend to write an article explaining the CDC architecture in depth, including backfilling strategies. Keep following.)

However, once in a while (or never, if we could have our way, but life is way more complex and interesting than that, isn't it?), we have to run a backfilling job that re-reads all the data from a specific MongoDB collection. In this context, running a backfill means re-running the "snapshot phase". There are several ways to do that. Here I will explore what we learned when we decided to run it as a separate Spark batch job, using the Mongo-Spark connector.

A lambda architecture for CDC pipeline backfilling

When running the backfilling as a batch job, we end up implementing what is called a lambda architecture (or at least, a part of it).

It also means that the CDC streaming pipeline described above must be agnostic to the backfilling batch job. It may not even know it is happening, depending on how we orchestrate it.

More important than that: the backfilling output must conform to the output of the currently running streaming job.

This should be simple, but writing different code bases that are expected to output the exact same data types, formats, schemas, etc., can introduce some challenges.

In our specific use case, we had to output each ingested record as a JSON serialized to a string. As simple as that.

The streaming path was writing some data as JSON strings, so the backfilling batch path would have to do the same. The idea was to:

  1. Parse the whole data in the same way the streaming pipeline was parsing each record
  2. Output records in the exact same schema the streaming pipeline was producing them
  3. Serialize each record to a JSON string (sketched right after this list)

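In code, the plan looked roughly like this sketch (hypothetical names, assuming the pre-10.x Mongo-Spark connector API, with the source collection configured through the usual spark.mongodb.input.uri settings):

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.bson.Document;

SparkSession spark = SparkSession.builder().appName("mongo-cdc-backfill").getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

// 1 + 2: read the whole collection and reshape each record the way the streaming job does
// (the reshaping is omitted here)
JavaMongoRDD<Document> documents = MongoSpark.load(jsc);

// 3: serialize each record to a JSON string -- the innocent-looking call this whole story is about
JavaRDD<String> jsonRecords = documents.map(Document::toJson);
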
Sounds simple, right?

And yes! It failed! Who would have guessed? 😃

Why? Because JSONs were written slightly differently for the same data in the batch path and the running streaming path. Therefore, downstream systems failed to parse the output.

We found that the problem was happening mainly in fields containing MongoDB dates. The streaming CDC pipeline was writing them as:

{"sourceCreatedAt": {"$date": 1602392400000}}

However, the backfilling batch pipeline was writing them as:

{"sourceCreatedAt": {"$date": "2020-10-11T05:00:00Z"}}

This raised the question: why was the streaming pipeline, consuming Debezium-Mongo records, serializing JSON differently than the Spark batch job reading straight from MongoDB with the Mongo-Spark connector?

As it turns out, MongoDB has its own JSON extended specification, which extends the standard JSON specification, adding some special keywords to preserve type information.

When importing data from MongoDB with Spark, using the native Mongo-Spark connector, we serialize data to JSON by applying doc.toJson(), implicitly using the extended MongoDB JSON specification (a doc is a Document object, i.e., a record in our pipeline). We serialize to JSON expecting to get standard JSON strings but get "extended MongoDB JSON" strings instead.

How can we solve this?

The “simple” solution: Be explicit about JSON serialization

The solution we devised to move forward was to explicitly tell the method to use the standard JSON specification:
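(Here is a minimal sketch of the fix; the variable names are ours, and, as the deep dive below explains, the output mode that matched our streaming path turned out to be the deprecated JsonMode.STRICT.)

import org.bson.Document;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;

// Be explicit about how JSON is written instead of trusting toJson()'s implicit default:
JsonWriterSettings explicitSettings = JsonWriterSettings.builder()
        .outputMode(JsonMode.STRICT)   // deprecated, but it is what our streaming path emits
        .build();

String json = doc.toJson(explicitSettings);   // doc is an org.bson.Document read by the connector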

I'm still using the same doc.toJson method. Before, however, I was calling its no-argument overload, which uses the default JsonWriterSettings. To my surprise, I discovered that this default sets the JSON writer to use MongoDB's extended JSON format, whereas I had expected it to be the standard JSON specification.

You can blame me for using an implicit default configuration. Maybe you are right. 😊

So I learned the importance of seeking explicit configurations. (We all use the defaults sometimes, don’t we?)

Explanation: Deep dive into the source code

Deep-diving into the org.bson package's toJson() implementation, we could spot the way Debezium-Mongo was serializing records. In this way, we could fix it and re-run the backfilling, serializing in the exact same manner.

What we found is that using the method provided by the Document class to serialize documents to JSON strings implicitly used MongoDB’s extended relaxed format.

The doc.toJson() method is defined as:
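(Paraphrased below; the exact wiring differs slightly between driver versions.)

// org.bson.Document (paraphrased): the no-argument overload delegates to writer settings
// whose output mode is RELAXED, i.e., MongoDB's relaxed extended JSON:
public String toJson() {
    return toJson(JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build());
}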

This means that, when serialized, date fields in the backfilling job using Mongo-Spark used the LEGACY_EXTENDED_JSON_DATE_TIME_CONVERTER:

We naively thought that Debezium and Mongo-Spark would conform, therefore serializing JSON using the same standards, since both are reading MongoDB data. As mentioned above, we expected the default specification to be the standard JSON specification, or — at the very least — for the two to conform to MongoDB’s extended specification.

We were wrong on both counts. Debezium uses the deprecated (by MongoDB) JSON standard, called JsonMode.STRICT in the org.bson codebase.

From org.bson source code:
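(Paraphrased; the javadoc is trimmed to the relevant parts, and the constant order may differ between versions.)

// org.bson.json.JsonMode (paraphrased):
public enum JsonMode {
    @Deprecated
    STRICT,    // the legacy "strict" output, deprecated in favor of the extended JSON modes below
    SHELL,     // mongo shell syntax, e.g. ISODate(...)
    EXTENDED,  // canonical extended JSON
    RELAXED    // relaxed extended JSON -- what the no-argument toJson() ends up using
}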

So by now, we knew the problem: Debezium publishes "normal" JSONs (which ended up being the standard used by the streaming CDC job), while Mongo's org.bson library, used to serialize JSONs in the Mongo-Spark connector, writes them in the MongoDB extended specification. Therefore, what we needed to do was write the backfilling batch job's data using the same JSON serializer Debezium uses.

How did we figure out which encoder Debezium uses?

We noticed the following in our schema registry containing the Debezium-Mongo messages schema:
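(An illustrative, trimmed excerpt of the after field entry; not our actual schema.)

{
  "name": "after",
  "type": ["null", { "type": "string", "connect.version": 1, "connect.name": "io.debezium.data.Json" }],
  "default": null
}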

We admittedly didn't pay much mind to this io.debezium.data.Json at first.

However, if we take a look at Debezium source code implementation for its MongoDB CDC Kafka-Connector, we see it appear. 😮
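Here is a condensed sketch of the kind of declaration we found (the envelope name and surrounding fields are illustrative, not Debezium's exact source):

import io.debezium.data.Json;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

// The connector's value schema declares "after" with the io.debezium.data.Json semantic type:
// a plain STRING field whose content is a JSON document serialized by the connector itself.
Schema valueSchema = SchemaBuilder.struct()
        .name("mongodb.Envelope")                           // illustrative name
        .field("after", Json.builder().optional().build())  // <-- here it is
        .field("op", Schema.OPTIONAL_STRING_SCHEMA)
        .build();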

And it appears again in the serialization of our beloved after field.

(After that, we found that the Debezium docs explicitly state that strict mode is used for the record key, yet they don't mention a word about the record payload.)

And there's more! We found the exact writer configuration in the source code: Debezium builds its JsonWriterSettings with the deprecated JsonMode.STRICT output mode, uses that configuration in the transformer that serializes each document into the after value, and even covers the resulting after format in its tests.

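Put together, the value serialization boils down to something like the following sketch (paraphrased, not Debezium's literal source; the class and constant names are illustrative):

import java.util.function.Function;
import org.bson.Document;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;

final class StrictValueSerializer {
    // Debezium configures the writer with the deprecated strict mode,
    // which is why dates come out as {"$date": <millis>}.
    static final JsonWriterSettings WRITER_SETTINGS =
            JsonWriterSettings.builder().outputMode(JsonMode.STRICT).build();

    // The transformer that turns each change-event document into the "after" string.
    static final Function<Document, String> VALUE_TRANSFORMER =
            document -> document.toJson(WRITER_SETTINGS);
}
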
So, if we try using the “deprecated” strict mode in the org.bson library side by side with the default implicit mode:
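(A minimal reproduction, using the same field name and timestamp as the example above.)

import java.util.Date;
import org.bson.Document;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;

Document doc = new Document("sourceCreatedAt", new Date(1602392400000L));

String strict = doc.toJson(JsonWriterSettings.builder().outputMode(JsonMode.STRICT).build());
String relaxed = doc.toJson();   // the implicit default: relaxed extended JSON

System.out.println(strict);
System.out.println(relaxed);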

We get:

{"sourceCreatedAt": {"$date": 1602392400000}}
{"sourceCreatedAt": {"$date": "2020-10-11T05:00:00Z"}}

So we can replicate it easily in the batch backfilling code.

Conclusions:

  • Don't trust the docs alone
  • Check things further when you find something weird (especially if it looks like a class reference)
  • Don't expect an open-source project to always follow another project's standards (Debezium didn't change from strict to relaxed extended JSON, even though it serves as a consumer of MongoDB data). If you assume it does, double-check. You know what: check it five times. 😅
  • Last but not least, be explicit in your configurations

If we had followed conclusion 3 from the start, we would have saved our team a lot of time and resources.

In spite of all the above, the constant learning is the most fascinating thing about working in software engineering and data. Keep learning. And, as I always like to say, keep diving into others' code. They think differently than you, and you will learn from them. This is even more true for widely used open-source projects. Don't be afraid. Just open the code and read it.

Happy pipeline!
