มาทำ Data Lineage โดยใช้ DataHub กัน ! (พร้อมวิธีทำอย่างละเอียดและตัวอย่าง)

Burasakorn Sabyeying
Mils’ Blog
Published in
6 min readJul 9, 2022

บทความนี้เราจะมาเล่าอีก concept หนึ่งในการทำ data ซึ่งเป็นส่วนหนึ่งของ Data governance ด้วย นั่นคือ Data Lineage เราจะเริ่มเล่าตั้งแต่ Data Lineage คืออะไร จนถึงตัวอย่าง implementation จริงในการสร้าง Data Lineage จริงที่เราใช้

Data Lineage คืออะไร?

Data lineage includes the data origin, what happens to it and where it moves over time — Wikipedia

Data Lineage คือการ map การเดินทางของ data จากจุดเริ่มต้นไปถึงการใช้งานปลายทาง เริ่มต้นตั้งแต่ดึง data source เป็นอะไร, เก็บไว้ที่ไหน, ถูก process ด้วยอะไร, จนไปถึงการใช้งานว่าออก chart และ dashboard อะไร

ดังเช่นภาพด้านล่าง

ภาพนี้ทำให้เราเห็นภาพว่า flow เริ่มจาก datasource ที่เก็บไว้ที่ s3 และใช้ airflow ในการควบคุมเรื่อง split task จนทำให้เกิด data ที่กระจายไปทั่ว s3 จนสุดท้ายเก็บ dataset ไว้ที่ snowflake จำนวน 3 dataset จนสุดท้าย 3 dataset นั้นถูกนำไปใช้สร้าง dashboard ด้วย Looker

เราสามารถเข้าใจ Flow การไหลของ Dataเพียงแค่ดู visualization

ทำไม Data Lineage ถึงสำคัญ?

  • เราสามารถ track data ได้ง่ายขึ้น หากมี data change เราก็สามารถเริ่ม track ได้ หรือหากจุดไหนที่เกิดปัญหา เราก็สามารถดูได้ว่า dataset นั้นถูกปั่นจากอะไร และจุดไหนเป็นปัจจัยได้บ้าง
  • คนที่ต้องการใช้ dataset สามารถรู้ว่าที่มาที่ไปของข้อมูล ยิ่งรู้ว่า data source มาจากที่ไหน จะทำให้เข้าใจ context มากขึ้น
  • คนที่ได้ประโยชน์จากสิ่งนี้ ไม่เพียงแต่คนที่ implement flow อย่าง Data Engineer, แต่รวมถึง data user อย่าง Data Analyst ที่ต้องการทำ dashboard, หรือ Data Scientist ก็ตาม

คราวนี้มาถึงว่า เราจะใช้อะไรในการทำ Data Lineage ได้บ้าง

ตัว tools หลักในบทความนี้ เราจะใช้ DataHub เป็นหลัก ความจริงแล้ว DataHub นั้นเป็น tools สำหรับการทำ metadata platform รวมถึงการทำ Data Discovery/ Data Catalog ด้วย ซึ่งเราเคยเขียนเอาไว้ในลิ้งค์นี้แล้ว ตัว Data Lineage ก็เป็นอีกฟีเจอร์ที่ตัว DataHub เองก็ provide ให้เหมือนกัน

ประเภทของ Lineage มีกี่แบบบ้าง

  1. Dataset Lineage
  • คือแบบ Dataset → Dataset
  • Support natively Snowflake (access_history), BigQuery (audit log)
ตัวอย่างจากตัว demo ของ DataHub โดย BigQuery -> BigQuery

2. Pipeline Lineage

  • คือแบบ DataJob → dataset เพื่ออธิบายว่า Job อะไรเป็นคนรันเพื่อให้เกิด dataset นี้
  • supported natively for Airflow เมื่อเราเซ็ตให้ Airflow ส่ง DataJob ไปหา DataHub

3. Chart Lineage

  • แบบ Chart → Dataset เพื่ออธิบายว่า dataset อะไรถูกใช้ใน chart
BigQuery table to Tableau Chart

4. Dashboard Lineage

  • แบบ Dashboard → chart เพื่ออธิบายว่า Chart นั้นเป็นส่วนหนึ่งใน dashboard ไหน
Tableau chart to Tableau Dashboard

ในบทความเราจะเน้นตัว Pipeline Lineage เป็นหลัก นั่นคือการที่ Airflow ส่งข้อมูล job ที่รัน โดยบอกว่า job นั้นเกี่ยวข้องกับ dataset ใด ซึ่งมีวิธี set หลายวิธีมากๆ

Airflow Integration

3 ท่าในการทำ

  1. Plugin for Amazon Managed Workflows for Apache Airflow (MWAA)

