Executing Spark jobs with Apache Airflow

Jozimar Back
CodeX
Published in
4 min readAug 7, 2021

Apache Spark is a solution that helps a lot with distributed data processing. To automate this task, a great solution is scheduling these tasks within Apache Airflow. In this tutorial, I share with you, ways to create DAG's in Apache Airflow capable of running Apache Spark jobs.

spiral firework in mountains by Wil Stewart
Photo by Wil Stewart on Unsplash

Preparing the environment on Airflow machine

The machine that hosts the Airflow, where I tested this tutorial, runs with Debian 9. To run Spark on Airflow using PythonOperator and BashOperator, the JAVA_HOME environment must be configured. If you don’t have java installed, install it with the following commands:

sudo apt update
sudo apt install default-jdk

After instaling java, the JAVA_HOME in the operating system must be configured by mapping the location of the java installation. For example, on Debian, in the .bashrc file, in the root directory, you will inform the following lines:

export JAVA_HOME='/usr/lib/jvm/java-8-openjdk-amd64'
export PATH=$PATH:$JAVA_HOME/bin

If you are on linux, after editing the file, remember running the command:

source ~/.bashrc

To run a script using the Airfow operator SparkSubmitOperator, in addition to the JAVA_HOME, Spark binaries must be added and mapped. On the Spark page you can download the tgz file and unzip it on the machine that hosts Airflow. Put in the file .bashrc the SPARK_HOME and add it to the system PATH.

export SPARK_HOME='/opt/spark'
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

Finally you must add the pyspark package to the environment where Airflow runs.

pip install pyspark

In the following examples, we have an example of a Spark task that ETL from SQL database to mongo database. The version of Spark used was 3.0.1 which is compatible with the mongo connector package org.mongodb.spark:mongo-spark-connector_2.12:3.0.0

PythonOperator

With PythonOperator, just create the python method that will run the Spark job sending it from Airflow. The problem with this approach is that you don’t have the log details of the Spark job execution.

script dag airflow com pythonoperator

BashOperator

To use this operator, you can create a python file with Spark code and another python file containing DAG code for Airflow. Inside BashOperator, the bash_command parameter receives the command that will be executed in the operating system’s bash. In this parameter, for example, the command python jobspark.py can be executed.

In this operator you will have more log details from Spark job. This logs are about the stage and the percentage of completion of job in the same way as it would be if executed in the terminal.

BashOperator log detail

SparkSubmitOperator

To use this operator, after mapping JAVA_HOME and Spark binaries on the Airflow machine, you must register the master Spark connection in the Airflow administrative panel.

Spark master connection

In SparkSubmitOperator the conn_id parameter will be filled with the Conn Id registered through the administrative panel. One of the main advantages that I consider in this operator, is being able to configure and inform all the Spark job properties. Thus leaving the Spark script more streamlined with practically only the logic to be sent and executed in the cluster. Under the covers this operator uses the bash spark-submit command using the settings given in the operator.

SparkSubmitOperator Sample

In this operator, the task logs are much more detailed, containing TaskSetManager information about each task started and ended

Troubleshooting

During the DAG’s creation I had some problems and in this section I would like to share how to solve them.

ERROR — [Errno 2] No such file or directory: ‘bash’

This happens in airflow when executing bash command. For some reason the Airflow may not recognize the operating system’s bash path. To solve this, add the env property in BashOperator informing the PATH that contains the bash. In SparkSumbitOperator you must inform the PATH in the env_vars property. Here is an example:

print_path_env_task = BashOperator(
task_id='elt_documento_pagar_spark',
bash_command="python ./dags/spark-jdbc-sql-test.py",
dag=dag,
env={'PATH': '/bin:/usr/bin:/usr/local/bin'}
)
task = SparkSubmitOperator(
task_id='elt_documento_pagar_spark',
conn_id='spark',
application="./dags/spark-jdbc-sql-test.py",
env_vars={'PATH': '/bin:/usr/bin:/usr/local/bin'},
packages="org.mongodb.spark:mongo-spark-connector_2.12:3.0.0,com.microsoft.sqlserver:mssql-jdbc:8.4.1.jre8"
)

SyntaxError: Non-ASCII character

This happens when processing Spark code that contains special characters. To solve it, just write the following comment at the top of the python file.

# -*- coding: utf-8 -*-

--

--

Jozimar Back
CodeX
Writer for

I write articles about my experience in Data Engineering.