Streamlining an ETL Data Pipeline on Google Cloud with Cloud Data Fusion & Airflow

Vishal Bulbule
Google Cloud - Community
5 min readFeb 23, 2024

Introduction

In today’s data-driven world, extracting valuable insights from employee data is crucial for informed decision-making. This blog post guides you through building an ETL pipeline using Google Cloud. We will see how we can extract data, apply transformation on data like masking sesnsitive data , loading data in BigQuery and then Visualizing data on Looker Studio.

Data Pipeline

Step 1: Generating Sample Employee Data

We’ll start by creating dummy employee data using the Python library Faker. This data will be stored in a Google Cloud Storage (GCS) bucket, acting as our source for the ETL pipeline.

Faker is a Python library used to generate fake data for various purposes, such as testing, prototyping, and populating databases with dummy data. It allows developers to create realistic-looking data that mimics real-world scenarios without the need to manually input or retrieve large datasets.

The Faker library provides a wide range of data types and categories, including names, addresses, phone numbers, email addresses, dates, times, and more. Developers can customize the generated data to suit their specific requirements and use cases.

from faker import Faker
import csv
from faker import Faker
import random
import string
from google.cloud import storage

# Specify number of employees to generate
num_employees = 100

# Create Faker instance
fake = Faker()

# Define the character set for the password
password_characters = string.ascii_letters + string.digits + 'm'

# Generate employee data and save it to a CSV file
with open('employee_data.csv', mode='w', newline='') as file:
fieldnames = ['first_name', 'last_name', 'job_title', 'department', 'email', 'address', 'phone_number', 'salary', 'password']
writer = csv.DictWriter(file, fieldnames=fieldnames)

writer.writeheader()
for _ in range(num_employees):
writer.writerow({
"first_name": fake.first_name(),
"last_name": fake.last_name(),
"job_title": fake.job(),
"department": fake.job(), # Generate department-like data using the job() method
"email": fake.email(),
"address": fake.city(),
"phone_number": fake.phone_number(),
"salary": fake.random_number(digits=5), # Generate a random 5-digit salary
"password": ''.join(random.choice(password_characters) for _ in range(8)) # Generate an 8-character password with 'm'
})

# Upload the CSV file to a GCS bucket
def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)

blob.upload_from_filename(source_file_name)

print(f'File {source_file_name} uploaded to {destination_blob_name} in {bucket_name}.')

# Set your GCS bucket name and destination file name
bucket_name = 'bkt-employee-data'
source_file_name = 'employee_data.csv'
destination_blob_name = 'employee_data.csv'

# Upload the CSV file to GCS
upload_to_gcs(bucket_name, source_file_name, destination_blob_name)

Step 2: Setting Up Cloud Data Fusion

Next, we’ll create a Cloud Data Fusion instance to orchestrate our data transformation process. Cloud Data Fusion offers a visual development environment for building and managing ETL pipelines.

  • Visit the Cloud Data Fusion console and create a new instance.
  • Choose a suitable configuration based on your needs and workload.
  • Once the instance is running, head to the development environment.

Step 3: Building the Transformation Pipeline

Within the Cloud Data Fusion environment, follow these steps:

  • Create a GCS connector: Define aGCS connector pointing to your GCS bucket’s employee_data.csv file.
  • Create a dataflow processor: This is where the data transformation happens. Use operators like Filter, Map, and Expression to:
  • Cleanse data: Address inconsistencies or missing values.
  • Mask sensitive information: Replace sensitive data like phone numbers with placeholders.
  • Derive new fields: Calculate additional information like salary bands or tenure.
  • Create a sink connector: Define a BigQuery sink connector to load the transformed data into a designated dataset and table.

Verify Masked & encoded data is loaded in BigQuery Tabke

Step 4: Visualizing Insights in Looker Studio

Finally, it’s time to visualize your transformed data in Looker Studio, a data visualization tool:

  • Create a new Looker Studio report.
  • Connect to your BigQuery dataset.
  • Drag and drop dimensions and measures to create charts, graphs, and tables.
  • Filter and segment your data for deeper analysis.

You’ve successfully built an ETL pipeline that extracts, transforms, and loads employee data into BigQuery, ready for exploration in Looker Studio. This pipeline can now be scheduled to run regularly, ensuring your data visualizations are always up-to-date.

Now last step Automate everything using Airflow Dag on Cloud Composer.

Hope you already created Cloud composer environment.Lets directly create Dagfile using below code

Create dag.py file and upload in DAGS folder in GCS bucket.

Creatwe scripts folder in DAGS folder and upload extract.py in scripts folder.

dag.py

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.operators.datafusion import CloudDataFusionStartPipelineOperator
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 12, 18),
'depends_on_past': False,
'email': ['vishal.bulbule@techtrapture.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}

dag = DAG('employee_data',
default_args=default_args,
description='Runs an external Python script',
schedule_interval='@daily',
catchup=False)

with dag:
run_script_task = BashOperator(
task_id='extract_data',
bash_command='python /home/airflow/gcs/dags/scripts/extract.py',
)

start_pipeline = CloudDataFusionStartPipelineOperator(
location="us-central1",
pipeline_name="etl-pipeline",
instance_name="datafusion-dev",
task_id="start_datafusion_pipeline",
)

run_script_task >> start_pipeline

Dag contains two task. extract_data & start_datafusion_pipeline.

Trigger/schedule the Dag and monitor dag.

Its completed !!!!

Git repo for source code — https://github.com/vishal-bulbule/etl-pipeline-datafusion-airflow

About Me

As an experienced Fully certified (11x certified) Google Cloud Architect, Google Cloud champion Innovator, with over 7+ years of expertise in Google Cloud Networking,Data ,Devops, Security and ML, I am passionate about technology and innovation. Being a Champion Innovator and Google Cloud Architect, I am always exploring new ways to leverage cloud technologies to deliver innovative solutions that make a difference.

If you have any queries or would like to get in touch, you can reach me at my email address vishal.bulbule@techtrapture.com or connect with me on LinkedIn at https://www.linkedin.com/in/vishal-bulbule/. For a more personal connection, you can also find me on Instagram at https://www.instagram.com/vishal_bulbule/?hl=en.

Additionally, please check out my YouTube Channel at https://www.youtube.com/@techtrapture for tutorials and demos on Google Cloud.

--

--