RxJava part 2: คืออะไร ใช้ยังไง เพื่ออะไร

ลาก่อย callback hell

Travis P
Black Lens
5 min readApr 15, 2017

--

Photo credit: yoshimov via VisualHunt / CC BY-ND

ใน RxJava part 1: ก่อนจะมาเป็น RxJava ผมได้แนะนำแนวคิดพื้นฐานของ RxJava ไปโดยที่ยังไม่ได้แตะโค้ด RxJava กันจริงๆเลย อยากให้ค่อยๆซึมซับแนวคิดพวกนั้นก่อน พอมาเจอโค้ดของจริงก็พยายามตระหนักไว้ว่ามันมีแนวคิดมาจากพื้นฐานพวกนั้น เราจะได้งงน้อยลง

มีอะไรบ้าง แต่ละตัวคืออะไร

ส่วนนี้ผมจะแนะนำตัวละครหลัก คลาส อินเตอร์เฟส คำศัพท์ที่เกี่ยวข้องกันก่อน โค้ดตัวอย่างแถวนี้มีพื้นฐานบน RxJava 2 แต่ผมจะตัดความซับซ้อนบางอย่างออกไปเพื่อความง่ายนะครับ

Observable

ผมเชื่อว่าคำนี้ทำคนไทยมึนที่สุด อันนี้มันมาจาก observe-able คือมันยอมให้คนอื่นมาสังเกตตัวมันได้ มันจะทำตัวเป็นผู้ผลิตข้อมูล มันจะห่อ logic การผลิตข้อมูลเอาไว้ ถ้ามันผลิตข้อมูลอะไรใหม่ได้ มันจะบอกคนที่สังเกตมันอยู่ เราไปดู interface ของมันใน RxJava 2 กันดีกว่า จะได้รู้ว่ามันสัญญาไว้ว่าจะทำอะไรได้บ้าง

ก็ตามชื่อเลย มันยอมให้คนอื่น ซึ่งในที่นี้คือ Observer มาคอยสังเกตผ่าน method subscribe(Observer) ส่วนมันจะบอกคนสังเกตยังไงนั้น ดูที่ Observer

Subscribe

คืออาการที่เราเอา Observer ไปสังเกต Observable ด้วยการเรียก Observable.subscribe(Observer)

Observer (Subscriber)

Observer หรือ Subscriber เป็นสองคำที่ผมคิดว่าความหมายเหมือนกัน คือผู้สังเกต หน้าที่หลักคือใช้ข้อมูลที่ Observable ป้อนให้มา

พอเจ้า Observable มันผลิตข้อมูลมาได้ มันก็จะบอก observer.onNext(T) ผลิตข้อมูลมากี่ตัวก็ onNext(T) เท่านั้นครั้ง จนกระทั่งมันผลิตข้อมูลจนครบแล้วก็จะบอก observer.onComplete() ถ้าระหว่างผลิตข้อมูลเกิดพังขึ้นมาก็จะบอก observer.onError(Throwable) ทีนี้ฝั่งผู้ใช้ข้อมูลก็ตัดสินใจเอาเองละว่า ถ้ามีข้อมูลใหม่มาใน onNext(T) จะทำอะไร ทำเสร็จหรือพังจะทำอะไร

ทีนี้มันจะมี onSubscribe(Disposable d) อยู่ อันนี้ก็แค่บอก observer ว่าจะเริ่มทำงานละนะ เอา Disposible ไป แล้วจะเอาไปทำอะไรล่ะ?

Disposable (Subscription)

disposable (อนุโลมให้เรียก subscription ได้) เป็นออบเจคแทนการเชื่อมต่อระหว่าง Observable และ Subscriber

ถ้าเราไม่อยากสังเกตแล้ว ก็เรียก Disposable.dispose() ซะ

Event/Data/Item/Signal/Message

ศัพท์พวกนี้เราจะได้ยินเสมอๆเวลาพูดถึง RxJava ผมคิดว่ามันมีความหมายเหมือนกันนะ มันก็คือไอ้ข้อมูล T ที่ส่งมาใน onNext(T) นั่นแหละ แต่ในบางบริบทอาจจะรวมไปถึง onComplete onError ด้วย

Stream

คำนี้เป็นคำที่ได้ยินบ่อย และสร้างความฉงนงงงวยให้กับมือใหม่เป็นอย่างมาก จริงๆไม่มีอะไรยาก onNext(T) เนี่ยมันจะถูกเรียกกี่ครั้งก็ได้ จนกระทั่งทำงานสำเร็จ onComplete() ถูกเรียก(แค่ครั้งเดียว) หรือทำงานล้มเหลว onError(Throwable) ถูกเรียก(แค่ครั้งเดียว) ตราบเท่าที่ onComplete() หรือ onError(Throwable) ยังไม่โดนเรียก ข้อมูลไหลผ่าน onNext(T) เข้ามาได้เรื่อยๆ มีลักษณะเป็น stream นั่นเอง

