Real-Time Ingestion in Google Cloud Platform

One approach to handling streaming data in GCP.

Anwar Shaikh
The Startup
6 min read · Sep 26, 2020

Contents

  1. Introduction
  2. Setup
  3. A word about Input Data
  4. Publishing message to Pub/Sub topic
  5. Partition Table in BigQuery
  6. Processing Data using Cloud Function
  7. Testing the Pipeline

Fig. Real-Time-Ingestion-Pipeline

Introduction

In this post, we will look at how to handle streaming data in GCP. There are many ways to do this; we will look at one and implement it step by step. This post assumes you are familiar with the GCP components involved. If you are not, please go through the links below to get an idea of Pub/Sub, Cloud Functions, and Cloud Storage.

Pub/Sub: https://medium.com/@anwarshaikh078/google-cloud-pub-sub-ba0cda2a1fbc

Cloud Functions: https://medium.com/google-cloud/google-cloud-functions-python-overview-and-data-processing-example-b36ebde5f4fd

As the image above shows, our first step is to publish a message to the Pub/Sub topic. There are many ways to do this. Suppose you need to capture data from a mobile phone: a few lines of code using a Pub/Sub client library are enough to capture and send it. For our implementation, we will write a Python script that uses the pubsub_v1 module to publish messages to the topic, and we will run that script in Cloud Shell.

Once the topic receives a message, Pub/Sub delivers it to the topic's subscriber, which in our case is a Cloud Function. The Cloud Function processes the data and writes it to a BigQuery table. It also writes the message to Cloud Storage, both to persist incoming messages and to keep a record of any errors that occur. Once the data is in your table, you can analyze it, use it for reporting, or use it for any other task.

This is a very simple architecture for handling streaming data. Let's get started with the implementation.

Setup

There are a few things we need to do before we start creating this pipeline.

  1. Set up a free Google Cloud account.
  2. Create a project.
  3. Enable the Pub/Sub API for your project.
  4. Enable the Cloud Functions and Cloud Build APIs for your project.

A word about Input Data

We will use JSON as the data format, because the BigQuery streaming API accepts rows as JSON and can write them directly to a BigQuery table. We could also use XML or CSV data, but JSON is the format most commonly used in API requests. Publishing a message amounts to calling a pubsub_v1 library function with the data and any attributes; that function sends the request to the Pub/Sub service for you. As in many example datasets, we will use employee data.
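As a rough illustration of what one such message might look like, the sketch below builds a single employee record and serializes it to the UTF-8 JSON bytes that Pub/Sub carries as a message body. Only user_id appears later in this post; the other field names and values are assumptions made for the example.

```python
import json

# Hypothetical employee record: only user_id is taken from this post,
# the remaining fields are illustrative assumptions.
employee = {
    "user_id": 101,
    "name": "Jane Doe",
    "department": "Engineering",
}


def to_payload(record):
    """Serialize a record to the UTF-8 JSON bytes used as a Pub/Sub message body."""
    return json.dumps(record).encode("utf-8")


payload = to_payload(employee)
print(payload)
```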

Publishing message to Pub/Sub topic

First, we need to create a Pub/Sub topic. Once you have enabled the Pub/Sub API, creating a topic takes only three steps.

  1. Go to the Pub/Sub homepage.
  2. Click on Create Topic.
  3. Give your topic a name, leave Encryption set to Google-managed key, and click on Create Topic.

That's it. We can now publish messages to the Pub/Sub topic.

Pic. Publisher.py code
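Since the publisher code is only available as an image, here is a minimal sketch of what such a script could look like. It is not the original Publisher.py: the project ID, topic ID, and record fields other than user_id are placeholder assumptions, and it requires the google-cloud-pubsub package when publish_records is called.

```python
import json

# Assumed placeholders: replace with your own project and topic names.
PROJECT_ID = "my-project"
TOPIC_ID = "my-topic"

# Sample records; only the user_id values (101, 102, 103) come from this post.
EMPLOYEES = [
    {"user_id": 101, "name": "Alice"},
    {"user_id": 102, "name": "Bob"},
    {"user_id": 103, "name": "Carol"},
]


def encode_record(record):
    """Pub/Sub message data must be bytes, so serialize the record to UTF-8 JSON."""
    return json.dumps(record).encode("utf-8")


def publish_records(project_id, topic_id, records):
    """Publish each record to the topic and return the resulting message IDs."""
    # Imported here so encode_record can be used without the client library.
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)

    message_ids = []
    for record in records:
        future = publisher.publish(topic_path, data=encode_record(record))
        # result() blocks until Pub/Sub accepts the message and returns its ID.
        message_ids.append(future.result())
    return message_ids
```

Calling publish_records(PROJECT_ID, TOPIC_ID, EMPLOYEES) from an environment with credentials for the project, such as Cloud Shell, should return one message ID per record.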

Partition Table in BigQuery

A partitioned table in BigQuery is a special table that is divided into segments, called partitions, that make it easier to manage and query your data. By dividing a large table into smaller partitions, you can improve query performance, and you can control costs by reducing the number of bytes read by a query.

You can partition BigQuery tables by:

  • Ingestion Time: tables are partitioned based on the data's ingestion (load) time or arrival time.
  • Date/Timestamp/Datetime: tables are partitioned based on a TIMESTAMP, DATE, or DATETIME column.
  • Integer Range: tables are partitioned based on an integer column.

We will use ingestion-time partitioning for our case.
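One way to create such a table is with the google-cloud-bigquery client library, as in the sketch below. The project, dataset, and table names and the schema are assumptions for illustration (only user_id comes from this post), and the dataset is assumed to exist already.

```python
# Assumed placeholders and schema: only user_id is taken from this post.
PROJECT_ID = "my-project"
DATASET_ID = "my_dataset"
TABLE_ID = "employee"
SCHEMA = [("user_id", "INTEGER"), ("name", "STRING")]


def full_table_id(project_id, dataset_id, table_id):
    """Build the fully qualified 'project.dataset.table' identifier."""
    return f"{project_id}.{dataset_id}.{table_id}"


def create_ingestion_time_table(project_id, dataset_id, table_id, schema):
    """Create a table partitioned by day on ingestion time."""
    # Imported here so full_table_id can be used without the client library.
    from google.cloud import bigquery

    client = bigquery.Client(project=project_id)
    table = bigquery.Table(
        full_table_id(project_id, dataset_id, table_id),
        schema=[bigquery.SchemaField(name, type_) for name, type_ in schema],
    )
    # Leaving `field` unset partitions on ingestion time rather than on a column.
    table.time_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY
    )
    return client.create_table(table)
```

The same table can also be created from the BigQuery console by choosing to partition by ingestion time when defining the table.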

Processing Data using Cloud Function

Cloud Functions is a GCP service and an excellent example of function as a service (FaaS). It is a lightweight managed service that you can use to increase agility while using the cloud. Cloud Functions are serverless, so you can write code without having to worry about managing or scaling servers. They integrate easily with other Google Cloud Platform (GCP) services, and you pay for resources only when your code runs. They are invoked by triggers that you specify, and they stand by waiting for specific events or HTTP endpoint calls.

Steps to create a cloud function:

  1. Go to the Cloud Functions homepage.
  2. Click on Create Function.
  3. Function Name: choose any name you like.
  4. Trigger: select Pub/Sub as the trigger type and select the topic you created earlier. This creates a subscription from the topic to the function.
  5. In the Advanced tab, set Memory allocated to 128 MiB and Timeout to 60 seconds. The timeout is the maximum number of seconds a single invocation of your function may run; it can be set to at most 9 minutes.
  6. Next is the environment variables tab. Here we provide variables to the cloud function at runtime so that we don't need to hardcode values such as the project name, dataset, etc.
  7. Next is the Connections tab. Select Allow all traffic.
  8. Click on NEXT. You will now be asked to add your code. Here you can write the code and list the libraries it requires.
  9. Select a Python 3 runtime (Python 3.7 or later) from the Runtime dropdown.
  10. In the requirements.txt tab, add google-cloud-bigquery and google-cloud-storage.
  11. Paste in the cloud function code and make sure the environment variables it reads are set correctly.

Pic. Runtime Variables
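The function code itself is not reproduced in this text, so the following is only a minimal sketch of a Pub/Sub-triggered function that does what this post describes: decode the message, stream it into BigQuery, and persist it in Cloud Storage. The environment variable names BQ_TABLE and BUCKET, the processed/ and errors/ folder names, and the entry point name main are all assumptions.

```python
import base64
import json
import os

# Clients are created lazily and reused across invocations of a warm instance.
_bq_client = None
_gcs_client = None


def decode_payload(event):
    """Return the Pub/Sub message body, which arrives base64-encoded, as text."""
    return base64.b64decode(event["data"]).decode("utf-8")


def _clients():
    """Create the BigQuery and Cloud Storage clients on first use."""
    global _bq_client, _gcs_client
    if _bq_client is None:
        from google.cloud import bigquery, storage

        _bq_client = bigquery.Client()
        _gcs_client = storage.Client()
    return _bq_client, _gcs_client


def main(event, context):
    """Entry point for a Pub/Sub-triggered background function."""
    bq, gcs = _clients()
    # Hypothetical variable names; set them in the environment variables tab.
    table_id = os.environ["BQ_TABLE"]  # e.g. "project.dataset.table"
    bucket = gcs.bucket(os.environ["BUCKET"])

    raw = decode_payload(event)
    try:
        row = json.loads(raw)
        # insert_rows_json returns a list of row errors, empty on success.
        errors = bq.insert_rows_json(table_id, [row])
    except ValueError as exc:
        # Malformed JSON: keep the message for inspection instead of dropping it.
        errors = [str(exc)]

    # Persist every message; failed ones go to a separate folder.
    folder = "errors" if errors else "processed"
    bucket.blob(f"{folder}/{context.event_id}.json").upload_from_string(raw)
    if errors:
        print(f"Insert failed for message {context.event_id}: {errors}")
```

If you use a sketch like this, set the function's entry point to main and define BQ_TABLE and BUCKET in the environment variables tab.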

Once done, click on Create. It will take a few minutes to deploy the cloud function.

Testing the Pipeline

Now it's time to run what we created. Our publisher script is ready, the topic is created, the cloud function is deployed, and the BigQuery table is in place.

Let's run the script in Cloud Shell.

Go to the Cloud Shell editor and paste in the publisher script from earlier. The script requires the google-cloud-pubsub package, which provides the pubsub_v1 module; without it, the import will fail.

Run python publisher.py in Cloud Shell and you will get the result below.

Pic. The result after running publisher.

Messages have been published to the topic.

Now check the cloud function logs.

Pic. Cloud function logs

Excellent. Now check the BigQuery table and Cloud Storage.

Pic. BQ Table

As you can see, the BQ table has entries for 101, 102, and 103. These are the user_id values we published. We have also persisted the data in Google Cloud Storage.

Thus, we conclude that the pipeline ran successfully. You can now optimize it for production use.

My aim was to show you one way to handle streaming data in GCP. I hope I have achieved it; let me know in the comments section.

Thanks for reading this patiently.
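If you prefer to verify the table from code rather than from the console, a sketch along these lines could be used. It assumes user_id is an integer column and that the table ID you pass in is your own; neither detail is confirmed by this post.

```python
def build_check_query(table_id, user_ids):
    """Build a query that looks up the given user_id values in the table."""
    # int() guards against anything other than integer IDs ending up in the SQL.
    id_list = ", ".join(str(int(u)) for u in user_ids)
    return (
        f"SELECT user_id FROM `{table_id}` "
        f"WHERE user_id IN ({id_list}) ORDER BY user_id"
    )


def fetch_user_ids(table_id, user_ids):
    """Run the check query and return the user_id values that were found."""
    # Imported here so build_check_query can be used without the client library.
    from google.cloud import bigquery

    client = bigquery.Client()
    rows = client.query(build_check_query(table_id, user_ids)).result()
    return [row["user_id"] for row in rows]
```

For the messages published in this post, fetch_user_ids called with your table ID and [101, 102, 103] should return all three IDs once the function has processed them.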
