Delayed Kafka Producer

Vineet Narayan
Published in Airtel Digital
Nov 13, 2022
Kafka and Aerospike

We will focus on three things:-

  • Why we need such a mechanism
  • A solution to this problem
  • Other applications of this solution

Introduction

We have a message, but we don’t want to process it right now; we want to process it after some time. Other messaging queues support delayed messages, but Kafka does not natively support delaying a message.

Strategy

Apart from Kafka, we have a database and a cache layer in our system. We can leverage the functionality of a NoSQL database to store messages for a certain duration. Aerospike, Redis, Mongo, etc. provide the ability to store records with an expiry. We can then attach a change-notification listener, available for these databases, which pushes data to Kafka.

Solution

Prerequisites

This assumes you have basic knowledge of:

  • Kafka
  • Aerospike

We will create a Delay Service that holds all the brains for applying the delay and producing to the original topic. The delay service lets you postpone the delivery of new messages to consumers for a chosen duration: any message you send to the event_delayer_publisher topic remains invisible to consumers of the original topic for the duration of the delay period. Client integration is simply producing the same message to event_delayer_publisher alongside meta-info: the original topic name and the delay needed.

The high-level design looks like the following:-

Delay Service

Pre-processor Queue consumer

The queue will accept the message below and process it. The pre-processor validates the request and saves the message to Aerospike in the set kafka_message_cache. The id of the same message is saved to another Aerospike set, kafka_message_id, with an expiry calculated from the current time and postTime. The idea is to let the message expire in the kafka_message_id set and receive the resulting change notification in another consumer.

Kafka client producer:-
Topic:- event_delayer_publisher
Message:-
{
  "topic": "topic_name",       // original topic name
  "key": "key",                // key to be used when producing to the original topic after the delay --optional
  "value": {},                 // message for the original topic
  "headers": {},               // headers for the original topic --optional
  "postTime": 12312031310,     // epoch milli time at which this message needs to be produced
  "failurePostCallbackUrl": "" // POST webhook URL for sending the message back to the client if the system fails to produce/accept it --optional
}

Delay Service Aerospike:-

set1:- kafka_message_cache
doc: {
  id=string,
  topic=string,
  key=string,
  value=object,
  headers=Map,
  postTime=long,
  failurePostCallbackUrl=string,
  tries=int
}

set2:- kafka_message_id
doc: {
  id=string
}

Aerospike:-

Save the message in set1 (kafka_message_cache) without any expiry.
Save the id in set2 (kafka_message_id) with an expiry equal to the produce time.
Info:- expiry in Aerospike is in seconds, and the id is the same in both sets.
Change Notification:- enable change notification on set2.
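
A minimal sketch of these two writes, using the Aerospike Java client; the bin names follow the set1 schema above, and the namespace test matches the config later in this post:

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Bin;
import com.aerospike.client.Key;
import com.aerospike.client.policy.WritePolicy;

public class PreProcessor {
    private final AerospikeClient client = new AerospikeClient("127.0.0.1", 3000);

    public void save(String id, String topic, String valueJson, long postTimeMillis) {
        // set1: full message, never expires (-1).
        WritePolicy noExpiry = new WritePolicy();
        noExpiry.expiration = -1;
        client.put(noExpiry, new Key("test", "kafka_message_cache", id),
                new Bin("id", id),
                new Bin("topic", topic),
                new Bin("value", valueJson),
                new Bin("postTime", postTimeMillis));

        // set2: id only, expiring at postTime. Aerospike expiry is in
        // seconds from now, so convert the remaining delay.
        WritePolicy delayed = new WritePolicy();
        delayed.expiration = (int) ((postTimeMillis - System.currentTimeMillis()) / 1000);
        client.put(delayed, new Key("test", "kafka_message_id", id),
                new Bin("id", id));
    }
}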

Aerospike-Kafka-Outbound

We will use the Kafka outbound connector already distributed by Aerospike. It listens for change notifications on set2 and then produces a message to Kafka.

Post-processor Queue consumer

This queue accepts messages from the Kafka outbound connector. It receives both writes and deletes on set2. The consumer filters out writes and processes only delete notifications. It extracts the id from the expired record and uses it to fetch the full message from set1. The message is now ready to produce: the consumer reads the JSON, maps it to the Kafka value, headers and topic, and produces the message.
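
A minimal sketch of this consumer, assuming the flat-json format configured later in this post. The metadata field names msg and userKey are assumptions about the connector's payload (shipping the user key must be enabled for userKey to appear):

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Key;
import com.aerospike.client.Record;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PostProcessor {
    private final ObjectMapper mapper = new ObjectMapper();
    private final AerospikeClient aerospike = new AerospikeClient("127.0.0.1", 3000);
    private final KafkaConsumer<String, String> consumer;
    private final KafkaProducer<String, String> producer;

    PostProcessor(Properties consumerProps, Properties producerProps) {
        this.consumer = new KafkaConsumer<>(consumerProps);
        this.producer = new KafkaProducer<>(producerProps);
    }

    void run() throws Exception {
        consumer.subscribe(List.of("as_outbound_msgid_set"));
        while (true) {
            for (ConsumerRecord<String, String> rec : consumer.poll(Duration.ofMillis(500))) {
                JsonNode meta = mapper.readTree(rec.value()).path("metadata");
                // Keep only deletes, i.e. records expired by nsup; skip writes.
                if (!"delete".equals(meta.path("msg").asText())) continue;
                String id = meta.path("userKey").asText();
                // Fetch the full message from set1 using the same id.
                Record cached = aerospike.get(null, new Key("test", "kafka_message_cache", id));
                if (cached == null) continue; // already produced or rescued
                // Produce to the original topic with the original key and value.
                producer.send(new ProducerRecord<>(
                        cached.getString("topic"),
                        cached.getString("key"),
                        cached.getString("value")));
            }
        }
    }
}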

Failure Handler

Whenever retries on Aerospike saves or retries on Kafka production are exhausted, the message is sent back to the client over the failurePostCallbackUrl using a REST call.
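
A minimal sketch of the webhook call, using Java's built-in HTTP client:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class FailureHandler {
    private final HttpClient http = HttpClient.newHttpClient();

    // POST the original message back to the client's webhook once retries
    // on the Aerospike save or the Kafka produce are exhausted.
    public void sendBack(String failurePostCallbackUrl, String messageJson) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(failurePostCallbackUrl))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(messageJson))
                .build();
        HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
        // A non-2xx status could be logged for manual replay.
    }
}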

Rescue Scheduler

There is a chance that a message gets dropped somewhere during this whole process, so we need a short-polling and a long-polling scheduler to rescue those messages. To enable the schedulers, you need to create a secondary index on the field postTime. Short polling runs every 10 seconds and polls for messages that should have been produced between current-10s and current-2min. Long polling runs every 30 minutes and polls for messages that should have been produced 2 hours or more ago.
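
A sketch of the short-polling pass, assuming a secondary index already exists on the postTime bin of kafka_message_cache:

import com.aerospike.client.AerospikeClient;
import com.aerospike.client.Record;
import com.aerospike.client.query.Filter;
import com.aerospike.client.query.RecordSet;
import com.aerospike.client.query.Statement;

public class RescueScheduler {
    private final AerospikeClient client = new AerospikeClient("127.0.0.1", 3000);

    // Runs every 10 seconds: find messages whose postTime fell between
    // current-2min and current-10s but were never produced.
    public void shortPoll() {
        long now = System.currentTimeMillis();
        Statement stmt = new Statement();
        stmt.setNamespace("test");
        stmt.setSetName("kafka_message_cache");
        stmt.setFilter(Filter.range("postTime", now - 120_000, now - 10_000));
        RecordSet rs = client.query(null, stmt);
        try {
            while (rs.next()) {
                Record record = rs.getRecord();
                // Re-produce the message to its original topic and clean up
                // set1 (same logic as the post-processor, not repeated here).
            }
        } finally {
            rs.close();
        }
    }
}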

Delay Service Design

Local Deployment:-

I will be using Ubuntu 20 for demo purposes:-

Delay Service codebase

https://github.com/1vin33t1/java-utility

Kafka

https://kafka.apache.org/quickstart

Aerospike

Aerospike Feature file:-

You can request a single-server feature file from Aerospike using the link below: https://aerospike.com/get-started-aerospike-database/

Open the feature file and set asdb-change-notification to true:

asdb-change-notification         true

Aerospike installation process:-

https://aerospike.com/download/

# aerospike
wget -O aerospike.tgz 'https://www.aerospike.com/enterprise/download/server/latest/artifact/ubuntu20'
mkdir aerospike-server-enterprise
tar -xvf aerospike.tgz --directory aerospike-server-enterprise
cd aerospike-server-enterprise
chmod +x asinstall
chmod +x dep-check
sudo ./asinstall

Save your feature file at /etc/aerospike/features.conf

Aerospike Config file:- /etc/aerospike/aerospike.conf

# Aerospike database configuration file for use with systemd.

service {
    proto-fd-max 15000
    feature-key-file /etc/aerospike/features.conf
}

logging {
    file /var/log/aerospike/aerospike.log {
        context any info
    }
}

network {
    service {
        address any
        port 3000
    }

    heartbeat {
        mode multicast
        multicast-group 239.1.99.222
        port 9918

        # To use unicast-mesh heartbeats, remove the 3 lines above, and see
        # aerospike_mesh.conf for alternative.

        interval 150
        timeout 10
    }

    fabric {
        port 3001
    }

    info {
        port 3003
    }
}

namespace test {
    replication-factor 1
    memory-size 3G
    default-ttl 3600
    nsup-period 3
    nsup-threads 3

    storage-engine device {
        file /opt/aerospike/data/test.dat
        filesize 5G
        data-in-memory true
    }

    set kafka_processed_id {
        stop-writes-count 100000
    }
}

xdr {
    dc kafkaDC {
        connector true
        node-address-port 127.0.0.1 8085

        namespace test {
            ship-only-specified-sets true
            ship-set kafka_message_id
            ship-nsup-deletes true
        }
    }
}

Information regarding the nsup and xdr config:-

nsup-period 3 -> namespace supervisor periodic run time
nsup-threads 3 -> number of threads for the namespace supervisor
xdr -> cross-datacenter replication
node-address-port 127.0.0.1 8085 -> address of the Kafka outbound connector
namespace test -> the namespace you want to listen to
ship-only-specified-sets -> send change notifications only for the specified sets
ship-set -> send notifications for this set
ship-nsup-deletes -> also send notifications for records expired by nsup (the namespace supervisor)

# start aerospike 
sudo systemctl start aerospike;

Aerospike-Kafka Outbound

Installation:-

# aerospike-kafka-outbound
wget 'https://download.aerospike.com/artifacts/enterprise/aerospike-kafka-outbound/5.0.1/aerospike-kafka-outbound-5.0.1.all.deb';
sudo dpkg -i aerospike-kafka-outbound-5.0.1.all.deb;
sudo systemctl enable aerospike-kafka-outbound;
sudo systemctl start aerospike-kafka-outbound;

Kafka outbound config:- /etc/aerospike-kafka-outbound/aerospike-kafka-outbound.yml

# Change the configuration for your use case.
#
# Refer to https://docs.aerospike.com/connect/kafka/from-asdb/configuring for details.

# The connector's listening ports, TLS and network interface.
service:
  port: 8085

# Format of the Kafka destination message.
format:
  mode: flat-json
  metadata-key: metadata

# Aerospike record routing to a Kafka destination.
routing:
  mode: static
  destination: as_outbound_default_event

namespaces:
  test:
    format:
      mode: flat-json
      metadata-key: metadata
    sets:
      kafka_message_id:
        routing:
          mode: static
          destination: as_outbound_msgid_set

# Kafka producer initialization properties.
producer-props:
  bootstrap.servers:
    - 127.0.0.1:9092
  acks: all
  retries: 2

# The logging properties.
logging:
  file: /var/log/aerospike-kafka-outbound/aerospike-kafka-outbound.log

Setup Video

https://youtu.be/TpW5g2VAO5g

Pros & Cons

Pros:-

  • A single delay service can serve multiple Kafka clusters
  • No need for a separate topic per delay
  • Dynamic delay for each message
  • Simple and robust client integration
  • Exposes a REST API for saving messages, which can double as a simple fallback during Kafka downtime
  • Delay can be up to 5 years

Cons:-

  • Delays are not precise; they have an error of 1–3 seconds, and sometimes up to 10 seconds when a message is rescued by the scheduler.
  • Aerospike space is limited, though this can be mitigated using hybrid memory.
  • Aerospike is a single point of failure, but it is resilient against network partitions, and the service reports failures back to the client.
  • Messages are only virtually queued, and FIFO ordering is not guaranteed.

Production Experience in Airtel:-

We had to migrate from RabbitMQ to Kafka, and one of the requirements during the migration was an in-house solution for delayed messages in Kafka built from already available tools. I have been using the above design for delayed production of messages at Airtel for 2 years. We produce around 10 million records per day through this service. As stated above, message production is not precise, and there is an error of 1–3 seconds. In a week, we see around 100 messages rescued by the scheduler, mainly during high TPS. This service has saved us many times when one of the Kafka brokers was down, since it doubles as a fallback to retry those messages: we have injected the delay service's REST API into the Kafka failure callback so failed messages are retried through the delay service. We have used it for retry mechanisms (e.g. enquiry, fulfilment), subscriptions, snooze notifications and a simple Kafka fallback.
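
A sketch of that failure-callback injection; the delay service REST endpoint and the 30-second retry delay here are made up for illustration:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FallbackProducer {
    private final HttpClient http = HttpClient.newHttpClient();

    public void sendWithFallback(KafkaProducer<String, String> producer,
                                 ProducerRecord<String, String> record) {
        producer.send(record, (metadata, exception) -> {
            if (exception == null) return; // produced fine, nothing to do
            // Broker unreachable: hand the message to the delay service so it
            // is retried after 30 seconds (endpoint is hypothetical).
            String body = """
                    {"topic":"%s","value":%s,"postTime":%d}"""
                    .formatted(record.topic(), record.value(),
                            System.currentTimeMillis() + 30_000);
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("http://delay-service/api/message"))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(body))
                    .build();
            http.sendAsync(request, HttpResponse.BodyHandlers.ofString());
        });
    }
}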

How to Integrate your Kafka with the Delay Queue in Airtel:-

  • You need to raise a request for consumer and topic creation in your cluster for the event_delayer_publisher topic.
  • After the consumer and topic are created, you just need to send your message into event_delayer_publisher as per the contract below (a producer sketch follows this list). You have then successfully integrated with the Delay Queue.
Kafka client producer:-
Topic:- event_delayer_publisher
Message:-
{
  "topic": "topic_name",       // original topic name where the message needs to be produced
  "key": "key",                // key to be used when producing to the original topic after the delay --optional
  "value": {},                 // message for the original topic
  "headers": {},               // headers for the original topic --optional
  "postTime": 12312031310,     // epoch milli time at which this message needs to be produced
  "failurePostCallbackUrl": "" // POST webhook URL for sending the message back to the client if the system fails to produce/accept it --optional
}
  • Kibana URLs for both Pre-Prod and Prod are available.
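
A minimal client-side producer sketch for this contract; the order_events topic and payload are made up for illustration:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class DelayedSendExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Ask the delay service to produce to order_events in 60 seconds.
            String message = """
                    {
                      "topic": "order_events",
                      "value": {"orderId": "123"},
                      "postTime": %d
                    }""".formatted(System.currentTimeMillis() + 60_000);
            producer.send(new ProducerRecord<>("event_delayer_publisher", message));
        }
    }
}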
