Spark Structured Streaming in K8s with Argo CD

Albert Franzi
Albert Franzi
Published in
5 min readSep 30, 2020

--

This post aims to cover how we enhanced the capability to deploy Spark Streaming applications in our K8s Cluster, at Typeform, using Argo CD. Easy, practical, and reusable.

ArgoCD + K8s + Spark

Over the last year, we have moved from a batch processing jobs setup, with Airflow using EC2s, to a powerful & scalable setup using Airflow & Spark in K8s (check out our previous article: deploying airflow 1.10.10 in Kubernetes).

The increasing need to support the latest Data and Machine Learning community advances, together with the need to enable multidisciplinary teams, forced us to design a solution that allows running multiple Spark versions at the same time while avoiding duplicated infrastructure and simplifying deployment, maintenance, and development.

Airflow + KubernetesPodOperator

Our journey started by testing out Airflow with K8s. To be more specific, by integrating Spark with the Spark Submit Hook and the KubernetesPodOperator.

The former provides the spark-submit command, while the K8s Pod Operator facilitates launching Spark jobs in K8s (code SparkK8sStandaloneOperator.py).

The process starts with uploading the assembly JAR from Artifactory to S3 and then executing the Spark Job inside K8s using the assembly JAR from S3.

Combining the code from S3 with the right Spark Docker image, allows us to reduce the size of our assembly JARs.

There is no need to feed your jars if Spark is already in the classpath of your docker images.

Spark has the feature to execute Spark jobs using the s3a system.

Dummy example

Note: To execute s3a JARs, the Spark Docker image needs to have at least AWS_SDK_VERSION=1.11.788

You can find our Dockerfiles at hadoop.Dockerfile & spark.Dockerfile.

Note that the requirement of AWS SDK Version ≥ 1.11.788, is due to the WebIdentityTokenCredentialsProvider since it’s the first version providing the build method called by Spark when authenticating on accessing S3 (check how IRSA works here).

The benefit of using the WebIdentityToken is we don’t have any AWS Secret Key in our K8s cluster. Everything works with IAM Roles defined for each service account on the namespace.

Our first idea was to use --master k8s, but since we were using the WebIdentityToken with IRSA as the authorization mechanism for AWS. We needed to specify the ServiceAccount for the executors, a feature that will be available in Spark 3.1.0 :(

Spark with SparkOperator

The setup described above with the KubernetesPodOperator in Airflow works fine for batch jobs. However, we wanted to move away from EMR for testing purposes, and we also wanted to support Streaming, so using Airflow wasn’t enough.

That’s when Spark on k8s operator comes to the scene, a great Kubernetes operator for managing the lifecycle of Apache Spark applications on Kubernetes developed by the community.

The Spark on K8s operator needs at least K8s 1.13 and Spark 2.4.5, we are lucky to be using already Spark 3.0.0 and K8s 1.16 pending to be migrated to 1.17 soon.

So, how does it work?

All it needs is a YAML spec with the arguments & configurations to run your spark jobs successfully.

All the YAML attributes are well defined in the operator documentation.

The main key here is the mainApplicationFile property fetching the jar file from S3.

Our advice is to use the Spark docker image, mentioned above (remember the WebIdentityTokenCredentialsProvider, we will need it) and put your jars into S3. So, the only remaining part is adding in your CD/CI pipeline a step where you upload the artifact to S3, so it can be used by the Spark Operator, and define the fs.s3a.aws.credentials.provider with the WebIdentityTokenCredentialsProvider.

We are using Travis with the S3 deploy provider to upload the jars to S3 on merging with the master branch.

Spark Operator spec example with Helm

As we mentioned above, we had the need of moving away from EMR. One of the main reasons being reducing costs, since having an EMR without jobs running, it equals to burning money. Besides, the EMR forced us to be attached to a specific Spark version in each cluster.

And last, but not least, using EMR means we have to wait until AWS releases the latest version in Spark (check EMR 6.x releases). This summer we were able to start playing with Spark 3.0.0 the day after its release (18th of June, 2020) when AWS was releasing the new EMR + Spark 3.0.0 in September 2020.

Spark + SparkOperator + ArgoCD

When it comes to deployment speed and simplicity, we always want to give data engineers and developers full control and flexibility to deploy their code as fast as possible, allowing short iteration cycles. And there is where our de facto CD tool shines: ArgoCD.

We wanted to follow the recommended Argo’s pattern: “App of apps” so defining one single ArgoCD’s app containing all our different streaming applications was the clear way to go, and we did it with the help of Helm.

Deployment flow with ArgoCD and Spark Operator

So, when we are ready to deploy the spark application to our K8s cluster, we just need to merge the master branch and update the version specified in the YAML specs. Then ArgoCD will automatically deploy the new version.

ArgoCD Application Spec
ArgoCD UI — Spark Applications running in the Spark Operator being Synchronized manually.

We have some Spark Streaming jobs running and consuming from a Kafka Cluster, by using the Spark Operator we delegate the error recovery to the operator by specifying the RestartPolicy.

In the beginning, we were using the OnFailure policy, however, we recently changed it to Always since the restart error count doesn’t get restarted (it will be nice if after X amount of time the counter got initialized again).

Our next step is integrating the Datadog Alert with the restart policy, so we can trigger Datadog Alerts in case of having the job stuck on the restarting step.

Bonus track: Spark History Server in K8s

Since the Spark jobs in K8s are ephemeral and logs are lost by default. To circumvent this situation, we have implemented the Spark History Server, so we were able to analyze our jobs once finished.

To be able to recover the logs and visualize them later, we configured our jobs to upload the logs to an S3 bucket and then the Spark History Server to read them from there.

SparkConfig setting up the properties required to feed the Spark History Server.
I’ve skipped most of the properties to focus on the one that matters — the SPARK_HISTORY_OPTS with the log directory pointing to S3a.

Note: Carlos Escura will publish an article about Spark History Server in K8s soon, stay tuned.

Next steps

Keep adding traceability with Datadog to improve and tune our jobs.

Once Spark 3.1.0 is ready, we would like to put our hands-on and test the ServiceAccount feature submitting batch jobs directly from Airflow in cluster mode.

Since we are really happy with the Streaming jobs being handled by ArgoCD, we will iterate in this direction.

Co-authored by Carlos Escura 🤓.

--

--