2. Lineage Backend (เราจะใช้ท่านี้)

3. Emitting lineage via a separate operator

ท่า Lineage Backend

ความจริงแล้ว Airflow นั้น support การใช้ lineage เป็นดั้งเดิมอยู่แล้ว เพียงแต่จะเลือกว่าใช้ library ใดในการใช้เท่านั้นเอง ในการ setup จำเป็นต้อง setup ทั้งพาร์ท infrastructure และ code ฝั่ง Airflow

1. Setup infra

1. Kubernetes

เนื่องจากเราใช้ Kubernetes เป็น backbone ของ Airflow จึงต้องเพิ่ม environment variables 2 ตัวใน values.yml เพื่อบอกว่าใช้ Lineage Backend ของ DataHub นะ และด้วย config อะไร เช่น วิ่งไปหา DataHub ด้วย connection ที่ชื่อว่า datahub_rest_default

extraEnv: |
- name: AIRFLOW__LINEAGE__BACKEND
value: "datahub_provider.lineage.datahub.DatahubLineageBackend"
- name: AIRFLOW__LINEAGE__DATAHUB_KWARGS
value: "{ \\"datahub_conn_id\\": \\"datahub_rest_default\\", \\"capture_ownership_info\\": true, \\"capture_tags_info\\": true, \\"graceful_exceptions\\": true }"

Owner กับ Tag ใน Airflow จะถูก added ใส่ DataHubโดยอัตโนมัติเป็น metadata เพราะใส่ capture_ownership_info กะ capture_tags_infoเป็น true

ในกรณีที่อยากใช้ Docker Compose สำหรับ dev บน local ก็สามารถใช้ไฟล์ที่ทาง DataHub provide ให้ได้

2. Docker-compose local

- รัน Docker compose ฝั่ง DataHub (พอดีเราใช้เครื่อง M1)

datahub docker quickstart --quickstart-compose-file ./docker-compose-without-neo4j-m1.quickstart.yml

หากงงขั้นตอนนี้ วิธีใช้ setup แบบละเอียดอยู่ในบทความก่อนหน้า

- รัน Docker compose ฝั่ง Airflow

โดย docker compose ไฟล์นี้จะแตกต่างตัว Airflow เฉยๆ เพราะะมีการเซ็ต env ทั้ง 2 ตัว และ network เพื่อให้ airflow กับ datahub สามารถคุยหากันได้

example:

version: '3'
x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-acryldata/airflow-datahub:latest}
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
AIRFLOW__LINEAGE__BACKEND: 'datahub_provider.lineage.datahub.DatahubLineageBackend'
AIRFLOW__LINEAGE__DATAHUB_KWARGS: '{ "datahub_conn_id": "datahub_rest_default", "capture_ownership_info": true, "capture_tags_info": true, "graceful_exceptions": false }'
networks:
- datahub_network

ไฟล์เต็ม: https://github.com/datahub-project/datahub/blob/master/docker/airflow/docker-compose.yaml

แล้วรัน

docker-compose -f docker-compose-datahub.yaml up

เมื่อไม่ใช้แล้วก็ Docker down

docker-compose -f docker-compose-without-neo4j-m1.quickstart.yml down
docker-compose -f docker-compose-datahub.yaml down

สร้าง Airflow Connection (Both K8s and Docker)

2. Setup code

ท่า Pipeline Lineage จะเป็นเพียงแค่การใส่ parameter ที่ชื่อ inlet และ outlet ภายใน task เพื่อบอกว่า ก่อนและหลัง execute task นั้นเกี่ยวข้องกับ dataset อะไร

อย่าลืมต้องลง lib acryl-datahub ด้วย

ตัวอย่าง code:

from datahub_provider.entities import Datasettask1 = BashOperator(
task_id="run_data_task",
dag=dag,
bash_command="echo 'This is where you might run your data tooling.'",
inlets={
"datasets": [
Dataset("snowflake", "mydb.schema.tableA"),
Dataset("snowflake", "mydb.schema.tableB"),
],
},
outlets={"datasets": [Dataset("snowflake", "mydb.schema.tableC")]},
)

โดยใน dataset เขาจะพยายามให้ใส่ format <db>.<schema>.<table>

DAG result จะเห็นเป็นดังนี้

และหากเราเซ็ต lineage backend ได้ถูกต้องแล้ว เราจะเห็น result ใน DataHub ว่ามี Airflow เกิดขึ้น และมีหน้าตาแบบด้านล่าง

ตัวอย่างโค้ด: datahub_lineage_backend_demo

DataFlow vs DataJob

