Enterprise Integration Patterns With Apache Camel

Diogo Ferreira
Published in Quick Code · 5 min read · Jun 16, 2021

Hi there! I want to tell you about a great open-source tool that is AWESOME and does not get the love it deserves: Apache Camel.

Apache Camel is an integration framework. What does that mean? Let’s suppose you are working on a project that consumes data from Kafka and RabbitMQ, reads and writes from and to various databases, transforms data, logs everything to files and outputs the processed data to another Kafka topic. You also have to implement the error handling of the service (retries, dead letter channel, etc.) for everything to run flawlessly. It seems hard.

Apache Camel helps you integrate with many components, such as databases, files, brokers, and much more, while keeping things simple and promoting enterprise integration patterns. Let’s see some examples based on integration patterns. You can find the code in this repository.

We will start by consuming events from a Kafka topic and outputting them to another one, taking advantage of the Event-Driven Consumer pattern. The events will be representations of text messages sent by a user.

Example of event schema that will be received from Kafka
Java code using Camel to read from topic and output message to another topic
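Here is a rough sketch of what such a route could look like in the Java DSL; the topic names, the broker address and the class name are placeholders, and the event is assumed to be a JSON document with fields like type, emitter, text and devices:

```java
import org.apache.camel.builder.RouteBuilder;

public class UserMessageRoute extends RouteBuilder {
    @Override
    public void configure() {
        // Event-Driven Consumer: react to each record as it arrives on the topic
        from("kafka:user-messages?brokers=localhost:9092")
            .log("Received message: ${body}")   // Simple expression, evaluated per message
            .to("kafka:processed-messages?brokers=localhost:9092");
    }
}
```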

That’s about it! We also added a log step so we can see the message body in the logs. The log argument is passed using the Simple language, an Apache Camel language used to evaluate expressions.

Now let’s implement a message filter. This pattern filters out the messages that do not match certain conditions. In our case, we will only process those that have the type “chat”.

Camel route to filter out messages of type other than “chat”
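A sketch of the filter route, assuming a UserMessage POJO (shown below), placeholder topic names, and camel-jackson for the JSON data format; the snippet lives inside a RouteBuilder’s configure() method:

```java
// JsonLibrary is org.apache.camel.model.dataformat.JsonLibrary (camel-jackson)
from("kafka:user-messages?brokers=localhost:9092")
    // turn the JSON payload into a POJO so we can inspect its fields
    .unmarshal().json(JsonLibrary.Jackson, UserMessage.class)
    // Message Filter: drop everything that is not of type "chat"
    .filter(simple("${body.type} == 'chat'"))
    // back to JSON before publishing to the output topic
    .marshal().json(JsonLibrary.Jackson)
    .to("kafka:chat-messages?brokers=localhost:9092");
```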

Easy, right? We now unmarshal the message from JSON to the UserMessage POJO to be able to filter by type. We marshal it back to JSON before sending it to another Kafka topic.

UserMessage POJO
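A minimal version of the POJO could look like this; the field names follow the event fields used throughout the examples and are illustrative:

```java
import java.util.List;

public class UserMessage {
    private String type;          // e.g. "chat"
    private String emitter;       // e.g. "John Doe"
    private String text;
    private List<String> devices;

    public String getType() { return type; }
    public void setType(String type) { this.type = type; }
    public String getEmitter() { return emitter; }
    public void setEmitter(String emitter) { this.emitter = emitter; }
    public String getText() { return text; }
    public void setText(String text) { this.text = text; }
    public List<String> getDevices() { return devices; }
    public void setDevices(List<String> devices) { this.devices = devices; }
}
```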

Now suppose we want to store all messages in a file. In addition, messages whose emitter is “John Doe” should be stored in a different file, for testing purposes. For that, we can use the Content-Based Router pattern.

After sending the message to the broker, the route will now also store it in a file, depending on the emitter
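A sketch of the router, continuing the previous route; the file names are placeholders, and stashing the emitter in a header before marshalling is just one way to keep it available for the routing decision:

```java
from("kafka:user-messages?brokers=localhost:9092")
    .unmarshal().json(JsonLibrary.Jackson, UserMessage.class)
    .filter(simple("${body.type} == 'chat'"))
    // remember the emitter so we can still route after marshalling back to JSON
    .setHeader("emitter", simple("${body.emitter}"))
    .marshal().json(JsonLibrary.Jackson)
    .to("kafka:chat-messages?brokers=localhost:9092")
    // Content-Based Router: append the JSON line to a file chosen by the emitter
    .transform(simple("${bodyAs(String)}\n"))
    .choice()
        .when(simple("${header.emitter} == 'John Doe'"))
            .to("file:output?fileName=john-doe-messages.txt&fileExist=Append")
        .otherwise()
            .to("file:output?fileName=all-messages.txt&fileExist=Append")
    .end();
```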

If the file already exists, we will append the events and add a newline at the end of each event. For other emitters, we will do the same, but store them in another file. It does look like an ‘if’ construct, right?

The event contains a list of “devices”, and we want to log them one by one. How can we do that? Using the Splitter pattern, we can iterate through any list, either sequentially or in parallel. Let’s do it sequentially in this example.

Before sending the message to the output topic, we will iterate through each device and log it
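A sketch of the splitter, assuming the devices field is a list on the UserMessage POJO and the topic names are placeholders:

```java
from("kafka:chat-messages?brokers=localhost:9092")
    .unmarshal().json(JsonLibrary.Jackson, UserMessage.class)
    // Splitter: iterate the devices list sequentially;
    // inside the split, the body is the current device
    .split(simple("${body.devices}"))
        .log("Device: ${body}")
    .end()
    // after end(), the route continues with the original message
    .marshal().json(JsonLibrary.Jackson)
    .to("kafka:processed-messages?brokers=localhost:9092");
```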

We can split by any field that is an Iterable. As you can see, we are again using the Simple language to access the content of the event.

Let’s try something harder. We are receiving messages with text from various emitters, but we want to aggregate multiple text messages and create a new message with all the messages for an emitter. To do that, we can use the Aggregator pattern. The Aggregator pattern allows events to be buffered while waiting for other events. When another event is received, a custom aggregation can be performed, based on our needs. A new event is sent when a condition is met. That condition can be based on the number of events received, a timeout, or any other custom condition.

In our case, we will create a new POJO that aggregates the text messages from an emitter. The new event will be sent 5 seconds after the first event is received for that emitter.

Aggregation to create a new event with the messages of an emitter
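A sketch of the aggregation step, using a custom strategy (shown further below) and placeholder topic names; here completionTimeout completes a group after 5 seconds without new messages, which is one way to put a time bound on the aggregation:

```java
from("kafka:chat-messages?brokers=localhost:9092")
    .unmarshal().json(JsonLibrary.Jackson, UserMessage.class)
    // Aggregator: correlate messages by emitter and combine them with a custom strategy
    .aggregate(simple("${body.emitter}"), new UserMessageAggregationStrategy())
        .completionTimeout(5000)   // close a group after 5 seconds of inactivity
    .marshal().json(JsonLibrary.Jackson)
    .to("kafka:aggregated-messages?brokers=localhost:9092");
```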

We are using an in-memory aggregation, but we could use other data stores, such as Postgres or Redis. We are using the Simple language to correlate messages by the emitter, and we created a custom aggregation strategy, shown below.

In the custom aggregation strategy, for the first event (oldExchange==null), we create a new CombinedUserMessage with the text of the message. For all other events, we add the text of the message to the combined event.

Custom aggregation strategy
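A sketch of the strategy, together with a hypothetical CombinedUserMessage POJO that holds the emitter and the collected texts (class and method names are placeholders):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.camel.AggregationStrategy;
import org.apache.camel.Exchange;

// Holds all texts received from one emitter
class CombinedUserMessage {
    private String emitter;
    private List<String> texts = new ArrayList<>();

    public String getEmitter() { return emitter; }
    public void setEmitter(String emitter) { this.emitter = emitter; }
    public List<String> getTexts() { return texts; }
    public void addText(String text) { texts.add(text); }
}

public class UserMessageAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        UserMessage incoming = newExchange.getIn().getBody(UserMessage.class);

        if (oldExchange == null) {
            // first message for this emitter: start a new combined event
            CombinedUserMessage combined = new CombinedUserMessage();
            combined.setEmitter(incoming.getEmitter());
            combined.addText(incoming.getText());
            newExchange.getIn().setBody(combined);
            return newExchange;
        }

        // subsequent messages: append the text to the existing combined event
        CombinedUserMessage combined = oldExchange.getIn().getBody(CombinedUserMessage.class);
        combined.addText(incoming.getText());
        return oldExchange;
    }
}
```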

This is all great, but how do we apply transformations to a field? We now have a combined event, but it would be great if we could somehow process the combined event and turn it into plain text by joining the individual text messages. We can do that using the Message Translator pattern.

We can call bean() to apply a transformation to our message
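The route part could look like this (topic names are placeholders):

```java
from("kafka:aggregated-messages?brokers=localhost:9092")
    .unmarshal().json(JsonLibrary.Jackson, CombinedUserMessage.class)
    .bean(MessageTranslator.class, "toPlainText")   // Message Translator
    .to("kafka:plain-text-messages?brokers=localhost:9092");
```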
Java Bean to apply transformations to a message
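And the bean itself is a plain Java class with no Camel dependencies (the class and method names are placeholders):

```java
public class MessageTranslator {

    // Joins the texts of a combined event into one plain-text string
    public String toPlainText(CombinedUserMessage combined) {
        return combined.getEmitter() + ": " + String.join(" ", combined.getTexts());
    }
}
```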

We can call bean functions directly from a Camel route and perform all the transformations that we need, using plain Java code. Neat!

We can see that our Camel routes are getting bigger. What can we do if we want, for example, to split them across multiple files? Two in-memory components allow us to do that: Direct and SEDA.

Direct is a synchronous endpoint that works like a direct call from one route to another. Let’s use it to separate out the route that stores the event in a file.

We separate our main route into two routes, connected with the Direct endpoint
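A sketch of the split, with the file-writing part moved behind a Direct endpoint (endpoint and file names are placeholders):

```java
// Main route: consume, filter, publish, then call the file route synchronously
from("kafka:user-messages?brokers=localhost:9092")
    .unmarshal().json(JsonLibrary.Jackson, UserMessage.class)
    .filter(simple("${body.type} == 'chat'"))
    .marshal().json(JsonLibrary.Jackson)
    .to("kafka:chat-messages?brokers=localhost:9092")
    .to("direct:store-in-file");

// Second route, possibly defined in another file: append each message to a file
from("direct:store-in-file")
    .transform(simple("${bodyAs(String)}\n"))   // one JSON document per line
    .to("file:output?fileName=all-messages.txt&fileExist=Append");
```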

Great! There is another in-memory component that will be useful for us: SEDA. SEDA works like Direct but is asynchronous, which means it puts the message in a queue for another thread to process. Let’s use SEDA to decouple receiving the message from Kafka from the routes that consume it.

With the SEDA endpoints, we now have only one consumer from Kafka instead of two
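A sketch with SEDA queues (queue names are placeholders):

```java
// Single Kafka consumer fanning out to two asynchronous SEDA queues
from("kafka:user-messages?brokers=localhost:9092")
    .to("seda:store-in-file")
    .to("seda:process-message");

from("seda:store-in-file")
    .transform(simple("${bodyAs(String)}\n"))
    .to("file:output?fileName=all-messages.txt&fileExist=Append");

from("seda:process-message")
    .unmarshal().json(JsonLibrary.Jackson, UserMessage.class)
    .filter(simple("${body.type} == 'chat'"))
    .marshal().json(JsonLibrary.Jackson)
    .to("kafka:chat-messages?brokers=localhost:9092");
```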

Now our routes are much simpler. Suppose we need to perform a periodic task, such as a cleanup. We can take advantage of the Timer endpoint. Let’s exemplify it by creating a route that runs every 5 seconds.

Simple timer that is activated every 5 seconds
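A sketch of such a timer route (the timer name is a placeholder, and the period is given in milliseconds):

```java
// Timer endpoint: fires every 5 seconds
from("timer:cleanup?period=5000")
    .log("Running periodic cleanup...");
```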

Now that our application is almost ready for production, we have to improve fault tolerance. What happens if, for some reason, a message causes an error while in a route? Let’s implement the Dead Letter Channel pattern. When there is an error in the route, the message is sent to another Kafka topic, so that it can later be reprocessed.

Configure a default error handler for all routes, using the Dead Letter Channel pattern to send messages to a Kafka topic
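A sketch of the error handler configuration, inside the RouteBuilder that defines the routes (the dead-letter topic name is a placeholder):

```java
import org.apache.camel.builder.RouteBuilder;

public class MessageRoutes extends RouteBuilder {
    @Override
    public void configure() {
        // Dead Letter Channel for every route defined in this class:
        // on failure, the original incoming message is sent to a dead-letter topic
        errorHandler(deadLetterChannel("kafka:dead-letter-messages?brokers=localhost:9092")
                .useOriginalMessage());

        // ... route definitions from the previous examples would follow here ...
    }
}
```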

And that’s it! The error handler configuration applies to all routes in the class. We send the original message to the topic (the one that was first received in the route). We could also configure retry policies, with timeouts and other common fault-tolerance settings, but since we don’t need them here, we will leave it as is.

Now that we are reaching the end of this article, I also wanted to show you something: it is possible to configure REST endpoints as Camel routes.

REST endpoint for a GET to /api/hello
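A sketch of the REST DSL configuration; the HTTP component is an assumption (any HTTP-capable component such as camel-netty-http or camel-jetty would do) and the port is a placeholder:

```java
// Pick an HTTP component available on the classpath
restConfiguration().component("netty-http").port(8080);

rest("/api")
    .get("/hello")
    .to("direct:hello");

from("direct:hello")
    .transform(constant("Hello World!"));
```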

As simple as that! We just configured a GET for the URL /api/hello, to be answered with “Hello World!”.

As you can see, Apache Camel is a framework that simplifies the integration with other components, supporting the enterprise integration patterns and making it easier to create data pipelines.

I hope you have liked it! Thank you!

Diogo Ferreira
Quick Code

Software Engineer @ Timefold. Passionate about cloud and data-driven architectures. https://diogodanielsoaresferreira.github.io/