Enterprise integration using Pub/Sub, Cloud Functions and Elasticsearch

Hi! In this article I'll show an approach to enterprise-level software integration using cloud services. The goal is a fully integrated system without any hand-installed piece of software: we are going to build everything with cloud computing, with no software installation besides our client code. This is worthwhile, because this trend lets you save a lot of the money normally spent on infrastructure maintenance, especially in enterprise-level applications.

Take a look at the diagram below. The main idea is to use a messaging system to categorize published messages into classes, without any knowledge of which systems are subscribers; then we'll create a subscriber to consume our topic and integrate it with the search index system. Our solution is based on Google Cloud Platform. Elasticsearch is the search index; it is managed outside Google Cloud, but it is hosted on Google Cloud in the same region as the other components. This article is a proof of concept, so there is no fixed specification for the JSON documents used in the message traffic; you might swap in Products, Customers, Dogs, Asteroids, and so on.

Solution big picture

Technologies Used

1. Elasticsearch

Elasticsearch provides a full indexing environment: a very stable index server that has proven its efficiency over the years, with a flexible schema and fast retrieval of records. By definition: “Elasticsearch is a search engine based on the Lucene library. It provides a distributed, multitenant-capable full-text search engine with an HTTP web interface and schema-free JSON documents”. My decision was to try Elastic Cloud (https://cloud.elastic.co); they provide a 14-day trial, which is enough for our tests, but feel free to use any Elasticsearch installation as long as it is reachable over the Internet.

Search Index characteristics:

  • Support for complex search expressions
  • Full text search
  • Stemming (reducing inflected words to their stem)
  • Ranking and grouping of search results
  • Geospatial search
  • Distributed search for high scalability

As I mentioned, this is a proof of concept. Elasticsearch is used here more as an intermediary or supplementary store than as a primary database; in this example the records are orders. No matter what you are going to store, it is important to know that a default Elasticsearch installation has low durability and weak security: there is no built-in authentication or access control. Elasticsearch is usually a good option for data that needs very fast retrieval, for instance a product catalog.
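To make the characteristics above concrete, here is a minimal sketch of a full-text query using the same legacy elasticsearch Node.js client we'll use later in this article. The products index matches our example, but the description field is hypothetical, and stemming only applies if that field uses an English analyzer:

var elasticsearch = require('elasticsearch');

var client = new elasticsearch.Client({
  hosts: [
    // user:password@host from your Elastic Cloud deployment
    'https://elastic:PASSWORD@your_host.us-central1.gcp.cloud.es.io:9243'
  ]
});

// Full-text match query: with an English analyzer on "description",
// a search for "running" also matches "run" and "runs" (stemming)
client.search({
  index: 'products',
  body: {
    query: { match: { description: 'running shoes' } }
  }
}).then(function (resp) {
  resp.hits.hits.forEach(function (hit) {
    console.log(hit._id, hit._score, hit._source);
  });
}).catch(console.error);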

2. Google Cloud Pub/Sub


Google Cloud Pub/Sub is an enterprise-level message-oriented middleware that provides topics and subscriptions. As a cloud service, you don't need to mess with messaging system configuration, installation, and so on. We know that messaging systems must be reliable and performant, so it's a great idea to hand that responsibility to a dependable cloud service like Google Cloud. By its definition, it “provides many-to-many, asynchronous messaging that decouples senders and receivers, it allows for secure and highly available communication among independently written applications. Cloud Pub/Sub delivers low-latency, durable messaging that helps developers quickly integrate systems hosted on the Google Cloud Platform and externally.”

3. Google Cloud Functions

Since these services are fully integrated, why not have a Google Cloud Function triggered by Pub/Sub? Cloud Functions has native integration with Pub/Sub, and we'll configure a function to run our integration. Functions can be written in Node.js or Python: a simple, scalable piece of software.

Let’s Deploy Elasticsearch

Below are the steps to create a new deployment in https://cloud.elastic.co.

Deployment creation on Elastic Cloud

After it is deployed, select the instance and go to the Security menu to see the credentials; these credentials are needed to configure the Elasticsearch client in the next steps. Make sure you have selected Google Cloud as the provider, because this is a Google Cloud based article and we're going to use GCP products, and of course take care to deploy everything in the same region.

Test Elasticsearch JS client

OK, before continuing to the Cloud Function creation, why not test our Node/Elasticsearch integration? I usually write JS code in Visual Studio Code, but feel free to pick any other code editor.

There is a library that provides a very good client for Elasticsearch, so let's set up a new proof-of-concept project. Make sure you have Node.js installed on your system.

Clone my PoC for the Elasticsearch Node.js client library:

git clone https://github.com/humbss/nodejs-elasticsearch-example.git

Make sure you have configured the connection.js file; it contains the Elasticsearch credentials.
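The exact contents of connection.js depend on the repo, but a typical shape for the legacy client looks like this (host and password are placeholders for the values from the Elastic Cloud security page):

// connection.js, a typical shape; adjust to match the repo's actual export
var elasticsearch = require('elasticsearch');

// Export a single shared client configured with the deployment credentials
module.exports = new elasticsearch.Client({
  hosts: [
    'https://elastic:PASSWORD@your_host.us-central1.gcp.cloud.es.io:9243'
  ]
});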

Creating a new index:

nodejs create-index.js products

To add a new document to the index, execute:

nodejs main.js products 1

Now let's browse the index using the Elasticsearch API Console: log in to cloud.elastic.co, go to your deployment, and open the menu item Elasticsearch / API Console.
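From there you can run plain REST requests against the cluster; for example, to browse the documents we just indexed:

GET /products/_search
{
  "query": { "match_all": {} }
}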

It worked! As you can see, our client is ready to put documents into Elasticsearch. Now we are able to reuse this code snippet to implement the Google Cloud Function that will integrate Pub/Sub with Elasticsearch.

Configuring the messaging system (Pub/Sub)

Let's create a Pub/Sub topic. I assume you have a Google Cloud account for tests, since Google provides some initial credit for this. Go to the side menu and scroll down to Big Data / Pub/Sub, as in the image below.

It will ask you to choose a project and to enable the service; go ahead, and then create a new topic.
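Equivalently, if you prefer the command line, the gcloud CLI can create the topic:

gcloud pubsub topics create products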

Caution: publish–subscribe is a software design pattern. It is very important to understand what it does, and why to use it instead of a simple client-server network socket, before configuring the broker or deciding to use this kind of software.

Setup a Pub/Sub Service Account and Publish!

We are about to publish our first message through a simple Node.js program. This step is optional: remember that you could add messages directly through the Pub/Sub console, in the Topic menu. To use Pub/Sub from outside the Cloud Console we need to set up a Service Account. The procedure is very simple: go to the APIs & Services menu and select the Credentials page.

Click on “Create credentials” and select “Service account key”

In the service account configuration page, select the Pub/Sub Publisher role, because we're going to use this account key to publish messages to our Pub/Sub topic.

Click on Create and download the key file (a JSON file). It is very important to create an environment variable that points to the key file, for example:

export GOOGLE_APPLICATION_CREDENTIALS=~/Downloads/file.json

Another option is to create a .env file in your Node.js project and add this environment variable to it.
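For instance, with the dotenv package (an assumption on my part; the example repo may load its configuration differently), the .env file would contain:

GOOGLE_APPLICATION_CREDENTIALS=./keys/service-account.json

And at the top of your entry point:

// Load .env into process.env before any Google client is created
require('dotenv').config();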

The client example is here: https://github.com/humbss/nodejs-pubsub-example

Let's try it. Here we go:

As you can see, we are able to publish to the topic: some messages have been added to our “products” topic.
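For reference, publishing with the official @google-cloud/pubsub library comes down to a few lines. This sketch uses the current publishMessage API, so the example repo may use a slightly older variant:

const { PubSub } = require('@google-cloud/pubsub');

// Credentials are picked up from GOOGLE_APPLICATION_CREDENTIALS
const pubsub = new PubSub();

async function publishProduct(product) {
  // Pub/Sub payloads are raw bytes, so serialize the object to JSON first
  const data = Buffer.from(JSON.stringify(product));
  const messageId = await pubsub.topic('products').publishMessage({ data });
  console.log('Message ' + messageId + ' published');
}

publishProduct({ id: 1, name: 'Example product' }).catch(console.error);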

Creating a Google Cloud Function to consume messages

Since Google Cloud Functions is fully integrated with Pub/Sub, let's create a function that simply prints each arriving message to the console. Go to Google Cloud Functions and click “Create Function”.

In the function setup screen you configure its name and allocated memory; the most important part is the trigger. In our case we NEED to select Cloud Pub/Sub, and the topic that we created previously.

The default function body will print the messages that arrive on our products topic:

/**
 * Triggered from a message on a Cloud Pub/Sub topic.
 *
 * @param {!Object} event Event payload and metadata.
 * @param {!Function} callback Callback function to signal completion.
 */
exports.helloPubSub = (event, callback) => {
  const pubsubMessage = event.data;
  // Pub/Sub message payloads arrive base64-encoded
  console.log(Buffer.from(pubsubMessage.data, 'base64').toString());
  callback();
};

If you publish more messages using the Node.js client and go to the function log, you'll see the function being triggered.

Great! We have the Pub/Sub topic, the Node.js publisher, and the Google Cloud Function consumer.

Now let's make the function write to Elasticsearch. First, we need to configure the Cloud Function's package.json:

{
  "name": "sample-pubsub",
  "version": "0.0.1",
  "dependencies": {
    "request": "^2.81.0",
    "elasticsearch": "15.2.0",
    "microtime": "3.0.0"
  }
}

Below is the function body. Remember that it is a proof of concept, a minimal function:

var elasticsearch = require('elasticsearch');
var microtime = require('microtime');

// The client is created once and reused across invocations of the same instance
var client = new elasticsearch.Client({
  hosts: [
    'https://elastic:PASSWORD@your_host.us-central1.gcp.cloud.es.io:9243'
  ]
});

exports.helloPubSub = (event, callback) => {
  const pubsubMessage = event.data;

  var persistOrder = (orderObj) => {
    client.index({
      index: 'products',
      id: microtime.now(), // microsecond timestamp as a simple unique id
      type: 'product_detail',
      body: orderObj
    }, function (err, resp, status) {
      if (err || status < 200 || status > 299) {
        console.error(err || status);
        // Signaling an error lets Pub/Sub redeliver the message if retries are enabled
        callback(new Error('error processing obj id: ' + orderObj.id));
      } else {
        callback();
      }
    });
  };

  // Pub/Sub payloads arrive base64-encoded
  var messageObj = JSON.parse(Buffer.from(pubsubMessage.data, 'base64').toString());
  persistOrder(messageObj);
};

After a message is published to Pub/Sub, the function is triggered and it indexes the document into Elasticsearch. You can check the results in the Elasticsearch API Console, as shown below:
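For a quick sanity check you can also count the documents in the index directly from the API Console:

GET /products/_count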

Stress Loading

Before starting the stress load, let's look at the Elastic Cloud cluster configuration; using the trial, I currently have the following cluster:

The Cloud Function is configured with the minimal size, 128 MB.

Important note:

“Cloud Functions can start multiple function instances to scale your function up to meet the current load. These instances run in parallel, which results in having more than one parallel function execution.

However, each function instance handles only one concurrent request at a time. This means while your code is processing one request, there is no possibility of a second request being routed to the same function instance, and the original request can use the full amount of resources (CPU and memory) that you requested.

Because concurrent requests are processed by different function instances, they do not share variables or local memory. This is discussed in detail later in this document.” source: https://cloud.google.com/functions/docs/concepts/exec

I didn't find any property to enable, disable, or even configure the concurrency, so we assume the function will run in parallel with whatever capacity is available.
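I won't include the full load script here, but a hypothetical generator that reuses the publisher sketch from earlier could look like this:

const { PubSub } = require('@google-cloud/pubsub');

const pubsub = new PubSub();
const topic = pubsub.topic('products');

async function stressLoad(total) {
  const publishes = [];
  for (let i = 0; i < total; i++) {
    const data = Buffer.from(JSON.stringify({ id: i, name: 'product-' + i }));
    publishes.push(topic.publishMessage({ data }));
  }
  // Wait until Pub/Sub has accepted every message
  await Promise.all(publishes);
  console.log(total + ' messages published');
}

stressLoad(1000).catch(console.error);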

Test runs (the results of each run are shown in the API Console screenshots):

  • Index: products, No. of entries: 1k
  • Index: products, No. of entries: 2k
  • Index: products, No. of entries: 10k

In the 10k run, 3 items are gone. I'm not sure whether it is an Elasticsearch problem, since our function is configured to retry on failure, and according to the logs every failed message was reprocessed (the function treats any status below 200 or above 299 as a failure).

That's it! I hope this article/PoC helps you with the serverless paradigm, with this approach to fast integration and indexing.