Airflow2를 이용한 데이터 파이프라인 구성기

Sungmin Han
Riiid Teamblog KR
Published in
30 min readMar 17, 2022

안녕하세요 저는 뤼이드에서 데이터 엔지니어링 및 데이터 시스템을 담당하고 있는 한성민입니다.

오늘은 뤼이드에서 일하며 모델 학습, 데이터 분석, 캠페인 집계, 제품 대시보드에 사용하는 데이터 수집을 위해 Airflow를 채택하고 사용하는 사례를 공유하고자 합니다. 이 사례들을 바탕으로 Airflow 도입을 고려하는 데이터 엔지니어, 개발자 분들이 조금이나마 인사이트를 얻으시길 바랍니다.

이 글을 읽으시면 도움이 되는 독자들

  • Airflow에 관심이 있는 개발자, 데이터 엔지니어.
  • 제품 중심 회사에서 데이터 운용을 어떻게 하는지 궁금한 분.
  • ETL, 데이터 관리를 위한 파이프라인 구성에 관심이 많은 분.
▲ 뤼이드 한국 오피스 R 게이트

뤼이드는 “산타” 제품을 시작으로 여러가지 종류의 AI 교육 플랫폼을 개발하고 운영하고 있습니다. AI 제품 회사에서는 모델 학습을 위해서 데이터 수집과 관리가 중요하며, AI 키워드를 제외하더라도 제품을 운영하는데 있어서 데이터는 대시보드 구성, 모니터링, 의사 결정 토대 마련에 있어서 중요한 자산입니다.

제가 합류했던 당시 뤼이드는 “산타” 제품을 이미 성공적으로 운영하고 있었으며 열정 많은 연구자, 개발자가 모여서 새로운 교육 플랫폼, 실험을 진행하고 있었습니다. 개발자가 아닌 PM (Product Manager), BD (Business Development)직종에 계신 분들 마저도 쿼리문을 통해 데이터 조회가 가능했으며 Redash, Data Studio 등을 사용해 제품 KPI, 캠페인 집행 분석 등 데이터 분석과 이를 통한 의사 결정이 이루어지고 있습니다.

우리의 소중한 뿔뿔이 흩어진 데이터

뤼이드 MLOps팀에 합류하여 여러가지 일을 하던 도중 몇 가지 문제점이 스물스물 발견되기 시작했습니다. 그때 당시 느꼈던 가장 큰 문제점은 데이터의 가공, 수집 프로세스가 가시적이지 않았고 데이터 프로세스 공간이 여러군데 파편화되어 흩어져 있었습니다.

데이터 ETL 없이 Read replica DB에 조회 쿼리를 통해 분석이 진행되었고 여러 데이터가 로컬 컴퓨터에서 가공돼 처리되었으며 그 데이터를 관리하는 코드가 한 곳에 모여있지 않아 일을 진행하기 위해서 이곳저곳 사람을 만나며 컨택스트(context)를 수집해야 했습니다.

당시 저는 데이터와 함께 서빙 시스템 구축에도 참여하고 있었고, 운이 좋게도 당시 서빙 시스템 개발 과정에서 ETL 요구사항이 발생했습니다. 요구사항을 들여다보기 앞서 뤼이드의 교육 플랫폼과 교육 컨텐츠에 대한 설명이 필요합니다.

교육 컨텐츠 스냅샷

▲ 산타 플랫폼을 통한 문제 풀이 인터페이스

우리가 만드는 교육 플랫폼은 지문과 문항, 정답 등으로 이루어진 컨텐츠 데이터를 바탕으로 사용자에게 문제가 제공되고, 사용자는 앱 혹은 웹 인터페이스를 통해 문제를 풀 수 있도록 이루어져 있습니다. 제품 속에는 개인화를 위해 사용자가 문제를 풀던 과정에 따라 각 사용자에게 맞는 문제를 추천하거나, 문제 유형별 취약점을 분석하거나, 점수를 예측해주는 등의 AI 기술이 사용됩니다.

