Spring Kafka Retryable Topic

Mehtap Tamtürk Kaba
Doğuş Technology
Published in
3 min readJan 2, 2024

Kafka’nın güçlü özelliklerinden biri, her partition için Consumer’a ait bir offset değerinin olmasıdır. Bu offset değeri, Consumer’ın okuyacağı, sıradaki mesajın değeridir. Bu değerin arttırımı otomatik yapılabileceği gibi, manual olarak da yönetilebilmektedir. Hata nedeniyle bir mesajı işleyemeyip, tekrar denemek istediğimiz durumlar için manual yönetmeyi tercih edip, başarılı durumda offset’i arttırmayı tercih edebiliriz. Fakat bu durum, sıradaki mesajların işlenmesini bir süreliğine bloklayacaktır. Bunun yerine asenkron yöntemler tercih edebiliriz.

Neden İhtiyaç Duyarız?

Hata durumunda, sıradaki mesajların işlenmesini durdurmak yerine; hata alan mesajı farklı bir topic’e atıp tekrar işlenmesini sağlayabiliriz.

Kafka mesajlarının işlenmesi sırasında hata alınması durumunda, mesajları belli aralıklarla ve belirli sayıda tekrar işlemesi için RetryableTopic annotation’ı kullanılabilir. Deneme sayısı tamamlandığında ise hata alınmaya devam ediliyorsa, mesaj DLT kuyruğuna atılır.

Nasıl Kullanılır?

Size en uygun ayarları yapabilmeniz için öncelikle RetryableTopic annotation’ının alabildiği bazı değerlerin üzerinden geçelim:

attempts: Mesajın kaç kere işlenmeye çalışacağının sayıdır. Öntanımlı değeri 3'tür. Tüm denemeler tamamlandığında hata alınmaya devam ediyorsa, mesaj DLT kuyruğuna atılır.

backoff: Mesajların işlenme aralığını belirlemek için kullanılır. Backoff sınıfından bir değer alır. Backoff için ayrıntılı örnekleri aşağıda bulabilirsiniz.

exclude / excludeNames: Belirtilen exception class’larını hariç tutmanızı sağlar. Listeye eklediğiniz hatalardan herhangi biri fırlatıldığında retry mekanizması devreye girmeyecektir.

include / includeNames: Sadece belirtilen exception’lar fırlatıldığında retry mekanizması devreye girer.

kafkaTemplate: Mevcut kafkaTemplate bean’inizin ismini verebileceğiniz gibi, retry’a özel Kafka template için farklı bir bean de tanımlayabilirsiniz.

autoCreateTopics: Retry ve DLT topic’lerinin otomatik oluşturulup oluşturulmayacağını belirler.

retryTopicSuffix / dltTopicSuffix: Otomatik oluşturulan topic’lerin sonuna eklenecek ekleri belirlemek için kullanılır.

dltStrategy: Eğer DLT’ye ihtiyaç duymuyorsanız NO_DLT olarak tanımlayabilirsiniz.

sameIntervalTopicReuseStrategy (3.0.4 ve sonrası) / fixedDelayTopicStrategy (3.0.4 öncesi): Oluşturulacak retry topic stratejisini belirlemek için kullanılır. Bir (SINGLE_TOPIC) ya da attempts değeri kadar (MULTIPLE_TOPICS) retry topic oluşturulur.

Backoff için örnekler:

  • Sabit artış değerine sahip
Backoff(delay = 600000) // 10dk’da bir
  • Katlanarak artan değere sahip
Backoff(delay = 60000, multiplier = 2) // 1, 2, 4, 8... dk sonra tekrarlanır.
  • Değerleri placeholder ile tanımlamak için
Backoff(delayExpression = “${delay}”, multiplierExpression = “${multiplier}”)

@RetryableTopic örneği:

 @RetryableTopic(
backoff = @Backoff(delay = 300000),
attempts = 12,
sameIntervalTopicReuseStrategy =
SameIntervalTopicReuseStrategy.SINGLE_TOPIC,
kafkaTemplate = “kafkaRetryableTopicTemplate”,
exclude = { SerializationException.class,
DeserializationException.class,
NullPointerException.class
}
)
@KafkaListener(topics = “my-topic”)
public void processMessage(RetryableDto retryableDto) {
log.info(“Retrying process RetryableDto : {}”, retryableDto);
// process message
}

Yukarıdaki örnekte 5 dk’da bir toplamda 12 defa, yani 1 saat boyunda mesaj yeniden işlenecektir. Eğer herhangi bir denemede hatasız işlenirse, deneme sonlanacaktır.

SINGLE_TOPIC tanımlandığı için retry için tek topic oluşturulacaktır. Eğer bu tanım yapılmasaydı 12 adet retry topic oluşturulacaktı.

exclude’da tanımlanan hatalardan herhangi biri fırlatılırsa, tekrarlama işlemi yapılmayacaktır.

İsterseniz kendinize özgü bir RetryableException yazıp, include’a bu değeri tanımlayarak, sadece bu hata fırlatıldığında tekrar denenmesini sağlayabilirsiniz.

DLT Kuyruğundakileri İşleme

Tanımlanan sayıda deneme tamamlanıp, hata alınmaya devam edildiği durumda mesaj DLT kuyruğuna atılır. Bu mesajları işlemek isterseniz DltHandler annotation’ı kullanabilirsiniz.

Örnek kullanım:

 @DltHandler
public void handleDltMessage(RetryableDto retryableDto) {
log.error(“DLT handler message: {}”, retryableDto);
}

Dikkat Edilmesi Gerekenler

RetryableTopic kullanımının asenkron işleme avantajı bize performans kazanımı sağladığı gibi, bu kullanımın bazı dezavantajları da mevcuttur.

RetryableTopic kullanımı mesajların işlenme sırasını bozulabilmektedir. Bu durumu bir örnekle açılayalım: t anında ana topic işlenirken bir mesajımız hata alsın ve retry topic’ine atılsın. t + 1 anında ana topic’e başka bir mesaj gelip başarıyla işlensin. t + 2 anında retry topic’teki mesajımız başarıyla işlensin. Bu durumda ilk gelen mesaj ikinci mesajdan sonra işlenmiş olur. Sizin için sıralama önemliyse, mesaj işleme esnasında gerekli konrollerinizi yapmanızı öneririm.

Diğer bir dezavantaj ise, mesajların çift işlenme riskidir. Bu ihtimali göz önünde bulundurarak geliştirmelerinizi yapabilirsiniz.

Kaynaklar

https://www.baeldung.com/spring-retry-kafka-consumer

--

--