How to Ingest & Enrich IoT Data at Scale into Snowflake with Apache NiFi

No, You Don’t Need To Work Long Hours to Make This Happen

Hashmap
May 1 · 13 min read

by Ryan Templeton, Snowflake & Chris Herrera, Hashmap

“Almost every machine that comes out of our factory has data streaming from it. There are 560,000 connected assets that we can leverage.” Tom Bucklar, Director of IoT and Channel Solutions, Caterpillar

The strategy expressed by Tom Bucklar in the SmartIndustry.com article is not unique. In fact, it is becoming the standard for manufacturing, heavy-equipment, and industrial-equipment companies around the world. This is not news; it is the way the world has been trending for the past several years. The difference today is that available technology makes the value of this data accessible to everyone.

Collecting IoT Data

In traditional industrial infrastructures, a supervisory control and data acquisition (SCADA) system is in charge of collecting data and monitoring processes. The data collected by the SCADA system is generally logged to a historian to satisfy regulatory or compliance needs. However, the value contained within these systems is massive, and it is becoming more and more important to use this data to measure business processes.

Additionally, the value of this data multiplies when it is joined with data from other sources such as weather, an asset model, a digital twin, HR, etc. (i.e. the typical “Big Data” problem). The struggle has always been how to access this data and make it available for business users to consume. Historians were not built to handle large reads for the purposes of data analytics.

Building Infrastructure vs Delivering Analytics Value

Traditionally, the investment has been in standing up the infrastructure to collect and manage this practically unbounded volume of data, ensuring reliable collection, and robust and performant data access.

The very “scientific” graph above attempts to show traditional Industrial IoT analytic investments as they relate to the value generated for the company. Traditionally the majority of effort is expended in non-value generating activities, such as procuring hardware, software, configuring databases, writing applications and ETL logic. The more that is removed, the more effort is placed where it should be, on value generating activities, such as process optimization, predictive/prescriptive analytics, etc.

A Shift to Analytics Value Delivery

This post will detail, through the use of Snowflake’s Cloud Data Warehouse and Apache NiFi, how an organization’s effort can shift dramatically towards value creation through analytics and away from non-differentiating effort spent standing up additional IT infrastructure.

Your in-house engineers can then perform recipe optimization, anomaly tracking, process prediction against historical data, and quality control, rather than spend effort optimizing disk drives and paying for compute and storage space that will never be used. That is a step change in the way companies can use the process data from their instrumented machines today.

Cross industry examples are depicted below:

Traditional Approaches Demand Specialized Skill Sets

IoT use cases introduce considerations that developers don’t usually face in batch-oriented data processing. Unbounded streams of IoT data arriving at your favorite cloud provider or data center rarely arrive in the format you need.

Developers have relied on stream processing solutions such as Apache Spark Streaming or Apache Flink in conjunction with Apache Kafka to handle data transformation and enrichment.

While all of the above are excellent solutions, they do tend to demand a specialized skill set and can add cost, complexity and more moving parts than necessary to an overall solution depending on the desired use case and business outcome.

Imagine An Easy Button for Getting IoT Data Into Snowflake

So let’s demonstrate an IoT use case that uses Apache NiFi in conjunction with Snowflake’s Cloud Data Warehouse, and specifically Snowflake Stored Procedures, to ingest and enrich data at scale.

This architecture provides an attractive alternative to some of the more complex solutions as code is easily expressed through SQL and can be updated quickly without the need to recompile or halt the workflow. If the use case requires a data pipeline with Kafka and Spark/Flink, then NiFi does a great job of front-ending the overall data pipeline.

We’ll provide some quick highlights on both Snowflake and NiFi, and then jump right into the IoT data flow.

Snowflake Cloud Data Warehouse

Snowflake Cloud Data Warehouse is a cloud-native, fully relational ANSI SQL data warehouse service available in both AWS and Azure with patented technology providing tremendous performance, unlimited scalability, and a consumption-based usage model for a wide variety of analytic workloads.

Apache NiFi

Apache NiFi is a data flow, routing, and processing solution that comes with a wide assortment of Processors (at this writing 286) providing an easy path to consume, get, convert, listen, publish, put, and query data. In addition, NiFi has 61 ready-to-run Controller Services that are used for a variety of system-focused data flow business requirements.

Ingesting & Enriching IoT Data At Scale — How Does It Work?

Throughout the rest of this post we will build the following IoT data flow:

The NiFi flow in this post is built to read from an OPC-UA server (in this case Kepware) via the OPC-UA NiFi Bundle, which allows you to gather a tag list and fetch the values for those tags. The flow then adds metadata in the form of location information so the data can be correlated with a corresponding asset or device once it is ingested into the final repository. Next, the records are formatted and the content is merged via the MergeContent processor.

I am going to pause here for a moment and point out that this is a central point for optimization. The MergeContent processor effectively buffers data so the flow can strike a balance: it should not create one million tiny files in S3 that are wasteful to load, but it should also not buffer for so long that the latency is too high to act on the business process you are trying to measure. This can be tuned depending on the use case and available compute resources.
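The trade-off can be sketched in a few lines of Python. This is not how NiFi implements MergeContent; it is a minimal illustration of a "flush on size or age, whichever comes first" policy. The class name MergeBuffer and its parameters are hypothetical, and age is only checked when a record arrives (a real flow would also flush on a timer).

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class MergeBuffer:
    """Hypothetical size-or-age batching policy (illustration only)."""
    max_bytes: int            # flush once the buffered payload reaches this size
    max_age_seconds: float    # flush once the oldest record has waited this long
    _records: List[bytes] = field(default_factory=list)
    _size: int = 0
    _first_arrival: Optional[float] = None

    def add(self, record: bytes, now: float) -> Optional[bytes]:
        """Buffer one record; return a merged batch if a threshold is hit."""
        if self._first_arrival is None:
            self._first_arrival = now
        self._records.append(record)
        self._size += len(record)
        too_big = self._size >= self.max_bytes
        too_old = now - self._first_arrival >= self.max_age_seconds
        return self.flush() if (too_big or too_old) else None

    def flush(self) -> Optional[bytes]:
        """Emit everything buffered so far as one newline-delimited batch."""
        if not self._records:
            return None
        batch = b"\n".join(self._records) + b"\n"
        self._records, self._size, self._first_arrival = [], 0, None
        return batch
```

A larger max_bytes means fewer, bigger files to load; a smaller max_age_seconds caps how stale a reading can be before it reaches the warehouse.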

This merged content is then GZip compressed and uploaded to S3 to act as an external stage for Snowflake. Once the data is uploaded, NiFi will then execute the ELT stored procedure (via the ExecuteSQL processor) which will reach out to S3, parse out the data and load it. The last step is to evaluate the result to see whether the stored procedure succeeded or failed, and either delete the source file or pass to an error handler to write a log or alert a user.
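For readers who want to produce a test file by hand, the compression step might look roughly like this in Python. In the flow itself NiFi does this work; the function name gzip_csv and the sample values are assumptions for illustration. The column order follows the temp table used later in this post (location, channel, device, tag, val, ts, flag).

```python
import csv
import gzip
import io
from typing import Iterable, Sequence


def gzip_csv(rows: Iterable[Sequence[object]]) -> bytes:
    """Serialize rows as CSV and gzip the result in memory."""
    text = io.StringIO()
    writer = csv.writer(text, lineterminator="\n")
    writer.writerows(rows)
    return gzip.compress(text.getvalue().encode("utf-8"))


# Hypothetical sample reading: location, channel, device, tag, val, ts, flag
rows = [("Plant1", "Channel1", "Device1", "Temp", 71.3, "2019-05-01 12:00:00", 0)]
payload = gzip_csv(rows)  # bytes that could be uploaded to the S3 bucket behind the stage
```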

Let’s Get Started

The messages in the Industrial IoT stream contain the discrete sensor readings, along with the tag and asset information identifying where each reading originated. Ideally we would like to normalize this data into separate dimension tables, which can be further decorated with metadata that aids in data exploration.

We’ll create an IOTASSET table to hold information about the asset that generated the record. We’ll create an IOTTAG table to hold the name of the tag. This tag will include a reference to its parent asset.

Lastly we’ll create an IOTDATA table to hold the tag reading and the status flag value. This record will include a reference to its parent tag record. Normalizing the data like this gives analysts additional flexibility in how they want to view slices of the data and conserves storage space by not storing redundant asset and tag string values.

Create the Snowflake Tables

//create a sequence which we will use to populate the ID PK values
create or replace sequence iot_seq start = 1 increment = 1;

//create asset table
CREATE or replace TABLE IOTASSET(
    LOCATION string not null,
    CHANNEL string not null,
    DEVICE string not null,
    ASSETID INT default iot_seq.nextval,
    constraint UK UNIQUE (LOCATION, CHANNEL, DEVICE),
    constraint PK primary key (ASSETID));

//create tag table
CREATE or replace TABLE IOTTAG(
    TAG string not null,
    ASSETID int not null FOREIGN KEY REFERENCES IOTASSET(ASSETID),
    TAGID int default iot_seq.nextval,
    ALIAS VARCHAR,
    constraint UK unique(TAG, ASSETID),
    constraint PK primary key(TAGID));

//create iotdata table
CREATE TABLE IF NOT EXISTS IOTDATA(
    TAGID int not null foreign key references IOTTAG(TAGID),
    TS TIMESTAMP not null,
    VAL DOUBLE not null,
    FLAG BIGINT not null, --this is an error code passed by the originating system
    constraint pk primary key(TAGID, TS));

Create a Snowflake Stage

Lastly, we need to create a Snowflake S3 STAGE. A stage is a reference to an external repository from which Snowflake can read data. Note that the definition includes the format of the data files Snowflake can expect to find there.

//create iot_stage
CREATE OR REPLACE STAGE iot_stage
    FILE_FORMAT = (TYPE = 'CSV' COMPRESSION = 'gzip')
    URL = 's3://YourIoTBucketName/' -- make sure to leave the trailing slash
    CREDENTIALS = (AWS_KEY_ID = 'YourAWSKey' AWS_SECRET_KEY = 'YourAWSSecretKey')
    copy_options = (on_error='skip_file' purge=false);

Let’s Load Some Data

With these objects in place, you are ready to start loading data. The loading and ELT is essentially made up of 5 steps.

Creating a Stored Procedure

The steps above are wrapped in a stored procedure that takes a single argument: the name of the file in the stage that the procedure will process. The upstream NiFi processor knows the name of the file it just uploaded to S3. We can restrict the stored proc to a single uploaded file (micro batch) for a couple of reasons.

CREATE or replace PROCEDURE IOT_ETL_PROC(FILE_NAME string)
returns string
language javascript
as
$$
    //create the temp table to host the data loaded from the stage file
    //("or replace" clears a table left behind by an earlier failed call in the same session)
    snowflake.execute({sqlText: "create or replace temp table IOT_TEMP(location string, channel string, device string, tag string, val double, ts timestamp, flag bigint)"});

    //copy the data from S3 into the temp table
    try {
        snowflake.execute({sqlText: "copy into IOT_TEMP from (select $1, $2, $3, $4, to_double($5), to_timestamp($6), to_number($7) from @IOT_STAGE) FILES=('" + FILE_NAME + "')"});
    } catch (err) {
        return "Failed: error copying data to temp table";
    }

    //IOTASSET insert
    try {
        snowflake.execute({sqlText: "insert into IOTASSET (location, channel, device) select distinct a.location, a.channel, a.device from IOT_TEMP a where not exists (select location, channel, device from IOTASSET b where a.location = b.location and a.channel = b.channel and a.device = b.device)"});
    } catch (err) {
        return "Failed: error iotasset insert";
    }

    //IOTTAG insert
    try {
        snowflake.execute({sqlText: "insert into IOTTAG(tag, assetid) select distinct a.tag, b.assetid from IOT_TEMP a inner join IOTASSET b on(a.location = b.location and a.channel = b.channel and a.device = b.device) where not exists(select tag, assetid from IOTTAG c where a.tag = c.tag and b.assetid = c.assetid)"});
    } catch (err) {
        return "Failed: error iottag insert";
    }

    //IOTDATA insert
    //(tags are resolved through their asset, since a tag name is only unique per asset)
    try {
        snowflake.execute({sqlText: "insert into IOTDATA(select b.tagid, a.ts, a.val, a.flag from IOT_TEMP a inner join IOTASSET c on(a.location = c.location and a.channel = c.channel and a.device = c.device) inner join IOTTAG b on(a.tag = b.tag and b.assetid = c.assetid) order by ts, tagid)"});
    } catch (err) {
        return "Failed: error iotdata insert";
    }

    //drop the temp table
    snowflake.execute({sqlText: "drop table if exists IOT_TEMP"});
    return "Succeeded";
$$
;
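If you want to see what the normalizing inserts do without a Snowflake account, the following Python sketch runs comparable statements against an in-memory SQLite database. This is only an approximation: SQLite stands in for Snowflake, integer primary keys stand in for the sequence, and the function name load_batch and the sample rows are hypothetical. The sketch resolves each tag through its asset, so the same tag name on two devices maps to two distinct tag ids.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
create table iotasset (assetid integer primary key, location text, channel text,
                       device text, unique (location, channel, device));
create table iottag   (tagid integer primary key, tag text, assetid integer,
                       unique (tag, assetid));
create table iotdata  (tagid integer, ts text, val real, flag integer,
                       primary key (tagid, ts));
create table iot_temp (location text, channel text, device text, tag text,
                       val real, ts text, flag integer);
""")


def load_batch(rows):
    """Load one micro batch: stage the raw rows, then normalize them."""
    conn.execute("delete from iot_temp")
    conn.executemany("insert into iot_temp values (?,?,?,?,?,?,?)", rows)
    # Insert only assets that have not been seen before
    conn.execute("""insert into iotasset (location, channel, device)
        select distinct a.location, a.channel, a.device from iot_temp a
        where not exists (select 1 from iotasset b where a.location = b.location
                          and a.channel = b.channel and a.device = b.device)""")
    # Insert only tags that are new for their asset
    conn.execute("""insert into iottag (tag, assetid)
        select distinct a.tag, b.assetid from iot_temp a
        join iotasset b on a.location = b.location
                       and a.channel = b.channel and a.device = b.device
        where not exists (select 1 from iottag c
                          where c.tag = a.tag and c.assetid = b.assetid)""")
    # Store each reading against the id of its (asset, tag) pair
    conn.execute("""insert into iotdata (tagid, ts, val, flag)
        select t.tagid, a.ts, a.val, a.flag from iot_temp a
        join iotasset b on a.location = b.location
                       and a.channel = b.channel and a.device = b.device
        join iottag t on t.tag = a.tag and t.assetid = b.assetid""")
    conn.commit()


def count(table):
    return conn.execute(f"select count(*) from {table}").fetchone()[0]


load_batch([("P1", "C1", "D1", "Temp", 71.3, "2019-05-01 12:00:00", 0),
            ("P1", "C1", "D2", "Temp", 68.0, "2019-05-01 12:00:00", 0)])
load_batch([("P1", "C1", "D1", "Temp", 71.9, "2019-05-01 12:00:01", 0)])
```

After the second batch the asset and tag tables are unchanged, because both rows already exist; only the reading table grows.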

If you have any files already uploaded into your IOT_STAGE, you can check by running:

list @IOT_STAGE;

Get the name from one of the files listed there and run:

call IOT_ETL_PROC('<FileName>');

You should now have a successful stored procedure working in Snowflake!

Using NiFi with Snowflake Stored Procedures

Once you’ve confirmed the stored proc is working in your Snowflake environment, we can add it to our NiFi workflow. Following the PutS3Object processor, add an ExecuteSQL processor. Configure the DB connection pool using a regular Snowflake JDBC connection string. In the SQL select query field, call the stored procedure and use the filename attribute to pass in the file name: call IOT_ETL_PROC('${filename}'), as shown below.
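To make the substitution concrete, here is a small Python sketch of the statement that results once the filename attribute has been filled in. NiFi performs this substitution itself through its Expression Language; the helper build_call is hypothetical, and the quote doubling is an added precaution that is not part of the original flow.

```python
def build_call(filename: str) -> str:
    """Build the CALL statement for one staged file, doubling single quotes."""
    escaped = filename.replace("'", "''")
    return f"call IOT_ETL_PROC('{escaped}')"


statement = build_call("batch-001.csv.gz")  # hypothetical file name
```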

Under the Scheduling tab of this processor, review the setting for Concurrent Tasks. When large amounts of data arrive very quickly, the uploaded files can accumulate in the queue. We can increase the number of concurrent stored procedure calls by raising this number. Because our stored procedure uses a temporary table when loading from S3, each concurrent session is isolated from the others. The Scheduling tab for NiFi’s Configure Processor is shown below.

Configure the Snowflake Warehouse

Configure the size of your Snowflake virtual warehouse to ensure your stored procedure execution times stay low and provide the expected throughput for this NiFi processor. If you increase the number of concurrent tasks to a level higher than 8, you may want to consider enabling multi-cluster warehousing and increasing the Maximum Clusters setting to support unexpected bursts of arriving data.

In this situation, as calls queue up, Snowflake will launch additional clusters for the warehouse, up to the maximum setting (scale out), to handle the increased workload. When the workload drops back down, the number of clusters will shrink back to the minimum setting.

You can take a look at what we’ve configured in our example below.

The output from this call will be the string return value we built into the stored procedure. It will return either “Succeeded” or “Failed:” followed by the step it was working on when the failure occurred.

We can use a RouteOnContent processor to evaluate the returned value. Successful calls are routed to a DeleteS3Object processor where the file can now be safely removed. Failed calls can be routed to a separate path for alerting and manual remediation.
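The routing decision itself is simple, and can be sketched in Python as follows. In the flow this is done by the RouteOnContent processor matching on the returned content; the function name route and the two route labels are hypothetical. Treating any unrecognized output as a failure is an assumption made here so that a file is never deleted unless the procedure clearly succeeded.

```python
def route(result: str) -> str:
    """Pick a downstream path from the stored procedure's return string."""
    if result.strip() == "Succeeded":
        return "delete_source_file"
    # "Failed: ..." and any unexpected output both go to the error path
    return "alert"
```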

Ask Yourself Some Questions In the Planning Stage

That’s It — You are Now Ingesting and Enriching IoT Data in Snowflake with NiFi

In this post we described how it’s possible to leverage the streaming capabilities of NiFi in conjunction with Snowflake Stored Procedures (check out the Snowflake release blog post here) to handle arbitrarily complex transformations on IoT data as it is loaded into Snowflake’s Cloud Data Warehouse.

It would be great to hear from you on your plans to use NiFi with Snowflake!


Ryan Templeton is a Senior Solutions Engineer at Snowflake with a record of architecting and delivering stream processing and data management solutions to customers across multiple industries. You can tweet Ryan @ryan_templeton and connect with him on LinkedIn.

Chris Herrera is Chief Technology Officer at Hashmap working across industries with a group of innovative technologists and domain experts accelerating high value business outcomes for our customers, partners, and the community. You can tweet Chris @cherrera2001 and connect with him on LinkedIn and also be sure to catch him on Hashmap’s Weekly IoT on Tap Podcast for a casual conversation about IoT from a developer’s perspective.

HashmapInc

Innovative technologists and domain experts helping accelerate the value of Data, Cloud, IIoT/IoT, and AI/ML for the community and our clients by creating smart, flexible and high-value solutions and service offerings that work across industries. http://hashmapinc.com

Thanks to Randy Pitcher II