제가 참여했던 서빙 시스템은 추천, 점수 예측, 취약점 분석을 위한 AI 모델과 기능을 제공해주어야 했고, 이때 빠르게 컨텐츠 정보를 조회하고 사용할 수 있는 개념이 필요했습니다. 사내에 문제 정보를 보관하는 컨텐츠 서버가 존재했지만, 취약 문제 유형 분석을 위해서는 각 유형에 따른 컨텐츠 목록을 2–30ms 안팎의 빠른 시간 안에 조회가 필요한 우리의 요구 사항에 알맞지 않았고, 요구 조건을 충족하기 위한 솔루션이 필요했습니다.

▲ Content Snapshot을 생성하는 도식도 예시

당시 이를 위한 해결책으로 보유한 컨텐츠를 취합한 스냅샷을 서빙 시스템에 탑재해 사용했으며, 이 과정에서 일자마다 돌아가야 하는 ETL 코드를 어떻게 구성할 것인지 논의했습니다.

Apache Airflow

MLOps 팀 멤버들은 대부분 Python 언어를 사용할 줄 알았으며 운이 좋게도 저는 과거 Airflow를 이용한 프로젝트를 진행한 적이 있었습니다. Airflow 외에도 Luigi, Argo 등의 도구들에 대한 관리 용의성, 생태계, 러닝 커브, 기능 비교를 했고 최종적으로 Apache Airflow가 선정되었습니다.

▲ Apache Airflow 로고

Apache Airflow는 Airbnb로 부터 개발된 워크플로우 도구로, 현재는 Apache 재단에서 관리되고 있습니다. Airflow 에서는 데이터 수집, 처리 등의 단위를 테스크로 정의하고 DAG(Directed Acyclic Graph)를 통해 테스크 간의 진행 순서와 관계를 정의할 수 있습니다.

DAG는 그래프 유형 중 하나로, 하나의 방향을 가지며 루프가 존재하지 않는 특징을 가지고 있고 이로 인해 테스크 설계에 유리한 구조를 가지고 있습니다.

▲ DAG 구조 예시

이렇게 정의한 DAG는 트리거를 통해 실행 가능하며 트리거는 crontab 규칙에 따른 스케줄 방식과 직접 실행하는 메뉴얼 방식, SmartSensor를 통한 특수한 조건 등을 통해 만들어 낼 수 있습니다.

▲ 과거 Airflow 1 DAG 구성

컨텐츠 다운로드, 데이터 가공, 유효성 검증, 데이터 업로드 등으로 테스크를 분리하고 XCom이라고 부르는 Airflow에서 제공하는 테스크 간의 데이터 전송 방식을 통해 중간 결과물을 테스크 간 전송하도록 구성했고 언뜻 보면 깔끔한 DAG를 구성했고 한동안 이 방식으로 컨텐츠를 운영하는데 문제가 없었습니다.

사건의 발단

▲ Airflow 장애 화면

개발자라면 저마다 겪기 싫은 끔찍한 상황이 존재할 것입니다. 저는 당시 위 사진과 같은 장애 화면이었는데, 어느 순간부터 Airflow server에 Crash가 발생했습니다. 인프라 팀과 원인을 분석한 끝에 Database 용량을 우리의 예상 외로 많이 사용하고 있는 현상을 발견했고, 이로 인해 용량 한계치를 초과해 문제가 발생했습니다.

용량이 초과된 원인은 XCom의 무분별한 사용에 있었습니다. Airflow1에서는 XCom 전송이 Database를 통해 이루어지기 때문에 과도한 데이터를 XCom을 통해 전송하면 Database에 거대한 데이터를 저장하는 것과 마찬가지였고, 우리의 경우 거대한 교육 컨텐츠를 잘게 쪼개놓은 테스크간에 전달하기 때문에 막대한 데이터를 여러 번 Database에 저장하고 조회하는 비효율적인 방식으로 동작했습니다.

▲ 당시 인프라와 MLOps 팀의 논의 사항 정리본

