Apache Flinkでリアルタイムストリーム処理を実装してみる

Akihito Tanaka
all worldly things are transitory.
37 min readFeb 5, 2017

Apache Flinkを使ったバッチ処理を作ってみてから大分経ってしまったが、今回はリアルタイムストリーム処理を作ってみようと思う。

いきなりコードをバーンっと出すのもアレだしナニなので、既存のBigDataを意識したストリーム処理ライブラリ(Spark Streaming)を振り返りつつ、Apache Flinkのリアルタイムストリーム処理用のライブラリの話から。

昨今のリアルタイムストリーム処理

Apahce Flinkは所謂BigDataと呼ばれる規模のデータをストリーム処理するためのフレームワークである。既存のHadoopSparkのようなフレームワークは大量のデータをいくつかの束に分けてガサッと分散処理する。Sparkなんかはリアルタイムでも使えると言われてはいるが、データの束をバッチ処理することには変わらない。1度に処理するデータ量にもよるが1レコードに対するレイテンシは早くても数秒という単位だと思っている。バッチ処理に使うのであれば全くそれで問題ない。

しかし、逐次流れてくるデータからリアルタイムでなんらかの情報を得たり、データを評価して反応を返したいとなると話は変わってくる。1つのデータに対するレイテンシは小さい方が良く、データを受け取ってから結果を得るまで数秒もかかっていては遅いケースもある。

例えば、流入してきたアクセスログをなんらかのマスターデータと紐づけてグルーピングして集計するようなケースを考えてみる。Spark Streamingを使うと次のような流れになるだろう。

  1. Mapperによるログのパース & マスターデータとの紐づけ
  2. グルーピングするためのshuffle
  3. Reducerによるグループ単位での集計

Spark Streamingは名前に Streaming とついてはいるものの実態はマイクロバッチである。バッチ処理の間隔を短くしてできる限りレイテンシを小さくすることで、ニア・リアルタイムに結果を得ることができる。Spark Streamingについて詳しく知っているわけではないので認識に間違いがあるかもしれないが、基本的な処理の流れは通常のRDDを使った処理と同じである。短時間の間に溜め込んだデータに対して一通り1.の処理を適用してshuffleのために一旦ストレージに処理後のデータを吐き出す。Reducerはストレージからshuffle後のShuffledRDDを構成し、3.の処理を適用する。3.の処理を実行している間に処理すべきデータを受け取っていたとしても、3.の処理が終了するまでは、そのデータに対する1.の処理を開始することはできない。例えば、1.の処理が始まってしまった直後に受け取ったデータに関しては1回のバッチ処理にかかる時間のおよそ2倍のレイテンシになってしまう。

前述したフレームワークとは異なり、Apache Flinkはデータの束をバッチ処理するのではなく流入してきたデータを逐次的にストリーム処理する。Flinkを使って前述のケースを実装すると、Sparkと同じ1.〜3.のような流れになる。一連の処理はshuffleのタイミングで分割され異なるプロセスで分散処理される。1.の実行を終えたデータは逐次的に当該グループに対する3.の処理を行うプロセスに送られ集計処理されることになる。この間に受け取ったデータはSpark Streamingのように待たされることはなく、逐次的に1.の処理が適用される。つまり、そのデータに対する「1.〜3.の処理を適用するのに必要な時間」 = 「レイテンシ」となる。もちろん処理の内容によってはバッチ処理した方が効率的なケースもあるだろうが、多くの場合はマイクロバッチ方式よりも1レコードに対するレイテンシは小さくなる。

SparkをHadoopのMapReduceをオンメモリで処理できるようにしたものと解釈するならば、FlinkはMapReduceをストリーム処理として実装したものと解釈できる。

