java RxJava 组合请求序列
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/47641605/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
RxJava Combine Sequence Of Requests
提问by Mayr Technologies
The Problem
问题
I have two Apis. Api 1 gives me a List of Items and Api 2 gives me more detailed Information for each of the items I got from Api 1. The way I solved it so far results in bad Performance.
我有两个Apis。Api 1 为我提供了一个项目列表,而 Api 2 为我提供了从 Api 1 获得的每个项目的更详细信息。到目前为止,我解决它的方式导致性能不佳。
The Question
问题
Efficent and fast solution to this Problem with the help of Retrofit and RxJava.
在 Retrofit 和 RxJava 的帮助下有效快速地解决这个问题。
My Approach
我的方法
At the Moment my Solution Looks like this:
目前我的解决方案如下所示:
Step 1: Retrofit executes Single<ArrayList<Information>>
from Api 1.
第 1 步:Single<ArrayList<Information>>
从 Api 1执行 Retrofit 。
Step 2: I iterate through this Items and make a request for each to Api 2.
第 2 步:我遍历这些 Items 并向 Api 2 发出每个请求。
Step 3: Retrofit Returns Sequentially executes Single<ExtendedInformation>
for
each item
第 3 步:Retrofit ReturnsSingle<ExtendedInformation>
对每个项目依次执行
Step 4: After all calls form Api 2 completely executed I create a new Object for all Items combining the Information and Extended Information.
第 4 步:在 Api 2 的所有调用完全执行后,我为所有结合信息和扩展信息的项目创建一个新对象。
My Code
我的代码
public void addExtendedInformations(final Information[] informations) {
final ArrayList<InformationDetail> informationDetailArrayList = new ArrayList<>();
final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener = new JSONRequestRatingHelper.RatingRequestListener() {
@Override
public void onDownloadFinished(Information baseInformation, ExtendedInformation extendedInformation) {
informationDetailArrayList.add(new InformationDetail(baseInformation, extendedInformation));
if (informationDetailArrayList.size() >= informations.length){
listener.onAllExtendedInformationLoadedAndCombined(informationDetailArrayList);
}
}
};
for (Information information : informations) {
getExtendedInformation(ratingRequestListener, information);
}
}
public void getRatingsByTitle(final JSONRequestRatingHelper.RatingRequestListener ratingRequestListener, final Information information) {
Single<ExtendedInformation> repos = service.findForTitle(information.title);
disposable.add(repos.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribeWith(new DisposableSingleObserver<ExtendedInformation>() {
@Override
public void onSuccess(ExtendedInformation extendedInformation) {
ratingRequestListener.onDownloadFinished(information, extendedInformation);
}
@Override
public void onError(Throwable e) {
ExtendedInformation extendedInformation = new ExtendedInformation();
ratingRequestListener.onDownloadFinished(extendedInformation, information);
}
}));
}
public interface RatingRequestListener {
void onDownloadFinished(Information information, ExtendedInformation extendedInformation);
}
回答by Brice
tl;druse concatMapEager
or flatMap
and execute sub-calls asynchronously or on a schedulers.
tl; dr使用concatMapEager
或flatMap
并异步或在调度程序上执行子调用。
long story
很长的故事
I'm not an android developer, so my question will be limited to pure RxJava (version 1 and version 2).
我不是 android 开发人员,所以我的问题将仅限于纯 RxJava(版本 1 和版本 2)。
If I get the picture right the needed flow is :
如果我得到正确的图片所需的流程是:
some query param
\--> Execute query on API_1 -> list of items
|-> Execute query for item 1 on API_2 -> extended info of item1
|-> Execute query for item 2 on API_2 -> extended info of item1
|-> Execute query for item 3 on API_2 -> extended info of item1
...
\-> Execute query for item n on API_2 -> extended info of item1
\----------------------------------------------------------------------/
|
\--> stream (or list) of extended item info for the query param
Assuming Retrofit generated the clients for
假设 Retrofit 生成了客户端
interface Api1 {
@GET("/api1") Observable<List<Item>> items(@Query("param") String param);
}
interface Api2 {
@GET("/api2/{item_id}") Observable<ItemExtended> extendedInfo(@Path("item_id") String item_id);
}
If the order of the item is not important, then it is possible to use flatMap
only:
如果项目的顺序不重要,那么可以flatMap
只使用:
api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.flatMap(item -> api2.extendedInfo(item.id()))
.subscribe(...)
But only ifthe retrofit builder is configured with
但仅当改造构建器配置为
Either with the async adapter (calls will be queued in the okhttp internal executor). I personally think this is not a good idea, because you don't have control over this executor.
.addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync()
Or with the scheduler based adapter (calls will be scheduled on the RxJava scheduler). It would my preferred option, because you explicitly choose which scheduler is used, it will be most likely the IO scheduler, but you are free to try a different one.
.addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
使用异步适配器(调用将在 okhttp 内部执行器中排队)。我个人认为这不是一个好主意,因为你无法控制这个执行者。
.addCallAdapterFactory(RxJava2CallAdapterFactory.createAsync()
或者使用基于调度程序的适配器(调用将在 RxJava 调度程序上调度)。这是我的首选选项,因为您明确选择使用哪个调度程序,它很可能是 IO 调度程序,但您可以自由尝试不同的调度程序。
.addCallAdapterFactory(RxJava2CallAdapterFactory.createWithScheduler(Schedulers.io()))
The reason is that flatMap
will subscribe to each observable created by api2.extendedInfo(...)
and merge them in the resulting observable. So results will appear in the order they are received.
原因是flatMap
它将订阅由创建的每个 observableapi2.extendedInfo(...)
并将它们合并到生成的 observable 中。因此,结果将按照收到的顺序出现。
Ifthe retrofit client is notset to be async or set to run on a scheduler, it is possible to set one :
如果改造客户端未设置为异步或设置为在调度程序上运行,则可以设置一个:
api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.flatMap(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
.subscribe(...)
This structure is almost identical to the previous one execpts it indicates locallyon which scheduler each api2.extendedInfo
is supposed to run.
该结构几乎与前一个 execpts 相同,它在本地指示每个api2.extendedInfo
应该在哪个调度程序上运行。
It is possible to tune the maxConcurrency
parameter of flatMap
to control how many request you want to perform at the same time. Although I'd be cautious on this one, you don't want run all queries at the same time. Usually the default maxConcurrency
is good enough (128
).
可以调整maxConcurrency
参数flatMap
以控制您要同时执行的请求数量。尽管我对此持谨慎态度,但您不希望同时运行所有查询。通常默认值maxConcurrency
就足够了 ( 128
)。
Now if order of the original query matter. concatMap
is usually the operator that does the same thing as flatMap
in order but sequentially, which turns out to be slow if the code need to wait for all sub-queries to be performed. The solution though is one step further with concatMapEager
, this one will subscribe to observable in order, and buffer the results as needed.
现在,如果原始查询的顺序很重要。concatMap
通常是flatMap
按顺序执行相同操作的运算符,但如果代码需要等待所有子查询执行,结果会很慢。解决方案虽然是更进一步concatMapEager
,这个将按顺序订阅 observable,并根据需要缓冲结果。
Assuming retrofit clients are async or ran on a specific scheduler :
假设改造客户端是异步的或在特定的调度程序上运行:
api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.concatMapEager(item -> api2.extendedInfo(item.id()))
.subscribe(...)
Or if the scheduler has to be set locally :
或者,如果必须在本地设置调度程序:
api1.items(queryParam)
.flatMap(itemList -> Observable.fromIterable(itemList)))
.concatMapEager(item -> api2.extendedInfo(item.id()).subscribeOn(Schedulers.io()))
.subscribe(...)
It is also possible to tune the concurrency in this operator.
还可以在此运算符中调整并发性。
Additionally if the Api is returning Flowable
, it is possible to use .parallel
that is still in beta at this time in RxJava 2.1.7. But then results are not in order and I don't know a way (yet?) to order them without sorting after.
此外,如果 Api 正在返回Flowable
,则可以.parallel
在 RxJava 2.1.7 中使用目前仍处于测试阶段的。但是结果不是按顺序排列的,我不知道一种方法(还?)在没有排序的情况下对它们进行排序。
api.items(queryParam) // Flowable<Item>
.parallel(10)
.runOn(Schedulers.io())
.map(item -> api2.extendedInfo(item.id()))
.sequential(); // Flowable<ItemExtended>
回答by homerman
the flatMap
operator is designed to cater to these types of workflows.
该flatMap
运营商的设计,以满足这些类型的工作流程。
i'll outline the broad strokes with a simple five step example. hopefully you can easily reconstruct the same principles in your code:
我将通过一个简单的五步示例来概述大致的笔触。希望您可以轻松地在代码中重构相同的原则:
@Test fun flatMapExample() {
// (1) constructing a fake stream that emits a list of values
Observable.just(listOf(1, 2, 3, 4, 5))
// (2) convert our List emission into a stream of its constituent values
.flatMap { numbers -> Observable.fromIterable(numbers) }
// (3) subsequently convert each individual value emission into an Observable of some
// newly calculated type
.flatMap { number ->
when(number) {
1 -> Observable.just("A1")
2 -> Observable.just("B2")
3 -> Observable.just("C3")
4 -> Observable.just("D4")
5 -> Observable.just("E5")
else -> throw RuntimeException("Unexpected value for number [$number]")
}
}
// (4) collect all the final emissions into a list
.toList()
.subscribeBy(
onSuccess = {
// (5) handle all the combined results (in list form) here
println("## onNext($it)")
},
onError = { error ->
println("## onError(${error.message})")
}
)
}
(incidentally, if the order of the emissions matter, look at using concatMap
instead).
(顺便说一句,如果排放的顺序很重要,请考虑使用concatMap
)。
i hope that helps.
我希望有帮助。
回答by Jyubin Patel
Check below it's working.
检查下面它的工作。
Say you have multiple network calls you need to make–cals to get Github user information and Github user events for example.
假设您有多个网络调用,您需要进行调用以获取 Github 用户信息和 Github 用户事件,例如。
And you want to wait for each to return before updating the UI. RxJava can help you here. Let's first define our Retrofit object to access Github's API, then setup two observables for the two network requests call.
并且您希望在更新 UI 之前等待每个返回。RxJava 可以在这里为您提供帮助。让我们首先定义我们的 Retrofit 对象来访问 Github 的 API,然后为两个网络请求调用设置两个 observable。
Retrofit repo = new Retrofit.Builder()
.baseUrl("https://api.github.com")
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build();
Observable<JsonObject> userObservable = repo
.create(GitHubUser.class)
.getUser(loginName)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
Observable<JsonArray> eventsObservable = repo
.create(GitHubEvents.class)
.listEvents(loginName)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
Used Interface for it like below:
使用的接口如下:
public interface GitHubUser {
@GET("users/{user}")
Observable<JsonObject> getUser(@Path("user") String user);
}
public interface GitHubEvents {
@GET("users/{user}/events")
Observable<JsonArray> listEvents(@Path("user") String user);
}
After we use RxJava's zipmethod to combine our two Observables and wait for them to complete before creating a new Observable.
在我们使用 RxJava 的zip方法将我们的两个 Observable 组合起来并等待它们完成之后,再创建一个新的 Observable。
Observable<UserAndEvents> combined = Observable.zip(userObservable, eventsObservable, new Func2<JsonObject, JsonArray, UserAndEvents>() {
@Override
public UserAndEvents call(JsonObject jsonObject, JsonArray jsonElements) {
return new UserAndEvents(jsonObject, jsonElements);
}
});
Finally let's call the subscribe method on our new combined Observable:
最后让我们在新的组合 Observable 上调用 subscribe 方法:
combined.subscribe(new Subscriber<UserAndEvents>() {
...
@Override
public void onNext(UserAndEvents o) {
// You can access the results of the
// two observabes via the POJO now
}
});
No more waiting in threads etc for network calls to finish. RxJava has done all that for you in zip(). hope my answer helps you.
不再需要在线程等中等待网络调用完成。RxJava 在 zip() 中为您完成了所有这些。希望我的回答对你有帮助。
回答by Anrimian
I solved a similar problem with RxJava2. Execution of requests for Api 2 in parallel slightly speeds up the work.
我用 RxJava2 解决了类似的问题。并行执行对 Api 2 的请求会稍微加快工作速度。
private InformationRepository informationRepository;
//init....
public Single<List<FullInformation>> getFullInformation() {
return informationRepository.getInformationList()
.subscribeOn(Schedulers.io())//I usually write subscribeOn() in the repository, here - for clarity
.flatMapObservable(Observable::fromIterable)
.flatMapSingle(this::getFullInformation)
.collect(ArrayList::new, List::add);
}
private Single<FullInformation> getFullInformation(Information information) {
return informationRepository.getExtendedInformation(information)
.map(extendedInformation -> new FullInformation(information, extendedInformation))
.subscribeOn(Schedulers.io());//execute requests in parallel
}
InformationRepository - just interface. Its implementation is not interesting for us.
InformationRepository - 只是接口。它的实现对我们来说并不有趣。
public interface InformationRepository {
Single<List<Information>> getInformationList();
Single<ExtendedInformation> getExtendedInformation(Information information);
}
FullInformation - container for result.
FullInformation - 结果的容器。
public class FullInformation {
private Information information;
private ExtendedInformation extendedInformation;
public FullInformation(Information information, ExtendedInformation extendedInformation) {
this.information = information;
this.extendedInformation = extendedInformation;
}
}
回答by Fraudlic
Try using Observable.zip()
operator. It will wait until both Api calls are finished before continuing the stream. Then you can insert some logic by calling flatMap()
afterwards.
尝试使用Observable.zip()
运算符。在继续流之前,它将等待两个 Api 调用完成。然后你可以通过调用插入一些逻辑flatMap()
。