당시 인프라와 이 문제를 가지고 진지하게 고민하였고, 테스크 덩어리를 지금보다는 크게 합치고 거대한 데이터 전송은 XCom 대신 S3를 직접 이용하도록 수정하는 쪽으로 개선을 진행했습니다. 다만 이런 개선 방식은 아래와 같은 문제를 가지고 있었습니다.

  • 테스크가 동작 단위가 아니라 데이터 단위로 묶이기 때문에 가독성이 떨어지고 운영에 비효율적입니다.
  • 데이터 전송을 직접 명시해줘야 되기 때문에 실수의 여지가 있고, 시스템이 제약을 걸지 않기 때문에 컨벤션으로 관리해야 하며, 데이터 저장 경로를 일일이 관리해주어야 합니다.
  • 테스크와 테스크간에 어떤 데이터 전송이 일어나는지 가시적으로 확인이 되지 않았고, 암묵적으로 이루어집니다.

그럼에도 서비스 운영에는 큰 이상이 없기 때문에 한동안 이 방식을 이용했습니다.

Airflow2

2021년 봄에서 여름으로 넘어갈 즈음 ML 엔지니어 최정우님이 우리 팀에 합류하셨고, Airflow에 관심이 많으셨기 때문에 작업을 인계하고 파악할 수 있는 시간을 드린 적이 있습니다. 정우님은 Airflow 운영의 문제점을 파악하시고 불과 몇 달 지나지 않아 Airflow2를 제안하셨습니다.

▲ 뤼이드의 Airflow 2 진입 화면 (Dev)

Airflow2는 2020년 12월 릴리즈된 기존 Airflow1에서 몇 가지 커다란 기능이 패치된 새로운 버전의 Airflow로 AIP-34(TaskGroup), AIP-35(TaskFlow API)를 포함하여 편의 기능과 안정성, 확장된 연동 기능을 제공합니다.

Airflow2 PoC를 진행하기로 결정하고 PoC 시나리오를 우리 서비스가 실제로 사용하고 있는 컨텐츠 스냅샷으로 진행했습니다. PoC 과정에서 Airflow2에 추가된 기능은 더욱 부각되었는데, 우리 팀에서 유용했었던 기능을 몇 가지 정리하면 아래와 같습니다.

  1. TaskFlow API:

XCom 전송을 ti(Task Instance)를 통해 명시적으로 하지 않더라도 Task 혹은 Operator의 반환 값을 그대로 다른 Task, Operator에 전송하면 XComArg로 추상화 되어 전송할 수 있습니다. 또한 이 과정에서 Shift operator(>>, <<)를 사용하지 않고 함수 호출 방식으로 테스크를 구성할 수 있어서 테스크 관계를 더욱 직관적으로 정의할 수 있었습니다.

더 자세한 설명은 아래 별도 섹션인 “TaskFlow API”에서 다룹니다.

2. TaskGroup:

Airflow1에서는 테스크를 잘게 쪼개면 전체적인 구조를 세부적으로 보고 관리할 수 있지만 큰 흐름에서 작업을 한눈에 파악하기 어려워집니다. Airflow2에서는 TaskGroup이라는 개념으로 테스크들을 추상화된 개념으로 묶어 관리할 수 있고 이를 통해 한눈에 쉽게 전체 흐름을 파악할 수도, 원한다면 각 세부 개념을 자세하게 파악할 수도 있게 됩니다.

더 자세한 설명은 아래 별도 섹션인 “TaskGroup”에서 다룹니다.

3. Refreshed UI:

Airflow1과 비교해서 UI가 전체적으로 수정 되었고, DAG Rendering 과정에서 발생하는 오류를 접어 놓거나 Manual Trigger를 더욱 간결한 Depth로 수행할 수 있고, Airflow의 시간을 UTC가 아닌 KST로 설정해서 볼 수 있는 등 세부적으로 더욱 좋아졌습니다.

TaskFlow API

TaskFlow API는 제가 주관적인 생각으로 Airflow2의 가장 핵심 기능이자 큰 변화입니다. TaskFlow API를 설명하기 앞서서 코드로 차이점을 살펴봅시다. 아래는 간단한 테스크를 DAG로 구성한 예시입니다.

Airflow 1

