akka-streamのマイクロサービスへの適用について

はじめに

Nyle
Nyle Engineering Blog
18 min readDec 20, 2016

--

こんにちは皆さま師走の折、いかがお過ごしでしょうか。
クリスマスも年末年始も特に予定がなく、どう引きこもり生活を満喫しようか思案中の開発室はちわれです。先日、株式会社ネットマーケティング様、株式会社ヒトクセ様と一緒に合同イベントを行わせて頂きました。そのイベントで登壇させて頂いたので、今回はその時に発表させて頂いた「akka-streamのマイクロサービスへの適用」について記事を書かせて頂きます。
イベントについては、こちらの記事をご覧下さい。

akka-stream

akkaについては今までブログでakka-clusterやakka-persistentなど取り上げて来ましたが今回はakka-streamを取り上げ、マイクロサービスへのどう適用させるかを説明して行きたいと思います。まずは、akka-streamを使う上で知っておかなくてはならない基本的な概念について書いて行きます。

基本的な概念

どういう物か

Reactive StreamのAkka実装
(ノンブロッキングでback pressureが可能な非同期ストリーム処理の標準仕様)
http://www.reactive-streams.org/

Materializer

Materializeという単語は、日本語では「実現する」という意味になり、akka-streamではStreamを実行する環境を表します。

Source

データの元を表します。データの出力、発信を行う(Publisher)側になります。Sourceはあくまでデータ発信を行う側なので、インプットのチャンネルは持たず、アウトプットチャンネルをひとつだけ持っています。

Sink

台所のシンクと同じ意味のシンクです。SinkはStreamのデータの流れ着く先データの受信(Subscriber)側になります。Sinkはデータの受信だけを行うのでSourceとは反対にアウトプットのチャンネルを持たず、インプットのチャンネルをひとつだけ持っています。

Flow

データフローなどのフローと同じ意味で「流れ」を表します。Flowはインプットとアウトプットのチャンネルをひとつずつ持っています。実装の仕方により様々な使い方が出来ますが、よく見るのはSourceで取得したデータの編集やフィルタなどを行なってSinkに流すのが良く見る例かと思います。

上記の様にFlowとSourceを連結すれば連結した物が新しいSourceとなり、FlowとSinkを連結すれば新しいSinkとなります。そしてSource + Flow + Sinkと3つを連結した物をakka-streamではRunnableGraphと言います。

RunnableGraph

Graphはakka-streamのプロセスのトポロジーを表します。トポロジーは元々は位相幾何学のことですが、ここで言うトポロジーはネットワークトポロジーと同じ様に形態を表していると私は解釈しています。RunnableGraphも、Runnable(実行可能な)、Graph(図)という意味ですので、要はakka-streamの一連の処理の流れを形態を表す物になります。

BackPressure

SourceとSinkの説明でも、単語が出ていますがSourceとSinkはPub/SubメッセージングモデルのPublisherとSubscriberと同じ様な関係性となっています。これを非同期で行う時に、以下の様な問題が発生します。

問題1
PublisherがSubscriberにメッセージを投げすぎて、Subscriber側が溢れてしまう。

問題2
PublisherがSubscriberに遠慮してメッセージをあまり投げないと、Subscriberのリソースが余ってしまう。

これらの問題を解決するのがBackPressureです。BackPresureはSubscriberが自分が処理できる量をPublishserに伝えます。PublisherはSubscriberから伝えられた情報に合わせて、適切な量のメッセージをSubscriber側に送るpull-based-backpressureを採用しています。これによってSubscriberを溢れさせず、Subscriberのリソースを最大限に使って動作させることが可能となります。BackPressureについて詳しくはLgihtbend社の下記ページのスライドやakka公式ドキュメントを参照して下さい。
BackPressure自体はakka-streamが勝手に裏側で制御してくれている様なので、特に制御について意識する必要はなさそうです。

Understanding Akka Streams, Back Pressure and Asynchronous Architectures

akka公式ドキュメント

実装例とマイクロサービスへの適用

さて、今までの所で簡単ではありますがakka-streamの基本的な概念ついて説明できました。ここからは実装例を交えながら、akka-streamがどうマイクロサービスに適用できるのかを書いて行きたいと思います。

実装例

とても単純で簡単な画像クローリング「画像をクローリングして」、「サイズをチェックし」、「一定サイズ以上だけ移動する」構成は以下の様になっています。

使った物

  • akka-stream 2.4.8
  • akka-actor 2.4.8
  • scala-io-file 0.4.3
  • scala-io-core 0.4.3
  • scala-scraper 1.1.0
    scala-scraperはスクレーピングのライブラリです。Githubに公開されており、とても使い勝手が良いので興味のある方は下記リンクからどうぞ
    https://github.com/ruippeixotog/scala-scraper

Source スクレーピングサービス

