Task error 발생 시 notification 을 Slack 메시지로 받기

MJ Lee
Snowflake Korea
Published in
15 min readJul 26, 2023

Snowflake 의 Task 실패 시 에러 메시지를 Slack 으로 받는 방법은 세 가지 입니다.

1.Snowflake External Function 을 사용해서 Slack webhook URL 을 호출. Stored Procedure 에서 주기적으로 Task status 를 확인하여 실패한 Task 가 있을 시 External Function 을 통해 알람 메시지 전송 (Snowflake External Function -> API Gateway -> Lambda -> Slack)

  • External Function : Snowflake 외부에서 실행되는 코드를 호출하는 UDF 의 일종으로 일반 UDF 와 다르게 실제 코드를 갖고 있지 않습니다. 외부에서 실행되는 코드를 remote service 라 하면 remote service 로 전송되는 정보는 프록시 서비스(예, AWS API Gateway) 를 통해 전달됩니다. Snowflake 는 External Function 에서 보안과 관련된 설정을 API integration 에 저장합니다.

2.Snowflake 의 Task error메시지를 SNS 로 전송하고 Lambda 를 통해 Slack webhook URL 을 호출 (Snowflake Task error message -> SNS -> Lambda -> Slack)

3.UDF 혹은 Procedure 를 통해 Slack webhook URL 을 직접 호출 (PrPr)

이번 글에서는 1, 2 번을 통해 Task error notification 을 slack 으로 전달하는 방법을 차례대로 살펴보겠습니다. PrPr 단계로 직접 엔드포인트를 호출할 수 있는 3번 방식에 대해서는 다음 글에서 다루도록 하겠습니다.

External Function 을 사용하는 방식

0. 다음 글을 참고해서 Slack Webhook URL 을 생성합니다. 글 4단계를 참고하여 CURL 로 POST request 를 통해 메시지가 Slack 로 전송되는지 확인하고 URL 을 기록해 둡니다.

  1. Slack webhook URL 을 호출할 Lambda function 을 작성합니다.

AWS Console 에서 Lambda->좌측 메뉴Functions->Create function 클릭 후 화면 상단 Author from scratch 을 선택합니다. Function name 을 입력한 후 Runtime 은 Python 3.9 를 선택한 다음 Create function 버튼을 클릭합니다. Lambda function 샘플 코드는 아래를 참고하세요.

2. AWS API Gateway 를 사용해서 Proxy Serivce 를 생성합니다.

우선 AWS account 접근을 위해 Role 을 생성합니다. AWS Console IAM 화면->Role->Create Role -> AWS account 를 클릭 후 화면 하단 Another AWS account 를 선택하고 현재 AWS 계정의 ID 를 임시로 입력합니다. Next 버튼을 클릭 -> Role Name 입력 -> Create role 버튼을 클릭해서 역할을 생성합니다.

3. AWS API Gateway Endpoint 를 생성합니다.

AWS Console 에서 API Gateway 로 이동한 후 Create API->REST API->Build 를 선택합니다. 화면 상단부터 Rest -> New API 를 차례로 선택한 다음 API Name 을 입력하고 Endpoint Type 을 Regional 로 선택합니다.

다음으로 화면 좌측 상단 Actions 드롭다운 메뉴를 클릭한 후 Create Resource 를 선택합니다. 화면에 No Methods defined for the resource 메시지가 보일 겁니다. 지금 생성한 Resource 를 선택한 상태에서 Actions 드롭다운 메뉴를 클릭한 후 Create Method 를 선택합니다. Resource 이름 아래 작은 드롭다운 메뉴를 클릭해서 POST 를 선택하고 우측 회색 체크박스를 선택합니다. Integration Type 은 Lambda Function, Use Lambda Proxy integration 선택, Lambda function 은 1단계에서 생성한 Lambda function 이름을 입력하고 Save 버튼을 클릭합니다.

다시 화면 좌측 상단 Actions 드롭다운 메뉴를 클릭-> Deploy API 를 선택->Stage 생성 후 Deploy 를 클릭합니다. 아래 화면처럼 Stage 에 POST method 를 클릭하면 Invoke URL 을 확인할 수 있습니다. 이 URL 을 기록해 둡니다.

4. AWS API Gateway Endpoint 를 호출할 권한 설정을 합니다.

아래 화면 좌측 Resources 를 선택하고 생성한 POST 를 클릭해서 Method Request ARN 을 기록해둡니다. 이후 Method Request 클릭하고 Authorization 에 AWS IAM 을 선택합니다.

