Kubernetes for data engineering
In thousands of posts, Kubernetes is called either the best technology ever or invention straight from the hell. I can’t add much to that general discussion, but if you’re interested in how we use it specifically for data engineering with Spark over a couple of years, read on. This post will focus on overall approaches, and a subsequent one will discuss Spark-on-Kubernetes in detail.
What’s special about data engineering
Data engineering workloads that use Spark and store all data in a cloud data lake are very different from a usual production backend infrastructure:
- There are very few stateful services, and these services can all use a cloud database such as RDS. Compare this with production infra, where using virtual or physical instances for critical stateful databases is a typical approach.
- The rate of traditional requests is low (at most a few per second) and they are mostly trivially routed HTTP requests. You don’t have GRPC requests through 20 services with load balancing.
- Scaling is often dramatic. On Saturday evening, we have just several servers processing hourly batches. On Monday morning, that can raise to 300.
- We use a lot of external services and experiment with an even larger number. Airflow, Zeppelin, Metabase, Superset, Redash all might be running on the same day.
- Finally, data teams usually don’t have a broad knowledge of infrastructure tools.
Where does that lead us? We have a lot of similar stateful services, and we need automatic scaling and a way to deploy third-party tools. We have no experience to manage virtual machines, but also don’t have a single case where we need them. Kubernetes looks rather attractive — we can standardize on a single tool and forget about an entire class of operational burden.
The holy grail in operating software systems recently is infrastructure-as-code — where you describe all your setup as strict code, and then a magic tool makes sure your infrastructure matches the code. Terraform and Ansible are examples of such tools. However, they are one-off sync tools. You run them, they update everything, but if anything explodes in a minute, nothing will be there to fix things.
What Kubernetes introduces is a notion of active controller that continuously steers the world back to its desired state. For a simple component, you might need things like pod (a single process), deployment (set of process replicas), service (internal round-robin router for a deployment), and ingress (external access point). There are active controllers that start containers, register their IP addresses, update load balancers and create DNS records. And should anything break down, containers are restarted, load balancers are recreated, and everything keeps running.
This all is a bit abstract, so let’s look at an example of setting up a BI tool called Metabase. It already has definitions for all Kubernetes objects, which can be installed and customized using the Helm tool:
helm upgrade --install --namespace metabase metabase stable/metabase
The values.yaml file customizes the installation, and our current version has just 27 lines. First, we specify the Docker image to use (using a local copy to avoid depending on external services):
repository: <internal docker repo>/analytics/metabase
Then we specify database connection, using an AWS RDS instance:
host: <internal rds database>.rds.amazonaws.com
... username and password settings ...
And finally, we specify how to access our new service, by configuring ingress:
- metabase.<internal domain>
This part is probably most technically complex and specific to your cloud provider, but then it’s identical for all services, except for the hostname.
With that in place, Metabase is available on the domain we specify, with quite a number of benefits on top:
- The install can be removed and put back with a single command and is reproducible. We can easily have production and stage installs alongside.
- If the service fails, it will be restarted automatically. If the node fails, the service will be moved to another node. This is managed by core Kubernetes controllers.
- On the restart, the traffic will automatically be routed to the new instance of service, thanks to a third-party ingress controller.
- If the restart happens too often, we’ll receive an alert. If the service uses too much CPU time or memory, we’ll also receive an alert. And we can manually review the metrics in Grafana. This is managed by the Prometheus controller.
- The logs are also available via Grafana, thanks to the Loki service.
Personally, I think that’s a pretty good return for writing 27 lines of code. Of course, we had to first install the third-party controllers, but they are written by others and are relieving us from day-to-day work.
And now let’s talk about autoscaling. This is again specific to your cloud provider, and in our case we use eksctl to define nodegroup with the minimum and the maximum number of nodes:
- name: infra-b-k1-19
availabilityZones: [ "eu-central-1b" ]
This is enough for a standalone cluster autoscaler component to bring up nodes when necessary and stop them when no longer needed.
The automations I listed above are all useful, but they are just examples of the active controller approach. Kubernetes managed to lay out foundations for a lot of useful controllers to build upon.
Keeping it simple
The benefits above are attractive, but a data engineering team does not have spare Kubernetes experts that can build a cluster from scratch and add a service mesh on top. And we don’t need it — because we mostly run large compute jobs with no latency requirements. So, it’s reasonable to use off-shelf components as much as possible
- We use AWS EKS to create and manage the Kubernetes cluster
- We use the AWS load balancer controller to route traffic
- We use stock versions of DNS controller, cluster autoscaler, and Prometheus stack.
- If anybody breaks, we ask AWS support (where we have the luxury of enterprise plan)
Finally, using Kubernetes should not open floodgates for everything — and we actively avoid cool but unnecessary tools. No service meshes where a hand-written proxy will suffice. No fancy GitOps CD, just Jenkins or GitHub Actions.
The dark side of automation
The idea of active controllers that adjust the world to our declarative description has a dark side.
First, it might be a better way, but it’s also a new way. A positive example is JupyterHub. Its Zero-to-JupyterHub setup is full of features and has excellent documentation. In fact, that documentation has taught me half of the things I know about Kubernetes today. I’d recommend that you read it, even if you don’t plan to use JupyterHub.
Other projects might have limited, or buggy, Helm charts, or have three competing ones for you to randomly choose. Still, most established projects have at least basic support. Here I would recommend spending an evening with Helm docs so that you can dive in and find a solution if necessary.
Second, there are more moving parts. You deploy a Helm chart, which updates a config map, which is noticed by a controller, which updates another config map and then a sidecar updates the actual config file inside a container and the service reloads it. Any step can fail and bring down an important service until you fix the problem (after figuring this step exists). It is a tradeoff: you automate and much simplify daily operations, but the complexity means that the worst-case time-to-repair is now higher.
That’s the thing you might find the most problematic, and you should allocate some time for solving such issues, and preferably have an expert you can lean on in an emergency.
Third, the controllers and other critical workloads are often running inside Kubernetes themselves. CoreDNS is a service that helps pods find other pods but is itself a replica set with two pods. Therefore, misconfigurations affecting CoreDNS have a chain effect — they will break pretty much everything. For extra fun, the monitoring stack will also break, so you’ll have a bricked cluster with no debugging tools, just your intuition.
The same situation is possible with other services and controllers I mentioned above. More relevant for data engineering, you might be using Spark Operator to run your jobs, and if it goes down, then no jobs will be able to run. By the way, these are actual examples from practice. We found that most such failures happen during cluster upgrades or other massive changes, so there are process suggestions:
- Only upgrade the cluster on Monday morning, with all the team awake and caffeinated
- Read the release notes for Kubernetes and all components. For EKS you can consult the official documentation and reports from Marcin Cuber.
The examples above abstracted us away from thinking about hardware. But there’s one case where it breaks down — the Spark executors:
- Spark needs a lot of memory; we found we need “high-memory” instance types in AWS, such as r5.2xlarge, with 8 cores and 64GB. Fast local disk is also a big benefit, so we need r5d.2xlarge instance, or similar.
- Spark works much better with large instances. A JVM process using all those 8 cores and 64 GB of memory works much better than 8 independent processes.
To fulfill these requirements we need to create a separate set of nodes for Spark executors, which can be accomplished using labels:
# Nodegroup definition
- name: spark-executors
instanceTypes: ["r5d.2xlarge", "r5dn.2xlarge"]
purpose: spark-executor# Pod definition
- key: purpose
This makes sure that Spark only uses performance nodes, but we now have a reverse problem. Nothing prevents other services to get scheduled of these performance nodes. If we use interruptable (“spot”) nodes for Spark, it means that a critical service can get scheduled there, and be quickly interrupted. And even if that does not matter, Spark is a noisy neighbor — it uses all CPU there is, and generates lots of disk I/O, and Kubernetes cannot fully shield us.
We’d much rather not allow critical services at all, and we can do this using taints:
# Nodegroup definition
- name: spark-executors
purpose: "spark-executor:NoSchedule"# Pod definition for Spark executors
- key: "purpose"
The taint actively deters pods from using the tainted node, unless the pods declare tolerance for the taint, so no critical service will be scheduled on spark nodes.
Using taints and tolerations, we’ve separated hardware into three groups
- Infrastructure nodes that use reliable general-purpose instance types with network disks
- Spark driver nodes that use similar hardware as infrastructure nodes, but are frequently autoscaled
- Spark executor nodes — interruptible instances types with local disks and lots of memory
It served us well, and I’d recommend a similar approach.
Our experience using Kubernetes for data engineering with Spark is mostly positive. We do need to regularly add new services and we have to scale hardware daily, and for us, Kubernetes may not be perfect, but it’s the best tool overall.
Thanks to Petr Ivanov, Dmitry Ivanov, and Georgy Kolpakov for reviewing drafts of this post.