Mongo-Spark Connector Deep Dive: JSON Serialization
Understanding JSON serialization specifications and the importance of explicitly configuring them
Hi! I’m going to take you on a journey that had its challenges yet ultimately became a great example of why I do this work and the learning that can come from cracking others’ open-source code and challenging my own assumptions. Let’s start.
From the Zen of Python:
Explicit is better than implicit.
I found this statement to be so true in my latest journey with the Mongo-Spark connector!
Before I continue on to discuss some Mongo-Spark intricacies, check out the first article in the series to get caught up and learn more about the challenges of handling MongoDB's and Spark's different data models.
In the present article, we will explore what we learned when we decided to run a CDC backfilling job as a separate Spark batch job, using the Mongo-Spark connector, to replicate Debezium’s Kafka-Connector snapshot phase.
Background: A Mongo CDC streaming pipeline
One of Zencity’s most interesting data pipelines is a CDC (change data capture) pipeline ingesting data from MongoDB to our Data Lake.
This CDC pipeline ensures that every change that happens in the tracked MongoDB collection is replicated to the Data Lake. That means the pipeline doesn't read all the data from MongoDB but only processes changes as they happen. To be more specific, in its first phase (known as the "snapshot phase") the pipeline reads the whole collection to capture its current state; only in its second phase (known as "the CDC phase") does it process changes as they occur.
The (simplified) ingestion part of this pipeline is mainly composed of two parts:
- A Debezium Kafka-Connect connector, listening to MongoDB oplog changes and producing them to Kafka
- A Spark Streaming job, consuming those Debezium-MongoDB CDC records from Kafka, parsing them, and writing them downstream
However, once in a while (or never if we could have our way, but life is way more complex and interesting than that, isn’t it?), we have to run a backfilling job re-reading all the data from a specific MongoDB collection. In this context, running a backfilling means we re-run the “snapshot phase”. There are some different ways to do that. Here I will explore what we learned when we decided to run it as a separate Spark batch job, using the Mongo-Spark connector.
A lambda architecture for CDC pipeline backfilling
When running the backfilling as a batch job, we end up implementing what is called a lambda architecture (or at least, a part of it).
It also means that the CDC streaming pipeline described above must be agnostic to the backfilling batch job. It may not even know it is happening, depending on how we orchestrate it.
More important than that: the backfilling output should conform with the currently running streaming job output.
This should be simple, but writing different code bases that end up being expected to output the exact same data types, formats, schema, etc., can introduce some challenges.
In our specific use case, we had to output each ingested record as a JSON serialized to a string. As simple as that.
The streaming path was writing some data as JSON strings so the backfilling batch path would have to do the same. The idea was to:
- Parse the whole data in the same way the streaming pipeline was parsing each record
- Output records in the exact same schema the streaming pipeline was producing them
- Serialize each record to a JSON string
Sounds simple, right?
And yes! It failed! Who would have guessed? 😃
Why? Because JSONs were written slightly differently for the same data in the batch path and the running streaming path. Therefore, downstream systems failed to parse the output.
We found that the problem was happening mainly in fields containing MongoDB dates. The streaming CDC pipeline was writing them as:

```json
{"sourceCreatedAt": {"$date": 1602392400000}}
```

However, the backfilling batch pipeline was writing them as:

```json
{"sourceCreatedAt": {"$date": "2020-10-11T05:00:00Z"}}
```
This begged the question: why was the streaming pipeline, consuming Debezium-Mongo records, serializing JSON differently from the Spark batch job reading straight from MongoDB with the Mongo-Spark connector?
As it turns out, MongoDB has its own JSON extended specification, which extends the standard JSON specification, adding some special keywords to preserve type information.
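For example, extended JSON preserves BSON type information with special `$`-prefixed keys (the values below are illustrative):

```json
{
  "_id": {"$oid": "5f82a3f1e4b0c12345678901"},
  "count": {"$numberLong": "42"},
  "sourceCreatedAt": {"$date": "2020-10-11T05:00:00Z"}
}
```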
When importing data from MongoDB with Spark using the native Mongo-Spark connector, we serialize data to JSON by calling doc.toJson(), implicitly using the extended MongoDB JSON specification (a doc is a Document object, i.e., a record in our pipeline). We serialize expecting to get standard JSON strings but get "extended MongoDB JSON" strings instead.
How can we solve this?
The “simple” solution: Be explicit about JSON serialization
The solution we devised to move forward was to explicitly tell the method to use the standard JSON specification:
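A minimal sketch of that fix, using the org.bson API (the document and field name are illustrative):

```java
import org.bson.Document;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;

import java.util.Date;

public class ExplicitJsonMode {
    public static void main(String[] args) {
        // An illustrative document with a BSON date field
        Document doc = new Document("sourceCreatedAt", new Date(1602392400000L));

        // Explicitly request the (deprecated) strict output mode that matches
        // what Debezium produces, instead of relying on the implicit default:
        String json = doc.toJson(
                JsonWriterSettings.builder().outputMode(JsonMode.STRICT).build());

        System.out.println(json); // the $date field is written as epoch millis
    }
}
```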
I'm still using the same doc.toJson method. However, before, I was using its no-argument overload, which uses the default JsonWriterSettings. To my surprise, I discovered that the default configures the JSON writer to use MongoDB's extended JSON version, whereas I expected it to use the standard JSON specification.
You can blame me for using an implicit default configuration. Maybe you are right. 😊
So I learned the importance of seeking explicit configurations. (We all use the defaults sometimes, don’t we?)
Explanation: Deep dive into the source code
Deep-diving into the toJson() implementation in the org.bson package, we could spot the way Debezium-Mongo was serializing records. That way, we could fix our job and re-run the backfilling, serializing in the exact same manner.
What we found is that using the method provided by the Document class to serialize documents to JSON strings implicitly used MongoDB’s extended relaxed format.
The no-argument doc.toJson() overload simply delegates to the overload that takes a JsonWriterSettings, passing the default settings.
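We can check the default behavior empirically. A sketch, assuming a recent org.bson version where the builder-default output mode is relaxed:

```java
import org.bson.Document;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;

public class DefaultToJson {
    public static void main(String[] args) {
        Document doc = Document.parse("{\"sourceCreatedAt\": {\"$date\": \"2020-10-11T05:00:00Z\"}}");

        // The no-arg overload delegates to the default JsonWriterSettings,
        // which use the relaxed extended JSON output mode:
        String viaDefault = doc.toJson();
        String viaRelaxed = doc.toJson(
                JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build());

        System.out.println(viaDefault.equals(viaRelaxed)); // true when the default is relaxed
    }
}
```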
This means that, when serializing, date fields in the backfilling job using Mongo-Spark went through the relaxed extended JSON date conversion, which writes in-range dates as ISO-8601 strings. The converter behind the deprecated strict mode (LegacyExtendedJsonDateTimeConverter in the org.bson codebase) writes epoch milliseconds instead, which is what the streaming side was producing.
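Paraphrasing the two conversions from memory of the org.bson source (class and method names approximate), the difference boils down to the following, reproducible with the JDK alone:

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;

public class DateConverterSketch {
    // Roughly what the legacy (strict-mode) converter emits: epoch millis as a bare number
    static String legacyDate(long epochMillis) {
        return "{\"$date\": " + epochMillis + "}";
    }

    // Roughly what the relaxed-mode converter emits for in-range dates: an ISO-8601 string
    static String relaxedDate(long epochMillis) {
        String iso = DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(epochMillis));
        return "{\"$date\": \"" + iso + "\"}";
    }

    public static void main(String[] args) {
        System.out.println(legacyDate(1602392400000L));  // {"$date": 1602392400000}
        System.out.println(relaxedDate(1602392400000L)); // {"$date": "2020-10-11T05:00:00Z"}
    }
}
```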
We naively thought that Debezium and Mongo-Spark would conform, therefore serializing JSON using the same standards, since both are reading MongoDB data. As mentioned above, we expected the default specification to be the standard JSON specification, or — at the very least — for the two to conform to MongoDB’s extended specification.
We were wrong on both counts. Debezium uses the deprecated (by MongoDB) JSON standard, called JsonMode.STRICT in the org.bson codebase.
The org.bson source code marks this mode as deprecated.
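The relevant enum, paraphrased from memory of the driver source (javadoc wording approximate):

```java
// Sketch of org.bson.json.JsonMode, paraphrased from the driver source
enum JsonMode {
    /**
     * Strict mode, the legacy extended JSON format.
     *
     * @deprecated Output in this mode no longer conforms to the current
     * MongoDB Extended JSON specification; prefer EXTENDED or RELAXED.
     */
    @Deprecated
    STRICT,

    /** Output understood by the mongo shell. */
    SHELL,

    /** Canonical extended JSON. */
    EXTENDED,

    /** Relaxed extended JSON (the default in recent driver versions). */
    RELAXED
}
```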
So by now, we knew the problem: Debezium publishes "normal" JSONs (which ended up being the standard used by the streaming CDC job), while Mongo's org.bson library, used to serialize JSONs in the Mongo-Spark connector, writes them in the MongoDB extended specification. Therefore, what we needed to do was write the backfilling batch job data using the same JSON serializer Debezium uses.
How did we figure out which encoder Debezium uses?
We noticed an unfamiliar type in the schema registry entry containing the Debezium-Mongo messages schema.
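It looked roughly like this (an illustrative Avro fragment, not our exact registry entry):

```json
{
  "name": "after",
  "type": [
    "null",
    {
      "type": "string",
      "connect.version": 1,
      "connect.name": "io.debezium.data.Json"
    }
  ],
  "default": null
}
```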
We admittedly didn't give much mind to this io.debezium.data.Json at first.
However, if we take a look at the Debezium source code implementation for its MongoDB CDC Kafka connector, we see it appear 😮, right in the serialization of our beloved after field.
(After that, we found that the Debezium docs explicitly state they use strict mode for the record key, yet don't mention a word about the record payload.)
And, there's more! The source code shows exactly which JsonWriterSettings configuration Debezium uses, the transformer that applies it when serializing the value, and even a unit test covering the serialized after field.
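From memory of the Debezium codebase, the writer settings are built along these lines (names paraphrased; the key point is that the output mode is pinned to the deprecated strict mode):

```java
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;

public class DebeziumWriterSettingsSketch {
    // Roughly how Debezium's JsonSerialization configures its writer settings
    static final JsonWriterSettings COMPACT_JSON_SETTINGS =
            JsonWriterSettings.builder()
                    .outputMode(JsonMode.STRICT)
                    .build();

    public static void main(String[] args) {
        System.out.println(COMPACT_JSON_SETTINGS.getOutputMode()); // STRICT
    }
}
```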
So, we tried using the "deprecated" strict mode in the org.bson library side by side with the default implicit mode.
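A sketch of that comparison (same illustrative document and field name as in the examples above):

```java
import org.bson.Document;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;

import java.util.Date;

public class SideBySide {
    public static void main(String[] args) {
        Document doc = new Document("sourceCreatedAt", new Date(1602392400000L));

        // Explicit deprecated strict mode (what Debezium emits): epoch millis
        System.out.println(doc.toJson(
                JsonWriterSettings.builder().outputMode(JsonMode.STRICT).build()));

        // Implicit default settings (relaxed extended JSON): ISO-8601 string
        System.out.println(doc.toJson());
    }
}
```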
We get:

```json
{"sourceCreatedAt": {"$date": 1602392400000}}
{"sourceCreatedAt": {"$date": "2020-10-11T05:00:00Z"}}
```
So we can replicate it easily in the batch backfilling code.
Conclusions:
- Don’t trust the docs only
- Check things further when you find something weird (mainly if it seems like a class reference)
- Don't expect that an open-source project will always follow another project's standards (Debezium didn't change from strict to relaxed extended JSON, even though it serves as a consumer for MongoDB data). If you assume it, double-check. You know what: check it five times. 😅
- Last, but not least, be explicit in configurations
If we followed conclusion 3 from the start, we would have saved our team a lot of time and resources.
In spite of all the above, the constant learning we do is the most fascinating thing about software engineering and data work. Keep learning. And, as I always like to say, keep diving into others' code. They think differently than you, and you will learn from them. This is even more true for widely used open-source projects. Don't be afraid. Just open the code and read it.
Happy pipeline!