Building a Scalable Data Pipeline
A good data pipeline is one that you don’t have to think about very often. Downstream consumers notice even a small failure in data delivery immediately, so it’s important to build a system that delivers events quickly and accurately.
When building our data pipeline, we kept the following two requirements in mind:
1. Allow for many downstream consumers of data. We have several different data systems in place past the ingestion pathway, and all of them need to receive data sent through the pipeline:
- HBase, which serves our results page in real time
- A Hadoop data mining cluster, which we use to ingest data into Druid and also to run batch analysis jobs
- Amazon S3, which we use for backups in the event of failure of either of the other 2 systems
- Potential future systems
2. Ability to build streaming applications. At Optimizely, we are striving to build a strong internal shared data infrastructure to run computations in real time.
Initially, we were using Apache Flume to ingest data from our application servers into HBase and S3. While this worked reasonably well for some time, there were some major operational issues we encountered using Flume to ingest directly into the downstream systems.
- Flume required us to use one channel per downstream sink. As a result, we were sending 3 copies of every event through Flume: one for S3 and one for each of the two HBase clusters we had at the time. Furthermore, there was no guarantee that all 3 copies would end up on different hosts, so a single host going down could cause us to lose events entirely.
- We used Flume’s file channels for buffering events between our log servers and HBase/S3. While these proved to be very handy in the event of temporary process failures, they did not provide us much buffer in the event of extended failure of a downstream system. Furthermore, any exceptions or errors thrown in Flume would stop other events from being processed. As a result, we would have to rush frantically to fix the Flume issue before upstream systems started feeling back pressure.
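The buffering limit described above can be illustrated with a toy model. This is only a sketch and not Flume code: `BoundedChannel`, `simulate`, and the capacity numbers are hypothetical stand-ins for a file channel with finite space. It shows that once a downstream outage lasts longer than the buffer can absorb, the source is forced to hold events back.

```python
from collections import deque


class BoundedChannel:
    """Toy stand-in for a finite buffer between a source and a sink.

    Hypothetical class for illustration; this is not Flume's API.
    """

    def __init__(self, capacity):
        self.capacity = capacity
        self.events = deque()

    def put(self, event):
        """Return True if the event was buffered, False if the channel is full."""
        if len(self.events) >= self.capacity:
            return False  # back pressure: the caller has to hold the event
        self.events.append(event)
        return True

    def drain(self, sink_is_up):
        """Hand all buffered events to the sink if it is reachable."""
        if not sink_is_up:
            return []
        delivered = list(self.events)
        self.events.clear()
        return delivered


def simulate(capacity, outage_ticks, events_per_tick):
    """Return (events still buffered, events rejected) after a sink outage."""
    channel = BoundedChannel(capacity)
    rejected = 0
    for tick in range(outage_ticks):
        for i in range(events_per_tick):
            if not channel.put((tick, i)):
                rejected += 1
        channel.drain(sink_is_up=False)  # the sink stays down for the whole run
    return len(channel.events), rejected


# A buffer worth 6 ticks of traffic facing a 10-tick outage.
buffered, rejected = simulate(capacity=6, outage_ticks=10, events_per_tick=1)
```

With these made-up numbers, the first 6 events are buffered and the remaining 4 are pushed back to the source, which is the kind of back pressure we had to race against.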
In order to avoid data duplication and provide a more reliable source of truth for our data streams, we decided to go with Kafka as a message bus between our log servers and our downstream data systems. Using Kafka has provided us with immense benefits, particularly the following:
- We no longer send 3 copies of every event over the network. A single copy goes into Kafka, which reduces our network traffic.
- Kafka provides us a single source of truth for our event stream. Previously, it was difficult to debug cases where events appeared in S3 but not in HBase, or vice versa.
- It is much easier to recover from downstream failures, as Kafka retains 7 days of data by default. With Flume file channels we had about 6 hours of data retention at a maximum.
- Having data in Kafka has opened up a wide variety of applications, including stream processing using Samza. For instance, one application we have is calculating the top experiments on Optimizely by traffic in real time using Samza on top of our event stream. Furthermore, this greatly aids our high level goals of giving our engineering teams access to our raw data streams and enabling them to turn data into action.
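To give a feel for the "top experiments by traffic" application mentioned above, here is a minimal sketch of the counting logic in plain Python. It is not Samza code, and the `TopExperiments` class and the `experiment_id` field name are assumptions made for illustration; a real streaming job would also need windowing and state that survives restarts.

```python
from collections import Counter


class TopExperiments:
    """Running event counts per experiment (toy model, not Samza's API)."""

    def __init__(self):
        self.counts = Counter()

    def process(self, event):
        # "experiment_id" is an assumed field name for illustration.
        self.counts[event["experiment_id"]] += 1

    def top(self, k):
        """Return the k busiest experiments as (experiment_id, count) pairs."""
        return self.counts.most_common(k)


tracker = TopExperiments()
for exp in ["a", "b", "a", "c", "a", "b"]:
    tracker.process({"experiment_id": exp})

leaders = tracker.top(2)  # [("a", 3), ("b", 2)]
```

The tracker is updated one event at a time, so the current leaders can be read at any point while the stream is still flowing.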
While Kafka has provided us with accessibility and reliability for our data, we decided to retain Flume in our pipeline, inspired by this blog post on Cloudera’s page. Using “Flafka” instead of Kafka alone enabled us to migrate more quickly than expected and reuse several components already in our codebase. In particular, Flume’s existing integration with both HDFS and HBase allowed us to seamlessly transport data from Kafka into those systems.
Migrating our infrastructure over from Flume to Flafka was a tricky problem at first. We decided to coordinate this migration task with a separate effort to modify our HBase schema and make it more compact. One great blessing during the migration process was that both HBase schemas were idempotent. That way, we would be able to run both the old ingest pipeline (without Kafka) and the new one (with Kafka) simultaneously. Thanks to idempotence, we lost no events and incurred no downtime as a result.
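Why idempotence makes a dual-running migration safe can be shown with a small sketch. The row key layout and field names below (`experiment_id`, `event_id`, `variation`) are assumptions, since this post does not describe the actual HBase schemas. The sketch also assumes that both pipelines write the same cell for the same event.

```python
def row_key(event):
    """Build a deterministic key so the same event always lands in the same cell."""
    return f'{event["experiment_id"]}:{event["event_id"]}'


def idempotent_put(table, event):
    """Writing the same event twice leaves the table unchanged."""
    table[row_key(event)] = event["variation"]


def counter_increment(counters, event):
    """Counter-style write, shown for contrast: duplicates inflate the total."""
    key = event["experiment_id"]
    counters[key] = counters.get(key, 0) + 1


events = [
    {"experiment_id": "exp1", "event_id": "e1", "variation": "A"},
    {"experiment_id": "exp1", "event_id": "e2", "variation": "B"},
]

# One pipeline delivering each event once.
single_run = {}
for e in events:
    idempotent_put(single_run, e)

# Old and new pipelines both delivering every event.
dual_run = {}
double_counted = {}
for pipeline in ("old", "new"):
    for e in events:
        idempotent_put(dual_run, e)
        counter_increment(double_counted, e)
```

Here `dual_run` ends up identical to `single_run`, while `double_counted` reports 4 events for `exp1` instead of 2. With keyed writes, running both pipelines at once costs extra work but does not corrupt the data.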
Moving to the Kafka-based ingest path had other positive effects as well. We run two HBase clusters in parallel called Prod and Loki (don’t ask me why we didn’t name the first one Thor). Running 2 clusters has been very convenient for us when we need to do routine maintenance. Previously, we used two separate Flume channels to write data to Prod and Loki, so it was possible that events would appear in one cluster, but not in the other. Now, we have two separate Flume clusters to write to Prod and Loki, but they both consume the same Kafka topic. Therefore, it is easy for us to keep the two clusters synced, since we have Kafka as a single source of truth for both.
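The reason a shared topic keeps the two clusters in sync can be sketched with a toy log that tracks one read position per consumer group. The `Topic` class below is a simplified in-memory model, not the Kafka client API, and the group names are only labels chosen for this example.

```python
class Topic:
    """Append-only log with an independent read offset per consumer group."""

    def __init__(self):
        self.log = []
        self.offsets = {}

    def append(self, event):
        self.log.append(event)

    def poll(self, group):
        """Return everything the group has not read yet and advance its offset."""
        start = self.offsets.get(group, 0)
        batch = self.log[start:]
        self.offsets[group] = len(self.log)
        return batch


topic = Topic()
prod_store, loki_store = [], []

for n in range(3):
    topic.append(f"event-{n}")
prod_store.extend(topic.poll("prod"))
# The "loki" writer is down for maintenance here and does not poll.

for n in range(3, 5):
    topic.append(f"event-{n}")
prod_store.extend(topic.poll("prod"))

# The "loki" writer comes back and catches up from its own offset.
loki_store.extend(topic.poll("loki"))
```

Both stores end up with the same five events in the same order, even though one consumer was offline for part of the run. Each group simply resumes from where it left off in the shared log.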
We monitor all our backend systems using a combination of Nagios for alerting and OpenTSDB for monitoring and metrics collection. In order to gather metrics, we run tcollector on our Flume and Kafka hosts. tcollector picks up Flume metrics via the HTTP JSON endpoint and Kafka metrics via JMX. We wrote a few custom tcollector modules to pick up metrics like Kafka consumer lag and augment the existing collectors.
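As a rough idea of what a consumer lag collector computes, the sketch below derives lag per partition and formats it in the line-oriented `metric timestamp value tag=value` form that tcollector collectors print to standard output. The metric name, tag names, and offset values are made up for illustration, and how the offsets are fetched from Kafka is deliberately left out.

```python
import time


def consumer_lag(log_end_offsets, committed_offsets):
    """Lag per partition: newest offset in the log minus the consumer's committed offset."""
    return {
        partition: max(end - committed_offsets.get(partition, 0), 0)
        for partition, end in log_end_offsets.items()
    }


def format_lines(metric, lags, tags, now=None):
    """Render one 'metric timestamp value tags' line per partition."""
    ts = int(time.time()) if now is None else now
    tag_str = " ".join(f"{k}={v}" for k, v in sorted(tags.items()))
    return [
        f"{metric} {ts} {lag} partition={partition} {tag_str}".rstrip()
        for partition, lag in sorted(lags.items())
    ]


# Hypothetical offsets for a two-partition topic.
lags = consumer_lag({0: 100, 1: 250}, {0: 90, 1: 250})
lines = format_lines(
    "kafka.consumer.lag",  # assumed metric name
    lags,
    {"topic": "events", "group": "hbase-prod"},  # assumed tag values
    now=1400000000,
)
for line in lines:
    print(line)
```

A lag that keeps growing on any partition is a signal that a downstream writer has stalled or cannot keep up with incoming traffic.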
We need to be sure that the deployment process for Flafka remains relatively painless, just in case we need to update our libraries. For deployments and configuration management, we utilize Cloudera Manager heavily. Using Cloudera Manager has enabled us to spin up Flume and Kafka clusters very quickly and get the project moving without much initial overhead. For automatic provisioning and host setup, we use Chef heavily in our stack. When an instance boots up in AWS, we assign it a tag that corresponds to the Chef role that the instance needs to run once it’s bootstrapped with Chef. The Chef role also handles automatically registering instances into Cloudera Manager and starting up the necessary service; thus, we can have hosts running Flume or Kafka automatically within about 15 minutes of us bringing them up in AWS. This functionality has been a massive benefit for us in scaling our stacks up to meet traffic.
Our current setup still has a few rough edges:
- We don’t have Flume or Kafka hooked up to a cluster manager like Mesos; as a result, it’s hard to scale out our cluster in case of emergency. We mainly rely on AWS autoscaling groups for our scaling needs. We might use Mesos with Kafka eventually, but so far our cluster has been sufficiently large to handle the scale of traffic we process (and was even able to handle a 3x traffic spike a few days ago).
- We would have difficulty handling any network partition that might render Kafka inaccessible.
- We use Flume’s failover sink functionality to try to get around this by dumping to S3 whenever Kafka is offline. While that would prevent any data loss, it does mean that downstream consumers will not receive any data until we restore Kafka and read the data back in from S3.
- Our Cloudera Manager auto registration has been a bit flaky; we have occasionally run into issues where it marks a healthy host as dead.
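The failover trade-off described in the list above can be sketched in a few lines. This is a toy model rather than Flume configuration: `ListSink`, `write_with_failover`, and `replay` are hypothetical names, and the two in-memory sinks merely stand in for Kafka and S3.

```python
class Unavailable(Exception):
    """Raised by a sink that cannot accept writes right now."""


class ListSink:
    """In-memory stand-in for a real sink (hypothetical, for illustration)."""

    def __init__(self):
        self.events = []
        self.online = True

    def write(self, event):
        if not self.online:
            raise Unavailable()
        self.events.append(event)


def write_with_failover(event, primary, backup):
    """Try the primary sink first; park the event in the backup if it is down."""
    try:
        primary.write(event)
        return "primary"
    except Unavailable:
        backup.write(event)
        return "backup"


def replay(backup, primary):
    """Move everything parked in the backup into the recovered primary."""
    replayed = 0
    while backup.events:
        primary.write(backup.events[0])  # raises if the primary is still down
        backup.events.pop(0)             # only drop the event after a successful write
        replayed += 1
    return replayed


kafka, s3 = ListSink(), ListSink()
write_with_failover("e1", kafka, s3)

kafka.online = False                      # simulated outage
write_with_failover("e2", kafka, s3)
write_with_failover("e3", kafka, s3)
visible_during_outage = list(kafka.events)

kafka.online = True                       # outage over
replayed = replay(s3, kafka)
```

During the simulated outage only `e1` is visible to anything reading from the primary. The other two events are safe in the backup, but they reach consumers only after the replay step, which mirrors the delay we would see in practice.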
If you liked this blog post, we’re hiring.