DBT와 Airflow 도입하며 마주한 7가지 문제들

Kim Dojin
당근 테크 블로그
25 min readJan 29, 2024

안녕하세요. 당근 데이터 가치화팀의 Henry예요.

데이터 가치화팀의 비전은 매일 데이터를 통해 사용자를 위한 의사결정을 하는 것이고, 그러기 위해서는 신뢰할 수 있는 사용자에 대한 정보가 많아야 해요. 당근 사용자들에 대한 신뢰할 수 있는 정보들을 많이 생산하기 위해서는 (1) 사용자들에 대한 정보를 이해하고 정의할 수 있는 도메인 지식과 (2) 데이터를 모델링하고 변환하고 데이터 퀄리티를 체크하고 데이터 간 의존성을 확인해서 신뢰성 있는 정보를 만들 수 있는 데이터 엔지니어링 역량이 필요하다고 생각했어요.

데이터 엔지니어링 역량과 도메인 지식을 갖춘 구성원들이 많다면 신뢰할 수 있는 정보를 만들고 사용하기가 쉽겠지만, 현실적으로는 도메인 지식을 가지고 있는 구성원들 비해 데이터 엔지니어링 역량이 있는 구성원들은 적어요. 그렇기 때문에 도메인 지식을 갖고 있는 구성원이 데이터 엔지니어링 역량이 없더라도 데이터 엔지니어링 작업을 수행할 수 있도록 도움을 주는 것이 더 확장성 있는 방안이라고 생각했고, DBT와 Airflow라는 도구를 문제 해결의 매개체로 활용하고자 했어요. 그 과정에서 여러 엔지니어링 문제를 만나고 해결했는데, 그 내용을 정리해서 공유하고 싶다는 생각해서 이 글을 썼어요.

데이터 가치화 팀에서 DBT와 Airflow를 활용하면서 마주친 문제는 총 7개에요. 차근차근 하나씩 살펴볼게요.

  1. 어떤 구조로 DBT 프로젝트를 구성할 것인가?
  2. DBT 모델을 어떻게 저장할 것인가?
  3. DBT와 Airflow를 어떻게 integration 할 것인가?
  4. DBT 모델을 어떤 프로세스로 개발하고, 테스트할 것인가?
  5. 다른 파이프라인과 DBT 모델 간의 의존성은 어떻게 챙길 것인가?
  6. 국가별로 같은 모델에 대한 정의를 어떻게 할 것인가?
  7. DBT 모델의 백필은 어떻게 할 것인가?

1. 어떤 구조로 DBT 프로젝트를 구성할 것인가?

DBT 프로젝트를 구성하는 기본 단위는 모델이에요. 처음 DBT 프로젝트를 구상할 때, 여러 모델을 어떤 구조로 배치하고 연결해야 하는지 고민했어요. 어떤 구조로 모델을 배치하는 것이 좋을까 생각할 때, 사내 구성원이 어떤 흐름으로 사용자에 대한 데이터를 얻는지 살펴봤어요. 왜냐하면 DBT 프로젝트는 사내 구성원이 데이터를 활용하는 방식을 자연스럽게 담아야 한다고 생각했기 때문이에요. 살펴보니 구성원들은 다음과 같은 흐름으로 사용자에 대한 정보를 얻었어요.

  1. 앱이 보낸 데이터를 사람의 행동 단위로 치환해요.
  2. 사용자의 행동을 여러 방법을 사용해서 살펴봐요.
  3. 사용자 행동을 통해 사용자가 어떤 상태인지를 추론해요.

사내 구성원이 사용자 정보를 파악하는 과정을 참고해서 당근의 사용자 정보를 base, dimension, fact라는 이름의 계층으로 구조화했어요. 다음에 mart, metric, semantic layer와 같은 계층도 필요하면 추가할 예정이지만, 처음 시작하는 것이기 때문에 당근 사용자들의 정보 그 자체를 모으는 것에 더 집중했어요.

DBT 프로젝트 구조의 다이어그램

