How to run PySpark jobs in an Amazon EMR Serverless Cluster with Terraform

Ignite Your Data Revolution and Harness the Power of Amazon EMR Serverless

Ana Escobar
datamindedbe
6 min read · Jun 15, 2023


This is the first article in a series of three. In this one, I'm going to go through deploying Amazon EMR Serverless to run a PySpark job, using Terraform to manage the infrastructure. In the next one, I will go through deploying Amazon MSK to get a fully managed Apache Kafka cluster in the cloud (again using Terraform for the infrastructure). And you can probably guess the third one: it will be a combination of the previous two. As we say at Data Minded:

You know nothing until you’ve deployed to production. Technology itself is never the solution. It’s a tool to deliver business value.

Data Minded Values

The goal of this series is to build a streaming data pipeline in the cloud, step by step.

Getting started

What do you need? First, you need to have the following installed: Terraform, Docker, the AWS CLI, and Python 3.

You also need an AWS account.

You will also need a PySpark application already developed; that is not covered in this article. If you don't have one, you can use my spark-hello-world repository as a base to test this deployment:

Pack your dependencies

If your code needs any extra libraries to run as a Spark job, this step is essential; otherwise the job will fail. In the spark-hello-world repository I shared earlier there are no dependencies in the requirements.txt file besides PySpark, but I'll go through the process anyway.

The first thing I like to do is create a new folder called dependencies at the root of the repository, for better organisation. Inside this folder we'll place a Dockerfile that exports your virtual environment as a compressed .tar.gz file to your local filesystem. This is the Dockerfile you need (taken from the aws-samples GitHub account):

FROM --platform=linux/amd64 amazonlinux:2 AS base

RUN yum install -y python3

ENV VIRTUAL_ENV=/opt/venv
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"

# requirements.txt must be present in the build context (the dependencies folder)
COPY requirements.txt .

RUN python3 -m pip install --upgrade pip && \
    python3 -m pip install venv-pack==0.2.0 && \
    python3 -m pip install -r requirements.txt

RUN mkdir /output && venv-pack -o /output/virtualenv.tar.gz

FROM scratch AS export
COPY --from=base /output/virtualenv.tar.gz /

Note: if you are using Docker on Apple Silicon, make sure you keep --platform=linux/amd64.

Now, we can build the virtualenv archive:

cd dependencies
cp ../requirements.txt .  # the Dockerfile expects requirements.txt in the build context
# Enable BuildKit backend
DOCKER_BUILDKIT=1 docker build --output . .

You will find the output file, virtualenv.tar.gz, under the dependencies directory.
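Before uploading the archive anywhere, it can be worth a quick sanity check that the packed environment actually contains a Python interpreter. A minimal sketch using only Python's standard library, run from the dependencies directory (the file name matches the Dockerfile above):

# check_archive.py: peek inside the packed virtualenv
import tarfile

with tarfile.open("virtualenv.tar.gz", "r:gz") as archive:
    names = archive.getnames()

# The Spark job will unpack this archive as ./environment, so bin/python must be inside
print("total entries:", len(names))
print("has bin/python:", any(name.endswith("bin/python") for name in names))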

Main Python file

We will need to modify our Python file a little, since we're going to write the result files of our Spark job to an S3 bucket.

Basically, you need the if __name__ == "__main__" entry point and an argument check, because we'll pass the S3 output path as an argument when running the Spark job on EMR Serverless. Our main.py file from the spark-hello-world repository should be modified to look like this:

import sys

import findspark

findspark.init()

from pyspark.sql import SparkSession


def main(spark):
    # Square a small list of numbers using Spark
    nums = spark.sparkContext.parallelize([1, 2, 3, 4])
    return nums.map(lambda x: x * x).collect()


if __name__ == "__main__":
    """
    Usage: main <s3_output_path>
    """
    # Create the SparkSession
    spark = SparkSession.builder.appName("Demo").getOrCreate()

    if len(sys.argv) != 2:
        print("Invalid arguments, please supply <s3_output_path>")
        sys.exit(1)

    # The output path is passed as an argument
    output_path = sys.argv[1]

    output = main(spark)

    # Write results to S3
    spark.sparkContext.parallelize(output).saveAsTextFile(output_path)
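Before packaging anything, you can verify the same logic locally. A minimal smoke-test sketch, assuming PySpark is installed on your machine (the local[2] master and the local-output path are arbitrary choices for this test):

# local_smoke_test.py: run the job logic against a local Spark session
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("DemoLocal").getOrCreate()

# Same transformation as main(): square a small list of numbers
squares = spark.sparkContext.parallelize([1, 2, 3, 4]).map(lambda x: x * x).collect()
print(squares)  # [1, 4, 9, 16]

# Write to a local folder instead of S3 to mimic the saveAsTextFile call
spark.sparkContext.parallelize(squares).saveAsTextFile("local-output")
spark.stop()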

Managing your infrastructure with Terraform

Now that we have our code ready, it's time to create the necessary resources in AWS.

We’ll put all our terraform files under a folder called terraform for better organisation.

config.tf

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 5.0"
    }
  }

  # Terraform Cloud configuration for GitHub Actions
  # cloud {
  #   organization = "escobarana"
  #
  #   workspaces {
  #     name = "gh-actions-demo"
  #   }
  # }
}

# Configure the AWS Provider
provider "aws" {
  region = var.region
}

# Another option, but the AWS CLI is recommended (run `aws configure` to set your credentials)
# export AWS_ACCESS_KEY_ID=your_access_key
# export AWS_SECRET_ACCESS_KEY=your_access_secret
# export AWS_REGION=eu-west-1

variables.tf

# This variable defines the AWS Region.
variable "region" {
  description = "Region to use for AWS resources"
  type        = string
  default     = "eu-west-1"
}

data.tf

# Get the AccountId
data "aws_caller_identity" "current" {}

data "aws_availability_zones" "available" {}

main.tf

locals {
  name   = "emr-serverless-spark"
  region = var.region

  vpc_cidr = "10.0.0.0/16"
  azs      = slice(data.aws_availability_zones.available.names, 0, 3)
}

################################################################################
# Cluster
################################################################################
module "emr_serverless_spark" {
  source  = "terraform-aws-modules/emr/aws//modules/serverless"
  version = "1.1.2"

  name = "${local.name}-demo"

  release_label_prefix = "emr-6"

  initial_capacity = {
    driver = {
      initial_capacity_type = "Driver"

      initial_capacity_config = {
        worker_count = 2
        worker_configuration = {
          cpu    = "2 vCPU"
          memory = "10 GB"
        }
      }
    }

    executor = {
      initial_capacity_type = "Executor"

      initial_capacity_config = {
        worker_count = 2
        worker_configuration = {
          cpu    = "2 vCPU"
          disk   = "32 GB"
          memory = "10 GB"
        }
      }
    }
  }

  maximum_capacity = {
    cpu    = "12 vCPU"
    memory = "100 GB"
  }
}

################################################################################
# Supporting Resources
################################################################################

resource "aws_s3_bucket" "demo" {
  # S3 bucket names are globally unique, so pick your own name here
  bucket = "demo-bucket"
}

resource "aws_s3_object" "artifacts" {
  bucket = aws_s3_bucket.demo.id
  key    = "artifacts/"
}

resource "aws_s3_object" "code" {
  bucket = aws_s3_bucket.demo.id
  key    = "code/"
}

outputs.tf

################################################################################
# Spark
################################################################################

output "spark_arn" {
  description = "Amazon Resource Name (ARN) of the application"
  value       = module.emr_serverless_spark.arn
}

output "spark_id" {
  description = "ID of the application"
  value       = module.emr_serverless_spark.id
}

What these Terraform files do is use the official AWS provider, create an EMR Serverless application for Spark, create an S3 bucket with two folders (artifacts, to store the compressed virtualenv, and code, to store the main.py file), and output the application's ARN and ID upon creation.

Let’s create these 4 resources in our AWS account:

cd terraform

aws configure # configure your AWS credentials

terraform init
terraform plan # this is optional, it will show you what will be deployed - check that 4 resources will be created
terraform apply

We’ll see the following output:

Apply complete! Resources: 4 added, 0 changed, 0 destroyed.

spark_arn = "arn:aws:emr-serverless:eu-west-1:xx:/applications/xx"
spark_id = "xx"

Now, check in your AWS account that everything has been created. You should see the two folders, artifacts and code, under your S3 bucket, and a new EMR Serverless application named emr-serverless-spark-demo (the name set in main.tf).

You now have to upload the virtualenv.tar.gz file to the artifacts folder of your S3 bucket and your Python file to the code folder. You can do this manually in the console or with the AWS CLI:

aws s3 cp virtualenv.tar.gz s3://<your-bucket-name>/artifacts/virtualenv.tar.gz
aws s3 cp main.py s3://<your-bucket-name>/code/main.py
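If you prefer to do this from Python, here is an equivalent boto3 sketch; it assumes you run it from the repository root and that <your-bucket-name> is replaced with the bucket created by Terraform:

# upload_artifacts.py: push the packed virtualenv and the job script to S3
import boto3

BUCKET = "<your-bucket-name>"  # placeholder: the bucket created by Terraform

s3 = boto3.client("s3")
s3.upload_file("dependencies/virtualenv.tar.gz", BUCKET, "artifacts/virtualenv.tar.gz")
s3.upload_file("main.py", BUCKET, "code/main.py")
print("Uploaded artifacts and code to", BUCKET)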

Running the Spark job

Run the Spark job by going to Amazon EMR > EMR Serverless (new) and clicking on Get started; make sure you're in the right AWS region. Then click on Applications under Serverless and on Submit job.

Name: The name you want to give to your Spark Job

Runtime role: Select or create one with the default settings (the role needs read/write access to your S3 bucket)

Script location: s3://<your-bucket-name>/code/main.py

Script arguments: s3://<your-bucket-name>/output/ (this is the output folder where the results will be stored)

Spark properties (edit as text):

--conf spark.driver.cores=1
--conf spark.driver.memory=2g
--conf spark.executor.cores=4
--conf spark.executor.memory=4g
--conf spark.executor.instances=2
--conf spark.archives=s3://<your-bucket-name>/artifacts/virtualenv.tar.gz#environment
--conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python
--conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python
--conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python
--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory

Finally, click on Submit job and wait for the results. You can check the logs by clicking on the Job ID and then on Logs.
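If you'd rather script the submission than click through the console, the same job can be started with boto3. A hedged sketch: the application ID is the spark_id Terraform output, while the execution role ARN, bucket name and job name are placeholders you would fill in yourself:

# submit_job.py: start the PySpark job on EMR Serverless and wait for it to finish
import time

import boto3

APPLICATION_ID = "<spark_id-from-terraform-output>"  # placeholder
EXECUTION_ROLE_ARN = "<your-runtime-role-arn>"       # placeholder
BUCKET = "<your-bucket-name>"                        # placeholder

emr = boto3.client("emr-serverless")

response = emr.start_job_run(
    applicationId=APPLICATION_ID,
    executionRoleArn=EXECUTION_ROLE_ARN,
    name="spark-hello-world",
    jobDriver={
        "sparkSubmit": {
            "entryPoint": f"s3://{BUCKET}/code/main.py",
            "entryPointArguments": [f"s3://{BUCKET}/output/"],
            "sparkSubmitParameters": (
                f"--conf spark.archives=s3://{BUCKET}/artifacts/virtualenv.tar.gz#environment "
                "--conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python "
                "--conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python "
                "--conf spark.executorEnv.PYSPARK_PYTHON=./environment/bin/python"
            ),
        }
    },
)
job_run_id = response["jobRunId"]
print("Submitted job run:", job_run_id)

# Poll until the job run reaches a terminal state
while True:
    state = emr.get_job_run(applicationId=APPLICATION_ID, jobRunId=job_run_id)["jobRun"]["state"]
    print("State:", state)
    if state in ("SUCCESS", "FAILED", "CANCELLED"):
        break
    time.sleep(30)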

Check the output folder in your S3 bucket to see the results.
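You can also read the result files straight from Python; a small boto3 sketch (the bucket name is again a placeholder, and output/ matches the script argument above):

# read_output.py: print the text files the job wrote under output/
import boto3

BUCKET = "<your-bucket-name>"  # placeholder

s3 = boto3.client("s3")
listing = s3.list_objects_v2(Bucket=BUCKET, Prefix="output/")

for obj in listing.get("Contents", []):
    key = obj["Key"]
    if key.endswith("/") or key.endswith("_SUCCESS"):
        continue  # skip folder markers and the Hadoop _SUCCESS flag
    body = s3.get_object(Bucket=BUCKET, Key=key)["Body"].read().decode("utf-8")
    print(key, "->", body.strip())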

You can also copy the output locally to view the stdout:

aws s3 cp s3://<your-bucket-name>/logs/applications/<your-application-id>/jobs/<your-job-id>/SPARK_DRIVER/stdout.gz - | gunzip 

Next steps

You could use GitHub Actions in combination with Terraform Cloud to automate your workflow, so that on every push to main, Terraform checks whether your infrastructure has changed and applies the changes automatically, without you having to run terraform apply in your terminal and approve them.

Do you like this content?

Subscribe to my Medium page and be the first to get notified whenever I publish a new article!

Follow me on LinkedIn for daily insights about Software & Data Engineering 🫰🏻