from datetime import datetimefrom airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
'owner': 'airflow',
}
with DAG(
'example',
default_args=default_args,
description='An example',
schedule_interval=None,
start_date=datetime(2022, 1, 1),
catchup=False,
tags=['example'],
) as dag:
def hello_world(**kwargs):
ti = kwargs['ti']
ti.xcom_push('msg', 'hello, world!')
def display(**kwargs):
ti = kwargs['ti']
display_msg = ti.xcom_pull(task_ids='hello_world', key='msg')
print(display_msg)
hello_world_task = PythonOperator(
task_id='hello_world',
python_callable=hello_world,
)
display_task = PythonOperator(
task_id='display',
python_callable=display,
)
hello_world_task >> display_task

Airflow 2

from datetime import datetimefrom airflow.decorators import dag, taskdefault_args = {
"owner": "MLOps",
"description": "An example",
"start_date": datetime(2022, 1, 1),
}
@dag(
"example",
default_args=default_args,
catchup=False,
schedule_interval=None,
)
def example_dag:
@task()
def hello_world() -> str:
return "hello, world!"
@task()
def display(msg: str):
print(msg)
# XComArgs를 통해 함수 호출 Style로 테스크 제어.
msg = hello_world()
display(msg)
example_dag = example_dag()

둘 모두 2가지의 테스크를 정의했고 hello_world 테스크가 끝나면 display 테스크가 실행되어 문구를 출력하도록 테스크 순서를 구성했습니다. Airflow 1은 Python callable 함수와 PythonOperator를 별도로 정의해야 하지만 Airflow 2의 TaskFlow API는 @task decorator로 DecoratedPythonOperator 형태로 파이썬 Callable와 테스크를 통합해서 정의할 수 있어서 더욱 직관적입니다.

Airflow 2 TaskFlow API의 가장 큰 장점은 ti(TaskInstance)를 통해 XCom data를 직접 push/pull 할 필요 없이 @task 로 정의한 함수의 반환 값을 그대로 다른 테스크의 인자로 제공하면 XCom 전송이 일어나도록 구성되어 데이터의 전송 흐름과 테스크의 흐름이 동일해지고 XCom 호출 과정을 생략할 수 있어서 간결한 테스크 구성과 데이터 전송 흐름을 바탕으로 빠른 파이프라인 진단이 가능해집니다.

Custom XCom Backend

Airflow는 Custom XCom Backend라고 부르는 XCom 저장소를 사용자가 정의할 수 있는 구현체를 제공하는데, 우리는 이 기능을 이용하여 XCom을 Database 대신 S3를 지정하여 대용량 데이터에 대해서 안정적이고 확장성 있는 구조를 갖게끔 수정했습니다. 아래 코드를 봅시다.

class S3XComBackend(BaseXCom):
PREFIX = "xcom_s3://"
BUCKET_NAME = "custom-xcom-backend"

@staticmethod
def serialize_value(value: Any):
if isinstance(value, pd.DataFrame):
from airflow.operators.python import get_current_context

context = get_current_context()
date_partition = context["execution_date"].strftime("year=%Y/month=%m/day=%d")
run_id = context["run_id"]
key = "data_" + str(uuid.uuid4())
key = f"{S3XComBackend.ENV}/{date_partition}/{run_id}/{key}.json"
s3_client = boto3.client("s3")
s3_client.put_object(
Bucket=S3XComBackend.BUCKET_NAME,
Key=key,
Body=value.to_json(),
ContentType="application/json",
)
value = S3XComBackend.PREFIX + key
return BaseXCom.serialize_value(value)

@staticmethod
def deserialize_value(result) -> Any:
result = BaseXCom.deserialize_value(result)
if isinstance(result, str) and result.startswith(S3XComBackend.PREFIX):
key = result.replace(S3XComBackend.PREFIX, "")
s3_client = boto3.client("s3")
obj = s3_client.get_object(Bucket=S3XComBackend.BUCKET_NAME, Key=key)
data = obj["Body"].read().decode("utf-8")
result = pd.read_json(data)
return result

XCom 데이터는 json, pickle 등 직렬화, 역직렬화가 가능한 표현으로 변환할 수 있으며 json은 직렬화 표현이 가능한 Python primitives(Tuple, Dict, int, bool, str 등)에 한해서 XCom 전송한 대신 pickle은 Python 대부분의 오브젝트를 전송할 수 있습니다. 다만 pickle은 파이썬의 함수와 같은 실행 가능한 오브젝트도 전송이 되므로 악의적인 RCE(Remote Code Execution) 공격이 허용될 수 있으므로 enable_xcom_pickling 옵션을 켜주지 않는 이상 JSON 직렬화가 어려운 데이터를 XCom을 통해 전송할 수 없도록 설계되어 있습니다.