Base는 Source(원천 데이터)와 1:1 대응되는 계층이에요. 이 계층에서는 필요한 컬럼들을 추출하고, 데이터 타입과 네이밍을 컨벤션에 따라서 통일했어요. DBT 공식 문서의 Staging과 동일하다고 볼 수 있어요. Base 계층을 설계할 때는 아래와 같은 것들을 고려했어요:

  • {데이터 소스}_{데이터 이름}로 네이밍해서 어떤 원천 소스를 활용했는지 알 수 있게 했어요
  • tag를 활용해서 해당 모델은 어떤 국가에서 활용되는지 파악할 수 있게 했어요
# 예시 base.yml
- name: db_something_some_table
description: no description
config:
tags: ["kr", "database"]
-- 예시 db_something_some_table.sql
SELECT
CAST(id AS STRING) AS user_id,
CAST(is_leader AS BOOLEAN) AS is_leader,
(CASE status_code
WHEN 1 THEN "NEW"
WHEN 2 THEN "REACTIVATED"
WHEN 3 THEN "ACTIVATED"
ELSE "DEFAULT"
END) AS user_status,
CAST(created_at AS TIMESTAMP) AS created_at,
FROM {{ source('db_something', 'some_table') }}

Dimension은 User, Product와 같은 특정 개체의 속성값을 갖고 있는 계층이에요. 예를 들어, 사용자의 상태나 생성일자와 같은 정보를 의미해요. Dimension 계층은 Base 계층의 모델을 참조해요.

Fact는 사용자의 정보를 X가 Y를 했다라는 형태로 표현된 계층이고, Dimension과 동일하게 Base 계층의 모델을 참조해요. Fact에는 “사용자가 메시지를 보냈다”와 같은 간단한 사용자 정보부터 리텐션 지표에 사용될 수도 있는 “사용자가 처음으로 화면에 진입했다”, 비즈니스 로직이 포함된 “사용자가 액티브하게 피드를 사용했다”와 같은 복잡한 사용자 정보들을 표현하려고 했어요.

  • 모델의 오너를 명시해서 오너십을 명시했어요.
  • 모델과 스키마의 설명을 작성해서 문서화가 될 수 있게 했어요.
  • tag에는 해당 모델이 어떤 주기로 실행되어야 하고, 어느 국가에서 돌아가야 하는지 명시했어요. 해당 정보는 Airflow에서 참고해요.
  • 모델의 결과물이 언제 만들어졌는지, 언제 업데이트되었는지와 같이 항상 있어야 하는 칼럼은 meta column으로 정의했어요.
  • 각 칼럼에 대해 필요시 test를 추가해서 data의 quality를 챙길 수 있도록 구성했어요.
# 예시 fact.yml
- name: sampleUser_sent_message
description: sampleUser가 메시지를 보냈다
meta:
owner: henry@daangn.com
config:
tags: ["daily-kr"]
cluster_by: ["..."]
columns:
- name: event_id
description: 이벤트 ID
tests:
- unique:
config:
where: "DATE(event_timestamp) = DATE('{{ var('target_date') }}')"
- not_null:
config:
where: "DATE(event_timestamp) = DATE('{{ var('target_date') }}')" - name: ...
- name: user_id
description: 유저 ID
- name: message_category
description: 보낸 메시지의 카테고리
- <<: *meta_columns
-- 예시 sampleUser_sent_message.sql
SELECT
event_id,
user_id,
message_category
...
FROM {{ ref('db_something_some_table') }} AS something
JOIN another_thing USING (user_id)
WHERE DATE(event_timestamp) = DATE('{{ var("target_date") }}')

UNION ALL

...

Fact나 Dimension 계층은 정의에 따라서 같은 계층의 모델들도 참조할 수 있어요. 다만, 참조할 때 모델간 의존성이 DAG(Directed Acyclic Graph)를 이루게 해서 순환되는 의존성은 없도록 했어요.

2. DBT 모델을 어떻게 저장할 것인가?

Base, Dimension, Fact계층으로 구조화하기로 한 다음 각 모델을 어떤 방식으로 저장할 건지 고민했어요. 모델을 항상 다 저장하는 것은 비교적 간단하지만, 모든 모델을 다 저장하면 데이터의 스토리지 비용이 많이 들고 모델을 다시 구할 때마다 비용이 발생해요. 그래서, 각 계층의 역할과 사용성을 고려해서 materialized 방식을 다르게 지정했어요.

