RxJava2: Continue till Success — a use case

sandeep shabd
Aug 26, 2017

Problem Statement:

You have a list of data items. Some of them satisfy a success condition, while the rest throw an exception. You need to continue past the exceptions until you find the first success, and then ignore the remaining items. For example, suppose you have a list of ports you would like to connect to, and you want to know which is the first one that accepts a connection. Once you are connected to a port, you stop trying the ports further down the list. In this post, I explain how this can be achieved with RxJava2.
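To make the requirement concrete before bringing in RxJava, here is a minimal plain-Java sketch of the same "continue till success" behaviour. The names FirstSuccessSketch, tryConnect and firstSuccess are hypothetical, and the rule that addresses ending in "_OK" succeed is only an assumption used to simulate a connection.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class FirstSuccessSketch {

    // Hypothetical stand-in for a real connection attempt:
    // addresses ending in "_OK" succeed, everything else throws.
    static String tryConnect(String address) throws Exception {
        if (address.endsWith("_OK")) {
            return address;
        }
        throw new Exception("Not connected: " + address);
    }

    // Walks the list in order, skips failures, and stops at the first success.
    static Optional<String> firstSuccess(List<String> addresses) {
        for (String address : addresses) {
            try {
                return Optional.of(tryConnect(address));
            } catch (Exception e) {
                // A failure is expected here; move on to the next candidate.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> addresses = Arrays.asList("A", "B_OK", "C_OK");
        // B_OK is returned; C_OK is never tried.
        System.out.println(firstSuccess(addresses));
    }
}
```

The RxJava version in the rest of this post expresses the same loop as a chain of operators, which becomes more useful once the connection attempts need scheduling, timeouts, or retries.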

Test ports:

deviceList.add("Port_Address1");
deviceList.add("Port_Address2");
deviceList.add("Port_Address3");
deviceList.add("Port_Address4");
deviceList.add("Port_Address");
deviceList.add("Port_Address5");
deviceList.add("Port_Address6");

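The following test method simulates connecting to a port address. It succeeds only for "Port_Address" and "Port_Address5" and throws an exception for every other address: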

private String checkPortConnection(String deviceID) throws Exception {
    if ("Port_Address".equals(deviceID) || "Port_Address5".equals(deviceID)) {
        return deviceID;
    }
    throw new Exception("Not connected.");
}

Let's start the RxJava chain that iterates through the port list and tries each port until one connects:

Observable<String> observableName = Observable.fromIterable(deviceList);
observableName
        .flatMap(new Function<String, Observable<String>>() {
            @Override
            public Observable<String> apply(String device) throws Exception {
                return Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                        try {
                            System.out.println("Test and connect to port connection:" + device);
                            String portAddress = checkPortConnection(device);
                            observableEmitter.onNext(portAddress);
                            observableEmitter.onComplete();
                        } catch (Exception e) {
                            // Log the failure and complete without emitting, so the chain
                            // can still terminate when no port connects at all.
                            System.out.println("Problem connecting to port:" + device);
                            observableEmitter.onComplete();
                        }
                    }
                });
            }
        })
        .take(1)
        .subscribe(getObserver());

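I create an observable that iterates over the device list. The flatMap operator maps each port to an inner observable that attempts the connection. On failure, the inner observable logs the failed port and emits nothing. On success, it emits the port address. Since I want only the first success, I use the take(1) operator, which passes the first emitted item downstream and then disposes the upstream, so the remaining ports are never tried. Finally, I subscribe with an observer that receives the port address and logs it. Note that the ports are tried in list order here because every inner observable runs synchronously. If the connection attempts were asynchronous, flatMap could interleave them, and concatMap would be the operator that preserves the order.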

The code snippet can be made much shorter with lambdas. As an exercise, go ahead and rewrite it using lambda expressions.
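One possible answer to that exercise is sketched below. It is not the author's own solution, and it assumes Java 8 lambdas and RxJava 2 on the classpath. It also swaps in a few operators that the snippet above does not use: fromCallable wraps the check, onErrorResumeNext replaces the try/catch, concatMap replaces flatMap so that ports are always tried one at a time in list order, and blockingFirst returns the result directly instead of going through an Observer. The class name LambdaPortCheck and the method firstConnectedPort are made up for illustration.

```java
import io.reactivex.Observable;

import java.util.Arrays;
import java.util.List;

class LambdaPortCheck {

    // Same simulated check as in the article: only two addresses "connect".
    static String checkPortConnection(String deviceID) throws Exception {
        if ("Port_Address".equals(deviceID) || "Port_Address5".equals(deviceID)) {
            return deviceID;
        }
        throw new Exception("Not connected.");
    }

    // Returns the first port that connects, or the given fallback if none does.
    static String firstConnectedPort(List<String> deviceList, String fallback) {
        return Observable.fromIterable(deviceList)
                // concatMap subscribes to one inner observable at a time, in list order.
                .concatMap(device -> Observable
                        .fromCallable(() -> checkPortConnection(device))
                        .doOnError(e -> System.out.println("Problem connecting to port:" + device))
                        // Swallow the failure so the chain moves on to the next port.
                        .onErrorResumeNext(Observable.<String>empty()))
                // Keep only the first success; the remaining ports are not tried.
                .take(1)
                .blockingFirst(fallback);
    }

    public static void main(String[] args) {
        List<String> deviceList = Arrays.asList(
                "Port_Address1", "Port_Address2", "Port_Address", "Port_Address5");
        System.out.println("Connected: " + firstConnectedPort(deviceList, "none"));
    }
}
```

Blocking for the result keeps the sketch short and easy to test. In an application that should stay non-blocking, the same chain could end in subscribe with lambda callbacks instead.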

Here is the complete test code. Copy it into MainTest.java, then compile and run it in your Maven project. You will need the RxJava2 dependency in the pom.xml of your Java Maven project.

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;

import java.util.ArrayList;
import java.util.List;

public class MainTest {

    List<String> deviceList = new ArrayList<>();

    // Fills the list with test addresses; only "Port_Address" and "Port_Address5" will connect.
    void setTestPorts() {
        deviceList.add("Port_Address1");
        deviceList.add("Port_Address2");
        deviceList.add("Port_Address3");
        deviceList.add("Port_Address4");
        deviceList.add("Port_Address");
        deviceList.add("Port_Address5");
        deviceList.add("Port_Address6");
    }

    public void testPortConnection() {
        try {
            Observable<String> observableName = Observable.fromIterable(deviceList);
            observableName
                    .flatMap(new Function<String, Observable<String>>() {
                        @Override
                        public Observable<String> apply(String device) throws Exception {
                            return Observable.create(new ObservableOnSubscribe<String>() {
                                @Override
                                public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                                    try {
                                        System.out.println("Test and connect to port connection:" + device);
                                        String portAddress = checkPortConnection(device);
                                        observableEmitter.onNext(portAddress);
                                        observableEmitter.onComplete();
                                    } catch (Exception e) {
                                        // Log the failure and complete without emitting, so the chain
                                        // can still terminate when no port connects at all.
                                        System.out.println("Problem connecting to port:" + device);
                                        observableEmitter.onComplete();
                                    }
                                }
                            });
                        }
                    })
                    // Keep only the first successful connection and stop trying further ports.
                    .take(1)
                    .subscribe(getObserver());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Simulated connection check: returns the address on success, throws otherwise.
    private String checkPortConnection(String deviceID) throws Exception {
        if ("Port_Address".equals(deviceID) || "Port_Address5".equals(deviceID)) {
            return deviceID;
        }
        throw new Exception("Not connected.");
    }

    public static void main(String[] args) {
        System.out.println("Test port connection.");

        MainTest test = new MainTest();
        test.setTestPorts();
        test.testPortConnection();
    }

    private Observer<String> getObserver() {
        return new Observer<String>() {

            @Override
            public void onSubscribe(Disposable disposable) {
                System.out.println("Subscribed to observable.");
            }

            @Override
            public void onNext(String s) {
                System.out.println("**** Port Connected:" + s + " ****");
            }

            @Override
            public void onError(Throwable throwable) {
                // Report unexpected errors instead of silently dropping them.
                System.out.println("Error: " + throwable.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("connect process completed");
            }
        };
    }
}

I ran the code in the IntelliJ IDEA IDE. You can use Eclipse, NetBeans, or any Java IDE of your choice. Here is the output:

Test port connection.
Subscribed to observable.
Test and connect to port connection:Port_Address1
Problem connecting to port:Port_Address1
Test and connect to port connection:Port_Address2
Problem connecting to port:Port_Address2
Test and connect to port connection:Port_Address3
Problem connecting to port:Port_Address3
Test and connect to port connection:Port_Address4
Problem connecting to port:Port_Address4
Test and connect to port connection:Port_Address
**** Port Connected:Port_Address ****
connect process completed


