Use Composer for Dataproc Serverless workloads
Let us continue exploring how to use Airflow for more awesome stuff, in particular to run Dataproc Serverless workloads.
For those of you who prefer to skip straight to the code snippets, go ahead and check out this guide with GitHub repo links:
Run Dataproc Serverless workloads with Cloud Composer | Google Cloud
For the rest of you who would like to understand why this matters in the first place, follow along to see why we use Dataproc and how Cloud Composer can be used to run Dataproc jobs.
Dataproc helps users process, transform, and understand huge quantities of data. It is used for everything from data lake modernization and ETL/ELT to secure data science projects at scale. For example, consider a hospital triaging thousands of Covid-19 patients per hour, a chat messenger processing millions of messages per second, or the good old e-commerce website handling several hundred customer orders every minute. To start with, Dataproc can run jobs of many types: Pig, PySpark, Spark, Hadoop, Hive, Spark SQL, and more. It's fast, easy, and cost-effective. Whatever your data team might be using currently, there is a supported flavor.
Dataproc is available in three flavors: Dataproc Serverless, Dataproc on Google Compute Engine, and Dataproc on Google Kubernetes Engine. Let's zoom into the first one for now. Dataproc Serverless allows you to run PySpark jobs without needing to configure infrastructure or autoscaling yourself.
Dataproc Serverless supports PySpark, Spark, SparkR, and Spark SQL batch workloads as well as sessions/notebooks. This means even small teams can go ahead and run PySpark jobs without having to tune infrastructure to rein in the monthly cloud bill! :P Users simply submit Spark code, plus properties for any customizations, and billing covers only the duration the job runs! Serverless Spark works with both BigQuery and Cloud Storage. Users can also attach a Persistent History Server (PHS), which we will cover below, to view Spark logs after the job is finished, and a Hive metastore as well. Once the job finishes, everything is cleaned up except the logs and persisted results.
As data engineers, we have all spent years writing cron jobs, manually maintaining dependencies, and scheduling data ingestions or transfers. But with the advent of Apache Airflow, the open source platform that orchestrates the workings of your data pipeline automatically, life sure has become easier. Instead of going to the Dataproc GUI to create these batches, we will use Cloud Composer and write DAGs that create them with the help of the operators for managing Dataproc Serverless batch workloads.
Let's focus on DataprocCreateBatchOperator. Within the DAG, the PHS will be added as part of the batch config, so pay attention.
Step 1: If not already, ensure you have enabled the Dataproc API
gcloud services enable dataproc.googleapis.com
and the Cloud Composer API
gcloud services enable composer.googleapis.com
in your project.
Step 2: Create a storage bucket or use an existing one — note its name and region.
Step 3: Save any PySpark script to a local file named spark-job.py. For example, you can use the sample PySpark script, and upload this file to the bucket you chose in Step 2.
Step 4: Create a single-node cluster to act as the Persistent History Server with a command like the following (the --properties flag points the Spark history server at a Cloud Storage log directory; substitute your own names), and note its name as the "phs_cluster" variable:
gcloud dataproc clusters create cluster-name \
    --region=region \
    --single-node \
    --enable-component-gateway \
    --properties=spark:spark.history.fs.logDirectory=gs://bucket-name/phs/*/spark-job-history
In the Dataproc clusters UI, you should be able to see the new single-node cluster.
Step 5: Import the Dataproc Operators
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
Step 6: Now let’s set a few Airflow variables
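As a minimal sketch of what such variables might look like: the post itself only names "phs_cluster" (Step 4), so the other keys and every value below are illustrative placeholders, not values from the original guide. In Airflow you would define these under Admin → Variables.

```python
# Sketch of the values a Dataproc Serverless DAG typically reads from
# Airflow Variables. Only "phs_cluster" is named in this post; the other
# keys and all values are hypothetical placeholders.
airflow_variables = {
    "project_id": "my-gcp-project",       # GCP project that hosts Dataproc
    "region": "us-central1",              # region of the bucket from Step 2
    "bucket_name": "my-dataproc-bucket",  # bucket holding spark-job.py
    "phs_cluster": "phs-single-node",     # Persistent History Server (Step 4)
}

# Inside the DAG these would be fetched with, for example:
# from airflow.models import Variable
# project_id = Variable.get("project_id")
```

Keeping these out of the DAG file means the same DAG can be promoted across environments by changing variables rather than code.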
Step 7: Run this DAG in Airflow just like how we learnt to in the last post.
The DAG above uses DataprocCreateBatchOperator, which creates a batch in Dataproc to run the Spark job. As you may have noticed, we have also used the Spark-BigQuery connector! It really is that simple: the DAG creates a batch in Dataproc and runs spark-job.py. Unless the batch is deleted, either manually or via a DAG that deletes it, it will continue to appear under the Dataproc Batches UI.
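To make the PHS-in-the-config idea concrete, here is a sketch of the `batch` payload such an operator receives, written as a plain dict. The field names follow the Dataproc Batches API (Batch resource); the bucket, cluster, and connector-jar paths are hypothetical placeholders, not values from this post.

```python
def build_batch(project_id, region, bucket, phs_cluster):
    """Assemble a `batch` payload for DataprocCreateBatchOperator.

    Keys mirror the Dataproc Batches API Batch resource; the GCS paths
    and the connector jar version are illustrative placeholders.
    """
    return {
        "pyspark_batch": {
            # the script uploaded in Step 3
            "main_python_file_uri": f"gs://{bucket}/spark-job.py",
            # Spark-BigQuery connector so the job can talk to BigQuery
            "jar_file_uris": [
                "gs://spark-lib/bigquery/"
                "spark-bigquery-with-dependencies_2.12-0.27.0.jar"
            ],
        },
        "environment_config": {
            "peripherals_config": {
                # attach the single-node PHS cluster created in Step 4,
                # so Spark logs remain browsable after the batch finishes
                "spark_history_server_config": {
                    "dataproc_cluster": (
                        f"projects/{project_id}/regions/{region}"
                        f"/clusters/{phs_cluster}"
                    ),
                },
            },
        },
    }

# The operator call would then look roughly like:
# DataprocCreateBatchOperator(
#     task_id="create_batch",
#     project_id=project_id,
#     region=region,
#     batch=build_batch(project_id, region, bucket, phs_cluster),
#     batch_id="spark-job-batch",
# )
```

Because the PHS is referenced here rather than created per job, every batch writes its event logs to the same history server, which is what makes Step 8 below possible.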
Step 8: To inspect the Spark job using PHS, go back to the Dataproc single-node cluster and click on it.
Now click on Web Interfaces and select "Spark History Server" to see PHS in action. This is really cool, trust me.
This takes us to a new tab listing all the Spark applications you have run in the past. Choose any one of them to dive deeper and you will see a detailed event timeline as well as the completed jobs. For anyone who has been running Spark on-prem and is eager to find exactly where a pipeline can be optimized, this is crucial.
There’s much more that we can do with Composer, stay tuned!