Introducing KPOps
We are introducing KPOps (short for Kafka Pipeline Operations), a new command-line application for deploying Apache Kafka data pipelines to Kubernetes. At bakdata, it serves as the backbone of our deployment system and brings large-scale data processing to life.
Before KPOps, our deployment system was hard to grasp and re-use. Configurations for different parts of a pipeline were stored across multiple deployment repositories. The CI/CD logic consisted of complex scripts, making it difficult to understand and maintain. In a large pipeline, a simple change would require engineers to carefully trigger steps in multiple places. Quite the killer for developer productivity.
Today, KPOps handles all of this complexity for us. Under the hood, it leverages the APIs of Helm, Kafka REST Proxy, Schema Registry, and Kafka Connect, and combines all commonly needed deployment operations in one streamlined tool. The interaction with the Kubernetes backend services is handled automatically, which simplifies the management of large data streaming environments with many moving parts.
KPOps bridges the gap between Kubernetes resources and Kafka configuration by combining both into one concise pipeline representation.
Data and DevOps engineers can now use the KPOps CLI to deploy large pipelines with a single command. Each step of your pipeline is deployed in the proper order. Meanwhile, KPOps automatically manages and applies your topic configurations, topic schemas, and connectors. KPOps can also gracefully terminate and clean up a pipeline, including all Kubernetes resources and Kafka-related state, such as consumer offsets.
Creating a pipeline
In KPOps, a data pipeline is a sequence of steps, essentially a DAG. Each step is one of several built-in or user-defined components. Out of the box, there are components for producers and streaming apps powered by streams-bootstrap, Kafka sink and source connectors, as well as any generic Kubernetes app (such as a REST service) managed through Helm. Pipeline definitions are written in YAML. KPOps’ editor integration enables autocompletion and schema validation as you type.
Let’s take a look at the usual word count pipeline example.
We want to define three pipeline steps. First, a producer writes sentences to a Kafka topic. The following streams app reads from this topic, counts the words and writes the counts to an output topic. Finally, a Kafka sink connector inserts the results into a Redis database.
pipeline.yaml
- type: producer-app
  name: sentence-producer
  namespace: kpops
  app:
    image: bakdata/kpops-demo-sentence-producer
    imageTag: "1.0.0"
  to:
    topics:
      ${output_topic_name}:
        type: output
- type: streams-app
  name: word-counter
  namespace: kpops
  app:
    image: bakdata/kpops-demo-word-count-app
    imageTag: "1.0.0"
    replicaCount: 1
  to:
    topics:
      ${output_topic_name}:
        type: output
- type: kafka-sink-connector
  name: redis-sink-connector
  app:
    connector.class: …connect.redis.RedisSinkConnector
    redis.hosts: redis-headless:6379
As you can see, it takes minimal effort to define this pipeline. Each step has a component type and a unique name. Additionally, every step can configure a from and a to attribute defining input and output topics (or other steps to read from). KPOps automatically creates and manages these topics for us. By default, each step consumes the output topic of its preceding step in the pipeline definition. Therefore, we can avoid boilerplate configuration while being able to customize it where necessary.
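To make the default wiring more tangible, here is a minimal Python sketch of the idea. It is not how KPOps is implemented: the function name wire_default_inputs, the plain-dict representation of steps, the `type: input` marker, and the topic names sentences and word-counts are all assumptions made for illustration.

```python
from typing import Any


def wire_default_inputs(steps: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Give every step without an explicit `from` the output topics of its predecessor."""
    wired: list[dict[str, Any]] = []
    previous_outputs: list[str] = []
    for step in steps:
        resolved = dict(step)  # shallow copy, so the caller's step is not modified
        if "from" not in resolved and previous_outputs:
            resolved["from"] = {
                "topics": {topic: {"type": "input"} for topic in previous_outputs}
            }
        # Remember this step's output topics for the step that follows it.
        outputs = resolved.get("to", {}).get("topics", {})
        previous_outputs = [
            name for name, config in outputs.items() if config.get("type") == "output"
        ]
        wired.append(resolved)
    return wired


# Hypothetical topic names, chosen only for this example.
pipeline = [
    {"type": "producer-app", "name": "sentence-producer",
     "to": {"topics": {"sentences": {"type": "output"}}}},
    {"type": "streams-app", "name": "word-counter",
     "to": {"topics": {"word-counts": {"type": "output"}}}},
    {"type": "kafka-sink-connector", "name": "redis-sink-connector"},
]

for step in wire_default_inputs(pipeline):
    print(step["name"], "reads from", list(step.get("from", {}).get("topics", {})))
```

A step that declares its own from section is left untouched, which mirrors the "customize it where necessary" part of the behavior described above.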
Using Pipeline Defaults
KPOps provides a powerful defaulting mechanism based on its component hierarchy. If you find yourself repeating a configuration for multiple pipeline steps, it’s useful to make it a default instead. In our pipeline, both the producer-app and the streams-app share the attribute namespace: kpops. Thanks to the inheritance model of KPOps components, defaults can be applied even across different component types that inherit from the same base. In this case, kubernetes-app is a shared parent component of producer-app and streams-app. Therefore, we can define the namespace globally for all Kubernetes apps in our defaults.yaml file.
defaults.yaml
kubernetes-app:
  namespace: kpops
Of course, it’s possible to override defaults for a specific pipeline step if necessary.
defaults.yaml
kafka-app:
  to:
    topics:
      ${output_topic_name}:
        type: output
pipeline.yaml
- type: producer-app
  name: sentence-producer
  app:
    image: bakdata/kpops-demo-sentence-producer
    imageTag: "1.0.0"
- type: streams-app
  name: word-counter
  app:
    image: bakdata/kpops-demo-word-count-app
    imageTag: "1.0.0"
    replicaCount: 1
  to:
    topics:
      ${output_topic_name}:
        type: output
        configs:
          cleanup.policy: compact
The snippet above shows the reduced pipeline definition using the defaults. For the word-counter streams app, we deliberately override the default to attribute of all Kafka apps in order to set cleanup.policy as an additional Kafka topic config.
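The following Python sketch illustrates how defaults could be resolved along a component hierarchy and then overridden by a specific step. It is an illustration under assumptions, not the KPOps implementation: the PARENTS table (in particular the chain producer-app/streams-app → kafka-app → kubernetes-app), the deep-merge strategy, and the function names are hypothetical.

```python
from copy import deepcopy
from typing import Any, Optional

# Assumed hierarchy for illustration: most specific type -> its parent type.
PARENTS = {
    "producer-app": "kafka-app",
    "streams-app": "kafka-app",
    "kafka-app": "kubernetes-app",
}


def deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
    """Recursively merge `override` into a copy of `base`; `override` wins on conflicts."""
    merged = deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


def resolve(step: dict[str, Any], defaults: dict[str, dict[str, Any]]) -> dict[str, Any]:
    """Apply defaults from the most general type down to the step's own type, then the step."""
    chain: list[str] = []
    current: Optional[str] = step["type"]
    while current is not None:
        chain.append(current)
        current = PARENTS.get(current)
    resolved: dict[str, Any] = {}
    for component_type in reversed(chain):  # general first, specific last
        resolved = deep_merge(resolved, defaults.get(component_type, {}))
    return deep_merge(resolved, step)  # the step's own values take precedence


defaults = {
    "kubernetes-app": {"namespace": "kpops"},
    "kafka-app": {"to": {"topics": {"${output_topic_name}": {"type": "output"}}}},
}
word_counter = {
    "type": "streams-app",
    "name": "word-counter",
    "to": {"topics": {"${output_topic_name}": {
        "type": "output",
        "configs": {"cleanup.policy": "compact"},
    }}},
}

print(resolve(word_counter, defaults))
```

Because the merge is recursive, the step only has to state what differs from the defaults; everything else, such as the namespace, is inherited.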
Running the kpops generate command combines your pipeline and default definitions into a self-contained pipeline representation, which KPOps can use for the deployment. The output is the full pipeline definition including merged custom and default configuration.
Deploying a Pipeline
Let’s now deploy the word count pipeline. In config.yaml, we configure all Kafka-related services running on the Kubernetes cluster.
To verify everything is correctly set up, we perform a dry-run before the actual deployment. KPOps displays a preview of all changes in a rich diff. For us, it is an essential part of our workflow to check the diff before applying the changes. Moreover, the dry-run checks and validates topic configs and connector configs to make sure the pipeline definition is valid before the deployment.
creating a new deployment
> kpops deploy word-count/pipeline.yaml --dry-run
#######################
Deploy wc-data-producer
#######################
KubernetesAppComponent - Helm release word-count-sentence-producer does not exist
+ apiVersion: batch/v1
+ kind: Job
+ metadata:
+   labels:
+     app: word-count-sentence-producer
+     chart: producer-app-2.9.0
+     release: word-count-sentence-producer
+   name: word-count-sentence-producer
+ ...
changing an existing deployment
...
######################
Deploy word-count-word-counter
######################
KafkaTopic - Topic Creation: word-counter-topic already exists in cluster.
KubernetesAppComponent - Helm release word-count-word-counter already exists
  apiVersion: apps/v1
  kind: Deployment
  metadata:
    labels:
      ...
  spec:
-   replicas: 1
+   replicas: 3
  ...
The above example solely serves as a demonstration. For the complete word count example, please refer to the KPOps Quick start guide.
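To give a feeling for how such a preview can be produced, here is a small sketch based on Python's standard difflib module. We do not claim that KPOps computes its diff this way; the function name preview_changes and the sample manifests are assumptions for illustration.

```python
import difflib


def preview_changes(current: str, desired: str) -> list[str]:
    """Return a line-based diff: '- ' for removed, '+ ' for added, two spaces for unchanged."""
    return [
        line
        for line in difflib.ndiff(current.splitlines(), desired.splitlines())
        if not line.startswith("?")  # drop ndiff's intraline hint rows
    ]


# Hypothetical excerpts of the deployed and the desired manifest.
deployed = "spec:\n  replicas: 1"
desired = "spec:\n  replicas: 3"

for line in preview_changes(deployed, desired):
    print(line)
```

For a release that does not exist yet, the current state is empty, so every line of the desired manifest shows up as an addition.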
Integrating the Deployment
KPOps is portable and runs anywhere, be it on your local machine or a managed environment in the cloud. Since KPOps is designed as a non-interactive CLI, it easily integrates into your CI/CD workflows on any platform.
We provide an official GitHub Action. Adapting this approach to other platforms is straightforward, since one only needs to invoke the CLI tool properly. Your contributions are highly welcome.
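On a platform without a ready-made integration, a CI job could wrap the CLI in a few lines of Python. The sketch below only uses the invocations shown in this article (kpops deploy with an optional --dry-run flag); the helper names are hypothetical, and the exact flags may differ between KPOps versions, so please check kpops deploy --help for your installation.

```python
import subprocess


def build_deploy_command(pipeline_path: str, dry_run: bool = False) -> list[str]:
    """Build the CLI invocation as used in this article (flags assumed, may vary by version)."""
    command = ["kpops", "deploy", pipeline_path]
    if dry_run:
        command.append("--dry-run")
    return command


def deploy(pipeline_path: str) -> None:
    """Run a dry-run first, then the actual deployment."""
    # check=True raises CalledProcessError on a non-zero exit code,
    # which makes the surrounding CI job fail.
    subprocess.run(build_deploy_command(pipeline_path, dry_run=True), check=True)
    subprocess.run(build_deploy_command(pipeline_path), check=True)


print(build_deploy_command("word-count/pipeline.yaml", dry_run=True))
```

Running the dry-run as a separate step keeps the preview visible in the job log before anything is applied.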
Extending KPOps
Custom components
To go beyond the built-in KPOps components, developers can define custom component types in Python by subclassing the core PipelineComponent or one of the existing components. Custom components, such as data sources or sinks, further reduce the complexity of a pipeline definition because they can encapsulate common configurations. For instance, we could create a custom RedisSinkConnector by inheriting from the built-in KafkaSinkConnector.
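# Import path assumed from recent KPOps releases; adjust it to your installed version.
from kpops.components import KafkaSinkConnector


class RedisSinkConnector(KafkaSinkConnector):
    # custom attributes
    host: str
    ...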
This enables the redis-sink-connector inside our pipeline definition and allows us to define defaults for it in defaults.yaml, the same way as for built-in components. This way, you can easily leverage KPOps’ deployment power for your own components.
redis-sink-connector:
  app:
    connector.class: …connect.redis.RedisSinkConnector
Using Variables
Another great feature in KPOps is variable substitution. You can simply use any environment variable from the shell inside a pipeline definition, such as $K8S_NAMESPACE.
defaults.yaml
kubernetes-app:
  namespace: $K8S_NAMESPACE
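The effect of this substitution can be reproduced with Python's standard string.Template. This is only a sketch of the concept and makes no claim about how KPOps performs the substitution internally; the function name substitute_env is hypothetical.

```python
import os
from collections.abc import Mapping
from string import Template
from typing import Optional


def substitute_env(text: str, env: Optional[Mapping[str, str]] = None) -> str:
    """Replace $NAME and ${NAME} with values from `env` (defaults to the process environment)."""
    variables = os.environ if env is None else env
    # safe_substitute leaves unknown placeholders untouched instead of raising,
    # so other templates such as ${component_name} survive for a later pass.
    return Template(text).safe_substitute(variables)


defaults_yaml = "kubernetes-app:\n  namespace: $K8S_NAMESPACE\n"
print(substitute_env(defaults_yaml, {"K8S_NAMESPACE": "kpops"}))
```

Passing the environment as an argument keeps the function easy to test without touching the real shell environment.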
With the ${component_<key>} notation, any YAML key can be reused within the same component to avoid repetition. Once again, this comes in handy to define component defaults containing templates.
As an example, you can create pipeline step names including their component types.
pipeline.yaml
- type: producer-app
  name: sentence-${component_type} # result: sentence-producer-app
The same works in the defaults. Here, you could use the component name to create topic names.
defaults.yaml
producer-app:
  to:
    topics:
      ${component_name}-topic: # result: sentence-producer-app-topic
        type: output
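The two examples above can be traced with a short Python sketch. It is a simplification under assumptions: the component is reduced to a flat dictionary of strings, the topic key is a hypothetical stand-in for the nested topic name, and keys are resolved in declaration order. KPOps itself may resolve these templates differently.

```python
from string import Template


def substitute_component_keys(component: dict[str, str]) -> dict[str, str]:
    """Resolve ${component_<key>} placeholders using keys resolved earlier in the same component."""
    resolved: dict[str, str] = {}
    for key, value in component.items():
        # Expose every already-resolved key as component_<key>.
        variables = {f"component_{name}": val for name, val in resolved.items()}
        resolved[key] = Template(value).safe_substitute(variables)
    return resolved


component = {
    "type": "producer-app",
    "name": "sentence-${component_type}",
    "topic": "${component_name}-topic",  # hypothetical flat key for illustration
}
print(substitute_component_keys(component))
```

Resolving keys one after another is what lets the topic name pick up the already-expanded component name.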
Finally, KPOps understands the folder structure of our deployment project and names the pipelines automatically based on the path. Again, the templating feature comes in handy to create versatile defaults if we have multiple pipelines.
KPOps deployment project folder structure
📁 kpops-deployment
├── config.yaml
├── defaults.yaml
└── 📁 pipelines
    ├── 📁 word-count
    │   └── pipeline.yaml
    └── 📁 atm-fraud
        └── pipeline.yaml
Here, we are using the ${pipeline_name} variable to automatically assign a dedicated Kubernetes namespace.
defaults.yaml
kubernetes-app:
  namespace: ${pipeline_name}
Now, when running KPOps, it deploys the two pipelines to different Kubernetes namespaces.
kpops deploy pipelines/word-count/pipeline.yaml
→ deploy to namespace: pipelines-word-count
kpops deploy pipelines/atm-fraud/pipeline.yaml
→ deploy to namespace: pipelines-atm-fraud
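The naming rule visible in these two commands can be sketched in a few lines of Python. The function below is inferred from the examples in this article only; it is an assumption about the rule, not the KPOps implementation.

```python
from pathlib import PurePosixPath


def pipeline_name(pipeline_file: str) -> str:
    """Join the directories leading to the pipeline file with hyphens."""
    return "-".join(PurePosixPath(pipeline_file).parent.parts)


for path in ("pipelines/word-count/pipeline.yaml", "pipelines/atm-fraud/pipeline.yaml"):
    print(path, "->", pipeline_name(path))
```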
As you can see, KPOps is very flexible while providing defaulting for many cases. Its extensibility further drives this for pipelines where built-in components are not the perfect level of abstraction. As a result, you can maintain a very concise pipeline definition in your environment of choice.
KPOps is publicly available as open source. We are committed to its active development and adoption in the Kafka community. To learn more, visit the project on GitHub or read the documentation. As always, your contributions are very welcome!