发出的每个列表项的 RxJava 延迟
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/33291245/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
RxJava delay for each item of list emitted
提问by athor
I'm struggling to implement something I assumed would be fairly simple in Rx.
我正在努力实现我认为在 Rx 中相当简单的东西。
I have a list of items, and I want to have each item emitted with a delay.
我有一个项目列表,我想让每个项目延迟发出。
It seems the Rx delay() operator just shifts the emission of all items by the specified delay, not each individual item.
似乎 Rx delay() 运算符只是将所有项目的发射移动了指定的延迟,而不是每个单独的项目。
Here's some testing code. It groups items in a list. Each group should then have a delay applied before being emitted.
这是一些测试代码。它对列表中的项目进行分组。每个组都应该在发射之前应用延迟。
Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.delay(50, TimeUnit.MILLISECONDS)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
The result is:
结果是:
154ms
[5]
155ms
[2]
155ms
[1]
155ms
[3]
155ms
[4]
But what I would expect to see is something like this:
但我希望看到的是这样的:
174ms
[5]
230ms
[2]
285ms
[1]
345ms
[3]
399ms
[4]
What am I doing wrong?
我究竟做错了什么?
采纳答案by iagreen
One way to do it is to use zip
to combine your observable with an Interval
observable to delay the output.
一种方法是使用zip
将您的 observable 与Interval
observable结合起来以延迟输出。
Observable.zip(Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList()),
Observable.interval(50, TimeUnit.MILLISECONDS),
(obs, timer) -> obs)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
回答by FriendlyMikhail
I think you want this:
我想你想要这个:
Observable.range(1, 5)
.delay(50, TimeUnit.MILLISECONDS)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
This way it will delay the numbers going into the group rather than delaying the reduced list by 5 seconds.
这样,它将延迟进入组的数字,而不是将减少的列表延迟 5 秒。
回答by kjones
To delay each group you can change your flatMap()
to return an Observable that delays emitting the group.
要延迟每个组,您可以更改您flatMap()
的返回一个延迟发射组的 Observable。
Observable
.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g ->
Observable
.timer(50, TimeUnit.MILLISECONDS)
.flatMap(t -> g.toList())
)
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
回答by Matias Irland Tomas
You can implement a custom rx operatorsuch as MinRegularIntervalDelayOperator
and then use this with the lift
function
您可以实现自定义 rx 运算符,例如MinRegularIntervalDelayOperator
,然后将其与lift
函数一起使用
Observable.range(1, 5)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.lift(new MinRegularIntervalDelayOperator<Integer>(50L))
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
回答by Tushar Nallan
A not so clean way is to make the delay change with the iteration using the .delay(Func1) operator.
一个不太干净的方法是使用 .delay(Func1) 运算符通过迭代来改变延迟。
Observable.range(1, 5)
.delay(n -> n*50)
.groupBy(n -> n % 5)
.flatMap(g -> g.toList())
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
回答by Sanket Kachhela
There is other way to do it using concatMap as concatMap returns observable of source items. so we can add delay on that observable.
还有其他方法可以使用 concatMap 来做到这一点,因为 concatMap 返回可观察的源项。所以我们可以在那个 observable 上添加延迟。
here what i have tried.
这是我尝试过的。
Observable.range(1, 5)
.groupBy(n -> n % 5)
.concatMap(integerIntegerGroupedObservable ->
integerIntegerGroupedObservable.delay(2000, TimeUnit.MILLISECONDS))
.doOnNext(item -> {
System.out.println(System.currentTimeMillis() - timeNow);
System.out.println(item);
System.out.println(" ");
}).toList().toBlocking().first();
回答by cVoronin
You can add a delay between emitted items by using flatMap, maxConcurrent and delay()
您可以使用 flatMap、maxConcurrent 和 delay() 在发出的项目之间添加延迟
Here is an example - emit 0..4 with delay
这是一个例子 - 延迟发射 0..4
@Test
fun testEmitWithDelays() {
val DELAY = 500L
val COUNT = 5
val latch = CountDownLatch(1)
val startMoment = System.currentTimeMillis()
var endMoment : Long = 0
Observable
.range(0, COUNT)
.flatMap( { Observable.just(it).delay(DELAY, TimeUnit.MILLISECONDS) }, 1) // maxConcurrent = 1
.subscribe(
{ println("... value: $it, ${System.currentTimeMillis() - startMoment}") },
{},
{
endMoment = System.currentTimeMillis()
latch.countDown()
})
latch.await()
assertTrue { endMoment - startMoment >= DELAY * COUNT }
}
... value: 0, 540
... value: 1, 1042
... value: 2, 1544
... value: 3, 2045
... value: 4, 2547
回答by Magnus
The simplest way to do this seems to be just using concatMap
and wrapping each item in a delayed Obserable.
做到这一点的最简单方法似乎只是concatMap
在延迟的 Obserable 中使用和包装每个项目。
long startTime = System.currentTimeMillis();
Observable.range(1, 5)
.concatMap(i-> Observable.just(i).delay(50, TimeUnit.MILLISECONDS))
.doOnNext(i-> System.out.println(
"Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) +"ms"))
.toCompletable().await();
Prints:
印刷:
Item: 1, Time: 51ms
Item: 2, Time: 101ms
Item: 3, Time: 151ms
Item: 4, Time: 202ms
Item: 5, Time: 252ms
回答by Mina Samy
Just sharing a simple approach to emit each item in a collection with an interval:
只是分享一个简单的方法,以间隔发出集合中的每个项目:
Observable.just(1,2,3,4,5)
.zipWith(Observable.interval(500, TimeUnit.MILLISECONDS), (item, interval) -> item)
.subscribe(System.out::println);
Each item will be emitted every 500 milliseconds
每个项目将每 500 毫秒发出一次
回答by Abhishek Bansal
you should be able to achieve this by using Timer
operator. I tried with delay
but couldn't achieve the desired output. Note nested operations done in flatmap
operator.
您应该能够通过使用Timer
运算符来实现这一点。我尝试过delay
但无法达到所需的输出。注意在flatmap
operator 中完成的嵌套操作。
Observable.range(1,5)
.flatMap(x -> Observable.timer(50 * x, TimeUnit.MILLISECONDS)
.map(y -> x))
// attach timestamp
.timestamp()
.subscribe(timedIntegers ->
Log.i(TAG, "Timed String: "
+ timedIntegers.value()
+ " "
+ timedIntegers.time()));