一瞬で作るストリームデータ収集基盤

yamagenii
9 min readDec 10, 2020

--

この記事は Voicy Advent Calendar 2020 の 10日目の記事です。

前日は yuji0316さんの Goのジェネリクス先取り入門1でした。様々な議論の果てについにジェネリクスが搭載されますね。Goのプログラミングのスタイルがどう変化するのでしょうか。

ログ分析基盤でのストリームの必要性

今回はデータ分析基盤内の、ログ収集基盤の話です。
現在ではアプリケーションのログをリアルタイムで収集して分析するニーズが増加してきて
「ラムダアーキテクチャ」「カッパアーキテクチャ 」「データフローモデル」
という形でよりリアルタイムで効率的に処理できるアーキテクチャへと技術トレンドが変遷してきました。
特にデータフローモデルは、ストリーミング処理とバッチ処理の上位互換とみなして、処理を行うことに特徴があります。

ストリーミング処理の中でバッチ処理を扱えるようにしたプログラミングモデルとして設計されているのがApacheBeamであり、ApacheBeamの分散処理バックエンド一つとしてGCPのDataflowが存在します。

上記のアーキテクチャを考慮した基盤をパブリッククラウドならサーバレスで実現できます。
また今はIaC(Infrastructure as Code)も発展しているので、コードに起こしておくことでよりポータビリティが高く実現できます。

ということで今日はGCP環境とIaCを活用することで簡単なストリームデータ基盤を1コマンドで立ち上げてみましょう。

今回作るリアルタイムログ収集基盤の概要

自分のキーボード入力をリアルタイムにBigqueryにインサートする基盤

簡単ですね!

こちらは単純な例ですが、PubSubとDataflowを組み合わせることで、柔軟なストリームパイプラインを作ることができます。これもIaCで定義することができるので、誰が構築しても同じものを簡単に作れます。

こちらはメルカリさんが構築したストリームパイプラインです。

またdataflowには加工のないような単純なデータパイプラインにはテンプレートが提供されています。それをありがたく使用させてもらいましょう。

使用する技術

GCP
- PubSub
- Dataflow
- BigQuery

IaC
- Terraform v0.12.29

Terraformのコード

テンプレートも提供されていて、アーキテクチャが決まっていればあとはコード化するだけで、サーバレスなインフラが簡単に構築できます。

説明は不要で、「以下に全て書いてあります」、というのがIaCの良いところですね。

# credentialとgcp_projectは変更してください。
terraform {
required_version = "= 0.12.29"
backend "gcs" {
bucket = "gcp_project"
prefix = "terraform/processor"
credentials = "credential_file_path"
}
}
locals {
gcp_credentials_file = "credential_file_path"
gcp_project = "gcp_project"
gcp_region = "asia-northeast1"
}
# pubsubresource "google_pubsub_topic" "log-stream-topic" {
name = "log-stream"
}
resource "google_pubsub_subscription" "bq-stream-sub" {
name = "bq-stream"
topic = google_pubsub_topic.log-stream-topic.name
message_retention_duration = "1200s"
retain_acked_messages = true
ack_deadline_seconds = 20expiration_policy {
ttl = "300000.5s"
}
retry_policy {
minimum_backoff = "10s"
}
enable_message_ordering = false
}
# dataflowresource "google_dataflow_job" "key-log-stream" {
name = "key-log-stream"
template_gcs_path = "gs://dataflow-templates-asia-northeast1/latest/PubSub_Subscription_to_BigQuery"
temp_gcs_location = "gs://${google_storage_bucket.stream-processor-bucket.name}/dataflow"
region = local.gcp_region
parameters = {
inputSubscription = google_pubsub_subscription.bq-stream-sub.id
outputTableSpec = "${local.gcp_project}:${google_bigquery_dataset.stream_datalake.dataset_id}.${google_bigquery_table.default.table_id}"
}
on_delete = "cancel"
}
resource "google_storage_bucket" "stream-processor-bucket" {
name = "yamagenii-stream-processor"
location = "asia-northeast1"
}
# Bigqueryresource "google_bigquery_dataset" "stream_datalake" {
dataset_id = "stream_datalake"
description = "stream log dataset"
location = "asia-northeast1"
}
resource "google_bigquery_table" "default" {
dataset_id = google_bigquery_dataset.stream_datalake.dataset_id
table_id = "key_stream"
time_partitioning {
type = "DAY"
}
schema = <<EOF
[
{
"name": "event_timestamp_int",
"type": "INTEGER",
"mode": "NULLABLE"
},
{
"name": "key_name",
"type": "STRING",
"mode": "NULLABLE"
}
]
EOF
}

インフラ構築コマンド

設計書をコードに落として、それをデプロイできるのがIaCの強みですね

これ以上の説明はいりません。

$ terraform apply

キーロガー

簡易的なキーロガーを作成して、送信しています。

from pynput.keyboard import Key, Listener
from google.cloud import pubsub_v1
import time
import json
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('yamagenii-stream-processor', 'log-stream')
def on_press(key):
try:
char = key.char
except:
char = ''
finally:
print(char)
unixtime = int(time.time())
data = {
"event_timestamp_int":unixtime,
"key_name":char
}
data = json.dumps(data)
print(data)
data = data.encode("utf-8")
publisher.publish(topic_path, data)
def on_release(key):
pass
if __name__ == '__main__':
try:
with Listener(on_press=on_press, on_release=on_release) as listener:
listener.join()
except:
pass

最後に

説明はアーキテクチャ部分だけで、設計の思想がわかったらあとはコードにしたらストリームデータ基盤が構築できます。しっかり思想は理解しながらパブリッククラウドを最大限に活かすことで生産性をあげることができます。

次はmiyukiaizawaさんからです!お楽しみに!

--

--