RxJava part 2: คืออะไร ใช้ยังไง เพื่ออะไร

ลาก่อย callback hell

Travis P
Black Lens
5 min readApr 15, 2017

--

Photo credit: yoshimov via VisualHunt / CC BY-ND

ใน RxJava part 1: ก่อนจะมาเป็น RxJava ผมได้แนะนำแนวคิดพื้นฐานของ RxJava ไปโดยที่ยังไม่ได้แตะโค้ด RxJava กันจริงๆเลย อยากให้ค่อยๆซึมซับแนวคิดพวกนั้นก่อน พอมาเจอโค้ดของจริงก็พยายามตระหนักไว้ว่ามันมีแนวคิดมาจากพื้นฐานพวกนั้น เราจะได้งงน้อยลง

มีอะไรบ้าง แต่ละตัวคืออะไร

ส่วนนี้ผมจะแนะนำตัวละครหลัก คลาส อินเตอร์เฟส คำศัพท์ที่เกี่ยวข้องกันก่อน โค้ดตัวอย่างแถวนี้มีพื้นฐานบน RxJava 2 แต่ผมจะตัดความซับซ้อนบางอย่างออกไปเพื่อความง่ายนะครับ

Observable

ผมเชื่อว่าคำนี้ทำคนไทยมึนที่สุด อันนี้มันมาจาก observe-able คือมันยอมให้คนอื่นมาสังเกตตัวมันได้ มันจะทำตัวเป็นผู้ผลิตข้อมูล มันจะห่อ logic การผลิตข้อมูลเอาไว้ ถ้ามันผลิตข้อมูลอะไรใหม่ได้ มันจะบอกคนที่สังเกตมันอยู่ เราไปดู interface ของมันใน RxJava 2 กันดีกว่า จะได้รู้ว่ามันสัญญาไว้ว่าจะทำอะไรได้บ้าง

public interface ObservableSource<T> {
void subscribe(Observer<T> observer);
}

ก็ตามชื่อเลย มันยอมให้คนอื่น ซึ่งในที่นี้คือ Observer มาคอยสังเกตผ่าน method subscribe(Observer) ส่วนมันจะบอกคนสังเกตยังไงนั้น ดูที่ Observer

Subscribe

คืออาการที่เราเอา Observer ไปสังเกต Observable ด้วยการเรียก Observable.subscribe(Observer)

Observer (Subscriber)

Observer หรือ Subscriber เป็นสองคำที่ผมคิดว่าความหมายเหมือนกัน คือผู้สังเกต หน้าที่หลักคือใช้ข้อมูลที่ Observable ป้อนให้มา

public interface Observer<T> {
void onSubscribe(Disposable d);
void onNext(T t);
void onError(Throwable e);
void onComplete();
}

พอเจ้า Observable มันผลิตข้อมูลมาได้ มันก็จะบอก observer.onNext(T) ผลิตข้อมูลมากี่ตัวก็ onNext(T) เท่านั้นครั้ง จนกระทั่งมันผลิตข้อมูลจนครบแล้วก็จะบอก observer.onComplete() ถ้าระหว่างผลิตข้อมูลเกิดพังขึ้นมาก็จะบอก observer.onError(Throwable) ทีนี้ฝั่งผู้ใช้ข้อมูลก็ตัดสินใจเอาเองละว่า ถ้ามีข้อมูลใหม่มาใน onNext(T) จะทำอะไร ทำเสร็จหรือพังจะทำอะไร

ทีนี้มันจะมี onSubscribe(Disposable d) อยู่ อันนี้ก็แค่บอก observer ว่าจะเริ่มทำงานละนะ เอา Disposible ไป แล้วจะเอาไปทำอะไรล่ะ?

Disposable (Subscription)

disposable (อนุโลมให้เรียก subscription ได้) เป็นออบเจคแทนการเชื่อมต่อระหว่าง Observable และ Subscriber

public interface Disposable {
void dispose();
boolean isDisposed();
}

