มาใช้ go routine แยกไปทำงาน แต่ต้องการ rollback เมื่อมีตัวใดตัวหนึ่ง error

มีโจทย์มาว่า เนื่องจากมีงานๆหนึ่ง ต้องทำหลาย task ในคราวเดียว เช่น อาจจะต้องการยิง api ไปที่โน่นที่นี่ ซึ่งแต่ละที่อาจจะใช้เวลาไม่มาก แต่ถ้าเราต้องรอให้เสร็จทีละตัว รวมๆกันอาจจะกินเวลานานจนระบบหน้าบ้าน timeout หนีไปเสียก่อน
ยังไม่พอ หากแต่ละ service ที่ยิงไป ถ้าหากว่ามีตัวใดตัวหนึ่ง error ก็อยากจะ rollback งานทั้งหมดที่ทำเสร็จไปแล้วด้วยอีกต่างหาก

ในเมื่อมีโจทย์มาแบบนี้ เราก็ต้องหาวิธีแก้ปัญหาซึ่งถ้าหากใครสนใจสามารถไปดู source code ได้ที่นี่ https://github.com/golians/rollback

แล้วตัว code มันทำงานอย่างไร มาอธิบายกันเลย


package rollback

func SyncParallel(workers ...Worker) error {
var gerr error
total := len(workers)
chErr := make(chan error)
chDone := make(chan struct{})
chFinish := make(chan struct{})
chRollback := make(chan struct{})

defer func() {
for i := 0; i < total; i++ {
<-chDone
}
}()

for i := range workers {
go AsyncHandler(chErr, chDone, chFinish, chRollback, workers[i])
}

for i := 0; i < total; i++ {
if err := <-chErr; err != nil {
gerr = err
}
}

if gerr != nil {
close(chRollback)
return gerr
}

close(chFinish)
return nil
}

ใน code นี้ต้องการ Worker ที่มีหน้าตาแบบนี้

type Worker interface {
Do() error
Rollback()
}

ใน SyncParallel คาดหวังว่าจะรับ Worker มาได้หลายๆตัว โดยแต่ละตัวจะถูกแยกไปทำงานพร้อมๆกันด้วย go routine
ในนี้ใช้ chan 4 ตัวตามนี้

  1. chErr ใช้เมื่อมี error เกิดขึ้นกับ Worker ตัวไหนก็ตาม จะส่ง error กลับมาใน chan นี้
  2. chDone ใช้สำหรับให้แต่ละ Worker ส่งสัญญาณออกไปเมื่อทุกสิ่งทุกอย่างจบลง
  3. chFinish ใช้สำหรับส่งสัญญาณไปบอก Worker ทุกๆตัวว่าไม่มี Worker ตัวไหน error เลย ให้จบการทำงานได้
  4. chRollback ใช้สำหรับส่งสัญญาณไปบอก Worker ตัวที่ทำงานตัวเองเสร็จแล้วว่า มี Worker ตัวอื่นเกิด error ให้ทำการ rollback ตัวเองซะ

defer ในนี้ จะรอรับ chDone จาก Worker ทุกๆตัวกลับมา เพื่อให้มั่นใจว่างานเสร็จทั้งหมดแล้วจริงๆ ไม่มีตกค้าง แล้วค่อยจบการทำงานได้

ใน for นี้คือการวนลูปเพื่อปล่อย go routine ไปทำงานกับ Worker แต่ละตัว
จากนั้นจะมารอ chErr กลับมาจาก Worker ให้ครบทุกตัว โดย Worker ตัวไหนที่เกิด error มันจะจบการทำงานของตัวเองไปเลย ส่วนตัวไหนที่ไม่เกิด error มันจะรอสัญญาณ อย่างใดอย่างหนึ่ง ระหว่าง chFinish หรือ chRollback ถ้า finish ไปมันก็จะจบการทำงานแล้วส่ง chDone ออกมา แต่ถ้าได้รับ chRollback มันจะทำการ rollback

ตรงนี้แก้ไขตามคำแนะนำของ ปรมาจารย์ป้อ ที่ได้แนะนำว่าไม่ต้องวนลูป แค่ close(chRollback) ก็พอ ทำให้ตาสว่างวาบเลย หน้าตาดีขึ้น +50%
ตรงนี้ก็แก้ไม่ต้องมีลูปละ แค่ close(chFinish) เป็นอันจบ Worker ทุกตัวรับสัญญาณเหมือนกัน

ถึงตรงนี้ถ้าไม่อธิบาย AsyncHandler ก็คงจะไม่ได้ มาต่อกันเลยครับ

package rollback

type Worker interface {
Do() error
Rollback()
}

func AsyncHandler(cherr chan error, chDone, chFinish, chRollback chan struct{}, w Worker) {
defer func() {
chDone <- struct{}{}
}()

err := w.Do()
if err != nil {
cherr <- err
return
}

cherr <- nil

select {
case <-chFinish:
return
case <-chRollback:
w.Rollback()
return
}
}

เข้ามาใน function นี้ก็เริ่มด้วยการ defer ก่อน ว่าไม่ว่าจะจบท่าไหนก็ตาม จะต้องส่ง chDone ออกไปเพื่อบอก SyncParallel ว่า Worker ตัวนี้จบการทำงานแล้วจ้า

จากนั้นก็เรียก Worker.Do แล้วรับ error มาซะ ตรงนี้สำคัญ เพราะต้องมาเช็คก่อนว่ามี error กลับมาหรือไม่ ถ้ามีก็ต้องส่ง err กลับออกไปทาง chError แล้วจบการทำงานตัวเองซะ

แต่ถ้าไม่ error ก็ต้องทำบรรทัดต่อไปเลย ด้วยการจงใจส่ง nil เข้าไปใน chError เพราะว่า SyncParallel รอคำตอบอยู่ ถ้าไม่จงใจส่งออกไป มันจะเฝ้ารออยู่ตลาดกาล

จากนั้นก็มารอสัญญาณตัวใดตัวหนึ่งระหว่าง chFinish หรือ chRollback
ถ้าได้ chFinish ก็จบ แสดงว่าทุกๆ Worker ทำงานสำเร็จ
แต่ถ้าได้รับ chRollback ก็เรียก Rollback ของตัวเองซะ แล้วก็จบการทำงานได้

จบแล้วครับ

Show your support

Clapping shows how much you appreciated Pallat Anchaleechamaikorn’s story.