Android Architecture #4: MVP & RxJava

MVP 구조에 Repository 부분에 Rx 를 추가한 형태의 예제 입니다.

History

MS Volta 로부터 Reactive Framework 로 분리되고, 이후 .net Framework 의 Reactive Extension 으로 변경되었습니다.
처음에는 동적 언어와 정적 언어 사이에 상호작용을 위한 비동기 로직 처리 방법으로 시작되었으나, 이후 Ajax 비중이 높아지며 비동기 처리에 중점을 둔 RxJS 가 공개되며, 넷플릭스에서 RxJava 를 만들게 되고, node.js 의 callback 지옥을 해결하기 위해 Rx 가 도입되어 유행하기 시작합니다.

Definition

Reactive programming is programming with asynchronous data streams. You can listen to that stream and react accordingly.
데이타 플로우를 통해 상태 변경을 전파하는 것에 근간을 둔 프로그래밍 패러다임을 말합니다.

즉, 외부에서 들어오는 요청을 데이타 플로우를 통해 지속적으로 응답하는 시스템을 말하며, Iterator 패턴과 Observer 패턴의 쌍대관계를 통해 외부에서 지속적으로 Push 되는 이벤트를 받을 수 있는 인터페이스를 제공합니다.

Iterator 패턴과 Observer 패턴의 쌍대관계란 아래와 같은 과정을 말합니다.

Retrieve Data: Iterator T next() > Observer onNext(T)
Discover Error: Iterator throws Exception > Observer onError(Exception)
Complete: Iterator hasNext() == false > Observer onComplete()

Operator

Operator 가 Observer, Iterator 와 만나면서 아래와 같이 비동기 처리에 최적화 되었습니다.

마우스 좌표를 계속 이동합니다.
Iterator 를 통해 마우스 이벤트를 계속 푸쉬합니다.
오페러이터들을 거치면서 데이타를 가공합니다.
가공된 데이터를 각 Observable 을 subscribe 하는 Observer 에서 처리합니다.
Observable 은 subscribe 할 수 있는 이벤트를 말하며, Observer 는 subscribe 을 통해 구독한 Observable 을 onNext / onError / onCompleted 를 통해 처리합니다.

Stream

Stream in Rx

Reactive 의 외부에서 들어오는 모든 요청은 Stream 을 통해 처리됩니다.

스트림에 들어온 이벤트는 buffer 를 통해 단위 처리 시간을 확보하여 Push 가 너무 자주 일어나지 않도록 성능을 확보할 수 있습니다.
또한 map 이나 filter 를 통해 실제 처리할 이벤트를 필터링 할 수도 있습니다.
이런 map 이나 filter 들이 위에 설명한 Operator 입니다.

Operator 를 사용하다보니 요즘 함수형 프로그래밍이 떠오르고 있습니다.

In code

mSubscriptions.clear();
Subscription subscription = mTasksRepository
.getTasks()
.flatMap(new Func1<List<Task>, Observable<Task>>() {
@Override
public Observable<Task> call(List<Task> tasks) {
return Observable.from(tasks);
}
})
.filter(task -> {
switch (mCurrentFiltering) {
case ACTIVE_TASKS:
return task.isActive();
case COMPLETED_TASKS:
return task.isCompleted();
case ALL_TASKS:
default:
return true;
}
})
.toList()
.subscribeOn(mSchedulerProvider.computation())
.observeOn(mSchedulerProvider.ui())
.doOnTerminate(() -> {
if (!EspressoIdlingResource.getIdlingResource().isIdleNow()) {
EspressoIdlingResource.decrement(); // Set app as idle.
}
})
.subscribe(
// onNext
this::processTasks,
// onError
throwable -> mTasksView.showLoadingTasksError(),
// onCompleted
() -> mTasksView.setLoadingIndicator(false));
mSubscriptions.add(subscription);

가장 먼저 Subscriptions 를 초기화합니다.
- public Observable<List<Task>> getTasks() 를 통해 Rx 의 Observable 을 가져옵니다.
- flatMap 을 통해 List<Task>를 Observable<Task> 로 변환합니다.
- filter 를 통해 원하는 데이타만 필터링 합니다.
- toList 를 통해 하나의 리스트로 취합합니다.
- subscribeOn 을 통해 subscribe 될 Rx Scheduler (Thread) 를 설정합니다.
- observeOn 을 통해 Observable 이 아이템을 전파할 때, 사용할 Rx Scheduler (Thread) 를 지정합니다.
- doOnTerminate 를 통해 Observable 이 Error 혹은 Completed 될 때 호출될 callback 함수를 등록합니다.
- Observable.subscribe 를 통해 Next, Error 혹은 Completed 될 때 호출될 함수를 등록하고, Subscription 을 return 합니다.

