Event-driven architecture made easy with Apache NiFi
This article shows how to build an event-driven architecture using Apache NiFi. NiFi makes it easier to stream events without adding processing load to your application and with practically no coding.
Introduction
Event-driven architecture is an architectural paradigm based on applications that produce, detect, consume and react to events. What are these events? They can be anything that makes sense in your business domain. In the case of QuintoAndar we can have events such as “house_created”, “house_updated”, “contract_signed”, “contract_finished” and countless others that run through the entire rental or sale process.
When we work with monolithic applications, making them trigger flows that depend on a change in some data is quite easy, because there is no need to inform several different applications about the event that occurred. This is no longer true in a microservice architecture, where services A, B and C may need to be notified about an event that occurred in service D, for example. In this case, how will your application notify everyone that an entity has changed?
Here at QuintoAndar, when a house’s status changes to published, we need to notify other services that a new house has been published and should appear in the search, for example. Will the service make an HTTP call to all the other services? These calls may fail, leading to inconsistent data, and adding a new remote call for each service that needs this information can cause serious performance issues. Will the other services poll for changes? This can put a heavy load on your service, which has to constantly reply to HTTP calls asking for changes. Furthermore, this strategy creates high coupling between the services, because when you need to change something in your data structure you’ll need to handle this change in all your dependent services.
If we use alternatives that make the services very dependent on each other, we end up with several microservices that do not work if one of them is momentarily unavailable, which is nothing more than a distributed monolith. It is exactly at this point that an event-driven architecture comes in, where communication among microservices takes place through events. The system that owns the business domain produces the events, and the other microservices that need this information consume them to react to changes that occurred in the domain.
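To make the decoupling concrete, here is a minimal in-memory sketch of the publish/subscribe idea. It is only an illustration, not how SNS or NiFi work internally; the `EventBus` class and the “house_published” event name are hypothetical.

```python
from collections import defaultdict
from typing import Callable, Dict, List

Handler = Callable[[dict], None]


class EventBus:
    """Toy in-memory bus: the producer does not know who consumes its events."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Handler) -> None:
        self._handlers[event_type].append(handler)

    def publish(self, event_type: str, payload: dict) -> int:
        # Deliver the event to every subscriber and report how many received it.
        handlers = self._handlers.get(event_type, [])
        for handler in handlers:
            handler(payload)
        return len(handlers)


bus = EventBus()
search_index: list = []
notifications: list = []

# Two independent consumers react to the same (hypothetical) event.
bus.subscribe("house_published", lambda e: search_index.append(e["house_id"]))
bus.subscribe("house_published", lambda e: notifications.append(f"New house {e['house_id']}"))

delivered = bus.publish("house_published", {"house_id": 42})
```

Adding a third consumer only requires another `subscribe` call; the producer code does not change.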
What is Apache NiFi?
Apache NiFi is a project created to automate the data flow between systems. You can create everything from simple to complex streams without the need to code anything. Through a graphical interface, it is possible to create flows just by configuring processors and interconnecting them to build the desired data flow.
NiFi 1.11.0 comes with more than 280 built-in processors that can be used to read database data, make HTTP requests, post to SNS topics, SQS queues, Kafka topics and Pub/Sub topics, monitor flow activity, and much more. And since NiFi is open source, if you can’t find a processor that does what you need, you can create your own processor or extend an existing one.
Event Streaming with NiFi
Before we talk about using NiFi for event streaming, we need to cover some conventions that we have already adopted here at QuintoAndar, so that the example is easier to follow. By convention, the business events generated by an application are posted on SNS topics with a header called “event_type” and another called “event_date”. We also have a convention that allows us to enrich the event inside the NiFi flow, so we’ll use it in our demo by calling an application endpoint.
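As a rough sketch of this convention, an event can be modelled as a payload plus the two headers. The `BusinessEvent` class, the ISO 8601 date format and the payload field are assumptions made for illustration; only the header names “event_type” and “event_date” come from the convention above.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass(frozen=True)
class BusinessEvent:
    """Hypothetical envelope: a payload plus the two agreed headers."""

    event_type: str
    event_date: str  # assumed to be an ISO 8601 timestamp
    payload: dict = field(default_factory=dict)

    def headers(self) -> dict:
        # Headers travel next to the message so consumers can filter on them.
        return {"event_type": self.event_type, "event_date": self.event_date}


event = BusinessEvent(
    event_type="review_done",
    event_date=datetime(2020, 1, 15, 12, 0, tzinfo=timezone.utc).isoformat(),
    payload={"reviewId": 7},  # hypothetical payload field
)
```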
The flow that we are going to create will stream events by listening to changes in the review service database, using the visit review data as the source of the events. In the visit review flow, a review is created with status “PENDING”. When a person completes the review, the status changes to “DONE”, and if the person refuses to fill in the review form, the status changes to “DISMISSED”. This system has an audit table that receives a new row whenever a review is created or updated. So our NiFi flow will stream three different business events: “review_pending”, “review_done” and “review_dismissed”.
So, let’s start. First we need to create a process group to contain our flow. The process group keeps the NiFi interface organized by not leaving all flows at the root, and it also helps to manage permissions for each flow.
With the process group created, let’s go inside it and create our first processor, which will read the rows from a table in the database. The processor we are going to use is called QueryDatabaseTableRecord.
We will configure it to read the review_aud table, which follows the structure defined by the Hibernate Envers framework. The processor will return all fields, with a where clause that only returns rows in which the status field has changed. For that we will use the status_mod field, which is true when the status has changed. We will also use the rev field, which is the audit revision id, to maintain the processor’s state and ensure that it will not read the same row twice. The configuration will look like this:
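In plain SQL terms, the behaviour described above can be sketched as an incremental query that remembers the highest rev it has seen. This is only an approximation of what the processor does, run against an in-memory SQLite table; the reduced column set (id, rev, status, status_mod) and the function name are assumptions.

```python
import sqlite3


def fetch_new_status_changes(conn: sqlite3.Connection, last_rev: int):
    """Return audit rows with a changed status and a rev above the stored state."""
    rows = conn.execute(
        "SELECT id, rev, status FROM review_aud "
        "WHERE status_mod = 1 AND rev > ? ORDER BY rev",
        (last_rev,),
    ).fetchall()
    # The new state is the highest rev read, so the same row is never read twice.
    new_last_rev = rows[-1][1] if rows else last_rev
    return rows, new_last_rev


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE review_aud (id INTEGER, rev INTEGER, status TEXT, status_mod INTEGER)"
)
conn.executemany(
    "INSERT INTO review_aud VALUES (?, ?, ?, ?)",
    [
        (1, 1, "PENDING", 1),  # review created
        (1, 2, "PENDING", 0),  # review updated, status unchanged: skipped
        (1, 3, "DONE", 1),     # status changed
    ],
)

first_batch, state = fetch_new_status_changes(conn, 0)
second_batch, state = fetch_new_status_changes(conn, state)
```

The second call returns nothing because the stored state already points at the last revision.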
This processor will generate a FlowFile for each row of the audit table. Starting from that row, we need to enrich the payload to build the event and post it on the SNS topic, and for that we will make an HTTP call to the review service. The endpoint uses the id of the review as part of the path, so we need to extract this information from the FlowFile content and put it in a FlowFile attribute that the processor making the request can use. To do this we will use the EvaluateJsonPath processor.
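Conceptually, this step copies one value from the JSON content into the FlowFile’s attributes. The sketch below mimics that idea in plain Python; the `FlowFile` class, the top-level `id` key and the `review.id` attribute name are hypothetical stand-ins, not NiFi’s API.

```python
import json
from dataclasses import dataclass, field


@dataclass
class FlowFile:
    """Simplified stand-in for a NiFi FlowFile: content plus string attributes."""

    content: str
    attributes: dict = field(default_factory=dict)


def extract_to_attribute(flow_file: FlowFile, json_key: str, attribute_name: str) -> FlowFile:
    # Read a top-level key from the JSON content and store it as a string attribute.
    record = json.loads(flow_file.content)
    flow_file.attributes[attribute_name] = str(record[json_key])
    return flow_file


flow_file = FlowFile(content='{"id": 7, "rev": 3, "status": "DONE"}')
extract_to_attribute(flow_file, "id", "review.id")
```

The content itself is left untouched; only the attribute map grows.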
Having placed the review id in an attribute, we can create the processor that will make the HTTP call to the service and enrich the event. For that we will use the InvokeHTTP processor.
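The enrichment call amounts to building a URL from the review id and reading the JSON response. Here is a small sketch of that idea; the host, the `/reviews/{id}` path and the response fields are assumptions, and the fetch function is injectable so the example runs without a real service.

```python
import json
from typing import Callable
from urllib.request import urlopen


def http_get(url: str) -> str:
    """Default fetcher: performs a real HTTP GET."""
    with urlopen(url, timeout=5) as response:
        return response.read().decode("utf-8")


def enrich(review_id: str, base_url: str, fetch: Callable[[str], str] = http_get) -> dict:
    # The review id becomes part of the path (hypothetical endpoint layout).
    url = f"{base_url.rstrip('/')}/reviews/{review_id}"
    return json.loads(fetch(url))


def fake_fetch(url: str) -> str:
    # Stand-in for the review service so the sketch works offline.
    return json.dumps({"id": 7, "status": "DONE", "requestedUrl": url})


enriched = enrich("7", "http://review-service.local/", fetch=fake_fetch)
```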
After we get the response from the endpoint, we’ll have all the data necessary to build the event payload. So let’s use a processor that takes the data in our FlowFile and formats it in the pattern we need for our event, keeping only the needed fields. This processor is called JoltTransformJSON and, as its name says, it uses JOLT to transform the JSON that it receives.
To do the transformation, we will use the spec shown in the figure below. The first field on the left is the payload we received after the HTTP call, which is the FlowFile content. The middle field holds the JOLT spec that transforms the payload into the format needed to post on the SNS topic, and the last field on the right shows the result of applying the spec to the JSON input.
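The snippet below is not the JOLT spec itself. It is a plain Python sketch of the same kind of reshaping: keep only the needed fields and add the event metadata. The input fields (`id`, `status`, `updatedAt`, `internalNotes`) and the nested `payload` layout are assumptions; only the eventType and eventDate names come from the flow described here.

```python
def to_event(review: dict) -> dict:
    """Reshape an enriched review into a (hypothetical) event format."""
    return {
        # "PENDING" -> "review_pending", "DONE" -> "review_done", and so on.
        "eventType": f"review_{review['status'].lower()}",
        "eventDate": review["updatedAt"],
        # Only the fields consumers need are kept; everything else is dropped.
        "payload": {"reviewId": review["id"], "status": review["status"]},
    }


enriched_review = {
    "id": 7,
    "status": "DISMISSED",
    "updatedAt": "2020-01-15T12:00:00Z",
    "internalNotes": "not part of the event",
}
event = to_event(enriched_review)
```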
Now that we have our payload ready to post on SNS, we just need to extract some information to use as headers in the message. We will extract the values of eventType and eventDate from the FlowFile content into attributes. This will allow SNS subscriptions to filter which events they want to consume and from what date.
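To illustrate why these headers matter, here is a simplified model of how a subscription could filter on them. It only supports exact string matching against a list of allowed values, which is one form of SNS filter policy; real SNS policies offer more operators, and the `matches` function is a hypothetical helper, not an AWS API.

```python
def matches(filter_policy: dict, message_attributes: dict) -> bool:
    """Return True when every attribute named in the policy has an allowed value."""
    return all(
        message_attributes.get(name) in allowed_values
        for name, allowed_values in filter_policy.items()
    )


# A subscriber that only cares about finished or dismissed reviews.
policy = {"event_type": ["review_done", "review_dismissed"]}

done = {"event_type": "review_done", "event_date": "2020-01-15T12:00:00Z"}
pending = {"event_type": "review_pending", "event_date": "2020-01-15T12:00:00Z"}

accepted = [attrs["event_type"] for attrs in (done, pending) if matches(policy, attrs)]
```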
After placing the values as attributes of the FlowFile, we have nothing left to do but post this payload on the SNS topic and finish our flow. So let’s create the processor for that.
Now we need to configure it to post to the correct topic and to send the headers along with the message, as shown in the GIF below.
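For readers more familiar with the AWS SDK than with NiFi, the message that ends up on the topic looks roughly like the publish request below. The function only builds the request dictionary, using the `TopicArn`, `Message` and `MessageAttributes` parameter names of the SNS Publish API; the topic ARN and event fields are made up for the example.

```python
import json


def build_publish_request(topic_arn: str, event: dict) -> dict:
    """Build the arguments for an SNS publish call, with the headers as message attributes."""
    return {
        "TopicArn": topic_arn,
        "Message": json.dumps(event),
        "MessageAttributes": {
            "event_type": {"DataType": "String", "StringValue": event["eventType"]},
            "event_date": {"DataType": "String", "StringValue": event["eventDate"]},
        },
    }


request = build_publish_request(
    "arn:aws:sns:us-east-1:123456789012:review-events",  # hypothetical topic
    {"eventType": "review_done", "eventDate": "2020-01-15T12:00:00Z", "payload": {"reviewId": 7}},
)
```

With boto3 installed and credentials configured, such a dictionary could be passed as `boto3.client("sns").publish(**request)`.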
The complete flow is shown in the image below. To build it, we followed these six steps:
- Create a QueryDatabaseTableRecord processor to read the data from the review_aud table;
- Create an EvaluateJsonPath processor to extract the review identifier and place it as a FlowFile attribute;
- Create an InvokeHTTP processor to call the application endpoint using the review identifier to enrich the event payload;
- Create a JoltTransformJSON processor to transform the FlowFile content into the standard format that we use to post on SNS topics;
- Create another EvaluateJsonPath processor to extract the eventType and eventDate values from the FlowFile content to FlowFile attributes;
- And finally, create a PutSNS processor to post the message in the desired topic with the needed message headers.
To keep the walkthrough clear, we have not included the retry and monitoring flows that we use here, but they are quite simple to add. NiFi has built-in processors to handle retries, so we could find the places where the flow can fail, such as the HTTP call, and add a retry processor there. There are also processors for monitoring the flow, and we could add one after the post to the SNS topic to track the success or failure rate of the flow.
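For readers who think in code, the retry idea is the familiar retry-with-backoff pattern. The sketch below is a generic Python version, not NiFi’s own retry mechanism; the function name, attempt count and delays are arbitrary assumptions.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def with_retries(
    operation: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run operation, retrying with exponential backoff; re-raise after the last attempt."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts:
                raise
            # Wait 0.5s, 1s, 2s, ... before the next attempt.
            sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("max_attempts must be at least 1")


calls = {"count": 0}


def flaky_call() -> str:
    # Fails twice, then succeeds, like a briefly unavailable HTTP endpoint.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return "ok"


delays: list = []
result = with_retries(flaky_call, max_attempts=3, sleep=delays.append)
```

Passing `delays.append` instead of `time.sleep` records the waits without actually pausing, which keeps the example fast.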
Conclusion
With this example, we can see that NiFi is a very powerful tool that helps us create data flows without any coding. Other advantages of using Apache NiFi are:
- Visual organization of processes;
- Advanced monitoring;
- User authentication and authorization for each data flow;
- Traceability of data trafficked by the ecosystem;
- Low coupling among services;
- Programming language agnostic;
- Versioning of NiFi flow configurations on GitHub;
- Low implementation and maintenance cost (codeless);
- Event replay facility;
- Reliable message delivery;
- Simple message filtering;
- Prioritized queuing;
- Built-in queue back pressure;
- Easy to scale based on the workload.
All these advantages, among others, make it easier to build an event-driven architecture, removing the need to keep polling or to build a webhook structure just to know that some important business event has happened.