다음으로 아래 화면처럼 Resource Policy 를 선택해서 해당 Gateway Endpoint 를 호출할 권한에 대한 policy 를 설정합니다.

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:sts::000011112222:assumed-role/<external_function_role>/snowflake"
},
"Action": "execute-api:Invoke",
"Resource": "arn:aws:execute-api:ap-northeast-2:000011112222:sdfsgxxxx/*/POST/sf-proxy"
}
]
}

<external_function_role> 는 2단계에서 생성한 Role Name 으로, “resource” 는 4단계에서 확인한 Method Request ARN 으로 대체합니다. 다시 화면 좌측 상단 Actions 에서 Deploy API 를 클릭해서 변경된 설정을 저장합니다.

5. Snowflake 에서 proxy service (API Gateway) 와 연결할 API Integration 객체를 생성합니다.

CREATE OR REPLACE API INTEGRATION my_api_integration_01
api_provider = aws_api_gateway
api_aws_role_arn = 'arn:aws:iam::000011112222:role/apigateway-snowflake'
api_allowed_prefixes = ('https://sdxxxxxxx.execute-api.ap-northeast-2.amazonaws.com/poc/sf-proxy')
enabled = true;

api_aws_role_arn 는 2단계에서 생성한 Role ARN 을, api_allowed_prefixes 는 4단계에서 확인한 Method Request ARN 을 입력합니다.

아래 명령어를 실행해서 API_AWS_IAM_USER_ARN, API_AWS_EXTERNAL_ID 정보를 기록합니다.

 DESCRIBE INTEGRATION my_api_integration_01;

6. API Integration 과 Proxy Service 를 연결합니다.

AWS Console IAM 에서 2단계에서 생성한 Role 을 클릭한 후 Trust relations 탭-> Edit trust relationship 을 클릭합니다. 5단계에서 확인한 정보를 활용해서 아래와 같이 업데이트한 후 Update policy 를 클릭합니다 Statement.Principal.AWS 에는 API_AWS_IAM_USER_ARN 값을, Statement.Condition 는 API_AWS_EXTERNAL_ID 값을 입력합니다.

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::000011112222:user/development/development_user"
},
"Action": "sts:AssumeRole",
"Condition": {"StringEquals": { "sts:ExternalId": "EXTERNAL_FUNCTIONS_SFCRole=3xxxlkjdlkjflkjsOv=" }}
}
]
}

7. Snowflake 에서 External Function 을 생성합니다.

API_INTEGRATION 에는 5단계에서 생성한 API Integration Name 을, as 이후에는 3 단계에서 확인한 Invoration URL 을 기입합니다.

CREATE OR REPLACE EXTERNAL FUNCTION SEND_ALERT(SUBJECT STRING, ADDRESS STRING)
RETURNS VARIANT
API_INTEGRATION = my_api_integration_01
AS 'https://xxxxee11.execute-api.ap-northeast-2.amazonaws.com/default/sf-proxy';

8. External Function 을 호출해서 Slack 으로 Notification 이 전달되는지 확인합니다.

9. 다음은 8 번을 호출하는 Stored procedure 입니다. 이를 다시 Task 로 실행하여 주기적으로 실패한 Task 메시지를 Slack 으로 전달 가능합니다.

Snowflake 의 네이티브 기능인 Task error notification 을 사용하는 방식

두 번째 방식은 Snowflake 의 네이티브 기능인, Task error notification 을 AWS SNS 로 전달하는 기능을 활용하는 방법입니다.

1. AWS SNS 토픽을 생성합니다. (Standard 타입으로 선택합니다.)

2. SNS 에 publish 권한을 위한 정책을 생성합니다.

Create Policy -> Json 탭을 선택하고 아래와 같이 정보를 입력합니다. sns_topic_arn 에는 1단계에서 만든 토픽의 ARN 을 입력한 다음 Create Policy 를 클릭합니다.

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"sns:Publish"
],
"Resource": "<sns_topic_arn>"
}
]
}

3. SNS 토픽에 privileges 를 할당할 역할을 생성합니다.

AWS Console -> IAM -> Role -> Another AWS account 를 선택하고 Account ID 에 현재 AWS 계정의 ID 를 임시로 입력합니다. 그 다음 Required external ID 를 선택하고 임시로 0000 을 입력합니다. Next 버튼을 클릭하고 2 단계에서 만든 policy 를 추가한 후 다시 Next 버튼을 클릭합니다. 생성된 역할의 Role ARN 값을 기록해 둡니다.

