Building an Effective NiFi Flow — RouteText

Mark Payne
Cloudera
Published Dec 20, 2022

I’ve spoken a lot lately about record-oriented Processors like QueryRecord and PartitionRecord. They allow for great flexibility and power while offering fantastic performance. But sometimes we have to deal with data that’s not record-oriented. While JSON is incredibly popular and has a lot of upsides, sometimes we get to work with a data format that’s far more “fun.” Maybe we receive some weird hybrid of CSV and JSON. Or maybe we get something that looks like this:

<4>Name<3>DOB<7>Address<15>Account Balance
<8>John Doe<10>01/01/2000<13>123 My Street<7>1520.29
<8>Jane Doe<9>1/14/1970<16>3322 Your Street<8>14232.44
<9>Jacob Doe<10>02/04/2008<16>3322 Your Street<0>

We might decide that the producer of this data is just going to be on the hook to update it to something reasonable and modern like JSON or Avro. And that might be the right call. But in the real world, we don’t always have that luxury. This kind of data already exists, and we don’t have the power to change it. It’s out there, and sometimes we have to deal with it, whether we like it or not.

And to be honest, this reality was a large driver behind why we designed NiFi the way we did. Many applications and services tend to assume that data fits into a particular box. That data is well-formed and is in a common format such as CSV, JSON, or Avro. But for most large enterprises, it’s simply not the case that all of our data meets all the constraints we’d like it to.

With NiFi, we want to handle all of your data — binary data, textual data, structured or unstructured data, and everything in between. So, we have several options for dealing with this kind of data, depending on what we need to do with it.

We could create our own custom Record Reader in Java. Or we could use a Scripted Record Reader using Groovy or Python. But then we have to have the skills to write that, and we have to test, debug, and maintain it. And if we want to preserve the format, we’d need a Record Writer for it, too.

But we can also deal with this data as simply raw text. RouteText is an incredibly powerful processor for dealing with raw text.

What Is It?

The RouteText Processor allows us to easily route lines of text to a specific relationship, without breaking the data apart into individual lines. This allows the Processor to deliver exceptional performance.

Additionally, it allows us to group together lines of text and separate those lines from other lines that are not like them. Effectively, this grouping gives us much the same capability as PartitionRecord but for raw textual data.

The Processor offers several different options for determining whether or not a line of text matches your criteria. You can check whether a line starts with some value, ends with some value, contains some value, or is exactly equal to some text. You can also check whether it matches a Regular Expression or whether it contains text that matches some Regular Expression.
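To make these options concrete, here is a minimal Python sketch of how each kind of check could be expressed as a predicate over a single line. The dictionary keys and the line_matches helper are hypothetical names chosen for illustration; they are not the Processor's actual option names or its implementation.

```python
import re

# Hypothetical stand-ins for the kinds of checks described above.
# Each predicate takes one line of text and the user-supplied value.
MATCHERS = {
    "starts_with": lambda line, value: line.startswith(value),
    "ends_with": lambda line, value: line.endswith(value),
    "contains": lambda line, value: value in line,
    "equals": lambda line, value: line == value,
    # The whole line must match the pattern.
    "matches_regex": lambda line, value: re.fullmatch(value, line) is not None,
    # Any part of the line may match the pattern.
    "contains_regex": lambda line, value: re.search(value, line) is not None,
}


def line_matches(strategy, line, value):
    """Evaluate one line against one value using the named check."""
    return MATCHERS[strategy](line, value)
```

For example, line_matches("contains_regex", "id=42;", r"\d+") is true because the digits appear somewhere in the line, while the same pattern under "matches_regex" is false because it does not cover the whole line.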

But probably the most powerful mechanism that it provides is the ability to use the NiFi Expression Language to evaluate a line. This gives us the ability to harness the full power of the Expression Language, which can handle encoding/decoding text, escaping/unescaping, parsing dates, and any number of other powerful capabilities.

When / Why to Use It

RouteText has three main use cases. Firstly, it provides the ability to route individual lines of text to the appropriate relationship. Or relationships. In much the same way that QueryRecord gives us the ability to have a single stream of records come in and fork off many sub-streams, RouteText gives us the ability to have a single stream of text come in and fork off many sub-streams.

Secondly, it gives us the ability to filter out specific lines that we don’t care about. Why would a Processor designed for routing text be used to filter out text? Well, you can think of it as routing particular lines of text to /dev/null while everything else is routed elsewhere. For example, we could filter out any line that starts with # or filter out the first 4 lines in the FlowFile.

Finally, the Processor gives us the ability to partition the text, or group lines together. And why would we want to do that? Well, for much the same reason that we’d want to partition record-oriented data using PartitionRecord.

How to Use It — “Strategy” Properties

The RouteText Processor offers a handful of properties that can be used to configure it. But two of them are the most important.

Routing Strategy

First, let’s take a look at the “Routing Strategy” property. The Processor behaves very differently based on the value of this property. If the value is set to “Route to each matching Property Name,” the Processor will route the data in the same manner as QueryRecord. Each property that the user adds has a property name that becomes the name of a new Relationship. This allows us to fork arbitrary streams of data from a single stream.

But what if we want to supply multiple matching criteria? For example, maybe we want to keep data that starts with abc and ends with xyz. For this, we can set the property to “Route to ‘matched’ if line matches all conditions.” In this case, the Processor will have only two relationships — matched and unmatched, regardless of how many properties are added. A line of text will go to matched only if all conditions are met. Essentially, this is an AND of all conditions.

Similarly, we can choose only lines of text that match at least one of the criteria: all lines that start with abc OR end with xyz. For this, we set the “Routing Strategy” to “Route to ‘matched’ if line matches any condition.”

It’s worth noting that “Route to each matching Property Name” means each line of text will go to zero, one, or more Relationships. With “Route to ‘matched’ if line matches all conditions” and “Route to ‘matched’ if line matches any condition,” each line of text will go to exactly one Relationship.
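The difference between the three strategies can be sketched in a few lines of Python. This is only an illustration of the semantics described above, not the Processor's code: route_line and the short strategy names "each", "all", and "any" are hypothetical, and the sketch does not model where a line ends up when it matches no property under the first strategy.

```python
def route_line(line, conditions, strategy):
    """Return the relationship names that a single line is routed to.

    conditions: dict mapping a property name to a predicate over the line.
    strategy:   "each", "all", or "any" (hypothetical short names).
    """
    hits = [name for name, predicate in conditions.items() if predicate(line)]
    if strategy == "each":
        # One relationship per matching property: zero, one, or many.
        return hits
    if strategy == "all":
        # AND of all conditions.
        return ["matched"] if len(hits) == len(conditions) else ["unmatched"]
    if strategy == "any":
        # OR of all conditions.
        return ["matched"] if hits else ["unmatched"]
    raise ValueError(f"unknown strategy: {strategy}")


conditions = {
    "starts": lambda line: line.startswith("abc"),
    "ends": lambda line: line.endswith("xyz"),
}
```

With these two conditions, the line abc-123 goes to "unmatched" under "all" but to "matched" under "any", and under "each" it is routed only to the relationship named starts.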

Matching Strategy

The Processor has no specific property for the “Match Criteria.” Rather, it is up to the user to add one or more properties that specify whether or not a line of text matches their criteria. But if the user specifies a value of abc, how should the Processor interpret that? Does it mean the line must begin with abc? Or does it need to end with abc in order to match? Or maybe just contain abc somewhere within the line?

The “Matching Strategy” property determines this. Most of the values are pretty self-explanatory. But the “Satisfies Expression” value is particularly interesting. When this value is chosen, each user-defined property must be a valid NiFi Expression Language expression. And the Expression must return a boolean value.

Normally, Expression Language is used to reference FlowFile Attributes (or variables). But of course there is no FlowFile attribute that is specific to each line of text. This Processor, however, exposes two special variables that are available within Expression Language: line and lineNo. The line variable represents the actual line of text to be evaluated. The lineNo variable indicates the line number of that text in the FlowFile. For example, the lineNo will be 1 for the first line of text in the FlowFile, 2 for the second line of text, and so on. These two special variables that are available only in the context of this Processor make for a very powerful routing capability.

How to Use It — Fork Multiple Streams

The first use case that I discussed above was the ability to fork a single stream of text into multiple streams. Each line of incoming data will be sent to zero or more outbound FlowFiles. Given the text above, let’s consider a few rules that we might want to create:

  • Any account balance over 10,000 goes to a Relationship named large.balance
  • Any line showing no account balance goes to a Relationship named no.balance
  • Any line showing an account whose owner’s date of birth is before 2002 goes to a Relationship named adult

Since we want to fork multiple streams, we know that the “Routing Strategy” property will be set to “Route to each matching Property Name” as described above.

We’ll also need to add three user-defined properties, one for each of these rules. But what will the values of those properties be? Well, we have options.

The first option is to set the “Matching Strategy” property to “Matches Regular Expression” and then supply a Regular Expression for each of those rules. That might look like this:

large.balance = <\d+>.*?<\d+>.*?<\d+>.*?<\d+>\d{5,}\.\d{2}

no.balance = <\d+>.*?<\d+>.*?<\d+>.*?<0>

adult = <\d+>.*?<\d+>\d{1,2}/\d{1,2}/(19\d\d|2000|2001).*

Configured in the Processor, it would look like this:

And if you give it a try, you’ll find that it does indeed work! The results show:

large.balance produces the text: <8>Jane Doe<9>1/14/1970<16>3322 Your Street<8>14232.44

no.balance produces the text: <9>Jacob Doe<10>02/04/2008<16>3322 Your Street<0>

And adult produces the text:

<8>John Doe<10>01/01/2000<13>123 My Street<7>1520.29
<8>Jane Doe<9>1/14/1970<16>3322 Your Street<8>14232.44

… But let’s not kid ourselves into thinking that this is a great answer. It’s convoluted and confusing, to say the least. Unfortunately, as powerful as Regular Expressions are, they are not simple and they are not easy to read or maintain. But given that we’re dealing with parsing raw text, it might be the best option. This is one of the big benefits of record-based processors: they make a lot of this much simpler.
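For readers who want to check the three patterns outside of NiFi, here is a small Python sketch that applies them to the sample data from the top of the article. It assumes that a line must match the pattern in full (hence re.fullmatch) and that Python's re module treats these particular patterns the same way NiFi does; the fork helper is a hypothetical name.

```python
import re

SAMPLE = [
    "<4>Name<3>DOB<7>Address<15>Account Balance",
    "<8>John Doe<10>01/01/2000<13>123 My Street<7>1520.29",
    "<8>Jane Doe<9>1/14/1970<16>3322 Your Street<8>14232.44",
    "<9>Jacob Doe<10>02/04/2008<16>3322 Your Street<0>",
]

# The three patterns from the article, keyed by relationship name.
RULES = {
    "large.balance": r"<\d+>.*?<\d+>.*?<\d+>.*?<\d+>\d{5,}\.\d{2}",
    "no.balance": r"<\d+>.*?<\d+>.*?<\d+>.*?<0>",
    "adult": r"<\d+>.*?<\d+>\d{1,2}/\d{1,2}/(19\d\d|2000|2001).*",
}


def fork(lines, rules):
    """Send each line to every relationship whose pattern matches the whole line."""
    routed = {name: [] for name in rules}
    for line in lines:
        for name, pattern in rules.items():
            if re.fullmatch(pattern, line):
                routed[name].append(line)
    return routed


routed = fork(SAMPLE, RULES)
for name, lines in routed.items():
    print(name, lines)
```

Note that the header line matches none of the patterns, so it does not appear in any of the three outputs.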

We can, however, make use of Expression Language and see if that will allow us to simplify things. Firstly, let’s look at the large.balance rule. If we know that there will be no > character in our data, we can actually simplify things quite a lot by using the getDelimitedField function:

${line:getDelimitedField(5, '>'):gt(10000)}

This allows us to treat the > character as a delimiter and get the 5th field, the account balance. Then we compare it against a value of 10,000. The no.balance rule becomes quite trivial as well, since it’s the last column:

${line:endsWith("<0>")}

Then we just need to consider the adult rule. We could again use Regular Expressions, using the matches function. But if we are confident that our data won’t have slashes other than the date of birth, we can do something much simpler:

${line:substringAfterLast('/'):substring(0, 4):lt(2002)}

This yields a much simpler configuration but produces the exact same results.
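To see why the three expressions select the same lines as the Regular Expressions, here is a rough Python emulation of the functions involved. The helpers are simplified stand-ins written for this sketch, not NiFi's implementation: get_delimited_field ignores quoting and escaping, and the sketch assumes that comparing a non-numeric value with gt or lt simply yields false.

```python
SAMPLE = [
    "<4>Name<3>DOB<7>Address<15>Account Balance",
    "<8>John Doe<10>01/01/2000<13>123 My Street<7>1520.29",
    "<8>Jane Doe<9>1/14/1970<16>3322 Your Street<8>14232.44",
    "<9>Jacob Doe<10>02/04/2008<16>3322 Your Street<0>",
]


def get_delimited_field(subject, index, delimiter):
    """1-based field lookup; returns "" when the field does not exist."""
    fields = subject.split(delimiter)
    return fields[index - 1] if 0 < index <= len(fields) else ""


def substring_after_last(subject, needle):
    """Text after the last needle, or the whole subject if needle is absent."""
    return subject.rpartition(needle)[2]


def as_number(text):
    """Parse text as a number, or return None (assumption: non-numbers compare false)."""
    try:
        return float(text)
    except ValueError:
        return None


def gt(text, limit):
    number = as_number(text)
    return number is not None and number > limit


def lt(text, limit):
    number = as_number(text)
    return number is not None and number < limit


RULES = {
    "large.balance": lambda line: gt(get_delimited_field(line, 5, ">"), 10000),
    "no.balance": lambda line: line.endswith("<0>"),
    "adult": lambda line: lt(substring_after_last(line, "/")[0:4], 2002),
}

routed = {name: [line for line in SAMPLE if rule(line)] for name, rule in RULES.items()}
```

The header line is excluded in each case because its fifth field and its first four characters are not numbers, which is why the results line up with the regex-based configuration.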

How to Use It — Filter Lines Based on Text

The good news is that we started with the most complex of the examples.

Let’s take a look at something a bit simpler: filtering out lines of text that we don’t care about. This may be a bit less obvious of a use case. The Processor offers the ability to select which lines should go to some relationship. So it’s easy to think of it as “which lines do we want to keep?” But often that’s a lot harder to answer than “which lines do we want to get rid of?”

So let’s say that we want to strip out all lines that start with a # sign. How do we tell the Processor to do that? The simplest way is to set the “Matching Strategy” to “Starts with” and add a user-defined property whose value is #. Then we’ll set the Routing Strategy to “Route to ‘matched’ if line matches all conditions.”

Now, looking at this, you might think that it will keep only the lines that start with # — and that’s exactly the opposite of what we want!

The trick here is to auto-terminate the “matched” relationship instead of the “unmatched” relationship. Anything that doesn’t match our criteria (i.e., anything that doesn’t start with a #) will be routed to “unmatched,” and that’s what we’ll send to the next Processor.

Since we have auto-terminated the “matched” relationship, we’re essentially selecting the things that go to /dev/null, or selecting the things to be removed. All other lines are passed along.
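The inverted logic is easy to get backwards, so here is a tiny Python sketch of the idea. The condition describes what to throw away, and the output is whatever would have gone to “unmatched.” The drop_matching helper and the sample lines are made up for illustration.

```python
def drop_matching(lines, predicate):
    """Discard lines that satisfy the predicate ('matched'); keep the rest ('unmatched')."""
    return [line for line in lines if not predicate(line)]


text = ["# generated file", "alpha", "# note", "beta"]

# The condition selects the lines to remove, not the lines to keep.
kept = drop_matching(text, lambda line: line.startswith("#"))
```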

How to Use It — Filter Lines Based on Line Number

By using the same approach, we can also filter lines based on their line number. For example, we might want to strip out the first line of every FlowFile. To do this we’ll keep the Routing Strategy as “Route to ‘matched’ if line matches all conditions” but we will set the Matching Strategy to “Satisfies Expression.”

We’ll now add a property for our Expression Language Expression that indicates that the line should match if its line number is 1.
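The same filtering idea, keyed on position rather than content, can be sketched in Python as follows. Lines are numbered from 1 to mirror the lineNo variable described earlier; drop_by_line_number and the sample rows are hypothetical and only illustrate the semantics.

```python
def drop_by_line_number(lines, predicate):
    """Discard lines whose 1-based line number satisfies the predicate."""
    return [
        line
        for line_no, line in enumerate(lines, start=1)  # lineNo starts at 1
        if not predicate(line_no)
    ]


rows = ["name,balance", "John,1520.29", "Jane,14232.44"]

# Match (and therefore discard) only the first line.
body = drop_by_line_number(rows, lambda line_no: line_no == 1)
```

Changing the predicate to line_no <= 4 would strip the first four lines instead, as in the earlier example.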

How to Use It — Partition / Group Text

So now we’ve seen how to filter out lines of text based on their content, we’ve seen how to filter lines of text based on their line number, and we’ve seen how to fork off multiple streams of data. But there’s one more important feature of the RouteText Processor — the ability to group data together based on their content.

In fact, just a couple of days ago, I had a user asking how they could parse a “CSV file” that had no header and had a different number of columns on each line. They had tried different CSV-based processors and record-based processors using a CSV Reader, but they couldn’t find anything that would work. They gave an example of this data:

RecType1|property|20221203|20221204
RecType2|12|RecType1.com|1234|XSF123|property1|20221203
RecType3|def.com
RecType4|20221206|67893
RecType1|property|20221130|20221201
RecType3|xyz.com

Now, it’s not surprising that this user struggled with CSV-based processors and record readers: the data is not CSV. Oftentimes, we tend to think of CSV as just delimited textual data. But CSV does have a specification (and a lot of variations on that specification).

But what we have here is just raw textual data. The ask was to separate this data into separate FlowFiles, so that each FlowFile contained “like data.” That is, one FlowFile would have data where the first “column” was RecType1. The next would have data where the first column was RecType2 and so on. So the desired output was for FlowFile 1 to contain

RecType1|property|20221203|20221204
RecType1|property|20221130|20221201

FlowFile 2 should contain

RecType2|12|RecType1.com|1234|XSF123|property1|20221203

FlowFile 3 should contain

RecType3|def.com
RecType3|xyz.com

And the last FlowFile would contain

RecType4|20221206|67893

The interesting thing here is that there was no intent to filter data out or to route one line of text to one Relationship and another line to another Relationship. All of the data should go to the same Relationship. It just should be broken up — or grouped together — by the first field.

To solve this, we used RouteText. We configured the Routing Strategy to “Route to ‘matched’ if line matches any condition” and we configured the Matching Strategy to “Contains.” We then added a user-defined property with a value of | because we knew that every line would contain the pipe character (or if it didn’t, it was malformed data and we were okay throwing it away).

But the trick was to group the data by the first column. To do this, we can configure the “Grouping Regular Expression” property. The Regular Expression we wanted was to consume everything before the first pipe character.

So we used (.*?)\|.* as the expression. That is: consume anything that is necessary, but as little as possible (that’s what .*? tells us), followed by a pipe (|) and then anything else. Also note the parentheses around the .*?, which tell us that it’s a capturing group. So whatever value the .*? matches will be the value that we group on. Or, said another way, we will group on whatever value comes before the first pipe (|) character.

This gives us exactly what we want. The first line will belong to the RecType1 group, while the second belongs to the RecType2 group and so on. And, of course, if we want to route the data later based on the group, there is an attribute added to the FlowFile with the name RouteText.Group.
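The grouping behavior can be reproduced in a few lines of Python using the same Regular Expression. This sketch assumes that the expression is applied to the whole line and that the first capturing group is the group key; group_lines is a hypothetical helper, and each list in the result stands in for the content of one outgoing FlowFile.

```python
import re

GROUPING = re.compile(r"(.*?)\|.*")

DATA = [
    "RecType1|property|20221203|20221204",
    "RecType2|12|RecType1.com|1234|XSF123|property1|20221203",
    "RecType3|def.com",
    "RecType4|20221206|67893",
    "RecType1|property|20221130|20221201",
    "RecType3|xyz.com",
]


def group_lines(lines):
    """Group lines that contain a pipe by the text before the first pipe."""
    groups = {}
    for line in lines:
        if "|" not in line:
            continue  # fails the "Contains |" condition, so it is not kept
        match = GROUPING.fullmatch(line)
        key = match.group(1)  # value of the capturing group
        groups.setdefault(key, []).append(line)
    return groups


groups = group_lines(DATA)
for key, lines in groups.items():
    print(key, lines)
```

Line order is preserved within each group, so the two RecType1 lines stay in the order in which they arrived.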

Conclusion

If you’ve read any of my material, you’ll probably notice that I harp on using record-oriented Processors. There are two main reasons for this: they are extremely flexible and powerful, and they tend to offer great performance because they keep many tiny records bunched together into larger FlowFiles (NiFi generally tends to perform best when FlowFiles are within the range of 300 KB to 3 MB or so each).

But sometimes it’s not possible to use record-based Processors because we have to deal with data that doesn’t conform to any of the necessary specs. But it’s still important to ensure that we’re not breaking that data apart into millions of tiny FlowFiles, as doing so will drop performance quickly.

The RouteText Processor gives us a very powerful, flexible, and easy-to-use mechanism for dealing with raw text data. It allows us to filter out text that we don’t care about, or specify what text we do care about. It allows us to route individual lines based on their contents, or where they are in the FlowFile (or both!). And it gives us the ability to separate out groups of text that don’t belong together while still ensuring that we’re able to keep together the text that does belong together.

All of this results in a powerful Processor that’s easy to understand and maintain, while still delivering on performance.

See Also

Previous: PartitionRecord

Next: ReplaceText
