Building an Effective NiFi Flow — QueryRecord

Mark Payne
Cloudera
Published in
12 min readDec 5, 2022

Of the 400+ Processors that are now available in Apache NiFi, QueryRecord is perhaps my favorite. In most cases, it is quite simple to use, if you know the basics of SQL. And it can be used for many different use cases.

What Is It?

In short, it allows you to treat each FlowFile as if it were a database table and run SQL against that table, providing the results as the output FlowFile.

The Processor uses a Record Reader and a Record Writer, which means that we can also have it implicitly convert data from one format to another. For example, we can use a JSON Reader and an Avro Writer so that we read incoming JSON and write the results as Avro.

When/Why to Use It

We can use QueryRecord to rename a field in our data; we can use it to filter out records (rows in the table) that we don’t care about; we can filter out columns that we don’t care about. We can combine multiple columns into a new single column. We can provide a default value for a column. Pretty much anything that you might do in a SQL SELECT statement on a database, except for JOINing with other tables (we’ll look at how to do that later, when we discuss data enrichment).

But the use case that I really love is the ability to fork a single stream of data into many streams. Take, for example, the case of processing Syslog data. We can evaluate a single SQL SELECT query over the data to get the results, such as SELECT * FROM FLOWFILE WHERE severity <= 3. This would return to us all of the data that has a severity level of ERROR or more severe.

But we can also register many SQL SELECT statements, forking a separate stream for each set of results. For example, we may want to send all those Syslog events that are error-level or above to some sort of alerting mechanism. Maybe we push a notification to Slack or put a message on a Kafka queue. Now, what if we have another use case where, given the same Syslog events, we want to push logs about security/authorization messages (facility code 4) to a different Kafka topic? And maybe all log messages with a hostname of “compromised-server.mydomain.com” to another destination?

We can register all of these queries at once using just a single QueryRecord processor. And then we can handle each of these streams however it makes sense for our use case. If a new use case pops up for the data, we just add another query to fork off a new stream.

Let’s take a look at how we can accomplish each of these use cases.

How to Use It — Setup

Before we can delve too much into the specifics of the QueryRecord Processor, we need to first understand that it’s a Record-Oriented Processor. That means that it must be configured with a Record Reader and a Record Writer, as shown in the property configuration screen:

By making use of a Record Reader and Record Writer, the processor gives you, as the flow developer, huge flexibility. You don’t have to worry about converting your data into the one format the Processor understands. If you have CSV data, that’s great — it works nicely with SQL. If you have Avro data that you’ve fetched from Apache Kafka, that’s great too. You can use data of any format, as long as there’s a Record Reader for it. And the output can be whichever format you’d like, as well.

Out of the box, NiFi provides many different Record Readers. The most commonly used are probably the CSVReader, the JsonTreeReader, and the AvroReader. But there’s also a reader for Syslog; for Grok, which allows you to parse arbitrary log data as Records; a reader for XML; one for Parquet; and a few others.

Continuing with the examples discussed above, we’ll choose to use the SyslogReader. There is no SyslogWriter, though. That’s okay, because most of the time when users want to query syslog data, they actually want the output to be something that’s a little more structured / parseable such as JSON. For this example, we’ll use the JsonRecordSetWriter.

Note: if we did want the output to be Syslog format, we could easily accomplish that by using the FreeFormTextRecordSetWriter as the Record Writer. We configure the Syslog Reader’s “Raw message” property to true so that the Record contains a field named _raw containing the raw syslog message. Then configure the “Text” property of the writer as simply ${_raw}.

We can accept the default configuration for the Syslog Reader and the JSON Writer. And for most of the properties of QueryRecord. But in our case, we’ll change the “Include Zero Record FlowFiles” property to false. If an incoming FlowFile has no data that matches our query, we don’t want an empty FlowFile. We’d rather have nothing output.

Our configuration now looks like this:

We can now add one or more queries by clicking the ‘+’ icon in the upper-right of our Properties table. When we add a new property, the name of the Property becomes a new Relationship for the Processor. The value becomes a SQL query to run against the data. Any data that matches the SQL query will be sent to that Relationship.

How to Use It — Filter Out Records