4. Snowflake 에서 Notification Integration 객체를 생성하고 접근을 허용하는 설정을 합니다.

1단계에서 생성한 토픽의 ARN 과 3단계에서 생성한 역할의 ARN 을 AWS_SNS_TOPIC_ARN 와 AWS_SNS_ROLE_ARN 에 각각 입력합니다.

CREATE NOTIFICATION INTEGRATION sns_noti_int
ENABLED = true
TYPE = QUEUE
NOTIFICATION_PROVIDER = AWS_SNS
DIRECTION = OUTBOUND
AWS_SNS_TOPIC_ARN = 'arn:aws:sns:ap-northeast-2:000011112222:snow-task-alert'
AWS_SNS_ROLE_ARN = 'arn:aws:iam::000011112222:role/snow-sns-role'
;

다음으로 아래 명령어를 통해 SF_AWS_EXTERNAL_ID 와 SF_AWS_IAM_USER_ARN 정보를 확인합니다.

DESC NOTIFICATION INTEGRATION sns_noti_int;

AWS Console -> IAM -> Roles 선택 후 3단계에서 생성한 Role 을 찾고 Trust relationship 탭 -> Edit trust relationship 버튼을 클릭합니다. 아래에서 Statement.Principal.AWS 에는 앞 단계에서 알아낸 SF_AWS_IAM_USER_ARN 값을, sts:ExternalId 는 SF_AWS_EXTERNAL_ID 값으로 대체합니다.

{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"AWS": "<sf_aws_iam_user_arn>"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "<sf_aws_external_id>"
}
}
}
]
}

5. Task 가 error notification 을 SNS 로 보내도록 설정합니다.

아래 명령어를 통해 Task 가 error notification 을 SNS 로 보내도록 설정합니다. 이때 ERROR_INTEGRATION 는 4단계에서 생성한 notification 객체의 이름을 입력합니다.

CREATE TASK mytask
SCHEDULE = '5 MINUTE'
ERROR_INTEGRATION = sns_noti_int
AS
INSERT INTO mytable(ts) VALUES(CURRENT_TIMESTAMP);

혹은 아래 명령어로 이미 생성된 Task 의 설정을 변경할 수도 있습니다.

ALTER TASK mytask SET ERROR_INTEGRATION = sns_noti_int;

6. 이제 Lambda 를 사용해서 SNS 로 전송된 notification 으로 Slack 에 메시지를 전달합니다. 아래는 샘플 코드입니다.

해당 코드를 Deploy 합니다. 아래 데이터로 샘플 테스트 케이스를 만들어 정상적으로 호출되는지 확인할 수 있습니다.

{
"Records": [
{
"EventSource": "aws:sns",
"EventVersion": "1.0",
"EventSubscriptionArn": "arn:aws:sns:us-east-1:{{{accountId}}}:ExampleTopic",
"Sns": {
"Type": "Notification",
"MessageId": "XXXXXX-BBBBBBBB-EEEEEE-SSSS903-4c221d41eb5e",
"TopicArn": "arn:aws:sns:us-east-1:000011112222:ExampleTopic",
"Subject": "example subject",
"Message": "example message",
"Timestamp": "1970-01-01T00:00:00.000Z",
"SignatureVersion": "1",
"Signature": "EXAMPLE",
"SigningCertUrl": "EXAMPLE",
"UnsubscribeUrl": "EXAMPLE",
"MessageAttributes": {
"Test": {
"Type": "String",
"Value": "TestString"
},
"TestBinary": {
"Type": "Binary",
"Value": "TestBinary"
}
}
}
}
]
}

아래처럼 Lambda Function overview 에서 Add trigger 를 선택한 후 1단계에서 생성한 SNS 토픽을 선택합니다.

7. 태스크의 error 메시지가 정상적으로 Slack 에 전송되는지 아래 명령어로 확인합니다.

EXECUTE TASK mytask;

지금까지 Snowflake 의 Task 실패 시 에러 메시지를 Slack 으로 받는 방법에 대해 알아봤습니다. Snowflake 는 지금까지 보안상의 이유로 직접적인 API 호출은 CSP 의 프록시 서비스를 통해서만 가능하도록 제한해 왔습니다만, 앞으로 공개될 새로운 기능을 통해서는 직접 API 를 호출할 수 있게 됩니다. 다음 글에서 UDF 혹은 Procedure 를 통해 외부 API 를 직접 호출하는 간단한 방법에 대해 다루도록 하겠습니다.

--

--