Sriram Venkatraman
Oct 7, 2020


XML data processing using Apache NiFi

Introduction

Apache NiFi is a platform that solves the real-time challenges of acquiring, enriching & transporting data from a multitude of sources, and it is designed to accommodate the diverse dataflows generated by systems & things. Apache NiFi has matured tremendously in the past few years & is used widely by organizations to power their data integration needs.

This post covers how best to use Apache NiFi to perform Extract, Transform, Load (ETL) types of tasks and its ability to process XML & JSON formatted data, based on my development experience with a recent real-time requirement (in both on-prem & cloud-based installations). The example developed here was built against Apache NiFi 1.11.4.

Example Use case

Using Apache NiFi, we need to parse the XML data and split it into multiple target entities. More details related to our use case:

1. The source file will contain a bunch of XML records (records with different XML structures in the same file) related to students, the marks they scored in each subject and their hobbies.

2. Except for the student profile element, the remaining elements (marks & hobbies) are optional.

3. For this example, I have considered 2 different XML record structures (in my real-time requirement, 3 complex XML format types were handled).

a. An XML record with a “GroupID” element (the group to which the students belong) along with the other elements (student profile, marks & hobbies).

b. An XML record for an individual student (without the “GroupID” element).

4. The requirement is to parse the XML data using Apache NiFi and load it into the following relational-model Hive tables:

a. Student_info

b. Student_marks

c. Student_hobbies

5. If there are no values for the “marks” & “hobbies” elements, no record needs to be inserted into the respective tables.

6. For every unique value available in the “marks” & “hobbies” elements, a record needs to be inserted into the respective table.

Below are the sample XML record types:

Sample XML record type 1
Sample XML record type 2
Sample source data file with bunch of XML records
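The sample record types are shown as images in the original post. As an illustrative sketch only, assuming the element names implied by the Avro read schema further below (student/field, marks/property, hobbies/activity) and an assumed per-record wrapper element, a couple of records in the source file might look like:

<!-- Illustrative record type 1: includes a GroupID element -->
<record>
  <GroupID>G01</GroupID>
  <student>
    <field name="studentID">S001</field>
    <field name="FirstName">Asha</field>
    <field name="LastName">K</field>
    <field name="Gender">F</field>
  </student>
  <marks>
    <property name="Maths">92</property>
    <property name="Science">88</property>
  </marks>
  <hobbies>
    <activity name="Chess" isfavourite="Y" Type="Indoor"/>
  </hobbies>
</record>

<!-- Illustrative record type 2: same structure without GroupID; marks & hobbies are optional -->
<record>
  <student>
    <field name="studentID">S002</field>
    <field name="FirstName">Ravi</field>
    <field name="LastName">N</field>
    <field name="Gender">M</field>
  </student>
</record>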
Target Hive Data model
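The Hive data model itself is an image in the original; a minimal DDL sketch, with column names, file format and locations assumed for illustration, could look like:

-- Hypothetical DDL; the actual tables, partitioning and storage format may differ
CREATE EXTERNAL TABLE student_info (
  student_id STRING, first_name STRING, last_name STRING, gender STRING, group_id STRING
) STORED AS TEXTFILE LOCATION '/data/student_info';

CREATE EXTERNAL TABLE student_marks (
  student_id STRING, subject STRING, marks INT
) STORED AS TEXTFILE LOCATION '/data/student_marks';

CREATE EXTERNAL TABLE student_hobbies (
  student_id STRING, activity STRING, is_favourite STRING, activity_type STRING
) STORED AS TEXTFILE LOCATION '/data/student_hobbies';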
NiFi Data pipeline

How does the pipeline work?

1. For demo purposes, I’ve used the “GenerateFlowFile” processor to generate a sample flowfile with XML records.

2. As a first step, we need to separate the records with different XML structures so that we can perform transformations specific to each record type. To achieve this, the “QueryRecord” processor is used.

2(a). Every XML record is considered as a single column. The schema is defined as below in the “UpdateAttribute” processor (the 2nd step in the pipeline).

AVRO schema definition
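The schema itself is shown as an image; a single-column schema of this kind (the field name xml_record is assumed here) would look roughly like:

{ "type": "record", "name": "xmlrecord", "namespace": "org.apache.nifi", "fields": [ { "name": "xml_record", "type": "string" } ] }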

2(b). Using the below logic in the QueryRecord processor, the XML record types are separated.

Query record output branches
QueryRecord Processor property settings
SQL query to separate the XML type 1 records
SQL query to separate the XML type 2 records
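The queries themselves are shown as images; assuming the single-column schema sketched above (with the assumed column name xml_record), the separation could be expressed roughly as:

-- route to the XML type 1 relationship (records containing a GroupID element)
SELECT * FROM FLOWFILE WHERE xml_record LIKE '%<GroupID>%'

-- route to the XML type 2 relationship (records without a GroupID element)
SELECT * FROM FLOWFILE WHERE xml_record NOT LIKE '%<GroupID>%'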

2(c). ExecuteStreamCommand is used to add “<root>” as the first line of the flowfile and “</root>” as the last line. This lets the “JoltTransformRecord” processor know that the incoming flowfile is an array of XML records.

ExecuteStreamCommand Processor property settings
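The exact command is configured in the settings image; one simple way to do this wrapping, assuming a Unix shell is available on the NiFi host and the flowfile content arrives on stdin, is:

/bin/sh -c 'echo "<root>"; cat; echo "</root>"'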

2(d). The idea is to use the “JoltTransformRecord” processor to convert from XML to JSON. For that, the XML read schema and the expected JSON output schema are defined in the “UpdateAttribute” processor placed after “ExecuteStreamCommand”.

Avro schema for reading the XML record type 2

{"type":"record","name":"xmltype2","namespace":"org.apache.nifi","fields":[{"name":"student","type":[{"type":"record","name":"studentType","fields":[{"name":"field","type":{"type":"array","items":{"type":"record","name":"fieldType","fields":[{"name":"name","type":"string"},{"name":"value","type":"string"}]}}}]}]},{"name":"marks","type":["null",{"type":"record","name":"marksType","fields":[{"name":"property","type":["null",{"type":"array","items":{"type":"record","name":"propertyType","fields":[{"name":"name","type":["null","string"]},{"name":"value","type":["string","null"]}]}}]}]}]},{"name":"hobbies","type":["null",{"type":"record","name":"hobbiesType","fields":[{"name":"activity","type":["null",{"type":"array","items":{"type":"record","name":"activityType","fields":[{"name":"name","type":["null","string"]},{"name":"isfavourite","type":["string","null"]},{"name":"Type","type":["null","string"]}]}}]}]}]}]}

Avro schema for expected JSON format output for XML record type 2

{"name":"xmltype2_transform","type":"record","namespace":"com.acme.avro","fields":[{"name":"student_items","type":{"name":"student_items","type":"record","fields":[{"name":"studentID","type":["string","null"]},{"name":"FirstName","type":["string","null"]},{"name":"LastName","type":["string","null"]},{"name":"Gender","type":["string","null"]}]}},{"name":"marks","type":["null",{"type":"record","name":"marksType","fields":[{"name":"property","type":["null",{"type":"array","items":{"type":"record","name":"propertyType","fields":[{"name":"name","type":["null","string"]},{"name":"value","type":["string","null"]}]}}]}]}]},{"name":"hobbies","type":["null",{"type":"record","name":"hobbiesType","fields":[{"name":"activity","type":["null",{"type":"array","items":{"type":"record","name":"activityType","fields":[{"name":"name","type":["null","string"]},{"name":"isfavourite","type":["string","null"]},{"name":"Type","type":["null","string"]}]}}]}]}]}]}

2(e). Below are the JoltTransformRecord settings & the JOLT spec used to convert the “student” element section from name/value pairs into key:value fields using the “shift” operation.

JoltTransformRecord processor property settings with JOLT spec
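The original spec is shown in the settings image; a shift spec that promotes each name/value pair under student/field to a key:value field (output field names as per the JSON schema above, with marks & hobbies passed through) might look like:

[
  {
    "operation": "shift",
    "spec": {
      "student": {
        "field": {
          "*": {
            "value": "student_items.@(1,name)"
          }
        }
      },
      "marks": "marks",
      "hobbies": "hobbies"
    }
  }
]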

2(f). Below is the output after the JoltTransformRecord step is executed. The XML is converted into the expected JSON format.

Expected JSON output
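The output itself is an image in the original; with the illustrative values used earlier, a record would come out roughly as:

{
  "student_items": { "studentID": "S001", "FirstName": "Asha", "LastName": "K", "Gender": "F" },
  "marks": { "property": [ { "name": "Maths", "value": "92" }, { "name": "Science", "value": "88" } ] },
  "hobbies": { "activity": [ { "name": "Chess", "isfavourite": "Y", "Type": "Indoor" } ] }
}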

2(g). Now we need to flatten the JSON record & extract the “student”, “marks” & “hobbies” elements separately to load the target Hive table(s). This can be achieved using the QueryRecord processor, in which the “RPATH()” Calcite function is used.

QueryRecord processor settings with SQL query to extract “hobbies” element
SQL query to extract “student” element
SQL query to extract “marks” element
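The exact queries are in the images above; sketches of what they might look like, using the RecordPath (RPATH) functions available in QueryRecord and the field names from the JSON schema, are:

-- "student" relationship (illustrative column aliases)
SELECT RPATH_STRING(student_items, '/studentID') AS student_id,
       RPATH_STRING(student_items, '/FirstName') AS first_name,
       RPATH_STRING(student_items, '/LastName')  AS last_name,
       RPATH_STRING(student_items, '/Gender')    AS gender
FROM FLOWFILE

-- "marks" relationship (illustrative); nothing is emitted when the optional element is absent
SELECT RPATH_STRING(student_items, '/studentID') AS student_id,
       RPATH(marks, '/property[*]/name')  AS subject,
       RPATH(marks, '/property[*]/value') AS marks
FROM FLOWFILE
WHERE marks IS NOT NULL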

After the “QueryRecord” processor step is executed, below is the output in the flowfile belonging to the “student” element of an XML structure type 1 record.

QueryRecord processor output for “student” query

Below is the output in the flowfile belonging to the “marks” element of an XML structure type 1 record. Subject & marks are displayed as array values.

QueryRecord processor output for “marks” query

2(h). The QueryRecord processor relationships are mapped to the respective Hive table load process groups as below (in my case, further transformation is performed inside the individual table process groups).

Routing of QueryRecord relationships to the respective target HDFS load process group

2(i). To flatten this further, we could make use of the “ForkRecord” processor, but in my case I used a custom Groovy script through the “ExecuteGroovyScript” processor, as I performed a few more transformations in Groovy.

Below is the Groovy code:

Each array-of-values column is treated as a matrix; Groovy’s transpose() function merges the arrays, turning columns into rows
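The actual script is shown as an image and includes additional transformations; a minimal sketch of just the transpose idea, assuming the flowfile carries one JSON record with parallel subject/marks arrays (field names assumed), could be:

import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import org.apache.nifi.processor.io.StreamCallback

def flowFile = session.get()
if (!flowFile) return

flowFile = session.write(flowFile, { inputStream, outputStream ->
    // e.g. {"student_id":"S001","subject":["Maths","Science"],"marks":["92","88"]}
    def record = new JsonSlurper().parse(inputStream)
    // Treat the parallel arrays as a matrix and transpose() it: columns become rows.
    def rows = [record.subject, record.marks].transpose().collect { subject, mark ->
        [student_id: record.student_id, subject: subject, marks: mark]
    }
    outputStream.write(JsonOutput.toJson(rows).getBytes('UTF-8'))
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)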
Output before Groovy script execution
Output after Groovy script execution

2(j). Now the above records can be loaded as a file into the HDFS path, & similar steps can be followed to load the HDFS paths of the other external Hive tables.

Conclusion

Here, I have given a small glimpse & just scratched the surface of the power that Apache NiFi provides for ETL use cases; it has many more advanced features.

I would love to hear any questions, suggestions, comments, or feedback that you may have!
