Automated Reporting with Snowflake

Abhra Dasgupta
The Uni Cards Tech Blog
6 min read · Apr 10, 2023

Introduction

At Uni, we rely heavily on large-scale data processing, and ensuring accurate record-keeping and correct system functioning is crucial. The existing manual reconciliation process involved running daily queries and emailing the results to designated recipients. However, this approach does not scale to larger systems and is prone to errors.

To address this, we needed an automated reporting framework that runs a diverse set of big data queries every day and emails the results, so that system correctness can be verified continuously.

Approach Overview

To streamline our reporting process, we took the following steps:

  • Create a stored procedure in Snowflake to execute the intended queries and export the results to an S3 bucket.
  • Set up an AWS Lambda function with S3 Put as the trigger to send an email with the exported file as an attachment, using AWS SES.
  • Create a scheduled task to execute the stored procedure daily; the files it uploads to S3 then trigger the Lambda function.
  • Enjoy an automated reporting process that streamlines your workflow and ensures accurate and up-to-date reporting.

Sending Email from Snowflake

How do we export results to AWS S3?

Snowflake has the concept of an external stage, which can point to:

  • Amazon S3 buckets
  • Google Cloud Storage buckets
  • Microsoft Azure containers

In order to export our query results, we created an external stage using Amazon S3:

  • First, we created an S3 bucket and named it snowflake-reporting.
  • We blocked all public access to the bucket (security is important).
  • We created a headless IAM user and attached the following policy with the required permissions.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "snowflakeS3",
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:GetObjectVersion",
        "s3:PutObject",
        "s3:ListBucket",
        "s3:ListAllMyBuckets"
      ],
      "Resource": [
        "arn:aws:s3:::snowflake-reporting",
        "arn:aws:s3:::snowflake-reporting/*"
      ]
    }
  ]
}
  • To create an external stage in Snowflake, we will need to use the access and secret keys of the previously created user. Let’s say we are creating the external stage in the ‘RECON’ schema of the ‘REPORTING_DB’ database.
CREATE STAGE REPORTING_DB.RECON.REPORTING
URL = 's3://snowflake-reporting'
CREDENTIALS = (AWS_KEY_ID = 'ACCESS_KEY' AWS_SECRET_KEY = 'SECRET_KEY')
COMMENT = 'This stage is for unloading reporting data from Snowflake';
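
To sanity-check that the stage can actually reach the bucket, you can list its contents (a quick verification sketch; the stage will simply be empty until the first unload):

-- Returns one row per file under s3://snowflake-reporting
LIST @REPORTING_DB.RECON.REPORTING;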

How do we execute multiple queries and export results to the external stage?

  • Create a new table called ‘queries’ in the same schema with the following three columns:
    1. ‘name’: This column represents the name of the file that will be generated.
    2. ‘path’: This column represents the S3 path where the generated file will be exported.
    3. ‘query’: This column represents the query that needs to be executed in order to generate the desired data for the file.
create table queries(name string, path string, query string);
  • Insert all the queries that we want to execute into the table, similar to the example below.
insert into queries values('missing_bill', '/alerts/missing_bill', '<Query to return list of users whose bill was missed/failed>');
  • To execute all entries in the ‘queries’ table and export the resulting data to the expected location, we created a stored procedure using JavaScript. The procedure automatically appends ‘/yyyy/mm/dd’ to the path, ensuring that the results for each day are saved separately in S3. This makes it easier to manage and analyze the exported data over time.
create or replace procedure RunReportingBatchSQL(sqlCommand String)
returns string
language JavaScript
as
$$
// Get the current date
var date_now = new Date();

// Get the year as YYYY
var full_year = date_now.getFullYear();

// Get the month and add one, because getMonth() is zero-based in JavaScript (zero indicates the first month of the year)
var full_month = date_now.getMonth() + 1;

// Convert the number to a string and pad with a zero if it is a single digit
full_month = String(full_month).padStart(2, '0');

// Get the day of the month
var full_day = date_now.getDate();

// Convert the number to a string and pad with a zero if it is a single digit
full_day = String(full_day).padStart(2, '0');

// Execute the SQL passed in as the argument (a select over the 'queries' table)
var cmd1_dict = {sqlText: SQLCOMMAND};
var stmt = snowflake.createStatement(cmd1_dict);
var rs = stmt.execute();

var res = '';

while (rs.next()) {
    let name = rs.getColumnValue("NAME");
    let path = rs.getColumnValue("PATH");
    let command = rs.getColumnValue("QUERY");

    // Construct the COPY INTO statement, appending /yyyy=/mm=/dd= and the file name to the path
    var copy_query = "";
    copy_query = copy_query.concat("COPY INTO @RECON.REPORTING", path, "/yyyy=", full_year, "/mm=", full_month, "/dd=", full_day, "/", name, ".csv FROM ");
    copy_query = copy_query.concat("(", command, ") ");
    copy_query = copy_query.concat("FILE_FORMAT = ( TYPE = CSV null_if=('') COMPRESSION = None) OVERWRITE = TRUE SINGLE = TRUE HEADER = TRUE;");

    try {
        // BEGIN starts a transaction in the current session
        let begin_statement = snowflake.createStatement({sqlText: "BEGIN"});
        begin_statement.execute();

        // Create a Snowflake statement from the COPY INTO query built above
        let copy_statement = snowflake.createStatement({sqlText: copy_query});

        // Execute the COPY INTO statement
        let result_set = copy_statement.execute();

        // COMMIT commits the open transaction in the current session
        let commit_statement = snowflake.createStatement({sqlText: "COMMIT"});
        commit_statement.execute();

        // Status line returned for info and debugging purposes
        res += name + " : Succeeded" + "\n";
    } catch (err) {
        // Record the error message for this report and move on to the next one
        res += name + " : Failed: " + err.message + "\n";
    }
}
return res;
$$
;
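
Before scheduling anything, the procedure can be sanity-checked with a manual call (a quick test sketch; the date in the example key below is only illustrative):

-- Run the procedure once by hand over the 'queries' table
call RunReportingBatchSQL('select name, path, query from "REPORTING_DB"."RECON"."QUERIES"');

-- Each row should report "<name> : Succeeded", and for the 'missing_bill' entry a file
-- should appear at a key like:
-- s3://snowflake-reporting/alerts/missing_bill/yyyy=2023/mm=04/dd=10/missing_bill.csv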

Send email using S3 put event and Lambda

  • We wrote a Python Lambda function named ‘s3-email’ and set the S3 Put event of the designated bucket as its trigger. This function sends an email notification to the intended recipients with the exported data file attached. You will need AWS SES set up for this; we had our domain verified in SES and used it as the sending identity.
import boto3
import os.path
from datetime import date
from urllib.parse import unquote
from botocore.exceptions import ClientError
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication

client = boto3.client('ses', region_name="ap-south-1")
s3 = boto3.client('s3')

def lambda_handler(event, context):

    print('Event Received: ' + str(event))

    # Today's date, used in the email subject
    today = date.today()

    # Default values for sending the email
    sender = 'Production Alert <alerts@example.com>'
    subject = 'File uploaded to snowflake reporting s3'
    bodyText = "The Object file was uploaded to S3"
    toAddresses = ['<recipient1@example.com>', '<recipient2@example.com>']

    fileObject = event["Records"][0]
    bucketName = str(fileObject['s3']['bucket']['name'])
    rawKey = str(fileObject['s3']['object']['key'])
    key = unquote(rawKey)
    fileName = os.path.basename(key)

    # Update values for sending the email based on the S3 path
    if key.startswith('alerts/missing_bill/'):
        sender = 'Production Alert <alerts@example.com>'
        subject = 'Bill generation pending on date ' + str(today)
        bodyText = 'PFA: Pending data on production ' + fileName
        toAddresses = ['<recipient_billing_alert@example.com>']

    # Download the uploaded file so it can be attached to the email
    attachment = '/tmp/' + fileName
    s3.download_file(bucketName, key, attachment)

    msg = MIMEMultipart()
    msg['Subject'] = subject
    msg['From'] = sender
    msg['To'] = ", ".join(toAddresses)
    textpart = MIMEText(bodyText)
    msg.attach(textpart)

    att = MIMEApplication(open(attachment, 'rb').read())
    att.add_header('Content-Disposition', 'attachment', filename=fileName)
    msg.attach(att)

    try:
        response = client.send_raw_email(
            SourceArn='arn:aws:ses:<aws_region>:<aws_account_id>:identity/example.com',
            FromArn='arn:aws:ses:<aws_region>:<aws_account_id>:identity/example.com',
            ReturnPathArn='arn:aws:ses:<aws_region>:<aws_account_id>:identity/example.com',
            Source=sender,
            Destinations=toAddresses,
            RawMessage={'Data': msg.as_string()}
        )
    except ClientError as e:
        print(e.response['Error']['Message'])
    else:
        print("Email sent! Message ID:")
        print(response['ResponseMetadata']['RequestId'])

    os.remove(attachment)
  • Add a resource-based policy to the Lambda function so that the snowflake-reporting bucket is allowed to invoke it on the S3 Put event. (The function’s execution role separately needs s3:GetObject on the bucket so it can download the files it sends as attachments.)
{
  "Version": "2012-10-17",
  "Id": "default",
  "Statement": [
    {
      "Sid": "event_permissions_from_snowflake-reporting_for_s3-email",
      "Effect": "Allow",
      "Principal": {
        "Service": "s3.amazonaws.com"
      },
      "Action": "lambda:InvokeFunction",
      "Resource": "arn:aws:lambda:<aws_region>:<aws_account_id>:function:s3-email",
      "Condition": {
        "StringEquals": {
          "AWS:SourceAccount": "<aws_account_id>"
        },
        "ArnLike": {
          "AWS:SourceArn": "arn:aws:s3:::snowflake-reporting"
        }
      }
    }
  ]
}

Automate the Procedure call

  • Create a scheduled task in Snowflake so that the procedure keeps executing every day automatically. We scheduled it to run at 6 AM IST daily.
CREATE TASK recon_reporting_task
WAREHOUSE = <YOUR_WAREHOUSE_NAME>
SCHEDULE = 'USING CRON 0 6 * * * Asia/Kolkata'
AS
call RunReportingBatchSQL('select name, path, query from "REPORTING_DB"."RECON"."QUERIES"');
  • Tasks are created in a suspended state by default, so we need to resume the task to start it.
alter task recon_reporting_task resume;
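
To confirm that the task is actually firing every morning, Snowflake’s task history can be queried (a small monitoring sketch):

-- Check the task's state and its recent runs
show tasks like 'RECON_REPORTING_TASK';

select name, state, scheduled_time, completed_time, error_message
from table(information_schema.task_history(task_name => 'RECON_REPORTING_TASK'))
order by scheduled_time desc
limit 7;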

Conclusion

We are now able to execute multiple queries, export the results to designated Amazon S3 paths, and send them via email to the intended recipients.

Future work

The email recipients, subject, and sender name are currently hardcoded in the Lambda code. These can be moved to an external configuration source or database.
