Apache Airflow 운영 경험과 에러대응

buzzz
29CM TEAM
Published in
15 min readFeb 13, 2024

안녕하세요. 29CM Data Platform 팀 데이터 엔지니어 윤원진입니다. 저희 팀은 데이터 파이프라인 ETL 작업에 Composer(Apache Airflow)를 활용하고 있습니다. Airflow는 파이프라인을 스케줄링하는 강력한 Workflow 도구이지만, 운영 중에 확인해야 할 사항이 많고 가끔은 예기치 못한 에러에 직면하기도 합니다. 이번 포스팅에서는 팀에서 Airflow를 운영하며 겪은 경험과 고도화한 방법, 그리고 에러 대응했던 경험에 관해 이야기하려고 합니다.

Composer(Apache Airflow) 환경

출처 : https://cloud.google.com/composer/docs/composer-2/environment-architecture?hl=ko

들어가기에 앞서, 현재 데이터 플랫폼 팀의 Airflow 환경입니다. 저희는 GCP 환경에 DW를 구축하여, GCP 환경과 강력하게 통합되는 Composer 서비스를 사용하고 있고 Composer 환경은 2.5.3 버전을 사용하고 있습니다.

Cloud Composer 아키텍처를 살펴보면 Private Service Connect를 통해 비공개 IP 환경에서 VPC 피어링을 사용하지 않고 내부적으로 통신합니다.

비공개 IP 환경 아키텍처는 위 아키텍처로 구성됩니다. Tenant 프로젝트는 Cloud SQL 인스턴스 및 Cloud SQL 스토리지를 호스팅하고 있으며, Cloud SQL 서비스를 제외한 다른 모든 구성 요소는 사용자 프로젝트에 호스팅 됩니다. Airflow 스케줄러와 작업자는 구성된 Private Service Connect Endpoint를 통해 Airflow 데이터베이스에 연결하여 데이터 보안을 강화합니다.

현재 저희 팀은 Composer를 통해 Airflow 인프라 관리에 대한 리소스를 최소화하고 코드 품질 및 운영과 고도화에 중점을 두고 있습니다. 이어서 Airflow 운영 중 고도화한 경험에 대해 더 자세히 설명하겠습니다.

Airflow 운영 고도화

1. Airflow Pool & Deferrable 연산자

Deferrable 연산자(이하 Trigger)를 활용하여 클러스터 리소스를 효율적으로 관리하고 유연하게 작업을 처리할 수 있습니다. Airflow pool은 작업의 동시 실행 수를 제한하여 시스템 리소스 소비를 효율적으로 관리하고, 예기치 않은 부하를 방지합니다.

Trigger는 Pool에 할당된 작업을 Trigger에 할당함으로써 작업에 필요한 필요한 Pool Worker 슬롯을 차지하지 않고 별도의 Trigger 파드에서 관리됩니다. 이렇게 함으로써 다른 작업이 Pool 슬롯을 사용할 수 있게 되며, 이는 Pool이 제한된 상황에서도 비동기식으로 작업을 처리할 수 있게 해줍니다.

deferrable 트리거를 적용

예를 들어 Pool이 부족하여 특정 작업을 수행하기 위해 대기해야 할 때, 해당 작업에 Trigger를 적용하면 유연하게 다른 작업이 실행될 수 있습니다. 이는 클러스터의 전반적인 사용률을 최적화하고 작업을 효과적으로 분산시킬 수 있습니다. 따라서 Pool과 Trigger를 조합하여 Airflow를 사용하면 리소스 사용을 더욱 효율적으로 조절하고 유연하게 Task 처리를 할 수 있습니다.

deferrable Trigger 연산자 기능은 Cloud Composer 버전 2.0.31 이상 및 Airflow 버전 2.2.5 이상에서 활용할 수 있습니다.

GCP Composer 2(≥2.0.31)부터 제공하는 기능을 활용하여, 저희 팀은 유연하게 Airflow Task가 처리할 수 있도록 deferrable을 통해 Trigger를 사용하여 Airflow에서 작업 실행을 세분화하고 효율적으로 처리하고 있습니다.

deferrable Trigger 연산자의 동작 방식

