Advancements in machine learning, data analytics, and IoT, together with the strategic business shift towards real-time, data-driven decision making, increase the demand for stream processing. Apache Kafka and Kafka Streams are increasingly popular solutions for building streaming data processing platforms.
Natural language processing (NLP) can retrieve valuable information from texts and is a typical task tackled on such platforms.
TL;DR We implemented common base functionality to create production NLP pipelines. We use Avro, large messages, and error handling with modern monitoring and combine powerful Python libraries with Java. Read on to learn about the technical foundation we created and the libraries we share.
Because Kafka Streams, the most popular client library for Kafka, is developed for Java, many applications in Kafka pipelines are written in Java. However, several popular NLP libraries (spaCy, NLTK, Gensim, …) are developed in Python and backed by large communities. Using these Python NLP libraries as part of streaming data pipelines is therefore desirable to achieve excellent results. Consequently, streaming applications written in Java and Python must work together seamlessly in a modern streaming data pipeline.
Using Python and Java for streaming applications in combination implies that the applications are decoupled. This makes deployment processes, error handling, and monitoring more complex. Additionally, the consistency and (de-)serializability of the data consumed and produced by the applications have to be language-agnostic.
In this blog post, we illustrate how we laid down the foundation to develop NLP pipelines with Python, Java, and Apache Kafka. We discuss all the above-mentioned challenges and showcase common utility functions and base classes to develop Python and Java streaming applications, which work together smoothly. We cover the following topics:
- Developing, configuring, and deploying Kafka applications written in Python and Java on Kubernetes
- Using Avro for serialization in Java and Python streaming applications
- Managing errors in a common way using standardized dead letters
- Handling large text messages in Kafka using a custom s3-backed SerDe
- Monitoring the streaming data processing platform using Grafana and Prometheus
Example NLP Pipeline
As an example for this blog post, we set up a streaming data pipeline in Apache Kafka: we assume that we receive text messages in the input topic, serialized with Avro.
The pipeline consists of two steps that are implemented in two separate streaming applications:
- spaCy Lemmatizer: A Python streaming application that extracts non-stop-word lemmas using spaCy.
- TFIDF-Application: A Java streaming application that receives documents as a list of lemmas and calculates the corresponding TFIDF scores.
To develop the Lemmatizer in Python, we use Faust, a library that aims to port Kafka Streams’ ideas to Python. SpaCy extracts lemmas in this example. For the Java application, we are using the well-known Kafka Streams library.
To deploy Kafka on Kubernetes, you can use the Confluent Platform Helm Charts. For this example, we need the following services: ZooKeeper, Kafka brokers, and the Confluent Schema Registry.
In the following sections, we focus on the most important parts of the development, configuration, and deployment of the Python and the Java application to build the example. We will not discuss the lemma extraction using spaCy and the TFIDF calculation in Java. You can find the entire code for this example NLP pipeline in the demo repository.
Using Common Configuration Options to Develop Streaming Applications in Python and Java
Kafka streaming applications require a minimum set of configurations to launch. A typical streaming application needs information about the input topics, the output topic, the brokers, the schema registry URL, etc. These configurations are mandatory for stream processing applications using Kafka Streams in Java or Faust in Python.
To align Kafka Streams and Faust deployment configurations, we built utility functions and base classes for both application frameworks.
- The Java library for Kafka Streams can be found here: common-kafka-streams
- The Python package for Faust is here: faust-bootstrap
They introduce a common way:
- to configure Kafka streaming applications,
- to deploy applications into a Kafka cluster on Kubernetes via our common Helm Chart using a standardized configuration,
- and to reprocess data.
The common configuration from the Helm chart is passed as environment variables and set as matching command-line arguments. This configuration includes:
- brokers: List of Kafka brokers
- input-topics: List of input topics
- output-topic: The output topic
- error-topic: A topic to write errors to
- schema-registry-url: The URL of the schema registry
- clean-up: Whether the state store and the Kafka offsets for the consumer group should be cleared
- delete-output: Whether the output topic should be deleted during the cleanup
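To illustrate how such common options can be resolved, here is a minimal sketch of mapping environment variables to command-line options. This is an assumption-laden illustration, not the actual implementation of the libraries; the environment variable names and the precedence (command line overrides environment) are ours:

```python
import argparse

# Illustrative only: the common options from above, resolved from
# environment variables unless overridden on the command line.
# The env variable naming scheme here is an assumption.
COMMON_OPTIONS = ["brokers", "input-topics", "output-topic",
                  "error-topic", "schema-registry-url"]

def parse_config(argv, env):
    parser = argparse.ArgumentParser()
    for option in COMMON_OPTIONS:
        env_name = option.upper().replace("-", "_")
        # the env value becomes the default; CLI arguments win
        parser.add_argument(f"--{option}", default=env.get(env_name))
    return parser.parse_args(argv)

config = parse_config([], {"BROKERS": "kafka:9092", "OUTPUT_TOPIC": "out"})
```

With this scheme, the Helm Chart only needs to set environment variables, and the same application can still be started manually with explicit command-line arguments.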
We now demonstrate how we can easily spin up applications using those libraries.
Python — Faust-Bootstrap Application
To create the basis for our spaCy Lemmatizer application with faust-bootstrap, we create a LemmatizerApp class that inherits from the faust-bootstrap application base class. Every faust-bootstrap application has to implement the abstract methods get_unique_app_id(), _register_parameters(), setup_topics(), and buildTopology(). We set the Id of the application in get_unique_app_id(). In _register_parameters(), we can add additional parameters to the application that are exposed as configuration options in the deployment. In setup_topics(), we configure the input and output topics. Message keys and values are defined to be bytes.
Finally, we provide the topology of our streaming application. Faust uses agents to process streams. We can either use Faust's app.agent decorator or register our agents in the buildTopology() method. Here, we register our agent in the buildTopology() method. To do so, we define the basis of our agent as an inner function of another function that expects the topics that our agent requires:
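As a self-contained sketch of this structure: the base class and helper names below are stand-ins for illustration, not the real faust-bootstrap API (which wires these hooks into a running Faust app):

```python
import abc

# Stand-in for the faust-bootstrap application base class (illustrative).
class FaustApplication(abc.ABC):
    def __init__(self):
        self.agents = []

    @abc.abstractmethod
    def get_unique_app_id(self):
        """Return the application/consumer-group Id."""

    @abc.abstractmethod
    def _register_parameters(self):
        """Expose additional configuration options for the deployment."""

    @abc.abstractmethod
    def setup_topics(self):
        """Declare input and output topics and their key/value types."""

    @abc.abstractmethod
    def buildTopology(self):
        """Register the agents that make up the streaming topology."""

class LemmatizerApp(FaustApplication):
    def get_unique_app_id(self):
        return "spacy-lemmatizer"

    def _register_parameters(self):
        pass  # no extra parameters in this sketch

    def setup_topics(self):
        # keys and values are plain bytes at this stage
        self.input_topic = ("lemmatizer-input", bytes, bytes)
        self.output_topic = ("lemmatizer-output", bytes, bytes)

    def buildTopology(self):
        # agent factory receives the topics the agent requires
        self.agents.append(create_spacy_agent(self.input_topic, self.output_topic))

def create_spacy_agent(input_topic, output_topic):
    # inner function: the agent coroutine that Faust would run
    async def spacy_agent(stream):
        async for text in stream:
            yield text  # lemma extraction with spaCy would happen here
    return spacy_agent
```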
To start the application, we run:
The base for our streaming application is now ready to start and can be configured either by using environment variables or command-line arguments.
Java — Common-Kafka-Streams Application
Creating the basis for our TFIDF application with our common-kafka-streams library is easy:
We create a subclass of KafkaStreamsApplication and implement the abstract methods buildTopology() and getUniqueAppId(). We define the topology of our application in buildTopology() and set the application Id by overriding getUniqueAppId(). By calling startApplication(new TFIDFApplication(), args); in the main method, the passed-in command-line arguments are populated with the values from matching environment variables. Like in our faust-bootstrap application, the application is now ready to start and can be configured either by using environment variables or command-line arguments.
Deployment of our Applications
To deploy the applications, we use our common-kafka-streams Helm Charts. You need to add our Helm Charts repo and update the repo information by running:
$ helm repo add bakdata-common https://raw.githubusercontent.com/\
$ helm repo update
Now, you have to build and push Docker images for the applications written in Java or Python to your preferred Docker registry.
To containerize Kafka Streams applications, we use Jib. For Faust applications, we use a custom Dockerfile.
Then, we configure the streaming application deployment with Helm. In the values.yaml files, you can set the configuration options. For example, the file for the spaCy Lemmatizer in the demo pipeline looks like this:
Common configurations appear in the streams section. You can find the default values and some example parameters here.
The Helm Chart populates the corresponding environment variables to match those defined by the common configuration options of common-kafka-streams and faust-bootstrap. Therefore, the deployment of both applications, Faust and Kafka Streams, to Kafka on Kubernetes is as simple as this:
$ helm upgrade --debug --install --force --values values.yaml <release_name> bakdata-common/streams-app
The deployed pod launches our streaming application, which is configured just as defined in the values.yaml.
At some point during development, you might want to process all input data again. Essentially, you want to reset the application state. Both common-kafka-streams and faust-bootstrap applications can reset all offsets for the consumer group, as well as the application state. We provide a Helm Chart to run such a cleanup: bakdata-common/streams-app-cleanup-job. You can start it with:
$ helm upgrade --debug --install --force --values values.yaml <release_name> bakdata-common/streams-app-cleanup-job
If you want to delete all involved topics (output and internal) and reset the schema registry during the cleanup, you can set streams.deleteOutput to true:
$ helm upgrade --debug --install --force --values values.yaml <release_name> bakdata-common/streams-app-cleanup-job --set streams.deleteOutput=true
That is important if the schema becomes incompatible. However, remember that deleting the output is risky if you do not also control the downstream applications.
Using Avro for Serialization in Faust and Kafka Streams
Avro schemas allow us to validate, serialize, and deserialize data passed between different streaming applications. In our example with Python and Java, Avro is particularly important because we require a standardized serialization. Such common ground speeds up and simplifies error-resistant development.
For our example, we define the following Avro Schema for the input topic:
The Avro Schema for the topic the spaCy Lemmatizer application writes to and the TFIDF application reads from is:
Finally, the output topic receives messages using the following Avro Schema:
Avro in Faust-Bootstrap Applications
To use Avro for serialization in Faust applications, we introduce the faust_avro_serializer. faust-bootstrap uses the faust_avro_serializer for message values by default. Faust uses models to describe the data structures of keys and values. The following defines the models for the input and output topic of our spaCy Lemmatizer application:
The faust_avro_serializer uses the Avro schema definition in the _schema attribute of the Faust models to serialize messages. Here, we set the schema definitions shown before. Finally, we configure our application to use these schemata for (de-)serialization. We can easily do this via setup_topics() in the LemmatizerApp class:
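To make the _schema mechanism concrete, here is a minimal sketch of a record carrying its Avro schema in a _schema attribute. The field names are assumptions mirroring the LemmaText record used in this pipeline, and we use a plain class instead of a real faust.Record so the snippet runs without Faust installed:

```python
import json

# Hypothetical Avro schema for the lemma topic (field names are assumed).
LEMMA_TEXT_SCHEMA = json.dumps({
    "type": "record",
    "name": "LemmaText",
    "fields": [
        {"name": "lemmas", "type": {"type": "array", "items": "string"}},
    ],
})

class LemmaText:
    # faust_avro_serializer reads the schema from this attribute
    _schema = LEMMA_TEXT_SCHEMA

    def __init__(self, lemmas):
        self.lemmas = lemmas
```

Because the schema travels with the model, both the Python producer and the Java consumer agree on the exact wire format of each message.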
Avro in Common Kafka Streams Applications
Avro is commonly used in Kafka Streams. Our Java application expects LemmaText objects. Keys are simple Strings. To deserialize the input, we add additional configuration to the KafkaProperties:
Now the Kafka Streams application uses Avro to deserialize the messages from the input topic. Avro ensures compatible messages in the topic that the Python Faust application writes to and the Java Kafka Streams application reads from.
Sooner or later, you will experience exceptions in your streaming applications written in Python or Java. For example, errors occur at the beginning of a pipeline when unparsable raw data arrives. Then, Kubernetes restarts the crashed container, and the application retries the processing. If the error was temporary, the processing continues. However, if the exception relates to a specific corrupt message, your streaming application will fail repeatedly and block the pipeline. Thus, you may want to process other incoming data instead of stopping the processing completely.
To tackle this, we developed common error handling for Kafka Streams and Faust applications. For Kafka Streams, we published a separate library: kafka-error-handling. For Faust, the error handling functions are included in faust-bootstrap.
For both libraries, the error handling works analogously. Successfully processed messages are sent to the output topic. If an error occurs, it is sent to a given error topic and does not cause the application to crash. The error topic contains dead letters following a common Avro schema. A dead letter describes the input value that caused the error, the respective error message, and the exception stack trace. This simplifies the investigation of the issue, and the processing is not interrupted while the error is being resolved.
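The shape of such a dead letter can be sketched in a few lines. The field names below are illustrative; the actual Avro schema shipped with the libraries may differ:

```python
import traceback
from dataclasses import dataclass

# Illustrative dead letter record: the failing input, the error message,
# and the exception stack trace (field names are assumptions).
@dataclass
class DeadLetter:
    input_value: str
    error_message: str
    stack_trace: str

def make_dead_letter(value, exc):
    return DeadLetter(
        input_value=value,
        error_message=str(exc),
        stack_trace="".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        ),
    )

# Example: a message that cannot be parsed becomes a dead letter
try:
    int("not a number")
except ValueError as e:
    letter = make_dead_letter("not a number", e)
```

Instead of crashing, the application would serialize such a record with the common Avro schema and produce it to the configured error topic.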
As mentioned above, both common-kafka-streams and faust-bootstrap include an error-topic configuration option. It refers to the error topic that is used for error handling.
Error Handling in Faust-Bootstrap Applications
To configure the error topic to use the dead letter schema, we add the following to setup_topics() in our LemmatizerApp class:
To handle errors in our agent, we add the following to our agent:
create_spacy_agent now additionally expects the error topic as an argument. process_text is a method that handles the processing of the text using spaCy; you can find its definition in the demo repository. To capture whether process_text threw an error, we use capture_error. If the returned value is an exception, we check whether the error should cause the application to crash (forward_error) or send a dead letter to the error topic.
Error Handling in Common Kafka Streams Applications
We also integrated error handling into the Kafka Streams DSL. Consider that we want to map input data using a mapper that may throw exceptions. Using kafka-error-handling, we can easily capture all errors that may occur when mapping the data:
Like in the Python example, we capture the mapper errors. For each error, we create a dead letter and send it to the error topic. Successfully processed values are sent to the output topic.
Processing Large Text Files in Kafka
Text messages tend to be very large. However, Apache Kafka limits the maximum size of messages sent to a topic. Although this limit is configurable, some messages may still exceed it. We recently discussed the problem and how we solved it using our custom s3-backed SerDe that stores large messages on Amazon S3.
For our example pipeline, we now assume that the spaCy lemmatizer receives s3-backed messages. Clearly, the same applies to the lemmatizer output topic. Therefore, our Java and Python applications must support s3-backed SerDe. In the following, we introduce s3-backed SerDe support in our NLP pipeline to support large texts in Apache Kafka.
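The core idea of the s3-backed SerDe can be sketched as follows. This is a conceptual illustration only: the real SerDe stores payloads on Amazon S3, while here an in-memory dict stands in for the bucket, and the flag byte and URI format are assumptions, not the library's wire format:

```python
import uuid

MAX_SIZE = 1_000_000   # threshold below Kafka's message size limit (illustrative)
backing_store = {}     # stands in for an S3 bucket

def serialize(payload: bytes) -> bytes:
    if len(payload) <= MAX_SIZE:
        return b"\x00" + payload            # flag byte 0: payload is inline
    key = f"s3://bucket/base/{uuid.uuid4()}"
    backing_store[key] = payload            # large payload goes to the store
    return b"\x01" + key.encode()           # flag byte 1: message is a reference

def deserialize(message: bytes) -> bytes:
    flag, rest = message[:1], message[1:]
    if flag == b"\x00":
        return rest
    return backing_store[rest.decode()]     # resolve the reference
```

Small messages pass through Kafka unchanged, while oversized ones are replaced by a tiny reference that downstream applications resolve transparently during deserialization.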
s3-backed SerDe in Faust-Bootstrap Applications
To use s3-backed SerDe in our Faust application, we need to register a new serializer that uses faust-s3-backed-serializer. To determine whether our application should use s3-backed SerDe, we add an additional parameter via the above-mentioned _register_parameters(). We configure s3-backed SerDe in create_s3_backed_serde(...) and create the serializer in create_s3_serde(...). The returned value ensures that the faust_avro_serializer is used as the first serializer, followed by the s3_backed_serializer to handle messages exceeding the configured maximum message size.
s3-backed SerDe in Common Kafka Streams Applications
To add s3-backed SerDe to the Java application, we use kafka-s3-backed-serde. To deal with s3-backed messages in the input, we add the following to the KafkaProperties:
Like in the Faust application, we use Avro to serialize values. We set the S3BackedSerde as the default value SerDe and Avro as its value SerDe.
Deployment with s3-backed SerDe
To configure the s3-backed SerDe, we add the following value to the values.yaml for both applications:
streams.config.s3backed.base.path configures the base path in Amazon S3 for storing the s3-backed messages. Additionally, we need to configure our Amazon S3 credentials as environment variables for both deployments. We can either do this in the values.yaml or when running the Helm upgrade command:
helm upgrade --debug --install --force --values values.yaml <release_name> bakdata-common/streams-app \
--set env.AWS_ACCESS_KEY_ID=<acces_key_id> \
--set env.AWS_SECRET_ACCESS_KEY=<access_key> \
Monitoring the data flow in sophisticated streaming data platforms is crucial. Great monitoring solutions exist for Kubernetes. Besides Kubernetes, the Cloud Native Computing Foundation also hosts Prometheus as its second project. Prometheus was explicitly built to monitor systems by recording real-time metrics. For visualization, Prometheus recommends Grafana: Prometheus can be set up as a data source for Grafana, and we can then query the metrics in Prometheus to build comprehensive dashboards. Moreover, several dashboards for Apache Kafka already exist and can be imported to start monitoring data pipelines quickly.
Apache Kafka uses JMX as the default reporter for metrics. The Confluent Platform Helm Charts install the Prometheus JMX Exporter as a sidecar in all affected pods, and JMX metrics are enabled by default for all components. Our Helm Charts also allow deploying the Prometheus JMX Exporter with our streaming applications and thus exposing their JMX metrics.
Setting up the monitoring for our NLP pipeline in the Kubernetes cluster is straightforward. We first deploy Prometheus and Grafana. You can add them using Helm:
helm install stable/prometheus
helm install stable/grafana
After setting up Prometheus as a data source in Grafana, you can import your desired dashboards into Grafana. For example, the Confluent Platform Helm Charts provide a Grafana dashboard you can use.
Additionally, we use the Kafka Lag Exporter to view consumer group metrics. The consumer group lag is the difference between the latest produced offset and the consumer group's committed offset on each partition. It tells us how far behind our NLP application is compared to data ingestion, which is crucial for real-time data processing.
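The lag definition above amounts to a simple per-partition calculation (the offset numbers here are made-up example values):

```python
# Per-partition lag: latest produced offset minus the group's committed offset.
latest_offsets = {0: 1500, 1: 980}      # end offsets per partition (example values)
committed_offsets = {0: 1200, 1: 975}   # offsets committed by the consumer group

lag = {p: latest_offsets[p] - committed_offsets[p] for p in latest_offsets}
total_lag = sum(lag.values())  # total number of unprocessed messages
```

A growing total lag means the streaming application processes messages more slowly than they are ingested, which is exactly what the Kafka Lag Exporter dashboards visualize over time.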
Finally, we added custom dashboards to monitor the data flow. As an example, we monitor the number of messages, and the number of bytes transferred:
Apache Kafka is a state-of-the-art stream processing platform. NLP is a common task in streaming data pipelines that often requires using popular Python packages in combination with Java to accomplish excellent results. We built tools and libraries to run NLP pipelines at scale, where Java and Python interoperate seamlessly in Apache Kafka.
Now that you have made it to the conclusion, you have learned that language-agnostic streaming data pipelines in production require several connecting pieces, including configuration, deployment, serialization, and error handling.
Thank you for reading.