Pulsar on Kubernetes: Offloading Your Data to AWS S3 with Tiered Storage

Sherlock Xu
9 min read · Apr 2, 2023


In my previous blog post, I demonstrated how to install Apache Pulsar, a highly scalable and reliable distributed messaging platform, on Kubernetes using StreamNative Pulsar Operators. These operators allow us to easily manage and scale Pulsar workloads in a containerized environment. You can leverage a variety of native Pulsar capabilities by configuring them in CR manifests directly.

In this article, I will provide a step-by-step guide to using Pulsar’s tiered storage to offload data to Amazon Simple Storage Service (Amazon S3), an object storage service with great data availability, security, and performance. Tiered storage in Pulsar allows you to migrate data of less immediate value to long-term storage systems, thereby optimizing resource utilization and reducing costs.

In an earlier blog post, I explained how tiered storage works and showed how to offload data to Google Cloud Storage from a standalone local Pulsar deployment. This article takes a different approach: it combines that tiered storage experience with Pulsar Operators, filling some of the gaps in the existing Pulsar documentation around running Pulsar on Kubernetes.

Before you begin

In this section, I will cover the prerequisites to ensure a smooth setup process. Make sure you complete the following steps:

1. Create a Kubernetes cluster (v1.16 <= Kubernetes version < v1.26) with kubectl installed. To provide persistent storage for BookKeeper and ZooKeeper, you must configure a default storage class. The following is my GKE (Google Kubernetes Engine) environment for your reference.

kubectl get nodes -o wide

NAME STATUS ROLES AGE VERSION INTERNAL-IP EXTERNAL-IP OS-IMAGE KERNEL-VERSION CONTAINER-RUNTIME
gke-operator-default-pool-8f7cf065-0282 Ready <none> 2m41s v1.24.9-gke.3200 10.128.0.3 xx.xxx.xxx.xx Container-Optimized OS from Google 5.10.161+ containerd://1.6.9
gke-operator-default-pool-8f7cf065-lkfb Ready <none> 2m44s v1.24.9-gke.3200 10.128.0.5 xx.xxx.xxx.xx Container-Optimized OS from Google 5.10.161+ containerd://1.6.9
gke-operator-default-pool-8f7cf065-rjxx Ready <none> 2m42s v1.24.9-gke.3200 10.128.0.4 xx.xxx.xxx.xx Container-Optimized OS from Google 5.10.161+ containerd://1.6.9
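
Since the ZooKeeper and BookKeeper PersistentVolumeClaims are provisioned through the default storage class, it is worth verifying that one exists before installing anything (the class name varies by platform; on GKE it is typically standard-rwo):

```shell
# List storage classes; exactly one should be marked "(default)".
# Without a default class, the ZooKeeper and BookKeeper PVCs will
# stay in Pending and the Pods will never start.
kubectl get storageclass
```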

2. Create an IAM user with the AmazonS3FullAccess permission and generate an access key and a secret access key for the user. The keys will be used later to allow Pulsar to access AWS S3.
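
If you prefer the AWS CLI to the console, the same IAM setup can be sketched as follows. The user name pulsar-offloader is my own choice, and attaching the broad AmazonS3FullAccess policy mirrors the console steps above; in production, a policy scoped to the offload bucket is tighter:

```shell
# Create a dedicated IAM user for Pulsar's offloader.
aws iam create-user --user-name pulsar-offloader

# Grant S3 access (AmazonS3FullAccess for simplicity).
aws iam attach-user-policy \
  --user-name pulsar-offloader \
  --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess

# Generate the key pair; note the AccessKeyId and SecretAccessKey
# fields in the JSON output for the Secrets created later.
aws iam create-access-key --user-name pulsar-offloader
```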

3. Create an AWS S3 bucket to store the offloaded data from Pulsar. The bucket name will be used later in the broker CR.
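
The bucket can likewise be created from the CLI. I use ap-southeast-1 here to match the region set in the broker CR later; note that outside us-east-1, S3 requires an explicit LocationConstraint:

```shell
# Create the bucket that will receive the offloaded ledgers.
aws s3api create-bucket \
  --bucket k8s-pulsar-offload \
  --region ap-southeast-1 \
  --create-bucket-configuration LocationConstraint=ap-southeast-1
```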

Installing Pulsar using Pulsar Operators

In this section, I’ll walk through the steps to install Pulsar on Kubernetes using Pulsar Operators. Follow the steps below to complete the installation.

1. Create the necessary Kubernetes resources for OLM (Operator Lifecycle Manager). The first several steps in this section are similar to those in my previous blog post, so I will not repeat the details here. Run the following command:

curl -sL https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.24.0/install.sh | bash -s v0.24.0

2. Install Pulsar Operators (custom controllers) and the CRDs of Pulsar components.

kubectl create -f https://raw.githubusercontent.com/streamnative/charts/master/examples/pulsar-operators/olm-subscription.yaml

3. Create a namespace called pulsar to hold the Pulsar workloads.

kubectl create ns pulsar

4. Create two Kubernetes Secrets to store the AWS access key ID and secret access key. You need to reference these Secrets in the broker CR manifest later so that Pulsar can access AWS S3 and offload data to it. Remember to replace the access keys in the following two commands with your own.

kubectl -n pulsar create secret generic aws-access-key --from-literal=AWS_ACCESS_KEY_ID='AKIAYWR5WIQWIS6SYXXX'
kubectl -n pulsar create secret generic aws-secret-key --from-literal=AWS_SECRET_ACCESS_KEY='jXX/UBuUT72PoKUUcRUD4M//UmnbCv+teiAJXxXX'
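
You can confirm a Secret holds the expected value by decoding it. Keep in mind that base64 is an encoding, not encryption, so anyone with read access to the namespace can do the same:

```shell
# Decode the stored key ID to confirm it matches what you entered.
kubectl -n pulsar get secret aws-access-key \
  -o jsonpath='{.data.AWS_ACCESS_KEY_ID}' | base64 -d; echo
```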

5. Create a YAML file (for example, pulsar.yaml) with the following manifests of ZooKeeper, BookKeeper, proxies (optional), and brokers. You can always find their latest YAML files in this GitHub repository.

---
apiVersion: zookeeper.streamnative.io/v1alpha1
kind: ZooKeeperCluster
metadata:
  name: zookeepers
  namespace: pulsar
spec:
  image: streamnative/sn-platform-slim:2.10.3.4
  pod:
    resources:
      requests:
        cpu: 50m
        memory: 256Mi
    securityContext:
      runAsNonRoot: true
  persistence:
    data:
      accessModes:
        - ReadWriteOnce
      resources:
        requests:
          storage: 8Gi
    dataLog:
      accessModes:
        - ReadWriteOnce
      resources:
        requests:
          storage: 2Gi
    reclaimPolicy: Delete
  replicas: 3
---
apiVersion: bookkeeper.streamnative.io/v1alpha1
kind: BookKeeperCluster
metadata:
  name: bookies
  namespace: pulsar
spec:
  image: streamnative/sn-platform-slim:2.10.3.4
  replicas: 3
  pod:
    resources:
      requests:
        cpu: 200m
        memory: 512Mi
    securityContext:
      runAsNonRoot: true
  storage:
    journal:
      numDirsPerVolume: 1
      numVolumes: 1
      volumeClaimTemplate:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 8Gi
    ledger:
      numDirsPerVolume: 1
      numVolumes: 1
      volumeClaimTemplate:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 16Gi
    reclaimPolicy: Delete
  zkServers: zookeepers-zk:2181
---
apiVersion: pulsar.streamnative.io/v1alpha1
kind: PulsarProxy
metadata:
  name: proxys
  namespace: pulsar
spec:
  image: "streamnative/sn-platform-slim:2.10.3.4"
  pod:
    resources:
      requests:
        cpu: 200m
        memory: 512Mi
    securityContext:
      runAsNonRoot: true
  brokerAddress: brokers-broker
  replicas: 2
  config:
    tls:
      enabled: false
      dnsNames: []
      issuerRef:
        name: ""
---
apiVersion: pulsar.streamnative.io/v1alpha1
kind: PulsarBroker
metadata:
  name: brokers
  namespace: pulsar
spec:
  image: streamnative/sn-platform-slim:2.10.3.4
  replicas: 2
  zkServers: zookeepers-zk:2181
  pod:
    resources:
      requests:
        cpu: 200m
        memory: 512Mi
    securityContext:
      runAsNonRoot: true
    vars:
      - name: AWS_ACCESS_KEY_ID
        valueFrom:
          secretKeyRef:
            name: aws-access-key
            key: AWS_ACCESS_KEY_ID
      - name: AWS_SECRET_ACCESS_KEY
        valueFrom:
          secretKeyRef:
            name: aws-secret-key
            key: AWS_SECRET_ACCESS_KEY
  config:
    custom:
      PULSAR_PREFIX_managedLedgerOffloadDriver: "aws-s3" # The storage system to which you want to offload data.
      PULSAR_PREFIX_s3ManagedLedgerOffloadRegion: "ap-southeast-1" # The AWS region where you created the bucket to store your offloaded data.
      PULSAR_PREFIX_s3ManagedLedgerOffloadBucket: "k8s-pulsar-offload" # The AWS S3 bucket name.
      PULSAR_PREFIX_managedLedgerMinLedgerRolloverTimeMinutes: "1" # The minimum time duration for a ledger to be rolled over.
      PULSAR_PREFIX_managedLedgerMaxEntriesPerLedger: "5000" # The maximum number of entries that a ledger can contain before it is rolled over.

Before proceeding to the next step, let’s take a closer look at the broker’s configuration.

spec.pod.vars references the Secrets created above as environment variables. If you used different Secret names in the previous step, replace aws-access-key and aws-secret-key accordingly.

spec.config.custom lists the customized parameters that enable and configure the AWS S3 offloader. If your Pulsar instance is not deployed on Kubernetes (for example, on bare metal), you would normally add these configurations to broker.conf or standalone.conf. As I am using Operators and CRs to install Pulsar on Kubernetes, I have to prefix them with PULSAR_PREFIX_ and pass them as customized configurations. If you have other necessary parameters that cannot be set in the CR directly (that is, they have no corresponding field in the broker CRD), you can add them the same way. Note that StreamNative is still working to optimize the CRDs of Pulsar Operators to allow more native Pulsar parameters to be set directly in the CR manifest.

For more information about AWS S3 configurations, see the Pulsar documentation.
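
As a sanity check on how the PULSAR_PREFIX_ mechanism works: in my setup the sn-platform images render these environment variables into the broker configuration at container startup, so once a broker Pod is running (step 7 below) you should be able to find the offloader settings inside it:

```shell
# Inspect the rendered broker configuration inside a broker Pod.
# The grep pattern matches managedLedgerOffloadDriver,
# s3ManagedLedgerOffloadRegion, and s3ManagedLedgerOffloadBucket.
kubectl exec -n pulsar brokers-broker-0 -- \
  grep -iE 'offload(Driver|Region|Bucket)' conf/broker.conf
```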

6. Run the following command to create a Pulsar instance.

kubectl create -f pulsar.yaml

7. Check the status of Pods.

kubectl get pods -n pulsar

NAME READY STATUS RESTARTS AGE
bookies-bk-0 1/1 Running 0 59s
bookies-bk-1 1/1 Running 0 59s
bookies-bk-2 1/1 Running 0 59s
bookies-bk-auto-recovery-0 1/1 Running 0 10s
brokers-broker-0 1/1 Running 0 57s
brokers-broker-1 1/1 Running 0 57s
proxys-proxy-0 1/1 Running 0 3m9s
proxys-proxy-1 1/1 Running 0 3m9s
zookeepers-zk-0 1/1 Running 0 3m10s
zookeepers-zk-1 1/1 Running 0 3m10s
zookeepers-zk-2 1/1 Running 0 3m10s
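
Rather than polling kubectl get pods by hand, you can block until the cluster is up with kubectl wait (the label selector below is an assumption; check the labels on your Pods with kubectl get pods --show-labels if it does not match):

```shell
# Wait up to 10 minutes for every Pod in the namespace to be Ready.
kubectl wait pods --all -n pulsar \
  --for=condition=Ready --timeout=600s
```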

Offloading data to AWS S3

Now that the Pulsar cluster is up and running on Kubernetes, you can begin to send some messages to it and offload the data to S3.

1. Exec into a broker Pod to create a topic. The following commands create a tenant called sherlock, a namespace called sherlock/pulsar, and a persistent topic called persistent://sherlock/pulsar/offload.

kubectl exec -n pulsar brokers-broker-0 -- bin/pulsar-admin tenants create sherlock
kubectl exec -n pulsar brokers-broker-0 -- bin/pulsar-admin namespaces create sherlock/pulsar
kubectl exec -n pulsar brokers-broker-0 -- bin/pulsar-admin topics create persistent://sherlock/pulsar/offload

2. Verify that the topic has been created successfully by listing the topics in the sherlock/pulsar namespace:

kubectl exec -n pulsar brokers-broker-0 -- bin/pulsar-admin topics list sherlock/pulsar
Defaulted container "pulsar-broker" out of: pulsar-broker, init-sysctl (init)
persistent://sherlock/pulsar/offload

3. Set a namespace-level retention policy, which is applicable to all topics in the namespace. This prevents messages from being deleted immediately so that you have enough time to offload them. For more information about message retention, see my blog post Key Concepts You Need to Know about Message Retention, Expiry, and Deletion in Apache Pulsar.

kubectl exec -n pulsar brokers-broker-0 -- bin/pulsar-admin namespaces set-retention sherlock/pulsar --size 10G --time 2d

4. Verify that the policy has been created successfully.

kubectl exec -n pulsar brokers-broker-0 -- bin/pulsar-admin namespaces get-retention sherlock/pulsar
Defaulted container "pulsar-broker" out of: pulsar-broker, init-sysctl (init)
{
"retentionTimeInMinutes" : 2880,
"retentionSizeInMB" : 10240
}

5. Publish some messages. In the command below, -r defines the rate of publishing messages per second, and -s specifies the message size in bytes.

kubectl exec -n pulsar brokers-broker-0 -- bin/pulsar-perf produce -r 1000 -s 2048 persistent://sherlock/pulsar/offload

Expected output:

...
2023-03-30T10:44:02,007+0000 [pulsar-client-io-2-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xbf9f5e54, L:/10.124.0.8:38294 - R:brokers-broker-0.brokers-broker-headless.pulsar.svc.cluster.local/10.124.0.8:6650]] Connected to server
2023-03-30T10:44:02,012+0000 [pulsar-client-io-2-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://sherlock/pulsar/offload] [null] Creating producer on cnx [id: 0xbf9f5e54, L:/10.124.0.8:38294 - R:brokers-broker-0.brokers-broker-headless.pulsar.svc.cluster.local/10.124.0.8:6650]
2023-03-30T10:44:02,104+0000 [pulsar-client-io-2-1] INFO org.apache.pulsar.client.impl.ProducerImpl - [persistent://sherlock/pulsar/offload] [brokers-0-0] Created producer on cnx [id: 0xbf9f5e54, L:/10.124.0.8:38294 - R:brokers-broker-0.brokers-broker-headless.pulsar.svc.cluster.local/10.124.0.8:6650]
2023-03-30T10:44:02,120+0000 [pulsar-perf-producer-exec-1-1] INFO org.apache.pulsar.testclient.PerformanceProducer - Created 1 producers
2023-03-30T10:44:02,233+0000 [pulsar-client-io-2-1] INFO com.scurrilous.circe.checksum.Crc32cIntChecksum - SSE4.2 CRC32C provider initialized
2023-03-30T10:44:12,676+0000 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 8481 msg --- 848.1 msg/s --- 13.3 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 516.120 ms - med: 36.997 - 95pct: 2221.551 - 99pct: 2437.295 - 99.9pct: 2454.127 - 99.99pct: 2471.919 - Max: 2514.015
2023-03-30T10:44:23,317+0000 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 21123 msg --- 1001.5 msg/s --- 15.6 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 29.757 ms - med: 6.211 - 95pct: 184.282 - 99pct: 291.547 - 99.9pct: 395.231 - 99.99pct: 401.889 - Max: 401.919
2023-03-30T10:44:33,343+0000 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 31194 msg --- 1000.0 msg/s --- 15.6 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 5.752 ms - med: 5.473 - 95pct: 8.476 - 99pct: 12.134 - 99.9pct: 15.575 - 99.99pct: 17.059 - Max: 17.210
2023-03-30T10:44:43,370+0000 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 41229 msg --- 1000.2 msg/s --- 15.6 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 5.578 ms - med: 5.333 - 95pct: 8.018 - 99pct: 11.617 - 99.9pct: 15.135 - 99.99pct: 17.484 - Max: 18.207
2023-03-30T10:44:53,403+0000 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 51252 msg --- 999.9 msg/s --- 15.6 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 9.432 ms - med: 5.609 - 95pct: 10.061 - 99pct: 151.474 - 99.9pct: 236.355 - 99.99pct: 247.404 - Max: 247.406
2023-03-30T10:45:03,431+0000 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 61284 msg --- 1000.2 msg/s --- 15.6 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 20.195 ms - med: 5.689 - 95pct: 68.040 - 99pct: 420.437 - 99.9pct: 494.241 - 99.99pct: 502.949 - Max: 503.991
2023-03-30T10:45:13,449+0000 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 71310 msg --- 999.9 msg/s --- 15.6 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 5.849 ms - med: 5.455 - 95pct: 7.941 - 99pct: 12.086 - 99.9pct: 63.361 - 99.99pct: 71.320 - Max: 72.245
2023-03-30T10:45:23,467+0000 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 81329 msg --- 1000.1 msg/s --- 15.6 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 5.670 ms - med: 5.456 - 95pct: 8.137 - 99pct: 11.350 - 99.9pct: 15.747 - 99.99pct: 18.982 - Max: 19.997
2023-03-30T10:45:33,489+0000 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 91337 msg --- 999.1 msg/s --- 15.6 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 5.637 ms - med: 5.273 - 95pct: 7.788 - 99pct: 11.838 - 99.9pct: 56.972 - 99.99pct: 62.058 - Max: 63.124
2023-03-30T10:45:43,519+0000 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 101373 msg --- 1000.9 msg/s --- 15.6 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 9.363 ms - med: 5.334 - 95pct: 9.964 - 99pct: 173.578 - 99.9pct: 263.485 - 99.99pct: 267.847 - Max: 268.459
2023-03-30T10:45:53,544+0000 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 111399 msg --- 1000.1 msg/s --- 15.6 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 15.630 ms - med: 5.529 - 95pct: 46.508 - 99pct: 304.433 - 99.9pct: 356.841 - 99.99pct: 365.799 - Max: 366.427
...

6. Open a new terminal and run the following command to check the status of the ledgers created.

kubectl exec -n pulsar brokers-broker-0 -- bin/pulsar-admin topics stats-internal persistent://sherlock/pulsar/offload
Defaulted container "pulsar-broker" out of: pulsar-broker, init-sysctl (init)
{
"entriesAddedCounter" : 140879,
"numberOfEntries" : 140879,
"totalSize" : 349294691,
"currentLedgerEntries" : 31459,
"currentLedgerSize" : 75471471,
"lastLedgerCreatedTimestamp" : "2023-03-30T10:46:13.012Z",
"waitingCursorsCount" : 0,
"pendingAddEntriesCount" : 6,
"lastConfirmedEntry" : "3:31452",
"state" : "LedgerOpened",
"ledgers" : [ {
"ledgerId" : 0,
"entries" : 5000,
"size" : 21772995,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 1,
"entries" : 51570,
"size" : 126242484,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 2,
"entries" : 52856,
"size" : 125822400,
"offloaded" : false,
"underReplicated" : false
}, {
"ledgerId" : 3,
"entries" : 0,
"size" : 0,
"offloaded" : false,
"underReplicated" : false
} ],
"cursors" : { },
"schemaLedgers" : [ ],
"compactedLedger" : {
"ledgerId" : -1,
"entries" : -1,
"size" : -1,
"offloaded" : false,
"underReplicated" : false
}
}
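
The stats-internal output grows long as ledgers accumulate. Assuming jq is installed on the machine where you run kubectl, the per-ledger offload state can be summarized like this (kubectl's "Defaulted container" notice goes to stderr, so the JSON pipes cleanly):

```shell
# Summarize which ledgers have been offloaded to S3 so far.
kubectl exec -n pulsar brokers-broker-0 -- \
  bin/pulsar-admin topics stats-internal persistent://sherlock/pulsar/offload \
  | jq '[.ledgers[] | {ledgerId, entries, offloaded}]'
```

At this point every ledger should still show "offloaded": false, since offloading has not been triggered yet.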

7. Trigger offloading with a size threshold. Pulsar moves the oldest complete ledgers to S3 until the amount of data retained in BookKeeper for the topic drops below the threshold. Note that this command triggers a one-time offload; to offload automatically on an ongoing basis, set a namespace-level policy with pulsar-admin namespaces set-offload-threshold instead.

kubectl exec -n pulsar brokers-broker-0 -- bin/pulsar-admin topics offload --size-threshold 10M persistent://sherlock/pulsar/offload
Defaulted container "pulsar-broker" out of: pulsar-broker, init-sysctl (init)
Offload triggered for persistent://sherlock/pulsar/offload for messages before 6:0:-1

8. Check the offloading status.

kubectl exec -n pulsar brokers-broker-0 -- bin/pulsar-admin topics offload-status persistent://sherlock/pulsar/offload
Defaulted container "pulsar-broker" out of: pulsar-broker, init-sysctl (init)
Offload was a success

9. Go to the AWS console and navigate to the Buckets page. You should be able to see that the data has been successfully offloaded.
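
Instead of the console, the AWS CLI can confirm that the offloaded ledger objects landed in the bucket:

```shell
# List the offloaded objects written by the S3 offloader.
aws s3 ls s3://k8s-pulsar-offload --recursive --human-readable
```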

Conclusion

With tiered storage, you can optimize resource utilization while ensuring that your data is securely stored and readily available when needed. I hope this guide is helpful to those who want to install Pulsar on Kubernetes and take advantage of native Pulsar features like tiered storage.