아래는 Trigger의 동작 원리에 대한 설명입니다.

  1. 작업 시 태스크가 실행되면 Airflow 작업자 슬롯을 차지합니다. 해당 태스크는 작업을 다른 서비스에 위임하는 역할을 합니다.
  2. 작업 위임 및 대기 작업은 작업 식별자(BigQuery 작업 ID 등)를 Airflow Trigger에 전달합니다.
  3. Trigger는 작업이 완료될 때까지 작업을 모니터링합니다. 이때 작업자 슬롯이 점유되지 않으며, 비동기식 아키텍처를 통해 수백 개의 작업을 처리할 수 있습니다. 작업이 완료되면 해당 이벤트를 트리거하여 다음 단계로 진행합니다.
  4. Airflow 작업자가 작업 완료 시 콜백을 실행합니다. 콜백은 작업을 성공으로 표시하거나 다른 작업을 실행하며, Trigger가 다시 모니터링하도록 설정합니다.

Deferrable Trigger를 사용하는 경우에는 Pod가 생성되어 kubectl 명령어를 통해 Pod를 확인할 수 있습니다.

위와 같이 Trigger Pod에 대한 Grafana 모니터링을 설정하여 대시보드를 통해 CPU 및 Memory 사용률을 확인하고 있습니다. 또한 Composer 서비스에서도 Trigger Pod에 대한 모니터링 대시보드 기능을 제공하므로, 선택적으로 모니터링을 생성하여 확인할 수 있습니다.

2. Airflow 웹 서버 재기동 시간 단축

Composer Scheduler 및 웹 서버 재기동 시 기동시간이 약 10분에서 15분까지 걸리는 현상이 발생했습니다. 데이터 팀 특성상 매시간 배치가 실행되는 Task도 있었기 때문에 재기동 되는 시간 동안 Task가 Fail 처리되는 현상이 발생했습니다.

문제의 원인은 재기동 시 Scheduler 및 웹 서버를 재기동 시 Airflow에 작성된 모든 DAG를 다시 파싱해야 하므로 기동 시간이 오래 걸리게 되는 현상이었습니다. 그러나 Airflow 2.0 이후 버전부터는 웹 서버에서 DAG 파일을 동기화하지 않아도 DAG Serialization을 통해 작성한 DAG 코드를 Airflow UI에서 확인할 수 있습니다.

아래는 DAG Serialization 과정입니다.

Airflow 2.x Dag Serialization 동작은 아래와 같습니다.

  1. Scheduler의 DagFileProcessorManager 프로세스가 신규, 수정된 DAG 파일을 체크하여 읽습니다.
  2. 읽은 DAG 파일을 DagFileProcessorProcess가 직렬화하여 DB에 저장합니다.
  3. 웹서버에서 직렬화된 DAG를 DB에서 읽어 역 직렬화하여 DagBag를 생성하여 UI에 표시합니다.

또한 DAG 파일의 처리(직렬화, 역 직렬화)는 DagFileProcessorManager, DagFileProcessorProcess를 통해 진행되고 각 역할은 아래와 같습니다.

DagFileProcessorManager

  • 새롭게 추가되거나 수정된 DAG 파일을 체크합니다.
  • DagFileProcessorProcess 처리한 결과를 수집하고 로깅합니다.

DagFileProcessorProcess

  • DAG 파일 Python 모듈을 로드하고 실행합니다(Scheduler).
  • DAG 파일을 직렬화하여 데이터베이스에 저장합니다(Scheduler).
  • 직렬화된 DAG를 역 직렬화한 후 DagBag을 생성합니다(Web Server).

즉, Scheduler는 DagFileProcessorProcess를 통해 DAG 파일을 파싱하고 JSON 형식으로 직렬화한 후 Metadata DB에 SerializedDagModel 모델로 저장합니다. 이를 통해 파일을 동기화하지 않아도 DAG 파일을 확인할 수 있습니다.

에러가 발생한 당시 Airflow는 DAG 코드 내에서 작성한 Custom Operator에서 DAG 하위의 모듈을 불러오도록 작성되었습니다. 그러나 Airflow 2.x 버전에서는 웹 서버가 DAG 파일을 가져오지 않기 때문에 작성한 Custom Operator에서 DAG 하위의 모듈을 불러올 수 없어 Import Module Error 에러가 발생했습니다.