우리는 S3 Custom backend 코드를 구성할 때 데이터 처리를 위해 많이 사용하는 판다스(Pandas) DataFrame에 한하여 직렬화, 역직렬화 처리를 작성하여 DataFrame 전송은 가능하도록 구성했습니다.

TaskGroup

TaskGroup은 이름에서 유추할 수 있듯이 테스크를 그룹화 할 수 있는 기능입니다. 테스크의 동작 방식을 바꾸는 기능은 아니지만 테스크를 볼 수 있는 UI에서 테스크를 추상적인 그룹으로 묶어 깔끔하게 관리할 수 있는 편의 기능으로 테스크의 진행 흐름을 한눈에 쉽게 파악할 수 있도록 도와줍니다. 아래 코드를 봅시다.

from datetime import datetime
from typing import List
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup
from airflow.decorators import dag
@dag(...)
def example_dag() -> None:
ops: List[DummyOperator] = []
with TaskGroup(group_id="task_group"):
for i in range(5):
op = DummyOperator(task_id=f"task_{i}")
ops.append(op)
if i > 0:
ops[i - 1] >> op
example_dag = example_dag()

코드에서 반복문이 총 5번 반복하며 task_0 ~ task_4 까지 5개의 테스크를 생성하고 선형적으로 연결시켜 task_group 이름의 테스크 그룹안으로 정의하였습니다. 이렇게 구성된 DAG를 실행하면 아래와 같이 보이게 됩니다.

▲ 생성된 테스크와 테스크 그룹
▲ 최상위 레벨 테스크 UI

예를들어 우리가 구현했던 ETL 처리 DAG는 큰 흐름에서는 위와 같습니다.

TaskGroup을 세부적으로 펼쳐보면:

▲ 조금 더 세부적인 테스크 UI

data_preparation 테스크 하나에도 여러가지 테스크가 포함되어 있습니다.

이를 더 세부적으로 보면:

▲ 가장 세부적인 테스크 UI

위와 같이 복잡한 세부 테스크가 조합되어 이루어진 것을 확인할 수 있습니다.

최종적인 모습

지금까지 소개드린 방식을 모두 종합하여 Airflow2에서 성공적으로 DAG 구성을 마칠 수 있었습니다. 아래 그림을 봅시다.

▲ 최상위 레벨에서 살펴보는 테스크 구조

그림에서 보이는 것과 같이 ETL 처리에 큰 영역 별로 테스크 그룹을 나누었기 때문에 큰 영역에서 작업 처리의 순서와 현황을 한눈에 볼 수 있습니다. 또한 TaskFlow API의 XComArg를 통해 테스크간의 데이터 전송이 이루어지도록 하였기 때문에 테스크, 테스크 그룹의 간선 연결 관계가 데이터 전달 관계와 일치하기 때문에 파이프라인을 진단하거나 분석하기 쉽습니다.

▲ 세부적인 내용을 모두 펼쳐본 테스크 구조

앞선 그림에서 조금 더 세부적으로 테스크 그룹을 펼쳐보면 위 그림과 같이 복잡한 테스크 구성이 나오게 됩니다. S3로 XCom Custom Backend를 구성하더라도 엄연히 테스크 간의 데이터 전송은 Network IO를 필요로 하기 때문에 큰 용량의 데이터 전송을 최대한 줄이는 것이 효율적입니다. 따라서 테스크 가장 앞 단에서 데이터를 수집하는 테스크와, 불필요하거나 유효하지 않은 데이터를 필터링 하는 테스크를 배치하고, 뒤쪽에서는 작은 단위로 가공된 데이터를 처리하는 테스크를 배치하게 되었습니다.

파이프라인 테스트

