Snowflake 로 Notion 에 있는 데이터 가져오기
Snowflake 에서 API 를 호출할 수 있다는 걸 알고 계셨나요? Snowflake 의 External Network Access 기능을 사용하면 외부 API endpoint 에 call 을 던져 데이터를 가져오거나 알람을 발생시킬 수 있습니다. 이번 글에서는 Snowflake 와 Notion 을 연계하는 방법에 대해 설명하겠습니다.
- 먼저 여기를 참조해서 Notion 에 Integration 을 설정한 후 secret 을 생성하고 기록해둡니다. 그리고 읽어올 Notion 페이지가 해당 Integration 을 통해 접근 가능하도록 허용해줍니다.
- Notion API call 이 동작하는지 테스트해 봅니다. Notion 은 페이지를 Database, Page, Block 으로 구분하는데요. 가져오고 싶은 페이지의 Block ID 를 찾아 아래와 같이 테스트합니다. (Database, Page, Block 의 개념은 여기 참고, Database ID 찾기는 여기 참고)
curl 'https://api.notion.com/v1/blocks/33333333-6666-999-1111-000000000/children?page_size=50' \
-H 'Authorization: Bearer 'secret_xxxxxxxxxxxxxxxxxxYYYY888888'' \
-H 'Notion-Version: 2022-06-28'
여기까지 성공하면 Notion 에 API call 을 실행할 준비가 다 된것입니다. 위 API call 을 실행하면 다음과 같이 json 형태로 데이터가 리턴됩니다. Notion 에서 table 로 표현된 데이터를 가져오려면 results 키 안에 table_row 키 안에 cells 키 값인 리스트를 가져와야 하며,각 행들의 키는 plain_text 키 입니다.
{
"block": {},
"has_more": false,
"next_cursor": null,
"object": "list",
"request_id": "28aafc08-b7c8-498f-a657-697f3d88e1f1",
"results": [
{
"archived": false,
"created_by": {
"id": "8f8ccee4-3dce-45c2-aef6-92693113d230",
"object": "user"
},
"created_time": "2024-08-29T02:06:00.000Z",
"has_children": false,
"id": "bedc916e-08b5-45c7-b6db-39882a54b7af",
"in_trash": false,
"last_edited_by": {
"id": "8f8ccee4-3dce-45c2-aef6-92693113d230",
"object": "user"
},
"last_edited_time": "2024-09-10T05:11:00.000Z",
"object": "block",
"parent": {
"block_id": "8a2b2d22-9891-475c-82ec-388245195479",
"type": "block_id"
},
"table_row": {
"cells": [
[
{
"annotations": {
"bold": false,
"code": false,
"color": "default",
"italic": false,
"strikethrough": false,
"underline": false
},
"href": null,
"plain_text": "id",
"text": {
"content": "id",
"link": null
},
"type": "text"
}
],
[
{
"annotations": {
"bold": false,
"code": false,
"color": "default",
"italic": false,
"strikethrough": false,
....
....
....
이제 Notion 의 다음 테이블을 Snowflake 로 가져와보도록 하겠습니다.
3. 다음 커맨드를 실행해서 Snowflake 가 Notion endpoint 로 접근 가능하도록Network Rule 을 정의합니다.
CREATE NETWORK RULE notion_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('api.notion.com');
4. Notion 과 연계할 때 사용할 Secret 을 생성합니다.
CREATE SECRET notion_secret
TYPE = GENERIC_STRING
SECRET_STRING = 'secret_xxxxxxxxxxxxxxxxxxYYYY888888';
5. 이전 단계에서 만든 Network Rule 과 Secret 을 사용해서 External Access Integration 을 만듭니다.
CREATE EXTERNAL ACCESS INTEGRATION notion_integration
ALLOWED_NETWORK_RULES = (notion_rule)
ALLOWED_AUTHENTICATION_SECRETS = (notion_secret)
ENABLED = true;
6. 이제 아래와 같이 External Access Integration 을 통해 Notion API endpoint 를 호출하는 Procedure 를 생성한 후 실행합니다. 이 Procedure 는 Notion 으로부터 데이터를 가져와서 dataframe 으로 변환 다음 Snowflake 테이블에 write 합니다.
CREATE OR REPLACE procedure notion_call_tb(blockid string)
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = 3.9
HANDLER = 'get_translation'
EXTERNAL_ACCESS_INTEGRATIONS = (notion_integration)
PACKAGES = ('snowflake-snowpark-python','requests')
SECRETS = ('cred' = notion_secret )
AS
$$
import _snowflake
import requests
import json
import pandas as pd
import snowflake.snowpark as snowpark
from snowflake.snowpark.context import get_active_session
api_session = requests.Session()
def get_translation(blockid):
session = get_active_session()
token = _snowflake.get_generic_secret_string('cred')
url = "https://api.notion.com/v1/blocks/" + blockid + "/children"
header = {"Authorization": "Bearer "+ token, "Notion-Version": "2022-06-28"}
response = api_session.get(url, headers=header)
data = response.json()
table_rows = []
for result in data['results']:
row = []
for cell in result['table_row']['cells']:
if len(cell) !=0:
row.append(cell[0]['plain_text'])
else:
row.append('')
table_rows.append(row)
df = pd.DataFrame(table_rows)
df.columns = df.iloc[0]
df = df[1:].reset_index(drop=True)
snow_df = session.create_dataframe(df)
snow_df.write.save_as_table('test')
return df
$$;
call notion_call_tb('8a2b2d22-9891-475c-82ec-388245195479');
7. 이제 Snowflake 로 저장되었는지 확인해봅니다.
끝났습니다! 지금까지 Snowflake 와 Notion 을 연계하는 방법에 대해 알아봤는데요. 이런 방식을 사용하면 Slack 이나 Jenkins 등 API endpoint 를 제공하는 모든 서비스들과 연계가 가능합니다. 제가 작년에 작성한 Task error 발생시 notification 을 Slack 메시지로 받기 이 글도 External Network Access 를 사용하면 훨씬 간단해 질 것 같네요.