DataOps 02: Spin up Apache Spark infrastructure with Docker

Ong Xuan Hong
Jan 28, 2023
Apache Spark local cluster by Docker

When working on real data products, we usually register an account with a cloud provider such as Amazon, Azure, or Google so that we can use their PaaS offerings right away without spending time installing everything from scratch.

However, if you don't want to get entangled with credit cards or exhausted by authentication steps, you can DIY everything on your personal computer.

In this article, I will guide you through installing PySpark with Docker so that you can start playing around with this Big Data processing technology without touching any cloud services.

Github: https://github.com/ongxuanhong/de02-pyspark-optimization

List of services

  • Apache Spark: 1 master and 2 workers.
  • MinIO for distributed data storage.
  • JupyterLab for hands-on PySpark experiments.

We create a docker-compose.yml file to package these services:

version: "3.7"
services:
spark-master:
build:
context: ./spark
dockerfile: ./Dockerfile
container_name: "spark-master"
environment:
- SPARK_MODE=master
- SPARK_LOCAL_IP=spark-master
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
ports:
- "7077:7077"
- "8080:8080"
volumes:
- ./spark/spark-defaults.conf:/opt/bitnami/spark/conf/spark-defaults.conf
networks:
- spark_network

spark-worker-1:
image: docker.io/bitnami/spark:3.3
container_name: "spark-worker-1"
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark-master:7077
- SPARK_WORKER_MEMORY=2G
- SPARK_WORKER_CORES=1
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
networks:
- spark_network

spark-worker-2:
image: docker.io/bitnami/spark:3.3
container_name: "spark-worker-2"
environment:
- SPARK_MODE=worker
- SPARK_MASTER_URL=spark://spark-master:7077
- SPARK_WORKER_MEMORY=2G
- SPARK_WORKER_CORES=1
- SPARK_RPC_AUTHENTICATION_ENABLED=no
- SPARK_RPC_ENCRYPTION_ENABLED=no
- SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
- SPARK_SSL_ENABLED=no
networks:
- spark_network

minio:
hostname: minio
image: "minio/minio"
container_name: minio
ports:
- "9001:9001"
- "9000:9000"
command: [ "server", "/data", "--console-address", ":9001" ]
volumes:
- ./minio:/data
environment:
- MINIO_ROOT_USER=minio
- MINIO_ROOT_PASSWORD=minio123
- MINIO_ACCESS_KEY=minio
- MINIO_SECRET_KEY=minio123
networks:
- spark_network

mc:
image: minio/mc
container_name: mc
hostname: mc
environment:
- AWS_ACCESS_KEY_ID=minio
- AWS_SECRET_ACCESS_KEY=minio123
- AWS_REGION=us-east-1
entrypoint: >
/bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo '...waiting...' && sleep 1; done; /usr/bin/mc mb minio/warehouse; /usr/bin/mc policy set public minio/warehouse; exit 0; "
depends_on:
- minio
networks:
- spark_network

spark-notebook:
build:
context: ./notebooks
dockerfile: ./Dockerfile
container_name: "spark-notebook"
user: root
environment:
- JUPYTER_ENABLE_LAB="yes"
- GRANT_SUDO="yes"
volumes:
- ./notebooks:/home/jovyan/work
- ./notebooks/spark-defaults.conf:/usr/local/spark/conf/spark-defaults.conf
ports:
- "8888:8888"
- "4040:4040"
networks:
- spark_network

networks:
spark_network:
driver: bridge
name: spark_network

Apache Spark

Here I create 3 services, spark-master, spark-worker-1, and spark-worker-2, to simulate a cluster of 3 machines in which the master coordinates the work and the 2 worker machines execute the tasks.

The Docker image used is docker.io/bitnami/spark:3.3, created by the Bitnami team and shipping Spark 3.3. Some environment variables to consider:

# defines the role of the service: master or worker
SPARK_MODE={master|worker}

# names the master host spark-master so that workers can find it
SPARK_LOCAL_IP=spark-master

# the worker registers with the master via this URL
SPARK_MASTER_URL=spark://spark-master:7077

# amount of RAM each worker can use
SPARK_WORKER_MEMORY=2G

# number of CPU cores each worker can use
SPARK_WORKER_CORES=1

The spark-master service is built from a dedicated Dockerfile that installs additional JAR files for connecting to MinIO and storing files in Delta Lake format, because docker.io/bitnami/spark:3.3 does not ship them by default.

FROM docker.io/bitnami/spark:3.3

USER root

# Install prerequisites
RUN apt-get update && apt-get install -y curl

RUN curl -O https://repo1.maven.org/maven2/software/amazon/awssdk/s3/2.18.41/s3-2.18.41.jar \
&& curl -O https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.12.367/aws-java-sdk-1.12.367.jar \
&& curl -O https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.2.0/delta-core_2.12-2.2.0.jar \
&& curl -O https://repo1.maven.org/maven2/io/delta/delta-storage/2.2.0/delta-storage-2.2.0.jar \
&& mv s3-2.18.41.jar /opt/bitnami/spark/jars \
&& mv aws-java-sdk-1.12.367.jar /opt/bitnami/spark/jars \
&& mv delta-core_2.12-2.2.0.jar /opt/bitnami/spark/jars \
&& mv delta-storage-2.2.0.jar /opt/bitnami/spark/jars

Along with that, spark-defaults.conf is mounted into the container's Spark conf directory (/opt/bitnami/spark/conf/spark-defaults.conf) to provide the necessary settings.

spark.jars                                          jars/delta-core_2.12-2.2.0.jar,jars/hadoop-aws-3.3.2.jar,jars/delta-storage-2.2.0.jar,jars/aws-java-sdk-1.12.367.jar,jars/s3-2.18.41.jar,jars/aws-java-sdk-bundle-1.11.1026.jar
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.hadoop.fs.s3a.endpoint http://minio:9000
spark.hadoop.fs.s3a.access.key minio
spark.hadoop.fs.s3a.secret.key minio123
spark.hadoop.fs.s3a.path.style.access true
spark.hadoop.fs.s3a.connection.ssl.enabled false
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
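
If you prefer not to mount a conf file, the same settings can be passed programmatically when building the SparkSession. Below is a minimal sketch (assuming the same MinIO endpoint and credentials as above; it is not part of the repository):

from pyspark.sql import SparkSession

# Equivalent of the mounted spark-defaults.conf, set in code instead
spark = (
    SparkSession.builder.appName("config-in-code-demo")
    .master("spark://spark-master:7077")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minio")
    .config("spark.hadoop.fs.s3a.secret.key", "minio123")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

Mounting spark-defaults.conf instead keeps the notebooks clean, which is the approach this setup takes.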

I create a Makefile with shortcuts for the most common commands:

build:
	docker-compose build

up:
	docker-compose up -d

down:
	docker-compose down

restart:
	make down && make up

After running make build && make up successfully, we can go to localhost:8080 to check whether the Spark services have been initialized.

localhost:8080

We will see the Spark master UI as shown above, with the 2 workers spun up and attached to the master.
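
If you prefer to verify this from code rather than the browser, the standalone master's web UI also exposes a JSON status endpoint at /json/. A small sketch (assuming the requests package is available; field names can vary slightly between Spark versions):

import requests

# Query the standalone master's status endpoint and print the registered workers
status = requests.get("http://localhost:8080/json/").json()
for worker in status.get("workers", []):
    print(worker["id"], worker["state"], worker["cores"], "cores", worker["memory"], "MB")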

MinIO

We use the Docker image minio/minio to build the distributed data storage system. To avoid losing data every time we re-initialize the service, we mount ./minio:/data so that files stored in the container persist across restarts.

We can see that the credentials are declared directly in the compose file:

MINIO_ROOT_USER=minio
MINIO_ROOT_PASSWORD=minio123
MINIO_ACCESS_KEY=minio
MINIO_SECRET_KEY=minio123

Note that in a production environment we should use a key vault / secret management mechanism to increase the security of the system. In this exercise, we list the credentials in docker-compose.yml for easy installation and sharing.

In addition, we use the Docker image minio/mc to automatically create a bucket named warehouse.

We run make restart and access localhost:9001 to check whether MinIO has been initialized successfully.

localhost:9001

We will see the UI of MinIO as shown above.

Jupyter lab

Finally, the tool that every data team uses: JupyterLab.

For convenience, I use the Docker image jupyter/all-spark-notebook:python-3.8 to build JupyterLab. Since we need to work with MinIO and store files in Delta Lake format, I created a Dockerfile that installs the additional JAR files for this service as well. Also, because Spark requires the client version to match the cluster version when running spark-submit, I download Spark 3.3.1 and replace the bundled spark-3.3.0 in the JupyterLab image.

FROM jupyter/all-spark-notebook:python-3.8

USER root

RUN curl -O https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz \
&& tar zxvf spark-3.3.1-bin-hadoop3.tgz \
&& rm -rf spark-3.3.1-bin-hadoop3.tgz \
&& mv spark-3.3.1-bin-hadoop3/ /usr/local/ \
&& rm -rf /usr/local/spark \
&& rm -rf /usr/local/spark-3.3.0-bin-hadoop3 \
&& ln -s /usr/local/spark-3.3.1-bin-hadoop3 /usr/local/spark

RUN curl -O https://repo1.maven.org/maven2/software/amazon/awssdk/s3/2.18.41/s3-2.18.41.jar \
&& curl -O https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.12.367/aws-java-sdk-1.12.367.jar \
&& curl -O https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.1026/aws-java-sdk-bundle-1.11.1026.jar \
&& curl -O https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.2.0/delta-core_2.12-2.2.0.jar \
&& curl -O https://repo1.maven.org/maven2/io/delta/delta-storage/2.2.0/delta-storage-2.2.0.jar \
&& curl -O https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.2/hadoop-aws-3.3.2.jar \
&& mv s3-2.18.41.jar /usr/local/spark/jars \
&& mv aws-java-sdk-1.12.367.jar /usr/local/spark/jars \
&& mv aws-java-sdk-bundle-1.11.1026.jar /usr/local/spark/jars \
&& mv delta-core_2.12-2.2.0.jar /usr/local/spark/jars \
&& mv delta-storage-2.2.0.jar /usr/local/spark/jars \
&& mv hadoop-aws-3.3.2.jar /usr/local/spark/jars

Similarly, spark-defaults.conf is mounted into this container's Spark conf directory (/usr/local/spark/conf/spark-defaults.conf) to provide the necessary settings.

We run make build && make restart again and use docker logs spark-notebook to get the token generated by JupyterLab. For example, you will see a generated URL like the one below:

[I 2023-01-27 08:28:13.152 ServerApp] Serving notebooks from local directory: /home/jovyan
[I 2023-01-27 08:28:13.152 ServerApp] Jupyter Server 1.19.1 is running at:
[I 2023-01-27 08:28:13.152 ServerApp] http://7a9c24c1328c:8888/lab?token=785963a0d1580af8e14b7d857953720afb64b583fbc84173
[I 2023-01-27 08:28:13.152 ServerApp] or http://127.0.0.1:8888/lab?token=785963a0d1580af8e14b7d857953720afb64b583fbc84173
[I 2023-01-27 08:28:13.152 ServerApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 2023-01-27 08:28:13.154 ServerApp]

To access the server, open this file in a browser:
file:///home/jovyan/.local/share/jupyter/runtime/jpserver-18-open.html
Or copy and paste one of these URLs:
http://7a9c24c1328c:8888/lab?token=785963a0d1580af8e14b7d857953720afb64b583fbc84173
or http://127.0.0.1:8888/lab?token=785963a0d1580af8e14b7d857953720afb64b583fbc84173

Accessing the link above, we enter JupyterLab with its familiar UI.

localhost:8888

Try out Pyspark

I have prepared some notebooks for us to start experimenting with, located in de02-pyspark-optimization/notebooks/. In addition, you can upload the sample data located in de02-pyspark-optimization/notebooks/sample_data/ to MinIO.
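
A minimal sketch of that upload with the MinIO Python client (assuming pip install minio, the credentials from docker-compose.yml, and that the local path below matches where you cloned the repository; you can also simply drag and drop the files in the MinIO console at localhost:9001):

from minio import Minio

# Connect to the local MinIO service with the credentials from docker-compose.yml
client = Minio("localhost:9000", access_key="minio", secret_key="minio123", secure=False)

# Upload one of the sample CSV files into the warehouse bucket
client.fput_object(
    "warehouse",
    "olist_orders_dataset.csv",
    "notebooks/sample_data/olist_orders_dataset.csv",  # assumed local path
)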

Here is an example of how we submit a Spark job from a Jupyter notebook:

from datetime import datetime
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext

spark = (
    SparkSession.builder.appName("pyspark-rdd-demo-{}".format(datetime.today()))
    .master("spark://spark-master:7077")
    .getOrCreate()
)

sqlContext = SQLContext(spark)
sc = spark.sparkContext
spark.sparkContext.getConf().getAll()

Because we have set the default parameters in spark-defaults.conf, when we build the SparkSession we only need to specify where the master is, so the script stays neat and easy to understand.
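
Since the client version must match the cluster version (see the note about spark-3.3.1 above), a quick sanity check from the notebook is to print the driver's Spark version and compare it with what the master UI at localhost:8080 reports:

# should print 3.3.1 here, matching the cluster version shown on the master UI
print(spark.version)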

The default port for the Spark application UI is 4040; if we submit another Spark application, the port increases to 4041, 4042, and so on.

We access localhost:4040; if successful, we will see the UI below.

localhost:4040

Let's run a few basic lines of code to get familiar with Spark RDDs:

data = [1, 2, 3, 3]
rdd = sc.parallelize(data, 2)
rdd.collect()

# output:
# [1, 2, 3, 3]

We check the Spark job UI again and see that the collect() action has been executed successfully.

Clicking the link collect at /tmp/ipykernel_560/1896220800.py:1 as shown in the figure, we can inspect in detail how the collect() call is executed.

Details for Stage
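
Before moving on, to also see the difference between a lazy transformation and an action that triggers a job, here is a small follow-up on the same RDD (not from the original notebooks); the new action shows up as another job in the UI:

# map() is a transformation: nothing is executed yet
squared = rdd.map(lambda x: x * x)

# reduce() is an action: it triggers a job on the cluster
squared.reduce(lambda a, b: a + b)

# output:
# 23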

Try writing Delta Lake files and switching between versions:

# load sample data from MinIO
df_orders = spark.read.format("csv").load("s3a://warehouse/olist_orders_dataset.csv", header=True)
display(df_orders.limit(20).toPandas())

# version 1: write full records
(
    df_orders.write.mode("overwrite")
    .option("compression", "snappy")
    .option("path", "s3a://warehouse/olist_orders_dataset.delta")
    .format("delta")
    .saveAsTable("olist_orders_dataset")
)

# version 2: limit 10 records
(
    df_orders.limit(10)
    .write.mode("overwrite")
    .option("compression", "snappy")
    .option("path", "s3a://warehouse/olist_orders_dataset.delta")
    .format("delta")
    .saveAsTable("olist_orders_dataset")
)

# select version 1. Output = 99441
(
    spark.read.option("versionAsOf", 0)
    .format("delta")
    .load("s3a://warehouse/olist_orders_dataset.delta")
    .count()
)

# select version 2. Output = 10
(
    spark.read.option("versionAsOf", 1)
    .format("delta")
    .load("s3a://warehouse/olist_orders_dataset.delta")
    .count()
)
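
We can also inspect the table's version history through the Delta Lake Python API. A minimal sketch (assuming the delta-spark Python package is available in the notebook; pip install delta-spark==2.2.0 if it is not):

from delta.tables import DeltaTable

# Show the write history of the Delta table stored on MinIO
delta_table = DeltaTable.forPath(spark, "s3a://warehouse/olist_orders_dataset.delta")
delta_table.history().select("version", "timestamp", "operation").show(truncate=False)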

Via the Spark job UI, we can see how the Delta Lake jobs compute and store files to MinIO.

List of jobs when saving a DataFrame in Delta Lake format
Delta: Compute snapshot for version: 1

Checking warehouse/olist_orders_dataset.delta in MinIO, we can see how the Delta Lake format stores its files.
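
That layout (Parquet data files plus a _delta_log/ folder of JSON commit files) can also be listed programmatically with the same MinIO client as before; a small sketch:

from minio import Minio

client = Minio("localhost:9000", access_key="minio", secret_key="minio123", secure=False)

# List every object the Delta writes produced under the table's prefix
for obj in client.list_objects("warehouse", prefix="olist_orders_dataset.delta/", recursive=True):
    print(obj.object_name)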

End

Now that you have learned the detailed steps of installing PySpark with Docker, you can get started running Spark scripts on your simulated cluster. I hope this article helps you bypass some of the struggles of starting out with Apache Spark.
