Reliable, High-Throughput Batching with the Kafka Consumer Snap

Patrick Taylor
SnapLogic Engineering
7 min read · Jan 28, 2021

OVERVIEW

SnapLogic recently released an update to our Confluent Kafka Snap pack containing a number of important enhancements. In this article, we take a closer look at the Kafka Consumer Snap’s new Output Mode setting, and how it can be used to achieve reliable, high-throughput performance for some common use cases where it’s important to process records in batches.

USE CASE 1: COPY ALL DATA FROM A TOPIC TO S3

Imagine a use case with these requirements:

  1. Copy all records from a Kafka topic to JSON files stored on S3.
  2. The topic can have any number of records.
  3. Each file should contain a maximum of 5,000 records.
  4. The first execution of the pipeline should copy all existing data in the topic.
  5. Each subsequent execution should copy all new data since the last execution.
  6. Ensure that each file is successfully written before reading more records.

This can be implemented with a fairly simple pipeline: a Kafka Consumer feeding a JSON Formatter, a File Writer, a JSON Splitter, and finally a Kafka Acknowledge Snap.

Kafka Consumer configuration

Requirements 1 and 2 imply:

  • Set Message Count to the special value 0 so that all available records will be copied.

Requirement 3 says that we need to deal with records in batches:

  • Set Max Poll Records to 5000 to specify the desired batch size.
  • Set Acknowledge Mode to Wait after each batch of records.
  • Set the new Output Mode setting to One output document per batch. We’ll discuss the importance of this below.

Requirements 4 and 5 mean:

  • Set Group ID to a new value for the first execution; then leave it unchanged.
  • Set Auto Offset Reset to Earliest so that the first run will start from the earliest record.
  • Set the Consumer’s Seek Type to End so that subsequent executions will start consuming records following the offsets committed by the last execution.

Requirement 6 means:

  • Uncheck Auto Commit.
  • Use the Kafka Acknowledge Snap to allow the Consumer to track when it’s ok to commit offsets of processed records and poll for more records.
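
If it helps to see how these settings relate to the underlying Kafka consumer concepts, here is a minimal sketch using the plain confluent-kafka Python client. The broker address, topic, and group names are hypothetical, and the Snap manages all of this for you, including the acknowledgment handshake; this is only an analogy for the commit-after-each-batch behavior.

    from confluent_kafka import Consumer

    def process(batch):
        """Hypothetical stand-in for the rest of the pipeline (format the batch, write it to S3)."""
        ...

    consumer = Consumer({
        "bootstrap.servers": "broker:9092",   # hypothetical broker address
        "group.id": "copy-topic-to-s3",       # new Group ID on the first run, then left unchanged
        "enable.auto.commit": False,          # Auto Commit unchecked
        "auto.offset.reset": "earliest",      # Auto Offset Reset = Earliest
    })
    consumer.subscribe(["my-topic"])          # hypothetical topic name

    try:
        while True:
            # Fetch up to one batch of records (cf. Max Poll Records = 5000).
            batch = consumer.consume(num_messages=5000, timeout=5.0)
            if not batch:
                break                                 # nothing left to read (cf. Message Count = 0)
            process(batch)
            consumer.commit(asynchronous=False)       # commit only after the batch is fully processed
    finally:
        consumer.close()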

Importance of the Consumer’s new Output Mode setting

The key to solving this use case with such a simple pipeline is the Consumer’s new Output Mode setting. The default value for this setting is the backward-compatible behavior: One output document per record. The diagram below shows how this works.

Output Mode = One output document per record

This flattening of the record batches received from the broker into a stream of individual records is ideal for many use cases where records can be processed individually. But it presents a problem for our use case since we want to process records in batches. The JSON Formatter Snap can be configured to write all records to a single output file, or to write each record individually, neither of which is what we want.

We could try to re-batch the Consumer's output records with a Snap like Group By N, but this is also problematic. What value would we use for N? Setting it to 5000, the same value as the Consumer's Max Poll Records, would seem to recreate the batches as they were received from the broker. However, the broker can return a batch with fewer records than Max Poll Records, even when more records are available. If that happens, the Group By N Snap waits indefinitely for more input, since it needs a full N records to produce an output document. But because Auto Commit is disabled, the Consumer won't output more records until the ones it has already output are acknowledged by the Acknowledge Snap, which sits downstream of the Group By N. Those records can never be acknowledged while the Group By N holds them back, so the Consumer Snap eventually aborts with an acknowledge timeout exception. Choosing a different value of N doesn't help.
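
To make the stall concrete, here is a simplified Python model of what a fixed-size re-batcher like Group By N does. This is purely illustrative, not SnapLogic code.

    def group_by_n(records, n):
        """Emit input records in groups of exactly n, like the Group By N Snap."""
        batch = []
        for record in records:   # blocks here waiting for the next upstream record
            batch.append(record)
            if len(batch) == n:
                yield batch
                batch = []
        # If the Consumer's last poll returned fewer than n records, the loop above
        # is left holding a partial batch: it never yields, so the records in it are
        # never acknowledged, and the Consumer (which is waiting for those
        # acknowledgments before fetching more) eventually aborts with an
        # acknowledge timeout. No choice of n avoids this.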

The solution for this problem is to set the Consumer’s Output Mode to One output document per batch. This preserves the batching of records as received from the broker:

Output Mode = One output document per batch

In this mode, each output document has three top-level keys:

  • batch: An array of records, each with key, value, metadata.
  • batch_size: The number of records in this batch.
  • batch_index: The zero-based index of this batch among all batches output by this instance of the Snap during the current execution.
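
For illustration, a batch document might look like the following. The keys, values, and metadata fields shown here are hypothetical, and only two records are included for brevity.

    {
      "batch": [
        {
          "key": "user-17",
          "value": { "orderId": 1001, "amount": 25.0 },
          "metadata": { "topic": "orders", "partition": 0, "offset": 5280, "timestamp": 1611800000000 }
        },
        {
          "key": "user-42",
          "value": { "orderId": 1002, "amount": 74.5 },
          "metadata": { "topic": "orders", "partition": 1, "offset": 9913, "timestamp": 1611800000123 }
        }
      ],
      "batch_size": 2,
      "batch_index": 0
    }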

JSON Formatter configuration

To configure what the JSON Formatter Snap writes to its binary output, we can set Content to $batch to reference the array of records within each document received from the Consumer, and check Format each document so that each input document will result in a separate output file to be written by the File Writer.

We also configure the Binary header properties to write additional information to accompany each binary output:

  • Pass through the value of batch_index so it can be referenced by the File Writer’s File name expression.
  • Pass through the metadata from every record in the batch array so it can be accessed by the downstream Kafka Acknowledge Snap.

File Writer configuration

We configure the File Writer’s File name to an expression which references three variables:

  • _OutputPath: The value of a pipeline parameter named OutputPath, which we set to the URL of the root S3 directory where all the files from every execution should be written.
  • pipe.ruuid: The unique identifier of the pipeline execution (i.e. the runtime). This is used as the name of a subdirectory to create under the root S3 directory.
  • batch_index: The batch index from the Kafka Consumer’s output document, passed through the JSON Formatter as a binary header.

At the end of an execution, we’ll have a set of S3 files named batch-0.json, batch-1.json, etc.
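
Sketched in Python, the naming scheme amounts to the following. The actual Snap property is a SnapLogic expression, and the bucket and directory names here are hypothetical.

    def file_name(output_path: str, ruuid: str, batch_index: int) -> str:
        # e.g. "s3://my-bucket/kafka-output" + "/" + ruuid + "/batch-0.json"
        return f"{output_path}/{ruuid}/batch-{batch_index}.json"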

JSON Splitter configuration

Even when using batch output mode, we need to acknowledge each record individually, so we use a JSON Splitter to split the array of metadata which the JSON Formatter extracted from the Kafka Consumer's output. This array was passed through the File Writer as a binary header named metadata, and is available to the Splitter as $original.metadata.

Each output document from the JSON Splitter is the value of the metadata object from an individual record, i.e. from one element of the batch array of the Consumer's output.
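
For example, one such document might look like this. The values are hypothetical; the exact fields are whatever the Consumer includes in each record's metadata, which must at least identify the record's topic, partition, and offset so it can be committed.

    {
      "topic": "orders",
      "partition": 0,
      "offset": 5280,
      "timestamp": 1611800000000
    }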

Kafka Acknowledge configuration

Every input document contains the metadata fields at the root level, so the Metadata Path is simply $.

USE CASE 2: MORE COMPLEX PROCESSING OF EACH BATCH

Imagine a scenario similar to Use Case 1, but instead of just formatting and writing each batch, we require more complex processing of each batch. Perhaps we need to group all of the records by key, write a file per unique key, and write only a subset of each record’s data instead of including all the metadata in every file.

To handle this, we can delegate the processing of each batch to a child pipeline using the Pipeline Execute Snap.

A separate child pipeline instance is created to handle each document from the Kafka Consumer. This child pipeline performs all of the per-batch processing: splitting, sorting, grouping, mapping, and so on.
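
As a rough sketch of what the child pipeline does for each batch, here is the same idea in Python rather than as Snaps. Here write_file is a hypothetical stand-in for the File Writer, and the choice to keep only each record's value is illustrative.

    import json
    from collections import defaultdict

    def process_batch(batch, write_file):
        """Group one batch's records by key and write one JSON file per unique key."""
        by_key = defaultdict(list)
        for record in batch:                               # batch: list of {key, value, metadata} records
            by_key[record["key"]].append(record["value"])  # keep only the value, drop the metadata
        for key, values in sorted(by_key.items()):
            write_file(f"{key}.json", json.dumps(values, indent=2))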

The output of the child pipeline's Merge Snap becomes the output of the Pipeline Execute Snap in the parent pipeline. Each document contains a subset of the records from the batch, all sharing the same key. Even though the records of the batch have been split up and re-grouped by key, every record from the batch will be acknowledged. Note that the Kafka Acknowledge Snap should always be in the same pipeline as the Kafka Consumer Snap.

We can get parallel processing by setting the Pipeline Execute’s Pool Size to a value greater than 1.

We won’t explore this solution in more detail here, as the Kafka lesson is exactly the same: To achieve reliable batch processing of any number of records with the Kafka Consumer,

  • Disable Auto Commit.
  • Set Output Mode to One output document per batch.
  • Set Acknowledge Mode to Wait after each batch of records.
