Using Apache NiFi to Load Tweets from the Twitter API into MemSQL

mohammad H. AbdelQader
May 1, 2019


In this post I will create a data flow using Apache NiFi. The flow gets tweets from Twitter and loads them into a table in a MemSQL database.

So, let’s dig into creating a database and table in MemSQL, building the data flow in Apache NiFi, and creating a database connection pooling service in Apache NiFi to connect to MemSQL.

Prerequisites

  • MemSQL installed and configured (I used a single on-premises host)
  • Apache NiFi installed and started (I used Apache NiFi 1.9.0.1 from Cloudera Flow Management)
  • Twitter API keys and access tokens, available in the app details after logging in to the Twitter developer site

Steps for creating the database and table in MemSQL

1. Run the memsql client from the OS shell

2. Check the existing databases and create the testDB database

3. Create the tweets_tab table. This table holds each tweet’s creation date, language, and text
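As a rough sketch of steps 2 and 3, assuming MemSQL’s MySQL-compatible SQL dialect, the Python snippet below prints the statements to paste into the memsql client. The column names (created_at, lang, tweet_text) and their types are assumptions, not the author’s exact schema. created_at is kept as VARCHAR here because Twitter sends it as a string such as "Wed May 01 10:00:00 +0000 2019".

```python
# Sketch: print the MemSQL statements for steps 2 and 3.
# ASSUMPTION: column names and types are illustrative, not the original schema.

DATABASE = "testDB"
TABLE = "tweets_tab"

# (column name, MySQL-compatible type), hypothetical
COLUMNS = [
    ("created_at", "VARCHAR(64)"),  # tweet creation date as sent by Twitter (a string)
    ("lang", "VARCHAR(16)"),        # language code, e.g. "en"
    ("tweet_text", "TEXT"),         # tweet body
]


def ddl_statements(database: str, table: str, columns: list) -> list:
    """Return the statements to run in the memsql client, in order."""
    column_defs = ",\n  ".join(f"{name} {sql_type}" for name, sql_type in columns)
    return [
        "SHOW DATABASES;",                             # step 2: check existing databases
        f"CREATE DATABASE IF NOT EXISTS {database};",  # step 2: create testDB
        f"USE {database};",
        f"CREATE TABLE IF NOT EXISTS {table} (\n  {column_defs}\n);",  # step 3
    ]


if __name__ == "__main__":
    for statement in ddl_statements(DATABASE, TABLE, COLUMNS):
        print(statement)
```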

Steps for adding Apache NiFi processors

We can add processors from the Apache NiFi UI with the following steps:

1. Open the Apache NiFi UI at http://<hostip>:8080/nifi

2. Drag and drop the Processor icon onto the work area

3. Select the processor you need, then click Add

4. Change a processor’s properties by double-clicking it and opening the Properties tab

The full list of processors is available on the Apache NiFi site (https://nifi.apache.org/docs.html).

In our example we use the following processors:

  • GetTwitter
  • EvaluateJsonPath
  • AttributesToJSON
  • ConvertJSONToSQL
  • ExecuteSQL
  • LogMessage

Steps to add processors and change their properties

1. GetTwitter processor: This processor pulls tweets from the Twitter streaming API. We need to enter the Consumer Key, Consumer Secret, Access Token, and Access Token Secret obtained from the Twitter developer site.

2. EvaluateJsonPath processor: This processor evaluates JsonPath expressions against the tweet JSON pulled from Twitter and assigns the results to flow-file attributes.

Relationships

GetTwitter connects to EvaluateJsonPath through the success relationship.
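To make the EvaluateJsonPath step concrete, here is a minimal Python sketch of what it does when its destination is flow-file attributes: each dynamic property maps an attribute name to a JsonPath expression evaluated against the tweet JSON. The attribute names and paths in PATHS are assumptions chosen to match the table columns, and the resolver handles only simple `$.a.b` paths rather than full JsonPath syntax. The sample tweet is made up and trimmed to a few fields.

```python
import json

# Hypothetical dynamic properties: attribute name -> JsonPath expression.
PATHS = {
    "created_at": "$.created_at",
    "lang": "$.lang",
    "tweet_text": "$.text",
}


def eval_simple_path(doc: dict, path: str) -> str:
    """Resolve a dotted "$.a.b" path; return "" when any key is missing."""
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    node = doc
    for key in path[2:].split("."):
        if not isinstance(node, dict) or key not in node:
            return ""
        node = node[key]
    # Attributes are strings, so non-string results are serialized as JSON.
    return node if isinstance(node, str) else json.dumps(node)


def evaluate_json_path(content: str, paths: dict) -> dict:
    """Parse the flow-file content and return the new flow-file attributes."""
    doc = json.loads(content)
    return {name: eval_simple_path(doc, path) for name, path in paths.items()}


# A made-up, trimmed tweet in the shape the streaming API returns.
sample = json.dumps({
    "created_at": "Wed May 01 10:00:00 +0000 2019",
    "lang": "en",
    "text": "Loading tweets into MemSQL with NiFi",
    "user": {"screen_name": "example_user"},
})

print(evaluate_json_path(sample, PATHS))
```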

3. AttributesToJSON processor: This processor builds a JSON document from the flow-file attributes created by the previous processor.

Relationships

EvaluateJsonPath connects to AttributesToJSON through the matched relationship. (In the EvaluateJsonPath settings, I chose to automatically terminate the failure and unmatched relationships.)
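The AttributesToJSON step can be sketched in a few lines of Python, assuming the processor’s Attributes List names the three attributes from the previous step and the JSON is written as the new flow-file content. In this sketch, attributes that are not listed (such as filename) are left out and missing listed attributes become empty strings; all attribute values are made up.

```python
import json


def attributes_to_json(attributes: dict, attributes_list: str) -> str:
    """Build a flat JSON object from the listed attributes, in the listed order.

    Listed attributes that are missing become empty strings in this sketch.
    """
    names = [name.strip() for name in attributes_list.split(",") if name.strip()]
    return json.dumps({name: attributes.get(name, "") for name in names})


# Made-up flow-file attributes after EvaluateJsonPath.
flowfile_attributes = {
    "filename": "tweet-0001",  # not listed below, so it is not copied into the JSON
    "created_at": "Wed May 01 10:00:00 +0000 2019",
    "lang": "en",
    "tweet_text": "Loading tweets into MemSQL with NiFi",
}

content = attributes_to_json(flowfile_attributes, "created_at, lang, tweet_text")
print(content)
```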

4. ConvertJSONToSQL processor: This processor converts a JSON document into a SQL DML statement. To use it, we first need to create a JDBC connection pool.

We will create a JDBC connection pool that connects to the MemSQL database. To create it, select “Create New Service” from the drop-down list of the JDBC Connection Pool property.

When the popup appears, select “DBCPConnectionPool 1.9.0.1.0.0.0-90” and enter a name for the pool.

After clicking the Create button, click the arrow that appears on the right and click Yes.

Then click the Configure icon and configure the connection to the database.

We need the “mysql-connector-java-5.1.47.jar” library; put the path of this JAR file in the Database Driver Location(s) property.

After clicking Apply, click the Enable icon.
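For reference, the connection settings a DBCPConnectionPool for MemSQL typically needs are collected in the Python sketch below. MemSQL speaks the MySQL wire protocol, so the MySQL Connector/J URL format and the Connector/J 5.1 driver class com.mysql.jdbc.Driver apply. The host name, port 3306 (MemSQL’s default), JAR path, user, and empty password are placeholders to replace with your own values.

```python
def mysql_jdbc_url(host: str, port: int, database: str) -> str:
    """Connector/J URL format: jdbc:mysql://<host>:<port>/<database>."""
    return f"jdbc:mysql://{host}:{port}/{database}"


def dbcp_properties(host: str, port: int, database: str,
                    driver_jar: str, user: str, password: str) -> dict:
    """Values to enter in the DBCPConnectionPool Configure dialog."""
    return {
        "Database Connection URL": mysql_jdbc_url(host, port, database),
        "Database Driver Class Name": "com.mysql.jdbc.Driver",  # Connector/J 5.1.x class
        "Database Driver Location(s)": driver_jar,
        "Database User": user,
        "Password": password,
    }


# Placeholder values (hypothetical host, path, and credentials).
props = dbcp_properties(
    host="memsql-host",
    port=3306,
    database="testDB",
    driver_jar="/opt/drivers/mysql-connector-java-5.1.47.jar",
    user="root",
    password="",
)
for name, value in props.items():
    print(f"{name}: {value}")
```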

Now go back to the ConvertJSONToSQL processor’s Properties tab and complete the remaining properties: select INSERT for the Statement Type property and enter the MemSQL table “tweets_tab” in the Table Name property.

Relationships

AttributesToJSON connects to ConvertJSONToSQL through the success relationship. (In the AttributesToJSON settings, I chose to automatically terminate the failure relationship.)
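A simplified Python sketch of the ConvertJSONToSQL step for one flat JSON object with Statement Type INSERT: it produces a parameterized INSERT with `?` placeholders plus sql.args.N.type and sql.args.N.value attributes, which is the output convention ConvertJSONToSQL follows. The real processor looks up the table’s columns and JDBC types through the connection pool; this sketch skips that lookup and assumes every column is VARCHAR (java.sql.Types.VARCHAR, code 12).

```python
import json

VARCHAR = 12  # java.sql.Types.VARCHAR; ASSUMPTION: used for every column here


def json_to_insert(content: str, table: str) -> tuple:
    """Return (sql, attributes) for one flat JSON object."""
    record = json.loads(content)
    columns = list(record)
    placeholders = ", ".join("?" for _ in columns)
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"

    attributes = {}
    for index, column in enumerate(columns, start=1):  # sql.args indexes start at 1
        attributes[f"sql.args.{index}.type"] = str(VARCHAR)
        attributes[f"sql.args.{index}.value"] = str(record[column])
    return sql, attributes


sql, attrs = json_to_insert(
    '{"created_at": "Wed May 01 10:00:00 +0000 2019", "lang": "en", '
    '"tweet_text": "Loading tweets into MemSQL with NiFi"}',
    "tweets_tab",
)
print(sql)
# INSERT INTO tweets_tab (created_at, lang, tweet_text) VALUES (?, ?, ?)
```

Using placeholders instead of pasting tweet text into the SQL string keeps quotes and other special characters in tweets from breaking the statement.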

5. ExecuteSQL processor: This processor executes the INSERT statements coming from the ConvertJSONToSQL processor.

For the Database Connection Pooling Service property, select the same connection pool that we created for ConvertJSONToSQL.

Relationships

ConvertJSONToSQL connects to ExecuteSQL through the sql relationship. (In the ConvertJSONToSQL settings, I chose to automatically terminate the original relationship.)
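ExecuteSQL then runs that statement, binding the sql.args.N.value attributes to the `?` placeholders in order. The Python sketch below illustrates the binding with the standard-library sqlite3 module as a stand-in for MemSQL, an assumption made only so the example runs anywhere; in the real flow the statement goes through the JDBC connection pool. It ignores the sql.args.N.type attributes and binds every value as text.

```python
import sqlite3


def bind_args(attributes: dict) -> list:
    """Collect sql.args.1.value, sql.args.2.value, ... in index order."""
    params = []
    index = 1
    while f"sql.args.{index}.value" in attributes:
        params.append(attributes[f"sql.args.{index}.value"])
        index += 1
    return params


def execute_insert(conn: sqlite3.Connection, sql: str, attributes: dict) -> int:
    """Run one parameterized statement and return the affected row count."""
    cursor = conn.execute(sql, bind_args(attributes))
    conn.commit()
    return cursor.rowcount


# In-memory SQLite table standing in for MemSQL's tweets_tab (hypothetical columns).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tweets_tab (created_at TEXT, lang TEXT, tweet_text TEXT)")

sql = "INSERT INTO tweets_tab (created_at, lang, tweet_text) VALUES (?, ?, ?)"
attrs = {
    "sql.args.1.type": "12", "sql.args.1.value": "Wed May 01 10:00:00 +0000 2019",
    "sql.args.2.type": "12", "sql.args.2.value": "en",
    "sql.args.3.type": "12", "sql.args.3.value": "Loading tweets into MemSQL with NiFi",
}
print(execute_insert(conn, sql, attrs))  # 1
```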

6. LogMessage processor: This processor writes a log message at a specified log level.

Finally, the connections between the processors look like this:
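The wiring described in the Relationships paragraphs can also be written down as data. The Python sketch below records each connection and each auto-terminated relationship, checks that no relationship is both, and walks the chain from GetTwitter. The connection into LogMessage (ExecuteSQL success) is an assumption, because the text does not say which relationship feeds it.

```python
# (source processor, relationship) -> destination processor, as described above.
CONNECTIONS = {
    ("GetTwitter", "success"): "EvaluateJsonPath",
    ("EvaluateJsonPath", "matched"): "AttributesToJSON",
    ("AttributesToJSON", "success"): "ConvertJSONToSQL",
    ("ConvertJSONToSQL", "sql"): "ExecuteSQL",
    ("ExecuteSQL", "success"): "LogMessage",  # ASSUMPTION: not stated in the post
}

# Relationships set to "automatically terminate" in each processor's settings.
AUTO_TERMINATED = {
    "EvaluateJsonPath": {"failure", "unmatched"},
    "AttributesToJSON": {"failure"},
    "ConvertJSONToSQL": {"original"},
}


def linear_path(start: str) -> list:
    """Follow outgoing connections from `start` until a processor has none."""
    path = [start]
    while True:
        targets = [dst for (src, _rel), dst in CONNECTIONS.items() if src == path[-1]]
        if not targets or targets[0] in path:  # stop at the end or on a cycle
            return path
        path.append(targets[0])


# No relationship should be both connected and auto-terminated.
for (src, rel) in CONNECTIONS:
    assert rel not in AUTO_TERMINATED.get(src, set()), (src, rel)

print(" -> ".join(linear_path("GetTwitter")))
# GetTwitter -> EvaluateJsonPath -> AttributesToJSON -> ConvertJSONToSQL -> ExecuteSQL -> LogMessage
```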

Steps to run the processors and check the data in the MemSQL table

The following video shows the processor configurations, how to start the processors, and how to check the data in the MemSQL table after it is loaded from the Twitter API.
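To check the load from a script, two queries are usually enough: a total row count and a count per language. The sketch below runs them with Python’s sqlite3 module against a few made-up rows so it is self-contained; the same SELECT statements are plain SQL and can be run in the memsql client against testDB. The lang column name is an assumption.

```python
import sqlite3

ROW_COUNT_SQL = "SELECT COUNT(*) FROM tweets_tab"
BY_LANG_SQL = (
    "SELECT lang, COUNT(*) FROM tweets_tab "
    "GROUP BY lang ORDER BY COUNT(*) DESC, lang"
)


def check_load(conn) -> tuple:
    """Return (total rows, [(lang, count), ...])."""
    total = conn.execute(ROW_COUNT_SQL).fetchone()[0]
    by_lang = conn.execute(BY_LANG_SQL).fetchall()
    return total, by_lang


# Self-contained stand-in for the MemSQL table, with made-up rows.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tweets_tab (created_at TEXT, lang TEXT, tweet_text TEXT)")
conn.executemany(
    "INSERT INTO tweets_tab VALUES (?, ?, ?)",
    [
        ("Wed May 01 10:00:00 +0000 2019", "en", "first sample tweet"),
        ("Wed May 01 10:00:05 +0000 2019", "en", "second sample tweet"),
        ("Wed May 01 10:00:09 +0000 2019", "ar", "third sample tweet"),
    ],
)
total, by_lang = check_load(conn)
print(total, by_lang)  # 3 [('en', 2), ('ar', 1)]
```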