また、リアルタイムストリーム処理を実現するフレームワークとして、既存のものにApache Stormがある。ご存知の方も多いと思うがStormはFlinkよりも前から開発されているフレームワークでTwitterでツイートのトレンド分析などにも用いられている。ストリームに対する処理を Boltと呼ばれるインタフェースを持ったクラスとして定義し、定義したBoltをつないで Topology と呼ばれるストリームの流れを定義する。実行時にはBoltの単位でプロセス/スレッドが生成され実行される。一連の処理をどのように分割するかは開発者に委ねられており、Flinkよりも自由度は高いように思う。ただし、ちょっとしたストリーム処理を実装したい場合でもBoltをクラスとして定義してTopologyとして繋ぐコードを書く必要がある。高階関数を使ってストリーム処理を定義できるFlinkと比べると書き味は劣ると言わざるを得ないが、JavaにLambda式が登場する以前に開発されたフレームワークなのでその点は仕方がない。

パフォーマンスに関しては米Yahoo!がStorm StreamingとFlink、Stormのベンチマークをブログに掲載している。もう1年以上前の記事なのでそれぞれ現在のバージョンで計測し直すと結果が変わってくる部分もあるかもしれないが、それぞれのスループットとレイテンシーを比較したグラフにマイクロバッチ vs ストリーム処理の結果が明確に現れていて、マイクロバッチのSpark Streamingは高いスループットを受け止めようと思ったら、そのぶんの大きなレイテンシーが必要になる。

というわけで、Apache Flinkはスケーラブルな低レイテンシー高スループットなリアルタイムストリーミング処理を今っぽい書き味で実装できるフレームワークなのである。

Flinkが標準でサポートしている外部システム

Flinkがデフォルトでサポートしている外部システムはv1.1の時点で以下のようなものがあり、それぞれreadまたはwriteするためのコネクターが提供されている。

今回はこの中でも手軽に実装できる Twitter Connectorを使ってストリーム処理をScalaで実装してみようと思う。

Twitterをデータソースにリアルタイムにストリーム処理を実装してみる

今回は取得したツイートを形態素解析して名詞を取り出し、出現回数をウィンドウ集計するアプリケーションを作る。集計結果は随時標準出力する。

1. DataStream APIとTwitter Connectorのセットアップ

FlinkのDataStreamAPIとTwitter Connectorを使うには、以下のようにbuild.sbtに依存を定義する。

libraryDependencies ++= Seq(
"org.apache.flink" % "flink-scala_2.11" % "1.1.3",
"org.apache.flink" % "flink-streaming-scala_2.11" % "1.1.3",
"org.apache.flink" % "flink-connector-twitter_2.11" % "1.1.3"
)

依存を取り込んだら、 TwitterSource を生成してストリーム処理のための StreamExecutionEnvironment に設定する。

val props = new Properties()
props.setProperty(TwitterSource.CONSUMER_KEY, "your-consumer-key")
props.setProperty(TwitterSource.CONSUMER_SECRET, "your-secret")
props.setProperty(TwitterSource.TOKEN, "your-token")
props.setProperty(TwitterSource.TOKEN_SECRET, "your-token-secret")
val twitterSource = new TwitterSource(props)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = env.addSource(twitterSource)

これでTwitter Streaming APIからツイートを取得できる。

試しに以下のようなコードを実行してみると、

import java.util.Properties
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.twitter.TwitterSource

object TweetPrinter extends App {
val props = new Properties()
props.setProperty(TwitterSource.CONSUMER_KEY, "your-consumer-key")
props.setProperty(TwitterSource.CONSUMER_SECRET, "your-secret")
props.setProperty(TwitterSource.TOKEN, "your-token")
props.setProperty(TwitterSource.TOKEN_SECRET, "your-token-secret")
val twitterSource = new TwitterSource(props)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = env.addSource(twitterSource)

dataStream.print()
env.execute("Twitter noun counter")
}