https://pixabay.com/en/washington-landscape-scenic-1908788/

รู้สึกมันทำงานคล้ายๆ Callback/Listener จังเลย

บางท่านอาจเคยใช้ Listener ต่างๆของ Android หรือใช้ http library เอาไว้ยิง api แบบ asynchronous แล้วได้ของกลับมาใน Callback หรือบางท่านอาจจะเขียน Callback เป็นของตัวเองประมาณนี้

ใช่แล้วครับ Observer มันก็คือ Callback ของเราดีๆ นี่แหละ Observable ก็เหมือนการห่อ logic เอาไว้ การ Subscribe ก็เหมือนการเรียก method แล้วส่ง callback เข้าไปรอฟังเท่านั้นเอง

ใช้ยังไง

เมื่อกี้ผมพูดถึง ObservableSource<T> ที่เป็น interface เพื่อให้เข้าใจจุดประสงค์จริงๆของมันมีแค่นั้น แต่ชีวิตจริงเราใช้งาน Observable<T> กันครับ ไอตัวนี้ทรงพลังมาก

ผมจะยังไม่สร้าง Observable เองละกัน สมมติผมใช้ Retrofit + RxJava เจ้า retrofit มันก็จะจัดการห่อ logic การเรียก api การ deserialize ผลลัพท์ไว้ในนั้นสร้างเป็น Observable มาให้เราเรียบร้อย หน้าที่เราคือเอามาใช้

ถ้าคุณไม่เคยใช้ Retrofit หรือไม่เข้าใจว่ามันมายังไง ก็คิดซะว่าเวลาเราเรียก MyApi.helloWorld() มันจะคืน Observable มาให้แล้วกัน เดี๋ยวเราค่อยไปดูการสร้างทีหลัง

แต่จะเอามาใช้จะให้ override Observer ทุก method ก็แลดูเยอะไป บางทีเราก็ไม่ได้สนใจทุกอย่าง เค้าก็มี overload ต่างๆให้ใช้เฉพาะที่ต้องการ

Consumer<T> นี่ก็เป็นแค่ interface ที่ไว้ให้เราห่อ onNext(T) onError(Throwable) แยกเป็นอันๆเท่านั้น

ตรงนี้ขอเตือนนิดนึง ถ้าเราไม่ได้ใส่ onError ไว้ด้วย เวลามี Exception แอพแครชทันทีนะครับ

แล้วมันต่างกับ Callback ธรรมดายังไงล่ะ

แทบไม่ต่างเลย ถ้าใช้แค่นี้ ไม่จำเป็นต้องใช้หรอกครับ ใช้ callback ธรรมดาก็พอ แต่ Observable นี่เค้ามีฟีเจอร์ให้ใช้เพียบ เรียกว่า Operator ครับ

Operator

operator นี่ก็คือ method ธรรมดาแหละ สมมติเรามี observable ตัวนึง แล้วเรียกใช้ operator บนตัวมัน (instance method) ก็จะได้ observable ตัวใหม่ออกมา (คอนเซปเรื่อง immutibility) หรือบาง operator ก็เป็น static method เรียกใช้จาก Observable class

Transform (Map)

map ในที่นี้ไม่ใช่ key-value pair นะครับ แต่เป็นการแมพ(แปลง)ค่านึงไปเป็นอีกค่านึง สมมติจาก MyApi.helloWorld() เมื่อกี้ ผมอยากได้ความยาวของ string แทน

หรือจะเขียนยาวไปเลยงี้ก็ได้

โอเค แล้วมันต่างยังไงกับ subscribe String แล้วเราค่อยมา string.length เอง? จริงครับกรณีนี้ผมก็ว่าไม่ได้ว้าวขนาดนั้น ก็แค่แปลงข้อมูลให้เสร็จเรียบร้อยเลย แต่มันก็มีกรณีที่ใช้สะดวกแหละ เช่นผมยิง api ได้ข้อมูลหน้าตาอุบาทว์ๆมา ผมอยากแปลงให้มันสวยงามก่อนมาใช้ ก็ใส่ logic การแปลงไว้ใน map เลย ปลายทางคนใช้เค้าก็สะดวก ทำงานกับข้อมูลสวยงาม

Filter

จริงๆอันนี้ไม่น่าจะต้องอธิบายนะ ดูตัวอย่างละกัน สมมติผมมี MyApi.getFriends() แต่ผมนี่สายหื่น อยากกรองเฉพาะสาวเอ๊าะๆเท่านั้น

