Durable, Fault Tolerant & Resilient Workloads in Azure Databricks

Nicholas Hurt
Published in Microsoft Azure
7 min read · Feb 28, 2019

Recently I was asked how to prevent any loss of data or in-memory state should a master or worker node fail in an Azure Databricks cluster. This led me to investigate the options that can isolate workloads from failures at both the VM (node) level and the regional datacenter level. It’s good to discuss this with reference to the architecture of Azure Databricks.

At the VM level, failures should be extremely rare, as Azure SLAs guarantee Virtual Machine (VM) connectivity at least 99.9% of the time. However, this is not a reason to be complacent or unprepared, as incidents do happen.

At the regional level, appropriate DR topologies and coordination should be implemented and deployed as described in further detail below.

Worker node failures

Fault tolerance and resilience are essential features that one would expect from a processing framework such as Spark. One of the key ingredients behind them is the principle of Resilient Distributed Datasets (RDDs).

RDDs are fault-tolerant collections of elements that can be operated on in parallel; in the event of a node failure, Spark replays the lineage to rebuild any lost RDD partitions. Even code written using the Structured APIs is compiled down to RDD operations by the Catalyst Optimizer:

Source: https://medium.com/microsoftazure/apache-spark-deep-dive-4e01022afc32
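To make the lineage idea a little more concrete, here is a minimal sketch that could be run in a Databricks Scala notebook, where `spark` is already defined. The dataset and transformations are made up purely for illustration; `toDebugString` prints the chain of parent RDDs that Spark would replay to recompute lost partitions.

```scala
// Assumes a Databricks Scala notebook, where `spark` (SparkSession) is predefined.
// The numbers and transformations are arbitrary, chosen only for illustration.
val numbers = spark.sparkContext.parallelize(1 to 1000, numSlices = 4)
val derived = numbers.map(_ * 2).filter(_ % 3 == 0)

// Print the lineage: the chain of parent RDDs Spark would recompute
// if an executor holding some of these partitions were lost.
println(derived.toDebugString)

// Code written with the Structured APIs has an RDD underneath as well.
val df = spark.range(1000).selectExpr("id * 2 AS doubled")
println(df.rdd.toDebugString)
```

Nothing is checkpointed or replicated here; the lineage alone is what would allow the lost partitions to be rebuilt.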

So out-of-the-box, Spark running on Azure Databricks should tolerate worker node failure from a theoretical point of view, but if you want to see it in action refer to the demo section below!

Storage

The next thing to understand is that data is completely decoupled from compute: the data is not persisted in the cluster, so it survives if any cluster (or even the whole workspace) dies. If mounted on Blob or Azure Data Lake Storage (ADLS), the data resides in Azure Storage. To ensure the highest degree of durability, use the Geo-redundant storage option:

Geo-redundant storage (GRS) is designed to provide at least 99.99999999999999% (16 9’s) durability of objects over a given year by replicating your data to a secondary region that is hundreds of miles away from the primary region. If your storage account has GRS enabled, then your data is durable even in the case of a complete regional outage or a disaster in which the primary region isn’t recoverable.
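For the mounting step mentioned above, a sketch of how a Blob container might be mounted from a Scala notebook is shown below. The angle-bracket values, the mount point and the secret scope are all placeholders I have assumed. Note that the replication option (such as GRS) is a property of the storage account itself and is not set here.

```scala
// Assumes a Databricks Scala notebook, where `dbutils` is predefined.
// All <...> values, the mount point and the secret scope are hypothetical placeholders.
dbutils.fs.mount(
  source = "wasbs://<container-name>@<storage-account-name>.blob.core.windows.net",
  mountPoint = "/mnt/demo",
  extraConfigs = Map(
    // The storage account key is read from a secret scope rather than hard-coded.
    "fs.azure.account.key.<storage-account-name>.blob.core.windows.net" ->
      dbutils.secrets.get(scope = "<scope-name>", key = "<key-name>")
  )
)

// Anything written under /mnt/demo now lands in Azure Storage, not on the cluster.
display(dbutils.fs.ls("/mnt/demo"))
```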

Disaster Recovery

Should a disaster strike a particular region your data would be protected, but you still need to create a regional disaster recovery topology, as described in the link. This involves provisioning workspaces in separate regions and migrating users, configuration, code and so on.

Running jobs — Retry policy

To ensure job resiliency, particularly in the event of master node failure, you must specify a retry policy and, if you’re running a streaming operation, use checkpointing so that queries resume where they left off:

To make your queries fault tolerant, you must enable query checkpointing and configure Databricks jobs to restart your queries automatically after a failure

This is all well and good, but what about the in-memory state store used during streaming join operations and de-duplication? Thanks again to checkpointing and write-ahead logs, this state is persisted to the same Azure Storage if your checkpoint location is mounted on Blob or ADLS. For examples of these, please refer to my streaming ETL demo.

At every trigger, the state is read and updated in the state store, and all updates are saved to the write ahead log. In case of any failure, the correct version of the state is restored from checkpoint information, and the query proceeds from the point it failed. Together with replayable sources, and idempotent sinks, Structured Streaming ensures exactly-once guarantees for stateful stream processing.
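As a rough illustration of the checkpointing described above, the sketch below runs a stateful de-duplication over the built-in rate source and writes the result with a checkpoint location. It assumes a Databricks Scala notebook and a hypothetical `/mnt/demo` mount point backed by Blob or ADLS; the paths and the watermark interval are my own placeholders.

```scala
// Assumes a Databricks Scala notebook (`spark` predefined) and that /mnt/demo is
// a hypothetical mount point backed by Blob storage or ADLS.
val events = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "5")
  .load()
  // The watermark bounds how long de-duplication state is kept.
  .withWatermark("timestamp", "10 minutes")
  // Stateful operation: seen keys are tracked in the state store.
  .dropDuplicates("value", "timestamp")

val query = events.writeStream
  .format("parquet")
  .outputMode("append")
  // Offsets, the write-ahead log and state snapshots are written here,
  // so they outlive the cluster that produced them.
  .option("checkpointLocation", "/mnt/demo/checkpoints/dedup")
  .option("path", "/mnt/demo/output/dedup")
  .start()
```

If the query is restarted with the same checkpoint location, even on a different cluster, it should pick up from the recorded offsets and state rather than starting from scratch.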

Demonstration

I also had the unusual (but fair) request to demonstrate some of these capabilities — how Spark jobs running on Azure Databricks could be resilient to either worker or (in the worst case) driver failures.

Generally, with a PaaS offering, the underlying infrastructure is “hidden” from you as the end consumer of the service. To demonstrate this capability of Spark, however, we needed to be a little creative.

At the time of writing, VNet injection is in preview; it allows you to run your Databricks data plane resources within your own Azure Virtual Network. There are many reasons why this makes sense from a security perspective, but for our purposes it gives us the opportunity to adjust the network security group to allow ssh access to the underlying VMs.

** Note: this demonstration may not be possible in the future, or once VNet injection is GA.

To deploy this you could configure your own public/private subnets, but the easiest way is to use the all-in-one ARM template. Follow this link and click on the “Deploy To Azure” button, at which point you will be directed to the Azure portal, where you complete the configuration:

Once the resources are ready, go into the resource group and open the databricks-nsg network security group. Open the Azure Cloud Shell and obtain the IP address of the shell by running the following command.

dig +short myip.opendns.com @resolver1.opendns.com

Click on inbound security rules and add a new entry that allows ssh access from this IP: specify the IP of the shell as the source and 2200 as the destination port. Alternatively you could whitelist your local machine’s IP, but Cloud Shell is more convenient, especially for those running Windows.

Returning to Cloud Shell, follow the steps to create an ssh key pair so that we can ssh to the cluster. Once created, copy the public key generated in the .ssh folder and leave this shell connected; if it is closed or disconnected, you’ll need to whitelist the new IP of your shell.

Then navigate back to the resource group and launch the Databricks workspace. Click on clusters and create a standard cluster with all the defaults, but expand the advanced section, navigate to the ssh tab and paste the SSH public key into the box provided. Then click create cluster.

Once running navigate to the clusters panel, click on your cluster, and then click on the Spark Cluster UI — Master.

Note the hostname/IP address of the master. Click the drop down to choose one of the workers and take note of the IP.

Now let’s create a new notebook that joins two synthetic streams. Leave this running in a separate browser window.

import org.apache.spark.sql.functions._

// Keep the demo small: use a single shuffle partition
spark.conf.set("spark.sql.shuffle.partitions", "1")

// Synthetic impressions stream from the built-in rate source
val impressions = spark
  .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load()
  .select($"value".as("adId"), $"timestamp".as("impressionTime"))

// Synthetic clicks stream
val clicks = spark
  .readStream.format("rate").option("rowsPerSecond", "5").option("numPartitions", "1").load()
  .where((rand() * 100).cast("integer") < 10) // roughly 10 out of every 100 impressions result in a click
  .select(($"value" - 50).as("adId"), $"timestamp".as("clickTime")) // -50 so that a click with the same id as an impression is generated much later
  .where("adId > 0")

// Stream-stream inner join on adId; display() is a Databricks notebook function
display(impressions.join(clicks, "adId"))

Go back to the Azure portal tab with cloud shell open and ssh to the worker VM using the following command.

ssh ubuntu@ip-of-worker-node -p 2200 -i ~/.ssh/id_rsa

Now you can run a shutdown command or find the spark process and kill it.

sudo shutdown now

Back in your Databricks tab you should notice that your job is still running…

Going back into the Spark Cluster UI section, we see it has marked the worker state as DEAD.

The good news is that your job will continue to run and after some time you should see new nodes provisioned.
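If you would rather confirm this from code than from the UI, a small sketch like the following could be run in another cell of the same notebook while the streaming join is active. It only uses the standard `StreamingQuery` handles exposed by `spark.streams`; running it in a separate cell is my own suggestion rather than part of the original walkthrough.

```scala
// Assumes the streaming join from the notebook above is running in the same session.
// Lists every active streaming query with its current status message.
spark.streams.active.foreach { q =>
  println(s"id=${q.id} active=${q.isActive} status=${q.status.message}")
}
```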

Simulate driver failure

In order to recover from this scenario we need to run our streaming notebook as a job and specify a retry policy, should the job fail. Click on the jobs section and create a new job, selecting your notebook to run, and in the advanced section specify a retry policy. Edit the cluster section, leaving the defaults, and enter your public key in the advanced options. Note that the job will create a new cluster, which is needed in this scenario.
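The retry policy set in the UI corresponds to a handful of fields in the job definition. Below is a sketch of what roughly equivalent settings might look like if the job were created through the Databricks Jobs REST API (2.0) instead. The job name, notebook path, retry values and every angle-bracket placeholder are assumptions for illustration, not values from this walkthrough.

```scala
// Hypothetical job settings for the Databricks Jobs API (2.0). Every value below
// is a placeholder to adapt; none of them come from the walkthrough itself.
val jobSettings: String =
  """{
    |  "name": "streaming-join-demo",
    |  "new_cluster": {
    |    "spark_version": "<runtime-version>",
    |    "node_type_id": "<vm-size>",
    |    "num_workers": 2,
    |    "ssh_public_keys": ["<contents of your public key file>"]
    |  },
    |  "notebook_task": { "notebook_path": "/Users/<you>/streaming-join" },
    |  "max_retries": 3,
    |  "min_retry_interval_millis": 60000,
    |  "retry_on_timeout": false
    |}""".stripMargin

// Print the payload so it can be reviewed before being sent to the API.
println(jobSettings)
```

The `new_cluster` block is what makes each retry start on a freshly provisioned cluster, which is the behaviour this scenario relies on.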

Run the job now and wait for the cluster to start up in the job clusters section. Once started click into it, and grab the IP from the Spark Cluster UI. Back in Azure cloud shell, ssh to this IP and run the shutdown command.

Head over to the clusters section in Databricks and watch the events unfold…

The effects are not immediately visible, but after some time you will notice that the running job fails and a new cluster begins to spin up…

Launching a new replacement cluster…

Once the cluster is provisioned, the job will resume!

Back in business!

Conclusion

Even though we have covered these aspects from a very general perspective, I hope you agree that building highly reliable and fault-tolerant pipelines using Azure Databricks is entirely possible if done in the correct manner. Implementing a DR topology and leveraging the guarantees that Azure Storage provides are two important steps in this process. Additionally, checkpointing and write-ahead logs provide fault-tolerance for streaming operations, and enabling a retry policy on jobs that create new clusters is key to ensuring full recovery from master node failures.