ツイートのJSONデータを順次取得できる。

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-809929248]
01/28/2017 15:29:48 Job execution switched to status RUNNING.
01/28/2017 15:29:48 Source: Custom Source(1/1) switched to SCHEDULED
01/28/2017 15:29:48 Source: Custom Source(1/1) switched to DEPLOYING
01/28/2017 15:29:49 Sink: Unnamed(1/4) switched to SCHEDULED
01/28/2017 15:29:49 Sink: Unnamed(1/4) switched to DEPLOYING
01/28/2017 15:29:49 Sink: Unnamed(2/4) switched to SCHEDULED
01/28/2017 15:29:49 Sink: Unnamed(2/4) switched to DEPLOYING
01/28/2017 15:29:49 Sink: Unnamed(3/4) switched to SCHEDULED
01/28/2017 15:29:49 Sink: Unnamed(3/4) switched to DEPLOYING
01/28/2017 15:29:49 Sink: Unnamed(4/4) switched to SCHEDULED
01/28/2017 15:29:49 Sink: Unnamed(4/4) switched to DEPLOYING
01/28/2017 15:29:49 Sink: Unnamed(2/4) switched to RUNNING
01/28/2017 15:29:49 Sink: Unnamed(1/4) switched to RUNNING
01/28/2017 15:29:49 Source: Custom Source(1/1) switched to RUNNING
01/28/2017 15:29:49 Sink: Unnamed(3/4) switched to RUNNING
01/28/2017 15:29:49 Sink: Unnamed(4/4) switched to RUNNING
1> {"delete":{"status":{"id":8429043580,"id_str":"8429043580","user_id":51476299,"user_id_str":"51476299"},"timestamp_ms":"1485585180704"}}
1> {"created_at":"Sat Jan 28 06:33:00 +0000 2017","id":825230176036364288,"id_str":"825230176036364288","text":"RT @thuna2525: \u660e\u65e5\uff01\uff01\uff01\n\u984c\u540d\u306e\u306a\u3044\u97f3\u697d\u4f1a\uff01\uff01\uff01\n\n\u30c9\u30e9\u30af\u30a8\uff01\uff01\uff01\uff01\uff01","source":"\u003ca href=\"http:\/\/twitter.com\/download\/iphone\" rel=\"nofollow\"\u003eTwitter for iPhone\u003c\/a\u003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":178701790,"id_str":"178701790","name":"U_MA","screen_name":"humanoid_uma","location":"\u65e5\u672c","url":null,"description":"28\u6b73\/\u30a2\u30eb\u30b3\u30fc\u30eb\u3088\u308a\u30ab\u30d5\u30a7\u30a4\u30f3\u597d\u304d\/\u897f\u5ddd\u8cb4\u6559\/\u677e\u5ca1\u5145\/\u76ca\u82e5\u3064\u3070\u3055\/\u30dd\u30eb\u30ce\/\u30d6\u30f3\u30b5\u30c6\/\u30ac\u30eb\u30cb\u30c7\/may'n\/supercell\/\u30ac\u30f3\u30c0\u30e0\/\u7279\u64ae\/3\u6708\u306e\u30e9\u30a4\u30aa\u30f3\/\u6cb3\u4e0b\u6c34\u5e0c\/\u7a2e\u6751\u6709\u83dc\/\u30d0\u30c9\u30df\u30f3\u30c8\u30f3\/\u8266\u3053\u308c\/GOD EATER\/\u30c0\u30f3\u30ac\u30f3\u30ed\u30f3\u30d1\/\u4e0a\u91ce\/\u795e\u7530\/\u79cb\u8449\u539f","protected":false,"verified":false,"followers_count":155,"friends_count":267,"listed_count":5,"favourites_count":19488,"statuses_count":31104,"created_at":"Sun Aug 15 13:17:07 +0000 2010","utc_offset":32400,"time_zone":"Tokyo","geo_enabled":false,"lang":"ja","contributors_enabled":false,"is_translator":false,"profile_background_color":"131516","profile_background_image_url":"http:\/\/abs.twimg.com\/images\/themes\/theme14\/bg.gif","profile_background_image_url_https":"https:\/\/abs.twimg.com\/images\/themes\/theme14\/bg.gif","profile_background_tile":true,"profile_link_color":"009999","profile_sidebar_border_color":"EEEEEE","profile_sidebar_fill_color":"EFEFEF","profile_text_color":"333333","profile_use_background_image":true,"profile_image_url":"http:\/\/pbs.twimg.com\/profile_images\/824373704557678592\/_Q5fXgkf_normal.jpg","profile_image_url_https":"https:\/\/pbs.twimg.com\/profile_images\/824373704557678592\/_Q5fXgkf_normal.jpg","profile_banner_url":"https:\/\/pbs.twimg.com\/profile_banners\/178701790\/1485385795","default_profile":false,"default_profile_image":false,"following":null,"follow_request_sent":null,"notifications":null},"geo":null,"coordinates":null,"place":null,"contributors":null,"retweeted_status":{"created_at":"Sat Jan 28 06:12:58 +0000 2017","id":825225132046049281,"id_str":"825225132046049281","text":"\u660e\u65e5\uff01\uff01\uff01\n\u984c\u540d\u306e\u306a\u3044\u97f3\u697d\u4f1a\uff01\uff01\uff01\n\n\u30c9\u30e9\u30af\u30a8\uff01\uff01\uff01\uff01\uff01","source":"\u003ca href=\"http:\/\/twitter.com\/download\/iphone\" rel=\"nofollow\"\u003eTwitter for iPhone\u003c\/a\u003e","truncated":false,"in_reply_to_status_id":null,"in_reply_to_status_id_str":null,"in_reply_to_user_id":null,"in_reply_to_user_id_str":null,"in_reply_to_screen_name":null,"user":{"id":2224527547,"id_str":"2224527547","name":"\u30c4\u30ca\u30b5\u30f3\u30c9","screen_name":"thuna2525","location":null,"url":null,"description":"\u6b4c\u3063\u305f\u308a\u30b2\u30fc\u30e0\u3057\u305f\u308a\u304a\u8a71\u3057\u3057\u305f\u308a\u3002 \u30d5\u30a9\u30ed\u30d0\u306f\u307b\u307c\u3057\u307e\u305b\u3093\u3002","protected":false,"verified":false,"followers_count":82,"friends_count":56,"listed_count":4,"favourites_count":442,"statuses_count":7222,"created_at":"Sun Dec 01 08:59:20 +0000 2013","utc_offset":32400,"time_zone":"Seoul","geo_enabled":false,"lang":"ja","contributors_enabled":false,"is_translator":false,"profile_background_color":"642D8B","profile_background_image_url":"http:\/\/abs.twimg.com\/images\/themes\/theme10\/bg.gif","profile_background_image_url_https":"https:\/\/abs.twimg.com\/images\/themes\/theme10\/bg.gif","profile_background_tile":true,"profile_link_color":"FA743E","profile_sidebar_border_color":"000000","profile_sidebar_fill_color":"000000","profile_text_color":"000000","profile_use_background_image":true,"profile_image_url":"http:\/\/pbs.twimg.com\/profile_images\/755566882040651776\/yILemR6L_normal.jpg","profile_image_url_https":"https:\/\/pbs.twimg.com\/profile_images\/755566882040651776\/yILemR6L_normal.jpg","profile_banner_url":"https:\/\/pbs.twimg.com\/profile_banners\/2224527547\/1459445007","default_profile":false,"default_profile_image":false,"following":null,"follow_request_sent":null,"notifications":null},"geo":null,"coordinates":null,"place":null,"contributors":null,"is_quote_status":false,"retweet_count":2,"favorite_count":1,"entities":{"hashtags":[],"urls":[],"user_mentions":[],"symbols":[]},"favorited":false,"retweeted":false,"filter_level":"low","lang":"ja"},"is_quote_status":false,"retweet_count":0,"favorite_count":0,"entities":{"hashtags":[],"urls":[],"user_mentions":[{"screen_name":"thuna2525","name":"\u30c4\u30ca\u30b5\u30f3\u30c9","id":2224527547,"id_str":"2224527547","indices":[3,13]}],"symbols":[]},"favorited":false,"retweeted":false,"filter_level":"low","lang":"ja","timestamp_ms":"1485585180659"}