private void processTasks(@NonNull List<Task> tasks) {
if (tasks.isEmpty()) {
// Show a message indicating there are no tasks for that filter type.
processEmptyTasks();
} else {
// Show the list of tasks
mTasksView.showTasks(tasks);
// Set the filter label's text.
showFilterLabel();
}
}

실제 onNext 에서 수행될 함수는 위와 같습니다. 즉, 필요한 파라미터 List<Task> 를 데이타 스트림을 통해 만들고, 쓰레드를 지정하고, 에러 처리 및 완료 처리까지 지정하고, 실제 주요 동작은 onNext 를 통해 지정한 내용을 subscription 으로 정의하고, mSubscriptions 에 추가합니다.

private CompositeSubscription mSubscriptions;
mSubscriptions = new CompositeSubscription();
...
public final class CompositeSubscription implements Subscription

위와 같이 Subscription 으로 정의되어 있습니다.

@Override
public void onResume() {
super.onResume();
mPresenter.subscribe();
}

@Override
public void onPause() {
super.onPause();
mPresenter.unsubscribe();
}

Fragment 의 Lifecycle 에 따라 아래와 같이 Presenter 의 subscribe / unsubscribe 가 호출됩니다.

@Override
public void subscribe() {
loadTasks(false);
}

@Override
public void unsubscribe() {
mSubscriptions.clear();
}

loadTasks 에서 위에 subscribe 가 호출되기 때문에, 결국 Fragment 의 Lifecycle 에 따라 subscribe / unsubscribe 가 되게 됩니다.

MissingBackpressureException

Observable 이 items 를 보내는 속도가, operator / subscriber 에서 처리되는 속도보다 빠를 때 주로 발생합니다.

Throttling / Buffers and windows 등 몇가지 operator 를 사용해서 예방할 수 있으며, 무엇보다 Observable / Flowable 을 구분해서 100% 는 아니지만 많은 경우의 문제를 해결할 수 있습니다.

Observable

  • 만약 플로우에 1000개 이하의 항목이 있다면, 시간이 지나면서 항목이 대부분 없어지기 때문에 애플리케이션에서 OutOfMemoryError가 발생할 일이 없습니다.
  • 마우스 움직임이나 터치 이벤트와 같은 GUI 이벤트를 처리할 때는 합리적으로 Backpressure를 줄 수 없으며, 빈번하지도 않습니다. Observable을 사용하면 초당 1000개 혹은 그 이하의 항목을 처리할 수 있지만 샘플링이나 디바운싱을 사용하는 것이 좋습니다.
  • 플로우가 본질적으로는 동기식이지만 플랫폼이 Java 스트림을 지원하지 않거나 그런 기능이 있다는걸 놓쳤을 때, Observable을 쓰는 것이 Flowable을 쓰는 것보다 대부분 오버헤드가 적습니다. (Java 6+를 지원하는 Iterable 플로우에 최적화된 IxJava도 고려할 수 있습니다.)

Flowable

  • 어딘가에서 생성되는 10000개 이상의 요소를 처리할 때, 체인은 소스가 생성되는 양을 제한할 수 있습니다.
  • 파일을 디스크에서 읽거나 파싱하는 일은 본질적으로 블로킹이고, 풀에 기반(Pull-based)합니다. 이럴 때는 Backpressure를 통해 사용자가 제어할 수 있습니다.
  • JDBC를 통해 데이터베이스를 읽는 것 또한 블로킹이고 풀에 기반을 두며, 각 다운스트림 요청에 대해 ResultSet.next()를 호출해서 사용자가 제어할 수 있습니다.
  • 네트워크를 거치거나, 논리적 리소스를 요청하는 프로토콜을 사용하는 네트워크 (스트리밍) 입출력
  • 추후에 논블로킹 리액티브 API 혹은 드라이버를 지원하게 될 수 있는, 블로킹이거나 풀에 기반을 둔 데이터 소스

RxMarbles

Interactive diagrams of Rx Observables

Frodo

Frodo is an android library inspired by Jake Wharton’s Hugo, mainly used for logging RxJava Observables and Subscribers outputs on the logcat.


Rx 에 대한 자세한 내용은 여기를 참고하시기 바랍니다.

Like what you read? Give Jae-young Lee a round of applause.

From a quick cheer to a standing ovation, clap to show how much you enjoyed this story.