Data Ingestion and Processing at Intuit
Intuit provides a variety of financial products for tax (TurboTax), personal finance (Mint), and small business accounting (QuickBooks). The company’s mission is to power prosperity around the world. Achieving this goal involves many dimensions; here, we focus on one of the most important: data. The biggest challenge is ingesting data from a wide range of products and applications, arriving in different formats, and processing it in a timely manner. For ingestion, we want to capture data reliably, quickly, and securely. For processing, there are two broad categories: batch use cases and streaming use cases. Irrespective of the processing mode, each has a defined SLA (Service Level Agreement) that has to be met consistently. Batch processing is typically used for analytics and reporting, while stream processing supports near real-time use cases. Let’s look in detail at how this is done today and at our plans for the future.

Ingestion
The goal of the ingestion system is to be a common pipeline that applications use to get their data into Intuit Analytics Cloud. The system is designed to make ingestion simple and easy to integrate with.
The ingestion pipeline can be broadly divided into three components:
- A ReST layer — Jetty
- A message queue layer — Kafka
- A persistence layer — HDFS (and S3)
ReST Layer
We use a ReSTful web application that runs on a web server powered by Jetty. Most of the traffic that the pipeline deals with is generated from client browsers and mobile devices. A majority of this traffic is clickstream.
The system was designed to handle small messages under 5KB. Applications integrate with the pipeline using SDKs available for both web and mobile platforms.
When the system was designed, the ability to scale horizontally was a basic requirement. On AWS, we leverage Auto Scaling groups to accomplish this. All the servers in the web layer are stateless and can mark themselves in and out of service. The instances reside behind an Elastic Load Balancer (ELB) that distributes traffic among the available servers. Because the traffic is highly seasonal yet very predictable, we can scale out a few days before we expect high traffic and scale in after it subsides.
This layer is responsible for accepting requests from clients and then publishing the request payloads to Kafka. Messages can be produced either synchronously or asynchronously. We chose the asynchronous method as it yields much higher throughput. However, asynchronous producing has an inherent disadvantage: the client will not know if a message fails to reach the queue. This is a necessary tradeoff to ensure we can handle the kind of traffic we expect during peak season.
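As an illustration of that fire-and-forget path, here is a minimal Scala sketch of an asynchronous Kafka producer; the broker endpoint, topic, and error handling are hypothetical stand-ins, not the pipeline's actual configuration.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

object ClickstreamPublisher {
  private val props = new Properties()
  // Hypothetical ELB DNS name fronting the brokers (see "Cluster configuration" below).
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-elb.example.internal:9092")
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  // Batch small (<5 KB) messages briefly before sending, for throughput.
  props.put(ProducerConfig.LINGER_MS_CONFIG, "5")

  private val producer = new KafkaProducer[String, String](props)

  // send() returns immediately; by the time the callback fires, the HTTP request
  // has already been acknowledged, so a failed publish can only be logged or alerted on.
  def publish(topic: String, payload: String): Unit =
    producer.send(new ProducerRecord[String, String](topic, payload), new Callback {
      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
        if (exception != null)
          System.err.println(s"Failed to publish to $topic: ${exception.getMessage}")
    })
}
```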
The layer is designed to be highly available. We accomplish this by distributing the servers that are behind the ELB, across 3 availability zones. This, combined with the ability to scale horizontally, ensures that even in the case of a zone failure, the system can still operate at full capacity.
Messaging Queue Layer
At the heart of the ingestion pipeline is the distributed, high-throughput Kafka messaging layer.
This layer lets us ingest large amounts of data with low latency. The web layer produces messages to it in small batches in asynchronous mode. Each client posts to a specific endpoint, which maps to a topic in Kafka. Topics are usually partitioned across multiple nodes, called brokers. The partitioning gives us parallelism, resulting in greater throughput. Further, the topics are replicated across brokers, ensuring that data is not lost if a broker fails. Kafka is built as a fault-tolerant system, and most of the time it is easy to recover from a few broker failures.
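For context, the partition count and replication factor are set per topic when it is created. The sketch below, with a hypothetical topic name, partition count, and endpoint, shows how that might be done with Kafka's AdminClient.

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object TopicSetup {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-elb.example.internal:9092") // hypothetical endpoint

    val admin = AdminClient.create(props)
    // 12 partitions spread the topic across brokers for parallel consumption;
    // a replication factor of 3 keeps copies on other brokers in case one fails.
    val clickstream = new NewTopic("clickstream", 12, 3.toShort)
    admin.createTopics(Collections.singleton(clickstream)).all().get()
    admin.close()
  }
}
```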
Cluster configuration
Although Kafka is built to tolerate broker failures, it is not stateless, which means we cannot scale horizontally as easily as we do with the web layer. A typical Kafka cluster has a ZooKeeper quorum and a set of brokers. We put both of these components behind ELBs and use the ELB names when producing. The ingestion pipeline has one Kafka cluster per availability zone.
Aggregating the data
The data sitting in the Kafka brokers needs to be brought into the data center for Camus (a Hadoop Kafka consumer) to consume and write to HDFS. We accomplish this using Kafka MirrorMaker, a collection of consumers and producers that reads data from one Kafka cluster and writes it to another. It also offers features such as topic whitelisting and blacklisting, batching, and multiple streams.
We run multiple MirrorMaker processes per availability zone. All of these processes write to the same Kafka cluster, which is hosted in Intuit’s data center. While this is not operationally optimal, it is configured this way so that topics can be prioritized.
Some topics receive far higher traffic than others and consume much greater resources. Other topics are critical to the business, and any delay in processing them is undesirable. Separating their consumption into different processes helps us monitor and plan better for variations in traffic patterns.
We divide the MirrorMaker processes per zone along the following lines:
- Topics with high volume
- Topics that are business critical
- Other topics
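For illustration only, here is a minimal Scala sketch of what one such mirroring process does conceptually: consume a group of topics from a source cluster and re-produce the records to the destination cluster. In practice this step is handled by Kafka's bundled MirrorMaker tool, and the endpoints, consumer group, and topic names below are hypothetical.

```scala
import java.time.Duration
import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}

object MirrorSketch {
  def main(args: Array[String]): Unit = {
    val consumerProps = new Properties()
    consumerProps.put("bootstrap.servers", "kafka-aws-zone-a.example.internal:9092") // source cluster (hypothetical)
    consumerProps.put("group.id", "mirror-business-critical")
    consumerProps.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
    consumerProps.put("value.deserializer", classOf[ByteArrayDeserializer].getName)

    val producerProps = new Properties()
    producerProps.put("bootstrap.servers", "kafka-dc.example.internal:9092") // destination cluster (hypothetical)
    producerProps.put("key.serializer", classOf[ByteArraySerializer].getName)
    producerProps.put("value.serializer", classOf[ByteArraySerializer].getName)

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
    val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)

    // This process handles only the business-critical group of topics.
    consumer.subscribe(Arrays.asList("critical.payments", "critical.signups"))

    while (true) {
      val records = consumer.poll(Duration.ofMillis(500))
      val it = records.iterator()
      while (it.hasNext) {
        val record = it.next()
        // Re-publish each record to the same topic name on the destination cluster.
        producer.send(new ProducerRecord(record.topic(), record.key(), record.value()))
      }
    }
  }
}
```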
Processing
We support both streaming and batch styles of processing. If a use case demands stream processing, it can easily be supported by consuming data directly from Kafka. We use Spark for both batch and stream processing, as there is minimal to no code change needed to support both modes.
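As a rough sketch of that reuse (the paths, topic, and transformation below are illustrative, not Intuit's actual jobs), the core logic can be written once against RDDs and then applied either to a batch dataset read from HDFS or, via transform, to each micro-batch of a Kafka DStream.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.kafka.common.serialization.StringDeserializer

object SharedProcessing {
  // The core logic is written once against RDDs: drop empty events and tag the rest.
  def enrich(events: RDD[String]): RDD[String] =
    events.filter(_.nonEmpty).map(e => s"processed|$e")

  // Batch mode: run enrich() over a day of raw events stored on HDFS (paths illustrative).
  def runBatch(ssc: StreamingContext): Unit = {
    val raw = ssc.sparkContext.textFile("hdfs:///data/clickstream/raw/2018-01-01")
    enrich(raw).saveAsTextFile("hdfs:///data/clickstream/processed/2018-01-01")
  }

  // Streaming mode: the same enrich() runs on every micro-batch pulled from Kafka.
  def runStreaming(ssc: StreamingContext): Unit = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "kafka-elb.example.internal:9092", // hypothetical endpoint
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "clickstream-enrichment")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("clickstream"), kafkaParams))

    stream.map(_.value()).transform(enrich _).print()
    ssc.start()
    ssc.awaitTermination()
  }

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("shared-processing"), Seconds(30))
    if (args.headOption.contains("batch")) runBatch(ssc) else runStreaming(ssc)
  }
}
```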
Processing falls into two broad categories, stateless and stateful. Stateless processing handles an event without depending on any previous event(s), e.g. IP-to-geo lookup or user-agent parsing; for this kind of processing, the order in which events are processed also does not matter. Stateful processing depends on data derived from previous event(s), e.g. aggregates (which do not require ordering) or sessionization (which does). The streaming system is integrated only with Kafka in AWS, while our batch system is available both in AWS and in the Intuit-owned data center.
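The sketch below contrasts the two styles, again with made-up inputs and a toy user-agent check rather than a real parser: a stateless per-event enrichment followed by a stateful running aggregate kept with updateStateByKey, which needs a checkpoint directory but not event ordering.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StatelessAndStateful {
  // Stateless: each event is handled on its own; a toy stand-in for user-agent parsing.
  def classifyAgent(userAgent: String): String =
    if (userAgent.toLowerCase.contains("mobile")) "mobile" else "desktop"

  // Stateful: fold each micro-batch's counts into the running total for a key.
  def updateCount(newValues: Seq[Long], current: Option[Long]): Option[Long] =
    Some(newValues.sum + current.getOrElse(0L))

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("stateless-stateful"), Seconds(30))
    ssc.checkpoint("hdfs:///checkpoints/stateless-stateful") // required for stateful ops; path illustrative

    // Assume events arrive as "userId<TAB>userAgent" lines on a socket for this sketch;
    // in production they would come from Kafka as in the previous example.
    val events = ssc.socketTextStream("localhost", 9999)

    // Stateless step: no earlier event is needed and ordering is irrelevant.
    val byDeviceClass = events.map { line =>
      val userAgent = line.split("\t", 2).last
      (classifyAgent(userAgent), 1L)
    }

    // Stateful step: a running count of events per device class. An aggregate like this
    // does not need events in order; sessionization, by contrast, would.
    val runningCounts = byDeviceClass.updateStateByKey(updateCount _)
    runningCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```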
All of our compute runs on the Spark Streaming and Spark batch frameworks. These systems run under tight SLAs, so alerting and metrics collection are equally important.
Performance is key to meeting SLAs. We were able to meet our SLAs with the old Hadoop-based systems, but the newer Spark system completes the same jobs in far less time, making the SLAs comfortable to meet. So far, the Spark system has been benchmarked at roughly 8x the performance of our Hadoop-based systems.
Future State
We have a solid working solution that meets our needs, but we are always looking ahead to the next stage. Currently, we are in a hybrid setup, spread across AWS and our internal data center. In the future, we would like to have our system completely in AWS. That setup would decrease the number of hops we currently make to get from source to destination, which can be accomplished by using S3 as the persistence layer. An additional benefit of moving fully to the cloud is being able to do near real-time compute and to run batch processing in AWS as well.
Lokesh Rajaram is a Staff SWE @Intuit currently working on unified profile service, previously working on data processing and data ingestion.
Vaishak Suresh is a Senior SWE @Intuit currently working on data ingestion.
Deborah Yu is a Senior SWE @Intuit currently working on unified profile service, previously working on data ingestion.