ถ้าเราไม่อยากสังเกตแล้ว ก็เรียก Disposable.dispose() ซะ

Event/Data/Item/Signal/Message

ศัพท์พวกนี้เราจะได้ยินเสมอๆเวลาพูดถึง RxJava ผมคิดว่ามันมีความหมายเหมือนกันนะ มันก็คือไอ้ข้อมูล T ที่ส่งมาใน onNext(T) นั่นแหละ แต่ในบางบริบทอาจจะรวมไปถึง onComplete onError ด้วย

Stream

คำนี้เป็นคำที่ได้ยินบ่อย และสร้างความฉงนงงงวยให้กับมือใหม่เป็นอย่างมาก จริงๆไม่มีอะไรยาก onNext(T) เนี่ยมันจะถูกเรียกกี่ครั้งก็ได้ จนกระทั่งทำงานสำเร็จ onComplete() ถูกเรียก(แค่ครั้งเดียว) หรือทำงานล้มเหลว onError(Throwable) ถูกเรียก(แค่ครั้งเดียว) ตราบเท่าที่ onComplete() หรือ onError(Throwable) ยังไม่โดนเรียก ข้อมูลไหลผ่าน onNext(T) เข้ามาได้เรื่อยๆ มีลักษณะเป็น stream นั่นเอง

https://pixabay.com/en/washington-landscape-scenic-1908788/

รู้สึกมันทำงานคล้ายๆ Callback/Listener จังเลย

บางท่านอาจเคยใช้ Listener ต่างๆของ Android หรือใช้ http library เอาไว้ยิง api แบบ asynchronous แล้วได้ของกลับมาใน Callback หรือบางท่านอาจจะเขียน Callback เป็นของตัวเองประมาณนี้

public interface MyCallback<T> {
void onStart();
void onComplete(T t);
void onFailure(Throwable e);
}

ใช่แล้วครับ Observer มันก็คือ Callback ของเราดีๆ นี่แหละ Observable ก็เหมือนการห่อ logic เอาไว้ การ Subscribe ก็เหมือนการเรียก method แล้วส่ง callback เข้าไปรอฟังเท่านั้นเอง

ใช้ยังไง

เมื่อกี้ผมพูดถึง ObservableSource<T> ที่เป็น interface เพื่อให้เข้าใจจุดประสงค์จริงๆของมันมีแค่นั้น แต่ชีวิตจริงเราใช้งาน Observable<T> กันครับ ไอตัวนี้ทรงพลังมาก

interface MyApi {
@GET("/helloworld")
Observable<String> helloWorld();
}

ผมจะยังไม่สร้าง Observable เองละกัน สมมติผมใช้ Retrofit + RxJava เจ้า retrofit มันก็จะจัดการห่อ logic การเรียก api การ deserialize ผลลัพท์ไว้ในนั้นสร้างเป็น Observable มาให้เราเรียบร้อย หน้าที่เราคือเอามาใช้

ถ้าคุณไม่เคยใช้ Retrofit หรือไม่เข้าใจว่ามันมายังไง ก็คิดซะว่าเวลาเราเรียก MyApi.helloWorld() มันจะคืน Observable มาให้แล้วกัน เดี๋ยวเราค่อยไปดูการสร้างทีหลัง

myApi.helloWorld()
.subscribe(new Observer<String>() {
@Override
void onNext(String string) {
myTextView.setText(string);
}
@Override
void onError(Throwable throwable) {
Toase.makeText(...).show();
}
@Override
void onComplete() {
}
@Override
void onSubscribe(Disposable d) {
}
});

แต่จะเอามาใช้จะให้ override Observer ทุก method ก็แลดูเยอะไป บางทีเราก็ไม่ได้สนใจทุกอย่าง เค้าก็มี overload ต่างๆให้ใช้เฉพาะที่ต้องการ