import java.io.Fileimport net.ruippeixotog.scalascraper.browser.JsoupBrowser
import net.ruippeixotog.scalascraper.model.Element
import net.ruippeixotog.scalascraper.dsl.DSL._
import net.ruippeixotog.scalascraper.dsl.DSL.Extract._
import scala.concurrent.{ExecutionContext, Future}
import scalax.io.Resource
class ScrapeImpl {
val browser = JsoupBrowser()
var sequence = 1
implicit val ec: ExecutionContext = ExecutionContext.Implicits.global
/**
* urlのページをスクレープ
*/
def scrape(url: String) = Future {
val doc = browser.get(url)
val images = doc >?> elementList("img") //htmlからimgタグだけ抽出
images match {
case Some(l) => download(l)
case None => List.empty[File]
}
}
/**
* 画像ダウンロード
*/
def download(list: List[Element]) = {
for {
e <- list
if (e.attr("src").startsWith("http://") || e.attr("src").startsWith("https://"))
} yield {
save(e.attr("src"))
}
}
/**
* ダウンロードした画像を保存
*/
private def save(url: String) = {
val data = Resource.fromURL(url).byteArray
val file = new File("保存場所のディレクトリのパス"+ sequence)
sequence += 1
Resource.fromFile(file).write(data)
file
}
}

