Postgres Automatic IAM Database Authentication in Airflow
Goal: Connect to PostgreSQL from Airflow (Cloud Composer) using automatic IAM database authentication.
With this approach, no passwords or tokens need to be passed when connecting to PostgreSQL; the connection is authenticated with the IAM service account's credentials. The setup below uses a Google Cloud SQL for PostgreSQL instance with Private IP enabled.
Step 1: Cloud Composer Environment: Package Installation
Install the PyPI packages cloud-sql-python-connector and pg8000 in the Cloud Composer environment.
[https://pypi.org/project/cloud-sql-python-connector/]
I am using the versions shown below (choose versions that support your Composer version).
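After the environment update completes, you can confirm both packages are available with a quick sanity check (a minimal sketch; run it from any Airflow task or on a Composer worker):

# Sanity check: print the installed versions of the two required packages.
from importlib.metadata import version

print("cloud-sql-python-connector:", version("cloud-sql-python-connector"))
print("pg8000:", version("pg8000"))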
Step 2: IAM Account
Add a Cloud IAM user account (the service account) to the Cloud SQL PostgreSQL instance.
[After you create a user account with Cloud IAM authentication, it has no database privileges, so make sure permissions are granted as needed, e.g. as in Step 3.]
* At the IAM level, grant the service account the permissions it needs to access Cloud SQL (typically the Cloud SQL Client and Cloud SQL Instance User roles, i.e. roles/cloudsql.client and roles/cloudsql.instanceUser).
Step 3 (Optional): DB and Table Setup in Postgres
If you already have the tables and have granted all required permissions to the IAM service account, you can skip this step.
Use a machine that has psql installed and is in the same VPC network. Here you can log in with the built-in postgres user (password required):
psql -h X.Y.240.3 --user=postgres --dbname=postgres
Create the schema db3 and grant the IAM service account privileges on it (the schema-level grant is what lets the service account create tables in db3):
postgres=> create schema db3;
postgres=> GRANT USAGE, CREATE ON SCHEMA db3 TO "to-access-cloud-sql@XXXXX.iam";
postgres=> GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA db3 TO "to-access-cloud-sql@XXXXX.iam";
Now create the sample table as the IAM service account. Run the command below; here psql logs in with an access token instead of a password.
env PGPASSWORD=$(gcloud auth print-access-token to-access-cloud-sql@XXXXXX.iam.gserviceaccount.com) psql -h X.X.240.3 --user="to-access-cloud-sql@XXXXXX.iam" --dbname=db3
CREATE TABLE IF NOT EXISTS mypets (
pet_id SERIAL PRIMARY KEY,
name VARCHAR NOT NULL,
pet_type VARCHAR NOT NULL,
birth_date DATE NOT NULL,
OWNER VARCHAR NOT NULL);
Step 4: Service Account Key File
Upload the IAM service account key file to the data folder of the Composer environment's GCS bucket.
To secure these credentials, it is recommended that you use a Cloud Storage ACL to restrict access to the key file.
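Composer syncs the bucket's data folder to /home/airflow/gcs/data/ on the workers. If you want to confirm the key file is visible to tasks, a minimal check (the file name below is a placeholder for the key you uploaded) could look like:

import os

# Placeholder path: substitute the name of the key file uploaded in this step.
key_path = "/home/airflow/gcs/data/<your file name>.json"
print("key file present:", os.path.exists(key_path))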
Step 5: Airflow DAG Using PythonOperator
With this approach, no password or token is passed when making the connection to PostgreSQL.
In the sample DAG below, change the following values as needed:
- INSTANCE_CONNECTION_NAME: your Cloud SQL instance connection name, e.g. XXXX:us-west1:diypossql
- DB_IAM_USER: the same as the service account email, but without the .gserviceaccount.com suffix (e.g. to-access-cloud-sql@XXXXXX.iam)
- DB_NAME: the database where you created the table and granted permissions to the service account
- PRIVATE_IP: the private IP of the Cloud SQL instance (available in the Google Cloud console); in the DAG it only acts as a flag that selects a private-IP connection
- GOOGLE_APPLICATION_CREDENTIALS: the path to the key file uploaded in Step 4
import datetime
import os

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from google.cloud.sql.connector import Connector, IPTypes
import pg8000
import sqlalchemy
def connect_with_connector_auto_iam_authn() -> sqlalchemy.engine.base.Engine:
    """
    Initializes a connection pool for a Cloud SQL instance of Postgres.
    Uses the Cloud SQL Python Connector with Automatic IAM Database Authentication.
    """
    # Note: Saving credentials in environment variables is convenient, but not
    # secure - consider a more secure solution such as
    # Cloud Secret Manager (https://cloud.google.com/secret-manager) to help
    # keep secrets safe.
    os.environ["INSTANCE_CONNECTION_NAME"] = "XXXX:us-west1:diypossql"
    os.environ["DB_IAM_USER"] = "to-access-cloud-sql@XXXX.iam"
    os.environ["DB_NAME"] = "db3"
    os.environ["PRIVATE_IP"] = "X.Y.240.3"
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/home/airflow/gcs/data/XXXXX-6fb03d09b1cf.json"

    instance_connection_name = os.environ["INSTANCE_CONNECTION_NAME"]  # e.g. 'project:region:instance'
    db_iam_user = os.environ["DB_IAM_USER"]  # e.g. 'sa-name@project-id.iam'
    db_name = os.environ["DB_NAME"]  # e.g. 'my-database'

    # Connect over the instance's private IP whenever PRIVATE_IP is set.
    ip_type = IPTypes.PRIVATE if os.environ.get("PRIVATE_IP") else IPTypes.PUBLIC

    # Initialize the Cloud SQL Python Connector object.
    connector = Connector()

    def getconn() -> pg8000.dbapi.Connection:
        # enable_iam_auth=True makes the connector log in with the service
        # account's IAM credentials instead of a database password.
        conn: pg8000.dbapi.Connection = connector.connect(
            instance_connection_name,
            "pg8000",
            user=db_iam_user,
            db=db_name,
            enable_iam_auth=True,
            ip_type=ip_type,
        )
        return conn

    # The Cloud SQL Python Connector can be used with SQLAlchemy
    # using the 'creator' argument to 'create_engine'.
    pool = sqlalchemy.create_engine(
        "postgresql+pg8000://",
        creator=getconn,
    )
    return pool
def runme():
    pool = connect_with_connector_auto_iam_authn()
    # Engine.begin() opens a transaction and commits it on exit.
    with pool.begin() as db_conn:
        db_conn.execute(sqlalchemy.text(
            "INSERT INTO mypets (name, pet_type, birth_date, OWNER) "
            "VALUES ('Milo', 'Dog', '2018-07-05', 'Jane');"
        ))

with DAG(
    dag_id="postgre_testing13",
    start_date=datetime.datetime(2023, 3, 18),
    schedule_interval=None,
    catchup=False,
) as dag:
    run_this = PythonOperator(
        task_id="runinpostgre",
        python_callable=runme,
    )
    run_this
Note that this path is the local directory /home/airflow/gcs/data/<your file name>.json on the workers (Composer syncs the bucket's data folder there), not the gs:// bucket path.
Step 6: After running the DAG in Cloud Composer, verify that the data was inserted into the mypets table using the commands from Step 3.
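You can also verify from Python by reusing the connection pool from the DAG above. A minimal sketch (it assumes connect_with_connector_auto_iam_authn is defined in, or importable from, the same file):

# Read back the rows inserted by the DAG using the same IAM-authenticated pool.
import sqlalchemy

pool = connect_with_connector_auto_iam_authn()
with pool.connect() as db_conn:
    for row in db_conn.execute(sqlalchemy.text("SELECT * FROM mypets")).fetchall():
        print(row)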
Reference Links : https://cloud.google.com/sql/docs/postgres/iam-logins#python