RxJava2 observable take throws UndeliverableException

Notice: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license and attribute it to the original authors (not me). Original StackOverflow question: http://stackoverflow.com/questions/43525052/

Date: 2020-08-12 00:58:13  Source: igfitidea

Tags: java, observable, rx-java2, take

Asked by AbdElraouf Sabri

As I understand it, in RxJava2 values.take(1) creates another Observable that contains only the first element of the original Observable. It MUST NOT throw an exception, because the error is emitted second and is therefore filtered out by take(1).

As in the following code snippet:

    Observable<Integer> values = Observable.create(o -> {
        o.onNext(1);
        o.onError(new Exception("Oops"));
    });

    values.take(1)
            .subscribe(
                    System.out::println,
                    e -> System.out.println("Error: " + e.getMessage()),
                    () -> System.out.println("Completed")
            );

Output

    1
    Completed
    io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
        at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
        at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
        at ch02.lambda$main(ch02.java:28)
        at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
        at io.reactivex.Observable.subscribe(Observable.java:10841)
        at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
        at io.reactivex.Observable.subscribe(Observable.java:10841)
        at io.reactivex.Observable.subscribe(Observable.java:10827)
        at io.reactivex.Observable.subscribe(Observable.java:10787)
        at ch02.main(ch02.java:32)
    Caused by: java.lang.Exception: Oops
        ... 8 more
    Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
        at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
        at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
        at ch02.lambda$main(ch02.java:28)
        at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
        at io.reactivex.Observable.subscribe(Observable.java:10841)
        at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
        at io.reactivex.Observable.subscribe(Observable.java:10841)
        at io.reactivex.Observable.subscribe(Observable.java:10827)
        at io.reactivex.Observable.subscribe(Observable.java:10787)
        at ch02.main(ch02.java:32)
    Caused by: java.lang.Exception: Oops
        ... 8 more

My questions:

  1. Am I understanding it correctly?
  2. What is really happening to cause the exception?
  3. How can this be solved from the consumer side?

Accepted answer by Kiskae

  1. Yes, but the fact that the observable 'ends' does not mean that the code running inside create(...) is stopped. To be fully safe in this case, you need to check o.isDisposed() to see whether the observable has ended downstream.
  2. The exception occurs because RxJava 2 has a policy of NEVER allowing an onError call to be lost. It is either delivered downstream or, if the observable has already terminated, thrown as a global UndeliverableException. It is up to the creator of the Observable to 'properly' handle the case where the observable has ended and an exception occurs.
  3. The problem is that the producer (Observable) and the consumer (Subscriber) disagree on when the stream ends. Since the producer outlives the consumer in this case, the problem can only be fixed in the producer.
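
To make points 1 and 2 concrete, here is a minimal sketch in plain Java that models the policy described above. It is not RxJava's actual code: `ModelEmitter`, `globalErrorHandler`, and `UndeliverableModelDemo` are hypothetical names invented for illustration, and the static handler merely stands in for whatever is installed through RxJavaPlugins. The `dispose()` call plays the role of take(1) cutting off the upstream after the first item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of an emitter; NOT the library's real implementation.
class ModelEmitter {
    // Stand-in (assumption) for the global handler that receives undeliverable errors.
    static Consumer<Throwable> globalErrorHandler = e -> {
        throw new RuntimeException("Undeliverable", e);
    };

    private final Consumer<Throwable> downstreamOnError;
    private boolean disposed;

    ModelEmitter(Consumer<Throwable> downstreamOnError) {
        this.downstreamOnError = downstreamOnError;
    }

    void dispose() { disposed = true; }

    boolean isDisposed() { return disposed; }

    // Delivers the error if someone is still listening; otherwise reports false.
    boolean tryOnError(Throwable e) {
        if (disposed) {
            return false;
        }
        disposed = true; // an error is terminal
        downstreamOnError.accept(e);
        return true;
    }

    // Never loses the error: downstream if possible, global handler otherwise.
    void onError(Throwable e) {
        if (!tryOnError(e)) {
            globalErrorHandler.accept(e);
        }
    }
}

class UndeliverableModelDemo {
    // Runs one scenario and returns a log of where the error ended up.
    static List<String> run(boolean guardWithIsDisposed) {
        List<String> log = new ArrayList<>();
        ModelEmitter.globalErrorHandler = e -> log.add("global: " + e.getMessage());
        ModelEmitter emitter = new ModelEmitter(e -> log.add("downstream: " + e.getMessage()));

        emitter.dispose(); // models take(1) disposing the upstream after the first item

        // The producer keeps running after disposal, as in the question.
        if (!guardWithIsDisposed || !emitter.isDisposed()) {
            emitter.onError(new Exception("Oops"));
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [global: Oops]
        System.out.println(run(true));  // prints []
    }
}
```

Without the guard, the error has nowhere to go and lands in the global handler, which is the situation the question's stack trace shows. With the isDisposed() guard, the producer simply stops signalling once the consumer is gone.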

Answer by Ilia Kurtov

@Kiskae correctly explained in the previous answer why such an exception can occur.

Here is the link to the official documentation on this topic: RxJava2-wiki.

Sometimes you cannot change this behaviour, so there is a way to handle these UndeliverableExceptions. Here is a code snippet showing how to avoid crashes and misbehaviour:

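    import io.reactivex.exceptions.UndeliverableException;
    import io.reactivex.plugins.RxJavaPlugins;
    import java.io.IOException;
    import java.net.SocketException;

    RxJavaPlugins.setErrorHandler(e -> {
        if (e instanceof UndeliverableException) {
            // unwrap to inspect the original error
            e = e.getCause();
        }
        if ((e instanceof IOException) || (e instanceof SocketException)) {
            // fine, irrelevant network problem or API that throws on cancellation
            return;
        }
        if (e instanceof InterruptedException) {
            // fine, some blocking code was interrupted by a dispose call
            return;
        }
        if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) {
            // that's likely a bug in the application
            Thread.currentThread().getUncaughtExceptionHandler()
                .uncaughtException(Thread.currentThread(), e);
            return;
        }
        if (e instanceof IllegalStateException) {
            // that's a bug in RxJava or in a custom operator
            Thread.currentThread().getUncaughtExceptionHandler()
                .uncaughtException(Thread.currentThread(), e);
            return;
        }
        Log.warning("Undeliverable exception received, not sure what to do", e);
    });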

This code is taken from the link above.

Important note: this approach sets a global error handler for RxJava, so if you can get rid of these exceptions at their source, that would be the better option.

Answer by Ayush Jain

When using Observable.create(), just call tryOnError() on the emitter instead of onError(). onError() does not guarantee that the error will be handled. Various error-handling operators are also available.
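
As a rough sketch of this suggestion, the question's snippet can be rewritten with tryOnError(). This assumes an RxJava 2 version in which the emitter exposes tryOnError() (2.1.1 or later, as far as I know); the class name `TryOnErrorSketch` and the `log` list are only illustrative.

```java
import io.reactivex.Observable;
import java.util.ArrayList;
import java.util.List;

class TryOnErrorSketch {
    // Returns a log of the signals seen, in order.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        Observable<Integer> values = Observable.create(o -> {
            o.onNext(1);
            // tryOnError reports false instead of escalating to the global
            // error handler when the downstream has already been disposed.
            boolean delivered = o.tryOnError(new Exception("Oops"));
            log.add("delivered=" + delivered);
        });

        values.take(1)
                .subscribe(
                        v -> log.add("next " + v),
                        e -> log.add("error " + e.getMessage()),
                        () -> log.add("completed")
                );
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Because take(1) completes and disposes the upstream as soon as the first item arrives, the error is expected to be dropped quietly here rather than surfacing as an UndeliverableException. Whether dropping it is acceptable depends on the producer; the boolean result lets it log or clean up instead.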

Answer by do01

Kotlin

I call this in the MainActivity onCreate method:

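    import android.util.Log
    import io.reactivex.exceptions.UndeliverableException
    import io.reactivex.plugins.RxJavaPlugins
    import java.io.IOException
    import java.net.SocketException

    private fun initRxErrorHandler() {
        RxJavaPlugins.setErrorHandler { throwable ->
            // unwrap to inspect the original error; keep the wrapper if there is no cause
            val e = if (throwable is UndeliverableException) throwable.cause ?: throwable else throwable
            if (e is IOException || e is SocketException) {
                // fine, irrelevant network problem or API that throws on cancellation
                return@setErrorHandler
            }
            if (e is InterruptedException) {
                // fine, some blocking code was interrupted by a dispose call
                return@setErrorHandler
            }
            if (e is NullPointerException || e is IllegalArgumentException) {
                // that's likely a bug in the application
                Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), e)
                return@setErrorHandler
            }
            if (e is IllegalStateException) {
                // that's a bug in RxJava or in a custom operator
                Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), e)
                return@setErrorHandler
            }
            Log.w("Undeliverable exception", e)
        }
    }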