发出的每个列表项的 RxJava 延迟

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/33291245/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-11 14:33:06  来源:igfitidea点击:

RxJava delay for each item of list emitted

javarx-javadelay

提问by athor

I'm struggling to implement something I assumed would be fairly simple in Rx.

我正在努力实现我认为在 Rx 中相当简单的东西。

I have a list of items, and I want to have each item emitted with a delay.

我有一个项目列表,我想让每个项目延迟发出。

It seems the Rx delay() operator just shifts the emission of all items by the specified delay, not each individual item.

似乎 Rx delay() 运算符只是将所有项目的发射移动了指定的延迟,而不是每个单独的项目。

Here's some testing code. It groups items in a list. Each group should then have a delay applied before being emitted.

这是一些测试代码。它对列表中的项目进行分组。每个组都应该在发射之前应用延迟。

Observable.range(1, 5)
    .groupBy(n -> n % 5)
    .flatMap(g -> g.toList())
    .delay(50, TimeUnit.MILLISECONDS)
    .doOnNext(item -> {
        System.out.println(System.currentTimeMillis() - timeNow);
        System.out.println(item);
        System.out.println(" ");
    }).toList().toBlocking().first();

The result is:

结果是:

154ms
[5]

155ms
[2]

155ms
[1]

155ms
[3]

155ms
[4]

But what I would expect to see is something like this:

但我希望看到的是这样的:

174ms
[5]

230ms
[2]

285ms
[1]

345ms
[3]

399ms
[4]

What am I doing wrong?

我究竟做错了什么?

采纳答案by iagreen

One way to do it is to use zipto combine your observable with an Intervalobservable to delay the output.

一种方法是使用zip将您的 observable 与Intervalobservable结合起来以延迟输出。

Observable.zip(Observable.range(1, 5)
        .groupBy(n -> n % 5)
        .flatMap(g -> g.toList()),
    Observable.interval(50, TimeUnit.MILLISECONDS),
    (obs, timer) -> obs)
    .doOnNext(item -> {
      System.out.println(System.currentTimeMillis() - timeNow);
      System.out.println(item);
      System.out.println(" ");
    }).toList().toBlocking().first();

回答by FriendlyMikhail

I think you want this:

我想你想要这个:

Observable.range(1, 5)
            .delay(50, TimeUnit.MILLISECONDS)
            .groupBy(n -> n % 5)
            .flatMap(g -> g.toList())
            .doOnNext(item -> {
                System.out.println(System.currentTimeMillis() - timeNow);
                System.out.println(item);
                System.out.println(" ");
            }).toList().toBlocking().first();

This way it will delay the numbers going into the group rather than delaying the reduced list by 5 seconds.

这样,它将延迟进入组的数字,而不是将减少的列表延迟 5 秒。

回答by kjones

To delay each group you can change your flatMap()to return an Observable that delays emitting the group.

要延迟每个组,您可以更改您flatMap()的返回一个延迟发射组的 Observable。

Observable
        .range(1, 5)
        .groupBy(n -> n % 5)
        .flatMap(g ->
                Observable
                        .timer(50, TimeUnit.MILLISECONDS)
                        .flatMap(t -> g.toList())
        )
        .doOnNext(item -> {
            System.out.println(System.currentTimeMillis() - timeNow);
            System.out.println(item);
            System.out.println(" ");
        }).toList().toBlocking().first();

回答by Matias Irland Tomas

You can implement a custom rx operatorsuch as MinRegularIntervalDelayOperatorand then use this with the liftfunction

您可以实现自定义 rx 运算符,例如MinRegularIntervalDelayOperator,然后将其与lift函数一起使用

Observable.range(1, 5)
    .groupBy(n -> n % 5)
    .flatMap(g -> g.toList())
    .lift(new MinRegularIntervalDelayOperator<Integer>(50L))
    .doOnNext(item -> {
      System.out.println(System.currentTimeMillis() - timeNow);
      System.out.println(item);
      System.out.println(" ");
    }).toList().toBlocking().first();

回答by Tushar Nallan

A not so clean way is to make the delay change with the iteration using the .delay(Func1) operator.

