Working with CSV and NiFi

Esdraslimasilva
10 min read · Jan 7, 2024


NiFi is a flow automation tool, like Apache Airflow, but it was built to be used through a GUI instead of programming. Today we are going to build a NiFi flow that processes three CSV files and loads them into a PostgreSQL database.

You can see the whole flow below:

We’ll take three different files in CSV format and load their data into the Postgres database. You can create the tables with the following script:

CREATE TABLE companies (
    id bigint NOT NULL,
    name text NOT NULL,
    image_url text,
    created_at timestamp without time zone NOT NULL,
    updated_at timestamp without time zone NOT NULL
);

CREATE TABLE campaigns (
    id bigint NOT NULL,
    company_id bigint NOT NULL,
    name text NOT NULL,
    cost_model text NOT NULL,
    state text NOT NULL,
    monthly_budget bigint,
    blacklisted_site_urls text[],
    created_at timestamp without time zone NOT NULL,
    updated_at timestamp without time zone NOT NULL
);

CREATE TABLE ads (
    id bigint NOT NULL,
    company_id bigint NOT NULL,
    campaign_id bigint NOT NULL,
    name text NOT NULL,
    image_url text,
    target_url text,
    impressions_count bigint DEFAULT 0,
    clicks_count bigint DEFAULT 0,
    created_at timestamp without time zone NOT NULL,
    updated_at timestamp without time zone NOT NULL
);

By the way, we’re going to use the data and schema provided by the Citus Tutorial.

Setup

I am not going to cover the installation and setup of NiFi and Postgres here, but you’ll need both installed and running to proceed.

Understanding FlowFiles and Processors

FlowFiles are the objects NiFi uses to carry data around. Each FlowFile holds the data itself, called the content, plus metadata about that data, referred to as attributes. It’s important to notice the difference, because every operation works on one of these two concepts.

In our case, the content will be the CSV rows, and we’ll use the filename attribute to filter the FlowFiles and send different data to different processors.
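For example, the FlowFile created from ads.csv looks conceptually like this (a simplified sketch, not an exact attribute dump):

attributes: filename = ads.csv, path = ..., uuid = ...
content: the raw CSV rows of the file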

Processors are pieces of code that perform some operation or transformation. You can think of processors as programming functions: they receive FlowFiles and pass them on to other processors.

Getting the data

Download the following files and move them to a known directory:

Now we can build the flow.

Building The Flow

1. Reading the files

To make anything in NiFi, we need processors. To get a file and read its content, we need the GetFile processor. To add a processor, drag and drop the upper-left icon onto the canvas:

You’ll see a screen with different processors; to find one quickly, just type in the search bar. To change a processor’s behavior we change its properties: right-click on the processor and choose the Configure option. The GetFile processor has the following properties:

The properties in bold are required for the processor to work. In the picture you can see the Input Directory and File Filter properties; they are the key ones. Input Directory is the path to the directory where the files you’ve downloaded are located.

In my case I saved them in the /home/limz/citus-learning directory (it has this structure because I am using Linux). You should provide the absolute path on your computer.

In File Filter you could put a single file name like ads.csv, but we want to get all three files, which is why the value is the following:

[^\.].*\.csv

It just says that every file that does not start with a dot and ends with .csv is a target (the \. escapes the dot so it is matched literally).

2. Routing the FlowFiles

Notice that when NiFi gets a file, we now have a FlowFile whose content is the CSV file’s content. Now that we can get the three files, we need to send them to different processor flows, because each one has a different structure and each one goes to a different table in the database.

To do that, we need a processor called RouteOnAttribute, so we can send each FlowFile down a specific path based on some attribute. Bring the processor to the canvas and right-click to change its properties.

You’re going to see just one property: Routing Strategy. Choose Route to Property Name, which tells the processor that each FlowFile matching a condition should be routed independently; so if you provide 3 different conditions, there will be 3 paths available to send the FlowFile down.

To provide a condition, we are going to add a custom property to the processor. To do that, click on the plus sign on the right of the processor’s properties tab.

The Property Name is the condition’s name; you can name it whatever you want. When you hit Enter you’ll need to provide the condition itself. Here we are going to use the Apache NiFi Expression Language to access the FlowFile attribute.

The logic is to send data coming from different files down different paths, so the data coming from ads.csv will be sent to its own flow. Below you can see the three conditions.
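They should look like this (the property names on the left are just labels I chose; pick whatever you like):

ads: ${filename:equals('ads.csv')}
campaigns: ${filename:equals('campaigns.csv')}
companies: ${filename:equals('companies.csv')}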

The syntax can look weird, but we’re just accessing the filename attribute of the FlowFile and checking whether it is the same as the value we provide in the parentheses.

3. Connecting The two processors

Now we need to tell NiFi that the FlowFiles produced by the GetFile processor must be passed to the RouteOnAttribute processor. We do this using the Relationship concept, which is just a link between two processors.

To connect the two, hover over GetFile and you are going to see an arrow; drag and drop the arrow onto the RouteOnAttribute processor. You’ll see all the relationships (or conditions) available to connect them.

In this case there is just the success relationship, meaning that all FlowFiles that were successfully processed are passed along it. Click on Add and let’s continue.

4. Adding Headers to the CSV file

We have a problem: our CSV files do not have a header, meaning the first row is already data, so there is no way of knowing which column is which. For that we are going to use a processor.

You could simply edit the files with VSCode or anything like that, but we’re here to learn, right? If you think building a processor for this is a waste of time, you can just edit each file and add the following headers above the first line (note there are no spaces after the commas, so the column names match the table columns exactly):

ads.csv

id,company_id,campaign_id,name,image_url,target_url,impressions_count,clicks_count,created_at,updated_at

campaigns.csv

id,company_id,name,cost_model,state,monthly_budget,blacklisted_site_urls,created_at,updated_at

companies.csv

id,name,image_url,created_at,updated_at

To do it via processor instead, we are going to use the ReplaceText processor. Since we are sending the FlowFiles down three different paths, we need three ReplaceText processors. Let’s configure one of them together.

After dragging in the three ReplaceText processors, choose one and open its properties. To avoid getting confused, you can rename the processor in the Settings tab.

In the properties, we need to change the replacement strategy. In this case, we want to add the header line above the first line of the file. For this example I am going to set up the ReplaceText processor for the ads.csv file.

Choose Prepend in the Replacement Strategy property, and in the Replacement Value copy and paste the headers:

id,company_id,campaign_id,name,image_url,target_url,impressions_count,clicks_count,created_at,updated_at

Finally, change the Evaluation Mode to Entire text; otherwise the header line will be added for each row instead of once for the whole file. The properties should look like this:

Do the same thing for the companies.csv and campaigns.csv ReplaceText processors. The only thing that changes is the Replacement Value.

5. Splitting the records

Remember when I said that we now have FlowFiles whose content is the CSV file’s content? Now we need to split the rows to insert them into the database. Again we need three different instances of the same processor, one for each path.

The processor we’re going to use is the SplitRecord processor, pretty straightforward. Again I am going to use the ads.csv path to illustrate.

We have just three properties here. Let’s focus on Records Per Split: in our case we want one row at a time, so we set the property to 1.

In Record Reader and Record Writer we need to provide Controller Services. They are just helpers that give a processor the right interpretation of the data. In this case we need controllers that tell the processor the incoming FlowFile comes from a CSV source, and how to treat it.

You must be thinking it would be much easier to just have some properties in the processor itself to provide that information, but notice that this processor works with a bunch of different formats, like JSON, CSV and so on.

To provide the Reader, click on the Record Reader property value, click on the drop-down and then click on Create New Service.

Save the changes, and now you’ll need to choose the right service. In our case it is the CSVReader.

You can set the Controller Service Name to whatever you want and click on the Create button. Now we have to set up the controller (I know, we’re almost done, hold on).

After that, you’ll see a little arrow pointing to the right. Click on it, save the changes, and you’ll be redirected to all the controller services you’ve set up. Click on the gear and let’s configure it.

In the properties tab, change the properties. I know you are overwhelmed by explanations, so you can just match the following image and skip the bla bla bla.
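If the image is hard to read, the two values that matter (everything else can stay at its default) are:

Schema Access Strategy: Infer Schema
Treat First Line as Header: true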

Here goes the bla bla bla. In Schema Access Strategy we choose the Infer Schema option, so the controller will figure out all the columns. If we had not provided the headers, this would be a lot more complicated.

The other important property is Treat First Line as Header, which must be set to true, so the header won’t be handled as data itself. Click on OK, and finally we need to enable the controller. To do so, just click on the lightning icon and then on Enable.

Now we need to do basically the same thing for the writer, but when you create the service you need to choose CSVRecordSetWriter, and the controller service’s properties must look like this:
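For the writer the defaults are generally fine; the two properties worth checking (my assumption for this flow, since the exact screenshot values may differ) are:

Schema Access Strategy: Inherit Record Schema
Include Header Line: true (so each split still carries its header for the next reader)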

Enable the service and let’s continue. I can hear your sigh.

Do not forget to do the same thing for the other two processors (for companies.csv and campaigns.csv). The good part is that you just need to choose the CSV reader and writer that you’ve already created and click on Apply. No more configuration on these controllers.

So all three processors should look like this:
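That is, each of the three SplitRecord processors ends up with:

Record Reader: CSVReader
Record Writer: CSVRecordSetWriter
Records Per Split: 1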

6. Inserting into the database

Finally we can take each record (or row, in our case) and insert it into the database. For that, let’s use the PutDatabaseRecord processor. Again, one instance for each FlowFile path, and again I am going to use the ads.csv path to illustrate.

In the properties we have the following

You can see our CSVReader again; thank God we’ve already configured it, we just need to select it. In Database Type select PostgreSQL; in Statement Type (the SQL DML statement) choose INSERT, in our case; and set the Table Name, which in the Postgres database is ads.
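Summing up, for the ads path the processor ends up with:

Record Reader: CSVReader
Database Type: PostgreSQL
Statement Type: INSERT
Table Name: ads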

You probably noticed that there are no properties for host, password, database name and so on, and yes, we need another controller. Let’s do this quickly: the controller goes in the Database Connection Pooling Service property. You have already done this before: create a new controller service, choose DBCPConnectionPool and click on Create.

Now let’s go to the controller service’s properties (remember: click on the arrow on the right corner of the property and then click on the gear, OK?).

We are going to connect via JDBC, and the needed configuration is the following.

Your connection URL will probably be something like this (adjust the port and database name for your setup; Postgres defaults to port 5432):

jdbc:postgresql://localhost:5500/your_db_name

For JDBC to work, just download the PostgreSQL driver here and move it to a directory (I recommend a folder called drivers inside the NiFi directory).

In the Database Driver Class Name, insert this:

org.postgresql.Driver

And provide the driver path in the Database Driver Location(s) property. Finally, fill in the Database User and Database Password. Click on OK and enable the service.

Do the same thing for the campaigns.csv path and the companies.csv path. The only thing that changes is the table name for each processor, like campaigns below.

When you set up all three instances you can finally see the whole workflow ready to go!

Right-click on the canvas and click on Start to see the whole flow running. If everything went fine, you can check your tables in PostgreSQL.
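A quick sanity check in psql (or any SQL client) confirms the load; the exact counts depend on the files you downloaded:

-- each table should now have one row per line of its CSV file
SELECT count(*) FROM companies;
SELECT count(*) FROM campaigns;
SELECT count(*) FROM ads;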

Conclusion

Working with NiFi can be challenging; understanding all these concepts is hard and will take a while to figure out. Thanks for reading, and I hope this was useful to you.

Keep Goin’, bye!