//call the api
helloDisposable = myApi.helloWorld()
.subscribe(new Consumer<String>() {
@Override
void accept(String string) {
//onNext
myTextView.setText(string);
}
}, new Consumer<Throwable>() {
@Override
void accept(Throwable t) {
//onError
Toast.makeText(...).show();
}
});
//dispose it in onDestroy()
helloDisposable.dispose();

Consumer<T> นี่ก็เป็นแค่ interface ที่ไว้ให้เราห่อ onNext(T) onError(Throwable) แยกเป็นอันๆเท่านั้น

ตรงนี้ขอเตือนนิดนึง ถ้าเราไม่ได้ใส่ onError ไว้ด้วย เวลามี Exception แอพแครชทันทีนะครับ

แล้วมันต่างกับ Callback ธรรมดายังไงล่ะ

แทบไม่ต่างเลย ถ้าใช้แค่นี้ ไม่จำเป็นต้องใช้หรอกครับ ใช้ callback ธรรมดาก็พอ แต่ Observable นี่เค้ามีฟีเจอร์ให้ใช้เพียบ เรียกว่า Operator ครับ

Operator

operator นี่ก็คือ method ธรรมดาแหละ สมมติเรามี observable ตัวนึง แล้วเรียกใช้ operator บนตัวมัน (instance method) ก็จะได้ observable ตัวใหม่ออกมา (คอนเซปเรื่อง immutibility) หรือบาง operator ก็เป็น static method เรียกใช้จาก Observable class

Transform (Map)

map ในที่นี้ไม่ใช่ key-value pair นะครับ แต่เป็นการแมพ(แปลง)ค่านึงไปเป็นอีกค่านึง สมมติจาก MyApi.helloWorld() เมื่อกี้ ผมอยากได้ความยาวของ string แทน

Observable<String> helloObs = myApi.helloWorld();
Observable<Integer> lengthObs = helloObs.map(
new Function<String,Integer>() {
@Override
Integer apply(String string) {
return string.length;
}
});
lengthObs.subscribe(new Consumer<Integer>() {
@Override
void accept(Integer integer) {
//use your length
}
});

หรือจะเขียนยาวไปเลยงี้ก็ได้

myApi.helloWorld()
.map(new Function<String,Integer>() {
@Override
Integer apply(String string) {
return string.length;
}
})
.subscribe(new Consumer<Integer>() {
@Override
void onNext(Integer integer) {
//use your length
}
}
});

โอเค แล้วมันต่างยังไงกับ subscribe String แล้วเราค่อยมา string.length เอง? จริงครับกรณีนี้ผมก็ว่าไม่ได้ว้าวขนาดนั้น ก็แค่แปลงข้อมูลให้เสร็จเรียบร้อยเลย แต่มันก็มีกรณีที่ใช้สะดวกแหละ เช่นผมยิง api ได้ข้อมูลหน้าตาอุบาทว์ๆมา ผมอยากแปลงให้มันสวยงามก่อนมาใช้ ก็ใส่ logic การแปลงไว้ใน map เลย ปลายทางคนใช้เค้าก็สะดวก ทำงานกับข้อมูลสวยงาม

Filter

จริงๆอันนี้ไม่น่าจะต้องอธิบายนะ ดูตัวอย่างละกัน สมมติผมมี MyApi.getFriends() แต่ผมนี่สายหื่น อยากกรองเฉพาะสาวเอ๊าะๆเท่านั้น

public class Friend {
public String name;
public String gender;
public int age;
}
myApi.getFriends()
.filter(new Predicate<Friend>() {
@Override
boolean test(Friend friend) {
return friend.gender.equals("female") && friend.age < 25;
}
})
.subscribe(new Consumer<Friend>() {
@Override
void accept(Friend friend) {
//only female friend
}
});

สมมุติ getFriends() เฉยๆจะส่ง friend ผ่าน onNext(Friend) 10 คน หกคนเป็นผู้หญิง ถ้าผ่าน .filter() แล้วเรา suscribe ก็จะเหลือ onNext(Friend) 6 ครั้งเป็น female ที่ age<25 เท่านั้น

