As organizations scale and data volumes explode, a scalable data architecture becomes vital. This post revisits the problem statement discussed in an earlier post, but at an entirely different scale. As a quick recap, the goal is to forecast inventory impressions per day, given a set of targeting rules and sample data. This time, the inventory being forecasted is programmatic inventory. In part one of this blog post, Jatinder Assi discussed in detail the data architecture and distributed sampling for programmatic inventory. In part two, I focus on enabling the forecasting application at this scale using Delta Lake and Delta caching in Apache Spark on Databricks.
The application that reads the sample data and performs the forecast is called the Search and Forecast application. The Search application reads the sample data along with the targeting rules provided by the user and builds a time series of impressions per day. The Forecast application then uses this time series to train the forecasting model and predict the series for the next n days. We use the same forecasting models as in the previous post. The main topic here is the Search application, which has to deal with an entirely different scale.
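To make the Search step concrete, here is a minimal single-machine sketch of building an impressions-per-day series from sampled rows and targeting rules. The row fields (`day`, `country`, `device`), the rule format, and the `sampling_rate` scaling are all assumptions made for illustration; the real application runs on Spark and its schema is not shown in this post.

```python
from collections import Counter
from datetime import date

# Hypothetical sample rows: one dict per sampled impression.
SAMPLE = [
    {"day": date(2020, 1, 1), "country": "US", "device": "mobile"},
    {"day": date(2020, 1, 1), "country": "US", "device": "desktop"},
    {"day": date(2020, 1, 1), "country": "CA", "device": "mobile"},
    {"day": date(2020, 1, 2), "country": "US", "device": "mobile"},
    {"day": date(2020, 1, 2), "country": "US", "device": "mobile"},
]


def matches(row, rules):
    """True when the row satisfies every targeting rule (field -> allowed values)."""
    return all(row.get(field) in allowed for field, allowed in rules.items())


def impressions_per_day(rows, rules, sampling_rate=1.0):
    """Count matching rows per day, scaled up by the assumed sampling rate."""
    counts = Counter(row["day"] for row in rows if matches(row, rules))
    return {day: counts[day] / sampling_rate for day in sorted(counts)}


# Targeting rules: US traffic on mobile devices only.
series = impressions_per_day(SAMPLE, {"country": {"US"}, "device": {"mobile"}})
for day, impressions in series.items():
    print(day.isoformat(), impressions)
```

The resulting mapping of day to impressions is the kind of series the Forecast application would then train on.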
If the Search application had to read the sample data from S3 for every forecasting request, we could not meet our SLA of a forecast response time under 30 seconds. The obvious choice is to cache the data in memory on Spark, but this is expensive. At the scale of our programmatic inventory, caching the entire dataset in memory would require a huge cluster, which is not viable. An alternative is to cache the data on disk. Although the performance won’t be as good as in-memory caching, we can still meet the SLA. However, because the sample data is refreshed daily by our pipelines, we would need to recreate the cache every single day, which is tedious. Moreover, our evaluation showed that we would need at least 35 c4.2xlarge nodes, since we would still need enough memory for the IO and compute operations that build the time series. We looked at alternatives and landed on Delta Lake with Delta caching.
Delta Lake is a storage layer originally developed by Databricks and open sourced at Spark + AI Summit 2019. Delta Lake brings ACID transactions to Spark, but for our use case the feature of most interest is scalable metadata handling. The transaction log in Delta Lake keeps a record of every transaction that has occurred on a table (and hence supports features like versioning and time travel). So when new sample data is written into Delta Lake, Spark checks the transaction log for the new writes and picks up the updated table automatically, without our having to refresh the table explicitly every day.
The following are some of the operations we use on Delta Lake:
- Writing to Delta Lake is straightforward. We just need to specify the format as “delta”.
- We can specify relevant columns as partition keys for efficient data filtering.
- To create the user table, use a CREATE TABLE statement pointing to the S3 location of the Delta Lake data.
- The OPTIMIZE command compacts small Delta files into larger files of up to 1 GB each, which lets Spark read them efficiently. We can also specify an optional ZORDER clause to colocate related column values in the same set of files, which reduces the amount of data read.
- VACUUM deletes from S3 the files that the table no longer references, such as files that have already been compacted, once they are older than the retention period (7 days by default).
- ANALYZE TABLE updates the table statistics, which helps the query planner choose better execution plans.
We schedule OPTIMIZE, VACUUM and ANALYZE on the cluster daily.
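The operations above can be sketched as a small set of statement builders. This is an illustrative sketch, not our production job: the table name `inventory_sample`, the S3 path, and the column names are hypothetical, and the statements only do real work when run on a Databricks cluster with a live `spark` session.

```python
# Writing the sample data (PySpark, shown as a comment because it needs a cluster):
#   df.write.format("delta").partitionBy("event_date").mode("append").save(location)


def create_table_sql(table, location):
    """CREATE TABLE statement that points at an existing Delta location."""
    return f"CREATE TABLE IF NOT EXISTS {table} USING DELTA LOCATION '{location}'"


def maintenance_sql(table, zorder_cols=(), retain_hours=168):
    """Daily maintenance statements: compact files, clean up, refresh statistics."""
    optimize = f"OPTIMIZE {table}"
    if zorder_cols:
        optimize += f" ZORDER BY ({', '.join(zorder_cols)})"
    return [
        optimize,
        # 168 hours equals the default 7-day retention period.
        f"VACUUM {table} RETAIN {retain_hours} HOURS",
        f"ANALYZE TABLE {table} COMPUTE STATISTICS",
    ]


def run_all(spark, statements):
    """Execute each statement in order on a SparkSession-like object."""
    for statement in statements:
        spark.sql(statement)


# Hypothetical names, for illustration only.
statements = [create_table_sql("inventory_sample", "s3://example-bucket/delta/sample")]
statements += maintenance_sql("inventory_sample", zorder_cols=["publisher_id"])
for statement in statements:
    print(statement)
```

On a cluster, a scheduled job could call `run_all(spark, maintenance_sql(...))` once a day. Keeping the statements as plain strings makes them easy to log and review before they run.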
To cache the Delta table on the cluster, we use Databricks Delta caching (previously called IO cache). The Delta cache stores the data on local disk in a fast intermediate format that supports accelerated reads. Successive reads of the same data are served locally, which drastically reduces IO time. Delta caching is enabled by default for i3.xlarge instance types. The data is stored entirely on disk, which frees up memory for map-reduce operations.
Reading data from the Delta cache is typically faster than reading it from the Spark cache. We can monitor Delta cache metrics on the Storage tab of the Spark UI, which shows how much data is cached on each node, the volume of data read from S3, the volume of repeated reads served from the Delta cache, and so on.
We don’t need to invalidate or load the Delta cache explicitly. To warm up the cache in advance, however, we can use the CACHE SELECT command. If existing cached entries have to be refreshed, we can use the REFRESH TABLE statement, which is lazily evaluated.
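As a rough sketch of cache warming, the helpers below build CACHE SELECT and REFRESH TABLE statements. The table name, column names, and predicate are hypothetical, and the statements are Databricks-specific, so they only take effect when executed through a `spark` session on a Databricks cluster.

```python
# On instance types where Delta caching is not on by default, it can be
# enabled on a cluster with:
#   spark.conf.set("spark.databricks.io.cache.enabled", "true")


def cache_select_sql(table, columns=("*",), predicate=None):
    """CACHE SELECT statement that preloads the chosen columns and rows."""
    statement = f"CACHE SELECT {', '.join(columns)} FROM {table}"
    if predicate:
        statement += f" WHERE {predicate}"
    return statement


def refresh_table_sql(table):
    """REFRESH TABLE statement; the refresh happens lazily on the next read."""
    return f"REFRESH TABLE {table}"


def warm_cache(spark, table, columns=("*",), predicate=None):
    """Run the warm-up statement on a SparkSession-like object."""
    spark.sql(cache_select_sql(table, columns, predicate))


# Hypothetical names: warm only the columns and dates the Search queries touch.
warmup = cache_select_sql(
    "inventory_sample",
    columns=("event_date", "publisher_id"),
    predicate="event_date >= '2020-01-01'",
)
print(warmup)
print(refresh_table_sql("inventory_sample"))
```

Restricting the warm-up to the columns and date range that queries actually use keeps the disk cache smaller than caching the whole table would.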
The high level architecture of the Search application can be summarized as below:
With Delta Lake and Delta caching, our cluster size dropped to 25 i3.xlarge nodes, which is more efficient and cost-effective than either plain disk caching or in-memory caching.