ここではscala-scraperを使って指定されたurlのページをスクレーピングしてimgタグだけ抽出し、ローカルに画像をダウンロードして保存しています。
難しいコードではないので見ていただければ大体何をしているか分かるとは思いますが、それぞれ簡単に説明させて頂きます。
scrapeでは引数で受け取ったurlのページをスクレーピングしてhtmlを取得しています。
browser.get(url)の部分がそれにあたります。たったこの1文でhtmlを取得できます。
今回は画像をダウンロードしたい為、imgタグだけが必要なのでhtmlの中からimgタグだけを抽出します。doc >?> elementList("img")の部分で行なっており、結果がListで返って来ます。imgタグが抽出できていた場合は、downloadを呼び出しimgタグの画像をダウンロードします。
downloadではscrapeから渡された、Element(htmlエレメント)のLsitから要素を取り出しimgタグのsrcがhttpかhttpsで始まっているかチェックしています。これはこの後説明するsaveの中で使っているscalaxのメソッドが画像のパスが「http://catcher-in-the-tech.net/hoge/image/fuga.jpg」の様にURLの形式でないと画像をダウンロードできない為、パスがhttpかhttpsで始まっているものだけに絞っています。最後にsaveでは画像をローカルに保存しています。今回ファイルを扱うのにはScala incubatorのscala-io-fileとscala-io-coreを使いました。使っても別に魔法少女にさせられたりはしないので安心してください。Resource.formURL(url).byteArrayでURLからArray[Byte]で画像データを取得し、空のFileオブジェクトを作って画像情報を書き込んで保存しています。Flow 画像サイズチェックサービスimport javax.imageio.ImageIO
import java.io.{File, IOException}
class ImageCheck {
def imageSizeCheck(file: File) = {
try {
val image = ImageIO.read(file)
image.getHeight() >= 100 && image.getWidth() >= 100 //100 x 100以上かチェック
} catch {
case e: IOException => false
}
}
}
次にFlowとなる画像サイズチェックです。こちらではSourceであるスクレーピングサービスから渡ってくるファイルのサイズが縦横がどちらも100pxをこえているか判定しています。
Sink 画像移動サービスimport java.io.File
import scala.sys.process._
class FileMove {
def moveImage(file: File) = {
val path = file.getPath()
Process("mv " + path + "移動先のディレクトリのパス" + file.getName).lineStream
}
}
Sinkでは移動対象となる画像を移動先のディレクトリに移しています。
ScalaでLinuxコマンドを実行したい時はscala.sys.process.Processを使います。
Streamimport akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent._
import java.io.File
class ScrapeStream {
implicit val system = ActorSystem("HogeSystem")
implicit val materializer = ActorMaterializer()
implicit val ec: ExecutionContext = ExecutionContext.Implicits.global
val targetUrls: List[String] = List (
// スクレーピングしたいページのURL
)
val scraper = new ScrapeImpl()
    
val source = Source(targetUrls).mapAsyncUnordered(3) { url =>
scraper.scrape(url)
}
val step = Flow[List[File]].mapAsyncUnordered(3) { files =>
Future (
for {
file <- files
if (imageSizeCheck(file))
} yield file
)
}
val sink = Sink.foreachParallel[List[File]](3) { files =>
files.map(file => moveImage(file))
}
val graph = source via step to sink  //source, flow, sinkを連結してgraphを作成。def run(): Unit = {
graph.run() //runメソッドで実行
}
}
ここからがakka-streamの実装部分です。今まで紹介してきたSource、Flow、Sinkで行う処理をそれぞれのインスタンスに設定していきます。
ますはSourceからです。SourceではスクレーピングするURLのリストからURLを並列に取り出し、並列にスクレーピング処理を実行しています。akka-streamではsourceの元となるデータがListなどのシーケンスであっても並列に要素を取り出してくれます。同じ要素を2回取得してしまう様なこともありません。その後mapAsyncUnorderdedでスクレーピングの処理を並列に実行します。mapAsyncUnorderdedの引数の数値はいくつまで並列に実行するかを指定します。この数値が多いほど並列で処理を行うので速度はあがりますが当然その分リソースを使うので、どれ位の数値を指定するかはシステムの要件やサーバーのリソースとの相談になるかと思います。また、mapAsyncUnorderdedの他にmapAsyncや通常のmapも用意されています。mapは並列で実行しないので並列に処理を行いたい場合は、mapAsyncかmapAsyncUnorderdedを使う必要があります。mapAsyncとmapAsyncUnorderdedの違いは順番を保証するか否かです。元となるデータの順序を保証したい場合はmapAsyncを使って下さい。
次にFlowです。FlowではSourceから渡ってきた画像ファイルのサイズチェックを実行しています。型パラメータにList[File]を指定していますがこの型を指定している理由は今回の実装例ではSourceから渡ってくるのがList[File]だからです。最終的にSourceとFlowをGraphにする際に連結しますが、連結する際はSourceの出力とFlowの入力の型が合っている必要があります。Sinkです。SinkではFlowでのサイズチェックの結果、縦横がどちらも100px以上の画像のListが渡ってくるので、その画像ファイルを移動する処理を実行します。
SinkにはmapAsyncやmapAsyncUnorderdedはないので代わりにforeachParallelを使っています。
Source、Flow、Sinkの用意ができました。しかしvalに設定しただけでは動きません。Source、Flow、Sinkは全て遅延評価の為、別途実行用のメソッドを呼び出す必要があります。今回はGraphを用意してGraphから実行するのでGraphを作っています。今まで用意したSource、Flow、Sinkを連結してGraphを作成します。SourceとFlowはviaメソッドで連結しFlowとSinkはtoメソッドで連結します。これでSource->Flow->Sinkの順に処理が行われデータが流れていきます。Graphを実行するにはrunメソッドを呼び出せば良いので最後にGraphのrunメソッドを実行するメソッドを実装しています。パフォーマンスakka-streamについての記事なので、やはりパフォーマンスには触れておきたいと思います。akka-streamについて調べると「n倍早くなった!!」という記事を見かけることが多いかと思います。今回の実装例で載せているコードはakka-streamを使っているコードですが、akka-streamを使っていないバージョンも作って比較した所akka-streamを使っていないバージョンは、スクレーピング 〜 画像移動 (画像数:150枚)までに30秒台前半という時間でしたが、akka-streamを使ったバージョンでは同様の処理が約3秒で終了し約10倍のパフォーマンスという歴然の差が出ました。この様な結果になるのは単純な話ですが、akka-streamを使ったバージョンでは並列で処理をしている為です。akka-streamを使わないバージョンは1スレッドで動くのに対し、akka-streamを使ったバージョンはSourceで3スレッド、Flowも3スレッド、Sinkも3スレッドで、「3×3×3 = 9」と全体を通して9スレッドで動くので約10倍の差が出ています。
マイクロサービスへの適用前項までがakka-streamの実装例となります。ここからどうマイクロサービスと関わってくるのだろうと思っている方も多いと思います。しかし実はここまでの所で簡単ではありますがマイクロサービス化を既に実現できています。もう一度実装例のコードを見て頂ければ分かるかと思いますが、最終的にakka-streamでSource、Flow、Sinkを連結させていますが、各機能のサービスはそれぞれ独立した一つのマイクロサービスとして成り立っています。Graphとする場合は入出力の型が合っている必要はありますが、そこさえ合っていれば内部の実装を変えてもお互いに何ら影響はありません。Sourceの部分をスクレーピングではなくCDNサーバーから画像を取得する処理に変えても良いし、Flowの部分を画像をサムネイル化する処理に変えても全く問題ありません。Sinkの処理だけ修正してSinkだけデプロイする様なことも可能です。この様にakka-streamに沿った実装をしただけで自然とマイクロサービス化が実現できてしまうのです。また今まで見て頂いた様に入出力させ合っていれば既存のマイクロサービスを簡単に連携させることが出来ます。(もちろんもっと細かいオプションの設定や、複雑な機能を実現しようとすれば難易度は上がります。) それ以外にも既存のモノリシックなサービスをマイクロサービス化したい時にも、akka-streamを使えば自然と機能が出力、編集、入力と分割される効果もあります。またDDD的な観点で言えば、Graphを使うことによってデータフローをそのまま実装に当てはめることが出来るので、ドメインに沿った自然な実装も同時に叶えてくれます。akka-streamは高いパフォーマンスだけでなくマイクロサービスというアーキテクチャも一緒にもたらしてくれるのです。

まとめ

最後に簡単にまとめさせて頂きます。
  • akka-streamを使うことによって、ノンブロッキングで高速な非同期処理を手軽に実装できる。
  • akka-streamに沿って疎結合を意識した実装を行えば自然にマイクロサービス化が行える。
  • 複数のマイクロサービス間の連携を手軽に実現できる。
  • モノリシックなサービスをマイクロサービスに分割することも出来る。
  • ドメインに沿った自然な流れで実装を行うことができる。
それでは皆さま、良いお年をお迎え下さい。

--

--