Async method to run queries in Redshift Part-1

Soubhik Khankary
4 min readJan 25, 2024

--

Problem_Statement:

I had a requirement where I wanted to use compute service(Lambda/glue) to trigger the redshift query or procedure but I wanted to save the compute costs by switching it off post the procedure run is done.

WHY?

  1. Lambda could be executed for only 15 minutes at maximum. In case of glue If I try running redshift procedure it is going to cost me a bit more and wouldn’t be a plausible solution.
  2. Possibility/Probability of running the procedure on redshift and time it might take more than 15 minutes is high and lambda wouldn’t be a good solution as per the scenario.

Async method makes running , execution & monitoring the redshift procedure via AWS lambda. Best part is we won’t require additional packages to be installed as well.

Step_1 Import the required packages to create a synchronous connection with redshift cluster. I have created multiple functions to create different modules and proper segregation

# Fetch environment variables
ENV = os.environ['ENV']
REDSHIFT_SECRET_NAME = os.environ['REDSHIFT_SECRET_NAME']
ERROR_SNS_ARN = os.environ['ERROR_SNS_ARN']
SUCCESS_SNS_ARN = os.environ['SUCCESS_SNS_ARN']
LAMBDA_NAME = os.environ['LAMBDA_NAME']
CODE_BUCKET_NAME = os.environ['CODE_BUCKET_NAME']
S3_PROC_LIST_FILE_KEY = os.environ['S3_PROC_LIST_FILE_KEY']
REDSHIFT_AUDIT_TABLE = os.environ['REDSHIFT_AUDIT_TABLE']
REDSHIFT_CLUSTER_NAME = os.environ['REDSHIFT_CLUSTER_NAME']
DB_NAME = os.environ['DB_NAME']

# Create and return s3 client
def get_s3_client():
return boto3.client('s3')

# Create and return sns client
def get_sns_client():
return boto3.client('sns')

# Fetch redshift data client
def get_redshift_data_client():
return boto3.client('redshift-data')

# Establish redshift connection
def create_redshift_connection(db=None):
return redshift_conn(db)

# Close redshift connection
def close_redshift_connection(psycopg_conn):
psycopg_conn.close()

# Create current timestamp
def create_current_timestamp():
time_now = datetime.now()
return time_now.strftime("%Y-%m-%d %H:%M:%S")

def send_error_sns(subject, message):
"""
Function to send error sns email
"""
logger.info("send_error_sns starts...")

sns_client = get_sns_client()
sns_client.publish(
TopicArn = ERROR_SNS_ARN,
Subject = subject,
Message = message
)

logger.info("send_error_sns ends...")

def user_pwd_retrieve():
"""
Function to retrieve username and password from secrets manager
"""
logger.info("Retrieving username and password from secrets manager")
client = boto3.client('secretsmanager')
response = client.get_secret_value(SecretId=REDSHIFT_SECRET_NAME)
database_secrets = json.loads(response['SecretString'])
pwd = database_secrets['password']
host = database_secrets['host']
port = database_secrets['port']
db_nm = database_secrets['dbname']
usernames = database_secrets['username']
logger.info("Username and password fetched successfully")
return usernames,pwd,host,port,db_nm

def redshift_conn(db=None):
"""
Function to create a connection to redshift
"""
logger.info("Creating connection to redshift")

user1, pwd1, host, port, default_db=user_pwd_retrieve()

if not db:
ps_connection = psycopg2.connect(user=user1,
password=pwd1,
host=host,
port=port,
database=default_db)
else:
ps_connection = psycopg2.connect(user=user1,
password=pwd1,
host=host,
port=port,
database=db)
ps_connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

logger.info("Connection to redshift successful")
return ps_connection


def read_and_fetch_s3_json_object(s3_client):
"""
Function to read the proc list file in s3 and return the json object for further processing
"""
logger.info("read_and_fetch_s3_json_object starts")

s3_response = s3_client.get_object(Bucket = CODE_BUCKET_NAME, Key = S3_PROC_LIST_FILE_KEY)
s3_file_content = s3_response["Body"]
s3_json_object = json.loads(s3_file_content.read())

logger.info("read_and_fetch_s3_json_object ends")
return s3_json_object

def execute_procedure(redshift_client, db_nm, schema_nm, proc_nm):
"""
Function to execute the query asynchronously
"""
logger.info("execute_procedure starts")
# Construct redshift call query
query = f"call {db_nm}.{schema_nm}.{proc_nm}"

