RxJava 操作符之 -- 异常操作符

本贴最后更新于 2793 天前,其中的信息可能已经事过景迁
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                if (subscriber.isUnsubscribed()) return;
                //循环输出数字
                try {
                    for (int i = 0; i < 10; i++) {
                        if (i == 4) {
                            throw new Exception("this is number 4 error!");
                        }
                        subscriber.onNext(i);
                    }
                    subscriber.onCompleted();
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        });

        observable.onErrorReturn(new Func1<Throwable, Integer>() {
            @Override
            public Integer call(Throwable throwable) {
                return 1004;
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Integer value) {
                System.out.println("Next:" + value);
            }
        });

结果

Next:0
Next:1
Next:2
Next:3
Next:1004
Sequence complete.

onErrorResumeNext

onErrorResumeNext 方法返回一个镜像原有 Observable 行为的新 Observable,后者会忽略前者的 onError 调用,不会将错误传递给观察者,作为替代,它会开始镜像另一个,备用的 Observable

Observable<Integer> observable1 = Observable
				.create(new Observable.OnSubscribe<Integer>() {
					@Override
					public void call(Subscriber<? super Integer> subscriber) {
						if (subscriber.isUnsubscribed())
							return;
						// 循环输出数字
						try {
							for (int i = 0; i < 10; i++) {
								if (i == 4) {
									throw new Exception(
											"this is number 4 error!");
								}
								subscriber.onNext(i);
							}
							subscriber.onCompleted();
						} catch (Exception e) {
							subscriber.onError(e);
						}
					}
				});

		observable1.onErrorResumeNext(
				new Func1<Throwable, Observable<? extends Integer>>() {
					@Override
					public Observable<? extends Integer> call(
							Throwable throwable) {
						return Observable.just(100, 101, 102);
					}
				}).subscribe(new Subscriber<Integer>() {
			@Override
			public void onCompleted() {
				System.out.println("Sequence complete.");
			}

			@Override
			public void onError(Throwable e) {
				System.err.println("Error: " + e.getMessage());
			}

			@Override
			public void onNext(Integer value) {
				System.out.println("Next:" + value);
			}
		});

结果

Next:0
Next:1
Next:2
Next:3
Next:100
Next:101
Next:102
Sequence complete.

onExceptionResumeNext

和 onErrorResumeNext 类似,onExceptionResumeNext 方法返回一个镜像原有 Observable 行为的新 Observable,也使用一个备用的 Observable,不同的是,如果 onError 收到的 Throwable 不是一个 Exception,它会将错误传递给观察者的 onError 方法,不会使用备用的 Observable。

Observable<Integer> observable2 = Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                if (subscriber.isUnsubscribed()) return;
                //循环输出数字
                try {
                    for (int i = 0; i < 10; i++) {
                        if (i == 4) {
                            throw new Exception("this is number 4 error!");
                        }
                        subscriber.onNext(i);
                    }
                    subscriber.onCompleted();
                } catch (Throwable e) {
                    subscriber.onError(e);
                }
            }
        });

        observable2.onExceptionResumeNext(Observable.just(100, 101, 102)).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Integer value) {
                System.out.println("Next:" + value);
            }
        });

结果

Next:0
Next:1
Next:2
Next:3
Next:100
Next:101
Next:102
Sequence complete.

retry

如果原始 Observable 遇到错误,重新订阅它期望它能正常终止

Observable<Integer> observable = Observable
				.create(new Observable.OnSubscribe<Integer>() {
					@Override
					public void call(Subscriber<? super Integer> subscriber) {
						if (subscriber.isUnsubscribed())
							return;
						// 循环输出数字
						try {
							for (int i = 0; i < 10; i++) {
								if (i == 4) {
									throw new Exception(
											"this is number 4 error!");
								}
								subscriber.onNext(i);
							}
							subscriber.onCompleted();
						} catch (Throwable e) {
							subscriber.onError(e);
						}
					}
				});

		observable.retry(2).subscribe(new Subscriber<Integer>() {
			@Override
			public void onCompleted() {
				System.out.println("Sequence complete.");
			}

			@Override
			public void onError(Throwable e) {
				System.err.println("Error: " + e.getMessage());
			}

			@Override
			public void onNext(Integer value) {
				System.out.println("Next:" + value);
			}
		});

结果

Next:0
Next:1
Next:2
Next:3
Next:0
Next:1
Next:2
Next:3
Next:0
Next:1
Next:2
Next:3
Next:0
Next:1
Next:2
Next:3
Next:0
Next:1
Next:2
Next:3
Next:0
Next:1
Next:2
Next:3
Error: this is number 4 error!

retryWhen

retryWhen 和 retry 类似,区别是,retryWhen 将 onError 中的 Throwable 传递给一个函数,这个函数产生另一个 Observable,retryWhen 观察它的结果再决定是不是要重新订阅原始的 Observable。如果这个 Observable 发射了一项数据,它就重新订阅,如果这个 Observable 发射的是 onError 通知,它就将这个通知传递给观察者然后终止。

Observable<Integer> observable1 = Observable
				.create(new Observable.OnSubscribe<Integer>() {
					@Override
					public void call(Subscriber<? super Integer> subscriber) {
						if (subscriber.isUnsubscribed())
							return;
						// 循环输出数字
						try {
							for (int i = 0; i < 10; i++) {
								if (i == 4) {
									throw new Exception(
											"this is number 4 error!");
								}
								subscriber.onNext(i);
							}
							subscriber.onCompleted();
						} catch (Throwable e) {
							subscriber.onError(e);
						}
					}
				});

		observable1.retryWhen(
				new Func1<Observable<? extends Throwable>, Observable<?>>() {
					@Override
					public Observable<?> call(
							Observable<? extends Throwable> observable) {
						System.out.println("delay retry by " + "1"
								+ " second(s)");
						return Observable.timer(1, TimeUnit.SECONDS);
					}
				}).subscribe(new Subscriber<Integer>() {
			@Override
			public void onCompleted() {
				System.out.println("Sequence complete.");
			}

			@Override
			public void onError(Throwable e) {
				System.err.println("Error: " + e.getMessage());
			}

			@Override
			public void onNext(Integer value) {
				System.out.println("Next:" + value);
			}
		});
		try {
			Thread.sleep(Integer.MAX_VALUE);
		} catch (InterruptedException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		}ssss

结果

delay retry by 1 second(s)
Next:0
Next:1
Next:2
Next:3
Next:0
Next:1
Next:2
Next:3
Sequence complete.
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3187 引用 • 8213 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...