java 使用 RxJava 和 Okhttp
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/32687921/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Using RxJava and Okhttp
提问by Santos Black
I want to request to a url using okhttp in another thread (like IO thread) and get Response
in the Android main thread, But I don't know how to create an Observable
.
我想在另一个线程(如 IO 线程)中使用 okhttp 请求一个 url 并进入Response
Android 主线程,但我不知道如何创建Observable
.
回答by Saeed Masoumi
First add RxAndroid
to your dependencies, then create your Observable
like this:
首先添加RxAndroid
到您的依赖项,然后Observable
像这样创建:
Subscription subscription = Observable.create(new Observable.OnSubscribe<Response>() {
OkHttpClient client = new OkHttpClient();
@Override
public void call(Subscriber<? super Response> subscriber) {
try {
Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
if (response.isSuccessful()) {
if(!subscriber.isUnsubscribed()){
subscriber.onNext(response);
}
subscriber.onCompleted();
} else if (!response.isSuccessful() && !subscriber.isUnsubscribed()) {
subscriber.onError(new Exception("error"));
}
} catch (IOException e) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(e);
}
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Response>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Response response) {
}
});
It will request your url in another thread (io thread) and observe it on android main thread.
它将在另一个线程(io 线程)中请求您的 url 并在 android 主线程上观察它。
And finally when you leave the screen use subsribtion.unsubscribe()
to avoid memory leak.
最后当你离开屏幕时使用subsribtion.unsubscribe()
以避免内存泄漏。
When you use Observable.create
, you should write a lot of boilerplate code, also you must handle subscription by your own. A better alternative is to use defer.
Form the doc:
使用时Observable.create
要写很多样板代码,还得自己处理订阅。更好的选择是使用defer。形成文档:
do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
The Defer operator waits until an observer subscribes to it, and then it generates an Observable, typically with an Observable factory function. It does this afresh for each subscriber, so although each subscriber may think it is subscribing to the same Observable, in fact each subscriber gets its own individual sequence.
在观察者订阅之前不要创建 Observable,并为每个观察者创建一个新的 Observable
Defer 操作符等待观察者订阅它,然后它生成一个 Observable,通常带有一个 Observable 工厂函数。它为每个订阅者重新执行此操作,因此尽管每个订阅者可能认为它订阅了同一个 Observable,但实际上每个订阅者都有自己的单独序列。
So as Marcin Kozińskimentioned, you just need to do this:
所以正如Marcin Koziński提到的,你只需要这样做:
final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
@Override public Observable<Response> call() {
try {
Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
return Observable.just(response);
} catch (IOException e) {
return Observable.error(e);
}
}
});
回答by Marcin Koziński
It's easier and safer to use Observable.defer()
instead of Observable.create()
:
使用它更容易、更安全,Observable.defer()
而不是Observable.create()
:
final OkHttpClient client = new OkHttpClient();
Observable.defer(new Func0<Observable<Response>>() {
@Override public Observable<Response> call() {
try {
Response response = client.newCall(new Request.Builder().url("your url").build()).execute();
return Observable.just(response);
} catch (IOException e) {
return Observable.error(e);
}
}
});
That way unsubscription and backpressure are handled for you. Here's a great post by Dan Lewabout create()
and defer()
.
这样就可以为您处理取消订阅和背压。这是Dan Lew关于create()
和的精彩帖子defer()
。
If you wished to go the Observable.create()
route then it should look more like in this librarywith isUnsubscribed()
calls sprinkled everywhere. And I believe this still doesn't handle backpressure.
如果你想走这Observable.create()
条路线,那么它应该看起来更像是在这个库中,isUnsubscribed()
到处都是调用。而且我相信这仍然不能处理背压。
回答by FRR
I realise this post is a bit old, but there's a new and more convenient way of doing this now
我意识到这篇文章有点旧,但现在有一种新的更方便的方法来做到这一点
Observable.fromCallable {
client.newCall(Request.Builder().url("your url").build()).execute()
}
More info: https://artemzin.com/blog/rxjava-defer-execution-of-function-via-fromcallable/
更多信息:https: //artemzin.com/blog/rxjava-defer-execution-of-function-via-fromcallable/
回答by Brice
I came late to the discussion but, if for some reason the code need to stream the response body, then defer
or fromCallable
won't do it. Instead one can employ the using
operator.
我来晚了,但是,如果由于某种原因代码需要流式传输响应正文,那么defer
或fromCallable
不会这样做。相反,可以雇用using
操作员。
Single.using(() -> okHttpClient.newCall(okRequest).execute(), // 1
response -> { // 2
...
return Single.just((Consumer<OutputStream>) fileOutput -> {
try (InputStream upstreamResponseStream = response.body().byteStream();
OutputStream fileOutput = responseBodyOutput) {
ByteStreams.copy(upstreamResponseStream, output);
}
});
},
Response::close, // 3
false) // 4
.subscribeOn(Schedulers.io()) // 5
.subscribe(copier -> copier.accept(...), // 6
throwable -> ...); // 7
- The first lambda executes the response after upon subscription.
- The second lambda creates the observable type, here with
Single.just(...)
- The third lambda disposes the response. With
defer
one could have used the try-with-resources style. - Set the
eager
toggle tofalse
to make the disposer called after the terminal event, i.e. after the subscription consumer has been executed. - Of course make the thing happen on another threadpool
- Here's the lambda that will consume the response body. Without
eager
set tofalse
, the code will raise an IOException with reason 'closed' because the response will be already closed before entering this lambda. - The
onError
lambda should handle exceptions, especially theIOException
that cannot be anymore caught with theusing
operator as it was possible with a try/catch withdefer
.
- 第一个 lambda在订阅后执行响应。
- 第二个 lambda 创建了 observable 类型,这里是
Single.just(...)
- 第三个 lambda 处理响应。随着
defer
人能使用的尝试,与资源风格。 - 将
eager
切换设置false
为 使处置器在终端事件之后调用,即在订阅消费者执行之后。 - 当然让事情发生在另一个线程池上
- 这是将消耗响应主体的 lambda。如果未
eager
设置为false
,代码将引发原因为“已关闭”的 IOException,因为在输入此 lambda 之前,响应已关闭。 - 该
onError
拉姆达应该处理异常,尤其是IOException
不能再被抓到与using
运营商,因为它是可能的一个try / catch用defer
。
回答by Arul Mani
Okhttp3 with RxSingle background API call.
Okhttp3 与 RxSingle 后台 API 调用。
Disposable disposables = Single.fromCallable(() -> {
Log.e(TAG, "clearData: Thread[" + Thread.currentThread().getName() + "]");
OkHttpClient client = Util.getHttpClient();
Request request = new Request.Builder()
.addHeader("Authorization", "Bearer " + Util.getUserToken())
.url(BuildConfig.BASE_URL + ApiConstants.DELETE_FEEDS)
.build();
Response response = client.newCall(request).execute();
if(response.isSuccessful()) {
...
return ; // Any type
} else {
return ; // Any type
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe((result) -> {
Log.d(TAG, "api() completed");
});
compositeDisposable.add(disposables);