How to gracefully shut down a Spark Beam pipeline consuming from Kafka

DV Engineering
DoubleVerify Engineering
7 min readJul 27, 2023

--

Written By: Fabio Ortiz

Currently, there is no out-of-the-box solution to gracefully shut down an unbound Spark-Beam pipeline. There are implementations for a graceful shutdown for runners of Google Dataflow and Apache Flink. There is also a proposal in the Beam project to implement this functionality across all runners. However, as of this writing, users of Beam running in a Spark container are left to their own ingenuity. We are going to show an effective solution that is generic enough to be used by any Beam runner. At DoubleVerify, we have been using this implementation effectively for the past couple of years.

The problem

Consuming data from an unbounded Kafka source in Apache Beam presents the problem of how to stop it without losing data. The KafkaIO component provided by Beam to consume from Kafka only has one option to commit offsets to Kafka. This option is commitOffsetsInFinalize, which automatically commits the offsets of the read events to Kafka for you. However, the downside is that it is done right after reading the events and before they are delivered into the sink. Since an unbounded pipeline has two stages, one reading and one processing the events which run in parallel, you are never able to commit events after they are saved to a sink.

How a graceful shutdown should work

To understand the problem, I will provide a graph of our pipeline, which shows a shutdown. In the graph, you can see green and orange bars. The green bar represents the pipeline reading from Kafka, and the orange bar represents the processing of those events, including writing them to a GCS file (the sink). For reference, the pipeline batch interval is 15 minutes. The moment I hit the stop button at around 12:05, this is shown as a red triangle. At that moment, the pipeline stops reading events from Kafka, which is why there are no more green bars. The pipeline simply processes the events that are represented by the orange bar. The orange triangle represents the moment the pipeline is stopped. At that moment, I apply an update to the pipeline parameters and I perform a restart, which is shown by the green triangle. The pipeline starts reading again, which is shown by the green bar at around 12:30. At around 12:45, both a reading and processing bar appear. This occurs because events have been queued up while the pipeline was down, which immediately triggers another reading due to the first reading reaching the threshold for parameter maxRecordsPerBatch equal to 150,000,000. In the orange bar, the first events are processed and sent to the GCS sink. After 14:00, the pipeline has consumed the excess of events and resumes its normal cadence of reading and processing.

Shutdown code implementation

This shutdown implementation takes advantage of the KafkaIO component method named withConsumerFactoryFn. The code to be supplied to this function can be seen below:

