Engagement Activity Delta Lake

Zhidong Ke
Salesforce Engineering
8 min read · Sep 23, 2020

Authors: Zhidong Ke, Utsavi Benani, Heng Zhang, Kevin Terusaki, Yifeng Liu, Percy Mehta, Priyadarshini Mitra, Jayanth Parayil Kumarji

Introduction

To support our customers using High Velocity Sales to intelligently convert leads and create new opportunities, we built the engagement activity platform to automatically capture and store user engagement activities. The engagement delta lake is one of the key components supporting Einstein Analytics for creating powerful reports and dashboards and Sales Cloud Einstein for training machine learning models.

Unlike data in our other data lakes, engagement activity is mutable and the mutation ratio is high, which creates a huge challenge for us. We explored a couple of solutions and ultimately decided that the open source tool Delta Lake met all of our requirements. In this blog, we will walk through our journey through the following sections:

  1. Engagement activity ingestion
  2. Incremental read
  3. Exactly-once writes across tables
  4. Handle mutation with cascading changes
  5. Normalize tables in data lake
  6. Performance tuning

Engagement Activity Ingestion

High Level Architecture

Our engagement data lake is a distributed key-value object store, similar to our existing data lake known as the Shared Activity Store. To support the basic requirements of our Delta Lake, the ingestion pipeline starts when data is pushed to our internal Kafka queue. We then use the Spark Streaming framework to consume engagements in micro-batches and write the data to Delta Lake table(s), as Figure 1 shows below.

We start with a single table called Data Table, partitioned by organization ID and engagement activity timestamp to support basic queries, such as a time range query for our downstream consumers. However, our customers requested incremental read support. Delta Lake’s native API requires tables to be append-only, but ours is mutable, so we had to implement incremental read support on our own.
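As a rough illustration of this ingestion path, here is a minimal Structured Streaming sketch; the broker address, topic name, schema fields, and table paths are assumptions for the example, not our production values.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json, to_date}
import org.apache.spark.sql.types.{StringType, StructType, TimestampType}

val spark = SparkSession.builder().appName("EngagementIngestion").getOrCreate()

// Assumed (simplified) engagement payload schema; the real payload has many more fields.
val engagementSchema = new StructType()
  .add("organization_id", StringType)
  .add("engagement_id", StringType)
  .add("engagement_time", TimestampType)

// Consume engagement events from the internal Kafka queue in micro-batches.
val engagements = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")   // assumed broker address
  .option("subscribe", "engagement-activity")        // assumed topic name
  .load()
  .select(from_json(col("value").cast("string"), engagementSchema).as("e"))
  .select("e.*")
  .withColumn("engagement_date", to_date(col("engagement_time")))

// Append each micro-batch into the Data Table, partitioned by organization and activity date.
engagements.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/_checkpoints/engagement_data")  // assumed path
  .partitionBy("organization_id", "engagement_date")
  .start("/delta/engagement_data")                                      // assumed table path
```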

Incremental Read

We created a separate table called Notification Table that was partitioned by organization ID and ingestion timestamp. When new engagement is inserted into Data Table or when existing engagement is updated or deleted, we notify downstream consumers of those changes by inserting a new entry with the engagement metadata into the notification table, as Figure 2 shows below.
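Within a micro-batch, this can be as simple as a second append after the Data Table write: one metadata row per change goes into the Notification Table. The sketch below is a simplified stand-in; the column names, paths, and the plain append in step 1 are assumptions (the real job applies updates and deletes as well).

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.current_timestamp

// Called from the streaming job for every micro-batch of engagement changes.
def writeWithNotifications(changes: DataFrame): Unit = {
  // 1. Apply the engagement changes to the Data Table (details elided here;
  //    in practice this covers inserts, updates, and deletes).
  changes.write.format("delta").mode("append").save("/delta/engagement_data")  // assumed path

  // 2. Append one notification row per change, stamped with the ingestion time,
  //    so downstream consumers can discover what changed and when.
  changes
    .select("organization_id", "engagement_id", "engagement_date")             // assumed columns
    .withColumn("ingestion_time", current_timestamp())
    .write.format("delta").mode("append")
    .save("/delta/engagement_notification")                                    // assumed path
}
```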

Downstream consumers can either use streaming mode to listen to the Notification Table, or periodically pull new notifications from it. Based on the values in each notification, e.g. organization ID or engagement date, they can fetch the actual engagement data from our Data Table.
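For the streaming flavor, a consumer might look roughly like the following sketch, which tails the Notification Table and uses the notification metadata to read only the matching slice of the Data Table; the table paths and column names are assumptions.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().appName("EngagementConsumer").getOrCreate()

// Tail the append-only Notification Table as a stream of change notifications.
val notifications = spark.readStream
  .format("delta")
  .load("/delta/engagement_notification")                                    // assumed path

// For each micro-batch of notifications, fetch only the affected slice of the Data Table.
val fetchChangedData: (DataFrame, Long) => Unit = (batch, batchId) => {
  val keys = batch.select("organization_id", "engagement_date").distinct()   // assumed columns
  val changed = spark.read.format("delta").load("/delta/engagement_data")    // assumed path
    .join(keys, Seq("organization_id", "engagement_date"))
  changed.count()  // placeholder for the consumer's own processing
}

notifications.writeStream
  .foreachBatch(fetchChangedData)
  .option("checkpointLocation", "/delta/_checkpoints/consumer")              // assumed path
  .start()
```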

Exactly-Once Writes Across Tables

While introducing the Notification Table solves our incremental read problem, it creates another challenge. When our job tries to process and write data to two tables, the Delta Lake API only supports ACID transactions within each individual table; there is no transaction support across tables. For example, if the job writes to the Data Table successfully but fails to write to the Notification Table, downstream consumers completely miss this batch since they never get a notification. This became a common problem across our project whenever a job needs to write to multiple tables in a batch.

So, we created a Checkpoint Store that stores the start and end offsets, Kafka metadata, and the last job state for a given checkpoint. The first task of the Spark job, then, is to read this store to get the last checkpoint metadata and the last job state.

Since our job executes in micro-batch mode, and each batch includes multiple processes (processing or writing data to Table 1, Table 2, Table 3, Table 4, and so on), we also need to store the batch state. We created a Batch Metadata Store that stores the job name, the batch ID (the last successful batch ID, as provided by the Spark foreachBatch API), the process name, and the timestamp of the last modification. After reading from the Checkpoint Store, the Spark job knows the last job state: if the last batch completed successfully, it increments the last batch ID; otherwise, it keeps the last batch ID. The Spark job then fetches the batch metadata by job name.

By comparing the batch ID computed by the Spark job to the one stored in our Batch Metadata Store, we know which processes succeeded and which failed (a matching batch ID indicates success; a mismatch indicates failure). If all processes succeeded, we read new data from Kafka starting at the stored offset. Otherwise, we re-read the same range of data based on the stored start and end offsets and resume from the last failed process, skipping the processes whose batch IDs already match.

Our job will execute the processes in sequence across tables: process/write data to Table 1, Table 2, Table 3, Table 4 and so on. After each process completes, it will update the Batch Metadata Store with the current batch ID and timestamp. After the completion of all processes, the job will update the Checkpoint Store with the offset and the job state including the batch ID.
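As a rough sketch of this per-batch driver logic: the Checkpoint Store and Batch Metadata Store interfaces below are hypothetical stand-ins for our internal services, and the names and parameters are illustrative only.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical stand-ins for our internal Checkpoint Store and Batch Metadata Store.
case class Checkpoint(startOffset: Long, endOffset: Long, lastBatchId: Long, lastBatchSucceeded: Boolean)

trait CheckpointStore {
  def read(jobName: String): Checkpoint
  def write(jobName: String, checkpoint: Checkpoint): Unit
}

trait BatchMetadataStore {
  def lastBatchIds(jobName: String): Map[String, Long]   // process name -> last recorded batch ID
  def record(jobName: String, processName: String, batchId: Long): Unit
}

// One "process" = one table write (Data Table, Notification Table, and so on).
case class Process(name: String, write: DataFrame => Unit)

def runBatch(jobName: String,
             processes: Seq[Process],
             readKafka: (Long, Long) => DataFrame,       // reads a (startOffset, endOffset] range
             maxBatchSize: Long,
             checkpoints: CheckpointStore,
             batchMeta: BatchMetadataStore): Unit = {

  val cp = checkpoints.read(jobName)
  // A new batch ID only when the previous batch fully succeeded; otherwise replay the same batch.
  val batchId = if (cp.lastBatchSucceeded) cp.lastBatchId + 1 else cp.lastBatchId
  // On success, read fresh data past the committed end offset; on failure, re-read the same range.
  val (start, end) =
    if (cp.lastBatchSucceeded) (cp.endOffset, cp.endOffset + maxBatchSize)
    else (cp.startOffset, cp.endOffset)
  val data = readKafka(start, end)

  val completed = batchMeta.lastBatchIds(jobName)
  processes.foreach { p =>
    // A matching batch ID means this table write already succeeded in a previous attempt; skip it.
    if (!completed.get(p.name).contains(batchId)) {
      p.write(data)
      batchMeta.record(jobName, p.name, batchId)
    }
  }

  // All table writes succeeded: commit the offsets and the job state in the Checkpoint Store.
  checkpoints.write(jobName, Checkpoint(start, end, batchId, lastBatchSucceeded = true))
}
```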

The following Figure 4 shows the complete workflow:

Handling Mutation In Data Lake

Our engagement data is mutable, which means we need to support not only insertion but also updates and deletions in the data lake. Mutation is a much more expensive operation than insertion in batch jobs. Furthermore, due to the nature of engagement data, mutation requests can cascade, and our mutation volume is almost 20% of our insertion volume. So our goal is to reduce the number of mutation operations in each batch as much as possible.

Use Graph To Detect Cascading Mutation

A simple example of a cascading mutation is id1→id2, id2→id3, id3→id4. If we receive these requests in one batch and simply execute the update queries, the output will be incorrect: id1 will be mapped to id2, but we expect it to end up as id4. In order to execute them in a batch against our data lake, we need to pre-process the mutation requests. As Figure 5 shows, we break this down into several steps:

  • We read the mutation requests from the mutation table in a batch, group them by organization, and sort them by mutation execution time.
  • For each group, we use the Spark Graph API to build directed graph(s), where the nodes are the IDs and the edges are the requests, and we find all connected components.
  • We find the final state of each node: the process goes through each node in each sub-graph and, based on the execution time, finds its corresponding final node.
  • Finally, we convert each connected component into a list of requests for each organization, then apply them in a batch (a simplified sketch of the final-state resolution follows this list).
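To illustrate just the final-state computation (a deliberately simplified stand-in for the Spark Graph step), the following sketch follows each chain of ID changes to its terminal ID, assuming the requests are already grouped by organization and ordered by execution time:

```scala
// Simplified stand-in for the graph step: resolve each ID's final target by
// following the chain of mutation requests (id1->id2, id2->id3, id3->id4, ...).
def resolveFinalTargets(requests: Seq[(String, String)]): Map[String, String] = {
  val next = requests.toMap  // later requests win on duplicate source IDs (assumed ordering)

  @annotation.tailrec
  def terminal(id: String, seen: Set[String]): String = next.get(id) match {
    case Some(target) if !seen(id) => terminal(target, seen + id) // keep following the chain
    case _                         => id                          // end of chain (or cycle guard)
  }

  next.keys.map(src => src -> terminal(next(src), Set(src))).toMap
}

// resolveFinalTargets(Seq("id1" -> "id2", "id2" -> "id3", "id3" -> "id4"))
// == Map("id1" -> "id4", "id2" -> "id4", "id3" -> "id4")
```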

Normalize Table In Data Lake

As we mentioned, the mutation request volume is high, which significantly impacts our performance. We took a closer look at our data shape and found that many records need to be mutated for any one request. For example, a request to change t_name_1 to t_name_2 can touch thousands of records, because we store the t_name on every record (a consequence of our "flattened data" schema design).

As Figure 6 shows, we extracted the "NAME" fields from our data table into a separate table, called the mapping table, which stores the mapping from ID to name. With this change to our original design, we also need an extra write in the ingestion job and an extra update in the mutation job to cover both the data table and the mapping table. But we saw a big performance jump, since we significantly reduced the number of records and files updated in each mutation batch.
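With the mapping table in place, a rename only needs to touch the small mapping table rather than every flattened record. Here is a hedged sketch using the Delta Lake merge API; the table path and column names are assumptions.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("NameMutation").getOrCreate()
import spark.implicits._

// A rename request now updates one row per ID in the mapping table instead of
// thousands of flattened engagement records in the Data Table.
val renames = Seq(("t_id_1", "t_name_2")).toDF("id", "name")     // assumed columns

DeltaTable.forPath(spark, "/delta/engagement_name_mapping")      // assumed table path
  .as("m")
  .merge(renames.as("u"), "m.id = u.id")
  .whenMatched().updateExpr(Map("name" -> "u.name"))
  .whenNotMatched().insertExpr(Map("id" -> "u.id", "name" -> "u.name"))
  .execute()
```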

Performance Tuning

Finally, we’ve solved most of our design challenges and have reached the performance tuning stage. Based on the performance reports for our ingestion and mutation jobs, we achieved product SLA on the ingestion part. But addressing the performance issues in the mutation job took much longer than we expected. So we looked into the DeltaLog and ran the analysis against our tables and found the following issues:

  1. Due to the partition scheme (organization_id/engagement_date) and traffic spread across many thousands of organizations, there were many small files (in MBs or even KBs) in certain partitions, which caused slow mutation operations.
  2. Our mutation job didn't repartition by the partition key before writing, so many tasks were writing to the same partition, slowing down the write process.
  3. Spark's default spark.default.parallelism of 200 limited our join and read parallelism.

To address those issues, we applied the following fixes (a sketch of them follows this list):

  1. We compacted those small files by running the “OPTIMIZE” table command provided by Delta Lake.
  2. We added one more stage at the end of our mutation job to repartition by organization_id and engagement_date, making sure only one task writes to each partition.
  3. We increased the spark.default.parallelism to 400.
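A sketch of these fixes might look like the following; the table paths and the staged-changes DataFrame are hypothetical, and the OPTIMIZE SQL command assumes a Delta Lake build that exposes it (Databricks Delta or newer open-source releases).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder()
  .appName("MutationJob")
  .config("spark.default.parallelism", "400")          // fix 3: raise join/read parallelism
  .getOrCreate()

// Fix 1: compact the many small files in the hot partitions.
spark.sql("OPTIMIZE delta.`/delta/engagement_data`")   // assumed table path

// Fix 2: repartition the output by the table's partition keys before writing,
// so only one task writes into any given partition directory.
val mutationOutput = spark.read.format("delta").load("/delta/engagement_mutations")  // hypothetical staged changes
mutationOutput
  .repartition(col("organization_id"), col("engagement_date"))
  .write.format("delta").mode("append")
  .save("/delta/engagement_data")
```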

After those changes, we saw a performance gain in our performance tests, but it still didn't meet our goal, so further optimization was required. We started experimenting with partitioning only by organization_id, since that was the only remaining way to improve the file distribution, but we also needed to pay attention to downstream consumers' read performance (most queries filter on organization_id and date). We ran an A/B test between the two partition schemes:

  1. We created a new table with organization_id as the partition key and loaded the same amount of data as the original table.
  2. We ran the OPTIMIZE command with Z-ordering by engagement_date to improve read performance (see the sketch after this list).
  3. We analyzed the file size per organization and compared the read/update/insert operation time.
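Steps 1 and 2 of the experiment might look roughly like this, again with assumed table paths and assuming a Delta Lake build that supports ZORDER BY:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("PartitionSchemeTest").getOrCreate()

// Step 1: build a copy of the table partitioned only by organization_id (assumed paths),
// loaded with the same data as the original table.
spark.read.format("delta").load("/delta/engagement_data")
  .write.format("delta")
  .partitionBy("organization_id")
  .save("/delta/engagement_data_by_org")

// Step 2: Z-order by engagement_date within each organization partition so that
// date-range queries can still skip files via data skipping statistics.
spark.sql("OPTIMIZE delta.`/delta/engagement_data_by_org` ZORDER BY (engagement_date)")
```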

To our surprise, we saw a more than 50% gain in read performance, a more than 300% improvement in update performance, and only about 10% more time spent on insert (since we added an extra job for OPTIMIZE). This is a huge performance improvement for our data lake, and we summarize the reasons as follows:

  • The mutation requests in each batch span a large time window, which means they would need to touch a large number of partitions if we kept date as a sub-partition.
  • Changing the partition scheme to organization_id greatly reduces I/O for reads, writes, and updates.
  • Delta Lake uses data skipping and Z-ordering for I/O pruning, which matches our read query pattern.

Conclusion

Building our Engagement Activity Delta Lake was a fun process. We hope our journey helps those who are designing a data lake that supports batch/incremental reads, want to support mutation of data in a data lake, want to scale up a data lake with performance tuning, or want to support exactly-once writes across tables with Delta Lake.
