RxJava/RxAndroid:retry(long times, Predicate<? super Throwable> predicate)
import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import java.net.Socket;
import java.util.concurrent.Callable;
import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.functions.Predicate;
import io.reactivex.observers.DisposableObserver;
import io.reactivex.schedulers.Schedulers;
public class MainActivity extends AppCompatActivity {
private String TAG = "輸出";
private DisposableObserver mDisposableObserver = new DisposableObserver<Socket>() {
@Override
public void onNext(Socket socket) {
Log.d(TAG, "onNext:" + socket);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, e.toString(), e);
}
};
private Socket mSocket = null;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
Observable.fromCallable(new Callable<Socket>() {
@Override
public Socket call() throws Exception {
Log.d(TAG, "call");
mSocket = new Socket("127.0.0.1",80);
return mSocket;
}
}).retry(3, new Predicate<Throwable>() {
@Override
public boolean test(Throwable throwable) throws Exception {
Log.d(TAG, "test:" + throwable.toString());
if (mSocket != null && mSocket.isConnected()) {
return false;
} else {
return true;
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(mDisposableObserver);
}
}
output:
01-22 12:34:16.481 29902-29955/zhangphil.book D/輸出: call
01-22 12:34:16.483 29902-29955/zhangphil.book D/輸出: test:java.net.ConnectException: Connection refused
01-22 12:34:16.483 29902-29955/zhangphil.book D/輸出: call
01-22 12:34:16.485 29902-29955/zhangphil.book D/輸出: test:java.net.ConnectException: Connection refused
01-22 12:34:16.485 29902-29955/zhangphil.book D/輸出: call
01-22 12:34:16.486 29902-29955/zhangphil.book D/輸出: test:java.net.ConnectException: Connection refused
01-22 12:34:16.486 29902-29955/zhangphil.book D/輸出: call
01-22 12:34:16.592 29902-29902/zhangphil.book E/輸出: java.net.ConnectException: Connection refused
java.net.ConnectException: Connection refused
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:334)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:196)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:178)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:356)
at java.net.Socket.connect(Socket.java:592)
at java.net.Socket.connect(Socket.java:541)
at java.net.Socket.<init>(Socket.java:427)
at java.net.Socket.<init>(Socket.java:210)
at zhangphil.book.MainActivity$3.call(MainActivity.java:46)
at zhangphil.book.MainActivity$3.call(MainActivity.java:42)
at io.reactivex.internal.operators.observable.ObservableFromCallable.subscribeActual(ObservableFromCallable.java:42)
at io.reactivex.Observable.subscribe(Observable.java:11040)
at io.reactivex.internal.operators.observable.ObservableRetryPredicate$RepeatObserver.subscribeNext(ObservableRetryPredicate.java:111)
at io.reactivex.internal.operators.observable.ObservableRetryPredicate.subscribeActual(ObservableRetryPredicate.java:41)
at io.reactivex.Observable.subscribe(Observable.java:11040)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:463)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
at java.util.concurrent.FutureTask.run(FutureTask.java:237)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
at java.lang.Thread.run(Thread.java:776)