EXPEDIA GROUP TECHNOLOGY — SOFTWARE

Mongo Change Streams in Production

Tuning event streams for reliability and performance

Mauricio Mena
Expedia Group Technology

--

Arches National Park. Photo by the author.

At the end of 2019, we used Kafka® to prime our quoting cache and to keep it in sync with our pricing pipeline, which provides the data used to generate quotes. This system was responsible for all prices shown on our website.

Our pricing cluster was a series of nodes running Apache Ignite. Requests to quote multiple units were parallelized across the cluster; each node was in charge of quoting some units, depending on which units that node cached.

Priming from Kafka has the downside that, although the topic can be compacted, it still contains many previous updates of a given entry as well as delete events. Reading all of this information at start-up time led to long deployment times: several minutes in test and nearly an hour in production.

The benefit of using Kafka was that, as long as we kept consumers reading from the topic, the cache kept updating as our pricing pipeline generated new events, and this support comes out of the box with Kafka consumers.

We proposed using Mongo® and its change streams to prime and feed our caches instead.

What are change streams and how do they work?

“Change streams are nothing but a real-time stream of any changes that occur in the database” — SeveralNines

This statement is completely true, but I would add that these streams are not initialized and do not produce any information until you explicitly open them.

Change streams were introduced in Mongo version 3.6. Before this version, a way to implement something similar was to keep a tailable cursor open. This could be achieved by running a query and keeping a thread iterating over this open cursor. As always, error logic and retry logic are needed to manage the connection closing.

Change streams can be created on demand to notify of changes that occur on a database, collection, group of documents, or even a single document.

The underlying mechanism relies on the oplog. The oplog is a special capped collection that keeps a rolling record of all operations that modify the data stored in your databases. All the operations that affect your data will be saved there. The oplog supports replication and recovery as well as change streams. The larger the oplog, the more operations recorded on it.

Change stream specifications

MongoDB 3.6 added change stream handling via the new collection-level method db.collection.watch(), which opens a new change stream cursor on a given collection. In the Java driver, the equivalent watch() method returns a ChangeStreamIterable object.

Calling the iterator() method returns an instance of MongoCursor<ChangeStreamDocument<TResult>>. This opens the change stream cursor, which returns changes to the collection in a predictable format called change events. It is also possible to apply the aggregation framework to change streams, so we can filter and transform specific events.

The configuration we used to create the ChangeStreamIterable passes the filter as a list of BSON documents to the watch method.
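A minimal sketch of that kind of configuration (the database name, collection name, filter, and batch size here are illustrative placeholders, not our production values):

    import com.mongodb.client.ChangeStreamIterable;
    import com.mongodb.client.MongoClient;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.model.Aggregates;
    import com.mongodb.client.model.Filters;
    import com.mongodb.client.model.changestream.FullDocument;
    import org.bson.Document;
    import org.bson.conversions.Bson;
    import java.util.Arrays;
    import java.util.List;

    public class ChangeStreamConfig {

        public static ChangeStreamIterable<Document> openStream(MongoClient client) {
            MongoCollection<Document> collection =
                    client.getDatabase("pricing").getCollection("quotes");

            // The filter is a regular aggregation pipeline, passed as a list of BSON stages.
            List<Bson> pipeline = Arrays.asList(
                    Aggregates.match(Filters.in("operationType",
                            Arrays.asList("insert", "update", "delete"))));

            return collection.watch(pipeline)
                    // Ask the server to attach the full document to update events.
                    .fullDocument(FullDocument.UPDATE_LOOKUP)
                    // Tune how many events are fetched per getMore round trip.
                    .batchSize(500);
        }
    }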

Information in the event varies depending on its operation type. The operationType field has many possible values, but for us the most important were:

  • Insert: Contains information related to the new document that has been inserted.
  • Update: Contains information about the old document and the new document. Note that the update change event does not include the full document; only the change that was made. If your use case requires the complete document affected by an update, you can enable full document lookup when opening the stream.
  • Delete: Contains information about the deleted document.
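As a rough sketch of how you can dispatch on those values when consuming events (the handler methods are placeholders):

    import com.mongodb.client.model.changestream.ChangeStreamDocument;
    import org.bson.BsonDocument;
    import org.bson.Document;

    public class ChangeEventDispatcher {

        public void dispatch(ChangeStreamDocument<Document> event) {
            switch (event.getOperationType()) {
                case INSERT:
                    // fullDocument is always present for inserts.
                    onInsert(event.getFullDocument());
                    break;
                case UPDATE:
                    // fullDocument is only present if FullDocument.UPDATE_LOOKUP was enabled.
                    onUpdate(event.getDocumentKey(), event.getFullDocument());
                    break;
                case DELETE:
                    // Only the key of the deleted document is available.
                    onDelete(event.getDocumentKey());
                    break;
                default:
                    // DROP, RENAME, INVALIDATE, etc. are handled elsewhere.
                    break;
            }
        }

        private void onInsert(Document doc) { /* update the cache */ }
        private void onUpdate(BsonDocument key, Document doc) { /* update the cache */ }
        private void onDelete(BsonDocument key) { /* evict from the cache */ }
    }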

When you open a change stream, Mongo starts emitting events from that moment in time, unless you pass a resume token or specify the timestamp from which you want to replay the oplog. This means you cannot ask for events that fall outside the limits of the oplog.

The events emitted will only be new ones; change streams won't query the database for old information. Emitting events does not involve any query against the database; change streams just filter operations from the oplog.

We then create a publisher from the ChangeStreamIterable so we can work with the data produced by the change stream in a streaming fashion. Each ChangeStreamDocument read from the cursor is emitted to the subscriber.
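A simplified sketch of that idea, with a hand-rolled subscriber interface standing in for a full Reactive Streams implementation:

    import com.mongodb.client.ChangeStreamIterable;
    import com.mongodb.client.MongoCursor;
    import com.mongodb.client.model.changestream.ChangeStreamDocument;
    import org.bson.Document;

    public class ChangeStreamPublisher {

        // Minimal subscriber contract: receive events, be told when the stream dies.
        public interface Subscriber {
            void onEvent(ChangeStreamDocument<Document> event);
            void onError(Throwable error);
        }

        private final ChangeStreamIterable<Document> changeStream;

        public ChangeStreamPublisher(ChangeStreamIterable<Document> changeStream) {
            this.changeStream = changeStream;
        }

        // Blocks the calling thread; in practice we run this on a dedicated executor.
        public void subscribe(Subscriber subscriber) {
            try (MongoCursor<ChangeStreamDocument<Document>> cursor = changeStream.iterator()) {
                while (cursor.hasNext()) {
                    subscriber.onEvent(cursor.next());
                }
            } catch (RuntimeException e) {
                // Cursor errors (e.g. CursorNotFound) surface here; let the listener decide.
                subscriber.onError(e);
            }
        }
    }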

Production considerations and lessons learned

Differences between Mongo 3.6 and 4.0

Although change streams have been available since 3.6, the API has evolved in later versions and gained important new methods, for example startAtOperationTime().

Change streams in Mongo clusters running 3.6 lost their cursors multiple times. Mostly, the problem arose when a new leader was elected in the cluster; change streams were not able to resume properly and updates were missed. Some bugs made it impossible to maintain cursors on secondary nodes in Mongo 3.6. For example, see https://jira.mongodb.org/browse/SERVER-34810:

“I open a cursor iterate over it and suddenly I am getting these errors. com.mongodb.MongoCursorNotFoundException: Query failed with error code -5 and error message ‘Cursor 74656212000 not found on server 10.89.73.250:27017’ on server 10.89.73.250:27017”

Mongo drivers continue to evolve.

The big lesson learned here is to keep your driver version up to date and to use at least Mongo 4.0 in production. It's even better if you use 4.1.x, as it fixes some bugs and includes the latest API additions.

Majority read concern and read preference

A very important system consideration is that change streams require a majority of data-bearing members to be alive, meaning streams may pause if you lose a majority of members or if the majority relies on an arbiter.

Also, part of the configuration at change stream initialization defines the ReadPreference. Your application and cluster need to comply with this configuration, or the change stream cannot be created or resumed.
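For example, with the Java driver both settings can be applied to the collection handle before opening the stream; secondaryPreferred below is only an illustration, use whatever matches your cluster:

    import com.mongodb.ReadConcern;
    import com.mongodb.ReadPreference;
    import com.mongodb.client.ChangeStreamIterable;
    import com.mongodb.client.MongoCollection;
    import org.bson.Document;

    public class StreamReadSettings {

        public static ChangeStreamIterable<Document> watchWithReadSettings(
                MongoCollection<Document> collection) {
            return collection
                    // Change streams need majority read concern support on the cluster.
                    .withReadConcern(ReadConcern.MAJORITY)
                    // The stream is opened against nodes matching this preference.
                    .withReadPreference(ReadPreference.secondaryPreferred())
                    .watch();
        }
    }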

Another point to remember is that change streams won't work on a single-node deployment unless the oplog is enabled on your Mongo instance, that is, unless the node runs as a single-member replica set. Replication is one of the main reasons the oplog exists. This constraint will probably affect how you test on localhost, or which environment you choose if you are not using a cluster.

When we decided to adopt change streams, we first tried them on our test cluster, which was running 3.4 and did not support them. Creating a local cluster in our test environment was doable, but we decided to use Mongo Atlas and run our initial proof of concept there.

Oplog size

The oplog must be large enough for stream events to still exist when the application gets around to processing them, so you need to know your current lag when you use change streams. If for some reason you are consuming messages at a lower rate than they are produced, you will get errors saying that the token no longer exists in the oplog. That means the record you tried to read was so old that it has been discarded from the oplog.

Depending on your use case, you may not be able to resume the stream from a specific position because the entry has been deleted from the oplog. This can cause data loss. In our case, we configured a secondary strategy that uses the Mongo driver and the server timestamp to recover any missing records. We also fire alerts if we detect this condition.


The major problem for us came when we executed replays: the oplog was not large enough to hold all of those updates, which meant we might lose updates. In that case, we had to restart our change streams after the replay to re-sync the cache with the documents.

Monitoring the oplog is critical if your application uses change streams. You should have a way to stop updates if your oplog window is shrinking; otherwise, you will start losing updates.

Replication lag is another interesting metric to monitor. This tells you how far behind the secondaries are from the primary, and also how much time it’s taking for your change streams to receive the newest changes.

Change streams do not emit any of these metrics. We monitored these metrics using Datadog:

Oplog size: avg:mongodb.oplog.timediff{name:$mongoClusterName} by {host}
Replication lag: avg:mongodb.replset.replicationlag{name:$mongoClusterName} by {host}

Drop/rename collection

When you drop or rename a collection, the change stream receives an error and breaks, because part of its initialization defines the name of the collection. You will not be able to restart the change stream until you change the query to point to the new collection.

Mongo Java libraries

We analyzed three different Java implementations for dealing with change streams.

  1. mongo-java-driver: This is the main implementation, as change stream support is included directly in the driver.
  • This implementation does not provide any type of reactive support; however, you can build a reactive implementation on top of ChangeStreamIterable<TResult>. You just need to write your own publisher and subscriber.
  • No object-document mapping (ODM) support is provided. We ended up using Morphia as the ODM solution, since a DBObject is returned on each iteration.
  • This is the fastest implementation, and you can reuse any existing logic once you have created your subscriber. This means you will have objects you can pass to your business logic.
  • We used this subscriber implementation for our high-volume update collection.
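A representative sketch of what such a subscriber can look like, building on the publisher sketch above (the cache interface is a placeholder; the production class is more involved):

    import com.mongodb.client.model.changestream.ChangeStreamDocument;
    import org.bson.BsonDocument;
    import org.bson.BsonTimestamp;
    import org.bson.Document;

    public class QuoteCacheSubscriber implements ChangeStreamPublisher.Subscriber {

        // Placeholder for the Ignite-backed cache each node maintains.
        public interface QuoteCache {
            void put(BsonDocument key, Document doc);
            void remove(BsonDocument key);
        }

        private final QuoteCache cache;
        private volatile BsonTimestamp lastSeenTime;   // kept so the stream can be restarted later

        public QuoteCacheSubscriber(QuoteCache cache) {
            this.cache = cache;
        }

        @Override
        public void onEvent(ChangeStreamDocument<Document> event) {
            lastSeenTime = event.getClusterTime();
            switch (event.getOperationType()) {
                case INSERT:
                case REPLACE:
                case UPDATE:
                    // fullDocument on updates requires FullDocument.UPDATE_LOOKUP on the stream.
                    cache.put(event.getDocumentKey(), event.getFullDocument());
                    break;
                case DELETE:
                    cache.remove(event.getDocumentKey());
                    break;
                default:
                    break;
            }
        }

        @Override
        public void onError(Throwable error) {
            // Notify the listener so it can discard this stream and open a new one.
        }

        public BsonTimestamp lastSeenTime() {
            return lastSeenTime;
        }
    }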

2. spring-boot-starter-data-mongodb-reactive: This one has everything: ODM support via annotations and reactive support via Spring's Reactor. Unfortunately, it is tightly tied to the Spring ecosystem; we did not consider it because we mostly use Dropwizard.

3. mongo-java-driver-reactivestreams: This is a reactive implementation of change streams. We used this one for collections with a low volume of updates. We suspect that the internal record fetching has a performance issue when the change stream detects several changes arriving at the same collection. For our main collection, reading was very slow even though we configured several options; getMore executions were separated by 40 seconds or more depending on the configuration we were using, and the getMore commands themselves were taking too long to complete.

Understand the ChangeStreamDocument

Understanding the API (https://mongodb.github.io/mongo-java-driver/3.8/javadoc/com/mongodb/client/model/changestream/ChangeStreamDocument.html) is crucial and gives you a lot of precision about what you can get from change streams. Important things to remember:

  • All the changes will have a resumeToken that you can use to return to that specific point in time.
  • All the changes will have an operationType and that will define what information could be present. For example, Deletes won’t have a fullDocument as the document has been deleted.
  • All the changes will have the documentKey no matter the type of the operation.
  • Bulk updates will produce several change events.
  • Most of the options and returned values, such as FullDocument options, read preference, and/or batch size, can be configured at the moment of the change stream creation.
  • You can find the list of change events in the Mongo reference.
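In the Java driver these pieces map to getters on ChangeStreamDocument; a quick illustration (the logging is only for the example):

    import com.mongodb.client.model.changestream.ChangeStreamDocument;
    import org.bson.BsonDocument;
    import org.bson.Document;

    public class ChangeEventInspector {

        public static void inspect(ChangeStreamDocument<Document> event) {
            BsonDocument resumeToken = event.getResumeToken();   // present on every change
            BsonDocument documentKey = event.getDocumentKey();   // the _id of the affected document
            Document fullDocument = event.getFullDocument();     // null for deletes, and for
                                                                 // updates without UPDATE_LOOKUP
            System.out.printf("type=%s key=%s token=%s hasFullDoc=%b%n",
                    event.getOperationType(), documentKey, resumeToken, fullDocument != null);
        }
    }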

Restarting change streams

You will be able to recover from failures if you connect again and start a new change stream. But between the moment your change stream dies and the time you reconnect, you might miss any updates that happen during that window. Change streams provide two mechanisms to avoid this:

  1. Use the resumeToken; this token is used as a cursor pointing to a specific record in your oplog, so you can restart your change stream from that point.
  2. Use startAtOperationTime, passing a BsonTimestamp a couple of minutes or seconds before your change stream crashed. This is the option that covered our requirements.
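A sketch of both options using the Java driver (the lookback duration is a choice you make, not a driver default):

    import com.mongodb.client.ChangeStreamIterable;
    import com.mongodb.client.MongoCollection;
    import org.bson.BsonDocument;
    import org.bson.BsonTimestamp;
    import org.bson.Document;
    import java.time.Duration;
    import java.time.Instant;

    public class StreamRecovery {

        // Option 1: resume exactly after the last event we processed.
        public static ChangeStreamIterable<Document> resumeFromToken(
                MongoCollection<Document> collection, BsonDocument resumeToken) {
            return collection.watch().resumeAfter(resumeToken);
        }

        // Option 2: replay the oplog from a wall-clock position shortly before the crash.
        public static ChangeStreamIterable<Document> resumeFromTime(
                MongoCollection<Document> collection, Duration lookback) {
            int seconds = (int) Instant.now().minus(lookback).getEpochSecond();
            return collection.watch().startAtOperationTime(new BsonTimestamp(seconds, 0));
        }
    }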

Change streams have their caveats, mostly because they rely on the oplog, which is a special capped collection; as you might expect, this capped collection has a specific size. Mongo change streams read that collection and keep a tailable cursor on it. If the database receives so many updates that the entry the cursor points to gets rolled out of the capped collection, we'll lose some data. Also, when a new primary is elected, the change streams can be closed, depending on the read preference. In some other circumstances, due to network or code issues, change streams can be closed as well.


Since every issue with change streams could potentially mean missed updates, we need a way to recover without deploying a new cluster. This holds as long as the problem is not a Mongo replication issue.

Diagram showing interplay between Mongo stored data, oplogs, and change streams.

We needed to avoid losing quoting data updates when Mongo was down or when anything else affects our change streams.

To achieve this, we use the subscriber to notify the listener that something went wrong and that the change stream needs to be discarded. The listener then creates a new change stream that starts from five minutes before the previous subscriber died, so we can replay any updates we might have missed between the time the subscriber died and the time we created the new one.
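A condensed sketch of that listener loop, reusing the publisher and recovery helpers sketched above (error classification, backoff, and alerting are omitted):

    import com.mongodb.client.MongoCollection;
    import org.bson.Document;
    import java.time.Duration;

    public class RestartingListener {

        private static final Duration LOOKBACK = Duration.ofMinutes(5);

        private final MongoCollection<Document> collection;
        private final ChangeStreamPublisher.Subscriber subscriber;
        private volatile boolean running = true;

        public RestartingListener(MongoCollection<Document> collection,
                                  ChangeStreamPublisher.Subscriber subscriber) {
            this.collection = collection;
            this.subscriber = subscriber;
        }

        public void run() {
            while (running) {
                // Each attempt replays from five minutes before "now" to cover the gap
                // between the previous stream dying and this one starting.
                ChangeStreamPublisher publisher = new ChangeStreamPublisher(
                        StreamRecovery.resumeFromTime(collection, LOOKBACK));
                // subscribe() blocks until the cursor fails or is closed; the old publisher,
                // cursor, and subscription go out of scope here so they can be collected.
                // A real implementation would add a backoff between attempts.
                publisher.subscribe(subscriber);
            }
        }

        public void stop() {
            running = false;
        }
    }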

One important point about using change streams and applying any kind of restartability strategy is to avoid possible memory leaks.

  • Don’t forget to close any open cursor. If you are using the Mongo Java driver implementation, don’t forget MongoCursor is still a cursor that needs to be closed.
  • If you are using any subscription model, don’t forget to cancel all the active subscriptions and clean all the references to unused publishers and subscribers that were restarted. As part of the process in your retry logic, you need to provide a way to kill and discard previous publishers and subscribers.

Scalability

Perhaps you want to scale your system for situations in which your consumers can be overwhelmed by too many events arriving on the change stream, and you don't want to apply backpressure.

In our use case, we had 18 machines per cluster in production, each caching a subset of the bookable units for which that node can create quotes. Not all updates need to reach all nodes. The optimal way of doing this with change streams is to use the aggregation pipeline to filter records at change stream creation time, so that the change stream never emits events for content a specific node does not care about.

We defined a partition number for each one of the documents ranging from 0 to 1023. Each node in the cluster has a range within which it listens for events. Let’s say node 1 listens to 0–128, node 2 listens to 129–255, and so on. For us, the ranges are defined by Ignite — the more nodes you have, the fewer partitions you need to manage in each host.

You could also define your own partition function so that you can scale your system by making sure only certain hosts receive updates from certain partitions when dealing with high volumes of updates.
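A sketch of such a node-local filter, assuming each document carries a partition field between 0 and 1023 (the field name and the bounds are illustrative):

    import com.mongodb.client.ChangeStreamIterable;
    import com.mongodb.client.MongoCollection;
    import com.mongodb.client.model.Aggregates;
    import com.mongodb.client.model.Filters;
    import com.mongodb.client.model.changestream.FullDocument;
    import org.bson.Document;
    import org.bson.conversions.Bson;
    import java.util.Arrays;
    import java.util.List;

    public class PartitionedChangeStream {

        // Only emit events whose document falls inside this node's partition range.
        public static ChangeStreamIterable<Document> watchPartitions(
                MongoCollection<Document> collection, int lowInclusive, int highInclusive) {
            List<Bson> pipeline = Arrays.asList(
                    Aggregates.match(Filters.and(
                            Filters.gte("fullDocument.partition", lowInclusive),
                            Filters.lte("fullDocument.partition", highInclusive))));
            return collection.watch(pipeline)
                    // UPDATE_LOOKUP is needed so update events also carry fullDocument.partition.
                    // Note: delete events have no fullDocument, so a real filter would also
                    // match on documentKey or handle deletes separately.
                    .fullDocument(FullDocument.UPDATE_LOOKUP);
        }
    }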

Overall results

We greatly simplified our data ingestion process, both at priming time and at listening time.

Using Mongo at priming time, we no longer needed Kafka and only had to read a single collection. The initial read was done using a specific number of partitions per node, so each node only read what it would serve. Doing this drastically reduced our priming time.

In our test environment, the priming time was reduced from 45 minutes to 1 minute. In our staging environment, the reduction was from 95 minutes to less than 5 minutes. Finally, in our production environment, we reduced from 120–150 minutes to less than 10 minutes.

Devops dashboard illustrating performance of the recommended solution.

And using the change stream allows us to move off of Kafka completely, as now all of the updates for the quoting data are coming from the change events.

It was really interesting and a great learning opportunity to take this technology from a minimal proof of concept using Mongo Atlas and dummy data to a new ingestion process for quoting data within our pricing system.

Now that all of the data is in Mongo and we can prime from documents, we could move to a stateless model for our pricing system based on Redis.
