Spark submit with PySpark and AWS EMR Serverless 6.9.0

Minh DOAN
9 min readNov 2, 2023

--

Source : https://www.linkedin.com/posts/lucazanna_pyspark-sql-python-activity-6988055050874929153-ZsMs

Hello, I am writing this because I know that there are many Data Engineers out there who are struggling to run a Pyspark script in a managed cloud cluster like AWS EMR serverless. No more words, let’s get straight to the point.

Development environment

  • Python 3.9.12, I recommend using pyenv to manage your python environments : https://realpython.com/intro-to-pyenv/.
  • A nice IDE (I mean Pycharm).
  • Poetry to manage your python dependencies: https://python-poetry.org/docs/.

Set things up

Initialise your project with Poetry. These are the dependencies that I used in one of my project:

[tool.poetry.dependencies]
python = "3.9.12"
requests = "^2.28.0"
findspark = "^2.0.1"
jinja2 = "^3.1.2"
pyyaml = "^6.0"
venv-pack = "^0.2.0"
pyspark = "3.3.0"
boto3 = "^1.26.46"
mock = "^5.0.1"
pytest = "^7.3.0"
pytest-spark = "^0.6.0"
moto = "^4.1.7"

For EMR Serverless version 6.9.0, spark version 3.3.0 is required : EMR Serverless 6.9.0 — Amazon EMR

After launching poetry install, a Python virtual environment folder will be created (for example, .venv/). The next step is to add two additional jar files to the pyspark/jars/ directory located at .venv/lib/python3.9/site-packages/.

  1. aws-java-sdk-bundle-1.11.1026.jar
  2. hadoop-aws-3.3.2.jar

Finding the right combination of versions for these two files can be a frustrating and time-consuming process. I speak from experience — it took me a whole day to figure out which versions to use. However, I have a trick that can help you save time. Here’s what you need to do:

  • Now return to the maven page and search for hadoop-aws, then click on the 3.3.2 link.
  • Scroll down and look for “java”, you will see “aws-java-sdk-bundle”, The version next to it is the appropriate version to use, which in this case should be 1.11.1026.
  • Download these jars:

Project structure

I attempted to locate a relevant example from my projects that addresses a complex requirement. Specifically, the project involves a Spark job that reads data from an S3 bucket, performs Spark SQL processing, and writes the output to another S3 bucket. Additionally, the script allows for the inclusion of arguments.

Basically, the project’s structure could look like this:

├── CHANGELOG.md
├── Dockerfile
├── Makefile
├── README.md
├── some_modules_to_be_imported_in_your_pyspark_script
│ ├── __init__.py
│ ├── module
│ └── another_module
├── poetry.lock
├── pyproject.toml
├── pyspark_scripts
│ ├── __init__.py
│ ├── cluster_spark.py
│ └── local_spark.py

Your local_pyspark.py could look like this:

import os
import findspark
from some_modules_to_be_imported_in_your_pyspark_script.some_funcs import process
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
findspark.init()

def generate_spark_session():
# spark configuration
conf = (
SparkConf()
.set('spark.executor.extraJavaOptions', '-Dcom.amazonaws.services.s3.enableV4=true')
.set('spark.driver.extraJavaOptions', '-Dcom.amazonaws.services.s3.enableV4=true')
.set('spark.sql.debug.maxToStringFields', 1000)
.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
.setAppName('pyspark_aws')
.setMaster('local[*]')
)
sc = SparkContext(conf=conf)
sc.setSystemProperty('com.amazonaws.services.s3.enableV4', 'true')
print('modules imported')
access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set('fs.s3a.access.key', access_key_id)
hadoop_conf.set('fs.s3a.secret.key', secret_access_key)
hadoop_conf.set('fs.s3a.endpoint', 's3-eu-west-1.amazonaws.com')
hadoop_conf.set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
hadoop_conf.set('fs.s3a.multipart.size', '104857600')
return SparkSession(sc)

if __name__ == '__main__':
spark = generate_spark_session()
process(spark)

And some_modules_to_be_imported_in_your_pyspark_script.some_funcs.py could look like this:

