Scalable and Reliable Kafka Ingestion from Rails

Rahul Singh
Published in motive-eng
12 min read · Jun 15, 2021

A key aspect of fleet management is real-time location tracking of all the entities in your fleet (these entities can be trucks, drivers, or trailers). To this end, KeepTruckin introduced Fleet View last year, a new experience for fleet managers, which displays a single, unified, real-time view of all the entities of a fleet on a map.

We achieve this real-time location tracking of entities by processing the GPS coordinates and telemetry data that we receive from the vehicles and the Vehicle Gateways (VG) attached to the vehicles. In this blog, we share the challenges and decisions we faced in producing data to Kafka from Ruby-on-Rails.

Building this system presented us with the following challenges:

  1. Scale: Vehicle Gateways generate and transmit 260 million GPS coordinates and telemetry measurements in a single day. Because we want to show the location of entities in real time, the backend systems must process these GPS coordinates with minimal processing delay.
  2. Reusability: Location data is fundamental to a number of existing KeepTruckin features, as it will be to many features we envision for the future. This means our backend system must enable new features to easily leverage this real-time location data.
  3. Enabling microservices architecture: Traditionally, KeepTruckin has been powered by a Rails-based monolith. We are in the process of moving towards domain-specific microservices implemented in Golang. We wanted to build our backend system in a manner that helps this transition.
  4. Robust data pipelines: We wanted to enable parallel processing of the data to generate different types of analytics and insights. Here are two examples of features that use this data: (1) We show the state of the vehicle in real time, including the engine state and speed. (2) Similarly, we generate speeding alerts in real time by using this data.
  5. Real-time analytics: We wanted to develop product features that enable our customers to consume this data and the insights gathered in real time.

The backend components required to build this system fall into three high-level categories of functions:

  • Ingest location data
  • Process and persist location data
  • Provide APIs to access location data
Functions of backend components; this blog focuses on the first component: ingesting the GPS data

Keep reading to learn in detail about how we approached the first component, the one responsible for ingesting the GPS data we receive from Vehicle Gateways. We’ll start with a high-level architecture of our ingestion component, where our key challenge lay in enhancing the Rails component to send data to Kafka. We’ll explain the various design challenges we encountered in sending messages to Kafka from Rails in a scalable and reliable fashion. Finally, we’ll present you with some statistics about the ingestion component’s performance.

High-Level Architecture

High-level architecture of the GPS data ingestion component
  1. Vehicle Gateways that are cellular-enabled send their location data directly to the API server via a RESTful API call. The API server is implemented in Ruby-on-Rails (RoR). VGs send their data to the server at periodic intervals in a compressed format. Each payload of the API call contains a batch of multiple GPS coordinates collected at different times.
  2. Vehicle Gateways that are not cellular-enabled send their data to the KeepTruckin mobile application being used by the driver while driving the vehicle. The mobile application then sends the location data to the API server by making a RESTful API call to the RoR server.
  3. On receiving the API call, the API server writes this compressed data into a database as a new row in a table. On insertion into the database, each batch of data is assigned a row identifier that uniquely identifies this newly inserted data in the database table.
  4. The unique identifier obtained in the above step is used to create a background job. This background job is submitted into a job queue by making a call to our job queue management system. We use beanstalkd as our job queue management system.
  5. A job worker process from the worker pool dequeues this job from the work queue. The worker is also implemented in RoR by using the Backburner gem.
  6. To process this job, the worker process reads the row from the database by using the row identifier. It then decompresses the job payload and extracts the location data from the payload.
  7. The worker process then sends the location data to a Kafka cluster running on the Confluent Cloud.
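
The steps above can be sketched end to end in a few lines of Ruby. This is only an illustration under stated assumptions: the in-memory hash, the `Queue`, the lambda standing in for the Kafka client, the `vg-locations` topic name, and the use of zlib with JSON for the compressed payload are all hypothetical stand-ins, not the actual database, beanstalkd setup, or wire format.

```ruby
require "json"
require "zlib"

# Hypothetical in-memory stand-ins for the database table, the job queue,
# and the Kafka cluster. None of these names come from the real system.
TABLE = {}            # row id => compressed payload
JOB_QUEUE = Queue.new # holds row ids only, not payloads
SENT_TO_KAFKA = []

# Steps 3-4: persist the compressed batch, then enqueue a job carrying the row id.
def receive_api_call(compressed_payload)
  row_id = TABLE.size + 1
  TABLE[row_id] = compressed_payload
  JOB_QUEUE << row_id
  row_id
end

# Steps 5-7: dequeue a job, load the row, decompress it, and send the batch on.
def run_worker_once(producer)
  row_id = JOB_QUEUE.pop
  batch = JSON.parse(Zlib::Inflate.inflate(TABLE.fetch(row_id)))
  producer.call(JSON.generate(batch), "vg-locations")
  row_id
end

# A Vehicle Gateway batch: several GPS points collected at different times.
POINTS = [
  { "lat" => 37.77, "lon" => -122.41, "ts" => 1_623_715_200 },
  { "lat" => 37.78, "lon" => -122.4, "ts" => 1_623_715_210 }
].freeze

receive_api_call(Zlib::Deflate.deflate(JSON.generate(POINTS)))

# Stand-in for the Kafka client: records what would have been sent.
fake_producer = ->(msg, topic) { SENT_TO_KAFKA << [topic, msg] }
run_worker_once(fake_producer)
```

Passing only the row identifier through the queue keeps jobs small, and because the raw payload stays in the table, a failed job can be re-run from the same row.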

High-Level Objectives

Our biggest challenge in ingesting location data lay in figuring out how to send hundreds of millions of these GPS coordinates to Kafka from Rails. We had to evaluate our options under the following constraints:

  • No data loss: We use location data for more than real-time GPS tracking of entities in the fleet; we also use it to generate historical location reports. This means we can’t afford to lose any location data, because any one record may correspond to an important event, such as starting or stopping a vehicle.
  • Low latency: The worker process that sends data to Kafka processes more than just location data; it also processes a number of other message types sent by the Vehicle Gateways. Processing raw VG data performed by these worker processes is the first step in many latency-sensitive workflows. Adding a large latency overhead to the processing already performed by this worker process can negatively impact all the downstream workflows. We therefore wanted to minimize the latency overhead of sending data to Kafka.

Design Decisions

Decision 1: Send to Kafka Directly vs. Send via Proxy

We first researched existing approaches used by other developers for sending data to Kafka from a Rails application. Many of these approaches resorted to sending data to Kafka from Rails via a component in the middle, rather than sending directly.

In “Kafka Producer Pipeline for Ruby on Rails,” the authors first send messages to a local SysV message queue, while in “Scalable and reliable data ingestion at Pinterest” (Pinterest Engineering Blog), the authors write messages to the local disk, from which an agent uploads the data to Kafka. In “Introducing Kafka to a Rails application,” the author built a proxy service in Golang. The messages are first sent to this service, which then sends them to Kafka.

Choosing a Managed Kafka Cluster

In all these approaches, the primary reason for avoiding sending messages to Kafka directly from Rails is the operational overhead of deploying and maintaining a highly available and reliable Kafka cluster. To avoid this overhead, we decided to use Confluent Cloud, the managed Kafka service provided by Confluent Inc.

Ability to Handle Send Failures

Also worth noting is that our architecture handles situations where synchronously sending messages to Kafka fails. In such situations, the worker process also fails. Beanstalkd allows such failed jobs to be processed again by the other worker processes. This means the request can be retried. Note that the actual raw data sent by the Vehicle Gateway is present in the database as well. Moreover, the architecture also handles scenarios where the VG fails to send data to the server via the API call. In this case, the VG keeps the data and retries after a certain time.

Simplicity of Design and Implementation

Sending to Kafka via an intermediate software component is a more complicated design. We wanted to avoid the extra implementation and maintenance overhead of this approach by sending to our Kafka cluster directly from RoR.

Our Choice: Send to Kafka Directly

For the above reasons, we decided to send messages directly to Kafka from our Rails application.

Decision 2: Selecting the Right Gem

There are a number of Ruby gems that can be used for sending messages to Kafka from a Rails application.

  • The most popular seems to be the ruby-kafka gem from Zendesk.
  • There are other gems that are wrappers around ruby-kafka. For example, the delivery-boy gem is a wrapper API around ruby-kafka.
  • Then there are gems that use ruby-kafka as the client for communicating with Kafka, but provide a framework for consuming from and producing messages to Kafka from RoR. Examples are Phobos and Karafka. These gems handle other aspects of message processing from Kafka, such as message routing and retrying.

For our purposes, we only needed to produce from an RoR background job to Kafka, and did not have any plans to consume from RoR. We therefore chose the ruby-kafka gem directly. This approach has also been used by others, including Heroku and BlueApron.

Decision 3: Using the Right Settings for Ruby-Kafka

The ruby-kafka gem provides three different ways of sending messages to Kafka:

  • Non-buffered synchronous delivery
  • Buffered send with synchronous delivery
  • Buffered send with asynchronous delivery

Non-Buffered Synchronous Delivery

In this mode, the library synchronously sends the message to Kafka. In case of failure, the library can be configured to retry a specified number of times before giving up. The following code snippet shows an example invocation of this mode:

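require "kafka"

class KafkaClientNonBuffered
  RETRIES = 1

  class << self
    attr_accessor :client
  end

  # Built lazily so that Rails.logger is already configured when it is read.
  def self.options
    { logger: Rails.logger }
  end

  # Sends one message and blocks until it is acknowledged or the retries run out.
  def self.produce(msg, topic_name)
    @client ||= Kafka.new(["kafka1:9092"], **options)
    client.deliver_message(msg, topic: topic_name, retries: RETRIES)
  end
end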

Buffered Send with Synchronous Delivery

In this mode, there are two different method calls for sending messages to Kafka. The first method lets us add multiple messages to a buffer maintained by the library. After we have added multiple messages to the buffer and are ready to send the messages to Kafka, we can make a synchronous call to actually deliver the messages in the buffer to Kafka.

If any of the messages in the buffer fail to be sent to Kafka, the synchronous call raises an exception. The messages that could not be delivered remain in the buffer and are retried on the next call to deliver messages to Kafka. The following code snippet shows an example invocation of this mode:

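require "kafka"

class KafkaClientBufferedSync
  REQUIRED_ACKS = 1
  MAX_RETRIES = 0
  ACK_TIMEOUT_SECS = 10

  class << self
    attr_accessor :client, :producer
  end

  # Built lazily so that Rails.logger is already configured when it is read.
  def self.options
    { logger: Rails.logger }
  end

  # Adds the message to the producer's local buffer; nothing is sent yet.
  def self.produce(msg, topic_name)
    @client ||= Kafka.new(["kafka1:9092"], **options)
    @producer ||= client.producer(
      required_acks: REQUIRED_ACKS,
      max_retries: MAX_RETRIES,
      ack_timeout: ACK_TIMEOUT_SECS
    )
    producer.produce(msg, topic: topic_name)
  end

  # Blocks until the buffered messages are delivered; raises Kafka::DeliveryFailed otherwise.
  # Call produce at least once first so that the producer exists.
  def self.deliver
    producer.deliver_messages
  end
end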
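
The retry-on-next-call behavior described above can be seen in isolation with a toy model. The sketch below is not ruby-kafka and does not talk to a broker. `ToyBufferedProducer`, `ToyDeliveryFailed`, and the lambda transport are hypothetical names that only mimic the semantics: delivered messages leave the buffer, failed ones stay for the next delivery call.

```ruby
# Toy model only: this mimics the buffering semantics, it is not ruby-kafka.
class ToyDeliveryFailed < StandardError; end

class ToyBufferedProducer
  attr_reader :buffer

  # transport: any callable that returns true when the "broker" accepts a message.
  def initialize(transport)
    @transport = transport
    @buffer = []
  end

  # Step 1: buffer locally; nothing is sent yet.
  def produce(msg)
    @buffer << msg
  end

  # Step 2: try to send everything; keep whatever failed for the next call.
  def deliver_messages
    @buffer = @buffer.reject { |msg| @transport.call(msg) }
    raise ToyDeliveryFailed, "#{@buffer.size} message(s) still buffered" unless @buffer.empty?
  end
end

DELIVERED = []
BROKER = { up: false }

# Hypothetical transport: message "b" fails while the broker is down.
transport = lambda do |msg|
  return false if msg == "b" && !BROKER[:up]

  DELIVERED << msg
  true
end

PRODUCER = ToyBufferedProducer.new(transport)
%w[a b c].each { |msg| PRODUCER.produce(msg) }

begin
  PRODUCER.deliver_messages
rescue ToyDeliveryFailed
  # "a" and "c" went out; "b" is still in the buffer.
end

BROKER[:up] = true
PRODUCER.deliver_messages # only "b" is sent this time
```

One consequence worth noting is that a retried message can arrive after messages that were buffered later, as "b" does here.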

Buffered Send with Asynchronous Delivery

The first step of this mode is exactly like the one explained above, wherein we add messages to the library’s buffer.

In the second step, when we want to actually deliver messages to Kafka, we do not have to block while the messages are sent. Instead, this mode lets us configure the conditions under which the messages in the buffer are sent to Kafka asynchronously. The conditions can depend either on the amount of time that has elapsed, or on the number of messages the buffer has accumulated. The following code snippet shows an example invocation of this mode:

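require "kafka"

class KafkaClientBufferedAsync
  DELIVERY_INTERVAL_SECS = 10
  DELIVERY_THRESHOLD = 10
  REQUIRED_ACKS = 1
  MAX_RETRIES = 0
  ACK_TIMEOUT_SECS = 10

  class << self
    attr_accessor :client, :producer
  end

  # Built lazily so that Rails.logger is already configured when it is read.
  def self.options
    { logger: Rails.logger }
  end

  # Queues the message; a background thread delivers it once the interval
  # elapses or the threshold is reached.
  def self.produce(msg, topic_name)
    @client ||= Kafka.new(["kafka1:9092"], **options)
    @producer ||= client.async_producer(
      delivery_interval: DELIVERY_INTERVAL_SECS,
      delivery_threshold: DELIVERY_THRESHOLD,
      required_acks: REQUIRED_ACKS,
      max_retries: MAX_RETRIES,
      ack_timeout: ACK_TIMEOUT_SECS
    )
    producer.produce(msg, topic: topic_name)
  end

  # Requests delivery of the buffered messages without blocking the caller.
  # Call produce at least once first so that the producer exists.
  def self.deliver
    producer.deliver_messages
  end
end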
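
To make the two delivery triggers concrete, here is a toy model of a buffer that flushes either when enough messages have accumulated or when enough time has passed. It is a sketch of the idea only, not ruby-kafka's implementation: `ToyAsyncBuffer`, its `tick` method, and the injected clock are hypothetical, and a real client does this work on background threads.

```ruby
# Toy model only: illustrates threshold- and interval-based flushing.
class ToyAsyncBuffer
  attr_reader :pending, :delivered

  def initialize(delivery_threshold:, delivery_interval:, clock:)
    @delivery_threshold = delivery_threshold
    @delivery_interval = delivery_interval
    @clock = clock
    @pending = []
    @delivered = []
    @last_flush_at = clock.call
  end

  # Buffers the message and flushes once enough messages have accumulated.
  def produce(msg)
    @pending << msg
    flush if @pending.size >= @delivery_threshold
  end

  # Stands in for the background timer: flushes once enough time has elapsed.
  def tick
    flush if !@pending.empty? && @clock.call - @last_flush_at >= @delivery_interval
  end

  private

  def flush
    @delivered.concat(@pending)
    @pending.clear
    @last_flush_at = @clock.call
  end
end

NOW = { secs: 0 } # fake clock so the example needs no sleeping
ASYNC = ToyAsyncBuffer.new(delivery_threshold: 3, delivery_interval: 10, clock: -> { NOW[:secs] })

ASYNC.produce("m1")
ASYNC.produce("m2") # below the threshold: still only in memory
NOW[:secs] = 5
ASYNC.tick          # interval not reached: nothing is sent
ASYNC.produce("m3") # threshold reached: m1..m3 are flushed
ASYNC.produce("m4")
AT_RISK = ASYNC.pending.dup # a crash here would lose exactly these messages
NOW[:secs] = 15
ASYNC.tick          # 10 seconds since the last flush: m4 is flushed
```

The `AT_RISK` snapshot is the point of the exercise: between triggers, some messages exist only in process memory.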

Our Choice: Non-Buffered Synchronous Delivery

For our implementation, we decided to use the Ruby-Kafka gem in the first mode: non-buffered synchronous delivery. We chose this mode for the following reasons:

  • No data loss: Because we cannot tolerate any data loss, the buffered send with asynchronous delivery mode was not an option, as it poses the risk of data loss. The RoR worker may crash or be terminated ungracefully, in which case all the messages in the buffer that have not yet been delivered to Kafka will be lost. The buffered send with synchronous delivery is an option, but in our case, we did not need its added capability to buffer messages. We explain this more in the next point.
  • Application-level batching: The key benefit of buffering is to send messages to Kafka in batches, which results in better performance. But in our case, the Vehicle Gateway already batches location data and sends an entire batch of location data in a single API call. Because of this batching, a single Kafka message is already a batch of many location data points. Therefore, we didn’t anticipate major performance degradation by using the non-buffered API.

Decision 4: Number of Acknowledgements Needed

A Kafka cluster comprises multiple brokers. Each partition has a broker assigned as its leader. Each partition’s data is replicated to multiple brokers for fault tolerance.

When producing to Kafka, we can control when the leader broker acknowledges the write. We can configure the ruby-kafka client to operate in one of the following three modes supported by Kafka.

  • No acknowledgements needed: In this mode, the client does not wait for any acknowledgement from the leader. This mode has the lowest latency penalty, but it comes with a high risk of data loss.
  • Acknowledgement from all the replica brokers: In this mode, the client indicates to the leader that the leader should wait for replication to all the replica brokers to succeed before it acknowledges the client. This mode offers minimal possibility of data loss, but it also introduces a high latency for writing to Kafka.
  • Only acknowledgement from leader: In this mode, the client indicates to the leader that the leader can acknowledge the client as soon as the leader has persisted the data in its own storage. This option strikes a balance between the risk of data loss and the latency of producing messages to Kafka.

We configured the client to operate in the last mode.
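
For reference, ruby-kafka's buffered producers express these three modes through the `required_acks` option, which accepts `0`, `1`, or `:all`. The sketch below only builds the options hash, so it runs without a broker. The mode names and the `producer_options` helper are hypothetical and introduced here for illustration.

```ruby
# Maps the three acknowledgement modes onto values for ruby-kafka's
# required_acks option. The symbolic mode names are illustrative only.
REQUIRED_ACKS_BY_MODE = {
  none: 0,           # do not wait for any acknowledgement
  leader_only: 1,    # wait for the partition leader only
  all_replicas: :all # wait until the write has been replicated
}.freeze

# Builds keyword options that could be passed to a buffered producer,
# for example client.producer(**producer_options(:leader_only)).
def producer_options(mode, ack_timeout_secs: 10)
  { required_acks: REQUIRED_ACKS_BY_MODE.fetch(mode), ack_timeout: ack_timeout_secs }
end

LEADER_ONLY_OPTIONS = producer_options(:leader_only)
```

Using `fetch` makes an unknown mode fail loudly instead of silently sending `nil` as the acknowledgement setting.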

Decision 5: Acknowledgment Timeout

The ruby-kafka client can be configured with a maximum wait time for receiving acknowledgment from the broker. While the client is waiting for the broker to acknowledge that a message was successfully written, the application is blocked. The longer we set this timeout, the longer the application will be blocked should a broker become either unresponsive or slow in responding to requests.

When the ruby-kafka client is used in the non-buffered synchronous mode, it does not expose the ability to configure this timeout. Internally it chooses a timeout of 10 seconds. We found that this value meets our requirements of failing fast in cases of broker latency issues, instead of blocking the entire worker process for a long time.

Decision 6: Error Handling

The ruby-kafka library enables us to retry the produce operation to Kafka if it fails. When using the non-buffered synchronous client, you can supply the number of retries as the retries argument to the deliver_message call.

  • Setting these retries to a low value will lead to a higher chance of failures.
  • Setting these retries to a high value will lead to the application blocking for a longer time in cases of broker failures. As we discussed in the previous section, the timeout to wait for acknowledgment from the broker is 10 seconds for each attempt. More retries means longer latency for producing to Kafka.

Also note that in our case, the Kafka producer is running inside a worker process. The Backburner library provides its own ability to retry failed jobs. On top of that, we retry failed jobs three times in our system, which provides us with another layer of retry mechanism, in addition to the one provided by the library.

We set the maximum retry setting to ‘1’ for our client to minimize the Kafka producer latency, reassured by our additional layer of retry mechanism.
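
The interaction between the two retry layers, and the cost of each extra retry, can be sketched as follows. This is a simulation under stated assumptions rather than our production code: `SendFailed`, `send_with_retries`, and `run_job` are hypothetical names, "three times" is read as three re-runs after the first run, and the wait estimate ignores connection setup and any backoff between attempts.

```ruby
class SendFailed < StandardError; end

ACK_TIMEOUT_SECS = 10 # per-attempt acknowledgment timeout quoted above
LIBRARY_RETRIES  = 1  # retries passed to the Kafka client
JOB_RETRIES      = 3  # assumption: three re-runs after the first run

# Inner layer: what the client library does with its retries setting.
def send_with_retries(retries)
  attempts = 0
  begin
    attempts += 1
    yield
  rescue SendFailed
    retry if attempts <= retries
    raise
  end
end

# Outer layer: the job queue re-runs the whole job when the send still fails.
def run_job(job_retries, library_retries, &block)
  runs = 0
  begin
    runs += 1
    send_with_retries(library_retries, &block)
  rescue SendFailed
    retry if runs <= job_retries
    raise
  end
end

# Upper bound on time spent waiting for acknowledgments in one job run.
def worst_case_wait_secs(retries, ack_timeout_secs)
  (retries + 1) * ack_timeout_secs
end

PER_JOB_RUN_SECS = worst_case_wait_secs(LIBRARY_RETRIES, ACK_TIMEOUT_SECS) # 20
ACROSS_ALL_RUNS_SECS = (JOB_RETRIES + 1) * PER_JOB_RUN_SECS                # 80

# Simulated broker that rejects the first five sends and accepts the sixth.
CALLS = { count: 0 }
RESULT = run_job(JOB_RETRIES, LIBRARY_RETRIES) do
  CALLS[:count] += 1
  raise SendFailed, "broker unavailable" if CALLS[:count] <= 5

  :delivered
end
```

With one library retry, a single job run blocks for at most about 20 seconds on acknowledgments, while the job-level retries still allow up to eight send attempts in total under these assumptions.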

Key Statistics

We have been using the Ruby-Kafka-based client to send messages to Kafka from Rails in production since March 2020. We use the client in non-buffered synchronous mode. Some key statistics about our experience with this client are as follows:

  1. The client sends almost 800 requests per second (RPS) at peak traffic.
  2. These requests amount to 4000 GPS coordinates and other telemetry data being sent every second.
  3. The client has a p95 latency between 10 and 15 milliseconds.
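
A quick back-of-the-envelope check ties these numbers to the daily volume quoted at the start of this post. It assumes that each request corresponds to one Kafka message and that the daily and peak figures describe the same traffic, neither of which is stated explicitly above.

```ruby
SECONDS_PER_DAY = 24 * 60 * 60

DAILY_MEASUREMENTS = 260_000_000  # GPS coordinates and telemetry per day
PEAK_REQUESTS_PER_SEC = 800       # "almost 800", so treat as an upper bound
PEAK_MEASUREMENTS_PER_SEC = 4_000

# Average rate implied by the daily volume.
AVG_MEASUREMENTS_PER_SEC = DAILY_MEASUREMENTS / SECONDS_PER_DAY.to_f

# Average number of measurements carried by one message at peak.
MEASUREMENTS_PER_MESSAGE = PEAK_MEASUREMENTS_PER_SEC / PEAK_REQUESTS_PER_SEC.to_f

puts format("average: %.0f measurements/s", AVG_MEASUREMENTS_PER_SEC)
puts format("per Kafka message at peak: %.1f measurements", MEASUREMENTS_PER_MESSAGE)
# prints:
#   average: 3009 measurements/s
#   per Kafka message at peak: 5.0 measurements
```

Roughly five measurements per message is consistent with the application-level batching argument: each synchronous send already carries a small batch.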

We also tried running the client in buffered asynchronous mode. In this mode, the client had a p95 latency between 0.5 and 1 milliseconds. As expected, this mode has an extremely low latency, but we decided to deploy the non-buffered synchronous mode client to prevent any chance of data loss.

Key Learnings

There are many ways to produce to Kafka from Rails, and the best option for you depends on a number of factors. Some key factors are:

  • Kafka deployment: Is your Kafka cluster in-house or are you using a hosted solution?
  • Requirements: Do you plan to produce to Kafka, consume from Kafka, or both?
  • Latency constraints: Is the production to Kafka in a critical path of your application?
  • Data loss constraints: Is some amount of data loss acceptable in your application?

Acknowledgments

This project was a joint team effort, and would not have been possible without help and guidance from the Asset Tracking team, comprising Ali Syed, Brian Guzman, Eddy Kang, Gautam Das, and Valerie Aguilar. Acknowledgments would be incomplete without a huge shout-out to KeepTruckin’s Kafka Operations team, composed of Ricardo Guevara and Luis Zaldivar, who provided constant support during the rollout of this project.

Come join us!

Check out our latest KeepTruckin opportunities on our Careers page and visit our Before You Apply page to learn more about our rad engineering team.
