Note: this page reproduces a popular Stack Overflow question and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/25634499/

RxJava onErrorResumeNext()

Tags: java, retrofit, rx-java

Asked by mradzinski

I have two observables (named A and B for simplicity) and one subscriber. The subscriber subscribes to A, and if A hits an error then B (the fallback) kicks in. Whenever A hits an error, B does get called; however, A calls onComplete() on the subscriber, so B's response never reaches the subscriber even when B executes successfully.

Is this the normal behaviour? I thought onErrorResumeNext() should continue the stream and notify the subscriber once completed as noted in the documentation (https://github.com/ReactiveX/RxJava/wiki/Error-Handling-Operators#onerrorresumenext).

This is the overall structure of what I'm doing (several "boring" parts omitted):

public Observable<ModelA> observeGetAPI(){
    return retrofitAPI.getObservableAPI1()
            .flatMap(observableApi1Response -> {
                ModelA model = new ModelA();

                model.setApi1Response(observableApi1Response);

                return retrofitAPI.getObservableAPI2()
                        .map(observableApi2Response -> {
                            // Blah blah blah...
                            return model;
                        })
                        .onErrorResumeNext(observeGetAPIFallback(model))
                        .subscribeOn(Schedulers.newThread());
            })
            .onErrorReturn(throwable -> {
                // Blah blah blah...
                return model;
            })
            .subscribeOn(Schedulers.newThread());
}

private Observable<ModelA> observeGetAPIFallback(ModelA model){
    return retrofitAPI.getObservableAPI3().map(observableApi3Response -> {
        // Blah blah blah...
        return model;
    }).onErrorReturn(throwable -> {
        // Blah blah blah...
        return model;
    })
    .subscribeOn(Schedulers.immediate());
}

Subscription subscription;
subscription = observeGetAPI().subscribe(model -> {
    // IF THERE'S AN ERROR WE NEVER GET B RESPONSE HERE...
}, throwable -> {
    // WE NEVER GET HERE... onErrorResumeNext()
}, () -> {
    // IN CASE OF AN ERROR WE GET STRAIGHT HERE, MEANWHILE, B GETS EXECUTED
});

Any ideas what I'm doing wrong?

Thanks!

EDIT: Here's a rough timeline of what's happening:

---> HTTP GET REQUEST B
<--- HTTP 200 REQUEST B RESPONSE (SUCCESS)

---> HTTP GET REQUEST A
<--- HTTP 200 REQUEST A RESPONSE (FAILURE!)

---> HTTP GET FALLBACK A
** onComplete() called! ---> Subscriber never gets fallback response since onComplete() gets called before time.
<--- HTTP 200 FALLBACK A RESPONSE (SUCCESS)

And here's a link to a simple diagram I made which represents what I want to happen: Diagram

Answered by kjones

The Rx calls used in the following should simulate what you are doing with Retrofit.

Observable<String> fallbackObservable =
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        logger.v("emitting A Fallback");
                        subscriber.onNext("A Fallback");
                        subscriber.onCompleted();
                    }
                })
                .delay(1, TimeUnit.SECONDS)
                .onErrorReturn(new Func1<Throwable, String>() {
                    @Override
                    public String call(Throwable throwable) {
                        logger.v("emitting Fallback Error");
                        return "Fallback Error";
                    }
                })
                .subscribeOn(Schedulers.immediate());

Observable<String> stringObservable =
        Observable
                .create(new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        logger.v("emitting B");
                        subscriber.onNext("B");
                        subscriber.onCompleted();
                    }
                })
                .delay(1, TimeUnit.SECONDS)
                .flatMap(new Func1<String, Observable<String>>() {
                    @Override
                    public Observable<String> call(String s) {
                        logger.v("flatMapping B");
                        return Observable
                                .create(new Observable.OnSubscribe<String>() {
                                    @Override
                                    public void call(Subscriber<? super String> subscriber) {
                                        logger.v("emitting A");
                                        subscriber.onNext("A");
                                        subscriber.onCompleted();
                                    }
                                })
                                .delay(1, TimeUnit.SECONDS)
                                .map(new Func1<String, String>() {
                                    @Override
                                    public String call(String s) {
                                        logger.v("A completes but contains invalid data - throwing error");
                                        throw new NotImplementedException("YUCK!");
                                    }
                                })
                                .onErrorResumeNext(fallbackObservable)
                                .subscribeOn(Schedulers.newThread());
                    }
                })
                .onErrorReturn(new Func1<Throwable, String>() {
                    @Override
                    public String call(Throwable throwable) {
                        logger.v("emitting Return Error");
                        return "Return Error";
                    }
                })
                .subscribeOn(Schedulers.newThread());

Subscription subscription = stringObservable.subscribe(
        new Action1<String>() {
            @Override
            public void call(String s) {
                logger.v("onNext " + s);
            }
        },
        new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                logger.v("onError");
            }
        },
        new Action0() {
            @Override
            public void call() {
                logger.v("onCompleted");
            }
        });

The output from the log statements is:

RxNewThreadScheduler-1 emitting B
RxComputationThreadPool-1 flatMapping B
RxNewThreadScheduler-2 emitting A
RxComputationThreadPool-2 A completes but contains invalid data - throwing error
RxComputationThreadPool-2 emitting A Fallback
RxComputationThreadPool-1 onNext A Fallback
RxComputationThreadPool-1 onCompleted

This seems like what you are looking for but maybe I'm missing something.
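
For what it's worth, the event order in that log (the fallback item arrives first, then the completion) can be modelled without RxJava at all. The following is only a rough plain-Java sketch of the contract as I understand it: `Sink`, `Source`, `ResumeSketch` and this `onErrorResumeNext` helper are hypothetical names made up for illustration, not RxJava's API, and everything runs synchronously on one thread, so schedulers and `delay` are deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

public class ResumeSketch {
    /** Hypothetical stand-in for a subscriber: items, then exactly one terminal event. */
    interface Sink<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
    }

    /** Hypothetical stand-in for an observable: pushes events into a sink on subscribe. */
    interface Source<T> {
        void subscribe(Sink<T> sink);
    }

    /** Models the operator: forward items; on error, hand the same sink to the fallback. */
    static <T> Source<T> onErrorResumeNext(Source<T> primary, Source<T> fallback) {
        return sink -> primary.subscribe(new Sink<T>() {
            @Override public void onNext(T item) { sink.onNext(item); }
            // The error is swallowed here; the fallback now owns the terminal event.
            @Override public void onError(Throwable error) { fallback.subscribe(sink); }
            @Override public void onCompleted() { sink.onCompleted(); }
        });
    }

    /** Runs the "A fails, fallback succeeds" scenario and records what downstream sees. */
    static List<String> run() {
        Source<String> a = sink -> sink.onError(new IllegalStateException("A returned invalid data"));
        Source<String> fallback = sink -> {
            sink.onNext("A Fallback");
            sink.onCompleted();
        };

        List<String> events = new ArrayList<>();
        onErrorResumeNext(a, fallback).subscribe(new Sink<String>() {
            @Override public void onNext(String item) { events.add("onNext " + item); }
            @Override public void onError(Throwable error) { events.add("onError"); }
            @Override public void onCompleted() { events.add("onCompleted"); }
        });
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println); // prints "onNext A Fallback" then "onCompleted"
    }
}
```

In this model the subscriber's completion callback can only fire after the fallback has emitted, because the fallback is the only thing left holding the sink once the primary source fails. If you see the completion before the fallback's item in your real code, it is probably worth checking where each `subscribeOn` sits and whether something other than the fallback still holds a reference to the subscriber.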