저는 파이프라인을 구성하는 것 보다 테스트 해보는 것이 더 어렵다고 생각합니다. 우리의 테스트 환경과 전략도 현재는 더욱 발전이 필요한 상황이지만 적용했던 몇 가지 프랙티스를 공유하겠습니다.

유닛 테스트 / 통합 테스트

유닛 테스트는 파이프라인을 각 기능 단위로 테스트할 수 있는 가장 명확하고 쉬운 방법에 속합니다. 이후에 더 자세히 설명하겠지만, 우리는 GitHub 리포지토리에 Airflow scripts와 설정 정보를 관리하고 이를 git-sync를 통해 Airflow와 연동합니다. 따라서 유닛 테스트 환경을 구성했다면 Git actions등의 CI 도구를 통해 유닛 테스트가 통과한 코드만 병합할 수 있도록 Protected branch 룰을 걸어두면 파이프라인 테스트의 최소 요건은 만족하게 됩니다. 아래 그림을 봅시다.

▲ GitHub 리포지토리를 통한 Airflow 배포 과정

우리는 Airflow DAG 관련 작업을 GitHub PR(Pull Request) 기능을 이용해 병합 요청을 하고, 코드 검증 후 병합(Merge)하는 프로세스로 작업이 이루어진다. 이때, PR에서 Git actions를 통해 유닛 테스트를 검증하고 최소 요건을 통과한 PR을 Base branch에 병합하면 Airflow에서 특정 주기로 git-sync를 통해 새로운 코드로 업데이트 하게 되고 신규 업데이트 된 DAG를 랜더링 합니다. 우리는 git-sync 주기를 60초로 잡았고 현재까지 이 주기로 인한 부하 문제가 발생하지 않았습니다.

다만 문제는 몇 가지 Airflow의 기능을 이용하는 함수를 테스트할 때 인데, 이 경우 Airflow 기능을 어떻게 Mocking 하는지가 중요하게 됩니다. 아래 코드를 봅시다.

# conftest.py@pytest.fixture(scope="session")
def mock_airflow_environ() -> Dict:
conn_my_mock = Connection(
conn_type="conn-type",
login="mock-dummy-user-id",
password="mock-dummy-pass",
)
return {
"AIRFLOW_VAR_MY_MOCK": "mock-dummy-variable-value",
"AIRFLOW_CONN_MY_MOCK": conn_my_mock.get_uri(),
}
@pytest.fixture(scope="session")
def dagbag(project_root: Path) -> DagBag:
return DagBag(
dag_folder=project_root / "dags",

주로 Airflow Variable, Connection과 필요에 따라서 통합 테스트를 위한 DAG를 Mocking해야 할 필요가 생기는데, 우리는 위 코드와 같이 Pytest를 통해 Airflow의 공식 테스트 가이드라인에 맞게끔 DagBag를 정의하였습니다.

코드에서 위쪽 함수는 각각 Connection, Variable을 Mocking하는 방법을 나타낸다. Airflow는 AIRFLOW_xxx 형태로 환경 변수를 정의하면 Mocking을 할 수 있는데 (문서 참조), 예를들어 AIRFLOW_CONN_MY_MOCKmy_mock이라는 이름의 Connection을 Mocking하고, AIRFLOW_VAR_MY_MOCKmy_mock이라는 이름의 Variable을 Mocking 할 수 있게 됩니다. 이때 코드의 mock_airflow_environ fixture를 이용해 아래와 같이 patch 해주어야 합니다.

def test_my_task(mock_airflow_environ: Dict):
with patch.dict('os.environ', mock_airflow_environ):
# My test code here.

DagBag를 통한 테스트는 Dag 랜더 과정의 유효성, Dag의 동작 검증을 할 수 있지만 우리는 Dag 랜더 과정에 대한 유효성 테스트만 진행합니다. 앞서 정의한 dagbag fixture를 아래와 같이 이용할 수 있습니다.

def _import_file(module_name: str, module_path: str) -> Any:
spec = importlib.util.spec_from_file_location(module_name, str(module_path))
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
return module
def test_dag_integrity(dagbag: DagBag, mock_airflow_environ: Dict) -> None:
with patch.dict("os.environ", mock_airflow_environ):
dag_paths = glob.glob(os.path.join(dagbag.dag_folder, "*.py"))
for dag_path in dag_paths:
file_name = path.basename(dag_path)
module = _import_file(file_name, dag_path)
dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]
assert dag_objects
for dag in dag_objects:
dag_cycle_tester.test_cycle(dag)

