Publishing Domain Events from Application Logs using Fluent Bit Kafka Output

Anil Dogan
Published in Trendyol Tech
8 min read · Jan 24, 2022

In the current environment, using Microservices is close to a must for a company that wants its application to scale, be robust, and be easy to change. They give companies the confidence to keep improving their applications over time. However, a great Microservice architecture comes with great problems for developers to solve.

One of the crucial requirements for Microservices is that they communicate with each other without affecting the main purpose of the source service. It is not the source service's job to notify other services of its updates: doing so adds a new dependency to a transaction and affects the main business of that source service.

As the Trendyol Marketplace team, we encountered a problem similar to the one explained above. We have one source service that handles user authentication, authorization, etc.

Let's call that Auth Service.

Then we needed a new service that keeps track of user sessions: the user login date, the user logout date, and information such as IP, user agent, etc.

Let's call that Session Service.

The Session Service needs the information that is coming from Auth Service, such as:

  • User login time
  • User session extension time
  • User logout time

For Auth Service to provide this information, we would have to add a new dependency that publishes these events inside transactions such as user login and user logout. However, we don't want to add another dependency to those transactions because they are among the most crucial operations of the Trendyol Marketplace. It is not acceptable to increase the login response time or break a logout transaction because we failed to publish the session event.

This problem is called a “dual write”: an application has to change data in two different systems, for example when it needs to persist data in the database and also send a Kafka message to notify other systems.
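To make the risk concrete, here is a minimal Go sketch of a login that performs a dual write. The SessionStore and EventPublisher interfaces, the in-memory store, and the always-failing publisher are hypothetical names invented for this illustration. They are not taken from Auth Service.

```go
package main

import (
	"errors"
	"fmt"
)

// SessionStore and EventPublisher are hypothetical interfaces used only
// to illustrate the dual write. They are not part of Auth Service.
type SessionStore interface {
	SaveLogin(username string) error
}

type EventPublisher interface {
	Publish(topic, key string) error
}

// memoryStore stands in for the authentication database.
type memoryStore struct{ saved []string }

func (s *memoryStore) SaveLogin(username string) error {
	s.saved = append(s.saved, username)
	return nil
}

// brokenPublisher simulates a Kafka outage.
type brokenPublisher struct{}

func (brokenPublisher) Publish(topic, key string) error {
	return errors.New("kafka unavailable")
}

// Login writes to two systems in one request. If the second write fails,
// the login fails even though the database write already succeeded.
func Login(store SessionStore, pub EventPublisher, username string) error {
	if err := store.SaveLogin(username); err != nil {
		return fmt.Errorf("save login: %w", err)
	}
	if err := pub.Publish("user.login", username); err != nil {
		return fmt.Errorf("publish login event: %w", err)
	}
	return nil
}

func main() {
	store := &memoryStore{}
	err := Login(store, brokenPublisher{}, "jdoe")
	fmt.Println("stored logins:", store.saved)
	fmt.Println("login error:", err)
}
```

In this sketch the database already holds the login when the publish step fails, so the two systems disagree and the user still sees an error.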

The solution we used for this problem is called the Outbox Pattern. In this article, I will show how we implemented the Outbox Pattern by sending Kafka messages from Application Logs using Fluent Bit Kafka Output.

Before jumping into the solution, let me give some background on Fluent Bit.

Fluent Bit

Fluent Bit is an open-source log processor which allows us to collect logs or metrics from different sources and process them at scale.

With Fluent Bit you can:

  • Use different inputs. It can be a CPU health metric, logs in a Kubernetes container, or just a txt file that contains some logs.
  • Filter your events according to your needs.
  • Use different outputs to deliver the processed log data.

Key Concepts

There are some key concepts to understanding how Fluent Bit works.

Event or Record

In Fluent Bit, every piece of log data to be processed is considered an event. The Fluent Bit documentation illustrates this with a sample log file of four lines.

That file contains four different events, and each event has two components: the timestamp and the main message.
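To make the event structure concrete, the Go sketch below splits a small log into events with a timestamp and a message. The four sample lines are invented for this illustration and are not the sample from the Fluent Bit documentation. The fixed-width, syslog-style timestamp is also an assumption.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// Event mirrors the two components described above.
type Event struct {
	Timestamp string
	Message   string
}

// sampleLog is a hypothetical four-line log, made up for this sketch.
const sampleLog = `Jan 18 12:52:16 auth-service user jdoe logged in
Jan 18 12:55:02 auth-service session extended for jdoe
Jan 18 13:10:45 auth-service user jdoe logged out
Jan 18 13:11:03 auth-service user asmith logged in`

// timestampLen assumes a fixed-width syslog-style timestamp.
const timestampLen = len("Jan 18 12:52:16")

// splitEvents treats every line as one event and separates the
// timestamp prefix from the rest of the line.
func splitEvents(log string) []Event {
	var events []Event
	sc := bufio.NewScanner(strings.NewReader(log))
	for sc.Scan() {
		line := sc.Text()
		if len(line) <= timestampLen {
			continue // too short to carry a timestamp and a message
		}
		events = append(events, Event{
			Timestamp: line[:timestampLen],
			Message:   strings.TrimSpace(line[timestampLen:]),
		})
	}
	return events
}

func main() {
	for i, e := range splitEvents(sampleLog) {
		fmt.Printf("event %d: timestamp=%q message=%q\n", i+1, e.Timestamp, e.Message)
	}
}
```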

Filtering

With Fluent Bit, you can filter and transform your events into the format you want before they are delivered. Together with its stream processor, you can do operations such as:

  • Select and drop specific components with an SQL-like query.
  • Add some information to your events, such as IP, geo-information, etc.
  • Filter out events using just SQL-like WHERE clauses.
  • Use windowing to make time-based operations.

You can take a look at different filters in the documentation here.

Routing

Fluent Bit allows us to deliver messages to various destinations. This phase allows us to stream data to different platforms. Routing is done by matching: every Event carries a Tag, and a Match is a simple rule that selects the Events whose Tag matches a defined pattern.
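As a rough illustration of this rule, the Go sketch below checks a Tag against a Match pattern in which the star character acts as a wildcard. This is a simplified stand-in written for this article, not Fluent Bit's own matching code. The matchTag function and the example tags are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// matchTag reports whether tag satisfies pattern, where '*' in the
// pattern stands for any (possibly empty) sequence of characters.
// It is a simplified sketch, not Fluent Bit's implementation.
func matchTag(pattern, tag string) bool {
	parts := strings.Split(pattern, "*")
	if len(parts) == 1 {
		return pattern == tag // no wildcard: exact match only
	}
	// The tag must start with the text before the first '*'.
	if !strings.HasPrefix(tag, parts[0]) {
		return false
	}
	tag = tag[len(parts[0]):]
	// Each middle part must appear in order in the remaining tag.
	for _, p := range parts[1 : len(parts)-1] {
		i := strings.Index(tag, p)
		if i < 0 {
			return false
		}
		tag = tag[i+len(p):]
	}
	// The tag must end with the text after the last '*'.
	return strings.HasSuffix(tag, parts[len(parts)-1])
}

func main() {
	for _, tag := range []string{"kube.var.log.containers.auth", "host.cpu"} {
		fmt.Printf("%-30s matches kube.*: %v\n", tag, matchTag("kube.*", tag))
	}
}
```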

There are lots of Outputs made for Fluent Bit, such as Elasticsearch, PostgreSQL, etc. However, the main focus of this article is to show how we used the Kafka Output.

You can check out different data sources here.

Data Pipeline

This section is crucial for understanding how Fluent Bit works. The Data Pipeline shows how the data transfers through different pipes into the desired output.

In the chart above, the Input pipe fetches the logs from the source, which could be a Kubernetes container or just a txt file. The Parser pipe transforms the data into a Fluent Bit-friendly format. The Filter pipe eliminates the information that is irrelevant for the final output. The Buffer pipe temporarily stores the processed data until it can be delivered, and finally, the Routing pipe delivers the data to the desired Outputs.

Implementation

High-level chart of the process

In this section, we will show how we implemented the process. It consists of three parts:

i. The Fluent Bit part, where we take the raw log data from the Kubernetes container and output it to Kafka.

ii. The source application part, where we log the information in the Kubernetes container in a format that the Fluent Bit Kafka Output can handle.

iii. The Kafka log processor application part, where we consume the generic topic produced by Fluent Bit and publish the data to the event-specific topic that is named in the generic log message.

i. The Fluent Bit part

In this part, we will explain how we configured Fluent Bit to work with Kafka.

In order to use Fluent Bit in your Kubernetes cluster, you have to deploy Fluent Bit as a DaemonSet so that it is available on every node of the cluster. To get started, run the following commands to create the namespace, service account, and role setup:

After running all the commands above, you can create your ConfigMap to get logs from Kubernetes and publish them to Kafka.

Before publishing the ConfigMap, you first have to point the Kafka bootstrap server setting, which is in the “output-kafka.conf” part of the configuration, to your own Kafka server. You can access the configuration at this link.

After changing the bootstrap server, you can publish your ConfigMap using the command below:

To separate what is meant for the Kafka Output from what is not, we use a filter called grep, which allows us to match or exclude specific records based on regular expression patterns for values or nested values.

Here is how we configured the filter to match only the logs with kafkaEnabled flag set to true:

At the end of this part, we should be able to fetch logs from Kubernetes and publish them to the generic Kafka topic if the message’s payload has a kafkaEnabled flag set to true.
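As a rough sketch of what such a setup might look like (this is not our actual ConfigMap), the fragment below pairs a grep filter with a Kafka output in Fluent Bit's classic configuration format. The kube.* match pattern and the broker address are assumptions made for illustration. It also assumes that the JSON log line has already been merged into the record and that kafkaEnabled is serialized as the string "true", since I am not certain the grep filter matches non-string values.

```
# Keep only the records that are meant for Kafka.
# Assumes kafkaEnabled is a top-level string field of the record.
[FILTER]
    Name    grep
    Match   kube.*
    Regex   kafkaEnabled ^true$

# Publish the remaining records to the generic topic.
# The broker address is a placeholder.
[OUTPUT]
    Name     kafka
    Match    kube.*
    Brokers  my-kafka-broker:9092
    Topics   application.log
```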

ii. The source application part

This part shows how we write logs in a way that the Fluent Bit Kafka Output can pick up.

The first requirement of this part is a log handler. For ordinary logs, it writes the standard log template that Trendyol uses. For Kafka logs, it writes a format that passes the filter we configured above. That format must have:

  • A kafkaEnabled flag set to true
  • A Kafka topic to publish into (more on that in part iii.)
  • An optional Kafka message key if we need to publish with keys.
  • Kafka message containing the payload

To separate the ordinary log pattern from the Kafka Output log pattern, we use a custom Multi Layout pattern. If the logging class is not the KafkaLogger class that we defined for the Kafka Output, it logs in the ordinary log format. If the class is the KafkaLogger, it logs a JSON object with the fields listed above.

To demonstrate the difference between these logs, the first line shows the standard log, and the second line shows how we log for the Kafka Output:

To conclude this part, we created a KafkaLogger class that logs the information in the format expected by Fluent Bit's Kafka Output without affecting the other logging standards.
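The shape of such a log line can be sketched as follows. The sketch is written in Go for brevity rather than in the logging framework Auth Service uses. Apart from kafkaEnabled, the JSON field names (topic, key, message) and the NewKafkaLogLine helper are assumptions made for this illustration. The flag is written as the string "true" here, which is also an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// KafkaLogRecord is a hypothetical shape for the Kafka Output log line.
// Only the kafkaEnabled name comes from the article; the other field
// names are assumptions.
type KafkaLogRecord struct {
	KafkaEnabled string          `json:"kafkaEnabled"`
	Topic        string          `json:"topic"`
	Key          string          `json:"key,omitempty"` // optional message key
	Message      json.RawMessage `json:"message"`
}

// NewKafkaLogLine serializes the payload and wraps it in a single-line
// JSON record that can be written to standard output as a log entry.
func NewKafkaLogLine(topic, key string, payload interface{}) (string, error) {
	body, err := json.Marshal(payload)
	if err != nil {
		return "", fmt.Errorf("encode payload: %w", err)
	}
	line, err := json.Marshal(KafkaLogRecord{
		KafkaEnabled: "true",
		Topic:        topic,
		Key:          key,
		Message:      body,
	})
	if err != nil {
		return "", fmt.Errorf("encode log record: %w", err)
	}
	return string(line), nil
}

func main() {
	line, err := NewKafkaLogLine("user.login", "jdoe", map[string]string{"username": "jdoe"})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(line) // written to stdout, where a log collector can read it
}
```

The point of the design is that the application only writes a log line. It never talks to Kafka itself, so a Kafka outage cannot fail the login.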

You can check out the demo app about how we implemented the logger in this link.

iii. Kafka Log Processor

This part aims to consume the generic application log topic (the topic that the Fluent Bit Kafka Output produces to) and publish each message to its specific topic, with the key and the payload that we defined in the source application (see part ii.).

In order to achieve this goal, we created an application called kafka-log-processor (available here) that consumes the application.log topic we defined in the Fluent Bit ConfigMap in part i.

Then we handle each event to extract the topic name, payload, and key in order to produce to that specific topic.

Here is how we implemented the handler:

For this service, we used Go because the service acts as middleware between applications: it needs fast response times, and it doesn't require complex business logic.

This part showed how we implemented the log processor. This application is crucial because its role is to separate the messages that come from the Fluent Bit Kafka Output's generic topic into their own topics.
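A minimal sketch of the extraction step might look like the following. It is not the kafka-log-processor code itself. The genericLog field names, the RoutedMessage type, and the Route function are assumptions made for this illustration, and the Kafka client calls are left out so that the sketch stays self-contained.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// genericLog holds the fields we need from a record on the generic
// topic. The field names are assumptions; any other fields in the
// record are ignored.
type genericLog struct {
	Topic   string          `json:"topic"`
	Key     string          `json:"key"`
	Message json.RawMessage `json:"message"`
}

// RoutedMessage is what would be handed to a Kafka producer.
type RoutedMessage struct {
	Topic string
	Key   []byte
	Value []byte
}

// Route extracts the target topic, the optional key, and the payload
// from one record of the generic topic.
func Route(raw []byte) (RoutedMessage, error) {
	var rec genericLog
	if err := json.Unmarshal(raw, &rec); err != nil {
		return RoutedMessage{}, fmt.Errorf("decode generic log: %w", err)
	}
	if rec.Topic == "" {
		return RoutedMessage{}, errors.New("generic log has no target topic")
	}
	if len(rec.Message) == 0 {
		return RoutedMessage{}, errors.New("generic log has no message payload")
	}
	var key []byte
	if rec.Key != "" {
		key = []byte(rec.Key) // a missing key stays nil
	}
	return RoutedMessage{Topic: rec.Topic, Key: key, Value: rec.Message}, nil
}

func main() {
	raw := []byte(`{"kafkaEnabled":"true","topic":"user.login","key":"jdoe","message":{"username":"jdoe"}}`)
	msg, err := Route(raw)
	if err != nil {
		fmt.Println("skip record:", err)
		return
	}
	fmt.Printf("topic=%s key=%s value=%s\n", msg.Topic, msg.Key, msg.Value)
}
```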

End Result

After the implementation, in this part, I will demonstrate how the process works.

Here is how we log from the Auth API:

The code above means: publish the SessionModel object to the user.login topic, with the username as the key.

Here is the content of the generic application.log topic, which was generated by the Fluent Bit Kafka Output:

And for the final result, here is the payload of the user.login topic, which kafka-log-processor produced by consuming the payload shown above:

Conclusion

We tried to explain how we solved the “dual write” problem. The problem was that our Auth Service needed to write to two systems: the main Authentication DB, and Kafka for the User Session messages.

Our solution was to implement the Outbox Pattern by publishing Kafka events from Application Logs using Fluent Bit Kafka Output. This method lets us avoid adding a Kafka producer dependency to our source service. With one dependency fewer, transactions are faster and the service is less error-prone. Our next goal is to use this method in other domains to enhance our Microservice structure.

Thanks for reading. You can always contact us if you have any questions 🙂

You can check out the source code of the implementation here.

If you want to deal with these kinds of challenges you can join us.

Thanks to the Seller Onboarding team.
