Exploring Spark and Airflow Integration for Submitting Python and Scala Jobs

Rafael VM
5 min read · Jun 15, 2024


GitHub repository

Contents

  1. CONTEXT
  2. Project Setup
  3. Docker Configuration
  4. Creating and Configuring a DAG in Airflow
  5. Implementing the WordCount Job in Python
  6. Implementing the WordCount Job in Scala
  7. Running and Testing the Jobs
  8. Spark UI Overview

1. CONTEXT

This project explores the interaction between Apache Spark and Airflow, focusing on how to submit jobs in Python and Scala. The aim is to set up a scalable architecture that leverages Airflow’s orchestration capabilities to manage Spark jobs efficiently. The example jobs used in this project are simple WordCount applications. Although these jobs are straightforward, they provide a useful template for more complex text processing or data transformation tasks.

This project was developed in a Windows 11 environment using PyCharm and Docker.

It is designed to be a starting point for data engineers who want to explore the integration of Spark and Airflow in a containerized setup. The jobs demonstrate basic functionality but can be easily extended to handle more complex workflows.

Important note: when configuring the Spark connection in the Airflow UI (Admin > Connections), the host must be specified as spark://spark-master, with the spark:// prefix included in the host field.
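If you prefer to script this rather than click through the UI, a command along these lines should create an equivalent connection from inside the Airflow webserver container. The connection id spark-conn matches the one referenced by the DAG later in this post, the port matches the master port exposed in docker-compose.yml, and the container name is a placeholder you would replace with yours.

# Optional: create the Spark connection from the CLI instead of the Airflow UI
docker exec -it <airflow-webserver-container> airflow connections add spark-conn --conn-type spark --conn-host spark://spark-master --conn-port 7077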

2. Project Setup

Start by setting up the project environment. Ensure you have Docker and sbt installed.

# Install Docker
# Follow the instructions at https://docs.docker.com/get-docker/
# Install sbt
# Open an elevated command prompt and run the following command (Windows + Chocolatey)
choco install sbt
# Verify sbt installation
sbt --version
# Clone the project repository
git clone https://github.com/Rafavermar/SparkAirflow-PythonScala.git
# Navigate to the project directory
cd Python/SparkAirflowJavaPythonScala
# Start the Docker containers
docker-compose up -d
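Once docker-compose up -d finishes, a quick sanity check confirms that the services are up and the web UIs are reachable (the ports come from the docker-compose.yml shown in the next section):

# Check that all containers are running
docker-compose ps
# Airflow UI: http://localhost:8080 (user admin / password admin, created by the scheduler service)
# Spark master UI: http://localhost:9090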

3. Docker Configuration

A prebuilt image is also available on Docker Hub:

docker pull jrvm/airflow-spark-jps

Here’s the docker-compose.yml file that sets up the environment:

version: '3'

x-spark-common: &spark-common
  image: bitnami/spark:latest
  volumes:
    - ./jobs:/opt/bitnami/spark/jobs
  networks:
    - rafavm

x-airflow-common: &airflow-common
  build:
    context: .
    dockerfile: Dockerfile
  env_file:
    - airflow.env
  volumes:
    - ./jobs:/opt/airflow/jobs
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
  depends_on:
    - postgres
  networks:
    - rafavm

services:
  spark-master:
    <<: *spark-common
    command: bin/spark-class org.apache.spark.deploy.master.Master
    ports:
      - "9090:8080"
      - "7077:7077"

  spark-worker:
    <<: *spark-common
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_WORKER_CORES: 2
      SPARK_WORKER_MEMORY: 1g
      SPARK_MASTER_URL: spark://spark-master:7077

  postgres:
    image: postgres:14.0
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    networks:
      - rafavm

  webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    depends_on:
      - scheduler

  scheduler:
    <<: *airflow-common
    command: bash -c "airflow db migrate && airflow users create --username admin --firstname Rafa --lastname Vera --role Admin --email jr.vera.ma@gmail.com --password admin && airflow scheduler"

networks:
  rafavm:
    driver: bridge
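The airflow.env file referenced by the Airflow services is not reproduced in this post. A minimal sketch of what it might contain, assuming a LocalExecutor and the Postgres service defined above, would be the following (the actual file in the repository may differ):

# airflow.env - minimal sketch; the repository's file may differ
AIRFLOW__CORE__EXECUTOR=LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CORE__LOAD_EXAMPLES=False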

4. Creating and Configuring a DAG in Airflow

In Airflow, a Directed Acyclic Graph (DAG) represents a workflow of tasks with dependencies. Here’s the DAG configuration:

import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {
    "owner": "Rafael Vera-Maranon",
    "start_date": airflow.utils.dates.days_ago(1),
}

dag = DAG(
    dag_id="sparking_flow",
    default_args=default_args,
    schedule_interval="@daily",
)

start = PythonOperator(
    task_id="start",
    python_callable=lambda: print("Jobs started"),
    dag=dag,
)

python_job = SparkSubmitOperator(
    task_id="python_job",
    conn_id="spark-conn",
    application="jobs/python/wordcountjob.py",
    dag=dag,
)

scala_job = SparkSubmitOperator(
    task_id="scala_job",
    conn_id="spark-conn",
    application="jobs/scala/target/scala-2.12/word-count_2.12-0.1.jar",
    dag=dag,
)

end = PythonOperator(
    task_id="end",
    python_callable=lambda: print("Jobs completed successfully"),
    dag=dag,
)
start >> [python_job, scala_job] >> end
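The last line wires up the dependencies: start runs first, the Python and Scala jobs run in parallel, and end runs only once both jobs have finished.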

5. Implementing the WordCount Job in Python

Here’s a simple WordCount job in Python using PySpark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()
sc = spark.sparkContext

textData = sc.parallelize([
    "Yamaha is known for its agility",
    "Ducati excels in raw power",
    "Both brands have their unique strengths",
])

counts = textData.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)

# Print the word counts directly
for word, count in counts.collect():
    print(word, count)

spark.stop()

Saving Output to a File

If you want to save the output to a CSV file instead of printing it, add the following line before spark.stop():

counts.toDF(["word", "count"]).write.csv("/opt/bitnami/spark/jobs/output/wordcount_output.csv")
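Note that Spark writes the output as a directory of part-* files rather than a single CSV file. Inside the containers, /opt/bitnami/spark/jobs/output maps back to ./jobs/output on the host through the volume mount in docker-compose.yml; the same applies to the Scala job below.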

Explanations

  • flatMap: Splits each line into words.
  • map: Transforms each word into a pair (word, 1).
  • reduceByKey: Aggregates the counts for each word.

JOB OUTPUT

6. Implementing the WordCount Job in Scala

Here’s the WordCount job in Scala:

import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("Word Count").master("spark://spark-master:7077").getOrCreate()
    val sc = spark.sparkContext

    val textData = sc.parallelize(List(
      "Yamaha is known for its agility",
      "Ducati excels in raw power",
      "Both brands have their unique strengths"
    ))

    val counts = textData.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)

    // Print the word counts directly
    counts.collect().foreach(println)

    spark.stop()
  }
}

Saving Output to a File

To save the Scala job's output to a CSV file as well, add the following before spark.stop():

import spark.implicits._ // required for toDF on an RDD
counts.toDF("word", "count").write.csv("/opt/bitnami/spark/jobs/output/wordcount_output.csv")

JOB OUTPUT

7. Running and Testing the Jobs

Once the DAG is configured and the jobs are implemented, you can run the DAG from the Airflow UI. Make sure your Spark and Airflow containers are running and the Scala job has been packaged (see the commands below), then trigger the DAG manually from the Airflow dashboard.

# Compile the Scala project
cd jobs/scala
sbt compile
# Package the Scala job
sbt package
# Check the output directory for results
docker exec -it <spark-container> ls /opt/bitnami/spark/jobs/output
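If you prefer the command line to the dashboard, the same DAG can also be triggered with the Airflow CLI; sparking_flow is the dag_id defined above, and the container name is again a placeholder for your Airflow webserver container.

# Trigger the DAG from the Airflow CLI
docker exec -it <airflow-webserver-container> airflow dags trigger sparking_flow
# Follow the python_job and scala_job task logs in the Airflow UI at http://localhost:8080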

8. Spark UI overview
