Airflow and Azure Storage Accounts

How to connect your local and remote Airflow instance to Azure Blob Storage and Azure Data Lake Storage Gen2

DataFairy
4 min read · Sep 17, 2023

Disclaimer

I am running Airflow locally with astronomer.io (the Astro CLI and Astro Runtime). I will reference this a few times in the tutorial, and some of the configuration is specific to this Airflow distribution.

Running locally

Before working with blob storage, it is useful to be able to run tests with sample data from your local storage. To do this you first have to copy your local data into the Airflow image so it is available to your tasks. For this to work, update your Airflow Dockerfile:

FROM quay.io/astronomer/astro-runtime:9.1.0
# Add the following line and change the local data location if necessary
COPY dags/data /opt/airflow/data
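Assuming the sample data lives at dags/data/data.json (the source path in the COPY line above), the relevant part of the Astro project looks roughly like this:

.
├── Dockerfile
├── requirements.txt
└── dags/
    └── data/
        └── data.json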

With a hardcoded data string the extract task looks something like this:

@task()
def extract():
    """
    #### Extract task
    A simple "extract" task to get data ready for the rest of the
    pipeline. In this case, getting data is simulated by reading from a
    hardcoded JSON string.
    """
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'

    order_data_dict = json.loads(data_string)
    return order_data_dict

You will need to change it to this version:

@task()
def extract():
    """
    #### Extract task
    A simple "extract" task to get data ready for the rest of the
    pipeline. In this case, getting data is simulated by reading from a
    folder in the image containing a JSON file data.json.
    """
    with open("/opt/airflow/data/data.json") as f:
        order_data_dict = json.load(f)

    return order_data_dict
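For local testing, dags/data/data.json can simply contain the same sample payload as the hardcoded string from the first version of the task:

{"1001": 301.27, "1002": 433.21, "1003": 502.22}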

Now we can test our code with sample data.

Allow connection testing in Airflow

In the Astronomer Airflow Docker image (as in recent Airflow versions in general), testing connections from the UI is disabled by default. Add this line to your .env file and run astro dev restart:

AIRFLOW__CORE__TEST_CONNECTION=Enabled

Connect to Blob Storage

To connect to Azure Blob Storage we need to create a new connection in Airflow. In this example I will be using a SAS URL for the storage account, as this gives me least-privilege access and can be set to be valid only temporarily.

Fill in the connection values in the Airflow UI (Admin → Connections). Not all values are required, even when they are not marked as (optional).

Resource not found error:

If you get this error, try removing the storage account name from the Blob Storage Login field and replacing the SAS token with the full SAS URL. The SAS URL looks something like this:

https://storage_account_name.blob.core.windows.net/container_name?SAS_token
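For reference, the connection fields implied by the notes above would look roughly like this. This is only a sketch: field labels can vary slightly between provider versions, and blob is simply the connection id used in the DAG below.

Connection Id:       blob
Connection Type:     Azure Blob Storage (wasb)
Blob Storage Login:  leave empty if you run into the "resource not found" error
SAS Token:           https://storage_account_name.blob.core.windows.net/container_name?SAS_token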

WASB connector

To interact with Azure Blob Storage in Python we will use the WasbHook from the airflow.providers.microsoft.azure provider package. You should have the following line in your requirements.txt:

apache-airflow-providers-microsoft-azure

Run astro dev restart and update your DAG with the following code:

from airflow.decorators import dag, task
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook


@dag(...)  # keep your existing DAG arguments (schedule, start_date, ...)
def extract_from_azure_blob():
    @task()
    def extract():
        # "blob" is the connection id created above
        blob_connection = WasbHook(wasb_conn_id="blob")
        # count the blobs in the 'raw' container
        data = len(blob_connection.get_blobs_list('raw'))
        return data

    order_data = extract()
    print(order_data)  # note: at parse time this prints an XComArg reference, not the task result


extract_from_azure_blob()
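Since the goal is to replace the local file with blob storage, the same hook can also read the file contents directly. Below is a minimal sketch of a task that could live inside the DAG function, assuming a container named raw holds the data.json file from earlier and that your provider version exposes WasbHook.read_file:

import json

from airflow.decorators import task
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook


@task()
def extract_from_blob():
    """Hypothetical task: read data.json from the 'raw' container instead of the local copy."""
    blob_connection = WasbHook(wasb_conn_id="blob")
    # read_file is assumed to return the blob content as a string in recent provider versions
    content = blob_connection.read_file(container_name="raw", blob_name="data.json")
    return json.loads(content)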

Connect to Azure Data Lake Storage Gen2

Connecting to Azure Data Lake Storage Gen2 is a bit more involved. We can use either an app registration (client id and secret) or a connection string. As the connection string is easier to create (you don't need to be an admin), we will use that. To get the connection string we create a SAS token at the level of the storage account, not the container.
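Generating an account-level SAS in the Azure portal also produces a connection string. It looks roughly like this (shortened; the exact endpoints and SAS parameters depend on the services and permissions you select when generating the token):

BlobEndpoint=https://storage_account_name.blob.core.windows.net/;QueueEndpoint=...;FileEndpoint=...;TableEndpoint=...;SharedAccessSignature=sv=...&ss=...&srt=...&sp=...&se=...&sig=...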

The connection in Airflow looks as follows; in this example, the ADLS Gen2 Connection String is required.
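A rough sketch of those fields (the exact connection type label and field names can vary between provider versions; Data_Lake is the connection id used in the task below):

Connection Id:                Data_Lake
Connection Type:              Azure Data Lake Storage V2 (adls)
ADLS Gen2 Connection String:  the SAS connection string generated above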

Now we can update our task using an ADLS Gen2 hook:

from airflow.decorators import dag, task
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook


@dag(...)  # keep your existing DAG arguments (schedule, start_date, ...)
def extract_from_azure_blob():
    @task()
    def extract():
        # "Data_Lake" is the connection id created above
        blob_connection = AzureDataLakeStorageV2Hook(adls_conn_id="Data_Lake")
        # list the file systems (containers) in the storage account
        data = blob_connection.list_file_system()
        return data

    order_data = extract()
    print(order_data)  # note: at parse time this prints an XComArg reference, not the task result


extract_from_azure_blob()
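To go one level deeper than listing file systems, you can work with the underlying client that the hook wraps. A minimal sketch, assuming the hook's get_conn() returns an azure.storage.filedatalake.DataLakeServiceClient and that a file system named raw exists in the account:

from airflow.decorators import task
from airflow.providers.microsoft.azure.hooks.data_lake import AzureDataLakeStorageV2Hook


@task()
def list_raw_paths():
    """Hypothetical task: list all paths inside the 'raw' file system."""
    hook = AzureDataLakeStorageV2Hook(adls_conn_id="Data_Lake")
    # assumption: get_conn() returns a DataLakeServiceClient from azure-storage-file-datalake
    service_client = hook.get_conn()
    file_system = service_client.get_file_system_client("raw")
    # get_paths() yields PathProperties objects for every file and directory
    return [path.name for path in file_system.get_paths()]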

Summary

We have looked at two ways to connect to Azure storage accounts (Blob Storage and Data Lake Storage Gen2) from Apache Airflow. The easiest way is to create a SAS token and fill in the appropriate information in the Airflow connection. Using the Airflow provider package for Azure, we can then create a hook to get information from the different Azure storage accounts.

If you found this article useful, please follow me.
