Apache BookKeeper Internals — Part 1 — High Level

Jack Vanlightly
Published in Splunk-MaaS · Oct 18, 2021

The primary objective of this internals series is to help people build a mental model of how a BookKeeper server works internally. That mental model is the foundation both for troubleshooting incidents in production and for adding or modifying features in the project. In a subsequent series we’ll look at combining this mental model with the available BookKeeper metrics to make us all more effective at identifying performance bottlenecks.

Note that BookKeeper has various pluggable components and we will be focusing specifically on BookKeeper as configured for use by Apache Pulsar, so the other pluggable implementations are not covered in this post. We’ll also be focusing on the workings of a single bookie rather than on a cluster as a whole and the replication protocol. To understand the wider BookKeeper protocol, see our blog post on the subject.

We start the series by focusing on the read and write paths, and may explore other areas such as garbage collection, compaction and CLI tooling in the future.

If you read the BookKeeper code you will see it organised into various modules, and within those modules various abstractions. This guide does not concern itself with these code abstractions as they are only really relevant to BookKeeper contributors. Our focus is on the threads that do the work and the flow of data between threads, data structures and persistent storage; think of it as the physical model.

High Level View

At its simplest a BookKeeper server node (known as a bookie) is a network server (using Netty) at the top with a disk IO layer at the bottom and a bunch of buffers and caches in the middle. Bookies are just dumb storage nodes whose primary purpose is to write and read ledger entries as fast as possible, while maintaining data safety guarantees.

Fig 1. A bookie in its simplest representation

Hidden in that representation are the various components and the threading model. We’ll incrementally peel back the layers and explain everything as we go in this post.

The Components

Each ledger entry is written to both a journal and long-term storage, known as Ledger Storage. Ledger storage is pluggable and the version used by bookies in a Pulsar cluster is called DbLedgerStorage.

The journal provides write durability and a low latency write capability. As soon as an entry has been written to the journal and an fsync performed, a write response can be sent, as the entry is safely on disk. DbLedgerStorage writes to disk asynchronously, which allows it to perform large sequential batch writes and lay out data on disk in a way that is optimized for mostly sequential reads. We’ll cover this in more detail later on.

Read requests only go to DbLedgerStorage and, if things are going well, usually hit a cache. When a cache miss occurs the entry is read from disk and a readahead operation is also performed to fill the read cache, so that subsequent reads for the next entries hit the cache instead. More on that later too.
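
A minimal sketch of this read flow, in Java, under the assumption of a simple map-based cache (the names readCache, readFromEntryLog and READAHEAD_COUNT are illustrative, not the actual DbLedgerStorage API):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative sketch of the read flow: check the read cache, and on a miss
    // read the entry from disk and read ahead so that subsequent reads for the
    // next entries hit the cache instead of the disk.
    class ReadPathSketch {

        private final Map<String, byte[]> readCache = new ConcurrentHashMap<>();
        private static final int READAHEAD_COUNT = 100; // illustrative value

        byte[] readEntry(long ledgerId, long entryId) {
            byte[] cached = readCache.get(key(ledgerId, entryId));
            if (cached != null) {
                return cached; // cache hit: no disk IO
            }
            // Cache miss: read the requested entry and warm the cache with the
            // entries that follow it.
            for (long next = entryId; next < entryId + READAHEAD_COUNT; next++) {
                readCache.put(key(ledgerId, next), readFromEntryLog(ledgerId, next));
            }
            return readCache.get(key(ledgerId, entryId));
        }

        private String key(long ledgerId, long entryId) {
            return ledgerId + ":" + entryId;
        }

        // Stand-in for reading an entry from the entry log files on disk.
        private byte[] readFromEntryLog(long ledgerId, long entryId) {
            return new byte[0];
        }
    }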

Fig 2. A high level view of which components are involved in read and write requests.

Let’s start looking inside the journal and DbLedgerStorage with regard to write requests. When a write request is received by the Netty server, a write request object is created and submitted to the write thread pool. From there the entry is passed to DbLedgerStorage, which adds it to the write cache (an in-memory cache), and to the journal, which adds it to an in-memory queue. Separate threads responsible for the journal and for ledger storage then take the entry from there and write it to disk. Once an entry has been written to the journal and an fsync performed, the write response is sent.
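
A rough sketch of that flow, using hypothetical names (WritePathSketch, addToWriteCache, journalQueue and so on are illustrative; the real Journal and DbLedgerStorage classes are considerably more involved, batching writes and using several dedicated threads):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.LinkedBlockingQueue;

    // Illustrative write-path sketch: a write thread adds the entry to the ledger
    // storage write cache and hands it to the journal queue; the journal writes it
    // to disk, performs an fsync and only then completes the write response.
    class WritePathSketch {

        record Entry(long ledgerId, long entryId, byte[] payload, CompletableFuture<Void> ack) {}

        private final BlockingQueue<Entry> journalQueue = new LinkedBlockingQueue<>();

        // Runs on a write thread pool thread.
        CompletableFuture<Void> addEntry(long ledgerId, long entryId, byte[] payload) {
            CompletableFuture<Void> ack = new CompletableFuture<>();
            Entry entry = new Entry(ledgerId, entryId, payload, ack);
            addToWriteCache(entry);  // DbLedgerStorage: flushed to disk later, asynchronously
            journalQueue.add(entry); // Journal: picked up by the journal threads
            return ack;              // completed only after the journal fsync
        }

        // Journal loop, simplified here to a single thread.
        void journalLoop() throws InterruptedException {
            while (true) {
                Entry entry = journalQueue.take();
                appendToJournalFile(entry);
                fsyncJournal();                 // entry is now safely on disk
                entry.ack().complete(null);     // the write response can now be sent
            }
        }

        private void addToWriteCache(Entry entry) { /* in-memory write cache */ }
        private void appendToJournalFile(Entry entry) { /* sequential journal write */ }
        private void fsyncJournal() { /* force the journal file to disk */ }
    }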

Fig 3. A high level view inside the Journal and DbLedgerStorage.

Even at a high level there is more to understand than which components are involved; there is also the threading model. Each bookie has multiple thread pools and single threads that are actually responsible for doing all this work via the Journal and Ledger Storage APIs.

The Threading Model

Fig 4. The threads and thread pools of a bookie

The diagram above simply shows which threads and thread pools exist and the lines of communication between them. Netty does all the request/response handling, and from there work is submitted to one of four thread pools depending on the request.

Normal reads don’t interact with other threads; they can do all the work themselves. Long poll reads, on the other hand, get notified by write threads when a relevant write occurs. Writes pass across multiple threads as part of the synchronous flow from write request to write response. Other threads, such as the Sync Thread and the DbStorage thread, also perform writes, but asynchronously.

The high priority thread pool is for reads and writes that are flagged with the high priority bit. This typically includes fencing operations (which get written to the journal) and reads/writes related to recovery operations. In the steady state, this thread pool should see little work.

Threads interact in one of the following ways:

  • Executor submission (see the sketch below): work is submitted to another thread or thread pool (Java executors). Each executor has its own task queue and work is queued until it can be executed.
  • In-memory queues (such as blocking queues): one thread adds an object to the queue and another thread takes objects from the queue and processes them.
  • Caches: one thread adds an object to the write cache and another thread reads those objects in order to write them to disk.
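
For example, the executor hand-off between a Netty IO thread and a write thread might look roughly like this (a minimal sketch with illustrative names; not the actual request handling code):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Illustrative sketch of the executor hand-off pattern: the Netty IO thread does
    // not process the write itself, it submits a task to the write executor, whose
    // own task queue holds the work until the write thread can run it.
    class ExecutorHandoffSketch {

        private final ExecutorService writeThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "write-thread-0")); // illustrative name

        // Called on a Netty IO thread.
        void onWriteRequest(long ledgerId, long entryId, byte[] payload) {
            writeThread.submit(() -> processWrite(ledgerId, entryId, payload));
        }

        // Runs on the write thread, not on the Netty thread.
        private void processWrite(long ledgerId, long entryId, byte[] payload) {
            // add to the write cache, hand to the journal, etc.
        }
    }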

Before we go into the details of how the threads and components work together we should cover how both the compute (threads) and IO components (Journal/Ledger Storage) can be parallelized.

Parallelization of Work and Ordering Guarantees

BookKeeper allows for the parallelization of both the compute and the disk IO. The compute is parallelized via the use of thread pools and the disk IO is parallelized by spreading it over multiple directories (each of which can be mounted to a different volume).

The four thread pools (write, read, long poll and high priority) are all instances of the OrderedExecutor class, a thread pool where work is assigned to threads based on the ledger id of the entry being written or read.

Fig 5. OrderedExecutor is a thread pool with task-thread assignment based on ledger id

Routing deterministically by ledger id allows for both parallelization and ordering of related operations. Each single-threaded executor has a task queue, allowing tasks to be submitted and worked through one at a time.
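
A minimal sketch of this kind of ledger-id-based routing (this is not the actual OrderedExecutor implementation, just the idea behind it):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Illustrative sketch: work for the same ledger always lands on the same
    // single-threaded executor, giving per-ledger ordering while still allowing
    // different ledgers to be processed in parallel.
    class OrderedExecutorSketch {

        private final ExecutorService[] threads;

        OrderedExecutorSketch(String name, int numThreads) {
            threads = new ExecutorService[numThreads];
            for (int i = 0; i < numThreads; i++) {
                final String threadName = name + "-OrderedExecutor-" + i;
                threads[i] = Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
            }
        }

        // The same ledger id always maps to the same thread; each thread works
        // through its own task queue one task at a time.
        void submitOrdered(long ledgerId, Runnable task) {
            int index = (int) (Math.abs(ledgerId) % threads.length);
            threads[index].submit(task);
        }
    }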

The configs for setting the number of threads are:

  • serverNumIOThreads (Netty threads, defaults to 2xCPU threads)
  • numAddWorkerThreads (defaults to 1)
  • numReadWorkerThreads (defaults to 8)
  • numLongPollWorkerThreads (defaults to 0 meaning that long poll reads are submitted to the read thread pool)
  • numHighPriorityWorkerThreads (defaults to 8)
  • numJournalCallbackThreads (defaults to 1)
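
For illustration, these could be set in the bookie configuration file (bookkeeper.conf); the values below are just examples, not recommendations:

    # bookkeeper.conf (example values only)
    serverNumIOThreads=4
    numAddWorkerThreads=1
    numReadWorkerThreads=8
    numLongPollWorkerThreads=0
    numHighPriorityWorkerThreads=8
    numJournalCallbackThreads=1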

On the disk IO side, we can parallelize both the Journal and the Ledger Storage by spreading out the work over multiple directories.

A separate Journal instance is created for each configured journal directory. Each journal instance has its own internal threading model for writing to disk and calling the write request callbacks for sending the write responses.

Fig 6. Multiple journal instances can increase the write throughput

You can configure multiple journal directories via the config journalDirectories.

For each configured ledger directory, DbLedgerStorage creates a SingleDirectoryDbLedgerStorage instance which contains a write cache, a read cache, a DbStorage thread and its own ledger entry log files and RocksDB index files. Each instance is independent, i.e. they do not share files or caches.

Fig 7. Multiple SingleDirectoryDbLedgerStorage instances can increase the write throughput

You can configure multiple ledger directories via the config ledgerDirectories.
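
For example, a bookie with four journal directories and two ledger directories might be configured like this (the paths are illustrative; both settings take a comma-separated list):

    # bookkeeper.conf (illustrative paths)
    journalDirectories=/mnt/journal-1/bk-journal,/mnt/journal-2/bk-journal,/mnt/journal-3/bk-journal,/mnt/journal-4/bk-journal
    ledgerDirectories=/mnt/ledgers-1/bk-data,/mnt/ledgers-2/bk-data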

SingleDirectoryDbLedgerStorage is a mouthful, so I’ll simply refer to this as a DbLedgerStorage instance from now on.

The final route a request takes, in terms of threads and component instances, depends on the size of the thread pools and the number of journal and ledger directories.

Fig 8. The path a read request might take with 8 read threads and 2 ledger directories.

By default the write thread pool has only a single thread because there isn’t a whole lot of work done by this thread pool as we’ll see in the next post.

Fig 9. The path a write request may take with 1 write thread, 4 journal directories and 2 ledger directories.
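
A sketch of the kind of deterministic routing shown in Figs 8 and 9, where the ledger id selects the thread, the journal instance and the DbLedgerStorage instance (a simplification assuming modulo-based selection; not the actual routing code):

    // Illustrative sketch of how a single write fans out deterministically:
    // the ledger id picks the write thread, the journal instance and the
    // DbLedgerStorage (ledger directory) instance.
    class RequestRoutingSketch {

        private final int numWriteThreads = 1; // default write thread pool size
        private final int numJournalDirs = 4;  // e.g. 4 journal directories
        private final int numLedgerDirs = 2;   // e.g. 2 ledger directories

        String routeWrite(long ledgerId) {
            long key = Math.abs(ledgerId);
            int writeThread = (int) (key % numWriteThreads);
            int journal = (int) (key % numJournalDirs);
            int ledgerStorage = (int) (key % numLedgerDirs);
            return String.format("write thread %d -> journal %d + ledger storage instance %d",
                    writeThread, journal, ledgerStorage);
        }
    }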

This parallelizable architecture allows a bookie to scale up both its compute and disk IO on large servers with lots of CPU cores and multiple disks.

Of course, let’s not forget that one of the easiest ways of scaling BookKeeper is simply to scale out the number of bookies. But for those that want to scale up, BookKeeper has that ability.

Thread Naming

If you ever take a flame graph or a memory dump of a bookie process you can see these threads and thread pools listed with the following prefixes:

  • bookie-io (Netty threads)
  • BookieReadThreadPool-OrderedExecutor
  • BookieWriteThreadPool-OrderedExecutor
  • BookieJournal-3181 (if you use the default port)
  • ForceWriteThread
  • bookie-journal-callback
  • SyncThread
  • db-storage

Summary

In this post we’ve covered the high level architecture in terms of the components and threads as well as how requests are deterministically routed to those threads and components.

In the next post we’ll look at write operations in detail including how the threading model and component model combine.

Series posts:
