Go Worker Pools With Timeout
ได้รับมอบหมายให้เขียน API ที่ต้องไปยิงเอาข้อมูลจากชาวบ้านหลายๆ request มาประกอบกันให้ได้ข้อมูลชุดนึง แต่ว่าจะทำไงดีนะ…
เราคงไม่อยากยิงทีละ request แล้วรอให้แต่ละ request แต่ละตัวเสร็จก่อนถึงจะยิงตัวถัดไป จึงจะใช้ goroutine ยิงแบบ concurrent ไปทีละหลายๆตัว ช่วยลดเวลาทำงาน จุดเด่นของ Go ของอยู่แล้ว….แต่เขาก็ห่วงว่าเดี๋ยวจะ cpu peak นะเพราะ service อื่น เขาก็ต้องกินต้องใช้เหมือนกัน (Go มันจัดการเรื่องพวกนี้ให้อยู่แล้ว goroutine กิน cpu นิดเดียว) และทางนั้นก็อาจจะรับไม่ไหวถ้าเรายิงทีละมาก….. ค่อย ๆ ยิงได้ไหม แล้วก็มี Timeout ด้วยนะ ยิงได้เท่าไหร่เอาเท่านั้น
งั้นเรามาแบ่งให้มันค่อย ๆยิงไปดีกว่า นึกถึงกลุ่มของเด็กเสิร์ฟที่ยืนต่อแถวกัน 5 คน
ซึ่งเมื่อมีงานแต่ละงานเข้ามา ก็จะถูกแจกให้เด็กเสิร์ฟแต่ละคน ถ้าใครมีออร์เดอร์ก็ต้องลุกออกไปเสิร์ฟ คนที่เหลือก็นั่งรอ และเมื่อทำเสร็จก็กลับมาที่แถวแล้วรอ ออร์เดอร์ใหม่เข้ามา ซึ่งจะเห็นว่าไม่ว่าออร์เดอร์จะมามากขนาดไหน ก็จะมีคนทำตามออร์เดอร์มากที่สุดก็แค่ 5 คน งานที่เข้ามาก็ต้องรอจนกว่าจะมีคนรับงานไปทำ
และมันมี Concept แบบนี้ภาษา Go ที่เรียกว่า Worker Pools (เหมือน Thread pool ในภาษาอื่น)
ให้นึกภาพของคนงานจำนวนหนึ่ง…คนงานแต่ละคนก็ “รอ” ดึงงานออกไป ..และ “ทำ” จน “เสร็จ” แล้ว “รอ” งานใหม่จนกว่า “งานหมด” (เพิ่มเติม….หรือ “หมดเวลา”)
Ref. Code ตั้งต้นมาจากตรงนี้นะ แต่เราจะปรับมันหน่อยละกันเพราะเราใช้ Timeout จึงรอ Job เสร็จหมดไม่ได้ (ไม่หน่อยเหมือนรื้อใหม่ ฮา..)
อันนี้เป็นโปรเจคตัวอย่างไว้ลองนะครับ และไม่ใช่ best practice เป็นเพียงตัวอย่างของการทำ Worker Pools With Timeout
เรามาค่อย ๆไล่กันไปทีละส่วน เพราะมีหลาย concept ของ Go ที่ใช้แต่จะไม่ลงลึกมาก
ใน Go มีสิ่งที่เรียกว่า Channels คือ แบรนด์ที่มีกระเป๋าจ๊าบมาก… (ไม่ฮา =.=)
Channels
Channels เปรียบเสมือนสายท่อที่เอาไว้คุยกันในระดับของ goroutine เพราะเราเขียนแบบ Concurrency คือทำใครทำมัน แต่ก็ยังไม่ใช่ parallelism นะ
การสร้าง channel type int แบบ unbuffered . demo
c := make(chan int)
สร้าง channel type int แบบ buffered. demo
c := make(chan int, 2)
แล้วบอกทำไม…นั่นสิ โดยปกติแล้ว unbuffered channel เนี่ยเราส่งของเข้าไปได้ “เรื่อยๆ” ชื่อก็บอกแล้ว unbufferd ส่วนตัว buffered คือเมื่อถึงจำนวนนึงที่ buffer เต็มมันก็จะ block แล้ว“รอจนกว่า buffer จะว่างอีกครั้ง”
จะเห็นว่าจะมี code ส่วนนี้อยู่คือวน loop ใน jobs จนกว่าจะหมด
เจ้าตัว m.jobs เนี่ยเป็น buffered channel ขนาดเท่ากับ numRoutines
ถ้ามันขนาดเท่ากับ 2 ก็จะได้รับได้เพียง 2 และ“รอ”จนกว่า buffer จะว่างอีกครั้ง แล้วมันจะว่างตอนไหน?
code ส่วนนี้คือการ range ของที่อยู่ใน m.jobs ตอนนี้แหละคือการ pop ของออกมาจาก channel และจะมีพื้นที่ว่างใน channel m.jobs
แล้วเราจะให้มันรอ jobs ทำงานหมดยังไงนะ เพราะมัน concurrent (~async) กัน ถ้าไม่มีตรงไหนบอกให้รอมันคงทำงานแล้วก็จบเลย เหมือนเขียน async func โดยไม่มี await.
WaitGroup
ว่าไปแล้วมันก็ตรงตัวมันเลย มันจะรอ x ครั้งจนกว่าจะครบแล้วทำคำสั่งถัดไปหลังจากนั้น มองเป็นตัว counter ก็ได้ง่ายดี
เอ๋… ยังไงนะ…
ให้ลองนึกถึงเรายิง async ออกไป x ครั้ง แล้วรอจนกว่าแต่ละตัวทำงานเสร็จ x ตัว
อย่าเพิ่งทำคำสั่งถัดไปนะให้มันทำงานให้หมดก่อน หรือถ้าไม่มีคำสั่งถัดไปโปรแกรมก็จะปิดตัวเองลง… ให้รอก่อนนน
ถ้าเรามี numRoutines = 3 ก็จะ wg.Add(1) สามครั้งเท่ากับ counter = 3
เรา pass WaitGroup ด้วยนะ เสร็จแล้วสั่ง wg.Done() ให้ด้วย คือของใน m.jobs หมดนั่นแหละแล้วทำ wg.Done() ก็เหมือนรอให้งานเสร็จหมดก่อน จากนั้นคนงานก็บอกงานหมดแล้วนี่นา กลับแล้วน้าาา ตอกบัตร พอครบจำนวนคนก็ปิดประตูโรงงาน
แล้วอย่าลืมละ channel ใช้แล้วปิดด้วยฟังก์ชัน close(channel) จะใช้ defer เมื่อการทำงานของฟังก์ชันนี้เสร็จก็ให้ปิด channel
…แล้วจะทำให้มันตัดตอน Timeout ยังไงนะ เดี๋ยวเราใช้ Select
Select
ก็คล้ายกับตัว switch case เพิ่มเติมคือเลือก channel ถ้า channel ไหนมีค่าก็จะเข้า case นั้น…
ใน Code นี่คือ channel m.done หรือ timeOut ก่อนให้ return ออกไปเลย….
m.done นี่มีค่าตอนไหนนะ ก็ตอนที่ทำงานแต่ละ job เสร็จแล้วมีค่าเข้ามาใน m.resultCounter เท่ากับ m.NumJobs ก็คือ concept ของการรอนั่นแหละ…
หรือ timeOut มีค่านะ …. ทำไม select timeOut ได้ละ
ก็เพราะ time.After() return เป็น channel ไงละเมื่อถึงที่เวลาก็จะส่งของมาใน channel นี้
Composite Type
จริง ๆมองไปแล้วมันก็เหมือนเราสร้าง type จาก type ที่มีอยู่แล้ว…ถ้าเราต้องการที่จะ pass function ใดๆ เข้าไปเป็น parameter ใน function …. ตัวมันเองก็ต้องมี type ที่ตรงกับที่ function นั้นต้องการ
func (m *Pool) Start(resources []interface{}, timeOutSec int64, procFunc ProcessorFunc)
จะเห็นได้ว่า function Start เนี่ยมันรับ type ProcessorFunc
ซึ่งมันก็ถูกประกาศไว้ว่าสิ่งที่จะเข้ามาผ่านตัวนี้เนี่ยต้องมีหน้าตาแบบนี้นะ
type ProcessorFunc func(resource interface{})
ก็คือฟังก์ชันอะไรก็ได้ที่รับ parameter เป็น type interface{}
ลองเล่นกัน
ตัวโปรเจคจะประกอบได้ด้วย 3 ส่วน
1.caller คือตัวที่ ที่ส่ง request ไปหาคนอื่นๆ
2.receiver คือตัวที่ไว้รับ request แล้วสั่ง sleep ตามเวลา
3.workerpool คือส่วนที่จัดการ worker อยู่
สั่ง go run caller/caller.go และ go run receiver/receiver.go
caller port:8083
receiver port:8084
เดี๋ยวยิงเข้า caller ให้มันไปยิง reciever ต่อโดยมี worker=2 ตัว
timeout = 20 sec
success case
จะเห็นว่าเริ่มต้นเลยมัน add job เข้าไปเท่ากับจำนวน worker
แล้วจะเป็นจังหวะ ADD -> RUN -> DONE แต่ก็ไม่เสมอไปหรอกนะเพราะ RUN แล้วจังหวะที่ทำงานอยู่ตัว job ก็ถูก pop ออกไปแล้ว Queue ก็ว่างก็ถูก ADD JOB เข้ามาก่อนที่จะ DONE ก็เป็นด้ายยย Welcome to Concurrent Programming
curl “http://0.0.0.0:8083?worker=2&&timeout=5
success with timeout
จะเห็นว่าหมดเวลาก่อนที่ทุกอย่างจะเสร็จ
COMPLETE WITH TIMEOUT
แล้วจากนั้น fmt.Println(“done with: “, len(doneJobs)) ก็ถูก executed
หลังจากนั้นก็คือ routine ที่ค้างอยู่…หลังจากที่ cutoff แล้ว
ทำไงกับมันนะ ….let it go…..
ขอบคุณที่อ่านจบครับ 😂
บทความนี้ความตั้งใจจริงคือรวมของที่หลายคนอาจจะไม่ค่อยได้ใช้ ซึ่งผมเห็นแล้วมันเป็น concept ที่ดีมากส์ของ Go มายำในบทความเดียว หวังว่าจะสนุกกับการเขียน Go นะครับ กราบส์
Thank You for mentor.