pov: 슬랙 bolt로 에어플로우 DAG 실행시키기

Jiyoon You
원티드랩 기술 블로그
9 min readJul 27, 2023

안녕하세요 원티드랩 데이터 팀에서 데이터 분석을 하고 있는 유지윤입니다.
작년에 이어 3개월간 이벤트 QA 자동화 툴을 고도화하는 단기 프로젝트를 진행하게 되었어요.

많은 일이 있었지만 그중에서도 기술적으로 새로운 개념을 접할 기회가 가장 많았던
슬랙 bolt에어플로우 DAG를 연동시킨 과정을 공유해 보려고 합니다.

💡 왜 하게 되었는가

로컬에서 실행시킨 화면

작년까지 작업한 QA 자동화 툴은 로컬에서 파이썬 파일을 실행하는 방식이었어요 (더보기)

데이터 팀 내에서 사용하는데 이 방식이 크게 불편하지 않았지만, 간혹 이벤트 QA를 진행하는 PO를 포함한 다른 구성원이 쓰기엔 파이썬과 깃헙 환경을 세팅하고 사용하는 것이 허들로 작용할 거라 생각했어요.

그래서 환경 세팅 없이도 웹에서 제어가 용이한 airflow DAG를 만드는 작업을 제일 먼저 진행했어요.
기존에는 아래와 같이 코드 실행에 필요한 변수는 DAG config로 입력할 수 있도록 만들어져 있었어요.

하지만 JSON과 airflow를 사용하는 방식은 비 개발 직무를 포함하는 모든 사내 구성원이 사용하기에 허들이 있다고 생각했어요.

더 편한 형태를 고민하게 되었고, 전사가 쓰고 있는 슬랙으로 (1) JSON에 필요한 값을 입력해서 (2) DAG를 실행시키고 (3) 진행 상황을 볼 수 있도록 한다면 사용성이 대폭 개선될 것이라 생각했어요.

목표: 슬랙으로 이벤트 QA를 할 수 있도록 만들어보자!

  1. JSON에 필요한 값 입력
  2. DAG 실행
  3. 진행 상황 알림

🤔 그래서 어떻게 연결한건데

슬랙 “이벤트 QA 실행” 메시지에 응답하기

슬랙에서 슬랙 앱을 편하게 만들기 위해 제공하는 프레임워크인 bolt를 사용했어요.
빅쿼리 오류 자동응답을 위해 bolt 환경이 세팅되어 있어서 문서에 나와있는 대로 데코레이터와 함수만 추가하면 해당 문자열에 반응할 수 있도록 만들 수 있었어요.

@app.message('이벤트 QA 실행')
def event_qa_message(message, say):
...

메시지에 간단한 설명과 버튼을 추가해서 버튼 클릭 시 모달을 발생시키기 위해 아래와 같이 block을 만들었어요.

다른 액션으로 이어지는 요소에는 action_id와 같은 id가 필요해요.
저는 Click Me라는 버튼 클릭 시 모달을 발생시킬 거라 이 버튼에 event_qa_action이라는 action_id를 추가해 줬어요.

blocks = [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "이벤트 QA를 실행하려면 아래 버튼을 클릭해주세요!"
},
"accessory": {
"type": "button",
"text": {
"type": "plain_text",
"text": "Click Me",
"emoji": True
},
"action_id": "event_qa_action"
}
}
]

say 함수에 argument로 thread_ts를 넣게 되면 “이벤트 QA 실행”이라고 적은 메시지의 스레드로 모달 실행 메시지가 전송됩니다.

say(blocks=blocks, thread_ts=message['ts'])

Click Me 클릭시 모달 생성하기

모달에서 JSON에 필요한 값들을 입력받을 수 있도록 구현해 봤어요.
모달에 입력받는 값은 아래 key (붉은 글씨)의 value로 가공 할거예요.

blocks에서 action_id로 넘겨준 event_qa_action을 트리거로 위와 같이 생긴 모달 뷰를 열어줄 거예요

@app.action('event_qa_action')
def event_qa_modal(ack, body, client):
# Acknowledge the command request
ack()

client.views_open(...)

모달의 전체적인 모양새는 block kit builder를 사용해서 잡고 입력받은 값을 가공하기 위해 block_id와 action_id 값을 추가했어요.

