Store Service Account keys in GCP Secret Manager

Written in collaboration with Bipin Upadhyaya — Nov 16, 2022

Airflow DAGs may need connections and variables passed to them during runtime to execute operators and sensors. Traditionally, Airflow developers have used environment variables or the metastore database to store and retrieve these connections and variables used during DAG parsing or task execution in the Airflow worker pods.

In addition to these options, Airflow 1.10.10 and later support secrets backends. Using a secrets backend is good security practice: it adds a layer of protection and lets you use the backend's own features, such as automatic rotation and custom encryption keys.

Users of Cloud Composer can securely store sensitive variables (like passwords and Service Account keys) in a centralized location using the Secret Manager (a secret backend). They can also assign granular permissions and audited access controls to comply with their organization’s security requirements.

Although the secrets stored in Secret Manager can be used along with environment variables, during DAG runtime, the details stored in Secret Manager take precedence over those stored in environment variables or the metastore database (if the values differ from one another).
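The lookup order described above can be pictured as a chain of sources tried in turn. The following is a minimal sketch in plain Python, not Airflow's actual implementation: the three dictionaries and the resolve_variable helper are hypothetical stand-ins for Secret Manager, environment variables, and the metastore database.

```python
from typing import Mapping, Optional, Sequence, Tuple

Source = Tuple[str, Mapping[str, str]]


def resolve_variable(key: str, sources: Sequence[Source]) -> Optional[Tuple[str, str]]:
    """Return (source_name, value) from the first source that defines key."""
    for name, store in sources:
        if key in store:
            return name, store[key]
    return None


# Hypothetical contents of each source, for illustration only.
secret_manager = {"my-secret": "from-secret-manager"}
environment = {"my-secret": "from-env", "only-in-env": "env-value"}
metastore = {"only-in-db": "db-value"}

# The secrets backend is consulted first, then environment variables,
# then the metastore database.
LOOKUP_ORDER = [
    ("secret_manager", secret_manager),
    ("environment", environment),
    ("metastore", metastore),
]

print(resolve_variable("my-secret", LOOKUP_ORDER))
print(resolve_variable("only-in-env", LOOKUP_ORDER))
```

In this sketch, a key defined in both Secret Manager and the environment resolves to the Secret Manager value, while a key defined only further down the chain is still found.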

This guide describes the steps to configure the Cloud Composer environment to read connections from Secret Manager.

Setup and Requirements

These two steps give the Composer service account permission to read secrets and point the Airflow configuration at the Secret Manager backend.

Step 1: Enable Secret Manager and give the Composer service account read access to it. This assumes that the Composer environment already exists (if not, please follow the steps to create an environment). Find the Composer service account on the Environment Configuration tab of the environment details page.

Figure 1: Environment Configuration tab on Environment Details page in Composer

The commands below enable Secret Manager and grant the Composer service account permission to read secrets. Replace <<project-id>> with the project where Secret Manager is configured and <<service-account>> with the Composer service account copied from the Environment Configuration tab, then run the commands.

gcloud services enable secretmanager.googleapis.com
gcloud projects add-iam-policy-binding <<project-id>> --member=serviceAccount:<<service-account>> --role=roles/secretmanager.secretAccessor

Step 2: Configure Composer to override the Airflow configuration options secrets.backend and secrets.backend_kwargs. The values to set are shown below.

secrets.backend = airflow.providers.google.cloud.secrets.secret_manager.CloudSecretManagerBackend
secrets.backend_kwargs = {"connections_prefix": "airflow-connections", "variables_prefix": "airflow-variables", "sep": "-", "project_id": "<<replace-with-project-id>>"}

The backend_kwargs value is the JSON representation of the backend_kwargs object with the following fields:

  • connections_prefix: Specifies the prefix of the secret name to read to get Connections. Default is: airflow-connections.
  • variables_prefix: Specifies the prefix of the secret name to read to get Variables. Default is: airflow-variables.
  • gcp_key_path: Path to the Google Cloud Credential JSON file (if not provided, the default service account is used).
  • gcp_keyfile_dict: The Google Cloud Credential JSON dictionary. Mutually exclusive with gcp_key_path.
  • sep: Separator used to concatenate the prefix (connections_prefix or variables_prefix) with the connection or variable ID. Default is: -.
  • project_id: The Google Cloud project ID where the secrets are stored.

Figure 2: Airflow Configuration Overrides tab on the Environment Details page of Composer
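To make the naming rule concrete, here is a small sketch of how the prefix, separator, and ID combine into a secret name, and how the backend_kwargs value can be generated with json.dumps so that it is guaranteed to be valid JSON. The build_secret_name helper and the project ID my-project are assumptions for illustration, not part of the Airflow API.

```python
import json


def build_secret_name(prefix: str, secret_id: str, sep: str = "-") -> str:
    """Join a prefix and an Airflow connection or variable ID into a secret name."""
    return f"{prefix}{sep}{secret_id}"


backend_kwargs = {
    "connections_prefix": "airflow-connections",
    "variables_prefix": "airflow-variables",
    "sep": "-",
    "project_id": "my-project",  # hypothetical project ID
}

# Serializing the dict avoids hand-written JSON mistakes such as using "=" instead of ":".
backend_kwargs_json = json.dumps(backend_kwargs)
print(backend_kwargs_json)

# Secret names that the variable "my-secret" and the connection "conn1" map to.
variable_secret = build_secret_name(
    backend_kwargs["variables_prefix"], "my-secret", backend_kwargs["sep"]
)
connection_secret = build_secret_name(
    backend_kwargs["connections_prefix"], "conn1", backend_kwargs["sep"]
)
print(variable_secret)
print(connection_secret)
```

The printed JSON string is what would be pasted as the value of the secrets.backend_kwargs override.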

Access variables from Secret Manager

Step 1: Create a secret in Secret Manager whose name is the variable name prefixed with airflow-variables. For a variable named my-secret, the secret name is airflow-variables-my-secret.

printf "s3cr3t" | gcloud secrets create airflow-variables-my-secret --data-file=-

Step 2: Deploy the DAG. Here is an example of a simple DAG:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

# Start yesterday at midnight so that the '@once' run is scheduled right away.
YESTERDAY = datetime.combine(datetime.today() - timedelta(days=1), datetime.min.time())

default_args = {
    'retries': 2,
    'retry_delay': timedelta(seconds=60),
    'start_date': YESTERDAY,
}

with DAG('secrets_manager',
         default_args=default_args,
         schedule_interval='@once') as dag:
    # The variable name contains a hyphen, so it is looked up with
    # var.value.get() instead of attribute access.
    t = BashOperator(
        task_id='get_variable_value',
        bash_command='echo "MY SECRET: {{ var.value.get("my-secret") }}"',
    )

Create airflow connections based on service account keys in Secret Manager

In this example, we will use a BigQuery dataset. (follow these steps to create the dataset)

Step 1: In Secret Manager, create the secret that would be used as an Airflow connection.

1. Create a service account key in JSON format using the following command:

gcloud iam service-accounts keys create KEY_FILE \
    --iam-account=SA_NAME@PROJECT_ID.iam.gserviceaccount.com

2. Using the following sample program (connection_for_secret_manager.py), you can get the value that can be directly uploaded to the Secret Manager.

import getopt
import json
import sys
import urllib.parse as ul

USAGE = 'connection_for_secret_manager.py -i <service_account_file_path> -p <project_id>'


def get_content_for_secret(service_account_path, project_id):
    """Build an Airflow connection URI that embeds the service account key."""
    with open(service_account_path) as key_file:
        sa_value = key_file.read()
    # keyfile_dict holds the key as a JSON string, so encode the file content as one.
    sa_value_json = json.dumps(sa_value)
    sa_quote_plus_value = ul.quote_plus(sa_value_json)
    first_half_extra = ul.quote_plus('{"extra__google_cloud_platform__key_path": "", "extra__google_cloud_platform__key_secret_name": "","extra__google_cloud_platform__keyfile_dict":')
    second_half_extra = ul.quote_plus(', "extra__google_cloud_platform__num_retries": 5, "extra__google_cloud_platform__project": "{}", "extra__google_cloud_platform__scope": ""}}'.format(project_id))
    return "google-cloud-platform://?__extra__={}{}{}".format(first_half_extra, sa_quote_plus_value, second_half_extra)


def main(argv):
    """Parse the command line and return (service_account_file_path, project_id)."""
    service_account_file_path = ''
    project_id = ''
    try:
        opts, args = getopt.getopt(argv, "hi:p:", ["saPath=", "projectId="])
    except getopt.GetoptError:
        print(USAGE)
        sys.exit(2)
    for opt, arg in opts:
        if opt == '-h':
            print(USAGE)
            sys.exit()
        elif opt in ("-i", "--saPath"):
            service_account_file_path = arg
        elif opt in ("-p", "--projectId"):
            project_id = arg
    # Both options are required.
    if not service_account_file_path or not project_id:
        print(USAGE)
        sys.exit(2)
    return service_account_file_path, project_id


if __name__ == "__main__":
    service_account_file_path, project_id = main(sys.argv[1:])
    print(get_content_for_secret(service_account_file_path, project_id))

Run the program and save its output to a file:

python3 connection_for_secret_manager.py -i <<service_account_path>> -p <<project_id>> > output_for_secret_manager
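Before uploading, it can be worth checking that the generated connection URI decodes back into the expected key. The sketch below is one way to do that with the standard library only. The decode_connection_uri helper is hypothetical, and the sample URI is built from a fake key purely for demonstration; it assumes the URI layout produced by the program above, where keyfile_dict is a JSON string nested inside the __extra__ JSON.

```python
import json
from urllib.parse import parse_qs, quote_plus, urlparse


def decode_connection_uri(uri):
    """Return (scheme, extra_dict, keyfile_dict) decoded from a connection URI."""
    parsed = urlparse(uri.strip())
    # parse_qs reverses the quote_plus encoding applied to the __extra__ value.
    extra_json = parse_qs(parsed.query)["__extra__"][0]
    extra = json.loads(extra_json)
    # keyfile_dict is itself a JSON string, so it needs a second decode.
    keyfile = json.loads(extra["extra__google_cloud_platform__keyfile_dict"])
    return parsed.scheme, extra, keyfile


# Fake key and project used only to demonstrate the round trip.
fake_key = {"type": "service_account", "client_email": "sa@example.iam.gserviceaccount.com"}
sample_extra = {
    "extra__google_cloud_platform__keyfile_dict": json.dumps(fake_key),
    "extra__google_cloud_platform__project": "my-project",
}
sample_uri = "google-cloud-platform://?__extra__=" + quote_plus(json.dumps(sample_extra))

scheme, decoded_extra, decoded_key = decode_connection_uri(sample_uri)
print(scheme)
print(decoded_extra["extra__google_cloud_platform__project"])
print(decoded_key["client_email"])
```

To check a real file, pass the content of output_for_secret_manager to decode_connection_uri and confirm that the decoded client_email and project match the service account you intended to use.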

3. Upload the content of the output file to Secret Manager. The Airflow DAG refers to this connection as conn1, so the secret name must carry the airflow-connections prefix: airflow-connections-conn1. If the secret already exists, use gcloud secrets versions add instead of gcloud secrets create.

gcloud secrets create airflow-connections-conn1 --data-file=output_for_secret_manager

Step 2: Create the GCP connection (conn1) in the Airflow UI by navigating to Admin > Connections.

Click the button to add a new record. Enter conn1 as the connection ID, select Google Cloud as the connection type, enter airflow-connections-conn1 as the keyfile secret name, and enter the ID of the project where Secret Manager is configured.

Step 3: Download a BigQuery public dataset (example) and store it in Google Cloud Storage as shelter_2020.csv. In the Python example below, change the bucket name (<<GCS_BUCKET_NAME>>) and the BigQuery project, dataset, and table name (<<PROJECT_ID>>.<<DATASET>>.<<TABLE>>). Then deploy the example DAG and trigger the workflow manually.

Example Python DAG:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

YESTERDAY = datetime.combine(datetime.today() - timedelta(days=1), datetime.min.time())

with DAG('example_secret_connections_conn1', start_date=YESTERDAY, schedule_interval=None) as dag:

    start = DummyOperator(
        task_id='start'
    )

    # Load the CSV file from Cloud Storage into BigQuery using the conn1 connection.
    bq_load_task = GCSToBigQueryOperator(
        task_id='bqload',
        bucket='<<GCS_BUCKET_NAME>>',
        source_objects=["shelter_2020.csv"],
        destination_project_dataset_table='<<PROJECT_ID>>.<<DATASET>>.<<TABLE>>',
        skip_leading_rows=1,
        schema_fields=[
            {"name": "_id", "type": "INTEGER"},
            {"name": "OCCUPANCY_DATE", "type": "STRING"},
            {"name": "ORGANIZATION_NAME", "type": "STRING"},
            {"name": "SHELTER_NAME", "type": "STRING"},
            {"name": "SHELTER_ADDRESS", "type": "STRING"},
            {"name": "SHELTER_CITY", "type": "STRING"},
            {"name": "SHELTER_PROVINCE", "type": "STRING"},
            {"name": "SHELTER_POSTAL_CODE", "type": "STRING"},
            {"name": "FACILITY_NAME", "type": "STRING"},
            {"name": "PROGRAM_NAME", "type": "STRING"},
            {"name": "SECTOR", "type": "STRING"},
            {"name": "OCCUPANCY", "type": "INTEGER"},
            {"name": "CAPACITY", "type": "INTEGER"},
        ],
        write_disposition='WRITE_TRUNCATE',
        gcp_conn_id='conn1'
    )

    stop = DummyOperator(
        task_id='stop'
    )

    start >> bq_load_task >> stop

Trigger the workflow and verify that the data is available in the BigQuery table.

Conclusion

Storing the secrets passed to your DAGs in Secret Manager and integrating it with your Composer environment gives you centralized storage, granular permissions, and audited access. The steps in this guide describe how to configure the Composer environment to read variables and connections from GCP Secret Manager, along with some example code.

Additional Links

https://airflow.apache.org/docs/apache-airflow/1.10.10/howto/use-alternative-secrets-backend.html#gcp-secrets-manager-backend
