How-To: running a Google Cloud Dataflow job from Apache NiFi

Sergey Lebedev
Nov 26, 2018

In the previous story, we described how Google Cloud (GC) Dataflow and Apache NiFi can cooperate to solve a data transformation task. In this story, we introduce NiFi GC Dataflow Job Runner — a NiFi processor that runs GC Dataflow jobs. We are going to:

  1. Give brief information on Apache NiFi and GC Dataflow
  2. Integrate the NiFi GC Dataflow Job Runner processor into an Apache NiFi bundle
  3. Create exemplary GC Dataflow job templates
  4. Load an Apache NiFi data flow template and set it up to use the GC Dataflow job templates
  5. Run the Apache NiFi data flow and see the results

BE CAREFUL! Some of the stages require reading linked instructions, so extra time is needed.

(1) Brief Information

Apache NiFi is a tool for building, controlling, debugging, and monitoring data flows. It includes plenty of ready-to-use processors for manipulating data, but it also allows integrating custom processors when some functionality is missing, e.g. for running GC Dataflow jobs. Apache NiFi also provides a template mechanism for saving stereotyped data flows.

A GC Dataflow job is a set of data manipulations executed on a GC cluster. A job can be created from a template stored in GC Storage (GCS). During creation you may specify some parameters (if the template provides for them).

CLARIFICATION: both technologies provide a template mechanism for creating customizable, stereotyped solutions.

(2) NiFi GC Dataflow Job Runner Integration

Now we come to the main element. The processor that runs a GC Dataflow job from a template can be found here, together with instructions on how to integrate it into Apache NiFi.

After the integration, you should be able to find it in the list of processors:

ExecuteGCDataflowJob processor

Stereotyped usage assumes the following configuration:

A stereotyped usage

(3) Exemplary GC Dataflow Job Templates Creation

These job templates are used to illustrate the job runner. They can be found here, together with instructions on how to push them to GCS (you may read only the Creation of job templates paragraph).
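The linked instructions describe the exact commands; purely for orientation, a classic Dataflow template is created by running the Beam pipeline with the DataflowRunner and a --templateLocation option, which stages the template to GCS instead of executing the job. A minimal Kotlin sketch (all bucket and path names are placeholders):

```kotlin
import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.options.PipelineOptions
import org.apache.beam.sdk.options.PipelineOptionsFactory

// Minimal sketch of staging a classic Dataflow template from a Beam pipeline.
// Passing --templateLocation makes the run stage the template to GCS instead of executing the job.
fun main(args: Array<String>) {
    val options = PipelineOptionsFactory.fromArgs(*args)
        .withValidation()
        .`as`(PipelineOptions::class.java)
    val pipeline = Pipeline.create(options)
    // ... the actual transforms of the build-vocabulary pipeline would be added here ...
    pipeline.run()
}

// Hypothetical staging invocation (names are assumptions; see the linked instructions):
//   --runner=DataflowRunner --project=<your-project> \
//   --stagingLocation=gs://<your-bucket>/staging \
//   --templateLocation=gs://<your-bucket>/templates/build-vocabulary
```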

After deploying the job templates you should be able to find them at the specified path in GCS:

Job templates

The first one can be used for building a stem word vocabulary for the given text files; the second one can be used for computing lexical diversity (the type-token ratio) based on the resulting stem word vocabulary.
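For clarity, the type-token ratio is the number of distinct (stemmed) words divided by the total number of word occurrences. A minimal sketch of that computation, assuming the vocabulary is a stem-to-count map like the fragment shown in section (5):

```kotlin
// Type-token ratio: distinct stems ("types") divided by total word occurrences ("tokens").
// The vocabulary maps a stem to its occurrence count, as produced by the build-vocabulary job.
fun typeTokenRatio(vocabulary: Map<String, Long>): Double =
    vocabulary.size.toDouble() / vocabulary.values.sum()

fun main() {
    val vocabulary = mapOf("safeguard" to 1L, "term" to 12L)  // toy fragment
    println(typeTokenRatio(vocabulary))                        // 2 / 13 ≈ 0.1538
}
```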

(4) Loading and Setting Up Apache NiFi Data Flow Template

An illustrative Apache NiFi template can be found here. To use it, it is necessary 1) to import the template and 2) to instantiate it. Here is the picture that you should see after these actions:

Exemplary data flow

Brief Data Flow Description

Main flow: START HERE/ListGCSBucket — lists file paths in the specified GCS bucket folder => RouteOnAttribute — filters the listed file names => FIRST GCD JOB/ExecuteGCDataflowJob — launches a job to build a stem word vocabulary for all files in each folder from the list => SECOND GCD JOB/ExecuteGCDataflowJob — launches a job to compute the type-token ratio for each stem vocabulary.

Notification flow: EXTRACT JOB NAME/EvaluateJsonPath — extracts a job name and puts it into flow file attributes => TRANSLATE FOR SLACK/JoltTransformJSON — prepares the attachment part of a message to be pushed to the specified Slack channel => ATTACHMENT/EvaluateJsonPath — writes the prepared attachment to a flow file attribute => SAY ON SLACK/PutSlack — pushes the message to the specified Slack channel.

NOTE: you can get a description of a processor by choosing the View usage option from the processor's context menu.

Data Flow Initialisation

As can be seen from the exclamation marks on the processors, the data flow is not ready for launching. It is necessary to initialise the processors (except for LogAttribute/FOR DEBUGGING, which just holds failed flow files):

ListGCSBucket. Set Project ID, Bucket (just the bucket name, not a full path) and Prefix (the path to the files, without a preceding slash). Create, initialise and enable the GCP Credentials Provider Service (you may set Use Application Default Credentials to true to use local credentials).

ListGCSBucket processor properties
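If the Bucket/Prefix split is unclear, the toy sketch below shows how a full GCS URI maps onto the two properties (the URI is just a placeholder):

```kotlin
// Toy illustration only: how a full GCS URI relates to the Bucket and Prefix properties.
// Bucket is just the bucket name; Prefix is the path inside the bucket without a leading slash.
fun splitGcsUri(uri: String): Pair<String, String> {
    val withoutScheme = uri.removePrefix("gs://")
    val bucket = withoutScheme.substringBefore('/')
    val prefix = withoutScheme.substringAfter('/', missingDelimiterValue = "")
    return bucket to prefix
}

fun main() {
    println(splitGcsUri("gs://my-bucket/texts/lincoln/"))  // (my-bucket, texts/lincoln/)
}
```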

ExecuteGCDataflowJob. The first GCD job. Modify Project ID, modify GCS path (the path to the build-vocabulary template in the form “gs://<GCS bucket>/<path>/<template name>”), and modify the destination property (press the ‘+’ sign at the top right to add a property). The destination should end with a word that will be used as a prefix for the output files. Create, initialise and enable the GCP Dataflow Service Provider.

build-vocabulary job

CLARIFICATIONS: 1) ListGCSBucket + RouteOnAttribute prepare a flow of folders containing the data to be processed; 2) each launch of the first job processes all files in a separate folder; 3) as it is impossible to run jobs with the same name simultaneously, the folder name is used to make each job launch unique (build-vocabulary-${filename:substringBeforeLast('/'):substringAfterLast('/'):toLower()}); 4) the same trick is used to separate intermediate and final results computed by different jobs.
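To make the expression in clarification 3 concrete, here is a toy Kotlin equivalent of what it computes from a listed file path (the sample path is made up):

```kotlin
// Toy equivalent of the NiFi expression
// build-vocabulary-${filename:substringBeforeLast('/'):substringAfterLast('/'):toLower()}:
// take the parent folder of the listed file and lowercase it to form a unique job name.
fun jobName(filename: String): String {
    val folder = filename
        .substringBeforeLast('/')  // drop the file name, keep the folder path
        .substringAfterLast('/')   // keep only the last folder segment
        .lowercase()
    return "build-vocabulary-$folder"
}

fun main() {
    println(jobName("texts/Lincoln/inaugurals.txt"))  // build-vocabulary-lincoln
}
```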

ExecuteGCDataflowJob. The second GCD job. Do likewise for this processor, except: modify the source property to point at the destination path set for the first job, using a wildcard to match files (e.g., “gs://…/*”); the GCS path should point at the compute-lexical-diversity template; if you added the GCP Dataflow Service Provider in the previous step, here you may just choose it; the processor should also have a destination path.

compute-lexical-diversity job

PutSlack (unnecessary if you do not want to receive job state notifications). Set the Webhook URL, which you should take from your Slack workspace (on the Slack workspace page, choose Menu => Configure Apps => Incoming WebHooks). Here you can find some instructions on how to enable this function. Also set the Channel where you want to push messages.

PutSlack processor properties
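PutSlack handles all of this for you; purely for orientation, an Incoming WebHook is just an HTTP endpoint that accepts a JSON payload. A minimal sketch with a placeholder webhook URL:

```kotlin
import java.net.HttpURLConnection
import java.net.URL

// Minimal sketch of what an Incoming WebHook call looks like; PutSlack does this for you.
// The webhook URL is a placeholder taken from your Slack workspace configuration.
fun postToSlack(webhookUrl: String, text: String) {
    val connection = URL(webhookUrl).openConnection() as HttpURLConnection
    connection.requestMethod = "POST"
    connection.doOutput = true
    connection.setRequestProperty("Content-Type", "application/json")
    connection.outputStream.use { it.write("""{"text": "$text"}""".toByteArray()) }
    check(connection.responseCode == 200) { "Slack returned ${connection.responseCode}" }
}

fun main() {
    postToSlack("https://hooks.slack.com/services/XXX/YYY/ZZZ", "The Job has been started")
}
```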

As a result, you should see all processors (except the one mentioned above) ready to start.

NOTA BENE: the NiFi GC Dataflow Job Runner processor allows parametrising a GC Dataflow job with the help of processor properties. Here we use the source and destination ones. But this takes effect only if the parametrisation has been implemented in the template code. For this purpose ValueProviders are used within the GC Dataflow job template code — see PipelineOptions.kt in the given example.
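As a minimal sketch of what such runtime parametrisation looks like in a Kotlin options interface (the interface name is made up and the descriptions are assumptions — see PipelineOptions.kt for the real code):

```kotlin
import org.apache.beam.sdk.options.Description
import org.apache.beam.sdk.options.PipelineOptions
import org.apache.beam.sdk.options.ValueProvider

// Sketch of runtime-parametrised options; the real definitions live in PipelineOptions.kt.
// ValueProvider defers reading the value until the templated job is actually launched,
// which is what lets the NiFi processor pass source/destination at launch time.
interface VocabularyOptions : PipelineOptions {
    @get:Description("GCS path of the input text files, e.g. gs://<bucket>/<folder>/*")
    var source: ValueProvider<String>

    @get:Description("GCS path prefix for the output files")
    var destination: ValueProvider<String>
}
```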

(5) Running the Apache NiFi Data Flow

TAKE EXTRA CARE! Pay attention to the processor feeding NiFi GC Dataflow Job Runner with flow files. The Dataflow Job Runner will launch a job on GCP (and it is not free!) as often as the feeder generates a flow file. That is why we highly recommend using the notification mechanism.

To test the flow we have used Abraham Lincoln___Lincoln’s Inaugurals, Addresses and Letters (Selections).txt (255.94 KB) from the Gutenberg corpus.

To start the data flow just run all the processors. When NiFi GC Dataflow Job Runner launches a job from a template, you should receive a notification beginning like this (a fragment):

The Job has been started

When the job has finished you should receive the following message:

The Job has been finished

As a result of the build-vocabulary job processing (in our case it took 2 min 38 sec with one worker) you should receive this kind of vocabulary (fragment):

Abraham Lincoln___Lincoln's Inaugurals, Addresses and Letters (Selections).txt/safeguard/1
Abraham Lincoln___Lincoln's Inaugurals, Addresses and Letters (Selections).txt/term/12

As a result of the compute-lexical-diversity job processing (one worker, 3 min 0 sec) we receive:

Abraham Lincoln___Lincoln's Inaugurals, Addresses and Letters (Selections).txt TTR = 0.10835081829626522

Hint: to make ListGCSBucket re-read the bucket, go to View state (from the context menu) and choose Clear state.
