Optimized Real-time Analytics using Spark Streaming and Apache Druid
Our advertising data engineering team at GumGum uses Spark Streaming and Apache Druid to give business stakeholders near real-time analytics for analyzing and measuring advertising business performance.
Our biggest dataset is the RTB (real-time bidding) auction logs, which amount to ~350,000 msg/sec during peak hours every day. At that volume it becomes crucial for the data team to leverage distributed computing systems like Apache Kafka, Spark Streaming and Apache Druid to process huge volumes of data, perform business logic transformations, apply aggregations and store data that can power real-time analytics.
Real-time Data Pipeline Architecture with Kafka, Spark and Druid
Every ad-server instance produces advertising inventory and ad performance logs to a particular topic in the Kafka cluster. We currently run 2 Kafka clusters, along with a centralized Avro Schema Registry, with over 30 topics whose producer rates range from 5k msg/sec to 350k msg/sec.
Data for our biggest datasets is stored in Kafka in Apache Avro format, which gives us compatibility for schema evolution over time and is more compact than the less efficient JSON format (read more on why to use Avro with Kafka).
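To make the schema evolution point concrete, here is a minimal sketch in plain Python of the Avro rule that a field added with a default can still be read from older records. The record name, field names and the apply_reader_defaults helper are hypothetical; a real pipeline would rely on an Avro library and the Schema Registry rather than this hand-rolled logic.

```python
# Hypothetical schemas: v2 adds a "currency" field with a default value.
schema_v1 = {
    "type": "record",
    "name": "AuctionLog",
    "fields": [
        {"name": "auction_id", "type": "string"},
        {"name": "bid_price", "type": "double"},
    ],
}
schema_v2 = {
    "type": "record",
    "name": "AuctionLog",
    "fields": schema_v1["fields"]
    + [{"name": "currency", "type": "string", "default": "USD"}],
}


def apply_reader_defaults(record, reader_schema):
    """Read an older record with a newer schema, filling missing fields from defaults."""
    resolved = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in record:
            resolved[name] = record[name]
        elif "default" in field:
            resolved[name] = field["default"]
        else:
            raise ValueError(f"field {name!r} is missing and has no default")
    return resolved


old_record = {"auction_id": "a1", "bid_price": 1.25}  # written with schema_v1
new_view = apply_reader_defaults(old_record, schema_v2)
```

A field added without a default would raise here, which mirrors why such a change is not backward compatible.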
Process and Transform
To process this huge volume of advertising data in near real-time, we leverage Spark Streaming on Databricks, running one Spark application per Kafka topic. We then apply data transformations and build a Druid Tranquility connection on every Spark worker to send the transformed data to Druid in real time, in a distributed fashion.
Druid Tranquility ingests data into Druid in real time via HTTP push. It provides an abstraction for connecting to Druid and handles partitioning, replication, seamless discovery of Druid real-time nodes, and data schema changes. Due to the nature of our advertising data and business logic, we clean, validate and explode various nested and list-typed columns, which results in 10x more data being sent from Tranquility to the Druid real-time nodes than we read from Kafka.
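The data growth caused by exploding list-typed columns can be sketched in plain Python. This is only an illustration under assumed field names (auction_id, ad_sizes, bidders are hypothetical), not our Spark code; in a Spark job the same step would typically be expressed as a flatMap or an explode over the column.

```python
from itertools import product


def explode(record, list_columns):
    """Yield one flat row per combination of values in the list-typed columns."""
    # An empty or missing list still produces one row, with None in that column.
    value_lists = [record.get(col) or [None] for col in list_columns]
    for combo in product(*value_lists):
        row = dict(record)
        row.update(zip(list_columns, combo))
        yield row


# Hypothetical auction log with two list-typed columns.
auction = {
    "auction_id": "a1",
    "ad_sizes": ["300x250", "728x90"],
    "bidders": ["b1", "b2", "b3"],
}
rows = list(explode(auction, ["ad_sizes", "bidders"]))
print(len(rows))  # one input record becomes 2 * 3 flat rows
```

One input message with a 2-element list and a 3-element list becomes 6 output rows, which is how a modest input rate can turn into a much larger ingestion rate downstream.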
Data Aggregation and Storage
In Druid’s data model, every dataset is stored in a datasource (similar to a table in a relational DB). The key components of every datasource are the timestamp, dimensions and metrics. The timestamp serves as the primary partition column, dimensions are used to filter and group by in queries, and metrics are numeric values that can be aggregated.
During real-time ingestion, real-time nodes roll up every datasource based on a predefined query granularity (1 min or 1 hour in our use cases) and its dimensions, and perform aggregation on all numeric metrics. This rollup process significantly reduces the storage needed on Druid data nodes. Once the data is aggregated, it is persisted in historical nodes and deep storage (S3) according to the datasource retention policy.
Druid broker nodes are the entry point for any user query. Based on the query parameters, the broker can query real-time data segments from real-time nodes and/or already aggregated data from historical nodes, which serve segments loaded from deep storage.
Druid supports two query languages: Druid SQL and native queries via a REST endpoint. Native queries are the most efficient way to query data from Druid in real time. We use Apache Superset (native queries) for engineering teams and our BI platform Looker (Druid SQL) for data analysts and business teams to build data insights with real-time data points.
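As a rough illustration of rollup, the sketch below truncates each event timestamp to a 1-minute query granularity and sums the metrics for rows that share the same truncated timestamp and dimension values. It is plain Python with hypothetical field names (site, impressions), meant to show the idea rather than how Druid implements it internally.

```python
from collections import defaultdict
from datetime import datetime, timezone


def truncate_to_minute(ts):
    """Apply a 1-minute query granularity to a timestamp."""
    return ts.replace(second=0, microsecond=0)


def rollup(events, dimensions, metrics):
    """Sum metrics over rows sharing the same truncated timestamp and dimensions."""
    totals = defaultdict(lambda: defaultdict(int))
    for event in events:
        key = (truncate_to_minute(event["timestamp"]),) + tuple(
            event[d] for d in dimensions
        )
        for metric in metrics:
            totals[key][metric] += event[metric]
    return {key: dict(values) for key, values in totals.items()}


def at(minute, second):
    return datetime(2020, 1, 1, 12, minute, second, tzinfo=timezone.utc)


# Hypothetical events: the first two fall into the same minute and dimensions.
events = [
    {"timestamp": at(0, 5), "site": "a", "impressions": 1},
    {"timestamp": at(0, 40), "site": "a", "impressions": 2},
    {"timestamp": at(1, 10), "site": "a", "impressions": 4},
]
rolled = rollup(events, dimensions=["site"], metrics=["impressions"])
```

Three raw events collapse into two stored rows here. The fewer distinct dimension combinations there are per time bucket, the more rollup saves.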
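For readers unfamiliar with native queries, here is a small sketch that builds a native timeseries query as JSON. The datasource name, metric name and interval are hypothetical, and the snippet only constructs and prints the payload; in practice the JSON would be sent as an HTTP POST to the broker's /druid/v2 endpoint.

```python
import json


def build_timeseries_query(datasource, start, end, metric):
    """Build a native Druid timeseries query summing one metric per minute."""
    return {
        "queryType": "timeseries",
        "dataSource": datasource,
        "granularity": "minute",
        "intervals": [f"{start}/{end}"],
        "aggregations": [
            {"type": "longSum", "name": metric, "fieldName": metric}
        ],
    }


# Hypothetical datasource and metric names, for illustration only.
query = build_timeseries_query(
    "rtb_auctions",
    "2020-01-01T00:00:00Z",
    "2020-01-01T01:00:00Z",
    "impressions",
)
payload = json.dumps(query, indent=2)
print(payload)
```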
Optimization with micro-batch aggregated data ingestion
As with any distributed computing and storage system, there are a handful of challenges and some tuning that needs to happen. Let’s talk about some of the challenges we faced.
- High-cardinality dimensions forced us to increase the number of real-time nodes to keep up with the ingestion rate.
- A high ingestion rate of ~3.1M msg/sec into Druid real-time nodes ended up costing us a lot of compute, as we scaled to ~120 real-time nodes during peak season to keep up.
After closely debugging the Spark Streaming app and the Druid real-time nodes, we realized that a lot of time on the Spark Streaming side was spent on network output to Druid, which resulted in idle CPU time for the Spark workers. On the Druid side, the real-time nodes were not able to handle peak hours for datasources with high-cardinality dimensions, and we had to keep scaling them horizontally.
To tackle real-time ingestion cost and performance, we implemented micro-batch (~15 sec) aggregation in Spark Streaming: an additional computation step in the Spark Streaming app aggregates data every ~15 secs before we send it to Druid via Tranquility. This reduced Druid’s real-time ingestion rate from 3.1M msg/sec to 280k msg/sec.
This resulted in a 60% cost reduction for Spark workers, as there was significantly less idle CPU time, and a 70% cost reduction for Druid real-time nodes, as the ingestion rate dropped to ~280k msg/sec.
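The micro-batch aggregation step can be sketched as a reduce-by-key over one batch: events that share the same dimension values are combined into a single pre-aggregated message before being sent on. This is a plain-Python illustration with hypothetical field names (site, ad_size, impressions), not our Spark Streaming code. The last lines work out the reduction factor implied by the rates quoted above.

```python
from collections import Counter


def aggregate_micro_batch(batch, dimensions, metric):
    """Combine events with identical dimension values into one summed message."""
    totals = Counter()
    for event in batch:
        key = tuple(event[d] for d in dimensions)
        totals[key] += event[metric]
    return [
        dict(zip(dimensions, key), **{metric: total})
        for key, total in totals.items()
    ]


# Hypothetical events collected during one ~15 sec micro-batch.
batch = [
    {"site": "a", "ad_size": "300x250", "impressions": 1},
    {"site": "a", "ad_size": "300x250", "impressions": 1},
    {"site": "b", "ad_size": "728x90", "impressions": 1},
    {"site": "a", "ad_size": "300x250", "impressions": 1},
]
messages = aggregate_micro_batch(batch, ["site", "ad_size"], "impressions")

# Reduction factor implied by the ingestion rates in this post.
reduction = 3_100_000 / 280_000
print(f"{len(batch)} events -> {len(messages)} messages, ~{reduction:.0f}x fewer msgs/sec")
```

The trade-off is a small amount of added latency (up to one batch interval) and extra CPU work on the Spark side, in exchange for far fewer messages crossing the network to Druid.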
Apache Spark and Apache Druid have been crucial at GumGum in providing real-time insights for the business. After multiple iterations of performance tuning, we have been able to build a capable and cost-effective system, but there’s still more work to do. Our data volume and rate of ingestion are constantly increasing, as are the number of business users and their query use cases. As next steps, we will be exploring options to build a real-time analytics pipeline with exactly-once semantics and data aggregation windows of up to 1 hour, instead of micro-batch aggregations in Spark.