Java 合并一个 Observables 列表并等待所有完成
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/35357919/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Combine a list of Observables and wait until all completed
提问by Craig Russell
TL;DRHow to convert Task.whenAll(List<Task>)
into RxJava
?
TL;DR如何转换Task.whenAll(List<Task>)
为RxJava
?
My existing code uses Bolts to build up a list of asynchronous tasks and waits until all of those tasks finish before performing other steps. Essentially, it builds up a List<Task>
and returns a single Task
which is marked as completed when alltasks in the list complete, as per the example on the Bolts site.
我现有的代码使用 Bolts 来构建异步任务列表,并在执行其他步骤之前等待所有这些任务完成。本质上,它构建了一个List<Task>
并返回一个Task
标记为已完成的单个,当列表中的所有任务完成时,根据Bolts 站点上的示例。
I'm looking to replace Bolts
with RxJava
and I'm assuming this method of building up a list of async tasks (size not known in advance) and wrapping them all into a single Observable
is possible, but I don't know how.
我正在寻找替换Bolts
为RxJava
并且我假设这种建立异步任务列表(事先未知大小)并将它们全部包装成一个的方法Observable
是可能的,但我不知道如何。
I've tried looking at merge
, zip
, concat
etc... but can't get to work on the List<Observable>
that I'd be building up as they all seem geared to working on just two Observables
at a time if I understand the docs correctly.
我试着看merge
,zip
,concat
等...但不能去工作的List<Observable>
,我会被建立,因为他们似乎都面向工作的只有两个Observables
,如果我理解正确的文档在一个时间。
I'm trying to learn RxJava
and am still very new to it so forgive me if this is an obvious question or explained in the docs somewhere; I have tried searching. Any help would be much appreciated.
我正在尝试学习RxJava
并且对它仍然很陌生,所以如果这是一个明显的问题或在某处的文档中进行了解释,请原谅我;我试过搜索。任何帮助将非常感激。
采纳答案by Malt
It sounds like you're looking for the Zip operator.
听起来您正在寻找Zip 运算符。
There are a few different ways of using it, so let's look at an example. Say we have a few simple observables of different types:
有几种不同的使用方式,让我们看一个例子。假设我们有一些不同类型的简单 observables:
Observable<Integer> obs1 = Observable.just(1);
Observable<String> obs2 = Observable.just("Blah");
Observable<Boolean> obs3 = Observable.just(true);
The simplest way to wait for them all is something like this:
等待它们的最简单方法是这样的:
Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
.subscribe(str -> System.out.println(str));
Note that in the zip function, the parameters have concrete types that correspond to the types of the observables being zipped.
请注意,在 zip 函数中,参数具有与被压缩的 observable 类型相对应的具体类型。
Zipping a list of observables is also possible, either directly:
也可以直接压缩 observables 列表:
List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3);
Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));
...or by wrapping the list into an Observable<Observable<?>>
:
...或者通过将列表包装成一个Observable<Observable<?>>
:
Observable<Observable<?>> obsObs = Observable.from(obsList);
Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));
However, in both of these cases, the zip function can only accept a single Object[]
parameter since the types of the observables in the list are not known in advance as well as their number. This means that that the zip function would have to check the number of parameters and cast them accordingly.
然而,在这两种情况下, zip 函数只能接受一个Object[]
参数,因为列表中的 observables 的类型以及它们的数量是未知的。这意味着 zip 函数必须检查参数的数量并相应地转换它们。
Regardless, all of the above examples will eventually print 1 Blah true
无论如何,上述所有示例最终都会打印 1 Blah true
EDIT:When using Zip, make sure that the Observables
being zipped all emit the same number of items. In the above examples all three observables emitted a single item. If we were to change them to something like this:
编辑:使用 Zip 时,请确保Observables
被压缩的所有项目都发出相同数量的项目。在上面的示例中,所有三个 observable 都发出了一个项目。如果我们把它们改成这样:
Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items
Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items
Then 1, Blah, True
and 2, Hello, True
would be the only items passed into the zip function(s). The item 3
would never be zipped since the other observables have completed.
然后1, Blah, True
和2, Hello, True
将是唯一传入 zip 函数的项目。该项目3
永远不会被压缩,因为其他 observable 已经完成。
回答by LordRaydenMK
You probably looked at the zip
operator that works with 2 Observables.
您可能查看了zip
使用 2 个 Observable的运算符。
There is also the static method Observable.zip
. It has one form which should be useful for you:
还有静态方法Observable.zip
。它有一种形式,应该对您有用:
zip(java.lang.Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)
You can check out the javadoc for more.
您可以查看javadoc 了解更多信息。
回答by MyDogTom
You can use flatMap
in case you have dynamic tasks composition. Something like this:
flatMap
如果您有动态任务组合,则可以使用。像这样的东西:
public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) {
return Observable.from(tasks)
//execute in parallel
.flatMap(task -> task.observeOn(Schedulers.computation()))
//wait, until all task are executed
//be aware, all your observable should emit onComplemete event
//otherwise you will wait forever
.toList()
//could implement more intelligent logic. eg. check that everything is successful
.map(results -> true);
}
Another good example of parallel execution
Note: I do not really know your requirements for error handling. For example what to do if only one task fails. I think you should verify this scenario.
注意:我不太了解您对错误处理的要求。例如,如果只有一项任务失败该怎么办。我认为您应该验证这种情况。
回答by Sjors
I'm writing some computation heave code in Kotlin with JavaRx Observables and RxKotlin. I want to observe a list of observables to be completed and in the meantime giving me an update with the progress and latest result. At the end it returns the best calculation result. An extra requirement was to run Observables in parallel for using all my cpu cores. I ended up with this solution:
我正在使用 JavaRx Observables 和 RxKotlin 在 Kotlin 中编写一些计算提升代码。我想观察一个待完成的观察列表,同时向我提供进度和最新结果的更新。最后返回最佳计算结果。一个额外的要求是并行运行 Observables 以使用我所有的 CPU 内核。我最终得到了这个解决方案:
@Volatile var results: MutableList<CalculationResult> = mutableListOf()
fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> {
return Observable.create { subscriber ->
Observable.concatEager(listOfCalculations.map { calculation: Calculation ->
doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result
}).subscribeBy(
onNext = {
results.add(it)
subscriber.onNext(Pair("A calculation is ready", it))
},
onComplete = {
subscriber.onNext(Pair("Finished: ${results.size}", findBestCalculation(results))
subscriber.onComplete()
},
onError = {
subscriber.onError(it)
}
)
}
}
回答by Kevin ABRIOUX
With Kotlin
使用 Kotlin
Observable.zip(obs1, obs2, BiFunction { t1 : Boolean, t2:Boolean ->
})
It's important to set the type for the function's arguments or you will have compilation errors
设置函数参数的类型很重要,否则会出现编译错误
The last argument type change with the number of argument : BiFunction for 2 Function3 for 3 Function4 for 4 ...
最后一个参数类型随着参数的数量而变化: BiFunction for 2 Function3 for 3 Function4 for 4 ...
回答by eis
Of the suggestions proposed, zip()actually combines observable results with each other, which may or may not be what is wanted, but was not asked in the question. In the question, all that was wanted was execution of each of the operations, either one-by-one or in parallel (which was not specified, but linked Bolts example was about parallel execution). Also, zip() will complete immediately when any of the observables complete, so it's in violation of the requirements.
在提出的建议中,zip()实际上将可观察到的结果相互结合,这可能是也可能不是想要的,但在问题中没有被问到。在这个问题中,所需要的只是逐一或并行执行每个操作(未指定,但链接的 Bolt 示例是关于并行执行的)。此外, zip() 将在任何 observables 完成时立即完成,因此它违反了要求。
For parallel execution of Observables, flatMap() presented in the other answeris fine, but merge()would be more straight-forward. Note that merge will exit on error of any of the Observables, if you rather postpone the exit until all observables have finished, you should be looking at mergeDelayError().
对于 Observables 的并行执行,另一个答案中提供的flatMap()很好,但merge()会更直接。请注意,merge 将在任何 Observable 出错时退出,如果您宁愿推迟退出直到所有 observable 完成,您应该查看mergeDelayError()。
For one-by-one, I think Observable.concat() static methodshould be used. Its javadoc states like this:
对于一对一,我认为应该使用Observable.concat() 静态方法。它的 javadoc 是这样写的:
concat(java.lang.Iterable> sequences) Flattens an Iterable of Observables into one Observable, one after the other, without interleaving them
concat(java.lang.Iterable>sequence) 将一个可观察对象的可迭代对象一个接一个地扁平化为一个可观察对象,而不将它们交错
which sounds like what you're after if you don't want parallel execution.
如果您不想要并行执行,这听起来像是您所追求的。
Also, if you're only interested in the completion of your task, not return values, you should probably look into Completableinstead of Observable.
此外,如果您只对完成任务感兴趣,而不对返回值感兴趣,那么您可能应该查看Completable而不是Observable。
TLDR: for one-by-one execution of tasks and oncompletion event when they are completed, I think Completable.concat() is best suited. For parallel execution, Completable.merge() or Completable.mergeDelayError() sounds like the solution. The former one will stop immediately on any error on any completable, the latter one will execute them all even if one of them has an error, and only then reports the error.
TLDR:对于任务的一对一执行和完成时的完成事件,我认为 Completable.concat() 最适合。对于并行执行, Completable.merge() 或 Completable.mergeDelayError() 听起来像是解决方案。前者会在任何可完成的任何错误时立即停止,后者即使其中一个有错误也会执行它们,然后才报告错误。
回答by Anton Makov
I had similar problem, I needed to fetch search items from rest call while also integrate saved suggestions from a RecentSearchProvider.AUTHORITY and combine them together to one unified list. I was trying to use @MyDogTom solution, unfortunately there is no Observable.from in RxJava. After some research I got a solution that worked for me.
我遇到了类似的问题,我需要从 rest 调用中获取搜索项,同时还需要集成来自 RecentSearchProvider.AUTHORITY 的已保存建议,并将它们组合到一个统一列表中。我试图使用@MyDogTom 解决方案,不幸的是 RxJava 中没有 Observable.from。经过一番研究,我得到了一个对我有用的解决方案。
fun getSearchedResultsSuggestions(context : Context, query : String) : Single<ArrayList<ArrayList<SearchItem>>>
{
val fetchedItems = ArrayList<Observable<ArrayList<SearchItem>>>(0)
fetchedItems.add(fetchSearchSuggestions(context,query).toObservable())
fetchedItems.add(getSearchResults(query).toObservable())
return Observable.fromArray(fetchedItems)
.flatMapIterable { data->data }
.flatMap {task -> task.observeOn(Schedulers.io())}
.toList()
.map { ArrayList(it) }
}
I created an observable from the array of observables that contains lists of suggestions and results from the internet depending on the query. After that you just go over those tasks with flatMapIterable and run them using flatmap, place the results in array, which can be later fetched into a recycle view.
我从一组 observable 中创建了一个 observable,其中包含来自互联网的建议和结果列表,具体取决于查询。之后,您只需使用 flatMapIterable 完成这些任务并使用 flatmap 运行它们,将结果放入数组中,稍后可以将其提取到回收视图中。