สิ่งที่เกิดขึ้นคือ ข้อมูลที่ Airflow จะส่งไปหา DataHub มีอยู่ 2 object คือ DataFlow และ DataJob

โดย 2 ตัวนี้จะแตกต่างกัน DataJob จะเป็นส่วนหนึ่งใน DataFlow (= Job เป็น subset ใน Flow)

และเนื่องด้วย DataHub จะมีระบุ object ต่างๆผ่านสิ่งที่เรียกว่า URN, URN ของ Data Flow และ DataJob จึงเป็นดังนี้

An example DataFlow URN:

{"urn": "urn:li:dataFlow:(airflow,product_dup,prod)"}

An example DataJob URN:

{"urn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,product_dup,prod),ingest_data.get_category)"}

จะเห็นได้ว่า DataJob จะระบุตัว DataFlow ข้างในด้วย

Real life Use Case

จนมาถึงจุดนี้ เราก็สามารถทำ Data Lineage ได้แล้ว แต่ในเคสจริงที่ทีม TILDI CJ เราทดลองกับ pipeline ที่ใช้ปัจจุบันนั้นซับซ้อนกว่านั้น

จะเห็นได้ว่า dataset ที่ต้องใช้คือ Google Cloud Storage

ซึ่ง ณ ขณะนี้ ทาง DataHub ยังไม่สร้าง source ตัว Google Cloud Storage (GCS) แต่ support blob storage อื่นๆทั้ง S3, Azure Data Lake Storage (Gen1+2) นะ แต่ทิ้งน้อง Google ไว้ลำพัง

ดังนั้นเราจึงไปลองหา Source ที่น่าสนใจ จนกระทั่งเจอ Data-Lake ที่น่าจะดูใกล้เคียงกับสิ่งที่อยากได้มากที่สุดแล้ว

ดังนั้นเราจึงต้องลง lib เพิ่ม

pip install acryl-datahub[data-lake]

กลับมาที่ Pipeline ปัจจุบันของเรา ซึ่งหน้าตาประมาณนี้

Airflow Pipeline

อธิบายคือ Pipeline เราจะเริ่มตั้งแต่เก็บข้อมูลโดยการ scraping จากหน้า website ไปเก็บที่ Data Lake (GCS)โดยจะถูกแบ่งออกเป็น 3 zones: landing, transforming และ Serving จนไปถึง data mart ปลายทางที่ BigQuery

จึงแบ่ง stage ออกเป็นดังนี้:

  1. Landing:

เมื่อข้อมูลถูก scrape มาเราจะเก็บ as raw data ไว้ที่ landing ทันที และเมื่อเก็บเสร็จก็จะ validate data ว่าข้อมูลที่ได้มีความถูกต้องไหม (Data Quality) เช่น datatype เป็นเป็น int, float, string หรือ column ที่ได้ครบถ้วนรึเปล่า

  • input: Website
  • output: Landing path in GCS

2. Transforming

stage นี้เราจะ clean หรือ process data โดยใช้ Spark ผ่าน SparkKubernetesOperator และ SparkKubernetes Sensor โดยที่

  • input: Landing path in GCS
  • output: Transforming path in GCS

3. Serving

stage นี้เราจะดึงเฉพาะ field ที่ Data Analyst จะต้องใช้ โดยใช้ Spark SQL

  • input: Transforming path in GCS
  • output: Serving path in GCS

4. BigQuery table

เคสเราจะมีเก็บข้อมูล serving ใน BigQuery ด้วย โดยจะ upsert data ชุดใหม่เก็บเข้า table ที่กำหนดไว้แล้ว

  • input: Serving path in GCS
  • output: BigQuery format <project_id>.<dataset>.<table_name>

เมื่อเข้าใจแต่ละ stage แล้วจึงปรับมาเป็นโค้ด โดยจะโชว์ตัวอย่างในส่วน Transforming คือ

bucket_name = "<bucket_name>"
project_path = "<path>/<to>/<table>"
landing_path = f"{bucket_name}/landing/{project_path}"
transform_path = f"{bucket_name}/transforming/{project_path}"
datahub_env = "DEV"
sensor_transform = SparkKubernetesSensor(
task_id="spark_pi_monitor",
kubernetes_conn_id="kube_spark_conn",
application_name="{{ task_instance.xcom_pull(task_ids='transform_data.pyspark_transform')['metadata']['name']}}",
inlets={"datasets": [Dataset("data-lake", landing_path, datahub_env),],},
outlets={"datasets": [Dataset("data-lake", transform_path, datahub_env),],},

)

Edit เพิ่ม (( 11-Feb-2023))
ใน DataHub versionใหม่ๆจะเปลี่ยนวิธีเขียน inlet, outlet

