Error Notifications for Snowflake Tasks

This article was originally written for SELECT and published here

Snowflake Tasks is a feature that helps you automate database operations. Combined with another feature called Streams, tasks can be used to build complete, automated data pipelines.
A task can run a single SQL statement or call a stored procedure. You can schedule tasks to run at a certain time, or they can be triggered automatically when data lands in a monitored table.

You can connect tasks together to create a complex data pipeline represented as a DAG (Directed Acyclic Graph) with a single root task. Such a DAG can represent the complete logic of a single ELT process: data import from cloud storage with Snowpipe, incremental data transformation (Streams and Tasks), and data distribution to downstream apps (API, data offload, data sharing, etc.).
When you have such a complete data transformation process, you need to monitor it and react accordingly when the process gets stuck or fails at some point.

How to do that?

There is an internal table called TASK_HISTORY which provides data about each task run. You could develop a process that regularly checks this table for all tasks, or for a given list of tasks, and does something whenever it finds a failure…
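
The polling approach described above could look roughly like the sketch below. It assumes the `INFORMATION_SCHEMA.TASK_HISTORY` table function and its `NAME` and `STATE` columns. The helper names (`build_task_history_query`, `find_failed_runs`), the lowercase dictionary keys, and the sample rows are hypothetical, and actually running the query against Snowflake is left out.

```python
from typing import Dict, Iterable, List, Optional, Set


def build_task_history_query(hours_back: int = 1) -> str:
    """Return SQL that lists task runs scheduled in the last `hours_back` hours."""
    return (
        "SELECT name, state, error_message, scheduled_time "
        "FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY("
        "SCHEDULED_TIME_RANGE_START => "
        f"DATEADD('hour', -{int(hours_back)}, CURRENT_TIMESTAMP()))) "
        "ORDER BY scheduled_time DESC"
    )


def find_failed_runs(
    rows: Iterable[Dict], monitored: Optional[Set[str]] = None
) -> List[Dict]:
    """Keep only failed runs, optionally restricted to a set of task names."""
    return [
        row
        for row in rows
        if row["state"] == "FAILED"
        and (monitored is None or row["name"] in monitored)
    ]


# Hypothetical rows, shaped like the result of the query above.
sample_rows = [
    {"name": "LOAD_ORDERS", "state": "SUCCEEDED", "error_message": None},
    {"name": "TRANSFORM_ORDERS", "state": "FAILED", "error_message": "timeout"},
    {"name": "EXPORT_ORDERS", "state": "FAILED", "error_message": "no access"},
]

for run in find_failed_runs(sample_rows, monitored={"TRANSFORM_ORDERS"}):
    print(run["name"], run["error_message"])
```

Even in this reduced form, you would still have to schedule the check, keep track of which failures were already reported, and deliver the alert somewhere.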

But isn’t that too cumbersome? It is! There is a more convenient way to do it: a new Snowflake feature called Error Notifications for Tasks. It is currently in public preview, so anyone can try it.

In today’s blog post, let’s look at how you can create an error notification for tasks in Snowflake and connect it to your Slack channels. Then, whenever any of the monitored tasks fails, a message is sent to your Slack channel, so the operations team is immediately informed about the failure and can react accordingly.

Integration Architecture

First, let’s draw the overall architecture for this integration and then go through each step in detail.

Snowflake Notification Integration architecture

Suppose you have a DAG consisting of multiple tasks. You can assign a notification integration to the root task of the DAG. Then, if any task in the DAG fails, the error notification catches it and sends an SNS message. On the AWS side, a Lambda function is triggered by the incoming SNS message. The Lambda function processes the message and sends it to your Slack channel via the Slack API. Your operations team monitors the notification Slack channel and can take the necessary actions based on the received message.

How to implement the integration end to end

First, we need to create a notification integration object in Snowflake. Then it can be assigned to the root task. A notification integration is a Snowflake object that provides an interface between Snowflake and AWS SNS. You will need the help of your AWS admins, or the proper privileges on the AWS side, to create and manage several AWS services.

Start by creating an SNS topic in AWS. Here is the documentation on how to do that 👇

Creating an Amazon SNS topic — Amazon Simple Notification Service

Then you need an IAM policy that grants permission to publish to that SNS topic. The policy must allow the sns:Publish action on the SNS topic.

How to do that?

  1. Go into Identity & Access Management (IAM) in the AWS console.
  2. Choose Account settings from the left-hand navigation pane.
  3. Expand the Security Token Service Regions list, find the AWS region corresponding to the region where your account is located, and choose Activate if the status is Inactive.
  4. Choose Policies from the left-hand navigation pane.
  5. Click Create Policy.
  6. Click the JSON tab.
  7. Add a policy document that defines actions that can be taken on your SNS topic.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sns:Publish"
      ],
      "Resource": "<sns_topic_arn> created in previous step"
    }
  ]
}
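
If you prefer to generate the policy document rather than edit it by hand, a minimal sketch could look like this. The function name `build_sns_publish_policy` and the topic ARN are hypothetical placeholders; replace the ARN with the one of the SNS topic you created.

```python
import json


def build_sns_publish_policy(topic_arn: str) -> dict:
    """Build an IAM policy document that allows publishing to one SNS topic."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["sns:Publish"],
                "Resource": topic_arn,
            }
        ],
    }


# Placeholder ARN, for illustration only.
policy = build_sns_publish_policy(
    "arn:aws:sns:us-east-1:123456789012:snowflake-task-errors"
)
print(json.dumps(policy, indent=2))
```

The printed JSON can be pasted into the JSON tab of the policy editor.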

The last step on the AWS side is to create an IAM role. If you already have a role in place that you use for the interaction between your Snowflake account and AWS, you can of course reuse it and just attach the policy created in the previous step. If you need to create a new IAM role, here are the steps:

  1. Go to Identity & Access Management (IAM) in AWS.
  2. Choose Roles from the left-hand navigation pane.
  3. Click the Create role button.
  4. Select Another AWS account as the trusted entity type.
  5. In the Account ID field, enter your own AWS account ID temporarily.
  6. Select the Require external ID option. This option enables you to grant permissions on your Amazon account resources (i.e. SNS) to a third party (i.e. Snowflake).
    For now, enter a dummy ID such as 0000. Later, you will modify the trust relationship and replace the dummy ID with the external ID for the Snowflake IAM user generated for your account. A condition in the trust policy for your IAM role allows your Snowflake users to assume the role using the notification integration object you will create later.
  7. Click the Next button.
  8. Locate the policy you created and select this policy.
  9. Click the Next button.
  10. Enter a name and description for the role, and click the Create role button.
  11. Record the Role ARN value located on the role summary page. You will specify this value in one or more later steps.

Now the setup on the AWS side is done, and we can create a notification integration object on the Snowflake side. Make a note of the ARNs of the SNS topic and the IAM role. You will need them right now to create the notification integration.

Here is the code:

CREATE NOTIFICATION INTEGRATION my_notif_integration
  ENABLED = true
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AWS_SNS
  DIRECTION = OUTBOUND
  AWS_SNS_TOPIC_ARN = 'your SNS topic'
  AWS_SNS_ROLE_ARN = 'your role';

Cool. Now we have to grant Snowflake access to the SNS topic. Let’s run DESC NOTIFICATION INTEGRATION my_notif_integration;

We need two values from the output. Make a note of the following values:

  • SF_AWS_IAM_USER_ARN — ARN for the Snowflake IAM user created for your account.
  • SF_AWS_EXTERNAL_ID — External ID for the Snowflake IAM user created for your account.
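
If you fetch the DESC output programmatically, picking out the two values could be sketched as follows. The sketch assumes each output row has the layout (property, property_type, property_value, property_default); the helper name `extract_trust_values` and the sample values are made up for illustration.

```python
from typing import Iterable, Sequence, Tuple

REQUIRED = ("SF_AWS_IAM_USER_ARN", "SF_AWS_EXTERNAL_ID")


def extract_trust_values(desc_rows: Iterable[Sequence[str]]) -> Tuple[str, str]:
    """Return (IAM user ARN, external ID) from DESC NOTIFICATION INTEGRATION rows."""
    # Assumption: column 0 holds the property name, column 2 its value.
    props = {row[0]: row[2] for row in desc_rows}
    missing = [name for name in REQUIRED if name not in props]
    if missing:
        raise KeyError("Missing properties in DESC output: " + ", ".join(missing))
    return props["SF_AWS_IAM_USER_ARN"], props["SF_AWS_EXTERNAL_ID"]


# Made-up rows that only mimic the assumed layout.
sample_desc = [
    ("ENABLED", "Boolean", "true", "false"),
    ("SF_AWS_IAM_USER_ARN", "String", "arn:aws:iam::123456789012:user/example", ""),
    ("SF_AWS_EXTERNAL_ID", "String", "EXAMPLE_EXTERNAL_ID", ""),
]

user_arn, external_id = extract_trust_values(sample_desc)
print(user_arn, external_id)
```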

The final step of the configuration is to modify the trust relationship of our IAM role. Go back to the AWS console, find your IAM role, and click the Trust relationships tab. Click the Edit trust relationship button and modify the policy document with the values retrieved by describing the notification integration.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "AWS": "<sf_aws_iam_user_arn>"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<sf_aws_external_id>"
        }
      }
    }
  ]
}
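
To avoid copy-paste mistakes when filling in the two placeholders, the trust policy above can also be generated. This is only a sketch: the function name `build_trust_policy` and the sample values are hypothetical, and the real values come from the DESC NOTIFICATION INTEGRATION output.

```python
import json


def build_trust_policy(sf_aws_iam_user_arn: str, sf_aws_external_id: str) -> dict:
    """Build a trust policy that lets the Snowflake IAM user assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {"AWS": sf_aws_iam_user_arn},
                "Action": "sts:AssumeRole",
                # The external ID must match the one Snowflake generated.
                "Condition": {
                    "StringEquals": {"sts:ExternalId": sf_aws_external_id}
                },
            }
        ],
    }


# Placeholder values, for illustration only.
trust_policy = build_trust_policy(
    "arn:aws:iam::123456789012:user/example", "EXAMPLE_EXTERNAL_ID"
)
print(json.dumps(trust_policy, indent=2))
```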

The integration between Snowflake and AWS is now complete. We just need to assign the notification integration to our root task. If you set up the integration first and are only now creating the task, you can assign it directly in the CREATE TASK statement.

I already had the root task in place, so I am going to use the ALTER TASK command.

ALTER TASK <name> SET ERROR_INTEGRATION = <integration_name> ;

The whole integration setup is also described in the Snowflake documentation. If you need more details, go here 👇
Enabling Error Notifications for Tasks

Sending to Slack

That was the first part. Now we need to process the incoming SNS messages and send them to Slack. Let’s create a Lambda function for that. We will use the Slack API to send the messages.

We are going to use our SNS topic as the Lambda trigger. The Lambda function will be triggered whenever a new SNS message arrives.

Lambda trigger

The Snowflake documentation also contains a sample payload for the SNS message. You might need it for testing. You can write your own logic to process the payload: use only the relevant attributes, add information that is not available in the payload, change the formatting, etc. For demonstration purposes, let’s take the whole incoming message as is and send it to Slack.
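
If you do want to pick only the relevant attributes, a sketch of such custom logic is shown below. SNS hands the published message to Lambda as a JSON string under `Records[].Sns.Message`. The field names read from the Snowflake payload (`taskName`, `rootTaskName`, `messages`, `errorCode`, `errorMessage`, `queryId`) are assumptions based on the documented sample payload, so check them against what you actually receive. The function name `extract_task_failures` and the sample event are hypothetical.

```python
import json
from typing import Any, Dict, List


def extract_task_failures(event: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return one summary dict per failed run in an SNS-triggered Lambda event."""
    failures = []
    for record in event.get("Records", []):
        # The Snowflake notification arrives as a JSON string.
        payload = json.loads(record["Sns"]["Message"])
        for run in payload.get("messages", []):
            failures.append(
                {
                    "task": payload.get("taskName"),
                    "root_task": payload.get("rootTaskName"),
                    "error_code": run.get("errorCode"),
                    "error_message": run.get("errorMessage"),
                    "query_id": run.get("queryId"),
                }
            )
    return failures


# Hypothetical event with made-up values, useful for a local test.
sample_event = {
    "Records": [
        {
            "Sns": {
                "Message": json.dumps(
                    {
                        "taskName": "DB.SCHEMA.CHILD_TASK",
                        "rootTaskName": "DB.SCHEMA.ROOT_TASK",
                        "messages": [
                            {
                                "errorCode": "000630",
                                "errorMessage": "Statement timed out",
                                "queryId": "example-query-id",
                            }
                        ],
                    }
                )
            }
        }
    ]
}

for failure in extract_task_failures(sample_event):
    print(failure["task"], failure["error_code"], failure["error_message"])
```

Missing fields come back as `None` instead of raising, so an unexpected payload still produces a notification.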

First, let’s format the message a little bit, adding indentation to make it easier to read.

import json


def format_slack_message(message):
    """Wrap the incoming message in two Slack section blocks."""
    json_message = {
        "blocks": [
            {
                "type": "section",
                "text": {
                    "text": ":red_circle: *Snowflake pipeline failure:* Snowflake notification integration output :point_down: ",
                    "type": "mrkdwn"
                }
            },
            {
                "type": "section",
                "text": {
                    # Pretty-print the payload inside a code block so it is easy to read.
                    "text": "```" + json.dumps(message, indent=2, separators=(',', ': ')) + "```",
                    "type": "mrkdwn"
                }
            }
        ]
    }
    return json_message

And now we connect to Slack and send the message:

import json
import logging
import os

import requests  # may need to be packaged with the function

# Assumption: the Slack webhook URL is provided in an environment variable.
my_webhook_uri = os.environ.get("SLACK_WEBHOOK_URI")


def lambda_handler(event, context):
    # An empty event produces no message and is rejected below.
    message = format_slack_message(event) if event else None

    if message:
        logging.info('Starting sending message to slack')
        response = requests.post(
            my_webhook_uri, data=json.dumps(message),
            headers={'Content-Type': 'application/json'}
        )
        logging.info(response.text)
        logging.info('Finished sending message to Slack webhook')
        if response.status_code != 200:
            raise ValueError(
                'Request to slack returned an error %s, the response is:\n%s'
                % (response.status_code, response.text)
            )
    else:
        return {"status": 501, "message": "Not valid SNS message"}

Slack message with info about failed Snowflake Task

And that’s it. Now we have a complete error notification pipeline between Snowflake tasks and your Slack, built with a Snowflake notification integration and a bunch of AWS services.
What I haven’t mentioned so far: the same notification pipeline can be built for serverless tasks or Snowpipe. Happy coding! 🧑‍💻

Tomáš Sobotík
Snowflake Builders Blog: Data Engineers, App Developers, AI/ML, & Data Science

Lead data engineer & architect, Snowflake Data Superhero and SME, O'Reilly instructor. Obsessed by cloud & data solutions. ☁️ ❄️