Plumbing Big Data Pipelines
Qubit’s Visitor Cloud receives over a billion events a day from thousands of customer websites and mobile applications all over the world. Our service level objective is to have these events validated, enriched and available for consumption by our stable of personalisation and business intelligence products within 10 seconds of reaching the perimeter of Qubit.
If we compare Qubit Visitor Cloud to the human circulatory system, the heart would be the fleet of ingestion servers that receive all the raw event data piped over the internet from various sources. These data points get written to a Google Cloud Pub/Sub topic, which is analogous to the aorta, the main artery connected to the heart. At peak, the ingestion servers write over 20,000 batches a second to this Cloud Pub/Sub topic. This forms the entry point to Qubit’s stream processing system, which uses Cloud Dataflow to validate, enrich and persist the data to our data warehouse. We break the batches into individual events after validation, which results in over 100,000 messages a second flowing downstream at peak.
After successfully building and deploying the high-speed stream processing system, one of the most common questions posed to us by other engineering teams was how they could easily tap into the data stream and build systems that react to interesting events happening in our customers’ online properties. One quick solution would have been to use the data warehouse and periodically poll for data. However, we felt that aside from being inelegant and inefficient, this approach has several major drawbacks.
- Periodic polling introduces artificial delays
If the polling interval is two minutes, for example, then reactions to some events could be delayed by as much as two minutes.
- State management becomes tedious and complex
For polling to work effectively, the system needs to checkpoint its state to avoid duplicate processing. This becomes even more complicated when we consider that data collection over the internet can introduce arbitrary delays beyond our control, and any serious data processing system needs to consider how to detect and handle late arrivals. Naive polling solutions run a very high risk of missing some data and/or processing some data points more than once.
- Resource contention and duplication of effort
A handful of polling applications might not have any noticeable impact on the data warehouse. However, as the number of applications grows, they will compete for resources, degrading the experience for both interactive and non-interactive users. Furthermore, typical data access patterns in a data warehouse involve a few “hot” fact tables. The chances are high that most of the applications will be duplicating the logic and work required to query those few core tables.
- Cost
Qubit’s main data warehouse is built on BigQuery. Queries are charged based on the number of gigabytes accessed. When the data warehouse consists of petabytes of data, even simple queries can start incurring non-trivial costs.
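To make the late-arrival problem concrete, here is a minimal sketch of a naive poller. It is purely illustrative: the `Event` fields, the in-memory `warehouse` list and the two-minute interval are assumptions made for the example, not a description of any Qubit system. The poller checkpoints on the poll time and selects rows by event time, which is a common first attempt.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    event_id: str
    event_time: int    # when the event happened (seconds, hypothetical clock)
    arrival_time: int  # when the row became visible in the warehouse


def poll(warehouse, checkpoint, now):
    """Return rows visible at `now` whose event_time lies in (checkpoint, now]."""
    return [
        e for e in warehouse
        if e.arrival_time <= now and checkpoint < e.event_time <= now
    ]


def run_poller(warehouse, interval, end):
    """Poll every `interval` seconds and advance the checkpoint to the poll time."""
    seen, checkpoint = [], 0
    for now in range(interval, end + 1, interval):
        seen.extend(e.event_id for e in poll(warehouse, checkpoint, now))
        checkpoint = now
    return seen


warehouse = [
    Event("a", event_time=30, arrival_time=31),
    Event("b", event_time=100, arrival_time=250),  # delayed in transit
    Event("c", event_time=130, arrival_time=131),
]
seen = run_poller(warehouse, interval=120, end=360)
print(seen)  # event "b" is never returned
```

Event "b" happened before the first checkpoint but only arrived after the second poll, so no polling window ever covers it. Widening each window to catch such stragglers would instead return some rows more than once, which is the duplicate-processing half of the same problem.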
The other option would have been to provide every interested team with a subscription to the main firehose. However, most teams are interested in only a handful of events. Asking them to build systems that can deal with hundreds of thousands of messages in real time, only to discard most of them, seemed wasteful and would have introduced a high barrier to entry. Furthermore, doing so would have increased our Cloud Pub/Sub operating costs as well.
After careful consideration, we decided that the best course of action would be to provide some sort of a stream router. Users should be able to specify the messages they were interested in, and the system should route all messages matching the criteria to a Cloud Pub/Sub topic of choice. This system would be connected to the main firehose on one end and many different Cloud Pub/Sub topics on the other end, similar to a “manifold” in plumbing (and automotive) parlance.
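The routing idea can be sketched in a few lines. This is only an illustration of the concept, not Manifold's implementation: the `Route` and `Manifold` classes, the topic names and the event fields are all hypothetical, and plain in-memory lists stand in for Cloud Pub/Sub topics.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class Route:
    name: str
    topic: str                       # destination topic (hypothetical name)
    matches: Callable[[dict], bool]  # predicate selecting the events of interest


class Manifold:
    """One inlet (the firehose), many outlets (one topic per route)."""

    def __init__(self, routes):
        self.routes = list(routes)
        self.topics = defaultdict(list)  # in-memory stand-in for Pub/Sub topics

    def dispatch(self, event):
        """Copy the event to every topic whose route matches; return the count."""
        delivered = 0
        for route in self.routes:
            if route.matches(event):
                self.topics[route.topic].append(event)
                delivered += 1
        return delivered


routes = [
    Route("purchases", "team-a-purchases",
          lambda e: e.get("type") == "purchase"),
    Route("big-baskets", "team-b-big-baskets",
          lambda e: e.get("type") == "purchase" and e.get("value", 0) >= 100),
]
manifold = Manifold(routes)
firehose = [
    {"type": "view", "page": "/home"},
    {"type": "purchase", "value": 25},
    {"type": "purchase", "value": 250},
]
counts = [manifold.dispatch(e) for e in firehose]
```

Each consuming team then only sees the messages its own predicate selects, and an event that matches no route is simply dropped rather than being delivered to anyone.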
Our first inclination was to build the Manifold using Cloud Dataflow itself. It was a system we were already intimately familiar with and had successfully deployed to production. However, the requirement to add, remove and update routes while the system was running, and to dynamically configure output sinks, proved to be a major problem. While we could potentially overcome the first problem by polling a data source for routing updates, dynamically adding output sinks to a Cloud Dataflow pipeline was harder to solve. One option we considered was to use the Cloud Pub/Sub API from within a DoFn to write to as many topics as we wanted. However, this felt like a brittle solution that would be harder to reason about and debug. It also didn’t quite fit into the Cloud Dataflow model of processing and violated the principle of least astonishment.
We then turned to Akka Streams, a framework that we had already used in a couple of projects with great success. Our first foray, when the Akka Streams project was still in its infancy, was to build an internal metrics reporting tool for a large HBase cluster. The APIs and the specifications changed significantly leading up to the 1.0 release, and we used it successfully again to glue our old data processing pipeline to the new one while we were in the process of migrating to Google Cloud. We were extremely impressed by how an Akka Streams application running on a single machine was able to effortlessly copy our Kafka firehose to Cloud Pub/Sub in real time. Since it had already proven itself as a reliable workhorse, we had no doubts about its suitability for the Manifold system.
The implementation was not without challenges though. The first hurdle was the lack of Cloud Pub/Sub Sources and Sinks for Akka. At the time we started working on the project, Google had just published the gRPC API definitions for Cloud Pub/Sub. As all of our backend systems were already using gRPC, it was a natural choice to implement the Akka Streams sinks and sources in gRPC as well. We have used this implementation in production for more than a year to process hundreds of billions of events that amount to hundreds of terabytes of data by volume. The Sinks and Sources are now open-source and can be obtained from the Git repository at https://github.com/QubitProducts/akka-cloudpubsub. (We will soon release an update that takes advantage of the recently introduced streaming pull feature of Cloud Pub/Sub.)
The second hurdle was the same one we had faced when considering Cloud Dataflow. As Akka Streams graphs are materialised in advance and then executed, how could we make the system write to Cloud Pub/Sub topics that are unknown at materialisation time? It seemed like an insurmountable problem, and the only solution appeared to be re-materialising and relaunching the stream graph on every change. Then we stumbled onto an undocumented feature of Akka Streams called “stage actors”. Stage actors are a mechanism by which code can safely interact with the actors running the stream stages of the materialised graph. By interacting with the stage actor, we can dynamically alter the stage processing logic at runtime, allowing updates to routing specifications to take effect immediately without re-materialising the stream graph.
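The underlying idea, a long-running stage that accepts control messages through a mailbox and applies them on its own processing path, can be sketched outside Akka. The following is a loose analogy only: it does not use or reproduce the Akka Streams API, and the `RouterStage` class, its `tell` method and the command names are hypothetical.

```python
import queue


class RouterStage:
    """A long-lived stage whose routing table can change while it runs."""

    def __init__(self):
        self._routes = {}              # route name -> (predicate, topic)
        self._control = queue.Queue()  # mailbox for routing updates

    def tell(self, command, name, predicate=None, topic=None):
        """Called from outside the stage; only enqueues, never touches _routes."""
        self._control.put((command, name, predicate, topic))

    def _drain_control(self):
        """Apply all pending updates; runs on the stage's own processing path."""
        while True:
            try:
                command, name, predicate, topic = self._control.get_nowait()
            except queue.Empty:
                return
            if command == "upsert":
                self._routes[name] = (predicate, topic)
            elif command == "remove":
                self._routes.pop(name, None)

    def on_element(self, event):
        """Called once per stream element; returns the topics to publish to."""
        self._drain_control()
        return [topic for predicate, topic in self._routes.values()
                if predicate(event)]


stage = RouterStage()
event = {"type": "purchase"}
before = stage.on_element(event)  # no routes registered yet
stage.tell("upsert", "purchases",
           lambda e: e["type"] == "purchase", "purchases-topic")
after_add = stage.on_element(event)
stage.tell("remove", "purchases")
after_remove = stage.on_element(event)
```

Because the routing table is only ever mutated from inside `on_element`, callers on other threads never race with element processing, and the stage itself is never torn down or rebuilt when routes change.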
After solving the fundamental issue of dynamic routing, our next concern was scaling the system to deal with ever-increasing traffic volumes in the main firehose. We addressed this by making Manifold a clustered application using Akka Cluster. The advantage of this approach is that nodes can be added to or removed from the system with almost no effort. Newly launched nodes automatically join the cluster, obtain the current routing specifications from the coordinator and start running the stream graph. Updates are communicated by the coordinator to all nodes through the Distributed Publish Subscribe mechanism provided by Akka Cluster. Thanks to Akka’s persistent actor mechanism, the coordinator does not become a single point of failure either. If the current coordinator becomes unresponsive, another coordinator will automatically take over with no data loss, as the state is checkpointed to persistent storage on every update.
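The failover property rests on persisting every update before it is applied. A minimal sketch of that pattern follows, assuming an append-only journal file as the persistent storage. Nothing here reflects Akka's persistence API or Manifold's actual storage; the `Coordinator` class, the journal format and the operation names are hypothetical.

```python
import json
import os
import tempfile


class Coordinator:
    """Keeps the routing table and journals every change before applying it."""

    def __init__(self, journal_path):
        self.journal_path = journal_path
        self.routes = {}  # route name -> topic
        self._recover()

    def _recover(self):
        """Rebuild state by replaying the journal left by a previous coordinator."""
        if not os.path.exists(self.journal_path):
            return
        with open(self.journal_path) as journal:
            for line in journal:
                self._apply(json.loads(line))

    def _apply(self, update):
        if update["op"] == "upsert":
            self.routes[update["name"]] = update["topic"]
        elif update["op"] == "remove":
            self.routes.pop(update["name"], None)

    def update(self, op, name, topic=None):
        """Persist the change first, then apply it to the in-memory state."""
        entry = {"op": op, "name": name, "topic": topic}
        with open(self.journal_path, "a") as journal:
            journal.write(json.dumps(entry) + "\n")
            journal.flush()
            os.fsync(journal.fileno())
        self._apply(entry)


journal_path = os.path.join(tempfile.mkdtemp(), "routes.journal")
primary = Coordinator(journal_path)
primary.update("upsert", "purchases", "purchases-topic")
primary.update("upsert", "views", "views-topic")
primary.update("remove", "views")
del primary  # simulate the coordinator becoming unresponsive

standby = Coordinator(journal_path)  # the replacement replays the journal
```

Since the journal is written and synced before the in-memory table changes, a replacement coordinator that replays it ends up with every update the previous one had acknowledged.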
Users are provided with a CLI application that allows them to submit routing specifications written using a YAML-based DSL to the gRPC controller that runs alongside the cluster. The controller validates the incoming requests, checks permissions on relevant Cloud Pub/Sub topics (creating them if necessary), converts the specification to a Protobuf-based binary representation and submits the state change to the cluster coordinator. The cluster coordinator then publishes the change to the cluster members.
Manifold is deployed on a managed Kubernetes cluster on Google Container Engine, which allows us to easily scale the system as necessary. It was surprisingly easy to plug Kubernetes service discovery primitives into Akka Cluster so that pods can join and leave the cluster effortlessly. The Kubernetes cluster makes use of the auto-scaling functionality provided by Google Container Engine to automatically expand and shrink the cluster according to demand. This has become particularly useful in handling sudden traffic spikes resulting from flash sales and social media campaigns run by our customers to drive traffic to their websites.
Manifold has been in production use at Qubit for over a year and has enabled us to quickly prototype and build new products that take full advantage of the near real-time availability of data from our data processing pipeline. (Visitor Pulse and Adaptive Targeting are two examples of products that rely on Manifold to tap into the filtered event stream.)
If you relish these sorts of engineering challenges, we have plenty more in store. Check out Qubit careers for more information.