Base는 필요한 데이터 추출, casting 혹은 aliasing만 하므로 원천 데이터가 변경되는 경우에 유연하게 대응할 수 있도록 view로 구성했어요.

# dbt_project.yml
base:
+enabled: true
+materialized: view
+tags: ["base"]
...

Dimension 계층부터는 JOIN, WHERE 등 복잡한 연산이 들어갈 수 있어요. 개체의 속성값을 표현하기 때문에 많은 컬럼을 지닐 수는 있으나 개체의 수만큼의 row 수를 갖게 되기 때문에 row 수는 비교적 적을 수 있어서 table로 materialize 했어요.

# dbt_project.yml
dimension:
+enabled: true
+materialized: table
+tags: ["dimension"]
+persist_docs:
columns: true
...

Fact에도 복잡한 연산이 들어갈 수 있고, 시간 흐름에 따라 데이터가 지속해서 추가 돼서 데이터 크기가 커요. 데이터 크기가 크고, 시간 단위로 데이터를 나눌 수 있기 때문에 데이터 조회와 적재 효율성에서 이점을 얻기 위해 incremental로 materialize했어요. 기본적으로 날짜 단위로 데이터를 파티셔닝하고, 모델이 실행될 때 데이터가 덮어씌워지는 insert_overwrite 방식을 사용했어요.

  • DBT에서 incremental 모델을 활용할 때 sql에서 주로 is_incremental() macro를 활용해요. 이 macro를 사용하면 모델을 처음 돌릴 때 full refresh처럼 실행해요. 당근은 데이터가 많기 때문에 최초로 모델을 실행할 때 전체 데이터를 계산하게 되면 비용이 많이 발생할 수 있어요. 그래서, incremental 모델에서는 vars를 활용해서 데이터의 파티션을 무조건 지정해서 데이터를 가져오도록 했어요.
# dbt_project.yml
fact:
+enabled: true
+materialized: incremental
+incremental_strategy: insert_overwrite
+on_schema_change: append_new_columns
+partition_by:
field: DATE(event_timestamp)
data_type: date
granularity: day
time_ingestion_partitioning: false
copy_partitions: true
+require_partition_filter: true
+persist_docs:
columns: true
+tags: ["fact"]
...
-- 예시 sampleUser_sent_message.sql
SELECT
...
FROM {{ ref('something') }}
WHERE DATE(event_timestamp) = DATE('{{ var("target_date") }}')

-- DBT에서 incremental materialize할때 사용되는 방법이지만,
-- 1) Airflow로 DBT 모델 실행시 정확한 날짜를 전달 할 수 있고
-- 2) 최초 모델 실행할때 full refresh하기에 당근은 데이터가 너무 많기에
-- vars로 갖고올 데이터의 파티션을 명시해서 갖고 오도록 했어요.
-- 밑의 구문은 DBT에서 incremental materialization시 자주 사용되는 패턴
-- {% if is_incremental() %}
-- WHERE event_timestamp > (select max(event_timestamp) from {{ this }})
-- {% endif %}

추가로, 모델들을 설정할 때 clustering, persist_docs, on_schema_change와 같은 설정을 구성했어요.

  • clustering은 더 효율적으로 모델들이 활용하기 위해서 추가했고, clustering을 잘 설정하면 쿼리 성능을 높이고 쿼리 비용을 줄일 수 있어요.
  • persist_docs는 BigQuery 테이블에도 description 들이 보일 수 있게 해서 모델을 BigQuery에서 조회할 때 충분한 참고 할만한 정보가 있을 수 있게 했어요.
  • on_schema_change는 모델이 변경할 때 대한 설정이고, append_new_columns로 설정해서 스키마가 추가되는 경우에 컬럼이 append 될 수 있도록 했어요. DBT 기본 설정은 ignore이고, incremental 모델에서 로직이 변경되서 새로운 컬럼이 추가되어도 실제로는 생성이 안 돼요.