สมมุติ getFriends() เฉยๆจะส่ง friend ผ่าน onNext(Friend) 10 คน หกคนเป็นผู้หญิง ถ้าผ่าน .filter() แล้วเรา suscribe ก็จะเหลือ onNext(Friend) 6 ครั้งเป็น female ที่ age<25 เท่านั้น

CombineLatest

อันนี้ท่าเริ่มยาก เอาไว้รวม item ล่าสุดของ 2 observable ขึ้นไปเป็นหนึ่งเดียว
ถ้ามึนๆข้ามไปได้เลย

FlatMap

อันนี้ท่ายาก หลักการทำงานมันคือเอา item หนึ่งอันมาแปลงเป็น observable 1 เส้น มี n ไอเทม ก็ได้ n เส้น จากนั้นเอา ทุก item ของแต่ละเส้นมารวมกันเหลือเส้นเดียว แลดูงงๆ ถ้าไม่เข้าใจก็อ่านผ่านๆไปก่อนครับ

แต่ประโยชน์ทางอ้อม จะเอาไว้ใช้เพื่อทำ observable นึง ต่อจากอีก observable นึง

เค้าว่ากันว่าเปลี่ยน Thread ง่ายนิ

ก็ง่ายจริงอย่างว่าครับ เค้ามี operator subscribeOn() บอกให้ไปทำงานใน Thread ไหน ส่วนobserveOn() บอกว่าทำงานเสร็จแล้วส่งผลลัพท์ไปยัง subscriber หรือ operator ถัดไปที่ Thread ไหน

subscribeOn() สั่งได้ครั้งเดียว ใส่แทรกเข้าไปตรงไหนก็ได้ Observable ของเราจะเริ่มทำงานที่ Thread นั้น

observeOn() สั่งกี่ครั้งก็ได้ มันจะเอา item ไปส่งที่ Thread นั้น พอมันเอา item ไปส่งที่ Thread นั้น ทำให้ operator ถัดๆไปก็ถูกทำที่ Thread นั้นตามไปด้วย

จริงๆแล้วไอ้ที่เรา subscribeOn() observeOn() นั้นเรากำหนด Scheduler ให้มันต่างหาก เดี๋ยวมันไปจัดการ Thread ให้เองข้างหลัง เข้าใจตรงกันนะ

Creation

ถึงเวลาสร้าง observable กันเองบ้างแล้ว หลังจากที่ผมผลักภาระไปให้ retrofit ซะนาน ขอแนะนำ 3 operator ยอดฮิตดังนี้

Observable.fromCallable()

Callable<V> เป็นแค่ interface ที่เอาไว้ให้เราห่อ logic ของเราไว้ ในตัวอย่างผมอยาก encrypt text ก็ encrypt แล้วคืน String ออกไป

โค้ดที่เราห่อไว้นี่จะยังไม่ทำงานนะ ห่อไว้ก่อนเฉยๆ พอมีคนมา subscribe เท่านั้นแหละ ทำงานทันที แล้วผลลัพท์ก็ไปโผล่ใน onNext(T) ตามด้วย onComplete()

ถ้าเกิดมันพังอะไรขึ้นมาก็ไม่ต้องกังวล ตอน subscribe เราก็ไปรอฟังใน onError(Throwable)

วิธีนี้เหมาะสำหรับการห่อ logic ที่ทำงานแล้วคืนผลลัพท์ธรรมดาทั่วไป

Observable.create()

ดูโค้ดแล้วก็ค่อนข้างตรงไปตรงมา เราห่อ callback style api เอาไว้ข้างใน ให้มันทำงานของมันไป พอทำงานเสร็จ ได้ของกลับมาทาง callback เราก็ไปบอก emitter.onNext(T) ทำงานเสร็จก็บอก emitter.onComplete() หรือถ้าพังก็บอก emitter.onError(Throwable)

เจ้า ObservableEmitter<T> นี่ก็ให้คิดซะว่ามันคือ Observer<T> นั่นแหละ มันเป็นแค่ตัวช่วยห่ออีกชั้นนึง ผมก็ไม่เข้าใจจริงๆหรอกว่าข้างในมันทำอะไรบ้าง คงซับซ้อนน่าดู

ข้อควรระวังในการสร้างด้วย create ก็คือเราต้องอย่าลืม clean up เช่น ยกเลิกพวก callback หากเกิดการ dispose ด้วย วิธีทำดูได้จาก javadoc ของ method create นั่นแหละ มีตัวอย่างอยู่แล้ว

วิธีนี้เหมาะสำหรับการแปลง callback api มาเป็น observable

Observable.just()

วิธีนี้เหมาะสำหรับเรามีข้อมูลอยู่แล้ว จะเอามาใช้ในโลก reactive