...    
// create pipeline
Pipeline p = Pipeline.create(options);
...
// make sure you call the custom consumer factory
return p.apply("Read from Kafka", KafkaIO.<byte[], GenericRecord>read()
.withBootstrapServers(options.getSourceBrokers().get())
.withTopics(srcTopics)
.withConsumerFactoryFn(new ConsumerFactoryFn())
...
// Define a consumer factory that overrides Beam default one
private static class ConsumerFactoryFn implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {

ConsumerFactoryFn() {
}

@Override
public Consumer<byte[], byte[]> apply(Map<String, Object> config) {

// Create the consumer using config.
final Consumer<byte[], byte[]> consumer =
new MyKafkaConsumer(config);

return consumer;
}
}

When you provide the above function to the KafkaIO component, it overrides the default implementation, which uses a Kafka stock KafkaConsumer class to fetch data from the brokers. Instead, it will use the supplied class implementation; here this class is named MyKafkaConsumer. Below is the code implementation:

public class MyKafkaConsumer extends KafkaConsumer {

private static Logger LOGGER = LoggerFactory.getLogger(MyKafkaConsumer.class);

public MyKafkaConsumer(Map configs) {
super(configs);
}

@Override
public ConsumerRecords poll(long timeoutMs) {
try (Stream<String> stream = Files.lines(Paths.get("/tmp/mount/shutdown.conf"))) {
stream.forEach(line -> {
if(line.equalsIgnoreCase("true")){
this.pause(this.assignment());
LOGGER.info("Kafka Consumer is paused");
}else if(line.equalsIgnoreCase("false")){
this.resume(this.assignment());
}
});

} catch (IOException e) {
e.printStackTrace();
}

return super.poll(timeoutMs);
}
}

The class above overrides the poll method of the original KafkaConsumer class provided by the Kafka library, and is used by default in Beam’s KafkaIO. The poll method is responsible for fetching data from the brokers. The functionality provided above checks if file shutdown.conf contains a line with true. If so then, it will call the KafkaConsumer pause method that stops reading from all the topics and partitions. This effectively stops the reading of data from Kafka. Then, it returns by calling the original poll method in the KafkaConsumer that fetches data. However, if the pause has been called, no data will be returned. To resume consuming the shutdown.conf file, it is updated with an entry of “false”, which then resumes reading from all the topics and partitions.

Shutdown additional components

Besides the previously mentioned code, there are some other components needed for a full automatic shutdown. Other components are necessary because besides stopping reading from Kafka, you need to know when it is okay to shut down the pipeline. The first component requires that the Spark metrics be enabled so that they are provided through the Spark REST API. This step requires that you provide a file named metrics.properties to the Spark at location $SPARK_HOME/conf/. The file should contain the following entries, which tell Spark the REST endpoint where to serve the metrics:

The above configuration will provide the necessary metrics using a URL like: “http://mysparkui:4040/metrics/prometheus/”. There will be a large number of metrics, but you only need two:

metrics_application_name_0_driver_MyBeamPipeline_StreamingMetrics_streaming_lastReceivedBatch_records_Number{type="gauges"} 0
metrics_application_name_0_driver_MyBeamPipeline_StreamingMetrics_streaming_unprocessedBatches_Number{type="gauges"} 0

The first metric tells when the first batch with zero records is received. Once you see that metric, you would need to look for the second metric, which informs that there are no more batches pending. Sometimes, when the pipeline is very busy, there can be more than one batch and the second batch might contain more events. Once the two metrics return zero, you can shut down the pipeline.

Components overview

Here, we describe the components used in our pipeline and how they interact with each other. The first thing you would notice is that we run Spark and Kafka in the Google GKE clusters. The second main component is GitLab, which we use to automate the entire process. Our pipeline “start”, “stop”, “pause”, and “resume” steps are just buttons in a GitLab project. Everything is automated, including the monitoring. We also use Prometheus and Grafana to monitor the pipeline.

Gitlab is in charge of updating the shutdown.conf file with the flag that tells the pipeline to read or not to read from Kafka. The “Deploy Spark” calls the other three steps in the following order: “Pause, Stop, Resume”. “Pause” simply changes the flag from false to true. “Stop” waits for the Spark metrics described above to return zero values, and then sends a signal to the driver pod to shut down, which in turn shuts down the executors. “Resume” sets the flag back to false and clears all the checkpoints for Spark and Beam. Then, the driver is restarted and in turn, starts the executors.

Performance considerations

Some of you might have noticed that reading the shutdown file before polling might have a performance impact. In our case, this is not an issue because the file resides locally in each executor and is initialized to false (read enabled). It is only updated when stop or pause are called. Modern operating systems are able to cache files into memory, especially small ones like shutdown.conf. This means that the flag is read from memory and not from a file. The second performance flag that may be raised is in the custom poll method of class MyKafkaConsumer. Those are no-op operations because the Kafka consumer knows the current state of the partition that is consuming, since it is assigned to a single consumer. It is able to remember the last setting of pause or resume for each partition. We also fetch a larger chunk of data than the default, which improves our throughput.

Conclusion

We have been using the implementation described within this post for about two years, with the goal of providing a reliable, clean shutdown to our pipelines. The solution is generic so that it can be applied to any Beam runner flavor. We have also described a single push button implementation. The references below contain links to the current Beam proposal for a shutdown as well as the Apache Flink, and Google Dataflow for the reader to explore other options. A better implementation would be one in which the Kafka offsets are committed after the data has been delivered to the sink. This way, it does not matter if the pipeline crashes at any point without the pause being called, because after a restart, it will start from the non-committed offset without any data loss. For those cases, we have a system that reprocesses the missing data in a parallel pipeline. We will be exploring such an option where the offsets are committed after data is delivered to the next sink. For now, the implementation herein has worked well at DoubleVerify.

References

--

--

DV Engineering
DoubleVerify Engineering

DoubleVerify engineers, data scientists and analysts write about their work and share their experience