PySpark: a real cluster with Docker. Really understanding Spark!

Giuseppe Maggi
7 min readApr 13, 2024

--

In this article, our intention is to illustrate the functioning of a “real” cluster Spark based on Docker image.

The advantage of this approach consists in:

  • having a real working cluster even though good for test and experimentation but not for real production contexts;
  • the velocity of preparation of this environment.

What we’ll do

First of all, we’re going to create a cluster based on Bitnami Spark Docker image. Then, we’ll run a script on the working cluster and, at the end of this execution, we’ll read the output and try to understand as well as possible how the cluster worked.

What we need

We need only the docker command. For the sake of simplicity, we’ll use Docker technology to create the cluster and we won’t install anything else out of the cluster. As a consequence of this, we’ll pass the Python script to a specific Docker container (the master) to be executed inside of that.

How can we find a ready-to-use docker environment (as an alternative to direct installation)?

There are several alternatives instead of installing Docker directly on our work machine and, for example, we can use:

  • the Docker Playground, a light and free environment with Docker installed;
  • Google Cloud Shell, available for anyone has a Gmail account. It’s a complete Linux environment with many system tools, programming languages and, last but not least, Docker and minikube (a test version of Kubernetes).

Preparing the cluster

1. Create the network

First of all, a cluster needs a network. It’s based on a number of nodes (each one is a Docker container in our case) that can communicate with each other by services over TCP/IP architecture.

This is the command:

$ docker network create sparknet

We chose sparknet as the name of the network. Feel free to use another name but it’ll be substituted in the next snippets of code.

2. Run the master node

Now, it’s the time to start nodes. Master and workers will be started the same way exception for SPARK_MODE environment variable to set either as “master” or as “worker”.

This is the command to run the master:

$ docker run -d --rm --name master \
--network=sparknet \
-e SPARK_MODE=master \
-p 4040:4040 \
bitnami/spark

The option -p 4040:4040 is to execute port-forwarding towards the 4040/TCP port used by Spark to publish its user interface service(SparkUI), fundamental to visually monitor the cluster working.

Now, this is a delicate point. We have to get the ID of the master container:

$ docker ps
CONTAINER ID IMAGE COMMAND ... NAMES
94fc33404190 bitnami/spark "/opt/bitnami/script…" ... master

In our case, we must remember the code 94fc33404190 to be used as a kind of URL of the master node.

3. Run the workers

Now, it’s time to create several workers. How many? We decide. Let’s say we want three workers at the moment. We can use the following command substituting the two upper-cased placeholder:

  • NAME: it’s the name of the new container but you know we can’t have two containers with the same name. So, we’ll use worker1 for the first one, worker2 for the second one and so on. Obviously, you can choose the nomenclature you like;
  • MASTER_ADDRESS: it’s the name of the master container you got with the previous docker ps command. In our case, it’s 94fc33404190 so, for us, MASTER_ADDRESS is equivalent to spark://94fc33404190:7077 where 7077 is the TCP port to communicate with the master node.
$ docker run -d --rm --name NAME \
--network=sparknet \
-e SPARK_MODE=worker \
-e SPARK_MASTER_URL=MASTER_ADDRESS \
bitnami/spark

Now, we should repeat this command for the number of worker we desire. In our, example we created three workers named worker1, worker2, worker3.

This is our cluster, ready to work:

CONTAINER ID   IMAGE           COMMAND                  CREATED              STATUS              PORTS                    NAMES
e0d717b3ad24 bitnami/spark "/opt/bitnami/script…" About a minute ago Up About a minute worker3
8296a4db7b49 bitnami/spark "/opt/bitnami/script…" About a minute ago Up About a minute worker2
7148dd89de1e bitnami/spark "/opt/bitnami/script…" About a minute ago Up About a minute worker1
94fc33404190 bitnami/spark "/opt/bitnami/script…" 2 minutes ago Up 2 minutes 0.0.0.0:4040->4040/tcp master

Our first script execution

This is the PySpark script we’re gonna try:

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName('ClusterApp001') \
.getOrCreate()
rdd=spark.sparkContext.parallelize(range(1,10000))
print(f"Sum of values = {rdd.sum()}")
spark.stop()

It consists just in a simple sum of the first 10.000 integer numbers (minus 1) and its name is first_script.py.

Notice that we didn’t specify the master when defining the SparkSession: this is because we’ll do it with the spark-submit command.

We could execute our code from outside the cluster, for example installing the pyspark library on our host machine, but at the beginning of the article we promised to use only the cluster nodes so we proceed this way:

  • we’ll copy the script to the master node (we choose the /tmp folder as a location);
$ docker cp first_script.py 94fc33404190:/tmp/
  • we’ll execute the script via spark-submit from the inside of the node.
$ docker exec -it 94fc33404190 spark-submit \
--master=spark://94fc33404190:7077 \
/tmp/first_script.py

We’ll get a lot of output and in the next paragraph we’ll examine it nearly line by line.

