Manage Databricks workloads with Apache Airflow

Sheranga Gamwasam · Published in Analytics Vidhya · 4 min read · Feb 25, 2024

Databricks and Apache Airflow are essential tools in the day-to-day life of a Data Engineer: Databricks is a Unified Analytics Platform built on the open-source Apache Spark engine for big data processing, and Apache Airflow lets you author, schedule, and monitor workflows effectively.

If we have a large number of ingestion and transformation workflows in Databricks, it becomes crucial to effectively manage and orchestrate these workflows to ensure efficient data processing and analytics. In such a scenario, Apache Airflow can be invaluable. With Apache Airflow, you can define, schedule, and monitor these workflows as Directed Acyclic Graphs (DAGs), allowing for better organization, dependency management, and scalability. This ensures that Data Engineers can easily manage and scale their data pipelines as their workload grows, optimizing resource utilization and improving overall productivity.

So, in this article, I’ll demonstrate how to connect Databricks and Airflow, as well as how to schedule and monitor your Databricks tasks using Airflow.

For this demonstration, I’ll be using Airflow running via Docker Compose together with Azure Databricks. Let’s outline the steps required to connect the two and schedule our workflow in Airflow; I assume you’ve already set up Airflow with Docker Compose and have a Databricks account ready.

  1. Generate an Azure Databricks personal access token for Airflow
  2. Set up a Databricks Connection in Apache Airflow
  3. Create a DAG in Apache Airflow

Generate an Azure Databricks personal access token for Airflow

To connect Airflow to Databricks, you’ll need an Azure Databricks personal access token (PAT). Here’s how to create one:

  1. In your Azure Databricks workspace, click on your Azure Databricks username located in the top bar.
  2. From the drop-down menu, select ‘User Settings’.
  3. Navigate to the ‘Developer Settings’ section.
  4. Click on the ‘Manage’ button for Access Tokens.
  5. Generate a new access token by following the prompts.
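
To quickly confirm the token works before wiring it into Airflow, you can call the Databricks REST API with it. A minimal sketch, assuming the requests library is installed and using placeholder workspace URL and token values:

import requests

WORKSPACE_URL = 'https://adb-xxxxxxxxxxxx.azuredatabricks.net'  # placeholder workspace URL
TOKEN = '<your-personal-access-token>'

# A 200 response with a JSON list of clusters means the token is valid
response = requests.get(
    f'{WORKSPACE_URL}/api/2.0/clusters/list',
    headers={'Authorization': f'Bearer {TOKEN}'},
)
print(response.status_code, response.json())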

Set up a Databricks Connection in Apache Airflow

To utilize Databricks hooks or operators in Airflow, you need to establish an Airflow connection enabling communication with your Databricks account.

Here’s how to set up a Databricks connection in Apache Airflow:

  1. Open the Apache Airflow UI and navigate to the “Admin” tab.
  2. Choose “Connections” from the drop-down menu.
  3. Click on the “Create” button and provide the connection details. For a Databricks connection you’ll typically fill in:
     - Connection Id: databricks_default (the default connection id the Databricks operators look for)
     - Connection Type: Databricks
     - Host: your Azure Databricks workspace URL
     - Password: the personal access token generated in the previous step
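
If you prefer to script the connection instead of clicking through the UI, a rough sketch using Airflow’s Connection model (run inside the Airflow scheduler or webserver container; the host and token values are placeholders) could look like this:

from airflow.models import Connection
from airflow.utils.session import create_session

conn = Connection(
    conn_id='databricks_default',  # default connection id used by the Databricks operators
    conn_type='databricks',
    host='https://adb-xxxxxxxxxxxx.azuredatabricks.net',  # your workspace URL
    password='<your-personal-access-token>',  # the PAT generated earlier
)

# Persist the connection only if it doesn't already exist
with create_session() as session:
    if not session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
        session.add(conn)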

Create a DAG in Apache Airflow

The subsequent step involves creating a DAG (Directed Acyclic Graph) in Apache Airflow, which serves as a blueprint for a workflow.

In this example, I’ll create the following DAG, consisting of two DummyOperators (start, end) and two DatabricksSubmitRunOperators (notebook_task1, notebook_task2).

notebook_task1.py

notebook_task2.py
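
The notebooks themselves are embedded as gists in the original post. As a rough sketch, assuming each one simply creates a small Delta table with a few sample rows in the default database (the table and column names below are invented for illustration), notebook_task1 might look like this, with notebook_task2 doing the same for a second table:

# notebook_task1 (sketch): write a small Delta table to the default database
from pyspark.sql import Row

df = spark.createDataFrame([  # `spark` is provided by the Databricks runtime
    Row(id=1, name='alpha'),
    Row(id=2, name='beta'),
])

df.write.format('delta').mode('overwrite').saveAsTable('default.notebook_task1_table')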

DAG

from airflow import DAG
from datetime import timedelta, datetime
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',
    'email': ['msgamwasam@gmail.com'],
    'depends_on_past': False,
}

dag = DAG(
    dag_id='databricks_operator_demo2',
    default_args=default_args,
    schedule_interval=None,  # run only when triggered manually
    start_date=days_ago(1),
    tags=['databricks-airflow-demo'],
)

EXISTING_CLUSTER_ID = 'xxxxxxxxxxxxx'  # ID of an existing Databricks cluster (placeholder)

notebook_task_params1 = {
    'existing_cluster_id': EXISTING_CLUSTER_ID,
    'notebook_task': {
        'notebook_path': '/Users/manoj.gamwasam@pickles.com.au/notebook_task1',  # Notebook Task
    },
}

notebook_task_params2 = {
    'existing_cluster_id': EXISTING_CLUSTER_ID,
    'notebook_task': {
        'notebook_path': '/Users/manoj.gamwasam@pickles.com.au/notebook_task2',  # Notebook Task
    },
}

start = DummyOperator(task_id='start', dag=dag)

# Both tasks submit a one-time run through the default 'databricks_default' connection
notebook_task1 = DatabricksSubmitRunOperator(task_id='notebook_task1', json=notebook_task_params1, dag=dag)

notebook_task2 = DatabricksSubmitRunOperator(task_id='notebook_task2', json=notebook_task_params2, dag=dag)

end = DummyOperator(task_id='end', dag=dag)

start >> [notebook_task1, notebook_task2] >> end

If everything goes well, once you trigger the DAG (its schedule_interval is None, so it won’t run on its own schedule), you should see two Delta tables created in the default managed database in Databricks, with the inserted data as well.
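
To double-check from the Databricks side, you can list the tables in the default database from any notebook attached to the same cluster:

# Show the tables created by the two notebook tasks
spark.sql('SHOW TABLES IN default').show()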

If you need any assistance with this process, feel free to leave a comment below or email me at msgamwasam@gmail.com.