CombineLatest

อันนี้ท่าเริ่มยาก เอาไว้รวม item ล่าสุดของ 2 observable ขึ้นไปเป็นหนึ่งเดียว
ถ้ามึนๆข้ามไปได้เลย

Observable<String> nameObs = myApi.getName(); //"Alice"
Observable<Integer> ageObs = myApi.getAge(); //22
Observable<String> nameWithAgeObs = Observable.combineLatest(nameObs, ageObs,
new BiFunction<String, Integer, String>() {
@Override
String apply(String s, Integer integer) {
return String.format("%s (%d)", s, integer);
}
});
nameWithAgeObs.subscribe(new Consumer<String>() {
@Override
public void accept(String s) {
//"Alice (22)"
myTextView.setText(s);
}
});

FlatMap

อันนี้ท่ายาก หลักการทำงานมันคือเอา item หนึ่งอันมาแปลงเป็น observable 1 เส้น มี n ไอเทม ก็ได้ n เส้น จากนั้นเอา ทุก item ของแต่ละเส้นมารวมกันเหลือเส้นเดียว แลดูงงๆ ถ้าไม่เข้าใจก็อ่านผ่านๆไปก่อนครับ

แต่ประโยชน์ทางอ้อม จะเอาไว้ใช้เพื่อทำ observable นึง ต่อจากอีก observable นึง

interface MyUserApi {
Observable<User> getMe();
Observable<Address> getAddressById(String id);
}
myUserApi.getMe()
.flatMap(new Function<User, ObservableSource<Address>>() {
@Override
ObservableSource<Address> apply(User user) {
return myUserApi.getAddressById(user.addressId);
}
})
.subscribe(new Consumer<Address>() {
@Override
void accept(Address address) {
//use your address
}
});

เค้าว่ากันว่าเปลี่ยน Thread ง่ายนิ

ก็ง่ายจริงอย่างว่าครับ เค้ามี operator subscribeOn() บอกให้ไปทำงานใน Thread ไหน ส่วนobserveOn() บอกว่าทำงานเสร็จแล้วส่งผลลัพท์ไปยัง subscriber หรือ operator ถัดไปที่ Thread ไหน

myApi.getSomethingFromNetwork()
.subscribeOn(Schedulers.io())
.observeOn(AndroidScheduler.mainThread)
.subscribe(new Consumer<Something>() {
@Override
void accept(Something something) {
myTextView.setText(something.toString());
}
});

subscribeOn() สั่งได้ครั้งเดียว ใส่แทรกเข้าไปตรงไหนก็ได้ Observable ของเราจะเริ่มทำงานที่ Thread นั้น

observeOn() สั่งกี่ครั้งก็ได้ มันจะเอา item ไปส่งที่ Thread นั้น พอมันเอา item ไปส่งที่ Thread นั้น ทำให้ operator ถัดๆไปก็ถูกทำที่ Thread นั้นตามไปด้วย

myApi.getSomethingFromNetwork()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.map(new Function<Something,String>) {
@Override
String apply(Something something) {
return SomeUtil.doHeavyComputation(something);
}
})
.observeOn(AndroidScheduler.mainThread)
.subscribe(new Consumer<String>() {
@Override
void accept(String string) {
myTextView.setText(string);
}
});

จริงๆแล้วไอ้ที่เรา subscribeOn() observeOn() นั้นเรากำหนด Scheduler ให้มันต่างหาก เดี๋ยวมันไปจัดการ Thread ให้เองข้างหลัง เข้าใจตรงกันนะ

Creation

ถึงเวลาสร้าง observable กันเองบ้างแล้ว หลังจากที่ผมผลักภาระไปให้ retrofit ซะนาน ขอแนะนำ 3 operator ยอดฮิตดังนี้

Observable.fromCallable()

public interface Callable<V> {
V call();
}
Observable.fromCallable(new Callable<String>() {
@Override
public String call() {
return EncryptionUtil.encrypt(plainText);
}
});

