How to orchestrate Databricks jobs from Azure Data Factory using the Databricks REST API

A better (and cheaper) way of controlling your jobs than using the official Azure Data Factory Databricks notebook connector.

Iván Gómez Arnedo
9 min read · Oct 5, 2021

Azure Data Factory (ADF) is a very powerful tool for process orchestration and ETL execution within the Azure suite. It has its limitations, of course, and many will prefer open-source alternatives such as Airflow, but Azure Data Factory can be a very good option if you are already working in Azure.

Disclaimer

In this article you are NOT going to find:

  • Descriptions of similar tools and the differences between them (EX: Azure Data Factory vs. Airflow).
  • Description of simple processes already well described in the official documentation.
  • Extensive descriptions of the tools to be used (you should have a basic knowledge of these tools).

In this article you will find:

  • Practical examples of real use cases for which I had to design alternatives myself, since I could not find anything similar on the internet.
  • Examples that do not reinvent the wheel. I’m a lazy guy; whenever something is already explained somewhere else, I will just add a link to that article/documentation.

Real Use Case

A real use case in which Azure Data Factory and Databricks are used together can be seen in the following architecture, where Azure Data Factory performs the Extraction and Load while Databricks performs the Transformations of an ETL process:

Example of Data Platform in Azure

All the different source systems (see the left part of the previous image) can inform Azure Data Factory that new data is available by updating the State Storage External System (see below).

This approach can be used when there is a real need to process/insert data as soon as new data becomes available (if that is not the case, a normal schedule trigger can be used). It can also be used in an event-driven architecture (see article below): either by creating new events to announce that data is available or by reading those events when the data arrives.

Execute Databricks jobs from Azure Data Factory

Depending on the use case (many simultaneous executions vs. a few occasional jobs) you will have to use one of two different approaches:

  • Native Databricks notebook connector (synchronous): activities won’t end until the related notebooks have finished.
  • Databricks REST API (asynchronous): activities finish as soon as the call to the REST API has been completed. If what you are looking for is to execute multiple (more than 20) simultaneous jobs and to monitor them asynchronously, keep reading :).

Limitations of the native connector (synchronous orchestration)

  • In the same Databricks workspace, at most 145 jobs can run in parallel (this limit applies to the whole Databricks workspace, not only to a certain job type).
  • In the same Azure Data Factory pipeline we can only run 50 parallel activities in the same FOR EACH activity (in practice I have only been able to run 20 parallel activities at the same time).
  • To get the most out of Databricks we would need at least three identical ADF pipelines running at the same time, and we would have to take care of possible parallelism problems.
  • Each activity in ADF is executed by an Integration Runtime (a VM-like compute). If you are synchronously monitoring a Databricks notebook, you will be paying for the whole processing time in Databricks and for the Integration Runtime that is constantly monitoring your execution.

In the previous use case, some pipelines will be executed every half hour. Having all of these pipelines running at the same time (if there are long-running Databricks notebooks/jobs):

  • Could decrease overall ADF performance.
  • Will increase costs as more Azure Integration Runtimes are needed.
  • Will make it harder to monitor everything or to find a certain execution.

Asynchronous Databricks REST API orchestration

For cases in which the number of parallel jobs has to be higher, or where the drawbacks described above are red flags, an asynchronous solution using the Databricks REST API can be built by following these steps:

1. Databricks Personal Access Token (PAT) creation

To use the Databricks REST API, you need a Databricks Personal Access Token (PAT) to authenticate your requests against your Databricks workspace.

Once the token is created, it will be stored in an Azure Key Vault. If the Azure Data Factory instance doesn’t have access permissions on that Azure Key Vault, access should be granted to the Managed Identity of the ADF instance in that Key Vault.
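If you prefer to do this step programmatically, a minimal sketch using the Key Vault Python SDK could look like the following (vault name, secret name and token value are placeholders; the identity running the script needs "set" permissions on the vault):

from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

# Placeholders: use your own vault and secret names
vault_url = "https://YOUR_KEYVAULT_NAME.vault.azure.net"
client = SecretClient(vault_url=vault_url, credential=DefaultAzureCredential())

# Store the Databricks PAT so that ADF can later retrieve it with its Managed Identity
client.set_secret("YOUR_SECRET_NAME", "dapiXXXXXXXXXXXXXXXX")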

2. Databricks jobs creation

A new Databricks job has to be created for the notebook that will be asynchronously monitored.
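As an illustration (the job can also be created from the Databricks UI), such a job with a notebook task could be created through the Jobs REST API 2.0 roughly like this; the cluster id, notebook path and job name are placeholders, and the workspace URL is the example one used later in this article:

import requests

workspace_url = "https://northeurope.azuredatabricks.net"  # example workspace used in this article
pat = "dapiXXXXXXXXXXXXXXXX"  # the PAT created in the previous step

job_definition = {
    "name": "my-async-job",
    "existing_cluster_id": "1234-567890-abcde123",          # or a "new_cluster" definition
    "notebook_task": {"notebook_path": "/Repos/etl/transform_table"},
    "max_concurrent_runs": 10,                               # allow several parallel runs of the same job
}

response = requests.post(
    f"{workspace_url}/api/2.0/jobs/create",
    headers={"Authorization": f"Bearer {pat}"},
    json=job_definition,
)
print(response.json()["job_id"])  # this job_id is the one used later in the run-now call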

3. State Storage External System

Storing information about the processes to be executed (EX: Tables to be ingested/processed) in an external system has many advantages:

  • It is not necessary to create a new pipeline for each new type of data to ingest/process.
  • The system can be updated by third party processes that add records when there is new data to process (EX: New data generated by a Change Data Capture process).

In this article, the State Storage External System used is Azure SQL Server, and the table that stores the state is called track_processing.

All external systems that generate new data will create new entries in that table when there is new data available for a certain source/table. These entries will be read by the Trigger Pipeline (see below) and, when their processing begins, their status will be changed to ‘RUNNING’ to avoid executing the same job twice. Once the execution has finished, the status of each entry will be updated again to ‘FAILED’ (the job will be retried in the next execution) or ‘DONE’.
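The exact schema is up to you; a minimal sketch with pyodbc could be the following (the column names, the initial 'PENDING' status and the connection string are assumptions, based only on the fields referenced later in this article):

import pyodbc

conn = pyodbc.connect("DRIVER={ODBC Driver 18 for SQL Server};SERVER=...;DATABASE=...;UID=...;PWD=...")
cursor = conn.cursor()

# Minimal track_processing table: what to process, its status and the Databricks run id
cursor.execute("""
    CREATE TABLE track_processing (
        id INT IDENTITY PRIMARY KEY,
        source_table NVARCHAR(200) NOT NULL,             -- data to be ingested/processed
        status NVARCHAR(20) NOT NULL DEFAULT 'PENDING',  -- PENDING -> RUNNING -> DONE / FAILED
        runid NVARCHAR(50) NULL                          -- Databricks run_id, filled by the Trigger Pipeline
    )
""")

# A third-party process (EX: a Change Data Capture job) announces new data like this:
cursor.execute("INSERT INTO track_processing (source_table) VALUES (?)", "sales_deltas")
conn.commit()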

4. Pipelines creation

Two pipelines have to be created to trigger and asynchronously monitor the Databricks jobs:

  1. Trigger Pipeline: pipeline that reads from the State Storage External System (EX: Azure SQL table) to identify the jobs to be executed (EX: tables to process) and, for each one of them, calls the Databricks REST API with the necessary parameters and updates their status to reflect that they are currently running (by storing the RUNID of each one of them).
  2. Monitorization Pipeline: pipeline that checks the status (with calls to the Databricks REST API) of all the jobs that are marked as running in the State Storage External System. If the status of one of them has changed, the entry is updated to reflect that the execution has finished.

4.1. Trigger Pipeline

One of the simplest versions of the pipeline looks like this:

Base pipeline and flow of the execution

(1) The first activity is a call to the Monitorization Pipeline (see below), which will check (and modify) the status of all the jobs that were running in the previous execution of this pipeline.

Then, the PAT will be retrieved from the Key Vault in which it was stored:

Secure output and Secure input are two important properties to avoid displaying credentials in plain text.
More information about how to manage secrets in ADF following a CI/CD approach:

URL: https://YOUR_KEYVAULT_NAME.vault.azure.net/secrets/YOUR_SECRET_NAME?api-version=7.0

Method: GET

Authentication: Managed Identity

Resource: https://vault.azure.net
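To test that Key Vault access outside ADF, a rough Python equivalent of this Web activity could look like this (same URL as above; DefaultAzureCredential resolves to a managed identity when the code runs in Azure):

import requests
from azure.identity import DefaultAzureCredential

# Get an AAD token for Key Vault, just as the Managed Identity authentication does in ADF
token = DefaultAzureCredential().get_token("https://vault.azure.net/.default").token

secret_url = "https://YOUR_KEYVAULT_NAME.vault.azure.net/secrets/YOUR_SECRET_NAME?api-version=7.0"
pat = requests.get(secret_url, headers={"Authorization": f"Bearer {token}"}).json()["value"]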

Finally, the list of jobs to be executed will be queried from the Azure SQL table in which it was stored (EX: all tables for which we have received new deltas), and a parallel flow will be executed for each of these results.

(2) In each iteration of the flow, the number of jobs currently running in the Databricks workspace is queried with a call to the Databricks REST API:

URL: https://northeurope.azuredatabricks.net/api/2.0/jobs/runs/list?active_only=true&offset=0&limit=150

Method: GET

Headers: KEY: Authorization, VALUE: Bearer @{activity('Get PAT TOKEN').output.value}

In the IF activity, the following condition is evaluated:

@less(length(array(activity('Check jobs in execution').output.runs)),130)

As said above, the maximum number of jobs that can run in parallel in Databricks is 145. Since the For Each activity is executed with a parallelism of 10, and to avoid possible race conditions, the maximum number of jobs that this pipeline will launch at the same time is 130.

Finally, if the condition is met (or if the check fails, which means that there is no job in execution on the cluster) (3), a new pipeline will be executed.
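For reference, the same guard logic can be sketched in a few lines of Python (same endpoint and thresholds as above; the PAT placeholder stands for the token retrieved from Key Vault):

import requests

workspace_url = "https://northeurope.azuredatabricks.net"
pat = "dapiXXXXXXXXXXXXXXXX"

# Count the runs currently active in the whole workspace
active_runs = requests.get(
    f"{workspace_url}/api/2.0/jobs/runs/list",
    headers={"Authorization": f"Bearer {pat}"},
    params={"active_only": "true", "offset": 0, "limit": 150},
).json().get("runs", [])

if len(active_runs) < 130:
    print("Safe to launch a new run")  # equivalent to the TRUE branch of the IF activity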

(4) The final pipeline will take the parameters from the call to Azure SQL (the State Storage External System) and build a JSON body with them to pass the parameters to be used in the job:

See the Databricks Jobs run-now REST API documentation

URL: https://northeurope.azuredatabricks.net/api/2.0/jobs/run-now

Method: POST

Headers: KEY: Authorization, VALUE: Bearer @{activity('Get PAT TOKEN').output.value}

Body: { "job_id": 1, "notebook_params": { "job_param1": "value", "job_param2": "value" } }

It will then update the corresponding entry in Azure SQL with information about the execution:

  • STATUS: will be changed to "RUNNING".
  • RUNID: will be updated from NULL to the output value of the "Run job" activity:
@{string(activity('Run job').output.run_id)}
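Putting steps (3) and (4) together, a rough Python sketch of the run-now call plus the Azure SQL update could look like this (job_id, notebook parameters and connection string are placeholders):

import pyodbc
import requests

workspace_url = "https://northeurope.azuredatabricks.net"
pat = "dapiXXXXXXXXXXXXXXXX"

# Trigger the job with the parameters read from track_processing
run = requests.post(
    f"{workspace_url}/api/2.0/jobs/run-now",
    headers={"Authorization": f"Bearer {pat}"},
    json={"job_id": 1, "notebook_params": {"job_param1": "value", "job_param2": "value"}},
).json()

# Mark the entry as RUNNING and store the run_id so the Monitorization Pipeline can track it
conn = pyodbc.connect("DRIVER={ODBC Driver 18 for SQL Server};SERVER=...;DATABASE=...;UID=...;PWD=...")
conn.cursor().execute(
    "UPDATE track_processing SET status = 'RUNNING', runid = ? WHERE source_table = ?",
    str(run["run_id"]), "sales_deltas",
)
conn.commit()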

4.2. Monitorization Pipeline

Base pipeline and flow of the execution (with the conditions to be met to follow each path)

(1) The structure of the base pipeline is very similar to the previous one with some differences:

  • The 'Check tables to Process' activity will query Azure SQL to get all the entries with status = 'RUNNING' and/or RUNID <> NULL.

(2) In each iteration of the flow, the status of the job identified by RUNID is queried with a call to the Databricks REST API.

See the Databricks Jobs runs/get-output REST API documentation

URL: https://northeurope.azuredatabricks.net/api/2.0/jobs/runs/get-output?run_id=@{item().runid}

Method: GET

Headers: KEY: Authorization, VALUE: Bearer @{activity('Get PAT TOKEN').output.value}

Then, depending on the conditions met, there are three possible ways to continue the flow:

Job still in progress: nothing will be done (no condition will be met), so nothing changes and the status of the job will be checked again in the next execution of the pipeline.

(3) Job succeeded: the job has finished successfully. The following condition has to be met:

@and(equals(activity('Check job status').output.metadata.state.life_cycle_state,'TERMINATED'), equals(activity('Check job status').output.metadata.state.result_state,'SUCCESS'))

First, the track_processing table will be updated to change the STATUS field of the entry to ‘DONE’.

Then, a log will be created and stored in the table designed for that. More information about how to create a useful logging system:

(4) Job failed: the job has failed at some step of the execution. The following condition has to be met:

@or(and(equals(activity('Check job status').output.metadata.state.life_cycle_state,'TERMINATED'), or(equals(activity('Check job status').output.metadata.state.result_state,'FAILED'),equals(activity('Check job status').output.metadata.state.result_state,'CANCELED'))),equals(activity('Check job status').output.metadata.state.life_cycle_state,'INTERNAL_ERROR'))

A job that ended with status ‘CANCELED’ or ‘INTERNAL_ERROR’ will be treated like a normal ‘FAILED’ job.

The same process as in the ‘SUCCESS’ case applies but, in this case, the track_processing table will be updated to reflect the ‘FAILED’ status of the job, and a new entry will be added to the logs table with information about the failure.
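Putting the three outcomes together, the whole check for a single RUNID can be sketched in Python as follows (same endpoint and state mapping as in the conditions above):

import requests

workspace_url = "https://northeurope.azuredatabricks.net"
pat = "dapiXXXXXXXXXXXXXXXX"

def check_run(run_id):
    """Return 'DONE', 'FAILED' or None (still in progress) for a given Databricks run."""
    state = requests.get(
        f"{workspace_url}/api/2.0/jobs/runs/get-output",
        headers={"Authorization": f"Bearer {pat}"},
        params={"run_id": run_id},
    ).json()["metadata"]["state"]

    life_cycle = state["life_cycle_state"]
    result = state.get("result_state")

    if life_cycle == "TERMINATED" and result == "SUCCESS":
        return "DONE"
    if life_cycle == "INTERNAL_ERROR" or (life_cycle == "TERMINATED" and result in ("FAILED", "CANCELED")):
        return "FAILED"
    return None  # still in progress: leave the entry untouched and check it again in the next execution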

And that’s all. I hope this has been useful and that it has saved you some time.

Thanks for reading. You can find me on LinkedIn.

“The success formula: solve your own problems and freely share the solutions.”
Naval Ravikant
