แชร์ประสบการณ์ Microservice ตอน Combine data [SQL, NoSQL]

Adulwit Chinapas
HiveGround
Published in
3 min readJan 29, 2024

เมื่อข้อมูลใน Database มีข้อมูลซ้ำกันมากเกินไป และหลายๆครั้งข้อมูลไม่สอดคล้องกัน จึงทำให้เกิดแนวคิด

Single source of truth (SSOT) architecture

หรือ แปลเป็นไทยง่ายๆว่า “วิธีการจัดการข้อมูลให้อยู่แค่ที่เดียว โดยต้องน่าเชื่อถือและมีอำนาจสูงสุด เพื่อให้มั่นใจในความสอดคล้องและความถูกต้องของข้อมูลแหล่งเดียวในระบบ” (ง่ายมั้ยนะ 😅)

ยกตัวอย่างเช่น

อุปกรณ์ตัวนี้ราคาเท่าไหร่?

วิธีคิดง่ายๆ -> DB อะไหล่ + DB แผงวงจร + DB ค่าแรง = ราคาอุปกรณ์ตัวนี้

(*** DB = Database)

แต่เอ๊ะ ราคา ใน DB อะไหล่ ตัวนี้ มันอยู่ใน DB แผงวงจร ด้วย

และที่สำคัญ ราคามันไม่เท่ากัน 😱

ความ Shift หายก็เลยบังเกิด สรุป มันต้องใช้ราคาตัวไหนกันแน่

ดังนั้น ภาระเลยมาตกที่ Dev

“เรามารื้อระบบให้เป็น Microservice กันเถอะ โดยเริ่มจากทำ Single source of truth (SSOT) ก่อนเลย”

สมมุติว่าเรามี แอปพลิเคชันอยู่ 3 ตัว ที่ใช้ DB เหล่านี้

App1 = Feathers.js API, MongoDB

App2 = Sails.js API, MySQL

App3 = Django python API, PostgreSQL

โดยในแต่ละ App ก็มีหน้าเว็บของตัวเอง API ของตัวเอง DB ของตัวเอง

แต่จะมีบาง service นิดหน่อยที่อยากยืม Table มาใช้ใน App ของตัวเอง

นี้แหละคือ จังหวะที่มี Data ซ้ำๆกันเกิดขึ้น

ยกตัวอย่างเช่น

App1 อยากได้ data ของ App2

เราก็จะมี Integration service ที่คอยดึงข้อมูลจาก DB2 ไปลง DB1

และถ้า Integration service ตายละ ไม่เป็นไร เราก็แค่สั่ง Sync ใหม่ได้

ว้าว 🤩 ก็ดูเป็นเทคโนโลยีที่ตอบโจทย์ดีนะ

แต่ก็ด้วย Data ที่มันเยอะขึ้น ไม่ได้มีคนมาสั่ง sync ตลอด หรือ ต้องใช้ความเข้าใจในการกด sync ทำให้บางครั้ง ไม่สามารถไล่ย้อนกลับได้ว่า เอ๊ะ ข้อมูลนี้มัน sync ไปยังนะ หรือ ทำไมตัวเลขมันไม่เท่ากัน

SSOT คือ คำตอบ

มาเริ่มทำกันเลย

  1. ทำความเข้าใจ

ขั้นตอนนี้น่าจะเป็นสิ่งที่ยากที่สุด 🧐 เพราะเราต้องการคนที่ เข้าใจ ทั้ง 3 App (ซึ่งไม่มี 🫢 อุ๊ฟ)

โครงสร้างที่ออกแบบไว้ Service ของ API แต่ละเส้น ว่ามันทำหน้าที่อะไร เก็บข้อมูลอย่างไร

ดังนั้นในช่วงนี้ คือ คุยกันเยอะมาก จดเยอะมาก และทบทวนเยอะมากกกกก

2. แกะ Database

จากโครงสร้าง Database ที่มีอยู่ของทั้ง 3 App

field ไหนที่เหมือนกัน

field ไหนที่กลายเป็นขยะไปแล้ว

หรือ field ไหนที่จะเป็นจุดเชื่อมได้บ้าง