Dag의 동작 검증을 DagBag를 이용해 수행하기 위해서는 Task 혹은 Operator간의 데이터 전송을 Mocking하여 데이터 전송 영역을 제어할 필요가 있는데, XComArg에 대한 Mocking 방법이 명확하지 않아 여전히 숙제로 남아있는 단계입니다. 아래 코드를 봅시다.

@task
def task1() -> str:
return 'hello, world!'
@task
def task2(display_msg: str) -> None:
print(display_msg)
@dag(...)
def my_dag():
msg = task1()
task2(msg)
my_dag = my_dag()

위 코드의 테스크를 일반적인 유닛 테스트 기법으로 테스트 코드를 작성하면 아래와 같습니다.

from unittest.mock import patchfrom dags.my_dag import task1
from dags.my_dag import task2
def test_task1():
result = task1()
assert result == 'hello, world!'
def test_task2():
display_msg = 'abc'
with patch('dags.my_dag.print') as mock_print:
task2(display_msg)
mock_print.assert_called_once_with(display_msg)

하지만, 예상과 다르게 이 코드는 동작하지 않습니다.

self = <airflow.models.xcom_arg.XComArg object at 0x7ff008759ac0>
other = 'hello, world!'
def __eq__(self, other):
> return self.operator == other.operator and self.key == other.key
E AttributeError: 'str' object has no attribute 'operator'

이유는 @task 데코레이터를 포함해 정의한 함수는 DecoratedOperator로 동작하며, XComArgs로 감싼 오브젝트를 반환하게 됩니다. 따라서 return_value를 비교하기 위해서는 테스크의 반환 값의 XComArgs를 벗겨내서 비교를 할 필요가 있습니다. 아래 코드를 봅시다.

def test_task1(dagbag: DagBag):
dag = dagbag.get_dag('my_dag')
task1 = dag.get_task('task1')
ti = TaskInstance(task=task1, execution_date=datetime.now())
result = task1.execute(ti.get_template_context())
assert result == 'hello, world!'

위와 같은 방식으로 테스크의 값을 검증할 수 있지만, 테스트 구성이 복잡한 것을 볼 수 있습니다. (Task 동작을 위한 Dag와 Context, XCom를 사용하고 각자 상태를 가지기 때문에 테스트가 Deterministic하게 동작하기 위해 노력이 듭니다.) 테스크를 테스트하는 방법에 대한 명확한 가이드라인이 없어 이 부분에 대한 Best Practice를 찾아나서고 있는 상태이며, 현재는 테스크 함수 안쪽에서 사용하는 함수에 대해서만 유닛 테스트를 통해 커버하고 있습니다.

로컬 테스트

유닛 테스트는 테스트 정의가 명확하고 기능 각각 세부적인 동작을 검증해줄 수 있지만 전체적인 파이프라인의 동작을 검증해줄 수 없기 때문에, DAG를 직접 실행시켜 결과를 검증해야하는 경우가 존재합니다. 이를 위해 docker-compose로 구성한 Airflow를 구성하여 각 작업자가 자신의 로컬 환경에서 Airflow를 직접 구동하여 테스트를 하도록 정책을 가지고 있습니다.

이때, Airflow Connection, Variable의 각종 인증 정보는 Vault에 관리하고 있는데, 이 Vault로 부터 AIRFLOW_xxx 형태의 환경 변수를 통해 도커 내부에 Connection, Variable 설정해주는 스크립트를 통해 실제 구동되고 있는 Airflow 환경과 유사한 환경을 로컬 환경에 구성하여 테스트를 수행할 수 있도록 구성되어 있습니다.

Airflow 생태계에서는 Airflow Breeze라고 부르는 Test environment 구성을 위한 프로젝트가 진행되고 있는데, 이 프로젝트로 CI에서 동작하는 테스트와 로컬 테스트 환경을 일치 시킬 수 있는 방법이 없는가 찾고 있는 단계입니다.

