Data Flow with NiFi: Writing Data to MySQL, MongoDB, and Slack from a Streaming Data API

bilative · 12 min read · Oct 28, 2021

Hi everyone,

In the world of data there is a lot to do. Analyzing data lets us extract useful insights, and we take action based on the results of that analysis. But the data we analyze doesn't arrive as a CSV (not in the real world 😃); often we pull it from databases or APIs. But how does it get into those databases in the first place? Pulling it into Python via an API every time is not exactly efficient, as you know; we should store it somewhere. How?

We'll look at one of the answers to this question. There are many options, but we'll use one of the easiest: NiFi.

NiFi

Apache NiFi is a flow-based tool that lets us configure the flow of data step by step. NiFi can take data from many different sources (files, SQL and NoSQL databases, APIs, Kafka, etc.), transform it with the NiFi Expression Language, and load/send it to any target we want. At the same time we can trigger actions as the output of a process, such as sending messages to Slack or e-mail.

In this article I'll show you one of my mini projects, in which I used NiFi to ingest data and trigger some events. You'll see these concepts in this article:

  • A live stream data API as NiFi's data source,
  • Transformation and filtering steps with the NiFi Expression Language,
  • MongoDB and MySQL as target data stores,
  • Sending a message to Slack as the output of certain events.

I'll explain every detail step by step, so this article may look a little long.

Let's start.

I ran the whole scenario on a virtual machine on GCP, but the platform you use doesn't matter; a machine on AWS, Azure, or your local machine is fine. There is only one critical point: if you use a cloud service, you have to set up firewall rules so that you can reach NiFi from your local machine/browser.

The first step is to install NiFi. I used NiFi 1.13.2; you can find the binaries here. Again, if your machine is on a cloud service, you should remove the default host configuration. The steps to follow are:

opening the conf/nifi.properties file and changing

* "nifi.web.http.host=127.0.0.1" to "nifi.web.http.host=" (i.e., leaving the value empty).

After this we can start NiFi with the "sh nifi.sh start" command.
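For reference, on my setup the whole change looked roughly like this (paths may differ slightly depending on where you extracted the NiFi binaries):

    # inside the extracted NiFi directory
    nano conf/nifi.properties     # set nifi.web.http.host= (empty value)
    sh bin/nifi.sh start          # start NiFi
    sh bin/nifi.sh status         # optional: check that it is running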

Now we can go to ip:8080/nifi from our local computer (if you installed NiFi on your local machine, go to 127.0.0.1:8080/nifi). NiFi is running and we can start our project!

NiFi is now ready for us to build our data flow.

Next we have to decide on a data source for the task; this will vary with your project. I chose AviationStack, a website that provides live flight data through an API.

Our path for this project is going to look like this:

  • Taking data from the API,
  • Splitting the JSON data flight by flight,
  • Transforming the data into the target format,
  • Extracting attributes for the filtering steps,
  • Using attributes to trigger events,
  • Sending data to the targets (MySQL, MongoDB, Slack).

Our final picture will look like this:

Now, in NiFi we build the flow with processors; you can find the processor icon to the right side of the NiFi logo:

To create a new processor, left-click the icon and drag it to the center of the screen; then we can search for and choose the processor type we want to use. In this example we'll use InvokeHTTP, so I found it and clicked Add.

You can search for anything you need; the possible outputs listed for each processor can give you some guidance.

After double-clicking the new processor (or right-click -> Configure) we can start configuring it. In this processor we need to specify the properties of our web API, so I created an API key on AviationStack. In our example we send a GET request to the API and set the AviationStack API URL as the "Remote URL".

Only the first two rows need to be set. You can use a different API.
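For orientation, the request that InvokeHTTP ends up sending looks roughly like this (the endpoint comes from AviationStack's documentation; replace YOUR_ACCESS_KEY with the key you created). The same two values go into the processor: HTTP Method = GET and Remote URL = this address.

    curl "http://api.aviationstack.com/v1/flights?access_key=YOUR_ACCESS_KEY"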

In the "Settings" tab we should set the "Automatically Terminate Relationships" options, which specify when the processor should stop passing flowfiles on. In our case we should check everything except "Response", because the job of this processor is to give us the response. After this we can press the Apply button.

Before configuring the processor, it looked like the one on the left: there is an alert symbol, meaning there is still something to do. After setting the configuration it looks like the one on the right, with a paused symbol, which means the configuration is set and the processor/task is ready to run.

Now we should take a breath and look at the data we receive. To inspect it, we add another processor and connect the two existing processors to each other. You can create another InvokeHTTP processor and connect the first one to it (the second one is a dummy processor; we'll delete it later). When you connect them, NiFi asks you to choose a relationship type; you should choose "Response", because InvokeHTTP sends the HTTP GET response as its output.

For every processor connection we should select only the relationship types we actually need.

After making the connection you can start the first processor. A couple of seconds later you can right-click the canvas and choose "Refresh". After refreshing you'll see the size of the data in the queue, like this:

If you started a processor and it is working, the flow begins. But if both consecutive processors are running, you probably won't see anything in the queue, because data doesn't wait there; it moves on to the next processor instantly.

If you right-click the connection and choose "List queue", you can see and inspect the list of flowfiles.

The individual flights are listed under "data" in this JSON. For example, 1 is the first flight's information, 2 is the second flight's, and there are more than 100 flights in this batch.

This is the data returned by the API. There are some key points about it:

  • The data comes in JSON format,
  • Each flight's information is an element of a list inside the JSON,
  • If we kept it in batches like this, every batch could contain hundreds of "flight_date" values and we couldn't handle them individually.

So we should process the flights one by one, which means splitting the JSON into parts. A hint: the name of the list field is "data", as you can see by the green arrow.

So we use a new processor for this task, and its name matches our goal: SplitJson. (By the way, you can search directly in the processor search bar; you'll probably find what you need that way.) After adding the SplitJson processor, we double-click it and set "Properties -> JsonPath Expression" to the point where we want to split. In our example this is "data", so I set it accordingly.
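As a rough sketch (the exact fields depend on the AviationStack response), the incoming flowfile has the shape below, and a JsonPath Expression of $.data tells SplitJson to emit one flowfile per element of that list:

    {
      "pagination": { ... },
      "data": [
        { "flight_date": "2021-10-28", "airline": { "name": "Turkish Airlines", ... }, ... },
        { "flight_date": "2021-10-28", "airline": { ... }, ... }
      ]
    }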

And we set "Settings -> Automatically Terminate Relationships" to "original" and "failure", because we want the flow to continue only with the split records; the original and failed flowfiles should not go to the next processor.

To see the output we can add a dummy processor again, connect it to the SplitJson processor, and choose "split" as the relationship type. Our latest picture should look something like this:

The last processor is a dummy processor; we added it only to see the data in the queue.

When we click the second queue and choose the "List queue" option, we can see that our splitting was successful.

When you inspect them (by clicking the eye symbol) you can see that every flight has been split into its own part, so you'll see something like the example below.

Now each flight's information has been split into a separate flowfile, so we can filter, load, and transform the flights one by one. This was necessary for our trigger processes.

First we should decide whether we want all the fields or not, and whether we need any conversion. My answer is yes, I want to make some fixes to the data. First, I don't want it in its current nested format; the schema paths are not needed for my project, so I want to drop the parent attributes and put everything on the same level. Second, I want to create a new field called "uniqueID", because I need a value that identifies each flight uniquely by its date, flight name, and ICAO code. This kind of need calls for a specific processor: JoltTransformJSON. With it we can describe the conversion we need by writing a Jolt specification.

To do this we add a new processor called JoltTransformJSON and connect it to the previous processor with the "split" relationship type.

After double-clicking JoltTransformJSON we can click the "ADVANCED" button, and a pop-up with three panes appears. The upper pane is where we write the spec that performs the conversion, the bottom-left one lets us test the spec against a sample input, and the right one shows the result. We should paste our JSON sample into the bottom-left pane; you'll see it blank at first.

Writing the Jolt spec is actually easy, but you should first study some examples and start with simple steps. In our example we're going to keep only some specific fields and change their names, and additionally add a new field, uniqueID.

In this part you can see the Jolt transform spec that changes the names of the fields and creates the new uniqueID. This part is actually easy, but you do need to look at some examples first. You can find the available operations here.
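To give an idea, a Jolt spec for this kind of renaming plus uniqueID creation could look like the sketch below. The source field names are my assumption based on the AviationStack payload; adapt them to what you see in your own sample input.

    [
      {
        "operation": "shift",
        "spec": {
          "flight_date": "flight_date",
          "airline":   { "name": "airline_name" },
          "flight":    { "icao": "flight_icao" },
          "departure": { "timezone": "starting_timezone" },
          "arrival":   { "timezone": "target_timezone" }
        }
      },
      {
        "operation": "modify-overwrite-beta",
        "spec": {
          "uniqueID": "=concat(@(1,flight_date),'_',@(1,airline_name),'_',@(1,flight_icao))"
        }
      }
    ]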

NiFi works like this: if you want to define a condition (like an if/else in programming) to trigger events, you first have to extract the fields you'll use in the conditions as attributes. For this task we need a new processor called "EvaluateJsonPath".

Notice that configured processors show the paused symbol, so we can start and stop them whenever we want, while the unconfigured one still has an exclamation mark on it.

After double-clicking EvaluateJsonPath we can configure it. We choose whether we want to write the results to the content or to attributes; in our situation we are creating attributes. After this we click the + button and add some new attributes. We provide only two things when we create an attribute: its name and its value. We can choose anything as the name, but the value has to be written as a JsonPath in the form $.fieldname. It is enough to extract only the fields we want to use in the (if/else) conditions; the conditions themselves will be defined in another processor.

You can think of these as the variables we can use in the next processors.
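In my flow the attribute definitions looked roughly like this (the names on the left are free choices; the JsonPaths on the right assume the flattened field names produced by the Jolt step):

    airline_name       ->  $.airline_name
    starting_timezone  ->  $.starting_timezone
    target_timezone    ->  $.target_timezone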

Now we'll create the conditions that trigger the events. To define conditions we use the "RouteOnAttribute" processor and connect it to the previous processor. Its configuration uses the attributes we just defined.

In the scenario I followed, we need two conditions:

  • Is the airline name "Turkish Airlines"?
  • Does the flight go from Turkey to Turkey?

To create these condition criteria I configured the processor like this:

In the first condition I convert "airline_name" to upper case and then compare it with "TURKISH AIRLINES". The second one follows the same logic.

${airline_name:toUpper():contains('TURKISH AIRLINES')} as "airline_check"

${starting_timezone:toUpper():contains('TURKEY'):and(${target_timezone:toUpper():contains('TURKEY')})} as "from_tr_to_tr"

  • ALERT! One of my friends warned me: the timezone for Turkey is "Europe/Istanbul", so we shouldn't write "TURKEY" here; we need to write "ISTANBUL" instead.

To define a route-on-attribute condition we give it the name of the attribute and write the Expression Language command for our goal. You can check the available functions here. After this part we'll trigger our events with these routes.
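With my friend's correction applied (the timezones come as "Europe/Istanbul"), the second route would become:

    ${starting_timezone:toUpper():contains('ISTANBUL'):and(${target_timezone:toUpper():contains('ISTANBUL')})}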

Now we've finished half of the project and can start the second part. Up to this point we prepared our data for sending events; now it's time to trigger them. I'll first share the picture with you and then explain it step by step.

There are 3 target events here:

  • Part 1 - Load all data into MongoDB.
  • Part 2 - Send an alert message to Slack when the airline name is "TURKISH AIRLINES".
  • Part 3 - Load data into MySQL when the flight goes from Turkey to Turkey.

Now part 1.

Our first event is sending all the data to MongoDB. As you can see in the screenshot above, the MongoDB branch connects right after the JoltTransform processor. I did this because I want to send all of the data to MongoDB; there is no filtering in this step, so I placed this processor before the filtering ones.

To write all the data to MongoDB we create a processor named "PutMongo" and start configuring it. The path to follow is:

PutMongo -> Properties -> Client Service -> MongoDBControllerService

Click the "→" symbol on the right side -> click the "+" symbol -> add "MongoDBControllerService" -> click the gearwheel symbol -> and fill in the required information.

In my example MongoDB runs in a local Docker container, so the IP address I use is 127.0.0.1, and I used "username" and "password" as the credentials; change them to your own username, password, and ip:port. After this, click the flash/lightning symbol to enable the service. That's all; the MongoDB event is ready to start.
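For reference, the "Mongo URI" value in the controller service ends up looking roughly like this (27017 is MongoDB's default port; the database and collection names are configured on the PutMongo processor itself):

    mongodb://username:password@127.0.0.1:27017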

When everything is set up correctly and the processor is started, we'll see the data appear in MongoDB.

Part 2:

First we'll use PutSlack. For this step we need the Slack API: create a new app and take the "Webhook URL" of the application. Then we create a processor named "PutSlack" and connect it to the previous processor with the relationship type "airline_check". This processor will be triggered when the airline of a flight contains/equals "TURKISH AIRLINES".
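A minimal PutSlack configuration could look like the sketch below; the "Webhook URL" comes from the Slack app you created, and the message text is just my own example built from the attributes we extracted earlier:

    Webhook URL  = https://hooks.slack.com/services/XXXX/XXXX/XXXX
    Webhook Text = Turkish Airlines flight: ${starting_timezone} -> ${target_timezone} (${airline_name})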

That's all; this configuration is enough to trigger the alert and send a message to the Slack channel. When NiFi sees a matching flight (for example, one from "Adnan Menderes Airport"), it sends the flight's information to the Slack channel.

When we start the processor, it sends messages to the Slack channel like this:

Part 3:

This event runs only when a flight goes from Turkey to Turkey, i.e., a flight entirely within our country.

This time there are more steps to take. First we need to create an INSERT query from our JSON data and the column names, and then we need to pass that INSERT query to an engine that runs SQL queries.

First we should create a table in MySQL with the fields we'll write from NiFi; the column names have to match. For example, I created a table, and after writing the data it looked like this:

I checked that I could write all of the data, and then I cleaned out the table so it would hold only the data matching the condition.
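As a sketch, a matching table could be created like this; the column names have to equal the JSON field names coming out of the Jolt transform, while the types below are just my assumption:

    CREATE TABLE FLIGHT_INFOS (
        uniqueID          VARCHAR(100),
        flight_date       VARCHAR(20),
        airline_name      VARCHAR(100),
        starting_timezone VARCHAR(100),
        target_timezone   VARCHAR(100)
    );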

After creating the table in MySQL we are ready to set up the steps in NiFi. We need to add processors named "ConvertJSONToSQL" and "PutSQL" and connect them after the previous processor. Then we can start configuring the first one (ConvertJSONToSQL):

First I set the table name to FLIGHT_INFOS; set it to whatever you called it in MySQL. I selected INSERT as the statement type. Now we should click the symbol in the green circle to configure the JDBC Connection Pool.

After this we click the "+" button, select DBCPConnectionPool, click the gearwheel, and start filling in the configuration requirements.

We actually only need to fill in these rows. My MySQL database runs in a local Docker container, so I built the connection URL with 127.0.0.1:port. Second, the Database Driver Class Name is something you can find on the internet by database type; this one is the standard driver class name for MySQL. And the third one is the location of the database driver; the driver has to be installed on the same machine as NiFi, and this one is my directory.

And lastly we should enter our database username and password.
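Putting it together, my DBCPConnectionPool settings looked roughly like this (the database name, port, driver version, and path are placeholders for my own setup; use your own values):

    Database Connection URL     = jdbc:mysql://127.0.0.1:3306/flights_db
    Database Driver Class Name  = com.mysql.cj.jdbc.Driver
    Database Driver Location(s) = /path/to/mysql-connector-java-8.0.x.jar
    Database User               = username
    Password                    = password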

After these settings, click OK and enable the service with the flash symbol.

After finishing the configuration of ConvertJSONToSQL we can set up "PutSQL". The only thing we need to do is select "DBCPConnectionPool" as the JDBC Connection Pool; we already configured the MySQL connection details in the previous processor's controller service, so PutSQL will see that information and we don't have to set it again.

And that's all: our configuration is done and all of our processors are ready to start. The only thing left is to run them all.

One of the most important things about processor connections is their relationship type. The next processor is triggered only for the relationship types you select. For example:

  • We selected "Response" for the first connection, because data should go to the next processor only when a response has been received.
  • We selected "split" for the second connection, because the next processor should be triggered if and only if the data could be split.
  • The ConvertJSONToSQL -> PutSQL connection type is "sql", because only the generated SQL statements should go to the next processor.

Another important point to keep in mind is the termination criteria. The "Automatically Terminate Relationships" settings specify which outcomes stop data from being sent on to another processor, and we should set them for every processor. For the last processors in a branch the choice matters less; you can terminate all relationships or just one.

Now everything is in place and our project runs well. This turned out to be a long blog post; I tried to explain every step, so it is more of a tutorial than documentation. If you have any problems with the steps, you can write to me on social platforms.

Thank you for reading. I hope this is useful for you.

linkedin,

github

website

twitter

Written by bilative

I'm a Data Scientist with a passion for learning more about analyzing and processing data. #datascience #statistics