파일명 부분만 보면 아래와 같은 식으로요:

{
"type": "input",
"block_id": "file-name-block",
"element": {
"type": "plain_text_input",
"action_id": "file-name-action",
},
"label": {
"type": "plain_text",
"text": "파일명",
"emoji": True
},
"hint": {
"type": "plain_text",
"text": "실제 파일명과 동일하게 입력해주세요. 띄어쓰기도 모두 동일해야합니다"
}
}

그리고 모달 제출 시 다른 함수로 연결할 수 있는 callback_id와 슬랙에 알람을 보내기 위해 필요한 슬랙 채널 ID(channel_id)와 스레드 값(thread_ts)도 private_metadata로 넘겨줬어요.

"callback_id": 'event_qa_submission',
"private_metadata": f"{body['container']['channel_id']} {body['container']['thread_ts']}"

모달 제출 시 → DAG 실행

이제 거의 다 했어요!
모달에서 입력하고 넘겨받은 정보로 DAG만 실행시키면 bolt에서 할 수 있는 건 다에요

일단 위에서 사용한 callback_id로 app.view를 데코레이터로 추가하고 함수를 적어줬어요.

@app.view("event_qa_submission")
def run_event_qa(ack, body, view):
ack()
...

DAG 실행할 때 json 형태로 넘길 수 있도록 다 파싱 해줄게요

# 채널ID, thread_ts
channel_id, thread_ts = view['private_metadata'].split()

# 파일명, 시트범위, 서브도메인
file_name = view["state"]["values"]["file-name-block"]["file-name-action"]['value']
sheet_range = view["state"]["values"]["sheet-range-block"]["sheet-range-action"]['value']
subdomain = view["state"]["values"]["subdomain-block"]["subdomain-action"]['value']

channel_id, thread_ts, file_name 등을 json 형태로 담아줄게요

json_data = {
'conf': {
'file_name': file_name,
'sheet_range': sheet_range,
'channel_id': channel_id,
'thread_ts': thread_ts
}
}

이 header를 넣어서 request 해주면 DAG가 짜잔 실행됩니다!

url = 'http://{에어플로우 주소}/api/v1/dags/event_qa_trigger/dagRuns'
requests.post(url, headers=headers, json=json_data, auth=(username, password))

DAG에서 진행 상황 슬랙으로 전달

이 부분은 언제 알람이 필요한지에 따라 가감, 응용하면 될 것 같아요

저는 아래 시점에 슬랙 메시지를 전송했어요

  1. DAG 실행
  2. 요청 시트 url 가져온 후 (본격 QA 시작 직전)
  3. 코드 실행 완료/실패

셋 다 시점이 차이일 뿐 비슷한 방식이라 2번 코드를 예시로 봐볼게요 (참고 문서)

text = f"텍스트 어쩌구저쩌구~"
client.chat_postMessage(text, channel=channel_id, thread_ts=thread_ts)

그럼 이렇게 맨 처음 메세지의 스레드로 슬랙 메시지를 보낼 수 있어요.

실패 시 메시지 전송

airflow에선 DAG나 task가 실패했을 때 특정 함수가 실행되도록 on_failure이라는 argument를 제공해요

def trigger_event_qa_dag():
trigger_main = PythonOperator(
python_callable=run_ecs, # 실행시 호출할 함수이름
task_id='trigger_main', # task 이름
on_failure_callback=main_fail, # 실패시 main_fail 실행
op_kwargs={'install': True}
)

main_fail이라는 함수에 위 슬랙 메시지와 마찬가지로 chat_postMessage 사용해서 메시지를 보내주면 돼요

def main_fail(context):
text = "텍스트 자리는 여기"
client.chat_postMessage(text, channel=channel_id, thread_ts=thread_ts)

성공시:

실패시:

마무리

저희 팀 데이터 엔지니어 분께서 이 글을 읽고 “ChatOps다!” 라고 말씀 주셔서 처음으로 ChatOps라는 단어를 접하게 되었는데, 딱 들어맞는 모델이더라구요.

개발적으로 새로운 툴에 대한 지식뿐만 아니라 제3자가 사용하는 입장에서 어떻게 하면 더 편할까?라는 고민을 할 수 있었던 의미 있던 시간이었다고 생각합니다!

짧지 않은 글, 읽어주셔서 감사합니다.

--

--