Abhijit Iratkar
Published in GumGum Tech Blog · 5 min read · May 23


Integrating Apache Airflow with Databricks: A Step-by-Step Guide

Apache Airflow and Databricks are two potent tools for data engineering, data science, and data analytics. By integrating these tools, organizations can establish an efficient workflow management system that simplifies and automates complex workflows. In this blog post, we will provide a detailed, step-by-step guide on how to integrate Apache Airflow with Databricks and how to capitalize on the advantages of this integration.

Apache Airflow is a widely used open-source workflow management platform that enables the seamless composition and scheduling of diverse workflows. Renowned for its effectiveness in managing data engineering pipelines, Apache Airflow also boasts a built-in user interface that allows for real-time monitoring of workflow execution progress.

Databricks is a cutting-edge data platform that enables organizations to effortlessly process, analyze, and visualize large-scale data. This unified data platform seamlessly integrates data engineering, data science, and machine learning tasks, enabling teams to collaborate and extract valuable insights from their data. Databricks offers an array of tools and features that streamline data workflows, enhance productivity, and hasten innovation, making it a preferred choice for data-driven organizations across a multitude of industries.

Technologies: Apache Airflow, AWS, Databricks, Docker.

Pre-requisites:

  1. A basic understanding of Databricks (see Getting started with Databricks).
  2. The fundamentals of Apache Airflow (see Getting started with Airflow).
  3. A local Airflow setup running from a Docker image.
  4. A Databricks cloud account. This blog uses Databricks on AWS.

Let’s look at how to integrate Databricks Notebook with Apache Airflow.

Step 1: Set up a Databricks Connection in Apache Airflow

To use any Databricks hooks or operators, you must first establish an Airflow connection that allows Airflow to communicate with your Databricks account.

The first step in integrating Apache Airflow with Databricks is to set up a Databricks connection in Apache Airflow. This can be done by following these steps:

  1. Open the Apache Airflow UI and click on the “Admin” tab.
  2. Select “Connections” from the drop-down menu.
  3. Click on the “Create” button and enter the following details:
  4. Conn Id: a unique identifier for the connection, e.g. “databricks_conn”.
  5. Conn Type: select “Databricks”.
  6. Host: the hostname of your Databricks workspace.
  7. Login: the username for your Databricks account.
  8. Password: the password for your Databricks account.
  9. Extra: the personal access token (PAT) as a key-value pair, e.g. {"token": "<your-PAT>"}.
  10. Click on the “Save” button to create the connection.

In the screenshot below, the steps above are marked with arrows by step number.

This is what the Airflow UI looks like after creating the Databricks connection.

Databricks recommends using a personal access token (PAT) to authenticate against the Databricks REST API, so use PAT authentication when establishing the connection via the Airflow UI.
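As an alternative to clicking through the UI, Airflow can also read connections from environment variables named AIRFLOW_CONN_&lt;CONN_ID&gt;, serialized in its URI convention. The sketch below builds such a URI, assuming the PAT travels in a `token` extra carried as a query parameter; the host and token values are placeholders:

```python
from urllib.parse import quote


def databricks_conn_uri(host: str, token: str) -> str:
    """Serialize a Databricks connection to Airflow's URI convention,
    with the PAT carried in the `token` extra as a query parameter."""
    return f"databricks://{host}/?token={quote(token, safe='')}"


# Placeholder workspace host and token
uri = databricks_conn_uri("dbc-1234-5678.cloud.databricks.com", "dapi-example")
print(uri)
```

Exporting the printed value as AIRFLOW_CONN_DATABRICKS_CONN should make the connection visible to Airflow without touching the metadata database.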

Step 2: Create a Databricks Job

This step involves creating a Databricks job for a notebook.

  1. If you use the DatabricksRunNowOperator, you need a Databricks job that already exists.
  2. You can create the job through the Databricks UI. If you are unsure how, the Databricks jobs documentation walks through it.
  3. After creating the job, attach a cluster to it and add the notebook as a parameterized task.
  4. Once created, the job appears in the Jobs tab of the Databricks UI.

This is what the Databricks UI looks like after creating the Databricks job.
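The UI steps above can also be expressed as a Jobs 2.1 REST API payload. The sketch below mirrors the example configuration (a single notebook task on its own new cluster); the job name, notebook path, and cluster sizing are placeholders:

```python
import json

# Hypothetical Jobs 2.1 payload: one notebook task running on a new cluster,
# matching what the UI steps configure. All values are placeholders.
job_payload = {
    "name": "airflow-databricks-demo",
    "tasks": [{
        "task_key": "run_notebook",
        "notebook_task": {
            "notebook_path": "/Users/example@gmail.com/databricks_notebook"
        },
        "new_cluster": {
            "spark_version": "7.3.x-scala2.12",
            "node_type_id": "i3.xlarge",
            "num_workers": 2,
        },
    }],
}

print(json.dumps(job_payload, indent=2))
# Creating the job then amounts to POSTing this payload to
# https://<workspace-host>/api/2.1/jobs/create with the PAT as a Bearer token.
```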

Step 3: Create a DAG in Apache Airflow

The next step is to create a DAG(Directed Acyclic Graph) in Apache Airflow. A DAG is a collection of tasks that define a workflow.

With both the Databricks job and the Airflow connection in place, we can now configure a DAG that triggers the notebook.

To make the DAG work locally, install the Apache Airflow Databricks provider by adding the following pip installation to your Dockerfile:

RUN pip install --no-cache-dir apache-airflow-providers-databricks==1.0.1
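After rebuilding the image, a quick way to confirm the provider actually installed is to ask importlib.metadata for its version (the pinned 1.0.1 above is just what this post uses; pick a version compatible with your Airflow release):

```python
from importlib import metadata


def provider_version(dist: str = "apache-airflow-providers-databricks") -> str:
    """Return the installed version of the given distribution,
    or a short message if it is missing."""
    try:
        return metadata.version(dist)
    except metadata.PackageNotFoundError:
        return "not installed"


print(provider_version())
```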

In this example DAG we are using a Databricks operator, i.e. DatabricksSubmitRunOperator.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.databricks.operators.databricks import (
    DatabricksSubmitRunOperator,
)

schedule_interval = None

# Define the new cluster params for the Submit Run Operator
new_cluster = {
    'spark_version': '7.3.x-scala2.12',
    'num_workers': 2,
    'node_type_id': 'i3.xlarge',
}

# Define the notebook path as a parameter for the Submit Run Operator
notebook_params = {
    'notebook_path': '/Users/example@gmail.com/databricks_notebook',
}

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
}

with DAG('databricks_dag',
         start_date=datetime(2021, 1, 1),
         schedule_interval=schedule_interval,
         default_args=default_args,
         ) as dag:

    # Submit a one-time run of the notebook on a fresh cluster
    trigger_databricks_notebook = DatabricksSubmitRunOperator(
        task_id='trigger_databricks_notebook',
        databricks_conn_id='databricks_conn',
        run_name='databricks_notebook_run',
        new_cluster=new_cluster,
        notebook_task=notebook_params,
    )


In the example above, the DatabricksSubmitRunOperator manages the definition of the Databricks run and its cluster configuration within Airflow.

To use the DatabricksSubmitRunOperator, you should define the following important parameters:

  1. task_id: A unique task ID for the operator instance.
  2. run_name: The name of the Databricks run.
  3. json: A JSON object that contains the configuration for the Databricks job run. This object includes parameters like the notebook path, timeout, cluster ID, and other configuration details.
  4. databricks_conn_id: The name of the Airflow connection to use for connecting to Databricks. This connection should be preconfigured in the Airflow Connections settings.
  5. polling_period_seconds: The interval at which the operator polls for the status of the Databricks run. The default value is 30 seconds.
  6. timeout_seconds: The maximum time in seconds that the operator should wait for the Databricks job run to complete. If the job run does not complete within this time, the operator will fail.
  7. do_xcom_push: A boolean value that specifies whether the operator should push the Databricks run ID to the XCom system. This value is used to pass the Databricks run ID to downstream tasks in the DAG.
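Many of these knobs can be collected into the single json parameter instead of separate arguments. As a sketch, the payload below shows what such a run specification might look like for the example notebook; the run name and timeout are illustrative:

```python
import json

# Sketch of a run specification for DatabricksSubmitRunOperator's `json`
# parameter. The run name and timeout are illustrative values.
submit_run_json = {
    "run_name": "databricks_notebook_run",
    "new_cluster": {
        "spark_version": "7.3.x-scala2.12",
        "node_type_id": "i3.xlarge",
        "num_workers": 2,
    },
    "notebook_task": {
        "notebook_path": "/Users/example@gmail.com/databricks_notebook"
    },
    "timeout_seconds": 3600,
}

print(json.dumps(submit_run_json, indent=2))
```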

Following are the key parameters for the DatabricksSubmitRunOperator:

task_id: A unique task ID for the operator instance.

databricks_conn_id: The name of the Airflow connection to use for connecting to Databricks. This connection should be preconfigured in the Airflow Connections settings.

new_cluster or existing_cluster_id: The cluster the run executes on, either a new cluster spec (as in the example above) or the ID of an existing cluster linked with the notebook.

notebook_task: The path of the Databricks notebook we want to trigger.
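If you instead trigger the pre-created job from Step 2 with the DatabricksRunNowOperator, the required inputs shrink to the job ID plus any notebook parameters. A sketch of that payload, where the job ID and parameter names are placeholders:

```python
import json

# Hypothetical run-now payload for a pre-created Databricks job.
# The job_id comes from the Jobs tab; notebook_params feed the
# notebook's widgets, if it defines any.
run_now_payload = {
    "job_id": 123,  # placeholder: the ID shown for your job
    "notebook_params": {"run_date": "2021-01-01"},  # hypothetical widget values
}

print(json.dumps(run_now_payload))
```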

Conclusion: This step-by-step guide outlined the process of setting up a Databricks connection, creating a DAG, adding tasks, configuring operators, and running workflows. By leveraging the combined capabilities of Apache Airflow and Databricks, we can streamline complex data processing pipelines, automate repetitive tasks, and enhance productivity.


I extend my special appreciation to the individuals who have provided invaluable assistance in the creation of this blog post. Their contributions have been instrumental in its development, and I am sincerely grateful for their support.
