Kafka is a really cool and powerful platform, and since we have many amazing software engineers at Wirecard Brazil, I want to talk about how we use Kafka here, the problems we aimed to solve and the issues we had during development.
This post is not a tutorial: I'm not going to show step by step how to implement it. Instead, you will find a real case of Kafka in use: why we chose to implement it, how we did it, what problems we faced along the way, what we face now and so on!
Without further ado, let's get started!
We were having issues while syncing our data with Elasticsearch, and fixing that was our main motivation. We used to have a single Elasticsearch cluster for all of the data from all of our APIs; while that is really bad, it came in quite handy in the beginning.
We have an application called reports-api that transforms the URL query into Elasticsearch params and searches for documents in this single Elasticsearch cluster. Basically, given a URL whose query filters by status, paymentMethod and value, the reports-api would translate it into the following searchable params:
- the status must be either PAID or WAITING
- the paymentMethod should be either CREDIT_CARD or BOLETO
- the value must be between 5000 and 10000.
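To make that translation concrete, here is a minimal sketch in Python. It is not the actual reports-api code: the URL shape (comma-separated values, a min-max range), the example host and the function name build_search_body are all assumptions for illustration. The output uses the standard Elasticsearch bool query with terms and range filters.

```python
from urllib.parse import urlparse, parse_qs


def build_search_body(url):
    """Translate a report URL (hypothetical format) into an Elasticsearch query body."""
    params = parse_qs(urlparse(url).query)
    filters = []
    # Comma-separated values become a "terms" filter: the field must match any of them.
    for field in ("status", "paymentMethod"):
        if field in params:
            values = params[field][0].split(",")
            filters.append({"terms": {field: values}})
    # A "min-max" pair becomes a "range" filter with inclusive bounds.
    if "value" in params:
        low, high = params["value"][0].split("-")
        filters.append({"range": {"value": {"gte": int(low), "lte": int(high)}}})
    return {"query": {"bool": {"filter": filters}}}


# Hypothetical URL; the real reports-api URL format may differ.
url = ("https://example.test/reports"
       "?status=PAID,WAITING&paymentMethod=CREDIT_CARD,BOLETO&value=5000-10000")
body = build_search_body(url)
```

The resulting body could then be sent to the search endpoint of the cluster; each filter corresponds to one of the bullets above.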
While the reports-api does the search, it still needs this data to be available. The reports-sync was the application responsible for getting all the data from every application's data store and inserting it into Elasticsearch. Let's take a look at the problems.
The reports-sync retrieved data from the database of every API application. For example, we have an application called invoice-api: the reports-sync needed to fetch its data from the invoice-api database with a predetermined query and then insert it into Elasticsearch. Moreover, it also needed to transform all the raw data retrieved from the database into business data, aka a JSON document. We also had another problem: we were retrieving the data from the database in chunks, which eventually led to missing data.
Now, let's understand those problems. To begin with, the reports-sync had knowledge of some business rules that it shouldn't have: it knew how to transform raw data into business data, meaning that it would get the data from the database and transform it into JSON. Not only did it know what it shouldn't, we also had a problem when changing the JSON: the change had to be made in both the invoice-api and the reports-sync, as both of them needed to know how to provide the same JSON resource. This "Swiss Army Knife" nature made the reports-api an application with no real owner. Its governance was a mess, no team was really responsible for its operation, and there was no well-defined domain.
Alas, another issue we have with the invoice is how dependent it is on another service, the order. I'm not going through the details, but basically the order holds all the payment information, so whenever we want to retrieve an invoice we might need to retrieve the payment as well, which adds a dependency on another application.
Finally, there is the problem of how we persisted the data in Elasticsearch. The fact that the reports-sync was retrieving data through a select was causing inconsistencies. We had a really problematic solution to sync this data: our query had to identify the offset of the current sync, so whenever we had any problem in the Elasticsearch cluster we had a tough time manually identifying the point at which we should start the re-sync. Time and time again we were missing some status updates, and sometimes an entire resource. Not to mention that any downtime in the reports-api or the reports-sync would affect all APIs at the same time!
In any case, while it worked quite well in the beginning, as we started to have more and more services producing more and more data that needed to be available in Elasticsearch, we started to wonder if that was the best approach.
First solution, before we knew Kafka:
Before we even thought about Kafka, we decided that our solution was not at its best, so we started to work on new architectures. The first thing we did with the invoice-api was to remove it from the reports environment.
We created a separate Elasticsearch cluster, and the invoice-api became responsible for providing the data to it; the invoice-api also held the rules to transform any query param and fetch data from Elasticsearch. When an invoice was created, since the invoice-api already had the JSON response to the POST method, we also inserted it into Elasticsearch; we didn't need to replicate the logic that transforms raw data into business data. We had a PUT method as well: an internal endpoint that the invoice-sync application would call, after listening to a specific RabbitMQ queue, to update the invoice document with the corresponding status and then insert the data into Elasticsearch as well.
It was all about isolating the ownership of the invoice: now we had a well-defined scope and domain. The team responsible for the invoice had all the knowledge they needed to manage the whole process.
The invoice resource was no longer dependent on the reports-api, so if it was down, the invoice resource was not affected. We were also not replicating the logic of creating an invoice anywhere, and we had a separate cluster for its Elasticsearch, so if we wanted to update the Elasticsearch version, we would not affect other resources!
Again, while it worked, we still weren't very happy with this solution. The invoice-api now had too many responsibilities: it was responsible for creating the invoices, inserting them into Elasticsearch and searching them; we might as well have added the invoice-sync responsibility to it!
We discovered Kafka! We were really excited about implementing it. The idea was simple: we would add a Kafka Connect to get data from the database and then insert this data into Elasticsearch! Simple as that!
Well, if only it were as simple as it seems… I would like to add a disclaimer:
It was really tough to make Kafka work. While it was really exciting, it was not, by any means, easy. It was harsh, took a long time, and not everyone understands every step of it, even today.
We have an infrastructure team that was operating it at the beginning and, like most people two years ago, we had no experience with it.
I'm not going to go too deep into the Kafka configuration, but keep in mind that we did some tuning and that each one of our environments has 3 Kafka brokers and 3 ZooKeeper nodes.
We started with a single Kafka Connect to rule all connectors across all applications. It didn't take long for us to realize that we were back to the reports-api problem: we should be creating separate Kafka Connect instances, one for each application. So we did it: every application had its own Kafka Connect, for every environment, in a separate container.
Our first idea was to implement a CDC (change data capture) pattern, retrieving the data from the database. To do so we needed to create a connector, and we decided to use Debezium as our source connector to read all data from the database binlog and insert it into the Kafka Connect's topic. The best table we could listen to for inserts and updates was a table that holds the history of invoice statuses, so any time there was an update and/or insert in this table, the data would reach the source connector. Our application would never update this table; keep that in mind, it will come back later.
The invoice-sync also had a few changes: it now has a Kafka Streams client. The source connector produces events, those events are transformed into JSON resources by the invoice-sync and then produced to the sink connector. Let's break this down:
Transforming the raw data into a JSON resource has a problem: remember when I said that an invoice depends on an order? Well, to create a JSON resource, the invoice-sync might still need to request data from the order-api, retrieving the payment information to build an invoice resource.
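The transformation step can be pictured with a small Python sketch. This is not our invoice-sync code: the column names (invoice_id, order_id, status, updated_at), the resource fields and the fetch_payment callable standing in for the order-api request are all hypothetical. The only part taken from Debezium itself is the change event envelope, which carries the operation in "op" and the row state in "before" and "after".

```python
def to_invoice_resource(change_event, fetch_payment):
    """Build an invoice JSON resource from a Debezium-style change event.

    fetch_payment is a hypothetical callable standing in for the order-api request.
    """
    op = change_event["op"]        # "c" = create, "u" = update, "d" = delete, "r" = snapshot read
    row = change_event["after"]    # row state after the change; None for deletes
    if op == "d" or row is None:
        return None                # nothing to index for a deleted row
    resource = {
        "id": row["invoice_id"],
        "status": row["status"],
        "updatedAt": row["updated_at"],
    }
    # The invoice depends on the order: enrich it with the payment information.
    payment = fetch_payment(row["order_id"])
    if payment is not None:
        resource["payment"] = payment
    return resource


def fake_fetch_payment(order_id):
    # Stand-in for a call to the order-api (assumption for this sketch).
    return {"orderId": order_id, "method": "BOLETO"}


event = {
    "op": "c",
    "before": None,
    "after": {"invoice_id": 42, "order_id": 7, "status": "PAID",
              "updated_at": "2019-01-01 10:00:00"},
}
resource = to_invoice_resource(event, fake_fetch_payment)
```

Passing the order lookup in as a function keeps the dependency on the order-api explicit, which is exactly the coupling discussed above.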
With the JSON resource in hand, we send it to a topic in a different Kafka Connect: the sink connector. Note that before we send the JSON resource to this connect, we have a schema registry with Avro schemas to ensure that it follows the expected structure.
And that's it! Once we send the data to the sink connector, it will deliver it to Elasticsearch and we're good to go!
Now that I've explained how we implemented this solution and why, let's understand what is going on now.
Firstly, we now have a really stable and robust environment. About twenty projects are already using Kafka Streams at Wirecard Brazil. Not all of them use it to produce documents to Elasticsearch; in fact, most of them are not projects whose purpose is to sync data to Elasticsearch. We have projects that monitor the Kafka streams and some applications that listen to certain streams to trigger certain actions, but that is a story for another time.
Getting back to the Kafka architecture we discussed here, all applications that started using it became considerably more stable, and we considerably reduced the number of tickets opened because of missing or out-of-date resources. The invoice-api had a memory leak that every now and then would shut down the application; once we removed most of the costly calls, the application hardly had any problems at all, and later we focused on fixing the leak itself.
While it is true that the invoice-api uses Kafka exclusively to produce documents to its Elasticsearch, we're already producing invoice events, so any application that wants to work with those resources is able to get them without requesting data from the invoice-api, thus not overloading it.
Of course, it's not all roses: during the implementation of this architecture we also had a few problems, and I would like to share them as well.
Pay attention to Kafka connect variables
It's easy to mess this up. The connect variables are important, and if you forget something or set a wrong value, you can end up with nothing working. We had some problems with that in the beginning, so take your time and make sure the configuration matches your needs. At first, for example, we were creating a thousand partitions for some Kafka topics.
Loading retroactive data
One of the problems we had during development was retrieving all the retroactive data. While all new invoices were being added flawlessly, we still needed to load the old data.
At first we created a script to simulate the binlog insertion: it would select chunks of data from the database and fake the insert. It would eventually work, but the process ended up being overwhelmingly slow; it would take several hours to load all the data into the binlog and the source connector's topic. We also faced a concurrency problem: the binlog faker script could retrieve old data from an invoice that had just been updated and override the actual status with an old one. Given both problems, we dropped it and started thinking about a new solution.
With a touch of magic we thought of a better solution: clean, fast, simple. Remember when I asked you to keep in mind that our application never updates a certain table? Well, we realised that this table has an updated_at column, so we simply updated it. First we updated all rows, subtracting 1 second, then we added the second back. As a result, Debezium immediately picked up all this data and sent it to the database connect's topic in a matter of seconds.
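The two-step update can be sketched as follows. This is only an illustration using an in-memory SQLite database so that it runs anywhere; our real database is the one Debezium reads the binlog from, and the table name invoice_status_history and its columns other than updated_at are assumptions. The point is that every row is genuinely modified twice and ends up with its original value, which gives a CDC connector a change to capture for each row.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE invoice_status_history ("
    "id INTEGER PRIMARY KEY, status TEXT, updated_at TEXT)"
)
conn.executemany(
    "INSERT INTO invoice_status_history (id, status, updated_at) VALUES (?, ?, ?)",
    [(1, "PAID", "2019-01-01 10:00:00"), (2, "WAITING", "2019-01-02 11:30:00")],
)
before = conn.execute(
    "SELECT id, updated_at FROM invoice_status_history ORDER BY id"
).fetchall()

# Step 1: subtract one second from every row, so each row really changes.
conn.execute(
    "UPDATE invoice_status_history SET updated_at = datetime(updated_at, '-1 second')"
)
# Step 2: add the second back, restoring the original timestamps.
cursor = conn.execute(
    "UPDATE invoice_status_history SET updated_at = datetime(updated_at, '+1 second')"
)
touched = cursor.rowcount  # number of rows modified by the second update
conn.commit()

after = conn.execute(
    "SELECT id, updated_at FROM invoice_status_history ORDER BY id"
).fetchall()
```

After both statements the data is unchanged, but every row has gone through two real updates, which is what made the old invoices flow through the pipeline.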
Avro and badly parsed resources
There is not much to say about this one: if you have a badly parsed object, you will receive an exception and the Kafka Connect will stop syncing. We had this problem a few times during early development, but since we were not yet using the new Elasticsearch index at that point, we could simply remove everything from the Kafka connector, fix what we did wrong and send all the data again.
We also had this issue a few times in production, and that is where a few problems lie. For some reason, wrong data had been inserted into the database, so the produced data was wrong as well; even if we fixed the database, the data had already been dispatched by Debezium. There are a few ways to solve it; we solved it by removing all data from the database connect and then sending the last 7 days again. Note that we only went with this approach because we had a retention time of 7 days and we were in a scenario where we knew the outcome for sure.
Avro is important: since you can receive data from multiple projects written in different languages, it ensures that you will only produce events with the desired structure.
A Kafka connect for each application
Initially we were using a single Kafka Connect per environment, and it turned out to be really problematic. I would suggest having a Kafka Connect per application per environment; with that you will avoid some of the problems we had. When we first started with a single Kafka Connect, we had prepared the invoice-sync to work with Avro; however, the Elasticsearch connect was serializing non-Avro resources, and we couldn't change it because one of the applications that was also using this Kafka Connect was not ready to work with Avro.
Working with one Kafka Connect per application per environment does increase the complexity of working with it, a bit more so since we use AWS ECS for them (as for almost all of our services): when we want to work directly with a connector, it's a bit tedious to log in to the machine, then into the container, and only then be able to work with it. But it's worth it.
Make sure you have your retention properly configured
Remind yourself that persistence is important. We had a project that was consuming a stream of events from Kafka, and recently we had an issue with one of the Kafka connectors: it was no longer able to produce the data properly for Elasticsearch. Though we solved the issue, we noticed only later that we had the default seven-day retention; because of that we were unable to retrieve the lost data easily and had to resend all of it to the Kafka connector.
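As a small illustration, retention for a topic is expressed in milliseconds through the retention.ms topic setting, and the seven-day default comes from the broker default of 168 hours. The sketch below only does the arithmetic in Python; the 30-day value is an arbitrary example and not a recommendation or the value we use.

```python
from datetime import timedelta


def retention_ms(days):
    """Convert a retention period in days to the milliseconds Kafka expects."""
    return int(timedelta(days=days).total_seconds() * 1000)


# The default retention of seven days, expressed in milliseconds.
default_retention = retention_ms(7)

# Hypothetical override for a topic that must keep events longer (30 days is an assumption).
topic_config = {"retention.ms": str(retention_ms(30))}
```

Whatever value you choose, pick it knowing how long a broken consumer or connector could go unnoticed.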
Every now and then we need to restart the Kafka Manager container to make it work. We're still figuring out how to make better use of it, or how to fix it.
Complexity and learning curve
I want to stress this again. Kafka is a powerful tool, but it does considerably increase the complexity of your working environment. It is not simple to understand how to work with it, and even duplicating it in a different environment is somewhat complex. Expect to spend some time studying and learning, especially for people who did not participate in its implementation. Consider giving a talk to your team showing how it works.
Before anything, ask yourself:
Do we need to work with Kafka streams or are we using it only for the sake of hype?
While I'm all for adopting new technologies and working with every cool new framework, understand that sometimes you might be over-engineering.
Where are we heading?
We know we have problems to solve. To begin with, we have a dependency on the order-api, and while we can't remove the entire dependency, we could solve part of it by sending only the order id to the invoice Elasticsearch and then, upon retrieval, merging it with the document from the order's Elasticsearch.
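A rough sketch of that merge-on-retrieval idea, in Python, could look like this. Nothing here exists yet: the field names (orderId, payment) and the dictionary standing in for a lookup in the order's Elasticsearch are assumptions for illustration only.

```python
def merge_invoice_with_order(invoice_doc, order_docs_by_id):
    """Attach payment data from the order document to an invoice document.

    order_docs_by_id is a hypothetical stand-in for a lookup in the order's index.
    """
    merged = dict(invoice_doc)  # copy, so the stored invoice document is not mutated
    order = order_docs_by_id.get(invoice_doc.get("orderId"))
    if order is not None:
        merged["payment"] = order.get("payment")
    return merged


invoice = {"id": 42, "status": "PAID", "orderId": 7}
orders = {7: {"id": 7, "payment": {"method": "CREDIT_CARD"}}}
merged = merge_invoice_with_order(invoice, orders)
```

With this shape the invoice index only stores the order id, and the payment details are always read from their owner at query time.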
We could add the logic of creating an invoice from the raw data to the invoice-sync, but then we're back to the issue of multiple implementations of the same rule; we're still thinking about the best approach for that. Maybe we should work on an asynchronous approach, where the invoice-sync would produce events of non-transformed invoices and the invoice-api would consume them and produce the transformed invoice. With that approach we would still depend on the invoice-api, but we would not have an API request.
We have some applications that still use the reports-api; for those we've created an application that is responsible for managing the connectors of all of them. While it's quite handy and a really cool application, ideally we should follow the pattern of separate Kafka Connect instances and Elasticsearch clusters for each application.
Some of the projects do not use Avro, and some applications still do not use Kafka at all; since Kafka is trending here, we still have a few projects to update.
We should aim for a more automated approach, which would certainly reduce the boring manual tasks that more connect instances brought. Ideally, infrastructure as code, immutable. Our whole environment should be created by Ansible and/or Terraform scripts, with no human interaction needed.
There's also the fact that Kafka has released a new version and our Kafka versions are a bit old; we should upgrade them and enjoy all the new stuff.
A Cloud-Native Experience for Kafka in has just been released; we are reading it, and maybe it can be our next approach.
I guess that's all for now. If you have any questions, please let me know!
Stay awesome :)