ก็ดูไม่ค่อยยาก แต่ก็เยอะพอสมควร

3. ออกแบบ Microservice ใหม่

เปรียบเทียบแต่ละเทคโนโลยี

ออกแบบ database ใช้ Database อะไรดี

ใช้ API อะไรดี

มี API gateway ดีมั้ย บราๆๆ

ใครที่ชอบ Research ก็จะชอบขั้นตอนนี้

4. Combine data 🎉

มาถึงงานที่ผมได้รับมอบหมายแล้ว 😅

คิดในใจ “ก็แค่อ่านข้อมูลมา ยำรวมกันเสร็จ ก็โยนไปที่ Database ใหม่ปะ”

เริ่ม !!

Table ใหม่ตัวที่ 1

อ่าน Table app 1, app 2, app 3 (งงละ มันจะรวมกันยังไงว้า รวมด้วยชื่อเหรอ รวมด้วย ID เหรอ หรือรวมด้วยอะไรวะเนี้ย 🤬 ไม่เห็นมีอะไรให้เชื่อมกันเลย)

ใจเย็นๆ เปิด Google sheet ขึ้นมา ซูมเข้า ซูมออก ลอง search คำนั้น คำนี้

อ่า ~ ยากจัง

Fix row ให้มันไปเลยละกัน

สรุปก็คือ สร้างไฟล์ CSV ขึ้นมาใหม่เพื่อที่จะ กำหนด จุดเชื่อมโยงของแต่ละ row และของแต่ละ table

ดังนั้น เราต้องมีเทคโนโลยีอะไรซักอย่าง ที่สามารถ:

  • อ่านไฟล์ CSV
  • อ่าน Database ได้ทั้ง 3 ชนิด
  • มีการแสดงผลตอนรวมข้อมูลกัน หรือตัดบางส่วนตามเงื่อนไข

พระเอกของเราก็คือ Jupyter notebook (python) นั้นเอง 😎

ชีวิตช่างง่ายอะไรเช่นนี้ แทบจะจบด้วย Pandas 🐼 ตัวเดียว

อ่าน mongo ก็ pymongo

import pandas as pd
import pymongo
app1_client = pymongo.MongoClient(app1_url)
app1_db = app1_client['app1_db_name']
app1_collection1 = app1_db['collection1']
app1_data = list(app1_collection1.find())
app1_df = pd.DataFrame(app1_data)
app1_df

อ่าน MySQL ก็ SQLAlchemy

from sqlalchemy import create_engine
app2_db_connection = create_engine(app2_url)
app2_df = pd.read_sql('SELECT * FROM app2_table', con=app2_db_connection)
app2_df

อ่าน Postgresql ก็ psycopg2

import psycopg2
app3_connection = psycopg2.connect(host=host, database=database, user=user,password=password)
app3_df = pd.read_sql_query('SELECT * FROM your_table', app3_connection)
app3_connection.close()
app3_df

หลังจากที่ได้ dataframe ของแต่ละ table มา ก็จะทำการ Merge ข้อมูล ซึ่งเราสามารถใช้ function mapper2df เพื่อเปรียบเทียบ 2 table และหาจุดร่วมกันได้

from difflib import SequenceMatcher
import pandas as pd

def similar(a, b):
if pd.isna(a) or pd.isna(b):
return 0
return SequenceMatcher(None, a, b).ratio()

def mapper2df(
df1, df2, key_map_1, key_map_2, threshold=0.8, key_map_first=True):
mapped_data = []
for idx1, row1 in df1.iterrows():
matched = False
for idx2, row2 in df2.iterrows():
if similar(row1[key_map_1], row2[key_map_2]) >= threshold:
mapped_data.append(row1.tolist() + row2.tolist())
matched = True
break
if not matched:
mapped_data.append(row1.tolist() + [None] * len(df2.columns))

for idx2, row2 in df2.iterrows():
matched = False
for idx1, row1 in df1.iterrows():
if similar(row1[key_map_1], row2[key_map_2]) >= threshold:
matched = True
break
if not matched:
mapped_data.append([None] * len(df1.columns) + row2.tolist())