import argparse
import boto3
from pyspark.sql import DataFrame, SparkSession
from some_modules_to_be_imported_in_your_pyspark_script.some_tools import (
get_s3_file_keys_by_months_range,
read_data_multiple_paths,
write_data,
)
from some_modules_to_be_imported_in_your_pyspark_script.some_environments import MONTHS_TO_LOAD_DATA, TEMP_VIEW

def process(spark: SparkSession, is_cluster_mode: bool = False):
args_parser = build_args_parser(spark, is_cluster_mode)
args = args_parser.parse_args()
if args.func:
fn = args.func
fn(args)
else:
args_parser.print_usage()

def get_sql_script(bucket: str, key: str):
sql_file = boto3.client("s3").get_object(Bucket=bucket, Key=key)
sql = sql_file['Body'].read().decode("utf-8")
return sql

def run_sql(spark: SparkSession, df: DataFrame, sql: str):
df.createOrReplaceTempView(TEMP_VIEW)
computed_df = spark.sql(sql)
return computed_df

def transform(args, spark_session: SparkSession, is_cluster_mode: bool = False):
paths = get_s3_file_keys_by_months_range(args.input_bucket, args.input_prefix, MONTHS_TO_LOAD_DATA, is_cluster_mode)
initial_df = read_data_multiple_paths(spark_session, paths)
sql = get_sql_script(args.sql_bucket, args.sql_key)
transformed_df = run_sql(spark_session, initial_df, sql)
write_data(transformed_df, args.output_bucket, args.output_prefix, is_cluster_mode)

def build_args_parser(spark_session: SparkSession, is_cluster_mode: bool = False):
args_parser = argparse.ArgumentParser()
args_parser.add_argument("--input-bucket")
args_parser.add_argument("--input-prefix")
args_parser.add_argument("--output-bucket")
args_parser.add_argument("--output-prefix")
args_parser.add_argument("--sql-bucket")
args_parser.add_argument("--sql-key")
args_parser.set_defaults(func=lambda args: transform(args, spark_session, is_cluster_mode))
return args_parser

I will not be displaying all the functions that I am using in this context. However, I believe that the main idea has been conveyed. If you have any queries or doubts, please feel free to leave a comment below.

Run/Debug your job locally

  • Ensure that your PyCharm project is configured to use the Python interpreter built with Poetry (the python path which is .venv/bin/python).
  • Add a debug/run configuration for your script local_spark.py. In the “parameters” field, fill with your script’s arguments, for example:
--input-bucket
"in_bucket"
--input-prefix
"in/prefix/2022/01/04/"
--output-bucket
"out_bucket"
--output-prefix
"out/prefix/2022/01/04/"
--sql-bucket
"sql_bucket"
--sql-key
"sql/wow.sql"
This is necessary

now run and check your results in s3.

Run your job with EMR Serverless

Prepare/upload to s3 your .venv (python virtual environment)

To ensure that the non-default Python libraries used in our project are available in EMR Serverless, we need to create a custom Python environment that can be used by our Spark job.

In order to achieve this, you need to install the necessary dependencies and package them into a tar.gz file within an Amazon Linux 2 Docker image. To simplify the deployment process when working with CI/CD, it is recommended to use a Makefile and a Dockerfile. The Makefile can help automate the build process, while the Dockerfile can specify the environment and dependencies needed to run your application.

Makefile