The first use case that we mentioned was that of filtering out records that we don’t care about. For example, we may receive huge amounts of Syslog data. Perhaps we don’t care about all of it. Perhaps we care only about the data that has a severity level of 5 or below. That is, we don’t care about messages with a severity of 6 (Informational message) or 7 (Debug message).

This is an easy enough task to accomplish. Just consider if the syslog data were written to a relational database and you wanted to write SQL against that database. Obviously you’d first need to know what the table name was and what columns were available.

When using this Processor, we know that the table name will always be FLOWFILE, so that’s the easy part. We need to know what columns are available, though. The schema for the data is determined by the Record Reader and sometimes the data itself. For example, if using a JSON Reader, the schema can be completely different for one JSON document from another. In the case of Syslog data, though, the schema will always be the same, regardless of the data. If we take a look at the documentation for the Syslog Reader, it tells us exactly what the schema will look like. At least, it tells us what the schema looks like in Avro format:

{
"type" : "record",
"name" : "nifiRecord",
"namespace" : "org.apache.nifi",
"fields" : [ {
"name" : "priority",
"type" : [ "null", "string" ]
}, {
"name" : "severity",
"type" : [ "null", "string" ]
}, {
"name" : "facility",
"type" : [ "null", "string" ]
}, {
"name" : "version",
"type" : [ "null", "string" ]
}, {
"name" : "timestamp",
"type" : [ "null", "string" ]
}, {
"name" : "hostname",
"type" : [ "null", "string" ]
}, {
"name" : "body",
"type" : [ "null", "string" ]
} ]
}

What this boils down to is that there are 7 fields in the schema. That is, there are 7 fields in our SQL table. Each of them is of type String and each is nullable. The names of these columns are priority, severity, facility, version, timestamp, hostname, and body. Of note, if the SyslogReader is configured to add the “raw message” there would be an additional 8th field named _raw that would also be a String.

So, given that, if we want to filter out all data that has a severity of 6 or 7 we would do so by writing the SQL:

SELECT *
FROM FLOWFILE
WHERE severity < 6

So, we do just that. We add a new Property to the Processor by clicking the ‘+’ icon in the top-right of the Properties table. We give the property a name, say “important messages” and give it a value of SELECT * FROM FLOWFILE WHERE severity < 6.

Now, when we click Apply, the Processor will have a new Relationship available — “important messages” (the name of the newly added property). We can then configure our dataflow however is necessary in order to process this data.

For example, we may choose to send all of these important messages to a Kafka ‘syslog’ topic:

How to Use It — Filter Out Columns

Given that we are able to treat our data as a database table using SQL, it becomes easy to then filter out columns that we don’t care about.

For example, we may have configured the SyslogReader to include the “Raw message” because one or more of our queries needed it. But now we have a stream of data that needs any messages pertaining to the hostname “compromised-server.mydomain.com” but we don’t want the raw messages, just the individual columns. And perhaps the “version” field is irrelevant to us, so we’ll drop it also. We can do this simply enough:

SELECT priority, severity, facility, "timestamp", hostname, body
FROM FLOWFILE
WHERE hostname = 'compromised-server.mydomain.com'

Two things to note here: In SQL, timestamp is a reserved keyword, so we need to ensure that we quote it with double-quotes to denote that it’s the name of a column and not to be treated as a keyword. Secondly, note that when we quote our hostname, it’s a String literal — so we need to use single-quotes. Literals are quoted with single quotes, entity names are quoted with double quotes.

We’ll give this query the name “compromised”:

And this data will be routed to a database for our example:

How to Use It — Combine Multiple Columns

SQL provides us the ability to combine multiple columns into a single column within the SELECT clause, as well. We could, for instance, combine the hostname and timestamp fields into a single field that might be used as a unique key:

SELECT "timestamp" || '-' || hostname AS key, priority, severity,
facility, "timestamp", hostname, body
FROM FLOWFILE

(The astute reader may note that this would, in fact, be a terrible key as the combination of hostname and timestamp are by no means guaranteed to be unique, but this is just an example 🙂)

We could also use, for example, the COALESCE keyword to get the first value that’s not null:

SELECT COALESCE("timestamp", hostname, body) AS my_field, priority,
severity, facility, "timestamp", hostname, body
FROM FLOWFILE

How to Use It — Provide Default Value for Column

We can also use the COALESCE keyword to provide a default value for a particular column. Because COALESCE returns the first non-null value, we can coalesce the current (possibly null) value with a default value:

SELECT priority, severity, facility, "timestamp",
COALESCE(hostname, 'unknown-host') AS hostname,
body
FROM FLOWFILE

Or we could even default the value to some other existing field:

SELECT priority, severity, facility, "timestamp",
COALESCE(hostname, priority, 'unknown-host') AS hostname,
body
FROM FLOWFILE

(Again, this is a terrible example, as we’re selecting the priority as the hostname, which makes no sense, but it demonstrates the principle).

How to Use It — Select Hierarchical Fields

Thus far, we have discussed how we can use QueryRecord to query syslog data. This is a simple, straight-forward use case because the data is “structured,” or “flat.” It maps very cleanly to a database schema, because each field in the data is a “scalar” value. We don’t have embedded records that make the data structure complex.

But what if we want to use SQL over data that’s more hierarchical, like JSON? What if we have a payload that looks like this:

[{
"title": "To Kill a Mockingbird",
"author": "Harper Lee",
"pageCount": 281,
"published": {
"date": "07/11/1960",
"publisher": {
"name": "J. B. Lippincott & Co.",
"city": "Philadelphia",
"country": "US"
}
}
}, {
"title": "A Tale of Two Cities",
"author": "Charles Dickens",
"pageCount": 448,
"published": {
"date": "11/01/1859",
"publisher": {
"name": "Chapman & Hall",
"city": "London",
"country": "GB"
}
}
}]

In this case, it makes sense that our SQL table would have 4 fields: title, author, pageCount, and published. But what if we wanted to select only the books whose publisher is in the United States? We’d need to access data in an embedded Record.

NiFi makes this possible using RecordPath functions. So, if we wanted to select only books that were published in the US, we could use:

SELECT *
FROM FLOWFILE
WHERE RPATH(published, '/publisher/country') = 'US'

This would yield:

[ {
"title" : "To Kill a Mockingbird",
"author" : "Harper Lee",
"pageCount" : 281,
"published" : {
"date" : "07/11/1960",
"publisher" : {
"name" : "J. B. Lippincott & Co.",
"city" : "Philadelphia",
"country" : "US"
}
}
} ]

This allows us to use RecordPath expressions to drill down into any number of layers of hierarchy. We can use these RecordPath expressions regardless of the data format, as well. If the data is JSON, Parquet with structs, Avro, XML, or some other format, these RecordPath expressions should still work.

How to Use It — Fork Streams

Finally, we arrive at the use case that makes me so enthusiastic about this Processor. The ability to issue multiple SQL queries in order to fork streams of data from a single incoming stream.

We’ve already touched on this a bit above, in the “How to Use It — Filter Out Columns” section. There, we added a second query to an existing QueryRecord processor and routed the second stream to a different location than the first.

But just to prove out the point, let’s add yet a third stream of data to this same example.

Our first two queries were defined as:

SELECT *
FROM FLOWFILE
WHERE severity < 6

To retrieve all “important messages” and

SELECT priority, severity, facility, "timestamp", hostname, body
FROM FLOWFILE
WHERE hostname = 'compromised-server.mydomain.com'

To retrieve all messages for a specific hostname. We’ll now add a third query, with a property name of “critical” that we can use to capture only critical-level logs and above:

SELECT *
FROM FLOWFILE
WHERE severity <= 2

These log messages were consider important enough to alert ourselves via Slack. Again, we add a property named “critical” with a value of the above SQL:

And applying changes causes the QueryRecord processor to become invalid because we’ve not yet connected the “critical” Relationship to any downstream components:

We can now connect this new “critical” Relationship to a Processor to send the notification to Slack:

This provides us an incredibly powerful capability!

Conclusion

So we’ve seen in this post a tremendous amount of power from just a single Processor — QueryRecord. And I’m not even very good with SQL. But it makes it incredibly easy to perform many common tasks, and even to send to many downstream consumers exactly the data that they need.

This is the first in a series of posts that will dive into several of these powerful Processors. At the conclusion, you’ll be well equipped to tackle any number of streaming tasks to deliver whatever data is necessary to anywhere that it needs to go. And to do it in a way that’s fast, efficient, easy to understand, and easy to update!

See Also

Previous: Series Intro — Building an Effective NiFi Flow
Next: PartitionRecord

--

--