columns = df1.columns.tolist() + df2.columns.tolist()
mapped_df = pd.DataFrame(mapped_data, columns=columns)
if key_map_first:
listcolumn = mapped_df.columns.tolist()
listcolumn.remove(key_map_1)
listcolumn.remove(key_map_2)
listcolumn_new = [key_map_1, key_map_2] + listcolumn
mapped_df = mapped_df[listcolumn_new]
return mapped_df

วิธีใช้ เช่น

df1 = pd.DataFrame({'app1_df_name': ['Adulwit', 'Chinapas', 'Adul'],
'app1_df_age': ['28', '29', '30']})
df2 = pd.DataFrame({'app2_df_name': ['ad', 'AdulwitC', 'AC'],
'app2_df_weight': ['50', '55', '60']})
df_mapped = mapper2df(df1, df2, 'app1_df_name', 'app2_df_name', threshold=0.8)

ผลลัพธ์ที่ได้ ดังรูปที่ 1

รูปที่ 1 ผลลัพธ์หลังจาก merge ข้อมูล

จากรูปที่ 1 พบว่าระหว่างชื่อ df1 กับ ชื่อ df2 หากชื่อไหนมีความคล้ายกันมากกว่า 80% ก็จะนำมาจับคู่กัน เช่น Adulwit กับ AdulwitC ส่วนชื่อไหนที่คล้ายกันน้อยกว่า 80% ก็จะยังเก็บค่าเดิมไว้

หากต้องการชื่อที่มีความเหมือนกันเป๊ะๆก็เปลี่ยน threshold=1 ก็คือเหมือนกัน 100%

จากนั้นก็ทำการ Clean data อะไรที่เป็นขยะ ก็ลบ row นั้นทิ้ง และก็ทำการ filter column ให้ตรงกับ table ใหม่ ก่อนที่จะเอาเข้า database เช่น

rename_mapper = {
'app1_df_name':'name',
'app1_df_age':'age',
'app2_df_weight':'weight',
}
df_mapped = df_mapped.rename(columns=rename_mapper)
df_mapped_filter = df_mapped[['name', 'age', 'weight']]

วิธีการเอาเข้า database ก็จะมีการลบก่อนทุกครั้ง ซึ่งในบางครั้งหาก table นั้นมีการผูก FOREIGN_KEY ก็ต้องปลดล็อกก่อนและค่อยนำเข้า เช่น

new_app_db_connection = create_engine(new_app_url)
tb_name = 'new_table'
with new_app_db_connection.connect() as conn:
conn.execute(text('SET FOREIGN_KEY_CHECKS=0;'))
result = conn.execute(text(f'DELETE FROM {tb_name}'))
conn.execute(text('SET FOREIGN_KEY_CHECKS=1;'))
conn.commit()
df_mapped_filter.to_sql(tb_name, con=new_app_db_connection, if_exists='append', index=False)
df_mapped_read = pd.read_sql(f'SELECT * FROM {tb_name}', con=new_app_db_connection)
df_mapped_read

ก็จะเป็นอันเสร็จสิ้นในการ Combine Data 🎉

อ่านดูเหมือนง่าย แต่ในการปฏิบัติจริงยังมีเรื่องของความเข้าใจ และสิ่งที่ใช้และไม่ได้ใช้อยู่อีกเยอะมาก ในบางครั้งเราต้อง hard code เพื่อลบ row ที่มีการเข้า table ไปแบบผิดๆ

หรือบางครั้ง คนออกแบบ มีการปรับ structure ใหม่ ก็ต้องมาปรับตัว combine data ใหม่

นี้ยังไม่ได้พูดถึงเรื่อง ทั้ง 3 app มีการมาเรียก service ใหม่ตัวนี้ เช่น จาก ObjectID ใน mongo เปลี่ยนมาเป็น uuid ใน mysql ว้าว งานเยอะฝุดๆ

หวังว่าบทความนี้คงเป็นประโยชน์กับเพื่อนๆผู้อ่านไม่มากก็น้อย

หากใครมีประสบการ์ที่อยากแชร์ หรืออยากรู้เรื่องราวไหนเพิ่มเติม ก็สามารถ Comment มาในนี้ได้เลยครับผม 😁

--

--