Apache Pulsar Key Shared Mode-Sticky Consistent Hashing

Ankush Khanna
5 min read · Jun 11, 2020


Apache Pulsar is a pub-sub message streaming service created by Yahoo.

Pulsar has been recognized as a great mix between traditional pub/sub and Apache Kafka. Without going into too much detail about Apache Pulsar, I would like to focus on Pulsar's key-shared subscription mode.

In this article, we will cover:

  • Auto split hash ranges
  • Consistent auto split hash ranges (2.6.0)
  • Sticky hash ranges
  • Implementing consistent sticky hash ranges (2.7.0)

Subscription modes

Apache Pulsar offers several subscription modes:

  1. Failover
  2. Exclusive
  3. Shared
  4. Key Shared

You can read more about them here.

Use cases for Key Shared mode

In key shared mode, messages with the same key are routed to the same consumer. This allows you to have:

  1. Better key-based local caches
  2. Aggregated state per key
  3. Easy scalability for sharded services

There are two ways you can use key shared mode: auto hash and sticky hash. In auto hashing, Pulsar distributes hashes across consumers and handles rebalancing. In sticky hashing, the user defines the hash range for each consumer: every consumer is bound to its hash range and cannot consume any message outside it. Sticky hash ranges can be used for sharded cache services.

Starting Pulsar locally

You can start Pulsar locally using the command below:

docker run -it -p 6650:6650 -p 8080:8080 \
  --mount source=pulsardata,target=/pulsar/data \
  --mount source=pulsarconf,target=/pulsar/conf \
  apachepulsar/pulsar:2.5.2 bin/pulsar standalone

Creating a partitioned topic

There is no need to create a topic explicitly. But in case you would like to have a partitioned topic, you can follow the steps below:

> docker ps

Copy the container id:

> docker exec -it <CONTAINER ID> bash
> ./bin/pulsar-admin persistent create-partitioned-topic \
  persistent://public/default/regions-partitioned \
  --partitions 4

Producing messages to a topic

Let’s start with a simple producer that pushes messages with keys derived from a list of regions.

You can use the following code as an example; the important part is to set the batcher builder to BatcherBuilder.KEY_BASED.
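A minimal sketch of such a producer, assuming a local standalone broker on pulsar://localhost:6650 and the regions-partitioned topic created above (the region list and class name are illustrative):

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public class RegionProducer {
    private static final List<String> REGIONS =
            Arrays.asList("eu-west-1", "eu-central-1", "us-east-1", "ap-south-1");

    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // KEY_BASED batching keeps messages with the same key in the same batch,
        // which is what Key_Shared consumers need to receive keys consistently.
        Producer<String> producer = client.newProducer(Schema.STRING)
                .topic("persistent://public/default/regions-partitioned")
                .batcherBuilder(BatcherBuilder.KEY_BASED)
                .create();

        for (int i = 0; i < 1000; i++) {
            String region = REGIONS.get(ThreadLocalRandom.current().nextInt(REGIONS.size()));
            producer.newMessage()
                    .key(region)               // key derived from the regions list
                    .value("event-" + i)
                    .send();
        }

        producer.close();
        client.close();
    }
}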

How keys are forwarded to a consumer in Key shared mode

Consumer

When a consumer starts in key shared mode, it tells the Pulsar broker which hash ranges it will consume. These hash ranges can be determined automatically (auto hash mode) or set manually (sticky hash) by each consumer. The allowed hash range is from 0 to 65,535 (65,536 is the default hash range size).

Producer

When a producer pushes a message to the Pulsar broker in key-based mode, the broker uses the following function to calculate the hash for the key:

Murmur3_32Hash.getInstance().makeHash(key) % 65536

and determines which consumer the key would be forwarded to.

For example, if consumer 1 announces a hash range from 0 to 10,000, it will get all messages whose key hashes fall between 0 and 10,000.

Please note that overlapping hash ranges in consumers would throw an exception.

Auto split hash consumer

You can use the following code as an example to consume using auto split Key_Shared mode.
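A minimal sketch of such a consumer (topic, subscription name, and class name are illustrative):

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

public class AutoSplitConsumer {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // AUTO_SPLIT is the default Key_Shared policy: the broker assigns and
        // rebalances hash ranges as consumers join or leave the subscription.
        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("persistent://public/default/regions-partitioned")
                .subscriptionName("regions-key-shared")
                .subscriptionType(SubscriptionType.Key_Shared)
                .keySharedPolicy(KeySharedPolicy.autoSplitHashRange())
                .subscribe();

        while (true) {
            Message<String> msg = consumer.receive();
            System.out.printf("key=%s value=%s%n", msg.getKey(), msg.getValue());
            consumer.acknowledge(msg);
        }
    }
}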

When a consumer is added or removed, Pulsar auto rebalances the hash ranges between existing consumers.

Currently, auto split splits the largest assigned hash range. The implementation divides the hash space across the available consumers by re-dividing the currently assigned hash ranges whenever a consumer joins or leaves. This can cause some issues, as explained here.

Consistent auto split hash consumer

To overcome the above issues, Pulsar is introducing consistent hashing in auto split hash ranges. This feature will be available in the 2.6.0 release.

Using auto split hash with consistent hashing provides equal distribution of load across consumers.
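If I read the feature correctly, this behaviour is enabled on the broker side via configuration; a sketch of the settings involved, assuming the flag names below:

# broker.conf (or conf/standalone.conf when running locally)
# Use consistent hashing for Key_Shared auto split (assumed flag name, off by default)
subscriptionKeySharedUseConsistentHashing=true
# Number of points placed on the hash ring; more points give a more even spread
subscriptionKeySharedConsistentHashingReplicaPoints=100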

If you are not familiar with consistent hashing you can check out the following video.

Auto split hash ranges should cover most cases. But in certain scenarios, you might want to avoid auto rebalancing or assign hash ranges to consumers yourself. Sticky hash consumers make this possible.

Sticky hash consumer

You can start a simple sticky consumer by providing an equal-sized hash range to each consumer. Here, the hash range is calculated by dividing the default hash range size (65,536) by the number of nodes.

For example, if you have 4 nodes:

  1. Consumer 1 hash range = 0 to 16383
  2. Consumer 2 hash range = 16384 to 32767
  3. Consumer 3 hash range = 32768 to 49151
  4. Consumer 4 hash range = 49152 to 65535
4 Simple sticky consumers

Sticky hash range consumer code example
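A minimal sketch of such a consumer, assuming each instance is started with its node index and the total node count as command-line arguments (class and argument names are illustrative):

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

public class StickyConsumer {
    private static final int HASH_RANGE_SIZE = 65536;

    public static void main(String[] args) throws Exception {
        // Which of the N equal slices this consumer owns, e.g. index 0..3 for 4 nodes.
        int nodeIndex = Integer.parseInt(args[0]);
        int nodeCount = Integer.parseInt(args[1]);

        int sliceSize = HASH_RANGE_SIZE / nodeCount;
        int start = nodeIndex * sliceSize;
        // The last node takes any remainder so the whole 0..65535 range is covered.
        int end = (nodeIndex == nodeCount - 1) ? HASH_RANGE_SIZE - 1 : start + sliceSize - 1;

        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("persistent://public/default/regions-partitioned")
                .subscriptionName("regions-key-shared")
                .subscriptionType(SubscriptionType.Key_Shared)
                // This consumer only receives keys whose hash falls inside [start, end].
                .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(Range.of(start, end)))
                .subscribe();

        while (true) {
            Message<String> msg = consumer.receive();
            System.out.printf("key=%s value=%s%n", msg.getKey(), msg.getValue());
            consumer.acknowledge(msg);
        }
    }
}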

The obvious disadvantage is that keys are not necessarily spread equally across these ranges. If all your keys hash between 16,384 and 49,151, consumers 1 and 4 would be idle, while consumers 2 and 3 would consume all the messages.

Sticky consumer consistent hashing

To solve the above-mentioned issue, we can use consistent hashing in sticky consumers. This is not possible at the moment; you can check the bug report here. It has been fixed and will be released in 2.7.0.

I have implemented a class to provide consistent hash ranges based on the number of nodes, node index, and the number of points around the circle.

You can check the code for the consistent hashing calculation here:
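A minimal sketch of such a calculation, reusing Pulsar's Murmur3_32Hash to place points on the ring (the exact import path, class name, and method names are assumptions):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.impl.Murmur3_32Hash;

public class ConsistentHashRanges {
    private static final int HASH_RANGE_SIZE = 65536;

    // Places `points` markers per node on a 0..65535 circle and returns the ranges
    // owned by the node at `nodeIndex`. Each marker owns the arc that ends at its
    // own position and starts right after the previous marker on the circle.
    public static List<Range> rangesFor(int nodeIndex, int nodeCount, int points) {
        // circle position -> node owning the arc that ends at this position
        TreeMap<Integer, Integer> circle = new TreeMap<>();
        for (int node = 0; node < nodeCount; node++) {
            for (int point = 0; point < points; point++) {
                // Same hash function Pulsar uses for message keys (import path assumed)
                int position = Murmur3_32Hash.getInstance()
                        .makeHash("node-" + node + "-point-" + point) % HASH_RANGE_SIZE;
                circle.put(position, node);
            }
        }

        List<Range> ranges = new ArrayList<>();
        int previousEnd = -1;
        for (Map.Entry<Integer, Integer> marker : circle.entrySet()) {
            if (marker.getValue() == nodeIndex) {
                ranges.add(Range.of(previousEnd + 1, marker.getKey()));
            }
            previousEnd = marker.getKey();
        }
        // The tail of the circle (after the last marker) wraps around to the first
        // marker, so it belongs to the node that owns the first marker.
        if (previousEnd < HASH_RANGE_SIZE - 1 && circle.firstEntry().getValue() == nodeIndex) {
            ranges.add(Range.of(previousEnd + 1, HASH_RANGE_SIZE - 1));
        }
        return ranges;
    }
}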

With consistent hashing, you can spread the hash ranges across different consumers. The diagram below shows the hash range distribution for 4 consumers, with 4 points used to calculate the consistent hash ranges.

4 Sticky consumers, 4 points

I have used 4 points to calculate the consistent hash ranges for this diagram. But in practice, I would use around 100 points. With a higher number of points, you get a more even distribution of hashes across nodes.

Below is the code for a sticky hash range consumer with consistent hashing:
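A minimal sketch, combining the range calculation above with a sticky Key_Shared consumer (class and argument names are illustrative):

import java.util.List;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

public class ConsistentStickyConsumer {
    public static void main(String[] args) throws Exception {
        int nodeIndex = Integer.parseInt(args[0]);
        int nodeCount = Integer.parseInt(args[1]);
        int points = 100;  // more points -> more even spread of keys across nodes

        // Ranges computed by the consistent-hashing helper sketched above.
        List<Range> ranges = ConsistentHashRanges.rangesFor(nodeIndex, nodeCount, points);

        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Consumer<String> consumer = client.newConsumer(Schema.STRING)
                .topic("persistent://public/default/regions-partitioned")
                .subscriptionName("regions-key-shared")
                .subscriptionType(SubscriptionType.Key_Shared)
                .keySharedPolicy(KeySharedPolicy.stickyHashRange().ranges(ranges.toArray(new Range[0])))
                .subscribe();

        while (true) {
            Message<String> msg = consumer.receive();
            System.out.printf("key=%s value=%s%n", msg.getKey(), msg.getValue());
            consumer.acknowledge(msg);
        }
    }
}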

If the Pulsar broker does not know which consumer to deliver a message to, all consumers stop consuming. This can happen if there are missing hash ranges or if a consumer is restarting, so a node restart might cause downtime.
To add or remove nodes, you first need to stop all consumers and then change the node index and count.

Sticky hash ranges are quite flexible, and they are an interesting direction to explore for use cases with strict requirements on how messages are distributed across nodes.

You can check the complete code here.