ツイートを削除したイベントなんかも含まれているので適当にフィルタリングする必要がある。ツイートの本文はJSON中のメンバー text に格納されている。

2. Twitter Streaming APIから取得したデータからツイートのみを取り出す

そのままだと余計なデータが多分に含まれているので、まずはJSONからツイート本文を取り出す。そのためにはまずJSONをパースしなければならない。今回は定番のJacksonを使ってパースする。

まずは、Jacksonの依存をbuild.sbtに追加する。

libraryDependencies ++= Seq(
"org.apache.flink" % "flink-scala_2.11" % "1.1.3",
"org.apache.flink" % "flink-streaming-scala_2.11" % "1.1.3",
"org.apache.flink" % "flink-connector-twitter_2.11" % "1.1.3",
"com.fasterxml.jackson.core" % "jackson-core" % "2.8.5",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.8.5",
"com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.8.4"
)

ツイートのみ取り出す処理はこんな感じで書ける。

val tweetStream = dataStream.map { jsonStr =>
val jsonParser = new ObjectMapper()
jsonParser.readValue(jsonStr, classOf[JsonNode])
}.filter(_.has("test")).map(_.get("text").asText())

tweetStream.print()

はじめのmapで毎回ObjectMapperを生成しているのが気になるがとりあえずそこは気にしない。

