EXPEDIA GROUP TECHNOLOGY — SOFTWARE

Kinesis Data Stream for Async Events Handling

All you need to know about Kinesis Data Stream

Roberto Pecoraro
Expedia Group Technology

--


Kinesis Data Stream (aka KDS) is a managed, scalable, cloud-based service that allows real-time processing of large amounts of streaming data per second. It can collect and process data in real time, much like Apache Kafka. AWS Kinesis offers key capabilities to cost-effectively process streaming data at any scale, along with the flexibility to choose the tools that best suit the requirements of your application. It enables you to process and analyze data as soon as it’s available and respond in real time, instead of having to wait until all your data is collected before processing can begin.

The high-level architecture of Kinesis Data Streams:

The producers put records into the Kinesis Stream. AWS provides Kinesis Producer Library (KPL) to simplify producer application development and to achieve high write throughput to a Kinesis data stream.

A Kinesis Data Stream is a set of shards, and each shard contains a sequence of data records. A data record is composed of a sequence number, a partition key, and a data blob (up to 1 MB), which is an immutable sequence of bytes.

The consumers get records from the Kinesis Data Stream and process them. You can build your applications using Kinesis Data Analytics, the Kinesis API, or the Kinesis Client Library (KCL).

How data flows through the Kinesis stream

How KPL and KCL work

The KPL allows developers to put records into the Kinesis stream asynchronously, based on a partition key and a message body. A partition key is used to group data by shard within a stream: the Kinesis Data Stream segregates the data records belonging to a stream into multiple shards, using the partition key associated with each data record to determine which shard a given record belongs to. The KPL will potentially write to all the shards of the stream depending on how the partition key is generated, so the developer should guarantee good randomness when using multiple shards.
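To make the routing concrete, here is a minimal Python sketch (not the KPL itself) of how a partition key is mapped to a shard: Kinesis MD5-hashes the key into a 128-bit integer and picks the shard whose hash key range contains it. The even split of the hash key space below is an assumption that holds for a freshly created stream.

```python
import hashlib

def shard_for_key(partition_key: str, num_shards: int) -> int:
    """Map a partition key to a shard index the way Kinesis does:
    MD5-hash the key into a 128-bit integer, then pick the shard whose
    hash key range contains it (ranges evenly split [0, 2^128))."""
    hash_value = int.from_bytes(
        hashlib.md5(partition_key.encode("utf-8")).digest(), "big"
    )
    range_size = 2 ** 128 // num_shards
    # min() guards the few top-of-range values left over by integer division
    return min(hash_value // range_size, num_shards - 1)

# A sufficiently random key spreads records across all shards.
counts = [0] * 5
for i in range(10_000):
    counts[shard_for_key(f"event-{i}", 5)] += 1
```

With poorly randomized keys, some shards would receive most of the traffic while others stay idle, which is exactly the hot-shard problem the paragraph above warns about.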

The KCL, on the other hand, allows developers to retrieve and process data from a specific shard in the stream, or from several shards at once. These are the possible shard-assignment scenarios:

KCLs < Nº of shards -> each KCL will be assigned to one or more shards, so each KCL will have at least one shard processor assigned to it.

KCLs = Nº of shards -> each KCL will be assigned to exactly one shard, so each KCL will have only one shard processor assigned to it.

KCLs > Nº of shards (scenario to avoid) -> at least one KCL won’t be connected to any shard, so it will sit idle and serve no purpose.
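The three scenarios can be sketched with a toy round-robin assignment (illustrative only; the real KCL balances leases dynamically through its DynamoDB lease table rather than statically like this):

```python
def assign_shards(num_workers: int, num_shards: int) -> list:
    """Toy round-robin distribution of shard ids over KCL workers,
    mirroring the three scenarios above."""
    leases = [[] for _ in range(num_workers)]
    for shard in range(num_shards):
        leases[shard % num_workers].append(shard)
    return leases

fewer_workers = assign_shards(4, 5)  # every worker busy, one gets 2 shards
equal_workers = assign_shards(5, 5)  # exactly one shard each
extra_workers = assign_shards(6, 5)  # at least one worker is idle (avoid)
```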

The KCL takes care of many of the complex tasks associated with distributed computing, such as load-balancing across multiple instances, responding to instance failures, checkpointing processed records, and reacting to re-sharding.

In order to optimize costs and performance, by design, both producer and consumer provide record aggregation and de-aggregation. Basically, KPL is able to aggregate data records that will be sent to the stream and be consumed by de-aggregation from KCL.

Kinesis stream limits to keep in mind when fine-tuning costs and performance

  • Each shard can support up to 5 getRecords operations per second, up to a maximum total data read rate of 2 MB per second.
  • Each shard supports up to 1,000 records per second for writes, up to a maximum total data write rate of 1 MB per second (including partition keys).
  • As a consequence of the points above, to scale throughput you should adjust the number of shards.
  • You are charged on a per-shard basis. Since billing is per provisioned shard, you may have as many shards as you want (if you have enough money).
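Given these per-shard limits, a back-of-the-envelope sizing helper looks like this (a sketch; the function name and the idea of sizing purely from peak rates are ours, and real sizing should also leave headroom):

```python
import math

def shards_needed(write_mb_per_s: float,
                  write_records_per_s: float,
                  read_mb_per_s: float) -> int:
    """Minimum shard count given the per-shard limits above:
    1 MB/s and 1,000 records/s for writes, 2 MB/s for reads."""
    return max(
        math.ceil(write_mb_per_s / 1.0),
        math.ceil(write_records_per_s / 1000.0),
        math.ceil(read_mb_per_s / 2.0),
    )
```

For example, a workload writing 3 MB/s across 2,500 records/s and reading 4 MB/s is write-bound and needs 3 shards.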

How to configure a KCL app and scale a Kinesis Stream

Based on our experience, you should take into account the following parameters in order to tune the application to reach the target throughput:

  • 5 getRecords operations per second for each shard
  • KCL getRecords operation latency (this latency is strictly correlated to the above operations per second)
  • KCL records iterator age
  • KCL idleMillisBetweenCalls property, which is the polling frequency of the getRecords operation.
  • WCU (write capacity units) and RCU (read capacity units) of the KCL’s DynamoDB table.
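These parameters interact: the per-shard limit of 5 getRecords operations per second bounds how low idleMillisBetweenCalls can safely go for a single consumer per shard. A rough sketch of that bound (the function name and the simple latency model are ours):

```python
def min_idle_millis(max_calls_per_sec: int = 5, call_latency_ms: int = 10) -> int:
    """Smallest idleMillisBetweenCalls that keeps one consumer under the
    per-shard getRecords rate limit, modelling each polling cycle as
    call latency + idle time."""
    return max(0, 1000 // max_calls_per_sec - call_latency_ms)

floor_ms = min_idle_millis()  # 190 ms with a ~10 ms call latency
```

Anything above this floor is safe headroom; as described in the next section, we settled on 250 ms.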

In the next section we’ll summarize the steps taken to tune our stack.

How the BERPS Kinesis Stream and KCL were configured to reach the desired throughput

Our requirement was to handle a maximum throughput of around 1k events per minute. Considering that each event takes about 4 seconds to be processed by our stack, we decided on the following configuration:

  • 5 shards
  • 4 pods → 4 KCL → 5 shard processors
  • idleMillisBetweenCalls set to 250 ms, considering that the getRecords operation latency is about 10 ms and the shard limit is 5 getRecords operations per second
  • each getRecords operation fetches exactly one message from the shard; this was configured for resiliency reasons
  • each shard processor has a thread pool size of 10 and a queue size of 6.
  • the size of the record payload

Given the above we have:

4 events per sec * 5 shards * 60 sec = 1200 events per min.

The thread pool guarantees that, even if each event takes about 4 seconds to be processed, the stack is able to handle the expected traffic.
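The arithmetic behind this configuration can be checked in a few lines (a sketch of the capacity math, not production code):

```python
polling_interval_ms = 250   # idleMillisBetweenCalls
records_per_call = 1        # one record per getRecords, for resiliency
shards = 5
processing_time_s = 4       # time to process one event

# 1000 / 250 = 4 getRecords per second per shard, under the limit of 5
calls_per_shard_per_s = 1000 // polling_interval_ms

# 4 events/s * 5 shards * 60 s = 1200 events per minute
events_per_min = calls_per_shard_per_s * records_per_call * shards * 60

# At steady state each shard processor holds 4 events/s * 4 s = 16 events
# in flight, which matches thread pool size (10) + queue size (6).
in_flight_per_shard = calls_per_shard_per_s * records_per_call * processing_time_s
```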

Tips on various topics learned while troubleshooting issues

Record Aggregation

The record aggregation mechanism is enabled by default, which means that multiple records can be aggregated into one, as stated before, to make more efficient use of the available bandwidth and reduce costs.

In our architecture, in order to be resilient, we had the requirement to consume exactly one record for each read operation, so we had to write and read exactly one record each time. For this reason we had to explicitly disable aggregation on the KPL side; otherwise we would have seen inconsistent behavior: KCL would receive one message containing several records when we wanted exactly one.
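If you configure the KPL through its properties file, aggregation can be switched off there; a minimal fragment (a sketch assuming the standard KinesisProducerConfiguration properties format, where the key is AggregationEnabled; the equivalent in code is the setAggregationEnabled setter):

```
# Sketch of a KPL properties file: disable record aggregation so that
# one produced record maps to exactly one stream record.
AggregationEnabled = false
```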

We eventually spotted the issue and realized it was due to aggregation by inspecting the records put into the stream using the AWS CLI (https://docs.aws.amazon.com/streams/latest/dev/fundamental-stream.html).

Checkpoint mechanism

For each Amazon Kinesis Data Streams application, the KCL uses a unique Amazon DynamoDB table to keep track of the application’s state. The KCL uses the application name to name the DynamoDB table it creates, which is why each application name must be unique. By default, the KCL creates the table with a provisioned throughput of 10 reads per second and 10 writes per second.

Each row in the DynamoDB table represents a shard being processed by your application. In addition to the shard ID, each row also includes checkpoint, checkpointSubSequenceNumber, leaseCounter, leaseKey, leaseOwner, ownerSwitchesSinceCheckpoint and parentShardId. The most relevant are checkpoint and leaseKey, which hold, respectively, the position of the last message consumed and the shard ID of the related shard processor.

KCL checkpoint mechanism in detail:

  • at startup, KCL defaults to LATEST if no checkpoint is available in the KCL table; otherwise it resumes from the stored checkpoint. The default can be overridden with one of the following: AT_TIMESTAMP, TRIM_HORIZON, AT/AFTER_SEQUENCE_NUMBER or LATEST.
  • for each message consumed by KCL, you need to explicitly commit the checkpoint. This is necessary to track the last message consumed and keep the application state consistent.

Possible checkpoint values:

AT_TIMESTAMP: the approximate time the event was put into the stream. Use this option if you want to find specific events to process based on their timestamp.

TRIM_HORIZON: the oldest events that are still in the stream shards, before they are automatically trimmed (default 1 day, but retention can be extended up to 7 days). Use this option if you want to reprocess every event still retained in the stream.

LATEST: the newest events in the stream, ignoring all past events. Use this option if you start a new application that you want to process in real time immediately.

AT/AFTER_SEQUENCE_NUMBER: the sequence number is usually the checkpoint you keep while processing the events. These checkpoints allow you to process events reliably, even in case of reader failure or when you upgrade the reader and want to continue processing all the events without losing any. The difference between AT and AFTER is whether your checkpoint was taken before or after you processed the events successfully.

Resharding a Kinesis Data Stream

Resharding lets you increase or decrease the number of shards in a stream in order to adapt to changes in the rate of data flowing through it.
The KCL tracks the shards in the stream using an Amazon DynamoDB table. When new shards are created as a result of resharding, the KCL discovers them and populates new rows in the table.

What happens during the resharding process?

1) KPL starts to produce new messages on the new shards

2) KCL will continue to consume from the old shards until no more messages are available and will mark those shards as “SHARD_ENDED”. After that, KCL will start to consume from the new shards: this preserves the order in which the data arrived on the stream.
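This ordering guarantee can be modeled with a small simulation (illustrative only: the shard names and parent map are hypothetical, and the real KCL tracks parent/child relationships through its lease table):

```python
def consume_in_order(shard_records: dict, parents: dict) -> list:
    """Toy model of KCL's ordering after a reshard: a child shard is
    consumed only once its parent shard is fully drained (SHARD_ENDED)."""
    ended, consumed = set(), []
    remaining = dict(shard_records)
    while remaining:
        for shard_id in list(remaining):
            parent = parents.get(shard_id)
            if parent is None or parent in ended:
                consumed.extend(remaining.pop(shard_id))
                ended.add(shard_id)  # shard fully drained: SHARD_ENDED
    return consumed

# Hypothetical reshard: shard-1 was split into shard-1.1 and shard-1.2.
records = consume_in_order(
    {"shard-1": ["a", "b"], "shard-1.1": ["c"], "shard-1.2": ["d"]},
    {"shard-1.1": "shard-1", "shard-1.2": "shard-1"},
)
```

All of the parent shard’s records ("a", "b") come out before any record written to its children, which is what keeps per-key ordering intact across a reshard.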

This is the only way to scale the Kinesis stream throughput.

TECH NOTE: a potential side effect of increasing shards: since, by design, the table is provisioned with 10 RCU and 10 WCU, be aware that this may not be enough to handle the increased throughput. Therefore, consider increasing those values accordingly.

Advanced Topic:

How and when to re-create the KCL DynamoDB table and restore the consumption from a specific checkpoint

Use case scenarios (WHEN):

  • KCL consumption is slower than KPL record production (for example, KPL is producing unexpected additional records)
  • The KCL DynamoDB table is gone for some reason and KCL stops consuming (for example, the table has been deleted by mistake)

Restore the service availability (HOW):

We configured KCL to use the AT_TIMESTAMP checkpoint in order to specify where to start stream consumption, and at the same time added the possibility to specify, via the CFO (Cloud Formation Operator) template, the name of the DynamoDB table KCL will use. We had to implement this custom solution because, by default, KCL doesn’t provide a way to change the checkpoint when the DynamoDB table is already in place. This configuration enables us to address any issue (including the above scenarios) by letting us start consuming the records in the stream from a custom timestamp in the past.

Authors: Roberto Pecoraro, Nicola Giacobbe
