Guaranteed Data Delivery System for Distributed Services
Authors: Zhidong Ke, Utsavi Benani, Kevin Terusaki
With Sales Cloud Einstein, relevant activities like emails and meetings from your Inbox are automatically crawled and added to your organization timeline. While designing the data pipeline as a distributed system, we kept our customers' trust as our top priority, which meant we needed to make the system as reliable and fault tolerant as possible.
No matter how long the pipeline is, you never want to see data dropping off along the way. And yet that's exactly what we were finding: emails and meeting notices were sometimes disappearing, and we had no way to trace each message. Debugging was extremely difficult, in part because our logs didn't trace individual messages. So we set out to build an End-to-End Verification System that traces every message to its downstream consumers and automatically retries any message that fails to deliver, rather than requiring a manual re-crawl.
Here’s how we summed up the requirements our system would need to meet:
- Able to trace every email and meeting notice at scale
- Able to support creating a new entry when a new notice is created
- Offer support for acknowledgment by each downstream consumer
- Able to support a query that returns all messages that failed to deliver
- Kicks off automatic retries when a message fails to deliver
- Can track how many messages failed initially, how many passed once the new system was in place, and how many still failed in spite of it.
Before starting to build our own tool, we evaluated two existing options: Jaeger and Datadog.
Jaeger was powerful enough to support the distributed tracing we needed and offers latency metrics, but it would have required us to install an agent on every machine it needed to run on. And it isn't designed for an end-to-end verification system like the one we needed, since it doesn't support querying for all failed messages.
Datadog was also powerful enough to support our needs, and we already had it installed on our machines. But it had the same lack of end-to-end verification, and it was also relatively new to our stack, so we were wary of going all in on it at the time.
Ultimately, neither of these solutions would have worked for us out of the box, so we settled on building our own.
High-level Architecture
- Create an end-to-end verification module and implement insert/update/delete/query operations with Cassandra as the data store (a sketch of the module's API follows this list).
- Integrate the module with our real-time crawler to insert a message for each notice.
- Allow downstream consumers to acknowledge back to the module.
- Create a verification job that queries the system every 10 minutes, fetches the messages that have not yet been acknowledged by at least one consumer, and kicks off retries for them.
- Implement a checkpoint to verify every completed bucket.
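To make the module's surface area concrete, here is a minimal sketch of what its API might look like. The interface and type names (VerificationModule, VerificationEntry) are hypothetical and only mirror the operations listed above, not our actual implementation:

import java.time.Instant;
import java.util.List;

// Hypothetical shape of the end-to-end verification module: one entry per crawled
// notice, an acknowledgment per downstream consumer, and a query for stragglers.
public interface VerificationModule {

    // Insert a new entry when a notice is crawled; all consumer flags start unset.
    void insert(String datasource, String externalId, Instant crawlTime, int priority);

    // Called by (or on behalf of) a downstream consumer once it has processed the message.
    void acknowledge(Instant bucket, String datasource, Instant crawlTime,
                     String externalId, String consumer);

    // Return all entries in the given 10-minute bucket that at least one consumer
    // has not yet acknowledged.
    List<VerificationEntry> findUndelivered(Instant bucket);

    // Value object mirroring one row of the Cassandra table shown below.
    record VerificationEntry(Instant bucket, String datasource, String externalId,
                             Instant crawlTime, int priority,
                             boolean service1Acked, boolean service2Acked, int retryCount) {}
}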
Component Level Details
Cassandra as storage
The schema for our Cassandra datastore would look like this:
CREATE TABLE verification (
  bucket timestamp,
  datasource text,
  externalId text,
  crawlTime timestamp,
  priority int,
  service_1 boolean,
  service_2 boolean,
  retry int,
  PRIMARY KEY ((bucket), datasource, crawlTime, externalId))
WITH CLUSTERING ORDER BY (datasource DESC, crawlTime DESC, externalId DESC);
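Note that the consumer flags (service_1, service_2) are not part of the primary key, so the natural access pattern is to read an entire bucket by its partition key and filter un-acknowledged rows on the client. Below is a hedged sketch of that read path using the DataStax Java driver 4.x; the keyspace name and session wiring are assumptions for illustration, not our production code:

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

public class BucketReader {
    private final CqlSession session;

    public BucketReader(CqlSession session) {
        this.session = session; // e.g. CqlSession.builder().withKeyspace("activity").build()
    }

    // Fetch every row in one 10-minute bucket and keep the rows that are still
    // missing at least one consumer acknowledgment.
    public List<Row> undeliveredInBucket(Instant bucket) {
        ResultSet rs = session.execute("SELECT * FROM verification WHERE bucket = ?", bucket);
        List<Row> pending = new ArrayList<>();
        for (Row row : rs) {
            if (!row.getBoolean("service_1") || !row.getBoolean("service_2")) {
                pending.add(row);
            }
        }
        return pending;
    }
}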
Based on the number of events occurring per hour during peak times, we chose 10 minutes as the bucket size, with a Time to Live (TTL) of seven days for the collected data.
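Computing the bucket is just flooring a timestamp to its 10-minute boundary; a small illustrative helper (the class name is ours, not from the real module):

import java.time.Instant;

public final class Buckets {
    private static final long BUCKET_MILLIS = 10 * 60 * 1000L; // 10-minute buckets

    // Floor a timestamp to the start of the 10-minute bucket that contains it.
    public static Instant bucketOf(Instant ts) {
        long floored = (ts.toEpochMilli() / BUCKET_MILLIS) * BUCKET_MILLIS;
        return Instant.ofEpochMilli(floored);
    }

    private Buckets() {}
}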
Connectors
When the collector (Exchange/Google) collects a notice/datasourceId, it inserts a message into Cassandra through the Module, based on the schema above. The Module figures out the bucket from the current timestamp and leaves every consumer flag unset, waiting for the consumers to acknowledge. Each downstream consumer acknowledges after it successfully processes the data. We also built an API as a proxy into our core infrastructure, so for every message received by a consumer inside core infrastructure, we call the API to acknowledge it on that consumer's behalf.
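In code, the two Connector-side operations look roughly like the sketch below: the insert leaves both consumer flags false and applies the seven-day TTL, and the acknowledgment flips a single consumer's flag. This reuses the hypothetical Buckets helper from above and the DataStax Java driver; it illustrates the flow rather than our exact implementation:

import com.datastax.oss.driver.api.core.CqlSession;
import java.time.Instant;

public class VerificationWriter {
    private static final int SEVEN_DAY_TTL_SECONDS = 7 * 24 * 3600; // matches the table's 7-day TTL

    private final CqlSession session;

    public VerificationWriter(CqlSession session) {
        this.session = session;
    }

    // Called by the collector when a new notice is crawled: both flags start false (unacknowledged).
    public void insert(String datasource, String externalId, Instant crawlTime, int priority) {
        session.execute(
            "INSERT INTO verification (bucket, datasource, externalId, crawlTime, priority, "
                + "service_1, service_2, retry) VALUES (?, ?, ?, ?, ?, false, false, 0) "
                + "USING TTL " + SEVEN_DAY_TTL_SECONDS,
            Buckets.bucketOf(crawlTime), datasource, externalId, crawlTime, priority);
    }

    // Called (directly or via the proxy API) when service_1 has successfully processed the message.
    public void acknowledgeService1(Instant bucket, String datasource, Instant crawlTime, String externalId) {
        session.execute(
            "UPDATE verification SET service_1 = true WHERE bucket = ? AND datasource = ? "
                + "AND crawlTime = ? AND externalId = ?",
            bucket, datasource, crawlTime, externalId);
    }
}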
Verification/Retry job
For checkpointing, we're using Zookeeper as our metadata store. In our implementation, the Spark-based verification job reads the last checkpoint from Zookeeper, then queries Cassandra for each bucket and gets all of the messages that need to be retried. It passes the retry requests, grouped by id, to the Retry Job. The Connector Re-processor makes a batch call to the external datasource, emits the missing activity to the downstream consumers, creates a new entry under the current bucket in Cassandra, and increments the retry counter. Once all messages in the bucket have been retried, the driver is notified and writes a checkpoint for that bucket.
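A hedged sketch of that control loop is below, with the Zookeeper checkpointing shown through Apache Curator. The znode path, the RetryHandler hook, and the assumption that retryAll blocks until every message in the bucket has been retried are all ours for illustration; the real job runs on Spark and would use something like the BucketReader shown earlier:

import org.apache.curator.framework.CuratorFramework;
import com.datastax.oss.driver.api.core.cql.Row;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.List;

public class VerificationJob {
    private static final String CHECKPOINT_PATH = "/verification/lastBucket"; // hypothetical znode
    private static final Duration BUCKET = Duration.ofMinutes(10);

    // Hypothetical hook into the Retry Job / Connector Re-processor; assumed to block
    // until every message has been re-emitted and re-inserted with retry + 1.
    public interface RetryHandler {
        void retryAll(List<Row> undelivered);
    }

    private final CuratorFramework zk;     // already started, e.g. via CuratorFrameworkFactory
    private final BucketReader reader;     // from the earlier sketch
    private final RetryHandler retryHandler;

    public VerificationJob(CuratorFramework zk, BucketReader reader, RetryHandler retryHandler) {
        this.zk = zk;
        this.reader = reader;
        this.retryHandler = retryHandler;
    }

    // Walk every completed bucket since the last checkpoint, retry stragglers, then checkpoint.
    public void runOnce() throws Exception {
        Instant lastVerified = readCheckpoint();
        Instant newestComplete = Instant.now().minus(BUCKET);
        for (Instant bucket = lastVerified.plus(BUCKET);
             bucket.isBefore(newestComplete);
             bucket = bucket.plus(BUCKET)) {
            List<Row> undelivered = reader.undeliveredInBucket(bucket);
            if (!undelivered.isEmpty()) {
                retryHandler.retryAll(undelivered);
            }
            writeCheckpoint(bucket); // this bucket is now verified
        }
    }

    private Instant readCheckpoint() throws Exception {
        byte[] data = zk.getData().forPath(CHECKPOINT_PATH); // assumes the znode already exists
        return Instant.parse(new String(data, StandardCharsets.UTF_8));
    }

    private void writeCheckpoint(Instant bucket) throws Exception {
        zk.setData().forPath(CHECKPOINT_PATH, bucket.toString().getBytes(StandardCharsets.UTF_8));
    }
}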
With this design in place, our system is more robust and able to heal automatically when errors happen. Of course, there is always room to improve, so we are currently working on extending it to support spans and branches, and to retry failures from any point in the data flow. But as it stands, we are able to trace and view problematic components in real time. The advantage of this design is that it is simple to implement and easy to embed into any kind of application by installing the module. It's worked so well for us that we are in the process of open sourcing it, so stay tuned for a link to a GitHub repo in the future!