Mastering Big Data Pipelines: Harnessing PySpark in Google Cloud Platform

Sarthak Sarbahi
Towards Data Engineering
14 min read · Dec 17, 2023



In this blog post, I’m excited to guide you through a practical tutorial on how to use PySpark for data processing in Google Cloud. We’ll start by uploading data to Google Cloud Storage via a REST API. Then, we’ll use PySpark to transform and process this data, before finally moving our refined data into Google BigQuery for storage and analysis.

Our entire process will be controlled by a Python script. This script will not only help us set up a Spark cluster for our data work but also ensure it shuts down on its own once our job is done. So, let’s dive in and see how it’s all done!

Understanding GCP services

First, let’s start by understanding the various Google Cloud Platform (GCP) services we’ll be using in this guide.

  1. Google Cloud Storage (GCS): GCS is a highly scalable and secure object storage service. It’s designed to store and retrieve any amount of data from anywhere on the web. It’s ideal for backup and archival, data lakes, and high-performance applications. You can store large, unstructured data sets and access them through RESTful APIs.
  2. Dataproc: It is a managed service for running Apache Hadoop and Apache Spark clusters. It simplifies the process of creating, managing, and scaling these clusters, allowing you to process large sets of data efficiently. Dataproc integrates well with other Google Cloud services and is suitable for both batch processing and real-time data analysis tasks. It’s a cost-effective solution for data transformation, exploration, and machine learning.
  3. Google BigQuery: It is a fully-managed, serverless data warehouse that enables super-fast SQL queries using the processing power of Google’s infrastructure. It’s designed for analyzing large datasets, providing scalability and ease of use without the need to manage infrastructure. BigQuery is great for running complex queries on massive datasets, and it supports various data formats. It’s widely used for business intelligence, data analysis, and reporting.

You can begin with a 3-month free trial of Google Cloud Platform by clicking on this link. Now that we’ve covered the GCP services we’ll be using, let’s go through the steps needed to set everything up for this tutorial.

Service account creation

A service account in Google Cloud Platform (GCP) is a type of account used specifically by applications, services, and virtual machines (VMs) to interact with GCP resources. It’s different from a user account, which is meant for individuals. Here are some of its features:

  • Unique Identity for Applications: A service account provides a unique identity to an application or service running on GCP. This identity is used to authenticate the application when it requests access to GCP resources.
  • Access Control and Permissions: Each service account is assigned specific roles and permissions. These determine what actions the service account can perform on GCP resources. For example, a service account might have permissions to read data from a storage bucket but not to delete anything.
  • Automation and Security: Service accounts are ideal for automating tasks. They are used when a script, a software application, or a VM needs to perform operations in the cloud. Since these accounts can be given limited permissions, they enhance security by ensuring that automated processes can only access the resources they need.
  • API Access: Service accounts are commonly used for accessing GCP APIs. When an application needs to interact with a GCP service via its API, the service account acts as the intermediary, providing the necessary credentials.
  • Key Management: Service accounts can have keys associated with them. These keys are used to authenticate the service account when it is used outside of GCP, like from a local machine or a different cloud provider.
  • Audit and Compliance: Actions performed by service accounts are logged, which helps in monitoring and auditing the use of cloud resources. This is crucial for maintaining security and compliance.

Let’s go ahead and set up a service account, which is essential for this tutorial. First, head over to the GCP console by following this link. In the console, look for the ‘IAM and admin’ section on the left menu, and then select ‘Service accounts.'

Click on ‘IAM and admin’ and then ‘Service accounts’

Here, you’ll find the option to ‘CREATE SERVICE ACCOUNT’. Click on it. You’ll need to name your service account; I’ve named mine pyspark-demo. After naming it, click on ‘CREATE AND CONTINUE’. The next step is to assign roles to your service account; grant the following:

  • BigQuery Admin
  • Dataproc Administrator
  • Editor
  • Storage Admin

Normally, you’d want to be more specific with access control, but to keep things straightforward, we’ll use the roles I’ve mentioned. After assigning the roles, click ‘CONTINUE’ and then ‘DONE’.

Service account created

Once your service account is ready, click on it. Look for the ‘KEYS’ tab and select ‘Create new key.’ Choose the JSON format for the key and save it on your computer. We’ll use this key later in the tutorial.
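
To confirm that the key works from your machine, you can point one of the Google Cloud client libraries at it; this is exactly how our orchestration script will authenticate later on. Here’s a quick sketch, with the key path left as a placeholder:

from google.cloud import storage

# Placeholder path: use the absolute path of the JSON key you just downloaded
client = storage.Client.from_service_account_json("<service_account_key_path>")

# Listing buckets is a simple way to check that the credentials are accepted
for bucket in client.list_buckets():
    print(bucket.name)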

Great! Our service account is all set up with its key. Now, let’s move on to BigQuery to get things ready there too.

Create a dataset in BigQuery

In BigQuery, a dataset acts as a collection or folder where you store related tables and views. To set one up in BigQuery, start by clicking on ‘BigQuery’ in the left panel, then go to ‘BigQuery Studio’.

Open BigQuery in GCP portal

If this is your first time using BigQuery, you might see a prompt to turn on the BigQuery API. Just click ‘ENABLE’ to activate it.

GCP project ID on the console homepage

Once you’re in the BigQuery portal, creating your dataset is easy. Look for the three dots next to your project ID (you can find this ID on the homepage of your GCP console). Click there to start creating your dataset.

Creating dataset in BigQuery

You’ll need to name your dataset. I’ve named mine covid19data. We’re going to stick with the default settings for everything else. After that, just click ‘CREATE DATASET’.

Creating dataset in BigQuery
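
By the way, if you’d rather create the dataset from code than click through the console, the BigQuery client library can do it too. Here’s a minimal sketch, assuming the service account key from earlier and the default US location; the placeholders match the ones we’ll use in our scripts:

from google.cloud import bigquery

# Placeholders: the service account key downloaded earlier and your GCP project ID
client = bigquery.Client.from_service_account_json("<service_account_key_path>")

dataset = bigquery.Dataset("<project_id>.covid19data")
dataset.location = "US"  # the console default; change it if you picked another location

# exists_ok=True makes this safe to re-run
client.create_dataset(dataset, exists_ok=True)
print("Dataset ready")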

And that’s it! Your dataset is now ready. Let’s move on to the practical part of our tutorial.

Scope of tutorial

Let’s dive into the hands-on part of the tutorial. We’re going to write two Python scripts for this:

  • main.py: Think of this as our command centre script that will handle the entire process from start to finish. It’s going to do several things: create a Google Cloud Storage (GCS) bucket, grab data from an API and store it as JSON in that bucket, set up a Spark cluster in Dataproc, send our job to this cluster, and then get rid of the cluster after our job is done.
  • transform.py: This one is our data transformer, using PySpark. We need to upload this script to our GCS bucket so the Spark cluster can use it. Its job is to take the JSON data and put it into a Spark dataframe. Then, it will change the data’s format (that’s the ‘transformation’ part) to make it simpler and more useful, and finally, load this processed data into BigQuery.

All set with our plan! Let’s begin with crafting the main.py script.

Preparing orchestration script

Let’s kick things off by bringing in the necessary Python packages. If you find you don’t have some of these packages in your development environment, make sure to install them before moving forward.

import requests
from google.cloud import storage
import json
import io
from google.cloud import dataproc_v1 as dataproc
from time import sleep

Now, we’ll define a few constants that we’ll use all through the script.

API_URL = "https://api.covidtracking.com/v2/us/daily.json"
GCP_BUCKET_NAME = "<bucket_name>"
BQ_WRITE_BUCKET_NAME = "<bucket_name>" # BigQuery bucket, different from GCP_BUCKET_NAME
DATA_DESTINATION_NAME = "covidtrackingdata.json"
PROJECT_ID = "<project_id>"
CLUSTER_NAME = "<cluster_name>"
REGION = "us-central1"
DATAPROC_JOB_FILE = "gs://<bucket_name>/transform.py" # Bucket same as GCP_BUCKET_NAME
SERVICE_ACCOUNT_JSON_PATH = "<service_account_key_path>"
client = storage.Client.from_service_account_json(SERVICE_ACCOUNT_JSON_PATH)

Here’s a bit more about them:

  • <bucket_name>: This is where you’ll name your Google Cloud Storage (GCS) bucket. Remember, this name needs to be unique globally. We’ll need two buckets: one for our transform.py script and the JSON data, and the other as a staging area for BigQuery when it’s loading data.
  • <project_id>: This is your Google Cloud Platform (GCP) project ID.
  • <cluster_name>: This will be the name of your Spark cluster.
  • <service_account_key_path>: Here, you’ll put the absolute path of the JSON file you downloaded earlier, which contains your service account key.

You can keep the other values as they are. Next, we’ll define a small helper that creates a bucket if it doesn’t already exist, and call it for both buckets. (We’ll create the transform.py bucket manually later in the tutorial, so by the time the script runs, the helper will simply find that one already in place.)

# Create the bucket if it doesn't exist
def create_bucket_if_not_exists(bucket_name, project_id):
    try:
        client.get_bucket(bucket_name)
        print(f"Bucket {bucket_name} already exists.")
    except Exception as e:
        bucket = client.create_bucket(bucket_name, project=project_id)
        print(f"Bucket {bucket_name} created.")

create_bucket_if_not_exists(GCP_BUCKET_NAME, PROJECT_ID)
create_bucket_if_not_exists(BQ_WRITE_BUCKET_NAME, PROJECT_ID)

After that, we’ll fetch JSON data from a REST API. We’re using an API from The COVID Tracking Project, and you can find more info about it here.

# Fetch JSON data from the API
response = requests.get(API_URL)
response.raise_for_status()
data = response.json()["data"]
json_data = json.dumps(data)

# Create a bytes stream and write the JSON string to it
bytes_stream = io.BytesIO()
bytes_stream.write(json_data.encode('utf-8'))
bytes_stream.seek(0)

Then, we’ll write this data as a JSON file and store it in our bucket in Google Cloud Storage.

# Get a handle to the target bucket and blob in GCS
bucket = client.get_bucket(GCP_BUCKET_NAME)
blob = bucket.blob(DATA_DESTINATION_NAME)

# Delete the JSON file if it already exists
if blob.exists():
    blob.delete()
    print(f"Blob {DATA_DESTINATION_NAME} deleted from {GCP_BUCKET_NAME}.")

# Save the JSON data to GCS
blob.upload_from_file(bytes_stream, content_type='application/json')
print(f"JSON data saved to gs://{GCP_BUCKET_NAME}/{DATA_DESTINATION_NAME}")

Following that, we’ll lay out the configuration for our Spark cluster. We’re setting up a single-node cluster, meaning there won’t be any worker nodes. The type of virtual machine we’ll use is n2-standard-4.

# Initialize the Dataproc cluster and job clients
cluster_client = dataproc.ClusterControllerClient.from_service_account_file(
    SERVICE_ACCOUNT_JSON_PATH,
    client_options={'api_endpoint': f'{REGION}-dataproc.googleapis.com:443'}
)
job_client = dataproc.JobControllerClient.from_service_account_file(
    SERVICE_ACCOUNT_JSON_PATH,
    client_options={'api_endpoint': f'{REGION}-dataproc.googleapis.com:443'}
)

# Create cluster config
cluster_config = {
    "project_id": PROJECT_ID,
    "cluster_name": CLUSTER_NAME,
    "config": {
        "config_bucket": "",
        "gce_cluster_config": {
            "service_account_scopes": [
                "https://www.googleapis.com/auth/cloud-platform"
            ],
            "network_uri": "default",
            "subnetwork_uri": "",
            "internal_ip_only": False,
            "zone_uri": "",
            "metadata": {},
            "tags": [],
            "shielded_instance_config": {
                "enable_secure_boot": False,
                "enable_vtpm": False,
                "enable_integrity_monitoring": False
            }
        },
        "master_config": {
            "num_instances": 1,
            "machine_type_uri": "n2-standard-4",
            "disk_config": {
                "boot_disk_type": "pd-standard",
                "boot_disk_size_gb": 500,
                "num_local_ssds": 0
            },
            "min_cpu_platform": "",
            "image_uri": ""
        },
        "software_config": {
            "image_version": "2.1-debian11",
            "properties": {
                "dataproc:dataproc.allow.zero.workers": "true"
            },
            "optional_components": []
        },
        "lifecycle_config": {},
        "initialization_actions": [],
        "encryption_config": {
            "gce_pd_kms_key_name": ""
        },
        "autoscaling_config": {
            "policy_uri": ""
        },
        "endpoint_config": {
            "enable_http_port_access": False
        },
        "security_config": {
            "kerberos_config": {}
        }
    },
    "labels": {},
    "status": {},
    "status_history": [
        {}
    ],
    "metrics": {}
}

Now, it’s time to create the cluster!

# Create cluster
request = dataproc.CreateClusterRequest(
    project_id=PROJECT_ID,
    region=REGION,
    cluster=cluster_config,
)
try:
    print("Creating cluster...")
    operation = cluster_client.create_cluster(request=request)
    result = operation.result()
except Exception as e:
    print(e)

We’ll also include code to check that the cluster is ready and operational before we submit any job to it.

# Ensure cluster is up before submitting a job
cluster_request = dataproc.GetClusterRequest(
    project_id=PROJECT_ID,
    region=REGION,
    cluster_name=CLUSTER_NAME,
)
while True:
    cluster_info = cluster_client.get_cluster(request=cluster_request)
    if cluster_info.status.state.name == 'RUNNING':
        print("Cluster running...")
        break
    sleep(10)

Then, we’ll send our job to Dataproc.

# Submit job to Dataproc
print("Submitting job...")
job_config = {
    'placement': {
        'cluster_name': CLUSTER_NAME
    },
    'pyspark_job': {
        'main_python_file_uri': DATAPROC_JOB_FILE
    }
}
submit_job_request = dataproc.SubmitJobRequest(
    project_id=PROJECT_ID,
    region=REGION,
    job=job_config
)
job = job_client.submit_job_as_operation(request=submit_job_request)
job_id = job.result().reference.job_id

We’ll have a bit of code to wait for the job to finish.

# Wait for job completion
job_request = dataproc.GetJobRequest(
    project_id=PROJECT_ID,
    region=REGION,
    job_id=job_id,
)
while True:
    job_info = job_client.get_job(request=job_request)
    if job_info.status.state.name == 'DONE':
        print("Job completed...")
        break
    sleep(10)

And finally, we’ll delete the cluster once the job is done.

# Delete the cluster
print("Deleting cluster...")
operation = cluster_client.delete_cluster(
    request={
        "project_id": PROJECT_ID,
        "region": REGION,
        "cluster_name": CLUSTER_NAME,
    }
)
operation.result()
print("Cluster deleted...")

With that, our first script, main.py, is all set. You can check out the full script here. Now, let’s move on to the transformation script.

Preparing transformation script

We’ll begin by bringing in the necessary PySpark packages for our script.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType
from pyspark.sql.functions import col

After that, our next step is to set up a Spark session. Just like in our previous script, we will define some constants:

  • <file_bucket_name>: This is where you’ll specify the name of the bucket holding the JSON data from the API. It must match GCP_BUCKET_NAME in main.py.
  • <bigquery_bucket_name>: This is the name of the bucket BigQuery uses as a staging area when loading data. It must match BQ_WRITE_BUCKET_NAME in main.py.
  • <project_id>: Your Google Cloud Platform (GCP) project ID goes here.
  • <dataset_id>: This is the name of the dataset we created in BigQuery earlier.

spark = SparkSession.builder.appName("ProcessingData").getOrCreate()
INPUT_FILE = "gs://<file_bucket_name>/covidtrackingdata.json"
BQ_WRITE_BUCKET_NAME = "<bigquery_bucket_name>"
PROJECT_ID = "<project_id>"
DATASET_ID = "<dataset_id>"
TABLE_ID = "covid19table"

Now, we’ll move on to defining the structure (or schema) of our Spark dataframe. Once that’s in place, we will load our JSON file into this dataframe.

schema = StructType([
    StructField("date", StringType(), True),
    StructField("states", IntegerType(), True),
    StructField("cases", StructType([
        StructField("total", StructType([
            StructField("value", IntegerType(), True),
            StructField("calculated", StructType([
                StructField("population_percent", DoubleType(), True),
                StructField("change_from_prior_day", IntegerType(), True),
                StructField("seven_day_change_percent", DoubleType(), True)
            ]))
        ]))
    ])),
    StructField("testing", StructType([
        StructField("total", StructType([
            StructField("value", IntegerType(), True),
            StructField("calculated", StructType([
                StructField("population_percent", DoubleType(), True),
                StructField("change_from_prior_day", IntegerType(), True),
                StructField("seven_day_change_percent", DoubleType(), True)
            ]))
        ]))
    ])),
    StructField("outcomes", StructType([
        StructField("hospitalized", StructType([
            StructField("currently", StructType([
                StructField("value", IntegerType(), True),
                StructField("calculated", StructType([
                    StructField("population_percent", DoubleType(), True),
                    StructField("change_from_prior_day", IntegerType(), True),
                    StructField("seven_day_change_percent", DoubleType(), True),
                    StructField("seven_day_average", IntegerType(), True)
                ]))
            ])),
            StructField("in_icu", StructType([
                StructField("currently", StructType([
                    StructField("value", IntegerType(), True),
                    StructField("calculated", StructType([
                        StructField("population_percent", DoubleType(), True),
                        StructField("change_from_prior_day", IntegerType(), True),
                        StructField("seven_day_change_percent", DoubleType(), True),
                        StructField("seven_day_average", IntegerType(), True)
                    ]))
                ]))
            ])),
            StructField("on_ventilator", StructType([
                StructField("currently", StructType([
                    StructField("value", IntegerType(), True),
                    StructField("calculated", StructType([
                        StructField("population_percent", DoubleType(), True),
                        StructField("change_from_prior_day", IntegerType(), True),
                        StructField("seven_day_change_percent", DoubleType(), True),
                        StructField("seven_day_average", IntegerType(), True)
                    ]))
                ]))
            ]))
        ])),
        StructField("death", StructType([
            StructField("total", StructType([
                StructField("value", IntegerType(), True),
                StructField("calculated", StructType([
                    StructField("population_percent", DoubleType(), True),
                    StructField("change_from_prior_day", IntegerType(), True),
                    StructField("seven_day_change_percent", DoubleType(), True),
                    StructField("seven_day_average", IntegerType(), True)
                ]))
            ]))
        ]))
    ]))
])

df = spark.read.option("multiline","true").json(INPUT_FILE,schema = schema)
df.printSchema()

The next part involves transforming the dataframe. This step is all about flattening the schema. Basically, this means turning each nested item in the JSON file into its own column in the new, transformed dataframe.

transform_df = (
    df
    .select("*")

    .withColumn("cases_total_value", col("cases.total.value"))
    .withColumn("cases_total_calculated_population_percent", col("cases.total.calculated.population_percent"))
    .withColumn("cases_total_calculated_change_from_prior_day", col("cases.total.calculated.change_from_prior_day"))
    .withColumn("cases_total_calculated_seven_day_change_percent", col("cases.total.calculated.seven_day_change_percent"))
    .drop("cases")

    .withColumn("testing_total_value", col("testing.total.value"))
    .withColumn("testing_total_calculated_population_percent", col("testing.total.calculated.population_percent"))
    .withColumn("testing_total_calculated_change_from_prior_day", col("testing.total.calculated.change_from_prior_day"))
    .withColumn("testing_total_calculated_seven_day_change_percent", col("testing.total.calculated.seven_day_change_percent"))
    .drop("testing")

    .withColumn("outcomes_hospitalized_currently_value", col("outcomes.hospitalized.currently.value"))
    .withColumn("outcomes_hospitalized_currently_calculated_population_percent", col("outcomes.hospitalized.currently.calculated.population_percent"))
    .withColumn("outcomes_hospitalized_currently_calculated_change_from_prior_day", col("outcomes.hospitalized.currently.calculated.change_from_prior_day"))
    .withColumn("outcomes_hospitalized_currently_calculated_seven_day_change_percent", col("outcomes.hospitalized.currently.calculated.seven_day_change_percent"))
    .withColumn("outcomes_hospitalized_currently_calculated_seven_day_average", col("outcomes.hospitalized.currently.calculated.seven_day_average"))

    .withColumn("outcomes_hospitalized_in_icu_currently_value", col("outcomes.hospitalized.in_icu.currently.value"))
    .withColumn("outcomes_hospitalized_in_icu_currently_calculated_population_percent", col("outcomes.hospitalized.in_icu.currently.calculated.population_percent"))
    .withColumn("outcomes_hospitalized_in_icu_currently_calculated_change_from_prior_day", col("outcomes.hospitalized.in_icu.currently.calculated.change_from_prior_day"))
    .withColumn("outcomes_hospitalized_in_icu_currently_calculated_seven_day_change_percent", col("outcomes.hospitalized.in_icu.currently.calculated.seven_day_change_percent"))
    .withColumn("outcomes_hospitalized_in_icu_currently_calculated_seven_day_average", col("outcomes.hospitalized.in_icu.currently.calculated.seven_day_average"))

    .withColumn("outcomes_hospitalized_on_ventilator_currently_value", col("outcomes.hospitalized.on_ventilator.currently.value"))
    .withColumn("outcomes_hospitalized_on_ventilator_currently_calculated_population_percent", col("outcomes.hospitalized.on_ventilator.currently.calculated.population_percent"))
    .withColumn("outcomes_hospitalized_on_ventilator_currently_calculated_change_from_prior_day", col("outcomes.hospitalized.on_ventilator.currently.calculated.change_from_prior_day"))
    .withColumn("outcomes_hospitalized_on_ventilator_currently_calculated_seven_day_change_percent", col("outcomes.hospitalized.on_ventilator.currently.calculated.seven_day_change_percent"))
    .withColumn("outcomes_hospitalized_on_ventilator_currently_calculated_seven_day_average", col("outcomes.hospitalized.on_ventilator.currently.calculated.seven_day_average"))

    .withColumn("outcomes_death_total_value", col("outcomes.death.total.value"))
    .withColumn("outcomes_death_total_calculated_population_percent", col("outcomes.death.total.calculated.population_percent"))
    .withColumn("outcomes_death_total_calculated_change_from_prior_day", col("outcomes.death.total.calculated.change_from_prior_day"))
    .withColumn("outcomes_death_total_calculated_seven_day_change_percent", col("outcomes.death.total.calculated.seven_day_change_percent"))
    .withColumn("outcomes_death_total_calculated_seven_day_average", col("outcomes.death.total.calculated.seven_day_average"))
    .drop("outcomes")
)

transform_df.show(1,truncate = False)

Finally, we’ll take this transformed dataframe and write it to BigQuery.

# Write to BigQuery
(
    transform_df
    .write
    .format("bigquery")
    .option("temporaryGcsBucket", BQ_WRITE_BUCKET_NAME)
    .option("table", f"{PROJECT_ID}:{DATASET_ID}.{TABLE_ID}")
    .mode('overwrite')
    .save()
)

print("written to BigQuery!")

And there you have it! With that, we’ve finished our second and final script. You can check out the full version of this script here.

Before we can kick off our data pipeline, remember we need to upload the transform.py script to our Google Cloud Storage (GCS) bucket. It’s important to use the same bucket name we mentioned in the main.py script.

Here’s how to do it:

  1. Head over to the GCP console and click on ‘Cloud Storage’ in the left pane.
  2. Then, select ‘Buckets’ and click on ‘CREATE.’
  3. Name your bucket — I’ve named mine covid19data_demo. Keep the other settings as they are and click ‘CREATE’.
  4. If a message pops up about preventing public access, just click ‘CONFIRM’.

Buckets in Google Cloud Storage

Once your bucket is ready, click on ‘UPLOAD FILES’ and choose the transform.py script.

Transformation script uploaded to the bucket
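
If you’d prefer to script this upload instead of using the console, the same storage client from main.py can handle it. A small sketch, assuming the bucket already exists and transform.py sits in your working directory:

from google.cloud import storage

# Same placeholders as in main.py: the service account key and the script/data bucket
client = storage.Client.from_service_account_json("<service_account_key_path>")
bucket = client.get_bucket("<bucket_name>")

# Upload the local transform.py to the bucket root, where DATAPROC_JOB_FILE expects it
bucket.blob("transform.py").upload_from_filename("transform.py")
print("transform.py uploaded")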

That’s it! We’re now ready to run our data pipeline. Let’s hope everything works out just as we planned. Fingers crossed!

Run the data pipeline

Now, let’s put our setup to the test. Before running our data pipeline, we need to turn on the Dataproc API in Google Cloud Platform (GCP). If you’ve already enabled it for your project, you can skip this step.

Here’s how to enable it:

  1. In the GCP console, click on the menu in the left pane.
  2. Select ‘APIs & Services’ and then click on ‘Library’.
  3. Search for ‘Cloud Dataproc API’ and click on the first result that appears.
  4. Hit the ‘ENABLE’ button. It might take a few minutes to activate.

Cloud Dataproc API enabled

Once that’s done, we’re ready to launch our data pipeline. Run the command python3 main.py in the directory where your script is located.

After initiating the command, use the GCP console’s search bar to navigate to Dataproc. There, you’ll soon see the Spark cluster that gets created as part of our job.

Spark cluster running

When the cluster is up, click on it and go to the ‘JOBS’ tab. Select your job ID. In the ‘Output’ section, you’ll see the logs for the PySpark job. Keep an eye on these logs while the job runs, because once it completes successfully, the cluster is automatically deleted, along with its logs.

Terminal logs of the data pipeline

After the job finishes without any issues, our next step is to check in BigQuery to see if the table has been created and contains the data we expect.

Querying data in BigQuery

Now, let’s check if our data has successfully made its way into BigQuery. Go to the BigQuery portal. When you find and expand the dataset we created, you should see the table named covid19table. Clicking on it will reveal its schema and other details.

Data written to BigQuery

To query this table, click on the three dots next to its name and choose ‘Query’. This action will open a new query editor window. In this window, enter and run the following query to view the data: SELECT * FROM `<project_id>.<dataset_id>.covid19table` LIMIT 10;. Remember to replace <project_id> with your specific GCP project ID and <dataset_id> with the name of your BigQuery dataset.

Query the data in BigQuery
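
If you’d rather run the same check from Python than from the console, the BigQuery client library can execute the query as well. A short sketch, reusing the service account key and the placeholder IDs from earlier (cases_total_value is one of the flattened columns):

from google.cloud import bigquery

client = bigquery.Client.from_service_account_json("<service_account_key_path>")

query = """
    SELECT date, states, cases_total_value
    FROM `<project_id>.<dataset_id>.covid19table`
    ORDER BY date DESC
    LIMIT 10
"""
for row in client.query(query).result():
    print(row["date"], row["states"], row["cases_total_value"])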

And there we go! That’s a wrap on this tutorial. Well done on making it through!

Conclusion

In this tutorial, we used Dataproc to execute a PySpark job. The job read JSON data from Cloud Storage, flattened its nested structure, and loaded the result into BigQuery. Our goal was to simulate a real-world data processing scenario. However, there’s much more that can be done to enhance this process!

  • Automating the upload of files to Google Cloud Storage (GCS) is a good next step. Manually uploading multiple files to a GCS bucket can be inefficient in larger operations.
  • For managing and coordinating such ETL (Extract, Transform, Load) processes, an orchestration tool like Apache Airflow can be highly effective.
  • Instead of hardcoding values in the script, using environment variables to set these ‘constant’ values is a better practice (see the short sketch after this list).
  • Also, it’s crucial to handle your GCP account and project credentials with care. They should be stored securely, and using encryption is recommended to boost security even further.
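
For instance, the constants at the top of main.py could be read from the environment instead of being hardcoded. A minimal sketch; apart from the standard GOOGLE_APPLICATION_CREDENTIALS convention, the variable names here are just illustrative:

import os

# Illustrative variable names; export them before running main.py
PROJECT_ID = os.environ["GCP_PROJECT_ID"]
GCP_BUCKET_NAME = os.environ["GCP_BUCKET_NAME"]

# GOOGLE_APPLICATION_CREDENTIALS is the conventional variable for a service account key path
SERVICE_ACCOUNT_JSON_PATH = os.environ["GOOGLE_APPLICATION_CREDENTIALS"]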

I sincerely hope this guide was beneficial for you. Should you have any questions about this tutorial, please don’t hesitate to drop them in the comments below.
