Effective Cloud ETL in Cross-Cloud Setup — Part 1: Let Big Data Flow

George Vyshnya
Published in SBC Group Blog
Jul 21, 2021 · 10 min read

“Data is the new science. Big Data holds the answers.” — Pat Gelsinger

Introduction

If you do business as a Data Science/ML consultant, you often face a typical issue whenever you embark on a cooperation with a new customer. They dream of cutting-edge AI and ML solutions, but they may lack the basic data infrastructure with BI-ready data that can serve as a common source of truth for their business operations. If that is the case, we offer to build a reliable data infrastructure for the customer first. Once such a foundation is in place, it is the right time to start the discussion about ML and AI, something the customer was supposed to have from Day 1.

In the series of upcoming blog posts, I would like to draw a picture of the collateral data engineering journeys that data scientists/ML engineers sometimes have to take.

I am going to share the experience of the data engineering rides we had to take in some of our recent projects. The posts will be full of practical cases and real-world problems solved during such efforts.

So let’s start the data engineering journey of ML professionals.

ETL/ELT in the Cross-Cloud Data Infrastructure

Nowadays, it is more and more common to see organizations operating their data infrastructures across multiple cloud providers. There are multiple reasons driving such an architecture, as follows

  • Desire to diversify the security and cloud infrastructure availability risks across multiple transaction systems
  • Mergers and acquisitions of multiple businesses by a central organization (with individual businesses having their transaction systems deployed in different clouds)
  • Meeting legal and regulation requirements (for instance, the EU-based financial companies must run certain core systems in Azure cloud only, due to the respective legislation regulations in place)
  • Etc.

Such a cross-cloud transactional system deployment still has to provide for a central Data Warehouse (DWH) and/or Data Lake for BI and reporting purposes. Since such a central DWH/BI infrastructure is hosted by a single cloud provider, it requires building ETL/ELT solutions capable of cross-cloud data transfers (from the data sources managed by the respective transaction systems on their cloud premises).

Problem Statement

With the above-mentioned considerations in mind, let us review a practical case of building a cloud-based ETL pipeline to operate in a cross-cloud setup (the idea of this case study was inspired by a recent industrial project that required implementing something similar).

Imagine that we have to build an ETL solution that

  • Operates in Google Cloud Platform (GCP) to pull the source data
  • Enables export of every entity table/view from a BigQuery dataset (database) into a Google Cloud Storage bucket as either separate parquet files or separate JSON files (such a bucket will be often referred to as a reservoir)
  • Supports both the full export of the entire BigQuery dataset and its incremental changes from date to date
  • Minimizes the lead time of the pipeline processing by invoking a Big Data-scaled ETL technology
  • Uses either fully managed or true serverless components, whenever possible
  • Minimizes the burn rate of the solution by using ‘fire-and-forget’ clusters and/or managed or even serverless deployment, whenever possible
  • Provides a way for third-party processes/applications to connect to the reservoir as well as pull the data into the central Data Warehouse/Data Lake on another cloud premise outside GCP
  • As an additional option, facilitates the active push of the exported data into Azure Blob Storage

The sections below will walk you through the process of building such a cloud-based ETL solution. Using a metaphor, we could call such a solution a “Sparkling Air”, due to the building blocks used in it, as follows

  • BigQuery
  • Cloud Composer
  • Cloud Dataproc
  • Google Cloud Storage

Data Source: The BigQuery dataset

The source dataset (database) is hosted by BigQuery, as mentioned above. For the purpose of this post, we imagine the following structural and data transfer requirements set forth for such a dataset

  • Every entity (that is, a physical or virtual table (view) with essential entity data) of the dataset is to be enabled for export into the central DWH outside GCP
  • Each table and view holds a large amount of data (which lets us say we operate in a truly Big Data domain when building such an ETL solution)
  • Partitioned and non-partitioned entities (physical tables or virtual tables (views)) are to be treated separately (every partitioned table or view contains a datetime partition field named ‘ingest_date’)
  • Auxiliary tables (with certain service-level tech details irrelevant for business reporting) are to be excluded from the data export (the names of such auxiliary tables/views start with ‘_’)

Destination: File Reservoir in GCS

To make the data exported from a BigQuery dataset available for an external ETL system to pull into a central DWH/BI infrastructure outside the GCP premise, we will serialize it as parquet or JSON files in a Google Cloud Storage (GCS) bucket.

For each table in the source BigQuery dataset, we are going to create a subfolder in the destination GCS bucket. Every partitioned table in turn will further generate the partition subfolder structure corresponding to the values of the ingest_date partition field in it.
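
For illustration, assuming a hypothetical reservoir bucket named etl-reservoir, a non-partitioned table customers and a partitioned table orders, the resulting layout could look roughly like this:

    gs://etl-reservoir/customers/part-00000.parquet
    gs://etl-reservoir/orders/ingest_date=2021-07-19/part-00000.parquet
    gs://etl-reservoir/orders/ingest_date=2021-07-20/part-00000.parquet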

Secret Sauce of BigData-Scaled ETL: The power of managed PySpark

As we all see, Big Data becomes bigger by the day. Requirements to handle large amounts of data in your ETL pipelines within very compressed time intervals are more and more common (and the case we are reviewing in this blog post is no exception). You cannot simply avoid tackling problems of this sort in your real-world industrial projects.

This is where Apache Spark comes into play. It is a processing framework that operates on top of the Hadoop Distributed File System (HDFS) as well as other data stores. It features interactive shells that can launch distributed processing jobs across a cluster. Spark supports programming language interfaces for Scala, Java, Python and SQL; PySpark is its Python API.

One traditional way to handle Big Data is to use a distributed MapReduce framework like Hadoop. As a drawback, such an approach requires a lot of read-write operations on hard disks, which makes it very expensive in terms of computational resource allocation.

Spark (and PySpark in particular), as opposed to more traditional Big Data tech stacks, keeps intermediate data in memory wherever it can and therefore deals with such workloads in an efficient and easy-to-understand manner.

Let us see it in action in our project case. The sections below will walk you through these topics

  • Building effective and speedy ETL pipeline using PySpark to meet our requirements
  • Deploying 100% managed PySpark-based solutions in Google Cloud, using Dataproc
  • Debugging PySpark scripts in Google Cloud Platform, using Dataproc

Note: It is assumed you are already familiar with the fundamentals of PySpark. If not, you can go over the tutorials provided in the References section at the end of this blog post.

ETL Logic: PySpark scripts

The magic of PySpark is that it implements complex and computationally intensive Big Data transformations without the hassle of complicated coding. As an outcome, the resulting code to manipulate the data via PySpark dataframes is as simple as the one you are used to seeing when doing data wrangling with Pandas.

You can see the PySpark script capable of exporting the data from a particular table or view in a BigQuery dataset in the snippet below
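
Here is a simplified sketch of what such a script can look like (the command-line arguments, the default materialization dataset and the destination path layout are illustrative assumptions rather than the exact production code):

    # export_bq_entity.py - sketch of a PySpark job exporting one BigQuery table/view to GCS
    import argparse

    from pyspark.sql import SparkSession


    def parse_args():
        parser = argparse.ArgumentParser(description="Export a BigQuery table or view to a GCS reservoir")
        parser.add_argument("--project", required=True, help="GCP project hosting the source dataset")
        parser.add_argument("--dataset", required=True, help="source BigQuery dataset")
        parser.add_argument("--entity", required=True, help="source table or view name")
        parser.add_argument("--bucket", required=True, help="destination GCS reservoir bucket")
        parser.add_argument("--format", default="parquet", choices=["parquet", "json"])
        parser.add_argument("--ingest-date", default=None,
                            help="partition date (YYYY-MM-DD) for an incremental run; omit for a full export")
        parser.add_argument("--materialization-dataset", default="tmp_bq_views",
                            help="temporary BigQuery dataset used to materialize views")
        return parser.parse_args()


    def main():
        args = parse_args()
        spark = SparkSession.builder.appName(f"bq-export-{args.dataset}-{args.entity}").getOrCreate()
        # Incremental runs should replace only the partitions they touch.
        spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

        # The spark-bigquery connector reads views as well as physical tables;
        # views are materialized into a temporary dataset before being fetched.
        reader = (
            spark.read.format("bigquery")
            .option("table", f"{args.project}.{args.dataset}.{args.entity}")
            .option("viewsEnabled", "true")
            .option("viewMaterializationProject", args.project)
            .option("viewMaterializationDataset", args.materialization_dataset)
        )
        if args.ingest_date:
            # Incremental mode: push the partition filter down to BigQuery.
            reader = reader.option("filter", f"DATE(ingest_date) = '{args.ingest_date}'")
        df = reader.load()

        destination = f"gs://{args.bucket}/{args.entity}"
        writer = df.write.mode("overwrite")
        if "ingest_date" in df.columns:
            # Partitioned entities keep their partition layout in the reservoir.
            writer = writer.partitionBy("ingest_date")

        if args.format == "parquet":
            writer.parquet(destination)
        else:
            writer.json(destination)

        spark.stop()


    if __name__ == "__main__":
        main()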

It is worth noting the essential details below

  • The script has separate logic to handle partitioned and non-partitioned tables/views in BigQuery
  • The script can output the data either in parquet or JSON formats
  • The same PySpark code is applicable to both the full reload of the data and the incremental update of the diff vs. a day before
  • Spark’s ‘viewsEnabled’ option allows for handling not only physical tables but virtual tables (views and materialized views) as well
  • Due to BigQuery’s integration requirements, PySpark has to use a temporary materialization dataset in BigQuery when fetching the data from a source view, before exporting such data as files to the reservoir (that is the reason you see the ‘viewMaterializationProject’ and ‘viewMaterializationDataset’ configuration options set)
  • The filter option is used to filter the data in the partitioned data tables by the ingestion date

Deployment: Google Dataproc and its workflow templates

As we mentioned earlier, the real power of the cloud platforms shows whenever a modern managed or even serverless solution deployment is entertained.

The Dataproc Workflow Templates API provides a flexible and easy-to-use mechanism for managing and executing workflows. A Workflow Template is a reusable workflow configuration. It defines a graph of jobs with information on where to run those jobs.

Instantiating a Workflow Template launches a Workflow. A Workflow is an operation that runs a Directed Acyclic Graph (DAG) of jobs on a cluster.

The workflow can potentially use a cluster selector (that is, run the job on a selected existing Dataproc cluster) or a managed cluster. In the latter case, the workflow will create this “ephemeral” (aka ‘fire-and-forget’) cluster to run the workflow jobs, and then delete the cluster when the workflow is finished.

Workflows are ideal for complex job flows. You can create job dependencies so that a job starts only after its dependencies complete successfully.

Creating a workflow template does not create a Dataproc cluster or submit jobs. Clusters and jobs associated with workflows are created when a workflow template is instantiated.

Technically, the workflow templates are defined as yaml templates.
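
For example, two dependent job steps in such a yaml template can be chained via prerequisiteStepIds (the step names and file locations below are illustrative):

    jobs:
    - stepId: export-step
      pysparkJob:
        mainPythonFileUri: gs://my-etl-scripts/export_bq_entity.py
    - stepId: post-export-check
      # this step starts only after export-step completes successfully
      prerequisiteStepIds:
      - export-step
      pysparkJob:
        mainPythonFileUri: gs://my-etl-scripts/check_reservoir.py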

In our case, we are going to encapsulate PySpark into Dataproc workflows and workflow templates.

You can review the source code of the workflow template below
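
Here is a sketch of what such a template can look like (the project id, zone, bucket names and machine types are illustrative placeholders; the actual template in the project may differ in details):

    jobs:
    - stepId: export-bq-entity
      pysparkJob:
        mainPythonFileUri: gs://my-etl-scripts/export_bq_entity.py
        args:
        - --project=my-gcp-project
        - --dataset=sales_dwh
        - --entity=orders
        - --bucket=etl-reservoir
        - --format=parquet
        jarFileUris:
        # spark-bigquery connector the PySpark job depends on
        - gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar
    placement:
      managedCluster:
        clusterName: ephemeral-etl-cluster
        config:
          gceClusterConfig:
            zoneUri: europe-west1-b
          masterConfig:
            numInstances: 1
            machineTypeUri: n1-standard-2
          workerConfig:
            # two low-end worker nodes turn out to be sufficient for this workload
            numInstances: 2
            machineTypeUri: n1-standard-2

A template like this can be registered and then instantiated with gcloud (the template id and region are again illustrative):

    gcloud dataproc workflow-templates import bq-export-template \
        --source=workflow_template.yaml --region=europe-west1
    gcloud dataproc workflow-templates instantiate bq-export-template \
        --region=europe-west1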

As you see, the essential details about our workflow template are as follows

  • It has just one job step (this step will run the PySpark script we reviewed in the previous section)
  • The “ephemeral” cluster is designed for this workflow template to minimize the costs of operating it in Google Cloud Platform
  • The “ephemeral” cluster defined in the template features two worker nodes, with quite low-end node machines to be used (it turns out a two-node cluster with such machines is sufficient to complete the required ETL activities, thus keeping our burn rate for executing such a workflow in the cloud relatively low)
  • The additional Spark plugin ensuring data exchange with BigQuery is mounted into the cluster for the workflow template, since the PySpark job code depends on it to work properly
  • The PySpark script source is uploaded to a dedicated Google Cloud Storage bucket, so it is available for the instantiated workflows to invoke

Note: when defining the cluster configuration in your workflow template, you should balance within the cost-performance-execution time triangle, as every Dataproc job has a maximum execution time cap of 60 min (that is, the number of your worker nodes as well as the power of your worker machines should be sufficient to execute your workflow within the time constraints imposed by Google Cloud Platform; the less time it takes, the better, obviously).

Debugging PySpark Scripts in Google Cloud

Naturally, you need to debug your PySpark scripts before you are ready to package them as a part of Dataproc workflow templates.

One of the convenient methods of doing it in Google Cloud is to create a permanent Dataproc cluster with

  • Jupyter Lab and Jupyter notebooks enabled
  • External disk space (in a dedicated Google Cloud Storage bucket) mounted to the cluster

You keep this cluster running for as long as you need to interactively debug your PySpark code, while storing your PySpark scripts and Jupyter notebooks in the GCS bucket (rather than on a local drive of the cluster). You shut down the cluster when it is no longer needed (to optimize your cloud operation costs) while preserving your notebooks on Google Cloud Storage (if you stored the notebooks on the local drive of the cluster, they would vanish as soon as you shut the cluster down).

Below is the Bash script automating such a permanent cluster setup
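
(The script below is a sketch with illustrative project, region, bucket and machine settings; the Jupyter optional component keeps the notebooks in the cluster’s staging GCS bucket, so they survive cluster deletion.)

    #!/usr/bin/env bash
    # Sketch of a PySpark debug-cluster setup; all names below are illustrative placeholders.
    set -euo pipefail

    PROJECT_ID="my-gcp-project"
    REGION="europe-west1"
    CLUSTER_NAME="pyspark-debug-cluster"
    NOTEBOOK_BUCKET="my-pyspark-notebooks"   # GCS bucket keeping the notebooks between cluster incarnations

    # Create the notebook/staging bucket if it does not exist yet.
    gsutil ls -b "gs://${NOTEBOOK_BUCKET}" >/dev/null 2>&1 || \
      gsutil mb -p "${PROJECT_ID}" -l "${REGION}" "gs://${NOTEBOOK_BUCKET}"

    # Spin up a Dataproc cluster with Jupyter Lab enabled and the component gateway
    # switched on, so Jupyter is reachable from the Cloud Console.
    # The --bucket flag points the cluster staging area (and hence the notebooks)
    # to our dedicated GCS bucket instead of an auto-created one.
    gcloud dataproc clusters create "${CLUSTER_NAME}" \
      --project="${PROJECT_ID}" \
      --region="${REGION}" \
      --image-version=2.0-debian10 \
      --optional-components=JUPYTER \
      --enable-component-gateway \
      --bucket="${NOTEBOOK_BUCKET}" \
      --master-machine-type=n1-standard-4 \
      --num-workers=2 \
      --worker-machine-type=n1-standard-4

    # When the cluster is no longer needed:
    #   gcloud dataproc clusters delete "${CLUSTER_NAME}" --region="${REGION}"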

You can reuse it whenever you need to spin up a new cluster for your PySpark development and debugging purposes (with the notebooks from the previous ‘incarnations’ of your cluster immediately available from the GCS bucket, as designed).

Useful Deployment Automation

These days, the DevOps/SRE-centric paradigm shifts mindsets in Data Engineering, Data Science and ML more and more. I noticed this trend back in 2017 (see https://www.kdnuggets.com/2017/08/data-version-control-analytics-devops-paradigm.html), and it has indeed become mainstream since then, as of 2021.

One of the consequences of this mindset/philosophy is that you, as a Data Engineer, Data Scientist or ML Engineer, are responsible for the deployment of the applications/solutions you build (with minimal-to-zero help from dedicated DevOps professionals).

Therefore, I really recommend investing some time in automating your deployment routines. One of my preferred ways of doing such automation is to prepare Linux makefiles with shortcuts for the frequent deployment commands. An example of such a makefile is presented below
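
A minimal sketch of such a makefile follows (the targets, file names and ids are illustrative; note that, in a real makefile, recipe lines must start with a tab character):

    # Illustrative deployment shortcuts for the “Sparkling Air” ETL solution
    PROJECT_ID     ?= my-gcp-project
    REGION         ?= europe-west1
    SCRIPTS_BUCKET ?= my-etl-scripts
    TEMPLATE_ID    ?= bq-export-template

    .PHONY: upload-script import-template run-etl delete-debug-cluster

    # Upload the PySpark job source to the bucket referenced by the workflow template
    upload-script:
    	gsutil cp export_bq_entity.py gs://$(SCRIPTS_BUCKET)/export_bq_entity.py

    # (Re)register the workflow template from its yaml definition
    import-template:
    	gcloud dataproc workflow-templates import $(TEMPLATE_ID) \
    		--source=workflow_template.yaml --region=$(REGION) --quiet

    # Instantiate the workflow: it spins up the ephemeral cluster, runs the job and tears the cluster down
    run-etl:
    	gcloud dataproc workflow-templates instantiate $(TEMPLATE_ID) --region=$(REGION)

    # Tear down the permanent PySpark debugging cluster
    delete-debug-cluster:
    	gcloud dataproc clusters delete pyspark-debug-cluster --region=$(REGION) --quiet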

Next Steps

In the sections above, we had a comprehensive outlook on

  • The overall ETL requirements in our project case
  • The data source (BigQuery datasets) and the destinations for the file-based data export (a GCS bucket)
  • Engineering the Big Data-scaled ETL using PySpark and managed service deployment with Google Dataproc

Note: you can review the source code files and assets discussed in this blog post in the GitHub repo

Our next big challenge is to orchestrate such an ETL pipeline end-to-end as well as enable its scheduled unattended execution in Google Cloud Platform. That is where Cloud Composer with Airflow comes to our aid.

We will be looking at the Airflow-based workflow orchestration inside a Cloud Composer environment in the next part of this blog series.

As Pearl Zhu, the author of the “Digital Master” book series, once said, “We are moving slowly into an era where big data is the starting point, not the end”. This is the key driver to focus on in our future Big Data journey.


George Vyshnya
SBC Group Blog

Seasoned Data Scientist / Software Developer with blended experience in software development, IT, DevOps, PM and C-level roles. CTO at http://sbc-group.pl