이를 해결하기 위해 웹 서버의 환경 변수를 수정하여 Airflow 1.x와 같이 DAG 파일들을 로드하도록 수정했습니다. 그러나 모든 DAG를 로드하기 때문에 웹 서버 재기동 시간이 약 10~15분이 소요되는 문제가 있었습니다.

현재는 Airflow DAG 내에서 Custom Operator에서 DAG 하위의 모듈 종속성을 제거 후 웹 서버에서 DAG 파일을 로드하지 않도록 롤백하여 웹 서버 재기동 시간이 3분 이내로 단축되었습니다.

여기까지 Airflow 운영 고도화 작업의 일환으로, 이어서 Airflow 운영 중에 발생한 에러 대응 경험을 자세히 설명하겠습니다.

Airflow 운영 에러 대응

1. Queue 무한 대기 현상

DAG 내 실행 중인 프로세스에서 Task가 큐 대기 상태에서 넘어가지 않고 worker로 전환되지 않는 현상이 발생했습니다. 해당 Task를 Clear 후에도 여전히 큐에서 무한히 대기하고 있었습니다. 문제를 해결하기 위해 해당 Task를 Fail 처리 후 Retry 하여 정상적으로 큐로 이동되었습니다. 이러한 문제는 간헐적으로 발생하여 문제의 원인을 찾아보았습니다.

해당 에러는 Airflow에서 Task가 Scheduler에 의해서 실행 대기열에 추가되고 Task는 큐를 통해서 Worker로 넘어가게 되는데 Scheduler 사이의 네트워크 통신 에러 등의 이유로 CeleryExecutor의 State의 상태를 갱신하지 못해 Queue 무한 대기 현상에 빠지게 됩니다.

해당 문제를 해결하기 위해 Airflow 내 Configuration(celery.stalled_task_timeout, celery.task_adoption_timeout)값을 조정하여 문제를 일시적으로 해결할 수 있으나 완전한 해결 방법이 되지는 못했습니다.

SELECT
dag_id,
run_id,
task_id,
queued_dttm
FROM
task_instance
WHERE
queued_dttm < current_timestamp - interval '30 minutes'
AND state = 'queued'

하지만 Airflow 2.6 버전부터는 큐 대기 작업에 걸린 Task를 감지하는 논리적 프로세스(위와 같은 쿼리형태)를 Scheduler의 별도의 프로세스로 이동하여 별다른 Configuration 설정 없이 문제가 해결되었습니다. 따라서 Composer 업그레이드를 통해 큐 무한 대기 현상이 해결될 것으로 생각됩니다.

2. 비정상 노드 할당으로 인한 Pod 종료

저희는 Celery Executor와 GKE Pod 기반으로 Airflow Worker를 운영하고 있습니다. 이는 DAG가 많고 빠르게 처리되어야 하는 Task들이 있고, 이를 여러 작업으로 분산하여 수행할 필요가 있기 때문입니다.

각 Pod에서 작업하는 코드들은 데이터 가공 작업에만 집중하고 있어서 작업을 각 Pod에서 효율적으로 분산하여 수행할 수 있도록 이와 같은 구성을 택했습니다. 작업이 완료되고 변환 작업이 정상적으로 완료된 후에는 Pod가 정상적으로 종료되어야 합니다. 그러나 위 스크린샷에서와 같이 에러가 발생했습니다.

위 에러는 Airflow에서 Task 실행 시 Pod가 Pending 되거나 종료되지 않은 상태로 남아 Task가 결국 Fail 처리되는 현상이었습니다.

문제의 원인은 정상적으로 종료되지 않는 노드에서 발생한 것입니다. 이는 종료되지 않은 노드에 Pod가 할당되어 있어서 Pod가 지속적으로 Pending 상태로 대기하거나 정상적으로 종료되지 않는 상황을 의미합니다. 저희 팀은 해당 노드를 비정상적인 노드로 판단하여 문제 해결을 위해서 장애 노드를 제거 후 다른 노드로 실행하려 했습니다.

