Dynamic IIoT Dataflows for Demand Based Surveillance via Apache NiFi
Smart Surveillance = Operating Savings and 211% Less Data to Transmit
by Chris Herrera
How Does NiFi 1.2.0 Help Enable Industrial IoT?
Apache NiFi 1.2.0 was released a month ago and brought with it some very cool features. You can read the complete list here, but I wanted to focus on two key features that were released, and how they could be instrumental in helping manage data flow challenges at the edge, for example, on a drilling platform, at a production facility, or in a manufacturing plant. The two features are the record oriented processing abstraction and the QueryRecord processor.
First, let’s talk about record oriented processing and why it’s important for Industrial IoT (IIoT). The data received in an industrial application context is typically structured with a high ingestion rate of small (size) events (sensor readings, alarms, configuration changes, etc.). While NiFi is great at moving data around, the processors involved must deal with the parsing logic and serialization logic at each step.
However, with NiFi 1.2 enabling us to treat this data as a schema-based record, we can operate on the data completely free from worrying about the incoming data format or how the data needs to be output. Out of the box, NiFi has readers and writers for Avro, log, JSON, and CSV data. There is also a writer that allows for a custom textual format.
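To make the idea concrete, here is a minimal Python sketch of the reader/writer separation. It is not NiFi code: the function names and the two-field record layout are assumptions chosen for illustration. The point is that the middle step only ever sees records, so the same logic works whether the data arrived as CSV or JSON.

```python
import csv
import io
import json

FIELDS = ["tag", "value"]  # hypothetical field names, for illustration only


def read_csv(text, fields):
    """Reader: header-less CSV text -> list of dict records."""
    return [dict(zip(fields, row)) for row in csv.reader(io.StringIO(text)) if row]


def read_json(text):
    """Reader: JSON array text -> list of dict records."""
    return json.loads(text)


def write_json(records):
    """Writer: list of dict records -> JSON text."""
    return json.dumps(records)


def keep_tag(records, tag):
    """Format-agnostic step: works on records, never on raw bytes."""
    return [r for r in records if r["tag"] == tag]


# The same processing step applied to two different input formats.
from_csv = keep_tag(read_csv("PipePressure,512\nPumpStatus,0\n", FIELDS), "PipePressure")
from_json = keep_tag(read_json('[{"tag": "PipePressure", "value": "512"}]'), "PipePressure")
```

Swapping the input or output format means swapping a reader or writer, while the step in the middle stays untouched.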
Next I want to talk about one example of coolness that arises from the new record oriented processing abstraction. The NiFi QueryRecord processor operates on a record of data and allows execution of SQL on the stream.
Used in conjunction with another Apache project called Apache Calcite (the same framework that is used in Phoenix, Flink, Druid, Kylin, Hive, Storm, and a number of other projects), the NiFi QueryRecord processor is able to interrogate the stream and build dynamic relationships based on the results with some pretty decent performance.
Now all this might sound super neat, and likely you want to run off and start executing some streaming SQL queries on your data, but I want to focus on a real use case for this so that you can see how it would work in practice.
Not Just for Fun
Industrial environments often present very real challenges such as operating with constrained bandwidth or severely limited storage, and sometimes it’s required to deal with both situations.
Assume for a moment that you are an Oil and Gas Exploration and Production company that is drilling and producing wells where the only network connectivity available is a Broadband Global Area Network (BGAN) — a global satellite network with telephony using portable terminals (which can get quite expensive for even low bandwidth).
Likewise, imagine you are a Production Engineer operating a remote pumping station or metering station on a pipeline that only has a cellular connection. It’s not unusual to experience limited to no connectivity on an intermittent basis with only a small amount of storage space to which you can log event data.
Across industries, these and many other use cases call for a way to conserve both bandwidth and storage.
Now, this does not mean that you don’t log the data, or shouldn’t log the data, but it does imply that you’ll need to determine a way to place some intelligence for your data stream at the edge.
For example, you might want to classify your data and stream only in a specific case or upon a certain condition, log only on exception, or simply modify your stream on demand based on a surveillance system. These enhancements can greatly simplify your life as a Drilling Engineer or Production Engineer.
The best way to explain how the NiFi 1.2 enhancements allow us to solve the bandwidth and “smart collection” issues detailed above is to roll up our sleeves and go through a detailed example together.
Our Scenario and Problem Statement
For one of our examples referenced previously, let’s assume Apache NiFi is currently managing the data flow from a remote production platform that has BGAN based satellite connectivity. With the current state of the industry, our company is attempting to keep the cost for bandwidth in check and has limited our total bandwidth to 64kbps.
Realistically, a good bit of data can be transmitted over a 64kbps pipe, but this pipe is used for ALL data transmission from the production platform, not just data associated with Apache NiFi data flows.
You have become aware of a situation where fairly high pump pressures are being recorded even when the downstream pump is off. Based on your experience as a production engineer, you know that this could be due to a blockage or a misconfigured valve in a downstream manifold.
To figure out exactly why you are getting the high pump pressure readings, more information is needed from additional sensors. Sending ALL the additional sensor information ALL OF THE TIME would be a waste of bandwidth and very costly.
You simply want to transmit additional data under the following condition: when you know that the downstream pump is off, BUT you have an elevated pipe pressure. Aha! The SQL basically writes itself from that last sentence!
You see where I am going with this now?
A Production Platform Data Flow Example
In this scenario assume the following deployment on the remote production platform:
There are 998 sensors on the remote platform; however, in a nominal operating state only a very small subset of that data (28 tags, or sensors, to be precise) is being transmitted. In an exception condition, namely when the pipe pressure is greater than 500 but the pump status is < 0.5, we want to get information from all 998 available sensors.
Reader Note: Why is the pump status threshold 0.5? Pumps are not just on or off; they are Off, Turning On, On, and Turning Off, with various stages in between. In this example 1 is on, 0 is off, and the fractions are all the states in between.
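The exception condition can be written down as a small predicate. This is a minimal sketch: the two thresholds come from the text above, while the function and constant names are made up for illustration.

```python
PRESSURE_LIMIT = 500.0  # from the text: pipe pressure greater than 500
PUMP_OFF_BELOW = 0.5    # from the text: pump status < 0.5 is treated as off


def is_exception(pipe_pressure: float, pump_status: float) -> bool:
    """True when pressure is elevated while the pump is (nearly) off."""
    return pipe_pressure > PRESSURE_LIMIT and pump_status < PUMP_OFF_BELOW
```

For example, `is_exception(620.0, 0.0)` holds, while a high pressure with the pump running (`is_exception(620.0, 1.0)`) does not.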
Ok let’s start getting together our Apache NiFi flow!
First I’ll get data via OPC-UA from Kepware’s KepServerEX (Kepware is providing the connectivity for both the Modbus and OPC-UA protocols). Fortunately, I was able to have a starting point from some excellent work done by a Hortonworks SE, Wade Salazar, and did not have to develop the OPC UA processor from scratch — I just enhanced it. The structure of that processor is shown in the figure below.
There are 2 processors: GetOPCData and ListOPCNodes. The ListOPCNodes processor allows a user to browse the hierarchy of the OPC UA address space and write that hierarchy out to a flowfile. GetOPCData can use the output of ListOPCNodes (either as an incoming flowfile or read from disk using a GetFile processor) to read the actual data values from the OPC UA server in Kepware.
Both processors utilize the OPCUAClientService to manage the connection and the session with the OPC UA server. This is a great start, but a few enhancements had to be made to allow for modification of the flow.
The first change required was querying the server using an array of tags, rather than querying one by one — that was a simple change to the GetOPCData processor and the OPCUAClientService.
The second change was slightly more complex. To modify the flow dynamically, I needed a way to access the different tag lists that were to be queried by the GetOPCData processor.
To do that I enhanced the GetOPCData processor with the ability to read a preconfigured tag list from disk. This allowed me to have multiple GetOPCData Processors reading from different tag lists but sharing the same connection and session via the OPCUAClientService.
To enable an optional functionality for the processor to read the tag list from disk, rather than from the contents of an incoming flowfile, two new properties were added. One required property directs the processor to either read and cache the tag list from a preconfigured location on disk, or to read the tag list from the incoming flowfile. The second optional property is the location of the tag list file on disk, so that the processor can read and cache it.
The file is simply a text file that lists the nodes that need to be queried from the OPC server, one per line, as in the figure below.
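The two properties described above amount to a simple choice: read and cache the list from disk, or take it from the incoming flowfile. A rough Python sketch of that logic follows. It is not the processor's actual Java code, and the `"file"`/`"flowfile"` values and function names are hypothetical.

```python
from functools import lru_cache
from pathlib import Path


@lru_cache(maxsize=None)
def load_tag_list(path: str) -> tuple:
    """Read one node ID per line, skipping blank lines; cached per path."""
    lines = Path(path).read_text(encoding="utf-8").splitlines()
    return tuple(line.strip() for line in lines if line.strip())


def resolve_tags(source: str, flowfile_text: str = "", path: str = "") -> tuple:
    """Pick the tag list from disk ('file') or from the flowfile content.

    The 'file' / 'flowfile' switch values are assumptions for this sketch.
    """
    if source == "file":
        return load_tag_list(path)
    return tuple(l.strip() for l in flowfile_text.splitlines() if l.strip())
```

Because the file-based list is cached, several readers pointing at different files each pay the disk read only once.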
Once that change was made, all that was left was to make the flow. So, let’s dig into that now.
First I’ll add an OPCUAClientService controller service to my flow by clicking on the gear icon in the operator panel:
Once the modal window pops up I’ll click on the Controller Services tab and click the plus icon in the top right, and then filter my services using OPC and select the OPCUAClientService by clicking Add. Once that is done I will do a quick configuration. Of note, I have not yet upgraded this processor to be compatible with the new versioning scheme, but rest assured, that is next on my to-do list.
Once back at the controller services tab I edit the properties by clicking the pencil icon towards the right of the row, and fill in the relevant information. In this example the OPCUAClientService is connecting to an unsecured OPC endpoint, but the controller service can handle other types of security.
Close the configuration modal by clicking Apply. Back at the controller services tab, the service is enabled by clicking the lightning icon, and then I opt to enable the service only (as there are no dependent services or processors yet) and click enable. Assuming there are no connectivity issues, your controller service should be reporting that it is enabled.
More Processors to Support the Record-Based Flow
Next I’ll add a few more controller services that are required to support the new record-based flow. First is the Avro Schema Registry. This tells the record-based processors how to interpret the data that they are operating on. That service will be added the same way I added the OPCUAClientService, and then I’ll click on the pencil icon again to configure it.
The AvroSchemaRegistry operates off dynamic properties. Each property is essentially a name/schema pair that the rest of the flow can then use to relate a piece of data to a schema. I should mention that the AvroSchemaRegistry is only one of 2 schema registries that ship with Apache NiFi. The other is the HortonworksSchemaRegistry.
I’ll then add a quick Avro schema that describes the data that the GetOPCData processor produces. Essentially the processor produces CSV style data (minus the header) in the format of tag name, time stamp, value, status. So I’ll build an avro schema that matches that, and call it opcData. The schema looks like the avro below:
I’ll add this property by clicking the plus icon and naming it opcData, and then copying the avro into the value.
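A schema along these lines would match the four-column output described above. Treat it as a hedged sketch rather than the exact schema used in this flow: the field names uri, data_value, and status appear in the SQL later in this post, but the timestamp field name and all of the field types are assumptions. The Python wrapper only builds the JSON text that would be pasted into the property value and mimics the name-to-schema lookup.

```python
import json

# Sketch of an Avro record schema for the GetOPCData output.
OPC_DATA_SCHEMA = {
    "type": "record",
    "name": "opcData",
    "fields": [
        {"name": "uri", "type": "string"},
        {"name": "data_timestamp", "type": "string"},  # assumed name and type
        {"name": "data_value", "type": "float"},       # assumed type
        {"name": "status", "type": "int"},             # assumed type
    ],
}

# The registry is essentially a mapping of schema name -> schema text.
registry = {"opcData": json.dumps(OPC_DATA_SCHEMA, indent=2)}

# Any step that knows the schema name can recover the field layout.
field_names = [f["name"] for f in json.loads(registry["opcData"])["fields"]]
```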
Once I click apply, registration of our schema opcData takes place and I can head back and enable the Avro Registry, and add the remaining controller services.
The next controller service is the CSV Reader. This is going to be the service that is responsible for parsing each line of the CSV file into a separate record. I’ll add it to the flow in the same manner as the OPCUAClientService and the Avro Schema Registry and then configure it.
In this case the Schema access strategy is to use the ‘Schema Name’ Property. This means I must define the Schema Registry by pointing it to the instance of the Avro Schema registry that was configured and enabled above. I’ll set skip header line to false, as I don’t have a header line, and leave the rest as default, and then apply the changes and enable the service.
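Conceptually, the reader turns each header-less CSV line into a typed record using the schema's field order. The sketch below imitates that in plain Python. It is not the CSVReader implementation, and the timestamp field name, the converters, and the sample line are assumptions.

```python
import csv
import io

# (field name, converter) pairs in column order. The names uri, data_value and
# status come from the SQL used later; the rest is assumed for illustration.
COLUMNS = [
    ("uri", str),
    ("data_timestamp", str),
    ("data_value", float),
    ("status", int),
]


def parse_opc_csv(text):
    """Parse header-less CSV text into one typed dict record per line."""
    records = []
    for row in csv.reader(io.StringIO(text)):
        if not row:
            continue  # skip blank lines
        records.append(
            {name: conv(cell.strip()) for (name, conv), cell in zip(COLUMNS, row)}
        )
    return records


# Hypothetical sample line in the tag name, time stamp, value, status layout.
sample = "ns=2;s=SimulatedChannel.SimulatedDevice.PipePressure,2017-06-01T12:00:00Z,512.5,0\n"
records = parse_opc_csv(sample)
```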
At this point there is only one controller service left to go!
The Final Controller Service
I’ll now add the AvroRecordSetWriter controller service since I’ll be writing Avro encoded records across the site-to-site link to the data center. So, I’ll add the service as I’ve done three times previously and then configure it.
Then, point the Schema Registry property to the instance of the Schema Registry created above, and set the Schema Write Strategy to Set ‘schema.name’ Attribute. The Schema Access Strategy will be Use ‘Schema Name’ Property. From there, apply and enable the service as before.
Ok, that may have seemed like a lot of work, but in reality it’s only 5–10 minutes of effort.
Now let’s create the flow.
Creating the Flow
First I’ll add the GetOPCData processor to the canvas, by dragging the processor icon to the canvas and filtering by OPC.
I’ll select GetOPCData by clicking on it and clicking add, followed by right clicking on the newly added processor and clicking configure from the context menu that appears. For the OPC UA Service, select the instance of the service that was configured above and for the return timestamp choose SourceTimestamp.
I want to read the tag list from an already created file, so I’ll choose the Tag List Source as Property. Additionally, set the Default Tag List Name to the file path where the tag list is and apply the changes. I can assume that this file was already created by running a flow using ListOPCNodes and using a PutFile processor to write that data to disk.
Next I need to inform the rest of the flow what schema this data will be using. This is done by adding an UpdateAttribute processor to the flow and giving it an attribute called schema.name with a value of opcData.
This will let the record-based processors know what schema to use in the registry as they process the data. And yes, I could have hard coded this schema name into the other processors, but that limits the reuse of that part of the flow. Click apply.
Now for the fun processor…
Creating the QueryRecord Processor for Stream Interrogation and Dynamic Relationship Building
Add another processor to the canvas and this time filter for QueryRecord. For the Record Reader, point it to the CSV Reader controller service that was configured above, and for the Record writer configure the AvroRecordSetWriter above.
Now the fun part: let’s add a dynamic property called pump.issue, and for the value let’s throw some SQL in.
SELECT * FROM FLOWFILE WHERE (uri = 'ns=2;s=SimulatedChannel.SimulatedDevice.PumpStatus' AND data_value < 0.5 AND status = 0) OR (uri = 'ns=2;s=SimulatedChannel.SimulatedDevice.PipePressure' AND data_value > 500 AND status = 0) LIMIT 1 OFFSET 1
Basically, what this quick and dirty SQL statement does is flag when both conditions are true, namely that my PipePressure is greater than 500 and my PumpStatus is < 0.5. The OR matches a row for either condition, and LIMIT 1 OFFSET 1 skips the first match, so (assuming each tag appears once per flow file) a row comes back only when both tags match. From there, the processor creates a relationship called pump.issue.
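To see why the LIMIT 1 OFFSET 1 trick works, the same WHERE clause can be tried outside NiFi. The sketch below uses Python's built-in sqlite3 as a stand-in for Calcite, which is an assumption, since the two SQL dialects are not identical. The `SELECT *` projection and the three-column table layout are also assumed for illustration.

```python
import sqlite3

PUMP = "ns=2;s=SimulatedChannel.SimulatedDevice.PumpStatus"
PIPE = "ns=2;s=SimulatedChannel.SimulatedDevice.PipePressure"

QUERY = """
SELECT * FROM FLOWFILE
WHERE (uri = ? AND data_value < 0.5 AND status = 0)
   OR (uri = ? AND data_value > 500 AND status = 0)
LIMIT 1 OFFSET 1
"""


def pump_issue_rows(readings):
    """Run the query over (uri, data_value, status) rows, one row per tag."""
    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE FLOWFILE (uri TEXT, data_value REAL, status INTEGER)")
    con.executemany("INSERT INTO FLOWFILE VALUES (?, ?, ?)", readings)
    rows = con.execute(QUERY, (PUMP, PIPE)).fetchall()
    con.close()
    return rows


# Pump off and pressure high: two rows match, so the offset leaves one row.
issue = pump_issue_rows([(PUMP, 0.0, 0), (PIPE, 620.0, 0)])
# Pump off but pressure normal: one row matches, so the offset leaves nothing.
no_issue = pump_issue_rows([(PUMP, 0.0, 0), (PIPE, 300.0, 0)])
```

With only one condition satisfied there is just a single matching row, and skipping it yields an empty result, which is exactly the case where no flow file should be routed to pump.issue.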
One important point here — I set the property Include Zero Record FlowFiles to false. I want this processor to only create a flow file and send it to the pump.issue relationship when the SQL query has a result, because I am going to use that result to trigger a follow on GetOPCData Query.
At this point this is what my flow looks like:
I want to pause here and point out the fact that my flow is very nicely aligned because in NiFi 1.2 a new feature was added that allows the selection of a group of processors, a right click, and then align vertically or horizontally. If you dig organization and symmetry in your visual data flows, then rejoice!
It’s Time to Route
Anyway, now it’s time to route based on the output of our QueryRecord processor. I’ll add another GetOPCData processor and connect this to the pump.issue relationship created above when configuring the QueryRecord processor. When a flowfile is received from the QueryRecord processor on the pump.issue relationship, we know that we are in the issue state that we want to investigate, so I’ll issue a second query for the remaining tags through this newly added GetOPCData processor.
The GetOPCData processor is configured with the same instance of the OPCUAClientService controller service, but a different tag list, only referencing the rest of the tags that I want to collect (the other 970 tags).
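The second tag list is simply the full list minus the tags already collected in the nominal state. A minimal sketch of building it follows; the generated tag names are hypothetical placeholders, and only the counts (998 total, 28 nominal) come from this post.

```python
def remaining_tags(all_tags, nominal_tags):
    """Return the tags not already in the nominal list, preserving order."""
    nominal = set(nominal_tags)
    return [t for t in all_tags if t not in nominal]


# Hypothetical node IDs standing in for the platform's 998 sensors.
all_tags = [f"ns=2;s=SimulatedChannel.SimulatedDevice.Tag{i}" for i in range(998)]
nominal = all_tags[:28]  # the 28 tags streamed in the nominal state
rest = remaining_tags(all_tags, nominal)
```

Writing `rest` out one tag per line would give a file in the same format as the first tag list.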
Now all that is left is to convert this data to Avro format, which is accomplished using the ConvertRecord processor. Configure the ConvertRecord processor to use the CSV Reader and the AvroRecordSetWriter that were configured above.
Finally, run that output to the remote process group that will publish the data to the data center. Our total flow now looks like this:
This flow now enables a solution that intelligently collects and transmits more data to the data center over the WAN when a certain condition is met (high pump pressures are being recorded even when the downstream pumps are off). Once the controller services configuration is complete, it only takes about 5 more minutes to configure the flow.
So What Was the Difference — How Much Bandwidth Was Saved?
Overall, when the flow is running, a dramatic difference can be seen between the number of flow files created from the pump.issue branch compared to the original branch. Below is a screenshot of the content viewer when using a JSON writer:
In the simulation that is running, you can also get a view into the number of flow files being created just by looking at the queues in the NiFi data flow below.
THE RESULT — Intelligent, Demand-Based Surveillance + Operating Savings and 211% Less Data to Transmit
The pump issue branch on the NiFi flow is generating 211% less data over this simulated 1 hour period as compared to continuous data publishing which really helps address the common industrial challenge of operating with constrained bandwidth or severely limited storage, or a combination of both.
What shouldn’t be overlooked is that even more substantial benefits can be gained with this solution approach if the data transmission savings are assumed to be over a more realistic 3 day period with the pump issue only occurring for 1 hour.
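As a back-of-the-envelope illustration of that longer window, the sketch below counts tag readings only. It assumes every reading is the same size and that polling runs at the same rate in both modes, so it is a rough model, not a measurement from this simulation. The tag counts (28 nominal, 998 total) and the 3 day / 1 hour window come from this post; the function name is made up.

```python
def relative_volume(total_tags, nominal_tags, total_hours, issue_hours):
    """Demand-based volume as a fraction of continuously sending every tag."""
    demand = (total_hours - issue_hours) * nominal_tags + issue_hours * total_tags
    continuous = total_hours * total_tags
    return demand / continuous


# 72 hours of operation with the pump issue present for 1 hour.
three_day_fraction = relative_volume(998, 28, 72, 1)
reduction = 1.0 - three_day_fraction
```

Under these assumptions the demand-based flow sends roughly 4% of the readings that continuous publishing of all 998 tags would send over the same 3 days.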
For engineers in the drilling, production, or manufacturing industrial space, not only does NiFi’s smart data collection at the edge greatly simplify demand based surveillance (and in this case, allow collection and transmission of critical data for diagnosis and resolution of the high pump pressure issue), but it also saves on monthly bandwidth operating costs going right to the bottom line. No longer is it necessary to transmit ALL the additional sensor information ALL of the time.
This post was meant to illustrate the power of the new features in Apache NiFi when used in Industrial IoT at the edge for intelligent data collection. This is only one of many use cases that can benefit from bandwidth conservation, classification, and intelligent routing from the edge to the data center.
Feel free to share on other channels, and be sure to keep up with all new content from Hashmap at https://medium.com/hashmapinc.
Chris Herrera is a Senior Enterprise Architect at Hashmap working across industries with a group of innovative technologists and domain experts accelerating the value of connected data for the open source community and customers. You can follow Chris on Twitter @cherrera2001 and connect with him on LinkedIn at linkedin.com/in/cherrera2001.