DataOps 02: Spin up Apache Spark infrastructure using Docker
When working on real data products, we would normally register an account with a cloud provider such as Amazon, Azure, or Google so that we can use their PaaS offerings right away, without spending time installing everything from scratch.
However, if you would rather not get entangled with credit cards or exhausting authentication steps, you can DIY on your personal computer.
In this article, I will guide you through installing PySpark with Docker so that you can start playing around with this Big Data processing technology without touching any cloud services.
Github: https://github.com/ongxuanhong/de02-pyspark-optimization
List of services
- Apache Spark: 1 master and 2 workers.
- MinIO for distributed data storage.
- JupyterLab for hands-on PySpark testing.
We proceed to create a docker-compose.yml file to package these services:
version: "3.7"
services:
  spark-master:
    build:
      context: ./spark
      dockerfile: ./Dockerfile
    container_name: "spark-master"
    environment:
      - SPARK_MODE=master
      - SPARK_LOCAL_IP=spark-master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    ports:
      - "7077:7077"
      - "8080:8080"
    volumes:
      - ./spark/spark-defaults.conf:/opt/bitnami/spark/conf/spark-defaults.conf
    networks:
      - spark_network

  spark-worker-1:
    image: docker.io/bitnami/spark:3.3
    container_name: "spark-worker-1"
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=2G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    networks:
      - spark_network

  spark-worker-2:
    image: docker.io/bitnami/spark:3.3
    container_name: "spark-worker-2"
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=2G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    networks:
      - spark_network

  minio:
    hostname: minio
    image: "minio/minio"
    container_name: minio
    ports:
      - "9001:9001"
      - "9000:9000"
    command: [ "server", "/data", "--console-address", ":9001" ]
    volumes:
      - ./minio:/data
    environment:
      - MINIO_ROOT_USER=minio
      - MINIO_ROOT_PASSWORD=minio123
      - MINIO_ACCESS_KEY=minio
      - MINIO_SECRET_KEY=minio123
    networks:
      - spark_network

  mc:
    image: minio/mc
    container_name: mc
    hostname: mc
    environment:
      - AWS_ACCESS_KEY_ID=minio
      - AWS_SECRET_ACCESS_KEY=minio123
      - AWS_REGION=us-east-1
    entrypoint: >
      /bin/sh -c " until (/usr/bin/mc config host add minio http://minio:9000 minio minio123) do echo '...waiting...' && sleep 1; done; /usr/bin/mc mb minio/warehouse; /usr/bin/mc policy set public minio/warehouse; exit 0; "
    depends_on:
      - minio
    networks:
      - spark_network

  spark-notebook:
    build:
      context: ./notebooks
      dockerfile: ./Dockerfile
    container_name: "spark-notebook"
    user: root
    environment:
      - JUPYTER_ENABLE_LAB="yes"
      - GRANT_SUDO="yes"
    volumes:
      - ./notebooks:/home/jovyan/work
      - ./notebooks/spark-defaults.conf:/usr/local/spark/conf/spark-defaults.conf
    ports:
      - "8888:8888"
      - "4040:4040"
    networks:
      - spark_network

networks:
  spark_network:
    driver: bridge
    name: spark_network
Apache Spark
Here I create 3 services (spark-master, spark-worker-1, spark-worker-2) to simulate a cluster of 3 machines, in which 1 master coordinates the work and 2 worker machines execute the tasks.
The Docker image used is docker.io/bitnami/spark:3.3, maintained by the Bitnami team and shipping Spark 3.3. Some environment variables to consider:
# defines the role of the service: master or worker
SPARK_MODE={master|worker}
# names the host spark-master so that workers can find it
SPARK_LOCAL_IP=spark-master
# points a worker to the master via the given URL
SPARK_MASTER_URL=spark://spark-master:7077
# RAM allocated to each worker
SPARK_WORKER_MEMORY=2G
# CPU cores allocated to each worker
SPARK_WORKER_CORES=1
The spark-master service has a custom Dockerfile that installs additional JAR files for connecting to MinIO and storing data in Delta Lake format, because docker.io/bitnami/spark:3.3 does not ship them by default.
FROM docker.io/bitnami/spark:3.3
USER root
# Install prerequisites
RUN apt-get update && apt-get install -y curl
RUN curl -O https://repo1.maven.org/maven2/software/amazon/awssdk/s3/2.18.41/s3-2.18.41.jar \
&& curl -O https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.12.367/aws-java-sdk-1.12.367.jar \
&& curl -O https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.2.0/delta-core_2.12-2.2.0.jar \
&& curl -O https://repo1.maven.org/maven2/io/delta/delta-storage/2.2.0/delta-storage-2.2.0.jar \
&& mv s3-2.18.41.jar /opt/bitnami/spark/jars \
&& mv aws-java-sdk-1.12.367.jar /opt/bitnami/spark/jars \
&& mv delta-core_2.12-2.2.0.jar /opt/bitnami/spark/jars \
&& mv delta-storage-2.2.0.jar /opt/bitnami/spark/jars
Along with that, spark-defaults.conf is mounted into the container's spark/conf/spark-defaults.conf path to provide the necessary settings.
spark.jars jars/delta-core_2.12-2.2.0.jar,jars/hadoop-aws-3.3.2.jar,jars/delta-storage-2.2.0.jar,jars/aws-java-sdk-1.12.367.jar,jars/s3-2.18.41.jar,jars/aws-java-sdk-bundle-1.11.1026.jar
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.hadoop.fs.s3a.endpoint http://minio:9000
spark.hadoop.fs.s3a.access.key minio
spark.hadoop.fs.s3a.secret.key minio123
spark.hadoop.fs.s3a.path.style.access true
spark.hadoop.fs.s3a.connection.ssl.enabled false
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
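The same settings can also be supplied programmatically when building a SparkSession instead of via the conf file. Below is a minimal sketch of the idea: the S3A/Delta settings from this article's spark-defaults.conf collected in a plain dict, plus a tiny helper (my own, not part of any library) that applies them to any builder-like object exposing a chained `config(key, value)` method:

```python
# The spark-defaults.conf settings from this article, expressed as a dict.
# Endpoint and credentials match the docker-compose.yml above.
S3A_DELTA_CONF = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.hadoop.fs.s3a.endpoint": "http://minio:9000",
    "spark.hadoop.fs.s3a.access.key": "minio",
    "spark.hadoop.fs.s3a.secret.key": "minio123",
    "spark.hadoop.fs.s3a.path.style.access": "true",
    "spark.hadoop.fs.s3a.connection.ssl.enabled": "false",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
}


def apply_conf(builder, conf):
    """Apply each key/value pair via the builder's chained config() method."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder
```

With PySpark available, this could be used as `apply_conf(SparkSession.builder, S3A_DELTA_CONF).getOrCreate()`; keeping the settings in the conf file, as this article does, has the advantage that spark-submit picks them up too.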
I create a Makefile with shortcuts for the common commands:
build:
	docker-compose build

up:
	docker-compose up -d

down:
	docker-compose down

restart:
	make down && make up
After make build && make up runs successfully, we can go to localhost:8080 to check whether the Spark services have been initialized.
We will see the Spark master UI as shown above, with 2 workers spawned and attached to the master.
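We can also check cluster health without opening a browser: the standalone master serves a JSON view of its state at localhost:8080/json. A small sketch that summarizes a response of that shape (the field names here are illustrative and may vary between Spark versions; the example payload is trimmed and inlined rather than fetched):

```python
import json


def summarize_cluster(payload):
    """Count alive workers and total cores from a master /json response."""
    state = json.loads(payload)
    alive = [w for w in state.get("workers", []) if w.get("state") == "ALIVE"]
    return {
        "url": state.get("url"),
        "alive_workers": len(alive),
        "total_cores": sum(w.get("cores", 0) for w in alive),
    }


# Example payload in the shape the master UI returns (trimmed).
example = """{
  "url": "spark://spark-master:7077",
  "workers": [
    {"id": "worker-1", "host": "172.18.0.3", "cores": 1, "state": "ALIVE"},
    {"id": "worker-2", "host": "172.18.0.4", "cores": 1, "state": "ALIVE"}
  ]
}"""
print(summarize_cluster(example))
# For the cluster above: 2 alive workers, 2 cores in total.
```

Against the running compose stack you would fetch the payload from http://localhost:8080/json instead of using the inlined string.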
MinIO
We use the Docker image minio/minio to build a distributed data storage system. To avoid losing data every time the service is re-created, we mount ./minio:/data so that files stored in the container survive the next startup.
We can see the credentials declared directly here:
MINIO_ROOT_USER=minio
MINIO_ROOT_PASSWORD=minio123
MINIO_ACCESS_KEY=minio
MINIO_SECRET_KEY=minio123
Note that in a real environment we should use a secrets-management mechanism (a key vault) to keep the system secure. In this exercise, we list the credentials in docker-compose.yml for easy installation and sharing.
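One low-effort step in that direction is reading the credentials from environment variables instead of hardcoding them in application code. A minimal sketch (the variable names and the helper are my own convention, not a MinIO API; the fallbacks are the demo values from this compose file):

```python
import os


def minio_credentials():
    """Read MinIO credentials from the environment, falling back to the
    demo values declared in docker-compose.yml."""
    return {
        "endpoint": os.environ.get("MINIO_ENDPOINT", "http://minio:9000"),
        "access_key": os.environ.get("MINIO_ACCESS_KEY", "minio"),
        "secret_key": os.environ.get("MINIO_SECRET_KEY", "minio123"),
    }
```

In a notebook, the returned values could then feed the fs.s3a.access.key / fs.s3a.secret.key Spark settings rather than committing secrets to the repository.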
Besides, we use Docker image minio/mc to automatically create a bucket named warehouse.
We run make restart and access localhost:9001 to check whether MinIO has been initialized successfully.
We will see the UI of MinIO as shown above.
Jupyter lab
Finally, the tool that every data team uses: JupyterLab.
For convenience, I use the Docker image jupyter/all-spark-notebook:python-3.8 to build JupyterLab. Since we need to work with MinIO and store files in Delta Lake format, I created a Dockerfile that installs the additional JAR files for this service. Also, when running spark-submit, Spark requires the client version to match the cluster version, so I download Spark 3.3.1 and replace the bundled Spark 3.3.0 in the JupyterLab image.
FROM jupyter/all-spark-notebook:python-3.8
USER root
RUN curl -O https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz \
&& tar zxvf spark-3.3.1-bin-hadoop3.tgz \
&& rm -rf spark-3.3.1-bin-hadoop3.tgz \
&& mv spark-3.3.1-bin-hadoop3/ /usr/local/ \
&& rm -rf /usr/local/spark \
&& rm -rf /usr/local/spark-3.3.0-bin-hadoop3 \
&& ln -s /usr/local/spark-3.3.1-bin-hadoop3 /usr/local/spark
RUN curl -O https://repo1.maven.org/maven2/software/amazon/awssdk/s3/2.18.41/s3-2.18.41.jar \
&& curl -O https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.12.367/aws-java-sdk-1.12.367.jar \
&& curl -O https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.1026/aws-java-sdk-bundle-1.11.1026.jar \
&& curl -O https://repo1.maven.org/maven2/io/delta/delta-core_2.12/2.2.0/delta-core_2.12-2.2.0.jar \
&& curl -O https://repo1.maven.org/maven2/io/delta/delta-storage/2.2.0/delta-storage-2.2.0.jar \
&& curl -O https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.2/hadoop-aws-3.3.2.jar \
&& mv s3-2.18.41.jar /usr/local/spark/jars \
&& mv aws-java-sdk-1.12.367.jar /usr/local/spark/jars \
&& mv aws-java-sdk-bundle-1.11.1026.jar /usr/local/spark/jars \
&& mv delta-core_2.12-2.2.0.jar /usr/local/spark/jars \
&& mv delta-storage-2.2.0.jar /usr/local/spark/jars \
&& mv hadoop-aws-3.3.2.jar /usr/local/spark/jars
Similarly, spark-defaults.conf is mounted into spark/conf/spark-defaults.conf to provide the necessary settings.
We run make build && make restart again and use docker logs spark-notebook to get the token generated by JupyterLab. For example, you will see a generated URL like the one below:
[I 2023-01-27 08:28:13.152 ServerApp] Serving notebooks from local directory: /home/jovyan
[I 2023-01-27 08:28:13.152 ServerApp] Jupyter Server 1.19.1 is running at:
[I 2023-01-27 08:28:13.152 ServerApp] http://7a9c24c1328c:8888/lab?token=785963a0d1580af8e14b7d857953720afb64b583fbc84173
[I 2023-01-27 08:28:13.152 ServerApp] or http://127.0.0.1:8888/lab?token=785963a0d1580af8e14b7d857953720afb64b583fbc84173
[I 2023-01-27 08:28:13.152 ServerApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).
[C 2023-01-27 08:28:13.154 ServerApp]
To access the server, open this file in a browser:
file:///home/jovyan/.local/share/jupyter/runtime/jpserver-18-open.html
Or copy and paste one of these URLs:
http://7a9c24c1328c:8888/lab?token=785963a0d1580af8e14b7d857953720afb64b583fbc84173
or http://127.0.0.1:8888/lab?token=785963a0d1580af8e14b7d857953720afb64b583fbc84173
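Instead of eyeballing the log output, the URL with its token can be pulled out with a small helper. A sketch, assuming the log format shown above (the function is my own, not part of Jupyter):

```python
import re


def extract_jupyter_url(logs):
    """Return the first localhost JupyterLab URL (with token) in the logs,
    or None if no such URL is present."""
    match = re.search(r"http://127\.0\.0\.1:8888/lab\?token=[0-9a-f]+", logs)
    return match.group(0) if match else None


logs = "or http://127.0.0.1:8888/lab?token=785963a0d1580af8e14b7d857953720afb64b583fbc84173"
print(extract_jupyter_url(logs))
```

Piping `docker logs spark-notebook` into such a script would open the lab in one step.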
Accessing the link above, we land in JupyterLab with its familiar UI.
Try out Pyspark
I have prepared some notebooks to start experimenting with, located in de02-pyspark-optimization/notebooks/. In addition, you can upload the sample data in de02-pyspark-optimization/notebooks/sample_data/ to MinIO.
Here is an example of how we submit a Spark job from a Jupyter notebook:
from datetime import datetime

from pyspark.sql import SparkSession

# Point the session at the standalone master; everything else
# (JARs, Delta extensions, S3A settings) comes from spark-defaults.conf
spark = (
    SparkSession.builder.appName("pyspark-rdd-demo-{}".format(datetime.today()))
    .master("spark://spark-master:7077")
    .getOrCreate()
)
sc = spark.sparkContext

# Inspect the effective configuration
spark.sparkContext.getConf().getAll()
Because the default parameters are set in spark-defaults.conf, when building the SparkSession we only need to specify where the master is, so the script stays neat and easy to understand.
The default UI port for a submitted Spark application is 4040; each subsequent application increments the port to 4041, 4042, and so on.
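Which of those ports is actually live can be discovered by probing the candidates. A small sketch using only the standard library (note this only tells you a port accepts connections, not which Spark application owns it):

```python
import socket


def find_open_ui_port(host="localhost", ports=range(4040, 4045), timeout=0.5):
    """Return the first port in `ports` that accepts a TCP connection,
    or None if none of them do."""
    for port in ports:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            if sock.connect_ex((host, port)) == 0:
                return port
    return None
```

With the notebook's first application running, this would typically return 4040; the compose file above only publishes 4040 to the host, so later applications' UIs are reachable from inside the container network only.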
We access localhost:4040 and, if successful, we will see the UI below.
Let's run a few basic lines of code to get familiar with Spark RDDs:
data = [1, 2, 3, 3]
rdd = sc.parallelize(data, 2)
rdd.collect()
# output:
# [1, 2, 3, 3]
We check the Spark Job UI again, and we will see that the collect() action has executed successfully.
Clicking the collect link at /tmp/ipykernel_560/1896220800.py:1 as shown in the figure, we can inspect in more detail how the collect() function was executed.
Try writing Delta Lake files and switching between versions
# load sample data from MinIO
df_orders = spark.read.format("csv").load("s3a://warehouse/olist_orders_dataset.csv", header=True)
display(df_orders.limit(20).toPandas())
# version 1: write full records
(
df_orders.write.mode("overwrite")
.option("compression", "snappy")
.option("path", "s3a://warehouse/olist_orders_dataset.delta")
.format("delta")
.saveAsTable("olist_orders_dataset")
)
# version 2: limit 10 records
(
df_orders.limit(10)
.write.mode("overwrite")
.option("compression", "snappy")
.option("path", "s3a://warehouse/olist_orders_dataset.delta")
.format("delta")
.saveAsTable("olist_orders_dataset")
)
# select version 1. Output = 99441
(
spark.read.option("versionAsOf", 0)
.format("delta").load("s3a://warehouse/olist_orders_dataset.delta")
.count()
)
# select version 2. Output = 10
(
spark.read.option("versionAsOf", 1)
.format("delta").load("s3a://warehouse/olist_orders_dataset.delta")
.count()
)
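Time travel works because every commit is recorded as a JSON file inside the table's _delta_log/ directory, named with a zero-padded 20-digit version number. A small sketch of that naming scheme (the path helper is my own, for illustration, not part of the Delta library):

```python
def delta_commit_file(table_path, version):
    """Return the _delta_log commit file path for a given table version."""
    return f"{table_path}/_delta_log/{version:020d}.json"


print(delta_commit_file("s3a://warehouse/olist_orders_dataset.delta", 0))
# -> s3a://warehouse/olist_orders_dataset.delta/_delta_log/00000000000000000000.json
```

The two overwrites above produced commits 0 and 1, which is exactly what versionAsOf 0 and 1 read back.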
We can watch the Delta Lake writes compute and store files to MinIO via the Spark Job UI.
Checking warehouse/olist_orders_dataset.delta in MinIO, we can see how the Delta Lake format stores its files.
End
Now that you have learned the detailed steps of installing PySpark with Docker, get started running Spark scripts on your simulated cluster. I hope this sharing helps you bypass some of the struggles of starting to study Apache Spark.