Read Data from GCS Bucket and Write to BigQuery using Google Cloud Scheduler and Functions

Dogukan Ulu
7 min read · May 21, 2023


Moving data from various sources into a warehouse or database is one of our main tasks as data engineers. In this article, I will explain how to read parquet files from a public Google Cloud Storage bucket. The data could be modified along the way, but here I will only show how to insert it as a whole into BigQuery. We will then deploy the script as a Google Cloud Function and, finally, run it on a regular schedule using Google Cloud Scheduler.

Pub/Sub

The very first thing we have to do is create a Pub/Sub topic. We need it because Cloud Scheduler will publish to this topic to trigger our Cloud Function.

After typing Pub/Sub in the search bar, we can click on Create Topic and create the topic with the configurations below.

With all these, we have the Pub/Sub topic ready. This topic will be used to buffer the messages between Cloud Scheduler and Cloud Functions.
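
The console is enough for this project, but if you prefer doing it in code, here is a minimal sketch using the google-cloud-pubsub client library (this library is not used anywhere else in the article, so treat it as an optional extra):

from google.cloud import pubsub_v1  # requires google-cloud-pubsub

project_id = "your-project-id"  # placeholder, use your own project ID

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, "read_gcs_write_bq_topic")

# Create the topic that Cloud Scheduler will publish to
topic = publisher.create_topic(name=topic_path)
print(f"Created topic: {topic.name}")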

Cloud Functions

After typing Cloud Functions in the search bar and clicking on the Create Function button, we can configure the function as below.

Then, we have to click Add Trigger to be able to run the script regularly. We should choose the Pub/Sub trigger type and configure it as follows:

We can choose the topic we already created (read_gcs_write_bq_topic) as the trigger topic. After configuring the trigger, we have to define the runtime parameters.

It is better to set the timeout to the maximum value just in case. We also have to define the environment variables that we will be using for this specific example; we can enter them above with their names and values. We will retrieve them within the code as follows.

project_id = os.environ.get('project_id')
bucket_name = os.environ.get('bucket_name')
dataset_name = os.environ.get('dataset_name')
table_name = os.environ.get('table_name')

After clicking Next, we can choose Python and a suitable runtime version. We will populate the content of the script and the requirements later. We can now dive into the script itself; afterwards, we will also configure Cloud Scheduler.

The first thing we have to do is import the necessary libraries. Explanations are next to them.

import os  # to get the environment variables
import gcsfs  # this will help us access the GCS file system of the public bucket
import pandas as pd
from pyarrow import parquet as pq  # this will help us read parquet files
from google.cloud import storage, bigquery  # these will help us create the GCS and BigQuery clients
from concurrent.futures import ThreadPoolExecutor  # to run multiple queries concurrently

Since we work in GCP, we can define the environment variables when configuring the Cloud Function as explained above. We can get them as follows.

project_id = os.environ.get('project_id')
bucket_name = os.environ.get('bucket_name')
dataset_name = os.environ.get('dataset_name')
table_name = os.environ.get('table_name')

Since the main goal is uploading data into a BigQuery table, we first have to create a BigQuery client. This client will also help us run queries against the tables located inside BigQuery. Since we will get the data from the GCS bucket, we also have to create a GCS client, and we will define the bucket here as well.

# Initialize the Google Cloud Storage and BigQuery clients
storage_client = storage.Client(project=project_id)
bq_client = bigquery.Client(project=project_id)

# We can reach out to the public bucket using storage_client
bucket = storage_client.bucket(bucket_name)

Moving on to the operations after defining the necessary variables, we first need to create the BigQuery table according to our desired schema if it doesn't already exist:

my_query = f"""
CREATE TABLE IF NOT EXISTS `{project_id}.{dataset_name}.{table_name}` (
    first_column DATETIME,
    second_column STRING,
    third_column FLOAT64,
    ...)"""

# A DDL statement doesn't need a destination table, so a plain query job is enough
query_job = bq_client.query(my_query)  # run the CREATE TABLE statement

query_job.result()  # wait for the execution to end

After creating the BQ table, we can now read the files from the public GCS bucket. We will load the parquet file into a pandas data frame:

gcs_uri = f"gs://{bucket_name}/{file_name}" # file name will end with .parquet
fs = gcsfs.GCSFileSystem(project=project_id)
dataset = pq.ParquetDataset(gcs_uri, filesystem=fs)
table = dataset.read()
df = table.to_pandas() # we will use this pandas dataframe to insert the records to BQ table

Now, we have a pandas data frame that stores the data coming from the GCS bucket. We can run many queries like SELECT, INSERT, or UPDATE with the help of the BigQuery client. Since we already created the BQ table, we can retrieve its existing records with the following code block:

select_query = f"""SELECT * FROM `{project_id}.{dataset_name}.{table_name}`"""
# we will query the BQ table with a query job
query_job = bq_client.query(select_query)
# we will save the result of the query as a pandas dataframe
# so that we can compare our main dataframe df with the BQ dataframe
bq_df = query_job.to_dataframe()

The steps between CREATE and INSERT might include many modifications to the pandas data frame. For this article, I will only explain the overall process.
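
For illustration, here are a few hypothetical modifications; the clean-up rules below are placeholders rather than part of the original pipeline, and they reuse the bq_df data frame we pulled with the SELECT query above:

# Hypothetical clean-up steps; adjust them to your own schema and rules
df = df.drop_duplicates()
df['first_column'] = pd.to_datetime(df['first_column'])

# Keep only the rows that are not already in the BigQuery table,
# comparing against the bq_df dataframe retrieved above
df = df[~df['first_column'].isin(bq_df['first_column'])]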

Since BigQuery is an analytics service, it is a best practice to use bulk inserts instead of row-by-row inserts. Therefore, we can use the following code block to insert many rows at the same time:

values = []
for _, row in df.iterrows():
    if some_condition:  # placeholder for any row-level filtering logic
        values.append(f"('{row['first_column']}', '{row['second_column']}', '{row['third_column']}')")

# after creating the values list with the values to be inserted into the BQ table:
insert_query = f"""
INSERT INTO `{project_id}.{dataset_name}.{table_name}` (first_column, second_column, third_column)
VALUES {','.join(values)}
"""
bq_client.query(insert_query).result()
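
As a side note (not what this article uses), the BigQuery client can also load a pandas data frame directly, which avoids building the INSERT string by hand; a minimal sketch:

# Alternative bulk load: let the client serialize the dataframe for us
table_id = f"{project_id}.{dataset_name}.{table_name}"
load_job = bq_client.load_table_from_dataframe(df, table_id)
load_job.result()  # wait for the load job to finish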

Using UPDATE is not an advised pattern for BigQuery since it means single-row updates. But I will still show how to do it here, just in case.

When it comes to UPDATE queries, we can use the following function to run multiple queries concurrently. This will increase efficiency and decrease the runtime:

def run_multiple_queries(queries):
    """
    Runs multiple queries concurrently.
    """
    # This is a good practice for multiple queries
    # since it decreases the runtime and increases the efficiency
    with ThreadPoolExecutor() as executor:
        executor.map(lambda query: bq_client.query(query).result(), queries)

update_queries = []
for _, row in df.iterrows():
    if some_condition:  # placeholder for any row-level condition
        # we will define the UPDATE queries depending on some_condition
        update_queries.append(f"""
            UPDATE `{project_id}.{dataset_name}.{table_name}`
            SET third_column = '{row['third_column']}'
            WHERE first_column = '{row['first_column']}'
        """)

# after creating the update_queries list, we can run multiple UPDATE queries concurrently
run_multiple_queries(update_queries)

We can modify the pandas data frame between all these steps depending on the job requirements. Here, I focused on how to read parquet files (the same approach works for other file types), create a pandas data frame, and write it to a BigQuery table.

Since Cloud Functions requires an entry point, we have to wrap all of this logic inside the entry-point function. We can save the script in the main.py file, create a function there, and set the function's entry point to the same name; that is the only way Cloud Functions can run our code (a sketch of such an entry point follows the requirements list below). We should also populate requirements.txt with the libraries we used for this script. This is an example requirements.txt file.

google-cloud-core==2.3.2
google-cloud-storage==2.9.0
google-cloud-bigquery==3.10.0
pandas==1.4.2
gcsfs==2023.5.0
pyarrow==12.0.0
db-dtypes==1.1.1
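
For completeness, here is a minimal sketch of what main.py could look like, assuming a 1st-gen Pub/Sub-triggered Cloud Function whose entry point is named main; read_gcs_write_to_bq is a hypothetical helper that wraps all the steps above:

def read_gcs_write_to_bq():
    # all the steps above: create the table, read the parquet file,
    # build the dataframe, and insert the rows into BigQuery
    ...

def main(event, context):
    """
    Entry point of the Cloud Function, triggered by a Pub/Sub message
    coming from Cloud Scheduler. The entry point configured in the
    console must match this function's name.
    """
    read_gcs_write_to_bq()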

Cloud Scheduler

The last part of the project is running the script regularly. We will need Cloud Scheduler for this purpose. On the Cloud Scheduler dashboard, we have to click on Create Job and configure the first part as follows:

After defining the name and description, we have to define the schedule using a cron expression; a tool like CronGuru helps us get it right. For example, 0 8 * * * runs the job every day at 08:00. Once we configure the execution schedule, our script will run at exactly those times. The scheduled job runs according to UTC for this project.

Then we have to define the execution target. Since we already created a Pub/Sub topic and configured the Cloud Function to be triggered by it, we have to choose the same topic here as well.

We can leave all other parameters as default and create the job.

We first created a Pub/Sub topic, then created the Cloud Function triggered by that topic. We populated main.py and requirements.txt with the code we wrote, and finally configured Cloud Scheduler. In the end, we have a script that regularly retrieves data from a Google Cloud Storage bucket and uploads it to a BigQuery table.

Hope it helps, thanks for reading :)

Please reach out via Linkedin and Github, all comments are appreciated 🕺
