Java Route Parallelization & Aggregation using Apache Camel 🐫

— With in JVM

Ground Breakers: SpringBoot, Jersey, Camel, Postman, Karate, Gradle

Camel is known for EIP, however it has several other powerful features. One among them: Camel can manage the threads for you in a JVM, instead of you creating and managing them manually. It also allows you to run those threads in parallel.

First, let’s imagine a hypothetical business use case: A tenant wants to send a request to rent a property to multiple property owners.

📋 Client:

Tenant → Choose property owners → Fill in personal details → Send the request

📋 Server:

Receives request → Finds the number of property owners → Kicks off dynamic routes (a thread for each owner) → Loads owner configuration (Owner details, their endpoint etc)→ Transforms owner URLs → Applies owner specific regulations (Payloads) → Aggregates all these routes into one (as a single request) → Analyzes for errors → (Custom Function) Establishes the hypothetical communication with property owners.

Below is a diagram of the above text (click to enlarge).

👶 Basic and important Camel terminology to be familiar with:

  1. Exchange — an object that carries the data.
  2. Producer Template — Kicks off your route.
  3. Route — A custom DSL definition

❗ Objects Under Exchange to note:

  1. In
  2. Out
  3. Body

❗ Methods Under ProducerTemplate to note:

  1. send(CustomRouteId, Exchnage)
  2. asyncSend(CustomRouteId, Exchange)

❗ DSL words for Route Definition:

  1. from
  2. to
  3. bean
  4. process
  5. threads

These are good enough to start an amazing camel journey with. Lets take a look at them in action, awesomeness of camel is about to start:

👉Route Definition:

from("REST").process(saveToDB).dynamicRouter(method(dynamicRouter));

So, “REST” is an unique identifier for the above definition what we can use kick off this route.

👉Route KickOff:

Exchange response = producerTemplate.send("REST", exchange);

That’s it! All that is left to do is connect the dots to execute the each piece of business logic one after the other in terms how to define a route and kick it off.

Time for a 🍵5️⃣ min break. Alright, now start from the beginning one more time before proceeding to help make the below content easier to understand. 😅

Coming back to our business use case: When a tenant chooses multiple property owners and sends the request, a controller will pick up the request and kick off the above route having an Id: “REST”

This route will first save the request into a database and execute a dynamic router. Dynamic router is now initiating multiple other routes(a route per property owner) asynchronously (having a route id: TENANT)and returns immediately to the controller with an ID(useful for further verification of the status of this request), while the kicked off routes are in progress. This is a pretty cool concept that let’s Camel do the hard work without getting our hands dirty.

👉A TENANT route definition.

Clicking on the above image will take you to the code.

👉Aggregation Strategy:

Clicking on the above image will take you to the code.

Using old and new exchange from a Strategy Component, we know how long we took with the requested dynamic routes.

Aggregator will wait for each route to be completed in its configurable timeout of 1000 milliseconds.

For example, if Tenant chooses three property owners in the submitted request, DynamicRouter will kick off three routes, one per each property owner and Aggregator will wait until the three routes are completed before the route kicks off OwnerContactProcessor, per the TENANT route definition. All Clear? If not, read the route definition carefully again.

🍦Springboot & Camel integration can be found in the application.yml

spring:
jersey:
application-path:
kiraya
camel:
springboot:
main-run-controller: true

💻Find the code base for the above business use case implementation on github. Once it is cloned to your machine using git clone, get into the camel-routing-aggregation directory and run using below command:

Find an working example on github
# To Start the application
F:\camel-routing-aggregation> gradle bootRun

When a route is failed: we can clearly see where exactly we have the error upon looking at the logs:

Route Component failure tracing

Invalid Correlation Key Error is what we will see below when we missed passing the headers correctly for those processors in the routes. Make sure headers are set right at each route component for further processors to understand the route correctly.

When you submit a request with below payload:

{
"name":"Gopi",
"owners": [{"ownerId":"owner1"}, {"ownerId":"owner2"}],
"message": "TESTME"
}

Below is an example of what the logs show. Observe the threads that are managed by Camel for us, which is highlighted. Note that for the subsequent requests these threads will be re-utilized or created new as needed.

🙌Ting Tings:

  1. When a bean is passed to process(bean) method — Camel calls the overridden method automatically from its Super Interface Processor.
  2. When a bean is passed to method(bean) method — Camel expects you to add the annotation @Handler on a custom method to call.
  3. In/Out objects are and as part of exchange that can carry the data from one component to another. You can set headers and body in these objects.
  4. *Important*: An Out object from a component will become an In object at the other. Same like for In as well.
  5. The constant that we used to aggregate is “AGGREGATOR”: This was a custom definition defined in DynamicRouter. Its the same used in the route definition as well for the aggregation.

🍹Karate Test for you to kick off the endpoint that kicks off the routes. [karateTest.feature]

Find an working example on github

Below is a final example of threads for multiple requests, with efficient resource usage. Have fun with programming.

Thank you 👏 Kelly Birch for limiting and keeping the content at its possible simplicity.

Gopi Krishna Kancharla- Founder of http://allibilli.com

--

--