[Airflow] ย้ายไฟล์จาก GCS ไป BigQuery ด้วย GoogleCloudStorageToBigQueryOperator กัน !

Burasakorn Sabyeying
Mils’ Blog
Published in
3 min readJan 24, 2021

สวัสดีค่ะทุกคน วันนี้เราจะมาเล่น Operator ตัวนึงของ Airflow กัน !
โดยวันนี้เราจะนำเสนอเจ้าตัว GoogleCloudStorageToBigQueryOperator (ชื่อยาวมาก)

ด้วยเหตุที่ว่า เรามีโจทย์ในการดึง data เข้า BigQuery ค่ะ โดยที่เราจะ write file ไว้บนเครื่อง local ก่อนและจะนำเข้า BigQuery

อันที่จริงเราก็จะเขียน CSV file แล้ว upload ผ่าน bq command โดยตรงด้วย

bq load

ก็ได้

แต่เจ้ากรรมนายเวร BigQuery ที่รักเขาติด limit รับไฟล์ได้ไม่เกิน 10MB ค่ะ

ดังนั้นเราก็จะมีตัวช่วยอีกตัวมาคั่นกลาง คือ Google Cloud Storage นี่เอง โดยเราจะให้ GCS เป็นตัวพักไฟล์ เพราะเขาจะช่วยทำให้ไม่จำกัดขนาดของไฟล์ที่เราเขียนในการจะย้ายเข้า BigQuery ค่ะ

สรุป Pipeline ของเราก็จะเป็นประมาณนี้

Write csv file > Send to GCS > Send GCS file to BigQuery

โดยการ upload file ขึ้น GCS เราใช้วิธีเขียน script เอา (วันนี้นางไม่ใช่พระเอก ขอข้ามไปก่อน แฮะๆ)

def upload_blob(bucket_name, source_file_name, destination_blob_name):
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)print(
"File {} uploaded to {}.".format(
source_file_name, destination_blob_name
)
)

ต่อไปจะเป็นส่วน Airflow โดยที่พระเอกวันนี้ของเราคือ GoogleCloudStorageToBigQueryOperator

โดยเราจะเขียนเป็น

from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperatorGCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
task_id=’gcs_to_bq’,
bucket=’mils-bucket’, #ชื่อ bucket
source_objects=[
f’demo/data/ga_event_20200123.csv’], #ชื่อ ไฟล์บน GCS
destination_project_dataset_table=f’mils_bq.ga_event_20200123', #ชื่อ <Bigquery dataset>.<ชื่อ table>
source_format=’CSV’,
schema_fields=schema_ga_event,
create_disposition=’CREATE_IF_NEEDED’,
write_disposition=’WRITE_TRUNCATE’,
bigquery_conn_id=’my_gcp_connection’,
autodetect=False,
dag=dag
)

Note: ชื่อ table บน BigQuery เป็น format `%Y%m%d` นะคะ เช่น 20200123 เพื่อให้มันควบรวม table ไว้ กรณีเรา schedule ให้มันรันทุกวัน

คราวนี้ตัวน่าปวดหัวมาแล้ว
เราจำเป็นจะต้อง ใส่ schema type (ชื่อ column) ให้ BigQuery ด้วยค่ะ (ตรง attribute: schema_fields)
มันจะต้องอยู่ในรูปแบบนี้

[
{
"mode": "NULLABLE",
"name": "sessions",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "users",
"type": "STRING"
},
{
"mode": "NULLABLE",
"name": "pageviews",
"type": "STRING"
},
...
]

ซึ่งมันยาวมากกกกกกก ยิ่งถ้าเรามี column เยอะๆจะทำให้ไฟล์ DAG เรายาวมาก
และด้วยความขี้เกียจของเราบวกกับศักดิ์ศรีโปรแกรมเมอร์ของเราแล้ว เราจะไม่เขียนเองค่ะ ! เสียเวลา

เราก็จะใช้ lib ที่ชื่อว่า bigquery-schema-generator เป็นคนสร้าง

โดยเราจะสร้าง schema file เป็น JSON จากไฟล์ csv ที่เรามี data ค่ะ ด้วยคำสั่ง

generate-schema --input_format csv < ga_event_20210123.csv > schema_data/file_schema.json## generate-schema --input_format csv < {ชื่อไฟล์} > {ชื่อไฟล์ JSON}

ที่นี่เราก็จะประกาศไว้ข้างบน ทำให้ task ให้ไฟล์เป็นแบบนี้

### เพิ่มตรงนี้มา อย่าลืม import json นะwith open(f'/usr/local/airflow/scripts/data/schema_data/file_schema.json') as json_file:
schema_ga_event = json.load(json_file)
GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
task_id='gcs_to_bq',
bucket='mils-bucket',
source_objects=[
f'demo/data/ga_event_{yesterday}.csv'],
destination_project_dataset_table=f'mils_bq.ga_event_{yesterday}',
source_format='CSV',
schema_fields=schema_ga_event,
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
bigquery_conn_id='my_gcp_connection',
autodetect=False,
dag=dag
)
```

และนี่คือไฟล์ DAGแบบเต็มๆ

ที่นี้ค่ะ มาถึงจุดสำคัญอีกสุดคือเราจะติดต่อให้น้อง Airflow ของเรารู้จัก Bigquery ด้วยการสร้าง Connections ค่ะ
ให้เรากดไปที่ Admin > Connections
แล้วกดปุ่ม + สร้าง connections ตัวใหม่ขึ้นมา

เราจะตั้งชื่อเขาว่า my_gcp_connection
โดยจะมี setting ตามนี้

Conn id: my_gcp_connection
Conn Type: Google Cloud Platform
Project id: <ชื่อ project id >
Keyfile Path: <path ไป json key>
Keyfile JSON: <copy ของที่อยู่ในไฟล์ jsonมาแปะ>
Scopes(comma separated): https://www.googleapis.com/auth/cloud-platform
Number of Retries: 1
ตัวอย่างในรูป

ที่นี้เราก็ไปกดรัน DAG ดู และสามารถนำ table ขึ้น BigQuery ได้แล้วเย้ๆๆๆ

คราวนี้เราก็จะเห็น table เข้า BigQuery อย่างสวยงาม ~

ศึกษาเจ้าตัวนี้เพิ่มเติมได้ที่ Airflow Doc เลยนะคะ

ขอบคุณที่อ่านถึงตรงนี้และ Happy Coding ค่ะทุกคน !

ก่อนจาก ฝากน้องเห็ด Airflow ไว้เป็นกำลังใจให้ทุกท่าน

--

--

Burasakorn Sabyeying
Mils’ Blog

Data engineer at CJ Express. Women Techmakers Ambassador. GDG Cloud Bangkok team. Moved to Mesodiar.com