Backpressure controls in Google Cloud PubSub

Sushil Kumar
Google Cloud - Community
Jul 26, 2023 · 6 min read

Google Cloud PubSub is Google’s serverless messaging offering that lets you ingest petabytes of data without worrying about the underlying infrastructure. PubSub is a really versatile piece of technology. You can use it for streaming data analytics by hooking it up with systems like Dataflow, Dataproc, or BigQuery, or you can use it to decouple event-driven microservices.

Although it is a serverless offering, PubSub provides several options that let you change the way you experience the service.

Below are some of the configurations that can change the behaviour of the PubSub service.

  1. Message Ordering Per-Key — You can configure your subscription to deliver messages in the order they were published. I’ve discussed this in another post.
  2. Exactly Once Delivery — Configure your subscription to deliver a message only once, and to redeliver only if the ack deadline is exceeded or there is an explicit negative acknowledgement.
  3. Exactly Once Processing — In conjunction with Dataflow you can also achieve exactly-once end-to-end processing. Read more about it here.
  4. Exponential Retries and Dead Lettering — PubSub can redeliver messages with exponential back-off, making sure redeliveries don’t overwhelm the consumers. It can also move messages to a Dead Letter Topic (DLT) once a certain number of delivery attempts has been exhausted. Several of these options live on the subscription itself, as shown in the sketch after this list.
  5. Flow Control — The PubSub pull consumer/subscriber SDK allows you to stop ingesting messages if a certain number of messages are still unacknowledged. This is what we are going to discuss in this post.
  6. Push Backoff — For push-based subscribers, there is a built-in mechanism by which PubSub lowers the delivery rate if the subscriber is sending negative acknowledgements or missing acknowledgement deadlines.
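Several of the options above (ordering, exactly-once delivery, dead lettering) are properties of the subscription itself. As a minimal sketch with the Java admin client (the subscription name tuned-subscription and the dead-letter topic pubsub-backpressure-dlt are hypothetical), creating such a subscription could look like this:

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.DeadLetterPolicy;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;

public class CreateTunedSubscription {
    public static void main(String[] args) throws Exception {
        String projectId = "PROJECT_ID";
        try (SubscriptionAdminClient admin = SubscriptionAdminClient.create()) {
            admin.createSubscription(
                Subscription.newBuilder()
                    .setName(SubscriptionName.of(projectId, "tuned-subscription").toString())
                    .setTopic(TopicName.of(projectId, "pubsub-backpressure-topic").toString())
                    .setEnableMessageOrdering(true)      // option 1: per-key ordering
                    .setEnableExactlyOnceDelivery(true)  // option 2: exactly once delivery
                    .setDeadLetterPolicy(                // option 4: dead letter after 5 attempts
                        DeadLetterPolicy.newBuilder()
                            .setDeadLetterTopic(TopicName.of(projectId, "pubsub-backpressure-dlt").toString())
                            .setMaxDeliveryAttempts(5)
                            .build())
                    .build());
        }
    }
}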

Need for back-pressure controls

Imagine a situation where you have a PubSub subscriber and you have sized your VM (CPU and memory) for the steady-state throughput of incoming messages. You have everything set up for optimum infrastructure utilisation. Now, if there happens to be a transient spike in incoming traffic, your subscriber will start pulling in more messages without taking into account that the underlying resources (CPU and memory) are already stretched thin. This can cause the message-processing latency to shoot up and further degrade the situation.

In summary, this is a situation where the production rate is higher than the consumption rate. Since the two sides are decoupled, the consumer can’t ask the producer to decrease the production rate, but it should be able to signal PubSub to reduce the delivery rate. In software systems this is called applying back-pressure.

Back-pressure controls in subscribers

Pull Subscribers

Pull subscribers use the PubSub SDK, which has a mechanism called Flow Control. Flow Control settings in the SDK allow a consumer to stop pulling in messages once a pre-configured threshold on the number (or total size) of unacknowledged messages is breached.
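Under the hood these thresholds are expressed through the GAX FlowControlSettings class, which is exactly what our demo code uses later on. A minimal sketch (the threshold values here are purely illustrative):

import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;

FlowControlSettings flowControl =
    FlowControlSettings.newBuilder()
        .setMaxOutstandingElementCount(100L)                // at most 100 unacknowledged messages
        .setMaxOutstandingRequestBytes(10L * 1024L * 1024L) // or at most 10 MiB of unacknowledged data
        .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) // pause pulling until back under the limits
        .build();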

Push Subscribers

Since push subscribers do not have any library/SDK dependency, the back-pressure control is built into the PubSub service itself. There is a mechanism called Push Backoff that kicks in when PubSub detects that the consumer is unable to keep up with the push rate.

Let’s see it all in action

In this post we’ll see an example of Flow Control in pull subscribers.

To start off, we will create a topic and a pull subscription attached to it. Next, I’ll create a consumer, synthetically add delays to slow down consumption, and then see how enabling flow control helps our use case.

Demo Setup

PubSub Topic

gcloud pubsub topics create pubsub-backpressure-topic

PubSub Subscription

gcloud pubsub subscriptions create pull-subscription --topic=projects/PROJECT_ID/topics/pubsub-backpressure-topic

Pull Subscriber with Flow Control

import java.nio.charset.StandardCharsets;

import com.google.cloud.spring.pubsub.core.subscriber.PubSubSubscriberTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ConsumerWithFlowControl implements CommandLineRunner {

    private final PubSubSubscriberTemplate pubSubTemplate;
    private final String subscription;

    @Autowired
    public ConsumerWithFlowControl(PubSubSubscriberTemplate pubSubTemplate,
                                   @Value("${gcp.subscription}") String subscription) {
        this.pubSubTemplate = pubSubTemplate;
        this.subscription = subscription;
    }

    @Override
    public void run(String... args) throws Exception {
        log.info("Starting the consumer...");
        pubSubTemplate.subscribe(subscription, (pubsubMessage) -> {
            String message = pubsubMessage.getPubsubMessage().getData().toString(StandardCharsets.UTF_8);
            log.info("Got a message : " + message);
            try {
                /*
                 Sleep to simulate a delay in message processing. This will cause outstanding
                 messages to pile up, and the consumer will stop pulling in more messages.
                 The 11 sec delay is longer than the default 10 sec ack deadline, but the
                 maxAckExtensionPeriod configured on the subscriber factory below keeps
                 extending the deadline, so the messages are not redelivered mid-processing.
                */
                Thread.sleep(11000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            pubsubMessage.ack();
        });
    }
}

The beans required for this subscriber to work are below.

import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.spring.pubsub.core.PubSubConfiguration;
import com.google.cloud.spring.pubsub.core.subscriber.PubSubSubscriberTemplate;
import com.google.cloud.spring.pubsub.support.DefaultSubscriberFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.threeten.bp.Duration;

@Configuration
public class SubscriberConfig {

    @Value("${spring.cloud.gcp.project-id}")
    public String projectId;

    @Value("${gcp.subscription}")
    public String subscription;

    @Value("${spring.cloud.gcp.pubsub.subscriber.flow-control.max-outstanding-element-count}")
    public long maxOutstandingElement;

    @Bean
    public PubSubConfiguration configuration() {
        PubSubConfiguration conf = new PubSubConfiguration();
        conf.initialize(projectId);
        return conf;
    }

    @Bean
    public PubSubSubscriberTemplate pubSubSubscriberTemplate(PubSubConfiguration configuration) {
        DefaultSubscriberFactory factory = new DefaultSubscriberFactory(() -> projectId, configuration);
        factory.setFlowControlSettings(
                FlowControlSettings.newBuilder()
                        .setMaxOutstandingElementCount(maxOutstandingElement)
                        .build());
        // Let the client keep extending ack deadlines for up to 10 minutes,
        // so our slow (11 sec) processing does not trigger redelivery.
        factory.setMaxAckExtensionPeriod(Duration.ofMinutes(10L));
        return new PubSubSubscriberTemplate(factory);
    }
}
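One detail worth calling out here: the subscription’s default acknowledgement deadline is 10 seconds, while our consumer sleeps for 11 seconds per message. Setting maxAckExtensionPeriod to 10 minutes lets the client library keep extending the deadlines of outstanding messages, so the artificial delay doesn’t cause PubSub to redeliver them mid-processing.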

The application configuration driving these beans is below.

gcp:
  subscription: pull-subscription

spring:
  cloud:
    gcp:
      pubsub:
        subscriber:
          flow-control:
            max-outstanding-element-count: 5
      project-id: PROJECT_ID

As you can see, we have configured max-outstanding-element-count to be 5, i.e. if 5 or more messages are unacknowledged, the subscriber will stop pulling in new messages.
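As an aside, the same knob is available without Spring, directly on the plain Java client’s Subscriber builder. A minimal sketch, reusing the project and subscription names from the demo setup:

import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;

public class PlainClientConsumer {
    public static void main(String[] args) {
        ProjectSubscriptionName name =
                ProjectSubscriptionName.of("PROJECT_ID", "pull-subscription");
        Subscriber subscriber = Subscriber.newBuilder(name, (message, ackReplyConsumer) -> {
                    System.out.println("Got a message : " + message.getData().toStringUtf8());
                    ackReplyConsumer.ack();
                })
                // Same back-pressure limit as the Spring example: at most 5 outstanding messages.
                .setFlowControlSettings(FlowControlSettings.newBuilder()
                        .setMaxOutstandingElementCount(5L)
                        .build())
                .build();
        subscriber.startAsync().awaitRunning();
        subscriber.awaitTerminated(); // keep the main thread alive while messages flow in
    }
}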

With all the configuration out of the way, let us now see it in action.

I’m running my consumer on my local machine and will use gcloud to publish messages to my topic.

The script to publish messages is given below.

for i in {1..20}; do gcloud pubsub topics publish projects/PROJECT_ID/topics/pubsub-backpressure-topic --message=$i ; done

You’ll see that, since we have added a delay of 11 sec, the consumer will stop pulling in messages until the outstanding ones are acknowledged. It will then pull in the next bunch of up to 5 messages, and rinse and repeat.

Here are the logs from my subscriber

2023-07-26T09:48:53.185+05:30  : Got a message : 1
2023-07-26T09:48:53.185+05:30 : Got a message : 3
2023-07-26T09:48:53.185+05:30 : Got a message : 2
2023-07-26T09:48:53.860+05:30 : Got a message : 4
2023-07-26T09:48:56.577+05:30 : Got a message : 5
2023-07-26T09:49:04.578+05:30 : Got a message : 12
2023-07-26T09:49:04.892+05:30 : Got a message : 11
2023-07-26T09:49:05.176+05:30 : Got a message : 6
2023-07-26T09:49:06.133+05:30 : Got a message : 7
2023-07-26T09:49:08.941+05:30 : Got a message : 8
2023-07-26T09:49:16.848+05:30 : Got a message : 9
2023-07-26T09:49:16.858+05:30 : Got a message : 15
2023-07-26T09:49:16.858+05:30 : Got a message : 13
2023-07-26T09:49:17.652+05:30 : Got a message : 10
2023-07-26T09:49:20.166+05:30 : Got a message : 14
2023-07-26T09:49:28.915+05:30 : Got a message : 16
2023-07-26T09:49:28.981+05:30 : Got a message : 18
2023-07-26T09:49:29.129+05:30 : Got a message : 19
2023-07-26T09:49:29.139+05:30 : Got a message : 17
2023-07-26T09:49:31.457+05:30 : Got a message : 20

You can see that the first 5 messages are pulled in, and then the consumer waits around 11 sec before pulling in the next ones.

To really make sure that flow control is working, you can disable it and see that all 20 messages are delivered straight away (barring some delay in the delivery itself).

Removing this setting will disable the flow control.

factory.setFlowControlSettings(FlowControlSettings.newBuilder().setMaxOutstandingElementCount(maxOutstandingElement).build());

You can see that the ingestion doesn’t stop if there is no flow control in place.

2023-07-26T10:28:18.655+05:30  : Got a message : 4
2023-07-26T10:28:20.443+05:30 : Got a message : 1
2023-07-26T10:28:23.593+05:30 : Got a message : 2
2023-07-26T10:28:23.593+05:30 : Got a message : 3
2023-07-26T10:28:27.648+05:30 : Got a message : 6
2023-07-26T10:28:29.664+05:30 : Got a message : 7
2023-07-26T10:28:31.447+05:30 : Got a message : 8
2023-07-26T10:28:34.596+05:30 : Got a message : 5
2023-07-26T10:28:34.597+05:30 : Got a message : 9
2023-07-26T10:28:38.654+05:30 : Got a message : 10
2023-07-26T10:28:40.667+05:30 : Got a message : 11
2023-07-26T10:28:42.460+05:30 : Got a message : 12
2023-07-26T10:28:45.600+05:30 : Got a message : 13
2023-07-26T10:28:45.602+05:30 : Got a message : 14
2023-07-26T10:28:49.658+05:30 : Got a message : 15
2023-07-26T10:28:51.672+05:30 : Got a message : 16
2023-07-26T10:28:53.463+05:30 : Got a message : 17
2023-07-26T10:28:56.607+05:30 : Got a message : 18
2023-07-26T10:28:56.608+05:30 : Got a message : 19
2023-07-26T10:29:00.663+05:30 : Got a message : 20

So Flow Control can really help a consumer that is experiencing delays in processing, by stopping the message inflow and giving the consumer some breathing room until the backlog is cleared.

In the upcoming post, we’ll take a look at Push Backoff for push subscriptions and how that works.

If you find any bug in the code or have any question in general, feel free to drop a comment below.

Also, don’t forget to delete all of your cloud resources, otherwise you might get the shock of a huge bill 😅

Till then happy coding :)
