Publish and receive Google Pub/Sub messages in Node.js

Muhammad Valdie Arsanur
8 min read · Jun 19, 2022



This post shows how to implement Google Pub/Sub in Node.js. I cover the topic through a problem and its solution, so that we understand not only how to integrate Google Pub/Sub but also the use cases this product serves.

Problem

Let’s say we have a system that provides user registration. It is basically an endpoint that accepts JSON data containing the user’s registration details. For simplicity, let’s say it consists of a name, an email, and a password (and that we are not required to hash the password).

The system is required to send the user a notification (a registration confirmation) once they finish registering. Let’s say it sends an email, so it depends on a mail server to deliver the notification.

The problem is that when the volume of traffic increases and the email server’s reliability is unknown, a synchronous registration process will keep the endpoint busy. So we will find a way to decouple the registration logic itself from the mail notification service.

Pub/Sub pattern

Imagine that we have two systems that need to communicate, system A and system B. In a simple architectural pattern, system A could simply call system B directly, but what happens when:

  • System B is unavailable, or it changes its communication contract? System A will no longer be able to communicate with system B unless we refactor system A to match system B’s change.
  • System B is overloaded? System A needs to wait until system B becomes available, and that waiting time can make system A itself overloaded.

We can call these use cases synchronous communication. There is a way to make these systems independent of each other: asynchronous communication.

Pub/Sub is one asynchronous communication pattern. It governs how messages are exchanged from one point to another via a message broker. It does not require the sender and receiver to know each other, only to act as a publisher and a subscriber.

To understand it more deeply, we can refer to the diagram shown below. Let’s say that system A, acting as a publisher, sends a message categorized under a topic, say “topic X”. The message broker then finds every subscriber that is supposed to receive the message. Since system B subscribes to “topic X”, system B receives the message and performs an action based on it.

Google Pub/Sub

Google Cloud Pub/Sub is a messaging service that enables us to implement the Pub/Sub pattern with latencies on the order of 100 milliseconds. The overall Google Pub/Sub process is illustrated in the diagram below.

Google Pub/Sub flow (source: https://cloud.google.com/pubsub/docs/overview)
  1. A publisher creates and sends a message to Google Pub/Sub through a chosen topic. The message itself contains data and, optionally, attributes.
  2. Each message sent to Pub/Sub is stored in message storage. Storage is one of Pub/Sub’s pricing components.
  3. A subscription distributes messages from Pub/Sub storage.
  4. Messages are then consumed by a subscriber through its subscription.
  5. Once a message is received, the subscriber needs to send Pub/Sub a signal that the message was successfully received. This is called an acknowledgement (or “ack”). Acked messages are removed from the subscription’s message queue.

Solution architecture

Now that we understand Google Pub/Sub at a high level, we can focus on the solution. The diagram below describes the solution workflow that implements Google Pub/Sub. For simplicity, I will not provide a detailed solution for the faded (dotted-line) components; instead we will focus on how to publish and subscribe to data through Google Pub/Sub.

Integrate Pub/Sub

I provide complete source code covering the Registration System (see no. 1 in the diagram) and the Notification System (see no. 3 in the diagram) in the repository below.

1.a. Registration API

First, clone the repository.

git clone https://github.com/valdiearsanur/google-pub-sub-notification.git && cd google-pub-sub-notification

After changing the working directory to the repository, create a low-spec Compute Engine instance.

gcloud compute instances create node-instance \
--machine-type=e2-micro \
--preemptible \
--boot-disk-size=10GB \
--boot-disk-type=pd-standard \
--tags=http-server \
--zone us-central1-f \
--scopes userinfo-email,cloud-platform \
--metadata-from-file startup-script=./ce-script-api.sh

The project will then be live on port 3000, so we need to configure the firewall to allow public traffic to that port.

gcloud compute firewall-rules create default-allow-http-3000 \
--allow tcp:3000 \
--source-ranges 0.0.0.0/0 \
--target-tags http-server \
--description "Allow port 3000 access to http-server"

Creating the Compute Engine instance will output an external IP address. We will need this address to hit the application endpoint.

....
NAME: node-instance
ZONE: us-central1-f
MACHINE_TYPE: e2-micro
PREEMPTIBLE: true
INTERNAL_IP: XX.XXX.XXX.X
EXTERNAL_IP: XX.XXX.XXX.X <---- copy this

We can hit the endpoint using curl. In this case, we will test the API by hitting the /registration endpoint.

curl -i -X POST http://<VM-EXTERNAL-IP>:3000/registration/ \
-H 'Content-Type: application/json' \
-d '{"name":"john1","email":"john1@example.com", "password":"123"}'

Last, check the service account that was assigned during VM creation. We can use the command below, specifying the correct instance name and zone. It will output an address in the format XXXXXX-compute@developer.gserviceaccount.com.

gcloud compute instances describe node-instance \
--zone us-central1-f \
--format="table(serviceAccounts.email)"

Remember the VM external IP address and the service account created with the Compute Engine instance.

1.b. Code Review

The actual source code can be seen at https://github.com/valdiearsanur/google-pub-sub-notification/blob/master/index.js.

Rather than explaining the whole source code, this section focuses on how to publish a message via Pub/Sub. The first step is to import the Pub/Sub library.

const {PubSub} = require('@google-cloud/pubsub')

Then we initialize the PubSub client and prepare a topic ID that we store in an environment variable.

const pubSubClient = new PubSub()
const pubSubTopicId = process.env.TOPIC_ID

Messages are published through publishMessage, as in the code below. The function consumes an object containing data and attributes. Note that data must be a Buffer and attributes must be a plain object. The data is the actual message to be sent, while the attributes are properties that can be used for filtering on the consumer side.

const message = Buffer.from(JSON.stringify(user))
const attributes = {'kind': 'registration'}
const messageId = await pubSubClient
  .topic(pubSubTopicId)
  .publishMessage({
    'data': message,
    'attributes': attributes
  })
console.log(`Message ${messageId} published.`)

The signal the app gives when the message is successfully sent is the console log. The output should look like this:

Message XYZ published.
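publishMessage returns a promise that rejects on failure, so in practice it helps to wrap the call in a small retry helper. The sketch below is a hypothetical helper, not part of the repository; it accepts any async publish function, so it would work the same way with `pubSubClient.topic(pubSubTopicId).publishMessage(...)`:

```javascript
// Retry an async publish call a few times with a fixed delay.
// `publishFn` is any function returning a promise of a message ID.
async function publishWithRetry(publishFn, retries = 3, delayMs = 500) {
  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      return await publishFn()
    } catch (err) {
      if (attempt === retries) throw err
      console.log(`Publish attempt ${attempt} failed, retrying: ${err.message}`)
      await new Promise((resolve) => setTimeout(resolve, delayMs))
    }
  }
}
```

With the publish snippet above, it would be called as `await publishWithRetry(() => pubSubClient.topic(pubSubTopicId).publishMessage({ data: message, attributes }))`.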

2. Setup Pub/Sub

We begin by creating a Pub/Sub topic. Let’s name it pubsub. The command below will produce a topic with the full ID projects/<project-name>/topics/pubsub.

gcloud pubsub topics create pubsub

Then we create a subscription that points to the topic we just created. Let’s name it pubsub-sub. The command below will produce a subscription with the full ID projects/<project-name>/subscriptions/pubsub-sub.

gcloud pubsub subscriptions create pubsub-sub \
--topic=pubsub \
--ack-deadline 10

3.a. Notification Service

For the notification system, we will also create a VM using almost the same command as for the API server; the differences are the instance name, the absence of the http-server tag (no public port is needed), and the startup script:

gcloud compute instances create node-notification \
--machine-type=e2-micro \
--preemptible \
--boot-disk-size=10GB \
--boot-disk-type=pd-standard \
--zone us-central1-f \
--scopes userinfo-email,cloud-platform \
--metadata-from-file startup-script=./ce-script-notification.sh

3.b. Code Review

The actual source code can be seen at https://github.com/valdiearsanur/google-pub-sub-notification/blob/master/notification.js.

It begins with the Pub/Sub initialization, which uses the subscription ID we defined earlier.

const pubSubClient = new PubSub()
const subscriptionId = process.env.SUBSCRIPTION_ID
const subscription = pubSubClient.subscription(subscriptionId)

Then we simply use an event listener to handle incoming messages; in our case we listen for message. According to the documentation, there are three available events: message (upon receipt of a message), error (upon receipt of an error), and close (upon the closing of the subscriber).

subscription.on('message', messageHandler)

The event points to the function where we handle the incoming message. Below is a snippet showing how to use the information the function provides. Don’t forget to acknowledge the message: if it is not acknowledged, the same message will be sent again (remember the ack-deadline set when creating the Pub/Sub subscription).

let messageCount = 0
const messageHandler = message => {
  console.log(`New message ${message.id}:`)
  console.log(`\tData: ${message.data}`)
  console.log(`\tAttributes: ${JSON.stringify(message.attributes)}`)
  messageCount += 1
  message.ack()
}
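Since the publisher attaches a `kind` attribute, the handler can branch on it before acting. The sketch below is a hypothetical extension of the handler above, not code from the repository; the email-sending step is only a placeholder comment:

```javascript
const messageHandler = (message) => {
  const attributes = message.attributes || {}

  // Only react to registration events; other kinds are ignored.
  if (attributes.kind === 'registration') {
    // `data` is a Buffer, so decode it back into a user object.
    const user = JSON.parse(message.data.toString())
    console.log(`Sending registration email to ${user.email}`)
    // sendRegistrationEmail(user)  // placeholder for real mail logic
  }

  // Always ack, otherwise Pub/Sub redelivers after the ack deadline.
  message.ack()
}
```

Acking even the messages we skip keeps the subscription queue from filling with events this service will never handle.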

When testing it via the Test Drive section below, the output should be printed like this:

New message XYZ:
Data: {"name":"john1","email":"john1@example.com","password":"123"}
Attributes: {"kind":"registration"}

Test drive

We will hit the registration endpoint (provided in step 1, Registration API) and check whether the Notification Service (configured in step 3) receives the message and, hopefully, can perform the desired business logic from the architecture diagram. We can use our local machine to hit the endpoint with the command below:

curl -i -X POST http://<VM-EXTERNAL-IP>:3000/registration/ \
-H 'Content-Type: application/json' \
-d '{"name":"john1","email":"john1@example.com", "password":"123"}'

We can see the result of the executed curl command in the picture below.

As the endpoint is hit and publishes a message to the topic, the notification service pulls the message through the topic’s subscription. We can check this on the node-notification instance.

Since the source code in the GitHub repository has a timeout, we can run the script again by SSHing into the server and executing the commands below. Since we already hit the endpoint earlier, we can hit it again from our local machine to produce output in the SSH console.

cd /opt/app/new-repo
node notification.js

The output in the SSH console should look like the picture below.

In the end, since we can successfully pull a message through the app, we can customize the notification to suit our business requirements: sending an email, publishing a push notification, performing data processing, etc.

Clean up (optional)

To avoid incurring charges to your GCP account, use the commands below to delete the resources used in this guide.

gcloud compute instances delete node-instance --zone us-central1-f
gcloud compute instances delete node-notification --zone us-central1-f
gcloud pubsub subscriptions delete pubsub-sub
gcloud pubsub topics delete pubsub