一个不太干净的方法是使用 .delay(Func1) 运算符通过迭代来改变延迟。

Observable.range(1, 5)
            .delay(n -> n*50)
            .groupBy(n -> n % 5)
            .flatMap(g -> g.toList())
            .doOnNext(item -> {
                System.out.println(System.currentTimeMillis() - timeNow);
                System.out.println(item);
                System.out.println(" ");
            }).toList().toBlocking().first();

回答by Sanket Kachhela

There is other way to do it using concatMap as concatMap returns observable of source items. so we can add delay on that observable.

还有其他方法可以使用 concatMap 来做到这一点,因为 concatMap 返回可观察的源项。所以我们可以在那个 observable 上添加延迟。

here what i have tried.

这是我尝试过的。

Observable.range(1, 5)
          .groupBy(n -> n % 5)
          .concatMap(integerIntegerGroupedObservable ->
          integerIntegerGroupedObservable.delay(2000, TimeUnit.MILLISECONDS))
          .doOnNext(item -> {
                    System.out.println(System.currentTimeMillis() - timeNow);
                    System.out.println(item);
                    System.out.println(" ");
                }).toList().toBlocking().first(); 

回答by cVoronin

You can add a delay between emitted items by using flatMap, maxConcurrent and delay()

您可以使用 flatMap、maxConcurrent 和 delay() 在发出的项目之间添加延迟

Here is an example - emit 0..4 with delay

这是一个例子 - 延迟发射 0..4

@Test
fun testEmitWithDelays() {
    val DELAY = 500L
    val COUNT = 5

    val latch = CountDownLatch(1)
    val startMoment = System.currentTimeMillis()
    var endMoment : Long = 0

    Observable
        .range(0, COUNT)
        .flatMap( { Observable.just(it).delay(DELAY, TimeUnit.MILLISECONDS) }, 1) // maxConcurrent = 1
        .subscribe(
                { println("... value: $it, ${System.currentTimeMillis() - startMoment}") },
                {},
                {
                    endMoment = System.currentTimeMillis()
                    latch.countDown()
                })

    latch.await()

    assertTrue { endMoment - startMoment >= DELAY * COUNT }
}

... value: 0, 540
... value: 1, 1042
... value: 2, 1544
... value: 3, 2045
... value: 4, 2547

回答by Magnus

The simplest way to do this seems to be just using concatMapand wrapping each item in a delayed Obserable.

做到这一点的最简单方法似乎只是concatMap在延迟的 Obserable 中使用和包装每个项目。

long startTime = System.currentTimeMillis();
Observable.range(1, 5)
        .concatMap(i-> Observable.just(i).delay(50, TimeUnit.MILLISECONDS))
        .doOnNext(i-> System.out.println(
                "Item: " + i + ", Time: " + (System.currentTimeMillis() - startTime) +"ms"))
        .toCompletable().await();

Prints:

印刷:

Item: 1, Time: 51ms
Item: 2, Time: 101ms
Item: 3, Time: 151ms
Item: 4, Time: 202ms
Item: 5, Time: 252ms

回答by Mina Samy

Just sharing a simple approach to emit each item in a collection with an interval:

只是分享一个简单的方法,以间隔发出集合中的每个项目:

Observable.just(1,2,3,4,5)
    .zipWith(Observable.interval(500, TimeUnit.MILLISECONDS), (item, interval) -> item)
    .subscribe(System.out::println);

Each item will be emitted every 500 milliseconds

每个项目将每 500 毫秒发出一次

回答by Abhishek Bansal

you should be able to achieve this by using Timeroperator. I tried with delaybut couldn't achieve the desired output. Note nested operations done in flatmapoperator.

您应该能够通过使用Timer运算符来实现这一点。我尝试过delay但无法达到所需的输出。注意在flatmapoperator 中完成的嵌套操作。

    Observable.range(1,5)
            .flatMap(x -> Observable.timer(50 * x, TimeUnit.MILLISECONDS)
                        .map(y -> x))
            // attach timestamp
            .timestamp()
            .subscribe(timedIntegers ->
                    Log.i(TAG, "Timed String: "
                            + timedIntegers.value()
                            + " "
                            + timedIntegers.time()));