java 用来自 RxJava 的 observables 替换回调

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/42118782/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-11-03 06:22:43  来源:igfitidea点击:

Replace callbacks with observables from RxJava

javarx-java

提问by Pablo Cegarra

Im using listeners as callbacks to observe asynchronous operations with Android, but I think that could be great replacing this listeners with RxJava, Im new using this library but I really like it and Im always using it with Android projects.

我使用侦听器作为回调来观察与 Android 的异步操作,但我认为用 RxJava 替换这个侦听器可能很棒,我是使用这个库的新手,但我真的很喜欢它,而且我总是在 Android 项目中使用它。

Here is my code to refactor:

这是我要重构的代码:

public void getData( final OnResponseListener listener ){
   if(data!=null && !data.isEmpty()){
       listener.onSuccess();
   }
   else{
       listener.onError();
   }
}

A simple callback:

一个简单的回调:

public interface OnResponseListener {
   public void onSuccess();
   public void onError(); 
}

And the "observer":

而“观察者”:

object.getData( new OnResponseListener() {
    @Override
    public void onSuccess() {
       Log.w(TAG," on success");
    }

    @Override
    public void onError() {
       Log.e(TAG," on error");
    }
});

Thanks!

谢谢!

回答by YMY

For example you can use Observable.fromCallableto create observable with your data.

例如,您可以使用Observable.fromCallable用您的数据创建 observable。

public Observable<Data> getData(){
    return Observable.fromCallable(() -> {
        Data result = null;
        //do something, get your Data object
        return result;
    });
}

then use your data

然后使用您的数据

 getData().subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(data -> {
                //do something with your data
            }, error -> {
                //do something on error
            });

Used rxjava 1.x and lambda expressions.

使用 rxjava 1.x 和 lambda 表达式。

edit:

编辑:

if I understand you well, you wanted to replace that listener, not wrap it into observable. I added other example in reference to your comment. Oh.. also you should use Singleif you are expecting only one item.

如果我理解你,你想替换那个监听器,而不是把它包装成可观察的。我参考您的评论添加了其他示例。哦..如果您只期待一件商品,也应该使用Single

public Single<Data> getData() {
        return Single.create(singleSubscriber -> {
            Data result = object.getData();
            if(result == null){
                singleSubscriber.onError(new Exception("no data"));
            } else {
                singleSubscriber.onSuccess(result);
            }
        });
    }

getData().subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(data -> {
                //do something with your data
            }, error -> {
                //do something on error
            });

回答by Maksim Ostrovidov

You are looking for Completable.create:

您正在寻找Completable.create

Completable: Represents a deferred computation without any value but only indication for completion or exception. The class follows a similar event pattern as Reactive-Streams: onSubscribe (onError|onComplete)?

Completable:表示没有任何值但仅指示完成或异常的延迟计算。该类遵循与 Reactive-Streams 类似的事件模式: onSubscribe (onError|onComplete)?

Completable.create(subscriber -> {
    object.getData(new OnResponseListener() {
        @Override
        public void onSuccess() {
           subscriber.onCompleted();
        }

        @Override
        public void onError() {
           subscriber.onError(* put appropriate Throwable here *);
        }
    }
})
...//apply Schedulers
.subscribe((() -> *success*), (throwable -> *error*));

回答by Geoffrey Marizy

How I would refactor your code; alongside getData method, I would add the getData method wrapped as a Single:

我将如何重构您的代码;与 getData 方法一起,我将添加包装为 Single 的 getData 方法:

public void getData( final OnResponseListener listener ){
    if(data!=null && !data.isEmpty()){
        listener.onSuccess();
    }
    else{
        listener.onError();
    }
}

public Single<Boolean> getDataSingle() {
    return Single.create(new SingleOnSubscribe<Boolean>() {
        @Override
        public void subscribe(SingleEmitter<Boolean> e) throws Exception {
            getData(new OnResponseListener() {
                @Override
                public void onSuccess() {
                    e.onSuccess(true);
                }

                @Override
                public void onError() {
                    e.onSuccess(false);
                }
            });
        }
    });
}

Or with Java 8 :

或使用 Java 8:

public Single<Boolean> getDataSingle() {
    return Single.create(e -> getData(
            new OnResponseListener() {
                @Override
                public void onSuccess() {
                    e.onSuccess(true);
                }

                @Override
                public void onError() {
                    e.onSuccess(false);
                }
            })
    );
}

Now you have exposed a Rx API alongside the callback's one. Supposing it's some kind of DataProvider of your own, you can now use it without dealing with callbacks, like this:

现在,您已经在回调 API 旁边公开了一个 Rx API。假设它是您自己的某种 DataProvider,您现在可以使用它而无需处理回调,如下所示:

dataProvider.getDataSingle()
        .map(result -> result ? "User exist" : "User doesn't exist")
        .subscribe(message -> display(message));

I used Rx2 but with Rx1 the logic is the same.

我使用 Rx2 但使用 Rx1 逻辑是相同的。

I also used a Singleinstead of an Observable, since you await only one value. The interest is a more expressive contract for your function.

我还使用了Single而不是 Observable,因为您只等待一个值。兴趣是您功能的更具表现力的契约。

You can't emit value on behalf of an Observable, ie calling something like myObservable.send(value). The first solution is to use a Subject. Another solution (the one above) is to create the observable with Observable.create() (or Single.create()). You call the callback method and create the listener inside the method Observable.create(), because it's inside Observable.create() that you can call onSuccess() method, the method who told the Observable to pass down a value.

您不能代表 Observable 发出值,即调用 myObservable.send(value) 之类的东西。第一个解决方案是使用Subject。另一种解决方案(上面的那个)是使用 Observable.create()(或 Single.create())创建 observable。您调用回调方法并在 Observable.create() 方法内创建侦听器,因为在 Observable.create() 内您可以调用 onSuccess() 方法,该方法告诉 Observable 传递值。

It's what I use to wrap callback into observable. A bit complicated at first, but easy to adapt.

这是我用来将回调包装成可观察的。一开始有点复杂,但很容易适应。

I give you another example, as asked. Let's say you want to display the changes of an EditText as a Snackbar:

我再给你举个例子,正如你所问的。假设您想将 EditText 的更改显示为 Snackbar:

View rootView;
EditText editTextView;

//Wrap Android addTextChangedListener into an Observable
Observable<String> textObservable = Observable.create(consumer ->
        editTextView.addTextChangedListener(new TextWatcher() {
            @Override
            public void beforeTextChanged(CharSequence s, int start, int count, int after) {

            }

            @Override
            public void onTextChanged(CharSequence s, int start, int before, int count) {

            }

            @Override
            public void afterTextChanged(Editable s) {
                consumer.onNext(s.toString());
            }
        })
);

//Use it
textObservable.subscribe(text -> Snackbar.make(rootView, text, Snackbar.LENGTH_SHORT).show());

回答by Amjed Baig

Maybe.<String>create(new MaybeOnSubscribe<String>() {
      @Override
      public void subscribe(MaybeEmitter<String> e) throws Exception {
        OnSuccessListener(uri->{
          e.onSuccess(uri));
        })
        .addOnFailureListener(throwable -> {
          e.onError(throwable);
        });
      }
    });