[Rx.share()] - Part1 - ทำไมต้อง share ไม่ share ได้ไหม?

Nutron
3 min readJun 26, 2018

--

บทความชุดนี้จะพูดถึง operator share() ใน ReactiveX ว่ามีที่มาที่ไปอย่างไร ทำไมต้องใช้ และจะใช้งานมันได้อย่างไร โดยแบ่งเนื้อหาออกเป็นสองส่วนดังนี้

Share Vs NonShare

ก่อนที่เราจะพูดถึง Observable.share() ว่ามันคืออะไร และทำงานอย่างไร ก็อยากจะทบทวนความรู้เดิมกันก่อนสักหน่อย ว่าการทำงานของ Observable และ Observer นั้น สัมพันธ์กันอย่างไร เราทุกคนทราบกันดีว่า

การ subscribe ของ Observer คือการ “trigger” ให้ Observable ทำงาน

ดังนั้น หาก Observable ถูก subscribe โดย Observer หลายๆตัว Observable นั้นก็จะถูกเรียกให้ทำงานหลายครั้งตามจำนวน Observer ที่เข้ามา subscribe แล้วจะเกิดอะไรขึ้น ถ้างานของ Observable นั้นเป็นงานที่ต้องใช้เวลานานหรือใช้ทรัพยากรของเครื่องมาก อย่างเช่น การเรียก API หรือ การบันทึกรูปลงไฟล์?

แน่นอนว่าในบางครั้ง เราคงไม่อยากให้ Observable ของเราถูกเรียกให้ทำงานทุกครั้งที่มีการ subscribe โดยเราอยากให้เกิดการทำงานแค่ครั้งเดียวแล้วแชร์ผลลัพธ์ที่ได้ไปให้กับ Subscriber ทุกตัว และนั่นจึงเป็นที่มาของObservable.share()

Observable.share() คือ operator ที่ช่วยให้เราสามารถแชร์ output ที่ได้จากการทำงานของ Observableปให้กับ Observer ทุกตัวที่ subscribe มันอยู่ โดยที่ Observable จะถูกเรียกให้ทำงานเพียงแค่ครั้งเดียว และผลลัพธ์ที่ Observer แต่ละตัวได้นั้นจะเป็นข้อมูลชุดเดียวกัน

เพื่อให้เห็นภาพ เราลองมาดูตัวอย่างด้านล่างนี้ประกอบครับ

จากโค้ดด้านบนเราให้ Observable.create() ปล่อยข้อมูลออกมาเป็นตัวเลขของเวลาตามที่ Observable นั้นเริ่มทำงาน โดยที่เราจะ Delay การทำงานไว้ที่ 100 Milliseconds ซึ่งเมื่อเราใช้คำสั่ง share() ในบรรทัดที่ 6 ผลลัพธ์ที่ได้จะเป็นดังนี้

Observable: Proceed at time: 1518421010299    <-- for Both Observer
Observer1: Got result of time: 1518421010299
Observer2: Got result of time: 1518421010299

แต่เมื่อเอาคำสั่ง share()ในบรรทัดที่ 6 ออก ผลลัพธ์ที่ได้จะเป็นดังนี้

Observable: Proceed at time: 1518421088038    <-- for Observer1
Observable: Proceed at time: 1518421088085 <-- for Observer2
Observer1: Got result of time: 1518421088038
Observer2: Got result of time: 1518421088085

จากผลลัพธ์ทั้งสอง จะเห็นว่าคำสั่ง share() จะทำให้ Observable ของเราทำงานแค่ครั้งเดียว และแชร์ข้อมูลที่ได้ออกไปให้กับ Observer ทุกตัวที่ subscribe มันอยู่ แต่ถ้าเราเอาคำสั่ง share() ออก Observable ของเราจะทำงานเท่ากับจำนวนครั้งที่ Observer มาsubscribe

Subscription timing is an obstacle!

อย่างไรก็ตาม ผลลัพธ์ที่ได้จาก share() ก็อาจจะไม่สวยหรูอย่างที่คิด หากเราไม่เข้าใจการทำงานของ share() อย่างแท้จริง ก็อาจทำให้โปรแกรมของเราทำงานผิดพลาดได้ สิ่งหนึ่งที่เราต้องรู้เกี่ยวกับ share() คือ การ subscribe นั้นยังคงมีผลต่อการ trigger ให้ observable ทำงาน โดยช่วงเวลาในการ subscribe ที่แตกต่างกันของแต่ละ Observer อาจทำให้ได้จำนวนของผลลัพธ์แตกต่างกันออกไป จากตัวอย่างด้านบน หากเราลองเอา delay() ออก ผลลัพธ์ที่ได้จะเป็นดังนี้

Observable: Proceed at time: 1518423582362
Observer1: Got result of time: 1518423582362

จะเห็นว่าไม่มี log ของObserver2 เกิดขึ้น เนื่องจากการ subscribe ของ Observer1 จะ trigger ให้ Observable ทำงานทันที ซึ่งเมื่อเราเอา delay() ออก จะทำให้ Observer2 เข้ามา subscribe ไม่ทัน ทำให้ Observer2 ไม่ได้รับ Item ที่ปล่อยออกมานั่นเอง โดยเราสามารถจำลองเหตุการณ์การทำงานได้ดังภาพด้านล่างนี้