# Execute the query
response = redshift_client.execute_statement(
ClusterIdentifier=REDSHIFT_CLUSTER_NAME,
Database=db_nm,
Sql=query,
SecretArn=REDSHIFT_SECRET_NAME
)

logger.info("execute_procedure ends")
return response


def create_audit_entry(psycopg_conn, db_nm, schema_nm, proc_nm, exec_id):
"""
Function to create an entry in redshift audit table
"""
logger.info("create_audit_entry starts")

# Fetch current timestamp
current_timestamp = create_current_timestamp()

# Source system
if "_" in schema_nm:
src_system = schema_nm.split("_")[0]
else:
src_system = schema_nm

# Formulate the insert query
query = f"""
insert into {db_nm}.audit.{REDSHIFT_AUDIT_TABLE}
select '{db_nm}.{schema_nm}.{proc_nm}' as proc_nm
,'{exec_id}' as exec_id
,NULL as redshift_pid
,'{current_timestamp}' as create_ts
,'{current_timestamp}' as update_ts
,'SUBMITTED' as status
,'' as error_msg
,'project' as buss_unit
,'{src_system}' as src_system"""

logger.info(f"Audit Query: {query}")

# Execute and commit the query
cursor = psycopg_conn.cursor()
logger.info(cursor)
cursor.execute(query)
psycopg_conn.commit()

logger.info("create_audit_entry ends")
return "success"

You could see the power of boto3 which could be used for creating a client with redshift-data and could be used to connect with redshift cluster. The sample code helps to establish a connection with redshift cluster and submit the query . Once the query is submitted the lambda makes a synchronous entry into the redshift cluster in the audit table.

def lambda_handler(event, context):
"""
Lambda to execute stored procedures to load data from
redshift raw to redshift stage in async fashion
"""
logger.info("lambda_handler starts")
try:
# S3 Client
s3_client = get_s3_client()

# Redshift data client
redshift_client = get_redshift_data_client()

# Execution Id List
exec_list = []

# Establish a connection to redshift
psycopg_conn = create_redshift_connection(DB_NAME)

# Reading the file which has the list of procedures and creating a json object
s3_json_object = read_and_fetch_s3_json_object(s3_client)
logger.info(f"S3 json object fetched successfully: {s3_json_object}")

# Iterating over all the input procs
for item in s3_json_object["payload"]:
logger.info(f"Processing: {item}")
# Splitting input by database, schema and procedure name
db_nm, schema_nm, proc_nm = item.split(".")
logger.info(f"DB Name: {db_nm}, Schema Name: {schema_nm}, Proc Name: {proc_nm}")

# Execute the procedure in redshift
execute_response = execute_procedure(redshift_client, db_nm, schema_nm, proc_nm)
logger.info(execute_response)

# Append it to execution id list
exec_list.append(execute_response["Id"])

# Add to redshift audit table
audit_response = create_audit_entry(psycopg_conn, db_nm, schema_nm, proc_nm,execute_response["Id"])

if "success" in audit_response:
logger.info(f"Successfully inserted audit entry into redshift for execution id: {execute_response['Id']}")

# Close redshift connection
close_redshift_connection(psycopg_conn)

except Exception as ee:
logger.error("Error occured in redshift lambda async")
logger.error(f"Reason: {ee}")
############# SENDING SNS #############
current_timestamp = create_current_timestamp()
subject = f"Lambda: {LAMBDA_NAME} failed"
error_msg = (
f"Message: Lambda: {LAMBDA_NAME} failed\n" \
f"Status: FAILED\n" \
f"Error Reason: {ee}\n" \
f"Timestamp: {current_timestamp}\n" \
)
send_error_sns(subject, error_msg)
logger.info("Successfully sent error sns notification.")
############# SENDING SNS #############
raise ee

logger.info("lambda_handler ends")
return {
'statusCode': 200,
'message': 'Successfully executed!',
'exec_list': exec_list
}

This code helped us resolve the problem statement and we were able to use the compute power at its best. We also saved a lot of costs as our compute service was not switched on and we were able to get good outcomes of it.

Thanks for your time for going through the post. Please do let me know if you have any questions. Thanks !!!

--

--

Soubhik Khankary

Data Engineer by job , Teaching computers by stats and love to learn never endless math.