I’ve been thinking about writing a technical blog post since I joined Traveloka about 4 months ago. However, so many things were going on, from work stuff to commencement stuff (YES! I finally graduated and became a #CMUGrad), that I barely had enough time to work on a small POC and write a few things about it.
This work was actually inspired by the Big Data AWS training I attended a couple of months ago with AWS Solution Architects in the Traveloka Jakarta office. In that training, they proudly told us that they now have a full set of serverless solutions for Big Data. It includes AWS Kinesis Firehose (AWS messaging service solution), AWS Glue (AWS serverless batch processing solution), AWS S3 (cloud storage), AWS Kinesis Analytics (AWS realtime streaming SQL solution) and their new product named AWS Athena, used for running ad hoc queries on data stored in AWS S3. Go to this link to read the complete explanation about those things. Then I thought, why not do the same thing in GCP?
First things first. If you are not familiar with Lambda Architecture, you might want to read some articles on the internet about it. Here are some key concepts cited from http://lambda-architecture.net/ about Lambda Architecture:
- All data entering the system is dispatched to both the batch layer and the speed layer for processing.
- The batch layer has two functions: (i) managing the master dataset (an immutable, append-only set of raw data), and (ii) to pre-compute the batch views.
- The serving layer indexes the batch views so that they can be queried in low-latency, ad-hoc way.
- The speed layer compensates for the high latency of updates to the serving layer and deals with recent data only.
- Any incoming query can be answered by merging results from batch views and real-time views.
I started this project by looking for public streaming data available on the internet and picking one simple user story. I settled on the Meetup.com RSVP streaming API. Fortunately, meetup.com provides a free public streaming API that we can use to get all RSVPs made worldwide. This should be enough for us because we just want to build a speed layer and a batch layer that consume this data. The user story I chose is:
As a user, given a date range, I should be able to get the number of RSVPs created in every 15-minute window.
You must be asking why every 15 minutes, when we could just put the data somewhere, run a query and get the count for every minute. Not so fast. This is just to simplify the explanation that follows.
Technically, in Lambda Architecture, the speed layer and the batch layer exist so that they can provide (near) realtime analytics to business decision makers or business analysts. In the real world, running a batch job is expensive in terms of both money and time. On the other hand, business stakeholders simply can’t wait until the next batch job runs on the cluster to get the current data. However, it is worth noting that batch processing should be the source of the most accurate data a company or organization can have. What can we do to solve this dilemma?
The streaming layer, or speed layer, comes to the rescue. The speed layer provides us with estimated data in a (near) realtime manner. Yes! It is estimated! It’s really hard to get accurate data from the speed layer alone. On the other hand, the speed layer can easily provide the user with current data. So, what can we do to get realtime data? The answer is simply to combine the data from the batch job with the data from the realtime streaming job. Yes, we need to accept the trade-off.
Let me give you an example. Assume that it is now 8 AM and the CEO of meetup.com wants to know the number of RSVPs made since midnight yesterday up until now. However, the last batch job ran at 12 AM last night, so we clearly don’t have accurate data from 12 AM until 8 AM. In this case, we need to combine the accurate data from the last batch job with the estimated data from the streaming job that has been running from 12 AM until 8 AM. Once we run the next batch job and get the accurate data for today, we can simply overwrite the results written by the streaming layer in the serving layer. The idea is simple, right? Don’t forget that we sacrifice a bit of accuracy in this case, but to save costs and make faster data-driven decisions, it is a worthwhile investment, IMHO.
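To make the 8 AM scenario a bit more concrete, here is a minimal sketch in plain Java. It is not code from my repository: the class name ServingView, its methods, the "yyyyMMdd HHmm" window keys and all the counts are made up for illustration, and an in-memory map stands in for the serving layer.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy serving view: one RSVP count per 15-minute window, keyed by
// "yyyyMMdd HHmm". Class and method names are hypothetical.
class ServingView {
    private final NavigableMap<String, Long> counts = new TreeMap<>();

    // Speed layer: writes an estimated count for a window.
    void writeEstimate(String window, long estimate) {
        counts.put(window, estimate);
    }

    // Batch layer: overwrites the estimate with the recomputed count.
    void writeExact(String window, long exact) {
        counts.put(window, exact);
    }

    // Sums every window in [from, to), whichever layer wrote it last.
    long total(String from, String to) {
        return counts.subMap(from, true, to, false)
                .values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        ServingView view = new ServingView();
        view.writeExact("20170601 2345", 40);    // from last night's batch run
        view.writeEstimate("20170602 0000", 12); // streamed since midnight
        view.writeEstimate("20170602 0015", 9);

        // 8 AM query: accurate batch data plus estimated streaming data.
        System.out.println(view.total("20170601 0000", "20170602 0800")); // prints 61

        // The next batch run corrects one of the estimates.
        view.writeExact("20170602 0000", 13);
        System.out.println(view.total("20170601 0000", "20170602 0800")); // prints 62
    }
}
```

The query never needs to know which layer produced a value. It just reads whatever is currently stored for each window.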
I know that you have been waiting for the buzz words. Here we go.
As visually described in the diagram above, we can break the components down into several parts:
- RSVP Stream Producer
This is a simple Java application I wrote to pull data from the meetup.com RSVP streaming API and push it to GCP Pubsub. The application runs in a Kubernetes Pod deployed on Google Container Engine. For more detail, check it out on my GitHub in event-delivery/MeetupRSVPPublisher.java.
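The core loop of such a producer can be sketched roughly as follows. This is not the actual MeetupRSVPPublisher code. I am assuming the stream delivers one JSON document per line, the class RsvpForwarder is hypothetical, and a plain Consumer<String> stands in for the real Pubsub publisher so the sketch runs without any GCP dependency.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical forwarder: reads a line-delimited stream and hands each
// message to a sink. In a real producer the sink would publish to Pubsub.
class RsvpForwarder {

    // Returns the number of messages handed to the sink.
    static int forward(Reader source, Consumer<String> sink) {
        int forwarded = 0;
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // skip blank lines between messages
                }
                sink.accept(line);
                forwarded++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return forwarded;
    }

    public static void main(String[] args) {
        // Made-up payloads standing in for the live HTTP stream.
        String fakeStream = "{\"rsvp_id\":1}\n\n{\"rsvp_id\":2}\n";
        List<String> published = new ArrayList<>();
        int n = forward(new StringReader(fakeStream), published::add);
        System.out.println(n + " messages forwarded: " + published);
    }
}
```

Keeping the sink behind a small interface also makes the producer easy to test locally before pointing it at a real topic.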
- Cloud Pubsub
Google Cloud Pubsub is a centralized messaging system like Apache Kafka, RabbitMQ, etc. If you are familiar with the Topic and Consumer Group concepts in Apache Kafka, it will be easier for you to understand the concepts in Cloud Pubsub. It has Topics (equivalent to Kafka’s Topics) and Subscriptions (roughly equivalent to Kafka’s Consumer Groups).
In Cloud Pubsub, every consumer pulls messages from a subscription instead of reading directly from a topic partition as in Kafka. Once the subscription is created, messages published to the topic start flowing into it and are ready to be consumed by the consumers attached to that subscription.
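If the topic and subscription relationship is still fuzzy, this toy in-memory model may help. It is only an illustration of the concept, not the Cloud Pubsub client API: the class ToyTopic and the subscription names are made up, and real Pubsub details such as acknowledgements and redelivery are left out.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy model of one topic with named subscriptions (hypothetical class).
class ToyTopic {
    private final Map<String, Queue<String>> subscriptions = new HashMap<>();

    void createSubscription(String name) {
        subscriptions.putIfAbsent(name, new ArrayDeque<>());
    }

    // Every subscription that exists at publish time gets its own copy.
    void publish(String message) {
        for (Queue<String> backlog : subscriptions.values()) {
            backlog.add(message);
        }
    }

    // Consumers pull from a subscription, never from the topic itself.
    // Returns null when the subscription is unknown or has no backlog.
    String pull(String subscription) {
        Queue<String> backlog = subscriptions.get(subscription);
        return backlog == null ? null : backlog.poll();
    }

    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        topic.publish("early");             // no subscription yet, so nobody gets it
        topic.createSubscription("speed");
        topic.createSubscription("archive");
        topic.publish("rsvp-1");

        System.out.println(topic.pull("speed"));   // prints rsvp-1
        System.out.println(topic.pull("archive")); // prints rsvp-1
        System.out.println(topic.pull("speed"));   // prints null
    }
}
```

Two consumers pulling from the same subscription would share its backlog, which is the part that resembles a Kafka Consumer Group.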
Strictly speaking, the Pubsub layer is not part of Lambda Architecture. However, it plays a very important role in keeping the picture clear and the architecture scalable.
From the data above, it is easy to see that the rate of published and consumed messages is not that high. Pubsub received only about 3 messages per second and about 3 pull operations per second.
- Speed Layer
Cloud Dataflow is used as the streaming engine in our implementation of the speed layer. The speed layer has two responsibilities in our use case. The first is to write the data pulled from Pubsub to Google Cloud Storage using TextIO, so that the batch layer can consume the data later and run batch processing on top of it. The second is to count the number of RSVPs that arrive in the system in every 15-minute window. Once it has the count, it stores the result in Cloud Bigtable, Google’s NoSQL technology, using BigtableIO. We could also dump the data to Google BigQuery using BigQueryIO, but we don’t really need it in this use case.
You can go to streamprocessor/RSVPStreamPipeline.java to see what is happening. :D
The DAG is pretty simple. First, rsvpParser deserializes the String delivered by Pubsub into a Java object. Then, every parsed object is grouped into a 15-minute fixed window in rsvpGroupWindow. To group the RSVPs, I use rsvpGroupByKey to give every RSVP a key representing the time window of its arrival timestamp. Then, to aggregate the number of RSVPs within the same fixed window, I use rsvpReducer to simply accumulate the count. Finally, the result is transformed into an HBase Put object and stored in Cloud Bigtable using the BigtableIO plugin.
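Stripped of Dataflow, the windowing and counting steps boil down to something like the sketch below. This is plain Java, not the pipeline code from the repository: the class FixedWindowCount and its methods are hypothetical, timestamps are plain epoch milliseconds, and I am assuming windows aligned to the epoch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the window + group-by-key + reduce steps.
class FixedWindowCount {
    static final long WINDOW_MILLIS = 15 * 60 * 1000L;

    // Start (epoch millis) of the 15-minute window containing the timestamp.
    static long windowStart(long epochMillis) {
        return Math.floorDiv(epochMillis, WINDOW_MILLIS) * WINDOW_MILLIS;
    }

    // Keys every timestamp by its window start and accumulates a count.
    static Map<Long, Long> countPerWindow(List<Long> timestamps) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestamps) {
            counts.merge(windowStart(ts), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Four made-up arrival timestamps: two per window.
        List<Long> arrivals = Arrays.asList(0L, 899_999L, 900_000L, 1_000_000L);
        System.out.println(countPerWindow(arrivals)); // prints {0=2, 900000=2}
    }
}
```

In the real pipeline the same idea is spread over separate transforms, and the runner takes care of when a window is considered complete.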
- Batch Processing
The Dataflow job for batch processing is not that different from the one for stream processing. The only difference is the data source, which is Google Cloud Storage. Other than that, batch processing, as we know, is not a long-running process. It runs only once per period of time, for instance once a day, once a week or even once a month.
To get the full picture of the code, you can take a look at streamprocessor/RSVPBatchPipeline.java.
- Serving Layer
For the serving layer, we simply decided to use Bigtable, the NoSQL technology provided by GCP.
In designing a NoSQL schema, we need to think about how we are going to query the data. Since we want to query the data by date, we can simply use the date in yyyyMMdd format as the partition key of the table. To get the data for every 15 minutes, we can create a column family called count containing many columns, one for each 15-minute count. The way I do it is by using the strings 0000, 0015, 0030, 0045, 0100 and so on as column names representing the 15-minute time windows. This schema design gives us additional benefits:
- If we want the total RSVPs in a single day, we can simply iterate over all of the columns of one partition on the fly. This is cheap.
- If we want the total RSVPs in a time range within a day, we can iterate over just the specific columns we need.
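Here is a small sketch of how a timestamp could map onto this schema and how the two kinds of totals fall out of it. It is an illustration only: the class RsvpSchema is hypothetical, using UTC is my assumption, and a sorted in-memory map stands in for one Bigtable row.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical helper for the yyyyMMdd row key / HHmm column layout.
class RsvpSchema {
    private static final DateTimeFormatter ROW_KEY = DateTimeFormatter.ofPattern("yyyyMMdd");
    private static final DateTimeFormatter QUALIFIER = DateTimeFormatter.ofPattern("HHmm");

    // Rounds a timestamp down to the start of its 15-minute window (UTC assumed).
    static ZonedDateTime windowStart(long epochMillis) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        return t.withMinute(t.getMinute() / 15 * 15).withSecond(0).withNano(0);
    }

    static String rowKey(long epochMillis) {
        return windowStart(epochMillis).format(ROW_KEY);
    }

    static String qualifier(long epochMillis) {
        return windowStart(epochMillis).format(QUALIFIER);
    }

    // Sums the columns of one row between two qualifiers, both inclusive.
    static long sum(NavigableMap<String, Long> row, String from, String to) {
        return row.subMap(from, true, to, true)
                .values().stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        long ts = 4_050_000L; // 1970-01-01T01:07:30Z
        System.out.println(rowKey(ts) + " / count:" + qualifier(ts)); // prints 19700101 / count:0100

        NavigableMap<String, Long> row = new TreeMap<>(); // made-up counts for one day
        row.put("0000", 5L);
        row.put("0015", 7L);
        row.put("0030", 3L);
        row.put("0100", 4L);
        System.out.println(sum(row, "0000", "2345")); // whole day, prints 19
        System.out.println(sum(row, "0015", "0030")); // time range, prints 10
    }
}
```

Because HHmm strings sort in the same order as the times they represent, a range of time within a day is just a range of column names.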
Both the batch layer and the speed layer write to the same partition, column family and column name. The speed layer writes the estimated count and the batch layer later overwrites it with the corrected count.
Once the data is stored in Bigtable, other applications, such as a backend RESTful API, can read the data and expose it to the outside world.
All the work mentioned in this blog post is available in my GitHub repository. Feel free to take a look, submit issues or even submit a Pull Request for any kind of improvement.