実行してみるとこんな感じで様々な国の方のツイートが取得できる。

1> 律「日本人なら米食え!米ぇ!」
#keionbubot
3> 高ぶり
4> RT @brightas_sun: @brightas_sun 너무 무력감이 느껴졌다 이 가부장적인 집안에 나는 오로지 엄마를 돕기위해 시골에 내려온건데. 당사자는 벗어날의지도없고 오히려 이집안 최약자인 나를 탓하는구나.
1> @cowandcanary film: "This is how humans evolved from apes" me, screaming the moment they mention evolution, this is my town linguists????
1> RT @CHANBAEK_FAMILY: real__pcy มากดส่งหัวใจในไลฟ์ของbaekhyunee_exo ด้วยอ่ะ 🙈🙈🙈
(cr; 412_se_hun) https://t.co/eL1lhcrJOC
1> RT @NamziiNamzii: อันฟอลได้ กูช่วยชาติอยู่ 🔴⚪️🔵⚪️🔴
#MissUniverse #Thailand
#MissUniverse #Thailand
#MissUniverse #Thailand
#MissUniverse #…
4> RT @TransforiKON: บ๊อบบี้: เป็นฮยองก็จริงนะ รักฮยองมากก็จริง แต่ฮยองบ้าอะ https://t.co/0X5XlRkzlV
4> @_sophiacuison Saan ?
1> RT @fiatnnp: The Crown for Charita MissThailand.
#MissUniverse
#Thailand https://t.co/BRV5K3PuYv

3. 日本語のツイートを形態素解析する

様々な国の言葉が混ざった状態なのでこの中から日本語のツイートを絞り込み、手軽に使える日本語の形態素解析ライブラリ “kuromoji”を使って名詞を取り出す。

まずは日本語のツイートを絞り込む。少々雑だがひらがなが含まれていれば日本語のツイートとする。

val japaneseTweets = tweetStream.filter(_.matches("^.*[\\u3040-\\u3096]+.*$"))

こんな感じで日本語に絞り込まれていることがわかる。

1> キャスのタイトルに女の子って入れて詐欺しまくり
2> ちゅるりらちゅるりらだっだっだっฅ( ̳• ·̫ • ̳ฅ)
2> なんか活活2馬安定な気がしてきた
4> とっとと料理上手な人と結婚して幸せになってほしい
1> ベリンボロならった。オーソドックスなヤツはもはややらないのね。それだけでも勉強になったぞ。
1> @jnt_7_oO みるべき
2> Jamiroquai - Automaton 新曲きたー https://t.co/YJ6wo0I1vg
1> RT @akagi_miria_bot: 今度こそゲットしてね♪ https://t.co/0Ug1dDOSOT

このままだとReplyやRT、リンクなんかも含まれてしまっているので、スペースで分割してそれらを取り除く。

val filtered = japaneseTweets.flatMap(_.split(" ")).filter(sentence => !sentence.contains("RT") && !sentence.contains("http") && !sentence.contains("@"))

RT等が含まれていたツイートが概ね日本語の文章のみになる。時々無残に破壊された顔文字や絵文字が含まれているっぽいけど気にしない。

