A gentle introduction to Data Workflows with Apache Airflow and Apache Spark

Antonio Cachuan
Mar 2 · 12 min read

Imagine you’ve developed a transformation process in local Spark and you want to schedule it: a simple cron job would be sufficient. Now imagine that after that process you need to start many others, like a Python transformation or an HTTP request, and that this is your production environment, so you need to monitor each step.
Does that sound difficult? With only Spark and a cron job, yes, but thankfully we have Apache Airflow.

Airflow Logo

Airflow is a platform to programmatically author, schedule and monitor workflows [Airflow docs].

Objective

In our case, we need to build a workflow that runs a Spark application and lets us monitor it, and all components should be production-ready. First, let’s review some core concepts and features.

Features and Core Concepts

Features

  • Creating a workflow in Airflow is as simple as writing Python code: no XML, no command line. If you know some Python, you can do some Airflow!
  • Airflow is not just for Spark: it has plenty of integrations, like BigQuery, S3, Hadoop, Amazon SageMaker and more.
Airflow Integrations [Airflow documentation]

Core concepts

  1. DAG

A Directed Acyclic Graph is the group of all the tasks programmed to run, organized in a way that reflects their relationships and dependencies [Airflow ideas].

Airflow DAG represented graphically

2. Operator

The description of a single task; it is usually atomic. For example, the PythonOperator is used to execute Python code [Airflow ideas] (a minimal sketch follows this list of concepts).

3. Task

A parameterized instance of an Operator; a node in the DAG [Airflow ideas].

4. Task Instance

A task instance represents a specific run of a task and is characterized as the combination of a DAG, a task, and a point in time (execution_date). Task instances also have an indicative state, which could be “running”, “success”, “failed”, “skipped”, “up for retry”, etc. [Airflow ideas]
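To make concept 2 concrete, here is a minimal sketch of a PythonOperator task (the task id and the greet function are hypothetical; in a real DAG file this would sit inside a with models.DAG(...) block like the ones we write below):

from airflow.operators.python_operator import PythonOperator

def greet():
    # The Python function this hypothetical task will execute
    print('Hello from a PythonOperator!')

greet_task = PythonOperator(
    task_id='greet',        # unique identifier of the task inside the DAG
    python_callable=greet   # function executed when the task runs
)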

Building Environment

We have different options to deploy Spark and Airflow, and there are many interesting articles on the web about them.

Following our objective, we need a simple way to install and configure Spark and Airflow; to help us, we’ll use Cloud Composer and Dataproc, both products of Google Cloud.

Cloud Composer is a fully managed workflow orchestration service built on Apache Airflow [Cloud Composer docs]. The main advantage is that we don’t have to worry about deployment and configuration: everything is backed by Google, which also makes it simple to scale Airflow. Cloud Composer integrates with GCP, AWS and Azure components, as well as technologies like Hive, Druid, Cassandra, Pig, Spark, Hadoop, etc.

Cloud composer logo

Dataproc is a fully managed cloud service for running Apache Spark, Apache Hive and Apache Hadoop [Dataproc page]. Some of its features are easy deployment and scaling, integration with Cloud Composer (Airflow) and, the one we’ll use here, creating a Dataproc cluster automatically just for the processing and then destroying it, so you pay by the minute and avoid unused infrastructure.

Dataproc logo and main components

Deploying a Cloud Composer Cluster

To start using Google Cloud services, you just need a Gmail account; register to access the $300 in credits of the GCP Free Tier. After registration, select Cloud Composer from the Console.

Console Google Cloud

If it’s your first time, you need to enable the Cloud Composer API.

Enabling API

Click create environment

Create Environment

Here you can customize your Cloud Composer environment; to understand more about Composer’s internal architecture (Google Kubernetes Engine, Cloud Storage and Cloud SQL) check this site. That said, it is not necessary for the objective of this article. The parameters below are for a small cluster.

Parameters for creating the cluster

The Service Account is a parameter from your own project, so it will be different for you; the rest is the same. Creating the cluster can take from 5 to 15 minutes.

Cloud Composer is ready!

Click the cluster name to check important information

To validate the correct deployment click the Airflow web UI

Airflow Web UI

Yes! We have deployed a Cloud Composer cluster in less than 15 minutes, which means we have a production-ready Airflow environment.
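If you prefer the command line, you can also double-check the environment from Cloud Shell with gcloud (the environment name is a placeholder; use the name and location you chose when creating it):

# List the Composer environments in a location and inspect the one just created
gcloud composer environments list --locations us-central1
gcloud composer environments describe <your-environment-name> --location us-central1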

To deploy a Dataproc cluster (Spark) we’re going to use Airflow itself, so there is no more infrastructure to configure. Let’s code!

Code

This part goes from a simple Airflow workflow to the more complex workflow needed for our objective.

Simple DAG

Everyone starts learning to program with a Hello World! so let's do the same but in Airflow.

Writing an Airflow workflow almost always follows these 6 steps:

  1. Import the libraries needed: Airflow, datetime and others
  2. Define a start date for the workflow
  3. Set the default arguments for our DAG
  4. Define the DAG
  5. Set the Operators
  6. Define the DAG’s dependencies
# STEP 1: Libraries needed
from datetime import timedelta, datetime
from airflow import models
from airflow.operators.bash_operator import BashOperator

# STEP 2: Define a start date
# In this case yesterday
yesterday = datetime(2020, 2, 28)

# STEP 3: Set default arguments for the DAG
default_dag_args = {
    'start_date': yesterday,
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# STEP 4: Define DAG
# Set the DAG name, add a DAG description, define the schedule interval
# and pass the default arguments defined before
with models.DAG(
        'simple_workflow',
        description='Hello World =)',
        schedule_interval=timedelta(days=1),
        default_args=default_dag_args) as dag:

    # STEP 5: Set Operators
    # BashOperator
    # Every operator has at least a task_id; the other parameters are particular
    # to each one. This simple BashOperator executes echo "Hello World!"
    helloOp = BashOperator(
        task_id='hello_world',
        bash_command='echo "Hello World!"'
    )

    # STEP 6: Set DAG dependencies
    # Since we have only one task, we just write the operator
    helloOp

Now save the code in a file named simple_airflow.py and upload it to the DAGs folder in the bucket that was created. That folder is exclusively for your DAGs.

Click the link to access the folder
The file is uploaded
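As an alternative to the console upload, here is a sketch of the same step from Cloud Shell (environment name, location and bucket are placeholders to replace with your own values):

# Option 1: copy the DAG file straight into the environment's bucket
gsutil cp simple_airflow.py gs://<your-composer-bucket>/dags/

# Option 2: let gcloud resolve the DAGs bucket for you
gcloud composer environments storage dags import \
    --environment <your-environment-name> \
    --location us-central1 \
    --source simple_airflow.py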

After the file is uploaded, return to the Airflow UI tab and refresh (check the indentation in your code; it can take up to 5 minutes for the page to show the new DAG).

DAG, Task execution Log

Yes! Our DAG ran correctly. To access the log, click the DAG name, then click the task id hello_world and View Log.

Click hello_world task
Click View Log

On that page, you can check all the steps executed and, of course, our Hello World! So this simple DAG is done: we defined a DAG that runs a BashOperator executing echo "Hello World!" and we validated its correct execution =)

[Optional] Error handling and Debugging

If any error occurs, a red alert with brief information will show under the Airflow logo; to view a more detailed message, go to the Stackdriver monitor.

Click view logs to access Stackdriver

You can expand all entries and review the error, then upload a new version of your .py file to Google Cloud Storage and refresh the Airflow UI.

More details about the datetime.datetime error

Complex DAG

Now that we understand the basic structure of a DAG, our objective is to use the dataproc_operator to make Airflow deploy a Dataproc cluster (Apache Spark) with just Python code!

[Optional]
If it is the first time using Dataproc in your project, you need to enable the API.

Look for Dataproc in the main menu
Enable Dataproc API

After enabling the API, don’t do anything else; just close the tab and continue ;)

Close this tab

Before reviewing the code I’ll introduce two new concepts that we’ll be using in this DAG

  • Macros (Default Variables): the Airflow engine passes a few variables by default that are accessible in all templates [Airflow Macros]. It means we’re able to call these variables in any part of our DAG (.py file); in this case, we’ll use {{ ds_nodash }} to make the name of our cluster unique on each run.
Airflow default variables [Airflow Macros]
  • Airflow Variables: these variables are defined by the user and can be called from any part of the DAG.

We need to create two variables: one to set the zone for our Dataproc cluster and the other for our Project ID. To do that, click ‘Variables’.

Enter dataproc_zone as Key and us-central1-a as Value, then save.

For your project_id, remember that this ID is unique for each project across all of Google Cloud. To get the value, click your project name (in this case ‘My First Project’); a modal with a table will pop up, and you just copy the value from the ID column.

Getting the project id

If everything is right, your Variables table should look like this.

Airflow variables defined by the user
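If you prefer not to click through the UI, the same two variables can be set from Cloud Shell through the Airflow CLI wrapped by gcloud (a sketch assuming an Airflow 1.10-based Composer environment; the environment name, location and project ID are placeholders):

# Set the Airflow Variables used by the next DAGs
gcloud composer environments run <your-environment-name> \
    --location us-central1 \
    variables -- --set dataproc_zone us-central1-a

gcloud composer environments run <your-environment-name> \
    --location us-central1 \
    variables -- --set project_id <your-project-id>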

Let’s check the code for this DAG. It has the same 6 steps; we only added the dataproc_operator, first to create and then to delete the cluster. Note the Default Variable ({{ ds_nodash }}) and the Airflow Variable (dataproc_zone) defined before. It’s important to validate the indentation to avoid any errors.

# STEP 1: Libraries needed
from datetime import timedelta, datetime
from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# STEP 2: Define a start date
# In this case yesterday
yesterday = datetime(2020, 2, 29)

# STEP 3: Set default arguments for the DAG
default_dag_args = {
    'start_date': yesterday,
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

# STEP 4: Define DAG
# Set the DAG name, add a DAG description, define the schedule interval
# and pass the default arguments defined before
with models.DAG(
        'complex_workflow',
        description='DAG for deployment a Dataproc Cluster',
        schedule_interval=timedelta(days=1),
        default_args=default_dag_args) as dag:

    # STEP 5: Set Operators
    # BashOperator
    # A simple print date
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date'
    )

    # dataproc_operator
    # Create small dataproc cluster
    create_dataproc = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc',
        cluster_name='dataproc-cluster-demo-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('dataproc_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1'
    )

    # BashOperator
    # Sleep function to have 1 minute to check the dataproc created
    sleep_process = BashOperator(
        task_id='sleep_process',
        bash_command='sleep 60'
    )

    # dataproc_operator
    # Delete Cloud Dataproc cluster.
    delete_dataproc = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc',
        cluster_name='dataproc-cluster-demo-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE
    )

    # STEP 6: Set DAG dependencies
    # Each task runs after the previous one has finished.
    print_date >> create_dataproc >> sleep_process >> delete_dataproc

Save the code as complex_dag.py and, like with the simple DAG, upload it to the DAGs directory on Google Cloud Storage (the bucket).

Upload complex_airflow.py

If everything is running OK, you can check that Airflow is creating the cluster.

Airflow creating Dataproc cluster
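You can also confirm the cluster from Cloud Shell while the workflow sits in the sleep_process step (a sketch; the region flag is an assumption matching the us-central1-a zone used above, and the older contrib operator may register the cluster under the global region instead):

# The cluster name follows the pattern from the DAG: dataproc-cluster-demo-<ds_nodash>
gcloud dataproc clusters list --region us-central1
# If the list comes back empty, try the global endpoint
gcloud dataproc clusters list --region global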

After some minutes our process finished successfully.

Complex workflow done!

Spark workflow

This is the moment to complete our objective. To keep it simple, we won’t focus on the Spark code, so it will be an easy transformation using DataFrames; nevertheless, this workflow can apply to more complex Spark transformations or pipelines, since it simply submits a Spark job to a Dataproc cluster, so the possibilities are unlimited.

Spark code

First, we use the data from the Spark: The Definitive Guide repository (2010-12-01.csv). Download it locally and then upload it to the /data directory in your bucket with the name retail_day.csv (a command-line sketch follows the screenshots).

Retail data
upload the data
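A command-line sketch of the same step (the GitHub path is the usual location of that file in the databricks/Spark-The-Definitive-Guide repository and the bucket name is a placeholder; adjust both if yours differ):

# Download the sample CSV from the book's repository
curl -o retail_day.csv \
    https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/master/data/retail-data/by-day/2010-12-01.csv
# Upload it to the /data directory of your bucket
gsutil cp retail_day.csv gs://<your-composer-bucket>/data/retail_day.csv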

Our Spark code will read the data uploaded to GCS, create a temporary view in Spark SQL, filter the rows with a UnitPrice of 3.0 or more and finally save the result to GCS in Parquet format. Remember to replace the bucket name with your own Google Cloud Storage bucket. Save the file as transformation.py and upload it to the spark_files directory (create this directory).

transformation.py
#transformation.py
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("My PySpark code") \
    .getOrCreate()

df = spark.read.options(header='true', inferSchema='true') \
    .csv("gs://us-central1-cl-composer-tes-fa29d311-bucket/data/retail_day.csv")
df.printSchema()

df.createOrReplaceTempView("sales")
highestPriceUnitDF = spark.sql("select * from sales where UnitPrice >= 3.0")
highestPriceUnitDF.write.parquet("gs://us-central1-cl-composer-tes-fa29d311-bucket/data/highest_prices.parquet")
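If you want to test transformation.py by hand before wiring it into Airflow, you can submit it to any running Dataproc cluster (a sketch; the cluster name and region are placeholders for a cluster you create yourself, since the one managed by the DAG is deleted at the end of each run):

gcloud dataproc jobs submit pyspark \
    gs://<your-composer-bucket>/spark_files/transformation.py \
    --cluster <your-test-cluster> \
    --region us-central1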

The Airflow code for this is the following. We added two Spark references needed for our PySpark job: the location of transformation.py and the name of the Dataproc job.

# STEP 1: Libraries needed
from datetime import timedelta, datetime
from airflow import models
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators import dataproc_operator
from airflow.utils import trigger_rule

# STEP 2: Define a start date
# In this case yesterday
yesterday = datetime(2020, 2, 29)

# Spark references
SPARK_CODE = ('gs://us-central1-cl-composer-tes-fa29d311-bucket/spark_files/transformation.py')
dataproc_job_name = 'spark_job_dataproc'

# STEP 3: Set default arguments for the DAG
default_dag_args = {
    'start_date': yesterday,
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'project_id': models.Variable.get('project_id')
}

# STEP 4: Define DAG
# Set the DAG name, add a DAG description, define the schedule interval
# and pass the default arguments defined before
with models.DAG(
        'spark_workflow',
        description='DAG for deployment a Dataproc Cluster',
        schedule_interval=timedelta(days=1),
        default_args=default_dag_args) as dag:

    # STEP 5: Set Operators
    # BashOperator
    # A simple print date
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date'
    )

    # dataproc_operator
    # Create small dataproc cluster
    create_dataproc = dataproc_operator.DataprocClusterCreateOperator(
        task_id='create_dataproc',
        cluster_name='dataproc-cluster-demo-{{ ds_nodash }}',
        num_workers=2,
        zone=models.Variable.get('dataproc_zone'),
        master_machine_type='n1-standard-1',
        worker_machine_type='n1-standard-1'
    )

    # Run the PySpark job
    run_spark = dataproc_operator.DataProcPySparkOperator(
        task_id='run_spark',
        main=SPARK_CODE,
        cluster_name='dataproc-cluster-demo-{{ ds_nodash }}',
        job_name=dataproc_job_name
    )

    # dataproc_operator
    # Delete Cloud Dataproc cluster.
    delete_dataproc = dataproc_operator.DataprocClusterDeleteOperator(
        task_id='delete_dataproc',
        cluster_name='dataproc-cluster-demo-{{ ds_nodash }}',
        trigger_rule=trigger_rule.TriggerRule.ALL_DONE
    )

    # STEP 6: Set DAG dependencies
    # Each task runs after the previous one has finished.
    print_date >> create_dataproc >> run_spark >> delete_dataproc

We can also check during the execution that the job worked correctly.

Dataproc Job Succeeded

We can view the parquet file created.

Parquet file created correctly
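Besides the console, a quick Cloud Shell check that the Parquet output exists (replace the bucket name with your own):

# Spark writes a directory of part files plus a _SUCCESS marker
gsutil ls gs://<your-bucket>/data/highest_prices.parquet/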

Finally, after some minutes, we can validate that the whole workflow executed successfully!

Success

Conclusions and Future work

The purpose of this article was to describe the advantages of using Apache Airflow to deploy Apache Spark workflows, in this case using Google Cloud components. If you want to check any of the code, I published a repository on GitHub.

PS: if you have any questions or would like something clarified, you can find me on Twitter and LinkedIn. Also, if you are considering taking a Google Cloud certification, I wrote a technical article describing my experience and recommendations.

