PubSub message filter: small feature for big improvements

guillaume blaquiere
Google Cloud - Community
4 min read · Jun 9, 2020

On Google Cloud, PubSub is the event message queuing platform. Serverless, global, high throughput (up to 1 million messages per second), affordable, with customizable retries, PubSub is one of the pillars of all applications on Google Cloud.

You can use PubSub as an event bus in your architecture and asynchronously trigger other serverless services, such as Cloud Run, Cloud Functions or App Engine. You can also consume these events in streaming with Dataflow.

There is a YouTube video series on PubSub that explains the core behavior.

The missing feature

Features have been regularly added to PubSub (push subscription identity, dead letter topics, snapshots) in recent months, but one was still missing: the capability to filter the messages.

Indeed, when you trigger a compute component, such as Cloud Run, Cloud Functions or App Engine, you want to call it to do the right job, and not for useless messages.

Message filtering capability

As simple as it is, this feature lets you filter PubSub events on the message attributes. You can:

  • Check the presence of an attribute, whatever its value
  • Check the exact value of an attribute
  • Check the prefix of an attribute value
  • Combine these 3 checks with boolean operators (AND, OR, NOT)

You can use the command line or the console to set this filter.
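For illustration, here are a few filter expressions matching these checks (the attribute names are just examples, reused from the Cloud Storage case later in this article):

# Presence of an attribute, whatever its value
attributes:eventType

# Exact value of an attribute
attributes.eventType = "OBJECT_FINALIZE"

# Prefix of an attribute value
hasPrefix(attributes.objectId, "path/to/")

# Combination with AND, OR, NOT
attributes.eventType = "OBJECT_FINALIZE" AND NOT hasPrefix(attributes.objectId, "tmp/")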

Limitations

Today (June 2020), the feature has just been released in beta and there are some limitations.

You can’t filter on the payload content; only the attributes are available.
In addition, you can’t define filters with expressions more complex than hasPrefix, like regular expressions for example. Such a feature would help to filter on the blob name suffix, for example, to check the file extension.

Moreover, only subscriptions defined in PubSub can accept filters. If you plug a Cloud Function directly on a PubSub topic ( --trigger-topic=<topic> in the command line parameters), you can’t define a filter, yet.
The workaround is to deploy the Cloud Function with an HTTP trigger ( --trigger-http in the command line parameters) and to define a PubSub push subscription on it, as sketched below.
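A rough sketch of this workaround could look like this (the function name, runtime, region and filter here are placeholders, not values from this article):

# Deploy the Cloud Function with an HTTP trigger instead of a topic trigger
gcloud functions deploy my-handler \
  --runtime=python37 \
  --trigger-http \
  --region=us-central1

# Then create a filtered push subscription that targets the function URL
gcloud beta pubsub subscriptions create mySubscription \
  --topic=myTopic \
  --push-endpoint=https://us-central1-<myProject>.cloudfunctions.net/my-handler \
  --message-filter='attributes:eventType'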

Finally, you can’t update the filter. You have to delete the subscription and create it again with the updated filter.

To explain why this feature is great, we can take one of the most common use cases.

Cloud Storage events management and blob processing

Cloud Storage use case

In this use case example, you want to perform a load job into BigQuery for each blob created in the gs://mybucket/path/to/directory/ directory.
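The load job itself is out of scope here, but as a minimal sketch, assuming newline-delimited JSON files and a hypothetical mydataset.mytable destination table that already exists, it could be a simple bq load:

# Load one created blob into an existing BigQuery table (schema omitted)
bq load \
  --source_format=NEWLINE_DELIMITED_JSON \
  mydataset.mytable \
  gs://mybucket/path/to/directory/myfile.json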

First, you can publish the Cloud Storage events to PubSub: every time a blob is created, deleted or updated (metadata), an event is published to PubSub.
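A minimal sketch of that setup, reusing the bucket from this example and a hypothetical myTopic topic, could be:

# Publish all Cloud Storage events of the bucket to the PubSub topic, in JSON format
gsutil notification create -t myTopic -f json gs://mybucket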

Without message filter capability

Without the filter capability, the problem is that all the events on the Cloud Storage bucket are published to PubSub, not only the events of the desired directory. Therefore, until now, you had to do this:

if (it's a blob creation) AND (it's the correct directory) {
    Perform the job
}
exit

The issues with this process are:

  • You have to implement the check in your code and maintain this part.
  • You have to repeat these few lines of code in all your event handlers that require filtering/selection.
  • The processing part is responsible for the check and the selection of the correct blob to process. It shouldn’t be its concern.
  • Your compute services (Cloud Run, Cloud Functions, App Engine) are called to perform a very short processing for nothing. You are charged for the number of calls and for the processing time, rounded up (to the nearest 100 ms for Cloud Functions and Cloud Run, with a 15-minute minimum for App Engine).

Note: Cloud Run, Cloud Functions and App Engine offer a generous free tier and, for low traffic, there isn’t any additional cost.

Improvement with message filter

Now, the Cloud Storage use case is simpler. You can remove the check code and define a simple filter before calling your https://myEndpoint/ endpoint.
You can set it in the console or with the command line:

gcloud beta pubsub subscriptions create <mySubscriptionName> \
  --topic=<myTopic> \
  --push-endpoint=https://myEndpoint/ \
  --message-filter='attributes.eventType = "OBJECT_FINALIZE" AND hasPrefix(attributes.objectId, "path/to/directory/")'

That’s all. Now, your https://myEndpoint/ is called only when an event matches the filter, here the creation of a file in the path/to/directory/ directory.
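If you want to double-check the result, describing the subscription should show the filter attached to it:

gcloud beta pubsub subscriptions describe <mySubscriptionName>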

Filter more to process smarter

For a long time, I begged Google for this feature. Now I have it, and it simplifies a lot of workloads and decreases the processing time!

A very good improvement for a great product. I strongly recommend that you give it a try.
