Airflow Task failed Alert Mail 폭탄 회피하기

Barkingcode
네이버 플레이스 개발 블로그
15 min readDec 14, 2023

--

안녕하세요?
G플레이스데이터개발 나호철입니다.

제가 속해있는 G플레이스데이터개발팀은 아래와 같이 크게 3가지의 기능을 하는 Data Lakehouse를 운영하고 있습니다.

PDL(Place Data Lakehouse) Architecture
  1. Glace CIC의 여러 서비스로부터 Protobuf로 schema가 정의된 데이터를 받아 HDFS에 적재하는 ETL Pipeline
  2. HDFS에 적재된 데이터를 ML, 어뷰저 탐지, 통계 지표 생성 등 다양한 목적으로 사용할 수 있도록 데이터를 조회할 수 있는 분산 쿼리엔진 제공
  3. 인사이트를 얻을 수 있도록 필요한 지표를 만들거나 레포팅 할 수 있는 BI 툴 제공

이번 글에서는 ETL Pipeline의 구성 요소 중 Airflow를 사용하면서 겪었던 문제 중 하나인 과도한 Task failed Alert mail 발송으로 인한 노이즈를 제거한 내용에 대해 이야기해 보려고 합니다.

Airflow는 Task라고 불리는 최소 작업 단위를 단방향 그래프(DAG)로 구성한 배치 워크플로우를 작성하고 스케쥴링, 모니터링 할 수 있는 오픈소스 프레임워크입니다. 쉬운 사용성과 직관적인 인터페이스로 Data Pipeline을 구성하는데 있어 널리 사용되고 있는 뗄래야 떼어놓을 수 없는 핵심 컴포넌트인데요. 저희 G플레이스데이터개발팀도 다음과 같은 다양한 목적을 달성하기 위해 Airflow를 적극적으로 사용하고 있습니다.

  1. 데이터를 적재하는 ETL Pipeline
  2. 적재된 데이터 정합성 검증
  3. 데이터 백업
  4. Hive External table 관리
  5. Github 연동을 통한 Release/Workflow 자동화
  6. 각종 Sample data 생성

Pain point: 한 Dag에서 병행 수행되는 task들의 실패 횟수만큼 email이 발송되는 경우

Airflow에서는 이런 배치 워크플로우를 잘 관리할 수 있도록 DAG에 실패한 Task가 발생하는 경우 관리자에게 Alert mail을 발송할 수 있습니다. 깨진 창문 신드롬처럼 실패한 Task를 방치하지 않기 위해서 실패 알림을 Email로 받아보고, 재시도하거나 처리 상태를 변경하는 등의 대응을 신속히 수행하는 것은 Airflow 관리자 입장에서 매우 중요한데요. 한 가지 아쉬운 점이 있다면, 실패한 DAG가 아닌 Task 단위로 Alert mail이 발송되기 때문에 동일한(혹은 유사한) 문제로 인해 실패한 Task들의 Alert mail을 한 번에 수십 건씩 받는 경우가 생긴다는 것입니다.

저희 팀에서 운영 중인 ETL Pipeline은 DB 개수 * DB의 Table 개수 * 사용 용도 만큼 증가하고 있으며, 현재 수 백개의 ETL Pipeline을 운영하고 있습니다. 파이프라인마다 약간씩 다르긴 하지만 어찌 되었든 데이터를 전송받고, 처리해 적재하고, 적재된 데이터를 검증하는 로직은 유사하기 때문에 DB 기준으로 DAG를 구성하고, 그 DAG 안에서 for 문을 돌면서 데이터를 적재하는 Task를 동적으로 생성하는 다음과 같은 구조를 지니고 있습니다.

이렇게 동일한 기능을 하는 Task를 동적으로 생성하는 경우 한 개의 Task에서 문제가 발생하게 되면, 나머지 Task들도 역시 문제가 발생할 가능성이 매우 높고 Airflow는 실패한 모든 Task에 대해 Alert mail을 발송한다는 것입니다.
예를 들어, Yarn Cluster와 통신을 하는 Task가 Pipeline상에 존재하는 경우 동작 시점에 Yarn Cluster가 통신 불능인 상태가 된다면, 해당 Task는 모두 실패하여 Airflow 관리자는 동일한 에러에 대해 대한 수십 통의 Alert mail을 받게 될 것입니다. 실제로 실패한 Task에 대한 Alert mail을 받아보면 pipeline 개수만큼 발송되는 것을 볼 수 있습니다.

생각해 보면 동일한 Alert mail이 여러 번 발송된다는 것은 마냥 나쁜 것만은 아닙니다. 그만큼 해당 Task가 여러 군데에서 반복적으로 쓰이고 있고, 문제의 영향 범위가 크다는 것을 직접적으로 표현해 주고 있습니다. 하지만 그렇다고 해서 수십 통의 Alert mail에 의해 발생되는 노이즈로 인해 다른 Alert mail을 확인하기 어려워지거나, 이미 확인한 문제에 대해 똑같은 Alert mail을 지속해서 받는 상황이 용인되는 것은 아닙니다.

Alert Mail이 발송되기 시작하면 멈출 수가 없다..!

따라서 저희 팀에서는 동일한 Alert mail이 발송될 여지가 높은 DAG에 대해서 Alert mail 발송 빈도를 감소시키는 방법을 구상하고 적용해 보기로 했습니다.

문제 해결 방법: 같은 DAG run에서 실행된 실패한 Task instance의 정보를 취합해 DAG run당 1회의 Alert mail만 발송

먼저 구상한 방식을 살펴보면 매우 간단합니다. 하나의 DAG 안에 서로 다른 Table을 HDFS로 적재하는 동일한 Task가 여러 개 존재하고 있으니, 실패하는 원인이 동일하다면 모든 Task에 대한 반복적인 Alert를 받을 수밖에 없습니다. 따라서 모든 Task의 Alert를 제거하고, 앞선 Task의 결과에 따라 Alert를 발생시키는 단 하나의 Alert Task를 생성해 이 Task안에서 실패한 Task들을 처리하여 한 개의 Alert mail을 발송할 수 있도록 구상하였습니다.

Airflow의 요소와 그 속성을 살펴보며 요구사항 구현하기

구상한 내용을 실제로 구현하기 위해 필요한 Airflow의 요소와 그 속성을 살펴보며 한번 구현해봅니다.

email_on_failure 속성

Airflow의 Alert mail 발송은 Task의 email_on_failure 속성값에 따라 결정됩니다. 이 속성값은 DAG와 Task 양쪽 모두에서 정의할 수 있는데요. DAG의 default_args 파라미터로 DAG를 구성하는 모든 Task에 기본으로 email_on_failure의 값을 부여할 수도 있고, 특정 Task에만 별도로 email_on_failure의 값을 부여할 수도 있습니다.

Task의 email_on_failure 속성 설정 방법

DAG의 default_args의 email_on_failure의 값이 True이기 때문에 DAG를 구성하는 모든 Task는 실패할 경우 Alert mail을 발송하게 되므로 이 옵션을 False로 변경하여 Alert mail을 발송하지 않도록 할 수 있습니다. Alert Task를 제외한 DAG를 구성하는 모든 Task들의 email_on_failure를 source code 상에서 일일이 False로 설정할 수도 있지만 이 방법은 매우 번거롭고, DAG의 default_args의 email_on_failure를 False로 설정할 수도 있지만 이 경우 email_on_failure의 값은 False인데 실제로는 Alert email이 발송되는 Airflow 관리자 입장에서 매우 아이러니한 상황에 놓이게 됩니다. 따라서 의미적인 면과 기능적인 면에서 요구 조건을 모두 부합하도록 DAG의 default_args의 email_on_failure를 True로 설정하고 프로그래밍적으로 모든 Task의 email_on_failure를 False로 설정한 뒤, Alert Task만 email_on_failure=True로 설정하는 방식으로 구현하였습니다.

DAG의 tasks라는 attribute를 사용해 DAG를 구성하는 모든 task들을 가져올 수 있습니다. 이를 순회하면서 모든 Task의 email_on_failure를 False로 설정합니다.

DAG와 Task의 속성

모든 Task의 email_on_failure=False로 설정되었으니, 이제 Task가 실패하더라도 더는 Alert mail을 받아보지 못합니다. 따라서 Alert를 받기 위해 Alert Task에서 모든 Task의 상태 값을 확인하여 실패한 Task가 있는 경우 Alert mail을 전송하도록 해야 합니다.

먼저 앞서 실행된 Task와 Alert Task간의 Dependency 설정이 필요합니다. 앞선 Task가 끝나야지만 성공 여부를 판별할 수 있기 때문입니다.
DAG의 leaves라는 attribute를 사용해 DAG를 구성하는 가장 마지막 Task들(leaves)를 가져올 수 있습니다. 이후, 모든 leaves의 다음 Task에 Alert Task를 downstream으로 연결해 주어 모든 Task가 끝난 후 Alert Task가 수행될 수 있도록 합니다.

여기서 주의할 점은 Alert Task를 생성할 때 DAG의 tasks에 포함되기 때문에 dag.leaves >> Alert Task 처럼 코드를 작성할 경우 자기 자신에 대한 dependeny를 설정하려 하기 때문에 다음과 같은 에러를 마주할 수 있습니다.

dag.leaves 안에 Alert Task 자신도 포함되어 발생하는 에러

따라서 별도의 list에 앞선 leaves task들을 저장해두고 이를 활용해 dependency를 구성합니다.

또한, Alert Task는 앞선 Task가 성공했던 실패했던 반드시 실행되도록 설정해야 합니다. 일반적으로는 앞선 Task가 실패할 경우, downstream task는 실행되지 않도록 trigger_rule이 default로 all_success로 설정되어 있습니다.
앞선 Task의 성공 여부와 관계없이 어쨌든 끝이 났으면 실행될 수 있도록 trigger_rule을 all_done으로 설정해 줍니다.

DAG run과 Task Instance의 속성