당근은 BigQuery를 Data Warehouse로 활용하고 있기 때문에 BigQuery에 호환되도록 DBT 모델들을 설정했어요.

참고: DBT BigQuery configuration, BigQuery clustering

3. DBT와 Airflow을 어떻게 integration 할 것인가?

당근 사용자에 대한 최신 정보를 얻으려면 DBT 모델을 주기적으로 실행해야 해요. DBT 자체에는 DBT 모델을 주기적으로 실행하는 기능이 없어요. 당근에서는 주기적으로 작업을 실행하는 도구로 Airflow를 사용하고 있기 때문에 DBT를 Airflow에 연동하기로 했어요. 이 연동 작업은 다음과 같은 요구사항을 만족해야 해요.

  1. 모델 간 의존성을 Airflow 내에서 파악할 수 있어야 함.
  2. 모델별로 어느 시간 간격으로 주기적으로 실행할지 다르게 설정할 수 있어야 함.
  3. 모델이 실행되어야 함.

단순 DBT core 기능만으로는 위의 요구사항을 만족하기 어려웠어요. 처음에는 직접 DBT 모델을 파싱하는 로직을 만드는 것도 고려했다가 리서치하는 도중에 DBT 모델을 Airflow task로 쉽게 변환해 주는 astronomer-cosmos를 발견했어요. PoC를 하고 나서 위 요구사항을 만족할 수 있겠다고 생각해서 활용했어요.

astronomer-cosmos는 DBT 모델을 파싱하면서 의존성을 파악하고 Airflow task로 그려줘요. 또한 dbt run, dbt test 명령어를 하나의 task group 내에서 실행할 수 있도록 여러 모델을 task group 단위로 묶어줘요. select exclude 파라미터에는 DBT tag나 model path를 지정하면 파이프라인에서 원하는 모델만 필터 혹은 필터아웃 할 수 있어서, Airflow DAG 당 필요한 부분만 필터해서 활용했어요.

astronomer-cosmos로 Airflow task로 렌더링한 UI
# 예시
sample_task = DbtTaskGroup(
group_id='sample_task',
dbt_project_name='sample',
dbt_root_path='sample',
conn_id='sample',
select={
'paths': ['models/sample'...],
'configs': ['tags:daily-kr'...],
},
exclude={
'paths': [...],
'configs': [...],
},
dbt_args={
...
'vars': {
'target_date': '{{ data_interval_start }}',
...
},
},
)

astronomer-cosmos 패키지는 릴리즈된 지 얼마 되지 않은 패키지다 보니 필요한 기능이 부족한 경우도 있고, 최적화가 완전히 되어있지는 않아요. 하지만, astronomer에서 매우 활발하게 컨트리뷰션 하고 있어서 빠르게 변화하고 있어요. 최신 내용은 공식 repository를 참고해 주세요.

4. DBT 모델을 어떤 프로세스로 개발하고, 테스트할 것인가?

도메인 지식이 있는 사내 구성원도 쉽게 DBT 모델을 개발할 수 있는 것이 DBT와 Airflow 환경 구축의 목표였어요. 기존에 DBT를 데이터 가치화팀에서만 활용할 때는 로컬 환경에서 DBT 모델을 정의하고 SQL을 작성하고 실제 DBT 모델을 Run하고 Test를 했었어요. 더 많은 구성원이 DBT 모델을 로컬에서 개발하면 로컬 환경 구축부터 권한, 작업에 대한 가시성, 실수로 비효율적인 쿼리를 돌릴 수 있는 문제가 발생할 수 있어요. 그래서 구성원이 DBT모델을 개발하고 실행하고 테스트 할 수 있는 별도의 테스트 환경을 구축했어요.

당근 데이터 파이프라인 github repository에서는 기존에도 Pull Request를 열게 되면 테스팅 환경이 PR별로 배포되고 있어요. 구성원이 DBT 모델에 대한 정의와, SQL을 작성하고 Pull Request를 열게 되면 다음의 step으로 모델을 테스트하게 돼요.