The output

First of all, let’s look for the result:

...
Sum of values = 49995000
...

Ok, this is the result so we’re sure that the script worked.

Anyway, we’re interested to understand how the script worked over the cluster and if it called the workers.

In this article, we don’t get into the details of the workflow of a Spark cluster but we propose this picture to remember the main actors of this context.

Cluster working: from Apache Spark official documentation

The main concepts in a Spark workflow are:

  • Cluster Manager is the software acting as a leader in the cluster but we’re applying the standalone mode so Spark itself is the Cluster Manager for us;
  • SparkContext is the connecting element between the application and the cluster;
  • each action of a Driver program is translated into one or more jobs and each job (though stages) is executed as one or more tasks;
  • the Executors are the pieces that manage the concrete execution of single tasks.

Now, we need to observe the other lines of the output to get important aspects.

Showing output lines, we’ll remove date/time information at the beginning of each line to improve the readability.

The SparkContext declared the main features of the environment:

INFO SparkContext: Running Spark version 3.5.1
INFO SparkContext: OS info Linux, 6.1.75+, amd64
INFO SparkContext: Java version 17.0.10

Then, there’s the submission of the application:

INFO SparkContext: Submitted application: ClusterApp001

The SparkUI started on the 4040/TCP port:

INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI
...
INFO Utils: Successfully started service 'SparkUI' on port 4040.

Then, the preparation of the app execution began:

StandaloneAppClient$ClientEndpoint: Connecting to master spark://94fc33404190:7077...
TransportClientFactory: Successfully created connection to 94fc33404190/172.20.0.2:7077 after 55 ms
StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20240413105603-0001

Our app received an ID, app-20240413105603–0001, based on current date/time.

Then, executors went into action:

Executor added: app-20240413105603-0001/0 on worker-20240413104318-172.20.0.3-34857 (172.20.0.3:34857) with 2 core(s)
Granted executor ID app-20240413105603-0001/0 on hostPort 172.20.0.3:34857 with 2 core(s), 1024.0 MiB RAM
Executor added: app-20240413105603-0001/1 on worker-20240413104652-172.20.0.5-38715 (172.20.0.5:38715) with 2 core(s)
Granted executor ID app-20240413105603-0001/1 on hostPort 172.20.0.5:38715 with 2 core(s), 1024.0 MiB RAM
Executor added: app-20240413105603-0001/2 on worker-20240413104643-172.20.0.4-45735 (172.20.0.4:45735) with 2 core(s)
Granted executor ID app-20240413105603-0001/2 on hostPort 172.20.0.4:45735 with 2 core(s), 1024.0 MiB RAM

We have three workers so we got six lines, two for each worker. Notice that IP addresses of the workers are mentioned indicating CPU and memory resources allocated to each one (2 core(s), 1024.0 MiB RAM).

The SparkContext announced that the job started related to the sum operation at the line number 7:

SparkContext: Starting job: sum at /tmp/first_script.py:7
DAGScheduler: Got job 0 (sum at /tmp/first_script.py:7) with 2 output partitions
DAGScheduler: Final stage: ResultStage 0 (sum at /tmp/first_script.py:7)

Our data (the 10.000 minus 1 integers) where divided into two partitions and executed in two tasks assigned both to a single executor (it doesn’t matter why at the moment):

TaskSchedulerImpl: Adding task set 0.0 with 2 tasks resource profile 0
TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (172.20.0.3, executor 0, partition 0, PROCESS_LOCAL, 7599 bytes)
TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1) (172.20.0.3, executor 0, partition 1, PROCESS_LOCAL, 7599 bytes)
...
...
TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 5399 ms on 172.20.0.3 (executor 0) (1/2)
TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 5573 ms on 172.20.0.3 (executor 0) (2/2)
DAGScheduler: Job 0 finished: sum at /tmp/first_script.py:7, took 18.342935 s

After this, the result is printed and Spark starts to stop all of the activities:

SparkContext: SparkContext is stopping with exitCode 0.
SparkUI: Stopped Spark web UI at http://04875399c0a2:4040
StandaloneSchedulerBackend: Shutting down all executors
StandaloneSchedulerBackend$StandaloneDriverEndpoint: Asking each executor to shut down
SparkContext: Successfully stopped SparkContext
...
ShutdownHookManager: Deleting directory /tmp/spark-5a11...
ShutdownHookManager: Deleting directory /tmp/spark-420f...
ShutdownHookManager: Deleting directory /tmp/spark-5a11...

Conclusion

This is the way we chose to show how a Spark cluster work. Computationally, the example is simple but it showed the distribution of work, the activation of executors, the subdivision of data into partitions as well as the repartition of a job into tasks.

In the next articles, we’ll use the same structure of the cluster to create more computationally intense works to analyze!

--

--

Giuseppe Maggi

Software Engineer | AWS Solutions Architect | Technical writer. I write about AWS, Cloud in general, Blockchain and Data Science