Grokking RxJava, Part3: Reactive with Benefits

Jong Yun Lee
Nspoons
Published in
7 min readFeb 17, 2017

--

이 글은 Dan Lew Codes 의 글을 번역한 것입니다.

원문 : http://blog.danlew.net/2014/09/30/grokking-rxjava-part-3/

지금까지 내용을 살펴보면 Part 1 에서는 RxJava의 기본 구조에 대해서 살펴 보았구요, Part 2에서는 operator들이 얼마나 효과적인지를 보여드렸습니다. 하지만 여러분이 아직도 이것들에 열광하지 않을지도 모르겠네요; 아직 여러분을 설득하기에 충분한 것들을 보여드리지 않았거든요. 이제부터 이 모든 의문들을 종결해버릴 RxJava 프레임워크의 혜택을 살펴 보도록 합시다.

에러 처리 (Error Handling)

지금까지 우리는 대체로 onComplete()onError() 에 대해서 아무것도 하지 않았었죠. 이 함수들은 Observable 객체가 내보내기(emitting)를 멈출때 멈춘 이유에 대해서 표시할 것입니다. (성공 혹은 에러중 하나겠죠)

원래의 SubscriberonComplete()onError() 가 실행되었음을 보고받을(listen) 수 있었습니다. 한번 실제로 해봅시다:

Observable.just("Hello, world!")
.map(s -> potentialException(s))
.map(s -> anotherPotentialException(s))
.subscribe(new Subscriber<String>() {
@Override
public void onNext(String s) { System.out.println(s); }
@Override
public void onCompleted() {
System.out.println("Completed!");
}
@Override
public void onError(Throwable e) {
System.out.println("Ouch!");
}
});

위에서 potentialException()anotherPotentialException() 둘다 Exceptions(예외)를 던질(throw) 가능성이 있다고 해 봅시다. 각각의 Observable 들은 onCompleted()onError() 둘중 하나를 호출하고 끝날 것입니다. 대게 프로그램들은 “Completed!” 나 “Ouch!” ( Exception 이 일어났기 때문에) 이런 출력을 하도록 되어 있습니다.

이 패턴에 대해서 몇가지 짚고 넘어가야 할 것들이 있습니다:

  1. onError() 는 언제든지 Exception 이 일어나면 호출됩니다.
    이 점이 에러를 처리하는 것을 아주 간단하게 만듭니다. 한 명령의 마지막에 모든 에러를 처리할 수 있는 것이죠.
  2. operators 는 Exception을 처리할 수 없습니다.
    Exception이 onError() 으로 바로 넘어가기 때문에 Exception을 Subscriber 가 처리할 수 있도록 남겨 둘 수 있습니다.
  3. Subscriber 가 언제 아이템을 받았는지 알 수 있습니다.
    언제 동작(Task)이 끝나는지 아는 것이 코드의 흐름을 정하는데 도움이 될 수 있습니다. ( Observable 이 완료(complete) 되지 않을 수는 있지만요.. )

저는 이 패턴이 기존의 전통적인 에러 처리 방법보다 훨씬 쉽다는 것을 알게 되었어요. Callback 함수들로는 각각의 Callback 안에서 에러 처리를 해야 합니다. 이런 방법은 코드 반복으로 이어지고 도 코드 반복은 결국 각각 Callback 함수들이 어떻게 에러를 처리해야 할지 알아야 하고, 이는 Callback 함수가 호출함수와 너무 많은 부분에서 결합하게 된다(tightly coupled)는 것을 의미 합니다.

Schedulers

자, 이제 네트워크 요청을 하는 안드로이드 앱을 만들어 보았습니다. 이 동작이 꽤 오랜 시간이 걸리기 때문에, 다른 thread에서 진행해야 합니다. 새로운 문제군요!

Multi-threaded 안드로이드 앱은 알맞은 스레드에서 돌고 있는지가 확실해야 하기 때문에 다루기 어렵습니다. 복잡해지고 충돌을 일으키기 쉽상이죠.

RxJava 에서는 Observer 코드가 어떤 thread에서 동작하게 할 것인지 subscribeOn() 함수를 통해서 설정할 수 있고, Subscriber 의 설정은 ObserveOn() 을 통해서 할 수 있습니다.

myObservableServices.retrieveImage(url)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(bitmap -> myImageView.setImageBitmap(bitmap));

정말 간단하지 않나요? Subscriber 이 실행되기 전의 모든 동작은 I/O 스레드에서 실행됩니다. 그런다음 마지막으로 뷰를 조정하는 동작이 main 스레드에서 실행됩니다.

정말 좋은 부분은 모든 Observable 객체에 subscribeOn() observeOn() 을 달 수 있다는 것입니다! 이 함수들은 단지 operator일 뿐이죠! Observable 이나 이전 operator 들의 동작을 신경쓸 필요가 없다는 것입니다.

AsyncTask 등과 같은 것들로는 동시에 진행되어야 되는 부분들을 감싸는 구조로 짜야 합니다. RxJava로는 코드는 변함이 없고 다만 동시성 작업이 별도로 추가되는 방식이 가능한 것이죠.

Subscriptions

사실 여러분들에게 보여주지 않았던 것이 있는데요. Observable.subscribe() 은 호출될때, Subscription 객체를 반환합니다. 이 객체는 ObservableSubscriber 사이의 관계에 대한 정보를 담고 있습니다.

Subscription subscription = Observable.just("Hello, World!")
.subscribe(s -> System.out.println(s));

나중에 다룰 내용인데, 이 Subscription 을 서버에 사용할 수 있습니다.

Subscription.unsubscribe();
System.out.println("Unsubscribed=" + subscription.isUnsubscribed());
// Outputs "Unsubscribed=true"

RxJava가 unsubscribe 를 다루는데 좋은점은 chain을 멈추기 때문입니다. 만약에 복잡한 operators들의 chain이 있다고 한다면, unsubscribe 는 현재 어떤 것을 진행하고 있든지 간에 동작을 끝낼 것입니다.

결론

이 글이 RxJava를 소개하는 글이라는 것을 유념하시길 바랍니다. 보여드린 것 보다 배울것이 훨씬 습니다. (예를들어 backpressure에 대한 것들을 살펴보시면 좋을것 같네요). 저도 모든것에 reactive 코드를 사용하지는 않습니다. 복잡한 부분을 훨씬 단순한 로직으로 쪼개고 싶을때 사용합니다.

원래 이 포스팅을 RxJava 시리즈의 마지막 부분으로 계획하였었는데요, Android에 실용적인 적용 예시들이 있으면 좋을것 같다는 요청이 많아 part 4 를 준비하였습니다. 이 포스팅들이 여러분이 RxJava를 시작하는데 도움이 되었으면 좋겠네요. 좀더 알아보고 싶다면 official RxJava wiki 를 읽어 보실것을 권장합니다. 그리고 항상 무엇이든 가능하다는 사실을 잊지 마세요 :)

Many thanks to all the people who took the time to proofread these articles: Matthias Käppler, Matthew Wear, Ulysses Popple, Hamid Palo and Joel Drotos (worth the click for the beard alone).

--

--