Using Apache NiFi to Load Tweets from the Twitter API into MemSQL
In this post I will create a data flow using Apache NiFi. The flow gets tweets from Twitter and then loads them into a table in a MemSQL database.
So, let’s dig into creating the database and table in MemSQL, building the data flow in Apache NiFi, and creating a database connection pooling service in Apache NiFi to connect to MemSQL.
Prerequisites
- MemSQL installed and configured (I used an on-premises single host)
- Apache NiFi installed and started (I used Cloudera Flow Management, Apache NiFi version 1.9.0.1)
- Twitter account keys and tokens. We can get them from the app details after logging in to the Twitter developers site
Steps for creating the database and table in MemSQL
1. Run the MemSQL client command from the OS shell
2. Check the existing databases and create the testDB database
3. Create the tweets_tab table. This table will contain the tweet creation date, the tweet language, and the tweet text
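The SQL for steps 2 and 3 might look roughly like the sketch below, written here as a small Python helper that returns the statements in order. Only the names testDB and tweets_tab come from this post; the column names (created_at, lang, tweet_text) and their types are assumptions chosen for illustration, so adjust them to your own table.

```python
# Hypothetical DDL for steps 2 and 3. The column names and types are
# assumptions; only testDB and tweets_tab are taken from the post.
DATABASE_NAME = "testDB"
TABLE_NAME = "tweets_tab"


def build_setup_statements(database=DATABASE_NAME, table=TABLE_NAME):
    """Return the SQL statements for steps 2 and 3 in execution order."""
    return [
        "SHOW DATABASES",
        f"CREATE DATABASE IF NOT EXISTS {database}",
        f"USE {database}",
        (
            f"CREATE TABLE IF NOT EXISTS {table} ("
            "created_at VARCHAR(50), "    # tweet creation date, kept as text
            "lang VARCHAR(10), "          # tweet language code
            "tweet_text VARCHAR(1000)"    # tweet text
            ")"
        ),
    ]


if __name__ == "__main__":
    # Print the statements so they can be pasted into the MemSQL client.
    for statement in build_setup_statements():
        print(statement + ";")
```

Because MemSQL speaks the MySQL protocol, these statements can be pasted into the MemSQL client one by one.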
Steps for creating Apache NiFi processors
We can add processors from the Apache NiFi UI through the steps below:
1. Navigate to the Apache NiFi UI at http://<hostip>:8080/nifi
2. Drag and drop the Processor icon onto the work area
3. Select the needed processor, then click Add
4. Change the processor's properties by double-clicking the processor and going to the Properties tab
The list of available processors is on the Apache NiFi site (https://nifi.apache.org/docs.html)
In our example we used the following processors:
- GetTwitter
- EvaluateJsonPath
- AttributesToJSON
- ConvertJSONToSQL
- ExecuteSQL
- LogMessage
Steps to load processors and change their properties
1. GetTwitter processor: This processor pulls tweets from the Twitter streaming API. We need to enter the Consumer Key, Consumer Secret, Access Token, and Access Token Secret that we get from the Twitter developers site
2. EvaluateJsonPath processor: This processor evaluates JsonPath expressions against the JSON pulled from Twitter, then assigns the results of those expressions to flow-file attributes
Relationships
The GetTwitter processor is connected to the EvaluateJsonPath processor through the success relationship
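To make the EvaluateJsonPath step more concrete, here is a minimal Python sketch of what it does conceptually: each property maps an attribute name to a JsonPath expression, and the matched value becomes a flow-file attribute. The fields created_at, lang, and text are part of the tweet JSON returned by Twitter; the attribute names on the left are assumptions, and this helper only handles simple top-level paths, unlike the real processor.

```python
import json

# Hypothetical attribute name -> JsonPath mapping, as it might be entered as
# properties on EvaluateJsonPath. The attribute names are assumptions.
JSON_PATHS = {
    "created_at": "$.created_at",
    "lang": "$.lang",
    "tweet_text": "$.text",
}


def evaluate_top_level_paths(tweet_json, paths=JSON_PATHS):
    """Resolve simple '$.field' paths and return them as string attributes."""
    tweet = json.loads(tweet_json)
    attributes = {}
    for attribute, path in paths.items():
        field = path[2:]  # drop the leading '$.'; nested paths are not handled
        if field in tweet:
            attributes[attribute] = str(tweet[field])
    return attributes


if __name__ == "__main__":
    sample = json.dumps({
        "created_at": "Mon Jan 06 10:00:00 +0000 2020",
        "lang": "en",
        "text": "hello NiFi",
        "id": 1,
    })
    print(evaluate_top_level_paths(sample))
```

Fields that are not listed (such as id in the sample) are simply ignored, which is how only the three values needed for the table end up as attributes.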
3. AttributesToJSON: This processor creates a JSON document from the flow-file attributes that were created by the previous processor
Relationships
The EvaluateJsonPath processor is connected to the AttributesToJSON processor through the matched relationship. (I chose to automatically terminate the failure and unmatched relationships in the EvaluateJsonPath settings)
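As a rough illustration of the AttributesToJSON step, the sketch below builds a flat JSON document from a chosen list of attributes. The attribute names are the same hypothetical ones as before (created_at, lang, tweet_text), and filling a missing attribute with an empty string is meant to mirror the processor's documented behavior for listed attributes that are not found.

```python
import json


def attributes_to_json(attributes, attributes_list):
    """Build a flat JSON document from the selected flow-file attributes."""
    # A listed attribute that is missing is emitted as an empty string.
    selected = {name: attributes.get(name, "") for name in attributes_list}
    return json.dumps(selected)


if __name__ == "__main__":
    flow_file_attributes = {
        "created_at": "Mon Jan 06 10:00:00 +0000 2020",
        "lang": "en",
        "tweet_text": "hello NiFi",
        "uuid": "not-wanted-in-the-table",  # core attribute, left out below
    }
    print(attributes_to_json(flow_file_attributes,
                             ["created_at", "lang", "tweet_text"]))
```

Restricting the list to the three table columns keeps core attributes such as uuid out of the JSON that the next processor turns into SQL.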
4. ConvertJSONToSQL: This processor converts the JSON document into a DML statement. To use this processor, we need to create a JDBC connection pool.
We will create a JDBC connection pool that connects to the MemSQL database. We create this pool by selecting “Create New Service” from the drop-down list on the JDBC Connection Pool property.
When the popup appears, we need to select “DBCPConnectionPool 1.9.0.1.0.0.0-90” and enter a name for this pool
After clicking the Create button, we need to click the arrow that appears on the right and click Yes
Then click the Configure icon and configure the connection to the database
We must have the “mysql-connector-java-5.1.47.jar” library and put the path of this jar file in the Database Driver Location property
After clicking the Apply button, we must click the Enable icon
Now we need to go back to the ConvertJSONToSQL processor's Properties tab and complete the remaining properties: select the INSERT option for the Statement Type property and enter the MemSQL table “tweets_tab” in the Table Name property
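For reference, the connection pool settings could look something like the sketch below, written as a Python dictionary keyed by the DBCPConnectionPool property names. The host, user, password, and jar directory are placeholders, and port 3306 assumes MemSQL's default MySQL-compatible port; com.mysql.jdbc.Driver is the driver class shipped in the 5.1.x MySQL connector.

```python
# Hypothetical DBCPConnectionPool settings. Host, credentials, and the jar
# directory are placeholders; only testDB and the jar name come from the post.
CONNECTION_POOL_PROPERTIES = {
    "Database Connection URL": "jdbc:mysql://<memsql-host>:3306/testDB",
    "Database Driver Class Name": "com.mysql.jdbc.Driver",
    "Database Driver Location(s)": "/path/to/mysql-connector-java-5.1.47.jar",
    "Database User": "<user>",
    "Password": "<password>",
}


def missing_properties(properties):
    """Return the names of properties that were left empty."""
    return [name for name, value in properties.items() if not value.strip()]


if __name__ == "__main__":
    for name, value in CONNECTION_POOL_PROPERTIES.items():
        print(f"{name}: {value}")
    print("Missing:", missing_properties(CONNECTION_POOL_PROPERTIES))
```

The MySQL driver works here because MemSQL is wire-compatible with MySQL, which is also why the MySQL connector jar is required.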
Relationships
The AttributesToJSON processor is connected to the ConvertJSONToSQL processor through the success relationship (I chose to automatically terminate the failure relationship in the AttributesToJSON settings)
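The sketch below approximates what ConvertJSONToSQL produces for an INSERT: a parameterized statement as the flow-file content, plus sql.args.N.value attributes holding the values. It is a simplification under stated assumptions: the real processor also reads the table metadata through the connection pool and sets matching sql.args.N.type attributes, which this helper omits, and the column names are the hypothetical ones used earlier.

```python
import json


def convert_json_to_insert(flat_json, table_name):
    """Turn a flat JSON document into a parameterized INSERT and its args."""
    record = json.loads(flat_json)
    columns = list(record)  # JSON field names are used as column names
    placeholders = ", ".join("?" for _ in columns)
    sql = (
        f"INSERT INTO {table_name} ({', '.join(columns)}) "
        f"VALUES ({placeholders})"
    )
    # One sql.args.N.value attribute per placeholder, numbered from 1.
    attributes = {
        f"sql.args.{position}.value": record[column]
        for position, column in enumerate(columns, start=1)
    }
    return sql, attributes


if __name__ == "__main__":
    flat = json.dumps({
        "created_at": "Mon Jan 06 10:00:00 +0000 2020",
        "lang": "en",
        "tweet_text": "hello NiFi",
    })
    statement, args = convert_json_to_insert(flat, "tweets_tab")
    print(statement)
    print(args)
```

Keeping the values in attributes rather than inside the SQL text means the next processor can bind them as parameters instead of concatenating tweet text into the statement.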
5. ExecuteSQL processor: This processor executes the INSERT statement that comes from the ConvertJSONToSQL processor.
For the Database Connection Pooling Service property, we need to select the same connection pool that we created for ConvertJSONToSQL
Relationships
The ConvertJSONToSQL processor is connected to the ExecuteSQL processor through the sql relationship (I chose to automatically terminate the original relationship in the ConvertJSONToSQL settings).
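To show what this last database step amounts to, here is a small runnable sketch that binds sql.args.N.value attributes to a parameterized INSERT and executes it. Note the assumptions: Python's built-in sqlite3 module is used as a stand-in so the example runs without a MemSQL server, and the column names are hypothetical; against MemSQL the same idea would go through a MySQL-compatible driver and the connection pool.

```python
import sqlite3


def execute_insert(connection, sql, attributes):
    """Bind sql.args.N.value attributes in order and run the statement."""
    count = sum(
        1 for key in attributes
        if key.startswith("sql.args.") and key.endswith(".value")
    )
    params = [attributes[f"sql.args.{i}.value"] for i in range(1, count + 1)]
    with connection:  # commits on success, rolls back on error
        connection.execute(sql, params)


# In-memory SQLite database standing in for MemSQL (an assumption).
connection = sqlite3.connect(":memory:")
connection.execute(
    "CREATE TABLE tweets_tab (created_at TEXT, lang TEXT, tweet_text TEXT)"
)
execute_insert(
    connection,
    "INSERT INTO tweets_tab (created_at, lang, tweet_text) VALUES (?, ?, ?)",
    {
        "sql.args.1.value": "Mon Jan 06 10:00:00 +0000 2020",
        "sql.args.2.value": "en",
        "sql.args.3.value": "hello NiFi",
    },
)
rows = connection.execute(
    "SELECT created_at, lang, tweet_text FROM tweets_tab"
).fetchall()
print(rows)
```

Querying the table afterwards, as in the last lines, is the same check we do in MemSQL once the flow is running.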
6. LogMessage processor: This processor writes a log message at a specific log level
Finally, the complete flow connects the processors through the relationships described above
Steps to run the processors and check the data in the MemSQL table
The following video shows the processor configurations, how to start the processors, and how to check the data in the MemSQL table after it is loaded from the Twitter API