Setting up PySpark, Jupyter and Minio on Kubeflow/Kubernetes
This article describes how to set up PySpark, Jupyter, and Minio on Kubeflow/Kubernetes.
Configure PySpark to talk to Spark
pip install pyspark --user
pip install findspark --user
import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('sparktest').setMaster('k8s://https://kubernetes.default.svc:443')
conf.set("spark.kubernetes.namespace", "anonymous")
conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "default-editor")
conf.set("spark.kubernetes.authenticate.driver.oauthToken", "ABCXYZ...")
conf.set("spark.kubernetes.container.image", "docker.io/stevenzinck/spark:2.4.4-hadop_3.2.1")
conf.set("spark.kubernetes.allocation.batch.size", "5")
conf.set("spark.kubernetes.executor.instances", "1")
conf.set("spark.driver.bindAddress", "0.0.0.0")
conf.set("spark.driver.host", "jupyter")
conf.set("spark.driver.port", "37371")
conf.set("spark.blockManager.port", "6060")
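As the list of settings grows, it can help to keep them in one place and apply them in a loop. This is just a sketch (the `K8S_SETTINGS` dict and `apply_settings` helper are hypothetical, not part of Spark's API), using the same values as above:

```python
# Hypothetical helper: collect the Kubernetes-related Spark settings in one
# dict and apply them to a SparkConf in a loop.
K8S_SETTINGS = {
    "spark.kubernetes.namespace": "anonymous",
    "spark.kubernetes.authenticate.driver.serviceAccountName": "default-editor",
    "spark.kubernetes.container.image": "docker.io/stevenzinck/spark:2.4.4-hadop_3.2.1",
    "spark.kubernetes.allocation.batch.size": "5",
    "spark.kubernetes.executor.instances": "1",
    "spark.driver.bindAddress": "0.0.0.0",
    "spark.driver.host": "jupyter",
    "spark.driver.port": "37371",
    "spark.blockManager.port": "6060",
}

def apply_settings(conf, settings):
    """Call conf.set(key, value) for every entry; return the conf for chaining."""
    for key, value in settings.items():
        conf.set(key, value)
    return conf
```

With the `SparkConf` created above, this is just `apply_settings(conf, K8S_SETTINGS)` (the OAuth token is left out here since it's a secret you'll fetch below).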
The driver needs to authenticate to the Kubernetes API with a service account that has permission to create pods. Kubeflow sets up a Kubernetes service account called default-editor. The namespace (created via Kubeflow) for my Notebook pods is called anonymous.
To get the secret for the default-editor account:
$ kubectl describe sa default-editor -n anonymous
Name: default-editor
Namespace: anonymous
Mountable secrets: default-editor-token-pprwn
$ kubectl describe secret default-editor-token-pprwn -n anonymous
Name: default-editor-token-pprwn
Namespace: anonymous
Labels: <none>
....
token: ABCXYZ.....
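If you'd rather script those two steps, the token can be pulled in one go. This is a sketch: the secret-name suffix differs per cluster, and it assumes a Linux shell with kubectl access to the anonymous namespace (the token is stored base64-encoded in the secret, hence the decode):

```shell
# Sketch: fetch the default-editor token in one step.
# The secret name (default-editor-token-xxxxx) differs per cluster.
SECRET="$(kubectl get sa default-editor -n anonymous \
    -o jsonpath='{.secrets[0].name}' 2>/dev/null || true)"
TOKEN="$(kubectl get secret "$SECRET" -n anonymous \
    -o jsonpath='{.data.token}' 2>/dev/null | base64 -d || true)"
echo "$TOKEN"
```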
Create a Kubernetes Headless Service
Before we launch our SparkContext, we need to create a headless service that will sit in front of our Jupyter pod. This is necessary because pods are created with a dynamic IP, and we need to be able to give the Executor a static DNS name to use to connect to the Driver.
$ cat jupyter-svc.yaml
apiVersion: v1
kind: Service
metadata:
  name: jupyter
  namespace: anonymous
spec:
  selector:
    notebook-name: jupyter
  clusterIP: None
  ports:
  - name: notebook
    port: 8888
    targetPort: 8888
  - name: comm
    port: 37371
    targetPort: 37371
  - name: blockmngr
    port: 6060
    targetPort: 6060
$ kubectl apply -f jupyter-svc.yaml
service/jupyter created
$ kubectl describe service/jupyter -n anonymous
Name: jupyter
Namespace: anonymous
Labels: <none>
Annotations: kubectl.kubernetes.io/last-applied-configuration:
{"apiVersion":"v1","kind":"Service","metadata":{"annotations":{},"name":"jupyter","namespace":"anonymous"},"spec":{"clusterIP":"None","por...
Selector: notebook-name=jupyter
Type: ClusterIP
IP: None
Port: notebook 8888/TCP
TargetPort: 8888/TCP
Endpoints: 172.21.224.152:8888
Port: comm 37371/TCP
TargetPort: 37371/TCP
Endpoints: 172.21.224.152:37371
Port: blockmngr 6060/TCP
TargetPort: 6060/TCP
Endpoints: 172.21.224.152:6060
Session Affinity: None
Events: <none>
The service selected the notebook with the label notebook-name=jupyter and set up the appropriate endpoints.
Create the SparkContext in your Notebook
In: sc = SparkContext(conf=conf)
In: print(sc)
Out: SparkContext(..... )
Creating the SparkContext starts the driver process on the Jupyter Notebook pod and launches an executor pod (named after the app, e.g. sparktest-1234-exec-1) in the anonymous namespace.
Check the logs to ensure the pod started correctly and connected to the Jupyter service:
$ kubectl logs sparktest-1234-exec-1 -n anonymous
19/12/20 17:46:19 INFO TransportClientFactory: Successfully created connection to jupyter/172.21.224.152:37371 after 89 ms (0 ms spent in bootstraps)
If everything has gone well, you should be able to execute Spark jobs:
In: sc.parallelize([1,2,3,4,5]).count()
Out: 5
Connect to S3/Minio
Next, let’s set up a connection to S3 (or, in this case, a local Minio instance). It’s imperative that your driver and executor pods have the hadoop-aws and aws-sdk jars in Spark’s CLASSPATH, and the hadoop-aws jar needs the aws-sdk jar it was built with. In the image referenced above, I’m using hadoop-aws-3.2.1.jar and aws-java-sdk-bundle-1.11.695.jar.
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "minio")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "minio123")
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "minio-hl-svc.default.svc.cluster.local:9000")
sc._jsc.hadoopConfiguration().set("fs.s3a.path.style.access", "true")
sc._jsc.hadoopConfiguration().set("fs.s3a.connection.ssl.enabled", "false")
sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
The S3 endpoint should be the name of the Kubernetes service that Minio created during installation (or the AWS S3 endpoint if using AWS). I installed the Minio Operator, which set up the service for me. For reasons yet unknown, I couldn’t get the endpoint to work with the short hostname only, even though the pod was able to look up the hostname. Something to investigate later.
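One quick way to debug that kind of resolution issue from inside the notebook pod is a check with Python's socket module. A sketch (the service names are the ones from my setup; substitute your own):

```python
import socket

def resolve(host):
    """Return the IPv4 address for host, or None if the lookup fails."""
    try:
        return socket.gethostbyname(host)
    except socket.gaierror:
        return None

# Compare the short service name against the fully qualified one:
print(resolve("minio-hl-svc"))
print(resolve("minio-hl-svc.default.svc.cluster.local"))
```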
If all goes well, you should be able to use the s3a connector to talk to S3/Minio:
In: json = sc.textFile("s3a://your-bucket/somefile.json")
In: json.collect()
Out: [ ..... ]
Next Steps
There’s still a lot to do before using this in production:
- Automate the creation of the headless service. We’ll have many Notebooks running and obviously creating the service manually isn’t acceptable.
- Performance tuning & benchmarking
- Backup, restore, monitoring and alerting for the entire stack
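As a starting point for the first item, the headless-service manifest can be generated per notebook instead of written by hand. The function below is a hypothetical sketch that mirrors the jupyter-svc.yaml used earlier; the resulting dict could be submitted via the Kubernetes Python client or serialized and piped to kubectl apply -f -:

```python
import json

# Hypothetical sketch: build a headless-service manifest (as a dict) for a
# given notebook, mirroring jupyter-svc.yaml above.
DEFAULT_PORTS = (("notebook", 8888), ("comm", 37371), ("blockmngr", 6060))

def headless_service(notebook_name, namespace, ports=DEFAULT_PORTS):
    return {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": notebook_name, "namespace": namespace},
        "spec": {
            "selector": {"notebook-name": notebook_name},
            # Kubernetes expects the literal string "None" for headless services.
            "clusterIP": "None",
            "ports": [
                {"name": name, "port": port, "targetPort": port}
                for name, port in ports
            ],
        },
    }

# JSON is valid YAML, so this can be piped straight to `kubectl apply -f -`.
manifest = json.dumps(headless_service("jupyter", "anonymous"), indent=2)
```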