Callable<V> เป็นแค่ interface ที่เอาไว้ให้เราห่อ logic ของเราไว้ ในตัวอย่างผมอยาก encrypt text ก็ encrypt แล้วคืน String ออกไป

โค้ดที่เราห่อไว้นี่จะยังไม่ทำงานนะ ห่อไว้ก่อนเฉยๆ พอมีคนมา subscribe เท่านั้นแหละ ทำงานทันที แล้วผลลัพท์ก็ไปโผล่ใน onNext(T) ตามด้วย onComplete()

ถ้าเกิดมันพังอะไรขึ้นมาก็ไม่ต้องกังวล ตอน subscribe เราก็ไปรอฟังใน onError(Throwable)

วิธีนี้เหมาะสำหรับการห่อ logic ที่ทำงานแล้วคืนผลลัพท์ธรรมดาทั่วไป

Observable.create()

Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(final ObservableEmitter<String> emitter) {
myCallbackStyleApi.getSomething(new MyCallback<String>() {
@Override
void onComplete(String s) {
emitter.onNext(s);
emitter.onComplete();
}
@Override
void onFailure(Throwable t) {
emitter.onError(t);
}
});
}
});

ดูโค้ดแล้วก็ค่อนข้างตรงไปตรงมา เราห่อ callback style api เอาไว้ข้างใน ให้มันทำงานของมันไป พอทำงานเสร็จ ได้ของกลับมาทาง callback เราก็ไปบอก emitter.onNext(T) ทำงานเสร็จก็บอก emitter.onComplete() หรือถ้าพังก็บอก emitter.onError(Throwable)

เจ้า ObservableEmitter<T> นี่ก็ให้คิดซะว่ามันคือ Observer<T> นั่นแหละ มันเป็นแค่ตัวช่วยห่ออีกชั้นนึง ผมก็ไม่เข้าใจจริงๆหรอกว่าข้างในมันทำอะไรบ้าง คงซับซ้อนน่าดู

ข้อควรระวังในการสร้างด้วย create ก็คือเราต้องอย่าลืม clean up เช่น ยกเลิกพวก callback หากเกิดการ dispose ด้วย วิธีทำดูได้จาก javadoc ของ method create นั่นแหละ มีตัวอย่างอยู่แล้ว

วิธีนี้เหมาะสำหรับการแปลง callback api มาเป็น observable

Observable.just()

Observable<String> helloObs = Observable.just("Hello");
helloObs.subscribe(new Consumer<String>() {
@Override
void accept(String string) {
//call 1 time with a String "hello"
println(string)
}
});
Observable<String> helloObs = Observable.just("Hello","world","!");
helloObs.subscribe(new Consumer<String>() {
@Override
void accept(String string) {
//call 3 times with a String "hello", "world", "!"
println(string)
}
});

วิธีนี้เหมาะสำหรับเรามีข้อมูลอยู่แล้ว จะเอามาใช้ในโลก reactive

ถ้าอ่าน tutorial เจ้าอื่นคงจะเจอ just มาก่อนเลย แต่ผมอ่านเจอแล้วหงุดหงิดเป็นพิเศษเลยเอามาไว้หลังสุด ผมไม่เข้าใจว่าเรามีข้อมูลอยู่แล้ว จะเอามาใส่ Observable แล้วค่อย subscribe มาใช้ทำแพะอะไร มีอยู่แล้วก็ใช้เลยสิ

คำตอบคือปกติก็ไม่ทำงี้กันหรอกครับ แต่อาจมีบางกรณีที่เราจะต้องเอาไปใช้ใน operator อื่นๆ เช่นเอาไป combineLatest แล้วไป map ต่อ filter ต่อ หรือบางทีเราออกแบบแอพเราให้ใช้ RxJava คุยกันระหว่างส่วนประกอบต่างๆ ฝั่งผู้ใช้เค้าอยากได้ Observable<String> เราจะไป return String คืนให้มันก็ไม่ได้ มีของอยู่แล้วก็ต้องห่อใส่ Observable ให้เค้าไป