PR을 오픈하면 나오는 Testing Airflow, DBT document 환경
  • PR별로 독립된 테스트 환경으로 Airflow와 컴파일된 DBT 모델의 dbt docs가 배포돼요. 이 과정에서 DBT 모델의 정의, 의존성 등을 잘못 설정하면 CI 혹은 dbt docs에서 인지할 수 있게 돼요.
독립된 테스트 환경에 배포된 Airflow와 DBT docs
DBT 모델에 대한 CI
  • DBT Test 파이프라인에 접속해서 테스트하고 싶은 모델 정보를 Trigger w/ config로 추가해요.
DBT Test 파이프라인에서 DBT 모델을 트리거 하는 UI
  • DBT모델이 컴파일되고, dry run으로 모델 쿼리가 실행돼요. Dry run으로 컴파일된 모델 쿼리가 실제 잘 작동하는지 알 수 있고 컴파일된 코드가 몇 Byte의 데이터를 스캔하는지도 알 수 있어요. 디폴트로 모델 쿼리의 데이터 스캔양이 1TB를 넘으면 작업이 실패하도록 해서 실수로 너무 큰 데이터를 스캔하는 걸 방지할 수 있게 했어요. 물론, 실제로 1TB가 넘는 데이터를 사용해야 하는 경우가 있기 때문에 강제로 실행하는 flag 추가가 가능하게 했어요.
  • dbt run, dbt test가 실행이 돼요. dbt run의 결과물을 별도의 _test suffix가 있는 데이터셋에서 확인할 수 있고, dbt test가 잘 통과했는지 확인할 수 있어요.

모든 과정에 문제가 없으면 파이프라인이 정상적으로 돌아요. 너무 큰 데이터를 스캔하는 경우나 올바르지 않게 모델을 정의한 경우, 모델을 잘 정의했지만 test가 실패하는 경우 중간에 에러가 나요.

DBT Test하는 DAG

별도의 DBT를 테스트할 수 있는 환경을 구축해서 dbt에 대한 설정을 구성원이 별도로 하지 않을 수 있게 되었어요. 또한 테스트할 때 실수로 실제 프로덕션 데이터를 덮어씌우거나 큰 쿼리를 돌려서 비용이 많이 나오는 문제를 사전에 방지할 수 있게 되었어요.

5. 다른 파이프라인과 DBT모델 간의 의존성은 어떻게 챙길 것인가?

DBT와 astronomer-cosmos 패키지를 활용하면 DBT 모델 간 의존성은 자동으로 Airflow task로 표현이 되지만, DBT 모델에서 참조하고 있는 원천 소스에 대한 의존성까지는 자동으로 챙겨지지 않아요. 만약 원천 소스가 적재되기 전에 DBT 모델이 실행된다면 데이터가 일부 비어서 온전하지 않을 수 있어요. 그래서, DBT 모델을 통해 계산된 결과물의 데이터가 온전해지려면 DBT 모델에서 바라보고 있는 원천 소스가 잘 적재되고 나서 모델이 실행되어야 해요.

당근 팀은 배치와 스트리밍 파이프라인 여러 개를 운영하고 있고, 대부분의 원천 소스의 적재는 Airflow에서 구성한 파이프라인이 담당하고 있어요. 원천 소스를 적재하는 파이프라인은 일관된 네이밍 컨벤션을 갖고 있어요. 그리고 DBT 프로젝트에서 원천 데이터와 source를 정의하고 있기 때문에 DBT 모델에서 어떤 원천 소스를 참조하고 있는지 알 수 있어요. 그래서 해당 정보를 활용해서 DBT 모델이 원천 소스가 적재되는 다른 파이프라인에 의존성을 갖도록 구성했어요.

Airflow에는 Sensor라는 기능이 있어서 다른 파이프라인의 작업이 완료되는 것을 기다릴 수 있어요. 그중에서 ExternalTaskSensor를 활용해서 원천 소스를 적재하는 파이프라인이 완료되는 것을 확인하고 DBT 모델이 실행될 수 있도록 했어요.

다른 Airflow DAG를 DBT model에서 sensor하는 UI

ExternalTaskSensor를 구현할 때 고려했던 부분은 DBT 모델이 자신이 바라보고 있는 원천 소스가 적재되면 바로 실행이 될 수 있게 하는 것이었어요. DBT 모델이 자신이 어떤 원천 소스를 바라보고 있는지 알기 위해서는 모델별로 upstream으로 올라가서 원천 소스를 알아내야 했어요.

