TensorFlow Pipelines on the Cloud with StreamSets and Snowflake

Isaac Langley
Cervello, a Kearney Company
Aug 23, 2019

Machine learning, machine learning, machine learning. It seems as though data science blog posts talk about little else these days. But one crucial, often neglected aspect of ML is what to do once a model has actually been built: how can we regularly feed new data to the model? Where should the model predictions be sent? And where should all of this data plumbing actually live?

In this post we’ll explore how a TensorFlow classification pipeline can be deployed on the cloud, using StreamSets Data Collector to orchestrate the ingestion, cleaning, and classification of Iris flower data. We’ll use the pre-trained TF model provided by StreamSets to demonstrate how an end-to-end ML solution can be deployed in under an hour.

An Introduction to StreamSets Data Collector

StreamSets Data Collector is a lightweight, drag-and-drop pipeline management tool, available free and open source under an Apache 2.0 license. Out of the box, StreamSets comes equipped with a vast array of pre-built connectors and processing tools, enabling users to build powerful, production-ready data pipelines in minutes. Data science stacks are particularly well supported, with pre-built processors for easily integrating Databricks and TensorFlow models into your workflow.

StreamSets’ lightweight UI makes pipeline construction easy and efficient.

Before we get started, you’ll need to install StreamSets on an instance running with the cloud provider of your choice (or locally, if you’re just looking to test this out for the first time).

We’ll also be making use of our favorite cloud data warehouse, Snowflake, as a final destination for our classified data. Once we have our data in Snowflake, it can be easily pulled into a multitude of other tools and services for further analysis, including Tableau and Excel.

Building out the Pipeline

The basic architecture of our classification pipeline.

Once you have StreamSets installed, fire up an instance and create a new pipeline. After giving your pipeline an appropriate name, you’ll be presented with an empty canvas like so:

The first thing we need to configure is the pipeline’s origin: the location of our raw, unclassified data. In our case, the data is stored as JSON in an Azure blob, but unfortunately StreamSets lacks a pre-built connector for Azure Blob Storage, so we’ll need to configure the connection ourselves. Select the Hadoop FS Standalone - HDP 2.6.2.1-1 option from the drop-down list and enter the connection details of your cloud storage instance. In the case of Azure Blob Storage, we need to fill out the following:

Hadoop FS URI:
wasb[s]://<BlobStorageContainerName>@<StorageAccountName>.blob.core.windows.net/<path>

Hadoop FS Configuration:
fs.azure.account.key.<storage_account_name>.blob.core.windows.net = <your storage account key>
fs.azure.account.keyprovider.<storage_account_name>.blob.core.windows.net = org.apache.hadoop.fs.azure.SimpleKeyProvider

Files Directory: <Blob Storage path for the iris file>

File Name Pattern: *

For more on ingesting data from Azure, check out the official tutorial from StreamSets.
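
If you still need to land the raw iris JSON in Blob Storage in the first place, a minimal sketch using the azure-storage-blob Python package might look like the following. The connection string, container name, and blob path are all placeholders; substitute your own.

# Upload the raw iris JSON to the container the Hadoop FS origin will read from.
# All names and credentials here are placeholders, not values from the pipeline above.
from azure.storage.blob import BlobServiceClient

service = BlobServiceClient.from_connection_string("<your-storage-connection-string>")
container = service.get_container_client("<BlobStorageContainerName>")
with open("iris.json", "rb") as data:
    container.upload_blob(name="iris/iris.json", data=data, overwrite=True)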

The iris data set has four features, all of which are numeric: petal length, petal width, sepal length, and sepal width. Let’s take a look at a sample row of JSON data (you can find a link at the end of the article):

{
  "petalLength": 1,
  "petalWidth": 0.2,
  "sepalLength": 4.6,
  "sepalWidth": 3.6
}

Note that we have a mix of integers and floats, so we’ll need to carry out a transformation on the data before we can feed it into the classifier. The TF model needs to receive all input data in the form of floats, which we can easily handle with the Field Type Converter processor block. Go ahead and drag the block onto the canvas, connect it up to your data source, and configure the Conversions tab as follows, leaving the General tab as is.
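
For reference, the conversion the Field Type Converter performs here amounts to nothing more than casting every numeric field to a float. A plain-Python illustration (field names match the JSON sample above):

# Cast each numeric field to a float, as the Field Type Converter does for us.
record = {"petalLength": 1, "petalWidth": 0.2, "sepalLength": 4.6, "sepalWidth": 3.6}
converted = {field: float(value) for field, value in record.items()}
print(converted)  # {'petalLength': 1.0, 'petalWidth': 0.2, 'sepalLength': 4.6, 'sepalWidth': 3.6}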

Now that our input data has been converted to a form that the TensorFlow model can understand, we can add a TensorFlow Evaluator processor to our pipeline and configure it to serve our model. This section of the tutorial is based on the official TensorFlow tutorial from StreamSets, so check that out if you run into issues. If you haven’t already, download the pre-trained iris model provided by StreamSets here, then point the Saved Model Path field to a folder containing the .pb model file. You’ll then need to configure the rest of the evaluator as shown below, which specifies the properties of the model’s inputs and outputs.

Our outputs are going to be dnn/head/predictions/ExpandDims and dnn/head/predictions/probabilities, which give the predicted class label (0, 1, or 2, one for each species) and the associated class probabilities.
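
To make that concrete, here is a rough sketch of the kind of inference the TensorFlow Evaluator performs on each record, written against the TensorFlow 1.x API that was current when the pre-trained model was published. The output tensor names are the ones listed above; the input tensor names, shapes, and model path are assumptions and may not match the actual SavedModel, so treat this purely as an illustration.

# Load the pre-trained iris SavedModel and classify a single record.
# Input tensor names/shapes are assumptions; output names follow the evaluator config above.
import tensorflow as tf

with tf.Session(graph=tf.Graph()) as sess:
    tf.saved_model.loader.load(sess, ["serve"], "./iris_saved_model")
    graph = sess.graph
    feed_dict = {
        graph.get_tensor_by_name("PetalLength:0"): [[1.0]],
        graph.get_tensor_by_name("PetalWidth:0"): [[0.2]],
        graph.get_tensor_by_name("SepalLength:0"): [[4.6]],
        graph.get_tensor_by_name("SepalWidth:0"): [[3.6]],
    }
    predicted_class, probabilities = sess.run(
        [graph.get_tensor_by_name("dnn/head/predictions/ExpandDims:0"),
         graph.get_tensor_by_name("dnn/head/predictions/probabilities:0")],
        feed_dict=feed_dict,
    )
    print(predicted_class, probabilities)  # predicted label and per-class probabilities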

We’re almost there! One final thing we need to do before we pass our data into Snowflake is convert the numerically-encoded class predictions to the corresponding species string: 0 to Iris setosa, 1 to Iris versicolor, and 2 to Iris virginica.

There are a couple of different ways we could go about doing this, but the most straightforward is with the Expression Evaluator processor. This block provides the functionality to create new, conditionally-defined columns using the StreamSets expression language. Drag the processor onto the canvas and connect it up to the TensorFlow Evaluator, as below.

We only need to change two fields here, both of which are under the Expressions tab. In Output Field we’re going to define a new column called /predictedSpecies, which is going to contain the species classification as a string. We then need to configure the expression defining the logic behind how the predictedSpecies column gets populated; this is essentially just an if/else statement:

if predicted_class == 0:
    predictedSpecies = 'Iris setosa'
elif predicted_class == 1:
    predictedSpecies = 'Iris versicolor'
else:
    predictedSpecies = 'Iris virginica'

In the StreamSets expression language, this can be formulated as:

${record:value("/output/'dnn/head/predictions/ExpandDims_0'[0]")=='0'?'Iris setosa':(record:value("/output/'dnn/head/predictions/ExpandDims_0'[0]")=='1'?'Iris versicolor':'Iris virginica')}

Pushing data to Snowflake

Now we’re ready to load our newly-classified data into Snowflake. Drag a Snowflake destination block onto the canvas and connect it up to the Expression Evaluator like so:

The ‘General’ tab can be left as-is, but you’ll need to enter your Snowflake account details under ‘Snowflake Connection Info’. Under the ‘Snowflake’ tab, define the virtual warehouse, database, and schema you would like to use; note that you will need to create these manually (via the Snowflake UI) if they don’t already exist. Go ahead and enter the name of the table where the classified data is going to be stored (we’ve gone for IRIS_CLASS), and make sure the Table Auto Create box is checked; this enables StreamSets to automatically create the required table if it doesn’t already exist in the given schema.

We’ll also need to tell StreamSets where to stage the incoming data. Under the ‘Staging’ tab, enter the details of the external Snowflake stage you want to use (S3 or Azure); note that StreamSets lacks the capability to automatically create external stages, so you’ll have to define these manually first (more on staging data here).
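
If you’re staging through Azure, the external stage can be created ahead of time with a single SQL statement. A minimal sketch, run here through the snowflake-connector-python package (the stage name, URL, SAS token, and connection details are all placeholders):

# Create an external Azure stage for StreamSets to stage data into.
# Every value below is a placeholder; substitute your own account details.
import snowflake.connector

conn = snowflake.connector.connect(
    account="<account_identifier>", user="<user>", password="<password>",
    warehouse="<warehouse>", database="<database>", schema="<schema>",
)
conn.cursor().execute("""
    CREATE STAGE IF NOT EXISTS streamsets_stage
      URL = 'azure://<StorageAccountName>.blob.core.windows.net/<StagingContainerName>'
      CREDENTIALS = (AZURE_SAS_TOKEN = '<sas_token>')
""")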

Now we’re ready to test out the pipeline. StreamSets includes a handy preview feature, which allows you to push a given number of records through the pipeline without requiring the job to be deployed in full. Click the ‘👁’ button above the canvas, and make sure ‘Write to Destinations and Executors’ is checked. Hit ‘Run Preview’, and after a few seconds you should have a fully populated table in Snowflake! Let’s take a look:
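
If you’d rather take that look from code than from the Snowflake UI, a quick query through the snowflake-connector-python package does the job (connection details are placeholders; IRIS_CLASS is the table name we chose above):

# Pull back a few classified rows to confirm the pipeline wrote to Snowflake.
import snowflake.connector

conn = snowflake.connector.connect(
    account="<account_identifier>", user="<user>", password="<password>",
    warehouse="<warehouse>", database="<database>", schema="<schema>",
)
for row in conn.cursor().execute("SELECT * FROM IRIS_CLASS LIMIT 5"):
    print(row)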

And there we have it. We’ve built a fully cloud-based ML pipeline — with little-to-no hand-coding — in under an hour. From Snowflake, it’s quick and easy to pull data into whatever application we want for further analysis, such as Tableau, Excel, or Jupyter.

If you’re interested in learning more about ML, data engineering, or how Cervello can help productionize your machine learning pipeline, please reach out!

Resources

Download StreamSets Data Collector here.

More on the StreamSets Snowflake connector here.

The unlabeled JSON data file.

Documentation for the StreamSets TensorFlow Evaluator.

More about Cervello, an A.T. Kearney Company
