Spark Essentials: A Guide to Setting Up, Packaging, and Running PySpark Projects
Introduction
In my recent experiences at work, I’ve encountered a number of questions about setting up Spark projects. These inquiries span from initiating new projects to checking out and setting up existing codebases in both PySpark and Scala environments. Based on challenges and questions that have repeatedly come up, I’ve decided to share my insights more broadly. This article is crafted as a guide, aimed at explaining the process of initiating PySpark projects and preparing them for submission to Spark clusters.
Whether you are a beginner just stepping into the world of big data, or a seasoned developer, this guide is tailored for you. We’ll delve into the practical aspects of setting up Spark projects using PySpark, Spark’s Python API. I expect that this will be a helpful reference that you or your colleagues can refer to when setting up the codebase of a Spark project and getting ready to work.
Here’s what we’ll cover:
- Setting Up PySpark Projects: Learn the essentials of setting up a PySpark project using `venv`, complete with instructions for both command line and PyCharm setups. This will ensure you have a well-structured and isolated environment for your Python-based Spark applications.
- Packaging and Running Projects: This section will walk you through the process of packaging your projects and running them on a Spark cluster, preparing you for real-world Spark deployment.
By the end of this article, you should have an understanding of the process of setting up PySpark projects, running them locally, packaging, and running them on Spark clusters, equipping you with the necessary skills to tackle big data challenges with Apache Spark.
Note: The commands and instructions in this article are for Linux and macOS; anyone following along on Microsoft Windows will need to adapt several steps, especially the terminal commands. Even so, I encourage Windows users to follow along, as the concepts apply regardless of the operating system.
So let’s begin…
Setting up and running a simple PySpark job
This section covers setting up and running a PySpark project, including:
- Using `venv` to create a virtual environment
- Installing `pyspark` and other required libraries using a `requirements.txt` file
- Creating the Spark session, both for local testing and for running on the cluster
- Packaging the job and submitting it to a test cluster, along with the packaged code files and dependent Python packages
This article covers performing each step from the command line and is designed to be IDE agnostic; the steps are generic and can be used with any IDE. So let’s begin with setting up the virtual environment and installing PySpark:
Setting up, creating Virtual Environment and installing PySpark
First, ensure that your system has Python 3.9 installed. To check, run the `python --version` command:
If not, please download and install Python 3.9:
In this tutorial, we use Python 3.9, but it’s important to note that PySpark can be compatible with other Python versions. If you’re using a different version of Python, you might need to adjust some steps accordingly. It’s always a good practice to use a Python version that matches the compatibility requirements of the Spark version you’re working with. For more details, please refer to the PySpark documentation here:
Let’s begin. First, open up the terminal and create a new folder; let’s call it `pyspark_test_project`:
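The folder can be created and entered with two commands:

```shell
# Create the project folder and move into it
mkdir pyspark_test_project
cd pyspark_test_project
```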
The next step is to set up a virtual environment for the project. Setting up a virtual environment is not a necessity, but I prefer to create one in each of my projects, as it conveniently allows using different versions of Python and packages.
This article uses `venv` for creating the virtual environment, as mentioned before. The following commands create a virtual environment and activate it:
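The commands are typically the following (use the interpreter name that matches your installation, e.g. `python3.9`, if several Python versions are present):

```shell
# Create a virtual environment in the ./venv folder
python3 -m venv venv
# Activate it for the current shell session
source venv/bin/activate
```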
The aforementioned commands create a virtual environment in the `venv` directory and activate it.
You can confirm that Python is now running from the virtual environment by running the `which python` command, which should point to the virtual environment folder. Additionally, all packages will now be installed within the virtual environment:
Note: To deactivate the virtual environment, run `deactivate`. If the virtual environment gets deactivated, for example by closing the terminal, it can be reactivated by running the activation command again.
For more information about virtual environments and `venv`, you can refer to the following documentation:
The next step is to install PySpark; the latest version at the time of writing this article is 3.5.0. While you can use `pip install` directly, we'll use a `requirements.txt` file to keep a list of the project's installed packages. Run the following commands to put PySpark 3.5.0 in the `requirements.txt` file and then install it:
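A sketch of the two commands, pinning the version mentioned above:

```shell
# Record the pinned PySpark version in requirements.txt
echo "pyspark==3.5.0" > requirements.txt
# Install everything listed in the file into the active environment
pip install -r requirements.txt
```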
For further study, see the documentation on requirements files:
Now, we can start writing some code! Create two Python files: `job.py`, containing an example Spark job, and `common.py`, containing the code to create a Spark session. The example job uses the TLC Trip Record Data for September 2023, which can be downloaded from:
A folder called `data` is also needed to hold the data. Create the files and the folder using the following commands:
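For example:

```shell
# Create the two source files and the folder for the input data
touch common.py job.py
mkdir data
```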
Then download the trips data into the `data` folder. Next, put the following code in `common.py`:
The Spark session has been configured to use 8GB of RAM, and 4 cores. This can be modified according to the specs of your system.
And the code of the job in the `job.py` file is:
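A sketch of `job.py` consistent with the description that follows; the column names (`PULocationID`, `tip_amount`) come from the TLC trip data schema, and the helper is imported from `common.py`:

```python
from pyspark.sql import functions as F

from common import create_spark_session


def main():
    spark = create_spark_session()
    # Read the downloaded TLC trip data and repartition into 4 partitions
    trips = spark.read.parquet("data/*.parquet").repartition(4)
    # Average tip and trip count per pickup location,
    # keeping only locations with more than 20 trips
    agg = (
        trips.groupBy("PULocationID")
        .agg(
            F.avg("tip_amount").alias("avg_tip_amount"),
            F.count("*").alias("trip_count"),
        )
        .filter(F.col("trip_count") > 20)
    )
    # Show the 10 pickup locations with the highest average tip amount
    agg.orderBy(F.col("avg_tip_amount").desc()).show(10)
    # Write the full result to the output folder as CSV files
    agg.write.option("header", True).csv("output")
    spark.stop()


if __name__ == "__main__":
    main()
```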
The example Spark job reads the trip data, repartitions it in 4 partitions, aggregates it by pickup location, and calculates the average tip amount per pickup location. It then filters the aggregated data to include only the locations with more than 20 trips.
The job then shows the top 10 pickup locations with the highest average tip amount, and finally writes the data to the `output` folder as CSV files.
To run the job, execute the `python job.py` command. Make sure the virtual environment is active before executing it. The command runs the job, and the following results and logs are output on the terminal:
Also, the output of the job is saved in the `output` folder, with a `_SUCCESS` file and a few `.csv` files containing the result. The presence of the `_SUCCESS` file indicates that the Spark job writing the output completed successfully. To run the job again, either delete the `output` folder, or change the location where the data is written.
Now, a question might come up in your mind, the job was not submitted to any Spark cluster, not even a cluster running locally, so where did it run? This job was executed on Spark in Local Mode. In local mode, the job is not submitted to a cluster, rather, the local hardware (i.e. the computer this job is being executed on) runs the driver code as well as the worker tasks. The local mode is useful for testing and debugging Spark jobs.
We specifically set up local mode while creating the Spark session: the `.master("local[4]")` call creates a local session with 4 cores. This local session is created when the Spark session is created in the `main` function. Spark starts up in local mode, runs the job, and then terminates. Let's add a wait at the end of the main function and examine the local mode environment.
To do that, put `import time` at the start of the `job.py` file, and `time.sleep(100000000)` at the end of the main function. Then delete the `output` folder and run the job again. The job will wait before terminating, so the local Spark environment created to execute it stays up. Open a browser and visit `http://localhost:4040` to access the Spark UI:
When the Spark UI opens, click on the Environment tab. It shows that the master is `local[4]` and the driver memory is 8GB, the same as configured in the Spark session.
Next, click on the Executors tab in the top menu:
In local mode, all the tasks run on the driver, which has 4 cores as was configured earlier. In addition to the local mode, Spark applications can also be run on Spark clusters in Client or Cluster modes.
Packaging and Executing on a Spark cluster
The next step is to prepare the application to execute on a Spark cluster. The following steps prepare the job and run it on a cluster. They are quite generic for PySpark jobs, whether the jobs are submitted to self-managed clusters or to managed clusters on one of the public clouds, such as Amazon Elastic MapReduce (EMR). Some additional steps may be required for public clouds, which can be determined from their documentation. The steps are:
- Update the code that creates the Spark session to exclude `local` as the master.
- Prepare two artifacts: the Python file containing the main function and the driver code that acts as the entry point of the job, and a package (a `.zip` in this case) containing the other Python files.
- Submit the job to the test cluster using the `spark-submit` command.
A note about the first point (updating the creation of the Spark session): In real projects, the best practice is to write unit tests for the code, and in that case, a local session should be created to run the unit tests, whereas the main code should only create the Spark session for the environment the job is destined for.
This method is one of the possible ways to package the code for submitting to Spark clusters, and there are other options available as well. A major factor in determining the packaging mechanism is the delivery of packages that the code depends on. Depending on how the dependent packages are used, they may be required on the master node only, or on all the nodes.
For example, if a Python package is being used in a User Defined Function (UDF), it should be available to all the executors, and therefore, be installed on all the nodes. Also, the type of cluster that the job executes on is an important factor to consider.
Generally:
- For jobs submitted to short-lived (ephemeral) clusters that are spun up for particular jobs or sets of related jobs, it is usually appropriate to install the required libraries when spinning up the cluster. Spark distributions provided by public clouds mostly offer a mechanism to run scripts that can, among other things, install the required packages; an example of such a mechanism is Bootstrap Actions in Amazon Elastic MapReduce (EMR). This approach is becoming common due to the cost-effectiveness and resulting popularity of ephemeral clusters on public clouds, with long-running clusters now spun up mainly for streaming use-cases or when absolutely necessary.
- In a shared cluster environment, directly installing dependencies on nodes might not be feasible due to potential conflicts with the dependencies of other jobs. In such cases, bundling dependencies with each job ensures they are accessible when needed.
If required packages need to be submitted with the job, the method discussed here is not ideal, as packaging in a `.zip` file does not support including packages built as wheels. The other packaging options available are:
- Packaging using Conda (conda-pack)
- Packaging using Virtualenv
- Packaging using PEX
These mechanisms have been covered in detail in Spark documentation:
Setting up a locally running cluster for testing
Before preparing the job, lets create a Spark cluster to run it on. For this, I’ll use the excellent example of setting up a 3-node Spark cluster using Docker and docker-compose by Marco Villarreal. I have forked and modified the scripts to run Spark 3.5.0 for this article. It can be obtained from:
The branch is `adapt-for-spark-3.5.0`.
Prerequisites:
Then execute the following commands to checkout the code, and set-up the Spark cluster:
The above commands start a 3-node Spark cluster, 2 workers and 1 master node, with each worker node having 2 cores and 2GB RAM:
The Spark cluster status and node details can also be viewed by visiting `http://localhost:9000` in a browser.
In addition to the Spark nodes, the setup mounts two folders:
- The `apps` folder in the code repository is mounted to the `/opt/spark-apps` folder on the master node
- The `data` folder in the code repository is mounted to the `/opt/spark-data` folder on the master node
Using these volume mounts, code and data can be put on the cluster.
Link to original blog post:
Link to original code repository:
Preparing, packaging, and executing the job on the test cluster
As mentioned before, the master was set to `local[4]` for local testing while creating the Spark session; this needs to change. In fact, a master will not be supplied at all, as it'll be provided with the `spark-submit` command and picked up from there. This is in accordance with the documentation here:
`master` is a Spark, Mesos or YARN cluster URL, or a special “local” string to run in local mode. In practice, when running on a cluster, you will not want to hardcode `master` in the program, but rather launch the application with `spark-submit` and receive it there. However, for local testing and unit tests, you can pass “local” to run Spark in-process.
Link:
Let's modify the code that creates the Spark session, i.e. in the `common.py` file:
When not running in local mode, no master is specified, as it is no longer required; it is an optional part of the `spark-submit` syntax.
The `job.py` file also requires a minor update: set the path of the data to `/opt/spark-data/*.parquet`, as this is the path where we'll copy the data to process. Additionally, the wait (`time.sleep`) at the end has been removed. The code looks like:
Now, both files are ready. In this example deployment, the structure will be:
- The `job.py` file is the main file, and will be submitted to the cluster as-is.
- The other file (`common.py`) will be put in a zip file called `job_zip.zip`, which will also be submitted with the job. This is an example of how related files in a project can be packaged and passed to the `spark-submit` command.
- Copy the data and the files to the Spark cluster set up earlier using docker-compose
- Run the job using the `spark-submit` command
So, let's package the files. Navigate to the folder containing the code, and then run the following commands. Before running them, make sure `zip` is installed and the cluster is running. Additionally, replace `<path to data>` and `<path to docker-spark-cluster folder>` with paths on your system:
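The commands likely look like the following (the placeholders are kept as in the article and must be replaced before running):

```shell
# Package common.py into a zip to ship alongside the main file
zip job_zip.zip common.py
# Copy the job files into the folder mounted on the master node
cp job.py job_zip.zip "<path to docker-spark-cluster folder>/apps/"
# Copy the trip data into the mounted data folder
cp <path to data>/*.parquet "<path to docker-spark-cluster folder>/data/"
```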
With the code and data available to the cluster, running the job requires connecting to the master node and running the `spark-submit` command. To connect to the Spark master, run the following command:
Next, run spark-submit:
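A hedged sketch of the submit command, run from inside the master container; `/opt/spark-apps` is the mount described earlier, and the exact `spark-submit` path depends on the image:

```shell
/opt/spark/bin/spark-submit \
  --master <master URL> \
  --py-files /opt/spark-apps/job_zip.zip \
  /opt/spark-apps/job.py
```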
Remember to update the `<master URL>` placeholder in the command with the URL of the Spark master on your system. It can be determined by visiting `localhost:9000` in the browser to view the Spark environment details:
Note: The Spark master URL might change if the `docker-compose` cluster is stopped and restarted, so in that case, make sure to get the latest URL.
After the job is complete, it prints out the output as well as Spark logs:
Also, the output is written to an output folder named with the current timestamp inside the `data` folder:
This is a simple example of a Spark job that is packaged for execution on a Spark cluster. These steps are generic, and the process may vary slightly depending on the specific target Spark cluster, such as Amazon EMR. I hope this provides a good overview of the steps required for packaging and running PySpark jobs on production clusters.
A slightly more complex example: Packaging a job with dependent libraries
In addition to Python files, your PySpark job may require additional Python packages not present on the Spark cluster. Let’s enhance the previous job example by incorporating packages like Pandas, Numpy, and Apache Arrow and examine how to package these dependencies with the PySpark job. These packages are not present on the Spark cluster used for testing:
PySpark job to calculate average trip duration for each vendor
So, create two new files in the `pyspark_test_project` directory: `second_job.py` and `analysis.py`. The file `second_job.py` contains the main function. Its code is:
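A sketch of what `second_job.py` can look like; the function name imported from `analysis.py` is an assumption used for illustration:

```python
from common import create_spark_session
# The function name below is assumed for illustration; use whatever
# name analysis.py actually exposes
from analysis import average_duration_per_vendor


def main():
    spark = create_spark_session()
    trips = spark.read.parquet("data/*.parquet")
    result = average_duration_per_vendor(trips)
    # Print the averages and also persist them as CSV
    result.show()
    result.write.option("header", True).csv("output")
    spark.stop()


if __name__ == "__main__":
    main()
```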
In the `analysis.py` file, the code uses a Pandas UDF to compute the trip duration, then aggregates to calculate the average duration per vendor (the `VendorID` column). The code is:
The job now uses a Pandas UDF, which requires the PyArrow and Pandas libraries, so `requirements.txt` is updated as:
The Spark session creation code also needs to be updated to enable Pandas UDFs. Update the `create_spark_session` function in `common.py` as:
The job gives the following output, and writes the same in the output file:
Packaging code files and dependencies for execution on test cluster
The next step is to prepare the job for execution on the test cluster. This job, in addition to the two Python files (analysis.py and common.py), requires Numpy, Pandas, and Apache Arrow as dependencies. These dependencies are not present on the Spark cluster, and they have to be submitted with the Spark job. If only the code is packaged and submitted, the job fails with package not found exceptions. For this demonstration, the following items are created in the packaging process of the job:
- `second_job.py` file: The main file of the second analysis job.
- `analysis_job.zip` package: This ZIP package contains the `common.py` and `analysis.py` files. It is prepared using the same method as demonstrated in the prior example. These files are provided to the cluster using the `--py-files` argument.
- `job_dependencies.tar.gz` package: The package containing the dependent libraries, along with the Python interpreter from the virtual environment. It is built using the `venv-pack` Python package, which packs the current state of the virtual environment, i.e. all the libraries installed in it. The package is copied to the cluster and passed to the executors using the `--archives` flag. To provide the dependencies to the driver, and to make Spark use the Python from the package, it needs to be unpacked on the master node, and the `PYSPARK_PYTHON` environment variable must point to the Python binary inside the unpacked folder.
The `job_dependencies.tar.gz` package needs to be prepared on a system running the same operating system as the Spark cluster's master node, or one as close to it as possible. Ideally, the package would be compiled on the master node itself, but depending on an organization's access policies, that may not be possible.
In this article, we'll use a Docker container running the same OS image as the Spark nodes, `amazoncorretto:8u392`, to package the dependencies. The code files in `analysis_job.zip` can be packaged on any system. If the job dependencies are not packaged on a compatible environment, Python itself or the dependent libraries may fail to execute on the cluster. On my system running macOS 14.2.1, if the dependencies are packaged locally, the job fails on the cluster with an error that the Python binary cannot be executed.
The compilation of this package follows the process defined in the Spark documentation for packaging dependencies using a virtual environment. The documentation explains this process, as well as alternative approaches, in detail. The link is:
The process can be summarized visually as the following diagram:
Packaging the dependencies in a ZIP package, in a similar way as packaging the dependent code files, is not possible because packages built as wheels cannot be packaged in a ZIP. This is also mentioned in the documentation linked above:
However, it does not allow to add packages built as Wheels and therefore does not allow to include dependencies with native code.
If Numpy and Pandas are submitted packaged in a ZIP, the job fails with the exception `Error importing numpy: you should not try to import numpy from its source directory`.
In addition to using `venv-pack`, the dependencies can also be packaged using Conda or PEX, or as a ZIP, as mentioned earlier.
To create the package of libraries, an environment similar to the Spark master node is needed; a Docker container running the same OS can be used for this purpose. For this, add a folder in the project called `build_image`, and add the following `Dockerfile` to it:
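A hedged sketch of such a `Dockerfile`; the base image is Amazon Linux 2, where obtaining Python 3.9 may require enabling an extra repository or building it from source, so treat the package names as illustrative:

```dockerfile
FROM amazoncorretto:8u392

# Install Python 3.9 plus archive tools; exact package names depend on the
# repositories available in the base image (Python 3.9 may need to be built
# from source on Amazon Linux 2)
RUN yum -y install python39 zip tar gzip && yum clean all

WORKDIR /package_env
CMD ["/bin/bash"]
```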
Additionally, create a folder called `package_env` in the project folder. This will be mounted as a volume in the Docker container, both to make the `requirements.txt` file available in the container and to retrieve the resulting package. The following commands will do it:
Next, build the Docker image and run it in interactive mode. The commands need to be executed from the project folder `pyspark_test_project`:
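For example, assuming the image tag below (an arbitrary name chosen for illustration):

```shell
# Build the image from the Dockerfile in build_image/
docker build -t pyspark-deps-builder build_image/
# Run it interactively, mounting package_env to exchange files
docker run -it -v "$(pwd)/package_env:/package_env" pyspark-deps-builder /bin/bash
```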
And then, on the docker container, run the following commands:
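Inside the container, the sequence described below can look like this (a sketch; the interpreter name depends on how Python was installed in the image):

```shell
cd /package_env
# --copies puts a real Python binary (not a symlink) inside the venv
python3.9 -m venv --copies venv
source venv/bin/activate
pip install -r requirements.txt
pip install venv-pack
# Pack the whole virtual environment into a relocatable archive
venv-pack -o job_dependencies.tar.gz
```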
The above commands initialize a virtual environment with Python 3.9. The `--copies` flag ensures that the Python binary is copied into the virtual environment folder `venv` rather than symlinked. The libraries are then installed from the provided `requirements.txt` file. Finally, the package `job_dependencies.tar.gz` is created using `venv-pack`.
After executing the commands, the package is created in the `package_env` folder.
This package needs to be copied to the Spark cluster, to the same shared folder used earlier in the first example. But before that, let's create another package containing the `common.py` and `analysis.py` files. Run the following commands on the terminal after exiting the Docker container (use the `exit` command to leave interactive mode and shut down the container). The commands are:
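The zip step mirrors the first example, now with two files:

```shell
# Package the shared code files for the --py-files argument
zip analysis_job.zip common.py analysis.py
```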
Finally, the three items need to be copied to the Spark cluster, into the `apps` folder of the `docker-spark-cluster` folder that is mounted as a volume on the Spark master node. Run the following commands:
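A sketch of the copy step (the placeholder is kept as-is and must be replaced with the path on your system):

```shell
cp second_job.py analysis_job.zip "<path to docker-spark-cluster folder>/apps/"
cp package_env/job_dependencies.tar.gz "<path to docker-spark-cluster folder>/apps/"
```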
Next steps are:
- Connect to the Spark master node in interactive mode
- Create a folder called `environment`
- Unpack `job_dependencies.tar.gz` into the `environment` folder
- Point Spark to use the Python in the `environment` folder
- Execute the job
The following commands perform these steps. Make sure that the cluster is running:
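On the master node, the steps can be sketched as follows; the container name, master URL, and `spark-submit` path are placeholders or assumptions that depend on your setup:

```shell
# 1. Connect to the master node (container name depends on your compose setup)
docker exec -it <spark-master container> /bin/bash

# 2-4. Inside the container: unpack the dependencies and point Spark at them
cd /opt/spark-apps
mkdir environment
tar -xzf job_dependencies.tar.gz -C environment
export PYSPARK_PYTHON=/opt/spark-apps/environment/bin/python

# 5. Submit the job; --archives ships the same environment to the executors
/opt/spark/bin/spark-submit \
  --master <master URL> \
  --py-files /opt/spark-apps/analysis_job.zip \
  --archives /opt/spark-apps/job_dependencies.tar.gz#environment \
  /opt/spark-apps/second_job.py
```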
The job appears in the Spark UI at `http://localhost:9000`, and its progress can be tracked at `http://localhost:4040`. The job takes a few minutes to complete.
This completes the demonstration of packaging a job with its code files and required dependencies, so that the dependencies provided with the job are used and do not have to be installed on the Spark cluster nodes.
In real projects, the packaging and deployment of the job to the cluster is managed by a CI/CD pipeline, and the job is usually scheduled and executed by an orchestrator such as Apache Airflow as part of a bigger pipeline. The above examples have been written for beginners and should provide a good reference for the overall process; however, the steps may vary depending on the project, the nature of the target Spark cluster, the tech stack, and other factors.
Code repository
The code is simple, as the focus of the article is the process of packaging and executing the Spark job. The code repository containing the example code files is:
References for further reading
Below are some good references I have used in the preparation of this article. They provide much more information about the process than what has been covered here:
- Documentation on packaging PySpark jobs: https://spark.apache.org/docs/latest/api/python/user_guide/python_packaging.html
- `venv` documentation: https://docs.python.org/3/library/venv.html
- Documentation on Pandas UDFs and using PyArrow in PySpark: https://spark.apache.org/docs/3.3.1/api/python/user_guide/sql/arrow_pandas.html
- A good Stack Overflow question about packaging Python libraries with a PySpark job: https://stackoverflow.com/questions/36461054/i-cant-seem-to-get-py-files-on-spark-to-work
- Installing packages and packaging in virtual environments: https://packaging.python.org/en/latest/guides/installing-using-pip-and-virtual-environments/#creating-a-virtual-environment
- Creating a thick virtual environment without symlinks: https://stackoverflow.com/questions/28827411/creating-virtualenv-without-symlinks
- Packaging files in a ZIP without the containing folder: https://unix.stackexchange.com/questions/182032/zip-the-contents-of-a-folder-without-including-the-folder-itself
Some notes on using PyCharm IDE
This article is designed to be IDE agnostic, and therefore the examples use the command line and text editors. However, using an IDE such as PyCharm can simplify things like local execution and testing, adding unit tests, and virtual environment creation and management. I'll demonstrate the process of creating an empty project and setting up the virtual environment using `venv` in PyCharm Community Edition. The community edition is free, and it can be downloaded from:
First, open PyCharm and select New Project. From the window that opens, select Pure Python and name the project `pyspark_test_project_pycharm`. Under Interpreter Type, select Custom Environment, and choose to create a Virtualenv using Python 3.9. Finally, click Create to create the project.
This creates a new project with Virtual Environment set-up.
Next, add the `requirements.txt`, `common.py`, and `job.py` files. Copy the code from the first section of the article for local testing. Then install the requirements.
A pop-up usually shows up at the top of the code editor window if some or all packages from the `requirements.txt` file are not installed. Click it to install PySpark.
Now the code is ready to run locally. Next, on the top right of the window, click the Current File dropdown next to the green Run button:
This opens up the run configuration window to configure the local run. Create a new Python run configuration, and give it any name of your preference:
Next, point to the `job.py` file as the script to run:
Then, save the configuration by clicking OK. Next, with the new configuration selected, click the run button on the top right to run the code locally:
This executes the job in PyCharm:
Using PyCharm as an IDE for developing and testing Spark applications can greatly simplify the process. PyCharm provides features such as local execution and testing, unit testing, and virtual environment management, which can streamline the development workflow. Additionally, PyCharm provides a convenient way to configure and run your Spark job locally. You can create a run configuration and specify the main Python file to execute. This eliminates the need for manual command-line execution and simplifies the process of running and debugging your Spark application.
When it comes to packaging the Spark application for deployment the same process can be followed as demonstrated earlier. You can use tools like venv-pack, Conda, or PEX to package your code and dependencies into a deployable artifact. This can be done in the Terminal in PyCharm, which also automatically activates the virtual environment present in the project.
Conclusion and Key Takeaways
This article covers the steps of setting up, packaging, and executing PySpark jobs on Spark clusters. Key highlights include:
- Virtual Environment Setup: Using `venv` for an isolated Python environment, and installing PySpark and other dependencies with a `requirements.txt` file.
- Local Execution for Testing: Running the job locally for testing and debugging.
- Python File Packaging: Packaging the main Python file and additional modules into a `.zip` file for cluster submission.
- Handling Dependencies: Methods for including external Python packages necessary for the job, such as Pandas, Numpy, and Apache Arrow. The article covers packaging dependencies using `venv-pack` in the code examples, and provides links to Spark documentation for alternative packaging mechanisms including Conda, PEX, and ZIP.
- Execution on Spark Clusters: The process of submitting jobs to Spark clusters, emphasizing the modifications needed for different types of clusters.
In conclusion, this article serves as a guide for beginners on setting up, packaging, and executing PySpark jobs on Spark clusters. Step-by-step instructions and code examples have been included to make it a reference for anyone looking to understand the process. Please share feedback in the comments.