ลองใช้ GCP ทำ Data Engineering Project กัน
สวัสดีครับ วันนี้จะลองมาใช้ GCP (Google Cloud Platform) ทำ Data Engineering Project แบบง่ายๆกัน
แต่ก่อนอื่น ผมขอออกตัวก่อนเลยว่า ผมเป็นมือใหม่สำหรับ Data Engineer มาก โดยความรู้ที่ผมนำมาใช้ทำ project นี้ ก็มาจากคอร์ส Road to Data Engineer 2.0 ของ Data TH ที่ผมเพิ่งได้เรียนไป
หากบทความนี้มีส่วนไหนที่ผิดพลาด ก็ขออภัยมา ณ ที่นี้ด้วยนะครับ :)
โอเค งั้นเรามาเริ่มกันเลย
หน้าที่ของ Data Engineer คือ เป็นคนที่ต้องรวบรวมข้อมูลดิบมาจากแหล่งข้อมูลต่างๆ (Data Source) จากนั้นนำข้อมูลเหล่านั้นมาแปลง, ประมวลผลและทำความสะอาด (Data Cleansing) ให้ข้อมูลมีคุณภาพที่ดีและอยู่ในรูปแบบที่ต้องการ และนำไปเก็บไว้ใน Storage หรือจุดหมาย (Destination) ที่ต้องการ เพื่อเตรียมให้กับคนที่จะนำข้อมูลไปใช้งานต่อ เช่น Data Analyst ที่จะนำข้อมูลไปวิเคราะห์หรือทำ Dashboard/Report เพื่อประกอบการตัดสินใจ, Data Scientist ที่จะสร้าง model จากข้อมูล เป็นต้น
ซึ่งการรวบรวมข้อมูล (Extract), การแปลงและประมวลผลข้อมูล (Transform), การนำไปเก็บไว้ใน storage (Load) เราจะเรียกว่า ETL โดยงานทั้งหมดนี้จะทำให้เป็นอัตโนมัติ (Automate) อยู่ใน Data Pipeline ซึ่งเปรียบเสมือนท่อในการลำเลียงข้อมูลจาก Data Source ไปยัง Destination
สำหรับ project นี้ ผมได้เลือกข้อมูลมาจาก kaggle.com ซึ่งเป็นเว็บที่มีข้อมูลมากมายให้เราได้ลองเล่น
โดยข้อมูลที่ผมเลือกเป็นข้อมูลเกี่ยวกับการขายของร้านกาแฟ ซึ่งประกอบไปด้วย 3 ตาราง ได้แก่ ตาราง customer, ตาราง product, และตาราง transaction
สมมุติว่าเราได้รับ requirement มาจากทาง business ว่าให้เตรียมข้อมูลเกี่ยวกับยอดขาย เพื่อที่จะดูว่าสินค้าไหนขายดีและถูกใจผู้บริโภคบ้าง, ดูจำนวนลูกค้าทั้งหมดที่มาซื้อสินค้า เป็นต้น
ส่วนแรก (ลองดูข้อมูลและทำการ clean)
ในส่วนแรก เราก็จะมาลองดูข้อมูลแต่ละตารางกันก่อน โดยใช้ Google Colab ที่สามารถเขียน code และดูผลลัพธ์ทีละช่องได้
หลังจากดาวน์โหลดข้อมูลมาจาก kaggle ก็ให้อัพโหลดไปที่ colab
จากนั้น import pandas ซึ่งเป็น library ของ python ที่ช่วยจัดการกับข้อมูลในรูปแบบ DataFrame
import pandas as pd
จากนั้นใช้คำสั่ง pd.read_csv เพื่ออ่านข้อมูลของทั้ง 3 ตารางออกมา
- ตาราง customer
customer = pd.read_csv("/content/customer.csv")
customer
- ตาราง product
product = pd.read_csv("/content/product.csv")
product
- ตาราง transaction
transaction = pd.read_csv("/content/transaction.csv")
transaction
หลังจากดูหน้าตาข้อมูลคร่าวๆแล้ว ต่อไปจะตรวจสอบดูว่ามี missing value ในคอลัมน์ใดบ้าง ของทุกตาราง ด้วยคำสั่ง loc และกำหนดเงื่อนไขให้หา null
ถ้าพบคอลัมน์ใดที่มี null ก็จะแสดงคอลัมน์นั้นออกมา
- ตาราง customer
customer.loc[: , customer.isnull().any()]
- ตาราง product
product.loc[: , product.isnull().any()]
- ตาราง transaction
transaction.loc[: , transaction.isnull().any()]
จากการตรวจสอบดูว่ามี missing value ในคอลัมน์ใดบ้าง พบว่ามีแค่คอลัมน์ age ในตาราง customer ที่มี null
ลองดูว่าเจอค่า null ทั้งหมดกี่แถว
customer[customer["age"].isnull()]
เจอค่า null ทั้งหมด 151 แถว
ต่อไปเราจะมาจัดการค่า null กัน โดยจะแทนที่ null ด้วยค่า mean ของคอลัมน์ age ด้วยการ import numpy ซึ่งเป็น library ที่ใช้ในการคำนวณทางคณิตศาสตร์ใน python เพื่อหาค่า mean ในคอลัมน์ age
import numpy as np
mean_age = np.mean(customer.age)
mean_age
จากนั้นทำการแทน null ด้วยค่า mean ที่ได้ ด้วยคำสั่ง fillna และลองดูว่ายังพบ null อยู่ไหม
customer["age"].fillna(mean_age.round(1), inplace = True)
customer[customer["age"].isnull()]
จะเห็นว่า null ได้ถูกแทนที่แล้วเรียบร้อยแล้ว เย้ๆ
จาก requirement ที่ต้องการรู้ว่าสินค้าไหนขายดีและจำนวนลูกค้าทั้งหมดที่มาซื้อสินค้า จึงต้องทำการ merge หรือ join ตาราง transaction เข้ากับตาราง product และ customer เพื่อรวมเป็นตารางเดียวสำหรับทำการวิเคราะห์
final = transaction.merge(product, how = "left", left_on = "product_id", right_on = "product_id") \
.merge(customer, how = "left", left_on = "customer_id", right_on = "customer_id")
final
หลังจาก merge ตารางทั้งหมดเข้าด้วยกันพบว่า ยังไม่มียอดขายรวมของแต่ละ transaction ซึ่งหากสังเกตดูแต่ละ transaction จะมีแค่ quantity (จำนวนที่ซื้อ) กับ unit_price (ราคาสินค้าต่อชิ้น) จึงต้องทำการเพิ่มคอลัมน์ที่มาจาก คอลัมน์ quantity คูณกับ คอลัมน์ unit_price
แต่เนื่องจากคอลัมน์ unit_price เป็น string เพราะมีเครื่องหมาย $ อยู่ ต้องเอาออกและแปลงให้เป็น float ก่อนทำการคูณกับคอลัมน์ quantity
final["unit_price"] = final.apply(lambda x: x["unit_price"].replace("$",""), axis = 1)
final
จะเห็นว่าเครื่องหมาย $ หายไปแล้ว ต่อไปลองเช็ค type ข้อมูลของแต่ละคอลัมน์ดู
final.dtypes
พบว่าคอลัมน์ unit_price เป็นข้อมูลประเภท string อยู่ เราต้องทำการแปลงเป็น float เพื่อใช้ในการคำนวณ จากนั้นทำการเช็คอีกครั้ง
final["unit_price"] = final["unit_price"].astype(float)
final.dtypes
จะเห็นว่าคอลัมน์ unit_price เป็น float เรียบร้อยแล้ว ก็จะสามารถสร้างคอลัมน์ใหม่ตั้งชื่อว่า total_amount ที่เป็นการนำคอลัมน์ quantity (เป็น int) มาคูณกับคอลัมน์ unit_price ได้แล้ว
final["total_amount"] = final["quantity"] * final["unit_price"]
final
ก็จะได้คอลัมน์ total_amount ออกมาซึ่งเป็นคอลัมน์ที่แสดงยอดรวมของแต่ละ transaction
สรุปในส่วนแรก เราได้มีการ clean ข้อมูลแบบเบื้องต้น และมีการ merge หรือ join ข้อมูลเข้าด้วยกัน เพื่อเตรียมนำไปวิเคราะห์ต่อไป
ส่วนที่ 2 (จำลองว่าข้อมูลอยู่ใน database)
เราจะจำลองว่าข้อมูลทั้งหมด 3 ตาราง (ที่ดาวน์โหลดมาจาก kaggle) ได้แก่ ตาราง customer, ตาราง product และ ตาราง transaction อยู่ใน MySQL Database
โดยจะเก็บไว้ในเว็บ https://www.freesqldatabase.com/ ซึ่งเป็นเว็บที่ให้เก็บ database ฟรี โดยเว็บจะส่ง connection ของ database ให้ใน e-mail
ทำการ import ข้อมูลทั้งหมดเข้าไปใน MySQL database
หลังจาก import ข้อมูลแล้ว เราจะมาลองเชื่อมต่อกับ database ผ่าน colab เพื่อดูว่า database สามารถใช้ได้ไหม โดย
ติดตั้ง pymysql สำหรับเชื่อมต่อ MySQL database
!pip install pymysql
สร้าง class config สำหรับเชื่อมต่อ MySQL database โดยใช้ข้อมูล connection ที่ได้มาใน e-mail
class Config:
MYSQL_HOST = "ใส่ข้อมูล host"
MYSQL_PORT = 3306
MYSQL_USER = "ใส่ข้อมูล user"
MYSQL_PASSWORD = "ใส่ข้อมูล password"
MYSQL_DB = "ใส่ข้อมูล db"
MYSQL_CHARSET = "utf8mb4"
เชื่อมต่อกับ MySQL database
import pymysql # เรียกใช้ PyMySQL package ที่ install ไว้
# connect to MySQL database ด้วย package PyMySQL
connection = pymysql.connect(host=Config.MYSQL_HOST,
port=Config.MYSQL_PORT,
user=Config.MYSQL_USER,
password=Config.MYSQL_PASSWORD,
db=Config.MYSQL_DB,
charset=Config.MYSQL_CHARSET,
cursorclass=pymysql.cursors.DictCursor)
หลังจากเชื่อมต่อ ก็ลอง list ตารางใน database ดูว่ามีอะไรบ้าง
# list all tables เพื่อดูว่ามี tables อะไรใน database นี้บ้าง
cursor = connection.cursor()
cursor.execute("show tables;")
tables = cursor.fetchall()
cursor.close()
print(tables)
จะได้เป็นชื่อตารางทั้งหมดที่มีใน database ซึ่งมี 3 ตารางเท่ากับที่เรา import เข้าไป แสดงว่าเราเชื่อมต่อเข้า database ได้เรียบร้อยและใช้งานได้
สรุปส่วนที่ 2 เราได้ import ข้อมูลทั้งหมดเข้า database เพื่อจำลองว่าข้อมูลที่เราจะดึงออกมาอยู่ใน database
ส่วนที่ 3 (สร้าง data pipeline)
หลังจากที่มีทุกอย่างพร้อมแล้ว ต่อไปเราก็จะทำให้เป็น Automate ทั้งหมด ด้วยการสร้าง data pipeline ที่ประกอบไปด้วยกิจกรรม ETL ดังนี้
- Extract — ทำการดึงข้อมูลทั้งหมดจาก database ที่สร้างไว้ในส่วนที่ 2 (โดยใช้ hook ใน airflow เพื่อเชื่อมต่อกับ database) เก็บไว้ใน data lake (ในที่นี้จะใช้ Google Cloud Storage ในการเก็บ) เพื่อรอไป transform หรือ cleaning
- Transform — ทำการแปลง, ทำความสะอาด และ join ข้อมูลเข้าด้วยกัน ซึ่งจะนำ code ที่เราเขียนในส่วนที่ 1 มาใช้
- Load — เก็บไฟล์ผลลัพธ์ไว้ใน data lake และ load เข้า data warehouse (จะใช้ Google BigQuery เป็น data warehouse)
กิจกรรมทั้งหมดจะอยู่ใน data pipeline ที่ถูกสร้างด้วย Cloud Composer มีเบื้องหลังการทำงานเป็น Apache Airflow ซึ่งเป็นเครื่องมือสำหรับจัดการ pipeline ให้เป็นระเบียบ
เรามาเริ่มสร้าง data pipeline ด้วย cloud composer กัน
ขั้นแรกทำการ Create Environment ใน composer และติดตั้ง package ทั้งหมดที่จะใช้ได้แก่ pymysql, pandas, และ numpy
หลังจากสร้าง environment เสร็จ composer จะสร้าง bucket ใน google cloud storage ให้อัตโนมัติ
ทำการตั้งค่า bucket ซึ่งใน bucket นี้เราจะเก็บไฟล์ DAG สำหรับ run pipeline ไว้ที่ folder dags และเก็บข้อมูลไว้ที่ folder data
ซึ่งใน folder data เราจะสร้าง folder ที่ชื่อ
- raw สำหรับเก็บข้อมูลดิบที่ดึงมาจาก database
- cleaned สำหรับเก็บข้อมูลที่ผ่านการทำความสะอาดแล้ว
ต่อไป ให้เราเข้าไปที่ BigQuery เพื่อสร้าง dataset สำหรับเตรียม load ข้อมูลเข้า
จากนั้นเข้าไปที่ airflow ผ่าน composer และเข้าไปที่ connection เพื่อใส่ config สำหรับเชื่อมต่อ MySQL database (ใช้ข้อมูลเหมือนกับส่วนที่ 2)
ต่อไปก็ให้นำ code ที่เราเคยทำไว้ในส่วนที่ 1 มาสร้างไฟล์ DAG เพื่อ run ให้ pipeline นี้ทำงานอัตโนมัติ ตั้งชื่อว่า coffee_shop_pipeline.py ซึ่งมีรายละเอียดดังนี้
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.utils.dates import days_ago
import pandas as pd
import numpy as np
MYSQL_CONNECTION = "mysql_default" # ชื่อ connector ใน Airflow ที่ตั้งค่าไว้
# path ทั้งหมดที่จะใช้
customer_raw_output_path = "/home/airflow/gcs/data/raw/customer.csv"
product_raw_output_path = "/home/airflow/gcs/data/raw/product.csv"
transaction_raw_output_path = "/home/airflow/gcs/data/raw/transaction.csv"
customer_cleaned_output_path = "/home/airflow/gcs/data/cleaned/customer.csv"
final_output_path = "/home/airflow/gcs/data/cleaned/final_output.csv"
def get_data_from_database(customer_raw_path, product_raw_path, transaction_raw_path):
# รับ customer_raw_path, roduct_raw_path, transaction_raw_path มาจาก task ที่เรียกใช้
# เรียกใช้ MySqlHook เพื่อต่อไปยัง MySQL จาก connection ที่สร้างไว้ใน Airflow
mysqlserver = MySqlHook(MYSQL_CONNECTION)
# Query จาก database โดยใช้ Hook ที่สร้าง ได้ผลลัพธ์เป็น pandas DataFrame
customer_raw = mysqlserver.get_pandas_df(sql = "SELECT * FROM customer")
product_raw = mysqlserver.get_pandas_df(sql = "SELECT * FROM product")
transaction_raw = mysqlserver.get_pandas_df(sql = "SELECT * FROM transaction")
# Save เป็น csv
customer_raw.to_csv(customer_raw_path, index = False)
print(f"Output to {customer_raw_path}")
product_raw.to_csv(product_raw_path, index = False)
print(f"Output to {product_raw_path}")
transaction_raw.to_csv(transaction_raw_path, index = False)
print(f"Output to {transaction_raw_path}")
def clear_null_in_customer_table (customer_raw_path, customer_cleaned_path):
# อ่านจากไฟล์ สังเกตว่าใช้ path จากที่รับ parameter มา
customer = pd.read_csv(customer_raw_path)
# แทนที่ค่า null ด้วยค่าเฉลี่ย
mean_age = np.mean(customer.age)
customer["age"].fillna(mean_age.round(1), inplace = True)
# Save เป็น csv ไฟล์ไปที่ customer_cleaned_path ("/home/airflow/gcs/data/cleaned/customer.csv")
customer.to_csv(customer_cleaned_path, index = False)
print(f"Output to {customer_cleaned_path}")
def merge_data(customer_cleaned_path, product_raw_path, transaction_raw_path, final_path):
# อ่านจากไฟล์ สังเกตว่าใช้ path จากที่รับ parameter มา
customer = pd.read_csv(customer_cleaned_path)
product = pd.read_csv(product_raw_path)
transaction = pd.read_csv(transaction_raw_path)
# แปลง unit_price ใน product โดยเอาเครื่องหมาย $ ออก และแปลงให้เป็น float
product["unit_price"] = product.apply(lambda x: x["unit_price"].replace("$",""), axis = 1)
product["unit_price"] = product["unit_price"].astype(float)
# merge 3 DataFrame
final = transaction.merge(product, how = "left", left_on = "product_id", right_on = "product_id").merge(customer, how = "left", left_on = "customer_id", right_on = "customer_id")
# เพิ่ม column total_amount
final["total_amount"] = final["quantity"] * final["unit_price"]
# Save เป็น csv ไฟล์ไปที่ final_path ("/home/airflow/gcs/data/cleaned/final_output.csv")
final.to_csv(final_path, index = False)
print(f"Output to {final_path}")
with DAG(
"coffee_shop_transaction_to_bq",
start_date = days_ago(1),
schedule_interval = "@once",
tags = ["coffee_shop_transaction"]
) as dag:
t1 = PythonOperator (
task_id = "get_data_from_database",
python_callable = get_data_from_database,
op_kwargs = {
"customer_raw_path" : customer_raw_output_path,
"product_raw_path" : product_raw_output_path,
"transaction_raw_path" : transaction_raw_output_path
}
)
t2 = PythonOperator (
task_id = "clear_null_in_customer_table",
python_callable = clear_null_in_customer_table,
op_kwargs = {
"customer_raw_path" : customer_raw_output_path,
"customer_cleaned_path" : customer_cleaned_output_path
}
)
t3 = PythonOperator (
task_id = "merge_data",
python_callable = merge_data,
op_kwargs = {
"customer_cleaned_path" : customer_cleaned_output_path,
"product_raw_path" : product_raw_output_path,
"transaction_raw_path" : transaction_raw_output_path,
"final_path" : final_output_path
}
)
t4 = BashOperator (
task_id = "load_to_bq",
bash_command = "bq load \
--source_format=CSV \
--autodetect \
coffee_shop.coffee_shop_transaction \
gs://us-central1-project1-5439c9c3-bucket/data/cleaned/final_output.csv"
)
#กำหนด dependencies ให้แต่ละ tasks
t1 >> t2 >> t3 >> t4
จากนั้นให้อัพโหลดไฟล์ DAG นี้ไปไว้ที่ folder dags
เข้าไปที่ airflow จะเห็นว่ามี pipeline ชื่อ coffee_shop_transaction_to_bq เข้ามาแล้ว
ลองเข้าไปดูการทำงานของ pipeline นี้ โดยกดเข้าไปที่ชื่อ pipeline จากนั้นดูที่ Graph
จะเห็นว่า pipeline นี้ run ผ่านเรียบร้อยแล้ว
งั้นเรามาลองดูกันว่าข้อมูลเข้าไปที่ BigQuery แล้วหรือยัง
จะเห็นว่ามีข้อมูลเข้ามาเรียบร้อยแล้ว เย้ๆ
จากนี้เราก็สามารถนำข้อมูลนี้ไปทำ Dashboard/Report ต่อได้
สรุปส่วนที่ 3 เราได้ทำการสร้าง data pipeline ที่ช่วยให้ข้อมูลไหลมาจากแหล่งข้อมูล (ในที่นี้คือ database) ไปยังจุดหมาย (ในที่นี้คือ data warehouse ที่เป็น BigQuery) โดยระหว่างทางก็จะมีการทำความสะอาดให้ข้อมูลมีคุณภาพ ก่อนที่จะนำไปใช้ในการวิเคราะห์ หรือสร้าง model
ส่วนที่ 4 (สร้าง Dashboard)
หลังจากที่มีข้อมูลพร้อมแล้ว คราวนี้เราจะมาสร้าง dashboard ด้วย Looker Studio กัน ซึ่งเป็นบริการฟรีของ google สำหรับทำ data visualization
ขั้นแรกเราต้องทำการสร้าง view ใน BigQuery เพื่อนำข้อมูลบางส่วนไปใช้ทำ dashboard ซึ่งสาเหตุที่ต้องสร้าง view เนื่องจากข้อมูลบางคอลัมน์เราอาจไม่ได้อยากให้คนอื่นเห็น เช่น ที่อยู่, เบอร์โทรของลูกค้า เป็นต้น
เมื่อได้ view มาแล้ว ให้เข้าไปที่ Looker Studio แล้วกด Create >> Data Source >> BigQuery >> เลือก view ที่สร้างไว้
ข้อมูลใน view ที่สร้างไว้ก็จะมาอยู่ Looker Studio เรียบร้อยแล้ว
ทำการสร้าง dashboard ตาม requirement ที่ได้รับ
เสร็จแล้วหน้าตา Dashboard ที่เราสร้าง
เข้าไปลองกดเล่นได้ที่นี่
https://lookerstudio.google.com/u/0/reporting/561f032b-705d-4779-8739-06bf31ef4533/page/vSalD
สามารถนำ dashboard นี้ไปเสนอผู้บริหารได้เลย เย้ๆ
สรุปส่วนที่ 4 เราได้ทำการสร้าง view ใน BigQuery และดึง view นั้นมาที่ Looker Studio เพื่อสร้าง dashboard ตาม requirement ที่ได้รับ
จบกันไปแล้วนะครับสำหรับการทำ Data Engineering Project ด้วย GCP ซึ่ง project นี้และบทความนี้เป็นบทความแรกของผม หากมีส่วนใดผิดพลาด ก็ ขออภัยมา ณ ที่นี้ด้วย
ขอบคุณที่อ่านจนจบครับ :)