장애 노드를 제거하려 했으나 GCP Autopilot Mode의 경우 권한이 존재하지 않아 장애 노드만 제거하는 것은 불가능했습니다.

Autopilot Mode에서의 장애 노드의 제거가 불가능하고 Pod의 역할은 단순 데이터 가공이기에 Pod가 종료되어 다른 노드에서 Pod가 실행되어도 큰 문제는 없다고 생각하여 아래와 같이 장애 노드에 대해 Drain을 적용하였습니다.

kubectl drain <gk3-worker-nap-ioe540p2-41b8951a-ojp0> --ignore-daemonsets

Drain 후에는 장애 노드 내 생성된 파드가 없어 해당 노드가 정상적으로 제거가 되었고, Task를 진행하는 Pod는 정상 노드에 실행되어 Task를 정상적으로 수행할 수 있었습니다. GCP Autopilot에서 Drain 적용 시 동작 순서는 아래와 같습니다.

  • 노드에서 기존 Pod를 제거하고 해당 노드에서 워크로드 예약을 중지합니다.
  • 컨트롤러에서 제거된 워크로드(Task)를 다른 노드에서 자동으로 다시 생성합니다.
  • Standard 클러스터의 경우 노드에 남아 있는 모든 워크로드를 종료하고 향후 노드를 복구하거나 다시 만듭니다.
  • Autopilot을 사용하면 GKE는 노드를 즉시 종료하고 교체하며 구성된 모든 PodDisruptionBudgets을 무시합니다.

해당 문제를 해결하기 위해 dag_id를 라벨로 가지는 노드가 생성되도록 tolerations 설정을 하여 DAG마다 다른 노드에서 작업이 수행되어 노드가 주기적으로 생성 및 제거될 수 있도록 진행할 예정입니다.

3. Apache Airflow UI 파싱 에러

Airflow UI에서 특정 DAG의 코드를 확인할 때, 다른 DAG 코드가 호출되는문제였습니다.

문제의 원인은 Airflow 내 직렬화된 Metadata DB의 fileloc에는 다른 DAG 정보가 포함되어 있었습니다. DAG를 다른 DAG에서 재사용하는 경우, Airflow 2.4.0 버전에서 업데이트된 DAG Auto_register로 인해 다른 DAG의 파일 경로(fileloc)가 반영하게 됩니다.


amplitude_app_downstream = ExternalTaskSensor(
task_id=f"{AMPLITUDE_EXTERNAL_DAG_ID}_downstream",
timeout=3600,
mode="reschedule",
check_existence=True,

# AS-IS
external_dag_id=AMPLITUDE_EXTERNAL_DAG_ID
# TO-BE
external_dag_id=DagIdEnum.amplitude # amplitude_app 호출 ,

failed_states=[State.FAILED],
execution_date_fn=lambda x: x - timedelta(hours=1),
)

그러나 DAG의 ID를 가져올 때는 수동으로 입력한 DAG_ID를 사용하기 때문에, 문제를 해결하기 위해 DagIdEnum 클래스를 생성하여 미리 정의된 DAG_ID를 사용하기로 결정했습니다. 이렇게 함으로써 외부 DAG에서 external_dag_id의 ID를 가져오는 대신에 새로 생성한 DAG Enum Class에서 코드를 호출하여 정상적으로 작동하는 것을 확인했습니다.

마치며

Airflow를 효과적으로 운영하기 위해서는 다양한 문제에 신속하게 대응하고 정확하게 파악하는 것이 중요합니다. 저희 팀이 겪은 경험을 통해 미리 문제에 대비할 수 있었으면 좋겠습니다. 긴 글을 읽어주셔서 감사합니다.

함께 성장할 동료를 찾습니다

29CM (무신사)는 3년 연속 거래액 2배의 성장을 이루었습니다. 이러한 배경을 바탕으로 높은 사용자 경험을 제공하면서 비즈니스 성장을 함께 할 프로덕트 디자이너 동료를 찾습니다. 많은 지원 부탁드립니다!

🚀 29CM 채용 페이지 : https://www.29cmcareers.co.kr/

레퍼런스

--

--