Keep Your Airflow Variables and Connections Safe with GCP Secret Manager🔐

jana_om · Published in Apache Airflow · 8 min read · May 7, 2024

Google Cloud Secret Manager is a robust tool within the Google Cloud Platform (GCP) that facilitates the secure storage, management, and retrieval of sensitive data in the form of binary blobs or text strings. Only authorized users can access and view the contents of these secrets as needed. Secret Manager is particularly useful for handling runtime configuration details such as database passwords, API keys, and TLS certificates, ensuring the safety and integrity of crucial application information within the GCP environment.

In this article, I will explore best practices for integrating Google Cloud Secret Manager with Cloud Composer. To illustrate them, I have created a DAG with several variables and a Snowflake connection. Using this setup, we will connect to Snowflake, run a query to retrieve the relevant data, and save the result as a CSV file in a GCS bucket. Along the way, you will see how to use Secret Manager with Composer for secure and efficient data processing.

ā„ļøSnowflake

To demonstrate the process, I set up a Snowflake environment consisting of an account, database, schema, and table, and loaded a CSV file from the Good Reads Dataset (Top 1000 Books) on Kaggle into the table. I then tested the query I intended to use in the DAG. The goal: extract the top 10 books from the dataset and store them in a CSV file within a GCS bucket.

Access your Snowflake profile and copy the account URL, as we'll need it for the next step.

Snowflake Account View

✨Create Composer 2

To begin, we'll set up a Composer 2 environment.

Composer 2 Environment

If this is your first time setting up Composer 2, ensure you complete the necessary access permissions setup. To do this, grant the Cloud Composer v2 API Service Agent Extension role to the service-<…>@cloudcomposer-accounts.iam.gserviceaccount.com service account.

An Example of Cloud Composer Service Agent

To integrate Snowflake with your Composer environment, you first need to install the apache-airflow-providers-snowflake package. This package enables Airflow to connect to Snowflake and interact with its resources. Once the installation is complete, you can proceed with adding the Snowflake connection details either in the Airflow UI or within the Secret Manager, depending on your preferred approach for managing sensitive information.

PyPi Packages in Composer

The apache-airflow-providers-snowflake package is required for utilizing the airflow.providers.snowflake module in your code. Without this package installed in your Composer environment, you wonā€™t be able to import the necessary Snowflake-related classes and methods provided by the module. Consequently, any DAGs that rely on airflow.providers.snowflake will fail to load, resulting in errors and preventing the successful execution of your Airflow tasks.

An Error Indicating Missing apache-airflow-providers-snowflake Package
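
If you want to confirm the provider is available before your DAGs try to import it, a minimal sanity check might look like the sketch below. It assumes you run it in an environment that mirrors your Composer image (for example, a local virtualenv with the same packages installed); the printed message is only illustrative.

import importlib.metadata

# Raises importlib.metadata.PackageNotFoundError if the provider is missing.
version = importlib.metadata.version("apache-airflow-providers-snowflake")
print(f"apache-airflow-providers-snowflake {version} is installed")

# The import used later in this article; it only works once the provider is installed.
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook  # noqa: F401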

šŸ—ļøUse Secret Manager with Cloud Composer

Hardcoding variables directly in your code, or entering them through the Airflow UI, is generally not considered a best practice: it can compromise the security and maintainability of your codebase. Instead, use an external secrets management solution such as Google Cloud Secret Manager to store and manage both variables and connection details, keeping sensitive information out of your code and strengthening your data security practices.

By using Secret Manager in conjunction with Composer, you can securely store and manage these secrets, reducing the risk of accidental exposure or unauthorized access.

Secret Manager allows you to define fine-grained access controls for secrets, ensuring that only authorized individuals or services can access the sensitive information stored within. This helps enforce the principle of least privilege and strengthens the overall security posture of your Composer environment.

Enable and configure the Secret Manager backend

Here's how to configure Composer to use Secret Manager and update your DAG accordingly:

Step 1: Configure Composer to Use Secret Manager

  1. Set up Secret Manager as your secrets backend in Composer by adding the following Airflow Configuration Overrides to your environment:
[secrets]
backend = airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend
backend_kwargs = {"project_id": "<project-id>", "connections_prefix": "airflow-connections", "variables_prefix": "airflow-variables", "sep": "-"}

Replace <project-id> with your actual GCP project ID. The connections_prefix, variables_prefix, and sep values determine the secret names Airflow looks up (for example, airflow-variables-gcs_bucket), so they must match the secret names you create in Step 2. Here is an example.

Composer Airflow Configuration Overrides

2. Ensure your Composer environment has the correct permissions to access Secret Manager. Use IAM roles to grant the necessary permissions to the service account associated with your Composer environment. Assign the role Secret Manager Secret Accessor to the service account to allow it to access the secret.

Step 2: Store Your Variables and Connections in Secret Manager

  1. Create secrets in Secret Manager for your variables and connections, using the naming convention defined by the prefixes and separator configured in Step 1:
  • For variables: airflow-variables-<variable_name>
  • For connections: airflow-connections-<connection_id>

For example, to create a secret for the gcs_bucket variable, the secret name should be airflow-variables-gcs_bucket.

2. Add the variables and connections to Secret Manager using the Google Cloud Console or the gcloud CLI tool.
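
The Console and gcloud work fine for a handful of secrets. If you prefer to script this step, here is a minimal sketch using the google-cloud-secret-manager Python client; the project ID, secret names, and values are placeholders, and the account running it needs permission to create secrets and add versions.

from google.cloud import secretmanager

project_id = "<project-id>"  # placeholder: the project hosting your Composer environment
client = secretmanager.SecretManagerServiceClient()
parent = f"projects/{project_id}"

# Secret names follow the airflow-variables-* / airflow-connections-* convention from above.
secrets = {
    "airflow-variables-gcs_bucket": b"your-bucket-name",
    "airflow-variables-gcs_path": b"your/gcs/path/",
}

for secret_id, value in secrets.items():
    secret = client.create_secret(
        request={
            "parent": parent,
            "secret_id": secret_id,
            "secret": {"replication": {"automatic": {}}},
        }
    )
    # Each secret stores its value as a version.
    client.add_secret_version(request={"parent": secret.name, "payload": {"data": value}})
    print(f"Created {secret.name}")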

Airflow Variables and Connections in Secret Manager

The following example shows how a secret appears within Google Cloud Secret Manager.

An Example of a Secret

If you have the required access privileges, you can view the secret's value.

An Example of the Secret's Value

Certain connections may require a specific value format. For instance, when adding a Snowflake connection value in Secret Manager, consider the following example: if your Snowflake account URL is https://abc123.europe-west4.gcp.snowflakecomputing.com, then your connection value should follow this format:

{
  "conn_type": "snowflake",
  "login": "your-login",
  "password": "your-password",
  "host": "abc123",
  "schema": "your-schema",
  "extra": {
    "database": "your-database",
    "warehouse": "your-wh",
    "role": "your-role",
    "account": "abc123.europe-west4.gcp"
  }
}
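
If you want to double-check that a JSON value like this maps onto the Airflow connection fields you expect, Airflow 2.3+ can deserialize it directly. A quick local sketch, with dummy credentials:

import json
from airflow.models.connection import Connection

conn_value = {
    "conn_type": "snowflake",
    "login": "your-login",
    "password": "your-password",
    "host": "abc123",
    "schema": "your-schema",
    "extra": {
        "database": "your-database",
        "warehouse": "your-wh",
        "role": "your-role",
        "account": "abc123.europe-west4.gcp",
    },
}

# Connection.from_json parses a JSON connection value like the one stored in Secret Manager.
conn = Connection.from_json(json.dumps(conn_value), conn_id="snowflake_default")
print(conn.conn_type, conn.host, conn.extra_dejson.get("account"))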

Step 3: Update Your DAG

Modify your DAG to retrieve variables from Secret Manager using the Variable class. Since you've configured Secret Manager as the secrets backend, Variable.get() will automatically fetch the variables from Secret Manager.

from airflow.models import Variable

gcs_bucket = Variable.get('gcs_bucket')
gcs_path = Variable.get('gcs_path')
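
Connections need no code change at all: any hook that takes a conn_id, or an explicit BaseHook.get_connection() call, resolves the connection through the same secrets backend. A minimal sketch:

from airflow.hooks.base import BaseHook

# With the Secret Manager backend configured, this looks up the secret
# airflow-connections-snowflake_default before falling back to environment
# variables or the Airflow metadata database.
conn = BaseHook.get_connection("snowflake_default")
print(conn.conn_type, conn.host)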

Customize the provided DAG by replacing placeholder variables with your own values and updating the snowflake_query logic to align with your project's unique requirements.

from airflow import DAG
from airflow.models import Variable
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
from google.cloud import storage
import logging

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'snowflake_to_gcs',
    default_args=default_args,
    description='Query Snowflake and save to GCS',
    schedule_interval=timedelta(days=1),
    catchup=False,
)

def transfer_snowflake_to_gcs(**context):
    # Retrieve variables from the Airflow UI / Secret Manager
    gcs_bucket = Variable.get('gcs_bucket')
    gcs_path = Variable.get('gcs_path')

    snowflake_query = 'SELECT * FROM goodreads_top.books.books LIMIT 11'
    filename = '/tmp/snowflake_data.csv'

    # Use SnowflakeHook to establish a connection
    hook = SnowflakeHook(snowflake_conn_id='snowflake_default')
    conn = hook.get_conn()
    cursor = conn.cursor()

    try:
        # Execute the query and fetch the data
        cursor.execute(snowflake_query)
        data = cursor.fetchall()

        # Save the data to a file
        with open(filename, 'w') as f:
            for row in data:
                f.write(','.join([str(item) for item in row]) + '\n')

        # Upload the file to GCS
        client = storage.Client()
        bucket = client.bucket(gcs_bucket)
        blob = bucket.blob(gcs_path + 'file.csv')
        blob.upload_from_filename(filename)

        logging.info("Data transfer from Snowflake to GCS completed successfully")

    except Exception as e:
        logging.error(f"Data transfer from Snowflake to GCS failed: {str(e)}")
        raise

    finally:
        # Close the cursor and connection
        cursor.close()
        conn.close()

# Task to transfer data from Snowflake to GCS
transfer_data = PythonOperator(
    task_id='transfer_snowflake_to_gcs',
    python_callable=transfer_snowflake_to_gcs,
    provide_context=True,
    dag=dag,
)

Save the DAG file in Composer's "dags" folder to complete the deployment.

Composer's "dags" Folder

If you have previously added any variables or connections through the Airflow UI, it is recommended to remove them and test the DAG using the updated configuration.

Cloud Composer first attempts to retrieve variables and connections from Secret Manager. If unsuccessful, it then searches the environment variables and the Airflow database for the requested variable or connection.

Refer to the "Configure Secret Manager for your environment" guide for additional information and setup instructions.

DAG View from the Airflow UI

For further details, review the task logs. However, please note that the logs may not explicitly indicate the source of the variables and connections.

# Example of Airflow task logs
INFO - Using connection ID 'snowflake_default' for task execution.
# This log line indicates the number of rows returned in the first chunk of data fetched from Snowflake.
INFO - Number of results in first chunk: 11
INFO - Data transfer from Snowflake to GCS completed successfully

Once the DAG run completes successfully, you will find the generated CSV file in the bucket you specified.

Generated CSV File

If your snowflake_query is 'SELECT * FROM goodreads_top.books.books LIMIT 11', the result set contains 11 rows: the header row, followed by the top 10 books requested by the query. The file contents look like this:

Book Name,Author,Average Rating,Number of Ratings,Score on Goodreads
To Kill a Mockingbird,Harper Lee,4.26,6129090,17358.0
1984,George Orwell,4.19,4604557,15474.0
Pride and Prejudice,Jane Austen,4.29,4273146,15135.0
Harry Potter and the Sorcerer's Stone (Harry Potter, #1),J.K. Rowling,4.47,10063128,12440.0
The Great Gatsby,F. Scott Fitzgerald,3.93,5244056,10828.0
Jane Eyre,Charlotte Brontë,4.15,2095866,10613.0
Lord of the Flies,William Golding,3.69,2906413,10098.0
Alice's Adventures in Wonderland / Through the Looking-Glass,Lewis Carroll,4.06,564072,9225.0
The Lord of the Rings,J.R.R. Tolkien,4.53,676596,9201.0
The Hobbit (The Lord of the Rings, #0),J.R.R. Tolkien,4.29,3977772,9013.0
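
To spot-check the uploaded object without opening the Cloud Console, you can reuse the same google-cloud-storage client that the DAG uses; the bucket and path below are the placeholder values of the gcs_bucket and gcs_path variables.

from google.cloud import storage

client = storage.Client()
bucket = client.bucket("your-bucket-name")           # value of the gcs_bucket variable
blob = bucket.blob("your/gcs/path/" + "file.csv")    # gcs_path + 'file.csv', as in the DAG

# Print the first few lines of the generated CSV.
content = blob.download_as_text()
print("\n".join(content.splitlines()[:5]))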

You can also observe and verify these actions by reviewing the Query History in Snowflake. The Snowflake Query History feature provides a detailed log of all executed queries within your Snowflake account, allowing you to monitor, track, and analyze your data processing activities.

SQL Text Example from Snowflake Query History

You can run the same DAG with the variables and connections managed through the Airflow UI. For reference, you can find an example of adding a Snowflake connection and variables to the Airflow UI within my repository.

If you'd like to share your experiences with Secret Manager or have any thoughts on the topic, feel free to reach out to me on LinkedIn. I'd love to hear from you and connect! 😊