เพื่ออะไร ดียังไง

เป็นคำถามที่ตอบให้เห็นภาพยากมาก จนกว่าจะได้ลองใช้เองแล้วจะเข้าใจ แต่พูดงี้เหมือนขายตรงเกินไป ผมเองก็ยังไม่นับว่าเป็น expert ด้านนี้ แต่จะพยายามตอบเท่าที่ได้โดยมีพื้นฐานอยู่บนโพสนี้ก่อนละกันครับ

ผมมองว่ามันเป็น framework สำหรับการรับส่งข้อมูลที่ decoupled และมีความยืดหยุ่นครับ ไม่รู้ว่าใช้คำนี้ถูกหรือเปล่า แต่ผมสามารถห่อ logic ของผมไว้ใน Observable ซึ่งสามารถส่งไปมาได้ จนถึงมือผู้ใช้ โดยที่ผู้ผลิตข้อมูลและผู้รับข้อมูลไม่จำเป็นต้องรู้จักกันเลย

ผมสามารถออกแบบแอพให้ใช้ RxJava คุยกันระหว่างส่วนประกอบต่างๆ แล้วสร้าง Observable จากชั้น data layer ที่ได้ข้อมูลดิบๆ มาผ่านชั้น domain layer ที่เป็น business logic ผมก็แค่ใส่ operator map, filter ตาม requirement จนมาถึงชั้น presentation ผมก็บอกให้มัน subscribeOn(Scheduler.io()) นะ แล้ว observeOn(AndroidSchedulers.mainThread) จากนั้น subscribe นำข้อมูลดิบที่ผ่านการแปลงมาเรียบร้อยแล้วมาแสดงผลได้อย่างถูก Thread สวยงาม

และเนื่องจากมี operator ให้เลือกใช้มากมาย ที่เราใช้ประจำอยู่แล้วเช่น map filter ผมถือว่ามันอ่านง่ายนะ อ่านเป็นภาษาคนรู้เรื่อง สื่อความหมายชัดเจนว่าเราจะทำอะไร เทียบกับโค้ด Java ธรรมดาที่เราต้องมานั่งวน for กรองเอาเองแล้วอันนี้อ่านเข้าใจง่ายกว่าเยอะ แต่การที่เราต้องสร้าง annonymous class ขึ้นมาก็อุบาทว์พอสมควร ซึ่งก็แก้ไขได้โดยใช้ lambda ใน Java 8 หรือทางที่ดีก็เปลี่ยนมาใช้ Kotlin เถอะ

และที่ขาดไม่ได้คือทำเรื่อง concurrency/threading ง่ายมาก พอเราห่อ logic ไว้ใน observable แล้ว ก็ให้แต่ละตัวทำงานในแต่ละ thread ของมัน สุดท้ายก็นำมา observeOn/merge/combineLatest กันได้ไม่ต้องมากังวลคอยจัดการเอง RxJava ทำให้หมดแล้ว รวมไปถึงการกำจัด callback hell ที่พลิกแพลงมาจากการใช้ flatMap อีก

จริงๆคิดว่า section “เพื่ออะไร ดียังไง” นี้ยังเขียนได้ไม่ดีเท่าไร ถ้ามีโอกาสอาจจะมาแก้ไข เพิ่มเติม หรืออาจจะเขียนเป็นอีก blog post นึงเลย แต่ยังไงก็หวังว่าผู้อ่านคงเข้าใจ RxJava มากขึ้นนะครับ ต้องขออภัยด้วยที่ part 2 ออกช้า เขียนพวกนี้ใช้เวลาไม่น้อยเลย นี่หยุดสงกรานต์เพิ่งมีเวลาว่างจริงๆมาเขียนต่อ เอาไว้เจอกัน part 3 ครับ

--

--

Travis P
Black Lens

Android Developer, Kotlin & Flutter Enthusiast and Gamer