Building an Effective NiFi Flow — PartitionRecord

Mark Payne
Cloudera
Published in
10 min readDec 12, 2022

Recently, I made the case for why QueryRecord is one of my favorite in the vast and growing arsenal of NiFi Processors. Part of the power of the QueryRecord Processor is its versatility. It can be used to filter data, transform it, and create many streams from a single incoming stream. All using the well-known ANSI SQL query language.

PartitionRecord works very differently than QueryRecord. But to a degree it can be used to create multiple streams from a single incoming stream, as well. It’s not as powerful as QueryRecord. But what it lacks in power it makes up for in performance and simplicity.

What Is it?

The PartitionRecord processor allows you to group together “like data.” We define what it means for two Records to be “like data” using RecordPath. RecordPath is a very simple syntax that is very much inspired by JSONPath and XPath.

So, if we have data representing a series of purchase order line items, we might want to group together data based on the customerId field. I.e., each outbound FlowFile would consist only of orders that have the same value for the customerId field. Or perhaps we’d want to group by the purchase date. Or the itemId. There are any number of ways we might want to group the data.

And once we’ve grouped the data, we get a FlowFile attribute added to the FlowFile that provides the value that was used to group the data. For example, we’d get an attribute named customerId with a value of 11111-11111 for the output FlowFile containing records for that customer.

So this Processor has a cardinality of “one in, many out.” But unlike QueryRecord, which may route a single record to many different output FlowFiles, PartitionRecord will route each record in the incoming FlowFile to exactly one outgoing FlowFile.

When / Why to Use It?

There are two main reasons for using the PartitionRecord Processor. Perhaps the most common reason is in order to route data according to a value in the record. This is achieved by pairing the PartitionRecord Processor with a RouteOnAttribute Processor.

The other reason for using this Processor is to group the data together for storage somewhere. For example, we may want to store a large amount of data in S3. In order to organize the data, we will store it using folders that are organized by date and time. For example, if the data has a timestamp of 3:34 PM on December 10, 2022 we want to store it in a folder named 2022/12/10/15 (i.e., the 15th hour of the 10th day of the 12th month of 2022). PartitionRecord allows us to achieve this easily by both partitioning/grouping the data by the timestamp (or in this case a portion of the timestamp, since we don’t want to partition all the way down to the millisecond) and also gives us that attribute that we need to configure our PutS3 Processor, telling it the storage location.

How to Use It

Like QueryRecord, PartitionRecord is a record-oriented Processor. Meaning you configure both a Record Reader and a Record Writer. If unclear on how record-oriented Processors work, take a moment to read through the “How to Use It — Setup” section of the previous post.

The Record Reader and Record Writer are the only two required properties. But we must also tell the Processor how to actually partition the data, using RecordPath.

To do this, we add one or more user-defined properties. The name of the property becomes the name of the FlowFile attribute that gets added to each FlowFile. The value of the property is a RecordPath expression that NiFi will evaluate against each Record. The result determines which group, or partition, the Record gets assigned to.

Let’s look at some examples.

How to Use It — Partition by Value

The simplest use case is to partition data based on the value of some field. So if we reuse the example from earlier, let’s consider that we have purchase order data. Let’s assume that the data is JSON and looks like this:

[{
"orderId": "1111-11111111",
"customerId": "2222-22222222",
"timestamp": "12/10/2022 09:35:02.281 UTC",
"items": [{
"itemId": "78201-22",
"name": "Keyboard",
"description": "An external keyboard",
"quantity": 1,
"price": 39.99
}, {
"itemId": "271-44",
"name": "Monitor",
"description": "A 19-inch monitor",
"quantity": 1,
"price": 98.74
}, {
"itemId": "7832-244",
"name": "Mouse",
"description": "A wireless mouse",
"quantity": 1,
"price": 19.50
}],
"itemCount": 3,
"orderTotal": 158.23
}, {
"orderId": "4444-44444444",
"customerId": "3333-33333333",
"timestamp": "12/10/2022 16:04:21.552 UTC",
"items": [{
"itemId": "2810-13",
"name": "Macbook Pro 13-inch",
"description": "Macbook Pro, 13-inch, base model",
"quantity": 1,
"price": 1299.00
}, {
"itemId": "78201-22",
"name": "Keyboard",
"description": "An external keyboard",
"quantity": 1,
"price": 39.99
}],
"itemCount": 2,
"orderTotal": 1338.99
}]

Consider a case in which we want to partition the data based on the customerId. To reference a particular field with RecordPath, we always start with a / to represent the root element. The customerId field is a top-level field, so we can refer to it simply by using /customerId.

Which gives us a configuration like this:

So what will this produce for us as output?

It will give us two FlowFiles. The first will have an attribute named customerId with a value of 2222–22222222 . Its contents will contain:

[ {
"orderId" : "1111-11111111",
"customerId" : "2222-22222222",
"timestamp" : "12/10/2022 09:35:02.281 UTC",
"items" : [ {
"itemId" : "78201-22",
"name" : "Keyboard",
"description" : "An external keyboard",
"quantity" : 1,
"price" : 39.99
}, {
"itemId" : "271-44",
"name" : "Monitor",
"description" : "A 19-inch monitor",
"quantity" : 1,
"price" : 98.74
}, {
"itemId" : "7832-244",
"name" : "Mouse",
"description" : "A wireless mouse",
"quantity" : 1,
"price" : 19.5
} ],
"itemCount" : 3,
"orderTotal" : 158.23
} ]

The second FlowFile will have an attribute named customerId with a value of 3333–33333333 and the contents:

[ {
"orderId" : "4444-44444444",
"customerId" : "3333-33333333",
"timestamp" : "12/10/2022 16:04:21.552 UTC",
"items" : [ {
"itemId" : "2810-13",
"name" : "Macbook Pro 13-inch",
"description" : "Macbook Pro, 13-inch, base model",
"quantity" : 1,
"price" : 1299.0
}, {
"itemId" : "78201-22",
"name" : "Keyboard",
"description" : "An external keyboard",
"quantity" : 1,
"price" : 39.99
} ],
"itemCount" : 2,
"orderTotal" : 1338.99
} ]

How to Use It — More Complex Expressions

Now, it can be super helpful to be able to partition data based purely on some value in the data. But sometimes doing so would really split the data up into a single Record per FlowFile. For example, what if we partitioned based on the timestamp field or the orderTotal field? Pretty much every record/order would get its own FlowFile because these values are rather unique. And we definitely, absolutely, unquestionably want to avoid splitting one FlowFile into a separate FlowFile per record! (If you don’t understand why it’s so important, I recommend checking out this YouTube video in the NiFi Anti-Pattern series. But TLDR: it dramatically increases the overhead on the NiFi framework and destroys performance.)

But what if we want to partition the data into groups based on whether or not it was a large order? For instance, we want to partition the data based on whether or not the total is more than $1,000.

The RecordPath language allows us to use many different functions and operators to evaluate the data. In this case, we’d want to compare the orderTotal field to a value of 1000.

We can use the expression:

/orderTotal > 1000

And the configuration would look like this:

And we can get more complex with our expressions.

Say we want to partition data based on whether or not the purchase time was before noon. We can use a Regular Expression to match against the timestamp field:

matchesRegex(/timestamp, '.*? (0\d|10|11)\:.*')

This regex basically tells us that we want to find any characters, followed by a space, followed by either a 0 and then any digit, or the number 10 or the number 11, followed by a colon and anything else. I.e., match anything for the date and only match the numbers 00–11 for the hour.

We can then add a property named morningPurchase with this value:

And this produces two FlowFiles. The first has a morningPurchase attribute with value true and contains the first record in our example, while the second has a value of false and contains the second record.

How to Use It — Partition by Multiple Expressions

The PartitionRecord processor allows configuring multiple expressions. Otherwise, the Processor would just have a specific property for the RecordPath Expression to use.

By allowing multiple values, we can partition the data such that each record is grouped only with other records that have the same value for all attributes. For example, let’s consider that we added both the of the above properties to our PartitionRecord Processor:

In this configuration, each FlowFile could be split into four outgoing FlowFiles. Those FlowFiles, then, would have the following attributes:

The first FlowFile, then, would contain only records that both were large orders and were ordered before noon. The second would contain any records that were large but did not occur before noon. The third would contain orders that were less than $1,000 but occurred before noon, while the last would contain only orders that were less than $1,000 and happened after noon.

Now, of course, in our example, we only have two top-level records in our FlowFile, so we will not receive four outbound FlowFiles. The Processor will not generate a FlowFile that has zero records in it. We receive two FlowFiles, with the first having attributes largeOrder of false and morningPurchase of true. The second has largeOrder of true and morningPurchase of false.

How to Use It — Routing Data

Now that we’ve examined how we can use RecordPath to group our data together, let’s look at an example of why we might want to do that. A very common use case is that we want to route all data that matches some criteria to one destination while all other data should go elsewhere.

For example, we might decide that we want to route all of our incoming data to a particular Kafka topic, depending on whether or not it’s a large purchase. All large purchases should go to the large-purchase Kafka topic. All other purchases should go to the smaller-purchase Kafka topic. But regardless, we want all of these records also going to the all-purchases topic. We can accomplish this in two ways.

Firstly, we can use RouteOnAttribute in order to route to the appropriate PublishKafkaRecord processor:

And our RouteOnAttribute Processor is configured simply as:

This makes use of the largeOrder attribute added by PartitionRecord. If that attribute exists and has a value of true then the FlowFile will be routed to the largeOrder relationship. Otherwise, it will be routed to the unmatched relationship. In any case, we are going to use the original relationship from PartitionRecord to send to a separate all-purchases topic.

Now, we could instead send the largeOrder data to some database or whatever we’d like. But because we are sending both the largeOrder and unmatched relationships to Kafka, but different topics, we can actually simplify this.

Rather than using RouteOnAttribute to route to the appropriate PublishKafkaRecord Processor, we can instead eliminate the RouteOnAttribute and send everything to a single PublishKafkaRecord Processor. Then, instead of explicitly specifying the topic to send to as large-purchases or smaller-purchases we can use Expression Language to determine which topic it goes to.

Specifically, we can use the ifElse expression:

${largeOrder:equals('true'):ifElse('large-purchases', 'smaller-purchases')}

We can use this Expression directly in our PublishKafkaRecord processor as the topic name:

By doing this, we eliminate one of our PublishKafkaRecord Processors and the RouteOnAttribute Processor. This gives us a simpler flow that is easier to maintain:

So this gives you an easy mechanism, by combining PartitionRecord with RouteOnAttribute, to route data to any particular flow that is appropriate for your use case.

But by promoting a value from a record field into an attribute, it also allows you to use the data in your records to configure Processors (such as PublishKafkaRecord) through Expression Language.

Conclusion

PartitionRecord provides a very powerful capability to group records together based on the contents of the data. It does so using a very simple-to-use RecordPath language. Whereas QueryRecord can be used to create n outbound streams from a single incoming stream, each outbound stream containing any record that matches its criteria, PartitionRecord creates n outbound streams, where each record in the incoming FlowFile belongs to exactly one outbound FlowFile.

This grouping is also accompanied by FlowFile attributes. This makes it easy to route the data with RouteOnAttribute. It also makes it easy to use the attribute in the configuration of a follow-on Processor via Expression Language.

See Also

Previous: QueryRecord
Next: RouteText

--

--