EXPEDIA GROUP TECHNOLOGY — DATA

Fast Approximate Counting Using Druid and DataSketch

Supporting dynamic criteria across multiple big data sets

Elan Halfin
Dec 10, 2020

Authors: Elan Halfin and Aravind Sethurathnam

Image depicting a traveler segment

At Expedia Group™, we built an internal self-service tool to allow business users to create traveler segments for marketing and other purposes. Example segments are:

  • All the travelers who are searching for lodging in Paris, France in the next 2 weeks but have not booked yet
  • Travelers who booked a flight to a particular destination in the next month but have not booked any lodging at that destination

As a user is defining a segment, we wanted to provide immediate feedback on the segment population size. In other segmentation tools, the user might have to wait up to 24 hours to understand their new segment’s size. We decided to build a solution that provides the segment size within 5 seconds.

To solve this distinct count problem, we had to address a series of challenges:

Big Data

Our supported data sets are time-series clickstream data (e.g. searches, visits, bookings) for all Expedia Group brands and other derived data like Machine Learning (ML) model scores. A key insight is that exact population counts are not required; approximate population counts are sufficient for our problem. Approximate counting is common in Big Data solutions and is available in many products.

Fast (approximate) counts

Our raw data resides on AWS S3, and unless we could load all the data into memory, we knew that a query engine like Presto could not bring query latency under 5 seconds. We wanted a distributed MPP database running on commodity hardware. For static queries, very low latencies are achievable when combined with aggregation at data ingestion time.

Dynamic Criteria

The segment criteria are user-defined, and dynamic query criteria limit what optimizations can be done at data ingestion time. However, out-of-the-box solutions are still available to us.

Multiple Data Set Joins

A further challenge is that a segment may be defined using multiple data sets. When we combined all these challenges, we ran into a roadblock. The rest of the article goes over our solution.

Druid and DataSketches to the Rescue

For time-series data, the Druid database was already in use at Expedia Group, so it was a good starting point. With the proper configuration, Druid supports sub-second query latency for a single data set. However, we needed to support ad hoc queries across multiple data sets, and Druid's join support is limited: it requires one data set to fit into memory, which was not an option for us.

Druid does support approximate counts and integrates with the open source Apache DataSketches library. DataSketches are based on approximate algorithms that dramatically speed up calculations for this kind of analysis. Importantly, DataSketches support set operations such as union, intersection and difference with little accuracy loss. These set operations are critical for segments that use multiple data sets, for example searches and bookings.
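To put a rough number on "little accuracy loss": a theta sketch that retains k hash values has a relative standard error (RSE) of roughly 1/sqrt(k). The worked example below is only illustrative. It assumes k = 16384, which is Druid's documented default size for the thetaSketch aggregator, and a hypothetical segment of one million travelers; neither value is stated in this article as the configuration actually used.

```latex
\mathrm{RSE} \approx \frac{1}{\sqrt{k}}, \qquad
k = 16384 \;\Rightarrow\; \mathrm{RSE} \approx \frac{1}{128} \approx 0.78\%

N = 1{,}000{,}000 \;\Rightarrow\; \sigma \approx 0.0078 \times 1{,}000{,}000 \approx 7{,}800 \text{ travelers}
```

Intersections and differences can carry a larger relative error than this when the result is much smaller than the input sets, so the figure above is best read as a guide for single-set and union counts.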

Now we can query each data set independently from Druid to get the DataSketch object that represents that set of travelers. For example:

SELECT DS_THETA(traveler_id) AS search_travelers
FROM search_event
WHERE __time >= '2020-04-01'
  AND product_line IN ('LODGING')
  AND destination = 'Paris,France'
  AND trip_start_date BETWEEN '2020-10-01' AND '2020-10-14'

Once we have the DataSketch object for each data set in the segment, we apply the appropriate set operation based on the segment definition to get the final count!
Druid does not support merging the DataSketch objects, so we handle that in our Java code. A DataSketch object has a tiny memory footprint even when the set it represents is very large.

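import org.apache.datasketches.theta.AnotB;
import org.apache.datasketches.theta.SetOperation;
import org.apache.datasketches.theta.Sketch;

Sketch searchSketch = druidClient.sketchQuery(druidSQLSearch);
Sketch bookingSketch = druidClient.sketchQuery(druidSQLBooking);
// Travelers who searched but have not booked: search AND NOT booking
AnotB anotB = SetOperation.builder().buildANotB();
anotB.update(searchSketch, bookingSketch);
log.info("Segment count: " + anotB.getResult().getEstimate());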
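To build intuition for why an A-not-B operation works on sketches rather than on raw traveler IDs, here is a minimal, self-contained sketch of the idea using only the JDK. It is a toy and not the DataSketches implementation: the class `ToyThetaSketch`, its hash function, and the numeric traveler IDs are all hypothetical choices made for illustration.

```java
import java.util.TreeSet;

/** Toy theta-style sketch (hypothetical, for illustration only). */
final class ToyThetaSketch {
    private final int k;                                   // max number of retained hashes
    private long theta = Long.MAX_VALUE;                   // only hashes below theta are retained
    private final TreeSet<Long> hashes = new TreeSet<>();  // retained hashes, sorted

    ToyThetaSketch(int k) { this.k = k; }

    /** Mixes the ID into a pseudo-random non-negative 63-bit value (splitmix64 finalizer). */
    static long hash(long id) {
        long z = id + 0x9E3779B97F4A7C15L;
        z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
        z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
        return (z ^ (z >>> 31)) >>> 1;
    }

    void update(long travelerId) {
        long h = hash(travelerId);
        if (h >= theta || !hashes.add(h)) return;           // above threshold, or already seen
        if (hashes.size() > k) theta = hashes.pollLast();   // evict largest; it becomes the threshold
    }

    /** Retained count scaled up by the fraction of the hash space that was sampled. */
    double estimate() {
        return hashes.size() / (theta / (double) Long.MAX_VALUE);
    }

    /** Travelers in a but not in b, computed on the sketches alone. */
    static ToyThetaSketch aNotB(ToyThetaSketch a, ToyThetaSketch b) {
        ToyThetaSketch result = new ToyThetaSketch(Math.max(a.k, b.k));
        result.theta = Math.min(a.theta, b.theta);          // compare only the commonly sampled range
        for (long h : a.hashes.headSet(result.theta)) {
            if (!b.hashes.contains(h)) result.hashes.add(h);
        }
        return result;
    }

    public static void main(String[] args) {
        ToyThetaSketch search = new ToyThetaSketch(4096);
        ToyThetaSketch booking = new ToyThetaSketch(4096);
        for (long id = 0; id < 300_000; id++) search.update(id);         // searched
        for (long id = 200_000; id < 400_000; id++) booking.update(id);  // booked
        // The exact answer is 200,000 travelers; the sketch returns an estimate near it.
        System.out.println("Searched but not booked (approx): "
                + Math.round(aNotB(search, booking).estimate()));
    }
}
```

Each toy sketch holds at most 4096 hashes no matter how many travelers it has seen, which is the property that keeps the footprint small and makes it cheap to combine sketches in application code.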

Ingestion Optimization

As an optimization, instead of building the DataSketch object from the raw data at query time, the DataSketch can be created at ingestion time and stored as another column in the Druid data set. This speeds up query processing.

"metricsSpec": [ { "type": "count", "name": "count" },
{ "type": "thetaSketch", "name": "traveler_id_sketch",
"fieldName": "traveler_id" } ]

Now the Druid query is slightly altered to leverage that new column.

SELECT DS_THETA(traveler_id_sketch) AS search_travelers
FROM search_event
WHERE __time >= '2020-04-01'
  AND product_line IN ('LODGING')
  AND destination = 'Paris,France'
  AND trip_start_date BETWEEN '2020-10-01' AND '2020-10-14'

Putting it together

Now that we know how to store the time-series data in Druid efficiently and run queries that return a DataSketch object in under 5 seconds, we can put all the pieces together.

Druid
A fast time-series database that supports approximate counting algorithms and ingestion-time optimization.

Java Population Service
Based on a segment definition, generates the appropriate Druid queries and performs any required set operations before returning the approximate population sizes.

UI Segment Builder (NodeJS)
A user defines a segment and instantly sees approximate population sizes as the criteria for each data set are entered.
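As an illustration of the query-generation step in the population service, the following is a minimal sketch of turning one rule into Druid SQL. Everything in it is an assumption made for the example: the class name `RuleQueryBuilder`, the restriction to equality filters, and the use of `?` placeholders whose values would be bound by whatever Druid client is in use. It is not the service's actual code.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical builder that turns one segment rule into a sketch-returning Druid SQL query. */
final class RuleQueryBuilder {

    /** SQL text with '?' placeholders plus the values to bind, in order. */
    static final class Query {
        final String sql;
        final List<String> params;
        Query(String sql, List<String> params) { this.sql = sql; this.params = params; }
    }

    static Query build(String table, String sketchColumn, String since,
                       Map<String, String> equalsFilters) {
        requireIdentifier(table);
        requireIdentifier(sketchColumn);
        StringBuilder sql = new StringBuilder()
                .append("SELECT DS_THETA(").append(sketchColumn).append(") AS travelers")
                .append(" FROM ").append(table)
                .append(" WHERE __time >= ?");
        List<String> params = new ArrayList<>();
        params.add(since);
        for (Map.Entry<String, String> filter : equalsFilters.entrySet()) {
            requireIdentifier(filter.getKey());
            sql.append(" AND ").append(filter.getKey()).append(" = ?");
            params.add(filter.getValue());   // user input is bound, never concatenated
        }
        return new Query(sql.toString(), params);
    }

    /** Column and table names come from the rule, so accept only plain identifiers. */
    private static void requireIdentifier(String name) {
        if (!name.matches("[A-Za-z_][A-Za-z0-9_]*")) {
            throw new IllegalArgumentException("Not a plain identifier: " + name);
        }
    }

    public static void main(String[] args) {
        Map<String, String> filters = new LinkedHashMap<>();
        filters.put("product_line", "LODGING");
        filters.put("destination", "Paris,France");
        Query q = build("search_event", "traveler_id_sketch", "2020-04-01", filters);
        System.out.println(q.sql);
        System.out.println(q.params);
    }
}
```

Generating one such query per data set keeps each Druid query simple, and leaves the combination of the resulting sketches to the set operations in the service.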

With everything in place, we can show the overall population count in the UI and even the population count broken down to the DataSketch (rule) level.

Additional Druid Optimizations

Once the sketch objects were created during ingestion for all the data sources, query performance improved, but it was still not at the expected level. Eventually we found that the EC2 node type had to be changed because we were using EBS volumes without any local storage. Druid responds to queries by reading prefetched segments from local disk, so segments that did not fit in memory had to be fetched from EBS across the network, causing considerable delays. We changed to a node type with local SSD storage, and performance improved significantly.

Another thing to consider, when applicable, is rollup during ingestion: Druid can roll up data as it is ingested to minimize the amount of raw data that needs to be stored. Imply recommended a segment file size of about 300MB-700MB, or about 5 million rows, for each data source, although the right value can vary by use case. We made sure the segment file sizes for all our data sources fell within that range, avoiding segments that were too small or too large.
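Conceptually, rollup behaves like a group-by applied at ingestion time: rows that share a truncated timestamp and the same dimension values collapse into a single stored row. The toy below sketches that idea in plain Java. The hourly granularity, the single `destination` dimension, and the class name `RollupDemo` are assumptions chosen for illustration, and this is not Druid code.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy illustration of ingestion-time rollup (hypothetical, not Druid code). */
final class RollupDemo {

    /**
     * Collapses raw events into one row per (hour, destination) and counts
     * how many raw events were folded into each row.
     * Each event is {ISO-8601 timestamp, destination}.
     */
    static Map<String, Long> rollUp(String[][] events) {
        Map<String, Long> rows = new LinkedHashMap<>();
        for (String[] event : events) {
            Instant hour = Instant.parse(event[0]).truncatedTo(ChronoUnit.HOURS);
            rows.merge(hour + "|" + event[1], 1L, Long::sum);
        }
        return rows;
    }

    public static void main(String[] args) {
        String[][] events = {
            {"2020-10-01T10:05:00Z", "Paris"},
            {"2020-10-01T10:40:00Z", "Paris"},
            {"2020-10-01T11:02:00Z", "Paris"},
            {"2020-10-01T10:15:00Z", "Rome"},
        };
        // Four raw events become three stored rows.
        rollUp(events).forEach((key, count) -> System.out.println(key + " -> " + count));
    }
}
```

With a sketch metric in place of the plain count, the traveler IDs of the collapsed rows would be merged into one sketch per stored row in the same way, which is why a high-cardinality ID does not have to be kept as a dimension.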

Conclusion

Approximate counting on Big Data is a potent, cost-effective tool that greatly reduces the amount of data required at query time. Combining an approximate streaming algorithm (DataSketch) that supports set operations with a fast time-series datastore (Druid) can deliver in seconds answers that previously could take hours.

Next time you run into a challenging counting problem on Big Data, look into approximate counting as a way to solve those challenges and provide a better experience for your end users.
