Recovering corrupted RabbitMQ data by reversing its storage protocol (part 1)

Wenceslas des Déserts
CybelAngel Product & Engineering
Jan 28, 2022

· Introduction
· What happened
· RabbitMQ Storage overview
· The message store
  · Testing community tools
  · Erlang External Term Format
  · Digging into RabbitMQ code
  · Wrapping up everything we learned
· Conclusion

Introduction

CybelAngel is an external risk protection solution. As such we continuously scan various services, bringing data into our monitoring pipeline.

From a technical standpoint, our product consists of hundreds of microservices talking to each other through a message broker. Our most recent microservices have been migrated to GCP PubSub, while some of our older services still use RabbitMQ until the migration is complete.

A few weeks ago, our RabbitMQ cluster experienced a critical outage and the data on one of its nodes ended up corrupted. We had to back up the data and reset the node entirely to get the cluster back to a functional state. This blog post details how we “reversed” RabbitMQ’s disk storage protocol and managed to recover the corrupted data and re-inject it into our pipeline, mitigating the impact of the outage. We will also share the code we wrote for those interested!

What happened

The crash happened a few weeks ago, early on a Sunday morning. Our alerting system woke up the on-call engineer. He quickly identified the root cause: a microservice failure had triggered a rapid accumulation of messages in RabbitMQ.

We scan hundreds of thousands of documents per minute, and as such, any delay in processing can result in a very large number of messages piling up in our message broker. We have safety systems and circuit breakers in place to avoid cascading failures, but unfortunately this time the failing microservice was on the critical path. Our last line of defense against this kind of outage is RabbitMQ’s high disk usage watermark: when the disk usage reaches a configured threshold, RabbitMQ blocks all incoming messages until enough disk space has been freed, acting as a naive back-pressure system.

That day, RabbitMQ continued writing data long after the watermark had been reached. It filled the data partition entirely and crashed. We suspect RabbitMQ flushed a lot of data at once, reaching the watermark and filling the disk in one operation. (We later reconfigured the watermark to have a bigger safety margin and prevent the issue from happening again.) The damage was done: some changes were only partially persisted, and RabbitMQ wouldn’t restart.

Our on-call engineer increased the VM disk size and tried restarting RabbitMQ, but the service kept complaining about corrupted log files:

** FATAL ** Possibly truncated decision_tab file: {corrupt_log_file,"/var/lib/rabbitmq/mnesia/rabbit@<ip>/DECISION_TAB.LOG"}

Deleting DECISION_TAB.LOG didn’t help. The only option left was to reset the node entirely. This would bring it back online, but destroy all the data stored in RabbitMQ’s internal database.

RabbitMQ has a replication feature that, when enabled, automatically replicates queues on multiple nodes. With replication, the loss of a node is not a big deal because all the messages exist elsewhere. Sadly, we had no replication in this cluster because we had observed a large number of RabbitMQ inter-node communication issues after enabling the feature. (We suspect those have to do with how Erlang transmits data between nodes internally and the sheer amount of inter-node traffic generated by the management plugin we use to gather metrics.)

At this point, our on-call engineer was joined by the manager of the SRE team. Data in the impacted queues were not critical, so they decided it was more important to get the node back to a live state as fast as possible. Before resetting the node, they did a backup of the whole data partition, so that we could try to recover the messages later.

During the post-mortem meeting on the following day we decided it was worth spending some engineer time on recovering the data:

  • to mitigate the impact of the outage
  • to be prepared if something similar were to happen (although we are confident the specific issue described here won’t occur again, since we reconfigured the watermark)
  • to get a better grasp of RabbitMQ internals: migrating away from RabbitMQ is a slow process, and while it is no longer a key piece of our infrastructure, it is still an interesting technology that we want to understand better!

RabbitMQ Storage overview

RabbitMQ documentation on disk storage is scarce, and we didn’t find much in mailing lists or blog posts. The best official source we found is the doc on persistence:

  • Messages can be stored on disk in two different places: the queue index or the message store
  • Each queue has an index (which can be stored in multiple files if it’s big)
  • For each message in a queue, the index stores:
    – The rank of the message in the queue
    – The status of the message (published, delivered, acknowledged)
    – The message itself if it’s shorter than 4096 bytes (default configuration)
  • The message store (which, again, can be stored in multiple files) contains all messages longer than 4096 bytes, regardless of their source and destination.

RabbitMQ’s data directory contains a lot of stuff, but we were only interested in the msg_stores directory, which lives at /var/lib/rabbitmq/mnesia/rabbit@<node_private_ip>/msg_stores. On our systems, the whole /var/lib/rabbitmq tree is on a dedicated partition that our engineers had backed up before resetting the node, so the msg_stores directory was included in our disk backup.

It has the following structure:

rabbit@<node_private_ip>/msg_stores/vhosts/<vhost_id>: 
.
├── msg_store_persistent
│ ├── 1011223.rdq
│ ├── 1011669.rdq
│ ├── 1012396.rdq
│ ├── <other .rdq files>
├── queues
│ ├── 115HPE6OZTZ9QG7OOPLLVSAT
│ │ └── journal.jif
│ ├── 120ZI4D1WKBFGVQ1V8GT3BPWZ
│ │ ├── 5.idx
│ │ └── journal.jif
│ ├── 1F530S78TDLFD4ZUYWVN25L43
│ │ ├── 10588.idx
│ │ ├── 10589.idx
│ │ └── journal.jif
│ ├── <other directories with ids>

The message store lives in the msg_store_persistent directory and consists of multiple .rdq files. Indexes are stored in .idx files, in sub-directories of queues (one sub-directory per queue). Not all queues have a .idx file, and we are not sure why. Maybe those are old or dead queues. If you know, please reach out!

Most messages were stored in .rdq files. We had ten thousand of those!
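With that many files, the first step of any recovery script is simply to enumerate them. Here is a minimal sketch of how that could look in Python. The function name is ours, and it assumes that every .rdq file name is a plain integer, as in the listing above.

```python
from pathlib import Path


def list_rdq_files(vhost_dir):
    """Return the .rdq files of the persistent store, sorted by numeric name."""
    store = Path(vhost_dir) / "msg_store_persistent"
    # Assumption: file names are plain integers (e.g. 1011223.rdq),
    # so int() on the stem is safe. A non-numeric name raises ValueError.
    return sorted(store.glob("*.rdq"), key=lambda path: int(path.stem))
```

You would call it with the path of a vhost directory inside the backup, for instance list_rdq_files("<backup>/msg_stores/vhosts/<vhost_id>").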

Note: there is also a msg_store_transient store for transient messages. If you use transient queues, your setup might be slightly different. We didn’t dig into it, but the transient message store has the same structure as the persistent message store, so everything below stands true.

The message store

We began our investigation with .rdq files since most messages were stored there. All .rdq files have the same structure, so we picked a random one and ran hexdump on it (e.g. hexdump -C 1011223.rdq ). Looking at the first bytes of the file already gives us a lot of intel:

Figure: first 256 bytes of 1011223.rdq, as displayed by hexdump -C

The -C flag gives us hexadecimal and ASCII display side by side so that we can see at a glance what part of the file is valid ASCII.
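If hexdump is not available, a similar side-by-side view is easy to approximate in Python. The helper below is a simplified sketch of our own, not a faithful clone of hexdump -C (for instance, it does not insert the extra gap between the two groups of eight bytes).

```python
def hexdump(data, width=16):
    """Return a hex + ASCII dump of `data`, one line per `width` bytes."""
    pad = width * 3 - 1  # width of the hex column for a full line
    lines = []
    for offset in range(0, len(data), width):
        chunk = data[offset:offset + width]
        hex_part = " ".join(f"{byte:02x}" for byte in chunk)
        # Printable ASCII is shown as-is, everything else as a dot.
        ascii_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk)
        lines.append(f"{offset:08x}  {hex_part:<{pad}}  |{ascii_part}|")
    return "\n".join(lines)


# Usage on the first 256 bytes of a file (path is an example):
# with open("1011223.rdq", "rb") as f:
#     print(hexdump(f.read(256)))
```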

The good news is that our messages are there, stored in (almost) plain text. We mostly transmit JSON through RabbitMQ, and the {"document": {"$storable... part is a valid document start in our messaging model.

Everything before the JSON start is a bit cryptic, but we can break it down:

  • basic_message is the type of the resource stored
  • exchange [...] job.annotated_micromirrors matches an existing AMQP exchange in our cluster
  • rabbit_framing_amqp_0_9_1 identifies the version of AMQP that RabbitMQ uses (0-9-1)

This is a good start, but if we want to reinject our lost messages we need to extract them with their destination exchanges and their routing keys.

Testing community tools

Several projects out there do exactly that, for example RDQ dump. When we ran this tool on our files, though, it quickly became apparent that it extracted only some of the messages and missed many others!

All the RDQ parsers we could find work on the same principle: they look for all occurrences of a (seemingly arbitrary) token in the file, and read what comes after. That token is 395f316c000000016d0000 and has probably been copy-pasted from one script to another!
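To make the principle concrete, here is a rough sketch of that token-scanning approach as we understand it. It is our own illustration, not the code of any specific tool: it only locates the token and leaves out whatever each tool does to read the body afterwards.

```python
# The token used by the community parsers, as raw bytes (11 bytes).
TOKEN = bytes.fromhex("395f316c000000016d0000")


def find_token_offsets(data, token=TOKEN):
    """Return the offset of every occurrence of `token` in `data`."""
    offsets = []
    position = data.find(token)
    while position != -1:
        offsets.append(position)
        position = data.find(token, position + 1)
    return offsets
```

Any message whose surrounding bytes differ from this exact sequence is silently skipped, which is what we observed.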

Since this didn’t work for us, we had to understand how exactly messages were encoded and what all the data surrounding our messages meant.

Erlang External Term Format

Erlang has its own serialization protocol called the Erlang External Term Format (ETF). Some of the data we had definitely looked like ETF.

All Erlang serialized items have the same binary structure:

  • A one-byte long header specifying the type of the serialized variable (for example, 104 means small tuple)
  • One or more metadata fields describing the type (for example, the tuple length)
  • One or more fields of variable lengths containing the actual data (for example, tuple items)
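As an illustration of this header / metadata / data layout, here is a small sketch that reads the header of a small tuple. It relies on the Erlang documentation for SMALL_TUPLE_EXT (tag 104, followed by a one-byte arity, followed by the items). The function name is ours.

```python
SMALL_TUPLE_EXT = 104  # decimal, as written in the Erlang documentation


def read_small_tuple_header(data, offset=0):
    """Read a SMALL_TUPLE_EXT header; return (arity, offset_of_first_item)."""
    tag = data[offset]  # 1. the one-byte type header
    if tag != SMALL_TUPLE_EXT:
        raise ValueError(f"expected tag 104, got {tag}")
    arity = data[offset + 1]  # 2. the metadata: number of items, on one byte
    return arity, offset + 2  # 3. the items themselves start right after
```

Decoding the items would then mean reading `arity` serialized terms one after the other, each starting with its own type header.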

Please note that Erlang’s documentation describes prefixes in decimal format while this post shows bytes directly in hexadecimal format. In the rest of this post, we will use the following prefixes to avoid confusion:

  • 0x for hexadecimal
  • 0d for decimal
  • 0b for binary

Since a group of 4 bits can take 2⁴ = 16 values, one byte (8 bits, or 2 groups of 4 bits) can be represented as two hexadecimal characters so that the following notations are equivalent:

  • 0x64 in hexadecimal
  • 0d100 in decimal
  • 0b01100100 in binary
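This equivalence is easy to check in a Python shell. Note that Python understands the 0x and 0b prefixes natively, while 0d is only a convention of this post: decimal literals take no prefix.

```python
value = 0x64

# The three notations describe the same byte.
assert value == 100 == 0b01100100

print(hex(value))               # 0x64
print(value)                    # 100
print(format(value, "#010b"))   # 0b01100100

# A two-character hexadecimal string is exactly one byte.
assert bytes.fromhex("64") == bytes([100])
```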

If we look at a portion of our hexadecimal dump:

The 64 00 19 part (first line) is particularly interesting. Indeed, 0x64 (or 0d100 in decimal) is the prefix for an ATOM_EXT, aka an Erlang constant string.

Erlang’s documentation specifies that an ATOM_EXT has the following structure:

  • A prefix whose value is 0x64 (one byte)
  • The length len of the atom (two bytes)
  • The atom itself (len bytes)

So if 64 is the ATOM_EXT prefix, it follows that 00 19 must be the length. That’s 25 in base 10 (0d25), so we take the following 25 bytes: 72 61 62 62 69 74 5f 66 72 61 6d 69 6e 67 5f 61 6d 71 70 5f 30 5f 39 5f 31

All those bytes are valid in ASCII and translate to rabbit_framing_amqp_0_9_1 .
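The same decoding can be done programmatically. The sketch below is a minimal ATOM_EXT decoder written for this post's example. The function name is ours, and it assumes the length is big-endian and the name is Latin-1 encoded, which is what the Erlang documentation describes for ATOM_EXT.

```python
import struct

ATOM_EXT = 0x64  # 0d100 in the Erlang documentation


def decode_atom_ext(data, offset=0):
    """Decode one ATOM_EXT at `offset`; return (atom_name, next_offset)."""
    if data[offset] != ATOM_EXT:
        raise ValueError(f"expected ATOM_EXT (0x64), got 0x{data[offset]:02x}")
    # The length is stored on two bytes, big-endian.
    (length,) = struct.unpack_from(">H", data, offset + 1)
    start = offset + 3
    name = data[start:start + length].decode("latin-1")
    return name, start + length


# The bytes discussed above: prefix, length, then the 25 bytes of the atom.
raw = bytes.fromhex(
    "64 00 19 "
    "72 61 62 62 69 74 5f 66 72 61 6d 69 6e 67 5f 61 6d 71 70 5f 30 5f 39 5f 31"
)
print(decode_atom_ext(raw))  # ('rabbit_framing_amqp_0_9_1', 28)
```

Returning the next offset along with the value makes it easy to chain decoders when several terms follow each other.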

Great! We now have a better understanding of how messages are encoded. But if we look at the beginning of the .rdq file, we have bytes that are NOT valid ETF.

Erlang’s documentation specifies that ETF data always begins with 0x83 (0d131). This marker is clearly visible here (second line, start of second group), but all the bytes before it can’t be ETF: 00 00 00 00 00 01 c2 fb 02 e9 eb e0 0f 39 2b 51 fd cd 34 ba d7 fc 29 92

We had no choice but to look for the RabbitMQ portion of code which wrote those bytes.

Digging into RabbitMQ code

Navigating Erlang code is not easy when you don’t know the language. Fortunately, RabbitMQ files are well-named and rabbit_msg_store.erl had the function we were looking for, along with a bunch of comments explaining how the store works. The actual message-to-rdq-file code, though, lives in the append function of rabbit_msg_file.erl:

This is very interesting! term_to_binary is a standard Erlang function that serializes a data structure into ETF. We can see that RabbitMQ appends a bunch of information around the serialized message. The final structure looks like this:

  • The size of (message id + serialized message), aka record size
  • The message id
  • The serialized message itself, in ETF
  • A “write ok marker”

If we replace the size hints and constants by their values (declared above in the file), we get the full specification of the storage protocol:

  • Record size (message id size + serialized message size): 8 bytes
  • Message id: 16 bytes
  • Serialized message in ETF: Record size minus 16 bytes
  • Write ok marker: 1 byte
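To check our understanding of this layout, it helps to be able to build a record ourselves, for example to generate test data for a parser. The sketch below mirrors the layout above in Python. It is not RabbitMQ's code: the function name is ours, the size is assumed to be big-endian (which matches the bytes in our dump), and the marker value 0xff is the one given later in this post.

```python
import struct

MSG_ID_SIZE = 16
WRITE_OK_MARKER = b"\xff"  # value given later in this post


def pack_record(msg_id, etf_payload):
    """Build one record: size (8 bytes) + id (16) + ETF payload + marker (1)."""
    if len(msg_id) != MSG_ID_SIZE:
        raise ValueError("message id must be exactly 16 bytes")
    # The record size covers the message id and the payload,
    # but neither the size field itself nor the marker.
    record_size = MSG_ID_SIZE + len(etf_payload)
    return struct.pack(">Q", record_size) + msg_id + etf_payload + WRITE_OK_MARKER
```

A record therefore occupies record size + 9 bytes on disk: 8 for the size field and 1 for the marker.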

If we look again at the beginning of our RDQ file:

We can now identify the 24 bytes before 0x83 that were puzzling us before:

  • 00 00 00 00 00 01 c2 fb: record size on 8 bytes
  • 02 e9 eb e0 0f 39 2b 51 fd cd 34 ba d7 fc 29 92: message id on 16 bytes
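As a quick sanity check, we can decode those 24 bytes in Python and work out how long the serialized message is. The variable names are ours, and we assume the size is a big-endian integer.

```python
header = bytes.fromhex(
    "00 00 00 00 00 01 c2 fb "                          # record size
    "02 e9 eb e0 0f 39 2b 51 fd cd 34 ba d7 fc 29 92"   # message id
)

record_size = int.from_bytes(header[:8], "big")
msg_id = header[8:24]
etf_size = record_size - 16  # the record size includes the 16-byte message id

print(record_size)   # 115451
print(msg_id.hex())  # 02e9ebe00f392b51fdcd34bad7fc2992
print(etf_size)      # 115435
```

So the serialized message of this first record is 115435 bytes long, and the next record should start 8 + 115451 + 1 = 115460 bytes into the file.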

The following bytes, starting with 0x83, contain the serialized message. Just after the end of the serialized message (not shown in the extracts above), we would find the write ok marker, whose value is always 0xff.
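Putting the pieces together, reading a whole .rdq file boils down to a loop like the one sketched below. This is a simplified illustration rather than our final parser: the function name is ours, sizes are assumed to be big-endian, and the loop stops at the first record that looks truncated or invalid instead of trying to resynchronize.

```python
import struct


def iter_records(data):
    """Yield (msg_id, etf_payload) for each complete record in `data`."""
    offset = 0
    while offset + 8 <= len(data):
        (record_size,) = struct.unpack_from(">Q", data, offset)
        marker_pos = offset + 8 + record_size
        # Stop on an impossible size, a truncated record,
        # or a missing write ok marker (0xff).
        if record_size < 16 or marker_pos >= len(data) or data[marker_pos] != 0xFF:
            break
        msg_id = data[offset + 8:offset + 24]
        etf_payload = data[offset + 24:marker_pos]
        yield msg_id, etf_payload
        offset = marker_pos + 1
```

Each yielded payload starts with 0x83 and can then be handed to an ETF decoder.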

Wrapping up everything we learned

Now that we have a good understanding of the RDQ format, we can write a Python parser that loads all the messages stored in a .rdq file. We discovered some quirks while doing so:

Encoding
We have non-ASCII characters in our messages (accented characters and such). Erlang stores strings in UTF-8, so decoding them was straightforward.

Chunking
If we look at the following block of hexadecimal data:

We see that after the rabbit_framing_amqp_0_9_1 atom, there is the beginning of an ETF list (marked by the byte 0x6c = 0d108). The following four bytes 00 00 00 01 specify that the list has a length of one: the only element of the list is the body of the message.

When a message body is long, RabbitMQ chunks it into multiple pieces and the list will contain more than one element. Fun fact: these elements need to be joined in reverse-order to get back the full message body!
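In Python, rebuilding the body from its chunks could look like the sketch below (the function name is ours). It also covers the encoding quirk mentioned above: we decode only after joining, on the assumption that a chunk boundary may fall in the middle of a multi-byte UTF-8 character.

```python
def decode_body(chunks):
    """Join the body chunks in reverse order, then decode the result as UTF-8."""
    raw = b"".join(reversed(chunks))
    # Decoding chunk by chunk could fail if a multi-byte character
    # is split across two chunks, so we decode the joined bytes.
    return raw.decode("utf-8")


# The chunks are listed last-first, as in the .rdq file.
print(decode_body([b"world", b"hello "]))  # hello world
```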

This also explains why the community tools we tried didn’t work! If we align the “magic token” they use to find messages with the actual bytes structure, we get this: 39 5f 31 6c 00 00 00 01 6d 00 00

Or, in plain text:

  • 39 5f 31 is 9_1 in ASCII, aka the end of the rabbit_framing_amqp_0_9_1 atom
  • 6c begins an ETF list
  • 00 00 00 01 means the list has a size of 1
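  • 6d 00 00 is the beginning of the body: 6d (0d109) is the ETF prefix for a binary, and 00 00 are the first two bytes of its four-byte length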

All these tools consistently missed all messages whose body had been chunked, because they expected the list to always have a size of exactly one!
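We can reproduce this blind spot with a few lines of Python. The helper below is hypothetical: it rebuilds only the handful of bytes around the start of the body list, for a given number of chunks and a given length of the first chunk, and checks whether the magic token appears in them.

```python
import struct

TOKEN = bytes.fromhex("395f316c000000016d0000")


def body_list_prefix(chunk_count, first_chunk_length):
    """Rebuild the bytes around the start of the body list (illustration only)."""
    return (
        b"9_1"                                    # end of the atom
        + b"\x6c"                                 # start of an ETF list
        + struct.pack(">I", chunk_count)          # list length, 4 bytes
        + b"\x6d"                                 # start of an ETF binary
        + struct.pack(">I", first_chunk_length)   # binary length, 4 bytes
    )


print(TOKEN in body_list_prefix(1, 300))    # True: single chunk, found
print(TOKEN in body_list_prefix(2, 300))    # False: two chunks, missed
print(TOKEN in body_list_prefix(1, 70000))  # False: see note below
```

The last line shows a side effect of the trailing 00 00 in the token: it can only match when the first chunk is shorter than 65536 bytes. We did not check whether such chunks occur in practice.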

Conclusion

This wraps up the first part of this investigation! We have seen how a microservice failure caused an accumulation of messages in RabbitMQ, filling the disk and corrupting data. We had to dig pretty deep into the data format and the code, but we were eventually able to understand most of the storage protocol and write a parser that extracts all the messages stored in the message store.

In the second (and last) part, we will apply the same techniques to the index store and share the CLI tool we wrote to recover messages!
