The snapshot that couldn’t

A story from the Zencity data-engineering trenches, one that taught me some lessons about debugging, root-cause analysis, and Kafka Connect.

Alon Nisser
Zencity Engineering
10 min read · Nov 16, 2021


A teammate pinged me. Elastic was full, and something looked terribly broken with the reindexing process.

What was supposed to be an easy, seamless change (temporarily resize the Elastic cluster, re-consume the messages containing all of our database events from a Kafka topic with the new indexing logic, move the API to work against the new index => profit! and then remove the old index and resize the cluster back down) turned into a quagmire. Elastic nodes were getting full with no reasonable explanation why, and even worse, the number of items in Elastic was far lower than the number of items in the source datastore.

When I was pinged, the issue was being blamed on Elastic: it was probably failing to ingest some items.

Symptoms: “Since Thursday the number of docs in the new index hasn’t changed, but the number of modified items keeps climbing.” The alleged (wrong) diagnosis suggested to me: “We have a super-slow CDC process.”

Two days before that, I had been alerted that the size of our Kafka topics was growing considerably. When I asked about it, I was told “it’s the new topic with the items.” Although the size of this new topic didn’t fit my back-of-the-envelope calculations, I didn’t challenge this explanation and assumed I had gotten the calculation wrong.

Bowery men waiting for bread in bread line, [New York City]

Our setup:

I’ll elaborate on our current CDC (change data capture) setup: our main datastore is MongoDB, which is the target of all writes from the main app. From it, we publish all oplog events to a Kafka topic using the Debezium MongoDB Kafka Connect source connector. The topic has multiple consumers, one of them being our Elastic wrapper service, which handles consumption from the topic and writes into Elastic (with some custom logic, which is why we don’t use the Kafka Connect Elasticsearch sink connector).
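For the curious, a connector definition along these lines is what drives the MongoDB side. Everything below is a sketch: the connector name, hosts, and collection list are placeholders, and in our setup the equivalent configuration is managed declaratively (more on that later) rather than through a direct REST call.

```python
# Sketch: registering a Debezium MongoDB source connector through the Kafka Connect
# REST API. Hostnames, names, and the collection filter are placeholders.
import requests

connector_config = {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "rs0/mongo-0:27017,mongo-1:27017",  # replica set members (placeholder)
    "mongodb.name": "zencity",                           # logical name, used as the topic prefix
    "collection.include.list": "app.items",              # collections to capture (placeholder)
    "tasks.max": "1",                                    # snapshotting a collection is a single task
    "snapshot.mode": "initial",                          # snapshot existing data, then stream the oplog
}

resp = requests.put(
    "http://connect:8083/connectors/mongo-items-connector/config",  # hypothetical Connect endpoint
    json=connector_config,
)
resp.raise_for_status()
```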

While this setup doesn’t come without issues (mainly that the oplog format publishes only deltas, and those are in the original Mongo command format, which is really painful to deal with), it generally just “works.” We have a scalable consumption solution, with workers using a Kafka consumer group to manage partition assignment (so we can scale up the number of workers while keeping ordered consumption), and sub-second lag between writes to the main datastore and Elastic ingestion.
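A minimal sketch of what one such worker looks like, using the kafka-python client. The topic name, group id, and the write_to_elastic helper are illustrative, not our actual service code.

```python
# Sketch of a consumer-group worker. Running more copies of this process with the same
# group_id spreads partitions across workers while keeping per-partition ordering;
# a reindex is simply a new group_id consuming from the earliest offset with new logic.
import json
from kafka import KafkaConsumer


def write_to_elastic(event: dict) -> None:
    """Placeholder for the custom indexing logic that writes the event to Elasticsearch."""
    ...


consumer = KafkaConsumer(
    "cdc.items",                          # hypothetical CDC topic name
    bootstrap_servers=["kafka:9092"],
    group_id="elastic-indexer-v2",        # a new group per index version when reindexing
    auto_offset_reset="earliest",         # replay from the start of the topic if no offsets exist yet
    enable_auto_commit=True,
    value_deserializer=lambda v: json.loads(v) if v else None,
)

for message in consumer:
    if message.value is None:             # tombstones / deletes
        continue
    write_to_elastic(message.value)
```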

When we need to reindex Elastic (because of logic changes, etc.) we can just replay the items from the topic to a new consumer group running the new logic. We use Ansible to create a new k8s deployment for each index/topic, so we have zero downtime during reindexing and can have multiple code versions living side by side, one consuming into the new index and one into the old one.

An important technical detail about this setup is how the Debezium MongoDB connector works. If we need data “from the start of time,” we can configure the connector to use the snapshot feature. Basically, it uses MongoDB’s replication protocol, pretending to be another read replica, and publishes all the existing items in the DB in “snapshot mode.” When it finishes the snapshot, it moves to “streaming mode,” publishing events directly from the oplog.

But for that to work it needs to:

1. Finish the snapshot.

2. Verify that the start of the MongoDB oplog (which is always bounded by size or time) is earlier than the position of the latest item published in the snapshot; otherwise it has lost events. To prevent this, it restarts the snapshot if it can’t find the snapshot’s end position in the oplog.
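A rough way to sanity-check that second condition is to look at the oplog window yourself. A minimal sketch with pymongo follows; the connection string and resize value are placeholders.

```python
# Sketch: checking the oplog window with pymongo, to see roughly how much history the
# replica set keeps before the snapshot's resume position can fall off the end.
from pymongo import MongoClient

client = MongoClient("mongodb://mongo-0:27017/?replicaSet=rs0")  # placeholder connection string
oplog = client.local["oplog.rs"]

first = oplog.find().sort("$natural", 1).limit(1).next()
last = oplog.find().sort("$natural", -1).limit(1).next()
window_seconds = last["ts"].time - first["ts"].time
print(f"oplog covers ~{window_seconds / 3600:.1f} hours")

# If the window is shorter than the expected snapshot duration, the oplog can be
# resized on MongoDB 3.6+ (size in MB), e.g. to 16 GB:
# client.admin.command({"replSetResizeOplog": 1, "size": 16384})
```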

Another important implementation detail is how Kafka Connect connectors save their state: they record their “position” in reading from the DB as “offsets” in a dedicated Kafka topic (while they share the name with Kafka offsets, they aren’t Kafka topic offsets, though they serve a similar purpose). That way, if the connector is restarted, the Kafka Connect worker fails, or any other Kafka Connect error hits it, the connector picks up from exactly the same place. This works great, but there is one big exception: the Debezium MongoDB connector records no offsets while in snapshot mode. So if a snapshot is stopped, it starts over from scratch.
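You can actually watch these offsets by consuming the Connect offsets topic. A small sketch; the topic name below is an assumption (it’s whatever offset.storage.topic is set to on the workers).

```python
# Sketch: peeking at Kafka Connect's offsets topic. Keys are JSON arrays of
# [connector-name, source-partition], values are the source offsets; during a
# MongoDB snapshot nothing new shows up here for that connector.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "connect-offsets",                    # assumed offset.storage.topic name
    bootstrap_servers=["kafka:9092"],
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,             # stop iterating once the topic is exhausted
)

for record in consumer:
    key = json.loads(record.key)
    value = json.loads(record.value) if record.value else None
    print(key, "->", value)
```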

While our Kafka is hosted as a “serverless” Kafka service in Confluent Cloud, our Kafka Connect workers and connectors run in our k8s cluster, connecting to this Kafka instance. They’re set up with the Strimzi operator, which lets us maintain the Kafka Connect and connector setup as declarative code and have the operator handle the heavy lifting. It also means there is another level of abstraction between us and the actual connectors, which would prove important later on.

Zencity CDC pipeline to search — tech setup

Into the data and some initial analysis

Since this was categorized under “mystery problems,” the first thing to do was look under the hood, in this case at the actual data in the Kafka topic. Downloading messages from the head of the topic showed they were created by snapshot mode, and not only were they “early” items from our DB, they were from the dawn of Zencity’s MongoDB storage. Then I understood: we weren’t witnessing an extremely slow CDC pipeline, we were witnessing a snapshot being restarted again and again. This also explained why the topic had grown so large. But why was it restarting?
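The inspection itself doesn’t need much: point a throwaway consumer at the beginning of the topic and look at Debezium’s source metadata on each event. A sketch, with an assumed topic name:

```python
# Sketch: sampling messages from the head of the CDC topic and checking Debezium's
# source metadata to see whether they came from a snapshot.
import json
from kafka import KafkaConsumer, TopicPartition

topic = "zencity.app.items"               # hypothetical Debezium topic name
tp = TopicPartition(topic, 0)

consumer = KafkaConsumer(bootstrap_servers=["kafka:9092"])
consumer.assign([tp])
consumer.seek_to_beginning(tp)

for i, record in enumerate(consumer):
    msg = json.loads(record.value)
    payload = msg.get("payload", msg)     # handles envelopes with or without schemas
    source = payload.get("source", {})
    # "snapshot" is "true"/"last" for snapshot events; ts_ms tells you how old the row is
    print(record.offset, source.get("snapshot"), source.get("ts_ms"))
    if i >= 20:
        break
```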

Searching under the street light

It’s a common trait of human beings to suggest answers based on prior experience, even when it’s irrelevant to the current problem. And developers are human, very human. So we immediately assumed the issue was Kafka Connect getting OOMed out of existence. Why? Because that was the cause of snapshot restarts we had seen in previous cases. Even the ticket that was opened was actually called “running OOM when running two debezium connectors.” Alas, we had already tuned the Kafka Connect -Xms/-Xmx to fit in the pod’s memory resource request, and our monitors didn’t indicate any recent OOM with these pods.

A quick look at the logs found this line just before the connector restarted:

2021-07-26 06:59:46,739 INFO [Worker clientId=connect-1, groupId=connect-cluster] Handling connector-only config update by restarting connector mongo-insights-connector-v1613553421629 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) [DistributedHerder-connect-1-1]

So this was supposedly not an error but a planned update. Yet checking with our team, no one had tried to update this connector. Going back to the logs, we found it was a recurring message. And of course, even though I googled it, it was to no avail; Stack Overflow wouldn’t save us from this one. After another look at the logs, we noticed a pattern: it was happening exactly every two minutes. That couldn’t be a coincidence.
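Spotting the exact interval was easier with a few lines of scripting than by eyeballing timestamps; something along these lines, with the log file path as a placeholder:

```python
# Sketch: measuring the interval between recurring "Handling connector-only config
# update" log lines to confirm the every-two-minutes pattern.
import re
from datetime import datetime

pattern = re.compile(
    r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}).*Handling connector-only config update"
)
timestamps = []

with open("connect.log") as f:            # placeholder log file
    for line in f:
        m = pattern.match(line)
        if m:
            timestamps.append(datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S"))

for prev, cur in zip(timestamps, timestamps[1:]):
    print(cur - prev)                     # in our case: ~0:02:00, again and again
```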

The two-minute rate led us in the right direction. I vaguely remembered that the Strimzi operator runs reconciliation every two minutes, which matched our observation. It turned out to be a Strimzi bug that failed the reconciliation equality check (the check that verifies the Kafka connector CR was deployed with the *declared* properties and updates it if not). It had been restarting the connectors every two minutes for a couple of months, following an operator update, but it hadn’t really bothered us before because none of the connectors were in snapshot mode.

So we deployed the fix, redeployed the updated Kafka connector, and the snapshot started running!

That is… until a few hours later, when it crashed. And there we were, starting the snapshot process all over again.

Purgatory

Over the next couple of days we went through the slow hell of trying different things and getting all sorts of snapshot-related failures:

Snapshots finished, only for their ending point to already be out of the oplog (we resized our oplog, and then resized it yet again).

Snapshots failed with a strange race condition, an existing bug in Kafka Connect that shouldn’t hurt you in most cases (but mine, oh lord, was one of the cases it does).

Snapshots failed (yet again!) when the Kafka producer got a timeout trying to heartbeat the Kafka broker during some transient network failure. Remember, this can happen; it’s the first rule of distributed computing: the network is flakier than you think and generally not reliable.

And the list goes on. While some of these were fixable, others were just “things that happen from time to time and usually won’t bother you” (usually = unless you have no offsets, which is exactly the case in snapshot mode). The iteration process was also painstakingly slow: apply a fix, go to sleep, wake up in the morning and check whether it’s still running without a restart. Nope? Try again.

Technical root cause and solution

Finally, I understood what the underlying issue was: the snapshot was simply too slow. With complex distributed systems (and for all the great work done by extremely talented developers to abstract this away from us, Kafka and its related systems are complex distributed systems) we’re playing Russian roulette; something will fail eventually. The whole system is built around the premise of maintaining order, consistency, and delivery guarantees in the face of failure, but some of those guarantees were exactly what kept failing our snapshot while offering us no protection from it.

We needed to find a way to finish the snapshot faster and move as quickly as possible to streaming mode, where the Kafka Connect guarantees would work for us instead of against us.

Thanks to this blog post series pointing us in the right direction, we focused on Kafka producer throughput (beneath our Kafka connector, it’s a Kafka producer that writes to the topics). When the producer cannot keep up, Kafka Connect backpressures the source connector, which then reads items from the DB more slowly. Changing some default properties did the trick; most of them are detailed here. The snapshot finished after a couple of (long and tense) hours, and we were good to go with streaming mode. We didn’t catch the bullet in Russian roulette this time. Service was back, and Elastic could finally be reindexed.
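To give a feel for the kind of change involved, here’s a sketch of overriding the connector’s producer settings through the Connect REST API. The properties and values are illustrative rather than the exact ones we used (those are in the linked posts), and connector-level producer.override.* settings require connector.client.config.override.policy=All on the workers.

```python
# Sketch: raising producer throughput for the snapshot by overriding producer settings
# on the connector. Endpoint, connector name, and values are placeholders.
import requests

producer_overrides = {
    "producer.override.batch.size": "524288",        # larger batches per request
    "producer.override.linger.ms": "50",             # wait a bit to fill those batches
    "producer.override.compression.type": "lz4",     # fewer bytes over the wire
    "producer.override.buffer.memory": "134217728",  # more room to buffer while sending
}

base_url = "http://connect:8083/connectors/mongo-items-connector"  # hypothetical endpoint
current = requests.get(f"{base_url}/config").json()
current.update(producer_overrides)

requests.put(f"{base_url}/config", json=current).raise_for_status()
```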

Root cause analysis

While that was the core technical issue, and fixing it allowed us to deliver, it’s not the root cause in my opinion, because the interesting question is: why did we need the snapshot in the first place?

The quick answer is that a few months earlier, someone had mistakenly shortened the topic retention from “forever” to one week, which meant we could no longer use the topic for replaying events. And yes, for now, keeping all the items in the topic is the easiest working solution. But it’s clearly not a long-term one.
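For the record, turning retention back to “forever” is a single admin call; a sketch with kafka-python, where the topic name and brokers are placeholders (with Confluent Cloud you’d more likely do this through their UI or CLI):

```python
# Sketch: setting a topic's retention back to "forever" (retention.ms = -1).
# Caveat: the non-incremental AlterConfigs API replaces the topic's other dynamic
# overrides, so include any existing overrides you want to keep.
from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType

admin = KafkaAdminClient(bootstrap_servers=["kafka:9092"])
admin.alter_configs([
    ConfigResource(ConfigResourceType.TOPIC, "zencity.app.items", configs={"retention.ms": "-1"})
])
```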

This incident just showed us how fragile that is. The next time someone shortens the retention or removes the topic, I’m not sure we’ll manage to finish a snapshot at all. There is a finite number of bytes a single Kafka Connect task can process in a given timeframe (and yes, snapshotting a collection is a single task by design; I don’t think it’s possible to parallelize it), and eventually we’ll catch the Russian roulette bullet.

What could we do differently? Since we already stream the CDC topic into our data lake (via Spark Structured Streaming), we could replay events from the data lake instead. But doing that without keeping all the data in the topic (which leads back to the same problem) means implementing a way to output a “snapshot” of past events and then seamlessly move to “streaming mode,” which isn’t easy to achieve. It seems we’ll need to implement that sooner rather than later, so we’ll have a more robust, less fragile solution.

Epilogue

My two takeaways from this story, while still licking my wounds.

Learning new stuff

As with most hard things you do, we gained some good value and insights from this process:

Because we needed to try multiple configurations of a connector, removing and re-creating topics by hand was error-prone (and alas, errors did happen), so we understood we needed to automate this part of the setup. Fortunately, recent Kafka Connect versions support auto-creating topics with specific configurations through the connector configuration, so it wasn’t a heavy lift.
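The relevant settings are connector-level topic.creation.* properties (Kafka Connect 2.6+, with topic.creation.enable on the worker). A sketch of the kind of fragment that gets merged into the connector config, with illustrative values:

```python
# Sketch: letting the connector auto-create its topics with explicit settings.
# Values are illustrative; this dict is merged into the connector configuration.
topic_creation = {
    "topic.creation.default.replication.factor": "3",
    "topic.creation.default.partitions": "6",
    "topic.creation.default.cleanup.policy": "delete",
    "topic.creation.default.retention.ms": "-1",     # keep events forever
}
```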

Debugging this required metrics that we simply didn’t have at the beginning of the process. It turns out Kafka Connect and Debezium expose LOTS of metrics through JMX, and we needed to learn how to activate JMX in our Strimzi-managed Kafka Connect and export the metrics in a Prometheus-compatible way. It was also a good reminder that logs are often misleading and metrics are the only sure way to go, as noted in the “distributed systems for young bloods” post (please read it if you haven’t. Really do, it’s one of those).
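Once the JMX Prometheus exporter is wired up, sanity-checking what’s exposed is just a scrape away. A sketch, assuming the exporter’s conventional host/port for our setup:

```python
# Sketch: scraping the Prometheus endpoint exposed by the JMX exporter on a Connect
# pod and filtering for Debezium/Connect metrics. The host and port are assumptions.
import requests

metrics = requests.get("http://connect-metrics:9404/metrics").text
for line in metrics.splitlines():
    if line.startswith("#"):
        continue
    if "debezium" in line or "kafka_connect" in line:
        print(line)
```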

We discovered some surprising behavior in Elastic: upserts do cost disk space (otherwise we wouldn’t have seen the nodes filling up). There is probably a way to compact this without locking the index, but we haven’t found it yet. So it’s another fragile part of our systems that we need to fix.

Mental models

In my view, each step in this arduous process is a good example of the importance of having a good, as-thorough-as-possible mental model of the core tech you’re using.

Reading the docs, blog posts, etc., even when they aren’t immediately relevant, helps you build a mental model of how things work. And when things (eventually) break, you’ll have better tools to understand what’s going on. I’ll discuss building mental models in a future blog post because it’s a big subject, but I’ll note that being able to “hold” a mental model of all the moving parts in this kind of nontrivial setup can also be a way to assess whether it’s the right solution for a specific team or scenario.

Also: if you want to work on interesting technical stuff while serving citizens and local governments, come work with us at Zencity.io.

Until next time
