Loading Data from multiple CSV files in GCS into BigQuery using Cloud Dataflow (Python)

A Beginner’s Guide to Data Engineering on Google Cloud Platform

Sadeeq Akintola
7 min read · May 3, 2020
Photo Credit: Bloomberg

Google Cloud Platform (GCP) offers a wide range of tools and services for robust and scalable Data Engineering tasks. Data problems, such as getting data from a source location or storage repository to a sink/destination, would ideally be solved using Cloud Dataflow, Google Cloud Composer (Airflow), or both. In this article, I have used Cloud Dataflow. Please read on!

Dataflow is GCP’s fully managed service for executing Apache Beam pipelines. Depending on the complexity of your project, you could either use Dataflow Templates (made available in GCP) for straightforward problems or code your solution all the way in Java or Python.

This article explains how to load CSV files from Google Cloud Storage (GCS) into Google BigQuery using Cloud Dataflow. I have used Python as the coding language, but you can reproduce the same steps using Java as well.

Although expert knowledge of the entire GCP architecture and product range is not needed, a decent understanding of how the Cloud works, especially on GCP, is necessary.

Setting up the environment

Running Python, Dataflow/Airflow and other packages together may require some environment configuration, package installation and ensuring compatibility of package versions. In order to avoid installation setup troubles, it is best that we work in a virtual environment.

  1. Create/Sign in to your GCP account: If you have a Gmail/Google/GSuite account, you can use it to log in to the GCP Console. Otherwise, create a new free account here.
  2. Create a new project (my project id is stocks-project-2)
  3. Enable Billing on the project: Navigation Home → Billing → Link a billing account
Enabling Billing on GCP

4. Activate the Cloud Shell on the top-right hand corner of the Cloud Console.

Activating Cloud Shell

When the Cloud Shell launches, you will see a welcome message stating your current project. If the selected project is not the one you want to work with, use the command in the welcome screen to change it to the project we just created.

5. Assign a PROJECT variable (you can replace stocks-project-2 with your project ID)

export PROJECT=stocks-project-2

6. Print the PROJECT variable to be sure that it works

echo $PROJECT

7. On the top-right corner of the Cloud Shell, click on the Launch Editor button to see the Cloud Shell and Code Editor in another window

Activating Cloud Shell

8. When the Editor opens, create a new directory where we will store our pipeline programs, and navigate into it. Use these commands:

mkdir stocks-project
cd stocks-project/

9. Once in the project directory, create a python file:

touch load-stocks.py

10. Use the Cloud Shell Editor (see further below) to examine the Python code for the pipeline. Feel free to copy and paste this code or adapt it to your needs.

The Data pipeline code in Python
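
For orientation, a pipeline of this kind could look roughly like the sketch below. It is a minimal illustration under stated assumptions, not the original listing: the column order in `COLUMNS`, the types in `SCHEMA`, the `--output` flag, and the table name `stocks_history` are all hypothetical and should be checked against your own files and dataset.

```python
import argparse

# Assumed column order of the stock CSV files; verify against your own data.
COLUMNS = ["date", "volume", "open", "close", "high", "low", "adjclose"]

# BigQuery schema string matching COLUMNS (the types are an assumption).
SCHEMA = (
    "date:DATE,volume:INTEGER,open:FLOAT,close:FLOAT,"
    "high:FLOAT,low:FLOAT,adjclose:FLOAT"
)


def parse_row(line):
    """Turn one CSV line into a dict keyed by column name."""
    values = [v.strip() for v in line.split(",")]
    row = dict(zip(COLUMNS, values))
    row["volume"] = int(float(row["volume"]))
    for name in ("open", "close", "high", "low", "adjclose"):
        row[name] = float(row[name])
    return row


def run(argv=None):
    # Imported here so parse_row can be tried without Apache Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True, help="GCS file pattern")
    parser.add_argument(
        "--output",
        default="stocks_data.stocks_history",  # hypothetical table name
        help="BigQuery table as DATASET.TABLE",
    )
    # Flags not declared above (--runner, --project, ...) go to Beam.
    known_args, pipeline_args = parser.parse_known_args(argv)

    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
        (
            p
            | "Read CSV lines"
            >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
            | "Parse rows" >> beam.Map(parse_row)
            | "Write to BigQuery"
            >> beam.io.WriteToBigQuery(
                known_args.output,
                schema=SCHEMA,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )
```

Calling `run()` at the bottom of the file, typically under an `if __name__ == "__main__":` guard, would start the pipeline. Note that this sketch does not handle empty or malformed values; a real pipeline would probably need to skip or repair such rows.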

Before we continue, we need to create a Service Account, which is like a user profile that has been granted just the privileges it needs to access certain GCP services.

11. Grant the Service Account the following roles as shown. Please make sure you assign only the least privileges/roles required, in accordance with your organization’s security policy.

Assigning Roles to Service Accounts on GCP

12. Download the key as a json file

Generating the json key file

13. Upload the key (json) file into the stocks-project folder by right-clicking on the project folder in the Editor and clicking on “Upload Files”. Once uploaded, you will see the json file in the list of files in the left pane of the Editor. Take note of the filename because we are going to reference it in our Python pipeline application. Below is a snapshot of the Python file. Please read the comment blocks to understand the code:

Exploring content with Cloud Shell Editor Explorer
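
One common way to reference a key file from Python is through the `GOOGLE_APPLICATION_CREDENTIALS` environment variable, which Google client libraries read when they look for credentials. The sketch below assumes that approach; the filename `stocks-project-key.json` and the helper `use_service_account` are hypothetical, so substitute the name of the file you uploaded.

```python
import os

# Hypothetical key filename; replace it with the name of the file you uploaded.
KEY_FILE = "stocks-project-key.json"


def use_service_account(key_file=KEY_FILE):
    """Point Google client libraries at the service account key file."""
    key_path = os.path.abspath(key_file)
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = key_path
    return key_path
```

A call such as `use_service_account()` would go near the top of the pipeline file, before the pipeline is built, so that the job is submitted with the Service Account's credentials.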

Completing the Setup

14. Download the stocks dataset from this Kaggle link to your computer. Extract the zipped content.

15. Set up the data source: Since the input data will come from a Cloud Storage bucket, we need to create one. For ease of reference, I have given it exactly the same name as my Project ID.

16. From the downloaded stocks dataset, upload the entire content in the folder fh_20190420/full_history into the created GCS bucket above.

A view of the csv files in Google Cloud Storage bucket

17. Set up the data destination: We are using BigQuery to store the data, so we need to create a BigQuery Dataset named “stocks_data”. The destination table is specified in the pipeline (Python file).
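
The dataset can be created in the BigQuery console. If you prefer to do it from Python, the following is a minimal sketch that assumes the `google-cloud-bigquery` package is installed and that your credentials are already configured. The helper names and the `US` location are assumptions made for illustration.

```python
def dataset_id(project, dataset="stocks_data"):
    """Build the fully qualified dataset ID, e.g. 'my-project.stocks_data'."""
    return f"{project}.{dataset}"


def create_dataset(project, dataset="stocks_data", location="US"):
    # Requires google-cloud-bigquery; imported lazily so dataset_id()
    # can be used without the package installed.
    from google.cloud import bigquery

    client = bigquery.Client(project=project)
    ds = bigquery.Dataset(dataset_id(project, dataset))
    ds.location = location  # assumed location; match it to your bucket
    # exists_ok=True avoids an error if the dataset already exists.
    return client.create_dataset(ds, exists_ok=True)
```

For example, `create_dataset("stocks-project-2")` would create `stocks-project-2.stocks_data` if it is not already there.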

18. Set up the virtual environment: So that we do not have to worry about the specific versions of the operating system or other software needed to run the pipeline, we will use a virtual environment. Run the following command to install virtualenv:

python3 -m pip install --user virtualenv

19. In the Editor’s Terminal window, navigate to (or ensure you are in) the project directory. Then run these commands to create a new virtual environment, activate it, and install Apache Beam for GCP:

virtualenv -p python3 venv
source venv/bin/activate
pip install 'apache-beam[gcp]'

Running the Pipeline

20. Run the Python application: The command below creates a Dataflow job that runs on the DataflowRunner. The pipeline requires a staging location and a temp location. These are Cloud Storage paths (the folders are created automatically inside the bucket if they do not already exist) that hold staged job files and temporary files, respectively, while the pipeline is running. The input variable is the Cloud Storage location of the CSV file(s).

python load-stocks.py \
  --project=$PROJECT \
  --runner=DataflowRunner \
  --staging_location=gs://$PROJECT/stocks_staging \
  --temp_location gs://$PROJECT/stocks_temp \
  --input 'gs://stocks-project-2/full_history/ZY*.csv' \
  --save_main_session
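
To make the role of each flag concrete, here is a small sketch that assembles the same flags in Python. `dataflow_args` is a hypothetical helper, and it assumes, as in this article, that the bucket has the same name as the project.

```python
def dataflow_args(project, input_pattern):
    """Build the command-line flags for a Dataflow run as a list of strings."""
    return [
        f"--project={project}",
        "--runner=DataflowRunner",
        # Staging and temp paths live inside the bucket named after the project.
        f"--staging_location=gs://{project}/stocks_staging",
        f"--temp_location=gs://{project}/stocks_temp",
        f"--input={input_pattern}",
        "--save_main_session",
    ]


flags = dataflow_args(
    "stocks-project-2", "gs://stocks-project-2/full_history/ZY*.csv"
)
for flag in flags:
    print(flag)
```

A list like this could be handed to `subprocess.run(["python", "load-stocks.py", *flags])` if you ever want to launch the job from another Python script instead of typing the command.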

Did you notice the “--input” part? I mean this line: “--input 'gs://stocks-project-2/full_history/ZY*.csv' \”. I have used the wildcard “ZY*.csv” to load ONLY the files whose names start with “ZY”, followed by any other characters or none, and end in “.csv”. You could also load a single CSV file, or all of them, as shown:

--input 'gs://stocks-project-2/full_history/AADR.csv' \
OR
--input 'gs://stocks-project-2/full_history/*.csv' \
Running the pipeline via the command line interface
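
To get a feel for which files a pattern selects, you can try it locally. The sketch below uses Python's `fnmatch` module as a stand-in. Cloud Storage applies its own matching rules, but for a simple `*` pattern like this one the result should be the same. The file names in `names` are hypothetical.

```python
from fnmatch import fnmatchcase

# Hypothetical object names in the bucket's full_history/ folder.
names = ["AADR.csv", "ZYME.csv", "ZYNE.csv", "ZY.csv", "ZYXI.txt", "AZY.csv"]


def matching(pattern, candidates):
    """Return the candidates that match the wildcard pattern (case-sensitive)."""
    return [n for n in candidates if fnmatchcase(n, pattern)]


print(matching("ZY*.csv", names))   # ['ZYME.csv', 'ZYNE.csv', 'ZY.csv']
print(matching("AADR.csv", names))  # ['AADR.csv']
print(matching("*.csv", names))     # every name ending in .csv
```

Note that `ZY.csv` matches because `*` also stands for "no characters at all", while `AZY.csv` does not because the name must start with `ZY`.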

Side Note: Whenever you create GCP resources such as Storage Buckets, BigQuery Datasets, Compute or Kubernetes resources, I would advise that you keep them in the same geographic location, and ideally the same region, for the sake of latency and accessibility, unless a different layout is a deliberate design requirement.

21. To see the execution results, return to the Cloud Console Home and navigate to the Cloud Dataflow page. The last run will be seen at the very top of the page.

List of Dataflow jobs

22. A solid green check mark shows that the job ran successfully. Click on the name beside it to show the job completion details.

Detailed view of a selected Dataflow job

23. Final result in BigQuery Table.

The resulting BigQuery table

More to be done

This simple data pipeline covers the basic data engineering/ETL tasks. However, you could improve this project by doing one or more of these:

  1. Create a Cloud Function Trigger, to fire the pipeline job whenever there is a new file in the GCS bucket
  2. Create an Airflow job to run this python pipeline file at a scheduled time
  3. Create an Archive folder in GCS to store already processed csv files
  4. Do a check on duplication
  5. Do some Aggregation as part of the ETL and store the result in another table
  6. Replicate this same solution entirely using Cloud Composer, Google’s managed Apache Airflow
  7. And lots more…
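
As a starting point for the duplication check and the aggregation ideas, here is a small plain-Python sketch of the logic on a few in-memory rows. The `ticker` field and the sample values are hypothetical; in this project the ticker would have to be derived from each file name.

```python
from collections import defaultdict


def drop_duplicates(rows, key_fields=("ticker", "date")):
    """Keep the first row seen for each key, preserving input order."""
    seen = set()
    unique = []
    for row in rows:
        key = tuple(row[f] for f in key_fields)
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique


def average_close(rows):
    """Compute the average closing price per ticker."""
    totals = defaultdict(lambda: [0.0, 0])  # ticker -> [sum, count]
    for row in rows:
        totals[row["ticker"]][0] += row["close"]
        totals[row["ticker"]][1] += 1
    return {ticker: s / n for ticker, (s, n) in totals.items()}


rows = [
    {"ticker": "AADR", "date": "2019-04-17", "close": 40.0},
    {"ticker": "AADR", "date": "2019-04-17", "close": 40.0},  # duplicate
    {"ticker": "AADR", "date": "2019-04-18", "close": 42.0},
]
unique = drop_duplicates(rows)
print(len(unique))            # 2
print(average_close(unique))  # {'AADR': 41.0}
```

Inside a Beam pipeline, the same ideas would more likely be expressed with keyed transforms, for example a per-key mean with `beam.combiners.Mean.PerKey()`, rather than with in-memory loops.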

If you enjoyed reading this post or if it was useful in any way, please drop some claps and comments. I would like to have your feedback and encouragement to write more on Data Engineering. Also, share this post on LinkedIn and Twitter. I’d really appreciate it!

Feel free to engage and follow me on Twitter on this and future posts.

Thank you!

Sadeeq Akintola

As a Cloud Big Data Engineer, I help @microsoft’s customers build and maintain Data solutions on Azure. I also do (good) stuff on Google Cloud Platform.