Handling Streaming Data + Storage

Simon Liu
Akatsuki Taiwan Technology
2 min read · Feb 27, 2019

At Akatsuki we need a robust, maintainable way of handling the logs generated by millions of players. These logs are then used for customer service, KPIs and data analytics. In this blog post we will build a highly scalable log-handling pipeline using Google’s Dataflow templates, utilizing Pub/Sub, Dataflow, BigQuery and Cloud Storage on GCP.

Requirements

To follow along you will need a GCP project with billing enabled, the Google Cloud SDK (which provides gcloud, gsutil and bq), Git, and Java 8 with Maven to build the Dataflow template.

Preparation

This step creates the resources we need in GCP: a Pub/Sub topic, a storage bucket for the template, and a BigQuery dataset and table.

# Create pub/sub topic
gcloud pubsub topics create my-logs
# Create storage bucket for template
gsutil mb gs://tysliu-templates/
# Set variables for BQ
DATASET=test_dataset
TABLE=test_table
# Create BQ dataset
bq --location=asia-east1 mk -d --default_table_expiration 1209600 \
--description "This is a test dataset." \
${DATASET}
# Create BQ table
bq mk -t --expiration 1209600 --description "This is a test table" \
${DATASET}.${TABLE} \
name:STRING,qty:INTEGER,timestamp:DATETIME
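
If you want to double-check that everything was created, the following commands (using the same CLI tools and the variables set above) will describe each resource:

# Confirm the topic, bucket, dataset and table exist
gcloud pubsub topics describe my-logs
gsutil ls gs://tysliu-templates/
bq show ${DATASET}
bq show --schema ${DATASET}.${TABLE}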

Working with the Dataflow template repository

git clone git@github.com:GoogleCloudPlatform/DataflowTemplates.git

We will be working with the PubSubToBigQuery template, shown below. For simplicity we will use the template as is, without changing it.

src/main/java/com/google/cloud/teleport/templates/PubSubToBigQuery.java

Compile the template within the cloned repository folder

# Set the pipeline vars
PROJECT_ID=tysliu-150402
BUCKET_NAME=tysliu-templates
PIPELINE_FOLDER=gs://${BUCKET_NAME}/dataflow/pipelines/pubsub-to-bigquery
# Set the runner
RUNNER=DataflowRunner
# Build the template
GOOGLE_APPLICATION_CREDENTIALS=~/.gcp/tysliu-150402-193b103a5e32.json mvn compile exec:java \
-Dexec.mainClass=com.google.cloud.teleport.templates.PubSubToBigQuery \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--project=${PROJECT_ID} \
--stagingLocation=${PIPELINE_FOLDER}/staging \
--tempLocation=${PIPELINE_FOLDER}/temp \
--templateLocation=${PIPELINE_FOLDER}/template \
--runner=${RUNNER}"
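
Once the build finishes, the template spec should be staged at the templateLocation given above. A quick listing will confirm it is there:

# The template spec and staged dependencies should now be under the pipeline folder
gsutil ls -r ${PIPELINE_FOLDER}/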

Execute the template

JOB_NAME=pubsub-to-bigquery-$USER-`date +"%Y%m%d-%H%M%S%z"`
gcloud dataflow jobs run ${JOB_NAME} \
--gcs-location=${PIPELINE_FOLDER}/template \
--region=asia-east1 \
--parameters \
"inputTopic=projects/tysliu-150402/topics/my-logs,\
outputTableSpec=${PROJECT_ID}:${DATASET}.${TABLE},\
outputDeadletterTable=${PROJECT_ID}:${DATASET}.pubsub_to_bigquery_deadletter"
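
A note on the parameters: inputTopic and outputTableSpec point at the Pub/Sub topic and BigQuery table we created earlier, while any message the template cannot parse or insert is routed to outputDeadletterTable. If failures do occur, a query along these lines will show them (assuming your default project is set via gcloud config):

# Inspect dead-lettered messages, if any
bq query --use_legacy_sql=false \
"SELECT * FROM ${DATASET}.pubsub_to_bigquery_deadletter LIMIT 10"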

After execution, you should see the job running in the Dataflow console at https://console.cloud.google.com/dataflow
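
You can also check from the command line:

# List active Dataflow jobs in the region the template was launched in
gcloud dataflow jobs list --region=asia-east1 --status=active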

Testing

Sending a message to Pub/Sub

gcloud pubsub topics publish projects/tysliu-150402/topics/my-logs \
--message "{\"name\":\"bahhhh\",\"timestamp\":\"`date "+%Y-%m-%d %H:%M:%S"`\",\"qty\":123123}"

Within seconds, you should see the row appear in BigQuery.
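
You can also query the table directly to confirm the row landed (using the variables defined earlier):

# Query the destination table for the most recent rows
bq query --use_legacy_sql=false \
"SELECT name, qty, timestamp FROM ${DATASET}.${TABLE} ORDER BY timestamp DESC LIMIT 10"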

Conclusion

By using this approach to data warehousing, we do not need to worry about scaling or storage limits. This is one of the ways we handle logs from millions of players at Akatsuki. I hope this has been helpful for you.