โดยเราจะเห็นว่า Observer2 จะไม่ได้รับ Item หมายเลข 1 เนื่องจากช่วงเวลาที่เข้ามา Subscribe นั้น ช้ากว่าช่วงเวลาที่ Observable ปล่อย item หมายเลข 1 ออกมา

Publish & Connect

ถึงตรงนี้หลายคนคงสงสัยว่า ถ้าไม่อยากพลาด Item ใดๆเลย ควรทำอย่างไร ต้องใช้ delay ตลอดอย่างนั้นหรือ? เพราะในโลกของ Rx ยังมี operator อีกหลายตัวที่เข้ามาช่วยแก้ไขปัญหานี้ โดยในที่นี้จะขอพูดถึง publish() และ connect() ที่มักใช้คู่กันนั่นเองครับ

publish() คือ operator ที่ใช้ในการเปลี่ยน Observable ของเราให้กลายเป็น ConnectedObservable ที่ทำหน้าที่เหมือนเป็นแทงค์น้ำที่มีวาว์ลปล่อยน้ำ โดยมีคำสั่ง connect() ทำหน้าที่เป็นผู้เปิดวาล์วน้ำ ซึ่งหากใคร (Observer) อยากได้น้ำก็แค่เอาท่อมาต่อ (subscribe) กับแทงค์น้ำ (ConnectedObservable) ไว้ก่อน และเมื่อวาล์วน้ำถูกเปิด (connect) ทุกคนก็จะได้รับน้ำพร้อมกัน ลองดูตัวอย่างโค้ดด้านล่างครับ

Observable: Proceed at time: 1518424908469
Observer1: Got result of time: 1518424908469
Observer2: Got result of time: 1518424908469

จากโค้ดจะเห็นว่าเราเรียกคำสั่ง connect() หลังจากที่ Observer ทั้งสอง subscribe Observable ซึ่งจะเห็นว่าผลลัพธ์ที่ได้เป็นไปตามที่เราคาดหวัง โดยเราสามารถจำลองเหตุการณ์การทำงานได้ดังภาพด้านล่างนี้

ทำไมต้อง share ไม่ share ได้ไหม?

หลังจากเราได้ทราบถึงการทำงานของ share() กันแล้ว ก็กลับมาตอบคำถามที่ว่า

“ทำไมต้อง share ไม่ share ได้ไหม?” คำตอบนี้ไม่มีอะไรถูกอะไรผิด ขึ้นอยู่กับว่าเรากำลังทำอะไร

ตัวอย่างหนึ่งที่เรามักจะใช้ share() คือแชร์ผลลัพธ์ที่ได้จากการเรียก API ไปให้กับ Observable ที่ทำหน้าที่แตกต่างกัน ทำให้เราไม่ต้องเรียก API เหล่านั้นหลายรอบ ลองดูตัวอย่างด้านล่างครับ

จากตัวอย่าง หากมี Event ใดๆที่เกิดขึ้นจาก EventTriggerObservable (ซึ่งอาจจะเป็นการกดปุ่มหรือการเชื่อมต่อกับอินเทอร์เน็ตสำเร็จ) โปรแกรมจะดึงข้อมูล UserConfig จากการเรียก API จากนั้นจะแชร์ข้อมูลที่ได้ไปให้กับ handleNewUser และ handleMemberUser ซึ่งจะเห็นว่าเกิดการเรียก API แค่เพียงครั้งเดียว แต่ในทางกลับกัน หากเรา subscribe ไปที่ EventTriggerObservable โดยตรง โดยที่ไม่ใช้คำสั่ง share() โปรแกรมของเราจะเรียก API ถึงสองครั้ง ซึ่งจะเห็นว่าเป็นการสิ้นเปลืองโดยใช่เหตุ

Conclusion

คำสั่ง share() และ publish()+connect() ล้วนแล้วแต่มีข้อดีและข้อเสียในตัวมันเอง ไม่มีตัวไหนดีกว่ากัน

เพราะแท้จริงแล้วคำสั่ง share() ก็ถูกสร้างขึ้นมาจากคำสั่ง publish() นั่นเอง

ดังนั้นการที่จะเลือกใช้คำสั่งใด เราจึงต้องเข้าใจพฤติกรรมการทำงานของมันก่อน แล้วดูว่าสิ่งที่เราต้องการนั้น มันสอดคล้องกับพฤติกรรมแบบใด จึงค่อยเลือกมาใช้ให้เหมาะสม

ในบทความหน้าเราจะมาเจาะลึกขึ้นอีกนิดว่า มีคำสั่งไหนบางที่เกี่ยวข้องกับคำสั่ง share() และพฤติกรรมของ ConnectedObservable แท้จริงแล้วเป็นแบบไหน รอติดตามกันได้ในบทความหน้าครับ

สุดท้ายนี้ก็เช่นเคยครับ หากผู้อ่านคิดว่าบทความนี้มีประโยชน์ ก็ฝาก share() และฝากกด 👏 เป็นกำลังใจให้ด้วยนะครับ

--

--