다음으로 Alert Task에서 앞선 Task의 결과를 가져와 성공 여부를 판단할 수 있어야 합니다.

PythonOperator의 경우 dag_run 파라미터를 추가하거나, **kwargs 또는 **context 파라미터를 추가해 dag_run을 받아올 수 있습니다.
DAG run의 get_task_instances() method를 사용해 앞서 실행된 Task Instance를 가져온 뒤, Task Instance의 state attribute를 확인하여 Task instance가 실패한 경우 Alert mail의 대상에 포함하도록 구현합니다.

Alert mail에는 실패한 Task를 식별하기 위한 task_id와 Log 전체를 확인하고, retry나 mark success 등 조치를 취할 수 있는 페이지로 접근할 수 있는 log_url을 추가합니다. 두 속성 모두 Task Instance의 attribute이므로 쉽게 참조할 수 있습니다.

여기까지 구현했다면, 한 개의 Alert mail에 실패한 모든 Task를 담는 과정은 마무리가 되었습니다. 하지만 이전 Alert mail과 비교해 보면 이렇게 생성한 Alert mail에서는 Log를 직접 확인하지 못한다는 단점이 있습니다.
Log를 확인할 수 있는 URL을 제공하긴 하지만 사내 서비스에 접근을 위해서는 인증이 필요하기도 하고, 메일 안에서 Log를 바로 확인하지 못하기 때문에 조금 불편하기도 합니다.

기존에는 Error Log를 바로 확인할 수 있었지만, 변경된 방식으로는 Mail 안에서 Log 확인이 불가능하다.

Conf의 logging section

단점을 보완하기 위해 실패한 Task의 Log를 가져와 Alert mail 본문에 추가하도록 구현합니다. Task의 log는 Task가 실행되는 시점에 log file에 flush되기 때문에 Task instance에서 직접 가져오지는 못합니다. 따라서 log file을 직접 참조해 실패한 Task의 log를 읽어옵니다.
log file의 경로를 알기 위해서는 log가 저장되는 경로와 log file의 template을 가져와 log file의 경로를 생성해야 합니다. 이 값은 conf의 logging section에서 확인할 수 있습니다.

conf는 dag_run과 마찬가지로 conf 파라미터를 추가하거나 **kwargs 또는 **context 파라미터를 추가하여 가져올 수 있습니다. 아래와 같이 base_log_folderlog_filename_template을 가져와 log file 경로를 생성하고 실패한 Task의 log file을 읽어올 수 있습니다.
모든 log를 그대로 추가할 경우 mail 본문이 너무 길어지니 HTML의 details tag를 활용해 Log 내용을 열고 닫을 수 있도록 구현합니다.

구현 및 동작

완성된 코드를 보면 아래와 같습니다. DAG의 Alert mail을 하나로 통합하고 싶은 경우, remove_duplicate_alert()의 인자로 DAG를 넘겨주기만 하면 어떤 DAG든지 Alert mail을 하나로 통합 할 수 있습니다

def send_alert_mail(dag_run: DagRun, conf: AirflowConfigParser) -> None:
base_log_folder = conf.get('logging', 'base_log_folder')
log_filename_template = conf.get('logging', 'log_filename_template')
msg = []
for ti in dag_run.get_task_instances():
if ti.state != State.FAILED:
continue

msg.append(f'Task id: {ti.task_id}, Log URL: <a href="{ti.log_url}">Link</a>')
latest_log_file_path = Template(log_filename_template).render(ti=ti, try_number=ti._try_number)
with open(f'{base_log_folder}/{latest_log_file_path}', 'r') as f:
msg.append('<details>')
msg.append('<summary>Error log</summary>')
for l in f.readlines():
msg.append(l)
msg.append('</details>\n')

raise AirflowException(''.join(msg)) if msg else AirflowSkipException()


def remove_duplicate_alert(dag: DAG) -> DAG:
if not dag.default_args.get('email_on_failure'):
return dag

for task in dag.tasks:
if isinstance(task, BaseOperator):
task.email_on_failure = False

leaves = dag.leaves

alert_op = PythonOperator(
task_id='send_alert_mail',
python_callable=send_alert_mail,
execution_timeout=timedelta(minutes=1),
trigger_rule='all_done',
dag=dag)

leaves >> alert_op
return dag

해당 코드를 통해 다음과 같이 여러 건의 Alert mail을 하나로 통합해 받을 수 있습니다.

또한, 각 Task의 Log를 열어보면 Task의 실패 원인에 대해 메일에서 바로 파악할 수 있습니다.

마치며

앞선 내용을 통해서 Airflow의 다양한 요소와 속성들에 대해서 살펴보고 이를 활용해서 원하는 방식대로 DAG와 Task를 재구성해 보았습니다. 이 글에서 소개한 속성들 이외에도 Airflow에는 흥미를 가지고 다뤄볼 만한 속성들이 많이 존재합니다. 이러한 것들에 조금만 관심을 가지고 살펴본다면 여러분들의 Airflow를 한층 더 강력하고 편리하게 발전시킬 수 있다고 생각합니다.

읽어주셔서 감사드립니다.

--

--