inlets=[Dataset("data-lake", inlet_path, datahub_env)],       
outlets=[Dataset("data-lake", outlet_path, datahub_env)]

และเมื่อเรา implement เสร็จ จึงได้ result ที่ DataHub แบบนี้

DataHub Result

อีกรูป แบบ compare กัน

Compare between Airflow and DataHub

Note และ Experiment อื่นๆ:

  • ใช้ value ของ Data-Lake จะใช้ gs:// นำหน้าไม่ได้ ไม่งั้นจะ datahub UI (หากใช้เป็น s3 จะสะดวกหน่อย ก็ระบุเป็น s3://xxx/yyy ได้ปกติ )
  • ปัญหาที่เจอคือบาง operator เช่น DummyOperator จะไม่โชว์ชื่อ task ในหน้า UI แต่เราไม่ซีเรียสตรงจุดนี้เพราะ DummyOperator คือ dummy จริงๆ ไม่ได้ process data แต่เป็นเพียงตัวเสริมความเข้าใจใน Airflow DAG เฉยๆ จึงไม่ใช่ตัวที่ต้องคิดใน Data Lineage
  • หากเราเปลี่ยน inlet/outlet เป็นตัวอื่น ตัว dataset จะไม่โชว์ Lineage อีกต่อไป แต่จะยังเห็น dataset นั้นได้อยู่ทุกที่

e.g เปลี่ยนDataset("s3", "bucket/mydb/schema/tableA") เป็น dataset ตัวอื่น (Dataset("adlsGen1", "bucket/mydb/schema/tableA"),ก็ไม่สามารถกดปุ่ม lineage ในหน้า lineage ได้อีกต่อไป

เห็น Lineage เป็นสีเทาๆบางๆ

และจะยังมี dataset อยู่ ไม่หายไปไหน

S3 ยังคงมี 3 ตัวเท่าเดิมในหน้า Home
  • ไม่จำเป็นต้องสร้าง dataset เตรียมไว้ก่อนก็ได้ แต่สร้างจาก lineage (inlet, outlet ได้เลย) และ dataset จะเป็นข้อมูลที่ไม่มีอยู่จริงก็ได้

e.g. กรณีนี้ s3 ไม่มีอยู่จริง

  • เมื่อเรากดเข้าไปดูใน dataset ที่เราสนใจ เราจะเห็น DataJob ที่เกี่ยวข้องกับ dataset นั้นทั้งหมด

เช่น หากเรามี 2 DAGs

DAG แรกมี task ชื่อ run_data_task โดยใช้ dataset ตรง landing กับ transforming

DAG ที่ 2 มี task ชื่อ run_data_task_1, run_data_task_2, run_data_task_3 โดยจะยุ่งกับ dataset ทั้ง landing, transforming, serving, และ BigQuery table

เมื่อเรากดดู Lineage เราจะเห็น task ของทั้ง 2 DAGs เพราะทั้ง 2 ใช้ dataset ร่วมกัน

สรุป

ในบทความนี้เราสรุปถึงความหมายของ Data Lineage ว่ามันคืออะไร และ implement ผ่าน DataHub โดยใช้ Airflow Integration ที่เรียกว่า Lineage Backend เพียงแค่กำหนด inlet และ outlet ใน Airflow task และระบุว่า Airflow จะไปเรียก DataHub ได้ยังไง

DataHub นั้นไม่เพียงแต่ทำเรื่อง Data Catalog แต่ทำให้เราสามารถเห็นว่า dataset นั้นถูกสร้างมาได้อย่างไร และ track การไหลของข้อมูลได้ง่ายขึ้น

ในตัวอย่างมี pipeline จริงที่ทีม Data Engineer ของ TILDI CJ สร้างไว้เพื่อให้ทุกคนเห็นภาพมากขึ้น มากกว่าตัว demo ที่ทาง DataHub ให้มา ซึ่งเราว่าน่าจะมีประโยชน์แก่คนที่ได้ลองเล่น Lineage ของ DataHub แล้วเหมือนกัน

หากมีข้อสงสัย หรือคำแนะนำสามารถติดต่อเข้ามาทางเพจ https://www.facebook.com/mesodiar ได้นะคะ

และสุดท้าย หากอยากเป็นส่วนหนึ่งในการสร้าง pipeline คูลๆ สร้าง Data Lineage, Data Quality, Data Observability และหลายอย่างอีกมากมาย ก็สามารถติดต่อมาได้เช่นกันค่ะ We are hiring !!!

--

--

Burasakorn Sabyeying
Mils’ Blog

Data engineer at CJ Express. GDE in Cloud. Women Techmakers Ambassador. Co-lead GDG Cloud Bangkok. Other channel > Mesodiar.com