Published in Lolocode

Stream Twitter Event Data to Elasticsearch and Visualize It within a Few Minutes with Serverless

I recently saw an article by Zhong Hongsheng called "From Streaming Data to COVID-19 Twitter Analysis: Using AWS Lambda, Kinesis Firehose and Elasticsearch" and was inspired to try it out quickly with Lolo, to cut down the time it takes to get this working.

Although I am only collecting the tweet count and content and detecting the language, it took me no more than a few minutes to get something like the dashboard below up and running.

This approach cuts out AWS Kinesis Firehose, the AWS S3 bucket and AWS Lambda altogether: we set up a continuous data stream from Twitter, transform the data (if you want) and send it on to Elasticsearch, where you can visualize it using Kibana. The performance is the same, and apart from the cost of Elasticsearch it is free as well. You can also use MongoDB Atlas and Charts instead of Elasticsearch to do the same thing at a lower cost.

I didn’t go to any great lengths to sort out the data needed for proper analysis, as that is up to you. I’m merely visualizing the number of tweets per second matching the keyword covid, to stay consistent with the original article.
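Kibana computes tweets per second for you with a date histogram, but to make the metric concrete, here is a minimal sketch of the same bucketing done in plain JavaScript. It assumes each indexed item carries the createdAt date set in the transform step; the helper name tweetsPerSecond and the sample timestamps are made up for illustration.

```javascript
// Hypothetical helper: count items per one-second bucket, keyed by the
// ISO timestamp of the start of that second (like a date histogram).
function tweetsPerSecond(items) {
  const counts = new Map();
  for (const item of items) {
    const ms = new Date(item.createdAt).getTime();
    // Truncate to the whole second to find the bucket this item falls into
    const bucket = new Date(Math.floor(ms / 1000) * 1000).toISOString();
    counts.set(bucket, (counts.get(bucket) || 0) + 1);
  }
  return counts;
}

// Usage with three made-up items: two in the same second, one in the next
const sample = [
  { createdAt: new Date('2022-03-01T10:00:00.100Z') },
  { createdAt: new Date('2022-03-01T10:00:00.900Z') },
  { createdAt: new Date('2022-03-01T10:00:01.200Z') }
];
console.log(tweetsPerSecond(sample));
```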

The picture below pretty much shows our workflow. We set up event streaming with the Twitter Streaming API via an npm package, transform the data (adding something like language detection, if you want) and then send it on to Elasticsearch, where we can quickly visualize it with Kibana. It might be good to add a buffer as well; that is quick to do but unnecessary for now, as there won’t be that many events coming through.
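The buffer is skipped here, but as a rough idea of what one could look like, below is a minimal in-memory batching sketch. EventBuffer and its parameters (maxSize, maxWaitMs) are hypothetical and not part of Lolo: it collects events and passes them on in batches, either when the batch is full or after a short wait, whichever comes first.

```javascript
// Hypothetical batching buffer: collects events and hands them to `flush`
// in groups of `maxSize`, or after `maxWaitMs` if fewer have arrived.
class EventBuffer {
  constructor(flush, maxSize = 100, maxWaitMs = 1000) {
    this.flush = flush;
    this.maxSize = maxSize;
    this.maxWaitMs = maxWaitMs;
    this.items = [];
    this.timer = null;
  }

  push(item) {
    this.items.push(item);
    if (this.items.length >= this.maxSize) {
      this.drain();
    } else if (!this.timer) {
      // Start the timer on the first item of a new batch
      this.timer = setTimeout(() => this.drain(), this.maxWaitMs);
    }
  }

  drain() {
    if (this.timer) {
      clearTimeout(this.timer);
      this.timer = null;
    }
    if (this.items.length === 0) return;
    const batch = this.items;
    this.items = [];
    this.flush(batch);
  }
}

// Usage: record batch sizes; 5 events with maxSize 2 -> 2, 2, then 1 on drain
const batches = [];
const buf = new EventBuffer(batch => batches.push(batch.length), 2, 1000);
for (let i = 0; i < 5; i++) buf.push({ id: i });
buf.drain(); // flush the leftover item instead of waiting for the timer
console.log(batches);
```

An in-memory buffer like this loses whatever it holds if the process stops, so it only smooths out bursts; durability would need a real queue.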

Be aware that to use the Twitter Stream API you need elevated access to Twitter API v2, which isn’t hard to get, but you do need to apply for it. You can apply through the Twitter Developer Portal. It took a minute or so to get approved.

Start by logging in to your Lolo account. If you don’t have one, you can get a free access code. Create a new app, add a new function from the bottom right and name it Twitter Event. We’ll write this one ourselves, so add the code below to the handler.

Obviously, replace the XXXX placeholders with the keys and tokens you get from Twitter.

const Twit = require('twit');

// setup runs once: open the Twitter stream and emit each tweet as an event
exports.setup = async ctx => {
  const { log, produceEvent } = ctx;
  const T = new Twit({
    consumer_key: 'XXXX',
    consumer_secret: 'XXXX',
    access_token: 'XXXX',
    access_token_secret: 'XXXX'
  });

  // Track every tweet that matches the keyword "covid"
  const stream = T.stream('statuses/filter', { track: 'covid' });

  stream.on('tweet', tweet => {
    if (!tweet) return;
    log.info(tweet);
    produceEvent(tweet);
  });

  stream.on('error', err => log.error('error: %s', err.message));
  stream.on('disconnect', () => log.warn('twitter disconnect'));
};

// handler receives each produced event and routes it to the next node
exports.handler = async (ev, ctx) => {
  const { route } = ctx;
  if (ev) route(ev);
  else ctx.log.warn('route got null event');
};
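To check the event flow without Twitter credentials, the same listener and handler logic can be exercised locally. This is a sketch under assumptions: the fake stream (a plain Node.js EventEmitter) and the mock ctx are stand-ins invented for this example, and the mock assumes produceEvent delivers each event to the same function's handler, which is how the code above appears to use it. It does not run inside Lolo.

```javascript
const { EventEmitter } = require('events');

// Stand-in for the twit stream: an EventEmitter we can fire 'tweet' on by hand
const fakeStream = new EventEmitter();

// Same wiring as the setup above, but the stream is passed in so it can be
// faked (the real function creates it with T.stream)
function setup(ctx, stream) {
  const { log, produceEvent } = ctx;
  stream.on('tweet', tweet => {
    if (!tweet) return;
    log.info(tweet);
    produceEvent(tweet);
  });
  stream.on('error', err => log.error('error: %s', err.message));
  stream.on('disconnect', () => log.warn('twitter disconnect'));
}

// Same logic as the handler above
async function handler(ev, ctx) {
  const { route } = ctx;
  if (ev) route(ev);
  else ctx.log.warn('route got null event');
}

// Mock context (assumption): produceEvent calls this function's handler,
// and route just records what would be sent to the next node
const routed = [];
const ctx = {
  log: { info() {}, warn() {}, error() {} },
  route: ev => routed.push(ev),
  produceEvent: ev => handler(ev, ctx)
};

setup(ctx, fakeStream);
fakeStream.emit('tweet', { text: 'covid test tweet', lang: 'en' });
fakeStream.emit('tweet', null); // ignored by the guard in the listener
console.log(routed.length);
```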

Remember to add the twit module under your app’s Modules so it gets installed for you. If you get stuck, see the Lolo documentation on adding modules.

Next, create another new function from the bottom right, where you’ll transform the data and filter out everything you don’t want inserted into your database. Here I am also adding a language detection package from npm, just to do something extra. The tweet object already includes a lang field that we could have used instead.

Remember to add the languagedetect package to your app modules if you want to do this as well.

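const LanguageDetect = require('languagedetect');
const lngDetector = new LanguageDetect();

exports.handler = async (ev, ctx) => {
  const { route } = ctx;

  // detect() returns [[language, score], ...], best match first; it can be empty
  const res = lngDetector.detect(ev.text);
  ev.language = res.length ? res[0][0] : 'unknown';

  // Keep only the fields we want to index
  const item = {
    createdAt: new Date(), // ingestion time
    twitter_text: ev.text,
    description: ev.user ? ev.user.description : undefined, // profile bio lives on tweet.user
    language: ev.language
  };
  route(item);
};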
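If you would rather use Twitter’s own language field than languagedetect, a transform could look roughly like the sketch below. It assumes the v1.1 tweet payload that the statuses/filter stream delivers, where lang holds a language code and longer tweets carry their full text in extended_tweet.full_text; the function name toItem and the sample tweet are made up for illustration.

```javascript
// Hypothetical transform: shape a v1.1 tweet object into the document we index,
// using the tweet's own `lang` field instead of running languagedetect
function toItem(tweet, now = new Date()) {
  // Tweets longer than 140 characters carry their full text in extended_tweet
  const text = (tweet.extended_tweet && tweet.extended_tweet.full_text) || tweet.text;
  return {
    createdAt: now, // ingestion time
    twitter_text: text,
    description: tweet.user ? tweet.user.description : undefined,
    language: tweet.lang || 'unknown' // a language code such as "en"
  };
}

// Usage with a trimmed-down, made-up tweet payload
const item = toItem({
  text: 'Short version...',
  extended_tweet: { full_text: 'Short version plus the rest of the tweet' },
  lang: 'en',
  user: { description: 'Example profile' }
});
console.log(item.language, item.twitter_text);
```

Note that lang holds codes such as en, while languagedetect returns names such as english, so stick to one source per index to keep the per-language counts in Kibana consistent.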

Link the two nodes (i.e. functions) together so the code above runs whenever a new Twitter event arrives. We also route the transformed item onward so the next node/function can use it.

Create your last function, call it Elastic or Elasticsearch, and add the code below to its handler. Remember to get your Elasticsearch Cloud ID, username and password to access your deployment.

If you are new to Elasticsearch, you can create a free trial account and then just follow the instructions. It is very user friendly.

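const { Client } = require('@elastic/elasticsearch');

// Connect to your Elastic Cloud deployment (replace the XXXX placeholders)
const client = new Client({
  cloud: { id: 'XXXX' },
  auth: { username: 'XXXX', password: 'XXXX' }
});

// setup: check the connection once at startup
exports.setup = async ctx => {
  const { log } = ctx;
  client.info()
    .then(response => log.info(response))
    .catch(error => log.error(error));
};

// handler: index every transformed tweet into "covid-tweets"
exports.handler = async (ev, ctx) => {
  const { log } = ctx;
  await client.index({ index: 'covid-tweets', body: ev });
  // Refreshing after every write makes the document searchable immediately,
  // but it is expensive; drop it if near-real-time visibility is enough
  await client.indices.refresh({ index: 'covid-tweets' });
  log.info('inserted');
};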
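Indexing one document per request is fine at this volume. If you add the buffer mentioned earlier, a whole batch could instead go to Elasticsearch’s _bulk API, which takes newline-delimited JSON: an action line naming the target index, followed by the document itself, with a trailing newline at the end. Below is a minimal sketch that builds that body by hand; toBulkBody is a hypothetical helper, and the official client can also serialize bulk requests for you (see its bulk documentation).

```javascript
// Hypothetical helper: build an NDJSON body for Elasticsearch's _bulk API.
// Each document is preceded by an action line naming the target index,
// and the body must end with a newline.
function toBulkBody(index, docs) {
  const lines = [];
  for (const doc of docs) {
    lines.push(JSON.stringify({ index: { _index: index } }));
    lines.push(JSON.stringify(doc));
  }
  return lines.join('\n') + '\n';
}

// Usage with two made-up transformed items
const body = toBulkBody('covid-tweets', [
  { twitter_text: 'first', language: 'english' },
  { twitter_text: 'second', language: 'french' }
]);
console.log(body);
```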

Remember to add the @elastic/elasticsearch module to your app modules as well. You should now have three in there: twit, languagedetect and @elastic/elasticsearch.

Aight. You’re done. You can just save and run your application and wait for those events to start streaming.

To visualize the data, open Kibana from the Elasticsearch console for your deployment and choose Create data view to add the covid-tweets index that has been created for you.

Do what you will with that data. I just displayed the record count and counted the number of tweets per language.
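The record count and per-language chart boil down to a search with a terms aggregation. As a rough sketch of the same query from code: the request body and response parsing below assume Elasticsearch’s default dynamic mapping, which indexes a string field such as language as text with a language.keyword sub-field. The aggregation name by_language is arbitrary, and the response object is made up but shaped like an Elasticsearch 7+ search response.

```javascript
// Request body: size 0 skips returning individual hits, so only the
// total count and the aggregation buckets come back
function languageAggregation(maxLanguages = 10) {
  return {
    size: 0,
    aggs: {
      by_language: {
        terms: { field: 'language.keyword', size: maxLanguages }
      }
    }
  };
}

// Pull the record count and { language: count } out of a search response body
function summarize(responseBody) {
  const perLanguage = {};
  for (const bucket of responseBody.aggregations.by_language.buckets) {
    perLanguage[bucket.key] = bucket.doc_count;
  }
  return { total: responseBody.hits.total.value, perLanguage };
}

// Usage with a made-up response body
const fakeResponse = {
  hits: { total: { value: 3, relation: 'eq' }, hits: [] },
  aggregations: {
    by_language: {
      buckets: [
        { key: 'english', doc_count: 2 },
        { key: 'spanish', doc_count: 1 }
      ]
    }
  }
};
console.log(JSON.stringify(languageAggregation()));
console.log(summarize(fakeResponse));
```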
