RxJava — One Observable, Multiple Subscribers, Same Data
Pavlos-Petros Tournaris

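Perhaps I’m the only one, but I didn’t fully understand the issue. Were you trying to implement a caching mechanism using the .share()/.publish() operators? Essentially, the .share() operator (shorthand for .publish().refCount()) ensures that subscribers who are attached at the same time share a single execution of the source observable. It does not cache anything for subscribers that arrive after that execution has completed.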

As an example, take a look at this sample code:

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.Subscription;
import rx.schedulers.Schedulers;

// RxJava 1.x; the class name is arbitrary
public class ShareExample {

    public static void main(String[] args) {
        Observable<Long> observable = Observable.create(new OnSubscribe<Long>() {
            @Override
            public void call(Subscriber<? super Long> subscriber) {
                // wait for 1 second before sending back anything (emulating a network call)
                pause(1000);
                subscriber.onNext(System.currentTimeMillis());
                subscriber.onCompleted();
            }
        });

        observable = observable
                .subscribeOn(Schedulers.io())
                .doOnSubscribe(() -> System.out.println(" observable subscribed"))
                .doOnNext(aLong -> System.out.println(" observable next: " + aLong))
                .doOnCompleted(() -> System.out.println(" observable completed"))
                .doOnUnsubscribe(() -> System.out.println(" observable unsubscribed"))
                .share();

        System.out.println("First subscription");
        Subscription sub1 = observable
                .subscribe(aLong -> System.out.println("Sub1: received next: " + aLong));

        // wait a short while before adding another subscription
        System.out.println("Waiting for 0.5 seconds...");
        pause(500);

        // the source is still running, so this subscriber joins the same execution
        System.out.println("Second subscription");
        Subscription sub2 = observable
                .subscribe(aLong -> System.out.println("Sub2: received next: " + aLong));

        // wait until the 1-second operation has finished (0.5 s + 0.6 s > 1 s)
        System.out.println("Waiting for 0.6 seconds...");
        pause(600);

        // the first execution has completed by now, so this subscriber starts a new one
        System.out.println("Third subscription");
        Subscription sub3 = observable
                .subscribe(aLong -> System.out.println("Sub3: received next: " + aLong));

        System.out.println("Waiting for 2 seconds...");
        pause(2000);

        System.out.println("Unsubscribing");
        sub1.unsubscribe();
        sub2.unsubscribe();
        sub3.unsubscribe();
    }

    private static void pause(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

Code summary:

The first subscription starts an operation that takes 1 second to emit an item and then completes. Within that second, a second subscription occurs, so both subscribers receive the same data (the same timestamp). By the time the third subscription occurs, the first two have already completed, so it triggers a new operation (hence a different timestamp is displayed).
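To make the timeline above easier to reason about without threads or sleeps, here is a rough plain-Java model of the sharing behaviour. This is only a sketch: `ShareModel`, `SharedOperation`, `subscribe`, `complete` and `demo` are hypothetical names invented for illustration, and this is not how RxJava implements .share() internally. It assumes the source emits exactly one item and then completes, as in the sample code, and it uses a counter in place of the timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

class ShareModel {

    /** Hypothetical stand-in for a shared, single-item observable (not RxJava's implementation). */
    static class SharedOperation<T> {
        private final Supplier<T> source;                      // the expensive work, e.g. a network call
        private final List<Consumer<T>> waiting = new ArrayList<>();

        SharedOperation(Supplier<T> source) {
            this.source = source;
        }

        /** Registers a subscriber; returns true if this subscriber started a new execution. */
        boolean subscribe(Consumer<T> subscriber) {
            waiting.add(subscriber);
            return waiting.size() == 1;
        }

        /** Simulates the in-flight execution finishing: emit one value to everyone, then reset. */
        void complete() {
            if (waiting.isEmpty()) {
                return;                                        // nothing is running
            }
            T value = source.get();                            // produced once per execution
            List<Consumer<T>> current = new ArrayList<>(waiting);
            waiting.clear();                                   // the next subscriber starts from scratch
            for (Consumer<T> subscriber : current) {
                subscriber.accept(value);
            }
        }
    }

    /** Replays the timeline of the sample code: two overlapping subscribers, then a late one. */
    static List<String> demo() {
        int[] executions = {0};                                // stands in for the timestamp
        SharedOperation<Integer> shared = new SharedOperation<>(() -> ++executions[0]);
        List<String> log = new ArrayList<>();

        shared.subscribe(v -> log.add("Sub1: " + v));          // starts execution 1
        shared.subscribe(v -> log.add("Sub2: " + v));          // joins execution 1
        shared.complete();                                     // execution 1 finishes
        shared.subscribe(v -> log.add("Sub3: " + v));          // starts execution 2
        shared.complete();                                     // execution 2 finishes
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [Sub1: 1, Sub2: 1, Sub3: 2]
    }
}
```

Sub1 and Sub2 see the same value because they were both waiting on the same execution, while Sub3 arrives after the reset and gets a fresh one. If the goal really is caching for late subscribers, a replaying operator such as .cache() or .replay() would probably be a closer fit than .share().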