FAQ

이 포스트에 더 많은 Airflow 도입 배경, 사용 사례를 공유하고 싶지만 포스트가 너무 길어져 다음에 세부적인 별도 포스트로 담고자 합니다. 아래는 간략하게 자주 물어볼만한 질문에 대한 답변을 정리했습니다.

  1. Airflow는 MWAA와 같은 Managed인가요? 직접 관리하나요?

인프라팀에서 Kubernetes를 통해 배포하도록 관리합니다. 결과적으로 볼 때 Managed에 비해서 운영 요금이 “매우” 저렴하고, Airflow configure 설정을 GitHub와 연동하여 자동화 하는 등 커스터마이즈 측면에서 이점이 있었습니다.

2. Airflow에서 Secretive value는 어떻게 관리하나요?

뤼이드에서는 Valut를 통해 암호나 토큰과 같은 민감 정보를 관리하고, 인프라에서 배포하는 순간에 airflow-cli를 통해 Variable, Connection 등을 설정합니다. 또한 Airflow 내에서 민감 정보는 secret_xxx, password_xxx 패턴으로 Variable로 정의하면 value가 암호화되고 UI에서 *****로 마스킹 처리되어 안전하게 관리할 수 있습니다.

3. Airflow를 관리하면서 어려운 점은 없었나요?

셋업하고 지금까지 중간중간 문제점이 있었지만 점진적으로 해결하고 개선하고 있습니다. 가장 어려웠던 점은 Develop, Stage, Production 단계를 GitHub 브랜치로 나누고 각각 Airflow 서버가 동작하는데, 이때 서로 다른 브랜치의 코드가 완전히 독립적이라(브랜치의 조상이 달라) 유지보수 하는게 어려웠습니다. 최근에는 Develop, Stage, Production의 조상이 동일하고 Develop → Stage → Production으로 순차적 병합을 진행하고 각 환경의 ENV에 따라 Airflow DAG 코드 내에서 동작을 달리 하도록 처리하였습니다.

4. Airflow 1에서 Airflow 2로 마이그레이션 하는 것이 어렵지 않았나요?

개인적으로 Airflow는 하위 호환성을 꽤나 신경쓰고 있는 프로젝트입니다. 따라서 대부분의 Airflow 1 코드는 Airflow 2에 동작하기 때문에 버전 업그레이드로 인한 Crash 이슈는 적은 편입니다. 그 이후 점차적으로 코드를 Airflow 2 스타일로 변환하는 순서로 마이그레이션을 진행하면 되므로 부담이 덜한 편입니다.

마이그레이션을 고려하고 있다면 아래 문서를 참고해봅시다.

https://airflow.apache.org/docs/apache-airflow/stable/upgrading-from-1-10/index.html

코드 기여

뤼이드는 오픈 소스에 열려있고 외부 오픈 소스 프로젝트에 문제점이 발견되었고 문제점 개선이 회사 업무에 도움을 준다면 기여로 이어질 때가 있습니다. 아래 링크는 Airflow 2를 사용하던 당시 발견되었던 대용량 XCom 로그를 전송하는 기능을 제어할 수 없었던 불편함을 개선하는 이슈와 PR (Pull Request) 입니다.

Issue: https://github.com/apache/airflow/issues/18264

Pull Request: https://github.com/apache/airflow/pull/19378

Airflow 프로젝트 외에도 뤼이드 데이터 팀에서는 오픈 소스에 문제점이 파악되면 간간히 이슈 및 PR을 통해 개발자 생태계에 기여하고 있습니다.

끝으로

이 포스트에서는 뤼이드에서 Airflow를 운영하는 사례와 겪고 있는 어려움, 고민들을 정리했습니다. 만약 포스트 내용에 흥미가 있고 함께 챌린지를 함께 풀어나가고 싶다면 데이터 팀에 조인하시기를 적극 추천합니다.

▼ Riiid Data Engineer JD가 궁금하다면? ▼

--

--

Sungmin Han
Riiid Teamblog KR

MLOps Lead / Quizium PO at Riiid | Google Developer Experts(GDE) for AI/ML/Cloud | GDG Golang Korea Organizer