Global Synchronization and Ordering in Delta Lake

Michael Zhang
Salesforce Engineering
Oct 14, 2020

Authors: Heng Zhang, Kevin Terusaki, Zhidong Ke, Utsavi Benani

Introduction

At Salesforce, we maintain a platform to capture customer activity — various kinds of sales events such as emails, meetings, and videos. These events are either consumed by downstream products in real time or stored in our data lake, which we built using Delta Lake. This data lake supports the creation of up-to-date dashboards by downstream components like Einstein Analytics and the training of machine learning models for our customers who are using Sales Cloud Einstein to intelligently convert their leads and create new opportunities.

One of the great features provided by Delta Lake is ACID transactions. This feature is critical for maintaining data integrity when multiple independent write streams modify the same Delta table. Running this in the real world, however, we observed frequent Conflicting Commits errors that failed our pipeline. We realized that, while ACID transactions maintain data integrity, they provide no mechanism for resolving write conflicts. In this post, we share a solution that ensures global synchronization and ordering of multiple process streams performing concurrent writes to a shared Delta Lake table. With this mechanism, we greatly improved our pipeline stability by eliminating Conflicting Commits errors while maintaining data integrity.

Pipeline Design

Our pipeline consists of three process streams: Engagement Ingestion, Engagement Mutation, and Engagement Time To Live (TTL). Each uses the Spark Structured Streaming framework to consume engagement data or requests in micro-batches from our internal Kafka queue. Here are the details:

  • Engagement Ingestion: Reads engagement records from the Kafka queue and appends them to the Engagement Data table.
  • Engagement Mutation: Reads engagement mutation requests from the Kafka queue and applies mutation operations (delete or update) to the Engagement Data table. Our Engagement data lake is mutable. Each engagement activity is identified by an ID. When a lead becomes a contact, it receives a new ID, and all engagement activities belonging to that lead need to be updated with the new ID. Mutation requests for each engagement activity are created by upstream components. We are then notified via Kafka, and the mutation request is handled by our Engagement Mutation job.
  • Engagement TTL: Reads GDPR requests from Kafka and applies delete operations to the Engagement Data table. GDPR requests are created by our Data Policy service, and we are notified of them via Kafka.

There are two important aspects of the pipeline.

Independent Process Stream

The independence manifests itself in two ways. First, each process stream pulls data from its own independent Kafka queue. Second, the streams pull data at different intervals based on their Service Level Agreements (SLAs). For example, Engagement Ingestion and Mutation process every hour, while Engagement TTL processes once per day. If needed, Engagement Ingestion and Mutation can run more frequently, at smaller intervals such as 15 or 30 minutes.

Process Ordering

Besides the independence, there is a process ordering. The natural order of the engagement activity life cycle is creation, mutation, and TTL. We expect the same order both in the arrival of engagement activities, mutation requests, and TTL requests in our Kafka queues and in the processing order of the three streams. We have retry mechanisms to handle out-of-order situations in which mutation requests arrive before the engagement activities because of system glitches.

Conflicting Commits Error

The symptom is that one of the process streams occasionally fails with the error shown below. The error happens when any two of the three process streams concurrently operate on the same Parquet file in overlapping windows.

ConcurrentDeleteReadException occurs when a concurrent Engagement TTL operation deletes a file that Engagement Mutation has read

To understand the error, we first need to understand the concurrency control model of Delta Lake. Delta Lake uses optimistic concurrency control to provide transactional guarantees between writes. To control the degree to which a transaction must be isolated from modifications made by concurrent transactions, it defines three isolation levels (listed from strongest to weakest): Serializable, WriteSerializable (the level we use), and Snapshot. Below is the write conflict matrix for different operations and isolation levels. Our three process streams fall into the cells outlined in blue.
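To make the failure mode concrete, here is a minimal sketch of optimistic concurrency control over a set of data files. It is a toy model, not Delta Lake's implementation: `Commit`, `Txn`, `TableLog`, and `tryCommit` are hypothetical names, and the conflict check is reduced to the single rule relevant here, namely that a commit is rejected if a file it read was removed by a commit that landed after the transaction took its snapshot.

```scala
// Toy model of optimistic concurrency control; NOT Delta Lake's actual code.
// All names here (Commit, Txn, TableLog) are hypothetical.
object OccSketch {
  // A committed change: files removed from and added to the table.
  final case class Commit(removed: Set[String], added: Set[String])

  // A transaction remembers the log version it started from and what it read.
  final case class Txn(readVersion: Int, filesRead: Set[String],
                       removed: Set[String], added: Set[String])

  final class TableLog {
    private var commits = Vector.empty[Commit]

    def version: Int = synchronized(commits.length)

    // Validate against every commit that landed after the txn's snapshot.
    def tryCommit(txn: Txn): Either[String, Int] = synchronized {
      val concurrent = commits.drop(txn.readVersion)
      val deletedUnderUs =
        concurrent.flatMap(_.removed).filter(txn.filesRead.contains).distinct
      if (deletedUnderUs.nonEmpty)
        Left(s"ConcurrentDeleteRead: ${deletedUnderUs.sorted.mkString(",")}")
      else {
        commits = commits :+ Commit(txn.removed, txn.added)
        Right(commits.length)
      }
    }
  }

  // TTL and Mutation both start from version 0 and both rewrite part-1.
  def demo(): (Either[String, Int], Either[String, Int]) = {
    val log = new TableLog
    val ttl = Txn(log.version, Set("part-1"), Set("part-1"), Set("part-1-ttl"))
    val mutation = Txn(log.version, Set("part-1"), Set("part-1"), Set("part-1-mut"))
    (log.tryCommit(ttl), log.tryCommit(mutation))
  }
}
```

In `demo()`, the TTL transaction commits first, so the Mutation transaction fails validation because the file it read no longer exists. Nothing in this model retries or reorders the losing transaction, which is the gap the rest of this post addresses.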

Solution

In this section, we look at four possible solutions to this error and explain why the first three do not work for us and why we chose the last one.

Partitioning and Disjoint Command Conditions

This solution is described in the Delta Lake documentation on concurrency control. The idea is to partition your data appropriately and avoid conflicts between two operations by giving them disjoint command conditions. For example, if the table is partitioned by date, the two operations below will not conflict.

UPDATE table WHERE date > '2010-01-01' ... and DELETE table WHERE date < '2010-01-01'

This solution does not work for us. Our Engagement Data table is partitioned by OrgId and Z-ordered by engagement date. Because of Z-ordering, the data is clustered into files that each contain engagement activity records for a range of engagement dates. We do include both columns, OrgId and EngagementDate, in our commands. However, two commands can still be applied to the same data file because of the Z-order clustering.
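The following sketch illustrates why disjoint conditions do not help in this layout. It is illustrative only: the `DataFile` type, the file names, the org ID, and the dates are all made up, and file selection is modeled as a simple date-range overlap within one OrgId partition.

```scala
import java.time.LocalDate

// Illustrative only: file layout, org ID, and dates are made up.
object ZOrderOverlapSketch {
  // A data file inside one OrgId partition, covering a range of engagement dates.
  final case class DataFile(name: String, orgId: String,
                            minDate: LocalDate, maxDate: LocalDate)

  // Files that an operation on [from, to] for a given org would have to rewrite.
  def touchedFiles(files: Seq[DataFile], orgId: String,
                   from: LocalDate, to: LocalDate): Set[String] =
    files
      .filter(f => f.orgId == orgId && !f.maxDate.isBefore(from) && !f.minDate.isAfter(to))
      .map(_.name)
      .toSet

  val files: Seq[DataFile] = Seq(
    DataFile("part-0", "org1", LocalDate.parse("2020-01-01"), LocalDate.parse("2020-03-31")),
    DataFile("part-1", "org1", LocalDate.parse("2020-04-01"), LocalDate.parse("2020-06-30"))
  )

  // Two commands whose date conditions are disjoint...
  val updateFiles: Set[String] =
    touchedFiles(files, "org1", LocalDate.parse("2020-02-01"), LocalDate.parse("2020-02-10"))
  val deleteFiles: Set[String] =
    touchedFiles(files, "org1", LocalDate.parse("2020-03-01"), LocalDate.parse("2020-03-05"))

  // ...still land on the same file, so the two commits can conflict.
  val sharedFiles: Set[String] = updateFiles.intersect(deleteFiles)
}
```

Both commands resolve to `part-0`, even though their date ranges never overlap, because conflicts are detected per file rather than per row.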

Orchestration Tools

We also examined the option of using an orchestration tool such as Airflow to organize our process streams into a Directed Acyclic Graph (DAG) forming a single pipeline: Engagement Ingestion → Engagement Mutation → Engagement TTL. Although running the process streams as one DAG could eliminate conflicting commits errors, it would introduce several limitations. First, it forces all streams to run at the same pace, that of the most demanding SLA. This could cause empty runs for Engagement TTL because the TTL request for each org is sent only once per day. Second, a single DAG does not fit the independent nature of our process streams. Third, we would have to run the streams as batch jobs and would lose the option of running them continuously in micro-batches.

Loosen Isolation Level to Snapshot

This option does not guarantee data integrity, which is not acceptable to us.

Distributed Locking

With all this analysis, we realized that our Conflicting Commits issue is a race condition that fits a locking model. The shared resource is the Engagement Data table, and our streams are the threads modifying it concurrently. We decided to build a thin layer on top of our Spark framework that places a distributed lock on the Engagement Data table to ensure globally synchronized writes. We borrow the concept of compare-and-swap to ensure the ordering of the processes. Here are the details.

Job Coordinator

We use ZooKeeper’s Shared Locks recipe as our distributed locking mechanism. We created a lightweight component called Job Coordinator to perform the locking operations. It is initialized when the streaming job starts, performs the necessary setup on ZooKeeper, and applies distributed locking at the micro-batch level.

Configuration

Job Coordinator needs three configuration properties. The first one (job.coordinator.lock.name) identifies the shared resource to protect. Streaming jobs that perform concurrent writes against the same Delta table should use the same resource name. The second one (job.coordinator.name) is the name of the streaming job. The third one (job.coordinator.predecessor) is the name of the streaming job expected to run immediately before this one. Job Coordinator uses the first property to ensure globally synchronized writes and the last two to ensure the ordering of execution across micro-batches.

job.coordinator.lock.name=ENGAGEMENT_HVS_DATA
job.coordinator.name=Mutation
job.coordinator.predecessor=Ingestion
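As an illustration of how a job might read these settings, here is a small sketch that parses the three properties into a typed value. Only the three property keys come from this post; `CoordinatorConfig` and `parse` are hypothetical names, and the use of `java.util.Properties` is an assumption about the file format rather than a description of our actual loader.

```scala
import java.io.StringReader
import java.util.Properties

// Hypothetical helper; only the three property keys come from the text above.
object CoordinatorConfigSketch {
  final case class CoordinatorConfig(lockName: String, jobName: String, predecessor: String)

  // Returns Left naming the first missing key instead of throwing.
  def parse(text: String): Either[String, CoordinatorConfig] = {
    val props = new Properties()
    props.load(new StringReader(text))

    def required(key: String): Either[String, String] =
      Option(props.getProperty(key)).map(_.trim).filter(_.nonEmpty).toRight(s"missing $key")

    for {
      lock <- required("job.coordinator.lock.name")
      name <- required("job.coordinator.name")
      pred <- required("job.coordinator.predecessor")
    } yield CoordinatorConfig(lock, name, pred)
  }
}
```

Failing fast on a missing key is a deliberate choice in this sketch: a job that silently started without a lock name or predecessor would defeat the purpose of the coordinator.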

Workflow

Here is how Job Coordinator works in detail:

  1. The streaming job starts and Job Coordinator is initialized with ZooKeeper.
  2. The streaming job pulls data from Kafka periodically and starts a micro-batch process when messages arrive in Kafka.
  3. Within a micro-batch process, Job Coordinator first tries to obtain a distributed lock on the resource name set in job.coordinator.lock.name. If it cannot obtain the lock within a given time, it gives up. The next pull will start from the last checkpoint.
  4. Once it obtains the lock, it reads the Predecessor field and compares it with the expected one set in job.coordinator.predecessor. (4.1) If the predecessor is not the expected one, it gives up this turn and releases the lock, and the next pull will start from the last checkpoint. (4.2) If the predecessor is the expected one, it registers its own name, as set in job.coordinator.name.
  5. The micro-batch process starts.
  6. The checkpoint is saved.
  7. The Job Coordinator releases the lock and waits for the next pull.
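The steps above can be sketched with an in-memory stand-in. This is a simplified model under stated assumptions, not our production code: a `ReentrantLock` and a plain field replace the ZooKeeper lock and the Predecessor field, checkpointing is omitted, and `SharedState`, `Outcome`, and `runMicroBatch` are hypothetical names.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

// In-memory stand-in for the workflow; a real deployment would keep the lock
// and the predecessor field in ZooKeeper. All names are hypothetical.
object CoordinatorWorkflowSketch {
  sealed trait Outcome
  case object LockTimeout extends Outcome      // step 3: gave up waiting for the lock
  case object WrongPredecessor extends Outcome // step 4.1: not this job's turn
  case object Processed extends Outcome        // steps 5 to 7 completed

  final class SharedState(initialPredecessor: String) {
    val lock = new ReentrantLock()
    @volatile var lastJob: String = initialPredecessor // the "Predecessor" field
  }

  def runMicroBatch(state: SharedState, jobName: String, expectedPredecessor: String,
                    timeoutMs: Long)(process: () => Unit): Outcome = {
    if (!state.lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) LockTimeout
    else try {
      if (state.lastJob != expectedPredecessor) WrongPredecessor
      else {
        state.lastJob = jobName // step 4.2: register this job
        process()               // step 5: run the micro-batch (checkpoint save omitted)
        Processed
      }
    } finally state.lock.unlock() // step 7: always release the lock
  }
}
```

Because the comparison and the registration both happen while the lock is held, the pair behaves like a compare-and-swap on the Predecessor field: a job either sees the expected predecessor and takes its turn, or leaves the field untouched.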

APIs

Job Coordinator provides three API methods to do its work.

class JobCoordinator(appContext: AppContext, zk: ZooKeeper) {

  /**
   * Obtain the distributed lock.
   * Blocks the current thread until the lock is obtained.
   */
  def lock(): Either[Throwable, String] = {
    .....
  }

  /**
   * Release the distributed lock.
   */
  def unlock(): Either[Throwable, Boolean] = {
    .....
  }

  /**
   * Register this running instance of an application as the current running application for THIS LOCK.
   * @return The predecessor
   */
  def register(): Either[Throwable, String] = {
    .....
  }
}
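One way a caller might compose these three methods around a micro-batch is sketched below. The `Coordinator` trait and the `guarded` helper are hypothetical, and the sketch assumes that `register()` returns a `Left` when the predecessor check fails, which the signatures above do not confirm.

```scala
import scala.util.Try

// Hypothetical wrapper; only lock(), register(), and unlock() mirror the API above.
object CoordinatorUsageSketch {
  trait Coordinator {
    def lock(): Either[Throwable, String]
    def register(): Either[Throwable, String] // ASSUMED to be Left on a predecessor mismatch
    def unlock(): Either[Throwable, Boolean]
  }

  // Runs `batch` only after the lock is held and registration succeeded.
  def guarded[A](c: Coordinator)(batch: => A): Either[Throwable, A] =
    c.lock().flatMap { _ =>
      // Try captures non-fatal exceptions thrown by the batch as a Left.
      val result = c.register().flatMap(_ => Try(batch).toEither)
      c.unlock() // release regardless of the outcome; its result is ignored here
      result
    }
}
```

Returning `Either` throughout keeps the caller's decision explicit: on a `Left`, the streaming job can skip the micro-batch and let the next pull resume from the last checkpoint.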

Test Job Coordinator

To test that Job Coordinator truly resolves conflicting commit errors, we created three mock Structured Streaming jobs (simulating Ingestion, Mutation, and TTL) and put them into two test groups: one with Job Coordinator enabled and the other with Job Coordinator disabled. Here are the test results:

In the first group, with Job Coordinator enabled, we observed no overlapping running windows, and the ordering of processing across jobs was preserved. (The full visualization is available here https://time.graphics/editor/358157)

In the second group, with Job Coordinator disabled, we observed frequent overlapping running windows in which conflicting commit errors could happen. (The full visualization is available here https://time.graphics/editor/357088)

Apply Job Coordinator to other Data Lakes

With Job Coordinator, we no longer observe conflicting commit errors in our Engagement Data table pipeline. Later, we applied Job Coordinator to the pipelines of other data lakes that contain different sales activity types, such as VoiceInsight, EmailEvent, and MeetingEvent, and no conflicting commit errors occurred on those pipelines either.

Final Remarks

Stream processing will become increasingly common in building modern data lakes. Even with Delta Lake’s ACID transactions, conflicting commit errors are likely to happen, especially when multiple process streams try to write to the same table. While several options for addressing this issue are available, they introduce other limitations or compromises in partitioning strategy, data processing, or data integrity. Applying distributed locking to multiple process streams in Delta Lake resolves write conflicts without those compromises. It has proved to be a successful pattern across our Delta Lake pipelines. We hope our story gives you some insight when you tackle similar problems.
