Payment Mode Health Check Re-arch using Kafka Streams

Vikrant Chaudhary
Airtel Digital
Published in
4 min readSep 27, 2023

--

Introduction

At Airtel, we need to determine the real-time health of various banking institutions, including banks such as SBI, HDFC, ICICI, as well as payment platforms like Google Pay, PhonePe for UPI, and PayTM, and Amazon for wallets. These services are available across different payment modes in the payment ecosystem, including Net Banking, Cards, UPI, wallets, and BNPL.

Occasionally, these banks experience downtime in their production systems. To enhance customer experience and maintain a high success rate, we analyse real-time transaction data to assess the performance of each bank and subsequently determine its health. Based on this assessment, we enable or disable the bank on the UI.

Problem statement

The previous health system relied on payment state data from the database to assess the success rates of various banking instruments, including net banking, debit cards, and credit cards.

Problems with old solution

  • Influx scheduler queries the transactional database to aggregate data at the minute level and runs at a fixed interval of 15 minutes and is owned by the DevOps team.
  • Health service queries the influx database at an interval of 5 minutes, to fetch data for the last 30 minutes.
  • The floating window of 2 different schedulers lead to a loss in accuracy and we were not able to capture the Near Real-Time(NRT) success rate of the bank.

Proposed solution

  • Replacing Influx schedulers to aggregate data by Kafka Streams.
  • Push transaction data to Kafka Queue in parallel to the database.
  • KTable can be used to aggregate data by minute and reuse the logic for calculating success rate.

State flow diagram

KAFKA STREAMS IMPLEMENTATION

public void buildStreams(StreamsBuilder streamsBuilder) {
streamsBuilder.stream(kafkaProps.getPaymentTopic(), Consumed.with(Serdes.String(),
paymentQuickSilverEventSerde).withTimestampExtractor((consumerRecord, partitionTime) -> {
try {
return ((PaymentQuickSilverEvent) consumerRecord.value()).getUpdatedAt();
} catch (Exception e) {
log.error("Input record {} will be dropped because it has an invalid timestamp. exception {} ",
consumerRecord, e);
return partitionTime;
}
}))
.filter((k, v) -> isValidEvent(v))
.groupBy((k, v) -> STREAM_EVENT_KEY, Grouped.with(Serdes.String(), paymentQuickSilverEventSerde))
.windowedBy(TimeWindows.of(Duration.ofMinutes(kafkaProps.getAggregationTimeInMinutes())).grace(Duration.ZERO))
.aggregate(KStreamHealthResponse::new, (k, tr, kStreamHealthResponse) -> {
aggregateAndGroupPaymentEvent(tr, kStreamHealthResponse);
return kStreamHealthResponse;
}, materializedAsWindowStore(kafkaProps.getKafkaStreamAggregationStoreName(), Serdes.String(), kstreamResponseSerde))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()).withName(kafkaProps.getKafkaStreamAggregationStoreName()))
.toStream()
.foreach((key, v) -> {
addWeightedMeanHealthFromTimeBasedHealth(v);
v.setStart(HelperUtils.formatDateMilliSeconds(key.window().startTime()));
v.setEnd(HelperUtils.formatDateMilliSeconds(key.window().endTime()));
Span span = tracer.buildSpan(PaymentEventKStream.class.getName()).start();
Scope scope = tracer.activateSpan(span);
try {
kStreamService.updateDynamicRoutingAndHealthStatusPgWiseAndHealthStatusMaster(v);
} catch (Exception e) {
log.error("exception occured while processing kstream data", e);
}
scope.close();
span.finish();
});
}

Kafka Streams Brief :

Apache Kafka ships with Kafka Streams, a powerful yet lightweight client library for Java and Scala to implement highly scalable and elastic applications and microservices that process and analyse data stored in Kafka. A Kafka Streams application can perform stateless operations like maps and filters as well as stateful operations like windowed joins and aggregations on incoming data records.

How Kafka Streams fits in our use-case:

We had been sending payment events to Kafka. To calculate the success rate (SR), we performed aggregations on payment transactions using specific fields and time. Kafka Streams provided built-in support for this.

Code deep dive

KStream

streamsBuilder.stream(kafkaProps.getPaymentTopic(), Consumed.with(Serdes.String(),
paymentQuickSilverEventSerde).withTimestampExtractor((consumerRecord, partitionTime) -> {
try {
return ((PaymentQuickSilverEvent) consumerRecord.value()).getUpdatedAt();
} catch (Exception e) {
log.error("Input record {} will be dropped because it has an invalid timestamp. exception {} ",
consumerRecord, e);
return partitionTime;
}
}))

Above block creates a KStream on given topic, we have defined our custom TimestampExtractor as we needed windows based on updatedAt field of payment event.

Filter

.filter((k, v) -> isValidEvent(v))

Filtering records as we need to process only Success and Failed status.

Grouping

.groupBy((k, v) -> STREAM_EVENT_KEY, Grouped.with(Serdes.String(), 
paymentQuickSilverEventSerde))

Grouping, it groups all records that have the same key to ensure that data is properly partitioned (“keyed”) for subsequent operations.

Windowing

windowedBy(TimeWindows.of(Duration.ofMinutes(
kafkaProps.getAggregationTimeInMinutes())).grace(Duration.ZERO))

Windowing lets you control how to group records that have the same key for stateful operations such as aggregations or joins into so-called windows. Available window types are
Tumbling time window
Hopping time window
Sliding time window

Aggregating

.aggregate(KStreamHealthResponse::new, (k, tr, kStreamHealthResponse) -> {
aggregateAndGroupPaymentEvent(tr, kStreamHealthResponse);
return kStreamHealthResponse;
}, materializedAsWindowStore(kafkaProps.getKafkaStreamAggregationStoreName(),
Serdes.String(), kstreamResponseSerde))

After records are grouped by key via groupByKey or groupBy, they can be aggregated via an operation such as reduce. Aggregations are key-based operations, which means that they always operate over records of the same key. In aggregating operations, a windowing state store is used to collect the latest aggregation results per window.

Suppressing

suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()).
withName(kafkaProps.getKafkaStreamAggregationStoreName()))

Suppressing the intermediate results, emitting the final count for each user when the window is closed.

Acknowledgements

I extend my gratitude to every member of the Airtel payments team who has played a role in this transition.

--

--