3> 【大雪】「除雪してるのか」自治体に苦情殺到 鳥取大雪でこぼこ道路なぜ[H29/1/26]
4> これ見るとほんと初期にログボ骨にしてって嘆いてたのは正しかった
2> カメラで撮ってもボケるのが😭😭😭
1> 3人で長谷津観光~
4> よう兄弟!って話かけないと(
4> ^ω^
4> )

いよいよkuromojiをつかって形態素解析する。com.atilika.kuromoji.ipadic.Tokenizerクラスの tokenize(String text) を使うことでツイートを形態素に分解できる。

// トークンに分割する
val tokenized = filtered.flatMap { jt =>
val tokenizer = new Tokenizer()
tokenizer.tokenize(jt)
}

出力してみるとこんな感じ。

1> Token{surface='工学部', position=0, type=KNOWN, dictionary=com.atilika.kuromoji.dict.TokenInfoDictionary@33886b61, wordId=51631}
1> Token{surface='とか', position=3, type=KNOWN, dictionary=com.atilika.kuromoji.dict.TokenInfoDictionary@2e3e1ba4, wordId=259468}
1> Token{surface='理学部', position=5, type=KNOWN, dictionary=com.atilika.kuromoji.dict.TokenInfoDictionary@2d7f9414, wordId=83970}
1> Token{surface='の', position=8, type=KNOWN, dictionary=com.atilika.kuromoji.dict.TokenInfoDictionary@174fd01c, wordId=259536}
1> Token{surface='講義', position=9, type=KNOWN, dictionary=com.atilika.kuromoji.dict.TokenInfoDictionary@1ff06379, wordId=251153}
1> Token{surface='を', position=11, type=KNOWN, dictionary=com.atilika.kuromoji.dict.TokenInfoDictionary@44da6a84, wordId=259514}
3> Token{surface='サードパーティークライアント', position=0, type=UNKNOWN, dictionary=com.atilika.kuromoji.dict.UnknownDictionary@a5b8dea, wordId=23}
1> Token{surface='他', position=12, type=KNOWN, dictionary=com.atilika.kuromoji.dict.TokenInfoDictionary@17a61e7e, wordId=259615}
1> Token{surface='学部', position=13, type=KNOWN, dictionary=com.atilika.kuromoji.dict.TokenInfoDictionary@9e433b3, wordId=43235}

kuromojiは形態素解析する際にはライブラリに内包している辞書に存在している語句をベースに分解するので辞書に存在しない言葉(上記サンプルの場合、「他学部」)は意図しない位置で分割されてしまう。が、気にしない。

トークンに分割され品詞情報が取得できるようになったので、ここで名詞に絞り込んで語句を取り出す。

val nouns = tokenized.filter(_.getPartOfSpeechLevel1 == "名詞").map(_.getSurface)

するとこんな感じになる。

4> 家
4> 今日
2> 春野
1> ホテル
1> 街
3> ✨
3> 夜
1> 石鹸
3> 久
2> (*>∇<)
1> 匂い
3> さん
2> ノ
3> ✨
3> !!!!
3> 綺麗
1> 有名人
1> 著名
1> 人
4> Eplus

絵文字とか、記号とか顔文字とか微妙な感じのデータも残っていることを気にしなければ、ここまでで日本語のツイートを形態素解析して名詞を取り出せたことになる。

4. 単語をWindow集計する

取り出した名詞が一定時間あたりにどの程度ツイートの中に出現するかWindow集計してみる。今回は5分のwindow幅で1分単位でスライドしながら集計してみる。

実際に実装してみると以下のようになる。

 val summary = nouns.map((_, 1))
.keyBy(0)
.timeWindow(Time.minutes(15), Time.minutes(1))
.sum(1)
.filter(_._2 > 5)

nounsに対する最初の map では後続の処理で sum するために 1 と組み合わせてタプルを作る。

次の keyBy はストリームを指定したキーでパーティション分割する。キーとする項目のインデックスまたはキー名を指定する。今回は抽出した名詞をキーとしたいので、インデックス 0を指定する。

timeWindow でウィンドウ集計のためのウィンドウを指定する。第1引数にはウィンドウの時間幅、第2引数にウィンドウのスライド間隔を指定する。上記コードでは15分のウィンドウ幅で1分おきに集計範囲をスライドさせることを指定している。

