Tiered Storage in Apache Pulsar: Offloading Your Data to External Storage

Sherlock Xu
8 min read · Oct 9, 2022


We know that Apache BookKeeper provides storage services for Apache Pulsar. Even after messages are acknowledged and consumed, we can keep storing them (as ledgers) on bookies by configuring retention policies; ultimately, GC threads on bookies clean up the ledgers that have rolled over. In some use cases, the cold data stored on bookies, though less important, may need to be retained for a long period of time. However, it is neither practical nor economical to rely on retention policies to store it. BookKeeper needs better storage devices, which are usually more expensive, to deliver higher read and write performance and lower latency. Using such high-performance storage systems to retain cold data is therefore a colossal waste of resources.

Tiered storage and offloaders

Pulsar’s tiered storage capability provides a data offloading mechanism so that you can migrate less important cold data to long-term storage systems, which are much cheaper. The best part is that after the data is offloaded, you can delete the original ledgers on bookies to free up space without impacting your clients, which can produce and consume messages in exactly the same way.

Specifically, how does this mechanism work?

For the implementation of tiered storage, Pulsar uses Apache jclouds for cloud storage and Apache Hadoop for filesystem storage. The components actually doing the work under the hood are offloaders.

When an offloader is working, it copies ledgers to external storage one by one. As a ledger is immutable once it is closed, the offloader always copies a complete ledger; you cannot have it copy only part of the messages within a ledger. Note that Pulsar does not currently support streaming offloading.

In Pulsar, a topic is backed by a managed ledger, which represents an append-only stream of messages. It contains an ordered list of segments, known as ledgers in BookKeeper. After a ledger is rolled over (closed), it can be offloaded and then deleted from bookies.

After ledgers are offloaded, the metadata of their managed ledger, which is stored in ZooKeeper, is updated to record their new storage location. When a client requests the offloaded data, the offloader reads it back from external storage, so there is no difference for the client when reading the data (technically, the latency may differ, depending on the storage system holding the offloaded data).

Pulsar supports multiple data offloaders for different cloud storage systems, including AWS S3, GCS, and Azure Blob Storage. To configure an offloader, you need to create a storage bucket and identification credentials, and add these configurations to broker.conf or standalone.conf. Additionally, to make sure messages are not deleted immediately after they are consumed, you need to set a proper retention policy.
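As a minimal sketch of what such a configuration can look like for AWS S3: the driver name and property keys below come from the Pulsar documentation, while the bucket name and region are placeholder assumptions you would replace with your own.

```properties
# Offload to AWS S3. The bucket and region below are placeholders.
managedLedgerOffloadDriver=aws-s3
s3ManagedLedgerOffloadBucket=my-pulsar-offload-bucket
s3ManagedLedgerOffloadRegion=eu-west-3
offloadersDirectory=./offloaders
```

Note that AWS credentials are typically supplied outside this file, for example through the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables of the broker process.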

Next, I will use Google Cloud Storage (GCS) as an example to demonstrate how to offload Pulsar data. For more information about the configurations of other storage options, see the Pulsar documentation.

Configuring GCS buckets and authentication

1. In the Google Cloud Console, navigate to the Cloud Storage page to create a bucket.

2. Enter a bucket name, choose a region to store your data, and click Create. These two fields will be added to the broker configuration later.

3. After the bucket is created, you need to create a service account for Pulsar to access the bucket. Navigate to the Service Accounts page of IAM & Admin, and click CREATE SERVICE ACCOUNT.

4. Enter a name and assign some storage roles (Storage Admin, Storage Object Creator, and Storage Object Viewer) to the account in this GCP project. Click DONE.

I suggest you add the Storage Admin role. The first time I tried to offload data to GCS, I did not add this role and offloading failed with the error message The account does not have storage.objects.delete access to the Google Cloud Storage object.

5. Click the account and go to the KEYS tab. Create a new key of type JSON. After it is created, the key file is downloaded to your machine automatically.

Configuring Pulsar

As I mentioned above, we need to add the bucket and authentication information to our broker configuration.

1. Download the Pulsar package and unzip it.

wget https://archive.apache.org/dist/pulsar/pulsar-2.10.1/apache-pulsar-2.10.1-bin.tar.gz
tar xvfz apache-pulsar-2.10.1-bin.tar.gz
cd apache-pulsar-2.10.1

2. Download the offloaders package, unzip it, and move the offloaders folder to the root directory of the Pulsar package.

wget https://archive.apache.org/dist/pulsar/pulsar-2.10.1/apache-pulsar-offloaders-2.10.1-bin.tar.gz
tar xvfz apache-pulsar-offloaders-2.10.1-bin.tar.gz
mv apache-pulsar-offloaders-2.10.1/offloaders offloaders

If you are running Pulsar in a bare metal cluster, make sure that the offloaders tarball is unzipped in every broker's Pulsar directory. If you are running Pulsar in a containerized environment, you can use the apachepulsar/pulsar-all image, which contains tiered storage offloaders. In a Kubernetes cluster, for example, you should be able to see the offloaders directory after you access the toolset Pod.

3. Verify that the offloaders directory contains the offloader NAR files. Tiered storage uses Apache jclouds to support GCS for long-term storage.

ls offloaders
tiered-storage-file-system-2.10.1.nar  tiered-storage-jcloud-2.10.1.nar

4. Edit broker.conf or standalone.conf in conf to add the following information (I will run a standalone Pulsar cluster in this demo).

# The storage system to which you want to offload data.
managedLedgerOffloadDriver=google-cloud-storage
# The bucket name.
gcsManagedLedgerOffloadBucket=pulsar-data-offloader-testing-1
# The GCP region where you store your data.
gcsManagedLedgerOffloadRegion=us-east1
# The service account JSON file for authentication.
gcsManagedLedgerOffloadServiceAccountKeyFile=/Downloads/pulsar-offloader-81413a8fbf08.json
# The offloaders directory. You do not need to change it if you followed the previous steps to move the offloaders folder to the Pulsar root directory.
offloadersDirectory=./offloaders

5. For testing purposes, I suggest you change the following two fields to smaller values. Otherwise, you may need to wait a long time before a ledger is rolled over, and only rolled-over ledgers can be offloaded. Note that a ledger rollover only happens when certain thresholds are met; refer to my previous blog post for details.

managedLedgerMinLedgerRolloverTimeMinutes=2
managedLedgerMaxEntriesPerLedger=5000
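In this demo I will trigger offloading manually with pulsar-admin, but broker.conf also exposes settings for triggering offloading automatically. A sketch of the two relevant fields; the values below are examples, not recommendations:

```properties
# Automatically offload a topic's data once its total size on bookies
# exceeds this threshold in bytes (-1 disables automatic offloading).
managedLedgerOffloadAutoTriggerSizeThresholdBytes=10485760
# How long to keep offloaded ledgers on bookies before deleting them, in milliseconds.
managedLedgerOffloadDeletionLagMs=300000
```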

Offloading data to GCS

1. Run a standalone Pulsar cluster.

./bin/pulsar standalone

2. Open a new terminal, and set a namespace-level retention policy, which is applicable to all topics in the namespace. This prevents messages from being deleted immediately so that you have enough time to offload them.

./bin/pulsar-admin namespaces set-retention public/default \
--size 10G \
--time 2d
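Alternatively, retention defaults can be set at the broker level in broker.conf. A sketch, expressing the same 2-day/10 GB policy as a default for namespaces that have no explicit retention policy:

```properties
# Default message retention time for namespaces without an explicit policy (2 days).
defaultRetentionTimeInMinutes=2880
# Default retention size for namespaces without an explicit policy (10 GB).
defaultRetentionSizeInMB=10240
```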

3. Use pulsar-perf to publish some messages. In the command below, -r defines the publish rate in messages per second, and -s specifies the message size in bytes.

./bin/pulsar-perf produce -r 1000 -s 2048 test-topic

pulsar-perf is a built-in test tool in Pulsar. You can use it to test message writing or reading performance.

Expected output:

...
2022-10-06T07:49:27,679+0000 [pulsar-perf-producer-exec-1-1] INFO  org.apache.pulsar.testclient.PerformanceProducer - Created 1 producers
2022-10-06T07:49:27,729+0000 [pulsar-client-io-2-1] INFO  com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
2022-10-06T07:49:36,774+0000 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - Throughput produced:    8939 msg ---    893.9 msg/s ---     14.0 Mbit/s  --- failure      0.0 msg/s --- Latency: mean:   5.868 ms - med:   3.871 - 95pct:  17.377 - 99pct:  25.946 - 99.9pct:  30.599 - 99.99pct:  36.973 - Max:  40.039
2022-10-06T07:49:46,793+0000 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - Throughput produced:   19103 msg ---   1001.7 msg/s ---     15.7 Mbit/s  --- failure      0.0 msg/s --- Latency: mean:   3.860 ms - med:   2.342 - 95pct:  12.378 - 99pct:  20.441 - 99.9pct:  26.635 - 99.99pct:  33.614 - Max:  33.616
2022-10-06T07:49:56,807+0000 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - Throughput produced:   29117 msg ---   1000.0 msg/s ---     15.6 Mbit/s  --- failure      0.0 msg/s --- Latency: mean:   2.985 ms - med:   2.090 - 95pct:   8.760 - 99pct:  14.224 - 99.9pct:  19.030 - 99.99pct:  21.762 - Max:  22.835
2022-10-06T07:50:06,822+0000 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - Throughput produced:   39133 msg ---    999.9 msg/s ---     15.6 Mbit/s  --- failure      0.0 msg/s --- Latency: mean:   2.981 ms - med:   2.110 - 95pct:   9.308 - 99pct:  15.368 - 99.9pct:  25.364 - 99.99pct:  29.651 - Max:  29.659
2022-10-06T07:50:16,844+0000 [main] INFO  org.apache.pulsar.testclient.PerformanceProducer - Throughput produced:   49149 msg ---   1000.1 msg/s ---     15.6 Mbit/s  --- failure      0.0 msg/s --- Latency: mean:   2.849 ms - med:   1.995 - 95pct:   8.695 - 99pct:  14.753 - 99.9pct:  20.599 - 99.99pct:  26.295 - Max:  27.569

4. Open a new terminal and run the following command to check the status of the ledgers. After they are offloaded later, you can run the command again and the value of offloaded in the output will change to true.

./bin/pulsar-admin topics stats-internal test-topic

Expected output:

{
  "entriesAddedCounter" : 185605,
  "numberOfEntries" : 185605,
  "totalSize" : 533380189,
  "currentLedgerEntries" : 9931,
  "currentLedgerSize" : 31610224,
  "lastLedgerCreatedTimestamp" : "2022-10-06T07:53:27.685Z",
  "waitingCursorsCount" : 0,
  "pendingAddEntriesCount" : 4,
  "lastConfirmedEntry" : "21:9927",
  "state" : "LedgerOpened",
  "ledgers" : [ {
    "ledgerId" : 19,
    "entries" : 87874,
    "size" : 250738228,
    "offloaded" : false,
    "underReplicated" : false
  }, {
    "ledgerId" : 20,
    "entries" : 87803,
    "size" : 251046282,
    "offloaded" : false,
    "underReplicated" : false
  }, {
    "ledgerId" : 21,
    "entries" : 0,
    "size" : 0,
    "offloaded" : false,
    "underReplicated" : false
  } ],
  "cursors" : { },
  "schemaLedgers" : [ ],
  "compactedLedger" : {
    "ledgerId" : -1,
    "entries" : -1,
    "size" : -1,
    "offloaded" : false,
    "underReplicated" : false
  }
}

5. Trigger offloading with a size threshold. Pulsar offloads closed ledgers, starting from the oldest, until no more than the specified amount of data remains on bookies.

./bin/pulsar-admin topics offload --size-threshold 10M persistent://public/default/test-topic

Expected output:

Offload triggered for persistent://public/default/test-topic for messages before 22:0:-1

6. Check the offloading status. It may take some time before you can see the result.

./bin/pulsar-admin topics offload-status persistent://public/default/test-topic

Expected output:

Offload was a success

7. Go back to the bucket page in the Google Cloud Console and you should be able to see that the data has been successfully offloaded.

Summary

Tiered storage provides a cost-effective mechanism that allows Pulsar and BookKeeper to interact with external storage systems to store data that has no immediate value. In future blog posts, I will explain another way of moving data between Pulsar and external systems: connectors.

Reference

Pulsar documentation
