Continuous NLP Pipelines with Python, Java, and Apache Kafka

Victor Künstler
bakdata
Jul 6, 2020
Photo by Joao Branco on Unsplash

Advancements in machine learning, data analytics, and IoT, and the business strategic shift towards real-time data-driven decision making, increase the demand for stream processing. Apache Kafka and Kafka Streams experience rising popularity as a solution to build streaming data processing platforms.

Natural language processing (NLP) can retrieve valuable information from texts and is a typical task tackled on such platforms.

TL;DR We implemented common base functionality to create production NLP pipelines. We use Avro, large messages, and error handling with modern monitoring and combine powerful Python libraries with Java. Read on to learn about the technical foundation we created and the libraries we share.

Because Kafka Streams, the most popular client library for Kafka, is developed for Java, many applications in Kafka pipelines are written in Java. However, several popular libraries for NLP are developed in Python (spaCy, NLTK, Gensim, …). These libraries have large communities behind them and are very popular. Thus, it is desirable to use Python and its NLP libraries as part of streaming data pipelines to achieve excellent results. Consequently, streaming applications written in Java and Python need to work together seamlessly in a modern streaming data pipeline.

Using Python and Java for streaming applications in combination implies that applications are decoupled. Then, deployment processes, error handling, and monitoring are more complex. Also, the data consumed and produced by the applications has to be consistent and (de-)serializable in a language-agnostic way.

In this blog post, we illustrate how we laid down the foundation to develop NLP pipelines with Python, Java, and Apache Kafka. We discuss all the above-mentioned challenges and showcase common utility functions and base classes to develop Python and Java streaming applications, which work together smoothly. We cover the following topics:

  • Developing, configuring, and deploying Kafka applications written in Python and Java on Kubernetes
  • Using Avro for serialization in Java and Python streaming applications
  • Managing errors in a common way using standardized dead letters
  • Handling large text messages in Kafka using a custom s3-backed SerDe
  • Monitoring the streaming data processing platform using Grafana and Prometheus

Example NLP Pipeline

Example NLP Pipeline with Java and Python, and Apache Kafka

As an example, for this blog post, we set up a streaming data pipeline in Apache Kafka: We assume that we receive text messages in the input topic serialized with Avro.

If you want to reproduce the example, you can find the demo code for the Producer and the Loader to write messages to the input topic in the GitHub repository.

The pipeline consists of two steps that are implemented in two separate streaming applications:

  1. spaCy Lemmatizer: A Python streaming application that extracts non-stop-word lemmas using spaCy.
  2. TFIDF-Application: A Java streaming application that receives documents as a list of lemmas and calculates the corresponding TFIDF scores.

To develop the Lemmatizer in Python, we use Faust, a library that aims to port Kafka Streams’ ideas to Python. SpaCy extracts lemmas in this example. For the Java application, we are using the well-known Kafka Streams library.

To deploy Kafka on Kubernetes, you can use the Confluent Platform Helm Charts. For this example, we need the following services: ZooKeeper, Kafka brokers, and the Confluent Schema Registry.

In the following sections, we focus on the most important parts of the development, configuration, and deployment of the Python and the Java application to build the example. We will not discuss the lemma extraction using spaCy and the TFIDF calculation in Java. You can find the entire code for this example NLP pipeline in the demo repository.

Using Common Configuration Options to Develop Streaming Applications in Python and Java

Kafka streaming applications require a minimum set of configurations to launch. A typical streaming application needs information about the input topics, the output topic, the brokers, the schema registry URL, etc. These configurations are mandatory for stream processing applications using Kafka Streams in Java or Faust in Python.

To align Kafka Streams and Faust deployment configurations, we built utility functions and base classes for both application frameworks.

They introduce a common way:

  • to configure Kafka streaming applications,
  • to deploy applications into a Kafka cluster on Kubernetes via our common Helm Chart using a standardized values.yaml,
  • and to reprocess data.

The common configuration from the Helm chart is passed as environment variables and set as matching command-line arguments. This configuration includes:

  • brokers: List of Kafka brokers
  • input-topics: List of input topics
  • output-topic: The output topic
  • error-topic: A topic to write errors to
  • schema-registry-url: The URL of the schema registry
  • clean-up: Whether the state store and the Kafka offsets for the consumer group should be cleared
  • delete-output: Whether the output topic should be deleted during the cleanup
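To illustrate how environment variables can back command-line arguments, here is a minimal, library-free sketch using only argparse. It is not the code of faust-bootstrap or streams-bootstrap: the function names and the APP_ prefix are assumptions made for this example. Each option takes its default from the matching environment variable, and an explicit command-line argument overrides it.

```python
import argparse
import os
from typing import List, Optional


def env_default(option: str, prefix: str = "APP_") -> Optional[str]:
    """Look up the environment variable that matches a command-line option."""
    # "--schema-registry-url" -> "APP_SCHEMA_REGISTRY_URL" (prefix is an assumption)
    name = prefix + option.lstrip("-").replace("-", "_").upper()
    return os.environ.get(name)


def parse_config(argv: List[str]) -> argparse.Namespace:
    """Parse the common options; environment variables provide the defaults."""
    parser = argparse.ArgumentParser()
    for option in ("--brokers", "--input-topics", "--output-topic",
                   "--error-topic", "--schema-registry-url"):
        parser.add_argument(option, default=env_default(option))
    for flag in ("--clean-up", "--delete-output"):
        enabled = (env_default(flag) or "false").lower() == "true"
        parser.add_argument(flag, action="store_true", default=enabled)
    args = parser.parse_args(argv)
    # The list of input topics is passed as a comma-separated string.
    args.input_topics = args.input_topics.split(",") if args.input_topics else []
    return args
```

With APP_BROKERS set in the environment, parse_config(["--output-topic", "out"]) picks up the brokers from the environment and the output topic from the command line.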

We now demonstrate how we can easily spin up applications using those libraries.

Python — Faust-Bootstrap Application

To use faust-bootstrap with our spaCy Lemmatizer application, we create a LemmatizerApp class that inherits from FaustApplication:

Every faust-bootstrap application has to implement the abstract methods get_unique_app_id(), setup_topics(), build_topology(), and _register_parameters().

We set the Id of the application in get_unique_app_id() (line 12). In _register_parameters() (line 15) we can add additional parameters to the application that are exposed as configuration options in the deployment. In setup_topics() (line 18), we configure the input and output. Message keys and values are defined to be bytes (lines 19 and 20).

Finally, we provide the topology of our streaming application. Faust uses agents to process streams. We can either use Faust’s app.agent decorator or register our agents in the build_topology() method. Here, we register our agent in the build_topology() method (lines 26 and 27). To do so, we define the basis of our agent as an inner function of another function that expects the topics that our agent requires:
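The inner-function pattern can be sketched without Faust, using only asyncio. This is an illustration under assumptions, not the demo code: create_agent, demo_stream, and run_demo are hypothetical names, and a plain list stands in for the output topic. The outer function receives what the agent needs, and the inner coroutine closes over it.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List


def create_agent(output: List[str]) -> Callable[[AsyncIterator[str]], Awaitable[None]]:
    """Outer function: takes the resources the agent requires."""

    async def agent(stream: AsyncIterator[str]) -> None:
        # Inner function: the agent itself. It closes over `output`,
        # so its own signature only needs the stream.
        async for text in stream:
            output.append(text.lower())  # placeholder for the real processing

    return agent


async def demo_stream() -> AsyncIterator[str]:
    """A tiny in-memory stream standing in for an input topic."""
    for text in ("Hello Kafka", "Hello Faust"):
        yield text


def run_demo() -> List[str]:
    sink: List[str] = []
    agent = create_agent(sink)
    asyncio.run(agent(demo_stream()))
    return sink
```

In a Faust application, the returned inner function would be the one that gets registered as an agent for the input topic.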

To start the application, we run:

The base for our streaming application is now ready to start and can be configured either by using environment variables or command-line arguments.

Java — Streams-Bootstrap Application

Creating the basis for our TFIDF application with our streams-bootstrap is easy:

We create a subclass of KafkaStreamsApplication and implement the abstract methods buildTopology() and getUniqueAppId(). We define the topology of our application in buildTopology() (line 14) and we set the application Id by overriding getUniqueAppId() (line 24). By calling startApplication(new TFIDFApplication(), args); in the main method (line 20), the passed in command-line arguments are populated with the values from matching environment variables. Like in our Faust-Bootstrap application, the application is now ready to start and can be configured either by using environment variables or command-line arguments.

Deployment of our Applications

To deploy the applications, we use our streams-bootstrap Helm Charts. You need to add our Helm Charts repo and update the repo information by running:

$ helm repo add bakdata-common https://raw.githubusercontent.com\
/bakdata/streams-bootstrap/master/charts/
$ helm repo update

Now, you have to build and push Docker images for the applications written in Java or Python to your preferred Docker registry.

To containerize Kafka Streams applications, we use Jib. For Faust applications, we use a custom Dockerfile.

Then, we configure the streaming application deployment with Helm. In values.yaml files, you can set the configuration options. For example, the file for the spaCy Lemmatizer in the demo pipeline looks like this:

spaCy Lemmatizer values.yaml to deploy using our Kafka application Helm Charts

Common configurations appear in the streams section. You can find the default values and some example parameters here.

The Helm Chart populates the corresponding environment variables to match those defined by the common configuration options of streams-bootstrap and faust-bootstrap. Therefore, the deployment of both applications, Faust and Kafka Streams, to Kafka on Kubernetes is as simple as this:

$ helm upgrade --debug --install --force --values values.yaml <release_name> bakdata-common/streams-app

The deployed pod launches our streaming application, which is configured just as defined in the values.yaml.

Reprocessing

At some point during the development, you might want to process all input data again. Essentially, you want to reset the application state. streams-bootstrap and faust-bootstrap applications can reset all offsets for the consumer group, as well as the application state. We provide a Helm Chart to run such cleanup: bakdata-common/streams-app-cleanup-job. You can start it with:

$ helm upgrade --debug --install --force --values values.yaml <release_name> bakdata-common/streams-app-cleanup-job

If you want to delete all involved topics (output and internal) and reset the schema registry during the cleanup, you can set streams.deleteOutput=true:

$ helm upgrade --debug --install --force --values values.yaml <release_name> bakdata-common/streams-app-cleanup-job --set streams.deleteOutput=true

That is important if the schema becomes incompatible. However, remember that deleting the output is risky if you do not also control the downstream applications.

Using Avro for Serialization in Faust and Kafka Streams

Avro schemas allow us to validate, serialize, and deserialize data passed between different streaming applications. In our example with Python and Java, Avro is particularly important as we require a standardized serialization. Such common ground speeds up development and makes it less error-prone.
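To make the idea of a shared schema concrete, here is a small sketch of an Avro record schema written as a Python dict, together with a toy check for the two Avro types it uses. The record name LemmaText matches the class mentioned for the Java application, but the field name lemmas is an assumption for illustration, and matches and conforms are hypothetical helpers, not a real Avro validator.

```python
import json
from typing import Any

# Illustrative schema only: the field name is an assumption.
LEMMA_TEXT_SCHEMA = {
    "type": "record",
    "name": "LemmaText",
    "fields": [
        {"name": "lemmas", "type": {"type": "array", "items": "string"}},
    ],
}


def matches(value: Any, avro_type: Any) -> bool:
    """Check a value against the small subset of Avro types used above."""
    if avro_type == "string":
        return isinstance(value, str)
    if isinstance(avro_type, dict) and avro_type.get("type") == "array":
        return isinstance(value, list) and all(
            matches(item, avro_type["items"]) for item in value
        )
    return False  # all other Avro types are out of scope for this sketch


def conforms(record: dict, schema: dict) -> bool:
    """True if the record has exactly the schema's fields with matching types."""
    fields = {field["name"]: field["type"] for field in schema["fields"]}
    return record.keys() == fields.keys() and all(
        matches(record[name], avro_type) for name, avro_type in fields.items()
    )


# The JSON form is language-agnostic and can be shared by Python and Java.
schema_json = json.dumps(LEMMA_TEXT_SCHEMA)
```

Because the schema is plain JSON, the same definition can drive serialization in a Python producer and deserialization in a Java consumer.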

For our example, we define the following Avro Schema for the input topic:

The Avro Schema for the topic the spaCy Lemmatizer application writes to and the TFIDF application reads from is:

Finally, the output topic receives messages using the following Avro Schema:

Avro in Faust-Bootstrap Applications

To use Avro for serialization in Faust applications, we introduce faust_avro_serializer. Faust-bootstrap uses the faust_avro_serializer for message values by default. Faust uses models for the description of data structures in keys and values. The following defines the models for the input and output topic of our spaCy lemmatizer application:

The faust_avro_serializer uses the Avro Schema definition in the _schema attribute (lines 7 and 25) in the Faust models to serialize messages. Here, we set the schema definitions shown before. Finally, we configure our application to use these schemata for (de-)serialization. We can easily do this via setup_topics() in the LemmatizerApp class:

Avro in Streams-Bootstrap Applications

Avro is commonly used in Kafka Streams. Our Java application expects LemmaText objects. Keys are simple Strings. To deserialize the input, we add additional configuration to the KafkaProperties:

Now the Kafka Streams application uses Avro to deserialize the messages from the input topic. Avro ensures compatible messages in the topic the Python Faust application writes to and the Java Kafka Streams application reads from.

Error Handling

Sooner or later, you will experience exceptions in your streaming applications written in Python or Java. For example, errors occur at the beginning of a pipeline, where unparsable raw data arrives. Then, Kubernetes restarts the crashed container, and the application retries the processing. If the error was temporary, the processing continues. If the exception relates to a specific corrupt message, your streaming application will repeatedly fail and block the pipeline. Thus, you may want to keep processing other incoming data instead of stopping the processing completely.

To tackle this, we developed common error handling for Kafka Streams and Faust applications. For Kafka Streams, we published a separate library: kafka-error-handling. For Faust, the error handling functions are included in faust-bootstrap.

For both libraries, the error handling works analogously. Successfully processed messages are sent to the output topic. If an error occurs, it is sent to a given error topic and will not cause the application to crash. The error topic contains dead letters following a common Avro schema. Dead letters describe the input value that caused the error, the respective error message, and the exception stack trace. That simplifies investigating the issue, and processing continues while the error is being resolved.

As mentioned above, both streams-bootstrap and faust-bootstrap include an error topic configuration option. This option names the topic that is used for error handling.
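The following sketch shows what building such a dead letter could look like with the standard library only. The DeadLetter class, its field names, and the helper functions are assumptions for illustration. The actual field names are defined by the shared Avro schema, which is not reproduced here.

```python
import traceback
from dataclasses import dataclass
from typing import Any, Callable, Iterable, List, Tuple


@dataclass
class DeadLetter:
    # Illustrative field names, mirroring the three pieces of information
    # described above.
    input_value: str
    error_message: str
    stack_trace: str


def to_dead_letter(value: Any, error: BaseException) -> DeadLetter:
    """Describe a failed input together with the error it caused."""
    trace = "".join(
        traceback.format_exception(type(error), error, error.__traceback__)
    )
    return DeadLetter(repr(value), str(error), trace)


def process_all(
    values: Iterable[Any], func: Callable[[Any], Any]
) -> Tuple[List[Any], List[DeadLetter]]:
    """Apply func to every value; failures become dead letters, not crashes."""
    outputs: List[Any] = []
    errors: List[DeadLetter] = []
    for value in values:
        try:
            outputs.append(func(value))
        except Exception as error:
            errors.append(to_dead_letter(value, error))
    return outputs, errors
```

Calling process_all(["1", "x", "3"], int) yields two outputs and one dead letter for "x", so the unparsable value no longer blocks the remaining messages.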

Error Handling in Faust-Bootstrap Applications

To configure the error topic to use the dead letter schema, we add the following to setup_topics() in our LemmatizerApp Class:

To handle errors in our agent, we add the following to our agent:

create_spacy_agent now additionally expects the error topic as an argument. process_text (line 8) is a method that handles the processing of the text using spaCy. You can find its definition in the demo repository. To capture whether process_text threw an error, we use capture_error (line 8). If the returned value is an Exception, we check whether the error should cause the application to crash (forward_error, line 11) or send a dead letter to the error topic.
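The control flow described here can be sketched in plain Python. The helpers below are hypothetical stand-ins, not the faust-bootstrap functions: capture returns the exception instead of raising it, and handle decides whether to crash or to record the failure. Which errors count as fatal is an assumption made for this example.

```python
from typing import Any, Callable, List, Tuple, Union

# Assumption: connection problems should stop the application, everything
# else is treated as a problem with the individual message.
FATAL_ERRORS = (ConnectionError,)


def capture(func: Callable[[Any], Any]) -> Callable[[Any], Union[Any, Exception]]:
    """Wrap func so that it returns the exception instead of raising it."""

    def wrapper(value: Any) -> Union[Any, Exception]:
        try:
            return func(value)
        except Exception as error:
            return error

    return wrapper


def handle(
    value: Any,
    func: Callable[[Any], Any],
    output: List[Any],
    errors: List[Tuple[Any, str]],
) -> None:
    """Route a result to the output, or a failure to the error list."""
    result = capture(func)(value)
    if isinstance(result, Exception):
        if isinstance(result, FATAL_ERRORS):
            raise result  # crash on purpose and let the container restart
        errors.append((value, str(result)))  # stand-in for sending a dead letter
    else:
        output.append(result)
```

Returning the exception as a value keeps the decision between crashing and writing a dead letter in one place, separate from the processing function itself.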

Error Handling in Streams-Bootstrap Applications

We also integrated error handling into the Kafka Streams DSL. Consider that we want to map input data using a KeyValueMapper called mapper. Using kafka-error-handling, we can easily capture all errors that may occur when mapping the data:

Like in the Python example, we capture mapper errors (line 9). For each error, we create a dead letter (lines 10 and 11) and send it to the error topic (line 12). Successfully processed values can be sent to the output topic (lines 15 and 16).

Processing Large Text Files in Kafka

Text messages tend to be very large. However, Apache Kafka limits the maximum size of messages sent to a topic. Although the limit is configurable, some messages can eventually exceed this limit. We recently discussed the problem and how we solved it using our custom s3-backed SerDe that stores large messages on Amazon S3.
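The underlying idea can be shown in a few lines of Python: small payloads travel inline, while large payloads are written to an external store and only a reference is sent through Kafka. This is a conceptual sketch under assumptions. A dict stands in for the S3 bucket, and the one-byte flag layout is invented for this example and is not the wire format of our SerDe.

```python
import uuid
from typing import Dict

INLINE = b"\x00"   # the payload follows directly
POINTER = b"\x01"  # the rest of the message is a key into the external store


class OffloadingSerializer:
    """Keeps small payloads inline and moves large ones to an external store."""

    def __init__(self, store: Dict[str, bytes], max_bytes: int) -> None:
        self.store = store          # stand-in for an S3 bucket
        self.max_bytes = max_bytes  # largest payload that is sent inline

    def serialize(self, payload: bytes) -> bytes:
        if len(payload) <= self.max_bytes:
            return INLINE + payload
        key = str(uuid.uuid4())
        self.store[key] = payload
        return POINTER + key.encode("utf-8")

    def deserialize(self, message: bytes) -> bytes:
        flag, body = message[:1], message[1:]
        if flag == INLINE:
            return body
        if flag == POINTER:
            return self.store[body.decode("utf-8")]
        raise ValueError("unknown message flag")
```

The message that actually reaches the topic stays small regardless of the text size, which is why consumers need the same SerDe to resolve the reference.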

For our example pipeline, we now assume that the spaCy lemmatizer receives s3-backed messages. Clearly, the same applies to the lemmatizer output topic. Therefore, our Java and Python applications must support s3-backed SerDe. In the following, we introduce s3-backed SerDe support in our NLP pipeline to support large texts in Apache Kafka.

s3-backed SerDe in Faust-Bootstrap Applications

To use s3-backed SerDe in our Faust application, we need to register a new serializer that uses the faust-s3-backed-serializer. To determine whether our application should use s3-backed SerDe, we add an additional parameter via the above-mentioned _register_parameters().

We configure s3-backed SerDe in create_s3_backed_serde(...) (line 26) and create the serializer in create_s3_serde(...) (line 9). The returned value (line 12) ensures that faust_avro_serializer is used as the first serializer followed by s3_backed_serializer to handle messages exceeding the configured maximum message size.
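The ordering matters: when writing, the first serializer runs first and the second one wraps its output, and when reading, the stages are undone in reverse. A library-free sketch of such a chain follows. Chain is a hypothetical helper, JSON stands in for Avro, and zlib compression stands in for the s3-backed stage, purely to make the example runnable.

```python
import json
import zlib
from typing import Any, Callable, List, Tuple

# A stage is a pair of functions: (dumps, loads).
Stage = Tuple[Callable[[Any], Any], Callable[[Any], Any]]


class Chain:
    """Applies stages first to last when writing, last to first when reading."""

    def __init__(self, stages: List[Stage]) -> None:
        self.stages = stages

    def dumps(self, value: Any) -> Any:
        for dump, _ in self.stages:
            value = dump(value)
        return value

    def loads(self, data: Any) -> Any:
        for _, load in reversed(self.stages):
            data = load(data)
        return data


# Stand-in for the Avro stage: object <-> bytes.
json_stage: Stage = (
    lambda obj: json.dumps(obj).encode("utf-8"),
    lambda raw: json.loads(raw.decode("utf-8")),
)
# Stand-in for the s3-backed stage: bytes <-> bytes.
zlib_stage: Stage = (zlib.compress, zlib.decompress)

serde = Chain([json_stage, zlib_stage])
```

The second stage only ever sees bytes, which is why a size-based stage such as the s3-backed serializer has to come after the one that turns objects into bytes.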

s3-backed SerDe in Streams-Bootstrap Applications

To add s3-backed SerDe to the Java application, we use kafka-s3-backed-serde. To deal with s3-backed messages in the input, we add the following to the TFIDFApplication properties:

Like in the Faust application, we use Avro to serialize values. We set the S3BackedSerde as the default value SerDe (line 8) and set Avro as its value SerDe (line 9).

Deployment with s3-backed SerDe

To configure the s3-backed SerDe, we add the following value to the values.yaml for both applications:

streams.config.s3backed.base.path configures the base path in Amazon S3 for storing the s3-backed messages. Additionally, we need to configure our Amazon S3 credentials as environment variables for both deployments. We can either do this in the values.yaml or when running the Helm upgrade command:

helm upgrade --debug --install --force --values values.yaml <release_name> bakdata-common/streams-app \
--set env.AWS_ACCESS_KEY_ID=<access_key_id> \
--set env.AWS_SECRET_ACCESS_KEY=<secret_access_key> \
--set env.AWS_REGION=<region>

Monitoring

Monitoring the data flow in sophisticated streaming data platforms is crucial. Great monitoring solutions exist for Kubernetes. Prometheus was the second project hosted by the Cloud Native Computing Foundation, after Kubernetes. It was explicitly built to monitor systems by recording real-time metrics. For visualization, Prometheus recommends Grafana. Prometheus can be set up as a data source for Grafana. Then, we can query the metrics in Prometheus to build comprehensive dashboards. Moreover, several dashboards for Apache Kafka already exist and can be imported to start monitoring data pipelines quickly.

Apache Kafka uses JMX as the default reporter for metrics. The Confluent Platform Helm charts install the Prometheus JMX Exporter as sidecars in all affected pods, and JMX metrics are enabled by default for all components. Our Helm Charts also allow deploying the Prometheus JMX Exporter with our streaming applications and thus exposing their JMX metrics.

Setting up the monitoring for our NLP pipeline in the Kubernetes cluster is straightforward. We first deploy Prometheus and Grafana. You can add them using Helm:

helm install stable/prometheus
helm install stable/grafana

After setting up Prometheus as a data source in Grafana, you can import your desired dashboard into Grafana. For example, the Confluent Platform Helm Charts provide a Grafana dashboard you can use.

Additionally, we use the Kafka Lag Exporter to view consumer group metrics. The consumer group lag is the difference between the offset of the last message produced to a partition and the last offset committed by the consumer group for that partition. This tells us how far behind our NLP application is compared to data ingestion. This is crucial for real-time data processing.
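As a small worked example of this metric, the lag can be computed per partition and summed for the whole group. The function names are hypothetical and the offsets are made up. Treating a partition without a committed offset as starting at offset 0 is a simplifying assumption.

```python
from typing import Dict


def partition_lag(end_offsets: Dict[int, int], committed: Dict[int, int]) -> Dict[int, int]:
    """Per-partition lag: latest produced offset minus last committed offset."""
    return {
        partition: max(end - committed.get(partition, 0), 0)
        for partition, end in end_offsets.items()
    }


def total_lag(end_offsets: Dict[int, int], committed: Dict[int, int]) -> int:
    """Lag of the whole consumer group across all partitions."""
    return sum(partition_lag(end_offsets, committed).values())


# Made-up offsets for a topic with three partitions.
end_offsets = {0: 100, 1: 50, 2: 10}
committed = {0: 90, 1: 50}
```

Here partition 0 is 10 messages behind, partition 1 is fully caught up, and partition 2 has not been consumed yet, which gives a total lag of 20.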

Finally, we added custom dashboards to monitor the data flow. As an example, we monitor the number of messages, and the number of bytes transferred:

Monitoring the Number of Messages and Bytes Transferred In and Out of Topics in the Overall Pipeline or Specific Topics

Conclusion

Apache Kafka is a state-of-the-art stream processing platform. NLP is one common task in streaming data pipelines that often requires using popular Python packages in combination with Java to achieve excellent results. We built tools and libraries to run NLP pipelines at scale where Java and Python interoperate seamlessly in Apache Kafka.

Now that you have made it to the conclusion, you have learned that it takes several connecting pieces, including configuration, deployment, serialization, and error handling, to allow for language-agnostic streaming data pipelines in production.

Thank you for reading.

Thanks to Christoph Böhm, Alexander Albrecht, Philipp Schirmer, and Benjamin Feldmann
