Executing Python Scripts and SQL Queries in Apache NiFi

Yousef Alkhanafseh
Published in TurkNet Technology
Apr 2, 2024

This article explains how to execute Python scripts and SQL queries in Apache NiFi, covering the essential steps and configurations.

Image generated by DALL-E 3.

I. INTRODUCTION

Generally, the field of data science is rich with a variety of tools and programming languages. Among these, Python and SQL are two of the most commonly used in data science roles [1]. Apache NiFi, in turn, is one of the most powerful dataflow tools. This article therefore demonstrates in detail how to execute Python scripts and SQL queries within Apache NiFi. It also shows how to log their execution, providing insight into capturing and recording data in Apache NiFi. It outlines the necessary prerequisites, the step-by-step execution process, and the relevant information for each service, so that the reader can follow along easily.

II. PRE-PREPARATIONS

1. Refer to the “An Overview of Apache NiFi” article [2] for more information about Apache NiFi and its critical concepts. [Optional]

2. Follow the steps in the “Installation of Apache NiFi” article [3] to install Apache NiFi on your machine.

3. In the /opt/nifi directory, create the following subdirectories: drivers, python/scripts, python/logs, and sql/logs.

3.1 Go to the /opt/nifi/ directory.

cd /opt/nifi/

3.2 Create the needed directories using the following command:

mkdir -p drivers python/scripts python/logs sql/logs

4. Create a simple Python script that only prints “HELLO WORLD!”.

4.1 Ensure that the ‘nano’ text editor is installed on your system; if it is not, install it:

sudo apt-get install nano

4.2 Create a file named test.py with nano:

nano /opt/nifi/python/scripts/test.py

Then insert the following code and save the file:

print("HELLO WORLD!")

5. Download the Microsoft JDBC Driver for SQL Server to enable the connection between Apache NiFi and SQL Server.

5.1 Go to the /opt/nifi/drivers directory.

cd /opt/nifi/drivers

5.2 Download the mssql-jdbc-12.2.0.jre11.jar driver:

wget https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/12.2.0.jre11/mssql-jdbc-12.2.0.jre11.jar

6. Start the Apache NiFi service.

nifi start
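Before moving on to the controller services, it can be useful to confirm that the layout prepared in steps 3 to 5 is in place. The Python sketch below is a hypothetical helper (check_prerequisites is not part of NiFi): it checks the four directories and the test script, and it verifies that the downloaded JAR is a valid archive containing the driver class configured later in DBCPConnectionPool. It only reads files, so it is safe to run at any time.

```python
import os
import zipfile

NIFI_HOME = "/opt/nifi"  # base directory used throughout this article
SUBDIRS = ["drivers", "python/scripts", "python/logs", "sql/logs"]
DRIVER_JAR = "drivers/mssql-jdbc-12.2.0.jre11.jar"
# Class that is set as the "Database Driver Class Name" in DBCPConnectionPool.
DRIVER_CLASS = "com.microsoft.sqlserver.jdbc.SQLServerDriver"


def check_prerequisites(base=NIFI_HOME):
    """Return a list of problems found under `base` (an empty list means OK)."""
    problems = []
    for sub in SUBDIRS:
        if not os.path.isdir(os.path.join(base, sub)):
            problems.append(f"missing directory: {sub}")
    if not os.path.isfile(os.path.join(base, "python/scripts/test.py")):
        problems.append("missing script: python/scripts/test.py")
    # A JAR is a ZIP archive, and class a.b.C is stored as the entry a/b/C.class.
    entry = DRIVER_CLASS.replace(".", "/") + ".class"
    try:
        with zipfile.ZipFile(os.path.join(base, DRIVER_JAR)) as jar:
            if entry not in jar.namelist():
                problems.append(f"driver class not found in {DRIVER_JAR}")
    except FileNotFoundError:
        problems.append(f"missing driver: {DRIVER_JAR}")
    except zipfile.BadZipFile:
        # Typically a truncated or failed download.
        problems.append(f"corrupted driver download: {DRIVER_JAR}")
    return problems
```

Once steps 3 to 5 are complete, calling check_prerequisites() with the default /opt/nifi base should return an empty list; each string in a non-empty result names what is still missing.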

III. PREPARING APACHE NIFI CONTROLLER SERVICES

* DBCPConnectionPool

DBCPConnectionPool is a controller service provided by Apache NiFi that supplies database connections from a connection pool to NiFi processors for the specified database [4]. In this tutorial, it is used to connect Apache NiFi to Microsoft SQL Server (MSSQL).

Follow the steps below to create and configure the DBCPConnectionPool controller service:

1. Go to https://127.0.0.1:8443/nifi/ and sign in.
2. Expand the ‘Operate’ panel, then click on its settings (gear) icon.

3. Alter the name of the Process Group, apply the changes, and navigate to the ‘Controller Services’ section.

4. Click on the ‘+’ symbol on the right, enter ‘DBCPConnectionPool’ in the search box, select it, and then click the ‘Add’ button below.

5. Click on the settings symbol of the DBCPConnectionPool, navigate to the ‘SETTINGS’ section and rename it to ‘MSSQLConnection’, then proceed to the ‘PROPERTIES’ section.

6. Set the following properties as specified below:

Database Connection URL:
jdbc:sqlserver://db_ip;port=1433;database=db_name;encrypt=true;trustServerCertificate=true

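Note: Replace ‘db_ip’ and ‘db_name’ with the host IP of the database server and the database name, respectively. Port 1433 is the default SQL Server port; change it if your server listens on a different port.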

Database Driver Class Name:
com.microsoft.sqlserver.jdbc.SQLServerDriver

Database Driver Location(s):
/opt/nifi/drivers/mssql-jdbc-12.2.0.jre11.jar

Database User:
Replace it with a username that is allowed to connect to the database.
your_username

Password:
Replace it with the password of the user above.
your_password

7. Enable the created DBCPConnectionPool service by clicking on its ‘Enable’ (lightning bolt) icon, then click on the ‘ENABLE’ button.

It should be enabled without any errors, as shown below.
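The Database Connection URL from step 6 consists of the host followed by semicolon-separated key=value properties. When the URL is generated by a deployment script rather than typed by hand, a small helper keeps it consistent. The sketch below is hypothetical (build_mssql_jdbc_url is not a NiFi or driver function); it only reproduces the URL format used in this tutorial, with port 1433 and the two TLS flags as defaults.

```python
def build_mssql_jdbc_url(host, database, port=1433, encrypt=True,
                         trust_server_certificate=True):
    """Build a SQL Server JDBC URL in the format used in this tutorial."""
    # Insertion order of the dict fixes the order of the properties in the URL.
    props = {
        "port": str(port),
        "database": database,
        # Booleans are rendered as lowercase true/false, matching the URL above.
        "encrypt": str(encrypt).lower(),
        "trustServerCertificate": str(trust_server_certificate).lower(),
    }
    # Every property follows the host as ";key=value".
    suffix = "".join(f";{key}={value}" for key, value in props.items())
    return f"jdbc:sqlserver://{host}{suffix}"
```

For example, build_mssql_jdbc_url("db_ip", "db_name") returns exactly the placeholder URL shown in step 6. Keep in mind that trustServerCertificate=true makes the driver accept the server's certificate without validating it, which is convenient for a test setup but not advisable in production.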

* CSVRecordSetWriter

Another important controller service is CSVRecordSetWriter. As its name suggests, this service enables the dataflow to write records in CSV format [5].

In this tutorial, CSVRecordSetWriter is used to write the results of SQL queries. It can be added by following the steps below:

1. After navigating to the settings of the ‘Operate’ panel and opening the ‘Controller Services’ section, click on the ‘+’ symbol on the right, type ‘CSVRecordSetWriter’ into the search box, select it, and then click on the ‘Add’ button below.

2. Activate the service by clicking on the ‘Enable’ icon, then click on the ‘ENABLE’ button.

It is expected to run without encountering any errors, as illustrated below.

IV. APACHE NIFI PROCESSORS

This section explains six key Apache NiFi processors: ExecuteSQL, ExecuteSQLRecord, PutFile, UpdateAttribute, LogAttribute, and LogMessage. All of them except ExecuteSQL are used in the case studies section.

  • ExecuteSQL

It is a processor used in Apache NiFi to execute SQL queries. Its result is always written in Avro format, which is fixed by Apache NiFi for this processor [6]. Therefore, this processor is usually preferred when the output file format is not critical.

  • ExecuteSQLRecord

Like ExecuteSQL, it executes SQL queries. However, it is more flexible: the output format, such as CSV, JSON, Parquet, or plain text, is determined by the Record Writer controller service that the user selects [7].
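The practical difference between the two processors is where the output format is decided. The following Python analogy is not NiFi code: it uses an in-memory SQLite database as a stand-in for SQL Server (an assumption made only so the sketch runs anywhere), and the hypothetical functions to_csv and to_json_lines play the role of a Record Writer. The query runs the same way each time; only the writer passed in decides the format.

```python
import csv
import io
import json
import sqlite3


def to_csv(columns, rows):
    """Writer stand-in: a header line, then one CSV row per record."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    writer.writerow(columns)
    writer.writerows(rows)
    return buf.getvalue()


def to_json_lines(columns, rows):
    """Writer stand-in: one JSON object per record."""
    return "".join(json.dumps(dict(zip(columns, row))) + "\n" for row in rows)


def execute_sql_record(conn, query, record_writer):
    """Run `query` on `conn` and serialize the result with `record_writer`."""
    cursor = conn.execute(query)
    # Each entry of cursor.description describes one column; item 0 is its name.
    columns = [col[0] for col in cursor.description]
    return record_writer(columns, cursor.fetchall())


def demo():
    """Run one hypothetical query twice, once per writer."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
        conn.executemany("INSERT INTO users VALUES (?, ?)", [(1, "Ada"), (2, "Linus")])
        query = "SELECT id, name FROM users ORDER BY id"
        return (execute_sql_record(conn, query, to_csv),
                execute_sql_record(conn, query, to_json_lines))
    finally:
        conn.close()
```

demo() yields the same two rows once as CSV and once as JSON lines. With ExecuteSQL there is no such choice, because its output is always Avro.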

  • PutFile

It is a processor capable of writing the contents of a FlowFile to a local file directory [8].
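In plain file-system terms, PutFile takes the FlowFile content and writes it to the configured directory under the name held in the filename attribute. The Python sketch below only approximates this, and put_file is a hypothetical helper. Its conflict argument is loosely modeled on PutFile's Conflict Resolution Strategy property; check the documentation [8] for the options and the default in your NiFi version, since this matters when the same flow is run repeatedly and writes the same file name each time.

```python
import os


def put_file(content, directory, filename, conflict="fail"):
    """Write `content` (bytes) to directory/filename; return True if written.

    `conflict` is loosely modeled on PutFile's Conflict Resolution Strategy:
    "replace" overwrites, "ignore" keeps the existing file, "fail" raises.
    """
    if conflict not in ("replace", "ignore", "fail"):
        raise ValueError(f"unknown conflict strategy: {conflict}")
    os.makedirs(directory, exist_ok=True)  # this sketch creates a missing directory
    path = os.path.join(directory, filename)
    if os.path.exists(path):
        if conflict == "ignore":
            return False
        if conflict == "fail":
            raise FileExistsError(path)
    # Write to a hidden temporary file, then rename it, so that readers
    # never see a half-written file.
    tmp_path = os.path.join(directory, "." + filename)
    with open(tmp_path, "wb") as fh:
        fh.write(content)
    os.replace(tmp_path, path)
    return True
```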

  • UpdateAttribute

It gives DataFlow Managers (DFMs) finer control over FlowFiles by adding or updating their attributes, for example setting the filename attribute to rename the output file. It can also delete attributes whose names match a regular expression [9].
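A FlowFile can be pictured as content plus a map of string attributes, and UpdateAttribute only changes that map. The Python model below is purely illustrative: FlowFile and update_attribute are hypothetical names, not NiFi classes. It sets filename the way the case study does and drops attributes whose names match a regular expression; in this sketch deletions are applied before updates, and the pattern must match the whole attribute name.

```python
import re
from dataclasses import dataclass, field


@dataclass
class FlowFile:
    """Hypothetical stand-in for a NiFi FlowFile: content plus attributes."""
    content: bytes
    attributes: dict = field(default_factory=dict)


def update_attribute(flowfile, updates, delete_pattern=None):
    """Return a new FlowFile whose attributes are updated and optionally pruned.

    The content is passed through untouched, as with UpdateAttribute.
    """
    attrs = dict(flowfile.attributes)  # copy; the input FlowFile is unchanged
    if delete_pattern is not None:
        regex = re.compile(delete_pattern)
        attrs = {k: v for k, v in attrs.items() if not regex.fullmatch(k)}
    attrs.update(updates)
    return FlowFile(flowfile.content, attrs)
```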

  • LogAttribute

This processor logs general information about each FlowFile it receives. It supports several log levels, such as debug, info, warn, and error. At the info level, it records both the standard FlowFile properties and the FlowFile Attribute Map Content [10]. The first section shows the execution start date and time and the output file size. The second section includes details such as the executed command, its arguments, and the output log file name.
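As a rough analogy only, the Python sketch below writes a FlowFile's attribute map to a logger after a configurable prefix, which is the essence of what LogAttribute does. log_attributes is a hypothetical helper built on the standard logging module, and its layout is simplified; it is not NiFi's exact output format.

```python
import logging


def log_attributes(logger, attributes, prefix="", level=logging.INFO):
    """Log a FlowFile's attributes after `prefix`, one key/value pair per line.

    Returns the logged text so that it can be inspected or tested.
    """
    lines = [prefix, "FlowFile Attribute Map Content"]
    for key in sorted(attributes):  # sorted for stable, diff-friendly output
        lines.append(f"Key: '{key}' Value: '{attributes[key]}'")
    message = "\n".join(lines)
    logger.log(level, message)
    return message
```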

  • LOGMessage

Similar to LogAttribute, this is a logging processor. However, it is used specifically to write custom log messages defined by the user [11], for example logging ‘Process is successfully done.’ at the end of the flow.
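The idea behind LogMessage, a fixed prefix plus a user-defined message written at a chosen level, can be sketched with Python's logging module. log_message is hypothetical, and the level names below are an assumption modeled on the levels listed for LogAttribute above.

```python
import logging

# Assumed level names, mapped to the closest Python logging levels.
LEVELS = {
    "trace": logging.DEBUG,  # Python has no TRACE level; DEBUG is the closest
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "warn": logging.WARNING,
    "error": logging.ERROR,
}


def log_message(logger, message, prefix="", level="info"):
    """Write `prefix` + `message` at the named level and return the text."""
    text = f"{prefix}{message}"
    logger.log(LEVELS[level], text)
    return text
```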

V. CASE STUDIES

This section presents two case studies, executing a Python script and executing an SQL query, each together with its output logs.

Executing Python Scripts

This subsection explains step by step how to execute a Python script within Apache NiFi, including the necessary configuration.

The necessary steps to complete the process are outlined as follows:

1. Add the ExecuteProcess processor to the canvas:

1.1 Drag the Processor icon from the Components Toolbar and drop it onto the canvas.

1.2 Search for the “ExecuteProcess” processor, then add it.

1.3 Double-click on the processor to open its configuration, go to the “PROPERTIES” section, and fill in the information as shown below.

Command: python3
Command Arguments: /opt/nifi/python/scripts/test.py
Working Directory: /opt/nifi/python/scripts

Note that, depending on how Python is installed on your system, the command may be python instead of python3.

2. Add the UpdateAttribute processor.

2.1 Repeat step 1, but this time search for the ‘UpdateAttribute’ processor and add it to your workflow.

2.2 Double-click on the processor to open its configuration, go to the “PROPERTIES” section, and add a new property named filename.

2.3 Set the value of the ‘filename’ property to ‘test_script.log’. This file will hold the output of the test.py script executed by the ExecuteProcess processor.

3. Establish a connection between the previously created ExecuteProcess and UpdateAttribute processors.

4. Add the PutFile processor to the canvas and establish a connection between it and the preceding UpdateAttribute processor.

4.1 Double-click on the processor to open its configuration, and change the “Directory” property to:

Directory: /opt/nifi/python/logs

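5. Add the LogAttribute processor to the canvas and make a connection between it and the previous PutFile processor. PutFile also has a ‘failure’ relationship, which must be either connected or auto-terminated in its “RELATIONSHIPS” section before the processor can run.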

5.1 Double-click on the processor to open its configuration, go to the “PROPERTIES” section, and change Log prefix to any prefix you prefer.

Log prefix: -- Logging the Execution of Python Scripts --

6. Add the LogMessage processor to the canvas and make a connection between it and the previous LogAttribute processor.

6.1 Double-click on the processor to open its configuration, go to the “PROPERTIES” section, and change Log prefix to any prefix you prefer. This prints a custom message in the log file.

Log prefix: \n -- Executing Python script is successfully done -- \n

7. Navigate to the LogMessage processor's “RELATIONSHIPS” section and check the “terminate” box, so that FlowFiles are dropped once they have been logged.

8. To change the color of a processor, select it, click the Brush icon, choose a color, and click “APPLY”.

9. Add a label above each processor with a brief explanation of what it does.

The final flow should look like the one depicted in the figure below.

10. To observe the progress step by step, run each processor only once by right-clicking on it and selecting “Run Once”.

After each run, refresh the canvas by right-clicking on it and clicking “Refresh”. The queue of the connection leaving that processor should then show 1 queued FlowFile.

To view the output of the ExecuteProcess processor, which executes a Python script in this case, open the test_script.log file in the /opt/nifi/python/logs directory.

To see the output of both the LogAttribute and LogMessage processors, open the nifi-app.log file in the /opt/nifi/logs directory.
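Because nifi-app.log also contains NiFi's own messages, the entries written by this flow are easiest to find by searching for the configured log prefix. A minimal sketch follows (find_log_entries is a hypothetical helper; grep with its -A option does the same job on the command line). The after parameter adds trailing lines, which helps with multi-line output such as a LogAttribute dump; overlapping windows may repeat lines.

```python
def find_log_entries(log_path, marker, after=0):
    """Return each line containing `marker`, followed by up to `after` lines."""
    with open(log_path, encoding="utf-8", errors="replace") as fh:
        lines = [line.rstrip("\n") for line in fh]
    matches = []
    for i, line in enumerate(lines):
        if marker in line:
            # Keep the matching line plus its trailing context.
            matches.extend(lines[i:i + 1 + after])
    return matches
```

For example, find_log_entries("/opt/nifi/logs/nifi-app.log", "Logging the Execution of Python Scripts", after=20) returns the prefix line and the twenty lines after it.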

Executing SQL Queries

SQL queries can be executed in Apache NiFi with the ExecuteSQLRecord processor. The flow has the same structure as in the Executing Python Scripts subsection, except that the ExecuteSQLRecord processor replaces the ExecuteProcess processor.

Therefore, drag and drop a processor onto the canvas, search for ExecuteSQLRecord, and add it to the canvas.

Then, double-click on the processor to open its configuration, go to the “PROPERTIES” section, and fill in the information as follows:

  • Database Connection Pooling Service: MSSQLConnection
    Note: the MSSQLConnection controller service is prepared in the III. DBCPConnectionPool subsection.
  • SQL select query: <your query>
  • Record Writer: CSVRecordSetWriter
    Note: the CSVRecordSetWriter controller service is prepared in the III. CSVRecordSetWriter subsection.

Furthermore, adjust the configurations of the subsequent processors as detailed below:

  • UpdateAttribute > filename: sql_query.log
  • PutFile > Directory: /opt/nifi/sql/logs
  • LogAttribute > Log prefix: -Logging the Execution of SQL Query-
  • LogMessage > Log Message: ************** LOG MESSAGE **************
  • LogMessage > Log prefix: Executing SQL Query is successfully done

Finally, the flow should look like the one in the figure below.

To see the output of the ExecuteSQLRecord processor, open the sql_query.log file in the /opt/nifi/sql/logs directory.

To see the output of both the LogAttribute and LogMessage processors, open the nifi-app.log file in the /opt/nifi/logs directory.

VI. CONCLUSION

In summary, this article provides a step-by-step guide to executing Python scripts and SQL queries within Apache NiFi. It details the prerequisites, the execution process, and the logging methods, so that readers can easily understand how data is captured and recorded in Apache NiFi. It is intended as a practical resource for both beginners and experienced DFMs in the field.

VII. REFERENCES

[1] Saltz, J., Armour, F., & Sharda, R. (2018). Data science roles and the types of data science programs. Communications of the Association for Information Systems, 43(1), 33.

[2] Alkhanafseh, Y. (2024). An Overview of Apache NiFi. Accessed on [17.03.2024]. Retrieved from: https://medium.com/@alkhanafseh/an-overview-of-apache-nifi-44819186674c

[3] Alkhanafseh, Y. (2024). Installation of Apache NiFi. Accessed on [17.03.2024]. Retrieved from: https://medium.com/@alkhanafseh/installation-of-apache-nifi-2856fca5bbdc

[4] Apache NiFi (n.d.). DBCPConnectionPool. Accessed on [21.03.2024]. Retrieved from: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-dbcp-service-nar/1.24.0/org.apache.nifi.dbcp.DBCPConnectionPool/index.html

[5] Apache NiFi (n.d.). CSVRecordSetWriter. Accessed on [21.03.2024]. Retrieved from: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-record-serialization-services-nar/1.23.2/org.apache.nifi.csv.CSVRecordSetWriter/index.html

[6] Apache NiFi (n.d.). ExecuteSQL. Accessed on [22.03.2024]. Retrieved from: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.5.0/org.apache.nifi.processors.standard.ExecuteSQL/index.html

[7] Apache NiFi (n.d.). ExecuteSQLRecord. Accessed on [23.03.2024]. Retrieved from: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.23.2/org.apache.nifi.processors.standard.ExecuteSQLRecord/index.html

[8] Apache NiFi (n.d.). PutFile. Accessed on [23.03.2024]. Retrieved from: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.23.2/org.apache.nifi.processors.standard.PutFile/index.html

[9] Apache NiFi (n.d.). UpdateAttribute. Accessed on [23.03.2024]. Retrieved from: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-update-attribute-nar/1.24.0/org.apache.nifi.processors.attributes.UpdateAttribute/index.html

[10] Apache NiFi (n.d.). LogAttribute. Accessed on [24.03.2024]. Retrieved from: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.23.2/org.apache.nifi.processors.standard.LogAttribute/index.html

[11] Apache NiFi (n.d.). LogMessage. Accessed on [24.03.2024]. Retrieved from: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.11.4/org.apache.nifi.processors.standard.LogMessage/