# Project settings
PACKAGE := my_spark
PACKAGES := $(PACKAGE) tests
MODULES := $(wildcard $(PACKAGE)/*.py)
ARTIFACTS_BUCKET := artifacts_bucket
STATUS := snapshot
# const
.DEFAULT_GOAL := help
FAILURES := .pytest_cache/v/cache/lastfailed
DIST_FILES := dist/*.tar.gz dist/*.whl
#GIT_COMMIT_SHA := $(shell git rev-parse HEAD)
IMAGE_NAME := my_image
GIT_COMMIT_SHA := $(shell git describe --always --long --tags)
AWS_ACCESS_KEY_ID := $(shell cat ~/.aws/credentials | grep -m1 aws_access_key_id | cut -d"=" -f2 | tr -d '[:space:]')
AWS_SECRET_ACCESS_KEY := $(shell cat ~/.aws/credentials | grep -m1 aws_secret_access_key | cut -d"=" -f2 | tr -d '[:space:]')
AWS_DEFAULT_REGION := $(shell cat ~/.aws/config | grep -m1 region | cut -d"=" -f2 | tr -d '[:space:]')
# Docker
#--------------------- Build ---------------------#

.PHONY: docker_build
docker_build:
docker build --rm \
--progress plain \
--target release \
--build-arg AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} \
--build-arg AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} \
--build-arg AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION} \
--build-arg USER_USERNAME=appuser \
--build-arg USER_UID=1000 \
--build-arg USER_HOME_DIR=/home/appuser \
--build-arg PYTHON_VERSION=3.9.12 \
--build-arg ARTIFACTS_BUCKET=${ARTIFACTS_BUCKET} \
--build-arg STATUS=${STATUS} \
--build-arg PACKAGE_NAME=${PACKAGE} \
-t ${IMAGE_NAME}:${GIT_COMMIT_SHA} \
-t ${IMAGE_NAME}:latest .

Dockerfile

FROM --platform=linux/amd64 amazonlinux:2 AS build
ARG PYTHON_VERSION
ARG AWS_ACCESS_KEY_ID
ARG AWS_SECRET_ACCESS_KEY
ARG AWS_DEFAULT_REGION

FROM --platform=linux/amd64 amazonlinux:2 AS release
ARG PYTHON_VERSION
ARG AWS_ACCESS_KEY_ID
ARG AWS_SECRET_ACCESS_KEY
ARG AWS_DEFAULT_REGION
# Install Python 3.9 - Note that python 3.10 requires OpenSSL >= 1.1.1
RUN yum install -y sudo util-linux gcc openssl11-devel bzip2-devel libffi-devel tar zip unzip gzip wget make && \
wget https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-${PYTHON_VERSION}.tgz && \
tar xzf Python-${PYTHON_VERSION}.tgz && \
cd Python-${PYTHON_VERSION} && \
./configure --enable-optimizations && \
make install
RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \
unzip awscliv2.zip && \
sudo ./aws/install
# Create our virtual environment
# we need both --copies for python executables for cp for libraries
ENV VIRTUAL_ENV=/opt/venv
RUN python3 -m venv $VIRTUAL_ENV --copies
RUN cp -r /usr/local/lib/python3.9/* $VIRTUAL_ENV/lib/python3.9/
# Ensure our python3 executable references the virtual environment
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
# Upgrade pip (good practice) and install venv-pack
# You can install additional packages here or copy requirements.txt
#COPY dist/requirements.txt requirements.txt
COPY pyproject.toml pyproject.toml
COPY poetry.lock poetry.lock
RUN python3 -m pip install --upgrade pip && \
python3 -m pip install poetry && \
poetry export --without-hashes -f requirements.txt > requirements.txt && \
sed -i '1d' requirements.txt && \
python3 -m pip install -r requirements.txt
# Package the env
# note you have to supply --python-prefix option to make sure python starts with the path where your copied libraries are present
RUN mkdir /output && \
venv-pack -o /output/pyspark_venv.tar.gz --python-prefix /home/hadoop/environment
RUN aws s3 cp pyspark_venv.tar.gz s3://${ARTIFACTS_BUCKET}/artefact/${STATUS}/${PACKAGE_NAME}/venv/pyspark_venv.tar.gz

When you execute the make docker_build command, it will generate a pyspark_venv.tar.gz file which will be stored in the S3 bucket at s3://${ARTIFACTS_BUCKET}/artefact/${STATUS}/${PACKAGE_NAME}/venv/pyspark_venv.tar.gz (Please note that the variables are defined in the Makefile).

Prepare/upload to s3 your external modules (some_modules_to_be_imported_in_your_pyspark_script)

In order for EMR Serverless to find and use external Python modules, we must upload a compressed file of these modules to S3, still work with Makefile:

# Other Makefile variables
.PHONY: upload_modules
upload_modules:
echo "[🕵️] Zipping package"
pip install zip-files # if not installed
export PATH="$PATH:/home/runner/.local/bin" # if using Gitlab CI
zip-files -o some_modules_to_be_imported_in_your_pyspark_script.zip some_modules_to_be_imported_in_your_pyspark_script
aws s3 cp some_modules_to_be_imported_in_your_pyspark_script.zip s3://${ARTIFACTS_BUCKET}/artefact/${STATUS}/${PACKAGE}/package/

Run make upload_modules .

Prepare/upload to s3 your pyspark script

Last thing to do is to upload your pyspark script to s3, you will need to use another script, for example cluster_spark.py, by unsetting “master=’local[*]’” (because we will run in mode cluster EMR)

# Other lines
conf = (
SparkConf()
.set('spark.executor.extraJavaOptions', '-Dcom.amazonaws.services.s3.enableV4=true')
.set('spark.driver.extraJavaOptions', '-Dcom.amazonaws.services.s3.enableV4=true')
.set('spark.sql.debug.maxToStringFields', 1000)
.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
.setAppName('pyspark_aws')
# .setMaster('local[*]') # remove this
)
# Still other lines

in the Makefile:

# Other Makefile variables
.PHONY: upload_scripts
upload_scripts:
aws s3 cp pyspark_scripts/cluster_spark.py s3://${ARTIFACTS_BUCKET}/artefact/${STATUS}/${PACKAGE}/scripts/

Run make upload_scripts .

Create an EMR Serverless application

You have two options to create the necessary infrastructure for running your Spark job with AWS EMR Serverless: you can use the AWS console or use Terraform. If you prefer using Terraform, you can refer to this GitHub repository for an example: https://github.com/aws-samples/aws-emr-serverless-using-terraform.

Submit your job to EMR Serverless application

To interact with your EMR Serverless application, you have two options. You can use the AWS CLI or write a Python script similar to the one below. Note that your EMR Serverless application must be in the STARTED state, and make sure to replace <application_id> with your actual EMR Serverless application ID, which can be found in the AWS console for your application.

import boto3
client = boto3.client("emr-serverless")
response = client.start_job_run(
applicationId="<application_id>",
executionRoleArn="<role_arn>",
jobDriver={
"sparkSubmit": {
"sparkSubmitParameters": "--conf spark.submit.pyFiles=s3://artifacts_bucket/artefact/snapshot/" +
"my_spark/package/some_modules_to_be_imported_in_your_pyspark_script.zip " +
"--conf spark.archives=s3://artifacts_bucket/artefact/snapshot/" +
"my_spark/venv/pyspark_venv.tar.gz#environment " +
"--conf spark.emr-serverless.driverEnv.PYSPARK_DRIVER_PYTHON=./environment/bin/python3 " +
"--conf spark.emr-serverless.driverEnv.PYSPARK_PYTHON=./environment/bin/python3 " +
"--conf spark.emr-serverless.executorEnv.PYSPARK_PYTHON=./environment/bin/python3",
"entryPoint": "s3://artifacts_bucket/artefact/snapshot/my_spark/scripts/cluster_spark.py",
"entryPointArguments": ["--input-bucket", "in_bucket",
"--input-prefix", "in/prefix/2022/01/04/",
"--output-bucket", "out_bucket",
"--output-prefix", "out/prefix/2022/01/04/",
"--sql-bucket", "sql_bucket",
"--sql-key", "sql/wow.sql"]}
},
configurationOverrides={
"monitoringConfiguration": {
"s3MonitoringConfiguration": {"logUri": "s3://log_bucket/logs/"}
}
},
)
# Note that application must be in `STARTED` state.

Note that you can also test your environment by running a spark-submit command locally on your machine. You can upload your code and input data to S3, and then run the spark-submit command with the appropriate arguments to see if everything works as expected before submitting a job to the EMR Serverless engine.

One approach is to download all the files from S3 to your workspace and then execute the following command.

spark-submit --master local --deploy-mode client --archives pyspark_venv.tar.gz --py-files some_modules_to_be_imported_in_your_pyspark_script.zip pyspark_scripts/local_spark.py --input-bucket "in_bucket" --input-prefix "in/prefix/2022/01/04/" --output-bucket "out_bucket" --output-prefix "out/prefix/2022/01/04/" --sql-bucket "sql_bucket" --sql-key "sql/wow.sql"

Conclusion

Although I recognise that there may be more optimised methods available, the approach that I will be sharing in this article is the one that worked for me, and I hope that it will save you time and effort in your own work. Additionally, in future articles, I will demonstrate how to schedule our job run with Airflow.

About me

--

--