Spark Essentials: A Guide to Setting Up, Packaging, and Running PySpark Projects

Suffyan Asad
21 min read · Dec 30, 2023


Introduction

In my recent experiences at work, I’ve encountered multiple questions regarding setting up Spark projects. These inquiries span from initiating new projects to checking out and setting up existing codebases in both PySpark and Scala environments. Based on the challenges and questions that have repeatedly come up, I’ve decided to share my insights more broadly. This article is crafted as a guide, aimed at explaining the process of initiating PySpark projects and preparing them for submission to Spark clusters.

Whether you are a beginner just stepping into the world of big data or a seasoned developer, this guide is tailored for you. We’ll delve into the practical aspects of setting up Spark projects using PySpark, Spark’s Python API. I expect that this will be a helpful reference that you or your colleagues can refer to when setting up the codebase of a Spark project and getting ready to work.

Here’s what we’ll cover:

  1. Setting Up PySpark Projects: Learn the essentials of setting up a PySpark project using venv, complete with instructions for both command line and PyCharm setups. This will ensure you have a well-structured and isolated environment for your Python-based Spark applications.
  2. Packaging and Running Projects: This section will walk you through the process of packaging your projects and running them on a Spark cluster, preparing you for real-world Spark deployment.

By the end of this article, you should have an understanding of the process of setting up PySpark projects, running them locally, packaging, and running them on Spark clusters, equipping you with the necessary skills to tackle big data challenges with Apache Spark.

Note: The commands and instructions in this article are for Linux and macOS, and anyone following them on a computer running Microsoft Windows will need to adapt several steps, especially the commands that run in the terminal. I still encourage Windows readers to follow along, as the concepts presented are very useful.

So let’s begin…

Setting up and running a simple PySpark job

This section covers setting up and running a PySpark project, and will cover:

  • Using venv to create a virtual environment
  • Installing pyspark and other required libraries using a requirements.txt file
  • Creating the Spark session for both local testing and for running on the cluster
  • Packaging the job and submitting to a test cluster, along with packaged code files and dependent Python packages

This article performs each step from the command line and is designed to be IDE agnostic; the steps are generic and can be used with any IDE. So let’s begin with setting up a virtual environment and installing PySpark:

Setting up: creating a virtual environment and installing PySpark

First, ensure that your system has Python 3.9 installed. To check, run the python --version command:

Python version is 3.9.13

If not, please download and install Python 3.9:

In this tutorial, we use Python 3.9, but it’s important to note that PySpark works with other Python versions as well (Spark 3.5 supports Python 3.8 and above). If you’re using a different version of Python, you might need to adjust some steps accordingly. It’s always a good practice to use a Python version that matches the compatibility requirements of the Spark version you’re working with. For more details, please refer to the PySpark documentation here:

Let’s begin. First, open up the terminal and create a new folder; let’s call it pyspark_test_project:

Create the project folder
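
The commands might look like the following (a minimal sketch, using the folder name from the article):

    mkdir pyspark_test_project
    cd pyspark_test_project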

The next step is to set up a virtual environment for the project. Setting up a virtual environment is not a necessity, but I prefer to create one in each of my projects, as it allows using different versions of Python and packages with convenience.

This article uses venv for creating the virtual environment as mentioned before. The following commands create a virtual environment and activate it:

Create a virtual environment and activate it
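
A sketch of these commands, assuming Python 3.9 is available as python3.9 on the PATH:

    python3.9 -m venv venv
    source venv/bin/activate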

The aforementioned commands create a virtual environment in the directory venv and activate it.

You can confirm that Python is now running from the virtual environment by running the command which python, which should point to the virtual environment folder. Additionally, all packages will be installed within the virtual environment:

Activate the virtual environment, python is now running from the virtual environment

Note: To deactivate the virtual environment, run deactivate. If the virtual environment gets deactivated, for example by closing the terminal, it can be reactivated by running the activation command again.

For more information about virtual environments and venv, you can refer to the following documentation:

The next step is to install PySpark; the latest version at the time of writing this article is 3.5.0. While you can use pip install to install PySpark, we'll use a requirements.txt file to keep a list of installed packages for the project. Run the following commands to put PySpark 3.5.0 in the requirements.txt file and then install it:

Commands to create a requirements.txt file and install the required packages
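
For reference, the commands could look like this (a sketch; the pinned version matches the one used in this article):

    echo "pyspark==3.5.0" > requirements.txt
    pip install -r requirements.txt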

For further study, visit the following link to the documentation on requirements files:

Now, we can start writing some code! Create two Python files: job.py, containing an example Spark job, and common.py, containing the code to create a Spark session. The example job uses the TLC Trip Record Data for September 2023, which can be downloaded from:

A folder called data is also needed to hold the data. Create the files and the folder using the following commands:

Commands to create the empty Python files and data folder
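
A sketch of these commands:

    touch common.py job.py
    mkdir data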

Then download the trips data into the data folder. Next, put the following code in common.py:

Code in common.py to create the Spark session

The Spark session has been configured to use 8GB of RAM, and 4 cores. This can be modified according to the specs of your system.
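
As an illustration, a minimal version of common.py matching the configuration described above could look like the following sketch (the function name create_spark_session is used later in the article; the app name is an assumption):

    from pyspark.sql import SparkSession


    def create_spark_session(app_name: str = "pyspark_test_job") -> SparkSession:
        # Local session with 4 cores and 8 GB of driver memory, as described above
        return (
            SparkSession.builder
            .appName(app_name)
            .master("local[4]")
            .config("spark.driver.memory", "8g")
            .getOrCreate()
        )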

And the code of the job in the job.py file is:

job.py — Simple Spark job

The example Spark job reads the trip data, repartitions it in 4 partitions, aggregates it by pickup location, and calculates the average tip amount per pickup location. It then filters the aggregated data to include only the locations with more than 20 trips.

The job then shows the top 10 pickup locations with highest average tip amount, and finally, it writes the data to the output folder as CSV files.
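
Based on that description, job.py could look roughly like the following sketch (the column names PULocationID and tip_amount are assumptions based on the TLC trip data schema, and paths and details may differ from the author's code):

    from pyspark.sql import functions as F

    from common import create_spark_session


    def main():
        spark = create_spark_session()

        # Read the TLC trip data downloaded into the data folder
        trips = spark.read.parquet("data/*.parquet")

        result = (
            trips.repartition(4)
            .groupBy("PULocationID")
            .agg(
                F.avg("tip_amount").alias("avg_tip_amount"),
                F.count("*").alias("trip_count"),
            )
            .filter(F.col("trip_count") > 20)
            .orderBy(F.col("avg_tip_amount").desc())
        )

        # Show the top 10 pickup locations by average tip amount
        result.show(10)

        # Write the result to the output folder as CSV files
        result.write.csv("output", header=True)


    if __name__ == "__main__":
        main()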

To run the job, execute the python job.py command. Make sure the virtual environment is active before executing it. The command runs the job, and the following results and logs are printed to the terminal:

Logs and output of the example job

Also, the output of the job is saved in the output folder, with a _SUCCESS file and a few .csv files containing the result. The presence of the _SUCCESS file indicates that the Spark job writing the output completed successfully. To run the job again, either delete the output folder or change the location where the data is written.

Now, a question might come up: the job was not submitted to any Spark cluster, not even a cluster running locally, so where did it run? This job was executed on Spark in local mode. In local mode, the job is not submitted to a cluster; rather, the local hardware (i.e. the computer the job is being executed on) runs the driver code as well as the worker tasks. Local mode is useful for testing and debugging Spark jobs.

We specifically set up local mode while creating the Spark session. The .master("local[4]") call in the creation of the Spark session creates a local session with 4 cores. This local session is created when the Spark session is created in the main function. Spark starts up in local mode, runs the job, and then terminates. Let’s add a wait at the end of the main function and examine the local mode environment.

To do that, put import time at the start of the job.py file, and time.sleep(100000000) at the end of the main function. Then, delete the output folder, and run the job again. The job will wait before terminating, and therefore, the local Spark environment created to execute the job will not terminate. Open a browser, and visit http://localhost:4040 to access the Spark UI:
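
Concretely, the change amounts to something like this:

    import time  # added at the top of job.py

    # ... existing job code in main() ...

    # Added at the end of main() to keep the local Spark environment
    # (and its UI at http://localhost:4040) alive for inspection
    time.sleep(100000000)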

When the Spark UI opens, click on the Environment tab. It shows that the master is local[4] and the driver memory is 8GB, the same as configured in the Spark session.

Environment in local mode

Next, click on the Executors tab in the top menu:

Executors tab: all the tasks in local mode run on the driver

In local mode, all the tasks run on the driver, which has 4 cores as configured earlier. In addition to local mode, Spark applications can also run on Spark clusters in client or cluster mode.

Packaging and Executing on a Spark cluster

The next step is to prepare the application to execute on a Spark cluster. The following steps are required to prepare the job and run it on a cluster. These steps are quite generic for PySpark jobs, whether they are submitted to self-managed clusters or to managed clusters on one of the public clouds, such as Amazon Elastic MapReduce (EMR). Some additional steps may be required for public clouds, which can be determined from their documentation. The steps are:

  • Update the code that creates the Spark session so that it no longer hardcodes local as the master.
  • Prepare two artifacts: the Python file containing the main function and the driver code that acts as the entry-point of the job, and a package (.zip in this case) containing other Python files.
  • Submit the job to the test cluster using the spark-submit command.

A note about the first point (updating the creation of the Spark session): in real projects, the best practice is to write unit tests for the code, and in that case, a local session should be created to run the unit tests, whereas the main code should only create the Spark session for the environment the job is destined for.

This method is one of the possible ways to package the code for submission to Spark clusters; other options are available as well. A major factor in determining the packaging mechanism is the delivery of the packages that the code depends on. Depending on how the dependent packages are used, they may be required on the master node only, or on all the nodes.

For example, if a Python package is being used in a User Defined Function (UDF), it should be available to all the executors, and therefore, be installed on all the nodes. Also, the type of cluster that the job executes on is an important factor to consider.

Generally:

  • For jobs submitted to short-lived (ephemeral) clusters that are spun up for particular jobs or sets of related jobs, it is usually appropriate to install the required libraries when spinning up the cluster. Spark distributions provided by public clouds mostly provide a mechanism to run scripts that can, among other things, install the required packages. An example of such a mechanism is Bootstrap Actions in Amazon Elastic MapReduce (EMR). This has become a common approach because ephemeral clusters on public clouds are popular for their cost-effectiveness, and long-running clusters are only spun up for streaming use-cases or when absolutely necessary.
  • In a shared cluster environment, directly installing dependencies on nodes might not be feasible due to potential conflicts with the dependencies of other jobs. In such cases, bundling dependencies with each job ensures they are accessible when needed.

If required packages need to be submitted with the job, the method being discussed is not ideal, as packaging in a .zip file does not support including packages built as wheels. The other packaging options available are:

  • Packaging using Conda (conda-pack)
  • Packaging using Virtualenv
  • Packaging using PEX

These mechanisms have been covered in detail in Spark documentation:

Setting up a locally running cluster for testing

Before preparing the job, let’s create a Spark cluster to run it on. For this, I’ll use the excellent example of setting up a 3-node Spark cluster using Docker and docker-compose by Marco Villarreal. I have forked and modified the scripts to run Spark 3.5.0 for this article. It can be obtained from:

The branch is adapt-for-spark-3.5.0.

Prerequisites:

Then execute the following commands to checkout the code, and set-up the Spark cluster:

Commands to start the Spark cluster
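
A sketch of what these commands might look like (the repository URL is the fork linked above; the exact build and start-up steps depend on the repository, so check its README):

    git clone <repository URL> docker-spark-cluster
    cd docker-spark-cluster
    git checkout adapt-for-spark-3.5.0
    # Build the Spark images and start the cluster; the exact script or
    # compose invocation may differ depending on the repository version
    docker compose up -d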

The above commands start a 3-node Spark cluster, 2 workers and 1 master node, with each worker node having 2 cores and 2GB RAM:

Viewing the spark cluster in Docker Desktop

The Spark cluster status and node details can also be viewed by accessing http://localhost:9000 in a browser.

Spark cluster status and resources

In addition to the Spark nodes, the code mounts two folders:

  • apps folder in the code repository is mounted to the /opt/spark-apps folder in the master node
  • data folder in the code repository is mounted to the /opt/spark-data folder in the master node

Using these volume mounts, code and data can be put on the cluster.

Link to original blog post:

Link to original code repository:

Preparing, packaging, and executing the job on the test cluster

As mentioned before, the master has been set to local[4] for local testing while creating the Spark session; this needs to change. In fact, a master will not be supplied in the code at all, as it’ll be provided with the spark-submit command and picked up from there. This is in accordance with the documentation here:

master is a Spark, Mesos or YARN cluster URL, or a special “local” string to run in local mode. In practice, when running on a cluster, you will not want to hardcode master in the program, but rather launch the application with spark-submit and receive it there. However, for local testing and unit tests, you can pass “local” to run Spark in-process.

Link:

Let’s modify the code that creates the Spark session, i.e. in the common.py file:

Spark session creation with an additional parameter to support local mode when needed
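
A sketch of the updated function; the parameter name is_local is an assumption, but the idea matches the description: only set a master (and local driver memory) when running locally:

    from pyspark.sql import SparkSession


    def create_spark_session(app_name: str = "pyspark_test_job", is_local: bool = False) -> SparkSession:
        builder = SparkSession.builder.appName(app_name)

        if is_local:
            # Local testing only: on a cluster the master and resources
            # are supplied via spark-submit instead of being hardcoded
            builder = builder.master("local[4]").config("spark.driver.memory", "8g")

        return builder.getOrCreate()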

When not running in local mode, there is no master specified, as it is no longer required. In fact, it is an optional part of the spark-submit syntax.

The job.py file also requires a minor update: set the path of the data to /opt/spark-data/*.parquet, as this is the path where we’ll copy the data to process. Additionally, the wait (time.sleep) at the end has been removed. The code looks like:

Updated job.py code

Now, both files are ready. In this example deployment, the structure will be:

  • job.py file is the main file, and will be submitted to the cluster as-is.
  • The other file (common.py) will be put in a ZIP file called job_zip.zip and will also be submitted with the job. This is an example of how related files in a project can be packaged and passed to the spark-submit command.
  • Copy the data and the files to the Spark cluster set-up earlier using docker-compose
  • Run the job using the spark-submit command

So, let’s package the files. Navigate to the folder containing the code, and run the following commands. Before running them, make sure the zip utility is installed and the cluster is running. Additionally, replace <path to data> and <path to docker-spark-cluster folder> with paths on your system:

Package the code, and copy data and job package to the cluster
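
A sketch of these commands, assuming the apps and data folders of the docker-spark-cluster repository are used as described earlier:

    # Package common.py into a ZIP that will be passed via --py-files
    zip job_zip.zip common.py

    # Copy the trip data and the job artifacts to the mounted cluster folders
    cp <path to data>/*.parquet <path to docker-spark-cluster folder>/data/
    cp job.py job_zip.zip <path to docker-spark-cluster folder>/apps/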

With the code and data available to the cluster, running the job requires connecting to the master node and running the spark-submit command. To connect to the Spark master, run the following command:

Command to connect to the spark master
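
For example (assuming the master container is named spark-master; check docker ps for the actual container name on your system):

    docker exec -it spark-master /bin/bash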

Next, run spark-submit:

spark-submit command to submit the job to the Spark cluster
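
A sketch of the command; the paths assume the volume mounts described earlier, and the Spark installation path inside the container is an assumption:

    /opt/spark/bin/spark-submit \
      --master <master URL> \
      --py-files /opt/spark-apps/job_zip.zip \
      /opt/spark-apps/job.py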

Remember to update the <master URL> placeholder in the command with the URL of the Spark master on your system. It can be determined by visiting localhost:9000 in the browser to view the Spark environment details:

Spark master UI

Note: The Spark master URL might change if the docker-compose cluster is stopped and restarted, so in that case, make sure to get the latest URL.

After the job is complete, it prints out the output as well as Spark logs:

Spark logs of the job

Also, the output is written to an output folder named with the current timestamp inside the data folder:

Output data after the job finishes

This is a simple example of a Spark job that is packaged for execution on a Spark cluster. These steps are generic, and the process may vary slightly depending on the specific target Spark cluster, such as Amazon EMR. I hope this provides a good overview of the steps required for packaging and running PySpark jobs on production clusters.

A slightly more complex example: Packaging a job with dependent libraries

In addition to Python files, your PySpark job may require additional Python packages not present on the Spark cluster. Let’s enhance the previous job example by incorporating packages like Pandas, Numpy, and Apache Arrow and examine how to package these dependencies with the PySpark job. These packages are not present on the Spark cluster used for testing:

Numpy and Pandas libraries are not present on the cluster

PySpark job to calculate average trip duration for each vendor

So, create two new files in the pyspark_test_project directory: second_job.py and analysis.py. The file second_job.py contains the main function. Its code is:

Code in second_job.py — second analysis job
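
A rough sketch of what second_job.py could contain (the helper function name average_trip_duration_per_vendor and the paths are hypothetical; the structure mirrors the description):

    from analysis import average_trip_duration_per_vendor  # hypothetical helper name
    from common import create_spark_session


    def main():
        spark = create_spark_session()

        # Locally the data sits in the data folder; on the cluster the path
        # becomes /opt/spark-data/*.parquet as in the first example
        trips = spark.read.parquet("data/*.parquet")

        result = average_trip_duration_per_vendor(trips)

        result.show()
        result.write.csv("output_second_job", header=True)


    if __name__ == "__main__":
        main()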

In the analysis.py file, the code uses a Pandas UDF to compute the trip duration, and aggregates to calculate the average duration per vendor (VendorID column). The code is:

Analysis code in analysis.py
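
A sketch of the Pandas UDF approach described above; the timestamp column names (tpep_pickup_datetime, tpep_dropoff_datetime) are assumptions based on the TLC yellow taxi schema, and the function name matches the hypothetical one used in the second_job.py sketch:

    import pandas as pd
    from pyspark.sql import DataFrame
    from pyspark.sql import functions as F
    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.types import DoubleType


    @pandas_udf(DoubleType())
    def trip_duration_minutes(pickup: pd.Series, dropoff: pd.Series) -> pd.Series:
        # Compute trip duration in minutes using Pandas; executed on the
        # executors with data exchanged via Apache Arrow
        return (dropoff - pickup).dt.total_seconds() / 60.0


    def average_trip_duration_per_vendor(trips: DataFrame) -> DataFrame:
        # Add the duration column and aggregate per vendor
        return (
            trips.withColumn(
                "trip_duration_minutes",
                trip_duration_minutes(
                    F.col("tpep_pickup_datetime"), F.col("tpep_dropoff_datetime")
                ),
            )
            .groupBy("VendorID")
            .agg(F.avg("trip_duration_minutes").alias("avg_trip_duration_minutes"))
        )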

The job now uses a Pandas UDF, which requires the PyArrow and Pandas libraries, so requirements.txt is updated as follows:

Updated requirements.txt file with additional dependencies
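
The updated file might look like this (the author may have pinned specific versions of the new packages; they are left unpinned here):

    pyspark==3.5.0
    pandas
    pyarrow
    numpy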

And Spark session creation code also needs to be updated to enable Pandas UDFs. Update the create_spark_session function in common.py as:

Updated create_spark_session function in common.py
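
The exact setting used by the author is not shown here; one commonly used option is to enable Arrow execution in the session builder. A sketch, building on the earlier hypothetical version of the function:

    from pyspark.sql import SparkSession


    def create_spark_session(app_name: str = "pyspark_test_job", is_local: bool = False) -> SparkSession:
        builder = (
            SparkSession.builder
            .appName(app_name)
            # Enable Arrow-based columnar data transfers between Spark and Pandas
            .config("spark.sql.execution.arrow.pyspark.enabled", "true")
        )

        if is_local:
            builder = builder.master("local[4]").config("spark.driver.memory", "8g")

        return builder.getOrCreate()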

The job produces the following output, and writes the same result to the output location:

Console output of the job

Packaging code files and dependencies for execution on test cluster

The next step is to prepare the job for execution on the test cluster. This job, in addition to two Python files (analysis.py and common.py), requires NumPy, Pandas, and Apache Arrow as dependencies. These dependencies are not present on the Spark cluster, so they have to be submitted with the Spark job. If only the code is packaged and submitted, the job fails with package-not-found exceptions. For this demonstration, the following items are created in the packaging process of the job:

  • second_job.py file: The main file of the second analysis job.
  • analysis_job_zip.zip package: This ZIP package contains the common.py and analysis.py files. It is prepared using the same method as demonstrated in the prior example. These files are provided to the cluster using the --py-files argument.
  • job_dependencies.tar.gz package: The package containing the dependent libraries, which also contains Python from the virtual environment. This package is built using the venv-pack Python package, which packs the current state of the virtual environment, i.e. all the libraries installed in it. The package is then copied to the cluster and passed to the executors using the --archives flag. To provide the dependencies to the driver, and to make Spark use Python from the package, it needs to be unpacked on the master node. To ensure that Spark uses the Python in the package, the PYSPARK_PYTHON environment variable needs to be set to the Python binary in the folder that has just been unpacked.

The job_dependencies.tar.gz package needs to be prepared on a system running the same operating system as the Spark cluster’s master node, and should be as close to it as possible. Ideally, this would be done by building the package on the master node itself, but depending on the access policies of organizations, that may not be possible.

In this article, we’ll use a Docker container based on the same image as the Spark nodes, amazoncorretto:8u392, to package the dependencies. The code files in analysis_job_zip.zip can be packaged on any system. If job dependencies are not packaged in a compatible environment, Python itself or the dependent libraries may not execute on the cluster. For example, if the dependencies are packaged directly on my system running macOS 14.2.1, the job fails on the cluster with an error that the Python binary cannot be executed.

The compilation of this package follows the process defined for packaging dependencies using virtualenv in the Spark documentation. The documentation explains this process, as well as alternative approaches, in detail. Link is:

The process can be summarized visually as the following diagram:

Packaging and deployment process diagram

Packaging the dependencies in a ZIP package, in a similar way as packaging the dependent code files, is not possible because packages that are built as wheels cannot be packaged in a ZIP. This is also mentioned in the documentation linked above:

However, it does not allow to add packages built as Wheels and therefore does not allow to include dependencies with native code.

If NumPy and Pandas are packaged in a ZIP and submitted, the job fails with the exception Error importing numpy: you should not try to import numpy from its source directory.

numpy exception when dependencies are provided in a ZIP package

In addition to using venv-pack, the dependencies can also be packaged using Conda or PEX. ZIP packaging, as mentioned earlier, only works when no dependencies built as wheels are involved.

To create the package of libraries, an environment similar to the Spark master node is needed. A Docker container running the same OS can be used for this purpose. For this, add a folder in the project called build_image, and add the following Dockerfile to it:

Dockerfile to create docker container for preparing the libraries package

Additionally, create a folder called package_env in the project folder. This will be mounted as a volume to the Docker container to pass the requirements.txt file into the container and to retrieve the package. The following commands do this:

Commands to create the package_env folder and to copy requirements.txt file into it
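
For example:

    mkdir package_env
    cp requirements.txt package_env/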

Next, build the docker image, and run it in interactive mode. The commands need to be executed from the project folder pyspark_test_project:

Commands to build the docker image and run docker container in interactive mode to build the dependencies package
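
A sketch of these commands; the image tag and the mount path inside the container are assumptions:

    docker build -t pyspark-dependency-packager build_image
    docker run -it --rm -v "$(pwd)/package_env:/package_env" pyspark-dependency-packager /bin/bash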

And then, on the docker container, run the following commands:

Commands to create the package
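
A sketch of these commands, assuming Python 3.9 is available inside the container and the package_env folder is mounted at /package_env:

    cd /package_env

    # Create a virtual environment; --copies copies the Python binary into
    # the venv folder instead of symlinking it
    python3.9 -m venv --copies venv
    source venv/bin/activate

    # Install the job dependencies and the packaging tool
    pip install -r requirements.txt
    pip install venv-pack

    # Pack the entire virtual environment into a single archive
    venv-pack -o job_dependencies.tar.gz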

The above commands initialize a virtual environment with Python 3.9. The --copies flag ensures that Python is copied into the virtual environment folder venv. Then the libraries are installed using the provided requirements.txt file. Finally, the package job_dependencies.tar.gz is created using venv-pack.

After executing the commands, the package is created in the package_env folder.

Libraries package

This package needs to be copied to the Spark cluster, to the same shared folder that was used in the first example. But before that, let’s create the package containing the common.py and analysis.py files. Run the following commands in the terminal after exiting from the Docker container (use the exit command to leave interactive mode and shut down the container).

The commands are:

Commands to package common.py and analysis.py files into analysis_job_zip.zip
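
For example:

    zip analysis_job_zip.zip common.py analysis.py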

Finally, the 3 items need to be copied to the Spark cluster, to the apps folder in the docker-spark-cluster folder that has been mounted as a volume on the Spark master node. Run the following commands:

Commands to copy the files and packages to the Spark master node
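
A sketch of these commands, using the same placeholder path as before:

    cp second_job.py analysis_job_zip.zip <path to docker-spark-cluster folder>/apps/
    cp package_env/job_dependencies.tar.gz <path to docker-spark-cluster folder>/apps/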

Next steps are:

  • Connect to the Spark master node in interactive mode
  • Create a folder called environment
  • Unpack the job_dependencies.tar.gz to the environment folder
  • Point Spark to use Python in the environment folder
  • Execute the job

The following commands perform these steps. Make sure that the cluster is running:

Commands to execute the job
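
A sketch of these commands; the container name, the Spark installation path, and the exact spark-submit flags follow the venv-pack approach in the Spark documentation and may need adjusting for your setup:

    docker exec -it spark-master /bin/bash

    # Inside the master container:
    cd /opt/spark-apps

    # Unpack the dependencies for the driver
    mkdir environment
    tar -xzf job_dependencies.tar.gz -C environment

    # Point Spark at the Python interpreter inside the unpacked environment
    export PYSPARK_PYTHON=./environment/bin/python

    /opt/spark/bin/spark-submit \
      --master <master URL> \
      --py-files /opt/spark-apps/analysis_job_zip.zip \
      --archives /opt/spark-apps/job_dependencies.tar.gz#environment \
      /opt/spark-apps/second_job.py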

The job appears in the Spark master UI at http://localhost:9000, and its progress can be tracked in the application UI at http://localhost:4040. The job takes a few minutes to complete.

Job submitted, visible in Spark UI (http://localhost:9000)
Job progress in Spark UI (http://localhost:4040)
Completed job output

This completes the demonstration of packaging a job with its relevant code files and required dependencies, so that the dependencies provided with the job are used and do not have to be installed on the Spark cluster nodes.

In real projects, the process of packaging and deploying the job to the cluster is managed by a CI/CD pipeline, and the job is usually scheduled and executed using an orchestrator such as Apache Airflow as part of a bigger pipeline. The above examples have been written for beginners and should provide a good reference for the overall process; however, the steps may vary slightly depending on the project, the nature of the target Spark cluster, the tech stack, and many other things.

Code repository

The code is simple, as the scope of the article is the process of packaging and executing the Spark job. The code repository containing the example code files is:

References for further reading

Below are some good references I have used in the preparation of this article. They provide much more information about the process than what has been covered here:

Some notes on using PyCharm IDE

This article is designed to be IDE agnostic, and therefore the examples use the command line and text editors. However, using an IDE such as PyCharm can simplify things such as local execution and testing, adding unit tests, and virtual environment creation and management. I’ll demonstrate the process of creating an empty project and setting up the virtual environment using venv in PyCharm Community Edition. The Community Edition is free and can be downloaded from:

First, open PyCharm and select New Project. In the window that opens, select Pure Python and name the project pyspark_test_project_pycharm. Under Interpreter Type, select Custom Environment, and choose to create a Virtualenv using Python 3.9. Finally, click Create to create the project.

New Python project in PyCharm CE

This creates a new project with the virtual environment set up.

New project created

Next, add the requirements.txt file, common.py file and job.py file. Copy the code from the first section of the article for local testing. Then install the requirements.

A pop-up usually shows up at the top of the code editor window if some or all packages from requirements.txt file are not installed. Click to install PySpark.

Install PySpark from requirements.txt file

Now the code is ready to run locally. Next, on the top right of the window, click the Current File dropdown next to the green Run button:

Creating a run configuration

This opens up the run configuration window to configure the local run. Create a new Python run configuration, and give it any name of your preference:

Creating a Python run configuration

Next, point to the job.py file as the script to run:

Creating a run configuration — selecting the Python file to run

Then, save the configuration by clicking OK. Next, with the new configuration selected, click the run button on the top right to run the code locally:

Running the job in PyCharm

This executes the job in PyCharm:

Job executed

Using PyCharm as an IDE for developing and testing Spark applications can greatly simplify the process. PyCharm provides features such as local execution and testing, unit testing, and virtual environment management, which can streamline the development workflow. Additionally, PyCharm provides a convenient way to configure and run your Spark job locally. You can create a run configuration and specify the main Python file to execute. This eliminates the need for manual command-line execution and simplifies the process of running and debugging your Spark application.

When it comes to packaging the Spark application for deployment, the same process can be followed as demonstrated earlier. You can use tools like venv-pack, Conda, or PEX to package your code and dependencies into a deployable artifact. This can be done in the terminal in PyCharm, which also automatically activates the virtual environment present in the project.

Conclusion and Key Takeaways

This article covers the steps of setting up, packaging, and executing PySpark jobs on Spark clusters. Key highlights include:

  • Virtual Environment Setup: Using venv for an isolated Python environment and installing PySpark and other dependencies with a requirements.txt file.
  • Local execution for testing: Executing the job locally for testing and debugging.
  • Python File Packaging: Packaging main Python files and additional modules into a .zip file for cluster submission.
  • Handling Dependencies: Methods for including external Python packages necessary for the job, such as Pandas, Numpy, and Apache Arrow. The article covers packaging dependencies using venv-pack in the code examples, and provides links to Spark documentation for alternative packaging mechanisms including Conda, PEX, and Zip.
  • Execution on Spark Clusters: Covered the process of submitting jobs to Spark clusters, emphasizing modifications needed for different types of clusters.

In conclusion, this article serves as a guide for beginners on setting up, packaging, and executing PySpark jobs on Spark clusters. The step-by-step instructions and code examples have been included to make it a reference for anyone looking to understand the process. Please provide feedback in comments.



Suffyan Asad

Data Engineer | Passionate about data processing at scale | Fulbright and George Washington University alum | https://pk.linkedin.com/in/suffyan-asad-421711126