Emoji trends on Twitter using Akka Streams

Árpád Tamási
Published in samebug
10 min read, Nov 4, 2017

Are you curious how the world feels right now? I am, so we will implement a pretty simple sentiment analysis using emojis in tweets. It will tell us whether there are more 😃 and fewer 😢 in tweets now than usual, so the world looks happy, or whether we see more 🎄 in December.

We use Scala, sbt, Akka Streams, Guice and Redis. I use IntelliJ IDEA, but you are free to use your favorite IDE.

I plan to take this journey in two or three posts. I keep the application simple for the sake of readability, but feel free to extend it to find out how your country or the people speaking your language feel right now, or to use the words in tweets to implement a more sophisticated sentiment analysis.

The tested source code is available on GitHub. I might make typos here, so check whether it works. Feel free to clone or fork it.

One more thing before we start. This is overengineered. It can be done in a few lines. Don’t waste Akka Streams to increment Redis counters in real life. Use it to orchestrate processes with time consuming steps. My intent is to give you an architecture that you can use to solve real problems with Akka Streams. I just found counting emojis simple and exciting enough to keep your attention during this process.

Start IntelliJ IDEA, click Create New Project, then choose Scala and SBT. On the next screen change the name to emoji-trends, then click Finish.

Open build.sbt, it should look like this:

name := "emoji-trends"
version := "0.1"
scalaVersion := "2.12.4"

We use Twitter4J, emoji-java, Logback, scala-logging, Akka Streams, a Redis client and scala-guice.

Add the following snippet to build.sbt, save it and let IntelliJ refresh the project.

libraryDependencies ++= Seq(
"org.twitter4j" % "twitter4j-stream" % "4.0.6",
"com.vdurmont" % "emoji-java" % "4.0.0",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"com.typesafe.scala-logging" %% "scala-logging" % "3.7.2",
"com.typesafe.akka" %% "akka-stream" % "2.5.6",
"com.typesafe.akka" %% "akka-slf4j" % "2.5.6",
"net.debasishg" %% "redisclient" % "3.4",
"net.codingwell" %% "scala-guice" % "4.1.0"
)

Our source directory is src/main/scala. The main package is com.acme.emojitrends.

Create a class named EmojiCounter in the stream package with the following content.

package com.acme.emojitrends.stream
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
class EmojiCounter {
private implicit val system = ActorSystem("EmojiTrends")
private implicit val materializer = ActorMaterializer()
private implicit val executionContext = system.dispatcher
}

We need the system and the materializer since Akka streams are materialized to Akka Actors. The execution context is needed for asynchronous execution in Scala.

Akka Streams are composed of processing stages. Imagine them as boxes that do something with the elements flowing on the stream. The linear stages are Source that emits, Flow that transforms and Sink that swallows elements. You can read more about them in the Akka Streams docs.
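To make the three linear stages concrete, here is a minimal sketch that is not part of the app. It assumes the akka-stream 2.5.6 dependency from build.sbt; the object and method names are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the emoji-trends app.
object LinearStagesSketch {
  def doubledNumbers(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("LinearStagesSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val numbers = Source(1 to 5)       // Source: emits 1, 2, 3, 4, 5
      val doubled = Flow[Int].map(_ * 2) // Flow: transforms each element
      val collect = Sink.seq[Int]        // Sink: swallows the elements and collects them
      Await.result(numbers.via(doubled).runWith(collect), 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(doubledNumbers())
}
```

Running main should print the doubled numbers 2, 4, 6, 8, 10 as a single collection.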

We start by creating a source that will emit tweets, in Twitter API terminology Statuses.

Import akka.stream._, akka.stream.scaladsl._ and twitter4j.Status.

private val overflowStrategy = OverflowStrategy.backpressure
private val bufferSize = 1000
private val statusSource = Source.queue[Status](
bufferSize,
overflowStrategy
)

There are many ways to create Sources; check them out in the API. Twitter4J sends tweets to a listener, where we can put them on a queue. The queue accepts elements until the buffer gets full. With the backpressure overflow strategy, an offer then completes only when there is space again, so we will not lose a single tweet. Read more about them in the docs.

Temporarily we connect the source to Sink.ignore to create the initial Graph. Computation graph is the description of a stream processing topology in Akka Streams terminology.

private val graph = statusSource.to(Sink.ignore)
private val queue = graph.run()

You can read about other Sinks in the doc, but Sink.ignore is the simplest and enough for us now.
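A graph is only a description until you run it. The sketch below (made-up names, again assuming akka-stream 2.5.6) builds one blueprint and runs it twice. It uses toMat with Keep.right to get the sink's result back; the plain to used above keeps the source's materialized value instead, which is why graph.run() gives us the queue.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object, not part of the emoji-trends app.
object BlueprintSketch {
  // Nothing runs here: this only describes "sum the numbers 1 to 3".
  val blueprint: RunnableGraph[Future[Int]] =
    Source(1 to 3).toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)

  def runTwice(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("BlueprintSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Each run() materializes a fresh, independent stream from the same blueprint.
      val first = Await.result(blueprint.run(), 5.seconds)
      val second = Await.result(blueprint.run(), 5.seconds)
      (first, second)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(runTwice()) // (6,6)
}
```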

This is our current graph:

[Figure: Initial graph]

Now we extend Twitter4J StatusAdapter and put the tweets on the queue.

override def onStatus(status: Status) =
Await.result(queue.offer(status), Duration.Inf)

Since we decided not to lose a single tweet (by using backpressure overflow strategy) we need to wait for the queue to accept our offer.
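Here is a small sketch of what waiting for the offer means, using plain numbers instead of tweets. The names and the tiny buffer size of 2 are made up; it assumes akka-stream 2.5.6. Each offer returns a Future of a QueueOfferResult, and we wait for it before offering the next element, just as onStatus does.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the emoji-trends app.
object QueueOfferSketch {
  def offerAll(elements: List[Int]): (List[QueueOfferResult], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("QueueOfferSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Keep.both gives us the queue to offer to and the Future of collected elements.
      val (queue, collected) = Source
        .queue[Int](2, OverflowStrategy.backpressure)
        .toMat(Sink.seq[Int])(Keep.both)
        .run()

      // Wait until each element is accepted before offering the next one.
      val results = elements.map(e => Await.result(queue.offer(e), 5.seconds))

      queue.complete() // no more elements; lets the sink finish
      (results, Await.result(collected, 5.seconds))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(offerAll(List(1, 2, 3)))
}
```

Every offer should come back as QueueOfferResult.Enqueued, and the sink should see all elements in order.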

We need to make the stream visible to feel that we have done something. Akka Streams logging makes it pretty easy. It needs an implicit LoggingAdapter.

private implicit val loggingAdapter =
Logging(system, classOf[EmojiCounter])
private val graph = statusSource
.log("QUEUED", {status => status.getText})
.to(Sink.ignore)

Checkpoint: Now EmojiCounter is ready to receive tweets. It looks like this:

package com.acme.emojitrends.stream
import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream._
import akka.stream.scaladsl._
import twitter4j.{Status, StatusAdapter}
import scala.concurrent.Await
import scala.concurrent.duration.Duration
class EmojiCounter extends StatusAdapter {
private implicit val system = ActorSystem("EmojiTrends")
private implicit val materializer = ActorMaterializer()
private implicit val executionContext = system.dispatcher
private implicit val loggingAdapter =
Logging(system, classOf[EmojiCounter])
private val overflowStrategy = OverflowStrategy.backpressure
private val bufferSize = 1000
private val statusSource = Source.queue[Status](
bufferSize,
overflowStrategy
)
private val graph = statusSource
.log("QUEUED", { status => status.getText })
.to(Sink.ignore)
private val queue = graph.run()
override def onStatus(status: Status) =
Await.result(queue.offer(status), Duration.Inf)

}

We need an app to run it. Create an object named EmojiTrends in our main package and initialize a Twitter stream. You need to create a Twitter application. To get the keys, select the Keys and access tokens tab, click Create my access token, copy the keys from there and replace the corresponding properties in the source below.

package com.acme.emojitrends
import com.acme.emojitrends.stream.EmojiCounter
import com.typesafe.scalalogging.LazyLogging
import twitter4j.TwitterStreamFactory
import twitter4j.conf.ConfigurationBuilder
object EmojiTrends extends App {
val configuration = new ConfigurationBuilder()
.setOAuthConsumerKey("YOUR CONSUMER KEY")
.setOAuthConsumerSecret("YOUR CONSUMER SECRET")
.setOAuthAccessToken("YOUR ACCESS TOKEN")
.setOAuthAccessTokenSecret("YOUR TOKEN SECRET")
.build
val twitterStream = new TwitterStreamFactory(configuration)
.getInstance()
}

Create a counter and connect it to the stream.

val counter = new EmojiCounter
twitterStream.addListener(counter)

Let’s start the stream. We get just a sample of tweets; for more you need an enterprise account at Twitter.

twitterStream.sample()

Checkpoint: our app is ready to run and looks like this:

package com.acme.emojitrends
import com.acme.emojitrends.stream.EmojiCounter
import com.typesafe.scalalogging.LazyLogging
import twitter4j.TwitterStreamFactory
import twitter4j.conf.ConfigurationBuilder
object EmojiTrends extends App with LazyLogging {
val configuration = new ConfigurationBuilder()
.setOAuthConsumerKey("YOUR CONSUMER KEY")
.setOAuthConsumerSecret("YOUR CONSUMER SECRET")
.setOAuthAccessToken("YOUR ACCESS TOKEN")
.setOAuthAccessTokenSecret("YOUR TOKEN SECRET")
.build
val twitterStream =
new TwitterStreamFactory(configuration)
.getInstance()
val counter = new EmojiCounter
twitterStream.addListener(counter)
twitterStream.sample()
}

One last step: we need to configure logging. Create the directory src/main/resources and add application.conf. This configures Akka to use slf4j logging and sets the log level to debug.

akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "debug"
}

Create logback.xml in the same directory.

<configuration>
<appender
name="console"
class="ch.qos.logback.core.ConsoleAppender">
<target>System.err</target>
<encoder>
<pattern>[%d{HH:mm:ss.SSS}] %-5level %msg%n</pattern>
</encoder>
</appender>
<logger name="com.acme.emojitrends" level="debug"/>
<root level="info">
<appender-ref ref="console"/>
</root>

</configuration>

Huh. Let’s run it.

Cool! We see tweets, emojis, so the parts are connected. Let’s tidy it up a bit before moving forward so we don’t get overwhelmed by a huge pile of spaghetti code.

Our sources contain secrets, constants and boilerplate code to initialize services. Typesafe Config helps with the secrets and constants, dependency injection with the boilerplate. If you are familiar with Guice and Typesafe Config, just skip to Count emojis and tweets.

Add the following content to application.conf. Replace the keys and secrets with your own. You can even leave them as they are and provide the values at runtime but I won’t cover that in this post.

emojiCounter {
bufferSize: 10000
}
twitter {
messageQueueSize: 10000
threadPoolSize: 10
oAuth: {
consumerKey: "YOUR CONSUMER KEY"
consumerSecret: "YOUR CONSUMER SECRET"
token: "YOUR TOKEN"
tokenSecret: "YOUR TOKEN SECRET"
}
}
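If you have not used Typesafe Config before, this sketch shows how such values can be read. To stay self-contained it parses an inline copy of part of the configuration instead of loading application.conf; the object name is made up.

```scala
import com.typesafe.config.{Config, ConfigFactory}

// Hypothetical demo object, not part of the emoji-trends app.
object ConfigSketch {
  // Inline copy of part of the configuration above.
  val config: Config = ConfigFactory.parseString(
    """
      |emojiCounter {
      |  bufferSize: 10000
      |}
      |twitter {
      |  threadPoolSize: 10
      |  oAuth: {
      |    consumerKey: "YOUR CONSUMER KEY"
      |  }
      |}
    """.stripMargin)

  // Either narrow down to a sub-config first...
  val bufferSize: Int = config.getConfig("emojiCounter").getInt("bufferSize")
  // ...or address nested values with a dotted path.
  val threadPoolSize: Int = config.getInt("twitter.threadPoolSize")
  val consumerKey: String = config.getString("twitter.oAuth.consumerKey")

  def main(args: Array[String]): Unit =
    println(s"$bufferSize $threadPoolSize $consumerKey")
}
```

In the app itself, ConfigFactory.load reads application.conf from the classpath, so no inline string is needed.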

We use Guice to keep the business logic readable by getting rid of construction boilerplate code. Guice modules go to the guice package.

Create a Scala class for TwitterModule which provides the Twitter Stream that gives us the tweets. You can see here how to read configuration values. For more information check Typesafe Config docs.

Notice that it creates the stream the same way we created in the app, but reads the constants from the configuration.

package com.acme.emojitrends.guice
import javax.inject.Singleton
import com.google.inject.{AbstractModule, Provides}
import com.typesafe.config.Config
import net.codingwell.scalaguice.ScalaModule
import twitter4j.{TwitterStream, TwitterStreamFactory}
import twitter4j.conf.ConfigurationBuilder
class TwitterModule(config: Config) extends AbstractModule with ScalaModule {
override def configure(): Unit = {}
@Provides
@Singleton
def provideTwitterStream(): TwitterStream = {
val oAuthConf = config.getConfig("oAuth")
val configuration = new ConfigurationBuilder()
.setOAuthConsumerKey(oAuthConf.getString("consumerKey"))
.setOAuthConsumerSecret(oAuthConf.getString("consumerSecret"))
.setOAuthAccessToken(oAuthConf.getString("token"))
.setOAuthAccessTokenSecret(oAuthConf.getString("tokenSecret"))
.build
new TwitterStreamFactory(configuration).getInstance()
}
}

Create EmojiTrendsModule.

package com.acme.emojitrends.guice
import com.google.inject.AbstractModule
import com.typesafe.config.ConfigFactory
import net.codingwell.scalaguice.ScalaModule
class EmojiTrendsModule extends AbstractModule with ScalaModule {
private lazy val config = ConfigFactory.load
override def configure(): Unit = {
install(new TwitterModule(config.getConfig("twitter")))
}
}

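Let’s try it. Open EmojiTrends and replace the Twitter stream creation with the code below. It needs imports for com.google.inject.Guice, net.codingwell.scalaguice.InjectorExtensions._, twitter4j.TwitterStream and com.acme.emojitrends.guice.EmojiTrendsModule.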

private val emojiTrendsModule = new EmojiTrendsModule
private val injector =
new ScalaInjector(Guice.createInjector(emojiTrendsModule))
private val twitterStream = injector.instance[TwitterStream]

Run it to check if everything works.

Let’s inject EmojiCounter. First create AkkaModule:

package com.acme.emojitrends.guice
import javax.inject.Singleton
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import com.google.inject.{AbstractModule, Provides}
import com.typesafe.config.Config
import net.codingwell.scalaguice.ScalaModule
import scala.concurrent.ExecutionContext
class AkkaModule(config: Config) extends AbstractModule with ScalaModule {
private lazy val actorSystem = ActorSystem("EmojiTrends")
override def configure(): Unit = {
bind[ActorSystem].toInstance(actorSystem)
}
// The explicit return types make Guice bind ExecutionContext and Materializer
// rather than the more specific types Scala would infer.
@Singleton
@Provides
def provideExecutionContext(implicit system: ActorSystem): ExecutionContext =
system.dispatcher
@Singleton
@Provides
def provideMaterializer(implicit system: ActorSystem): Materializer =
ActorMaterializer()
}

Notice how we can use injected values even in the provider methods.
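If provider methods are new to you, here is the same idea in isolation. Greeting, Greeter and GreeterModule are made-up names for illustration only; the sketch assumes the scala-guice dependency from build.sbt. Guice fills in the parameter of the provider method from the binding made in configure.

```scala
import com.google.inject.{AbstractModule, Guice, Provides}
import javax.inject.Singleton
import net.codingwell.scalaguice.ScalaModule

// Hypothetical types, only for illustration.
case class Greeting(text: String)

class Greeter(greeting: Greeting) {
  def greet(name: String): String = s"${greeting.text}, $name!"
}

class GreeterModule extends AbstractModule with ScalaModule {
  override def configure(): Unit =
    bind[Greeting].toInstance(Greeting("Hello"))

  // Guice resolves the `greeting` parameter from the binding above.
  // The declared return type decides which type this method provides.
  @Provides
  @Singleton
  def provideGreeter(greeting: Greeting): Greeter = new Greeter(greeting)
}

object ProviderSketch {
  def main(args: Array[String]): Unit = {
    val injector = Guice.createInjector(new GreeterModule)
    println(injector.getInstance(classOf[Greeter]).greet("world")) // Hello, world!
  }
}
```

Implicit parameter lists work the same way, because Scala compiles multiple parameter lists into one flat list of method parameters.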

Now change EmojiCounter to get its dependencies and configuration in the constructor.

Create a companion object for the configuration.

object EmojiCounter {
case class Configuration(bufferSize: Int)
}

Change the class header to this:

class EmojiCounter
(configuration: Configuration)
(implicit val materializer: Materializer,
executionContext: ExecutionContext,
loggingAdapter: LoggingAdapter
) extends StatusAdapter {

Checkpoint: it should look like this:

package com.acme.emojitrends.stream
import akka.event.LoggingAdapter
import akka.stream._
import akka.stream.scaladsl._
import com.acme.emojitrends.stream.EmojiCounter.Configuration
import twitter4j.{Status, StatusAdapter}
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext}
class EmojiCounter
(configuration: Configuration)
(implicit val materializer: Materializer,
executionContext: ExecutionContext,
loggingAdapter: LoggingAdapter
) extends StatusAdapter {

private val overflowStrategy = OverflowStrategy.backpressure
private val bufferSize = configuration.bufferSize
private val statusSource = Source.queue[Status](
bufferSize,
overflowStrategy
)
private val graph = statusSource
.log("QUEUED", { status => status.getText })
.to(Sink.ignore)
private val queue = graph.run()
override def onStatus(status: Status) =
Await.result(queue.offer(status), Duration.Inf)

}
object EmojiCounter {
case class Configuration(bufferSize: Int)
}

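Provide the counter in EmojiTrendsModule. The provider takes the ActorSystem, ExecutionContext and Materializer bound by AkkaModule, so also install that module in configure() with install(new AkkaModule(config.getConfig("akka"))).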

@Provides
@Singleton
def provideEmojiCounter
(implicit system: ActorSystem,
ec: ExecutionContext,
materializer: Materializer) = {
implicit val adapter: LoggingAdapter =
Logging(system, classOf[EmojiCounter])
val counterConfig = config.getConfig("emojiCounter")
val configuration = EmojiCounter.Configuration(
counterConfig.getInt("bufferSize")
)
new EmojiCounter(configuration)
}

Open EmojiTrends and replace the constructor call with the injected counter.

val counter = injector.instance[EmojiCounter]

Run it.

Now that we have a stable, maintainable app, we can do what we came for. First, extract emojis from tweets. Open EmojiCounter and import com.vdurmont.emoji.EmojiParser and scala.collection.JavaConverters._

Create the extractEmojis flow. It extracts the emojis behind an asynchronous boundary, filters out tweets without emojis and logs the rest. Then add the flow to the graph.

private val extractEmojis = Flow[Status]
.map { status =>
EmojiParser
.extractEmojis(status.getText).asScala
}
.async
.filter { emojis =>
emojis.nonEmpty
}
.log("EMOJIS", { emojis => emojis reduce {_ + _} })
private val graph = statusSource
.via(extractEmojis)
.to(Sink.ignore)
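To see what the map and filter steps do without starting a stream, here is the same logic on a plain list. The texts are made up, and I assume the emoji-java 4.0.0 dependency from build.sbt.

```scala
import com.vdurmont.emoji.EmojiParser

import scala.collection.JavaConverters._

// Hypothetical demo object, not part of the emoji-trends app.
object ExtractSketch {
  // Same call as in the flow: a Java list of emojis, converted to a Scala collection.
  def emojisIn(text: String): Seq[String] =
    EmojiParser.extractEmojis(text).asScala

  // Map-then-filter, as in extractEmojis: texts without emojis are dropped.
  def tweetsWithEmojis(texts: List[String]): List[Seq[String]] =
    texts.map(emojisIn).filter(_.nonEmpty)

  def main(args: Array[String]): Unit =
    tweetsWithEmojis(List("no emoji here", "happy 😃", "tree 🎄 tree 🎄"))
      .foreach(emojis => println(emojis.mkString(" ")))
}
```

The first text should disappear, and the other two should be reduced to their emojis.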

Run it.

The log looks good, so we can start counting.

Let’s connect Redis. Add the following lines to application.conf. Make sure that Redis is available.

redis {
host: "localhost"
port: 6379
database: 3
}

Create RedisModule.

package com.acme.emojitrends.guice
import javax.inject.Singleton
import com.google.inject.{AbstractModule, Provides}
import com.redis.RedisClientPool
import com.typesafe.config.Config
import net.codingwell.scalaguice.ScalaModule
class RedisModule(config: Config) extends AbstractModule with ScalaModule {
override def configure(): Unit = {}
@Provides
@Singleton
def provideRedisClientPool(): RedisClientPool = {
new RedisClientPool(
host = config.getString("host"),
port = config.getInt("port"),
database = config.getInt("database")
)

}
}

Install it in EmojiTrendsModule.

override def configure(): Unit = {
install(new TwitterModule(config.getConfig("twitter")))
install(new AkkaModule(config.getConfig("akka")))
install(new RedisModule(config.getConfig("redis")))
}

Create RedisEmojiCounterDao in the persistence package. Notice the Inject annotation. It makes the class injectable without implementing a provider in the module.

package com.acme.emojitrends.persistence
import javax.inject.Inject
import com.redis.RedisClientPool
class RedisEmojiCounterDao @Inject()(pool: RedisClientPool) {
def incrementTweets: Option[Long] =
pool.withClient { r =>
r.incr("tweets")
}
def incrementEmoji(emoji: String): Option[Long] =
pool.withClient { r =>
r.incr(emojiKey(emoji))
}
def tweets: Int =
pool.withClient { r =>
r.get("tweets") map {_.toInt} getOrElse 0
}
def occurrences(emoji: String): Int =
pool.withClient { r =>
r.get(emojiKey(emoji)) map {_.toInt} getOrElse 0
}
private def emojiKey(emoji: String) =
s"emoji:$emoji"
}
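Here is constructor injection in isolation, as a minimal sketch with made-up classes. No module is needed: Guice can build InMemoryStore through its no-argument constructor and TweetCounter through its @Inject constructor. In our app the RedisClientPool comes from RedisModule instead.

```scala
import com.google.inject.Guice
import javax.inject.Inject

// Hypothetical classes, only for illustration.
class InMemoryStore {
  private var count = 0L
  def incr(): Long = { count += 1; count }
}

// @Inject tells Guice to call this constructor and supply the store itself.
class TweetCounter @Inject()(store: InMemoryStore) {
  def increment(): Long = store.incr()
}

object ConstructorInjectionSketch {
  def main(args: Array[String]): Unit = {
    val injector = Guice.createInjector() // no modules, no providers
    val counter = injector.getInstance(classOf[TweetCounter])
    println(counter.increment()) // 1
    println(counter.increment()) // 2
  }
}
```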

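Add the counter DAO to EmojiCounter as the first constructor parameter, before configuration, and pass it in from the provider in EmojiTrendsModule.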

@Provides
@Singleton
def provideEmojiCounter(dao: RedisEmojiCounterDao)
(implicit system: ActorSystem,
ec: ExecutionContext,
materializer: Materializer): EmojiCounter = {
implicit val adapter: LoggingAdapter =
Logging(system, classOf[EmojiCounter])
val counterConfig = config.getConfig("emojiCounter")
val configuration = EmojiCounter.Configuration(
counterConfig.getInt("bufferSize")
)
new EmojiCounter(dao, configuration)
}

Once the tweets without emojis are filtered out, we increment the tweet counter for each remaining tweet.

private val incrementTweets = Flow[Seq[String]]
.map { emojis =>
dao.incrementTweets
emojis
}
.async

Add it to the graph.

private val graph = statusSource
.via(extractEmojis)
.via(incrementTweets)
.to(Sink.ignore)

Now that we count tweets, let’s count the emojis, too. Flow.mapConcat is similar to flatMap: it turns each list produced by the previous stage into single elements. We keep only distinct emojis within a single tweet (treating ❤️ ❤️ ❤️ as ❤️). We log each emoji with stars that show how special it is (using inverse document frequency).

private val incrementEmojis = Flow[Seq[String]]
.mapConcat { emojis =>
emojis.distinct.toList
}
.async
.map { emoji =>
dao.incrementEmoji(emoji)
emoji
}
.log("IDF", {emoji =>
val idf = math.round(
math.log1p(dao.tweets.toDouble / dao.occurrences(emoji))
)

s"$emoji ${"*" * idf.toInt}"
})

Add it to the graph.

private val graph = statusSource
.via(extractEmojis)
.via(incrementTweets)
.via(incrementEmojis)
.to(Sink.ignore)
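If mapConcat is new to you, this small sketch shows the flattening and the per-tweet distinct on made-up input. It assumes akka-stream 2.5.6, and the names are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object, not part of the emoji-trends app.
object MapConcatSketch {
  def distinctEmojis(tweets: List[Seq[String]]): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("MapConcatSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      Await.result(
        Source(tweets)
          .mapConcat(_.distinct.toList) // one element per distinct emoji of each tweet
          .runWith(Sink.seq),
        5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(distinctEmojis(List(Seq("❤", "❤", "❤"), Seq("😃", "🎄", "😃"))).mkString(" "))
}
```

The duplicates are removed per tweet only, so an emoji that appears in two tweets is still counted twice.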

Run it.
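To get a feeling for the stars, here is the same formula as in the IDF log line, worked out for a few made-up counts. The object and method names are hypothetical. The rarer the emoji, the more stars it gets.

```scala
// Hypothetical demo object, not part of the emoji-trends app.
object IdfSketch {
  // Same formula as in the log line: round(ln(1 + tweets / occurrences)).
  def stars(tweets: Int, occurrences: Int): Int =
    math.round(math.log1p(tweets.toDouble / occurrences)).toInt

  def main(args: Array[String]): Unit = {
    println(stars(1000, 1000)) // 1, since ln(2) is about 0.69
    println(stars(1000, 100))  // 2, since ln(11) is about 2.40
    println(stars(1000, 10))   // 5, since ln(101) is about 4.62
    println(stars(1000, 1))    // 7, since ln(1001) is about 6.91
  }
}
```

An emoji is incremented before it is logged, so occurrences is at least 1 and the division is safe.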

Looks cool. It became a bit lengthy so I just let it run for a while to collect data. Then I will continue with comparing the long-term frequencies with the current hour. Follow me to get notified about that post.
