Publish events to PubSub Topics using Apache Ni-Fi

Apache Nifi is a real time event processing platform which can be used for data movement between disparate systems .

NiFi can also be used to connect to cloud services such as AWS,Google cloud or Microsoft Azure using the built in processors for moving data across the cloud platforms.

Following story is about publishing the real time events to google cloud pub-sub whenever any new object is created in the google cloud storage bucket .

PubSub is a publish and subscribe platform on google cloud which can be used for event processing or can be used as data communication channel between the systems just like other messaging systems .

Following are the steps which can be used to build a workflow in order to publish the OBJECT_FINALIZE event of google cloud storage to a pub-sub topic.

Use Case :

Select data from hive table , convert the file attributes like file name ,file format (Avro/Csv) and write the file to GCS storage bucket and send notification to pub sub topic with message attributes like (filename , table name,row count,file creation date,etc)

  1. Create a topic in Google cloud pub sub using your
  2. Create a NiFi flow by dragging the following processors to the nifi canvas


3.Add the required controller configurations :

a)hivecontroller service to enable connection to your hive server and database.

b)GCSController service to enable connection to your google cloud storage instance

4)Download the custom pubsub processor code from the below and build the nar package and add it to nifi lib folder

5)Add the above processor to the canvas and connect it to the putgcsobject processor . Add the topic name and project configuration in the configurations tab

Nifi PubSub flow

The above flow will publish the Json event to the pubsub topic mentioned in the GCPPubSubPublisher processor with the metadata of the file written in storage bucket . (It will not send the file contents to the pubsub instead it will just send and event notification that there is a new file or object available in the bucket for consumption or processing)