-- 예시
-- user_actively_used_feed fact
SELECT * FROM {{ ref('user_scrolled') }}
UNION ALL
SELECT * FROM {{ ref('user_created_item') }}

-- user_scrolled fact
SELECT * FROM {{ ref('event_impressed') }} WHERE ...

-- user_created_item
SELECT * FROM {{ ref('db_article') }} WHERE ...

--
[db_article, event_impressed]
>> [user_scrolled, user_created_item]
>> user_actively_used_feed
-- 해당 fact의 원천 소스는 db_article와 event_impressed
-- event_impressed, db_article이 적재되고 나서 DBT 모델들이 실행되어야 함

원천 소스별로 Airflow 파이프라인을 바라보는 ExternalTaskSensor들을 만들어놓고, DBT모델에서 해당하는 sensor에 의존성을 갖도록 구성했어요. 예를 들어, 위 예시에서 user_actively_used_feed fact가 실행되기 위해서는 db_article, event_impressed 적재가 돼서 해당 데이터셋들이 적재되기를 기다리는 ExternalTaskSensor가 완료되어야 해요.

for (
task
) in sample_dbt_task_group.children.values():
if not isinstance(task, CosmosTaskGroup):
continue

# DBT모델이 Airflow내의 다른 DAG를 sensing할 수 있도록
# source이름을 확인해서 적절한 파이프라인을 찾고
# ExternalTaskSensor를 생성해서 모델에 의존성 추가
sensor_task_ids = get_sensor_task_ids(...)
task_sensor_deps = [
sensor_tasks.get(task_id) for task_id in sensor_task_ids
]

# Airflow내 다른 파이프라인에 의존성이 있다면 sensing하고, 없다면 skip
if len(task_sensor_deps) > 0:
task_sensor_deps >> task

6. 국가별로 같은 모델에 대한 정의를 어떻게 할 것인가?

당근은 글로벌 서비스이기 때문에 모델링을 할 때 경우에 따라 국가별로 다른 특성의 데이터를 고려 해야 하고, 주기적으로 스케줄링하는 파이프라인은 경우에 따라 각 국가나 도시의 타임존을 고려해야 해요. DBT 프로젝트 및 모델을 구성할 때 글로벌 데이터를 어떻게 다룰지 고민했어요. DBT 모델을 국가별로 프로젝트를 구성해서 정의하게 되면 중복이 많아지고 특정 국가는 모델이 추가가 안 되는 등 사용성과 관리의 문제가 발생할 수 있다고 생각했어요. 그래서 최대한 DRY(Don’t Repeat Yourself)로 모델링하기 위해 하나의 프로젝트로 모든 모델이 관리되지만 각 국가에 맞는 로직이 커스텀하게 실행될 수 있도록 dbt vars와 dbt tag, jinja를 최대한 활용하는 것으로 구성했어요.

같은 DBT 모델이지만 국가별로 다른 타임존에 맞춰서 실행되어야 하는 경우 모델에 tags를 활용해서 구분했어요. tags에 국가코드를 작성하면 자동으로 Airflow에서 국가 타임존에 맞춰서 실행되는 파이프라인에서 해당 tag가 있는 모델들만 선택해서 실행해요.

- name: sampleUser_sent_message
description: sampleUser가 메세지를 보냈다
meta:
owner: henry@daangn.com
config:
tags: ["daily-kr", "daily-..."]
cluster_by: ["..."]

같은 DBT 모델 내에서 국가마다 다른 데이터 특성으로 인해 다른 비즈니스 로직을 반영해야 하는 경우에 vars와 jinja를 활용해서 SQL에서 분기했어요. SQL 파일에 jinja가 많을수록 가독성이 떨어지고 모델의 복잡성이 올라갈 수 있어요. 그래서, 기본적으로 복잡한 비즈니스 로직이 필요한 코어 모델은 데이터 가치화팀에서 최대한 미리 만들었고, 필요시 여러 SQL에서 반복되어서 나타나는 로직들은 macros를 활용해서 SQL문을 단순화했어요.

