GCP Cloud Engineering Project — Part 1 — Google Cloud Storage, BigQuery, Functions, Scheduler, Pub/Sub

Dogukan Ulu
6 min readOct 27, 2023

--

In this article, we will go through the code blocks and I will explain the whole procedure. For all the processes regarding:

  • Creating Pub/Sub topic to be used by both Google Functions and Google Scheduler.
  • Creating a new Google function to run both scripts in the Google Cloud environment.
  • Creating a new job for Google Scheduler to run the script regularly

You may see the below article:

Tech Stack

  • Google Cloud BigQuery
  • Google Cloud Storage
  • Google Cloud Scheduler
  • Google Cloud Functions
  • Google Cloud Pub/Sub
  • Python
  • SQL
  • API

Overview

The project consists of two main parts. For the second part, I will not declare the public bucket URL but will explain all details (what it explains, etc.) and the structure of the data itself along with all the necessary steps to handle. Your data and its structure in the bucket will be different, but this project stands for explaining a real-life issue and finding solutions.

First part: The free API (currency API) will be used to pull necessary exchange rates into the BigQuery table. This application will run with Google Cloud Functions and will be running regularly with Google Cloud Scheduler. Daily retrieved data will be kept historically in the target table as well. The target BigQuery table (currency_rate) will have the date, currency code, and rates columns. The main target of this table is converting the other currencies to USD.

Second part: The data structure of the parquet files located in the public GCS bucket is as follows:

The second part of the pipeline also follows the below guidelines and information.

  • One parquet file comes daily.
  • The app checks all the files in the bucket every day.
  • If there is a new file that has not been imported into BigQuery before, the application imports that file into BigQuery.
  • One record in the breakdown of dt, network, currency, and platform is to be added to the target table. If there is more than one record with that breakdown in the file, the record with the highest cost is included.
  • There is at most one record in the target table in terms of dt, network, currency, and platform. If the cost of the record in the new incoming file is higher than the cost in the target table, the record in the target table is updated with the one in the file. If the new cost is low, no updates are made.
  • A field named cost_usd is added for each row when transferring records to the target table. The data to be transferred to this field is calculated using the latest exchange rate information from the currency_rate table and the cost field in the row. The most recent currency rate is taken into consideration.
  • If there is a previously processed file and this file is no longer in the bucket, if there are records processed with this file (inserted and updated), these records are deleted from the target table.
  • Files previously processed with the same name are not processed again in new runs. Ad Network sometimes refreshes the file with the same name but does not change its content, so this file is ignored and not processed again. However if the file name changes, the records processed from the previous file are deleted, and the records in the new file are processed.
  • This application runs with Google Cloud Functions and runs regularly with Google Cloud Scheduler.

Retrieve Data from the API and Upload to BigQuery

First things first, we have to define the requirements.txt for the requirements part in Google Functions.

Then, we have to define some environmental variables on the Functions creation page. We have to import necessary libraries and define global variables (including BigQuery client).

import os
import base64
import requests
import pandas as pd
from google.cloud import bigquery

# Obtain the environmental variable defined in Cloud function
currency_api_token = os.getenv('currency_api_token')
project_id = os.getenv('project_id')
bucket_name = os.getenv('bucket_name')
dataset_name = os.getenv('dataset_name')

# Global variables
client = bigquery.Client()

table_name = 'currency_rate'
table_id = f"{project_id}.{dataset_name}.{table_name}"

After the first step, we have to create the BigQuery table if it doesn’t already exist.

def create_currency_rate_table():
"""
Creates the currency_rate table if not exists
"""
sql = f"""
CREATE TABLE IF NOT EXISTS `{project_id}.{dataset_name}.{table_name}` (
currency_code STRING,
date DATETIME,
rates FLOAT)"""

job_config = bigquery.QueryJobConfig()

table_ref = client.dataset(f"{dataset_name}").table(f"{table_name}")
job_config.destination = table_ref

query_job = client.query(sql, job_config=job_config)

query_job.result()

Since the retrieved JSON will have some redundant fields, we have to remove them.

def create_dict(response) -> dict:
"""
Obtains the response from currency API and creates the dictionary accordingly
"""
response_dict = eval((response.text).replace('true', 'True', 1)) # Compatibility with Python
del response_dict['valid'], response_dict['base'] # Unnecessary fields

return response_dict

We have to transform this dictionary to a pandas data frame to be able to upload it to the BQ table.

def create_dataframe_from_dict(response_dict:dict):
"""
Creates a pandas dataframe from the response dictionary
"""
df = pd.DataFrame.from_dict(response_dict)
df.reset_index(inplace=True)
df.rename(columns={"index": "currency_code", "updated": "date"}, inplace=True)
df['date'] = df['date'].astype('datetime64[s]') # Necessary for compatibility with GCP

return df

This data frame doesn’t need any modification. Therefore, we can directly upload the whole data frame into the table we already created. We will create a suitable schema for the table and will use the BigQuery client.

def load_dataframe_to_table(df, client, table_id):
"""
Defines a job configuration and loads the pandas dataframe to BigQuery table currency_rates
"""
job_config = bigquery.LoadJobConfig(
schema=[
bigquery.SchemaField(f"{df.columns[0]}", bigquery.enums.SqlTypeNames.STRING),
bigquery.SchemaField(f"{df.columns[1]}", bigquery.enums.SqlTypeNames.DATETIME),
bigquery.SchemaField(f"{df.columns[2]}", bigquery.enums.SqlTypeNames.FLOAT),
],
)

try:
job = client.load_table_from_dataframe(
df, table_id, job_config=job_config)
job.result()

print("Dataframe was loaded successfully")
except Exception as e:
print(Exception, e)

After all, we can create the main method we will be using as the endpoint of Google. We can name our function as a generic name send_currency_to_bigquery and set the endpoint the same.

def send_currency_to_bigquery(event, context):
"""Triggered from a message on a Cloud Pub/Sub topic.
Args:
event (dict): Event payload.
context (google.cloud.functions.Context): Metadata for the event.
"""
pubsub_message = base64.b64decode(event['data']).decode('utf-8')
print(f"Target of the function: {pubsub_message}")

try:
url = f'https://currencyapi.net/api/v1/rates?key={currency_api_token}'
response = requests.request("GET", url)
except Exception as e:
print(Exception, e)

response_dict = create_dict(response)
df = create_dataframe_from_dict(response_dict)

load_dataframe_to_table(df, client, table_id)

try:
table = client.get_table(table_id)
except Exception as e:
print(Exception, e)

print("{} rows inserted into {} \n There are {} rows and {} columns in {}"
.format(len(df), table_id, table.num_rows, len(table.schema), table_id))

After all, we can query the table in the BigQuery console using SQL. We will see that the table is created successfully and data is loaded into it. We can configure the Google Scheduler using the article at the very top.

This brings us to the end of the first part of the project. Please stay tuned for the second part!

Hope it helps, thanks for reading :)

Please reach out via Linkedin and Github, all comments are appreciated 🕺

--

--