その次の sum でウィンドウの時間幅内のデータについて集計を行っている。引数には集計に使うデータのインデックスを指定する。タプルの2要素目が集計に用いられる値となる。0始まりで数えるため、1を指定している。

最後のfilterは集計結果が5より大きいもののみを抽出している。

実際に実行してみると、次のような結果を得ることができる。

1> (トランプ,14)
2> (中,47)
1> (うち,12)
2> (県,7)
1> (イラスト,6)
2> (`,9)
1> (A,13)
2> (間,8)
2> (動画,18)
1> (気,50)
2> (反応,8)
1> (設定,9)
2> (人間,10)
2> (安定,6)
1> (以外,10)
2> (向け,6)
1> (一緒,13)
2> (もの,38)
1> (T,6)
2> (是非,10)
1> (無理,13)
1> (戦,11)
2> (画像,13)
1> (話,35)
2> (気軽,6)
1> (30,9)
2> (や,6)
2> (さん,149)
1> (...,48)
2> (ポイント,10)
2> (仕事,21)
2> (4,12)
2> (昨日,7)
2> (ちょ,6)
2> (人,120)
2> (たくさん,6)

これは集計結果の一部であるが、“さん”や“人”などツイートの内容によらず多くのツイートで共通して出現しそうなワードが当然頻出していることがわかる。トレンドワード等を調べたいのであればこのままではダメで、TF-IDF等を使ってツイートの特徴量をとらえた上でカウントする必要がある。

最後に今回のコードの全容を掲載しておく。

build.sbt

name := "flink-sample"

version := "1.0"

scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
"org.apache.flink" % "flink-scala_2.11" % "1.1.3",
"org.apache.flink" % "flink-streaming-scala_2.11" % "1.1.3",
"org.apache.flink" % "flink-connector-twitter_2.11" % "1.1.3",
"com.fasterxml.jackson.core" % "jackson-core" % "2.8.5",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.8.5",
"com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.8.4",
"com.atilika.kuromoji" % "kuromoji-ipadic" % "0.9.0"
)

TwitterNounCounter.scala

import java.util.Properties

import com.atilika.kuromoji.ipadic.Tokenizer
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.twitter.TwitterSource

import scala.collection.JavaConversions._

object TwitterNounCounter extends App {
val props = new Properties()
props.setProperty(TwitterSource.CONSUMER_KEY, "your_consumer_key")
props.setProperty(TwitterSource.CONSUMER_SECRET, "your_secret")
props.setProperty(TwitterSource.TOKEN, "your_token")
props.setProperty(TwitterSource.TOKEN_SECRET, "your_token_secret")
val twitterSource = new TwitterSource(props)
// Source
val env = StreamExecutionEnvironment.getExecutionEnvironment
val dataStream = env.addSource(twitterSource)

// ツイートを取り出す
val tweetStream = dataStream.map { jsonStr =>
val jsonParser = new ObjectMapper()
jsonParser.readValue(jsonStr, classOf[JsonNode])
}.filter(_.has("text")).map(_.get("text").asText())

// ひらがなが含まれていれば日本語のツイートとみなす
val japaneseTweets = tweetStream.filter(_.matches("^.*[\\u3040-\\u3096]+.*$"))

// RT, Reply, リンクを取り除く
val filtered = japaneseTweets.flatMap(_.split(" "))
.filter(sentence => !sentence.startsWith("RT") && !sentence.contains("http") && !sentence.startsWith("@"))

// トークンに分割する
val tokenized = filtered.flatMap { jt =>
val tokenizer = new Tokenizer()
tokenizer.tokenize(jt)
}

// 名詞のみに絞り込む
val nouns = tokenized.filter(token => token.getPartOfSpeechLevel1 == "名詞").map(_.getSurface)

// window集計する
val summary = nouns.map((_, 1))
.keyBy(0)
.timeWindow(Time.minutes(15), Time.minutes(1))
.sum(1)
.filter(_._2 > 5)
summary.print()

env.execute("Twitter noun counter")

}

--

--