-- 예시 sampleUser_sent_message.sql
SELECT
event_id,
user_id,
message_category
{% if var('country_code') is 한국 %}
// 한국 관련 로직
{% else %}
// 다른 국가 관련 로직
{% endif %}
FROM {{ ref('db_something_some_table') }}
...

7. DBT 모델의 백필은 어떻게 할 것인가?

일반적으로 DBT에서 모델의 데이터를 백필하기 위해서는 full refresh라는 방식으로 전체 데이터를 백필해요. 하지만, 당근에는 데이터가 워낙 많기 때문에 모든 모델에 대해 full refresh하는 건 비용 대비 효용 떨어져요.

그래서, 원하는 날짜에 대해서만 데이터를 백필할 수 있도록 별도의 백필 파이프라인을 만들었어요. 구성원이 백필하고 싶은 (1) 모델의 이름과 (2) 날짜 범위를 입력하면 반자동으로 백필이 되도록 했어요. 모델마다 백필해야 하는 날짜를 다르게 지정하는 경우가 있다 보니 Airflow dynamic task를 활용해서 DBT 모델마다 다른 날짜 범위에 대한 데이터가 백필될 수 있게 했어요. Dynamic task를 사용하면 하나의 task 내에 여러 mapped task가 만들어져서 각 task 별로 실행과 실패 기록을 확인할 수 있게 했어요. 예를 들어, 2023–11–01, 2023–11–30으로 설정하면 30개의 mapped instance가 나와요.

Backfill DAG

앞으로의 계획

DBT with Airflow

현재는 당근 구성원이 사용할 수 있는 사용자 정보를 최대한 많이 개발하는 작업을 하고 있어요. 당근에서의 DBT는 이제 시작이고 DBT와 Airflow 프로젝트가 더 고도화된다면 다음처럼 되길 기대해요.

  1. 믿을 수 있는 사용자 정보가 늘 존재하여 데이터 조회가 매우 쉬워요.
  • 사용자 정보를 얻기 위해 여러 JOIN이나 WHERE 조건, 데이터 클랜징를 하지 않고 또는 타팀에 질문하지 않고 원하는 데이터를 조회할 수 있는 걸 목표로 해요.

2. 도메인 지식이 있는 구성원이 재사용되는 사용자 정보를 더 쉽게 만들 수 있어요.

  • 데이터 엔지니어링 지식이 없어도 데이터 간 의존성, 스케줄링, 데이터 퀄리티 테스트, 관측성 등이 갖춰진 데이터 파이프라인을 만들 수 있는 것을 목표로 해요.

그리고 이를 달성하기 위해서는 몇 가지의 기술적 도전 과제들이 있어요.

  1. DBT 모델의 확장성 문제
    DBT모델이 많아질수록 모델 간 의존성이나 관리가 어려워질 수 있고, 파이프라인의 성능 문제가 발생할 수 있어요.
  2. 데이터 가치화팀의 다른 데이터 제품과의 유기적인 연동
    실험플랫폼, Karrotmetrics와 같은 데이터 가치화팀 제품과 호환이 될 수 있어야 해요. 사용자 행동 정보를 한번만 정의하고 이를 각 제품에서 사용할 수 있게 해서 데이터의 일관성과, 데이터 관리의 효율성을 높이길 기대해요.
  3. 더 많은 구성원이 사용할 수 있도록 DBT 모델을 개발하고, 테스트하고, 백필하는 환경의 개선
  4. Base, Dimension, Fact 이외 필요한 형태의 구조 추가 개발
  5. 사용자 정보를 편리하게 찾을 수 있게 하기 위한 Discovery의 고도화

이러한 문제들을 같이 해결해 나갈 Software Engineer, Data와 Data Scientist, Decision 그리고 Data Analyst 분들을 기다리고 있어요.

추가로, 서론에서 언급했던 데이터 가치화 팀 비전에 대해 자세히 알고 싶은 분은 “창업자처럼 일하는 데이터 팀 빌딩 3년 로드맵 만들기” 글을 참고해주세요.

--

--