ถ้าอ่าน tutorial เจ้าอื่นคงจะเจอ just มาก่อนเลย แต่ผมอ่านเจอแล้วหงุดหงิดเป็นพิเศษเลยเอามาไว้หลังสุด ผมไม่เข้าใจว่าเรามีข้อมูลอยู่แล้ว จะเอามาใส่ Observable แล้วค่อย subscribe มาใช้ทำแพะอะไร มีอยู่แล้วก็ใช้เลยสิ

คำตอบคือปกติก็ไม่ทำงี้กันหรอกครับ แต่อาจมีบางกรณีที่เราจะต้องเอาไปใช้ใน operator อื่นๆ เช่นเอาไป combineLatest แล้วไป map ต่อ filter ต่อ หรือบางทีเราออกแบบแอพเราให้ใช้ RxJava คุยกันระหว่างส่วนประกอบต่างๆ ฝั่งผู้ใช้เค้าอยากได้ Observable<String> เราจะไป return String คืนให้มันก็ไม่ได้ มีของอยู่แล้วก็ต้องห่อใส่ Observable ให้เค้าไป

เพื่ออะไร ดียังไง

เป็นคำถามที่ตอบให้เห็นภาพยากมาก จนกว่าจะได้ลองใช้เองแล้วจะเข้าใจ แต่พูดงี้เหมือนขายตรงเกินไป ผมเองก็ยังไม่นับว่าเป็น expert ด้านนี้ แต่จะพยายามตอบเท่าที่ได้โดยมีพื้นฐานอยู่บนโพสนี้ก่อนละกันครับ

ผมมองว่ามันเป็น framework สำหรับการรับส่งข้อมูลที่ decoupled และมีความยืดหยุ่นครับ ไม่รู้ว่าใช้คำนี้ถูกหรือเปล่า แต่ผมสามารถห่อ logic ของผมไว้ใน Observable ซึ่งสามารถส่งไปมาได้ จนถึงมือผู้ใช้ โดยที่ผู้ผลิตข้อมูลและผู้รับข้อมูลไม่จำเป็นต้องรู้จักกันเลย

ผมสามารถออกแบบแอพให้ใช้ RxJava คุยกันระหว่างส่วนประกอบต่างๆ แล้วสร้าง Observable จากชั้น data layer ที่ได้ข้อมูลดิบๆ มาผ่านชั้น domain layer ที่เป็น business logic ผมก็แค่ใส่ operator map, filter ตาม requirement จนมาถึงชั้น presentation ผมก็บอกให้มัน subscribeOn(Scheduler.io()) นะ แล้ว observeOn(AndroidSchedulers.mainThread) จากนั้น subscribe นำข้อมูลดิบที่ผ่านการแปลงมาเรียบร้อยแล้วมาแสดงผลได้อย่างถูก Thread สวยงาม

และเนื่องจากมี operator ให้เลือกใช้มากมาย ที่เราใช้ประจำอยู่แล้วเช่น map filter ผมถือว่ามันอ่านง่ายนะ อ่านเป็นภาษาคนรู้เรื่อง สื่อความหมายชัดเจนว่าเราจะทำอะไร เทียบกับโค้ด Java ธรรมดาที่เราต้องมานั่งวน for กรองเอาเองแล้วอันนี้อ่านเข้าใจง่ายกว่าเยอะ แต่การที่เราต้องสร้าง annonymous class ขึ้นมาก็อุบาทว์พอสมควร ซึ่งก็แก้ไขได้โดยใช้ lambda ใน Java 8 หรือทางที่ดีก็เปลี่ยนมาใช้ Kotlin เถอะ

และที่ขาดไม่ได้คือทำเรื่อง concurrency/threading ง่ายมาก พอเราห่อ logic ไว้ใน observable แล้ว ก็ให้แต่ละตัวทำงานในแต่ละ thread ของมัน สุดท้ายก็นำมา observeOn/merge/combineLatest กันได้ไม่ต้องมากังวลคอยจัดการเอง RxJava ทำให้หมดแล้ว รวมไปถึงการกำจัด callback hell ที่พลิกแพลงมาจากการใช้ flatMap อีก

จริงๆคิดว่า section “เพื่ออะไร ดียังไง” นี้ยังเขียนได้ไม่ดีเท่าไร ถ้ามีโอกาสอาจจะมาแก้ไข เพิ่มเติม หรืออาจจะเขียนเป็นอีก blog post นึงเลย แต่ยังไงก็หวังว่าผู้อ่านคงเข้าใจ RxJava มากขึ้นนะครับ ต้องขออภัยด้วยที่ part 2 ออกช้า เขียนพวกนี้ใช้เวลาไม่น้อยเลย นี่หยุดสงกรานต์เพิ่งมีเวลาว่างจริงๆมาเขียนต่อ เอาไว้เจอกัน part 3 ครับ

--

--

Travis P
Black Lens

Android Developer, Kotlin & Flutter Enthusiast and Gamer