XML Data Processing Using Apache NiFi
Introduction
Apache NiFi is a platform that solves the real-time challenges of acquiring, enriching, and transporting data from a multitude of sources, and it is designed to accommodate the diverse dataflows generated by systems and devices. Apache NiFi has matured tremendously over the past few years and is widely used by organizations to power their data integration needs.
This post covers how to use Apache NiFi to perform Extract, Transform, Load (ETL) tasks, and its ability to process XML- and JSON-formatted data, based on my recent development experience with a real-world requirement (in both on-premises and cloud-based installations). The example developed here was built against Apache NiFi 1.11.4.
Example Use case
Using Apache NiFi, we need to parse the XML data and split it into multiple target entities. More details about our use case:
1. The source file contains a series of XML records (records with different XML structures in the same file) describing students, the marks they scored in each subject, and their hobbies.
2. Except for the student profile element, the remaining elements (marks & hobbies) are optional.
3. For this example, I have considered 2 different XML record structures (in my real-world requirement, 3 complex XML format types were handled).
a. An XML record with a "GroupID" element (the group to which the students belong), along with the other elements (student profile, marks & hobbies).
b. An XML record for an individual student (without the "GroupID" element).
4. The requirement is to parse the XML data and load it into relational-model Hive tables using Apache NiFi:
a. Student_info
b. Student_marks
c. Student_hobbies
5. If there are no values for the "marks" & "hobbies" elements, no records should be inserted into the respective tables.
6. For every unique value present in the "marks" & "hobbies" elements, a record needs to be inserted into the respective table.
Below are the sample XML record types:
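A minimal sketch of the two record types, using the element names that appear in the Avro read schema later in this post (GroupID, student, field, marks, property, hobbies, activity); the enclosing record tag name and all sample values are assumptions:

```xml
<!-- Type 1: group record, with a GroupID element (tag <record> and values are assumptions) -->
<record>
  <GroupID>G01</GroupID>
  <student>
    <field name="studentID" value="1001"/>
    <field name="FirstName" value="John"/>
    <field name="LastName" value="Doe"/>
    <field name="Gender" value="M"/>
  </student>
  <marks>
    <property name="Maths" value="92"/>
    <property name="Science" value="85"/>
  </marks>
  <hobbies>
    <activity name="Chess" isfavourite="true" Type="Indoor"/>
  </hobbies>
</record>

<!-- Type 2: individual student record, without GroupID; marks & hobbies are optional -->
<record>
  <student>
    <field name="studentID" value="1002"/>
    <field name="FirstName" value="Jane"/>
    <field name="LastName" value="Roe"/>
    <field name="Gender" value="F"/>
  </student>
</record>
```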
How the pipeline works
1. For demo purposes, I've used the "GenerateFlowFile" processor to generate a sample flowfile with XML records.
2. As a first step, we need to separate the records by XML structure, so that we can perform transformations specific to each record type. To achieve this, the "QueryRecord" processor is used.
2(a). Every XML record is treated as a single column. The schema is defined as below in the "UpdateAttribute" processor (the 2nd step in the pipeline).
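A one-column Avro schema is enough for this stage; the column name xml_record below is an assumption:

```json
{"type":"record","name":"xmlrecord","namespace":"org.apache.nifi","fields":[
  {"name":"xml_record","type":"string"}]}
```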
2(b). Using the logic below in the QueryRecord processor, the XML record types are separated.
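As a sketch, two dynamic properties on QueryRecord (one per outgoing relationship) can separate the types by checking for the "GroupID" element; the relationship names and the single-column name xml_record are assumptions:

```sql
-- relationship "xmltype1": group records that contain a GroupID element
SELECT * FROM FLOWFILE WHERE xml_record LIKE '%<GroupID>%'

-- relationship "xmltype2": individual student records (no GroupID)
SELECT * FROM FLOWFILE WHERE xml_record NOT LIKE '%<GroupID>%'
```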
2(c). ExecuteStreamCommand is used to add "<root>" as the first line of the flowfile and "</root>" as the last line. This lets the "JoltTransformRecord" processor know that the incoming flowfile is an array of XML records.
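A minimal shell sketch of what the command invoked by ExecuteStreamCommand needs to do (emit the flowfile content wrapped in a single root element); the function name and the temp path are assumptions:

```shell
# wrap_in_root: emit <root>, then the flowfile content, then </root>,
# so the downstream XML reader sees one well-formed document
wrap_in_root() {
  printf '<root>\n'
  cat "$1"
  printf '</root>\n'
}

# demo with a sample flowfile written to a temp path (path is an assumption)
printf '<student/>\n' > /tmp/records.xml
wrap_in_root /tmp/records.xml
```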
2(d). The idea is to use the "JoltTransformRecord" processor to convert XML to JSON. For that, the XML read schema and the expected JSON output schema are defined in the "UpdateAttribute" processor that follows "ExecuteStreamCommand".
Avro schema for reading the XML record type 2
{"type":"record","name":"xmltype2","namespace":"org.apache.nifi","fields":[
  {"name":"student","type":[{"type":"record","name":"studentType","fields":[
    {"name":"field","type":{"type":"array","items":{"type":"record","name":"fieldType","fields":[
      {"name":"name","type":"string"},{"name":"value","type":"string"}]}}}]}]},
  {"name":"marks","type":["null",{"type":"record","name":"marksType","fields":[
    {"name":"property","type":["null",{"type":"array","items":{"type":"record","name":"propertyType","fields":[
      {"name":"name","type":["null","string"]},{"name":"value","type":["string","null"]}]}}]}]}]},
  {"name":"hobbies","type":["null",{"type":"record","name":"hobbiesType","fields":[
    {"name":"activity","type":["null",{"type":"array","items":{"type":"record","name":"activityType","fields":[
      {"name":"name","type":["null","string"]},{"name":"isfavourite","type":["string","null"]},{"name":"Type","type":["null","string"]}]}}]}]}]}]}
Avro schema for expected JSON format output for XML record type 2
{"name":"xmltype2_transform","type":"record","namespace":"com.acme.avro","fields":[
  {"name":"student_items","type":{"name":"student_items","type":"record","fields":[
    {"name":"studentID","type":["string","null"]},{"name":"FirstName","type":["string","null"]},
    {"name":"LastName","type":["string","null"]},{"name":"Gender","type":["string","null"]}]}},
  {"name":"marks","type":["null",{"type":"record","name":"marksType","fields":[
    {"name":"property","type":["null",{"type":"array","items":{"type":"record","name":"propertyType","fields":[
      {"name":"name","type":["null","string"]},{"name":"value","type":["string","null"]}]}}]}]}]},
  {"name":"hobbies","type":["null",{"type":"record","name":"hobbiesType","fields":[
    {"name":"activity","type":["null",{"type":"array","items":{"type":"record","name":"activityType","fields":[
      {"name":"name","type":["null","string"]},{"name":"isfavourite","type":["string","null"]},{"name":"Type","type":["null","string"]}]}}]}]}]}]}
2(e). Below are the JoltTransformRecord settings & the JOLT spec used to convert the "student" element section into "<Key>Value</Key>" format using the "shift" operation.
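A shift spec along these lines turns each student "field" name/value pair into a plain key:value entry by using the value of its "name" field as the output key; the target name student_items matches the JSON output schema above, while the exact spec details are an assumption:

```json
[
  {
    "operation": "shift",
    "spec": {
      "student": {
        "field": {
          "*": {
            "value": "student_items.@(1,name)"
          }
        }
      },
      "marks": "marks",
      "hobbies": "hobbies"
    }
  }
]
```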
2(f). Below is the output after the JoltTransformRecord step executes: the XML is converted into the expected JSON format.
2(g). Now we need to flatten the JSON record & extract the "student", "marks" & "hobbies" elements separately to load the target Hive table(s). This can be achieved with the QueryRecord processor, using the "RPATH()" Calcite function that NiFi provides.
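For example, the dynamic properties on this QueryRecord could look like the following (a sketch only; the relationship names are assumptions, and RPATH pulls each sub-element out of the record):

```sql
-- relationship "student": the flattened student profile
SELECT RPATH(student_items, '/studentID') AS studentID,
       RPATH(student_items, '/FirstName') AS FirstName,
       RPATH(student_items, '/LastName')  AS LastName,
       RPATH(student_items, '/Gender')    AS Gender
FROM FLOWFILE

-- relationship "marks": only records that actually carry marks (per requirement 5)
SELECT RPATH(student_items, '/studentID') AS studentID,
       RPATH(marks, '/property') AS property
FROM FLOWFILE
WHERE RPATH(marks, '/property') IS NOT NULL
```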
After the "QueryRecord" processor step executes, below is the output in the flowfile belonging to the "student" element of an XML structure type 1 record.
Below is the output in the flowfile belonging to the "marks" element of an XML structure type 1 record. Subject & marks are displayed as array values.
2(h). The QueryRecord processor relationships are mapped to the respective Hive table load process groups as below (in my case, further transformation is performed inside the individual table process groups).
2(i). To further flatten these arrays, we can make use of the "ForkRecord" processor, but in my case I used a custom Groovy script through the "ExecuteGroovyScript" processor, since I performed a few more transformations in Groovy.
Below is the Groovy code:
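The original script also carried extra business transformations; the sketch below shows only the flattening idea, written against the ExecuteScript-style API (session, StreamCallback). The field names (studentID, subject, marks) are assumed from the marks example above:

```groovy
import groovy.json.JsonSlurper
import groovy.json.JsonOutput
import org.apache.nifi.processor.io.StreamCallback

// Sketch (assumption): for every entry in the "property" array of each
// incoming record, emit one flat row (studentID, subject, marks).
def flowFile = session.get()
if (flowFile == null) return

flowFile = session.write(flowFile, { inputStream, outputStream ->
    def records = new JsonSlurper().parse(inputStream)
    def rows = []
    records.each { rec ->
        rec?.marks?.property?.each { p ->
            rows << [studentID: rec.studentID, subject: p.name, marks: p.value]
        }
    }
    outputStream.write(JsonOutput.toJson(rows).getBytes('UTF-8'))
} as StreamCallback)

session.transfer(flowFile, REL_SUCCESS)
```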
2(j). Now the above records can be loaded as a file into an HDFS path, & similar steps can be followed to load the HDFS paths of the other external Hive tables.
Conclusion
Here I have given a small glimpse of, and only scratched the surface of, the power that Apache NiFi provides for ETL use cases; the platform offers many more advanced features.
I would love to hear any questions, suggestions, comments, or feedback that you may have!